Zen言語のasync標準ライブラリ紹介〜event.Batch①〜

はじめに

けっこう標準ライブラリが充実しているわけですが、ドキュメントがないのがもったいないですね。 まとまった時間が取れないので、ちょこちょこ書いていくシリーズです。

リクエストあれば、優先する、かも?

async関連のライブラリはstd.event下にあります。

今回は複数のasync関数を実行するevent.Batchです。

async自体の説明は、またそのうちに。

std.event.Batch

まず、2つのasync関数を実行するBatchです。

const std = @import("std");
const event = std.event;
const testing = std.testing;

test "std.event.Batch" {
    var batch = event.Batch(usize, 2, .auto_async).init();
    // `batch` に async なジョブ `one()` と `two()` とを追加する
    batch.add(&async one());
    batch.add(&async two());
    // `wait` で `one()` と `two()` が完了するまで待つ
    batch.wait();

    testing.equal(@to(usize, 1), batch.jobs[0].result);
    testing.equal(@to(usize, 2), batch.jobs[1].result);
}

fn one() usize {
    return 1;
}

fn two() usize {
    return 2;
}

Batch

Batch は型を返す関数です。

/// 複数の async 関数 (ジョブ) をヒープアロケーションなしに並列実行します。
pub fn Batch(
    /// ジョブの戻り値型です
    comptime Result: type,

    /// 並列実行可能な最大ジョブ数です
    comptime max_jobs: comptime_int,

    /// `add` と `wait` が `async` 関数かどうかをコントロールします
    comptime async_behavior: enum {
        /// Observe the value of `std.io.is_async` to decide whether `add`
        /// and `wait` will be async functions. Asserts that the jobs do not suspend when
        /// `std.io.mode == .blocking`. This is a generally safe assumption, and the
        /// usual recommended option for this parameter.
        auto_async,

        /// Always uses the `noasync` keyword when using `await` on the jobs,
        /// making `add` and `wait` non-async functions. Asserts that the jobs do not suspend.
        never_async,

        /// `add` and `wait` use regular `await` keyword, making them async functions.
        always_async,
    },
) type

初期化用の関数が1つ、メソッドが2つ、あります。

  • init

Batchインスタンスを返します。

pub fn init() Self;
  • add

ジョブをBatchインスタンスに追加します。 もし、最大ジョブ数分、既にジョブが投入されている場合、1つのジョブが完了するまで待ちます。

引数のframe (型anyframe->Result) は、async関数のフレームポインタです。 今回は詳しく説明しませんが、要するに普通の関数ポインタではなく、async関数のフレームを渡さないといけない、ということです。

pub fn add(self: *Self, frame: anyframe->Result) void;
  • wait

全てのジョブが完了するのを待ちます。 もしジョブがエラー共用体を返す場合、最後に発生したエラーを返します。

pub fn wait(self: *Self) CollectedResult;

さて、これだけだとasync何か関係あるの?という感じなのですが、続きは次回。

Zen言語のasync標準ライブラリ紹介〜event.Channel②〜

けっこう標準ライブラリが充実しているわけですが、ドキュメントがないのがもったいないですね。 まとまった時間が取れないので、ちょこちょこ書いていくシリーズです。

リクエストあれば、優先する、かも?

async関連のライブラリはstd.event下にあります。

今回はproducerとconsumerが多対多な (バッファ) 通信を実現するevent.Channelの続きです。

event.Channel

APIは割とシンプルです。

Tは、チャネルでやり取りする任意の型です。

/// 多数のproducer、多数のconsumer、スレッドセーフ、実行時にバッファサイズを設定可能
/// バッファが空であれば、consumerはsuspendし、producerにresumeされる
/// バッファがフルであれば、producerはsuspendし、consumerにresumeされる
pub fn Channel(comptime T: type) type;
/// 使用後はリソース解放のため`deinit()`を呼ぶ必要があります
/// `buffer`は`deinit()`が呼ばれるまで、使用可能でなければなりません
/// バッファ`0`のチャネルを作成する場合、`[0]T{}`を使います
pub fn init(self: *SelfChannel, buffer: []T) void;

pub fn deinit(self: *SelfChannel) void;
/// チャネルにデータを送ります
pub fn put(self: *SelfChannel, data: T) void;

/// チャネルからデータを取り出します
/// バッファが空の場合、新しいデータがputされた後に、この関数が完了します
pub fn get(self: *SelfChannel) T;

/// チャネルからデータを取り出します
/// バッファが空の場合、`null`が返ります
pub fn getOrNull(self: *SelfChannel) ?T;

もう少しサンプル

const std = @import("std");
const Channel = std.event.Channel;
const warn = std.debug.warn;
const testing = std.testing;

test "std.event.Channel" {
    var buf: [2]i32 = [_]i32{ 0 } ** 2;
    var channel: Channel(i32) = undefined;
    channel.init(&buf);
    defer channel.deinit();

    var handle = async getter(&channel);
    var sender = async putter(&channel);

    await handle;
    await sender;
}

fn getter(channel: *Channel(i32)) void {
    for([_]i32{ 0 } ** 10) |_| {
        const value = channel.get();
        warn("get {d}\n", .{value});
    }
}

fn putter(channel: *Channel(i32)) void {
    const inputs = [_]i32{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    for (inputs) |input| {
        warn("put {d}\n", .{ input });
        channel.put(input);
    }
}

実行すると、実行順は非決定的です。

put 1
put 2
get 1
put 3
get 2
put 4
get 3
put 5
get 4
put 6
get 5
put 7
get 6
put 8
get 7
put 9
get 8
put 10
get 9
get 10
put 1
get 1
put 2
get 2
put 3
get 3
put 4
get 4
put 5
put 6
get 5
put 7
get 6
put 8
get 7
put 9
get 8
put 10
get 9
get 10

さらにもう少しサンプル

2つのproducerと2つのconsumerを作ってみます。

test "std.event.Channel multi-producer multi-consumer" {
    var buf: [2]i32 = [_]i32{ 0 } ** 2;
    var channel: Channel(i32) = undefined;
    channel.init(&buf);
    defer channel.deinit();

    var handle1 = async getter(&channel);
    var handle2 = async getter(&channel);
    var sender1 = async putter(&channel);
    var sender2 = async putter(&channel);

    await handle1;
    await handle2;
    await sender1;
    await sender2;
}

ちゃんと動いているっぽいですね。

get 1
put 1
get 1
get 2
put 1
put 2
put 2
put 3
put 3
get 3
get 2
put 4
get 3
put 4
put 5
get 4
get 4
get 5
get 5
put 6
get 6
put 7
get 7
get 8
put 8
put 5
put 9
get 6
put 10
put 6
get 10
put 7
get 9
get 7
put 8
get 8
put 9
get 9
put 10
get 10

Zen言語のasync標準ライブラリ紹介〜event.Channel①〜

はじめに

けっこう標準ライブラリが充実しているわけですが、ドキュメントがないのがもったいないですね。 まとまった時間が取れないので、ちょこちょこ書いていくシリーズです。

リクエストあれば、優先する、かも?

async関連のライブラリはstd.event下にあります。

今回はproducerとconsumerが多対多な (バッファ) 通信を実現するevent.Channelです。

event.Channel

サンプル

getter()putter()の2つのasync関数を使い、putter()でチャネルに書き込み、getter()でチャネルから読み込みます。

getter()では、チャネルから1234を取得できることをテストしています。 先にgetter()を実行していますが、チャネルが空なので、putter()に制御が移行し、putter()1234をチャネルに書いた後、再びgetter()に制御が移るため、テストが成功します。

const std = @import("std");
const Channel = std.event.Channel;
const testing = std.testing;

test "std.event.Channel" {
    var buf: [2]i32 = [_]i32{ 0 } ** 2;
    var channel: Channel(i32) = undefined;
    channel.init(&buf);
    defer channel.deinit();

    var handle = async getter(&channel);
    var sender = async putter(&channel);

    await handle;
    await sender;
}

fn getter(channel: *Channel(i32)) void {
    const value1 = channel.get();
    testing.equal(@to(i32, 1234), value1);
}

fn putter(channel: *Channel(i32)) void {
    channel.put(1234);
}

eventを使うテストでは、--test-evented-ioオプションを追加しなければいけません。

$ zen test src/main.zen --test-evented-io
1/1 test "std.event.Channel"...OK
All 1 tests have succesfully passed.

Channelget()put()とは、両方suspendポイントを持っています。

実行順は非決定的

上記プログラムに、大体実行順になるように番号を出力するコードを追加します。

const std = @import("std");
const Channel = std.event.Channel;
const warn = std.debug.warn;
const testing = std.testing;

test "std.event.Channel" {
    var buf: [2]i32 = [_]i32{ 0 } ** 2;
    var channel: Channel(i32) = undefined;
    channel.init(&buf);
    defer channel.deinit();

    warn("\n", .{});
    warn("seq: 1\n", .{});
    var handle = async getter(&channel);
    warn("seq: 3\n", .{});
    var sender = async putter(&channel);
    warn("seq: 5\n", .{});

    await handle;
    warn("seq: 7\n", .{});
    await sender;
}

fn getter(channel: *Channel(i32)) void {
    warn("seq: 2\n", .{});
    const value1 = channel.get();
    warn("seq: 6\n", .{});
    testing.equal(@to(i32, 1234), value1);
}

fn putter(channel: *Channel(i32)) void {
    warn("seq: 4\n", .{});
    channel.put(1234);
    warn("seq: 8\n", .{});
}

イベントループがディスパッチするタイミング次第で、2つの実行順序を取る可能性があるようです。

$ zen test src/main.zen --test-evented-io
1/1 test "std.event.Channel"...
seq: 1
seq: 2
seq: 3
seq: 4
seq: 5
seq: 6
seq: 7
seq: 8
OK
$ zen test src/main.zen --test-evented-io
1/1 test "std.event.Channel"...
seq: 1
seq: 2
seq: 3
seq: 4
seq: 5
seq: 8
seq: 6
seq: 7
OK

Zen言語の標準ライブラリ紹介〜progress〜

はじめに

けっこう標準ライブラリが充実しているわけですが、ドキュメントがないのがもったいないですね。 まとまった時間が取れないので、ちょこちょこ書いていくシリーズです。

リクエストあれば、優先する、かも?

今日はコンソールにプログレスバーを表示する機能、いってみましょう。

std.progress

a, b, c, dという4つのタスクがあり、各タスクの進捗率を1から100で表示したいとします。 例えば、次の画像のイメージです。

f:id:tomo-wait-for-it-yuki:20200228213831p:plain f:id:tomo-wait-for-it-yuki:20200228213828p:plain f:id:tomo-wait-for-it-yuki:20200228213825p:plain f:id:tomo-wait-for-it-yuki:20200228213822p:plain

コードは次のようにします。

const std = @import("std");
const Progress = std.Progress;
const testing = std.testing;

test "progress bar" {
    // Progress のインスタンス生成
    var progress = Progress{};

    // タスク名
    const tasks = [_][]const u8{
        "a",
        "b",
        "c",
        "d",
    };
    // タスク4つで `progress` を開始
    const root = try progress.start("", tasks.len);
    defer root.end();

    for (tasks) |task| {
        // 各タスクごとにトータル進捗を100で子ノード作成
        var node = root.start(task, 100);
        // 子ノードをアクティブに設定
        node.activate();
        for ([_]u1{0} ** 100) |_| {
            // 進捗を`1`進める
            node.completeOne();
            std.time.sleep(50 * std.time.millisecond);
        }
        node.end();
    }
}

Zen言語の標準ライブラリ紹介〜rand②〜

はじめに

けっこう標準ライブラリが充実しているわけですが、ドキュメントがないのがもったいないですね。 まとまった時間が取れないので、ちょこちょこ書いていくシリーズです。

リクエストあれば、優先する、かも?

乱数について続きを見ていきましょう。 前回は、Randomインタフェースと、それを実装する乱数生成器について、簡単な説明をしました。

今回は、Randomインタフェースの使い方です。

std.rand.bytes

bufで渡されたスライスをランダムなバイトで埋めます。 rRandomインタフェースを実装する乱数生成器を渡します。 これは、Randomインタフェースのfillを直接呼び出すのと同じです。

pub fn bytes(r: Random, buf: []u8) void
test "std.rand.bytes" {
    var x = rand.Xoroshiro128.init(0);
    var buf = [_]u8{ 0 } ** 10;

    rand.bytes(&x, &buf);
    debug.warn("random bytes = 0x{x}\n", .{buf});
}

std.rand.boolean

ランダムなブール値を取得します。

pub fn boolean(r: Random) bool

std.rand.int

ランダムなT型の整数値を取得します。

/// Returns a random int `i` such that `0 <= i <= maxInt(T)`.
/// `i` is evenly distributed.
pub fn int(r: Random, comptime T: type) T
test "std.rand.int" {
    var x = rand.Xoroshiro128.init(1);

    debug.warn("random i32 = {}\n", .{ rand.int(&x, i32) });
    debug.warn("random u1 = {}\n", .{ rand.int(&x, u1) });
}

その他

指定範囲の整数値を取得する関数など、色々あります。 一覧は公式ドキュメントをどうぞ。

zen-lang.org

Zen言語の標準ライブラリ紹介〜rand①〜

はじめに

けっこう標準ライブラリが充実しているわけですが、ドキュメントがないのがもったいないですね。 まとまった時間が取れないので、ちょこちょこ書いていくシリーズです。

リクエストあれば、優先する、かも?

乱数について見ていきましょう。

std.rand.Random

まずはRandomインタフェースです。 std.randの多くのAPIは、Randomインタフェースを実装する構造体インスタンスを必要とします。

Randomインタフェースは、ランダムな値でスライスを埋めるfill()関数を実装しなければなりません。

pub const Random = interface {
    fn fill(buf: []u8) void;
};

このRandomインタフェースを実装する構造体として、std.randには次の乱数生成器があります。

  • Xoroshiro128
  • Isaac64
  • Sfc64

例えば、Xoroshiro128を直接使うのであれば、次のようにinit()で初期化した後、fill()で乱数を取得します。

const std = @import("std");
const rand = std.rand;
const debug = std.debug;
const testing = std.testing;

test "std.rand.Xoroshiro128" {
    var x = rand.Xoroshiro128.init(0);
    var buf = [_]u8{ 0 } ** 10;

    x.fill(&buf);

    debug.warn("random value = 0x{x}\n", .{buf});
}

実行結果は次の通りです。

random value = 0xa333d71ca4469950fa4b

疑似乱数なので、この使い方だと毎回同じ値が出力されます。

Zen言語の標準ライブラリ紹介〜lazy〜

はじめに

けっこう標準ライブラリが充実しているわけですが、ドキュメントがないのがもったいないですね。 まとまった時間が取れないので、ちょこちょこ書いていくシリーズです。

リクエストあれば、優先する、かも?

グローバルなデータをスレッド安全に初期化するLazyInitです。

std.lazy.LazyInit

コンパイル時に初期値が決まらないグローバルデータを、安全に初期化するのは意外と面倒です。 LazyInitはそのような場合に、安全にグローバルデータを初期化することができます。

lazy.init() は任意の型Tに対して、LazyInit(T)の値を返します。 この時、T型の値は未定義です。

pub fn init(comptime T: type) LazyInit(T)

LazyInitget()resolve()の2つのメソッドを持ちます。

// LazyInit(T) {
    /// Returns a usable pointer to the initialized data,
    /// or returns null, indicating that the caller should
    /// perform the initialization and then call resolve().
    pub fn get(self: *Self) ?Ptr


    pub fn resolve(self: *Self) void

get()null が返ってきたら、値 (data フィールド) を初期化して、resolve() を呼びます。 内部は簡単なステートマシンを持っており、resolve()を呼ぶと、初期化済み状態になります。

const std = @import("std");
const lazy = std.lazy;
const testing = std.testing;

var global = lazy.init(u32);

test "std.lazy.LazyInit" {
    if (global.get()) |value| {
        // `resolve()`を呼ぶまでは `null` が返るので、ここには到達しない
        unreachable;
    } else {
        // `null` が取得できた場合、データを初期化して、`resolve`を呼ぶ
        global.data = 42;
        global.resolve();
    }

    if (global.get()) |value| {
        // get returns `*T`
        testing.equal(@to(u32, 42), value.*);
    } else {
        unreachable;
    }
}

注意点としては、get()nullが返ってきたら、必ず初期化しないといけない、ということです。 次のコードは無限ループに陥ります。

test "deadlock" {
    const S = struct {
        var g = lazy.init(u32);
    };
    if (S.g.get()) |_| {}
    if (S.g.get()) |_| {}
}

個人的には、lazy.init()が初期化関数を受け取れるようになっていて、最初に呼ばれたget()内で初期化関数呼び出してくれる方が良いなぁ、と思ったりします。