Skip to content

Commit 89a1107

Browse files
authored
Merge pull request #6 from php-etl/fix/pipeline-execution
Created a Proxy to fix pipeline executions in a workflow
2 parents d065493 + 812a4af commit 89a1107

File tree

6 files changed

+93
-8
lines changed

6 files changed

+93
-8
lines changed

.github/workflows/phpstan-6.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name: PHPStan level 6
22
on: push
33
jobs:
4-
phpstan:
4+
phpstan6:
55
runs-on: ubuntu-latest
66
steps:
77
- uses: actions/checkout@v3

.github/workflows/phpstan-7.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name: PHPStan level 7
22
on: push
33
jobs:
4-
phpstan:
4+
phpstan7:
55
runs-on: ubuntu-latest
66
steps:
77
- uses: actions/checkout@v3

.github/workflows/phpstan-8.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name: PHPStan level 8
22
on: push
33
jobs:
4-
phpstan:
4+
phpstan8:
55
runs-on: ubuntu-latest
66
steps:
77
- uses: actions/checkout@v3

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
},
4545
"extra": {
4646
"branch-alias": {
47-
"dev-main": "0.3.x-dev"
47+
"dev-main": "0.4.x-dev"
4848
}
4949
}
5050
}

src/Console.php

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66

77
use Kiboko\Component\Action\Action;
88
use Kiboko\Component\Pipeline\Pipeline;
9+
use Kiboko\Component\Runtime\Action\ActionRuntimeInterface;
910
use Kiboko\Component\Runtime\Action\Console as ActionConsoleRuntime;
10-
use Kiboko\Component\Runtime\Pipeline\Console as PipelineConsoleRuntime;
11+
use Kiboko\Component\Runtime\Pipeline\PipelineRuntimeInterface;
1112
use Kiboko\Component\State;
1213
use Kiboko\Contract\Pipeline\PipelineRunnerInterface;
1314
use Kiboko\Contract\Satellite\RunnableInterface as JobRunnableInterface;
@@ -27,16 +28,16 @@ public function __construct(
2728
$this->state = new State\StateOutput\Workflow($output);
2829
}
2930

30-
public function loadPipeline(string $filename): PipelineConsoleRuntime
31+
public function loadPipeline(string $filename): PipelineRuntimeInterface
3132
{
3233
$factory = require $filename;
3334

3435
$pipeline = new Pipeline($this->pipelineRunner);
3536

36-
return $factory(new PipelineConsoleRuntime($this->output, $pipeline, $this->state->withPipeline(basename($filename))));
37+
return new PipelineProxy($factory, $this->output, $pipeline, $this->state, basename($filename));
3738
}
3839

39-
public function loadAction(string $filename): ActionConsoleRuntime
40+
public function loadAction(string $filename): ActionRuntimeInterface
4041
{
4142
$factory = require $filename;
4243

src/PipelineProxy.php

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Kiboko\Component\Runtime\Workflow;
6+
7+
use Kiboko\Component\Runtime\Pipeline\Console as PipelineConsoleRuntime;
8+
use Kiboko\Component\Runtime\Pipeline\PipelineRuntimeInterface;
9+
use Kiboko\Component\State;
10+
use Kiboko\Contract\Pipeline\ExtractorInterface;
11+
use Kiboko\Contract\Pipeline\LoaderInterface;
12+
use Kiboko\Contract\Pipeline\PipelineInterface;
13+
use Kiboko\Contract\Pipeline\RejectionInterface;
14+
use Kiboko\Contract\Pipeline\StateInterface;
15+
use Kiboko\Contract\Pipeline\TransformerInterface;
16+
use Kiboko\Contract\Pipeline\WalkableInterface;
17+
use Symfony\Component\Console\Output\ConsoleOutput;
18+
19+
class PipelineProxy implements PipelineRuntimeInterface
20+
{
21+
/** @var list<callable> */
22+
private array $queuedCalls = [];
23+
24+
public function __construct(
25+
callable $factory,
26+
private readonly ConsoleOutput $output,
27+
private readonly PipelineInterface&WalkableInterface $pipeline,
28+
private readonly State\StateOutput\Workflow $state,
29+
private readonly string $filename,
30+
) {
31+
$this->queuedCalls[] = static function (PipelineConsoleRuntime $runtime) use ($factory): void {
32+
$factory($runtime);
33+
};
34+
}
35+
36+
public function extract(
37+
ExtractorInterface $extractor,
38+
RejectionInterface $rejection,
39+
StateInterface $state,
40+
): self {
41+
$this->queuedCalls[] = static function (PipelineConsoleRuntime $runtime) use ($extractor, $rejection, $state): void {
42+
$runtime->extract($extractor, $rejection, $state);
43+
};
44+
45+
return $this;
46+
}
47+
48+
public function transform(
49+
TransformerInterface $transformer,
50+
RejectionInterface $rejection,
51+
StateInterface $state,
52+
): self {
53+
$this->queuedCalls[] = static function (PipelineConsoleRuntime $runtime) use ($transformer, $rejection, $state): void {
54+
$runtime->transform($transformer, $rejection, $state);
55+
};
56+
57+
return $this;
58+
}
59+
60+
public function load(
61+
LoaderInterface $loader,
62+
RejectionInterface $rejection,
63+
StateInterface $state,
64+
): self {
65+
$this->queuedCalls[] = static function (PipelineConsoleRuntime $runtime) use ($loader, $rejection, $state): void {
66+
$runtime->load($loader, $rejection, $state);
67+
};
68+
69+
return $this;
70+
}
71+
72+
public function run(int $interval = 1000): int
73+
{
74+
$runtime = new PipelineConsoleRuntime($this->output, $this->pipeline, $this->state->withPipeline($this->filename));
75+
76+
foreach ($this->queuedCalls as $queuedCall) {
77+
$queuedCall($this);
78+
}
79+
80+
$this->queuedCalls = [];
81+
82+
return $runtime->run($interval);
83+
}
84+
}

0 commit comments

Comments
 (0)