Zen言語のasync標準ライブラリ紹介〜event.Channel②〜

けっこう標準ライブラリが充実しているわけですが、ドキュメントがないのがもったいないですね。 まとまった時間が取れないので、ちょこちょこ書いていくシリーズです。

リクエストあれば、優先する、かも?

async関連のライブラリはstd.event下にあります。

今回はproducerとconsumerが多対多な (バッファ) 通信を実現するevent.Channelの続きです。

event.Channel

APIは割とシンプルです。

Tは、チャネルでやり取りする任意の型です。

/// 多数のproducer、多数のconsumer、スレッドセーフ、実行時にバッファサイズを設定可能
/// バッファが空であれば、consumerはsuspendし、producerにresumeされる
/// バッファがフルであれば、producerはsuspendし、consumerにresumeされる
pub fn Channel(comptime T: type) type;
/// 使用後はリソース解放のため`deinit()`を呼ぶ必要があります
/// `buffer`は`deinit()`が呼ばれるまで、使用可能でなければなりません
/// バッファ`0`のチャネルを作成する場合、`[0]T{}`を使います
pub fn init(self: *SelfChannel, buffer: []T) void;

pub fn deinit(self: *SelfChannel) void;
/// チャネルにデータを送ります
pub fn put(self: *SelfChannel, data: T) void;

/// チャネルからデータを取り出します
/// バッファが空の場合、新しいデータがputされた後に、この関数が完了します
pub fn get(self: *SelfChannel) T;

/// チャネルからデータを取り出します
/// バッファが空の場合、`null`が返ります
pub fn getOrNull(self: *SelfChannel) ?T;

もう少しサンプル

const std = @import("std");
const Channel = std.event.Channel;
const warn = std.debug.warn;
const testing = std.testing;

test "std.event.Channel" {
    var buf: [2]i32 = [_]i32{ 0 } ** 2;
    var channel: Channel(i32) = undefined;
    channel.init(&buf);
    defer channel.deinit();

    var handle = async getter(&channel);
    var sender = async putter(&channel);

    await handle;
    await sender;
}

fn getter(channel: *Channel(i32)) void {
    for([_]i32{ 0 } ** 10) |_| {
        const value = channel.get();
        warn("get {d}\n", .{value});
    }
}

fn putter(channel: *Channel(i32)) void {
    const inputs = [_]i32{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    for (inputs) |input| {
        warn("put {d}\n", .{ input });
        channel.put(input);
    }
}

実行すると、実行順は非決定的です。

put 1
put 2
get 1
put 3
get 2
put 4
get 3
put 5
get 4
put 6
get 5
put 7
get 6
put 8
get 7
put 9
get 8
put 10
get 9
get 10
put 1
get 1
put 2
get 2
put 3
get 3
put 4
get 4
put 5
put 6
get 5
put 7
get 6
put 8
get 7
put 9
get 8
put 10
get 9
get 10

さらにもう少しサンプル

2つのproducerと2つのconsumerを作ってみます。

test "std.event.Channel multi-producer multi-consumer" {
    var buf: [2]i32 = [_]i32{ 0 } ** 2;
    var channel: Channel(i32) = undefined;
    channel.init(&buf);
    defer channel.deinit();

    var handle1 = async getter(&channel);
    var handle2 = async getter(&channel);
    var sender1 = async putter(&channel);
    var sender2 = async putter(&channel);

    await handle1;
    await handle2;
    await sender1;
    await sender2;
}

ちゃんと動いているっぽいですね。

get 1
put 1
get 1
get 2
put 1
put 2
put 2
put 3
put 3
get 3
get 2
put 4
get 3
put 4
put 5
get 4
get 4
get 5
get 5
put 6
get 6
put 7
get 7
get 8
put 8
put 5
put 9
get 6
put 10
put 6
get 10
put 7
get 9
get 7
put 8
get 8
put 9
get 9
put 10
get 10