Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions .claude/skills/git-spice-merge-my-stack/SKILL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
---
name: git-spice-merge-my-stack
description: Merge a stacked git-spice branch chain with gh by retargeting each PR to main and merging bottom to top, including conflict recovery via rebase.
license: MIT
compatibility: Requires GitHub CLI (gh), git, and push access.
metadata:
author: rivet
version: "1.0"
---

Merge a stacked PR chain.

**Input**: A target branch in the stack (usually the top branch to merge through).

**Goal**: Merge all PRs from the bottom of that stack up to the target branch.

## Steps

1. **Resolve the target PR**
- Find PR for the provided branch:
- `gh pr list --state open --head "<target-branch>" --json number,headRefName,baseRefName,url`
- If no open PR exists, stop and report.

2. **Build the stack chain down to main**
- Start at target PR.
- Repeatedly find the PR whose `headRefName` equals the current PR `baseRefName`.
- Continue until base is `main` or no parent PR exists.
   - If the chain is ambiguous, stop and ask the user which branch to follow.

3. **Determine merge order**
- Merge from **bottom to top**.
- Example: `[bottom, ..., target]`.

4. **For each PR in order**
- Retarget to `main` before merge:
- `gh pr edit <pr-number> --base main`
- Merge with repository-compatible strategy:
- Try `gh pr merge <pr-number> --squash --delete-branch=false`
   - If the merge fails due to conflicts:
- `gh pr checkout <pr-number>`
- `git fetch origin main`
- `git rebase origin/main`
- Resolve conflicts. If replaying already-upstream commits from lower stack layers, prefer `git rebase --skip`.
- Continue with `GIT_EDITOR=true git rebase --continue` when needed.
- `git push --force-with-lease origin <head-branch>`
- Retry `gh pr merge ... --squash`.

5. **Verify completion**
- Confirm each PR in chain is merged:
- `gh pr view <pr-number> --json state,mergedAt,url`
- Report final ordered merge list with PR numbers and timestamps.

## Guardrails

- Always merge in bottom-to-top order.
- Do not use merge commits if the repo disallows them.
- Do not delete remote branches unless explicitly requested.
- If a conflict cannot be safely resolved, stop and ask the user.
- If force-push is required, use `--force-with-lease`, never `--force`.
- After finishing, return to the user's original branch unless they asked otherwise.
58 changes: 41 additions & 17 deletions engine/packages/gasoline/src/ctx/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
fmt::{self, Debug},
borrow::Cow,
fmt::{self, Debug, Display},
marker::PhantomData,
sync::Arc,
};
Expand All @@ -8,7 +9,7 @@ use rivet_pools::UpsPool;
use rivet_util::Id;
use tokio_util::sync::{CancellationToken, DropGuard};
use tracing::Instrument;
use universalpubsub::{NextOutput, Subscriber};
use universalpubsub::{NextOutput, Subject, Subscriber};

use crate::{
error::{WorkflowError, WorkflowResult},
Expand Down Expand Up @@ -61,7 +62,7 @@ impl MessageCtx {
let client = self.clone();
let topic = topic.to_string();

let spawn_res = tokio::task::Builder::new()
tokio::task::Builder::new()
.name("gasoline::message_async")
.spawn(
async move {
Expand All @@ -73,13 +74,9 @@ impl MessageCtx {
}
}
.instrument(tracing::info_span!("message_bg")),
);

if let Err(err) = spawn_res {
tracing::error!(?err, "failed to spawn message_async task");
}

Ok(())
)
.map_err(|err| WorkflowError::PublishMessage(err.into()))
.map(|_| ())
}

/// Same as `message` but waits for the message to successfully publish.
Expand All @@ -92,7 +89,10 @@ impl MessageCtx {
where
M: Message,
{
let subject = format!("{}:{topic}", M::subject());
let subject = MsgSubject {
topic,
msg_marker: PhantomData::<M>,
};
let duration_since_epoch = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_else(|err| unreachable!("time is broken: {}", err));
Expand Down Expand Up @@ -126,15 +126,14 @@ impl MessageCtx {
// It's important to write to the stream as fast as possible in order to
// ensure messages are handled quickly.
let message_buf = Arc::new(message_buf);
self.message_publish_pubsub::<M>(&subject, message_buf)
.await;
self.message_publish_pubsub::<M>(subject, message_buf).await;

Ok(())
}

/// Publishes the message to pubsub.
#[tracing::instrument(level = "debug", skip_all)]
async fn message_publish_pubsub<M>(&self, subject: &str, message_buf: Arc<Vec<u8>>)
async fn message_publish_pubsub<M>(&self, subject: MsgSubject<'_, M>, message_buf: Arc<Vec<u8>>)
where
M: Message,
{
Expand All @@ -144,8 +143,6 @@ impl MessageCtx {
// Ignore for infinite backoff
backoff.tick().await;

let subject = subject.to_owned();

tracing::trace!(
%subject,
message_len = message_buf.len(),
Expand All @@ -154,7 +151,7 @@ impl MessageCtx {
if let Err(err) = self
.pubsub
.publish(
&subject,
subject.clone(),
&(*message_buf),
universalpubsub::PublishOpts::broadcast(),
)
Expand Down Expand Up @@ -332,3 +329,30 @@ where
})
}
}

// Helper struct
struct MsgSubject<'a, M: Message> {
topic: &'a str,
msg_marker: PhantomData<M>,
}

impl<M: Message> Clone for MsgSubject<'_, M> {
fn clone(&self) -> Self {
MsgSubject {
topic: self.topic,
msg_marker: PhantomData::<M>,
}
}
}

impl<M: Message> Display for MsgSubject<'_, M> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{}", M::subject(), self.topic)
}
}

impl<M: Message> Subject for MsgSubject<'_, M> {
fn root<'a>() -> Option<Cow<'a, str>> {
Some(Cow::Owned(M::subject()))
}
}
3 changes: 3 additions & 0 deletions engine/packages/gasoline/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ pub enum WorkflowError {
#[error("failed to flush pubsub: {0}")]
FlushPubsub(#[source] anyhow::Error),

#[error("failed to publish message: {0}")]
PublishMessage(#[source] anyhow::Error),

#[error("subscription unsubscribed")]
SubscriptionUnsubscribed,

Expand Down
2 changes: 2 additions & 0 deletions engine/packages/universalpubsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ pub mod driver;
pub mod errors;
pub mod metrics;
pub mod pubsub;
pub mod subject;

pub use driver::*;
pub use pubsub::{Message, NextOutput, PubSub, Response, Subscriber};
pub use subject::Subject;
88 changes: 41 additions & 47 deletions engine/packages/universalpubsub/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use rivet_util::backoff::Backoff;
use crate::chunking::{ChunkTracker, encode_chunk, split_payload_into_chunks};
use crate::driver::{PubSubDriverHandle, PublishOpts, SubscriberDriverHandle};
use crate::metrics;
use crate::subject::Subject;

const GC_INTERVAL: Duration = Duration::from_secs(60);

Expand Down Expand Up @@ -82,9 +83,11 @@ impl PubSub {
}

#[tracing::instrument(skip_all, fields(%subject))]
pub async fn subscribe(&self, subject: &str) -> Result<Subscriber> {
pub async fn subscribe(&self, subject: impl Subject) -> Result<Subscriber> {
let subject = subject.as_cow();

// Underlying driver subscription
let driver = self.driver.subscribe(subject).await?;
let driver = self.driver.subscribe(&subject).await?;

if !self.memory_optimization {
return Ok(Subscriber::new(driver, self.clone(), None));
Expand Down Expand Up @@ -117,90 +120,81 @@ impl PubSub {
}

#[tracing::instrument(skip_all, fields(%subject))]
pub async fn publish(&self, subject: &str, payload: &[u8], opts: PublishOpts) -> Result<()> {
let message_id = *Uuid::new_v4().as_bytes();
let chunks =
split_payload_into_chunks(payload, self.driver.max_message_size(), message_id, None)?;
let chunk_count = chunks.len() as u32;

let use_local = self
.should_use_local_subscriber(subject, opts.behavior)
.await;

for (chunk_idx, chunk_payload) in chunks.into_iter().enumerate() {
let encoded = encode_chunk(
chunk_payload,
chunk_idx as u32,
chunk_count,
message_id,
None,
)?;

if use_local {
if let Some(sender) = self.local_subscribers.get_async(subject).await {
let _ = sender.send(encoded);
} else {
tracing::warn!(%subject, "local subscriber disappeared");
break;
}
} else {
// Use backoff when publishing through the driver
self.publish_with_backoff(subject, &encoded).await?;
}
}
Ok(())
pub async fn publish(
&self,
subject: impl Subject,
payload: &[u8],
opts: PublishOpts,
) -> Result<()> {
self.publish_inner(subject, payload, None::<&str>, opts)
.await
}

#[tracing::instrument(skip_all, fields(%subject, %reply_subject))]
pub async fn publish_with_reply(
&self,
subject: &str,
subject: impl Subject,
payload: &[u8],
reply_subject: impl Subject,
opts: PublishOpts,
) -> Result<()> {
self.publish_inner(subject, payload, Some(reply_subject), opts)
.await
}

async fn publish_inner(
&self,
subject: impl Subject,
payload: &[u8],
reply_subject: &str,
reply_subject: Option<impl Subject>,
opts: PublishOpts,
) -> Result<()> {
let message_id = *Uuid::new_v4().as_bytes();
let chunks = split_payload_into_chunks(
payload,
self.driver.max_message_size(),
message_id,
Some(reply_subject),
reply_subject.as_ref().map(|x| x.as_cow()).as_deref(),
)?;
let chunk_count = chunks.len() as u32;

let use_local = self
.should_use_local_subscriber(subject, opts.behavior)
.should_use_local_subscriber(&subject, opts.behavior)
.await;

let subject_cow = subject.as_cow();

for (chunk_idx, chunk_payload) in chunks.into_iter().enumerate() {
let encoded = encode_chunk(
chunk_payload,
chunk_idx as u32,
chunk_count,
message_id,
Some(reply_subject.to_string()),
reply_subject.as_ref().map(|x| x.to_string()),
)?;

if use_local {
if let Some(sender) = self.local_subscribers.get_async(subject).await {
if let Some(sender) = self.local_subscribers.get_async(&*subject_cow).await {
let _ = sender.send(encoded);
} else {
tracing::warn!(%subject, "local subscriber disappeared");
break;
}
} else {
// Use backoff when publishing through the driver
self.publish_with_backoff(subject, &encoded).await?;
self.publish_with_backoff(&subject, &encoded).await?;
}
}
Ok(())
}

#[tracing::instrument(skip_all, fields(%subject))]
async fn publish_with_backoff(&self, subject: &str, encoded: &[u8]) -> Result<()> {
async fn publish_with_backoff(&self, subject: &impl Subject, encoded: &[u8]) -> Result<()> {
let subject = subject.as_cow();

let mut backoff = Backoff::default();
loop {
match self.driver.publish(subject, encoded).await {
match self.driver.publish(&subject, encoded).await {
Result::Ok(_) => break,
Err(err) if !backoff.tick().await => {
tracing::warn!(?err, "error publishing, cannot retry again");
Expand All @@ -221,15 +215,15 @@ impl PubSub {
}

#[tracing::instrument(skip_all, fields(%subject))]
pub async fn request(&self, subject: &str, payload: &[u8]) -> Result<Response> {
pub async fn request(&self, subject: impl Subject, payload: &[u8]) -> Result<Response> {
self.request_with_timeout(subject, payload, Duration::from_secs(30))
.await
}

#[tracing::instrument(skip_all, fields(%subject))]
pub async fn request_with_timeout(
&self,
subject: &str,
subject: impl Subject,
payload: &[u8],
timeout: Duration,
) -> Result<Response> {
Expand Down Expand Up @@ -299,7 +293,7 @@ impl PubSub {
#[tracing::instrument(skip_all, fields(%subject))]
async fn should_use_local_subscriber(
&self,
subject: &str,
subject: &impl Subject,
behavior: crate::driver::PublishBehavior,
) -> bool {
// Local fast-path for one-subscriber behavior:
Expand All @@ -317,7 +311,7 @@ impl PubSub {
if !matches!(behavior, crate::driver::PublishBehavior::OneSubscriber) {
return false;
}
if let Some(sender) = self.local_subscribers.get_async(subject).await {
if let Some(sender) = self.local_subscribers.get_async(&*subject.as_cow()).await {
sender.receiver_count() > 0
} else {
false
Expand Down
Loading
Loading