Skip to content

Commit

Permalink
Improve async() by making its promises cancelable
Browse files Browse the repository at this point in the history
Since `async()` returns a promise and those are normally cancelable, implementing this puts them in line with the rest of our ecosystem. As such the following example will throw a timeout exception from the canceled `sleep()` call.

```php
$promise = async(static function (): int {
    echo 'a';
    await(sleep(2));
    echo 'b';

    return time();
})();

$promise->cancel();
await($promise);
````

This builds on top of reactphp#15, reactphp#18, reactphp#19, reactphp#26, reactphp#28, reactphp#30, and reactphp#32.
  • Loading branch information
WyriHaximus committed Feb 20, 2022
1 parent 4cadacc commit 1dd9138
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 9 deletions.
102 changes: 102 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,108 @@ $promise->then(function (int $bytes) {
});
```

Promises returned by the `async` function can be cancelled and when done they will cancel any recursive `async` call
and any currently awaited promise using the `await` function. In the following example `echo 'b';` will never
be reached, and the `await` function at the bottom will also throw an exception with the following message
`Timer cancelled`.

```php
$promise = async(static function (): int {
echo 'a';
await(sleep(2));
echo 'b';

return time();
})();

$promise->cancel();
await($promise);
````

If you however decide to try and catch that `await` you will reach `echo 'b';`. The exception you caught however
isn't thrown by the bottom await function. Just as with synchronous code catching it lets you ignore the exception or
error that is thrown.

```php
$promise = async(static function (): int {
echo 'a';
try {
await(sleep(2));
} catch (\Throwable) {
// No-Op
}
echo 'b';

return time();
})();

$promise->cancel();
await($promise);
```

When a fiber is cancelled, all currently pending and future awaited promises will be cancelled. As such the following
example will never output `c` and a timeout exception will be thrown.

```php
$promise = async(static function (): int {
echo 'a';
try {
await(sleep(2));
} catch (\Throwable) {
// No-Op
}
echo 'b';
await(sleep(0.1));
echo 'c';

return time();
})();

$promise->cancel();
await($promise);
```

Any nested `async` and `await` calls are also canceled. You can nest this as deep as you want. As long as you await
every promise yielding function you call. The following example will output `abc`.

```php
$promise = async(static function (): int {
echo 'a';
await(async(static function(): void {
echo 'b';
await(async(static function(): void {
echo 'c';
await(sleep(2));
echo 'd';
})());
echo 'e';
})());
echo 'f';

return time();
})();

$promise->cancel();
await($promise);
```

Be very much aware that if you call a promise yielding function and not await it, it will not be cancelled. The
following example will output `acb`.

```php
$promise = async(static function (): int {
echo 'a';
sleep(0.001)->then(static function (): void {
echo 'b';
});
echo 'c';

return time();
})();

$promise->cancel();
```

### await()

The `await(PromiseInterface $promise): mixed` function can be used to
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
"react/promise": "^2.8 || ^1.2.1"
},
"require-dev": {
"phpunit/phpunit": "^9.3"
"phpunit/phpunit": "^9.3",
"react/promise-timer": "^1.8"
},
"autoload": {
"psr-4": {
Expand Down
51 changes: 51 additions & 0 deletions src/FiberMap.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

namespace React\Async;

use Fiber;
use React\Promise\PromiseInterface;

/**
* @internal
*/
final class FiberMap
{
private array $status = [];
private array $map = [];

public function register(Fiber $fiber): void
{
$this->status[spl_object_hash($fiber)] = false;
$this->map[spl_object_hash($fiber)] = [];
}

public function cancel(Fiber $fiber): void
{
$this->status[spl_object_hash($fiber)] = true;
}

public function isCanceled(Fiber $fiber): bool
{
return $this->status[spl_object_hash($fiber)];
}

public function attachPromise(Fiber $fiber, PromiseInterface $promise): void
{
$this->map[spl_object_hash($fiber)][spl_object_hash($promise)] = $promise;
}

public function has(Fiber $fiber): bool
{
return array_key_exists(spl_object_hash($fiber), $this->map);
}

public function getPromises(Fiber $fiber): array
{
return $this->map[spl_object_hash($fiber)];
}

public function unregister(Fiber $fiber): void
{
unset($this->status[spl_object_hash($fiber)], $this->map[spl_object_hash($fiber)]);
}
}
65 changes: 57 additions & 8 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace React\Async;

use Fiber;
use React\EventLoop\Loop;
use React\Promise\CancellablePromiseInterface;
use React\Promise\Deferred;
Expand Down Expand Up @@ -155,17 +156,40 @@
*/
function async(callable $function): callable
{
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void {
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void {
try {
$resolve($function(...$args));
} catch (\Throwable $exception) {
$reject($exception);
return static function (mixed ...$args) use ($function): PromiseInterface {
$fiber = null;
$promise = new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void {
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void {
try {
$resolve($function(...$args));
} catch (\Throwable $exception) {
$reject($exception);
} finally {
fiberMap()->unregister($fiber);
}
});

fiberMap()->register($fiber);

$fiber->start();
}, function () use (&$fiber): void {
if ($fiber instanceof Fiber) {
fiberMap()->cancel($fiber);
foreach (fiberMap()->getPromises($fiber) as $promise) {
if (method_exists($promise, 'cancel')) {
$promise->cancel();
}
}
}
});

$fiber->start();
});
$lowLevelFiber = \Fiber::getCurrent();
if ($lowLevelFiber !== null) {
fiberMap()->attachPromise($lowLevelFiber, $promise);
}

return $promise;
};
}


Expand Down Expand Up @@ -230,6 +254,13 @@ function await(PromiseInterface $promise): mixed
$rejected = false;
$resolvedValue = null;
$rejectedThrowable = null;
$lowLevelFiber = \Fiber::getCurrent();

if ($lowLevelFiber !== null) {
if (fiberMap()->isCanceled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}

$promise->then(
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
Expand Down Expand Up @@ -285,6 +316,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void
throw $rejectedThrowable;
}

if ($lowLevelFiber !== null) {
fiberMap()->attachPromise($lowLevelFiber, $promise);
}

$fiber = FiberFactory::create();

return $fiber->suspend();
Expand Down Expand Up @@ -601,3 +636,17 @@ function waterfall(array $tasks): PromiseInterface

return $deferred->promise();
}

/**
* @internal
*/
function fiberMap(): FiberMap
{
static $wm = null;

if ($wm === null) {
$wm = new FiberMap();
}

return $wm;
}
107 changes: 107 additions & 0 deletions tests/AsyncTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use function React\Promise\all;
use function React\Promise\reject;
use function React\Promise\resolve;
use function React\Promise\Timer\sleep;

class AsyncTest extends TestCase
{
Expand Down Expand Up @@ -185,4 +186,110 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA
$this->assertGreaterThan(0.1, $time);
$this->assertLessThan(0.12, $time);
}

public function testCancel()
{
self::expectOutputString('a');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
await(sleep(2));
echo 'b';

return time();
})();

$promise->cancel();
await($promise);
}

public function testCancelTryCatch()
{
self::expectOutputString('ab');
// $this->expectException(\Exception::class);
// $this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
try {
await(sleep(2));
} catch (\Throwable) {
// No-Op
}
echo 'b';

return time();
})();

$promise->cancel();
await($promise);
}

public function testNestedCancel()
{
self::expectOutputString('abc');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
await(async(static function(): void {
echo 'b';
await(async(static function(): void {
echo 'c';
await(sleep(2));
echo 'd';
})());
echo 'e';
})());
echo 'f';

return time();
})();

$promise->cancel();
await($promise);
}

public function testCancelFiberThatCatchesExceptions()
{
self::expectOutputString('ab');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
try {
await(sleep(2));
} catch (\Throwable) {
// No-Op
}
echo 'b';
await(sleep(0.1));
echo 'c';

return time();
})();

$promise->cancel();
await($promise);
}

public function testNotAwaitedPromiseWillNotBeCanceled()
{
self::expectOutputString('acb');

async(static function (): int {
echo 'a';
sleep(0.001)->then(static function (): void {
echo 'b';
});
echo 'c';

return time();
})()->cancel();
Loop::run();
}
}

0 comments on commit 1dd9138

Please sign in to comment.