Skip to content

Commit 5d11cbc

Browse files
authored
fix(pyth-lazer-agent): Only send one success response per batch update (#3186)
1 parent d09e47e commit 5d11cbc

File tree

3 files changed

+54
-39
lines changed

3 files changed

+54
-39
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/pyth-lazer-agent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pyth-lazer-agent"
3-
version = "0.8.0"
3+
version = "0.8.1"
44
edition = "2024"
55
description = "Pyth Lazer Agent"
66
license = "Apache-2.0"

apps/pyth-lazer-agent/src/jrpc_handle.rs

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -103,20 +103,36 @@ async fn handle_jrpc_inner<T: AsyncRead + AsyncWrite + Unpin>(
103103
match serde_json::from_slice::<PythLazerAgentJrpcV1>(receive_buf.as_slice()) {
104104
Ok(jrpc_request) => match jrpc_request.params {
105105
JrpcCall::PushUpdate(request_params) => {
106-
handle_push_update(
107-
sender,
108-
lazer_publisher,
109-
request_params,
110-
jrpc_request.id.clone(),
111-
)
112-
.await
106+
match lazer_publisher
107+
.push_feed_update(request_params.clone().into())
108+
.await
109+
{
110+
Ok(()) => send_update_success_response(sender, jrpc_request.id).await,
111+
Err(err) => {
112+
send_update_failure_response(sender, request_params, jrpc_request.id, err)
113+
.await
114+
}
115+
}
113116
}
114-
JrpcCall::PushUpdates(request_params) => {
115-
for feed in request_params {
116-
handle_push_update(sender, lazer_publisher, feed, jrpc_request.id.clone())
117-
.await?;
117+
JrpcCall::PushUpdates(request_params_batch) => {
118+
for request_params in request_params_batch {
119+
match lazer_publisher
120+
.push_feed_update(request_params.clone().into())
121+
.await
122+
{
123+
Ok(()) => (),
124+
Err(err) => {
125+
return send_update_failure_response(
126+
sender,
127+
request_params,
128+
jrpc_request.id,
129+
err,
130+
)
131+
.await;
132+
}
133+
}
118134
}
119-
Ok(())
135+
send_update_success_response(sender, jrpc_request.id).await
120136
}
121137
JrpcCall::GetMetadata(request_params) => match jrpc_request.id {
122138
JrpcId::Null => {
@@ -201,37 +217,18 @@ fn filter_symbols(
201217
res
202218
}
203219

204-
async fn handle_push_update<T: AsyncRead + AsyncWrite + Unpin>(
220+
async fn send_update_success_response<T: AsyncRead + AsyncWrite + Unpin>(
205221
sender: &mut Sender<T>,
206-
lazer_publisher: &LazerPublisher,
207-
request_params: FeedUpdateParams,
208222
request_id: JrpcId,
209223
) -> anyhow::Result<()> {
210-
match lazer_publisher
211-
.push_feed_update(request_params.clone().into())
212-
.await
213-
{
214-
Ok(_) => match request_id {
215-
JrpcId::Null => Ok(()),
216-
_ => {
217-
send_json(
218-
sender,
219-
&JrpcSuccessResponse::<String> {
220-
jsonrpc: JsonRpcVersion::V2,
221-
result: "success".to_string(),
222-
id: request_id,
223-
},
224-
)
225-
.await
226-
}
227-
},
228-
Err(err) => {
229-
debug!("error while sending updates: {:?}", err);
224+
match request_id {
225+
JrpcId::Null => Ok(()),
226+
_ => {
230227
send_json(
231228
sender,
232-
&JrpcErrorResponse {
229+
&JrpcSuccessResponse::<String> {
233230
jsonrpc: JsonRpcVersion::V2,
234-
error: JrpcError::SendUpdateError(request_params).into(),
231+
result: "success".to_string(),
235232
id: request_id,
236233
},
237234
)
@@ -240,6 +237,24 @@ async fn handle_push_update<T: AsyncRead + AsyncWrite + Unpin>(
240237
}
241238
}
242239

240+
async fn send_update_failure_response<T: AsyncRead + AsyncWrite + Unpin>(
241+
sender: &mut Sender<T>,
242+
request_params: FeedUpdateParams,
243+
request_id: JrpcId,
244+
err: Error,
245+
) -> anyhow::Result<()> {
246+
debug!("error while sending updates: {:?}", err);
247+
send_json(
248+
sender,
249+
&JrpcErrorResponse {
250+
jsonrpc: JsonRpcVersion::V2,
251+
error: JrpcError::SendUpdateError(request_params).into(),
252+
id: request_id,
253+
},
254+
)
255+
.await
256+
}
257+
243258
async fn handle_get_metadata<T: AsyncRead + AsyncWrite + Unpin>(
244259
sender: &mut Sender<T>,
245260
config: &Config,

0 commit comments

Comments
 (0)