Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def get_minimal_system_parameters(
# -----
# Others (ordered by name)
"allow_real_time_recency": "true",
"clusterd_malloc_conf": "thp:always",
"constraint_based_timestamp_selection": "verify",
"enable_compute_peek_response_stash": "true",
"enable_0dt_deployment": "true" if zero_downtime else "false",
Expand Down Expand Up @@ -564,6 +565,7 @@ def get_default_system_parameters(
"enable_ctp_cluster_protocols",
"enable_paused_cluster_readhold_downgrade",
"force_swap_for_cc_sizes",
"clusterd_malloc_conf",
]


Expand Down
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/services/clusterd.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(
"MZ_SOFT_ASSERTIONS=1",
"MZ_EAT_MY_DATA=1",
f"CLUSTERD_PERSIST_PUBSUB_URL=http://{mz_service}:6879",
"MALLOC_CONF=thp:always",
*environment_extra,
]

Expand Down
1 change: 1 addition & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1322,6 +1322,7 @@ def __init__(
"enable_mz_join_core_v2",
"force_swap_for_cc_sizes",
"enable_with_ordinality_legacy_fallback",
"clusterd_malloc_conf",
]

def run(self, exe: Executor) -> bool:
Expand Down
7 changes: 7 additions & 0 deletions src/controller-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ pub const ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE: Config<bool> = Config::new(
"Aggressively downgrade input read holds for indexes on zero-replica clusters.",
);

pub const CLUSTERD_MALLOC_CONF: Config<&str> = Config::new(
"clusterd_malloc_conf",
"",
"MALLOC_CONF to pass to clusterd. If empty, no MALLOC_CONF is set.",
);

/// Adds the full set of all controller `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
Expand All @@ -90,6 +96,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL)
.add(&ENABLE_TIMELY_ZERO_COPY)
.add(&ENABLE_TIMELY_ZERO_COPY_LGALLOC)
.add(&CLUSTERD_MALLOC_CONF)
.add(&TIMELY_ZERO_COPY_LIMIT)
.add(&ARRANGEMENT_EXERT_PROPORTIONALITY)
.add(&ENABLE_CTP_CLUSTER_PROTOCOLS)
Expand Down
11 changes: 8 additions & 3 deletions src/controller/src/clusters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use mz_compute_client::logging::LogVariant;
use mz_compute_client::service::{ComputeClient, ComputeGrpcClient};
use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
use mz_controller_types::dyncfgs::{
ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
ENABLE_CTP_CLUSTER_PROTOCOLS, ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC,
TIMELY_ZERO_COPY_LIMIT,
ARRANGEMENT_EXERT_PROPORTIONALITY, CLUSTERD_MALLOC_CONF,
CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL, ENABLE_CTP_CLUSTER_PROTOCOLS,
ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_orchestrator::NamespacedOrchestrator;
Expand Down Expand Up @@ -665,6 +665,10 @@ where
});
}

let clusterd_malloc_conf = CLUSTERD_MALLOC_CONF.get(&self.dyncfg);
let clusterd_malloc_conf =
(!clusterd_malloc_conf.is_empty()).then_some(clusterd_malloc_conf);

let service = self.orchestrator.ensure_service(
&service_name,
ServiceConfig {
Expand Down Expand Up @@ -819,6 +823,7 @@ where
}],
disk_limit,
node_selector: location.allocation.selectors,
clusterd_malloc_conf,
},
)?;

Expand Down
24 changes: 18 additions & 6 deletions src/orchestrator-kubernetes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
replicas_selector,
disk_limit,
node_selector,
clusterd_malloc_conf,
}: ServiceConfig,
) -> Result<Box<dyn Service>, anyhow::Error> {
// This is extremely cheap to clone, so just look into the lock once.
Expand Down Expand Up @@ -971,15 +972,26 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
}]
});

let env = if self.config.coverage {
Some(vec![EnvVar {
let mut env = Vec::new();

if self.config.coverage {
env.push(EnvVar {
name: "LLVM_PROFILE_FILE".to_string(),
value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
..Default::default()
}])
} else {
None
};
});
}

// Configure jemalloc's MALLOC_CONF environment symbol.
if let Some(malloc_conf) = clusterd_malloc_conf {
env.push(EnvVar {
name: "MALLOC_CONF".to_string(),
value: Some(malloc_conf),
..Default::default()
});
}

let env = (!env.is_empty()).then_some(env);

let mut volume_mounts = vec![];

Expand Down
10 changes: 10 additions & 0 deletions src/orchestrator-process/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ impl NamespacedOrchestrator for NamespacedProcessOrchestrator {
cpu_limit: config.cpu_limit,
scale: config.scale,
labels: config.labels,
clusterd_malloc_conf: config.clusterd_malloc_conf,
disk,
};

Expand Down Expand Up @@ -461,6 +462,8 @@ struct EnsureServiceConfig {
pub labels: BTreeMap<String, String>,
/// Whether scratch disk space should be allocated for the service.
pub disk: bool,
/// Optional jemalloc configuration.
pub clusterd_malloc_conf: Option<String>,
}

/// A task executing blocking work for a [`NamespacedProcessOrchestrator`] in the background.
Expand Down Expand Up @@ -572,6 +575,7 @@ impl OrchestratorWorker {
scale,
labels,
disk,
clusterd_malloc_conf,
}: EnsureServiceConfig,
) -> Result<(), anyhow::Error> {
let full_id = self.config.full_id(&id);
Expand Down Expand Up @@ -686,6 +690,7 @@ impl OrchestratorWorker {
memory_limit,
cpu_limit,
launch_spec: self.config.launch_spec,
clusterd_malloc_conf: clusterd_malloc_conf.clone(),
}),
);

Expand Down Expand Up @@ -791,6 +796,7 @@ impl OrchestratorWorker {
memory_limit,
cpu_limit,
launch_spec,
clusterd_malloc_conf,
}: ServiceProcessConfig,
) -> impl Future<Output = ()> + use<> {
let suppress_output = self.config.suppress_output;
Expand Down Expand Up @@ -838,6 +844,9 @@ impl OrchestratorWorker {
memory_limit.as_ref(),
cpu_limit.as_ref(),
);
if let Some(malloc_conf) = &clusterd_malloc_conf {
cmd.env("MALLOC_CONF", malloc_conf);
}
info!(
"launching {full_id}-{i} via {} {}...",
cmd.as_std().get_program().to_string_lossy(),
Expand Down Expand Up @@ -933,6 +942,7 @@ struct ServiceProcessConfig {
memory_limit: Option<MemoryLimit>,
cpu_limit: Option<CpuLimit>,
launch_spec: LaunchSpec,
clusterd_malloc_conf: Option<String>,
}

struct ServiceProcessPort {
Expand Down
2 changes: 2 additions & 0 deletions src/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ pub struct ServiceConfig {
pub disk_limit: Option<DiskLimit>,
/// Node selector for this service.
pub node_selector: BTreeMap<String, String>,
/// Optional jemalloc configuration.
pub clusterd_malloc_conf: Option<String>,
}

/// A named port associated with a service.
Expand Down