AsyncStream and AsyncThrowingStream
01 何が問題だったのか
SE-0300 で導入された withUnsafeContinuation / withCheckedContinuation は、完了時に1回だけコールバックが呼ばれるような「単一の結果を返す」非同期APIを async 関数に橋渡しするのに向いています。たとえば次のような API です。
func getInt(completion: @escaping (Int) -> Void) {
DispatchQueue(label: "myQueue").async {
sleep(1)
completion(42)
}
}
func getInt() async -> Int {
await withUnsafeContinuation { continuation in
getInt { result in
continuation.resume(returning: result)
}
}
}
一方、世の中には「時間をかけて複数の値を流し続ける」タイプの非同期APIが数多く存在します。デリゲートや、何度も呼び出されるコールバックで値を渡してくるパターンです。
class QuakeMonitor {
var quakeHandler: (Quake) -> Void
func startMonitoring()
func stopMonitoring()
}
let monitor = QuakeMonitor()
monitor.quakeHandler = { quake in
// ...
}
monitor.startMonitoring()
// ...
monitor.stopMonitoring()
AppKit の NSSpeechRecognizerDelegate のように、認識されたコマンドごとに呼ばれるだけの「通知型」のデリゲートも同じ構造です。これらは本質的に非同期ですが、返値が1つに決まらないため、そのままでは async 関数として表現できません。
こうしたAPIを SE-0298 で導入された AsyncSequence として扱えれば、for await や AsyncSequence の各種メソッドで素直に消費できます。ところが、AsyncSequence は呼び出し側が next() を呼ぶたびに1要素を要求する backpressure ベースの仕組みであるのに対し、コールバックやデリゲートは生産側のタイミングで勝手に値を押し込んでくる backpressure のない仕組みです。この差を埋めるには、バッファリング・ドロップ・ブロッキングのいずれかの戦略を取る中間層が必要で、各自が自前のロックつきバッファを書くと、プラットフォームごとに異なるプリミティブを使わざるを得ず、スレッドセーフに仕上げるのも難しくなります。
標準ライブラリとして、「複数回値を送る側」と「AsyncSequence として受け取る側」を安全につなぐ仕組みが必要でした。
02 どのように解決されるのか
複数の値を非同期に流すAPIを AsyncSequence に橋渡しするための型として、AsyncStream<Element> と AsyncThrowingStream<Element, Failure> を追加します。どちらもネストされた Continuation 型を持ち、外側の型が消費側(AsyncSequence としてのインターフェース)、内側の Continuation が生産側 という役割分担になっています。Continuation は Sendable で、ストリームの外の並行な文脈から値を送り込めます。
エラーのない AsyncStream を作る
AsyncStream の初期化時に要素型と、Continuation を受け取るクロージャを渡します。クロージャの中で continuation.yield(_:) を呼ぶたびに、値がバッファに積まれるか、待機中のイテレータに直接渡されます。先ほどの QuakeMonitor は次のように AsyncStream のインターフェースを持てます。
extension QuakeMonitor {
static var quakes: AsyncStream<Quake> {
AsyncStream { continuation in
let monitor = QuakeMonitor()
monitor.quakeHandler = { quake in
continuation.yield(quake)
}
continuation.onTermination = { _ in
monitor.stopMonitoring()
}
monitor.startMonitoring()
}
}
}
for await quake in QuakeMonitor.quakes {
// ...
}
消費側は for await や、AsyncSequence の prefix などの各種メソッドで値を取り出せます。
for await notif in NotificationCenter
.notifications(for: ...)
.prefix(3)
{
// update with notif
}
AsyncStream もそのイテレータも @Sendable ではなく、同じストリームを複数箇所から並行にイテレートするのはプログラマエラーです(next() の同時呼び出しは fatalError になります)。
AsyncThrowingStream でエラーを伝える
途中でエラーを投げうるAPIには AsyncThrowingStream を使います。終了時に finish(throwing:) にエラーを渡すと、イテレータがその時点でスローします。
func findVegetables(shoppingList: [String]) -> AsyncThrowingStream<Vegetable, Error> {
AsyncThrowingStream { continuation in
buyVegetables(
shoppingList: shoppingList,
onGotVegetable: { veggie in continuation.yield(veggie) },
onAllVegetablesFound: { continuation.finish() },
onNonVegetable: { error in continuation.finish(throwing: error) }
)
}
}
AsyncThrowingStream は Failure で一般化されていますが、初期化子は Failure == Error のときだけ提供されます。ストリームの消費を終わらせるには、必ず finish()(または finish(throwing:))を呼ぶ必要があります。バッファに積まれた要素が消費し切られたあとで、nil(通常終了)またはスローされたエラー(異常終了)が伝わります。finish() は冪等で、2回目以降の呼び出しや、終了後の yield(_:) は無視されます。
バッファリングポリシー
AsyncStream は生産側(backpressure なし)と消費側(backpressure あり)の間のバッファとして動きます。初期化時に bufferingPolicy を指定することで、バッファの挙動を選べます。
public enum BufferingPolicy {
case unbounded // 無制限にバッファ(デフォルト)
case bufferingOldest(Int) // 満杯なら新しい要素を捨て、古い方を残す
case bufferingNewest(Int) // 満杯なら古い要素を捨て、新しい方を残す
}
デフォルトは .unbounded で、通知やDBレコードのように「すべての値を取りこぼしたくない」ケースに合います。上限 n を指定すると直近 n 個だけを残す動きになり、0 を指定すれば「イテレータが待っていなければ即捨てる」という純粋なドロップ動作になります。
yield(_:) の結果
Continuation.yield(_:) は、値がどう扱われたかを表す YieldResult を返します。
public enum YieldResult {
case enqueued(remaining: Int) // バッファに積まれた or 待機中イテレータに即配送された
case dropped(Element) // バッファが満杯で捨てられた
case terminated // ストリームが既に終了していた
}
バッファが上限に達したらエラーとして扱いたい、ドロップされた値を次回にまとめて送りたい、といった用途で使えます。remaining は yield を呼び出しが排他的な場合にのみ有意な値です。AsyncThrowingStream.Continuation.YieldResult も同様で、こちらは enqueued / dropped / terminated のケースを持ちます。
終了とキャンセルのハンドリング
Continuation.onTermination に (Termination) -> Void を設定しておくと、イテレーションが終わったとき、ストリームがスコープを抜けたとき、あるいは消費側のタスクがキャンセルされたときに呼ばれます。Termination は .finished と .cancelled のどちらかで(AsyncThrowingStream の場合は .finished(Failure?) でエラー付きの終了も区別できます)、ストリーム開始時に確保したリソースの解放に利用できます。
AsyncStream<Int> { continuation in
continuation.onTermination = { termination in
switch termination {
case .finished: print("Regular finish")
case .cancelled: print("Cancellation")
}
}
// ...
}
キャンセル時は、まず onTermination が .cancelled で呼ばれ、その後イテレータは nil を返します。
便利な yield のバリエーション
yield(with:)はResult<Element, Failure>を受け取り、成功なら値を yield、失敗ならfinish(throwing:)相当の動きをします。- 要素型が
Voidのときは、引数なしのyield()が使えます。
バックプレッシャーが効く unfolding 初期化子
非同期関数そのもの(呼び出されるたびに次の要素を返すような関数)を値源にしたい場合は、init(unfolding:onCancel:) でストリームを作れます。produce が nil を返した時点でストリームは終了し、以降の呼び出しも必ず nil を返すという AsyncSequence のルールは AsyncStream が面倒を見てくれます。
let stream = AsyncStream(unfolding: {
await nextChunk() // nil で終了
})
この初期化子では、produce の呼び出し自体が消費側の next() に駆動される形になるため、素直な backpressure 動作が得られます。AsyncThrowingStream にも unfolding: 版があり、produce が async throws になります。