Skip to content

Commit 8eb2723

Browse files
committed
Merge pull request #10 from ypopovych/master
RunLoop refactoring
2 parents f928b3d + e525e27 commit 8eb2723

File tree

4 files changed

+114
-62
lines changed

4 files changed

+114
-62
lines changed

ExecutionContext/LoopSemaphore.swift

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
//===----------------------------------------------------------------------===//
1616

1717
import Foundation
18+
#if os(Linux)
19+
import CoreFoundation
20+
#endif
1821

1922
#if !os(Linux)
2023
import Dispatch
@@ -79,8 +82,13 @@ public class CFRunLoopSemaphore : SemaphoreType {
7982
self.source = RunLoopSource( { [unowned self] in
8083
self.signaled = true
8184
self.value += 1
82-
}, priority: 1)
83-
RunLoop.currentRunLoop().addSource(source!, mode: RunLoop.defaultMode)
85+
// On linux timeout not working in run loop
86+
#if os(Linux)
87+
CFRunLoopStop(CFRunLoopGetCurrent())
88+
#endif
89+
}, priority: 2)
90+
let loop:RunLoop = RunLoop.currentRunLoop()
91+
loop.addSource(source!, mode: RunLoop.defaultMode)
8492
}
8593

8694
/// Creates a new semaphore with initial value 0

ExecutionContext/PThreadExecutionContext.swift

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,22 @@
6666
return Unmanaged<AnyObject>.fromOpaque(COpaquePointer(val)).takeUnretainedValue()
6767
}
6868

69-
static func setSpecific(obj: AnyObject?, key: PThreadKey) {
69+
static func setSpecific(obj: AnyObject?, key: PThreadKey, retain: Bool = false) {
70+
if retain {
71+
let old = pthread_getspecific(key.key)
72+
if old != nil {
73+
Unmanaged<AnyObject>.fromOpaque(COpaquePointer(old)).release()
74+
}
75+
}
7076
if obj == nil {
7177
pthread_setspecific(key.key, nil)
7278
} else {
73-
pthread_setspecific(key.key, UnsafePointer<Void>(Unmanaged.passUnretained(obj!).toOpaque()))
79+
if retain {
80+
print("Pass retained \(obj)")
81+
pthread_setspecific(key.key, UnsafePointer<Void>(Unmanaged.passRetained(obj!).toOpaque()))
82+
} else {
83+
pthread_setspecific(key.key, UnsafePointer<Void>(Unmanaged.passUnretained(obj!).toOpaque()))
84+
}
7485
}
7586
}
7687

@@ -109,6 +120,7 @@
109120

110121
PThread(task: { [unowned holder] in
111122
holder.loop = RunLoop.currentRunLoop()
123+
holder.loop!.startTaskQueue()
112124
sema.signal()
113125
RunLoop.run()
114126
}).start()
@@ -120,6 +132,11 @@
120132

121133
init(runLoop:RunLoop) {
122134
rl = runLoop
135+
rl.startTaskQueue()
136+
}
137+
138+
deinit {
139+
rl.stopTaskQueue()
123140
}
124141

125142
func async(task:SafeTask) {

ExecutionContext/RunLoop.swift

Lines changed: 84 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,13 @@ import CoreFoundation
7171
}
7272
}
7373

74-
internal protocol WakeableRunLoop : AnyObject {
75-
func wakeUp()
76-
}
77-
7874
private class RunLoopCallbackInfo {
79-
private var task: SafeTask
80-
private var runLoops: [WakeableRunLoop] = []
75+
private let task: SafeTask
76+
private var runLoops: [RunLoop] = []
8177

82-
init(_ task: SafeTask) {
78+
init(_ task: SafeTask) {
8379
self.task = task
84-
}
80+
}
8581

8682
func run() {
8783
task()
@@ -133,13 +129,27 @@ import CoreFoundation
133129
}
134130
}
135131

136-
init(_ task: SafeTask, priority: Int = 0) {
137-
self.info = RunLoopCallbackInfo(task)
138-
self.priority = priority
132+
init(_ task: SafeTask, priority: Int = 0, runOnce: Bool = false) {
133+
self.priority = priority
134+
if runOnce {
135+
var stopTask:SafeTask?
136+
self.info = RunLoopCallbackInfo({
137+
task()
138+
stopTask?()
139+
})
140+
stopTask = { self.stop() }
141+
} else {
142+
self.info = RunLoopCallbackInfo(task)
143+
}
144+
139145
}
140146

141147
deinit {
142-
if _source != nil && CFRunLoopSourceIsValid(_source) {
148+
stop()
149+
}
150+
151+
func stop() {
152+
if _source != nil && CFRunLoopSourceIsValid(_source) {
143153
CFRunLoopSourceInvalidate(_source)
144154
_source = nil
145155
}
@@ -155,6 +165,24 @@ import CoreFoundation
155165
}
156166
}
157167

168+
class RunLoopTaskQueueSource : RunLoopSource {
169+
private let queue = TaskQueue()
170+
171+
init(priority: Int = 1) {
172+
super.init({ [unowned queue] in
173+
if let element = queue.dequeue() {
174+
element.run()
175+
element.source.signal()
176+
}
177+
}, priority: priority)
178+
}
179+
180+
func addTask(task: SafeTask) {
181+
queue.enqueue(TaskQueueElement(task, runLoopSource: self))
182+
self.signal()
183+
}
184+
}
185+
158186
private func timerRunCallback(timer: CFRunLoopTimer!, i: UnsafeMutablePointer<Void>) {
159187
runLoopCallbackInfoRun(i)
160188
}
@@ -186,66 +214,40 @@ import CoreFoundation
186214
}
187215
}
188216

189-
private class CFRunLoopWakeupHolder: WakeableRunLoop {
190-
private let loop:CFRunLoop!
191-
192-
init(loop: CFRunLoop!) {
193-
self.loop = loop
194-
}
195-
196-
func wakeUp() {
197-
CFRunLoopWakeUp(loop)
198-
}
199-
}
200-
201-
class RunLoop : WakeableRunLoop {
217+
class RunLoop {
202218
private let cfRunLoop: CFRunLoop!
203219

204-
private var taskQueueSource: RunLoopSource
205-
private var taskQueue: TaskQueue
220+
private var taskQueueSource: RunLoopTaskQueueSource? = nil
221+
private let taskQueueLock = NSLock()
206222

207223
#if !os(Linux)
208224
static let defaultMode:NSString = "kCFRunLoopDefaultMode" as NSString
209225
#else
210226
static let defaultMode:NSString = "kCFRunLoopDefaultMode".bridge()
211227
#endif
212228

213-
private static let threadKey = PThreadKey()
229+
private static let threadKey = PThreadKey(destructionCallback: { loop in
230+
Unmanaged<RunLoop>.fromOpaque(COpaquePointer(loop)).release()
231+
})
214232

215233
private static let MainRunLoop = RunLoop.createMainRunLoop()
216234

217235
init(_ cfRunLoop: CFRunLoop) {
218236
self.cfRunLoop = cfRunLoop
219-
220-
let queue = TaskQueue()
221-
222-
taskQueueSource = RunLoopSource({
223-
if let element = queue.dequeue() {
224-
element.run()
225-
element.source.signal()
226-
}
227-
})
228-
taskQueue = queue
229-
addSource(taskQueueSource, mode: RunLoop.defaultMode, retainLoop: false)
230-
}
231-
232-
deinit {
233-
addTask {
234-
PThread.setSpecific(nil, key: RunLoop.threadKey)
235-
}
236237
}
238+
237239
convenience init(_ runLoop: AnyObject) {
238240
self.init(unsafeBitCast(runLoop, CFRunLoop.self))
239241
}
240242

241243
private static func createMainRunLoop() -> RunLoop {
242244
let runLoop = RunLoop(CFRunLoopGetMain())
243245
if runLoop.isCurrent() {
244-
PThread.setSpecific(runLoop, key: RunLoop.threadKey)
246+
PThread.setSpecific(runLoop, key: RunLoop.threadKey, retain: true)
245247
} else {
246-
let sema = Semaphore()
248+
let sema = LoopSemaphore()
247249
runLoop.addTask({
248-
PThread.setSpecific(runLoop, key: RunLoop.threadKey)
250+
PThread.setSpecific(runLoop, key: RunLoop.threadKey, retain: true)
249251
sema.signal()
250252
})
251253
sema.wait()
@@ -256,7 +258,7 @@ import CoreFoundation
256258
static func currentRunLoop() -> RunLoop {
257259
guard let loop = PThread.getSpecific(RunLoop.threadKey) else {
258260
let loop = RunLoop(CFRunLoopGetCurrent())
259-
PThread.setSpecific(loop, key: RunLoop.threadKey)
261+
PThread.setSpecific(loop, key: RunLoop.threadKey, retain: true)
260262
return loop
261263
}
262264
return unsafeBitCast(loop, RunLoop.self)
@@ -266,6 +268,26 @@ import CoreFoundation
266268
return MainRunLoop
267269
}
268270

271+
func startTaskQueue(priority: Int = 1) {
272+
print("Start queue called")
273+
defer {
274+
taskQueueLock.unlock()
275+
}
276+
taskQueueLock.lock()
277+
self.taskQueueSource = RunLoopTaskQueueSource()
278+
279+
addSource(taskQueueSource!, mode: RunLoop.defaultMode)
280+
}
281+
282+
func stopTaskQueue() {
283+
defer {
284+
taskQueueLock.unlock()
285+
}
286+
taskQueueLock.lock()
287+
self.taskQueueSource = nil
288+
}
289+
290+
269291
func isCurrent() -> Bool {
270292
return cfRunLoop === CFRunLoopGetCurrent()
271293
}
@@ -307,15 +329,11 @@ import CoreFoundation
307329
while true { run() }
308330
}
309331

310-
func addSource(rls: RunLoopSource, mode: NSString, retainLoop: Bool = true) {
332+
func addSource(rls: RunLoopSource, mode: NSString) {
311333
let crls = unsafeBitCast(rls.cfObject, CFRunLoopSource.self)
312334
if CFRunLoopSourceIsValid(crls) {
313335
CFRunLoopAddSource(cfRunLoop, crls, mode.cfString)
314-
if retainLoop {
315-
rls.info.runLoops.append(self)
316-
} else {
317-
rls.info.runLoops.append(CFRunLoopWakeupHolder(loop: cfRunLoop))
318-
}
336+
rls.info.runLoops.append(self)
319337
wakeUp()
320338
}
321339
}
@@ -330,8 +348,17 @@ import CoreFoundation
330348
}
331349

332350
func addTask(task: SafeTask) {
333-
taskQueue.enqueue(TaskQueueElement(task, runLoopSource: taskQueueSource))
334-
taskQueueSource.signal()
351+
defer {
352+
taskQueueLock.unlock()
353+
}
354+
taskQueueLock.lock()
355+
if let queue = taskQueueSource {
356+
queue.addTask(task)
357+
} else {
358+
let source = RunLoopSource(task, priority: 0, runOnce: true)
359+
addSource(source, mode: RunLoop.defaultMode)
360+
source.signal()
361+
}
335362
}
336363

337364
func wakeUp() {

Tests/ExecutionContext/ExecutionContextTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import XCTest
2323

2424
class ExecutionContextTests: XCTestCase {
2525
//Tests does not create static variables. We need initialized main thread
26-
let mainContext = DefaultExecutionContext.main
26+
//let mainContext = DefaultExecutionContext.main
2727

2828
func syncTest(context:ExecutionContextType) {
2929
let expectation = self.expectationWithDescription("OK SYNC")

0 commit comments

Comments
 (0)