Skip to content

Commit 80451af

Browse files
fix: replaced taskcluster rest api calls with package
fixes #221
1 parent 5ffdbdb commit 80451af

File tree

10 files changed

+1697
-505
lines changed

10 files changed

+1697
-505
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies = [
2929
"redo>=2.0",
3030
"requests>=2.25",
3131
"slugid>=2.0",
32+
"taskcluster>=55.0",
3233
"taskcluster-urls>=11.0",
3334
"voluptuous>=0.12.1",
3435
]

src/taskgraph/actions/retrigger.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ def retrigger_action(parameters, graph_config, input, task_group_id, task_id):
163163
to_run = full_task_graph.graph.transitive_closure(
164164
set(to_run), reverse=True
165165
).nodes
166-
to_run = to_run & set(label_to_taskid.keys())
166+
if label_to_taskid:
167+
to_run = to_run & set(label_to_taskid.keys())
167168
with_downstream = " (with downstream) "
168169

169170
times = input.get("times", 1)
@@ -202,7 +203,7 @@ def rerun_action(parameters, graph_config, input, task_group_id, task_id):
202203
parameters, graph_config, task_group_id=task_group_id
203204
)
204205
label = task["metadata"]["name"]
205-
if task_id not in label_to_taskid.values():
206+
if label_to_taskid and task_id not in label_to_taskid.values():
206207
logger.error(
207208
f"Refusing to rerun {label}: taskId {task_id} not in decision task {decision_task_id} label_to_taskid!"
208209
)
@@ -276,7 +277,8 @@ def retrigger_multiple(parameters, graph_config, input, task_group_id, task_id):
276277
# In practice, this shouldn't matter, as only completed tasks
277278
# are pulled in from other pushes and treeherder won't pass
278279
# those labels.
279-
_rerun_task(label_to_taskid[label], label)
280+
if label_to_taskid and label in label_to_taskid:
281+
_rerun_task(label_to_taskid[label], label)
280282

281283
for j in range(times):
282284
suffix = f"{i}-{j}"

src/taskgraph/create.py

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@
44

55

66
import logging
7-
import os
87
import sys
98
from concurrent import futures
109

1110
from slugid import nice as slugid
1211

1312
from taskgraph.util import json
1413
from taskgraph.util.parameterization import resolve_timestamps
15-
from taskgraph.util.taskcluster import CONCURRENCY, get_session
14+
from taskgraph.util.taskcluster import CONCURRENCY, get_session, get_taskcluster_queue
1615
from taskgraph.util.time import current_json_time
1716

1817
logger = logging.getLogger(__name__)
@@ -104,9 +103,6 @@ def submit(task_id, label, task_def):
104103

105104

106105
def create_task(session, task_id, label, task_def):
107-
# create the task using 'http://taskcluster/queue', which is proxied to the queue service
108-
# with credentials appropriate to this task.
109-
110106
# Resolve timestamps
111107
now = current_json_time(datetime_format=True)
112108
task_def = resolve_timestamps(now, task_def)
@@ -123,16 +119,5 @@ def create_task(session, task_id, label, task_def):
123119
return
124120

125121
logger.info(f"Creating task with taskId {task_id} for {label}")
126-
proxy_url = os.environ.get("TASKCLUSTER_PROXY_URL", "http://taskcluster").rstrip(
127-
"/"
128-
)
129-
res = session.put(
130-
f"{proxy_url}/queue/v1/task/{task_id}",
131-
json=task_def,
132-
)
133-
if res.status_code != 200:
134-
try:
135-
logger.error(res.json()["message"])
136-
except Exception:
137-
logger.error(res.text)
138-
res.raise_for_status()
122+
queue = get_taskcluster_queue()
123+
queue.createTask(task_id, task_def)

src/taskgraph/run-task/fetch-content

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ try:
3939
except ImportError:
4040
certifi = None
4141

42+
try:
43+
import taskcluster
44+
except ImportError:
45+
taskcluster = None
46+
4247

4348
CONCURRENCY = multiprocessing.cpu_count()
4449

@@ -734,15 +739,30 @@ def git_checkout_archive(
734739
env = os.environ.copy()
735740
keypath = ""
736741
if ssh_key:
737-
taskcluster_secret_url = api(
738-
os.environ.get("TASKCLUSTER_PROXY_URL"),
739-
"secrets",
740-
"v1",
741-
"secret/{keypath}".format(keypath=ssh_key),
742-
)
743-
taskcluster_secret = b"".join(stream_download(taskcluster_secret_url))
744-
taskcluster_secret = json.loads(taskcluster_secret)
745-
sshkey = taskcluster_secret["secret"]["ssh_privkey"]
742+
err = False
743+
744+
if taskcluster:
745+
secrets_client = get_taskcluster_secrets()
746+
if secrets_client:
747+
taskcluster_secret = secrets_client.get(ssh_key)
748+
if taskcluster_secret:
749+
sshkey = taskcluster_secret["secret"]["ssh_privkey"]
750+
else:
751+
err=True
752+
else:
753+
err = True
754+
755+
if not taskcluster or err:
756+
# Fallback to old method if taskcluster package not available
757+
taskcluster_secret_url = api(
758+
os.environ.get("TASKCLUSTER_PROXY_URL"),
759+
"secrets",
760+
"v1",
761+
"secret/{keypath}".format(keypath=ssh_key),
762+
)
763+
taskcluster_secret = b"".join(stream_download(taskcluster_secret_url))
764+
taskcluster_secret = json.loads(taskcluster_secret)
765+
sshkey = taskcluster_secret["secret"]["ssh_privkey"]
746766

747767
keypath = temp_dir.joinpath("ssh-key")
748768
keypath.write_text(sshkey)
@@ -900,6 +920,19 @@ def api(root_url, service, version, path):
900920
)
901921

902922

923+
def get_taskcluster_secrets():
924+
"""Get a taskcluster secrets client."""
925+
if not taskcluster:
926+
return None
927+
928+
if "TASKCLUSTER_PROXY_URL" in os.environ:
929+
secrets_options = {"rootUrl": os.environ["TASKCLUSTER_PROXY_URL"]}
930+
else:
931+
secrets_options = taskcluster.optionsFromEnvironment()
932+
933+
return taskcluster.Secrets(secrets_options)
934+
935+
903936
def get_hash(fetch, root_url):
904937
path = "task/{task}/artifacts/{artifact}".format(
905938
task=fetch["task"], artifact="public/chain-of-trust.json"

0 commit comments

Comments
 (0)