@@ -87,6 +87,12 @@ final class StreamingServer extends EventEmitter
8787    /** @var Clock */ 
8888    private  $ clock ;
8989
90+     /** @var LoopInterface */ 
91+     private  $ loop ;
92+ 
93+     /** @var int */ 
94+     private  $ idleConnectionTimeout ;
95+ 
9096    /** 
9197     * Creates an HTTP server that invokes the given callback for each incoming HTTP request 
9298     * 
@@ -95,19 +101,21 @@ final class StreamingServer extends EventEmitter
95101     * connections in order to then parse incoming data as HTTP. 
96102     * See also [listen()](#listen) for more details. 
97103     * 
98-      * @param LoopInterface $loop 
99104     * @param callable $requestHandler 
105+      * @param int $idleConnectionTimeout 
100106     * @see self::listen() 
101107     */ 
102-     public  function  __construct (LoopInterface   $ loop , $ requestHandler )
108+     public  function  __construct (LoopInterface   $ loop , $ requestHandler,  $ idleConnectionTimeout  )
103109    {
104110        if  (!\is_callable ($ requestHandler )) {
105111            throw  new  \InvalidArgumentException ('Invalid request handler given ' );
106112        }
107113
114+         $ this  ->loop  = $ loop ;
108115        $ this  ->callback  = $ requestHandler ;
109116        $ this  ->clock  = new  Clock ($ loop );
110117        $ this  ->parser  = new  RequestHeaderParser ($ this  ->clock );
118+         $ this  ->idleConnectionTimeout  = $ idleConnectionTimeout ;
111119
112120        $ that  = $ this  ;
113121        $ this  ->parser ->on ('headers ' , function  (ServerRequestInterface   $ request , ConnectionInterface   $ conn ) use  ($ that ) {
@@ -134,7 +142,35 @@ public function __construct(LoopInterface $loop, $requestHandler)
134142     */ 
135143    public  function  listen (ServerInterface   $ socket )
136144    {
137-         $ socket ->on ('connection ' , array ($ this  ->parser , 'handle ' ));
145+         $ socket ->on ('connection ' , array ($ this  , 'handleConnection ' ));
146+     }
147+ 
148+     /** @internal */ 
149+     public  function  handleConnection (ConnectionInterface   $ connection )
150+     {
151+         $ idleConnectionTimeout  = $ this  ->idleConnectionTimeout ;
152+         $ loop  = $ this  ->loop ;
153+         $ idleConnectionTimeoutHandler  = function  () use  ($ connection , &$ closeEventHandler , &$ dataEventHandler ) {
154+             $ connection ->removeListener ('close ' , $ closeEventHandler );
155+             $ connection ->removeListener ('data ' , $ dataEventHandler );
156+ 
157+             $ connection ->close ();
158+         };
159+         $ timer  = $ this  ->loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
160+         $ closeEventHandler  = function  () use  ($ connection , &$ closeEventHandler , &$ dataEventHandler , $ loop , &$ timer ) {
161+             $ connection ->removeListener ('close ' , $ closeEventHandler );
162+             $ connection ->removeListener ('data ' , $ dataEventHandler );
163+ 
164+             $ loop ->cancelTimer ($ timer );
165+         };
166+         $ dataEventHandler  = function  () use  ($ loop , $ idleConnectionTimeout , $ idleConnectionTimeoutHandler , &$ timer ) {
167+             $ loop ->cancelTimer ($ timer );
168+             $ timer  = $ loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
169+         };
170+         $ connection ->on ('close ' , $ closeEventHandler );
171+         $ connection ->on ('data ' , $ dataEventHandler );
172+ 
173+         $ this  ->parseRequest ($ connection );
138174    }
139175
140176    /** @internal */ 
@@ -372,7 +408,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
372408
373409            // either wait for next request over persistent connection or end connection 
374410            if  ($ persist ) {
375-                 $ this  ->parser -> handle ($ connection );
411+                 $ this  ->parseRequest ($ connection );
376412            } else  {
377413                $ connection ->end ();
378414            }
@@ -393,13 +429,67 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
393429        // write streaming body and then wait for next request over persistent connection 
394430        if  ($ persist ) {
395431            $ body ->pipe ($ connection , array ('end '  => false ));
396-             $ parser   = $ this -> parser ;
397-             $ body ->on ('end ' , function  () use  ($ connection , $ parser ,  $ body  ) {
432+             $ that   = $ this  ;
433+             $ body ->on ('end ' , function  () use  ($ connection , $ body , & $ that  ) {
398434                $ connection ->removeListener ('close ' , array ($ body , 'close ' ));
399-                 $ parser -> handle ($ connection );
435+                 $ that -> parseRequest ($ connection );
400436            });
401437        } else  {
402438            $ body ->pipe ($ connection );
403439        }
404440    }
441+ 
442+     /** 
443+      * @internal 
444+      */ 
445+     public  function  parseRequest (ConnectionInterface   $ connection )
446+     {
447+         $ idleConnectionTimeout  = $ this  ->idleConnectionTimeout ;
448+         $ loop  = $ this  ->loop ;
449+         $ parser  = $ this  ->parser ;
450+         $ idleConnectionTimeoutHandler  = function  () use  ($ connection , $ parser , &$ removeTimerHandler ) {
451+             $ parser ->removeListener ('headers ' , $ removeTimerHandler );
452+             $ parser ->removeListener ('error ' , $ removeTimerHandler );
453+ 
454+             $ parser ->emit ('error ' , array (
455+                 new  \RuntimeException ('Request timed out ' , Response::STATUS_REQUEST_TIMEOUT ),
456+                 $ connection
457+             ));
458+         };
459+         $ timer  = $ this  ->loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
460+         $ removeTimerHandler  = function  ($ requestOrError , $ conn ) use  ($ loop , &$ timer , $ parser , $ connection , &$ removeTimerHandler , $ idleConnectionTimeout , $ idleConnectionTimeoutHandler ) {
461+             if  ($ conn  !== $ connection ) {
462+                 return ;
463+             }
464+ 
465+             $ loop ->cancelTimer ($ timer );
466+             $ parser ->removeListener ('headers ' , $ removeTimerHandler );
467+             $ parser ->removeListener ('error ' , $ removeTimerHandler );
468+ 
469+             if  (!($ requestOrError  instanceof  ServerRequestInterface)) {
470+                 return ;
471+             }
472+ 
473+             $ requestBody  = $ requestOrError ->getBody ();
474+             if  (!($ requestBody  instanceof  HttpBodyStream)) {
475+                 return ;
476+             }
477+ 
478+             $ timer  = $ loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
479+             $ requestBody ->on ('data ' , function  () use  (&$ timer , $ loop , $ idleConnectionTimeout , $ idleConnectionTimeoutHandler ) {
480+                 $ loop ->cancelTimer ($ timer );
481+                 $ timer  = $ loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
482+             });
483+             $ requestBody ->on ('end ' , function  () use  (&$ timer , $ loop ) {
484+                 $ loop ->cancelTimer ($ timer );
485+             });
486+             $ requestBody ->on ('close ' , function  () use  (&$ timer , $ loop ) {
487+                 $ loop ->cancelTimer ($ timer );
488+             });
489+         };
490+         $ this  ->parser ->on ('headers ' , $ removeTimerHandler );
491+         $ this  ->parser ->on ('error ' , $ removeTimerHandler );
492+ 
493+         $ this  ->parser ->handle ($ connection );
494+     }
405495}
0 commit comments