Skip to content

Commit

Permalink
fix(cluster): create new cluster server wf with stateful loop (#1940)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

- Adds a v2 `cluster_server` workflow (`Workflow2`/`Input2`) that drives the server lifecycle through a stateful `ctx.loope` instead of a hand-rolled local loop; the original workflow is kept for already-dispatched instances and delegates to the shared `provision_server`/`lifecycle`/`cleanup` helpers.
- Removes the `ServerCreate` signal from the datacenter workflow; server provisioning is now dispatched directly from the scale workflow, versioned via `ctx.check_version(2)`.
  • Loading branch information
MasterPtato committed Jan 27, 2025
1 parent ce8db74 commit 82c0290
Show file tree
Hide file tree
Showing 6 changed files with 439 additions and 373 deletions.
1 change: 1 addition & 0 deletions packages/services/cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub fn registry() -> WorkflowResult<Registry> {
registry.register_workflow::<datacenter::scale::Workflow>()?;
registry.register_workflow::<datacenter::tls_issue::Workflow>()?;
registry.register_workflow::<server::Workflow>()?;
registry.register_workflow::<server::Workflow2>()?;
registry.register_workflow::<server::install::Workflow>()?;
registry.register_workflow::<server::dns_create::Workflow>()?;
registry.register_workflow::<server::dns_delete::Workflow>()?;
Expand Down
21 changes: 1 addition & 20 deletions packages/services/cluster/src/workflows/datacenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod scale;
pub mod tls_issue;

use crate::types::{
BuildDeliveryMethod, GuardPublicHostname, Pool, PoolType, PoolUpdate, Provider, TlsState,
BuildDeliveryMethod, GuardPublicHostname, Pool, PoolUpdate, Provider, TlsState,
};

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -104,17 +104,6 @@ pub(crate) async fn cluster_datacenter(ctx: &mut WorkflowCtx, input: &Input) ->
.output()
.await?;
}
Main::ServerCreate(sig) => {
ctx.workflow(crate::workflows::server::Input {
datacenter_id,
server_id: sig.server_id,
pool_type: sig.pool_type,
tags: sig.tags,
})
.tag("server_id", sig.server_id)
.dispatch()
.await?;
}
Main::TlsRenew(_) => {
if ctx.config().server()?.is_tls_enabled() {
ctx.workflow(tls_issue::Input {
Expand Down Expand Up @@ -278,17 +267,9 @@ pub struct Scale {}
#[signal("cluster_datacenter_tls_renew")]
pub struct TlsRenew {}

#[signal("cluster_datacenter_server_create")]
pub struct ServerCreate {
pub server_id: Uuid,
pub pool_type: PoolType,
pub tags: Vec<String>,
}

// Signals the datacenter workflow's main loop listens for.
// NOTE(review): `ServerCreate` was removed in this change; server provisioning
// is now dispatched from the scale workflow instead of via this signal.
join_signal!(Main {
Update,
Scale,
TlsRenew,
});

Expand Down
35 changes: 24 additions & 11 deletions packages/services/cluster/src/workflows/datacenter/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,30 @@ impl Action {
Action::Provision {
server_id,
pool_type,
} => {
ctx.workflow(crate::workflows::server::Input {
datacenter_id,
server_id,
pool_type,
tags: Vec::new(),
})
.tag("server_id", server_id)
.dispatch()
.await?;
}
} => match ctx.check_version(2).await? {
1 => {
ctx.workflow(crate::workflows::server::Input {
datacenter_id,
server_id,
pool_type,
tags: Vec::new(),
})
.tag("server_id", server_id)
.dispatch()
.await?;
}
_ => {
ctx.v(2).workflow(crate::workflows::server::Input2 {
datacenter_id,
server_id,
pool_type,
tags: Vec::new(),
})
.tag("server_id", server_id)
.dispatch()
.await?;
}
},
Action::Drain { server_id } => {
ctx.signal(crate::workflows::server::Drain {})
.tag("server_id", server_id)
Expand Down
241 changes: 156 additions & 85 deletions packages/services/cluster/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chirp_workflow::prelude::*;
use futures_util::FutureExt;
use ipnet::Ipv4Net;
use rand::Rng;
use serde_json::json;
Expand All @@ -18,16 +19,89 @@ use crate::{
types::{Pool, PoolType, Provider},
};

#[derive(Debug, Serialize, Deserialize)]
/// Input for the v2 `cluster_server` workflow (`Workflow2`), which runs the
/// server lifecycle via a stateful `ctx.loope` instead of a hand-rolled loop.
/// Field-for-field identical to the legacy `Input` (see `From<Input>`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct Input2 {
pub datacenter_id: Uuid,
pub server_id: Uuid,
pub pool_type: PoolType,
pub tags: Vec<String>,
}

/// V2 server workflow: provisions a server, then processes lifecycle signals
/// in a workflow-managed stateful loop until destroyed, then cleans up.
///
/// NOTE(review): this is workflow-replay code — the sequence of `ctx.*` calls
/// is the durable contract and must not be reordered.
#[workflow(Workflow2)]
pub(crate) async fn cluster_server2(ctx: &mut WorkflowCtx, input: &Input2) -> GlobalResult<()> {
// Provision first; also returns the id of the provider-specific server
// workflow, which `cleanup` needs later.
let (dc, provider_server_workflow_id) = provision_server(ctx, input).await?;

// `loope` persists `State` across iterations (unlike the legacy local-loop
// version). The loop's break value is `state.has_dns`, telling cleanup
// whether DNS records were created and must be removed.
let has_dns = ctx
.loope(State::default(), |ctx, state| {
// Clone owned copies so the boxed async block can move them.
let input = input.clone();
let dc = dc.clone();

async move { lifecycle(ctx, &input, &dc, state).await }.boxed()
})
.await?;

// Tear down the server (and DNS records, if any) once the lifecycle ends.
cleanup(
ctx,
input,
&dc.provider,
provider_server_workflow_id,
has_dns,
)
.await?;

Ok(())
}

/// Input for the original `cluster_server` workflow, from before loop state
/// was implemented. Kept so workflows dispatched prior to `Workflow2` can
/// still be deserialized and replayed; convertible to `Input2` via `From`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct Input {
pub datacenter_id: Uuid,
pub server_id: Uuid,
pub pool_type: PoolType,
pub tags: Vec<String>,
}

impl From<Input> for Input2 {
fn from(input: Input) -> Self {
Input2 {
datacenter_id: input.datacenter_id,
server_id: input.server_id,
pool_type: input.pool_type,
tags: input.tags,
}
}
}

#[workflow]
pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
let input = input.clone().into();
let (dc, provider_server_workflow_id) = provision_server(ctx, &input).await?;

// NOTE: This loop has side effects (for state) so we do not use `ctx.repeat`
let mut state = State::default();
loop {
match lifecycle(ctx, &input, &dc, &mut state).await? {
Loop::Continue => {}
Loop::Break(_) => break,
}
}

cleanup(
ctx,
&input,
&dc.provider,
provider_server_workflow_id,
state.has_dns,
)
.await?;

Ok(())
}

async fn provision_server(
ctx: &mut WorkflowCtx,
input: &Input2,
) -> GlobalResult<(GetDcOutput, Uuid)> {
let dc = ctx
.activity(GetDcInput {
datacenter_id: input.datacenter_id,
Expand Down Expand Up @@ -234,105 +308,101 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob
bail!("failed all attempts to provision server");
};

// NOTE: This loop has side effects (for state) so we do not use `ctx.repeat`
let mut state = State::default();
loop {
match state.run(ctx).await? {
Main::DnsCreate(_) => {
ctx.workflow(dns_create::Input {
server_id: input.server_id,
})
.output()
.await?;
}
Main::DnsDelete(_) => {
ctx.workflow(dns_delete::Input {
server_id: input.server_id,
})
.output()
.await?;
}
Main::NomadRegistered(sig) => {
ctx.activity(SetNomadNodeIdInput {
server_id: input.server_id,
cluster_id: dc.cluster_id,
datacenter_id: dc.datacenter_id,
provider_datacenter_id: dc.provider_datacenter_id.clone(),
datacenter_name_id: dc.name_id.clone(),
node_id: sig.node_id,
})
.await?;
Ok((dc, provider_server_workflow_id))
}

// Scale to get rid of tainted servers
ctx.signal(crate::workflows::datacenter::Scale {})
.tag("datacenter_id", input.datacenter_id)
.send()
.await?;
}
Main::PegboardRegistered(_) => {
ctx.activity(SetPegboardClientIdInput {
server_id: input.server_id,
cluster_id: dc.cluster_id,
datacenter_id: dc.datacenter_id,
provider_datacenter_id: dc.provider_datacenter_id.clone(),
datacenter_name_id: dc.name_id.clone(),
client_id: input.server_id,
})
.await?;
async fn lifecycle(
ctx: &mut WorkflowCtx,
input: &Input2,
dc: &GetDcOutput,
state: &mut State,
) -> GlobalResult<Loop<bool>> {
match state.run(ctx).await? {
Main::DnsCreate(_) => {
ctx.workflow(dns_create::Input {
server_id: input.server_id,
})
.output()
.await?;
}
Main::DnsDelete(_) => {
ctx.workflow(dns_delete::Input {
server_id: input.server_id,
})
.output()
.await?;
}
Main::NomadRegistered(sig) => {
ctx.activity(SetNomadNodeIdInput {
server_id: input.server_id,
cluster_id: dc.cluster_id,
datacenter_id: dc.datacenter_id,
provider_datacenter_id: dc.provider_datacenter_id.clone(),
datacenter_name_id: dc.name_id.clone(),
node_id: sig.node_id,
})
.await?;

// Scale to get rid of tainted servers
ctx.signal(crate::workflows::datacenter::Scale {})
.tag("datacenter_id", input.datacenter_id)
.send()
.await?;
}
Main::Drain(_) => {
ctx.workflow(drain::Input {
datacenter_id: input.datacenter_id,
server_id: input.server_id,
pool_type: input.pool_type,
})
.output()
// Scale to get rid of tainted servers
ctx.signal(crate::workflows::datacenter::Scale {})
.tag("datacenter_id", input.datacenter_id)
.send()
.await?;
}
Main::Undrain(_) => {
ctx.workflow(undrain::Input {
datacenter_id: input.datacenter_id,
server_id: input.server_id,
pool_type: input.pool_type,
})
.output()
}
Main::PegboardRegistered(_) => {
ctx.activity(SetPegboardClientIdInput {
server_id: input.server_id,
cluster_id: dc.cluster_id,
datacenter_id: dc.datacenter_id,
provider_datacenter_id: dc.provider_datacenter_id.clone(),
datacenter_name_id: dc.name_id.clone(),
client_id: input.server_id,
})
.await?;

// Scale to get rid of tainted servers
ctx.signal(crate::workflows::datacenter::Scale {})
.tag("datacenter_id", input.datacenter_id)
.send()
.await?;
}
Main::Drain(_) => {
ctx.workflow(drain::Input {
datacenter_id: input.datacenter_id,
server_id: input.server_id,
pool_type: input.pool_type,
})
.output()
.await?;
}
Main::Undrain(_) => {
ctx.workflow(undrain::Input {
datacenter_id: input.datacenter_id,
server_id: input.server_id,
pool_type: input.pool_type,
})
.output()
.await?;
}
Main::Taint(_) => {} // Only for state
Main::Destroy(_) => {
if let PoolType::Fdb = input.pool_type {
bail!("you cant kill fdb you stupid chud");
}
Main::Taint(_) => {} // Only for state
Main::Destroy(_) => {
if let PoolType::Fdb = input.pool_type {
bail!("you cant kill fdb you stupid chud");
}

break;
}
return Ok(Loop::Break(state.has_dns));
}
}

cleanup(
ctx,
input,
&dc.provider,
provider_server_workflow_id,
state.has_dns,
)
.await?;

Ok(())
Ok(Loop::Continue)
}

/// Input for the activity that looks up a datacenter's configuration.
/// `Hash` is derived because activity inputs are hashed by the workflow engine
/// for replay identification — TODO confirm against chirp_workflow docs.
#[derive(Debug, Serialize, Deserialize, Hash)]
pub(crate) struct GetDcInput {
pub datacenter_id: Uuid,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct GetDcOutput {
pub datacenter_id: Uuid,
pub cluster_id: Uuid,
Expand Down Expand Up @@ -797,7 +867,7 @@ async fn set_drain_complete(ctx: &ActivityCtx, input: &SetDrainCompleteInput) ->

async fn cleanup(
ctx: &mut WorkflowCtx,
input: &Input,
input: &Input2,
provider: &Provider,
provider_server_workflow_id: Uuid,
cleanup_dns: bool,
Expand Down Expand Up @@ -838,6 +908,7 @@ async fn cleanup(
}

/// Finite state machine for handling server updates.
#[derive(Debug, Serialize, Deserialize)]
struct State {
draining: bool,
has_dns: bool,
Expand Down
Loading

0 comments on commit 82c0290

Please sign in to comment.