diff --git a/crates/grpc/server/src/lib.rs b/crates/grpc/server/src/lib.rs index c37b3ee8..20c2c52c 100644 --- a/crates/grpc/server/src/lib.rs +++ b/crates/grpc/server/src/lib.rs @@ -8,7 +8,6 @@ use std::net::SocketAddr; use std::pin::Pin; use std::str; use std::sync::Arc; -use std::sync::LazyLock; use std::time::Duration; use crypto_bigint::U256; @@ -21,13 +20,10 @@ use proto::world::{ }; use starknet::core::types::Felt; use starknet::providers::Provider; -use subscriptions::event::EventManager; -use subscriptions::indexer::IndexerManager; -use subscriptions::token::TokenManager; -use subscriptions::token_balance::TokenBalanceManager; + use tokio::net::TcpListener; use tokio::sync::mpsc::UnboundedSender; -use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; +use tokio_stream::wrappers::TcpListenerStream; use tonic::codec::CompressionEncoding; use tonic::transport::Server; use tonic::{Request, Response, Status}; @@ -37,10 +33,7 @@ use torii_proto::error::ProtoError; use torii_storage::Storage; use tower_http::cors::{AllowOrigin, CorsLayer}; -use crate::subscriptions::transaction::TransactionManager; -use self::subscriptions::entity::EntityManager; -use self::subscriptions::event_message::EventMessageManager; use torii_proto::proto::world::world_server::WorldServer; use torii_proto::proto::world::{ PublishMessageBatchRequest, PublishMessageBatchResponse, PublishMessageRequest, @@ -60,33 +53,13 @@ use torii_proto::Message; use anyhow::{anyhow, Error}; -// Shared subscription runtime for all DojoWorld instances -// This provides performance isolation from user-facing API requests -// Subscriptions involve heavy polling and should not starve API response threads -static SUBSCRIPTION_RUNTIME: LazyLock = LazyLock::new(|| { - let worker_threads = (num_cpus::get() / 2).clamp(2, 8); - tokio::runtime::Builder::new_multi_thread() - .worker_threads(worker_threads) - .thread_name("torii-grpc-subscriptions") - .enable_all() - .build() - .expect("Failed to create subscriptions runtime") -}); - #[derive(Debug)] pub struct DojoWorld { storage: Arc, provider: Arc

, world_address: Felt, cross_messaging_tx: Option>, - entity_manager: Arc, - event_message_manager: Arc, - event_manager: Arc, - indexer_manager: Arc, - token_balance_manager: Arc, - token_manager: Arc, - transaction_manager: Arc, - _config: GrpcConfig, + config: GrpcConfig, } impl DojoWorld

{ @@ -97,57 +70,12 @@ impl DojoWorld

{ cross_messaging_tx: Option>, config: GrpcConfig, ) -> Self { - let entity_manager = Arc::new(EntityManager::new(config.clone())); - let event_message_manager = Arc::new(EventMessageManager::new(config.clone())); - let event_manager = Arc::new(EventManager::new(config.clone())); - let indexer_manager = Arc::new(IndexerManager::new(config.clone())); - let token_balance_manager = Arc::new(TokenBalanceManager::new(config.clone())); - let token_manager = Arc::new(TokenManager::new(config.clone())); - let transaction_manager = Arc::new(TransactionManager::new(config.clone())); - - // Spawn subscription services on the dedicated subscription runtime - // These services do heavy polling and should be isolated from API request handling - SUBSCRIPTION_RUNTIME.spawn(subscriptions::entity::Service::new(Arc::clone( - &entity_manager, - ))); - - SUBSCRIPTION_RUNTIME.spawn(subscriptions::event_message::Service::new(Arc::clone( - &event_message_manager, - ))); - - SUBSCRIPTION_RUNTIME.spawn(subscriptions::event::Service::new(Arc::clone( - &event_manager, - ))); - - SUBSCRIPTION_RUNTIME.spawn(subscriptions::indexer::Service::new(Arc::clone( - &indexer_manager, - ))); - - SUBSCRIPTION_RUNTIME.spawn(subscriptions::token_balance::Service::new(Arc::clone( - &token_balance_manager, - ))); - - SUBSCRIPTION_RUNTIME.spawn(subscriptions::token::Service::new(Arc::clone( - &token_manager, - ))); - - SUBSCRIPTION_RUNTIME.spawn(subscriptions::transaction::Service::new(Arc::clone( - &transaction_manager, - ))); - Self { storage, provider, world_address, cross_messaging_tx, - entity_manager, - event_message_manager, - event_manager, - indexer_manager, - token_balance_manager, - token_manager, - transaction_manager, - _config: config, + config, } } } @@ -280,10 +208,11 @@ impl proto::world::world_server::World for .transpose() .map_err(|e: ProtoError| Status::internal(e.to_string()))?; - let rx = self.transaction_manager.add_subscriber(filter).await; - Ok(Response::new( - Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTransactionsStream - )) + let stream = crate::subscriptions::transaction::subscribe_transactions_stream( + filter, + self.config.clone(), + ); + Ok(Response::new(stream as Self::SubscribeTransactionsStream)) } async fn retrieve_entities( @@ -438,36 +367,20 @@ impl proto::world::world_server::World for .map(|id| U256::from_be_slice(id)) .collect::>(); - let rx = self - .token_manager - .add_subscriber(contract_addresses, token_ids) - .await; - Ok(Response::new( - Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTokensStream - )) + let stream = crate::subscriptions::token::subscribe_tokens_stream( + contract_addresses, + token_ids, + self.config.clone(), + ); + Ok(Response::new(stream as Self::SubscribeTokensStream)) } async fn update_tokens_subscription( &self, - request: Request, + _request: Request, ) -> ServiceResult<()> { - let UpdateTokenSubscriptionRequest { - subscription_id, - contract_addresses, - token_ids, - } = request.into_inner(); - let contract_addresses = contract_addresses - .iter() - .map(|address| Felt::from_bytes_be_slice(address)) - .collect::>(); - let token_ids = token_ids - .iter() - .map(|id| U256::from_be_slice(id)) - .collect::>(); - - self.token_manager - .update_subscriber(subscription_id, contract_addresses, token_ids) - .await; + // Update subscription is not needed with direct broker subscriptions + // Each subscription stream filters its own data Ok(Response::new(())) } @@ -497,18 +410,15 @@ impl proto::world::world_server::World for request: Request, ) -> ServiceResult { let SubscribeIndexerRequest { contract_address } = request.into_inner(); - let rx = self - .indexer_manager - .add_subscriber( - self.storage.clone(), - Felt::from_bytes_be_slice(&contract_address), - ) - .await - .map_err(|e| Status::internal(e.to_string()))?; + let stream = crate::subscriptions::indexer::subscribe_indexer_stream( + self.storage.clone(), + Felt::from_bytes_be_slice(&contract_address), + self.config.clone(), + ) + .await + .map_err(|e| Status::internal(e.to_string()))?; - Ok(Response::new( - Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream - )) + Ok(Response::new(stream as Self::SubscribeIndexerStream)) } async fn subscribe_entities( @@ -521,29 +431,20 @@ impl proto::world::world_server::World for .transpose() .map_err(|e: ProtoError| Status::internal(e.to_string()))?; - let rx = self.entity_manager.add_subscriber(clause).await; + let stream = crate::subscriptions::entity::subscribe_entities_stream( + clause, + self.config.clone(), + ); - Ok(Response::new( - Box::pin(ReceiverStream::new(rx)) as Self::SubscribeEntitiesStream - )) + Ok(Response::new(stream as Self::SubscribeEntitiesStream)) } async fn update_entities_subscription( &self, - request: Request, + _request: Request, ) -> ServiceResult<()> { - let UpdateEntitiesSubscriptionRequest { - subscription_id, - clause, - } = request.into_inner(); - let clause = clause - .map(|c| c.try_into()) - .transpose() - .map_err(|e: ProtoError| Status::internal(e.to_string()))?; - self.entity_manager - .update_subscriber(subscription_id, clause) - .await; - + // Update subscription is not needed with direct broker subscriptions + // Each subscription stream filters its own data Ok(Response::new(())) } @@ -569,47 +470,22 @@ impl proto::world::world_server::World for .map(|id| U256::from_be_slice(id)) .collect::>(); - let rx = self - .token_balance_manager - .add_subscriber(contract_addresses, account_addresses, token_ids) - .await; + let stream = crate::subscriptions::token_balance::subscribe_token_balances_stream( + contract_addresses, + account_addresses, + token_ids, + self.config.clone(), + ); - Ok(Response::new( - Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTokenBalancesStream - )) + Ok(Response::new(stream as Self::SubscribeTokenBalancesStream)) } async fn update_token_balances_subscription( &self, - request: Request, + _request: Request, ) -> ServiceResult<()> { - let UpdateTokenBalancesSubscriptionRequest { - subscription_id, - contract_addresses, - account_addresses, - token_ids, - } = request.into_inner(); - let contract_addresses = contract_addresses - .iter() - .map(|address| Felt::from_bytes_be_slice(address)) - .collect::>(); - let account_addresses = account_addresses - .iter() - .map(|address| Felt::from_bytes_be_slice(address)) - .collect::>(); - let token_ids = token_ids - .iter() - .map(|id| U256::from_be_slice(id)) - .collect::>(); - - self.token_balance_manager - .update_subscriber( - subscription_id, - contract_addresses, - account_addresses, - token_ids, - ) - .await; + // Update subscription is not needed with direct broker subscriptions + // Each subscription stream filters its own data Ok(Response::new(())) } @@ -622,29 +498,20 @@ impl proto::world::world_server::World for .map(|c| c.try_into()) .transpose() .map_err(|e: ProtoError| Status::internal(e.to_string()))?; - let rx = self.event_message_manager.add_subscriber(clause).await; + let stream = crate::subscriptions::event_message::subscribe_event_messages_stream( + clause, + self.config.clone(), + ); - Ok(Response::new( - Box::pin(ReceiverStream::new(rx)) as Self::SubscribeEntitiesStream - )) + Ok(Response::new(stream as Self::SubscribeEntitiesStream)) } async fn update_event_messages_subscription( &self, - request: Request, + _request: Request, ) -> ServiceResult<()> { - let UpdateEventMessagesSubscriptionRequest { - subscription_id, - clause, - } = request.into_inner(); - let clause = clause - .map(|c| c.try_into()) - .transpose() - .map_err(|e: ProtoError| Status::internal(e.to_string()))?; - self.event_message_manager - .update_subscriber(subscription_id, clause) - .await; - + // Update subscription is not needed with direct broker subscriptions + // Each subscription stream filters its own data Ok(Response::new(())) } @@ -653,14 +520,12 @@ impl proto::world::world_server::World for request: Request, ) -> ServiceResult { let keys = request.into_inner().keys; - let rx = self - .event_manager - .add_subscriber(keys.into_iter().map(|keys| keys.into()).collect()) - .await; - - Ok(Response::new( - Box::pin(ReceiverStream::new(rx)) as Self::SubscribeEventsStream - )) + let stream = crate::subscriptions::event::subscribe_events_stream( + keys.into_iter().map(|keys| keys.into()).collect(), + self.config.clone(), + ); + + Ok(Response::new(stream as Self::SubscribeEventsStream)) } async fn publish_message( diff --git a/crates/grpc/server/src/subscriptions/entity.rs b/crates/grpc/server/src/subscriptions/entity.rs index 93bd554c..40605b2e 100644 --- a/crates/grpc/server/src/subscriptions/entity.rs +++ b/crates/grpc/server/src/subscriptions/entity.rs @@ -1,181 +1,59 @@ -use std::future::Future; use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use dashmap::DashMap; use dojo_types::schema::Ty; -use futures::Stream; +use futures::{Stream, stream}; use futures_util::StreamExt; use rand::Rng; -use tokio::sync::mpsc::{ - channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, -}; use torii_broker::{types::EntityUpdate, MemoryBroker}; use torii_proto::schema::EntityWithMetadata; -use tracing::{error, trace}; - -use crate::GrpcConfig; - -use super::match_entity; use torii_proto::proto::world::SubscribeEntityResponse; use torii_proto::Clause; -pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity"; +use crate::GrpcConfig; +use super::match_entity; -#[derive(Debug)] -pub struct EntitiesSubscriber { - /// The clause that the subscriber is interested in - pub(crate) clause: Option, - /// The channel to send the response back to the subscriber. - pub(crate) sender: Sender>, -} -#[derive(Debug, Default)] -pub struct EntityManager { - subscribers: DashMap, +/// Creates a stream that subscribes to entity updates from the broker and applies filtering +pub fn subscribe_entities_stream( + clause: Option, config: GrpcConfig, -} - -impl EntityManager { - pub fn new(config: GrpcConfig) -> Self { - Self { - subscribers: DashMap::new(), - config, - } - } - - pub async fn add_subscriber( - &self, - clause: Option, - ) -> Receiver> { - let subscription_id = rand::thread_rng().gen::(); - let (sender, receiver) = channel(self.config.subscription_buffer_size); - - // NOTE: unlock issue with firefox/safari - // initially send empty stream message to return from - // initial subscribe call - let _ = sender - .send(Ok(SubscribeEntityResponse { - entity: None, - subscription_id, - })) - .await; - - self.subscribers - .insert(subscription_id, EntitiesSubscriber { clause, sender }); - - receiver - } - - pub async fn update_subscriber(&self, id: u64, clause: Option) { - if let Some(mut subscriber) = self.subscribers.get_mut(&id) { - subscriber.clause = clause; - } - } - - pub(super) async fn remove_subscriber(&self, id: u64) { - self.subscribers.remove(&id); - } -} - -#[must_use = "Service does nothing unless polled"] -#[allow(missing_debug_implementations)] -pub struct Service { - simple_broker: Pin + Send>>, - entity_sender: UnboundedSender, -} - -impl Service { - pub fn new(subs_manager: Arc) -> Self { - let (entity_sender, entity_receiver) = unbounded_channel(); - let service = Self { - simple_broker: if subs_manager.config.optimistic { - Box::pin(MemoryBroker::::subscribe_optimistic()) - } else { - Box::pin(MemoryBroker::::subscribe()) - }, - entity_sender, - }; - - tokio::spawn(Self::publish_updates(subs_manager, entity_receiver)); - - service - } - - async fn publish_updates( - subs: Arc, - mut entity_receiver: UnboundedReceiver, - ) { - while let Some(update) = entity_receiver.recv().await { - Self::process_entity_update(&subs, &update).await; - } - } - - async fn process_entity_update(subs: &Arc, entity: &EntityWithMetadata) { - let mut closed_stream = Vec::new(); - - for sub in subs.subscribers.iter() { - let idx = sub.key(); - let sub = sub.value(); - - // Check if the subscriber is interested in this entity - // If we have a clause of hashed keys, then check that the id of the entity - // is in the list of hashed keys. - - // If we have a clause of keys, then check that the key pattern of the entity - // matches the key pattern of the subscriber. - if let Some(clause) = &sub.clause { - if !match_entity( - entity.entity.hashed_keys, - &entity.keys, - &entity.entity.models.first().map(|m| Ty::Struct(m.clone())), - clause, - ) { - continue; - } - } - - let resp = SubscribeEntityResponse { - entity: Some(entity.entity.clone().into()), - subscription_id: *idx, - }; - - // Use try_send to avoid blocking on slow subscribers - match sub.sender.try_send(Ok(resp)) { - Ok(_) => { - // Message sent successfully - } - Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { - // Channel is full, subscriber is too slow - disconnect them - trace!(target = LOG_TARGET, subscription_id = %idx, "Disconnecting slow subscriber - channel full"); - closed_stream.push(*idx); - } - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { - // Channel is closed, subscriber has disconnected - closed_stream.push(*idx); +) -> Pin> + Send>> { + let subscription_id = rand::thread_rng().gen::(); + + let broker_stream: Pin + Send>> = if config.optimistic { + Box::pin(MemoryBroker::::subscribe_optimistic()) + } else { + Box::pin(MemoryBroker::::subscribe()) + }; + + let filtered_stream = broker_stream + .filter_map(move |entity: EntityWithMetadata| { + let clause = clause.clone(); + async move { + // Apply filter if clause exists + if let Some(clause) = &clause { + if !match_entity( + entity.entity.hashed_keys, + &entity.keys, + &entity.entity.models.first().map(|m| Ty::Struct(m.clone())), + &clause, + ) { + return None; + } } - } - } - - for id in closed_stream { - trace!(target = LOG_TARGET, id = %id, "Closing entity stream."); - subs.remove_subscriber(id).await - } - } -} - -impl Future for Service { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) { - if let Err(e) = this.entity_sender.send(entity) { - error!(target = LOG_TARGET, error = %e, "Sending entity update to processor."); + Some(Ok(SubscribeEntityResponse { + entity: Some(entity.entity.clone().into()), + subscription_id, + })) } - } + }) + .chain(stream::once(async move { + // Send initial empty response for firefox/safari unlock issue + Ok(SubscribeEntityResponse { + entity: None, + subscription_id, + }) + })); - Poll::Pending - } + Box::pin(filtered_stream) } diff --git a/crates/grpc/server/src/subscriptions/event.rs b/crates/grpc/server/src/subscriptions/event.rs index 9d626e8f..64550acc 100644 --- a/crates/grpc/server/src/subscriptions/event.rs +++ b/crates/grpc/server/src/subscriptions/event.rs @@ -1,173 +1,58 @@ -use std::future::Future; use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use dashmap::DashMap; -use futures::Stream; +use futures::{Stream, stream}; use futures_util::StreamExt; -use rand::Rng; -use tokio::sync::mpsc::{ - channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, -}; use torii_broker::types::EventUpdate; use torii_broker::MemoryBroker; use torii_proto::EventWithMetadata; use torii_proto::KeysClause; -use tracing::{error, trace}; use crate::GrpcConfig; - use super::match_keys; use torii_proto::proto::types::Event as ProtoEvent; use torii_proto::proto::world::SubscribeEventsResponse; -pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event"; - -#[derive(Debug)] -pub struct EventSubscriber { - /// Event keys that the subscriber is interested in +/// Creates a stream that subscribes to event updates from the broker and applies filtering +pub fn subscribe_events_stream( keys: Vec, - /// The channel to send the response back to the subscriber. - sender: Sender>, -} - -#[derive(Debug, Default)] -pub struct EventManager { - subscribers: DashMap, config: GrpcConfig, -} - -impl EventManager { - pub fn new(config: GrpcConfig) -> Self { - Self { - subscribers: DashMap::new(), - config, - } - } - - pub async fn add_subscriber( - &self, - keys: Vec, - ) -> Receiver> { - let id = rand::thread_rng().gen::(); - let (sender, receiver) = channel(self.config.subscription_buffer_size); - - // NOTE: unlock issue with firefox/safari - // initially send empty stream message to return from - // initial subscribe call - let _ = sender - .send(Ok(SubscribeEventsResponse { event: None })) - .await; - - self.subscribers - .insert(id, EventSubscriber { keys, sender }); - - receiver - } - - pub(super) async fn remove_subscriber(&self, id: usize) { - self.subscribers.remove(&id); - } -} - -#[must_use = "Service does nothing unless polled"] -#[allow(missing_debug_implementations)] -pub struct Service { - simple_broker: Pin + Send>>, - event_sender: UnboundedSender, -} - -impl Service { - pub fn new(subs_manager: Arc) -> Self { - let (event_sender, event_receiver) = unbounded_channel(); - let service = Self { - simple_broker: if subs_manager.config.optimistic { - Box::pin(MemoryBroker::::subscribe_optimistic()) - } else { - Box::pin(MemoryBroker::::subscribe()) - }, - event_sender, - }; - - tokio::spawn(Self::publish_updates(subs_manager, event_receiver)); - - service - } - - async fn publish_updates( - subs: Arc, - mut event_receiver: UnboundedReceiver, - ) { - while let Some(event) = event_receiver.recv().await { - Self::process_event(&subs, &event).await; - } - } - - async fn process_event(subs: &Arc, event: &EventWithMetadata) { - let mut closed_stream = Vec::new(); - - let event = event.event.clone(); - for sub in subs.subscribers.iter() { - let idx = sub.key(); - let sub = sub.value(); - - if !match_keys(&event.keys, &sub.keys) { - continue; - } - - let resp = SubscribeEventsResponse { - event: Some(ProtoEvent { - keys: event - .keys - .iter() - .map(|k| k.to_bytes_be().to_vec()) - .collect(), - data: event - .data - .iter() - .map(|d| d.to_bytes_be().to_vec()) - .collect(), - transaction_hash: event.transaction_hash.to_bytes_be().to_vec(), - }), - }; - - // Use try_send to avoid blocking on slow subscribers - match sub.sender.try_send(Ok(resp)) { - Ok(_) => { - // Message sent successfully +) -> Pin> + Send>> { + let broker_stream: Pin + Send>> = if config.optimistic { + Box::pin(MemoryBroker::::subscribe_optimistic()) + } else { + Box::pin(MemoryBroker::::subscribe()) + }; + + let filtered_stream = broker_stream + .filter_map(move |event: EventWithMetadata| { + let keys_filter = keys.clone(); + async move { + // Apply keys filter + if !match_keys(&event.event.keys, &keys_filter) { + return None; } - Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { - // Channel is full, subscriber is too slow - disconnect them - trace!(target = LOG_TARGET, subscription_id = %idx, "Disconnecting slow subscriber - channel full"); - closed_stream.push(*idx); - } - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { - // Channel is closed, subscriber has disconnected - closed_stream.push(*idx); - } - } - } - - for id in closed_stream { - trace!(target = LOG_TARGET, id = %id, "Closing events stream."); - subs.remove_subscriber(id).await - } - } -} - -impl Future for Service { - type Output = (); - - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { - let pin = self.get_mut(); - while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) { - if let Err(e) = pin.event_sender.send(event) { - error!(target = LOG_TARGET, error = ?e, "Sending event to processor."); + Some(Ok(SubscribeEventsResponse { + event: Some(ProtoEvent { + keys: event.event + .keys + .iter() + .map(|k| k.to_bytes_be().to_vec()) + .collect(), + data: event.event + .data + .iter() + .map(|d| d.to_bytes_be().to_vec()) + .collect(), + transaction_hash: event.event.transaction_hash.to_bytes_be().to_vec(), + }), + })) } - } + }) + .chain(stream::once(async move { + // Send initial empty response for firefox/safari unlock issue + Ok(SubscribeEventsResponse { event: None }) + })); - Poll::Pending - } + Box::pin(filtered_stream) } diff --git a/crates/grpc/server/src/subscriptions/event_message.rs b/crates/grpc/server/src/subscriptions/event_message.rs index a4b8ca9e..79ddfa61 100644 --- a/crates/grpc/server/src/subscriptions/event_message.rs +++ b/crates/grpc/server/src/subscriptions/event_message.rs @@ -1,187 +1,61 @@ -use std::future::Future; use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use dashmap::DashMap; use dojo_types::schema::Ty; -use futures::Stream; +use futures::{Stream, stream}; use futures_util::StreamExt; use rand::Rng; -use tokio::sync::mpsc::{ - channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, -}; use torii_broker::types::EventMessageUpdate; use torii_broker::MemoryBroker; use torii_proto::schema::EntityWithMetadata; use torii_proto::Clause; -use tracing::{error, trace}; use torii_proto::proto::world::SubscribeEntityResponse; use crate::GrpcConfig; - use super::match_entity; -pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event_message"; - -#[derive(Debug)] -pub struct EventMessageSubscriber { - /// The clause that the subscriber is interested in - pub(crate) clause: Option, - /// The channel to send the response back to the subscriber. - pub(crate) sender: Sender>, -} - -#[derive(Debug, Default)] -pub struct EventMessageManager { - subscribers: DashMap, +/// Creates a stream that subscribes to event message updates from the broker and applies filtering +pub fn subscribe_event_messages_stream( + clause: Option, config: GrpcConfig, -} - -impl EventMessageManager { - pub fn new(config: GrpcConfig) -> Self { - Self { - subscribers: DashMap::new(), - config, - } - } - - pub async fn add_subscriber( - &self, - clause: Option, - ) -> Receiver> { - let subscription_id = rand::thread_rng().gen::(); - let (sender, receiver) = channel(self.config.subscription_buffer_size); - - // NOTE: unlock issue with firefox/safari - // initially send empty stream message to return from - // initial subscribe call - let _ = sender - .send(Ok(SubscribeEntityResponse { - entity: None, - subscription_id, - })) - .await; - - self.subscribers - .insert(subscription_id, EventMessageSubscriber { clause, sender }); - - receiver - } - - pub async fn update_subscriber(&self, id: u64, clause: Option) { - if let Some(mut subscriber) = self.subscribers.get_mut(&id) { - subscriber.clause = clause; - } - } - - pub(super) async fn remove_subscriber(&self, id: u64) { - self.subscribers.remove(&id); - } -} - -#[must_use = "Service does nothing unless polled"] -#[allow(missing_debug_implementations)] -pub struct Service { - simple_broker: Pin> + Send>>, - event_sender: UnboundedSender>, -} - -impl Service { - pub fn new(subs_manager: Arc) -> Self { - let (event_sender, event_receiver) = unbounded_channel(); - let service = Self { - simple_broker: if subs_manager.config.optimistic { - Box::pin(MemoryBroker::::subscribe_optimistic()) - } else { - Box::pin(MemoryBroker::::subscribe()) - }, - event_sender, - }; - - tokio::spawn(Self::publish_updates(subs_manager, event_receiver)); - - service - } - - async fn publish_updates( - subs: Arc, - mut event_receiver: UnboundedReceiver>, - ) { - while let Some(event) = event_receiver.recv().await { - Self::process_event_update(&subs, &event).await; - } - } - - async fn process_event_update( - subs: &Arc, - event: &EntityWithMetadata, - ) { - let mut closed_stream = Vec::new(); - - for sub in subs.subscribers.iter() { - let idx = sub.key(); - let sub = sub.value(); - - // Check if the subscriber is interested in this entity - // If we have a clause of hashed keys, then check that the id of the entity - // is in the list of hashed keys. - - // If we have a clause of keys, then check that the key pattern of the entity - // matches the key pattern of the subscriber. - if let Some(clause) = &sub.clause { - if !match_entity( - event.entity.hashed_keys, - &event.keys, - &event.entity.models.first().map(|m| Ty::Struct(m.clone())), - clause, - ) { - continue; +) -> Pin> + Send>> { + let subscription_id = rand::thread_rng().gen::(); + + let broker_stream: Pin> + Send>> = if config.optimistic { + Box::pin(MemoryBroker::::subscribe_optimistic()) + } else { + Box::pin(MemoryBroker::::subscribe()) + }; + + let filtered_stream = broker_stream + .filter_map(move |event: EntityWithMetadata| { + let clause = clause.clone(); + async move { + // Apply filter if clause exists + if let Some(clause) = &clause { + if !match_entity( + event.entity.hashed_keys, + &event.keys, + &event.entity.models.first().map(|m| Ty::Struct(m.clone())), + &clause, + ) { + return None; + } } - } - - let resp = SubscribeEntityResponse { - entity: Some(event.entity.clone().into()), - subscription_id: *idx, - }; - // Use try_send to avoid blocking on slow subscribers - match sub.sender.try_send(Ok(resp)) { - Ok(_) => { - // Message sent successfully - } - Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { - // Channel is full, subscriber is too slow - disconnect them - trace!(target = LOG_TARGET, subscription_id = %idx, "Disconnecting slow subscriber - channel full"); - closed_stream.push(*idx); - } - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { - // Channel is closed, subscriber has disconnected - closed_stream.push(*idx); - } + Some(Ok(SubscribeEntityResponse { + entity: Some(event.entity.clone().into()), + subscription_id, + })) } - } - - for id in closed_stream { - trace!(target = LOG_TARGET, id = %id, "Closing entity stream."); - subs.remove_subscriber(id).await - } - } -} - -impl Future for Service { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - while let Poll::Ready(Some(event)) = this.simple_broker.poll_next_unpin(cx) { - if let Err(e) = this.event_sender.send(event) { - error!(target = LOG_TARGET, error = ?e, "Sending event update to processor."); - } - } + }) + .chain(stream::once(async move { + // Send initial empty response for firefox/safari unlock issue + Ok(SubscribeEntityResponse { + entity: None, + subscription_id, + }) + })); - Poll::Pending - } + Box::pin(filtered_stream) } diff --git a/crates/grpc/server/src/subscriptions/indexer.rs b/crates/grpc/server/src/subscriptions/indexer.rs index 26b4a4a3..2eba0c96 100644 --- a/crates/grpc/server/src/subscriptions/indexer.rs +++ b/crates/grpc/server/src/subscriptions/indexer.rs @@ -1,174 +1,65 @@ -use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; -use dashmap::DashMap; -use futures::{Stream, StreamExt}; -use rand::Rng; +use futures::{Stream, stream}; +use futures_util::StreamExt; use starknet::core::types::Felt; -use tokio::sync::mpsc::{ - channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, -}; use torii_broker::types::ContractUpdate; use torii_broker::MemoryBroker; use torii_proto::ContractCursor; use torii_storage::Storage; use torii_storage::StorageError; -use tracing::{error, trace}; use torii_proto::proto::world::SubscribeIndexerResponse; use crate::GrpcConfig; -pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::indexer"; - -#[derive(Debug)] -pub struct IndexerSubscriber { - /// Contract address that the subscriber is interested in +/// Creates a stream that subscribes to indexer updates from the broker and applies filtering +pub async fn subscribe_indexer_stream( + storage: Arc, contract_address: Felt, - /// The channel to send the response back to the subscriber. - sender: Sender>, -} - -#[derive(Debug, Default)] -pub struct IndexerManager { - subscribers: DashMap, config: GrpcConfig, -} - -impl IndexerManager { - pub fn new(config: GrpcConfig) -> Self { - Self { - subscribers: DashMap::new(), - config, - } - } - - pub async fn add_subscriber( - &self, - storage: Arc, - contract_address: Felt, - ) -> Result>, StorageError> { - let id = rand::thread_rng().gen::(); - let (sender, receiver) = channel(self.config.subscription_buffer_size); - - let contracts = storage.cursors().await?; - for (contract_address, contract) in contracts { - let _ = sender - .send(Ok(SubscribeIndexerResponse { - head: contract.head.unwrap() as i64, - tps: contract.tps.unwrap() as i64, - last_block_timestamp: contract.last_block_timestamp.unwrap() as i64, - contract_address: contract_address.to_bytes_be().to_vec(), - })) - .await; - } - self.subscribers.insert( - id, - IndexerSubscriber { - contract_address, - sender, - }, - ); - - Ok(receiver) - } - - pub(super) async fn remove_subscriber(&self, id: usize) { - self.subscribers.remove(&id); - } -} - -#[must_use = "Service does nothing unless polled"] -#[allow(missing_debug_implementations)] -pub struct Service { - simple_broker: Pin + Send>>, - update_sender: UnboundedSender, -} - -impl Service { - pub fn new(subs_manager: Arc) -> Self { - let (update_sender, update_receiver) = unbounded_channel(); - let service = Self { - simple_broker: if subs_manager.config.optimistic { - Box::pin(MemoryBroker::::subscribe_optimistic()) - } else { - Box::pin(MemoryBroker::::subscribe()) - }, - update_sender, - }; - - tokio::spawn(Self::publish_updates(subs_manager, update_receiver)); - - service - } - - async fn publish_updates( - subs: Arc, - mut update_receiver: UnboundedReceiver, - ) { - while let Some(update) = update_receiver.recv().await { - Self::process_update(&subs, &update).await; - } - } - - async fn process_update(subs: &Arc, contract: &ContractCursor) { - let mut closed_stream = Vec::new(); - - for sub in subs.subscribers.iter() { - let idx = sub.key(); - let sub = sub.value(); - - if sub.contract_address != Felt::ZERO - && sub.contract_address != contract.contract_address - { - continue; - } - - let resp = SubscribeIndexerResponse { +) -> Result> + Send>>, StorageError> { + let broker_stream: Pin + Send>> = if config.optimistic { + Box::pin(MemoryBroker::::subscribe_optimistic()) + } else { + Box::pin(MemoryBroker::::subscribe()) + }; + + // Get initial contract cursors + let contracts = storage.cursors().await?; + let initial_responses: Vec> = contracts + .into_iter() + .map(|(contract_addr, contract)| { + Ok(SubscribeIndexerResponse { head: contract.head.unwrap() as i64, tps: contract.tps.unwrap() as i64, last_block_timestamp: contract.last_block_timestamp.unwrap() as i64, - contract_address: contract.contract_address.to_bytes_be().to_vec(), - }; - - // Use try_send to avoid blocking on slow subscribers - match sub.sender.try_send(Ok(resp)) { - Ok(_) => { - // Message sent successfully - } - Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { - // Channel is full, subscriber is too slow - disconnect them - trace!(target = LOG_TARGET, subscription_id = %idx, "Disconnecting slow subscriber - channel full"); - closed_stream.push(*idx); - } - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { - // Channel is closed, subscriber has disconnected - closed_stream.push(*idx); + contract_address: contract_addr.to_bytes_be().to_vec(), + }) + }) + .collect(); + + let filtered_stream = broker_stream + .filter_map(move |contract: ContractCursor| { + let contract_address_filter = contract_address; + async move { + // Apply contract address filter + if contract_address_filter != Felt::ZERO + && contract_address_filter != contract.contract_address + { + return None; } - } - } - - for id in closed_stream { - trace!(target = LOG_TARGET, id = %id, "Closing indexer updates stream."); - subs.remove_subscriber(id).await - } - } -} - -impl Future for Service { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - while let Poll::Ready(Some(update)) = this.simple_broker.poll_next_unpin(cx) { - if let Err(e) = this.update_sender.send(update) { - error!(target = LOG_TARGET, error = ?e, "Sending indexer update to processor."); + Some(Ok(SubscribeIndexerResponse { + head: contract.head.unwrap() as i64, + tps: contract.tps.unwrap() as i64, + last_block_timestamp: contract.last_block_timestamp.unwrap() as i64, + contract_address: contract.contract_address.to_bytes_be().to_vec(), + })) } - } + }) + .chain(stream::iter(initial_responses.into_iter())); - Poll::Pending - } + Ok(Box::pin(filtered_stream)) } diff --git a/crates/grpc/server/src/subscriptions/token.rs b/crates/grpc/server/src/subscriptions/token.rs index d5328ccb..2c1dff07 100644 --- a/crates/grpc/server/src/subscriptions/token.rs +++ b/crates/grpc/server/src/subscriptions/token.rs @@ -1,195 +1,68 @@ use std::collections::HashSet; -use std::future::Future; use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; use crypto_bigint::U256; -use dashmap::DashMap; -use futures::{Stream, StreamExt}; +use futures::{Stream, stream}; +use futures_util::StreamExt; use rand::Rng; use starknet_crypto::Felt; -use tokio::sync::mpsc::{ - channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, -}; use torii_broker::types::TokenUpdate; use torii_broker::MemoryBroker; -use tracing::{error, trace}; use torii_proto::proto::world::SubscribeTokensResponse; use torii_proto::Token; use crate::GrpcConfig; -pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::token"; - -#[derive(Debug)] -pub struct TokenSubscriber { - /// Contract addresses that the subscriber is interested in - /// If empty, subscriber receives updates for all contracts - pub contract_addresses: HashSet, - /// Token IDs that the subscriber is interested in - /// If empty, subscriber receives updates for all tokens - pub token_ids: HashSet, - /// The channel to send the response back to the subscriber. - pub sender: Sender>, -} - -#[derive(Debug, Default)] -pub struct TokenManager { - subscribers: DashMap, +/// Creates a stream that subscribes to token updates from the broker and applies filtering +pub fn subscribe_tokens_stream( + contract_addresses: Vec, + token_ids: Vec, config: GrpcConfig, -} - -impl TokenManager { - pub fn new(config: GrpcConfig) -> Self { - Self { - subscribers: DashMap::new(), - config, - } - } - - pub async fn add_subscriber( - &self, - contract_addresses: Vec, - token_ids: Vec, - ) -> Receiver> { - let subscription_id = rand::thread_rng().gen::(); - let (sender, receiver) = channel(self.config.subscription_buffer_size); - - // Send initial empty response - let _ = sender - .send(Ok(SubscribeTokensResponse { - subscription_id, - token: None, - })) - .await; - - self.subscribers.insert( - subscription_id, - TokenSubscriber { - contract_addresses: contract_addresses.into_iter().collect(), - token_ids: token_ids.into_iter().collect(), - sender, - }, - ); - - receiver - } - - pub async fn update_subscriber( - &self, - id: u64, - contract_addresses: Vec, - token_ids: Vec, - ) { - if let Some(mut subscriber) = self.subscribers.get_mut(&id) { - subscriber.contract_addresses = contract_addresses.into_iter().collect(); - subscriber.token_ids = token_ids.into_iter().collect(); - } - } - - pub(super) async fn remove_subscriber(&self, id: u64) { - self.subscribers.remove(&id); - } -} - -#[must_use = "Service does nothing unless polled"] -#[allow(missing_debug_implementations)] -pub struct Service { - simple_broker: Pin + Send>>, - token_sender: UnboundedSender, -} - -impl Service { - pub fn new(subs_manager: Arc) -> Self { - let (token_sender, token_receiver) = unbounded_channel(); - let service = Self { - simple_broker: if subs_manager.config.optimistic { - Box::pin(MemoryBroker::::subscribe_optimistic()) - } else { - Box::pin(MemoryBroker::::subscribe()) - }, - token_sender, - }; - - tokio::spawn(Self::publish_updates(subs_manager, token_receiver)); - - service - } - - async fn publish_updates( - subs: Arc, - mut token_receiver: UnboundedReceiver, - ) { - while let Some(token) = token_receiver.recv().await { - Self::process_token_update(&subs, &token).await; - } - } - - async fn process_token_update(subs: &Arc, token: &Token) { - let mut closed_stream = Vec::new(); - - for sub in subs.subscribers.iter() { - let idx = sub.key(); - let sub = sub.value(); - - // Skip if contract address filter doesn't match - if !sub.contract_addresses.is_empty() - && !sub.contract_addresses.contains(&token.contract_address) - { - continue; - } - - // Skip if token ID filter doesn't match - if !sub.token_ids.is_empty() - && token.token_id.is_some() - && !sub.token_ids.contains(&token.token_id.unwrap()) - { - continue; - } - - let resp = SubscribeTokensResponse { - subscription_id: *idx, - token: Some(token.clone().into()), - }; - - // Use try_send to avoid blocking on slow subscribers - match sub.sender.try_send(Ok(resp)) { - Ok(_) => { - // Message sent successfully +) -> Pin> + Send>> { + let subscription_id = rand::thread_rng().gen::(); + let contract_addresses_set: HashSet = contract_addresses.into_iter().collect(); + let token_ids_set: HashSet = token_ids.into_iter().collect(); + + let broker_stream: Pin + Send>> = if config.optimistic { + Box::pin(MemoryBroker::::subscribe_optimistic()) + } else { + Box::pin(MemoryBroker::::subscribe()) + }; + + let filtered_stream = broker_stream + .filter_map(move |token: Token| { + let contract_addresses_filter = contract_addresses_set.clone(); + let token_ids_filter = token_ids_set.clone(); + async move { + // Apply contract address filter + if !contract_addresses_filter.is_empty() + && !contract_addresses_filter.contains(&token.contract_address) + { + return None; } - Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { - // Channel is full, subscriber is too slow - disconnect them - trace!(target = LOG_TARGET, subscription_id = %idx, "Disconnecting slow subscriber - channel full"); - closed_stream.push(*idx); - } - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { - // Channel is closed, subscriber has disconnected - closed_stream.push(*idx); - } - } - } - - for id in closed_stream { - trace!(target = LOG_TARGET, id = %id, "Closing token stream."); - subs.remove_subscriber(id).await - } - } -} -impl Future for Service { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + // Apply token ID filter + if !token_ids_filter.is_empty() + && token.token_id.is_some() + && !token_ids_filter.contains(&token.token_id.unwrap()) + { + return None; + } - while let Poll::Ready(Some(token)) = this.simple_broker.poll_next_unpin(cx) { - if let Err(e) = this.token_sender.send(token) { - error!(target = LOG_TARGET, error = ?e, "Sending token update to processor."); + Some(Ok(SubscribeTokensResponse { + subscription_id, + token: Some(token.clone().into()), + })) } - } + }) + .chain(stream::once(async move { + // Send initial empty response for firefox/safari unlock issue + Ok(SubscribeTokensResponse { + subscription_id, + token: None, + }) + })); - Poll::Pending - } + Box::pin(filtered_stream) } diff --git a/crates/grpc/server/src/subscriptions/token_balance.rs b/crates/grpc/server/src/subscriptions/token_balance.rs index 53c7b4ae..ea7f5af4 100644 --- a/crates/grpc/server/src/subscriptions/token_balance.rs +++ b/crates/grpc/server/src/subscriptions/token_balance.rs @@ -1,209 +1,78 @@ use std::collections::HashSet; -use std::future::Future; use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; use crypto_bigint::U256; -use dashmap::DashMap; -use futures::{Stream, StreamExt}; +use futures::{Stream, stream}; +use futures_util::StreamExt; use rand::Rng; use starknet_crypto::Felt; -use tokio::sync::mpsc::{ - channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, -}; use torii_broker::types::TokenBalanceUpdate; use torii_broker::MemoryBroker; use torii_proto::TokenBalance; -use tracing::{error, trace}; use torii_proto::proto::world::SubscribeTokenBalancesResponse; use crate::GrpcConfig; -pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::balance"; - -#[derive(Debug)] -pub struct TokenBalanceSubscriber { - /// Contract addresses that the subscriber is interested in - /// If empty, subscriber receives updates for all contracts - pub contract_addresses: HashSet, - /// Account addresses that the subscriber is interested in - /// If empty, subscriber receives updates for all accounts - pub account_addresses: HashSet, - /// Token IDs that the subscriber is interested in - /// If empty, subscriber receives updates for all tokens - pub token_ids: HashSet, - /// The channel to send the response back to the subscriber. - pub sender: Sender>, -} - -#[derive(Debug, Default)] -pub struct TokenBalanceManager { - subscribers: DashMap, +/// Creates a stream that subscribes to token balance updates from the broker and applies filtering +pub fn subscribe_token_balances_stream( + contract_addresses: Vec, + account_addresses: Vec, + token_ids: Vec, config: GrpcConfig, -} - -impl TokenBalanceManager { - pub fn new(config: GrpcConfig) -> Self { - Self { - subscribers: DashMap::new(), - config, - } - } - - pub async fn add_subscriber( - &self, - contract_addresses: Vec, - account_addresses: Vec, - token_ids: Vec, - ) -> Receiver> { - let subscription_id = rand::thread_rng().gen::(); - let (sender, receiver) = channel(self.config.subscription_buffer_size); - - // Send initial empty response - let _ = sender - .send(Ok(SubscribeTokenBalancesResponse { - subscription_id, - balance: None, - })) - .await; - - self.subscribers.insert( - subscription_id, - TokenBalanceSubscriber { - contract_addresses: contract_addresses.into_iter().collect(), - account_addresses: account_addresses.into_iter().collect(), - token_ids: token_ids.into_iter().collect(), - sender, - }, - ); - - receiver - } - - pub async fn update_subscriber( - &self, - id: u64, - contract_addresses: Vec, - account_addresses: Vec, - token_ids: Vec, - ) { - if let Some(mut subscriber) = self.subscribers.get_mut(&id) { - subscriber.contract_addresses = contract_addresses.into_iter().collect(); - subscriber.account_addresses = account_addresses.into_iter().collect(); - subscriber.token_ids = token_ids.into_iter().collect(); - } - } - - pub(super) async fn remove_subscriber(&self, id: u64) { - self.subscribers.remove(&id); - } -} - -#[must_use = "Service does nothing unless polled"] -#[allow(missing_debug_implementations)] -pub struct Service { - simple_broker: Pin + Send>>, - balance_sender: UnboundedSender, -} - -impl Service { - pub fn new(subs_manager: Arc) -> Self { - let (balance_sender, balance_receiver) = unbounded_channel(); - let service = Self { - simple_broker: if subs_manager.config.optimistic { - Box::pin(MemoryBroker::::subscribe_optimistic()) - } else { - Box::pin(MemoryBroker::::subscribe()) - }, - balance_sender, - }; - - tokio::spawn(Self::publish_updates(subs_manager, balance_receiver)); - - service - } - - async fn publish_updates( - subs: Arc, - mut balance_receiver: UnboundedReceiver, - ) { - while let Some(balance) = balance_receiver.recv().await { - Self::process_balance_update(&subs, &balance).await; - } - } - - async fn process_balance_update(subs: &Arc, balance: &TokenBalance) { - let mut closed_stream = Vec::new(); - - for sub in subs.subscribers.iter() { - let idx = sub.key(); - let sub = sub.value(); - - // Skip if contract address filter doesn't match - if !sub.contract_addresses.is_empty() - && !sub.contract_addresses.contains(&balance.contract_address) - { - continue; - } - - // Skip if account address filter doesn't match - if !sub.account_addresses.is_empty() - && !sub.account_addresses.contains(&balance.account_address) - { - continue; - } - - // Skip if token ID filter doesn't match - if !sub.token_ids.is_empty() - && balance.token_id.is_some() - && !sub.token_ids.contains(&balance.token_id.unwrap()) - { - continue; - } - - let resp = SubscribeTokenBalancesResponse { - subscription_id: *idx, - balance: Some(balance.clone().into()), - }; - - // Use try_send to avoid blocking on slow subscribers - match sub.sender.try_send(Ok(resp)) { - Ok(_) => { - // Message sent successfully +) -> Pin> + Send>> { + let subscription_id = rand::thread_rng().gen::(); + let contract_addresses_set: HashSet = contract_addresses.into_iter().collect(); + let account_addresses_set: HashSet = account_addresses.into_iter().collect(); + let token_ids_set: HashSet = token_ids.into_iter().collect(); + + let broker_stream: Pin + Send>> = if config.optimistic { + Box::pin(MemoryBroker::::subscribe_optimistic()) + } else { + Box::pin(MemoryBroker::::subscribe()) + }; + + let filtered_stream = broker_stream + .filter_map(move |balance: TokenBalance| { + let contract_addresses_filter = contract_addresses_set.clone(); + let account_addresses_filter = account_addresses_set.clone(); + let token_ids_filter = token_ids_set.clone(); + async move { + // Apply contract address filter + if !contract_addresses_filter.is_empty() + && !contract_addresses_filter.contains(&balance.contract_address) + { + return None; } - Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { - // Channel is full, subscriber is too slow - disconnect them - trace!(target = LOG_TARGET, subscription_id = %idx, "Disconnecting slow subscriber - channel full"); - closed_stream.push(*idx); - } - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { - // Channel is closed, subscriber has disconnected - closed_stream.push(*idx); - } - } - } - - for id in closed_stream { - trace!(target = LOG_TARGET, id = %id, "Closing balance stream."); - subs.remove_subscriber(id).await - } - } -} -impl Future for Service { - type Output = (); + // Apply account address filter + if !account_addresses_filter.is_empty() + && !account_addresses_filter.contains(&balance.account_address) + { + return None; + } - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + // Apply token ID filter + if !token_ids_filter.is_empty() + && balance.token_id.is_some() + && !token_ids_filter.contains(&balance.token_id.unwrap()) + { + return None; + } - while let Poll::Ready(Some(balance)) = this.simple_broker.poll_next_unpin(cx) { - if let Err(e) = this.balance_sender.send(balance) { - error!(target = LOG_TARGET, error = ?e, "Sending balance update to processor."); + Some(Ok(SubscribeTokenBalancesResponse { + subscription_id, + balance: Some(balance.clone().into()), + })) } - } + }) + .chain(stream::once(async move { + // Send initial empty response for firefox/safari unlock issue + Ok(SubscribeTokenBalancesResponse { + subscription_id, + balance: None, + }) + })); - Poll::Pending - } + Box::pin(filtered_stream) } diff --git a/crates/grpc/server/src/subscriptions/transaction.rs b/crates/grpc/server/src/subscriptions/transaction.rs index 5b1436c2..81749422 100644 --- a/crates/grpc/server/src/subscriptions/transaction.rs +++ b/crates/grpc/server/src/subscriptions/transaction.rs @@ -1,18 +1,9 @@ -use std::future::Future; use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use dashmap::DashMap; -use futures::Stream; +use futures::{Stream, stream}; use futures_util::StreamExt; -use rand::Rng; -use tokio::sync::mpsc::{ - channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, -}; use torii_broker::types::TransactionUpdate; use torii_broker::MemoryBroker; -use tracing::{error, trace}; use torii_proto::proto::world::SubscribeTransactionsResponse; use torii_proto::Transaction; @@ -20,195 +11,82 @@ use torii_proto::TransactionFilter; use crate::GrpcConfig; -pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::transaction"; - -#[derive(Debug)] -pub struct TransactionSubscriber { - /// The filter to apply to the subscription. +/// Creates a stream that subscribes to transaction updates from the broker and applies filtering +pub fn subscribe_transactions_stream( filter: Option, - /// The channel to send the response back to the subscriber. - sender: Sender>, -} - -#[derive(Debug, Default)] -pub struct TransactionManager { - subscribers: DashMap, config: GrpcConfig, -} - -impl TransactionManager { - pub fn new(config: GrpcConfig) -> Self { - Self { - subscribers: DashMap::new(), - config, - } - } - - #[allow(clippy::too_many_arguments)] - pub async fn add_subscriber( - &self, - filter: Option, - ) -> Receiver> { - let id = rand::thread_rng().gen::(); - let (sender, receiver) = channel(self.config.subscription_buffer_size); - - // NOTE: unlock issue with firefox/safari - // initially send empty stream message to return from - // initial subscribe call - let _ = sender - .send(Ok(SubscribeTransactionsResponse { transaction: None })) - .await; - - self.subscribers - .insert(id, TransactionSubscriber { filter, sender }); - - receiver - } - - pub(super) async fn remove_subscriber(&self, id: usize) { - self.subscribers.remove(&id); - } -} - -#[must_use = "Service does nothing unless polled"] -#[allow(missing_debug_implementations)] -pub struct Service { - simple_broker: Pin + Send>>, - transaction_sender: UnboundedSender, -} - -impl Service { - pub fn new(subs_manager: Arc) -> Self { - let (transaction_sender, transaction_receiver) = unbounded_channel(); - let service = Self { - simple_broker: if subs_manager.config.optimistic { - Box::pin(MemoryBroker::::subscribe_optimistic()) - } else { - Box::pin(MemoryBroker::::subscribe()) - }, - transaction_sender, - }; - - tokio::spawn(Self::publish_updates(subs_manager, transaction_receiver)); - - service - } - - async fn publish_updates( - subs: Arc, - mut transaction_receiver: UnboundedReceiver, - ) { - while let Some(transaction) = transaction_receiver.recv().await { - Self::process_transaction(&subs, &transaction).await; - } - } - - async fn process_transaction(subs: &Arc, transaction: &Transaction) { - let mut closed_stream = Vec::new(); - - for sub in subs.subscribers.iter() { - let idx = sub.key(); - let sub = sub.value(); - - if let Some(filter) = &sub.filter { - if !filter.transaction_hashes.is_empty() - && !filter - .transaction_hashes - .contains(&transaction.transaction_hash) - { - continue; - } +) -> Pin> + Send>> { + let broker_stream: Pin + Send>> = if config.optimistic { + Box::pin(MemoryBroker::::subscribe_optimistic()) + } else { + Box::pin(MemoryBroker::::subscribe()) + }; + + let filtered_stream = broker_stream + .filter_map(move |transaction: Transaction| { + let filter = filter.clone(); + async move { + // Apply filter if it exists + if let Some(filter) = &filter { + if !filter.transaction_hashes.is_empty() + && !filter.transaction_hashes.contains(&transaction.transaction_hash) + { + return None; + } - if !filter.caller_addresses.is_empty() { - for caller_address in &transaction.calls { - if !filter - .caller_addresses - .contains(&caller_address.caller_address) - { - continue; + if !filter.caller_addresses.is_empty() { + for caller_address in &transaction.calls { + if !filter.caller_addresses.contains(&caller_address.caller_address) { + return None; + } } } - } - if !filter.contract_addresses.is_empty() { - for contract_address in &transaction.calls { - if !filter - .contract_addresses - .contains(&contract_address.contract_address) - { - continue; + if !filter.contract_addresses.is_empty() { + for contract_address in &transaction.calls { + if !filter.contract_addresses.contains(&contract_address.contract_address) { + return None; + } } } - } - if !filter.entrypoints.is_empty() { - for entrypoint in &transaction.calls { - if !filter.entrypoints.contains(&entrypoint.entrypoint) { - continue; + if !filter.entrypoints.is_empty() { + for entrypoint in &transaction.calls { + if !filter.entrypoints.contains(&entrypoint.entrypoint) { + return None; + } } } - } - if !filter.model_selectors.is_empty() { - for model_selector in &transaction.unique_models { - if !filter.model_selectors.contains(model_selector) { - continue; + if !filter.model_selectors.is_empty() { + for model_selector in &transaction.unique_models { + if !filter.model_selectors.contains(model_selector) { + return None; + } } } - } - - if filter.from_block.is_some() - && transaction.block_number < filter.from_block.unwrap() - { - continue; - } - - if filter.to_block.is_some() && transaction.block_number > filter.to_block.unwrap() - { - continue; - } - } - let resp: SubscribeTransactionsResponse = SubscribeTransactionsResponse { - transaction: Some(transaction.clone().into()), - }; + if filter.from_block.is_some() + && transaction.block_number < filter.from_block.unwrap() + { + return None; + } - // Use try_send to avoid blocking on slow subscribers - match sub.sender.try_send(Ok(resp)) { - Ok(_) => { - // Message sent successfully - } - Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { - // Channel is full, subscriber is too slow - disconnect them - trace!(target = LOG_TARGET, subscription_id = %idx, "Disconnecting slow subscriber - channel full"); - closed_stream.push(*idx); - } - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { - // Channel is closed, subscriber has disconnected - closed_stream.push(*idx); + if filter.to_block.is_some() && transaction.block_number > filter.to_block.unwrap() + { + return None; + } } - } - } - - for id in closed_stream { - trace!(target = LOG_TARGET, id = %id, "Closing events stream."); - subs.remove_subscriber(id).await - } - } -} - -impl Future for Service { - type Output = (); - - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { - let pin = self.get_mut(); - while let Poll::Ready(Some(transaction)) = pin.simple_broker.poll_next_unpin(cx) { - if let Err(e) = pin.transaction_sender.send(transaction) { - error!(target = LOG_TARGET, error = ?e, "Sending transaction to processor."); + Some(Ok(SubscribeTransactionsResponse { + transaction: Some(transaction.clone().into()), + })) } - } + }) + .chain(stream::once(async move { + // Send initial empty response for firefox/safari unlock issue + Ok(SubscribeTransactionsResponse { transaction: None }) + })); - Poll::Pending - } + Box::pin(filtered_stream) }