Swift Digest
SE-0314 | Swift Evolution

AsyncStream and AsyncThrowingStream

Proposal
SE-0314
Authors
Philippe Hausler, Tony Parker, Ben D. Jones, Nate Cook
Review Manager
Doug Gregor
Status
Implemented (Swift 5.5)

01 何が問題だったのか

SE-0300 で導入された withUnsafeContinuation / withCheckedContinuation は、完了時に1回だけコールバックが呼ばれるような「単一の結果を返す」非同期APIを async 関数に橋渡しするのに向いています。たとえば次のような API です。

func getInt(completion: @escaping (Int) -> Void) {
    DispatchQueue(label: "myQueue").async {
        sleep(1)
        completion(42)
    }
}

func getInt() async -> Int {
    await withUnsafeContinuation { continuation in
        getInt { result in
            continuation.resume(returning: result)
        }
    }
}

一方、世の中には「時間をかけて複数の値を流し続ける」タイプの非同期APIが数多く存在します。デリゲートや、何度も呼び出されるコールバックで値を渡してくるパターンです。

class QuakeMonitor {
  var quakeHandler: (Quake) -> Void
  func startMonitoring()
  func stopMonitoring()
}

let monitor = QuakeMonitor()
monitor.quakeHandler = { quake in
  // ...
}
monitor.startMonitoring()
// ...
monitor.stopMonitoring()

AppKit の NSSpeechRecognizerDelegate のように、認識されたコマンドごとに呼ばれるだけの「通知型」のデリゲートも同じ構造です。これらは本質的に非同期ですが、返値が1つに決まらないため、そのままでは async 関数として表現できません。

こうしたAPIを SE-0298 で導入された AsyncSequence として扱えれば、for awaitAsyncSequence の各種メソッドで素直に消費できます。ところが、AsyncSequence は呼び出し側が next() を呼ぶたびに1要素を要求する backpressure ベースの仕組みであるのに対し、コールバックやデリゲートは生産側のタイミングで勝手に値を押し込んでくる backpressure のない仕組みです。この差を埋めるには、バッファリング・ドロップ・ブロッキングのいずれかの戦略を取る中間層が必要で、各自が自前のロックつきバッファを書くと、プラットフォームごとに異なるプリミティブを使わざるを得ず、スレッドセーフに仕上げるのも難しくなります。

標準ライブラリとして、「複数回値を送る側」と「AsyncSequence として受け取る側」を安全につなぐ仕組みが必要でした。

02 どのように解決されるのか

複数の値を非同期に流すAPIを AsyncSequence に橋渡しするための型として、AsyncStream<Element>AsyncThrowingStream<Element, Failure> を追加します。どちらもネストされた Continuation 型を持ち、外側の型が消費側(AsyncSequence としてのインターフェース)内側の Continuation が生産側 という役割分担になっています。ContinuationSendable で、ストリームの外の並行な文脈から値を送り込めます。

エラーのない AsyncStream を作る

AsyncStream の初期化時に要素型と、Continuation を受け取るクロージャを渡します。クロージャの中で continuation.yield(_:) を呼ぶたびに、値がバッファに積まれるか、待機中のイテレータに直接渡されます。先ほどの QuakeMonitor は次のように AsyncStream のインターフェースを持てます。

extension QuakeMonitor {
  static var quakes: AsyncStream<Quake> {
    AsyncStream { continuation in
      let monitor = QuakeMonitor()
      monitor.quakeHandler = { quake in
        continuation.yield(quake)
      }
      continuation.onTermination = { _ in
        monitor.stopMonitoring()
      }
      monitor.startMonitoring()
    }
  }
}

for await quake in QuakeMonitor.quakes {
    // ...
}

消費側は for await や、AsyncSequenceprefix などの各種メソッドで値を取り出せます。

for await notif in NotificationCenter
      .notifications(for: ...)
      .prefix(3)
{
    // update with notif
}

AsyncStream もそのイテレータも @Sendable ではなく、同じストリームを複数箇所から並行にイテレートするのはプログラマエラーです(next() の同時呼び出しは fatalError になります)。

AsyncThrowingStream でエラーを伝える

途中でエラーを投げうるAPIには AsyncThrowingStream を使います。終了時に finish(throwing:) にエラーを渡すと、イテレータがその時点でスローします。

func findVegetables(shoppingList: [String]) -> AsyncThrowingStream<Vegetable, Error> {
  AsyncThrowingStream { continuation in
    buyVegetables(
      shoppingList: shoppingList,
      onGotVegetable: { veggie in continuation.yield(veggie) },
      onAllVegetablesFound: { continuation.finish() },
      onNonVegetable: { error in continuation.finish(throwing: error) }
    )
  }
}

AsyncThrowingStreamFailure で一般化されていますが、初期化子は Failure == Error のときだけ提供されます。ストリームの消費を終わらせるには、必ず finish()(または finish(throwing:))を呼ぶ必要があります。バッファに積まれた要素が消費し切られたあとで、nil(通常終了)またはスローされたエラー(異常終了)が伝わります。finish() は冪等で、2回目以降の呼び出しや、終了後の yield(_:) は無視されます。

バッファリングポリシー

AsyncStream は生産側(backpressure なし)と消費側(backpressure あり)の間のバッファとして動きます。初期化時に bufferingPolicy を指定することで、バッファの挙動を選べます。

public enum BufferingPolicy {
  case unbounded                 // 無制限にバッファ(デフォルト)
  case bufferingOldest(Int)      // 満杯なら新しい要素を捨て、古い方を残す
  case bufferingNewest(Int)      // 満杯なら古い要素を捨て、新しい方を残す
}

デフォルトは .unbounded で、通知やDBレコードのように「すべての値を取りこぼしたくない」ケースに合います。上限 n を指定すると直近 n 個だけを残す動きになり、0 を指定すれば「イテレータが待っていなければ即捨てる」という純粋なドロップ動作になります。

yield(_:) の結果

Continuation.yield(_:) は、値がどう扱われたかを表す YieldResult を返します。

public enum YieldResult {
  case enqueued(remaining: Int) // バッファに積まれた or 待機中イテレータに即配送された
  case dropped(Element)         // バッファが満杯で捨てられた
  case terminated               // ストリームが既に終了していた
}

バッファが上限に達したらエラーとして扱いたい、ドロップされた値を次回にまとめて送りたい、といった用途で使えます。remaining は yield を呼び出しが排他的な場合にのみ有意な値です。AsyncThrowingStream.Continuation.YieldResult も同様で、こちらは enqueued / dropped / terminated のケースを持ちます。

終了とキャンセルのハンドリング

Continuation.onTermination(Termination) -> Void を設定しておくと、イテレーションが終わったとき、ストリームがスコープを抜けたとき、あるいは消費側のタスクがキャンセルされたときに呼ばれます。Termination.finished.cancelled のどちらかで(AsyncThrowingStream の場合は .finished(Failure?) でエラー付きの終了も区別できます)、ストリーム開始時に確保したリソースの解放に利用できます。

AsyncStream<Int> { continuation in
  continuation.onTermination = { termination in
    switch termination {
    case .finished:  print("Regular finish")
    case .cancelled: print("Cancellation")
    }
  }
  // ...
}

キャンセル時は、まず onTermination.cancelled で呼ばれ、その後イテレータは nil を返します。

便利な yield のバリエーション

  • yield(with:)Result<Element, Failure> を受け取り、成功なら値を yield、失敗なら finish(throwing:) 相当の動きをします。
  • 要素型が Void のときは、引数なしの yield() が使えます。

バックプレッシャーが効く unfolding 初期化子

非同期関数そのもの(呼び出されるたびに次の要素を返すような関数)を値源にしたい場合は、init(unfolding:onCancel:) でストリームを作れます。producenil を返した時点でストリームは終了し、以降の呼び出しも必ず nil を返すという AsyncSequence のルールは AsyncStream が面倒を見てくれます。

let stream = AsyncStream(unfolding: {
  await nextChunk() // nil で終了
})

この初期化子では、produce の呼び出し自体が消費側の next() に駆動される形になるため、素直な backpressure 動作が得られます。AsyncThrowingStream にも unfolding: 版があり、produceasync throws になります。