DiscardingTaskGroups
01 何が問題だったのか
SE-0304で導入された TaskGroup / ThrowingTaskGroup は、構造化並行性の土台として、子タスクのキャンセル伝播・エラー伝播・寿命管理などを提供してくれます。ただし、それらの子タスクの戻り値を後から取り出せる(next() や AsyncSequence 適合を通じて消費できる)という性質のために、完了した子タスクの Task オブジェクトをグループがずっと保持し続けるという副作用があります。
この性質は、長時間にわたって終わらないループ、たとえばHTTPサーバのような「接続を受け付けては子タスクを起動する」タイプの処理と相性が悪く、以下のようなコードはメモリリークを引き起こします。
try await withThrowingTaskGroup(of: Void.self) { group in
while let newConnection = try await listeningSocket.accept() {
group.addTask {
handleConnection(newConnection)
}
}
}
listeningSocket が終端するか例外を投げるまで group は生き続けるため、数日稼働するサーバであれば何千もの Task オブジェクトが蓄積され、最終的にOSにプロセスを落とされます。
素朴な回避策として、ループの中で try await group.next() を呼び、完了したタスクを間引くことが思いつきます。しかし TaskGroup は Sendable ではなく別の Task に渡せないため、新規タスク投入と完了タスク消費は同じ Task で交互に行うしかありません。すべての子タスクが実行中のときは next() が待機してしまい、その間は新しい接続を受け付けられなくなる(ライブロック状態に陥る)という別の問題が発生します。
もう一段工夫した「事前に maxConcurrency 本だけタスクを走らせ、1本終わったら1本足す」というパターンなら機能しますが、maxConcurrency の適切な値を事前に決めるのは難しく、大きすぎるとメモリを無駄にし、小さすぎると並行性を活かせません。結果として、長時間走り続けるリクエスト受付ループのための、素直で安全な書き方が言語側から提供されていない状況でした。
02 どのように解決されるのか
新しいタスクグループ型として DiscardingTaskGroup と ThrowingDiscardingTaskGroup を導入します。それぞれ withDiscardingTaskGroup / withThrowingDiscardingTaskGroup で開きます。これらは通常の TaskGroup と似ていますが、次の点が異なります。
- 子タスクが完了した時点で、その結果も
Taskオブジェクトも 即座に破棄 します。完了タスクが溜まり続けることがありません。 next()やAsyncSequence適合を持ちません。子タスクの値を受け取る手段が無いぶん、子タスクの結果型は常にVoidに固定されます。
構造化並行性のプリミティブである点は通常のタスクグループと同じで、with[Throwing]DiscardingTaskGroup { body } は本体を抜ける前に、投入された全ての子タスクの完了を必ず待ちます。
基本の使い方
Motivationの例は、次のように書き換えるだけで安全になります。完了した子タスクはすぐに解放されるので、長時間走り続けてもメモリは増えません。
// GOOD, no leaks!
try await withThrowingDiscardingTaskGroup { group in
while let newConnection = try await listeningSocket.accept() {
group.addTask {
handleConnection(newConnection)
}
}
}
API
大まかには、TaskGroup から next() 系を取り除いた形になっています。
public struct DiscardingTaskGroup {
public mutating func addTask(
priority: TaskPriority? = nil,
operation: @Sendable @escaping () async -> Void
)
public mutating func addTaskUnlessCancelled(
priority: TaskPriority? = nil,
operation: @Sendable @escaping () async -> Void
) -> Bool
public var isEmpty: Bool
public func cancelAll()
public var isCancelled: Bool
}
public struct ThrowingDiscardingTaskGroup<Failure: Error> {
public mutating func addTask(
priority: TaskPriority? = nil,
operation: @Sendable @escaping () async throws -> Void
)
public mutating func addTaskUnlessCancelled(
priority: TaskPriority? = nil,
operation: @Sendable @escaping () async throws -> Void
) -> Bool
public var isEmpty: Bool
public func cancelAll()
public var isCancelled: Bool
}
どちらも Sendable ではありません(通常のタスクグループと同様)。
エラー伝播とキャンセルの扱い
通常の ThrowingTaskGroup では、子タスクのエラーは try await group.next() や try await group.waitForAll() を書いた場所で再送出されます。ThrowingDiscardingTaskGroup にはこれらのメソッドが無いため、代わりに次のルールに従って 暗黙的 にエラーを扱います。
- 子タスクのいずれかが最初に投げたエラーを記録する。
- その時点で即座にグループ全体をキャンセルし、残りの子タスクにキャンセルを波及させる。
with[Throwing]DiscardingTaskGroupを抜けるときに、その最初のエラーを再送出する。
つまり「one for all, and all for one」の方針で、一つの子タスクの失敗が兄弟タスク全体の打ち切りにつながります。
try await withThrowingDiscardingTaskGroup { group in
group.addTask { try boom(1) }
group.addTask { try boom(2) }
group.addTask { try boom(3) }
// 最初に発生したエラーが記録され、メソッドを抜けるときに再送出される。
// 残りのタスクはキャンセルされる。
}
この自動キャンセルを避けたい場合の方法は次の2つです。
- 子タスクで
throwできないwithDiscardingTaskGroup(非スローイング版)を使う。エラーは子タスク側で処理する必要があります。 - 子タスク内で
do { ... } catch { ... }を使い、グループに伝播させたくないエラーを握り潰すか、必要なものだけ再送出する。
「最初のエラーだけを投げる」という挙動は、情報欠落(2つ目以降のエラーと、キャンセル波及で大量に発生する CancellationError が失われる)と引き換えに、catch MyModuleError のような通常の catch パターンがそのまま使えることを優先した選択です。
Future Directions(speculative)
コミュニティからは「エラーを集約したい」「最初の1件以外も何らかの形で扱いたい」といった要望が挙がっています。将来的には、複数のエラーをフィルタ・集約・破棄する仕組み(エラーフィルタ関数のようなもの)を追加する余地が残されていますが、本提案時点では実利用例を踏まえて後から決める、とされています。