Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies = [
"redo>=2.0",
"requests>=2.25",
"slugid>=2.0",
"taskcluster>=55.0",
"taskcluster-urls>=11.0",
"voluptuous>=0.12.1",
]
Expand Down
2 changes: 1 addition & 1 deletion src/taskgraph/actions/cancel.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def cancel_action(parameters, graph_config, input, task_group_id, task_id):
# Note that this is limited by the scopes afforded to generic actions to
# only cancel tasks with the level-specific schedulerId.
try:
cancel_task(task_id, use_proxy=True)
cancel_task(task_id)
except requests.HTTPError as e:
if e.response.status_code == 409:
# A 409 response indicates that this task is past its deadline. It
Expand Down
2 changes: 1 addition & 1 deletion src/taskgraph/actions/cancel_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def cancel_all_action(parameters, graph_config, input, task_group_id, task_id):
def do_cancel_task(task_id):
logger.info(f"Cancelling task {task_id}")
try:
cancel_task(task_id, use_proxy=True)
cancel_task(task_id)
except requests.HTTPError as e:
if e.response.status_code == 409:
# A 409 response indicates that this task is past its deadline. It
Expand Down
12 changes: 7 additions & 5 deletions src/taskgraph/actions/retrigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def retrigger_action(parameters, graph_config, input, task_group_id, task_id):
)

task = taskcluster.get_task_definition(task_id)
label = task["metadata"]["name"]
label = task["metadata"]["name"] # type: ignore

with_downstream = " "
to_run = [label]
Expand All @@ -163,7 +163,8 @@ def retrigger_action(parameters, graph_config, input, task_group_id, task_id):
to_run = full_task_graph.graph.transitive_closure(
set(to_run), reverse=True
).nodes
to_run = to_run & set(label_to_taskid.keys())
if label_to_taskid:
to_run = to_run & set(label_to_taskid.keys())
with_downstream = " (with downstream) "

times = input.get("times", 1)
Expand Down Expand Up @@ -201,8 +202,8 @@ def rerun_action(parameters, graph_config, input, task_group_id, task_id):
decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(
parameters, graph_config, task_group_id=task_group_id
)
label = task["metadata"]["name"]
if task_id not in label_to_taskid.values():
label = task["metadata"]["name"] # type: ignore
if label_to_taskid and task_id not in label_to_taskid.values():
logger.error(
f"Refusing to rerun {label}: taskId {task_id} not in decision task {decision_task_id} label_to_taskid!"
)
Expand Down Expand Up @@ -276,7 +277,8 @@ def retrigger_multiple(parameters, graph_config, input, task_group_id, task_id):
# In practice, this shouldn't matter, as only completed tasks
# are pulled in from other pushes and treeherder won't pass
# those labels.
_rerun_task(label_to_taskid[label], label)
if label_to_taskid and label in label_to_taskid:
_rerun_task(label_to_taskid[label], label)

for j in range(times):
suffix = f"{i}-{j}"
Expand Down
10 changes: 4 additions & 6 deletions src/taskgraph/actions/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,10 @@ def fetch_graph_and_labels(parameters, graph_config, task_group_id=None):
# for old ones
def fetch_action(task_id):
logger.info(f"fetching label-to-taskid.json for action task {task_id}")
try:
run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
if label_to_taskid and run_label_to_id:
label_to_taskid.update(run_label_to_id)
except HTTPError as e:
if e.response.status_code != 404:
raise
else:
logger.debug(f"No label-to-taskid.json found for {task_id}: {e}")

# for backwards compatibility, look up actions via pushlog-id
Expand All @@ -84,7 +82,7 @@ def fetch_cron(task_id):
logger.info(f"fetching label-to-taskid.json for cron task {task_id}")
try:
run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
label_to_taskid.update(run_label_to_id)
label_to_taskid.update(run_label_to_id) # type: ignore
except HTTPError as e:
if e.response.status_code != 404:
raise
Expand Down
21 changes: 3 additions & 18 deletions src/taskgraph/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@


import logging
import os
import sys
from concurrent import futures

from slugid import nice as slugid

from taskgraph.util import json
from taskgraph.util.parameterization import resolve_timestamps
from taskgraph.util.taskcluster import CONCURRENCY, get_session
from taskgraph.util.taskcluster import CONCURRENCY, get_session, get_taskcluster_client
from taskgraph.util.time import current_json_time

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -104,9 +103,6 @@ def submit(task_id, label, task_def):


def create_task(session, task_id, label, task_def):
# create the task using 'http://taskcluster/queue', which is proxied to the queue service
# with credentials appropriate to this task.

# Resolve timestamps
now = current_json_time(datetime_format=True)
task_def = resolve_timestamps(now, task_def)
Expand All @@ -123,16 +119,5 @@ def create_task(session, task_id, label, task_def):
return

logger.info(f"Creating task with taskId {task_id} for {label}")
proxy_url = os.environ.get("TASKCLUSTER_PROXY_URL", "http://taskcluster").rstrip(
"/"
)
res = session.put(
f"{proxy_url}/queue/v1/task/{task_id}",
json=task_def,
)
if res.status_code != 200:
try:
logger.error(res.json()["message"])
except Exception:
logger.error(res.text)
res.raise_for_status()
queue = get_taskcluster_client("queue")
queue.createTask(task_id, task_def)
10 changes: 5 additions & 5 deletions src/taskgraph/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,12 @@ def load_task(task_id, remove=True, user=None):
task_def = get_task_definition(task_id)

if (
impl := task_def.get("tags", {}).get("worker-implementation")
impl := task_def.get("tags", {}).get("worker-implementation") # type: ignore
) != "docker-worker":
print(f"Tasks with worker-implementation '{impl}' are not supported!")
return 1

command = task_def["payload"].get("command")
command = task_def["payload"].get("command") # type: ignore
if not command or not command[0].endswith("run-task"):
print("Only tasks using `run-task` are supported!")
return 1
Expand Down Expand Up @@ -308,18 +308,18 @@ def load_task(task_id, remove=True, user=None):
else:
task_cwd = "$TASK_WORKDIR"

image_task_id = task_def["payload"]["image"]["taskId"]
image_task_id = task_def["payload"]["image"]["taskId"] # type: ignore
image_tag = load_image_by_task_id(image_task_id)

# Set some env vars the worker would normally set.
env = {
"RUN_ID": "0",
"TASK_GROUP_ID": task_def.get("taskGroupId", ""),
"TASK_GROUP_ID": task_def.get("taskGroupId", ""), # type: ignore
"TASK_ID": task_id,
"TASKCLUSTER_ROOT_URL": get_root_url(False),
}
# Add the task's environment variables.
env.update(task_def["payload"].get("env", {}))
env.update(task_def["payload"].get("env", {})) # type: ignore

envfile = None
initfile = None
Expand Down
5 changes: 3 additions & 2 deletions src/taskgraph/optimize/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@ def should_replace_task(self, task, params, deadline, arg):
status = status_task(task_id)
# status can be `None` if we're in `testing` mode
# (e.g. test-action-callback)
if not status or status.get("state") in ("exception", "failed"):
if not status or status.get("state") in ("exception", "failed"): # type: ignore
logger.debug(
f"not replacing {task.label} with {task_id} because it is in failed or exception state"
)
continue

if deadline and datetime.strptime(
status["expires"], self.fmt
status["expires"], # type: ignore
self.fmt,
) < datetime.strptime(deadline, self.fmt):
logger.debug(
f"not replacing {task.label} with {task_id} because it expires before {deadline}"
Expand Down
Loading
Loading