diff --git a/.github/workflows/service-replay.yml b/.github/workflows/service-replay.yml index 6bc4cfa4..e3025c28 100644 --- a/.github/workflows/service-replay.yml +++ b/.github/workflows/service-replay.yml @@ -24,6 +24,7 @@ jobs: contents: read packages: write id-token: write + deployments: write strategy: matrix: include: @@ -69,8 +70,8 @@ jobs: tags: "${{matrix.tag}}:${{ env.GITHUB_REF_SLUG }}" load: true context: . - cache-from: type=gha - cache-to: type=gha,mode=max + # cache-from: type=gha + # cache-to: type=gha,mode=max # Login against a Docker registry except on PR # https://github.com/docker/login-action @@ -100,8 +101,8 @@ jobs: tags: "${{matrix.tag}}:${{ env.TAG }}" push: true context: . - cache-from: type=gha - cache-to: type=gha,mode=max + # cache-from: type=gha + # cache-to: type=gha,mode=max - name: Configure Docker rolling release image tag ${{matrix.tag}}:${{ env.GITHUB_REF_SLUG }} if: github.ref == 'refs/heads/main' || github.ref == 'refs/tags/*' @@ -121,5 +122,5 @@ jobs: tags: "${{matrix.tag}}:${{ env.SAVANT_RS_VERSION }}-rolling" push: true context: . - cache-from: type=gha - cache-to: type=gha,mode=max + # cache-from: type=gha + # cache-to: type=gha,mode=max diff --git a/Cargo.toml b/Cargo.toml index cbf67fe9..c4a8e524 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ default-members = [ ] [workspace.package] -version = "1.0.5" +version = "1.1.0" edition = "2021" authors = ["Ivan Kudriavtsev "] description = "Savant Rust core functions library" diff --git a/docs/source/services/replay/2_installation.rst b/docs/source/services/replay/2_installation.rst index a3280aa4..50a7e83c 100644 --- a/docs/source/services/replay/2_installation.rst +++ b/docs/source/services/replay/2_installation.rst @@ -70,84 +70,8 @@ Configuration File The configuration file is a JSON file that contains the following parameters: -.. code-block:: json - - { - "common": { - "pass_metadata_only": false, - "management_port": 8080, - "stats_period": { - "secs": 60, - "nanos": 0 - }, - "job_writer_cache_max_capacity": 1000, - "job_writer_cache_ttl": { - "secs": 60, - "nanos": 0 - }, - "job_eviction_ttl": { - "secs": 60, - "nanos": 0 - }, - "default_job_sink_options": { - "send_timeout": { - "secs": 1, - "nanos": 0 - }, - "send_retries": 3, - "receive_timeout": { - "secs": 1, - "nanos": 0 - }, - "receive_retries": 3, - "send_hwm": 1000, - "receive_hwm": 100, - "inflight_ops": 100 - } - }, - "in_stream": { - "url": "router+bind:tcp://0.0.0.0:5555", - "options": { - "receive_timeout": { - "secs": 1, - "nanos": 0 - }, - "receive_hwm": 1000, - "topic_prefix_spec": { - "none": null - }, - "source_cache_size": 1000, - "inflight_ops": 100 - } - }, - "out_stream": { - "url": "pub+bind:tcp://0.0.0.0:5556", - "options": { - "send_timeout": { - "secs": 1, - "nanos": 0 - }, - "send_retries": 3, - "receive_timeout": { - "secs": 1, - "nanos": 0 - }, - "receive_retries": 3, - "send_hwm": 1000, - "receive_hwm": 100, - "inflight_ops": 100 - } - }, - "storage": { - "rocksdb": { - "path": "${DB_PATH:-/tmp/rocksdb}", - "data_expiration_ttl": { - "secs": 60, - "nanos": 0 - } - } - } - } +.. literalinclude:: ../../../../services/replay/replay/assets/test.json + :language: json The above-mentioned configuration file is used by default, when you launch Replay without specifying the configuration file. You can override the default configuration by providing your own configuration file and specifying it in the launch command: @@ -197,6 +121,18 @@ Configuration Parameters - Default sink options to be applied to jobs if they don't specify their own options. If not set, jobs must provide their own sink options. - ``null`` - See ``out_stream.options`` format. + * - ``common.telemetry_config_file`` + - The path to a file containing telemetry configuration. When set, the service loads telemetry settings from this file. + - ``null`` + - ``"/opt/telemetry_config.json"`` + * - ``common.stats_frame_period`` + - Defines how frequently the service should report statistics based on the number of frames processed. When set, statistics are logged after processing the specified number of frames. + - ``null`` + - ``1000`` + * - ``common.stats_timestamp_period`` + - Defines how frequently the service should report statistics based on elapsed time. Controls the time interval between statistics reports. + - ``null`` + - ``{"secs": 10, "nanos": 0}`` * - ``in_stream.url`` - The URL for the data ingress in Savant ZMQ format. - ``router+bind:tcp://0.0.0.0:5555`` diff --git a/savant_core/src/lib.rs b/savant_core/src/lib.rs index 9dddc116..6149d639 100644 --- a/savant_core/src/lib.rs +++ b/savant_core/src/lib.rs @@ -59,7 +59,7 @@ pub fn fast_hash(bytes: &[u8]) -> u32 { #[inline] pub fn get_tracer() -> BoxedTracer { - global::tracer("video_pipeline") + global::tracer("savant-tracer") } pub mod rust { diff --git a/savant_core/src/metrics/pipeline_metric_builder.rs b/savant_core/src/metrics/pipeline_metric_builder.rs index a72d9177..8c430fd7 100644 --- a/savant_core/src/metrics/pipeline_metric_builder.rs +++ b/savant_core/src/metrics/pipeline_metric_builder.rs @@ -1,6 +1,6 @@ use crate::metrics::{get_or_create_counter_family, get_or_create_gauge_family}; +use crate::pipeline::get_registered_pipelines; use crate::rust::FrameProcessingStatRecordType; -use crate::webserver::get_registered_pipelines; use log::debug; #[derive(Debug)] @@ -29,7 +29,7 @@ impl PipelineMetricBuilder { let stage_latency_label_names = ["record_type", "destination_stage_name", "source_stage_name"].as_slice(); - let registered_pipelines = get_registered_pipelines().await; + let registered_pipelines = get_registered_pipelines(); debug!( "Found {} registered pipeline(s)", registered_pipelines.len() diff --git a/savant_core/src/pipeline.rs b/savant_core/src/pipeline.rs index 0b2f3d65..8a616903 100644 --- a/savant_core/src/pipeline.rs +++ b/savant_core/src/pipeline.rs @@ -3,7 +3,11 @@ use std::time::SystemTime; use anyhow::Result; use hashbrown::HashMap; +use log::debug; +use log::error; +use log::info; use opentelemetry::Context; +use parking_lot::Mutex; pub use implementation::PipelineConfiguration; pub use implementation::PipelineConfigurationBuilder; @@ -15,10 +19,57 @@ use crate::primitives::frame::VideoFrameProxy; use crate::primitives::frame_batch::VideoFrameBatch; use crate::primitives::frame_update::VideoFrameUpdate; use crate::primitives::object::BorrowedVideoObject; -use crate::webserver::{register_pipeline, unregister_pipeline}; +use lazy_static::lazy_static; const MAX_TRACKED_STREAMS: usize = 8192; // defines how many streams are tracked for the frame ordering +// + +// pipelines: Arc>>>, + +lazy_static! { + static ref PIPELINES: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); +} + +pub(crate) fn register_pipeline(pipeline: Arc) { + let pipelines = PIPELINES.clone(); + let mut bind = pipelines.lock(); + let name = pipeline.get_name(); + let entry = bind.get(&name); + if entry.is_some() { + let message = format!("Pipeline with name {} already exists in registry.", &name); + error!("{}", message); + panic!("{}", message); + } + bind.insert(name.clone(), pipeline.clone()); + info!("Pipeline {} registered.", name); +} + +pub(crate) fn unregister_pipeline(pipeline: Arc) { + let stats = PIPELINES.clone(); + let pipeline_name = pipeline.get_name(); + let mut bind = stats.lock(); + let prev_len = bind.len(); + debug!("Removing pipeline {} from stats.", &pipeline_name); + bind.remove(&pipeline_name); + if bind.len() == prev_len { + let message = format!("Failed to remove pipeline {} from stats.", &pipeline_name); + error!("{}", message); + panic!("{}", message); + } +} + +pub(crate) fn get_registered_pipelines() -> HashMap> { + let s = PIPELINES.lock(); + s.clone() +} + +pub fn get_pipeline(name: &str) -> Option> { + let s = PIPELINES.lock(); + s.get(name).cloned() +} + pub mod stage; pub mod stage_function_loader; pub mod stage_plugin_sample; @@ -293,6 +344,7 @@ pub(super) mod implementation { id_counter: AtomicI64, frame_counter: AtomicI64, root_spans: SavantRwLock>, + outer_spans: SavantRwLock>, stages: Vec, frame_locations: SavantRwLock>, frame_ordering: SavantRwLock>, @@ -311,6 +363,7 @@ pub(super) mod implementation { id_counter: AtomicI64::new(0), frame_counter: AtomicI64::new(0), root_spans: SavantRwLock::new(HashMap::new()), + outer_spans: SavantRwLock::new(HashMap::new()), stages: Vec::new(), frame_locations: SavantRwLock::new(HashMap::new()), frame_ordering: SavantRwLock::new(LruCache::new( @@ -588,6 +641,9 @@ pub(super) mod implementation { .write() .insert(id_counter, Context::current_with_span(span)); } + + self.outer_spans.write().insert(id_counter, parent_ctx); + let source_id_compatibility_hash = frame.stream_compatibility_hash(); let mut ordering = self.frame_ordering.write(); let prev_ordering_seq = ordering.get(&source_id); @@ -676,17 +732,18 @@ pub(super) mod implementation { bail!("Object {} is not found in the stage {}", id, stage.name) } - let mut bind = self.root_spans.write(); + // let mut bind = self.root_spans.write(); match removed.unwrap() { PipelinePayload::Frame(frame, _, ctx, _, _) => { self.stats.register_frame(frame.get_object_count()); self.add_frame_json(&frame, &ctx); ctx.span().end(); - let root_ctx = bind.remove(&id).unwrap(); - Ok(HashMap::from([(id, root_ctx)])) + self.root_spans.write().remove(&id).unwrap(); + let outer_ctx = self.outer_spans.write().remove(&id).unwrap(); + Ok(HashMap::from([(id, outer_ctx)])) } PipelinePayload::Batch(batch, _, contexts, _, _) => Ok({ - let mut bind = self.root_spans.write(); + //let mut bind = self.root_spans.write(); contexts .into_iter() .map(|(frame_id, ctx)| { @@ -703,8 +760,9 @@ pub(super) mod implementation { ) } ctx.span().end(); - let root_ctx = bind.remove(&id).unwrap(); - Ok((id, root_ctx)) + self.root_spans.write().remove(&id).unwrap(); + let outer_ctx = self.outer_spans.write().remove(&id).unwrap(); + Ok((id, outer_ctx)) }) .collect::, _>>()? }), diff --git a/savant_core/src/telemetry.rs b/savant_core/src/telemetry.rs index 447127d0..e2cc9899 100644 --- a/savant_core/src/telemetry.rs +++ b/savant_core/src/telemetry.rs @@ -1,4 +1,3 @@ -use crate::get_or_init_async_runtime; use log::error; use opentelemetry::global; use opentelemetry_jaeger_propagator::Propagator; @@ -14,6 +13,8 @@ use std::fs; use std::time::Duration; use twelf::{config, Layer}; +use crate::get_or_init_async_runtime; + #[derive(Debug, Serialize, Deserialize, Clone)] pub enum ContextPropagationFormat { #[serde(rename = "jaeger")] @@ -251,8 +252,12 @@ pub fn init(config: &TelemetryConfiguration) { match configurator.get() { Some(_) => panic!("Open Telemetry has been configured"), None => { - let runtime = get_or_init_async_runtime(); - let c = runtime.block_on(async { Configurator::new("savant", config) }); + let c = if tokio::runtime::Handle::try_current().is_ok() { + Configurator::new("savant", config) + } else { + let rt = get_or_init_async_runtime(); + rt.block_on(async { Configurator::new("savant", config) }) + }; let result = configurator.set(c); if result.is_err() { // should not happen diff --git a/savant_core/src/webserver.rs b/savant_core/src/webserver.rs index c574f54d..e2447ac0 100644 --- a/savant_core/src/webserver.rs +++ b/savant_core/src/webserver.rs @@ -10,7 +10,6 @@ use tokio::sync::Mutex; use crate::get_or_init_async_runtime; use crate::metrics::metric_collector::SystemMetricCollector; use crate::metrics::pipeline_metric_builder::PipelineMetricBuilder; -use crate::pipeline::implementation; use crate::primitives::rust::AttributeSet; use crate::primitives::Attribute; use crate::protobuf::ToProtobuf; @@ -25,7 +24,7 @@ use anyhow::bail; use futures_util::StreamExt; use hashbrown::HashMap; use lazy_static::lazy_static; -use log::{debug, error, info, warn}; +use log::{error, info, warn}; use moka::future::Cache; use moka::Expiry; use prometheus_client::encoding::text::encode; @@ -78,7 +77,6 @@ pub struct KvsOperation { #[allow(clippy::type_complexity)] struct WsData { - pipelines: Arc>>>, status: Arc>, shutdown_token: Arc>, shutdown_status: Arc>, @@ -131,7 +129,6 @@ impl WsData { } }); WsData { - pipelines: Arc::new(Mutex::new(HashMap::new())), status: Arc::new(Mutex::new(PipelineStatus::Stopped)), shutdown_token: Arc::new(OnceLock::new()), shutdown_status: Arc::new(OnceLock::new()), @@ -228,52 +225,6 @@ lazy_static! { static ref PID: Mutex = Mutex::new(0); } -pub(crate) fn register_pipeline(pipeline: Arc) { - let runtime = get_or_init_async_runtime(); - let pipelines = WS_DATA.pipelines.clone(); - runtime.block_on(async move { - let mut bind = pipelines.lock().await; - let name = pipeline.get_name(); - let entry = bind.get(&name); - if entry.is_some() { - let message = format!("Pipeline with name {} already exists in registry.", &name); - error!("{}", message); - panic!("{}", message); - } - bind.insert(name.clone(), pipeline.clone()); - info!("Pipeline {} registered.", name); - }); -} - -pub(crate) fn unregister_pipeline(pipeline: Arc) { - let runtime = get_or_init_async_runtime(); - let stats = WS_DATA.pipelines.clone(); - let pipeline_name = pipeline.get_name(); - runtime.block_on(async move { - let mut bind = stats.lock().await; - let prev_len = bind.len(); - debug!("Removing pipeline {} from stats.", &pipeline_name); - bind.remove(&pipeline_name); - if bind.len() == prev_len { - error!("Failed to remove pipeline from stats."); - } - }); -} - -pub(crate) async fn get_registered_pipelines() -> HashMap> { - let s = WS_DATA.pipelines.lock().await; - s.clone() -} - -pub fn get_pipeline(name: &str) -> Option> { - let runtime = get_or_init_async_runtime(); - let pipelines = WS_DATA.pipelines.clone(); - runtime.block_on(async { - let bind = pipelines.lock().await; - bind.get(name).cloned() - }) -} - pub fn set_status(s: PipelineStatus) -> anyhow::Result<()> { WS_DATA.set_status(s) } @@ -552,10 +503,10 @@ mod tests { set_extra_labels, }; use crate::pipeline::implementation::create_test_pipeline; + use crate::pipeline::register_pipeline; use crate::test::gen_frame; use crate::webserver::{ - init_webserver, register_pipeline, set_shutdown_token, set_status, stop_webserver, - PipelineStatus, + init_webserver, set_shutdown_token, set_status, stop_webserver, PipelineStatus, }; use hashbrown::HashMap; use prometheus_client::registry::Unit; diff --git a/savant_gstreamer_elements/src/zeromq_src/message_handlers.rs b/savant_gstreamer_elements/src/zeromq_src/message_handlers.rs index 396d2ba6..3712e794 100644 --- a/savant_gstreamer_elements/src/zeromq_src/message_handlers.rs +++ b/savant_gstreamer_elements/src/zeromq_src/message_handlers.rs @@ -3,13 +3,13 @@ use gstreamer::subclass::prelude::*; use gstreamer::{prelude::*, FlowError}; use gstreamer_base::subclass::base_src::CreateSuccess; use savant_core::message::{save_message, Message}; +use savant_core::pipeline::get_pipeline; use savant_core::primitives::eos::EndOfStream; use savant_core::primitives::rust::{VideoFrameContent, VideoFrameProxy}; use savant_core::primitives::shutdown::Shutdown; use savant_core::rust::{Pipeline, PropagatedContext}; use savant_core::transport::zeromq::ReaderResult; use savant_core::utils::bytes_to_hex_string; -use savant_core::webserver::get_pipeline; use savant_gstreamer::id_meta::SavantIdMetaKind; use crate::utils::convert_ts; diff --git a/services/replay/replay/assets/test.json b/services/replay/replay/assets/test.json index e6fbd9e6..3b5abf5d 100644 --- a/services/replay/replay/assets/test.json +++ b/services/replay/replay/assets/test.json @@ -29,7 +29,13 @@ "send_hwm": 1000, "receive_hwm": 100, "inflight_ops": 100 - } + }, + "stats_frame_period": null, + "stats_timestamp_period": { + "secs": 10, + "nanos": 0 + }, + "telemetry_config_file": null }, "in_stream": { "url": "router+bind:ipc:///tmp/${SOCKET_PATH_IN:-undefined}", diff --git a/services/replay/replaydb/Cargo.toml b/services/replay/replaydb/Cargo.toml index cf2226c8..d2a8b4eb 100644 --- a/services/replay/replaydb/Cargo.toml +++ b/services/replay/replaydb/Cargo.toml @@ -20,6 +20,7 @@ bincode = { workspace = true } derive_builder = { workspace = true } env_logger = { workspace = true } hashbrown = { workspace = true } +lazy_static = { workspace = true } log = { workspace = true } md-5 = { workspace = true } mini-moka = { workspace = true } diff --git a/services/replay/replaydb/assets/rocksdb.json b/services/replay/replaydb/assets/rocksdb.json index 4890c6da..0139800d 100644 --- a/services/replay/replaydb/assets/rocksdb.json +++ b/services/replay/replaydb/assets/rocksdb.json @@ -14,7 +14,13 @@ "job_eviction_ttl": { "secs": 60, "nanos": 0 - } + }, + "stats_frame_period": null, + "stats_timestamp_period": { + "secs": 10, + "nanos": 0 + }, + "telemetry_config_file": null }, "in_stream": { "url": "router+bind:ipc:///tmp/${SOCKET_PATH_IN:-undefined}", diff --git a/services/replay/replaydb/assets/rocksdb_opt_out.json b/services/replay/replaydb/assets/rocksdb_opt_out.json index f675987b..e2d644b3 100644 --- a/services/replay/replaydb/assets/rocksdb_opt_out.json +++ b/services/replay/replaydb/assets/rocksdb_opt_out.json @@ -14,7 +14,13 @@ "job_eviction_ttl": { "secs": 60, "nanos": 0 - } + }, + "stats_frame_period": null, + "stats_timestamp_period": { + "secs": 10, + "nanos": 0 + }, + "telemetry_config_file": null }, "in_stream": { "url": "router+bind:ipc:///tmp/${SOCKET_PATH_IN:-undefined}", diff --git a/services/replay/replaydb/src/service/configuration.rs b/services/replay/replaydb/src/service/configuration.rs index 6e4fb527..aa652bd8 100644 --- a/services/replay/replaydb/src/service/configuration.rs +++ b/services/replay/replaydb/src/service/configuration.rs @@ -1,6 +1,9 @@ use crate::job_writer::{SinkConfiguration, SinkOptions}; use anyhow::{bail, Result}; -use savant_core::transport::zeromq::{NonBlockingReader, ReaderConfigBuilder}; +use savant_core::{ + telemetry::init_from_file, + transport::zeromq::{NonBlockingReader, ReaderConfigBuilder}, +}; use serde::{Deserialize, Serialize}; use std::result; use std::time::Duration; @@ -50,6 +53,9 @@ pub struct CommonConfiguration { pub job_writer_cache_ttl: Duration, pub job_eviction_ttl: Duration, pub default_job_sink_options: Option, + pub telemetry_config_file: Option, + pub stats_frame_period: Option, + pub stats_timestamp_period: Option, } #[config] @@ -66,6 +72,9 @@ impl ServiceConfiguration { if self.common.management_port <= 1024 { bail!("Management port must be set to a value greater than 1024!"); } + if let Some(telemetry_config_file) = &self.common.telemetry_config_file { + init_from_file(telemetry_config_file.as_str()); + } Ok(()) } diff --git a/services/replay/replaydb/src/service/rocksdb_service.rs b/services/replay/replaydb/src/service/rocksdb_service.rs index 0d887a8b..d892485d 100644 --- a/services/replay/replaydb/src/service/rocksdb_service.rs +++ b/services/replay/replaydb/src/service/rocksdb_service.rs @@ -72,6 +72,11 @@ impl TryFrom<&ServiceConfiguration> for RocksDbStreamProcessor { output, configuration.common.stats_period, configuration.common.pass_metadata_only, + configuration.common.stats_frame_period, + configuration + .common + .stats_timestamp_period + .map(|d| d.as_millis() as i64), )) } } diff --git a/services/replay/replaydb/src/stream_processor.rs b/services/replay/replaydb/src/stream_processor.rs index d9fe4150..2f48aa84 100644 --- a/services/replay/replaydb/src/stream_processor.rs +++ b/services/replay/replaydb/src/stream_processor.rs @@ -3,6 +3,7 @@ use std::time::{Duration, Instant}; use anyhow::{bail, Result}; use savant_core::message::Message; +use savant_core::pipeline::{Pipeline, PipelineConfigurationBuilder, PipelineStagePayloadType}; use savant_core::transport::zeromq::{ NonBlockingReader, NonBlockingWriter, ReaderResult, WriterResult, }; @@ -27,6 +28,7 @@ struct StreamProcessor { stats_period: Duration, send_metadata_only: bool, stop_flag: bool, + pipeline: Pipeline, } impl StreamProcessor @@ -39,8 +41,45 @@ where output: Option, stats_period: Duration, send_metadata_only: bool, - ) -> Self { - Self { + frame_period: Option, + timestamp_period: Option, + ) -> Result { + let conf = PipelineConfigurationBuilder::default() + .frame_period(frame_period) + .timestamp_period(timestamp_period) + .build()?; + + let mut stages = vec![ + ( + "store".to_string(), + PipelineStagePayloadType::Frame, + None, + None, + ), + ( + "cleanup".to_string(), + PipelineStagePayloadType::Frame, + None, + None, + ), + ]; + + if output.is_some() { + stages.insert( + 1, + ( + "egress".to_string(), + PipelineStagePayloadType::Frame, + None, + None, + ), + ); + } + + let pipeline = Pipeline::new("replay", stages, conf)?; + pipeline.set_root_span_name("replay")?; + + Ok(Self { db, input, output, @@ -52,7 +91,8 @@ where last_stats: Instant::now(), send_metadata_only, stop_flag: false, - } + pipeline, + }) } async fn receive_message(&mut self) -> Result { @@ -147,6 +187,16 @@ where routing_id, data, } => { + let frame_id = if message.is_video_frame() { + let context = message.get_span_context().extract(); + Some(self.pipeline.add_frame_with_telemetry( + "store", + message.as_video_frame().unwrap(), + context, + )?) + } else { + None + }; self.stats.packet_counter += 1; self.stats.byte_counter += data.iter().map(|v| v.len() as u64).sum::(); log::debug!( @@ -169,6 +219,14 @@ where .await .add_message(&message, &topic, &data) .await?; + if let Some(frame_id) = frame_id { + let stage_name = if self.output.is_some() { + "egress" + } else { + "cleanup" + }; + self.pipeline.move_as_is(stage_name, vec![frame_id])?; + } } let data_slice = if self.send_metadata_only { vec![] @@ -178,6 +236,13 @@ where self.send_message(std::str::from_utf8(&topic)?, &message, &data_slice) .await?; + + if let Some(frame_id) = frame_id { + if self.output.is_some() { + self.pipeline.move_as_is("cleanup", vec![frame_id])?; + } + self.pipeline.delete(frame_id)?; + } } ReaderResult::Timeout => { log::debug!( @@ -251,14 +316,21 @@ impl RocksDbStreamProcessor { output: Option, stats_period: Duration, send_metadata_only: bool, + stats_frame_period: Option, + stats_timestamp_period: Option, ) -> Self { - Self(StreamProcessor::new( - db, - input, - output, - stats_period, - send_metadata_only, - )) + Self( + StreamProcessor::new( + db, + input, + output, + stats_period, + send_metadata_only, + stats_frame_period, + stats_timestamp_period, + ) + .unwrap(), + ) } pub async fn run_once(&mut self) -> Result<()> { @@ -360,7 +432,10 @@ mod tests { Some(out_writer), Duration::from_secs(30), false, - ); + None, + None, + ) + .unwrap(); let f = gen_properly_filled_frame(true); let uuid = f.get_uuid_u128(); @@ -396,6 +471,12 @@ mod tests { ReaderResult::TooShort(_) => { panic!("Too short"); } + ReaderResult::MessageVersionMismatch { + topic, + routing_id, + sender_version, + expected_version, + } => panic!("Message version mismatch: topic: {:?}, routing_id: {:?}, sender_version: {:?}, expected_version: {:?}", topic, routing_id, sender_version, expected_version), } Ok(()) } diff --git a/services/replay/samples/file_restreaming/README.md b/services/replay/samples/file_restreaming/README.md index 61f4feff..5ac22410 100644 --- a/services/replay/samples/file_restreaming/README.md +++ b/services/replay/samples/file_restreaming/README.md @@ -58,7 +58,7 @@ We are going to use the file source adapter to ingest the video file to Replay. docker run --rm -it --name source-video-files-test \ --network host \ -e FILE_TYPE=video \ - -e SYNC_OUTPUT=False \ + -e SYNC_OUTPUT=True \ -e ZMQ_ENDPOINT=dealer+connect:tcp://127.0.0.1:5555 \ -e SOURCE_ID=in-video \ -e LOCATION=/data/shuffle_dance.mp4 \ diff --git a/services/replay/samples/file_restreaming/replay_config.json b/services/replay/samples/file_restreaming/replay_config.json index d67b17a6..d82ac5bd 100644 --- a/services/replay/samples/file_restreaming/replay_config.json +++ b/services/replay/samples/file_restreaming/replay_config.json @@ -29,7 +29,12 @@ "send_hwm": 1000, "receive_hwm": 100, "inflight_ops": 100 - } + }, + "stats_timestamp_period": { + "secs": 10, + "nanos": 0 + }, + "telemetry_config_file": null }, "in_stream": { "url": "router+bind:tcp://127.0.0.1:5555",