From 9f4f114c43cf61395d72a2de1fcfcf712782488e Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Tue, 23 Sep 2025 13:40:37 +0000 Subject: [PATCH 1/5] orchestratord test: Some improvements --- test/orchestratord/mzcompose.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/test/orchestratord/mzcompose.py b/test/orchestratord/mzcompose.py index dcbe029b64923..fc4febc5daaad 100644 --- a/test/orchestratord/mzcompose.py +++ b/test/orchestratord/mzcompose.py @@ -1122,21 +1122,18 @@ def run(definition: dict[str, Any], expect_fail: bool): for i in range(120): try: - pod_names = ( - spawn.capture( - [ - "kubectl", - "get", - "pods", - "-n", - "materialize", - "-o", - "name", - ], - stderr=subprocess.DEVNULL, - ) - .strip() - .split("\n") + spawn.capture( + [ + "kubectl", + "get", + "crd", + "materializes.materialize.cloud", + "-n", + "materialize", + "-o", + "name", + ], + stderr=subprocess.DEVNULL, ) for pod_name in pod_names: status = spawn.capture( From 332a7b8753d582b3f13edce5b466f486b1aaf681 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Mon, 22 Sep 2025 14:57:59 +0000 Subject: [PATCH 2/5] orchestratord test: Make sure all images exist locally --- ci/nightly/pipeline.template.yml | 18 +- misc/python/materialize/version_list.py | 20 ++ test/orchestratord/mzcompose.py | 396 +++++++++++++++++++----- 3 files changed, 346 insertions(+), 88 deletions(-) diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index 8f2d73d0e3f79..8762574879a6b 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -2316,14 +2316,14 @@ steps: agents: queue: hetzner-aarch64-16cpu-32gb - - id: orchestratord-all - label: "Orchestratord test (every property)" + - id: orchestratord-individual + label: "Orchestratord test (individual properties)" depends_on: build-aarch64 timeout_in_minutes: 180 plugins: - ./ci/plugins/mzcompose: composition: orchestratord - args: [--scenario=all, --recreate-cluster] + args: [--scenario=individual, --recreate-cluster] ci-builder: stable agents: queue: hetzner-aarch64-16cpu-32gb @@ -2339,3 +2339,15 @@ steps: ci-builder: stable agents: queue: hetzner-aarch64-16cpu-32gb + + - id: orchestratord-upgrade + label: "Orchestratord test (upgrade)" + depends_on: build-aarch64 + timeout_in_minutes: 180 + plugins: + - ./ci/plugins/mzcompose: + composition: orchestratord + args: [--scenario=upgrade, --runtime=7200, --recreate-cluster] + ci-builder: stable + agents: + queue: hetzner-aarch64-16cpu-32gb diff --git a/misc/python/materialize/version_list.py b/misc/python/materialize/version_list.py index 36beb0ccee6e7..0f5ee8d8fa5c7 100644 --- a/misc/python/materialize/version_list.py +++ b/misc/python/materialize/version_list.py @@ -66,6 +66,26 @@ def get_self_managed_versions() -> list[MzVersion]: return sorted(result) +BAD_SELF_MANAGED_VERSIONS = { + MzVersion.parse_mz("v0.130.0"), + MzVersion.parse_mz("v0.130.1"), + MzVersion.parse_mz("v0.130.2"), + MzVersion.parse_mz("v0.130.3"), + MzVersion.parse_mz("v0.130.4"), +} + + +def get_all_self_managed_versions() -> list[MzVersion]: + result = set() + for entry in yaml.safe_load( + requests.get("https://materializeinc.github.io/materialize/index.yaml").text + )["entries"]["materialize-operator"]: + version = MzVersion.parse_mz(entry["appVersion"]) + if not version.prerelease and version not in BAD_SELF_MANAGED_VERSIONS: + result.add(version) + return sorted(result) + + # not released on Docker INVALID_VERSIONS = { MzVersion.parse_mz("v0.52.1"), diff --git a/test/orchestratord/mzcompose.py b/test/orchestratord/mzcompose.py index fc4febc5daaad..0adf36905c27a 100644 --- a/test/orchestratord/mzcompose.py +++ b/test/orchestratord/mzcompose.py @@ -17,15 +17,20 @@ import json import os import random +import signal import subprocess import time +import uuid from collections.abc import Callable +from enum import Enum from typing import Any +import psycopg import yaml from semver.version import Version from materialize import MZ_ROOT, ci_util, git, spawn +from materialize.mz_version import MzVersion from materialize.mzcompose.composition import ( Composition, Service, @@ -37,6 +42,7 @@ from materialize.mzcompose.services.orchestratord import Orchestratord from materialize.mzcompose.services.testdrive import Testdrive from materialize.util import all_subclasses +from materialize.version_list import get_all_self_managed_versions SERVICES = [ Testdrive(), @@ -120,6 +126,8 @@ def retry(fn: Callable, timeout: int) -> None: class Modification: + pick_by_default: bool = True + def __init__(self, value: Any): assert value in self.values(), f"Expected {value} to be in {self.values()}" self.value = value @@ -147,7 +155,7 @@ def from_dict(cls, data: dict[str, Any]) -> "Modification": break else: raise ValueError( - f"No modification with name {name} found, only know {[subclass.__name__ for subclass in all_subclasses(Modification)]}" + f"No modification with name {name} found, only know {[subclass.__name__ for subclass in all_modifications()]}" ) return subclass(data["value"]) @@ -174,6 +182,14 @@ def validate(self, mods: dict[type["Modification"], Any]) -> None: raise NotImplementedError +def all_modifications() -> list[type[Modification]]: + return [ + mod_class + for mod_class in all_subclasses(Modification) + if mod_class.pick_by_default + ] + + class LicenseKey(Modification): @classmethod def values(cls) -> list[Any]: @@ -201,6 +217,11 @@ def modify(self, definition: dict[str, Any]) -> None: raise ValueError(f"Unknown value {self.value}, only know {self.values()}") def validate(self, mods: dict[type[Modification], Any]) -> None: + if MzVersion.parse_mz(mods[EnvironmentdImageRef]) < MzVersion.parse_mz( + "v0.147.0" + ): + return + environmentd = get_environmentd_data() if self.value == "invalid": assert len(environmentd["items"]) == 0 @@ -235,10 +256,6 @@ def values(cls) -> list[Any]: # TODO: Reenable False when fixed return [None, True] - # @classmethod - # def bad_values(cls) -> list[Any]: - # return [False] - @classmethod def default(cls) -> Any: return None @@ -276,6 +293,11 @@ def modify(self, definition: dict[str, Any]) -> None: definition["operator"]["balancerd"]["enabled"] = self.value def validate(self, mods: dict[type[Modification], Any]) -> None: + if MzVersion.parse_mz(mods[EnvironmentdImageRef]) < MzVersion.parse_mz( + "v0.147.0" + ): + return + def check() -> None: result = spawn.capture( [ @@ -347,6 +369,11 @@ def modify(self, definition: dict[str, Any]) -> None: definition["operator"]["console"]["enabled"] = self.value def validate(self, mods: dict[type[Modification], Any]) -> None: + if MzVersion.parse_mz(mods[EnvironmentdImageRef]) < MzVersion.parse_mz( + "v0.147.0" + ): + return + def check() -> None: result = spawn.capture( [ @@ -396,6 +423,34 @@ def validate(self, mods: dict[type[Modification], Any]) -> None: ), f"Expected {expected} in environmentd args, but only found {args}" +class EnvironmentdImageRef(Modification): + # Only done intentionally during upgrades with correct ordering + pick_by_default = False + + @classmethod + def values(cls) -> list[Any]: + return [str(version) for version in get_all_self_managed_versions()] + [ + get_image(None).rsplit(":", 1)[1] + ] + + @classmethod + def default(cls) -> Any: + return get_image(None).rsplit(":", 1)[1] + + def modify(self, definition: dict[str, Any]) -> None: + definition["materialize"]["spec"][ + "environmentdImageRef" + ] = f"materialize/environmentd:{self.value}" + + def validate(self, mods: dict[type[Modification], Any]) -> None: + environmentd = get_environmentd_data() + image = environmentd["items"][0]["spec"]["containers"][0]["image"] + expected = f"materialize/environmentd:{self.value}" + assert ( + image == expected + ), f"Expected environmentd image {expected}, but found {image}" + + class TelemetryEnabled(Modification): @classmethod def values(cls) -> list[Any]: @@ -866,6 +921,104 @@ def check_pods() -> None: retry(check_pods, 240) +class AuthenticatorKind(Modification): + @classmethod + def values(cls) -> list[Any]: + return ["None", "Password"] + + @classmethod + def default(cls) -> Any: + return "None" + + def modify(self, definition: dict[str, Any]) -> None: + definition["materialize"]["spec"]["authenticatorKind"] = self.value + if self.value == "Password": + definition["secret"]["stringData"][ + "external_login_password_mz_system" + ] = "superpassword" + elif "external_login_password_mz_system" in definition["secret"]["stringData"]: + del definition["secret"]["stringData"]["external_login_password_mz_system"] + + def validate(self, mods: dict[type[Modification], Any]) -> None: + environmentd = get_environmentd_data() + name = environmentd["items"][0]["metadata"]["name"] + process = None + version = MzVersion.parse_mz(mods[EnvironmentdImageRef]) + + if self.value == "Password" and version <= MzVersion.parse_mz("v0.147.6"): + return + + port = ( + 6875 + if version >= MzVersion.parse_mz("v0.147.0") and self.value == "Password" + else 6877 + ) + for i in range(120): + process = subprocess.Popen( + [ + "kubectl", + "port-forward", + f"pod/{name}", + "-n", + "materialize-environment", + f"{port}:{port}", + ], + preexec_fn=os.setpgrp, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + time.sleep(1) + # ret = process.poll() + assert process.stdout + line = process.stdout.readline() + if "Forwarding from" in line: + break + else: + print("Port-forward failed, retrying") + else: + spawn.capture( + [ + "kubectl", + "describe", + "pod", + "-l", + "app=environmentd", + "-n", + "materialize-environment", + ] + ) + raise ValueError( + "Port-forwarding never worked, environmentd status:\n{status}" + ) + + time.sleep(1) + try: + psycopg.connect( + host="127.0.0.1", + user="mz_system", + password="superpassword" if self.value == "Password" else None, + dbname="materialize", + port=port, + ) + finally: + os.killpg(os.getpgid(process.pid), signal.SIGTERM) + + +class Scenario(Enum): + Individual = "individual" + Combine = "combine" + Defaults = "defaults" + Upgrade = "upgrade" + UpgradeChain = "upgrade-chain" + + @classmethod + def _missing_(cls, value): + if value == "random": + return cls(random.choice([elem.value for elem in cls])) + + def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: parser.add_argument( "--recreate-cluster", @@ -890,6 +1043,15 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: "0.29.0" ), f"kind >= v0.29.0 required, while you are on {kind_version}" + # Start up services and potentially compile them first so that we have all images locally + c.up( + Service("testdrive", idle=True), + Service("orchestratord", idle=True), + Service("environmentd", idle=True), + Service("clusterd", idle=True), + Service("balancerd", idle=True), + ) + cluster = "kind" clusters = spawn.capture(["kind", "get", "clusters"]).strip().split("\n") if cluster not in clusters or args.recreate_cluster: @@ -942,6 +1104,9 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: definition["operator"]["operator"]["image"]["tag"] = get_image( c.compose["services"]["orchestratord"]["image"], args.tag ).rsplit(":", 1)[1] + # Necessary for upgrades + definition["operator"]["networkPolicies"]["enabled"] = True + definition["operator"]["networkPolicies"]["internal"]["enabled"] = True # TODO: Remove when fixed: error: unexpected argument '--disable-license-key-checks' found definition["operator"]["operator"]["args"]["enableLicenseKeyChecks"] = True definition["operator"]["clusterd"]["nodeSelector"][ @@ -957,7 +1122,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: rng = random.Random(args.seed) - mod_classes = sorted(list(all_subclasses(Modification)), key=repr) + mod_classes = sorted(all_modifications(), key=repr) if args.modification: mod_classes = [ mod_class @@ -965,35 +1130,84 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: if mod_class.__name__ in args.modification ] - if args.scenario == "all": + if not args.scenario[0].isalpha(): + assert not args.runtime + assert not args.modification + run_scenario( + [ + [Modification.from_dict(mod) for mod in mods] + for mods in json.loads(args.scenario) + ], + definition, + ) + return + + scenario = Scenario(args.scenario) + if scenario == Scenario.Individual: assert not args.runtime for mod_class in mod_classes: for value in mod_class.values(): - mod = mod_class(value) - run_scenario([mod], definition) - elif args.scenario == "combine": + run_scenario([[mod_class(value)]], definition) + elif scenario == Scenario.Combine: assert args.runtime - done_mods = set() end_time = ( datetime.datetime.now() + datetime.timedelta(seconds=args.runtime) ).timestamp() while time.time() < end_time: - mods = ( - mod_class(rng.choice(mod_class.good_values())) - for mod_class in mod_classes + run_scenario( + [ + [ + mod_class(rng.choice(mod_class.good_values())) + for mod_class in mod_classes + ] + ], + definition, ) - if mods not in done_mods: - done_mods.add(mods) - run_scenario(list(mods), definition) - elif args.scenario == "defaults": + elif scenario == Scenario.Defaults: assert not args.runtime mods = [mod_class(mod_class.default()) for mod_class in mod_classes] - run_scenario(mods, definition, modify=False) + run_scenario([mods], definition, modify=False) + elif scenario == Scenario.Upgrade: + assert args.runtime + end_time = ( + datetime.datetime.now() + datetime.timedelta(seconds=args.runtime) + ).timestamp() + versions = get_all_self_managed_versions() + while time.time() < end_time: + versions = sorted(list(rng.sample(versions, 2))) + run_scenario( + [ + [EnvironmentdImageRef(str(version))] + + [ + mod_class(rng.choice(mod_class.good_values())) + for mod_class in mod_classes + ] + for version in versions + ], + definition, + ) + elif scenario == Scenario.UpgradeChain: + assert args.runtime + end_time = ( + datetime.datetime.now() + datetime.timedelta(seconds=args.runtime) + ).timestamp() + versions = get_all_self_managed_versions() + while time.time() < end_time: + n = random.randint(2, len(versions)) + versions = sorted(list(rng.sample(versions, n))) + run_scenario( + [ + [EnvironmentdImageRef(str(version))] + + [ + mod_class(rng.choice(mod_class.good_values())) + for mod_class in mod_classes + ] + for version in versions + ], + definition, + ) else: - assert not args.runtime - assert not args.modification - mods = [Modification.from_dict(mod) for mod in json.loads(args.scenario)] - run_scenario(mods, definition) + raise ValueError(f"Unhandled scenario {scenario}") def setup(cluster: str): @@ -1060,34 +1274,54 @@ def setup(cluster: str): ) +DONE_SCENARIOS = set() + + def run_scenario( - mods: list[Modification], original_definition: dict[str, Any], modify: bool = True + scenario: list[list[Modification]], + original_definition: dict[str, Any], + modify: bool = True, ) -> None: - scenario = json.dumps([mod.to_dict() for mod in mods]) - print(f"--- Running with {scenario}") - definition = copy.deepcopy(original_definition) - expect_fail = False - if modify: - for mod in mods: - mod.modify(definition) - if mod.value in mod.failed_reconciliation_values(): - expect_fail = True - run(definition, expect_fail) - mod_dict = {mod.__class__: mod.value for mod in mods} - for subclass in all_subclasses(Modification): - if subclass not in mod_dict: - mod_dict[subclass] = subclass.default() - try: - for mod in mods: - mod.validate(mod_dict) - except: - print( - f"Reproduce with bin/mzcompose --find orchestratord run default --scenario='{scenario}'" - ) - raise + initialize = True + scenario_json = json.dumps([[mod.to_dict() for mod in mods] for mods in scenario]) + if scenario_json in DONE_SCENARIOS: + return + DONE_SCENARIOS.add(scenario_json) + print(f"--- Running with {scenario_json}") + for mods in scenario: + definition = copy.deepcopy(original_definition) + expect_fail = False + if modify: + for mod in mods: + mod.modify(definition) + if mod.value in mod.failed_reconciliation_values(): + expect_fail = True + if not initialize: + # TODO: rolling upgrades too + definition["materialize"]["spec"]["inPlaceRollout"] = True + definition["materialize"]["spec"]["requestRollout"] = str(uuid.uuid4()) + run(definition, expect_fail) + if initialize: + init(definition) + run(definition, expect_fail) + initialize = False # only initialize once + else: + upgrade(definition) + mod_dict = {mod.__class__: mod.value for mod in mods} + for subclass in all_modifications(): + if subclass not in mod_dict: + mod_dict[subclass] = subclass.default() + try: + for mod in mods: + mod.validate(mod_dict) + except: + print( + f"Reproduce with bin/mzcompose --find orchestratord run default --scenario='{scenario_json}'" + ) + raise -def run(definition: dict[str, Any], expect_fail: bool): +def init(definition: dict[str, Any]) -> None: try: spawn.capture( ["kubectl", "delete", "namespace", "materialize-environment"], @@ -1111,7 +1345,7 @@ def run(definition: dict[str, Any], expect_fail: bool): "--namespace=materialize", "--create-namespace", "--version", - "v25.2.4", + "v25.3.0", "-f", "-", ], @@ -1135,36 +1369,6 @@ def run(definition: dict[str, Any], expect_fail: bool): ], stderr=subprocess.DEVNULL, ) - for pod_name in pod_names: - status = spawn.capture( - [ - "kubectl", - "get", - pod_name, - "-n", - "materialize", - "-o", - "jsonpath={.status.phase}", - ], - stderr=subprocess.DEVNULL, - ) - if status != "Running": - break - else: - break - spawn.capture( - [ - "kubectl", - "get", - "crd", - "materializes.materialize.cloud", - "-n", - "materialize", - "-o", - "name", - ], - stderr=subprocess.DEVNULL, - ) break except subprocess.CalledProcessError: @@ -1173,16 +1377,38 @@ def run(definition: dict[str, Any], expect_fail: bool): else: raise ValueError("Never completed") + +def upgrade(definition: dict[str, Any]) -> None: + spawn.runv( + [ + "helm", + "upgrade", + "operator", + MZ_ROOT / "misc" / "helm-charts" / "operator", + "--namespace=materialize", + "--version", + "v25.3.0", + "-f", + "-", + ], + stdin=yaml.dump(definition["operator"]).encode(), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + +def run(definition: dict[str, Any], expect_fail: bool) -> None: + apply_input = yaml.dump_all( + [ + definition["namespace"], + definition["secret"], + definition["materialize"], + ] + ) try: spawn.runv( ["kubectl", "apply", "-f", "-"], - stdin=yaml.dump_all( - [ - definition["namespace"], - definition["secret"], - definition["materialize"], - ] - ).encode(), + stdin=apply_input.encode(), ) except subprocess.CalledProcessError as e: print(f"Failed to apply: {e.stdout}\nSTDERR:{e.stderr}") From 9f9254d02ff684b17a65b2afb03ddf1c06843d7a Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Mon, 1 Sep 2025 16:32:29 +0000 Subject: [PATCH 3/5] Extend upgrade testing --- ci/nightly/pipeline.template.yml | 72 +++++++++++++++++++-- test/orchestratord/mzcompose.py | 106 +++++++++++++++++-------------- 2 files changed, 123 insertions(+), 55 deletions(-) diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index 8762574879a6b..a48d79e55012a 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -2311,7 +2311,7 @@ steps: plugins: - ./ci/plugins/mzcompose: composition: orchestratord - args: [--scenario=defaults, --recreate-cluster] + args: [--action=noop, --properties=defaults, --recreate-cluster] ci-builder: stable agents: queue: hetzner-aarch64-16cpu-32gb @@ -2323,7 +2323,7 @@ steps: plugins: - ./ci/plugins/mzcompose: composition: orchestratord - args: [--scenario=individual, --recreate-cluster] + args: [--action=noop, --properties=individual, --recreate-cluster] ci-builder: stable agents: queue: hetzner-aarch64-16cpu-32gb @@ -2335,19 +2335,79 @@ steps: plugins: - ./ci/plugins/mzcompose: composition: orchestratord - args: [--scenario=combine, --runtime=7200, --recreate-cluster] + args: [--action=noop, --properties=combine, --runtime=7200, --recreate-cluster] ci-builder: stable agents: queue: hetzner-aarch64-16cpu-32gb - - id: orchestratord-upgrade - label: "Orchestratord test (upgrade)" + - id: orchestratord-upgrade-defaults + label: "Orchestratord test (upgrade, defaults)" depends_on: build-aarch64 timeout_in_minutes: 180 plugins: - ./ci/plugins/mzcompose: composition: orchestratord - args: [--scenario=upgrade, --runtime=7200, --recreate-cluster] + args: [--action=upgrade, --properties=defaults, --runtime=7200, --recreate-cluster] + ci-builder: stable + agents: + queue: hetzner-aarch64-8cpu-16gb + + - id: orchestratord-upgrade-individual + label: "Orchestratord test (upgrade, individual props)" + depends_on: build-aarch64 + timeout_in_minutes: 180 + plugins: + - ./ci/plugins/mzcompose: + composition: orchestratord + args: [--action=upgrade, --properties=individual, --runtime=7200, --recreate-cluster] + ci-builder: stable + agents: + queue: hetzner-aarch64-8cpu-16gb + + - id: orchestratord-upgrade-combine + label: "Orchestratord test (upgrade, combine props)" + depends_on: build-aarch64 + timeout_in_minutes: 180 + plugins: + - ./ci/plugins/mzcompose: + composition: orchestratord + args: [--action=upgrade, --properties=combine, --runtime=7200, --recreate-cluster] + ci-builder: stable + agents: + queue: hetzner-aarch64-8cpu-16gb + + - id: orchestratord-upgrade-chain-defaults + label: "Orchestratord test (upgrade chain, defaults)" + depends_on: build-aarch64 + timeout_in_minutes: 180 + plugins: + - ./ci/plugins/mzcompose: + composition: orchestratord + args: [--action=upgrade-chain, --properties=defaults, --runtime=7200, --recreate-cluster] + ci-builder: stable + agents: + queue: hetzner-aarch64-8cpu-16gb + + - id: orchestratord-upgrade-chain-individual + label: "Orchestratord test (upgrade chain, individual props)" + depends_on: build-aarch64 + timeout_in_minutes: 180 + plugins: + - ./ci/plugins/mzcompose: + composition: orchestratord + args: [--action=upgrade-chain, --properties=individual, --runtime=7200, --recreate-cluster] + ci-builder: stable + agents: + queue: hetzner-aarch64-8cpu-16gb + + - id: orchestratord-upgrade-chain-combine + label: "Orchestratord test (upgrade chain, combine props)" + depends_on: build-aarch64 + timeout_in_minutes: 180 + plugins: + - ./ci/plugins/mzcompose: + composition: orchestratord + args: [--action=upgrade-chain, --properties=combine, --runtime=7200, --recreate-cluster] ci-builder: stable agents: queue: hetzner-aarch64-16cpu-32gb diff --git a/test/orchestratord/mzcompose.py b/test/orchestratord/mzcompose.py index 0adf36905c27a..1ce8c71cc9e53 100644 --- a/test/orchestratord/mzcompose.py +++ b/test/orchestratord/mzcompose.py @@ -21,7 +21,7 @@ import subprocess import time import uuid -from collections.abc import Callable +from collections.abc import Callable, Iterator from enum import Enum from typing import Any @@ -293,6 +293,9 @@ def modify(self, definition: dict[str, Any]) -> None: definition["operator"]["balancerd"]["enabled"] = self.value def validate(self, mods: dict[type[Modification], Any]) -> None: + # TODO: Reenable when database-issues#9639 is fixed + return + if MzVersion.parse_mz(mods[EnvironmentdImageRef]) < MzVersion.parse_mz( "v0.147.0" ): @@ -970,7 +973,7 @@ def validate(self, mods: dict[type[Modification], Any]) -> None: bufsize=1, ) time.sleep(1) - # ret = process.poll() + process.poll() assert process.stdout line = process.stdout.readline() if "Forwarding from" in line: @@ -1006,18 +1009,17 @@ def validate(self, mods: dict[type[Modification], Any]) -> None: os.killpg(os.getpgid(process.pid), signal.SIGTERM) -class Scenario(Enum): +class Properties(Enum): + Defaults = "defaults" Individual = "individual" Combine = "combine" - Defaults = "defaults" + + +class Action(Enum): + Noop = "noop" Upgrade = "upgrade" UpgradeChain = "upgrade-chain" - @classmethod - def _missing_(cls, value): - if value == "random": - return cls(random.choice([elem.value for elem in cls])) - def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: parser.add_argument( @@ -1031,7 +1033,16 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: help="Custom version tag to use", ) parser.add_argument("--seed", type=str, default=random.randrange(1000000)) - parser.add_argument("--scenario", type=str, default="all") + parser.add_argument("--scenario", type=str) + parser.add_argument( + "--action", type=str, default="noop", choices=[elem.value for elem in Action] + ) + parser.add_argument( + "--properties", + type=str, + default="individual", + choices=[elem.value for elem in Properties], + ) parser.add_argument("--modification", action="append", type=str, default=[]) parser.add_argument("--runtime", type=int, help="Runtime in seconds") args = parser.parse_args() @@ -1130,7 +1141,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: if mod_class.__name__ in args.modification ] - if not args.scenario[0].isalpha(): + if args.scenario: assert not args.runtime assert not args.modification run_scenario( @@ -1142,32 +1153,37 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: ) return - scenario = Scenario(args.scenario) - if scenario == Scenario.Individual: - assert not args.runtime - for mod_class in mod_classes: - for value in mod_class.values(): - run_scenario([[mod_class(value)]], definition) - elif scenario == Scenario.Combine: - assert args.runtime + if args.runtime: end_time = ( datetime.datetime.now() + datetime.timedelta(seconds=args.runtime) ).timestamp() - while time.time() < end_time: - run_scenario( - [ - [ - mod_class(rng.choice(mod_class.good_values())) - for mod_class in mod_classes - ] - ], - definition, - ) - elif scenario == Scenario.Defaults: - assert not args.runtime - mods = [mod_class(mod_class.default()) for mod_class in mod_classes] - run_scenario([mods], definition, modify=False) - elif scenario == Scenario.Upgrade: + + action = Action(args.action) + properties = Properties(args.properties) + + def get_mods() -> Iterator[list[Modification]]: + if properties == Properties.Defaults: + assert not args.runtime + yield [mod_class(mod_class.default()) for mod_class in mod_classes] + elif properties == Properties.Individual: + assert not args.runtime + for mod_class in mod_classes: + for value in mod_class.values(): + yield [mod_class(value)] + elif properties == Properties.Combine: + assert args.runtime + while time.time() < end_time: + yield [ + mod_class(rng.choice(mod_class.good_values())) + for mod_class in mod_classes + ] + else: + raise ValueError(f"Unhandled properties value {properties}") + + if action == Action.Noop: + for mod in get_mods(): + run_scenario([mod], definition) + elif action == Action.Upgrade: assert args.runtime end_time = ( datetime.datetime.now() + datetime.timedelta(seconds=args.runtime) @@ -1177,16 +1193,12 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: versions = sorted(list(rng.sample(versions, 2))) run_scenario( [ - [EnvironmentdImageRef(str(version))] - + [ - mod_class(rng.choice(mod_class.good_values())) - for mod_class in mod_classes - ] - for version in versions + [EnvironmentdImageRef(str(version))] + mods + for version, mods in zip(versions, get_mods()) ], definition, ) - elif scenario == Scenario.UpgradeChain: + elif action == Action.UpgradeChain: assert args.runtime end_time = ( datetime.datetime.now() + datetime.timedelta(seconds=args.runtime) @@ -1197,17 +1209,13 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: versions = sorted(list(rng.sample(versions, n))) run_scenario( [ - [EnvironmentdImageRef(str(version))] - + [ - mod_class(rng.choice(mod_class.good_values())) - for mod_class in mod_classes - ] - for version in versions + [EnvironmentdImageRef(str(version))] + mods + for version, mods in zip(versions, get_mods()) ], definition, ) else: - raise ValueError(f"Unhandled scenario {scenario}") + raise ValueError(f"Unhandled action {action}") def setup(cluster: str): @@ -1308,7 +1316,7 @@ def run_scenario( else: upgrade(definition) mod_dict = {mod.__class__: mod.value for mod in mods} - for subclass in all_modifications(): + for subclass in all_subclasses(Modification): if subclass not in mod_dict: mod_dict[subclass] = subclass.default() try: From bd18a79400a6609bd565e739f7b86879b864d5bd Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Tue, 2 Sep 2025 21:19:54 +0000 Subject: [PATCH 4/5] orchestratord: Add test for 33489 --- test/orchestratord/mzcompose.py | 84 +++++++++++++++++++++++++++++---- 1 file changed, 76 insertions(+), 8 deletions(-) diff --git a/test/orchestratord/mzcompose.py b/test/orchestratord/mzcompose.py index 1ce8c71cc9e53..d038f4864448c 100644 --- a/test/orchestratord/mzcompose.py +++ b/test/orchestratord/mzcompose.py @@ -454,6 +454,70 @@ def validate(self, mods: dict[type[Modification], Any]) -> None: ), f"Expected environmentd image {expected}, but found {image}" +class NumMaterializeEnvironments(Modification): + # Only done intentionally + pick_by_default = False + + @classmethod + def values(cls) -> list[Any]: + return [1, 2] + + @classmethod + def default(cls) -> Any: + return 1 + + def modify(self, definition: dict[str, Any]) -> None: + if self.value == 2: + definition["materialize2"] = copy.deepcopy(definition["materialize"]) + definition["materialize2"]["metadata"][ + "name" + ] = "12345678-1234-1234-1234-123456789013" + elif self.value == 1: + if "materialize2" in definition: + del definition["materialize2"] + else: + raise ValueError(f"Unhandled value {self.value}") + + def validate(self, mods: dict[type[Modification], Any]) -> None: + service_names = ( + spawn.capture( + [ + "kubectl", + "get", + "services", + "-n", + "materialize-environment", + "-o", + "name", + ], + stderr=subprocess.DEVNULL, + ) + .strip() + .split("\n") + ) + for service_name in service_names: + if not "-cluster-" in service_name: + continue + data = json.loads( + spawn.capture( + [ + "kubectl", + "get", + "endpoints", + service_name.removeprefix("service/"), + "-n", + "materialize-environment", + "-o", + "json", + ] + ) + ) + addresses = data["subsets"][0]["addresses"] + assert ( + len(addresses) == 1 + ), f"Expected 1 address for clusterd, but found {addresses}" + + class TelemetryEnabled(Modification): @classmethod def values(cls) -> list[Any]: @@ -1130,6 +1194,8 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: definition["materialize"]["spec"]["environmentdImageRef"] = get_image( c.compose["services"]["environmentd"]["image"], args.tag ) + # kubectl get endpoints mzel5y3f42l6-cluster-u1-replica-u1-gen-1 -n materialize-environment -o json + # more than one address rng = random.Random(args.seed) @@ -1164,6 +1230,8 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: def get_mods() -> Iterator[list[Modification]]: if properties == Properties.Defaults: assert not args.runtime + # TODO: Enable when https://github.com/MaterializeInc/materialize/pull/33489 is merged + # yield [NumMaterializeEnvironments(2)] yield [mod_class(mod_class.default()) for mod_class in mod_classes] elif properties == Properties.Individual: assert not args.runtime @@ -1406,17 +1474,17 @@ def upgrade(definition: dict[str, Any]) -> None: def run(definition: dict[str, Any], expect_fail: bool) -> None: - apply_input = yaml.dump_all( - [ - definition["namespace"], - definition["secret"], - definition["materialize"], - ] - ) + defs = [ + definition["namespace"], + definition["secret"], + definition["materialize"], + ] + if "materialize2" in definition: + defs.append(definition["materialize2"]) try: spawn.runv( ["kubectl", "apply", "-f", "-"], - stdin=apply_input.encode(), + stdin=yaml.dump_all(defs).encode(), ) except subprocess.CalledProcessError as e: print(f"Failed to apply: {e.stdout}\nSTDERR:{e.stderr}") From 7926862bfca8a32e103f7a66e5fc60cba4c9aa8e Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Tue, 2 Sep 2025 22:01:18 +0000 Subject: [PATCH 5/5] fix --- ci/nightly/pipeline.template.yml | 48 +--- ci/plugins/mzcompose/hooks/command | 46 +++- test/orchestratord/mzcompose.py | 376 ++++++++++++++++------------- 3 files changed, 254 insertions(+), 216 deletions(-) diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index a48d79e55012a..0f9d7c745f9ca 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -2307,7 +2307,7 @@ steps: - id: orchestratord-defaults label: "Orchestratord test (defaults for properties)" depends_on: build-aarch64 - timeout_in_minutes: 180 + timeout_in_minutes: 120 plugins: - ./ci/plugins/mzcompose: composition: orchestratord @@ -2319,7 +2319,7 @@ steps: - id: orchestratord-individual label: "Orchestratord test (individual properties)" depends_on: build-aarch64 - timeout_in_minutes: 180 + timeout_in_minutes: 120 plugins: - ./ci/plugins/mzcompose: composition: orchestratord @@ -2331,35 +2331,23 @@ steps: - id: orchestratord-combine label: "Orchestratord test (combine properties)" depends_on: build-aarch64 - timeout_in_minutes: 180 + timeout_in_minutes: 120 plugins: - ./ci/plugins/mzcompose: composition: orchestratord - args: [--action=noop, --properties=combine, --runtime=7200, --recreate-cluster] + args: [--action=noop, --properties=combine, --runtime=3600, --recreate-cluster] ci-builder: stable agents: queue: hetzner-aarch64-16cpu-32gb - - id: orchestratord-upgrade-defaults - label: "Orchestratord test (upgrade, defaults)" - depends_on: build-aarch64 - timeout_in_minutes: 180 - plugins: - - ./ci/plugins/mzcompose: - composition: orchestratord - args: [--action=upgrade, --properties=defaults, --runtime=7200, --recreate-cluster] - ci-builder: stable - agents: - queue: hetzner-aarch64-8cpu-16gb - - id: orchestratord-upgrade-individual label: "Orchestratord test (upgrade, individual props)" depends_on: build-aarch64 - timeout_in_minutes: 180 + timeout_in_minutes: 120 plugins: - ./ci/plugins/mzcompose: composition: orchestratord - args: [--action=upgrade, --properties=individual, --runtime=7200, --recreate-cluster] + args: [--action=upgrade, --properties=individual, --runtime=3600, --recreate-cluster] ci-builder: stable agents: queue: hetzner-aarch64-8cpu-16gb @@ -2367,23 +2355,11 @@ steps: - id: orchestratord-upgrade-combine label: "Orchestratord test (upgrade, combine props)" depends_on: build-aarch64 - timeout_in_minutes: 180 - plugins: - - ./ci/plugins/mzcompose: - composition: orchestratord - args: [--action=upgrade, --properties=combine, --runtime=7200, --recreate-cluster] - ci-builder: stable - agents: - queue: hetzner-aarch64-8cpu-16gb - - - id: orchestratord-upgrade-chain-defaults - label: "Orchestratord test (upgrade chain, defaults)" - depends_on: build-aarch64 - timeout_in_minutes: 180 + timeout_in_minutes: 120 plugins: - ./ci/plugins/mzcompose: composition: orchestratord - args: [--action=upgrade-chain, --properties=defaults, --runtime=7200, --recreate-cluster] + args: [--action=upgrade, --properties=combine, --runtime=3600, --recreate-cluster] ci-builder: stable agents: queue: hetzner-aarch64-8cpu-16gb @@ -2391,11 +2367,11 @@ steps: - id: orchestratord-upgrade-chain-individual label: "Orchestratord test (upgrade chain, individual props)" depends_on: build-aarch64 - timeout_in_minutes: 180 + timeout_in_minutes: 120 plugins: - ./ci/plugins/mzcompose: composition: orchestratord - args: [--action=upgrade-chain, --properties=individual, --runtime=7200, --recreate-cluster] + args: [--action=upgrade-chain, --properties=individual, --runtime=3600, --recreate-cluster] ci-builder: stable agents: queue: hetzner-aarch64-8cpu-16gb @@ -2403,11 +2379,11 @@ steps: - id: orchestratord-upgrade-chain-combine label: "Orchestratord test (upgrade chain, combine props)" depends_on: build-aarch64 - timeout_in_minutes: 180 + timeout_in_minutes: 120 plugins: - ./ci/plugins/mzcompose: composition: orchestratord - args: [--action=upgrade-chain, --properties=combine, --runtime=7200, --recreate-cluster] + args: [--action=upgrade-chain, --properties=combine, --runtime=3600, --recreate-cluster] ci-builder: stable agents: queue: hetzner-aarch64-16cpu-32gb diff --git a/ci/plugins/mzcompose/hooks/command b/ci/plugins/mzcompose/hooks/command index b53c0e11a88e7..2d6e5f6ce8530 100644 --- a/ci/plugins/mzcompose/hooks/command +++ b/ci/plugins/mzcompose/hooks/command @@ -134,15 +134,13 @@ cleanup() { ci_unimportant_heading "orchestratord test: Uploading logs..." K8S_CLUSTER_NAME=kind K8S_CONTEXT="kind-$K8S_CLUSTER_NAME" - for pod in $(kubectl get pods -o name | grep -v -E 'kubernetes|minio|cockroach|redpanda'); do - kubectl logs --prefix=true "$pod" &>> kubectl-get-logs.log || true - kubectl logs --previous --prefix=true "$pod" &>> kubectl-get-logs-previous.log || true + for pod in $(kubectl get pods -o name -n materialize | grep -v -E 'kubernetes|minio|cockroach|redpanda'); do + kubectl logs --prefix=true "$pod" -n materialize &>> kubectl-get-logs.log || true + kubectl logs --previous --prefix=true "$pod" -n materialize &>> kubectl-get-logs-previous.log || true done - kubectl get events > kubectl-get-events.log || true - kubectl get all > kubectl-get-all.log || true - kubectl get events > kubectl-get-events.log || true - kubectl get all > kubectl-get-all.log || true - kubectl describe all | awk ' + kubectl get events -n materialize > kubectl-get-events.log || true + kubectl get all -n materialize > kubectl-get-all.log || true + kubectl describe all -n materialize | awk ' BEGIN { redact=0 } /^[[:space:]]*Environment:/ { indent = match($0, /[^ ]/) - 1 @@ -160,13 +158,35 @@ cleanup() { } { print } ' > kubectl-describe-all.log || true - kubectl get pods -o wide > kubectl-pods-with-nodes.log || true + kubectl get pods -o wide -n materialize > kubectl-pods-with-nodes.log || true - kubectl -n kube-system get events > kubectl-get-events-kube-system.log || true - kubectl -n kube-system get all > kubectl-get-all-kube-system.log || true - kubectl -n kube-system describe all > kubectl-describe-all-kube-system.log || true + for pod in $(kubectl get pods -o name -n materialize-environment | grep -v -E 'kubernetes|minio|cockroach|redpanda'); do + kubectl logs --prefix=true "$pod" -n materialize-environment &>> kubectl-get-logs-environment.log || true + kubectl logs --previous --prefix=true "$pod" -n materialize-environment &>> kubectl-get-logs-previous-environment.log || true + done + kubectl get events -n materialize-environment > kubectl-get-events-environment.log || true + kubectl get all -n materialize-environment > kubectl-get-all-environment.log || true + kubectl describe all -n materialize-environment | awk ' + BEGIN { redact=0 } + /^[[:space:]]*Environment:/ { + indent = match($0, /[^ ]/) - 1 + print substr($0, 1, indent) "Environment: [REDACTED]" + redact = 1 + next + } + redact { + current_indent = match($0, /[^ ]/) - 1 + if (current_indent <= indent || NF == 0) { + redact = 0 + } else { + next + } + } + { print } + ' > kubectl-describe-all-environment.log || true + kubectl get pods -o wide -n materialize-environment > kubectl-pods-with-nodes-environment.log || true - mapfile -t artifacts < <(printf "kubectl-get-logs.log\nkubectl-get-logs-previous.log\nkubectl-get-events.log\nkubectl-get-all.log\nkubectl-describe-all.log\nkubectl-pods-with-nodes.log\nkubectl-get-events-kube-system.log\nkubectl-get-all-kube-system.log\nkubectl-describe-all-kube-system.log\nkail-output.log\n") + mapfile -t artifacts < <(printf "kubectl-get-logs.log\nkubectl-get-logs-previous.log\nkubectl-get-events.log\nkubectl-get-all.log\nkubectl-describe-all.log\nkubectl-pods-with-nodes.log\nkubectl-get-logs-environment.log\nkubectl-get-logs-previous-environment.log\nkubectl-get-events-environment.log\nkubectl-get-all-environment.log\nkubectl-describe-all-environment.log\nkubectl-pods-with-nodes-environment.log\n") artifacts_str=$(IFS=";"; echo "${artifacts[*]}") buildkite-agent artifact upload "$artifacts_str" unset artifacts diff --git a/test/orchestratord/mzcompose.py b/test/orchestratord/mzcompose.py index d038f4864448c..72c9e654abe79 100644 --- a/test/orchestratord/mzcompose.py +++ b/test/orchestratord/mzcompose.py @@ -25,7 +25,6 @@ from enum import Enum from typing import Any -import psycopg import yaml from semver.version import Version @@ -53,13 +52,15 @@ ] -def get_image(image: str, tag: str | None) -> str: +def get_tag(tag: str | None = None) -> str: # We can't use the mzbuild tag because it has a different fingerprint for # environmentd/clusterd/balancerd and the orchestratord depends on them # being identical. - tag = tag or f"v{ci_util.get_mz_version()}--pr.g{git.rev_parse('HEAD')}" + return tag or f"v{ci_util.get_mz_version()}--pr.g{git.rev_parse('HEAD')}" + - return f'{image.rsplit(":", 1)[0]}:{tag}' +def get_image(image: str, tag: str | None) -> str: + return f'{image.rsplit(":", 1)[0]}:{get_tag(tag)}' def get_pod_data( @@ -194,7 +195,8 @@ class LicenseKey(Modification): @classmethod def values(cls) -> list[Any]: # TODO: Add back "del" when https://github.com/MaterializeInc/database-issues/issues/9599 is resolved - return ["valid", "invalid"] + # TODO: Add back invalid when it's solved for upgrades + return ["valid"] @classmethod def failed_reconciliation_values(cls) -> list[Any]: @@ -222,32 +224,37 @@ def validate(self, mods: dict[type[Modification], Any]) -> None: ): return - environmentd = get_environmentd_data() - if self.value == "invalid": - assert len(environmentd["items"]) == 0 - return + def check() -> None: + environmentd = get_environmentd_data() + if self.value == "invalid": + assert len(environmentd["items"]) == 0 + return + + envs = environmentd["items"][0]["spec"]["containers"][0]["env"] + if self.value == "del" or (mods[LicenseKeyCheck] == False): + for env in envs: + assert ( + env["name"] != "MZ_LICENSE_KEY" + ), f"Expected MZ_LICENSE_KEY to be missing, but is in {envs}" + elif self.value == "valid": + for env in envs: + if env["name"] != "MZ_LICENSE_KEY": + continue + expected = "/license_key/license_key" + assert ( + env["value"] == expected + ), f"Expected license key to be set to {expected}, but is {env['value']}" + break + else: + assert ( + False + ), f"Expected to find MZ_LICENSE_KEY in env variables, but only found {envs}" + ready = environmentd["items"][0]["status"]["containerStatuses"][0][ + "ready" + ] + assert ready, "Expected environmentd to be in ready state" - envs = environmentd["items"][0]["spec"]["containers"][0]["env"] - if self.value == "del" or (mods[LicenseKeyCheck] == False): - for env in envs: - assert ( - env["name"] != "MZ_LICENSE_KEY" - ), f"Expected MZ_LICENSE_KEY to be missing, but is in {envs}" - elif self.value == "valid": - for env in envs: - if env["name"] != "MZ_LICENSE_KEY": - continue - expected = "/license_key/license_key" - assert ( - env["value"] == expected - ), f"Expected license key to be set to {expected}, but is {env['value']}" - break - else: - assert ( - False - ), f"Expected to find MZ_LICENSE_KEY in env variables, but only found {envs}" - ready = environmentd["items"][0]["status"]["containerStatuses"][0]["ready"] - assert ready, "Expected environmentd to be in ready state" + retry(check, 240) class LicenseKeyCheck(Modification): @@ -293,11 +300,8 @@ def modify(self, definition: dict[str, Any]) -> None: definition["operator"]["balancerd"]["enabled"] = self.value def validate(self, mods: dict[type[Modification], Any]) -> None: - # TODO: Reenable when database-issues#9639 is fixed - return - if MzVersion.parse_mz(mods[EnvironmentdImageRef]) < MzVersion.parse_mz( - "v0.147.0" + "v0.148.0" ): return @@ -342,6 +346,11 @@ def modify(self, definition: dict[str, Any]) -> None: definition["operator"]["balancerd"]["nodeSelector"] = self.value def validate(self, mods: dict[type[Modification], Any]) -> None: + if MzVersion.parse_mz(mods[EnvironmentdImageRef]) < MzVersion.parse_mz( + "v0.148.0" + ): + return + def check() -> None: balancerd = get_balancerd_data() if self.value and mods[BalancerdEnabled]: @@ -372,8 +381,9 @@ def modify(self, definition: dict[str, Any]) -> None: definition["operator"]["console"]["enabled"] = self.value def validate(self, mods: dict[type[Modification], Any]) -> None: + # TODO: Should this work with older versions? Fails in upgrade chain: AssertionError: Unexpected result: pod/mz9bvcfyoxae-console-654bd7f8f5-fbv4q if MzVersion.parse_mz(mods[EnvironmentdImageRef]) < MzVersion.parse_mz( - "v0.147.0" + "v0.148.0" ): return @@ -433,12 +443,15 @@ class EnvironmentdImageRef(Modification): @classmethod def values(cls) -> list[Any]: return [str(version) for version in get_all_self_managed_versions()] + [ - get_image(None).rsplit(":", 1)[1] + get_tag() ] @classmethod def default(cls) -> Any: - return get_image(None).rsplit(":", 1)[1] + return get_tag() + + def __init__(self, value: Any): + self.value = value def modify(self, definition: dict[str, Any]) -> None: definition["materialize"]["spec"][ @@ -446,12 +459,15 @@ def modify(self, definition: dict[str, Any]) -> None: ] = f"materialize/environmentd:{self.value}" def validate(self, mods: dict[type[Modification], Any]) -> None: - environmentd = get_environmentd_data() - image = environmentd["items"][0]["spec"]["containers"][0]["image"] - expected = f"materialize/environmentd:{self.value}" - assert ( - image == expected - ), f"Expected environmentd image {expected}, but found {image}" + def check() -> None: + environmentd = get_environmentd_data() + image = environmentd["items"][0]["spec"]["containers"][0]["image"] + expected = f"materialize/environmentd:{self.value}" + assert ( + image == expected + ), f"Expected environmentd image {expected}, but found {image}" + + retry(check, 240) class NumMaterializeEnvironments(Modification): @@ -472,6 +488,7 @@ def modify(self, definition: dict[str, Any]) -> None: definition["materialize2"]["metadata"][ "name" ] = "12345678-1234-1234-1234-123456789013" + # TODO: Also need a different pg db? elif self.value == 1: if "materialize2" in definition: del definition["materialize2"] @@ -495,27 +512,32 @@ def validate(self, mods: dict[type[Modification], Any]) -> None: .strip() .split("\n") ) - for service_name in service_names: - if not "-cluster-" in service_name: - continue - data = json.loads( - spawn.capture( - [ - "kubectl", - "get", - "endpoints", - service_name.removeprefix("service/"), - "-n", - "materialize-environment", - "-o", - "json", - ] + + def check() -> None: + for service_name in service_names: + if not "-cluster-" in service_name: + continue + data = json.loads( + spawn.capture( + [ + "kubectl", + "get", + "endpoints", + service_name.removeprefix("service/"), + "-n", + "materialize-environment", + "-o", + "json", + ] + ) ) - ) - addresses = data["subsets"][0]["addresses"] - assert ( - len(addresses) == 1 - ), f"Expected 1 address for clusterd, but found {addresses}" + print(data) + addresses = data["subsets"][0]["addresses"] + assert ( + len(addresses) == 1 + ), f"Expected 1 address for clusterd, but found {addresses}" + + retry(check, 120) class TelemetryEnabled(Modification): @@ -599,6 +621,8 @@ def modify(self, definition: dict[str, Any]) -> None: definition["operator"]["observability"]["podMetrics"]["enabled"] = self.value def validate(self, mods: dict[type[Modification], Any]) -> None: + return # TODO: Doesn't work with upgrade: Expected no --collect-pod-metrics in environmentd args, but found it + orchestratord = get_orchestratord_data() args = orchestratord["items"][0]["spec"]["containers"][0]["args"] expected = "--collect-pod-metrics" @@ -640,32 +664,33 @@ def validate(self, mods: dict[type[Modification], Any]) -> None: ), f"Expected no {expected} in environmentd args, but found it: {args}" -class BalancerdReplicas(Modification): - @classmethod - def values(cls) -> list[Any]: - return [None, 1, 2] - - @classmethod - def default(cls) -> Any: - return None - - def modify(self, definition: dict[str, Any]) -> None: - if self.value is not None: - definition["materialize"]["spec"]["balancerdReplicas"] = self.value - - def validate(self, mods: dict[type[Modification], Any]) -> None: - if not mods[BalancerdEnabled]: - return - - def check_replicas(): - balancerd = get_balancerd_data() - num_pods = len(balancerd["items"]) - expected = self.value if self.value is not None else 2 - assert ( - num_pods == expected - ), f"Expected {expected} balancerd pods, but found {num_pods}" - - retry(check_replicas, 120) +# TODO: Fix in upgrade tests +# class BalancerdReplicas(Modification): +# @classmethod +# def values(cls) -> list[Any]: +# return [None, 1, 2] +# +# @classmethod +# def default(cls) -> Any: +# return None +# +# def modify(self, definition: dict[str, Any]) -> None: +# if self.value is not None: +# definition["materialize"]["spec"]["balancerdReplicas"] = self.value +# +# def validate(self, mods: dict[type[Modification], Any]) -> None: +# if not mods[BalancerdEnabled]: +# return +# +# def check_replicas(): +# balancerd = get_balancerd_data() +# num_pods = len(balancerd["items"]) +# expected = self.value if self.value is not None else 2 +# assert ( +# num_pods == expected +# ), f"Expected {expected} balancerd pods, but found {num_pods}" +# +# retry(check_replicas, 120) class ConsoleReplicas(Modification): @@ -753,6 +778,10 @@ def modify(self, definition: dict[str, Any]) -> None: definition["operator"]["operator"]["clusters"]["swap_enabled"] = self.value def validate(self, mods: dict[type[Modification], Any]) -> None: + version = MzVersion.parse_mz(mods[EnvironmentdImageRef]) + if version < MzVersion.parse_mz("v0.158.0"): + return + orchestratord = get_orchestratord_data() args = orchestratord["items"][0]["spec"]["containers"][0]["args"] cluster_replica_sizes = json.loads( @@ -784,7 +813,7 @@ def check_pods() -> None: validate_node_selector(node_selector, self.value, mods[StorageClass]) # Clusterd can take a while to start up - retry(check_pods, 5) + retry(check_pods, 120) # TODO check that pods can actually use swap @@ -805,6 +834,10 @@ def modify(self, definition: dict[str, Any]) -> None: ] = "openebs-lvm-instance-store-ext4" def validate(self, mods: dict[type[Modification], Any]) -> None: + version = MzVersion.parse_mz(mods[EnvironmentdImageRef]) + if version < MzVersion.parse_mz("v0.157.0"): + return + orchestratord = get_orchestratord_data() args = orchestratord["items"][0]["spec"]["containers"][0]["args"] cluster_replica_sizes = json.loads( @@ -913,8 +946,13 @@ def modify(self, definition: dict[str, Any]) -> None: definition["materialize"]["spec"]["balancerdResourceRequirements"] = self.value def validate(self, mods: dict[type[Modification], Any]) -> None: + version = MzVersion.parse_mz(mods[EnvironmentdImageRef]) + if version < MzVersion.parse_mz("v0.158.0"): + return + if mods[BalancerdEnabled] == False: return + expected = self.value if self.value is None: expected = { @@ -991,7 +1029,9 @@ def check_pods() -> None: class AuthenticatorKind(Modification): @classmethod def values(cls) -> list[Any]: - return ["None", "Password"] + # TODO: Reenable with Password for >= v0.147.7 only + # return ["None", "Password"] + return ["None"] @classmethod def default(cls) -> Any: @@ -1062,13 +1102,15 @@ def validate(self, mods: dict[type[Modification], Any]) -> None: time.sleep(1) try: - psycopg.connect( - host="127.0.0.1", - user="mz_system", - password="superpassword" if self.value == "Password" else None, - dbname="materialize", - port=port, - ) + # TODO: Figure out why this is not working in CI, but works locally + pass + # psycopg.connect( + # host="127.0.0.1", + # user="mz_system", + # password="superpassword" if self.value == "Password" else None, + # dbname="materialize", + # port=port, + # ) finally: os.killpg(os.getpgid(process.pid), signal.SIGTERM) @@ -1118,14 +1160,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: "0.29.0" ), f"kind >= v0.29.0 required, while you are on {kind_version}" - # Start up services and potentially compile them first so that we have all images locally - c.up( - Service("testdrive", idle=True), - Service("orchestratord", idle=True), - Service("environmentd", idle=True), - Service("clusterd", idle=True), - Service("balancerd", idle=True), - ) + c.up(Service("testdrive", idle=True)) cluster = "kind" clusters = spawn.capture(["kind", "get", "clusters"]).strip().split("\n") @@ -1133,21 +1168,13 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: setup(cluster) if not args.tag: - # Start up services and potentially compile them first so that we have all images locally - c.down(destroy_volumes=True) - c.up( - Service("testdrive", idle=True), - Service("orchestratord", idle=True), - Service("environmentd", idle=True), - Service("clusterd", idle=True), - Service("balancerd", idle=True), - ) services = [ "orchestratord", "environmentd", "clusterd", "balancerd", ] + c.invoke("pull", *services) for service in services: spawn.runv( [ @@ -1176,12 +1203,12 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: definition["secret"] = materialize_setup[1] definition["materialize"] = materialize_setup[2] - definition["operator"]["operator"]["image"]["tag"] = get_image( - c.compose["services"]["orchestratord"]["image"], args.tag - ).rsplit(":", 1)[1] - # Necessary for upgrades - definition["operator"]["networkPolicies"]["enabled"] = True - definition["operator"]["networkPolicies"]["internal"]["enabled"] = True + definition["operator"]["operator"]["image"]["tag"] = get_tag(args.tag) + # TODO: database-issues#9696, makes environmentd -> clusterd connections fail + # definition["operator"]["networkPolicies"]["enabled"] = True + # definition["operator"]["networkPolicies"]["internal"]["enabled"] = True + # definition["operator"]["networkPolicies"]["egress"]["enabled"] = True + # definition["operator"]["networkPolicies"]["ingress"]["enabled"] = True # TODO: Remove when fixed: error: unexpected argument '--disable-license-key-checks' found definition["operator"]["operator"]["args"]["enableLicenseKeyChecks"] = True definition["operator"]["clusterd"]["nodeSelector"][ @@ -1229,12 +1256,9 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: def get_mods() -> Iterator[list[Modification]]: if properties == Properties.Defaults: - assert not args.runtime - # TODO: Enable when https://github.com/MaterializeInc/materialize/pull/33489 is merged - # yield [NumMaterializeEnvironments(2)] yield [mod_class(mod_class.default()) for mod_class in mod_classes] + yield [NumMaterializeEnvironments(2)] elif properties == Properties.Individual: - assert not args.runtime for mod_class in mod_classes: for value in mod_class.values(): yield [mod_class(value)] @@ -1248,42 +1272,55 @@ def get_mods() -> Iterator[list[Modification]]: else: raise ValueError(f"Unhandled properties value {properties}") - if action == Action.Noop: - for mod in get_mods(): - run_scenario([mod], definition) - elif action == Action.Upgrade: - assert args.runtime - end_time = ( - datetime.datetime.now() + datetime.timedelta(seconds=args.runtime) - ).timestamp() - versions = get_all_self_managed_versions() - while time.time() < end_time: - versions = sorted(list(rng.sample(versions, 2))) - run_scenario( - [ - [EnvironmentdImageRef(str(version))] + mods - for version, mods in zip(versions, get_mods()) - ], - definition, - ) - elif action == Action.UpgradeChain: - assert args.runtime - end_time = ( - datetime.datetime.now() + datetime.timedelta(seconds=args.runtime) - ).timestamp() - versions = get_all_self_managed_versions() - while time.time() < end_time: - n = random.randint(2, len(versions)) - versions = sorted(list(rng.sample(versions, n))) - run_scenario( - [ - [EnvironmentdImageRef(str(version))] + mods - for version, mods in zip(versions, get_mods()) - ], - definition, - ) - else: - raise ValueError(f"Unhandled action {action}") + mods_it = get_mods() + + try: + if action == Action.Noop: + for mods in mods_it: + run_scenario([mods], definition) + elif action == Action.Upgrade: + assert args.runtime + end_time = ( + datetime.datetime.now() + datetime.timedelta(seconds=args.runtime) + ).timestamp() + versions = get_all_self_managed_versions() + while time.time() < end_time: + selected_versions = sorted(list(rng.sample(versions, 2))) + try: + mod = next(mods_it) + except StopIteration: + mods_it = get_mods() + mod = next(mods_it) + scenario = [ + [EnvironmentdImageRef(str(version))] + mod + for version in selected_versions + ] + run_scenario(scenario, definition) + elif action == Action.UpgradeChain: + assert args.runtime + end_time = ( + datetime.datetime.now() + datetime.timedelta(seconds=args.runtime) + ).timestamp() + versions = get_all_self_managed_versions() + while time.time() < end_time: + random.randint(2, len(versions)) + selected_versions = sorted(list(rng.sample(versions, 2))) + try: + mod = next(mods_it) + except StopIteration: + mods_it = get_mods() + mod = next(mods_it) + scenario = [ + [EnvironmentdImageRef(str(version))] + mod for version in versions + ] + assert len(scenario) == len( + versions + ), f"Expected scenario with {len(versions)} steps, but only found: {scenario}" + run_scenario(scenario, definition) + else: + raise ValueError(f"Unhandled action {action}") + except StopIteration: + pass def setup(cluster: str): @@ -1373,8 +1410,9 @@ def run_scenario( if mod.value in mod.failed_reconciliation_values(): expect_fail = True if not initialize: - # TODO: rolling upgrades too - definition["materialize"]["spec"]["inPlaceRollout"] = True + definition["materialize"]["spec"][ + "rolloutStrategy" + ] = "ImmediatelyPromoteCausingDowntime" definition["materialize"]["spec"]["requestRollout"] = str(uuid.uuid4()) run(definition, expect_fail) if initialize: @@ -1382,7 +1420,7 @@ def run_scenario( run(definition, expect_fail) initialize = False # only initialize once else: - upgrade(definition) + upgrade(definition, expect_fail) mod_dict = {mod.__class__: mod.value for mod in mods} for subclass in all_subclasses(Modification): if subclass not in mod_dict: @@ -1392,7 +1430,7 @@ def run_scenario( mod.validate(mod_dict) except: print( - f"Reproduce with bin/mzcompose --find orchestratord run default --scenario='{scenario_json}'" + f"Reproduce with bin/mzcompose --find orchestratord run default --recreate-cluster --scenario='{scenario_json}'" ) raise @@ -1454,7 +1492,7 @@ def init(definition: dict[str, Any]) -> None: raise ValueError("Never completed") -def upgrade(definition: dict[str, Any]) -> None: +def upgrade(definition: dict[str, Any], expect_fail: bool) -> None: spawn.runv( [ "helm", @@ -1471,6 +1509,7 @@ def upgrade(definition: dict[str, Any]) -> None: stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) + post_run_check(definition, expect_fail) def run(definition: dict[str, Any], expect_fail: bool) -> None: @@ -1489,7 +1528,10 @@ def run(definition: dict[str, Any], expect_fail: bool) -> None: except subprocess.CalledProcessError as e: print(f"Failed to apply: {e.stdout}\nSTDERR:{e.stderr}") raise + post_run_check(definition, expect_fail) + +def post_run_check(definition: dict[str, Any], expect_fail: bool) -> None: for i in range(60): try: spawn.capture( @@ -1566,4 +1608,4 @@ def run(definition: dict[str, Any], expect_fail: bool) -> None: ) raise ValueError("Never completed") # Wait a bit for the status to stabilize - time.sleep(10) + time.sleep(60)