@@ -461,6 +461,70 @@ def validate(self, mods: dict[type[Modification], Any]) -> None:
461461 ), f"Expected environmentd image { expected } , but found { image } "
462462
463463
464+ class NumMaterializeEnvironments (Modification ):
465+ # Only done intentionally
466+ pick_by_default = False
467+
468+ @classmethod
469+ def values (cls ) -> list [Any ]:
470+ return [1 , 2 ]
471+
472+ @classmethod
473+ def default (cls ) -> Any :
474+ return 1
475+
476+ def modify (self , definition : dict [str , Any ]) -> None :
477+ if self .value == 2 :
478+ definition ["materialize2" ] = copy .deepcopy (definition ["materialize" ])
479+ definition ["materialize2" ]["metadata" ][
480+ "name"
481+ ] = "12345678-1234-1234-1234-123456789013"
482+ elif self .value == 1 :
483+ if "materialize2" in definition :
484+ del definition ["materialize2" ]
485+ else :
486+ raise ValueError (f"Unhandled value { self .value } " )
487+
488+ def validate (self , mods : dict [type [Modification ], Any ]) -> None :
489+ service_names = (
490+ spawn .capture (
491+ [
492+ "kubectl" ,
493+ "get" ,
494+ "services" ,
495+ "-n" ,
496+ "materialize-environment" ,
497+ "-o" ,
498+ "name" ,
499+ ],
500+ stderr = subprocess .DEVNULL ,
501+ )
502+ .strip ()
503+ .split ("\n " )
504+ )
505+ for service_name in service_names :
506+ if not "-cluster-" in service_name :
507+ continue
508+ data = json .loads (
509+ spawn .capture (
510+ [
511+ "kubectl" ,
512+ "get" ,
513+ "endpoints" ,
514+ service_name .removeprefix ("service/" ),
515+ "-n" ,
516+ "materialize-environment" ,
517+ "-o" ,
518+ "json" ,
519+ ]
520+ )
521+ )
522+ addresses = data ["subsets" ][0 ]["addresses" ]
523+ assert (
524+ len (addresses ) == 1
525+ ), f"Expected 1 address for clusterd, but found { addresses } "
526+
527+
464528class TelemetryEnabled (Modification ):
465529 @classmethod
466530 def values (cls ) -> list [Any ]:
@@ -907,6 +971,8 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
907971 definition ["materialize" ]["spec" ][
908972 "environmentdImageRef"
909973 ] = f"materialize/environmentd:{ get_tag (args .tag )} "
974+ # kubectl get endpoints mzel5y3f42l6-cluster-u1-replica-u1-gen-1 -n materialize-environment -o json
975+ # more than one address
910976
911977 rng = random .Random (args .seed )
912978
@@ -941,6 +1007,8 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
9411007 def get_mods () -> Iterator [list [Modification ]]:
9421008 if properties == Properties .Defaults :
9431009 assert not args .runtime
1010+ # TODO: Enable when https://github.com/MaterializeInc/materialize/pull/33489 is merged
1011+ # yield [NumMaterializeEnvironments(2)]
9441012 yield [mod_class (mod_class .default ()) for mod_class in mod_classes ]
9451013 elif properties == Properties .Individual :
9461014 assert not args .runtime
@@ -1198,17 +1266,17 @@ def upgrade(definition: dict[str, Any]) -> None:
11981266
11991267
12001268def run (definition : dict [str , Any ]) -> None :
1201- apply_input = yaml . dump_all (
1202- [
1203- definition ["namespace " ],
1204- definition ["secret " ],
1205- definition [ "materialize" ],
1206- ]
1207- )
1269+ defs = [
1270+ definition [ "namespace" ],
1271+ definition ["secret " ],
1272+ definition ["materialize " ],
1273+ ]
1274+ if "materialize2" in definition :
1275+ defs . append ( definition [ "materialize2" ] )
12081276 try :
12091277 spawn .runv (
12101278 ["kubectl" , "apply" , "-f" , "-" ],
1211- stdin = apply_input .encode (),
1279+ stdin = yaml . dump_all ( defs ) .encode (),
12121280 )
12131281 except subprocess .CalledProcessError as e :
12141282 print (f"Failed to apply: { e .stdout } \n STDERR:{ e .stderr } " )
0 commit comments