Swift Digest
SE-0417 | Swift Evolution

Task Executor Preference

Proposal
SE-0417
Authors
Konrad 'ktoso' Malawski, John McCall, Franz Busch
Review Manager
Doug Gregor
Status
Implemented (Swift 6.0)

01 何が問題だったのか

SE-0392 でカスタムのシリアルエグゼキュータが導入され、アクターに isolated なコードを「どのスレッド/キューで走らせるか」を開発者側で制御できるようになりました。特定のスレッドやキューを SerialExecutor として実装し、アクターに結び付けることで、既存のキュー資産と協調したり、スレッドローカル前提のコードとつないだりできます。

一方で、アクターに isolated で ない コードには同等の制御手段がありませんでした。Swift 5.7 の SE-0338 以降、nonisolated な async 関数は常にデフォルトのグローバル並行エグゼキュータへ hop するよう定義されており、これは MainActor を握りっぱなしにしないためには望ましい既定です。

しかし、性能に敏感なコードベース、特にネットワークサーバーのようなイベントループベースのシステムでは、この「毎回グローバル並行エグゼキュータへ hop する」挙動そのものが問題になります。リクエストを捌くタイトなループの内部で nonisolated 関数を呼び出すたびにスレッド切り替えが発生すると、スループットが大きく損なわれてしまいます。

さらに、アクターに isolated なコードであっても、カスタムエグゼキュータを指定していない「デフォルトアクター」の場合、どの物理スレッドで走るかはランタイム任せになります。イベントループ上で動いているタスクから呼び出すなら、同じループ上で走ってほしい、というニーズに応える仕組みもありませんでした。

つまり、「isolation の要件は満たしたうえで、nonisolated な部分やデフォルトアクターの部分が どのスレッドで走るか について、タスク側からヒントを与えたい」という制御軸が欠けていたのです。

02 どのように解決されるのか

タスクに「好ましいエグゼキュータ(task executor preference)」を付与する仕組みを導入します。このpreferenceはタスクに「sticky」に貼り付き、structured concurrencyの子タスクにも伝播します。ただし、アクターの isolation 要件を上書きする力は持たず、あくまで「isolation で縛られていない部分をどのスレッドで動かすか」を指示するヒントです。

コードがどこで動くかの判断は、preference導入後は次のようになります。

  • アクターに isolated な関数/クロージャ
    • そのアクターがカスタムエグゼキュータを持っているなら、そのエグゼキュータで走ります(preferenceは無視されます)。
    • デフォルトアクター(カスタムエグゼキュータを持たないアクター)で、タスクにpreferenceが 設定されていれば、そのpreferredエグゼキュータのスレッドで走ります。preferenceが なければ 従来どおりデフォルトのエグゼキュータで走ります。
  • nonisolated な async 関数/クロージャ
    • preferenceが 設定されていれば そのpreferredエグゼキュータで走ります。
    • preferenceが なければ 従来どおりグローバル並行エグゼキュータで走ります。

SerialExecutor がアクターの相互排他(isolation)を保証するのに対し、新しく導入される TaskExecutor は「スレッドの供給源」として働く、と整理できます。両者はランタイム的には別の枠で追跡されます。

TaskExecutor プロトコル

スレッドの供給源となる新しいエグゼキュータ型です。API形状は Executor / SerialExecutor と似ていますが、isolationを提供するものではありません。

public protocol TaskExecutor: Executor {
  func enqueue(_ job: consuming ExecutorJob)

  func asUnownedTaskExecutor() -> UnownedTaskExecutor
}

シリアルエグゼキュータは既定では task executor には なりません。「相互排他を提供するもの」と「スレッドを提供するもの」を同時に兼ねるとランタイム上の挙動が分かりにくくなるためです。ただし、必要なら一つの型を SerialExecutorTaskExecutor の両方に適合させることはできます(後述)。

preferenceの設定

preferenceの指定方法は大きく2種類あります。

スコープ単位で指定する withTaskExecutorPreference

async 文脈で呼び出すと、クロージャの実行中だけpreferenceを設定します。with... 形式なので、クロージャが走っている間はエグゼキュータがきちんと生存しているという保証も得られます。

await withTaskExecutorPreference(someExecutor) {
  // ここは someExecutor 上で走ることが保証される

  await withDiscardingTaskGroup { group in
    group.addTask {
      // 子タスクも someExecutor 上で走る
      await nonisolatedAsyncFunc() // 同じく someExecutor 上
    }
  }

  async let number = nonisolatedAsyncFunc() // someExecutor 上
  await number
}

タスク生成時に指定する Task(executorPreference:)

unstructured なタスクに対しても、生成時にpreferenceを直接渡せます。このタスクは 即座にそのエグゼキュータへ enqueue されます。

Task(executorPreference: executor) {
  // 最初から 'executor' 上で動き始める
  await nonisolatedAsyncFunc()
}

Task.detached(executorPreference: executor) {
  await nonisolatedAsyncFunc()
}

TaskGroup 系にも addTask(executorPreference:) オーバーロードが追加されます。

await withTaskGroup(of: Int.self) { group in
  group.addTask(executorPreference: executor) {
    await nonisolatedAsyncFunc()
    return 42
  }
}

preferenceを受け取るAPIにはいずれも nil を渡せます。「preferenceなし」を意味し、構造化タスクなら囲むスコープから継承し、unstructured なタスクなら「明示的にpreferenceを選ばない」ことのドキュメントとして機能します。

継承のルール

preferenceは structured concurrency の子に伝播しますが、unstructured な境界は越えません。

  • 継承する
    • TaskGroupaddTask()(明示指定で上書き可)
    • async let
    • デフォルトアクター(カスタムエグゼキュータを持たないアクター)のメソッド
  • 継承しない
    • Task { } / Task.detached { }
    • カスタムエグゼキュータを持つアクターのメソッド(MainActor を含む)

これにより、トップレベルのタスクで一度preferenceを指定すれば、そこから派生する structured なタスクツリー全体の nonisolated 部分を同じエグゼキュータに寄せることができます。

isolation との関係

繰り返しになりますが、preferenceは isolation を上書きしません。カスタムエグゼキュータを持つアクターに対する呼び出しは、必ずそのアクターのエグゼキュータへ hop します。一方、デフォルトアクターは「走る場所に要件を持たない」ため、preferenceで指定したエグゼキュータのスレッド上で、アクターのisolationを保ったまま走ります。

actor Capybara { func eat() {} }
let capy = Capybara() // デフォルトアクター

Task(executorPreference: executor) {
  // 'executor' 上で開始
  await capy.eat()
  // isolation は capy のものに従う(相互排他は保たれる)が、
  // 実際に走るスレッドは 'executor' が供給するもの
}

globalConcurrentExecutor

デフォルトのグローバル並行エグゼキュータを表す参照 globalConcurrentExecutor が公開されます。

nonisolated(unsafe)
public var globalConcurrentExecutor: any TaskExecutor { get }

これは、囲むスコープでpreferenceが設定されていても、特定の子タスクやブロックだけは従来どおりグローバル並行エグゼキュータで走らせたい、という局所的な上書きに使えます。

await withTaskExecutorPreference(specific) {
  async let compute = computation() // 'specific' 上

  await withTaskGroup(of: Int.self) { group in
    group.addTask { computation() } // 'specific' 上

    group.addTask(executorPreference: globalConcurrentExecutor) {
      // ここだけ明示的にグローバル並行エグゼキュータ上へ戻す
      computation()
    }
  }
}

SerialExecutorTaskExecutor の兼任

一つの型を両方のプロトコルに適合させることもできます。こうすると、同じエグゼキュータをアクターの isolation 保証と、タスクのスレッド供給の両方に使えます。兼任する型は SerialExecutor の「一度にひとつのジョブしか走らせない」という契約を必ず守る必要があります。

enqueue(_:) はどちらのプロトコルでも同じシグネチャなので実装は一つで済みます。ただし、ジョブを走らせるときに新しい runSynchronously(isolatedOn:taskExecutor:) オーバーロードを使い、isolationとtask executorの両方の情報をランタイムに伝えることが重要です。

final class NaiveQueueExecutor: TaskExecutor, SerialExecutor {
  let queue: DispatchQueue
  init(_ queue: DispatchQueue) { self.queue = queue }

  public func enqueue(_ _job: consuming ExecutorJob) {
    let job = UnownedJob(_job)
    queue.async {
      job.runSynchronously(
        isolatedOn: self.asUnownedSerialExecutor(),
        taskExecutor: self.asUnownedTaskExecutor())
    }
  }

  public func asUnownedSerialExecutor() -> UnownedSerialExecutor {
    UnownedSerialExecutor(ordinary: self)
  }

  public func asUnownedTaskExecutor() -> UnownedTaskExecutor {
    UnownedTaskExecutor(ordinary: self)
  }
}

所有権

task executor はタスクが走っている間、そのタスクによって強く保持されます。withTaskExecutorPreference はクロージャ本体が走っている間エグゼキュータを生かし続け、その間に派生する async letTaskGroup の子タスクもその恩恵を受けます。unstructured な Task(executorPreference:) は、タスクが走り終わるまでエグゼキュータを保持します。したがって、ローカル変数として作ったエグゼキュータをそのままpreferenceに渡しても、走っている最中に解放されてしまう心配はありません。

func computeThings() async {
  let eventLoop: any TaskExecutor = MyCoolEventLoop()
  defer { eventLoop.shutdown() }

  let computed = await withTaskExecutorPreference(eventLoop) {
    async let first = computation(1)
    async let second = computation(2)
    return await first + second
  }
  _ = computed
}

一方、UnownedTaskExecutorSerialExecutorUnownedSerialExecutor と同様に 所有権を持たない ハンドルです。キャッシュから引いてきたunownedな参照から元の any TaskExecutor を復元するようなコードを書く場合は、元のエグゼキュータをどこかで強参照して生かしておく責任は呼び出し側にあります。

現在のpreferenceの確認

UnsafeCurrentTask.unownedTaskExecutor で、現在のタスクに設定されているpreferredエグゼキュータを UnownedTaskExecutor? として取り出せます。比較はポインタ等価で行われ、Equatable に適合します。同期関数から「期待したエグゼキュータ上で動いているか」を確認する用途に使えますが、unownedな性質上あくまで高度な用途向けであり、通常の非同期コードでは withTaskExecutorPreference ブロックや assumeIsolated などの静的な手段を優先すべきです。

使いどころに関する注意

preferenceは万能ではありません。nonisolated な async 関数が毎回グローバル並行エグゼキュータへ hop するSE-0338の既定は、MainActor のような重要なエグゼキュータを握り続けないためのものであり、多くの場面で望ましい振る舞いです。preferenceの導入は、その既定を 特定の箇所だけ意図的に外す ためのツールと捉えるべきです。

よい使いどころの例として、イベントループベースのネットワークサーバーのように特定スレッドで回したい系や、ブロッキングIOを行う処理を専用のエグゼキュータに逃がしてグローバルプールを枯渇させないようにしたい場合が挙げられます。

public func callRead() async -> Bytes {
  await withTaskExecutorPreference(DedicatedIOExecutor.shared) {
    blockingRead() // 専用スレッド上で実行
  }
}

逆に、あるライブラリの関数が内部でブロッキングをしてしまうことに気付いた利用側でも、外側から withTaskExecutorPreference でラップして、その関数を専用エグゼキュータに逃がす、という使い方ができます。

Future Directions

今回のスコープ外として、次のような拡張が議論されています(speculativeであり、実現を約束するものではありません)。

  • グローバルアクターやアクターインスタンスを直接 executorPreference: に渡し、Task { @MainActor in ... } のような余計な hop を挟まずに最初からそのアクター上で開始できるようにする。たとえば Task(executorPreference: MainActor.shared) { ... } のような形。
  • タスクのクロージャを、渡したアクターに対して静的に isolated として推論できるようにする(@isolated(target) のような仕組みと組み合わせる方向)。