1
+ <?php
2
+
3
+ namespace EcomDev \MySQL2JSONL \Command ;
4
+
5
+ use Amp \Pipeline \Queue ;
6
+ use EcomDev \MySQL2JSONL \Configuration ;
7
+ use EcomDev \MySQL2JSONL \ConfigurationException ;
8
+ use EcomDev \MySQL2JSONL \ExportTableFactory ;
9
+ use EcomDev \MySQL2JSONL \Progress \ExportProgressNotifier ;
10
+ use Revolt \EventLoop ;
11
+ use Symfony \Component \Console \Attribute \AsCommand ;
12
+ use Symfony \Component \Console \Command \Command ;
13
+ use Symfony \Component \Console \Helper \FormatterHelper ;
14
+ use Symfony \Component \Console \Input \InputArgument ;
15
+ use Symfony \Component \Console \Input \InputInterface ;
16
+ use Symfony \Component \Console \Output \OutputInterface ;
17
+ use Symfony \Component \Console \Input \InputOption ;
18
+ use function Amp \async ;
19
+ use function Amp \delay ;
20
+ use function Amp \Future \awaitAll ;
21
+
22
+ #[AsCommand(name: 'export ' , description: 'Export data from MySQL to a directory with JSONL files ' )]
23
+ class ExportCommand extends Command
24
+ {
25
+ public function configure ()
26
+ {
27
+ $ this ->addOption (
28
+ 'config ' ,
29
+ 'c ' , InputOption::VALUE_REQUIRED ,
30
+ 'Configuration file ' ,
31
+ 'config.json '
32
+ );
33
+
34
+ $ this ->addArgument (
35
+ 'directory ' ,
36
+ InputArgument::OPTIONAL ,
37
+ 'Directory to export data to ' ,
38
+ './data-dump '
39
+ );
40
+ }
41
+
42
+ public function execute (InputInterface $ input , OutputInterface $ output ): int
43
+ {
44
+ $ file = $ input ->getOption ('config ' );
45
+ if (!file_exists ($ file )) {
46
+ /* @var FormatterHelper $formatter*/
47
+ $ formatter = $ this ->getHelper ('formatter ' );
48
+ $ output ->write ($ formatter ->formatSection ('Error ' , sprintf (
49
+ 'Configuration file %s does not exist ' ,
50
+ $ file
51
+ ), 'error ' ));
52
+ return 1 ;
53
+ }
54
+ try {
55
+ $ config = Configuration::fromJSON (file_get_contents ($ file ));
56
+ } catch (ConfigurationException $ error ) {
57
+ $ error ->output ($ output );
58
+ return 1 ;
59
+ }
60
+
61
+ $ notifiers = new ExportProgressNotifier ($ output );
62
+ $ futures = [];
63
+ $ connectionPool = $ config ->createConnectionPool ();
64
+ $ factory = new ExportTableFactory ($ connectionPool , $ notifiers );
65
+ foreach ($ factory ->tablesToExport ($ config ) as $ table ) {
66
+ $ queue = new Queue (100 );
67
+
68
+ $ futures [] = async (function () use ($ factory , $ table , $ queue ) {
69
+ $ factory ->createExport ($ table )->run ($ queue );
70
+ });
71
+
72
+ $ futures [] = async (function () use ($ queue , $ output ) {
73
+ foreach ($ queue ->iterate () as $ item ) {
74
+ }
75
+ });
76
+
77
+ if (count ($ futures ) >= 100 ) {
78
+ awaitAll ($ futures );
79
+ $ futures = [];
80
+ }
81
+ }
82
+
83
+ EventLoop::run ();
84
+ return 0 ;
85
+ }
86
+ }
0 commit comments