Child tasks #27

@lyriccoder

Could you please tell me how to create child tasks? Suppose Task1 outputs a list (A).
Then Task2 must be executed for each value of A, and each run outputs a list (B).
Then I want to run Task3 for each item of list B. How can I do this?
I've created an aggregator:

import d6tflow
from pathlib import Path

class TaskAggregatorJavaFiles(d6tflow.tasks.TaskAggregator):
    dir_to_search = d6tflow.Parameter()
    dir_to_save = d6tflow.Parameter()

    def run(self):
        # Select every .java file whose name does not contain "Test"
        test_files = set(Path(self.dir_to_search).glob('**/*Test*.java'))
        all_files = set(Path(self.dir_to_search).glob('**/*.java'))
        files_without_tests = list(all_files.difference(test_files))
        for idx, file in enumerate(files_without_tests):
            print(idx)
            yield TaskPreprocessJavaFile(file=str(file))
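
The file selection inside `run` can be checked on its own, outside d6tflow. A minimal sketch using only the standard library; the helper name `non_test_java_files` and the sample file names are hypothetical and exist only for illustration:

```python
import tempfile
from pathlib import Path

def non_test_java_files(root):
    """Return sorted .java paths under root whose file name lacks 'Test'."""
    root = Path(root)
    all_java = set(root.glob('**/*.java'))
    test_java = set(root.glob('**/*Test*.java'))
    return sorted(all_java - test_java)

# Build a throwaway directory tree with two test files and two regular files.
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "src"
    src.mkdir()
    for name in ("Foo.java", "FooTest.java", "TestBar.java", "Baz.java"):
        (src / name).write_text("// stub")
    found = [p.name for p in non_test_java_files(tmp)]

print(found)
```

Note that the pattern `**/*Test*.java` only looks at the file name, so a file such as `Foo.java` inside a directory named `Test` would still be selected.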

I can see that it iterates over the files, but the child task is not spawned:

class TaskPreprocessJavaFile(d6tflow.tasks.TaskPickle):
    file = d6tflow.Parameter()

    def run(self):
        # it is not executed in parallel
        self.save({'bla': 'bla', "text": self.file})

The problem is that the child tasks are not executed in parallel. How can I run the child tasks asynchronously (in parallel)?
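
To make the intended behaviour concrete, here is a minimal sketch of the fan-out described above (Task1 produces list A, Task2 runs per item of A, Task3 runs per item of B), written with the standard library only. It is not d6tflow's API; the functions `task1`, `task2`, `task3` and `run_pipeline` and their return values are hypothetical stand-ins:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import chain

def task1():
    """Stage 1: produce list A (hypothetical stand-in for Task1)."""
    return ["a.java", "b.java"]

def task2(item):
    """Stage 2: runs once per element of A and returns a list (part of B)."""
    return [f"{item}:method{i}" for i in range(2)]

def task3(item):
    """Stage 3: runs once per element of B."""
    return item.upper()

def run_pipeline(max_workers=4):
    list_a = task1()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Fan out: one task2 call per element of A, executed concurrently.
        # pool.map returns results in the same order as its inputs.
        list_b = list(chain.from_iterable(pool.map(task2, list_a)))
        # Fan out again: one task3 call per element of B.
        return list(pool.map(task3, list_b))

if __name__ == "__main__":
    print(run_pipeline())
```

Threads are a reasonable fit when each child mostly waits on file or network I/O; for CPU-bound preprocessing, `ProcessPoolExecutor` from the same module has the same `map` interface.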
