Hashable conformance for Async(Throwing)Stream.Continuation
01 何が問題だったのか
複数の AsyncStream を同時に扱う場面では、AsyncStream<Element>.Continuation を集合に入れて管理したいことがあります。たとえば、同じ値を複数の購読者に配信するオブザーバーのような仕組みを AsyncStream を土台に組み、新しい購読者が現れるたびに AsyncStream を作って発行元に continuation を保持させる、といったパターンです。購読が終わったら onTermination コールバックの中で、終了した continuation を集合から取り除く必要があります。
@MainActor private class Sender {
var value: Int = 0 {
didSet {
for c in continuations {
c.yield(value)
}
}
}
var values: some AsyncSequence<Int, Never> {
AsyncStream<Int>(bufferingPolicy: .bufferingNewest(1)) { continuation in
continuation.yield(value)
self.continuations.insert(continuation) // ここで Hashable が必要
continuation.onTermination = { _ in
DispatchQueue.main.async {
self.continuations.remove(continuation)
}
}
}
}
private var continuations: Set<AsyncStream<Int>.Continuation> = []
}
しかし、AsyncStream.Continuation と AsyncThrowingStream.Continuation はこれまで Hashable にも Equatable にも適合しておらず、Set に入れたり辞書のキーにしたりすることができませんでした。そのため上のようなコードを書くには、continuation をクラスでラップして ObjectIdentifier ベースの人工的なキーを与えるなど、余計な間接参照を挟む必要がありました。一方で、AsyncStream.Continuation は内部的に AsyncStream._Storage への参照を保持していて、そのアイデンティティを使えば素直に Hashable を実装できる状態でした。
02 どのように解決されるのか
AsyncStream.Continuation と AsyncThrowingStream.Continuation に Hashable 適合を追加します(Equatable も同時に得られます)。これにより、continuation をそのまま Set に入れたり辞書のキーにしたりできるようになります。
var continuations: Set<AsyncStream<Int>.Continuation> = []
let stream = AsyncStream<Int> { continuation in
continuations.insert(continuation)
continuation.onTermination = { _ in
continuations.remove(continuation)
}
}
等価性のルールは、continuation のアイデンティティに基づきます。
AsyncStream.init/AsyncThrowingStream.initの build クロージャが呼ばれるたびに、他のどの continuation とも等しくない新しい continuation が渡されます。- 同じ continuation をコピーして得た値同士は互いに等しくなります。
yield(_:)で値やエラーを流したり、finish()でストリームを終わらせたり、イテレーションをキャンセルしたりしても、等価性は変化しません。onTerminationを代入しても等価性は変化しません。
つまり、一度得た continuation は、その生涯を通じて同じ「同一性」を持ち続け、コレクションから安全に挿入・削除できます。冒頭の例のような購読者管理コードも、ラッパークラスを介さずに Set<AsyncStream<Int>.Continuation> のまま書けるようになります。