@@ -449,6 +449,70 @@ def validate(self, mods: dict[type[Modification], Any]) -> None:
449449 ), f"Expected environmentd image { expected } , but found { image } "
450450
451451
452+ class NumMaterializeEnvironments (Modification ):
453+ # Only done intentionally
454+ pick_by_default = False
455+
456+ @classmethod
457+ def values (cls ) -> list [Any ]:
458+ return [1 , 2 ]
459+
460+ @classmethod
461+ def default (cls ) -> Any :
462+ return 1
463+
464+ def modify (self , definition : dict [str , Any ]) -> None :
465+ if self .value == 2 :
466+ definition ["materialize2" ] = copy .deepcopy (definition ["materialize" ])
467+ definition ["materialize2" ]["metadata" ][
468+ "name"
469+ ] = "12345678-1234-1234-1234-123456789013"
470+ elif self .value == 1 :
471+ if "materialize2" in definition :
472+ del definition ["materialize2" ]
473+ else :
474+ raise ValueError (f"Unhandled value { self .value } " )
475+
476+ def validate (self , mods : dict [type [Modification ], Any ]) -> None :
477+ service_names = (
478+ spawn .capture (
479+ [
480+ "kubectl" ,
481+ "get" ,
482+ "services" ,
483+ "-n" ,
484+ "materialize-environment" ,
485+ "-o" ,
486+ "name" ,
487+ ],
488+ stderr = subprocess .DEVNULL ,
489+ )
490+ .strip ()
491+ .split ("\n " )
492+ )
493+ for service_name in service_names :
494+ if not "-cluster-" in service_name :
495+ continue
496+ data = json .loads (
497+ spawn .capture (
498+ [
499+ "kubectl" ,
500+ "get" ,
501+ "endpoints" ,
502+ service_name .removeprefix ("service/" ),
503+ "-n" ,
504+ "materialize-environment" ,
505+ "-o" ,
506+ "json" ,
507+ ]
508+ )
509+ )
510+ addresses = data ["subsets" ][0 ]["addresses" ]
511+ assert (
512+ len (addresses ) == 1
513+ ), f"Expected 1 address for clusterd, but found { addresses } "
514+
515+
452516class TelemetryEnabled (Modification ):
453517 @classmethod
454518 def values (cls ) -> list [Any ]:
@@ -1124,6 +1188,8 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
11241188 definition ["materialize" ]["spec" ]["environmentdImageRef" ] = get_image (
11251189 c .compose ["services" ]["environmentd" ]["image" ], args .tag
11261190 )
1191+ # kubectl get endpoints mzel5y3f42l6-cluster-u1-replica-u1-gen-1 -n materialize-environment -o json
1192+ # more than one address
11271193
11281194 rng = random .Random (args .seed )
11291195
@@ -1158,6 +1224,8 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
11581224 def get_mods () -> Iterator [list [Modification ]]:
11591225 if properties == Properties .Defaults :
11601226 assert not args .runtime
1227+ # TODO: Enable when https://github.com/MaterializeInc/materialize/pull/33489 is merged
1228+ # yield [NumMaterializeEnvironments(2)]
11611229 yield [mod_class (mod_class .default ()) for mod_class in mod_classes ]
11621230 elif properties == Properties .Individual :
11631231 assert not args .runtime
@@ -1427,17 +1495,17 @@ def upgrade(definition: dict[str, Any]) -> None:
14271495
14281496
14291497def run (definition : dict [str , Any ]) -> None :
1430- apply_input = yaml . dump_all (
1431- [
1432- definition ["namespace " ],
1433- definition ["secret " ],
1434- definition [ "materialize" ],
1435- ]
1436- )
1498+ defs = [
1499+ definition [ "namespace" ],
1500+ definition ["secret " ],
1501+ definition ["materialize " ],
1502+ ]
1503+ if "materialize2" in definition :
1504+ defs . append ( definition [ "materialize2" ] )
14371505 try :
14381506 spawn .runv (
14391507 ["kubectl" , "apply" , "-f" , "-" ],
1440- stdin = apply_input .encode (),
1508+ stdin = yaml . dump_all ( defs ) .encode (),
14411509 )
14421510 except subprocess .CalledProcessError as e :
14431511 print (f"Failed to apply: { e .stdout } \n STDERR:{ e .stderr } " )
0 commit comments