Skip to content

Commit 3c66163

Browse files
committed
ensure consumers get a unique ephemeral name
1 parent 7b858ab commit 3c66163

File tree

4 files changed

+6
-13
lines changed

4 files changed

+6
-13
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pyth-stream"
3-
version = "0.1.6"
3+
version = "0.1.7"
44
edition = "2021"
55

66
[lib]

src/bin/websocket_server.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -268,10 +268,10 @@ struct PriceInfo {
268268

269269
async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) -> Result<()> {
270270
let stream_name = "PYTH_PRICE_UPDATES";
271-
let consumer_name = "websocket_server";
272271

273272
let consumer_config = consumer::pull::Config {
274-
durable_name: Some(consumer_name.to_string()),
273+
deliver_policy: consumer::DeliverPolicy::All,
274+
ack_policy: consumer::AckPolicy::None,
275275
..Default::default()
276276
};
277277

@@ -280,7 +280,7 @@ async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) -
280280
.await
281281
.context("Failed to create NATS consumer")?;
282282

283-
info!(stream = %stream_name, consumer = %consumer_name, "Started handling NATS messages");
283+
info!(stream = %stream_name, "Started handling NATS messages");
284284

285285
loop {
286286
let mut messages = consumer
@@ -336,13 +336,6 @@ async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) -
336336
);
337337
}
338338
}
339-
340-
// Spawn a new task for acknowledgment
341-
tokio::spawn(async move {
342-
if let Err(e) = msg.ack().await {
343-
warn!(error = %e, "Failed to acknowledge NATS message");
344-
}
345-
});
346339
}
347340
Err(e) => {
348341
error!(error = %e, "Error receiving message from NATS");

src/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub async fn setup_jetstream(nats_client: &async_nats::Client) -> Result<jetstre
99
let stream_config = stream::Config {
1010
name: "PYTH_PRICE_UPDATES".to_string(),
1111
subjects: vec!["pyth.price.updates".to_string()],
12-
max_bytes: 1024 * 1024 * 4000,
12+
max_bytes: 1024 * 1024 * 1000,
1313
duplicate_window: Duration::from_secs(60),
1414
discard: stream::DiscardPolicy::Old,
1515
allow_direct: true,

0 commit comments

Comments
 (0)