88use React \EventLoop \LoopInterface ;
99use React \Http \Message \Response ;
1010use React \Http \Message \ServerRequest ;
11+ use React \Http \Middleware \InactiveConnectionTimeoutMiddleware ;
1112use React \Promise ;
1213use React \Promise \CancellablePromiseInterface ;
1314use React \Promise \PromiseInterface ;
@@ -85,6 +86,7 @@ final class StreamingServer extends EventEmitter
8586 private $ callback ;
8687 private $ parser ;
8788 private $ loop ;
89+ private $ idleConnectionTimeout ;
8890
8991 /**
9092 * Creates an HTTP server that invokes the given callback for each incoming HTTP request
@@ -96,15 +98,17 @@ final class StreamingServer extends EventEmitter
9698 *
9799 * @param LoopInterface $loop
98100 * @param callable $requestHandler
101+ * @param float $idleConnectTimeout
99102 * @see self::listen()
100103 */
101- public function __construct (LoopInterface $ loop , $ requestHandler )
104+ public function __construct (LoopInterface $ loop , $ requestHandler, $ idleConnectTimeout = InactiveConnectionTimeoutMiddleware:: DEFAULT_TIMEOUT )
102105 {
103106 if (!\is_callable ($ requestHandler )) {
104107 throw new \InvalidArgumentException ('Invalid request handler given ' );
105108 }
106109
107110 $ this ->loop = $ loop ;
111+ $ this ->idleConnectionTimeout = $ idleConnectTimeout ;
108112
109113 $ this ->callback = $ requestHandler ;
110114 $ this ->parser = new RequestHeaderParser ();
@@ -134,7 +138,29 @@ public function __construct(LoopInterface $loop, $requestHandler)
134138 */
135139 public function listen (ServerInterface $ socket )
136140 {
137- $ socket ->on ('connection ' , array ($ this ->parser , 'handle ' ));
141+ $ socket ->on ('connection ' , array ($ this , 'handle ' ));
142+ }
143+
144+ /** @internal */
145+ public function handle (ConnectionInterface $ conn )
146+ {
147+ $ timer = $ this ->loop ->addTimer ($ this ->idleConnectionTimeout , function () use ($ conn ) {
148+ $ conn ->close ();
149+ });
150+ $ loop = $ this ->loop ;
151+ $ conn ->on ('data ' , function ($ data ) use ($ loop , $ timer ) {
152+ if ($ data !== '' ) {
153+ $ loop ->cancelTimer ($ timer );
154+ }
155+ });
156+ $ conn ->on ('end ' , function () use ($ loop , $ timer ) {
157+ $ loop ->cancelTimer ($ timer );
158+ });
159+ $ conn ->on ('close ' , function () use ($ loop , $ timer ) {
160+ $ loop ->cancelTimer ($ timer );
161+ });
162+
163+ $ this ->parser ->handle ($ conn );
138164 }
139165
140166 /** @internal */
@@ -345,7 +371,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
345371
346372 // either wait for next request over persistent connection or end connection
347373 if ($ persist ) {
348- $ this ->parser -> handle ($ connection );
374+ $ this ->handle ($ connection );
349375 } else {
350376 $ connection ->end ();
351377 }
@@ -366,10 +392,10 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
366392 // write streaming body and then wait for next request over persistent connection
367393 if ($ persist ) {
368394 $ body ->pipe ($ connection , array ('end ' => false ));
369- $ parser = $ this -> parser ;
370- $ body ->on ('end ' , function () use ($ connection , $ parser , $ body ) {
395+ $ that = $ this ;
396+ $ body ->on ('end ' , function () use ($ connection , $ that , $ body ) {
371397 $ connection ->removeListener ('close ' , array ($ body , 'close ' ));
372- $ parser ->handle ($ connection );
398+ $ that ->handle ($ connection );
373399 });
374400 } else {
375401 $ body ->pipe ($ connection );
0 commit comments