From c8e5674001d77fafff99df8548ca8ee1baad254c Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Mon, 2 Mar 2026 12:28:57 -0800 Subject: [PATCH] fix(ups): formalize subjects --- .../skills/git-spice-merge-my-stack/SKILL.md | 60 +++++++++++++ engine/packages/gasoline/src/ctx/message.rs | 58 ++++++++---- engine/packages/gasoline/src/error.rs | 3 + engine/packages/universalpubsub/src/lib.rs | 2 + engine/packages/universalpubsub/src/pubsub.rs | 88 +++++++++---------- .../packages/universalpubsub/src/subject.rs | 32 +++++++ 6 files changed, 179 insertions(+), 64 deletions(-) create mode 100644 .claude/skills/git-spice-merge-my-stack/SKILL.md create mode 100644 engine/packages/universalpubsub/src/subject.rs diff --git a/.claude/skills/git-spice-merge-my-stack/SKILL.md b/.claude/skills/git-spice-merge-my-stack/SKILL.md new file mode 100644 index 0000000000..b11c4b78c8 --- /dev/null +++ b/.claude/skills/git-spice-merge-my-stack/SKILL.md @@ -0,0 +1,60 @@ +--- +name: git-spice-merge-my-stack +description: Merge a stacked git-spice branch chain with gh by retargeting each PR to main and merging bottom to top, including conflict recovery via rebase. +license: MIT +compatibility: Requires GitHub CLI (gh), git, and push access. +metadata: + author: rivet + version: "1.0" +--- + +Merge a stacked PR chain. + +**Input**: A target branch in the stack (usually the top branch to merge through). + +**Goal**: Merge all PRs from the bottom of that stack up to the target branch. + +## Steps + +1. **Resolve the target PR** + - Find PR for the provided branch: + - `gh pr list --state open --head "" --json number,headRefName,baseRefName,url` + - If no open PR exists, stop and report. + +2. **Build the stack chain down to main** + - Start at target PR. + - Repeatedly find the PR whose `headRefName` equals the current PR `baseRefName`. + - Continue until base is `main` or no parent PR exists. + - If chain is ambiguous, stop and ask the user which branch to follow. + +3. **Determine merge order** + - Merge from **bottom to top**. + - Example: `[bottom, ..., target]`. + +4. **For each PR in order** + - Retarget to `main` before merge: + - `gh pr edit --base main` + - Merge with repository-compatible strategy: + - Try `gh pr merge --squash --delete-branch=false` + - If merge fails due conflicts: + - `gh pr checkout ` + - `git fetch origin main` + - `git rebase origin/main` + - Resolve conflicts. If replaying already-upstream commits from lower stack layers, prefer `git rebase --skip`. + - Continue with `GIT_EDITOR=true git rebase --continue` when needed. + - `git push --force-with-lease origin ` + - Retry `gh pr merge ... --squash`. + +5. **Verify completion** + - Confirm each PR in chain is merged: + - `gh pr view --json state,mergedAt,url` + - Report final ordered merge list with PR numbers and timestamps. + +## Guardrails + +- Always merge in bottom-to-top order. +- Do not use merge commits if the repo disallows them. +- Do not delete remote branches unless explicitly requested. +- If a conflict cannot be safely resolved, stop and ask the user. +- If force-push is required, use `--force-with-lease`, never `--force`. +- After finishing, return to the user's original branch unless they asked otherwise. diff --git a/engine/packages/gasoline/src/ctx/message.rs b/engine/packages/gasoline/src/ctx/message.rs index f61fbbb723..c32a779061 100644 --- a/engine/packages/gasoline/src/ctx/message.rs +++ b/engine/packages/gasoline/src/ctx/message.rs @@ -1,5 +1,6 @@ use std::{ - fmt::{self, Debug}, + borrow::Cow, + fmt::{self, Debug, Display}, marker::PhantomData, sync::Arc, }; @@ -8,7 +9,7 @@ use rivet_pools::UpsPool; use rivet_util::Id; use tokio_util::sync::{CancellationToken, DropGuard}; use tracing::Instrument; -use universalpubsub::{NextOutput, Subscriber}; +use universalpubsub::{NextOutput, Subject, Subscriber}; use crate::{ error::{WorkflowError, WorkflowResult}, @@ -61,7 +62,7 @@ impl MessageCtx { let client = self.clone(); let topic = topic.to_string(); - let spawn_res = tokio::task::Builder::new() + tokio::task::Builder::new() .name("gasoline::message_async") .spawn( async move { @@ -73,13 +74,9 @@ impl MessageCtx { } } .instrument(tracing::info_span!("message_bg")), - ); - - if let Err(err) = spawn_res { - tracing::error!(?err, "failed to spawn message_async task"); - } - - Ok(()) + ) + .map_err(|err| WorkflowError::PublishMessage(err.into())) + .map(|_| ()) } /// Same as `message` but waits for the message to successfully publish. @@ -92,7 +89,10 @@ impl MessageCtx { where M: Message, { - let subject = format!("{}:{topic}", M::subject()); + let subject = MsgSubject { + topic, + msg_marker: PhantomData::, + }; let duration_since_epoch = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_else(|err| unreachable!("time is broken: {}", err)); @@ -126,15 +126,14 @@ impl MessageCtx { // It's important to write to the stream as fast as possible in order to // ensure messages are handled quickly. let message_buf = Arc::new(message_buf); - self.message_publish_pubsub::(&subject, message_buf) - .await; + self.message_publish_pubsub::(subject, message_buf).await; Ok(()) } /// Publishes the message to pubsub. #[tracing::instrument(level = "debug", skip_all)] - async fn message_publish_pubsub(&self, subject: &str, message_buf: Arc>) + async fn message_publish_pubsub(&self, subject: MsgSubject<'_, M>, message_buf: Arc>) where M: Message, { @@ -144,8 +143,6 @@ impl MessageCtx { // Ignore for infinite backoff backoff.tick().await; - let subject = subject.to_owned(); - tracing::trace!( %subject, message_len = message_buf.len(), @@ -154,7 +151,7 @@ impl MessageCtx { if let Err(err) = self .pubsub .publish( - &subject, + subject.clone(), &(*message_buf), universalpubsub::PublishOpts::broadcast(), ) @@ -332,3 +329,30 @@ where }) } } + +// Helper struct +struct MsgSubject<'a, M: Message> { + topic: &'a str, + msg_marker: PhantomData, +} + +impl Clone for MsgSubject<'_, M> { + fn clone(&self) -> Self { + MsgSubject { + topic: self.topic, + msg_marker: PhantomData::, + } + } +} + +impl Display for MsgSubject<'_, M> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}:{}", M::subject(), self.topic) + } +} + +impl Subject for MsgSubject<'_, M> { + fn root<'a>() -> Option> { + Some(Cow::Owned(M::subject())) + } +} diff --git a/engine/packages/gasoline/src/error.rs b/engine/packages/gasoline/src/error.rs index 69fa449b30..30edb2f7f7 100644 --- a/engine/packages/gasoline/src/error.rs +++ b/engine/packages/gasoline/src/error.rs @@ -112,6 +112,9 @@ pub enum WorkflowError { #[error("failed to flush pubsub: {0}")] FlushPubsub(#[source] anyhow::Error), + #[error("failed to publish message: {0}")] + PublishMessage(#[source] anyhow::Error), + #[error("subscription unsubscribed")] SubscriptionUnsubscribed, diff --git a/engine/packages/universalpubsub/src/lib.rs b/engine/packages/universalpubsub/src/lib.rs index 2e61a700f0..2685e86607 100644 --- a/engine/packages/universalpubsub/src/lib.rs +++ b/engine/packages/universalpubsub/src/lib.rs @@ -3,6 +3,8 @@ pub mod driver; pub mod errors; pub mod metrics; pub mod pubsub; +pub mod subject; pub use driver::*; pub use pubsub::{Message, NextOutput, PubSub, Response, Subscriber}; +pub use subject::Subject; diff --git a/engine/packages/universalpubsub/src/pubsub.rs b/engine/packages/universalpubsub/src/pubsub.rs index 46c379ae34..b6328945ca 100644 --- a/engine/packages/universalpubsub/src/pubsub.rs +++ b/engine/packages/universalpubsub/src/pubsub.rs @@ -13,6 +13,7 @@ use rivet_util::backoff::Backoff; use crate::chunking::{ChunkTracker, encode_chunk, split_payload_into_chunks}; use crate::driver::{PubSubDriverHandle, PublishOpts, SubscriberDriverHandle}; use crate::metrics; +use crate::subject::Subject; const GC_INTERVAL: Duration = Duration::from_secs(60); @@ -82,9 +83,11 @@ impl PubSub { } #[tracing::instrument(skip_all, fields(%subject))] - pub async fn subscribe(&self, subject: &str) -> Result { + pub async fn subscribe(&self, subject: impl Subject) -> Result { + let subject = subject.as_cow(); + // Underlying driver subscription - let driver = self.driver.subscribe(subject).await?; + let driver = self.driver.subscribe(&subject).await?; if !self.memory_optimization { return Ok(Subscriber::new(driver, self.clone(), None)); @@ -117,46 +120,33 @@ impl PubSub { } #[tracing::instrument(skip_all, fields(%subject))] - pub async fn publish(&self, subject: &str, payload: &[u8], opts: PublishOpts) -> Result<()> { - let message_id = *Uuid::new_v4().as_bytes(); - let chunks = - split_payload_into_chunks(payload, self.driver.max_message_size(), message_id, None)?; - let chunk_count = chunks.len() as u32; - - let use_local = self - .should_use_local_subscriber(subject, opts.behavior) - .await; - - for (chunk_idx, chunk_payload) in chunks.into_iter().enumerate() { - let encoded = encode_chunk( - chunk_payload, - chunk_idx as u32, - chunk_count, - message_id, - None, - )?; - - if use_local { - if let Some(sender) = self.local_subscribers.get_async(subject).await { - let _ = sender.send(encoded); - } else { - tracing::warn!(%subject, "local subscriber disappeared"); - break; - } - } else { - // Use backoff when publishing through the driver - self.publish_with_backoff(subject, &encoded).await?; - } - } - Ok(()) + pub async fn publish( + &self, + subject: impl Subject, + payload: &[u8], + opts: PublishOpts, + ) -> Result<()> { + self.publish_inner(subject, payload, None::<&str>, opts) + .await } #[tracing::instrument(skip_all, fields(%subject, %reply_subject))] pub async fn publish_with_reply( &self, - subject: &str, + subject: impl Subject, + payload: &[u8], + reply_subject: impl Subject, + opts: PublishOpts, + ) -> Result<()> { + self.publish_inner(subject, payload, Some(reply_subject), opts) + .await + } + + async fn publish_inner( + &self, + subject: impl Subject, payload: &[u8], - reply_subject: &str, + reply_subject: Option, opts: PublishOpts, ) -> Result<()> { let message_id = *Uuid::new_v4().as_bytes(); @@ -164,25 +154,27 @@ impl PubSub { payload, self.driver.max_message_size(), message_id, - Some(reply_subject), + reply_subject.as_ref().map(|x| x.as_cow()).as_deref(), )?; let chunk_count = chunks.len() as u32; let use_local = self - .should_use_local_subscriber(subject, opts.behavior) + .should_use_local_subscriber(&subject, opts.behavior) .await; + let subject_cow = subject.as_cow(); + for (chunk_idx, chunk_payload) in chunks.into_iter().enumerate() { let encoded = encode_chunk( chunk_payload, chunk_idx as u32, chunk_count, message_id, - Some(reply_subject.to_string()), + reply_subject.as_ref().map(|x| x.to_string()), )?; if use_local { - if let Some(sender) = self.local_subscribers.get_async(subject).await { + if let Some(sender) = self.local_subscribers.get_async(&*subject_cow).await { let _ = sender.send(encoded); } else { tracing::warn!(%subject, "local subscriber disappeared"); @@ -190,17 +182,19 @@ impl PubSub { } } else { // Use backoff when publishing through the driver - self.publish_with_backoff(subject, &encoded).await?; + self.publish_with_backoff(&subject, &encoded).await?; } } Ok(()) } #[tracing::instrument(skip_all, fields(%subject))] - async fn publish_with_backoff(&self, subject: &str, encoded: &[u8]) -> Result<()> { + async fn publish_with_backoff(&self, subject: &impl Subject, encoded: &[u8]) -> Result<()> { + let subject = subject.as_cow(); + let mut backoff = Backoff::default(); loop { - match self.driver.publish(subject, encoded).await { + match self.driver.publish(&subject, encoded).await { Result::Ok(_) => break, Err(err) if !backoff.tick().await => { tracing::warn!(?err, "error publishing, cannot retry again"); @@ -221,7 +215,7 @@ impl PubSub { } #[tracing::instrument(skip_all, fields(%subject))] - pub async fn request(&self, subject: &str, payload: &[u8]) -> Result { + pub async fn request(&self, subject: impl Subject, payload: &[u8]) -> Result { self.request_with_timeout(subject, payload, Duration::from_secs(30)) .await } @@ -229,7 +223,7 @@ impl PubSub { #[tracing::instrument(skip_all, fields(%subject))] pub async fn request_with_timeout( &self, - subject: &str, + subject: impl Subject, payload: &[u8], timeout: Duration, ) -> Result { @@ -299,7 +293,7 @@ impl PubSub { #[tracing::instrument(skip_all, fields(%subject))] async fn should_use_local_subscriber( &self, - subject: &str, + subject: &impl Subject, behavior: crate::driver::PublishBehavior, ) -> bool { // Local fast-path for one-subscriber behavior: @@ -317,7 +311,7 @@ impl PubSub { if !matches!(behavior, crate::driver::PublishBehavior::OneSubscriber) { return false; } - if let Some(sender) = self.local_subscribers.get_async(subject).await { + if let Some(sender) = self.local_subscribers.get_async(&*subject.as_cow()).await { sender.receiver_count() > 0 } else { false diff --git a/engine/packages/universalpubsub/src/subject.rs b/engine/packages/universalpubsub/src/subject.rs new file mode 100644 index 0000000000..7cda65d28d --- /dev/null +++ b/engine/packages/universalpubsub/src/subject.rs @@ -0,0 +1,32 @@ +use std::{borrow::Cow, fmt::Display}; + +pub trait Subject: Display { + /// Used for metrics. + fn root<'a>() -> Option> { + None + } + + fn as_str(&self) -> Option<&str> { + None + } + + fn as_cow<'a>(&'a self) -> Cow<'a, str> { + if let Some(subject) = self.as_str() { + Cow::Borrowed(subject) + } else { + Cow::Owned(self.to_string()) + } + } +} + +impl Subject for &str { + fn as_str(&self) -> Option<&str> { + Some(self) + } +} + +impl Subject for &String { + fn as_str(&self) -> Option<&str> { + Some(self) + } +}