Skip to content

Commit

Permalink
[orion-decision] Schedule all tasks per separate (service, arch)
Browse files Browse the repository at this point in the history
  • Loading branch information
yescomply committed Oct 30, 2024
1 parent c054412 commit 101da26
Showing 1 changed file with 60 additions and 38 deletions.
98 changes: 60 additions & 38 deletions services/orion-decision/src/orion_decision/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
)

LOG = getLogger(__name__)
LOG.setLevel(level=10)
TEMPLATES = (Path(__file__).parent / "task_templates").resolve()
BUILD_TASK = Template((TEMPLATES / "build.yaml").read_text())
MSYS_TASK = Template((TEMPLATES / "build_msys.yaml").read_text())
Expand Down Expand Up @@ -453,18 +454,27 @@ def create_tasks(self) -> None:
service_build_tasks = {
(service, arch): slugId()
for service in self.services
for arch in WORKERS_ARCHS
for arch in getattr(self.services[service], "archs", ["amd64"])
}
for (service, arch), task_id in service_build_tasks.items():
LOG.debug("Task %s is a build of %s on %s", task_id, service, arch)
recipe_test_tasks = {recipe: slugId() for recipe in self.services.recipes}
test_tasks_created: Set[str] = set()
for recipe, task_id in recipe_test_tasks.items():
LOG.debug("Task %s is a recipe test for %s", task_id, recipe)
test_tasks_created: Dict[Tuple[str, str], str] = {}
recipe_tasks_created: Set[str] = set()
build_tasks_created: Set[str] = set()
combine_tasks_created: Dict[str, str] = {}
push_tasks_created: Set[str] = set()
to_create = sorted(
self.services.recipes.values(), key=lambda x: x.name
) + sorted(self.services.values(), key=lambda x: x.name)
to_create = [
(recipe, "amd64")
for recipe in sorted(self.services.recipes.values(), key=lambda x: x.name)
]
for service in sorted(self.services.values(), key=lambda x: x.name):
for arch in getattr(service, "archs", ["amd64"]):
to_create.append((service, arch))
while to_create:
obj = to_create.pop(0)
obj, arch = to_create.pop(0)
is_svc = isinstance(obj, Service)

if not obj.dirty:
Expand All @@ -477,18 +487,18 @@ def create_tasks(self) -> None:
for arch in getattr(obj, "archs", ["amd64"])
if self.services[dep].dirty
]
# TODO: implement tests for all archs in the future
if is_svc:
assert isinstance(obj, Service)
dirty_test_dep_tasks = []
for test in obj.tests:
assert isinstance(test, ToxServiceTest)
for arch in obj.archs:
if (test.image, arch) in service_build_tasks and self.services[
test.image
].dirty:
dirty_test_dep_tasks.append(
service_build_tasks[(test.image, arch)]
)
if (test.image, "amd64") in service_build_tasks and self.services[
test.image
].dirty:
dirty_test_dep_tasks.append(
service_build_tasks[(test.image, "amd64")]
)
else:
dirty_test_dep_tasks = []
dirty_recipe_test_tasks = [
Expand All @@ -500,11 +510,12 @@ def create_tasks(self) -> None:
pending_deps = (
set(dirty_dep_tasks) | set(dirty_test_dep_tasks)
) - build_tasks_created
pending_deps |= set(dirty_recipe_test_tasks) - test_tasks_created
pending_deps |= (
set(dirty_recipe_test_tasks) - set(test_tasks_created.values())
) - recipe_tasks_created
if pending_deps:
if is_svc:
for arch in obj.archs:
task_id = service_build_tasks[(obj.name, arch)]
task_id = service_build_tasks[(obj.name, arch)]
else:
task_id = recipe_test_tasks[obj.name]

Expand All @@ -515,59 +526,70 @@ def create_tasks(self) -> None:
task_id,
", ".join(pending_deps),
)
to_create.append(obj)
assert to_create, f"{obj.name} creates circular dependency"
to_create.append((obj, arch))
continue

if is_svc:
test_tasks = []
assert isinstance(obj, Service)
for test in obj.tests:
assert isinstance(test, ToxServiceTest)
for arch in obj.archs:
if arch == "amd64":
task_id = self._create_svc_test_task(
obj, test, service_build_tasks, arch
)
test_tasks_created.add(task_id)
test_tasks.append(task_id)
test_tasks_created[(obj.name, test.name)] = task_id
else:
task_id = test_tasks_created[(obj.name, test.name)]
test_tasks.append(task_id)
test_tasks.extend(dirty_recipe_test_tasks)

if isinstance(obj, ServiceTestOnly):
assert obj.tests
continue

for arch in obj.archs:
build_tasks_created.add(
self._create_build_task(
obj, dirty_dep_tasks, test_tasks, arch, service_build_tasks
)
build_tasks_created.add(
self._create_build_task(
obj, dirty_dep_tasks, test_tasks, arch, service_build_tasks
)
if len(obj.archs) > 1:
)
multi_arch = len(obj.archs) > 1
last_build_for_svc = not any(
obj is pending for (pending, _) in to_create
)

if multi_arch and last_build_for_svc:
combine_tasks_created[obj.name] = self._create_combine_task(
obj, service_build_tasks
)
if should_push:
if len(obj.archs) > 1:
push_tasks_created.add(
self._create_push_task(obj, combine_tasks_created[obj.name])
)
if multi_arch:
if last_build_for_svc:
push_tasks_created.add(
self._create_push_task(
obj, combine_tasks_created[obj.name]
)
)
else:
push_tasks_created.add(
self._create_push_task(
obj, service_build_tasks[(obj.name, obj.archs[0])]
obj, service_build_tasks[(obj.name, arch)]
)
)
else:
test_tasks_created.add(
self._create_recipe_test_task(
obj,
dirty_dep_tasks + dirty_recipe_test_tasks,
recipe_test_tasks,
if arch == "amd64":
recipe_tasks_created.add(
self._create_recipe_test_task(
obj,
dirty_dep_tasks + dirty_recipe_test_tasks,
recipe_test_tasks,
)
)
)
LOG.info(
"%s %d test tasks, %d build tasks, %d combine tasks and %d push tasks",
self._created_str,
len(test_tasks_created),
len(test_tasks_created) + len(recipe_tasks_created),
len(build_tasks_created),
len(combine_tasks_created),
len(push_tasks_created),
Expand Down

0 comments on commit 101da26

Please sign in to comment.