Skip to content

Commit 51afae0

Browse files
committed
Convert all transforms to async
1 parent b2b2ae2 commit 51afae0

File tree

12 files changed

+59
-58
lines changed

12 files changed

+59
-58
lines changed

src/taskgraph/generator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ async def load_tasks(self, parameters, loaded_tasks, write_artifacts):
8888
soft_dependencies=t.get("soft-dependencies"),
8989
if_dependencies=t.get("if-dependencies"),
9090
)
91-
for t in tasks
91+
async for t in tasks
9292
]
9393
logger.info(f"Generated {len(tasks)} tasks for kind {self.name}")
9494
return tasks

src/taskgraph/transforms/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ async def __call__(self, config, items):
148148
items = xform(config, items)
149149
if items is None:
150150
raise Exception(f"Transform {xform} is not a generator")
151+
152+
if not inspect.isasyncgen(items):
153+
items = convert_async(items)
151154
return items
152155

153156
def add(self, func):

src/taskgraph/transforms/cached_tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def format_task_digest(cached_task):
5050

5151

5252
@transforms.add
53-
def cache_task(config, tasks):
53+
async def cache_task(config, tasks):
5454
if taskgraph.fast:
5555
for task in tasks:
5656
yield task
@@ -61,7 +61,7 @@ def cache_task(config, tasks):
6161
if "cached_task" in task.attributes:
6262
digests[task.label] = format_task_digest(task.attributes["cached_task"])
6363

64-
for task in order_tasks(config, tasks):
64+
for task in order_tasks(config, [t async for t in tasks]):
6565
cache = task.pop("cache", None)
6666
if cache is None:
6767
yield task

src/taskgraph/transforms/chunking.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@
5050

5151

5252
@transforms.add
53-
def chunk_tasks(config, tasks):
54-
for task in tasks:
53+
async def chunk_tasks(config, tasks):
54+
async for task in tasks:
5555
chunk_config = task.pop("chunk", None)
5656
if not chunk_config:
5757
yield task

src/taskgraph/transforms/code_review.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212

1313

1414
@transforms.add
15-
def add_dependencies(config, jobs):
16-
for job in jobs:
15+
async def add_dependencies(config, jobs):
16+
async for job in jobs:
1717
job.setdefault("soft-dependencies", [])
1818
job["soft-dependencies"] += [
1919
dep_task.label

src/taskgraph/transforms/docker_image.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565

6666

6767
@transforms.add
68-
def fill_template(config, tasks):
68+
async def fill_template(config, tasks):
6969
available_packages = set()
7070
for task in config.kind_dependencies_tasks.values():
7171
if task.kind != "packages":
@@ -75,13 +75,11 @@ def fill_template(config, tasks):
7575

7676
context_hashes = {}
7777

78-
tasks = list(tasks)
79-
8078
if not taskgraph.fast and config.write_artifacts:
8179
if not os.path.isdir(CONTEXTS_DIR):
8280
os.makedirs(CONTEXTS_DIR)
8381

84-
for task in tasks:
82+
async for task in tasks:
8583
image_name = task.pop("name")
8684
job_symbol = task.pop("symbol", None)
8785
args = task.pop("args", {})

src/taskgraph/transforms/fetch.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ def wrap(func):
7878

7979

8080
@transforms.add
81-
def process_fetch_job(config, jobs):
81+
async def process_fetch_job(config, jobs):
8282
# Converts fetch-url entries to the job schema.
83-
for job in jobs:
83+
async for job in jobs:
8484
typ = job["fetch"]["type"]
8585
name = job["name"]
8686
fetch = job.pop("fetch")
@@ -103,15 +103,15 @@ def configure_fetch(config, typ, name, fetch):
103103

104104

105105
@transforms.add
106-
def make_task(config, jobs):
106+
async def make_task(config, jobs):
107107
# Fetch tasks are idempotent and immutable. Have them live for
108108
# essentially forever.
109109
if config.params["level"] == "3":
110110
expires = "1000 years"
111111
else:
112112
expires = "28 days"
113113

114-
for job in jobs:
114+
async for job in jobs:
115115
name = job["name"]
116116
artifact_prefix = job.get("artifact-prefix", "public")
117117
env = job.get("env", {})

src/taskgraph/transforms/from_deps.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@
113113

114114

115115
@transforms.add
116-
def from_deps(config, tasks):
117-
for task in tasks:
116+
async def from_deps(config, tasks):
117+
async for task in tasks:
118118
# Setup and error handling.
119119
from_deps = task.pop("from-deps")
120120
kind_deps = config.config.get("kind-dependencies", [])

src/taskgraph/transforms/job/__init__.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@
112112

113113

114114
@transforms.add
115-
def rewrite_when_to_optimization(config, jobs):
116-
for job in jobs:
115+
async def rewrite_when_to_optimization(config, jobs):
116+
async for job in jobs:
117117
when = job.pop("when", {})
118118
if not when:
119119
yield job
@@ -132,8 +132,8 @@ def rewrite_when_to_optimization(config, jobs):
132132

133133

134134
@transforms.add
135-
def set_implementation(config, jobs):
136-
for job in jobs:
135+
async def set_implementation(config, jobs):
136+
async for job in jobs:
137137
impl, os = worker_type_implementation(config.graph_config, job["worker-type"])
138138
if os:
139139
job.setdefault("tags", {})["os"] = os
@@ -148,8 +148,8 @@ def set_implementation(config, jobs):
148148

149149

150150
@transforms.add
151-
def set_label(config, jobs):
152-
for job in jobs:
151+
async def set_label(config, jobs):
152+
async for job in jobs:
153153
if "label" not in job:
154154
if "name" not in job:
155155
raise Exception("job has neither a name nor a label")
@@ -160,8 +160,8 @@ def set_label(config, jobs):
160160

161161

162162
@transforms.add
163-
def add_resource_monitor(config, jobs):
164-
for job in jobs:
163+
async def add_resource_monitor(config, jobs):
164+
async for job in jobs:
165165
if job.get("attributes", {}).get("resource-monitor"):
166166
worker_implementation, worker_os = worker_type_implementation(
167167
config.graph_config, job["worker-type"]
@@ -204,13 +204,13 @@ def get_attribute(dict, key, attributes, attribute_name):
204204

205205

206206
@transforms.add
207-
def use_fetches(config, jobs):
207+
async def use_fetches(config, jobs):
208208
artifact_names = {}
209209
aliases = {}
210210
extra_env = {}
211211

212+
jobs = [j async for j in jobs]
212213
if config.kind in ("toolchain", "fetch"):
213-
jobs = list(jobs)
214214
for job in jobs:
215215
run = job.get("run", {})
216216
label = job["label"]
@@ -353,12 +353,12 @@ def cmp_artifacts(a):
353353

354354

355355
@transforms.add
356-
def make_task_description(config, jobs):
356+
async def make_task_description(config, jobs):
357357
"""Given a build description, create a task description"""
358358
# import plugin modules first, before iterating over jobs
359359
import_sibling_modules(exceptions=("common.py",))
360360

361-
for job in jobs:
361+
async for job in jobs:
362362
# always-optimized tasks never execute, so have no workdir
363363
if job["worker"]["implementation"] in ("docker-worker", "generic-worker"):
364364
job["run"].setdefault("workdir", "/builds/worker")

src/taskgraph/transforms/notify.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ def _convert_content(content):
140140

141141

142142
@transforms.add
143-
def add_notifications(config, tasks):
144-
for task in tasks:
143+
async def add_notifications(config, tasks):
144+
async for task in tasks:
145145
label = "{}-{}".format(config.kind, task["name"])
146146
if "notifications" in task:
147147
notify = _convert_legacy(config, task.pop("notifications"), label)

src/taskgraph/transforms/task.py

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -831,11 +831,11 @@ def build_dummy_payload(config, task, task_def):
831831

832832

833833
@transforms.add
834-
def set_implementation(config, tasks):
834+
async def set_implementation(config, tasks):
835835
"""
836836
Set the worker implementation based on the worker-type alias.
837837
"""
838-
for task in tasks:
838+
async for task in tasks:
839839
worker = task.setdefault("worker", {})
840840
if "implementation" in task["worker"]:
841841
yield task
@@ -855,8 +855,8 @@ def set_implementation(config, tasks):
855855

856856

857857
@transforms.add
858-
def set_defaults(config, tasks):
859-
for task in tasks:
858+
async def set_defaults(config, tasks):
859+
async for task in tasks:
860860
task.setdefault("always-target", False)
861861
task.setdefault("optimization", None)
862862
task.setdefault("needs-sccache", False)
@@ -899,8 +899,8 @@ def set_defaults(config, tasks):
899899

900900

901901
@transforms.add
902-
def task_name_from_label(config, tasks):
903-
for task in tasks:
902+
async def task_name_from_label(config, tasks):
903+
async for task in tasks:
904904
if "label" not in task:
905905
if "name" not in task:
906906
raise Exception("task has neither a name nor a label")
@@ -911,8 +911,8 @@ def task_name_from_label(config, tasks):
911911

912912

913913
@transforms.add
914-
def validate(config, tasks):
915-
for task in tasks:
914+
async def validate(config, tasks):
915+
async for task in tasks:
916916
validate_schema(
917917
task_description_schema,
918918
task,
@@ -949,8 +949,8 @@ def add_generic_index_routes(config, task):
949949

950950

951951
@transforms.add
952-
def process_treeherder_metadata(config, tasks):
953-
for task in tasks:
952+
async def process_treeherder_metadata(config, tasks):
953+
async for task in tasks:
954954
routes = task.get("routes", [])
955955
extra = task.get("extra", {})
956956
task_th = task.get("treeherder")
@@ -1021,8 +1021,8 @@ def process_treeherder_metadata(config, tasks):
10211021

10221022

10231023
@transforms.add
1024-
def add_index_routes(config, tasks):
1025-
for task in tasks:
1024+
async def add_index_routes(config, tasks):
1025+
async for task in tasks:
10261026
index = task.get("index", {})
10271027

10281028
# The default behavior is to rank tasks according to their tier
@@ -1053,8 +1053,8 @@ def add_index_routes(config, tasks):
10531053

10541054

10551055
@transforms.add
1056-
def build_task(config, tasks):
1057-
for task in tasks:
1056+
async def build_task(config, tasks):
1057+
async for task in tasks:
10581058
level = str(config.params["level"])
10591059

10601060
provisioner_id, worker_type = get_worker_type(
@@ -1215,24 +1215,24 @@ def build_task(config, tasks):
12151215

12161216

12171217
@transforms.add
1218-
def add_github_checks(config, tasks):
1218+
async def add_github_checks(config, tasks):
12191219
"""
12201220
For git repositories, add checks route to all tasks.
12211221
12221222
This will be replaced by a configurable option in the future.
12231223
"""
12241224
if config.params["repository_type"] != "git":
1225-
for task in tasks:
1225+
async for task in tasks:
12261226
yield task
12271227

1228-
for task in tasks:
1228+
async for task in tasks:
12291229
task["task"]["routes"].append("checks")
12301230
yield task
12311231

12321232

12331233
@transforms.add
1234-
def chain_of_trust(config, tasks):
1235-
for task in tasks:
1234+
async def chain_of_trust(config, tasks):
1235+
async for task in tasks:
12361236
if task["task"].get("payload", {}).get("features", {}).get("chainOfTrust"):
12371237
image = task.get("dependencies", {}).get("docker-image")
12381238
if image:
@@ -1246,12 +1246,12 @@ def chain_of_trust(config, tasks):
12461246

12471247

12481248
@transforms.add
1249-
def check_task_identifiers(config, tasks):
1249+
async def check_task_identifiers(config, tasks):
12501250
"""Ensures that all tasks have well defined identifiers:
12511251
``^[a-zA-Z0-9_-]{1,38}$``
12521252
"""
12531253
e = re.compile("^[a-zA-Z0-9_-]{1,38}$")
1254-
for task in tasks:
1254+
async for task in tasks:
12551255
for attrib in ("workerType", "provisionerId"):
12561256
if not e.match(task["task"][attrib]):
12571257
raise Exception(
@@ -1263,9 +1263,9 @@ def check_task_identifiers(config, tasks):
12631263

12641264

12651265
@transforms.add
1266-
def check_task_dependencies(config, tasks):
1266+
async def check_task_dependencies(config, tasks):
12671267
"""Ensures that tasks don't have more than 100 dependencies."""
1268-
for task in tasks:
1268+
async for task in tasks:
12691269
number_of_dependencies = (
12701270
len(task["dependencies"])
12711271
+ len(task["if-dependencies"])
@@ -1311,7 +1311,7 @@ def check_caches_are_volumes(task):
13111311

13121312

13131313
@transforms.add
1314-
def check_run_task_caches(config, tasks):
1314+
async def check_run_task_caches(config, tasks):
13151315
"""Audit for caches requiring run-task.
13161316
13171317
run-task manages caches in certain ways. If a cache managed by run-task
@@ -1336,7 +1336,7 @@ def check_run_task_caches(config, tasks):
13361336

13371337
suffix = _run_task_suffix()
13381338

1339-
for task in tasks:
1339+
async for task in tasks:
13401340
payload = task["task"].get("payload", {})
13411341
command = payload.get("command") or [""]
13421342

src/taskgraph/transforms/task_context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@
8181

8282

8383
@transforms.add
84-
def render_task(config, jobs):
85-
for job in jobs:
84+
async def render_task(config, jobs):
85+
async for job in jobs:
8686
sub_config = job.pop("task-context")
8787
params_context = {}
8888
for var, path in sub_config.pop("from-parameters", {}).items():

0 commit comments

Comments
 (0)