Backpressure support for AsyncStream
01 何が問題だったのか
SE-0314 で導入された AsyncStream / AsyncThrowingStream は、コールバックやデリゲートのような同期的な値の発生源を AsyncSequence に橋渡しするためのルート AsyncSequence です。ただし、実際に使ってみるとルート AsyncSequence に求められるいくつかの振る舞いを満たせていないことがわかってきました。特に大きな問題が バックプレッシャー のサポート不足です。
既存 API でバックプレッシャーが表現できない
バックプレッシャーとは、消費側の処理が追いついていないときに生産側を止め、追いついたら再開させるフロー制御の仕組みです。既存の AsyncStream.Continuation.yield() は戻り値の YieldResult として現在のバッファ深さを返すので「これ以上生産するな」という停止シグナルとしては使えますが、「もう一度生産を再開してよい」というシグナルを生産側に伝える手段がありません。
そのため、既存 API でバックプレッシャーを表現しようとすると、一定時間生産を止めてから投機的に再開する、というタイマーベースのバックオフしか書けません。これはレイテンシもリソース効率も悪いパターンで、ネットワークやファイル I/O のようにバッファ溢れを避けたい現実的なユースケースには向いていません。
終了に関する振る舞いの曖昧さ
AsyncStream の終了挙動にも実用上困る点があります。
Continuationがfinish()を呼ばれないままdeinitされると、ストリームが永遠に終了せず消費側のタスクが止まってしまいます。挙動の変更は既存コードの意味を壊しかねないため、既存 API 内では直せません。AsyncStreamは複数のイテレータ生成を許しているのに、AsyncThrowingStreamは 2 つ目のイテレータが値を待つとfatalErrorで落ちます。ドキュメントには unicast のつもりで書かれているのに、実際の実装はバリアントごとに食い違っていました。- 生産側に終了を知らせる
onTerminationは、複数コンシューマのうち どれか 1 つがキャンセル されただけで呼ばれてしまい、残りのコンシューマごと停止してしまいます。
これらは Continuation ベースの既存 API を拡張して直そうとすると、意味論やパフォーマンスを壊してしまうため、新しい API を追加する形で解決する必要がありました。
02 どのように解決されるのか
AsyncStream / AsyncThrowingStream に、バックプレッシャー戦略を指定してストリームと書き込み口(Source)のペアを作る新しいファクトリメソッド makeStream(of:backpressureStrategy:) を追加します。生産側は Source に対して write してバッファへ値を積み、バッファが上限に達したら生産を一時停止し、再開してよい状態になったらコールバックで通知を受け取る、という形でバックプレッシャーを制御できます。
新しい API が提供するのは マルチプロデューサ / シングルコンシューマ の厳密な unicast セマンティクスで、複数イテレータ問題や onTermination の多重発火問題も同時に解消されます。
このダイジェストが扱う SE-0406 の API 形状は Swift Evolution レビューで Returned for revision となっており、正式に標準ライブラリに入った最終形ではありません。以下はあくまで当時提案されていた設計の要約です。
バックプレッシャー付きストリームを作る
makeStream(of:backpressureStrategy:) はストリームと Source のタプルを返します。Source を生産側に、ストリームを消費側に渡して使います。戦略としてはハイウォーターマーク / ローウォーターマークを用いる .watermark(low:high:) が用意されており、バッファ量が high に達したら生産側を停止させ、low まで減ったら再開させます。
let (stream, source) = AsyncStream.makeStream(
of: Int.self,
backpressureStrategy: .watermark(low: 2, high: 4)
)
同期的に書き込む: write と WriteResult
もっとも低レベルで高速な書き込み API は同期版の write(_:) / write(contentsOf:) です。これらは WriteResult を返し、「そのまま生産を続けてよい」か「コールバックをエンキューして停止すべきか」を呼び出し元に伝えます。停止が必要なときだけコールバック用のアロケーションが発生するため、通常時のオーバーヘッドが小さく済みます。
do {
let writeResult = try source.write(contentsOf: sequence)
switch writeResult {
case .produceMore:
// さらに生産を続ける
case .enqueueCallback(let callbackToken):
source.enqueueCallback(token: callbackToken, onProduceMore: { result in
switch result {
case .success:
// 生産を再開する
case .failure(let error):
// 下流の生産源を終了させる
}
})
}
} catch {
// ストリームが既に終了している場合は write が throw する
}
enqueueCallback(token:onProduceMore:) は、もらった CallbackToken と一緒にクロージャを渡しておくと、戦略側が「もう一度生産してよい」と判断したタイミングで onProduceMore を呼んでくれます。登録済みコールバックを取り消したいときは cancelCallback(token:) を使います。
よりシンプルな書き込み口
毎回 WriteResult を分岐させるのが煩雑な場合のために、より使いやすいラッパーも用意されます。コールバック版と async 版の 2 種類があり、ブロッキング・コールバックベース・非同期のいずれの生産源にも橋渡しできます。
// 書き込み + 生産再開を知らせるコールバックを同時に渡す
try source.write(contentsOf: sequence, onProduceMore: { result in
switch result {
case .success:
// 生産を再開する
case .failure(let error):
// 下流の生産源を終了させる
}
})
// 生産を再開してよくなるまで suspend する async 版
try await source.write(contentsOf: sequence)
async 版は「生産してよい状態になるまで自動で待つ」という、もっとも直感的な書き方を提供します。
消費側の終了
消費側を終了させる方法は 2 通りです。
source.finish()(throwing 版ではsource.finish(throwing:))を呼ぶSourceがdeinitされる
どちらの場合も、既にバッファに積まれている要素を消費しきった後 でイテレーションが nil(throwing 版では throw)を返します。Source の deinit でも終了させる設計により、Continuation の deinit で止まってしまう既存の問題も解消されます。終了後の write は AsyncStreamAlreadyFinishedError を throw します。
let (stream, source) = AsyncStream.makeStream(
of: Int.self,
backpressureStrategy: .watermark(low: 2, high: 4)
)
_ = try await source.write(1)
source.finish()
for await element in stream {
print(element)
}
print("Finished")
// 1
// Finished
生産側の終了
生産側には Source.onTermination クロージャで終了が通知されます。通知が発生するのは次の場合です。
- イテレータが作られないままストリームが
deinitされた - 作られたイテレータが
deinitされた finish()/finish(throwing:)後にすべての要素が消費された- 消費側タスクがキャンセルされた
unicast になったことで、既存 API で問題になっていた「複数コンシューマのうち 1 つのキャンセルで全体が止まる」挙動は発生しなくなります。生産側も終了後に write しようとすると AsyncStreamAlreadyFinishedError が throw されます。
今後の見通し
この提案は最終形ではなく Returned for revision となっており、以下のような発展方向が議論の俎上に載っていました(speculative で実現を約束するものではありません)。
- 消費と生産のレートに応じて閾値を自動調整する適応的なバックプレッシャー戦略。
- 要素が
Collectionの場合に各要素のメモリサイズを考慮した戦略。 - 既存の
Async[Throwing]Stream.Continuationベース API の段階的な deprecate。 Sourceが適合するようなWriter/AsyncWriterプロトコルの導入。