1
+ <?php
2
+
3
+ namespace Voryx \React \EventLoop ;
4
+
5
+ use Interop \Async \Loop ;
6
+ use React \EventLoop \LoopInterface ;
7
+ use React \EventLoop \Timer \TimerInterface ;
8
+
9
+ class ReactAsyncInteropLoop implements LoopInterface
10
+ {
11
+ private $ readStreams = [];
12
+ private $ writeStreams = [];
13
+
14
+ public function addReadStream ($ stream , callable $ listener )
15
+ {
16
+ $ key = (int )$ stream ;
17
+ if (isset ($ this ->readStreams [$ key ])) {
18
+ throw new \Exception ('key set twice ' );
19
+ }
20
+ $ this ->readStreams [$ key ] = Loop::get ()->onReadable ($ stream , function () use ($ listener , $ stream ) {
21
+ $ listener ($ stream );
22
+ });
23
+ }
24
+
25
+ public function addWriteStream ($ stream , callable $ listener )
26
+ {
27
+ $ key = (int )$ stream ;
28
+
29
+ if (isset ($ this ->writeStreams [$ key ])) {
30
+ throw new \Exception ('key set twice ' );
31
+ }
32
+
33
+ $ this ->writeStreams [$ key ] = Loop::get ()->onWritable ($ stream , function () use ($ listener , $ stream ) {
34
+ $ listener ($ stream );
35
+ });
36
+ }
37
+
38
+ public function removeReadStream ($ stream )
39
+ {
40
+ $ key = (int )$ stream ;
41
+ if (isset ($ this ->readStreams [$ key ])) {
42
+ Loop::get ()->cancel ($ this ->readStreams [$ key ]);
43
+ unset($ this ->readStreams [$ key ]);
44
+ }
45
+ }
46
+
47
+ public function removeWriteStream ($ stream )
48
+ {
49
+ $ key = (int )$ stream ;
50
+ if (isset ($ this ->writeStreams [$ key ])) {
51
+ Loop::get ()->cancel ($ this ->writeStreams [$ key ]);
52
+ unset($ this ->writeStreams [$ key ]);
53
+ }
54
+ }
55
+
56
+ public function removeStream ($ stream )
57
+ {
58
+ $ this ->removeReadStream ($ stream );
59
+ $ this ->removeWriteStream ($ stream );
60
+ }
61
+
62
+ private function addWrappedTimer ($ interval , callable $ callback , $ isPeriodic = false )
63
+ {
64
+ $ wrappedCallback = function () use (&$ timer , $ callback ) {
65
+ $ callback ($ timer );
66
+ };
67
+ $ millis = $ interval * 1000 ;
68
+ if ($ isPeriodic ) {
69
+ $ timerKey = Loop::get ()->repeat ($ millis , $ wrappedCallback );
70
+ } else {
71
+ $ timerKey = Loop::get ()->delay ($ millis , $ wrappedCallback );
72
+ }
73
+ $ timer = new ReactAsyncInteropTimer (
74
+ $ timerKey ,
75
+ $ interval ,
76
+ $ callback ,
77
+ $ this ,
78
+ false
79
+ );
80
+ return $ timer ;
81
+ }
82
+
83
+ public function addTimer ($ interval , callable $ callback )
84
+ {
85
+ return $ this ->addWrappedTimer ($ interval , $ callback );
86
+ }
87
+
88
+ public function addPeriodicTimer ($ interval , callable $ callback )
89
+ {
90
+ return $ this ->addWrappedTimer ($ interval , $ callback , true );
91
+ }
92
+
93
+ public function cancelTimer (TimerInterface $ timer )
94
+ {
95
+ $ timer ->cancel ();
96
+ }
97
+
98
+ public function isTimerActive (TimerInterface $ timer )
99
+ {
100
+ return $ timer ->isActive ();
101
+ }
102
+
103
+ public function nextTick (callable $ listener )
104
+ {
105
+ Loop::get ()->defer (function () use ($ listener ) {
106
+ $ listener ($ this );
107
+ });
108
+ }
109
+
110
+ public function futureTick (callable $ listener )
111
+ {
112
+ $ this ->nextTick ($ listener );
113
+ }
114
+
115
+ public function tick ()
116
+ {
117
+ $ loop = Loop::get ();
118
+
119
+ $ loop ->defer (function () use ($ loop ) {
120
+ $ loop ->stop ();
121
+ });
122
+
123
+ $ loop ->run ();
124
+ }
125
+
126
+ public function run ()
127
+ {
128
+ Loop::get ()->run ();
129
+ }
130
+
131
+ public function stop ()
132
+ {
133
+ Loop::get ()->stop ();
134
+ }
135
+ }
0 commit comments