@@ -454,6 +454,70 @@ def validate(self, mods: dict[type[Modification], Any]) -> None:
454454 ), f"Expected environmentd image { expected } , but found { image } "
455455
456456
457+ class NumMaterializeEnvironments (Modification ):
458+ # Only done intentionally
459+ pick_by_default = False
460+
461+ @classmethod
462+ def values (cls ) -> list [Any ]:
463+ return [1 , 2 ]
464+
465+ @classmethod
466+ def default (cls ) -> Any :
467+ return 1
468+
469+ def modify (self , definition : dict [str , Any ]) -> None :
470+ if self .value == 2 :
471+ definition ["materialize2" ] = copy .deepcopy (definition ["materialize" ])
472+ definition ["materialize2" ]["metadata" ][
473+ "name"
474+ ] = "12345678-1234-1234-1234-123456789013"
475+ elif self .value == 1 :
476+ if "materialize2" in definition :
477+ del definition ["materialize2" ]
478+ else :
479+ raise ValueError (f"Unhandled value { self .value } " )
480+
481+ def validate (self , mods : dict [type [Modification ], Any ]) -> None :
482+ service_names = (
483+ spawn .capture (
484+ [
485+ "kubectl" ,
486+ "get" ,
487+ "services" ,
488+ "-n" ,
489+ "materialize-environment" ,
490+ "-o" ,
491+ "name" ,
492+ ],
493+ stderr = subprocess .DEVNULL ,
494+ )
495+ .strip ()
496+ .split ("\n " )
497+ )
498+ for service_name in service_names :
499+ if not "-cluster-" in service_name :
500+ continue
501+ data = json .loads (
502+ spawn .capture (
503+ [
504+ "kubectl" ,
505+ "get" ,
506+ "endpoints" ,
507+ service_name .removeprefix ("service/" ),
508+ "-n" ,
509+ "materialize-environment" ,
510+ "-o" ,
511+ "json" ,
512+ ]
513+ )
514+ )
515+ addresses = data ["subsets" ][0 ]["addresses" ]
516+ assert (
517+ len (addresses ) == 1
518+ ), f"Expected 1 address for clusterd, but found { addresses } "
519+
520+
457521class TelemetryEnabled (Modification ):
458522 @classmethod
459523 def values (cls ) -> list [Any ]:
@@ -1130,6 +1194,8 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
11301194 definition ["materialize" ]["spec" ]["environmentdImageRef" ] = get_image (
11311195 c .compose ["services" ]["environmentd" ]["image" ], args .tag
11321196 )
1197+ # kubectl get endpoints mzel5y3f42l6-cluster-u1-replica-u1-gen-1 -n materialize-environment -o json
1198+ # more than one address
11331199
11341200 rng = random .Random (args .seed )
11351201
@@ -1164,6 +1230,8 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
11641230 def get_mods () -> Iterator [list [Modification ]]:
11651231 if properties == Properties .Defaults :
11661232 assert not args .runtime
1233+ # TODO: Enable when https://github.com/MaterializeInc/materialize/pull/33489 is merged
1234+ # yield [NumMaterializeEnvironments(2)]
11671235 yield [mod_class (mod_class .default ()) for mod_class in mod_classes ]
11681236 elif properties == Properties .Individual :
11691237 assert not args .runtime
@@ -1406,17 +1474,17 @@ def upgrade(definition: dict[str, Any]) -> None:
14061474
14071475
14081476def run (definition : dict [str , Any ], expect_fail : bool ) -> None :
1409- apply_input = yaml . dump_all (
1410- [
1411- definition ["namespace " ],
1412- definition ["secret " ],
1413- definition [ "materialize" ],
1414- ]
1415- )
1477+ defs = [
1478+ definition [ "namespace" ],
1479+ definition ["secret " ],
1480+ definition ["materialize " ],
1481+ ]
1482+ if "materialize2" in definition :
1483+ defs . append ( definition [ "materialize2" ] )
14161484 try :
14171485 spawn .runv (
14181486 ["kubectl" , "apply" , "-f" , "-" ],
1419- stdin = apply_input .encode (),
1487+ stdin = yaml . dump_all ( defs ) .encode (),
14201488 )
14211489 except subprocess .CalledProcessError as e :
14221490 print (f"Failed to apply: { e .stdout } \n STDERR:{ e .stderr } " )
0 commit comments