Skip to content

Commit 6da6cf9

Browse files
committed
Convert all transforms to async
1 parent e2fa43e commit 6da6cf9

File tree

10 files changed

+55
-57
lines changed

10 files changed

+55
-57
lines changed

src/taskgraph/transforms/cached_tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def format_task_digest(cached_task):
5050

5151

5252
@transforms.add
53-
def cache_task(config, tasks):
53+
async def cache_task(config, tasks):
5454
if taskgraph.fast:
5555
for task in tasks:
5656
yield task
@@ -61,7 +61,7 @@ def cache_task(config, tasks):
6161
if "cached_task" in task.attributes:
6262
digests[task.label] = format_task_digest(task.attributes["cached_task"])
6363

64-
for task in order_tasks(config, tasks):
64+
for task in order_tasks(config, [t async for t in tasks]):
6565
cache = task.pop("cache", None)
6666
if cache is None:
6767
yield task

src/taskgraph/transforms/chunking.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@
5050

5151

5252
@transforms.add
53-
def chunk_tasks(config, tasks):
54-
for task in tasks:
53+
async def chunk_tasks(config, tasks):
54+
async for task in tasks:
5555
chunk_config = task.pop("chunk", None)
5656
if not chunk_config:
5757
yield task

src/taskgraph/transforms/code_review.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212

1313

1414
@transforms.add
15-
def add_dependencies(config, jobs):
16-
for job in jobs:
15+
async def add_dependencies(config, jobs):
16+
async for job in jobs:
1717
job.setdefault("soft-dependencies", [])
1818
job["soft-dependencies"] += [
1919
dep_task.label

src/taskgraph/transforms/docker_image.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565

6666

6767
@transforms.add
68-
def fill_template(config, tasks):
68+
async def fill_template(config, tasks):
6969
available_packages = set()
7070
for task in config.kind_dependencies_tasks.values():
7171
if task.kind != "packages":
@@ -75,13 +75,11 @@ def fill_template(config, tasks):
7575

7676
context_hashes = {}
7777

78-
tasks = list(tasks)
79-
8078
if not taskgraph.fast and config.write_artifacts:
8179
if not os.path.isdir(CONTEXTS_DIR):
8280
os.makedirs(CONTEXTS_DIR)
8381

84-
for task in tasks:
82+
async for task in tasks:
8583
image_name = task.pop("name")
8684
job_symbol = task.pop("symbol", None)
8785
args = task.pop("args", {})

src/taskgraph/transforms/fetch.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ def wrap(func):
7878

7979

8080
@transforms.add
81-
def process_fetch_job(config, jobs):
81+
async def process_fetch_job(config, jobs):
8282
# Converts fetch-url entries to the job schema.
83-
for job in jobs:
83+
async for job in jobs:
8484
typ = job["fetch"]["type"]
8585
name = job["name"]
8686
fetch = job.pop("fetch")
@@ -103,15 +103,15 @@ def configure_fetch(config, typ, name, fetch):
103103

104104

105105
@transforms.add
106-
def make_task(config, jobs):
106+
async def make_task(config, jobs):
107107
# Fetch tasks are idempotent and immutable. Have them live for
108108
# essentially forever.
109109
if config.params["level"] == "3":
110110
expires = "1000 years"
111111
else:
112112
expires = "28 days"
113113

114-
for job in jobs:
114+
async for job in jobs:
115115
name = job["name"]
116116
artifact_prefix = job.get("artifact-prefix", "public")
117117
env = job.get("env", {})

src/taskgraph/transforms/from_deps.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@
113113

114114

115115
@transforms.add
116-
def from_deps(config, tasks):
117-
for task in tasks:
116+
async def from_deps(config, tasks):
117+
async for task in tasks:
118118
# Setup and error handling.
119119
from_deps = task.pop("from-deps")
120120
kind_deps = config.config.get("kind-dependencies", [])

src/taskgraph/transforms/job/__init__.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@
112112

113113

114114
@transforms.add
115-
def rewrite_when_to_optimization(config, jobs):
116-
for job in jobs:
115+
async def rewrite_when_to_optimization(config, jobs):
116+
async for job in jobs:
117117
when = job.pop("when", {})
118118
if not when:
119119
yield job
@@ -132,8 +132,8 @@ def rewrite_when_to_optimization(config, jobs):
132132

133133

134134
@transforms.add
135-
def set_implementation(config, jobs):
136-
for job in jobs:
135+
async def set_implementation(config, jobs):
136+
async for job in jobs:
137137
impl, os = worker_type_implementation(config.graph_config, job["worker-type"])
138138
if os:
139139
job.setdefault("tags", {})["os"] = os
@@ -148,8 +148,8 @@ def set_implementation(config, jobs):
148148

149149

150150
@transforms.add
151-
def set_label(config, jobs):
152-
for job in jobs:
151+
async def set_label(config, jobs):
152+
async for job in jobs:
153153
if "label" not in job:
154154
if "name" not in job:
155155
raise Exception("job has neither a name nor a label")
@@ -160,8 +160,8 @@ def set_label(config, jobs):
160160

161161

162162
@transforms.add
163-
def add_resource_monitor(config, jobs):
164-
for job in jobs:
163+
async def add_resource_monitor(config, jobs):
164+
async for job in jobs:
165165
if job.get("attributes", {}).get("resource-monitor"):
166166
worker_implementation, worker_os = worker_type_implementation(
167167
config.graph_config, job["worker-type"]
@@ -204,13 +204,13 @@ def get_attribute(dict, key, attributes, attribute_name):
204204

205205

206206
@transforms.add
207-
def use_fetches(config, jobs):
207+
async def use_fetches(config, jobs):
208208
artifact_names = {}
209209
aliases = {}
210210
extra_env = {}
211211

212+
jobs = [j async for j in jobs]
212213
if config.kind in ("toolchain", "fetch"):
213-
jobs = list(jobs)
214214
for job in jobs:
215215
run = job.get("run", {})
216216
label = job["label"]
@@ -353,12 +353,12 @@ def cmp_artifacts(a):
353353

354354

355355
@transforms.add
356-
def make_task_description(config, jobs):
356+
async def make_task_description(config, jobs):
357357
"""Given a build description, create a task description"""
358358
# import plugin modules first, before iterating over jobs
359359
import_sibling_modules(exceptions=("common.py",))
360360

361-
for job in jobs:
361+
async for job in jobs:
362362
# always-optimized tasks never execute, so have no workdir
363363
if job["worker"]["implementation"] in ("docker-worker", "generic-worker"):
364364
job["run"].setdefault("workdir", "/builds/worker")

src/taskgraph/transforms/notify.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ def _convert_content(content):
140140

141141

142142
@transforms.add
143-
def add_notifications(config, tasks):
144-
for task in tasks:
143+
async def add_notifications(config, tasks):
144+
async for task in tasks:
145145
label = "{}-{}".format(config.kind, task["name"])
146146
if "notifications" in task:
147147
notify = _convert_legacy(config, task.pop("notifications"), label)

src/taskgraph/transforms/task.py

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -835,11 +835,11 @@ def build_dummy_payload(config, task, task_def):
835835

836836

837837
@transforms.add
838-
def set_implementation(config, tasks):
838+
async def set_implementation(config, tasks):
839839
"""
840840
Set the worker implementation based on the worker-type alias.
841841
"""
842-
for task in tasks:
842+
async for task in tasks:
843843
worker = task.setdefault("worker", {})
844844
if "implementation" in task["worker"]:
845845
yield task
@@ -859,8 +859,8 @@ def set_implementation(config, tasks):
859859

860860

861861
@transforms.add
862-
def set_defaults(config, tasks):
863-
for task in tasks:
862+
async def set_defaults(config, tasks):
863+
async for task in tasks:
864864
task.setdefault("always-target", False)
865865
task.setdefault("optimization", None)
866866
task.setdefault("needs-sccache", False)
@@ -903,8 +903,8 @@ def set_defaults(config, tasks):
903903

904904

905905
@transforms.add
906-
def task_name_from_label(config, tasks):
907-
for task in tasks:
906+
async def task_name_from_label(config, tasks):
907+
async for task in tasks:
908908
if "label" not in task:
909909
if "name" not in task:
910910
raise Exception("task has neither a name nor a label")
@@ -915,8 +915,8 @@ def task_name_from_label(config, tasks):
915915

916916

917917
@transforms.add
918-
def validate(config, tasks):
919-
for task in tasks:
918+
async def validate(config, tasks):
919+
async for task in tasks:
920920
validate_schema(
921921
task_description_schema,
922922
task,
@@ -953,8 +953,8 @@ def add_generic_index_routes(config, task):
953953

954954

955955
@transforms.add
956-
def process_treeherder_metadata(config, tasks):
957-
for task in tasks:
956+
async def process_treeherder_metadata(config, tasks):
957+
async for task in tasks:
958958
routes = task.get("routes", [])
959959
extra = task.get("extra", {})
960960
task_th = task.get("treeherder")
@@ -1025,8 +1025,8 @@ def process_treeherder_metadata(config, tasks):
10251025

10261026

10271027
@transforms.add
1028-
def add_index_routes(config, tasks):
1029-
for task in tasks:
1028+
async def add_index_routes(config, tasks):
1029+
async for task in tasks:
10301030
index = task.get("index", {})
10311031

10321032
# The default behavior is to rank tasks according to their tier
@@ -1057,8 +1057,8 @@ def add_index_routes(config, tasks):
10571057

10581058

10591059
@transforms.add
1060-
def build_task(config, tasks):
1061-
for task in tasks:
1060+
async def build_task(config, tasks):
1061+
async for task in tasks:
10621062
level = str(config.params["level"])
10631063

10641064
provisioner_id, worker_type = get_worker_type(
@@ -1219,24 +1219,24 @@ def build_task(config, tasks):
12191219

12201220

12211221
@transforms.add
1222-
def add_github_checks(config, tasks):
1222+
async def add_github_checks(config, tasks):
12231223
"""
12241224
For git repositories, add checks route to all tasks.
12251225
12261226
This will be replaced by a configurable option in the future.
12271227
"""
12281228
if config.params["repository_type"] != "git":
1229-
for task in tasks:
1229+
async for task in tasks:
12301230
yield task
12311231

1232-
for task in tasks:
1232+
async for task in tasks:
12331233
task["task"]["routes"].append("checks")
12341234
yield task
12351235

12361236

12371237
@transforms.add
1238-
def chain_of_trust(config, tasks):
1239-
for task in tasks:
1238+
async def chain_of_trust(config, tasks):
1239+
async for task in tasks:
12401240
if task["task"].get("payload", {}).get("features", {}).get("chainOfTrust"):
12411241
image = task.get("dependencies", {}).get("docker-image")
12421242
if image:
@@ -1250,12 +1250,12 @@ def chain_of_trust(config, tasks):
12501250

12511251

12521252
@transforms.add
1253-
def check_task_identifiers(config, tasks):
1253+
async def check_task_identifiers(config, tasks):
12541254
"""Ensures that all tasks have well defined identifiers:
12551255
``^[a-zA-Z0-9_-]{1,38}$``
12561256
"""
12571257
e = re.compile("^[a-zA-Z0-9_-]{1,38}$")
1258-
for task in tasks:
1258+
async for task in tasks:
12591259
for attrib in ("workerType", "provisionerId"):
12601260
if not e.match(task["task"][attrib]):
12611261
raise Exception(
@@ -1267,9 +1267,9 @@ def check_task_identifiers(config, tasks):
12671267

12681268

12691269
@transforms.add
1270-
def check_task_dependencies(config, tasks):
1270+
async def check_task_dependencies(config, tasks):
12711271
"""Ensures that tasks don't have more than 100 dependencies."""
1272-
for task in tasks:
1272+
async for task in tasks:
12731273
number_of_dependencies = (
12741274
len(task["dependencies"])
12751275
+ len(task["if-dependencies"])
@@ -1315,7 +1315,7 @@ def check_caches_are_volumes(task):
13151315

13161316

13171317
@transforms.add
1318-
def check_run_task_caches(config, tasks):
1318+
async def check_run_task_caches(config, tasks):
13191319
"""Audit for caches requiring run-task.
13201320
13211321
run-task manages caches in certain ways. If a cache managed by run-task
@@ -1340,7 +1340,7 @@ def check_run_task_caches(config, tasks):
13401340

13411341
suffix = _run_task_suffix()
13421342

1343-
for task in tasks:
1343+
async for task in tasks:
13441344
payload = task["task"].get("payload", {})
13451345
command = payload.get("command") or [""]
13461346

src/taskgraph/transforms/task_context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@
8181

8282

8383
@transforms.add
84-
def render_task(config, jobs):
85-
for job in jobs:
84+
async def render_task(config, jobs):
85+
async for job in jobs:
8686
sub_config = job.pop("task-context")
8787
params_context = {}
8888
for var, path in sub_config.pop("from-parameters", {}).items():

0 commit comments

Comments
 (0)