-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSubmissionGroup.swift
105 lines (94 loc) · 2.85 KB
/
SubmissionGroup.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
//
// Copyright (c) 2023 PADL Software Pty Ltd
//
// Licensed under the Apache License, Version 2.0 (the License);
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an 'AS IS' BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
import AsyncAlgorithms
import AsyncQueue
import Glibc
extension SingleshotSubmission {
func enqueue() {
guard let group else { return }
Task {
do {
let result = try await submit()
await group.resultChannel.send(result)
} catch {
group.resultChannel.fail(error)
}
}
}
func ready() async {
await group?.readinessChannel.send(())
}
}
actor SubmissionGroup<T: Sendable> {
private let ring: IORing
private let queue = ActorQueue<SubmissionGroup>()
private var submissions = [SingleshotSubmission<T>]()
fileprivate let readinessChannel = AsyncChannel<()>()
fileprivate let resultChannel = AsyncThrowingChannel<T, Error>()
init(ring: IORing) async throws {
self.ring = ring
queue.adoptExecutionContext(of: self)
}
///
/// Asynchronously enqueues an submission. Submission must call `ready()` when
/// its continuation is registered in the SQE `user_data` otherwise the group
/// will never be submitted.
///
func enqueue(submission: SingleshotSubmission<T>) {
submissions.append(submission)
Task(on: queue) { _ in
await submission.enqueue()
}
}
private func allReady() async {
_ = await readinessChannel.collect(max: submissions.count)
}
private func allComplete() async throws -> [T] {
defer { resultChannel.finish() }
return try await resultChannel.collect(max: submissions.count)
}
///
/// Call `finish()` once all submissions have been submitted to the submission group.
///
/// Completing the submission group involves the following:
///
/// - Await all submissions to be scheduled on queue
/// - Wait for all submissions to have continuations registered
/// - Submit SQEs to I/O ring
/// - Collect results from results channel
///
func finish() async throws -> [T] {
await Task(on: queue) { _ in }.value
await allReady()
try await ring.submit()
readinessChannel.finish()
return try await allComplete()
}
}
private extension AsyncSequence {
func collect(max: Int) async rethrows -> [Element] {
var collected = 0
var elements = [Element]()
for try await element in self {
elements.append(element)
collected += 1
if collected == max {
break
}
}
return elements
}
}