@@ -87,6 +87,12 @@ final class StreamingServer extends EventEmitter
8787 /** @var Clock */
8888 private $ clock ;
8989
90+ /** @var LoopInterface */
91+ private $ loop ;
92+
93+ /** @var int */
94+ private $ idleConnectionTimeout ;
95+
9096 /**
9197 * Creates an HTTP server that invokes the given callback for each incoming HTTP request
9298 *
@@ -95,19 +101,21 @@ final class StreamingServer extends EventEmitter
95101 * connections in order to then parse incoming data as HTTP.
96102 * See also [listen()](#listen) for more details.
97103 *
98- * @param LoopInterface $loop
99104 * @param callable $requestHandler
105+ * @param int $idleConnectionTimeout
100106 * @see self::listen()
101107 */
102- public function __construct (LoopInterface $ loop , $ requestHandler )
108+ public function __construct (LoopInterface $ loop , $ requestHandler, $ idleConnectionTimeout )
103109 {
104110 if (!\is_callable ($ requestHandler )) {
105111 throw new \InvalidArgumentException ('Invalid request handler given ' );
106112 }
107113
114+ $ this ->loop = $ loop ;
108115 $ this ->callback = $ requestHandler ;
109116 $ this ->clock = new Clock ($ loop );
110117 $ this ->parser = new RequestHeaderParser ($ this ->clock );
118+ $ this ->idleConnectionTimeout = $ idleConnectionTimeout ;
111119
112120 $ that = $ this ;
113121 $ this ->parser ->on ('headers ' , function (ServerRequestInterface $ request , ConnectionInterface $ conn ) use ($ that ) {
@@ -134,7 +142,35 @@ public function __construct(LoopInterface $loop, $requestHandler)
134142 */
135143 public function listen (ServerInterface $ socket )
136144 {
137- $ socket ->on ('connection ' , array ($ this ->parser , 'handle ' ));
145+ $ socket ->on ('connection ' , array ($ this , 'handleConnection ' ));
146+ }
147+
148+ /** @internal */
149+ public function handleConnection (ConnectionInterface $ connection )
150+ {
151+ $ idleConnectionTimeout = $ this ->idleConnectionTimeout ;
152+ $ loop = $ this ->loop ;
153+ $ idleConnectionTimeoutHandler = function () use ($ connection , &$ closeEventHandler , &$ dataEventHandler ) {
154+ $ connection ->removeListener ('close ' , $ closeEventHandler );
155+ $ connection ->removeListener ('data ' , $ dataEventHandler );
156+
157+ $ connection ->close ();
158+ };
159+ $ timer = $ this ->loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
160+ $ closeEventHandler = function () use ($ connection , &$ closeEventHandler , &$ dataEventHandler , $ loop , &$ timer ) {
161+ $ connection ->removeListener ('close ' , $ closeEventHandler );
162+ $ connection ->removeListener ('data ' , $ dataEventHandler );
163+
164+ $ loop ->cancelTimer ($ timer );
165+ };
166+ $ dataEventHandler = function () use ($ loop , $ idleConnectionTimeout , $ idleConnectionTimeoutHandler , &$ timer ) {
167+ $ loop ->cancelTimer ($ timer );
168+ $ timer = $ loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
169+ };
170+ $ connection ->on ('close ' , $ closeEventHandler );
171+ $ connection ->on ('data ' , $ dataEventHandler );
172+
173+ $ this ->parseRequest ($ connection );
138174 }
139175
140176 /** @internal */
@@ -372,7 +408,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
372408
373409 // either wait for next request over persistent connection or end connection
374410 if ($ persist ) {
375- $ this ->parser -> handle ($ connection );
411+ $ this ->parseRequest ($ connection );
376412 } else {
377413 $ connection ->end ();
378414 }
@@ -393,13 +429,67 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
393429 // write streaming body and then wait for next request over persistent connection
394430 if ($ persist ) {
395431 $ body ->pipe ($ connection , array ('end ' => false ));
396- $ parser = $ this -> parser ;
397- $ body ->on ('end ' , function () use ($ connection , $ parser , $ body ) {
432+ $ that = $ this ;
433+ $ body ->on ('end ' , function () use ($ connection , $ body , & $ that ) {
398434 $ connection ->removeListener ('close ' , array ($ body , 'close ' ));
399- $ parser -> handle ($ connection );
435+ $ that -> parseRequest ($ connection );
400436 });
401437 } else {
402438 $ body ->pipe ($ connection );
403439 }
404440 }
441+
442+ /**
443+ * @internal
444+ */
445+ public function parseRequest (ConnectionInterface $ connection )
446+ {
447+ $ idleConnectionTimeout = $ this ->idleConnectionTimeout ;
448+ $ loop = $ this ->loop ;
449+ $ parser = $ this ->parser ;
450+ $ idleConnectionTimeoutHandler = function () use ($ connection , $ parser , &$ removeTimerHandler ) {
451+ $ parser ->removeListener ('headers ' , $ removeTimerHandler );
452+ $ parser ->removeListener ('error ' , $ removeTimerHandler );
453+
454+ $ parser ->emit ('error ' , array (
455+ new \RuntimeException ('Request timed out ' , Response::STATUS_REQUEST_TIMEOUT ),
456+ $ connection
457+ ));
458+ };
459+ $ timer = $ this ->loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
460+ $ removeTimerHandler = function ($ requestOrError , $ conn ) use ($ loop , &$ timer , $ parser , $ connection , &$ removeTimerHandler , $ idleConnectionTimeout , $ idleConnectionTimeoutHandler ) {
461+ if ($ conn !== $ connection ) {
462+ return ;
463+ }
464+
465+ $ loop ->cancelTimer ($ timer );
466+ $ parser ->removeListener ('headers ' , $ removeTimerHandler );
467+ $ parser ->removeListener ('error ' , $ removeTimerHandler );
468+
469+ if (!($ requestOrError instanceof ServerRequestInterface)) {
470+ return ;
471+ }
472+
473+ $ requestBody = $ requestOrError ->getBody ();
474+ if (!($ requestBody instanceof HttpBodyStream)) {
475+ return ;
476+ }
477+
478+ $ timer = $ loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
479+ $ requestBody ->on ('data ' , function () use (&$ timer , $ loop , $ idleConnectionTimeout , $ idleConnectionTimeoutHandler ) {
480+ $ loop ->cancelTimer ($ timer );
481+ $ timer = $ loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
482+ });
483+ $ requestBody ->on ('end ' , function () use (&$ timer , $ loop ) {
484+ $ loop ->cancelTimer ($ timer );
485+ });
486+ $ requestBody ->on ('close ' , function () use (&$ timer , $ loop ) {
487+ $ loop ->cancelTimer ($ timer );
488+ });
489+ };
490+ $ this ->parser ->on ('headers ' , $ removeTimerHandler );
491+ $ this ->parser ->on ('error ' , $ removeTimerHandler );
492+
493+ $ this ->parser ->handle ($ connection );
494+ }
405495}
0 commit comments