Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/mz-debug/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ rust_binary(
"@//misc/bazel/platforms:xlang_lto_enabled": ["-Clinker-plugin-lto"],
"//conditions:default": [],
}),
version = "0.4.0",
version = "0.4.1",
deps = [
"//src/build-info:mz_build_info",
"//src/cloud-resources:mz_cloud_resources",
Expand Down
2 changes: 1 addition & 1 deletion src/mz-debug/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "mz-debug"
description = "Debug tool for self-managed Materialize."
version = "0.4.0"
version = "0.4.1"
edition.workspace = true
rust-version.workspace = true
publish = false
Expand Down
24 changes: 17 additions & 7 deletions src/mz-debug/src/internal_http_dumper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,23 +307,33 @@ pub async fn dump_self_managed_http_resources(
&self_managed_context.mz_instance_name,
)
.await
.with_context(|| "Failed to find cluster services")?;
.unwrap_or_else(|e| {
warn!("Failed to find cluster services: {:#}", e);
vec![]
});

let environmentd_service = find_environmentd_service(
&self_managed_context.k8s_client,
&self_managed_context.k8s_namespace,
&self_managed_context.mz_instance_name,
)
.await
.with_context(|| "Failed to find environmentd service")?;
.await;

// Add warning when environmentd service is not found
if let Err(e) = &environmentd_service {
warn!("Failed to find environmentd service: {:#}", e);
}

let services = cluster_services
.iter()
.map(|service| (service, ServiceType::Clusterd))
.chain(std::iter::once((
&environmentd_service,
ServiceType::Environmentd,
)));
.chain(
environmentd_service
.as_ref()
.ok()
.into_iter()
.map(|service| (service, ServiceType::Environmentd)),
);

// Scrape each service
for (service_info, service_type) in services {
Expand Down
127 changes: 78 additions & 49 deletions src/mz-debug/src/kubectl_port_forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use k8s_openapi::api::apps::v1::StatefulSet;
use k8s_openapi::api::core::v1::{Service, ServicePort};
use kube::api::ListParams;
use kube::{Api, Client};
use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
use tokio::io::AsyncBufReadExt;

use tracing::info;
Expand Down Expand Up @@ -129,62 +130,76 @@ pub struct ServiceInfo {
pub namespace: String,
}

/// Returns ServiceInfo for balancerd
pub async fn find_environmentd_service(
/// Returns the Materialize CR for the given instance
async fn get_materialize_cr(
client: &Client,
k8s_namespace: &String,
mz_instance_name: &String,
) -> Result<ServiceInfo> {
let services_api: Api<Service> = Api::namespaced(client.clone(), k8s_namespace);
) -> Result<Materialize> {
let materializes_api: Api<Materialize> = Api::namespaced(client.clone(), k8s_namespace);

let label_filter = format!(
// mz-resource-id is used to identify environmentd services
"materialize.cloud/mz-resource-id,materialize.cloud/organization-name={}",
mz_instance_name
);

let services = services_api
.list(&ListParams::default().labels(&label_filter))
let mz_cr = materializes_api
.get(mz_instance_name)
.await
.with_context(|| format!("Failed to list services in namespace {}", k8s_namespace))?;
.with_context(|| {
format!(
"Failed to get Materialize CR {} in namespace {}",
mz_instance_name, k8s_namespace
)
})?;

Ok(mz_cr)
}

// Find the first sql service that contains environmentd
let maybe_service =
services
.iter()
.find_map(|service| match (&service.metadata.name, &service.spec) {
(Some(service_name), Some(spec)) => {
// TODO (debug_tool3): This could match both the generation service and the globally active one. We should use the active one.
if !service_name.to_lowercase().contains("environmentd") {
return None;
}
/// Returns ServiceInfo for the active environmentd service
pub async fn find_environmentd_service(
client: &Client,
k8s_namespace: &String,
mz_instance_name: &String,
) -> Result<ServiceInfo> {
// Get the Materialize CR to extract the resource ID
let mz_cr = get_materialize_cr(client, k8s_namespace, mz_instance_name).await?;
let resource_id = mz_cr.resource_id();

if let Some(ports) = &spec.ports {
Some(ServiceInfo {
service_name: service_name.clone(),
service_ports: ports.clone(),
namespace: k8s_namespace.clone(),
})
} else {
None
}
}
_ => None,
});
// The environmentd service name is always mz${resource_id}-environmentd
let service_name = format!("mz{}-environmentd", resource_id);

if let Some(service) = maybe_service {
return Ok(service);
}
let services_api: Api<Service> = Api::namespaced(client.clone(), k8s_namespace);

Err(anyhow::anyhow!("Could not find environmentd service"))
let service = services_api.get(&service_name).await.with_context(|| {
format!(
"Failed to get environmentd service {} in namespace {}",
service_name, k8s_namespace
)
})?;

let service_ports = service
.spec
.as_ref()
.and_then(|spec| spec.ports.clone())
.ok_or_else(|| anyhow::anyhow!("Service {} has no ports defined", service_name))?;

Ok(ServiceInfo {
service_name,
service_ports,
namespace: k8s_namespace.clone(),
})
}

/// Returns Vec<(service_name, ports)> for cluster services
/// Returns Vec<(service_name, ports)> for cluster services from the active generation
pub async fn find_cluster_services(
client: &Client,
k8s_namespace: &String,
mz_instance_name: &String,
) -> Result<Vec<ServiceInfo>> {
// Get the Materialize CR to extract the active generation
let mz_cr = get_materialize_cr(client, k8s_namespace, mz_instance_name).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick, it would be nicer to just get this once, rather than in each function that needs it. Not a big deal, though.

let active_generation = mz_cr
.status
.as_ref()
.map(|status| status.active_generation)
.ok_or_else(|| anyhow::anyhow!("Materialize CR has no status"))?;

let services: Api<Service> = Api::namespaced(client.clone(), k8s_namespace);
let services = services
.list(&ListParams::default())
Expand All @@ -199,7 +214,7 @@ pub async fn find_cluster_services(
let statefulsets = statefulsets_api
.list(&ListParams::default().labels(&organization_name_filter))
.await
.with_context(|| format!("Failed to list services in namespace {}", k8s_namespace))?;
.with_context(|| format!("Failed to list statefulsets in namespace {}", k8s_namespace))?;

let cluster_services: Vec<ServiceInfo> = services
.iter()
Expand All @@ -214,22 +229,33 @@ pub async fn find_cluster_services(
return None;
}

// Check if the owner reference points to environmentd StatefulSet in the same mz instance
// Check if the owner reference points to an environmentd StatefulSet
let envd_statefulset_reference_name = service
.metadata
.owner_references
.as_ref()?
.iter()
// There should only be one StatefulSet reference to environmentd
// There should only be one StatefulSet reference to environmentd
.find(|owner_reference| owner_reference.kind == "StatefulSet")?
.name
.clone();

if !statefulsets
.iter()
.filter_map(|statefulset| statefulset.metadata.name.clone())
.any(|name| name == envd_statefulset_reference_name)
{
// Find the StatefulSet that owns this service and check if it matches the active generation
let matching_statefulset = statefulsets.iter().find(|statefulset| {
statefulset.metadata.name.as_ref() == Some(&envd_statefulset_reference_name)
})?;

// Filter by active generation - only include services whose owner StatefulSet
// has metadata.generation matching the Materialize CR's active_generation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// has metadata.generation matching the Materialize CR's active_generation
// has materialize.cloud/generation matching the Materialize CR's active_generation

Kubernetes also has a concept of generation and it means something else.

let statefulset_generation = matching_statefulset
.metadata
.annotations
.as_ref()?
.get("materialize.cloud/generation")?
.parse::<i64>()
.ok()?;
let active_generation_i64 = i64::try_from(active_generation).ok()?;
if statefulset_generation != active_generation_i64 {
return None;
}

Expand All @@ -245,7 +271,10 @@ pub async fn find_cluster_services(
return Ok(cluster_services);
}

Err(anyhow::anyhow!("Could not find cluster services"))
Err(anyhow::anyhow!(
"Could not find cluster services for active generation {}",
active_generation
))
}

/// Creates a port forwarder for the external pg wire port of environmentd.
Expand Down