@@ -184,13 +184,30 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
184184 let logger = logger_factory. component_logger ( "SubgraphInstanceManager" , None ) ;
185185 let logger_factory = logger_factory. with_parent ( logger. clone ( ) ) ;
186186
187- // Configure sharded processor
187+ // Configure trigger processor with backward compatibility
188+ // Only enable sharding if explicitly configured with more than 1 shard
189+ let enable_sharding = env_vars. subgraph_runtime_processing_shards > 1 ;
190+
188191 let processor_config = super :: trigger_processor:: TriggerProcessorConfig {
192+ enable_sharding,
189193 num_shards : env_vars. subgraph_runtime_processing_shards ,
190194 workers_per_shard : env_vars. subgraph_runtime_workers_per_shard ,
191195 max_queue_per_subgraph : env_vars. subgraph_max_queue_per_subgraph ,
192196 fairness_window_ms : 100 , // 100ms fairness window
193197 } ;
198+
199+ if enable_sharding {
200+ info ! ( & logger, "Sharded trigger processing enabled" ;
201+ "num_shards" => processor_config. num_shards,
202+ "workers_per_shard" => processor_config. workers_per_shard,
203+ "total_workers" => processor_config. num_shards * processor_config. workers_per_shard
204+ ) ;
205+ } else {
206+ info ! ( & logger, "Using legacy single-semaphore trigger processing" ;
207+ "workers" => processor_config. workers_per_shard
208+ ) ;
209+ }
210+
194211 let trigger_processor = Arc :: new ( super :: trigger_processor:: SubgraphTriggerProcessor :: new (
195212 processor_config,
196213 ) ) ;
0 commit comments