Skip to content

Commit d979278

Browse files
committed
Close connection on flush failure
1 parent f0f233b commit d979278

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

lib/PgSqlHandle.php

+11-2
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public function __construct($handle, $socket) {
115115
}
116116
});
117117

118-
$this->await = Loop::onWritable($socket, static function ($watcher) use (&$deferred, $handle) {
118+
$this->await = Loop::onWritable($socket, static function ($watcher) use (&$deferred, &$listeners, &$handle) {
119119
$flush = \pg_flush($handle);
120120
if ($flush === 0) {
121121
return; // Not finished sending data, listen again.
@@ -124,7 +124,16 @@ public function __construct($handle, $socket) {
124124
Loop::disable($watcher);
125125

126126
if ($flush === false) {
127-
$deferred->fail(new FailureException(\pg_last_error($handle)));
127+
$exception = new ConnectionException(\pg_last_error($handle));
128+
$handle = null; // Marks connection as dead.
129+
130+
foreach ($listeners as $listener) {
131+
$listener->fail($exception);
132+
}
133+
134+
if ($deferred !== null) {
135+
$deferred->fail($exception);
136+
}
128137
}
129138
});
130139

lib/PqHandle.php

+17-5
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,10 @@ public function __construct(pq\Connection $handle) {
6666

6767
$this->poll = Loop::onReadable($this->handle->socket, static function ($watcher) use (&$deferred, &$listeners, &$handle) {
6868
if ($handle->poll() === pq\Connection::POLLING_FAILED) {
69+
$exception = new ConnectionException($handle->errorMessage);
6970
$handle = null; // Marks connection as dead.
7071
Loop::disable($watcher);
7172

72-
$exception = new ConnectionException($handle->errorMessage);
73-
7473
foreach ($listeners as $listener) {
7574
$listener->fail($exception);
7675
}
@@ -97,9 +96,22 @@ public function __construct(pq\Connection $handle) {
9796
}
9897
});
9998

100-
$this->await = Loop::onWritable($this->handle->socket, static function ($watcher) use (&$deferred, $handle) {
101-
if (!$handle->flush()) {
102-
return; // Not finished sending data, continue polling for writability.
99+
$this->await = Loop::onWritable($this->handle->socket, static function ($watcher) use (&$deferred, &$listeners, &$handle) {
100+
try {
101+
if (!$handle->flush()) {
102+
return; // Not finished sending data, continue polling for writability.
103+
}
104+
} catch (pq\Exception $exception) {
105+
$exception = new ConnectionException("Flushing the connection failed", 0, $exception);
106+
$handle = null; // Marks connection as dead.
107+
108+
foreach ($listeners as $listener) {
109+
$listener->fail($exception);
110+
}
111+
112+
if ($deferred !== null) {
113+
$deferred->fail($exception);
114+
}
103115
}
104116

105117
Loop::disable($watcher);

0 commit comments

Comments
 (0)