diff --git a/lib/src/pool/pool_impl.dart b/lib/src/pool/pool_impl.dart index 3fbd5b1..7d61d23 100644 --- a/lib/src/pool/pool_impl.dart +++ b/lib/src/pool/pool_impl.dart @@ -21,6 +21,8 @@ class PoolImplementation implements Pool { final ResolvedPoolSettings _settings; final _connections = <_PoolConnection>[]; + final _poolResources = <_PoolConnectionResource>[]; + late final _maxConnectionCount = _settings.maxConnectionCount; late final _semaphore = pool.Pool( _maxConnectionCount, @@ -42,14 +44,20 @@ class PoolImplementation implements Pool { @override Future close({bool force = false}) async { - // TODO: Implement force close. + // If forcing the close, release the pool resources before closing the _semaphore, + // otherwise the close will wait for the queries to finish. + if (force) { + for (final r in [..._poolResources]) { + r.release(); + } + } await _semaphore.close(); // Connections are closed when they are returned to the pool if it's closed. // We still need to close statements that are currently unused. for (final connection in [..._connections]) { - if (!connection._isInUse) { - await connection._dispose(); + if (force || !connection._isInUse) { + await connection._dispose(force: force); } } } @@ -132,8 +140,9 @@ class PoolImplementation implements Pool { ConnectionSettings? settings, L? locality, }) async { - final resource = await _semaphore.request(); - _PoolConnection? connection; + final resource = await _requestResource(); + _poolResources.add(resource); + bool reuse = true; final sw = Stopwatch(); try { @@ -144,10 +153,11 @@ class PoolImplementation implements Pool { // Find an existing connection that is currently unused, or open another // one. - connection = await _selectOrCreate( + final _PoolConnection connection = await _selectOrCreate( selection.endpoint, ResolvedConnectionSettings(settings, this._settings), ); + resource._connection = connection; sw.start(); try { @@ -157,7 +167,10 @@ class PoolImplementation implements Pool { rethrow; } } finally { + final connection = resource._connection; resource.release(); + _poolResources.remove(resource); + sw.stop(); // If the pool has been closed, this connection needs to be closed as @@ -175,6 +188,12 @@ class PoolImplementation implements Pool { } } + Future<_PoolConnectionResource> _requestResource() async { + final resource = await _semaphore.request(); + // Wrap the pool resource so that we can be aware when it's released. + return _PoolConnectionResource(resource); + } + Future<_PoolConnection> _selectOrCreate( Endpoint endpoint, ResolvedConnectionSettings settings) async { final oldc = @@ -255,9 +274,9 @@ class _PoolConnection implements Connection { return false; } - Future _dispose() async { + Future _dispose({bool force = false}) async { _pool._connections.remove(this); - await _connection.close(); + await _connection.close(force: force); } @override @@ -279,10 +298,12 @@ class _PoolConnection implements Connection { @override Future close({bool force = false}) async { - // Don't forward the close call, the underlying connection should be re-used + // Don't forward the close call unless forcing. The underlying connection should be re-used // when another pool connection is requested. - // TODO: Implement force close. + if (force) { + await _connection.close(force: force); + } } @override @@ -350,3 +371,23 @@ class _PoolStatement implements Statement { return _underlying.run(parameters, timeout: timeout); } } + +class _PoolConnectionResource { + final pool.PoolResource _poolResource; + bool _released = false; + _PoolConnection? _connection; + + _PoolConnectionResource(this._poolResource); + + /// Same as [PoolResource.release], but it doesn't throw if the resource has + /// already been released. + /// We might call release outside the [withConnection] block, like when force + /// closing the pool, so [release] would be called multiple times. + void release() { + if (_released) { + return; + } + _poolResource.release(); + _released = true; + } +} diff --git a/test/pool_test.dart b/test/pool_test.dart index c5a7602..5cd6c06 100644 --- a/test/pool_test.dart +++ b/test/pool_test.dart @@ -210,7 +210,7 @@ void main() { }); }); - group(skip: 'not implemented', 'force close', () { + group('force close', () { Future openPool(PostgresServer server) async { final pool = Pool.withEndpoints( [await server.endpoint()],