Swift Digest
SE-0462 | Swift Evolution

Task Priority Escalation APIs

Proposal
SE-0462
Authors
Konrad 'ktoso' Malawski
Review Manager
Freddy Kellison-Linn
Status
Implemented (Swift 6.2)

01 何が問題だったのか

Swift Concurrencyには、タスクの優先度が自動的にエスカレーション(昇格)される仕組みがあります。たとえば medium 優先度のタスクから子タスクを作ると子も medium で始まりますが、より高い優先度のタスクがその親タスクを await で待つと、親とすべての子タスクの優先度が昇格されます。これは優先度逆転を防ぐための仕組みで、Structured Concurrency(async let やタスクグループで作る親子関係)の範囲内では透過的に機能します。

しかし実際のコードでは、どうしても非構造化タスク(Task { ... })を使わざるを得ない場面があります。たとえば swift-async-algorithmsmerge 実装では、上流のシーケンスを消費するために下流の呼び出しよりも長く生きる非構造化タスクが必要になります。このような非構造化タスクは親子関係の外側にいるため、自動の優先度エスカレーションが届きません。

典型的なパターンとして、継続(continuation)と、それを resume するために作られた Task のペアを扱う場合があります。継続の待機の周りに withTaskCancellationHandler を仕掛けてキャンセルに対応するのと同じように、優先度エスカレーションにも反応して、resume 用の Task の優先度を引き上げたいことがあります。これまでは、そのためのフックが存在しませんでした。

func next() async throws {
    self.state.withLock { state in
        if state.task == nil {
            state.task = Task {
                // 上流のイテレータから消費する非構造化タスク
            }
        }
    }

    if let element = self.state.withLock({ $0.buffer.popFirst() }) {
        return element
    } else {
        try await withTaskCancellationHandler {
            // ここで優先度エスカレーションを捕まえて state.task の優先度を上げたい
            try await withCheckedContinuation { cont in
                self.state.withLock { $0.consumerContinuation = cont }
            }
        } onCancel: {
            // キャンセル処理
        }
    }
}

もうひとつの動機は、プロセス境界をまたいで通信するライブラリです。ビルトインの優先度エスカレーションは同一プロセス内でしか動かないため、エスカレーションの発生を検知して別プロセス側へ伝搬する、といった使い方もできる必要があります。

02 どのように解決されるのか

優先度エスカレーションに参加するための API を追加します。ブロック内で起きたエスカレーションに反応するためのハンドラ API と、任意のタスクの優先度を手動で引き上げる API の 2 つです。

withTaskPriorityEscalationHandler

withTaskCancellationHandler と似た形の新しいハンドラです。

public func withTaskPriorityEscalationHandler<T, E>(
    operation: () async throws(E) -> T,
    onPriorityEscalated handler: @Sendable (TaskPriority, TaskPriority) -> Void,
    isolation: isolated (any Actor)? = #isolation
) async throws(E) -> T

onPriorityEscalated には「昇格前の優先度」と「昇格後の優先度」が渡されます。キャンセルハンドラと異なり、このハンドラは複数回呼ばれる可能性があります。優先度は下がることがなく単調増加するため、複数のスレッドから同じ優先度への昇格が同時に試みられた場合は 1 回だけ発火します。一方で、high に昇格した後さらに高い優先度へ昇格した場合は 2 回呼ばれます。

このハンドラには本質的な競合があり、ハンドラがインストールされる直前に起きたエスカレーションは取りこぼされます。その場合でも、operation の内部で Task.currentPriority を確認して、すでに上がっている前提で処理を進めるという回避策があります。

構造化・非構造化・detached の区別なく、どのタスク種別でも動作し、ネストした場合は外側から内側へと順に発火します。

let t = Task {
    await withTaskPriorityEscalationHandler {
        await withTaskGroup { group in
            group.addTask {
                await withTaskPriorityEscalationHandler {
                    try? await Task.sleep(for: .seconds(1))
                } onPriorityEscalated: { _, newPriority in
                    print("inner: \(newPriority)")
                }
            }
        }
    } onPriorityEscalated: { _, newPriority in
        print("outer: \(newPriority)")
    }
}

// t を high にエスカレーションすると
// "outer: high"
// "inner: high"

withTaskCancellationHandler との併用や、同じタスクに対して別々の箇所で複数登録することもできます。

Task.escalatePriority(of:to:)

非構造化タスクの優先度を手動で引き上げるための API です。誤用を避けるため、Task のインスタンスメソッドではなく static メソッドとして提供されます。

extension Task {
    public static func escalatePriority(of task: Task, to newPriority: TaskPriority)
}

extension UnsafeCurrentTask {
    public static func escalatePriority(of task: UnsafeCurrentTask, to newPriority: TaskPriority)
}

UnsafeCurrentTask 版は、対象のタスクがすでに破棄されていないことを呼び出し側が保証する必要があります。Task 版は常に安全です。

現時点では、async let やタスクグループが作る子タスクには Task ハンドルが得られないため、これらの子タスクを個別に指定してエスカレーションすることはできません。子タスクへのハンドル公開は将来的な拡張として残されており、その際に同じ形の API で自然に対応できる設計になっています。

組み合わせて使う

典型的な使い方は、継続と resume 用の Task のペアを、優先度エスカレーションハンドラの中で結びつけるパターンです。ハンドラ登録のタイミングと Task 作成のタイミングの間に起きるエスカレーションも取りこぼさないように、状態を mutex で保持しておきます。

enum State {
    case initialized
    case task(Task<Void, Never>)
    case priority(TaskPriority)
}
let state: Mutex<State> = .init(.initialized)

await withTaskPriorityEscalationHandler {
    await withCheckedContinuation { cc in
        let task = Task { cc.resume() }

        let newPriority: TaskPriority? = state.withLock { state -> TaskPriority? in
            defer { state = .task(task) }
            switch state {
            case .initialized:
                return nil
            case .task:
                preconditionFailure("unreachable")
            case .priority(let priority):
                return priority
            }
        }
        // task を mutex に入れる直前に昇格が起きていた場合、ここで追いつかせる
        if let newPriority {
            Task.escalatePriority(of: task, to: newPriority)
        }
    } onPriorityEscalated: { _, newPriority in
        state.withLock { state in
            switch state {
            case .initialized, .priority:
                // ハンドラが先に発火して task がまだ格納されていない場合
                state = .priority(newPriority)
            case .task(let task):
                Task.escalatePriority(of: task, to: newPriority)
            }
        }
    }
}

優先度エスカレーションは本質的に競合的な事象であり、「間に合わない」ケースを完全になくすことはできません。ただし、これらの API と上記のようなパターンを組み合わせれば、実用上問題になる大半の状況に対応できます。通常のコードでは自動エスカレーションに任せ、非構造化タスクを含むライブラリ実装や、プロセス境界をまたぐライブラリのようにどうしても必要な場面で使うことが想定されています。