Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 58 additions & 193 deletions crates/grpc/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::net::SocketAddr;
use std::pin::Pin;
use std::str;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use crypto_bigint::U256;
Expand All @@ -21,13 +20,10 @@ use proto::world::{
};
use starknet::core::types::Felt;
use starknet::providers::Provider;
use subscriptions::event::EventManager;
use subscriptions::indexer::IndexerManager;
use subscriptions::token::TokenManager;
use subscriptions::token_balance::TokenBalanceManager;

use tokio::net::TcpListener;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use tokio_stream::wrappers::TcpListenerStream;
use tonic::codec::CompressionEncoding;
use tonic::transport::Server;
use tonic::{Request, Response, Status};
Expand All @@ -37,10 +33,7 @@ use torii_proto::error::ProtoError;
use torii_storage::Storage;
use tower_http::cors::{AllowOrigin, CorsLayer};

use crate::subscriptions::transaction::TransactionManager;

use self::subscriptions::entity::EntityManager;
use self::subscriptions::event_message::EventMessageManager;
use torii_proto::proto::world::world_server::WorldServer;
use torii_proto::proto::world::{
PublishMessageBatchRequest, PublishMessageBatchResponse, PublishMessageRequest,
Expand All @@ -60,33 +53,13 @@ use torii_proto::Message;

use anyhow::{anyhow, Error};

// Shared subscription runtime for all DojoWorld instances
// This provides performance isolation from user-facing API requests
// Subscriptions involve heavy polling and should not starve API response threads
static SUBSCRIPTION_RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
let worker_threads = (num_cpus::get() / 2).clamp(2, 8);
tokio::runtime::Builder::new_multi_thread()
.worker_threads(worker_threads)
.thread_name("torii-grpc-subscriptions")
.enable_all()
.build()
.expect("Failed to create subscriptions runtime")
});

#[derive(Debug)]
pub struct DojoWorld<P: Provider + Sync> {
storage: Arc<dyn Storage>,
provider: Arc<P>,
world_address: Felt,
cross_messaging_tx: Option<UnboundedSender<Message>>,
entity_manager: Arc<EntityManager>,
event_message_manager: Arc<EventMessageManager>,
event_manager: Arc<EventManager>,
indexer_manager: Arc<IndexerManager>,
token_balance_manager: Arc<TokenBalanceManager>,
token_manager: Arc<TokenManager>,
transaction_manager: Arc<TransactionManager>,
_config: GrpcConfig,
config: GrpcConfig,
}

impl<P: Provider + Sync> DojoWorld<P> {
Expand All @@ -97,57 +70,12 @@ impl<P: Provider + Sync> DojoWorld<P> {
cross_messaging_tx: Option<UnboundedSender<Message>>,
config: GrpcConfig,
) -> Self {
let entity_manager = Arc::new(EntityManager::new(config.clone()));
let event_message_manager = Arc::new(EventMessageManager::new(config.clone()));
let event_manager = Arc::new(EventManager::new(config.clone()));
let indexer_manager = Arc::new(IndexerManager::new(config.clone()));
let token_balance_manager = Arc::new(TokenBalanceManager::new(config.clone()));
let token_manager = Arc::new(TokenManager::new(config.clone()));
let transaction_manager = Arc::new(TransactionManager::new(config.clone()));

// Spawn subscription services on the dedicated subscription runtime
// These services do heavy polling and should be isolated from API request handling
SUBSCRIPTION_RUNTIME.spawn(subscriptions::entity::Service::new(Arc::clone(
&entity_manager,
)));

SUBSCRIPTION_RUNTIME.spawn(subscriptions::event_message::Service::new(Arc::clone(
&event_message_manager,
)));

SUBSCRIPTION_RUNTIME.spawn(subscriptions::event::Service::new(Arc::clone(
&event_manager,
)));

SUBSCRIPTION_RUNTIME.spawn(subscriptions::indexer::Service::new(Arc::clone(
&indexer_manager,
)));

SUBSCRIPTION_RUNTIME.spawn(subscriptions::token_balance::Service::new(Arc::clone(
&token_balance_manager,
)));

SUBSCRIPTION_RUNTIME.spawn(subscriptions::token::Service::new(Arc::clone(
&token_manager,
)));

SUBSCRIPTION_RUNTIME.spawn(subscriptions::transaction::Service::new(Arc::clone(
&transaction_manager,
)));

Self {
storage,
provider,
world_address,
cross_messaging_tx,
entity_manager,
event_message_manager,
event_manager,
indexer_manager,
token_balance_manager,
token_manager,
transaction_manager,
_config: config,
config,
}
}
}
Expand Down Expand Up @@ -280,10 +208,11 @@ impl<P: Provider + Sync + Send + 'static> proto::world::world_server::World for
.transpose()
.map_err(|e: ProtoError| Status::internal(e.to_string()))?;

let rx = self.transaction_manager.add_subscriber(filter).await;
Ok(Response::new(
Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTransactionsStream
))
let stream = crate::subscriptions::transaction::subscribe_transactions_stream(
filter,
self.config.clone(),
);
Ok(Response::new(stream as Self::SubscribeTransactionsStream))
}

async fn retrieve_entities(
Expand Down Expand Up @@ -438,36 +367,20 @@ impl<P: Provider + Sync + Send + 'static> proto::world::world_server::World for
.map(|id| U256::from_be_slice(id))
.collect::<Vec<_>>();

let rx = self
.token_manager
.add_subscriber(contract_addresses, token_ids)
.await;
Ok(Response::new(
Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTokensStream
))
let stream = crate::subscriptions::token::subscribe_tokens_stream(
contract_addresses,
token_ids,
self.config.clone(),
);
Ok(Response::new(stream as Self::SubscribeTokensStream))
}

async fn update_tokens_subscription(
&self,
request: Request<UpdateTokenSubscriptionRequest>,
_request: Request<UpdateTokenSubscriptionRequest>,
) -> ServiceResult<()> {
let UpdateTokenSubscriptionRequest {
subscription_id,
contract_addresses,
token_ids,
} = request.into_inner();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();
let token_ids = token_ids
.iter()
.map(|id| U256::from_be_slice(id))
.collect::<Vec<_>>();

self.token_manager
.update_subscriber(subscription_id, contract_addresses, token_ids)
.await;
// Update subscription is not needed with direct broker subscriptions
// Each subscription stream filters its own data
Ok(Response::new(()))
}

Expand Down Expand Up @@ -497,18 +410,15 @@ impl<P: Provider + Sync + Send + 'static> proto::world::world_server::World for
request: Request<SubscribeIndexerRequest>,
) -> ServiceResult<Self::SubscribeIndexerStream> {
let SubscribeIndexerRequest { contract_address } = request.into_inner();
let rx = self
.indexer_manager
.add_subscriber(
self.storage.clone(),
Felt::from_bytes_be_slice(&contract_address),
)
.await
.map_err(|e| Status::internal(e.to_string()))?;
let stream = crate::subscriptions::indexer::subscribe_indexer_stream(
self.storage.clone(),
Felt::from_bytes_be_slice(&contract_address),
self.config.clone(),
)
.await
.map_err(|e| Status::internal(e.to_string()))?;

Ok(Response::new(
Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream
))
Ok(Response::new(stream as Self::SubscribeIndexerStream))
}

async fn subscribe_entities(
Expand All @@ -521,29 +431,20 @@ impl<P: Provider + Sync + Send + 'static> proto::world::world_server::World for
.transpose()
.map_err(|e: ProtoError| Status::internal(e.to_string()))?;

let rx = self.entity_manager.add_subscriber(clause).await;
let stream = crate::subscriptions::entity::subscribe_entities_stream(
clause,
self.config.clone(),
);

Ok(Response::new(
Box::pin(ReceiverStream::new(rx)) as Self::SubscribeEntitiesStream
))
Ok(Response::new(stream as Self::SubscribeEntitiesStream))
}

async fn update_entities_subscription(
&self,
request: Request<UpdateEntitiesSubscriptionRequest>,
_request: Request<UpdateEntitiesSubscriptionRequest>,
) -> ServiceResult<()> {
let UpdateEntitiesSubscriptionRequest {
subscription_id,
clause,
} = request.into_inner();
let clause = clause
.map(|c| c.try_into())
.transpose()
.map_err(|e: ProtoError| Status::internal(e.to_string()))?;
self.entity_manager
.update_subscriber(subscription_id, clause)
.await;

// Update subscription is not needed with direct broker subscriptions
// Each subscription stream filters its own data
Ok(Response::new(()))
}

Expand All @@ -569,47 +470,22 @@ impl<P: Provider + Sync + Send + 'static> proto::world::world_server::World for
.map(|id| U256::from_be_slice(id))
.collect::<Vec<_>>();

let rx = self
.token_balance_manager
.add_subscriber(contract_addresses, account_addresses, token_ids)
.await;
let stream = crate::subscriptions::token_balance::subscribe_token_balances_stream(
contract_addresses,
account_addresses,
token_ids,
self.config.clone(),
);

Ok(Response::new(
Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTokenBalancesStream
))
Ok(Response::new(stream as Self::SubscribeTokenBalancesStream))
}

async fn update_token_balances_subscription(
&self,
request: Request<UpdateTokenBalancesSubscriptionRequest>,
_request: Request<UpdateTokenBalancesSubscriptionRequest>,
) -> ServiceResult<()> {
let UpdateTokenBalancesSubscriptionRequest {
subscription_id,
contract_addresses,
account_addresses,
token_ids,
} = request.into_inner();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();
let account_addresses = account_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();
let token_ids = token_ids
.iter()
.map(|id| U256::from_be_slice(id))
.collect::<Vec<_>>();

self.token_balance_manager
.update_subscriber(
subscription_id,
contract_addresses,
account_addresses,
token_ids,
)
.await;
// Update subscription is not needed with direct broker subscriptions
// Each subscription stream filters its own data
Ok(Response::new(()))
}

Expand All @@ -622,29 +498,20 @@ impl<P: Provider + Sync + Send + 'static> proto::world::world_server::World for
.map(|c| c.try_into())
.transpose()
.map_err(|e: ProtoError| Status::internal(e.to_string()))?;
let rx = self.event_message_manager.add_subscriber(clause).await;
let stream = crate::subscriptions::event_message::subscribe_event_messages_stream(
clause,
self.config.clone(),
);

Ok(Response::new(
Box::pin(ReceiverStream::new(rx)) as Self::SubscribeEntitiesStream
))
Ok(Response::new(stream as Self::SubscribeEntitiesStream))
}

async fn update_event_messages_subscription(
&self,
request: Request<UpdateEventMessagesSubscriptionRequest>,
_request: Request<UpdateEventMessagesSubscriptionRequest>,
) -> ServiceResult<()> {
let UpdateEventMessagesSubscriptionRequest {
subscription_id,
clause,
} = request.into_inner();
let clause = clause
.map(|c| c.try_into())
.transpose()
.map_err(|e: ProtoError| Status::internal(e.to_string()))?;
self.event_message_manager
.update_subscriber(subscription_id, clause)
.await;

// Update subscription is not needed with direct broker subscriptions
// Each subscription stream filters its own data
Ok(Response::new(()))
}

Expand All @@ -653,14 +520,12 @@ impl<P: Provider + Sync + Send + 'static> proto::world::world_server::World for
request: Request<proto::world::SubscribeEventsRequest>,
) -> ServiceResult<Self::SubscribeEventsStream> {
let keys = request.into_inner().keys;
let rx = self
.event_manager
.add_subscriber(keys.into_iter().map(|keys| keys.into()).collect())
.await;

Ok(Response::new(
Box::pin(ReceiverStream::new(rx)) as Self::SubscribeEventsStream
))
let stream = crate::subscriptions::event::subscribe_events_stream(
keys.into_iter().map(|keys| keys.into()).collect(),
self.config.clone(),
);

Ok(Response::new(stream as Self::SubscribeEventsStream))
}

async fn publish_message(
Expand Down
Loading
Loading