@ayazabbas
Collaborator

No description provided.

@thmzlt

thmzlt commented Oct 20, 2024

A few overall points:

  • From reading the code, I see that the services can be configured through environment variables. We should list the config options in the README. Also, we've had some discussions in the past about how to define configuration for services. I tend to favor environment variables and CLI args over config files because they make it easier to pass config in Kubernetes, but that is mostly personal preference.
  • When you connect to the websocket server and send an invalid message, you get no feedback from the server unless you enable a more verbose log level and read the server logs. We should send something back to the client, even if it is just "I couldn't understand your message".
  • Better yet: would it be possible to pass the list of price ids in the websocket URL so that clients are subscribed automatically when they connect? That would make the websocket connection stateless and simplify the implementation (i.e. it becomes one-way).
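
To make the last suggestion concrete, here is a minimal sketch of pulling price ids out of the request path at connect time. The `ids` parameter name, the comma separator and the `price_ids_from_path` helper are all assumptions for illustration and are not taken from the PR. The sketch does no percent-decoding.

```rust
/// Extracts price ids from a request path such as `/ws?ids=abc,def`.
/// The `ids` query parameter and the comma separator are hypothetical.
fn price_ids_from_path(path: &str) -> Vec<String> {
    // Everything after the first '?' is the query string; no query means no ids.
    let query = match path.split_once('?') {
        Some((_, q)) => q,
        None => return Vec::new(),
    };
    query
        .split('&')
        .filter_map(|pair| pair.split_once('='))
        .filter(|(key, _)| *key == "ids")
        .flat_map(|(_, value)| value.split(','))
        .filter(|id| !id.is_empty())
        .map(str::to_owned)
        .collect()
}
```

With something like this, a client connecting to `/ws?ids=a,b` would be subscribed to `a` and `b` without sending any message, and a path with no query would yield an empty subscription list.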

};

if price_account.agg.status == PriceStatus::Trading
    && (update.context.slot - price_account.agg.pub_slot)

Maybe add a comment for context here? Looks like you want to discard account updates when the update slot is more than X slots ahead of the price aggregate slot. What is an invalid price update in this context?

Collaborator Author

I took this from benchmarks/rust-streaming https://github.com/pyth-network/benchmarks/blob/main/rust-streaming/src/pythnet.rs#L56 - I don't really have full context on this myself.

cc @ali-bahjati

Contributor

Ideally we want to send price updates whenever the aggregate changes, but sometimes the accounts can change without the aggregate changing. For example, this happens when there is an update but min_pub is not reached, and it can also happen for feeds during closed market hours.
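
A minimal sketch of the filter being discussed, assuming the truncated condition compares the slot gap against a maximum (as the reviewer reads it). The `PriceStatus` enum here is a simplified stand-in, and `MAX_SLOT_DIFFERENCE` and its value are hypothetical; the real threshold is not visible in the snippet.

```rust
/// Simplified stand-in for the on-chain price status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PriceStatus {
    Unknown,
    Trading,
}

/// Hypothetical threshold; the real value is not shown in the review.
const MAX_SLOT_DIFFERENCE: u64 = 25;

/// Returns true when an account update should be forwarded as a price update:
/// the aggregate is trading and was published recently enough relative to the
/// slot of the account update. Account changes that did not refresh the
/// aggregate (min_pub not reached, closed market hours) fall outside the window.
fn should_publish(status: PriceStatus, update_slot: u64, pub_slot: u64) -> bool {
    status == PriceStatus::Trading
        // saturating_sub avoids an underflow panic if pub_slot is ahead.
        && update_slot.saturating_sub(pub_slot) <= MAX_SLOT_DIFFERENCE
}
```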

price_feed: PriceFeed {
    id: update.value.pubkey.to_string(),
    price: PriceInfo {
        price: price_account.agg.price.to_string(),

I'd check with Ali if it makes sense to send the price and expo components separately or formatted as a regular decimal.

Contributor

I think @ayazabbas made it compatible with Hermes. I'd say let's keep it as our partners are trying it out with Hermes.
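
For reference, this is roughly what the "regular decimal" alternative would look like if a client wanted to do the conversion itself while the wire format keeps price and expo separate. `format_price` is a hypothetical helper, not part of the PR, and it assumes small exponents.

```rust
/// Renders an integer price with a base-10 exponent as a decimal string,
/// e.g. price = 6543210000, expo = -8 gives "65.43210000".
/// Hypothetical helper; assumes |expo| is small enough not to overflow.
fn format_price(price: i64, expo: i32) -> String {
    if expo >= 0 {
        // Non-negative exponent: scale up, no fractional part.
        let scale = 10i128.pow(expo as u32);
        return (price as i128 * scale).to_string();
    }
    let decimals = expo.unsigned_abs() as usize;
    let sign = if price < 0 { "-" } else { "" };
    let digits = price.unsigned_abs().to_string();
    // Left-pad with zeros so at least one digit precedes the decimal point.
    let padded = format!("{:0>width$}", digits, width = decimals + 1);
    let (int_part, frac_part) = padded.split_at(padded.len() - decimals);
    format!("{sign}{int_part}.{frac_part}")
}
```

Working on the digit string rather than converting to `f64` keeps every digit exact, which is one reason to keep the two components separate on the wire.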

Ok(jetstream)
}

async fn handle_connection(stream: TcpStream, clients: Clients) {

This function is difficult to parse. In addition to the websocket sender/receiver, it defines two additional channels (tx/rx and outgoing_tx/outgoing_rx). I am assuming this is so you can send both price updates and responses to client messages through the ws_sender. Is that correct? Is it not possible to clone the ws_sender and use it in more than one place?

Collaborator Author

@ayazabbas ayazabbas Oct 20, 2024

The idea behind multiple channels is to separate message generation from message sending. The broadcast channel is used for distributing price updates to all interested clients, and the mpsc channel is used to queue outgoing messages for a specific client.

I agree that this can be simplified but I'm not sure if cloning the ws_sender is safe/recommended. I think an easy simplification might be to eliminate the broadcast channel and just have the nats message handler forward price updates directly to the outgoing_tx channel for each client. Gonna try this.

P.S. Using the broadcast channel approach would be more efficient when you have many clients subscribed to the same price feeds but in this case the difference is probably negligible.
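
The per-client queue pattern described here can be sketched without any async machinery. The example uses `std::sync::mpsc` and threads as stand-ins for the tokio channel and tasks in the PR, and a `Vec<String>` stands in for the websocket sink; the message strings are made up. The point is that only one writer ever touches the sink, so the pattern works whether or not the sink can be cloned.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs two producers that share one outgoing queue and a single writer that
/// owns the "socket". Returns what the writer sent, for inspection.
fn run_single_writer() -> Vec<String> {
    let (outgoing_tx, outgoing_rx) = mpsc::channel::<String>();

    // Writer: sole owner of the sink (a Vec standing in for ws_sender).
    let writer = thread::spawn(move || {
        let mut sent = Vec::new();
        for msg in outgoing_rx {
            sent.push(msg);
        }
        sent
    });

    // Producer 1: price updates (e.g. forwarded from the NATS handler).
    let price_tx = outgoing_tx.clone();
    let prices = thread::spawn(move || {
        price_tx.send("price_update".to_string()).unwrap();
    });

    // Producer 2: direct responses to client messages.
    let response_tx = outgoing_tx.clone();
    let responses = thread::spawn(move || {
        response_tx.send("subscribe_ack".to_string()).unwrap();
    });

    prices.join().unwrap();
    responses.join().unwrap();
    // Dropping the last sender closes the channel and ends the writer loop.
    drop(outgoing_tx);
    writer.join().unwrap()
}
```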

Contributor

I think if ws_sender is cloneable, then we should do it to make things easier.

Contributor

@ali-behjati ali-behjati left a comment

LGTM. I left some comments that are mostly minor and language specific.

Contributor

I think we can split it into two workflows: one for CI, and one for image build and push (and add manual dispatch there).

Cargo.toml Outdated
tokio-tungstenite = "0.24.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
zeroize = { version = "1.7", default-features = false }
Contributor

do we need this after the patch below?


#[derive(Debug, Deserialize)]
struct NatsConfig {
    url: String,
Contributor

nit: you can use url::Url or the nats ServerAddr type here to make sure the right argument is passed when parsing the config.

Comment on lines 64 to 65
http_addr: String,
websocket_addr: String,
Contributor

Same as above
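
For the two address fields, one std-only option is `std::net::SocketAddr`, so a malformed address fails when the config is loaded rather than later at bind time. A minimal sketch, where `ServerConfig` and its `parse` constructor are hypothetical names and not the PR's actual struct:

```rust
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical config struct with typed address fields.
#[derive(Debug)]
struct ServerConfig {
    http_addr: SocketAddr,
    websocket_addr: SocketAddr,
}

impl ServerConfig {
    /// Parses both addresses up front; any malformed value is reported here.
    fn parse(http_addr: &str, websocket_addr: &str) -> Result<Self, AddrParseError> {
        Ok(Self {
            http_addr: http_addr.parse()?,
            websocket_addr: websocket_addr.parse()?,
        })
    }
}
```

Note that `SocketAddr` only accepts literal IP addresses with a port; a hostname such as `localhost:8080` would be rejected, so this only fits if the config is expected to carry IPs.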

Comment on lines 66 to 67
program_key: String,
mapping_key: String,
Contributor

Please use Pubkey here.


info!(client_addr = %addr, "New WebSocket connection established");

let (tx, mut rx) = broadcast::channel(100);
Contributor

nit: let's extract 100 and make it a constant CHANNEL_SIZE

#[derive(Debug, Serialize, Deserialize)]
struct ClientMessage {
    #[serde(rename = "type")]
    message_type: String,
Contributor

We can make this an enum and act based on that, right? (like this)
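
A std-only sketch of the enum idea, using `FromStr` in place of serde so that it compiles without dependencies. The variant names `Subscribe` and `Unsubscribe` are assumptions; the review does not list the actual message types. In the real code the same effect would more likely come from a serde-derived enum, for example with `#[serde(tag = "type")]`.

```rust
use std::str::FromStr;

/// Hypothetical set of client message types.
#[derive(Debug, PartialEq, Eq)]
enum ClientMessageType {
    Subscribe,
    Unsubscribe,
}

impl FromStr for ClientMessageType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "subscribe" => Ok(Self::Subscribe),
            "unsubscribe" => Ok(Self::Unsubscribe),
            // The error text can be sent back to the client instead of only logged.
            other => Err(format!("unknown message type: {other}")),
        }
    }
}
```

Matching on the enum makes the handler exhaustive, and the `Err` branch gives a natural place to produce feedback for invalid messages.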

let jetstream_clone = jetstream.clone();
let clients_clone = clients.clone();
tokio::spawn(async move {
    handle_nats_messages(jetstream_clone, clients_clone).await;
Contributor

One nit here: if these threads fail, the app will enter a dead state, which we might not want. So either add reconnection here, or panic there and set the panic behaviour to abort the process (see this).
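
One way to avoid the dead state is to have the caller observe the worker's exit instead of detaching it. The sketch below uses `std::thread` as a stand-in for the tokio tasks in the PR, and `run_worker` is a hypothetical helper. The alternative mentioned in the comment, `panic = "abort"` in the Cargo profile, needs no code at all; in that case the process dies on the first panic and the `Err` branch here would never be reached.

```rust
use std::thread;

/// Runs `work` on a named thread and reports whether it panicked, so the
/// caller can reconnect, restart the worker, or exit the process.
fn run_worker<F>(name: &str, work: F) -> Result<(), String>
where
    F: FnOnce() + Send + 'static,
{
    let handle = thread::Builder::new()
        .name(name.to_string())
        .spawn(work)
        .map_err(|e| e.to_string())?;
    // join() returns Err if the thread panicked (with the default unwind strategy).
    handle
        .join()
        .map_err(|_| format!("worker '{name}' panicked"))
}
```

A caller could loop on `run_worker` to implement reconnection, or call `std::process::exit(1)` on `Err` so that the orchestrator restarts the service.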

let outgoing_tx_clone = outgoing_tx.clone();
let broadcast_task = tokio::spawn(async move {
    while let Ok(msg) = rx.recv().await {
        if let Err(e) = outgoing_tx_clone.send(Message::Text(msg)).await {
Contributor

For the future: since dozens of prices arrive at each slot, it might be better to feed them all first and then flush once.
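
The feed-then-flush idea can be illustrated with a synchronous analogue. This is not the websocket code: `std::io::Write` stands in for the async sink, and `CountingSink` and `send_batch` are hypothetical names that exist only to show that a whole batch costs a single flush.

```rust
use std::io::{self, Write};

/// Test sink that records written bytes and counts flushes.
struct CountingSink {
    bytes: Vec<u8>,
    flushes: usize,
}

impl Write for CountingSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.bytes.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        self.flushes += 1;
        Ok(())
    }
}

/// Queues every update for a slot, then flushes once at the end instead of
/// flushing after each message.
fn send_batch<W: Write>(out: &mut W, updates: &[&str]) -> io::Result<()> {
    for update in updates {
        out.write_all(update.as_bytes())?;
        out.write_all(b"\n")?;
    }
    out.flush()
}
```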

add configuration options to readme
use specific types in structs
use expect or context instead of unwrap
remove use of mapping account
spawn separate threads for blocking operations
move duplicate nats code to utils
move binary entrypoints under src/bin
add reconnection logic and abort process on panic
@ayazabbas ayazabbas merged commit e850536 into main Oct 22, 2024
2 checks passed
@ayazabbas ayazabbas deleted the pyth-stream branch October 22, 2024 10:56
4 participants