From 15738da58978fb492342b2690898bdb657a97fd5 Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 30 Jan 2025 00:05:22 -0500 Subject: [PATCH 1/5] feat(ourlogs): Preliminary breadcrumb to log conversion This converts breadcrumbs in a fairly simplistic way into our log format, behind a sample-rate to mitigate load when rolling this out (and also the existing enable flag). --- CHANGELOG.md | 1 + Cargo.lock | 1 + relay-config/src/config.rs | 11 + relay-dynamic-config/src/global.rs | 12 + relay-ourlogs/Cargo.toml | 1 + relay-ourlogs/src/lib.rs | 1 + relay-ourlogs/src/ourlog.rs | 296 ++++++++++++++++++- relay-server/src/envelope.rs | 22 +- relay-server/src/services/processor.rs | 12 + relay-server/src/services/processor/event.rs | 35 ++- tests/integration/test_ourlogs.py | 44 +++ 11 files changed, 432 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f75ddd9f03..c56b6f2a73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Unreleased +- Preliminary breadcrumb to log conversion. ([#4479](https://github.com/getsentry/relay/pull/4479)) - Allow log ingestion behind a flag, only for internal use currently. ([#4471](https://github.com/getsentry/relay/pull/4471)) **Features**: diff --git a/Cargo.lock b/Cargo.lock index a46174d682..b92d4e295d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3768,6 +3768,7 @@ dependencies = [ "once_cell", "opentelemetry-proto", "relay-event-schema", + "relay-log", "relay-protocol", "serde_json", ] diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index fa41ef8b1d..7ede36779c 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -667,6 +667,11 @@ pub struct Limits { /// /// Defaults to `1024`, a value [google has been using for a long time](https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=19f92a030ca6d772ab44b22ee6a01378a8cb32d4). pub tcp_listen_backlog: u32, + /// The maximum number of breadcrumbs to convert to OurLogs. 
+ /// + /// When converting breadcrumbs to OurLogs, only up to this many breadcrumbs will be converted. + /// Defaults to 100. + pub max_breadcrumbs_converted: usize, } impl Default for Limits { @@ -699,6 +704,7 @@ impl Default for Limits { idle_timeout: None, max_connections: None, tcp_listen_backlog: 1024, + max_breadcrumbs_converted: 100, } } } @@ -2510,6 +2516,11 @@ impl Config { let forward = self.values.routing.accept_unknown_items; forward.unwrap_or_else(|| !self.processing_enabled()) } + + /// Returns the maximum number of breadcrumbs that should be converted to OurLogs. + pub fn max_breadcrumbs_converted(&self) -> usize { + self.values.limits.max_breadcrumbs_converted + } } impl Default for Config { diff --git a/relay-dynamic-config/src/global.rs b/relay-dynamic-config/src/global.rs index 204ec880b2..1caf7b92fd 100644 --- a/relay-dynamic-config/src/global.rs +++ b/relay-dynamic-config/src/global.rs @@ -174,6 +174,18 @@ pub struct Options { )] pub span_extraction_sample_rate: Option, + /// Extract logs from breadcrumbs for a fraction of sent breadcrumbs. + /// + /// `None` is the default and interpreted as extract nothing. + /// + /// Note: Any value below 1.0 will cause the product to break, so use with caution. + #[serde( + rename = "relay.ourlogs-breadcrumb-extraction.sample-rate", + deserialize_with = "default_on_error", + skip_serializing_if = "is_default" + )] + pub ourlogs_breadcrumb_extraction_sample_rate: Option, + /// List of values on span description that are allowed to be sent to Sentry without being scrubbed. /// /// At this point, it doesn't accept IP addresses in CIDR format.. yet. 
diff --git a/relay-ourlogs/Cargo.toml b/relay-ourlogs/Cargo.toml index c132967b92..299a56bf9f 100644 --- a/relay-ourlogs/Cargo.toml +++ b/relay-ourlogs/Cargo.toml @@ -21,6 +21,7 @@ opentelemetry-proto = { workspace = true, features = [ "with-serde", "logs", ] } +relay-log = { workspace = true } relay-event-schema = { workspace = true } relay-protocol = { workspace = true } serde_json = { workspace = true } diff --git a/relay-ourlogs/src/lib.rs b/relay-ourlogs/src/lib.rs index 38130a4a4c..92f3047ca0 100644 --- a/relay-ourlogs/src/lib.rs +++ b/relay-ourlogs/src/lib.rs @@ -6,6 +6,7 @@ html_favicon_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png" )] +pub use crate::ourlog::breadcrumbs_to_ourlogs; pub use crate::ourlog::otel_to_sentry_log; pub use opentelemetry_proto::tonic::logs::v1::LogRecord as OtelLog; diff --git a/relay-ourlogs/src/ourlog.rs b/relay-ourlogs/src/ourlog.rs index 189f664965..dddf9dcb3d 100644 --- a/relay-ourlogs/src/ourlog.rs +++ b/relay-ourlogs/src/ourlog.rs @@ -1,8 +1,10 @@ use opentelemetry_proto::tonic::common::v1::any_value::Value as OtelValue; +use relay_event_schema::protocol::{ + AttributeValue, Breadcrumb, Event, OurLog, SpanId, TraceContext, TraceId, +}; use crate::OtelLog; -use relay_event_schema::protocol::{AttributeValue, OurLog, SpanId, TraceId}; -use relay_protocol::{Annotated, Object}; +use relay_protocol::{Annotated, Object, Value}; /// Transform an OtelLog to a Sentry log. pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { @@ -70,9 +72,152 @@ pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { } } +/// Transform event breadcrumbs to OurLogs. +/// +/// Only converts up to `max_breadcrumbs` breadcrumbs. 
+pub fn breadcrumbs_to_ourlogs(event: &Event, max_breadcrumbs: usize) -> Vec { + let event_trace_id = event + .context::() + .and_then(|trace_ctx| trace_ctx.trace_id.value()) + .cloned(); + + let breadcrumbs = match event.breadcrumbs.value() { + Some(breadcrumbs) => breadcrumbs, + None => return Vec::new(), + }; + + let values = match breadcrumbs.values.value() { + Some(values) => values, + None => return Vec::new(), + }; + + values + .iter() + .take(max_breadcrumbs) + .filter_map(|breadcrumb| { + let breadcrumb = breadcrumb.value()?; + + // Convert to nanoseconds + let timestamp_nanos = breadcrumb + .timestamp + .value()? + .into_inner() + .timestamp_nanos_opt() + .unwrap() as u64; + let mut attribute_data = Object::new(); + + if let Some(category) = breadcrumb.category.value() { + // Add category as sentry.category attribute if present, since the protocol doesn't have an equivalent field. + attribute_data.insert( + "sentry.category".to_string(), + Annotated::new(AttributeValue::StringValue(category.to_string())), + ); + } + + // Get span_id from data field if it exists and we have a trace_id from context, otherwise ignore it. + let span_id = if event_trace_id.is_some() { + breadcrumb + .data + .value() + .and_then(|data| data["__span"].value()) + .and_then(|span| match span { + Value::String(s) => Some(Annotated::new(SpanId(s.clone()))), + _ => None, + }) + .unwrap_or_else(Annotated::empty) + } else { + Annotated::empty() + }; + + // Convert breadcrumb data fields to primitive attributes + if let Some(data) = breadcrumb.data.value() { + for (key, value) in data.iter() { + if let Some(value) = value.value() { + let attribute = match value { + Value::String(s) => Some(AttributeValue::StringValue(s.clone())), + Value::Bool(b) => Some(AttributeValue::BoolValue(*b)), + Value::I64(i) => Some(AttributeValue::IntValue(*i)), + Value::F64(f) => Some(AttributeValue::DoubleValue(*f)), + _ => None, // Complex types will be supported once consumers are updated to ingest them. 
+ }; + + if let Some(attr) = attribute { + attribute_data.insert(key.clone(), Annotated::new(attr)); + } + } + } + } + + let (body, level) = match breadcrumb.ty.value().map(|ty| ty.as_str()) { + Some("http") => format_http_breadcrumb(breadcrumb)?, + Some(_) | None => format_default_breadcrumb(breadcrumb)?, + }; + + Some(OurLog { + timestamp_nanos: Annotated::new(timestamp_nanos), + observed_timestamp_nanos: Annotated::new(timestamp_nanos), + trace_id: event_trace_id + .clone() + .map(Annotated::new) + .unwrap_or_else(Annotated::empty), + span_id, + trace_flags: Annotated::new(0), + severity_text: level, + severity_number: Annotated::empty(), + body, + attributes: Annotated::new(attribute_data), + ..Default::default() + }) + }) + .collect() +} + +fn format_http_breadcrumb( + breadcrumb: &Breadcrumb, +) -> Option<(Annotated, Annotated)> { + let data = breadcrumb.data.value().cloned().unwrap_or_default(); + + match ( + data.get("method").and_then(|v| v.value()), + data.get("status_code").and_then(|v| v.value()), + data.get("url").and_then(|v| v.value()), + ) { + (Some(Value::String(method)), Some(Value::I64(status)), Some(Value::String(url))) => { + Some(( + Annotated::new(format!("[{}] - {} {}", status, method, url)), + Annotated::new("info".to_string()), + )) + } + _ => { + relay_log::trace!( + "Missing body in log when converting breadcrumb missing required fields: method={}, status={}, url={}", + data.contains_key("method"), + data.contains_key("status_code"), + data.contains_key("url") + ); + None + } + } +} + +fn format_default_breadcrumb( + breadcrumb: &Breadcrumb, +) -> Option<(Annotated, Annotated)> { + breadcrumb.message.value()?; // Log must have a message. 
+ Some(( + breadcrumb.message.clone(), + breadcrumb + .level + .clone() + .map_value(|level| level.to_string()), + )) +} + #[cfg(test)] mod tests { use super::*; + use chrono::{TimeZone, Utc}; + use relay_event_schema::protocol::{Breadcrumb, Level, Values}; use relay_protocol::{get_path, get_value}; #[test] @@ -201,4 +346,151 @@ mod tests { Some(&"SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s".into()) ); } + + #[test] + fn test_breadcrumbs_to_ourlogs() { + let json = r#"{ + "timestamp_nanos": 1577836800000000000, + "observed_timestamp_nanos": 1577836800000000000, + "trace_flags": 0, + "severity_text": "info", + "body": "test message", + "attributes": { + "bool_key": { + "bool_value": true + }, + "float_key": { + "double_value": 42.5 + }, + "int_key": { + "int_value": 42 + }, + "sentry.category": { + "string_value": "test category" + }, + "string_key": { + "string_value": "string value" + } + } +}"#; + + let timestamp = Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(); + let mut breadcrumb = Breadcrumb::default(); + breadcrumb.message = Annotated::new("test message".to_string()); + breadcrumb.category = Annotated::new("test category".to_string()); + breadcrumb.timestamp = Annotated::new(timestamp.into()); + breadcrumb.level = Annotated::new(Level::Info); + + let mut data = Object::new(); + data.insert( + "string_key".to_string(), + Annotated::new(Value::String("string value".to_string())), + ); + data.insert("bool_key".to_string(), Annotated::new(Value::Bool(true))); + data.insert("int_key".to_string(), Annotated::new(Value::I64(42))); + data.insert("float_key".to_string(), Annotated::new(Value::F64(42.5))); + breadcrumb.data = Annotated::new(data); + + let mut event = Event::default(); + event.breadcrumbs = Annotated::new(Values { + values: Annotated::new(vec![Annotated::new(breadcrumb)]), + other: Object::default(), + }); + + let ourlogs = breadcrumbs_to_ourlogs(&event, 100); + assert_eq!(ourlogs.len(), 1); + + let annotated_log = 
Annotated::new(ourlogs[0].clone()); + assert_eq!(json, annotated_log.to_json_pretty().unwrap()); + } + + #[test] + fn test_breadcrumbs_limit() { + let mut breadcrumbs = Vec::new(); + for i in 0..5 { + let timestamp = Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, i).unwrap(); + let mut breadcrumb = Breadcrumb::default(); + breadcrumb.message = Annotated::new(format!("message {}", i)); + breadcrumb.timestamp = Annotated::new(timestamp.into()); + breadcrumbs.push(Annotated::new(breadcrumb)); + } + + let mut event = Event::default(); + event.breadcrumbs = Annotated::new(Values { + values: Annotated::new(breadcrumbs), + other: Object::default(), + }); + + let ourlogs = breadcrumbs_to_ourlogs(&event, 3); + assert_eq!(ourlogs.len(), 3, "Limited to 3 breadcrumbs"); + assert_eq!(ourlogs[2].body.value().unwrap(), "message 2"); + + let ourlogs = breadcrumbs_to_ourlogs(&event, 10); + assert_eq!(ourlogs.len(), 5, "No limit"); + assert_eq!(ourlogs[4].body.value().unwrap(), "message 4"); + } + + #[test] + fn test_http_breadcrumb_conversion() { + let json = r#"{ + "timestamp_nanos": 1738209657000000000, + "observed_timestamp_nanos": 1738209657000000000, + "trace_flags": 0, + "severity_text": "info", + "body": "[200] - GET /api/0/organizations/sentry/issues/", + "attributes": { + "__span": { + "string_value": "bd61ce905c5f1bbd" + }, + "method": { + "string_value": "GET" + }, + "sentry.category": { + "string_value": "fetch" + }, + "status_code": { + "int_value": 200 + }, + "url": { + "string_value": "/api/0/organizations/sentry/issues/" + } + } +}"#; + + let timestamp = Utc.with_ymd_and_hms(2025, 1, 30, 4, 0, 57).unwrap(); + let mut breadcrumb = Breadcrumb::default(); + breadcrumb.ty = Annotated::new("http".to_string()); + breadcrumb.category = Annotated::new("fetch".to_string()); + breadcrumb.timestamp = Annotated::new(timestamp.into()); + + let mut data = Object::new(); + data.insert( + "__span".to_string(), + Annotated::new(Value::String("bd61ce905c5f1bbd".to_string())), + ); + 
data.insert( + "method".to_string(), + Annotated::new(Value::String("GET".to_string())), + ); + data.insert("status_code".to_string(), Annotated::new(Value::I64(200))); + data.insert( + "url".to_string(), + Annotated::new(Value::String( + "/api/0/organizations/sentry/issues/".to_string(), + )), + ); + breadcrumb.data = Annotated::new(data); + + let mut event = Event::default(); + event.breadcrumbs = Annotated::new(Values { + values: Annotated::new(vec![Annotated::new(breadcrumb)]), + other: Object::default(), + }); + + let ourlogs = breadcrumbs_to_ourlogs(&event, 100); + assert_eq!(ourlogs.len(), 1); + + let annotated_log = Annotated::new(ourlogs[0].clone()); + assert_eq!(json, annotated_log.to_json_pretty().unwrap()); + } } diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 1561082be0..2c07d37e26 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -553,7 +553,7 @@ pub struct ItemHeaders { /// /// In order to only extract metrics once from an item while through a /// chain of Relays, a Relay that extracts metrics from an item (typically - /// the first Relay) MUST set this flat to true so that upstream Relays do + /// the first Relay) MUST set this flag to true so that upstream Relays do /// not extract the metric again causing double counting of the metric. #[serde(default, skip_serializing_if = "is_false")] metrics_extracted: bool, @@ -568,6 +568,15 @@ pub struct ItemHeaders { #[serde(default, skip_serializing_if = "is_false")] spans_extracted: bool, + /// Whether or not logs have been extracted from the item. + /// + /// In order to only extract logs once from an item while through a + /// chain of Relays, a Relay that extracts logs from an item (typically + /// the first Relay) MUST set this flag to true so that upstream Relays do + /// not extract the logs again causing double counting of the logs. 
+ #[serde(default, skip_serializing_if = "is_false")] + ourlogs_extracted: bool, + /// Whether the event has been _fully_ normalized. /// /// If the event has been partially normalized, this flag is false. By @@ -665,6 +674,7 @@ impl Item { other: BTreeMap::new(), metrics_extracted: false, spans_extracted: false, + ourlogs_extracted: false, sampled: true, fully_normalized: false, ingest_span_in_eap: false, @@ -866,6 +876,16 @@ impl Item { self.headers.spans_extracted = spans_extracted; } + /// Returns the ourlogs extracted flag. + pub fn ourlogs_extracted(&self) -> bool { + self.headers.ourlogs_extracted + } + + /// Sets the ourlogs extracted flag. + pub fn set_ourlogs_extracted(&mut self, ourlogs_extracted: bool) { + self.headers.ourlogs_extracted = ourlogs_extracted; + } + /// Returns the fully normalized flag. pub fn fully_normalized(&self) -> bool { self.headers.fully_normalized diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 88ecbb4750..8b117b674c 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -805,6 +805,10 @@ struct EventMetricsExtracted(bool); #[derive(Debug, Copy, Clone)] struct SpansExtracted(bool); +/// New type representing whether logs were extracted. +#[derive(Debug, Copy, Clone)] +struct OurLogsExtracted(bool); + /// The result of the envelope processing containing the processed envelope along with the partial /// result. 
#[derive(Debug)] @@ -1546,6 +1550,7 @@ impl EnvelopeProcessorService { &mut metrics, event_fully_normalized, &self.inner.config, + &self.inner.global_config.current(), )?; let mut event = extraction_result.event; @@ -1609,6 +1614,7 @@ impl EnvelopeProcessorService { event_fully_normalized, EventMetricsExtracted(false), SpansExtracted(false), + OurLogsExtracted(false), )?; event::emit_feedback_metrics(managed_envelope.envelope()); } @@ -1642,6 +1648,7 @@ impl EnvelopeProcessorService { let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope()); let mut event_metrics_extracted = EventMetricsExtracted(false); let mut spans_extracted = SpansExtracted(false); + let mut ourlogs_extracted = OurLogsExtracted(false); let mut metrics = Metrics::default(); let mut extracted_metrics = ProcessingExtractedMetrics::new(); @@ -1655,6 +1662,7 @@ impl EnvelopeProcessorService { &mut metrics, event_fully_normalized, &self.inner.config, + &global_config, )?; // If metrics were extracted we mark that. @@ -1664,6 +1672,9 @@ impl EnvelopeProcessorService { if let Some(inner_spans_extracted) = extraction_result.spans_extracted { spans_extracted = inner_spans_extracted; }; + if let Some(inner_ourlogs_extracted) = extraction_result.ourlogs_extracted { + ourlogs_extracted = inner_ourlogs_extracted; + }; // We take the main event out of the result. 
let mut event = extraction_result.event; @@ -1837,6 +1848,7 @@ impl EnvelopeProcessorService { event_fully_normalized, event_metrics_extracted, spans_extracted, + ourlogs_extracted, )?; } diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 0769e5c8b8..702723250f 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -14,6 +14,7 @@ use relay_event_schema::protocol::{ Breadcrumb, Csp, Event, ExpectCt, ExpectStaple, Hpkp, LenientString, Metrics, NetworkReportError, OtelContext, RelayInfo, SecurityReportType, Values, }; +use relay_ourlogs::breadcrumbs_to_ourlogs; use relay_pii::PiiProcessor; use relay_protocol::{Annotated, Array, Empty, Object, Value}; use relay_quotas::DataCategory; @@ -25,7 +26,7 @@ use crate::extractors::RequestMeta; use crate::services::outcome::Outcome; use crate::services::processor::{ event_category, event_type, EventFullyNormalized, EventMetricsExtracted, EventProcessing, - ExtractedEvent, ProcessingError, SpansExtracted, MINIMUM_CLOCK_DRIFT, + ExtractedEvent, OurLogsExtracted, ProcessingError, SpansExtracted, MINIMUM_CLOCK_DRIFT, }; use crate::services::projects::project::ProjectInfo; use crate::statsd::{PlatformTag, RelayCounters, RelayHistograms, RelayTimers}; @@ -37,6 +38,7 @@ pub struct ExtractionResult { pub event: Annotated, pub event_metrics_extracted: Option, pub spans_extracted: Option, + pub ourlogs_extracted: Option, } /// Extracts the primary event payload from an envelope. 
@@ -52,6 +54,7 @@ pub fn extract( metrics: &mut Metrics, event_fully_normalized: EventFullyNormalized, config: &Config, + global_config: &GlobalConfig, ) -> Result { let envelope = managed_envelope.envelope_mut(); @@ -83,6 +86,7 @@ pub fn extract( let mut event_metrics_extracted = None; let mut spans_extracted = None; + let mut ourlogs_extracted = None; let (event, event_len) = if let Some(item) = event_item.or(security_item) { relay_log::trace!("processing json event"); metric!(timer(RelayTimers::EventProcessingDeserialize), { @@ -145,10 +149,37 @@ pub fn extract( metrics.bytes_ingested_event = Annotated::new(event_len as u64); + if let Some(event_value) = event.value() { + let convert_breadcrumbs_to_logs = utils::sample( + global_config + .options + .ourlogs_breadcrumb_extraction_sample_rate + .unwrap_or(0.0), + ); + + if convert_breadcrumbs_to_logs { + relay_log::trace!("extracting breadcrumbs to logs"); + let ourlogs: Vec = + breadcrumbs_to_ourlogs(event_value, config.max_breadcrumbs_converted()); + + if !ourlogs.is_empty() { + for ourlog in ourlogs { + let mut log_item = Item::new(ItemType::Log); + if let Ok(payload) = Annotated::new(ourlog).to_json() { + log_item.set_payload(ContentType::Json, payload); + envelope.add_item(log_item); + } + } + ourlogs_extracted = Some(OurLogsExtracted(true)); + } + } + } + Ok(ExtractionResult { event, event_metrics_extracted, spans_extracted, + ourlogs_extracted, }) } @@ -377,6 +408,7 @@ pub fn serialize( event_fully_normalized: EventFullyNormalized, event_metrics_extracted: EventMetricsExtracted, spans_extracted: SpansExtracted, + ourlogs_extracted: OurLogsExtracted, ) -> Result<(), ProcessingError> { if event.is_empty() { relay_log::error!("Cannot serialize empty event"); @@ -396,6 +428,7 @@ pub fn serialize( event_item.set_metrics_extracted(event_metrics_extracted.0); event_item.set_spans_extracted(spans_extracted.0); event_item.set_fully_normalized(event_fully_normalized.0); + 
event_item.set_ourlogs_extracted(ourlogs_extracted.0); managed_envelope.envelope_mut().add_item(event_item); diff --git a/tests/integration/test_ourlogs.py b/tests/integration/test_ourlogs.py index e373898f64..0ad2a9c17a 100644 --- a/tests/integration/test_ourlogs.py +++ b/tests/integration/test_ourlogs.py @@ -1,7 +1,9 @@ import json from datetime import datetime, timedelta, timezone +import pytest from sentry_sdk.envelope import Envelope, Item, PayloadRef +from .test_store import make_transaction TEST_CONFIG = { @@ -121,3 +123,45 @@ def test_ourlog_extraction_is_disabled_without_feature( assert len(ourlogs) == 0 ourlogs_consumer.assert_empty() + + +@pytest.mark.parametrize( + "sample_rate,expected_ourlogs", + [ + (None, 0), + (1.0, 1), + (0.0, 0), + ], +) +def test_ourlog_breadcrumb_extraction_sample_rate( + mini_sentry, + relay_with_processing, + ourlogs_consumer, + sample_rate, + expected_ourlogs, +): + ourlogs_consumer = ourlogs_consumer() + project_id = 42 + + mini_sentry.global_config["options"] = { + "relay.ourlogs-breadcrumb-extraction.sample-rate": sample_rate + } + + def send_event_with_breadcrumb(relay): + event = make_transaction({"event_id": "cbf6960622e14a45abc1f03b2055b186"}) + event["breadcrumbs"] = [ + { + "timestamp": datetime.now(timezone.utc).isoformat(), + "message": "Test breadcrumb", + "category": "test", + "level": "info", + } + ] + relay.send_event(project_id, event) + + relay = relay_with_processing(options=TEST_CONFIG) + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = ["organizations:ourlogs-ingestion"] + send_event_with_breadcrumb(relay) + assert len(ourlogs_consumer.get_ourlogs()) == expected_ourlogs + ourlogs_consumer.assert_empty() From b678b22ac3cfd2346778beab4c5bcb3188c9a355 Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 30 Jan 2025 01:11:07 -0500 Subject: [PATCH 2/5] fix lint --- relay-ourlogs/src/ourlog.rs | 74 +++++++++++++++++++++---------------- 1 file changed, 43 
insertions(+), 31 deletions(-) diff --git a/relay-ourlogs/src/ourlog.rs b/relay-ourlogs/src/ourlog.rs index dddf9dcb3d..1263468c18 100644 --- a/relay-ourlogs/src/ourlog.rs +++ b/relay-ourlogs/src/ourlog.rs @@ -375,12 +375,6 @@ mod tests { }"#; let timestamp = Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(); - let mut breadcrumb = Breadcrumb::default(); - breadcrumb.message = Annotated::new("test message".to_string()); - breadcrumb.category = Annotated::new("test category".to_string()); - breadcrumb.timestamp = Annotated::new(timestamp.into()); - breadcrumb.level = Annotated::new(Level::Info); - let mut data = Object::new(); data.insert( "string_key".to_string(), @@ -389,13 +383,23 @@ mod tests { data.insert("bool_key".to_string(), Annotated::new(Value::Bool(true))); data.insert("int_key".to_string(), Annotated::new(Value::I64(42))); data.insert("float_key".to_string(), Annotated::new(Value::F64(42.5))); - breadcrumb.data = Annotated::new(data); - let mut event = Event::default(); - event.breadcrumbs = Annotated::new(Values { - values: Annotated::new(vec![Annotated::new(breadcrumb)]), - other: Object::default(), - }); + let breadcrumb = Breadcrumb { + message: Annotated::new("test message".to_string()), + category: Annotated::new("test category".to_string()), + timestamp: Annotated::new(timestamp.into()), + level: Annotated::new(Level::Info), + data: Annotated::new(data), + ..Default::default() + }; + + let event = Event { + breadcrumbs: Annotated::new(Values { + values: Annotated::new(vec![Annotated::new(breadcrumb)]), + other: Object::default(), + }), + ..Default::default() + }; let ourlogs = breadcrumbs_to_ourlogs(&event, 100); assert_eq!(ourlogs.len(), 1); @@ -409,17 +413,21 @@ mod tests { let mut breadcrumbs = Vec::new(); for i in 0..5 { let timestamp = Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, i).unwrap(); - let mut breadcrumb = Breadcrumb::default(); - breadcrumb.message = Annotated::new(format!("message {}", i)); - breadcrumb.timestamp = 
Annotated::new(timestamp.into()); + let breadcrumb = Breadcrumb { + message: Annotated::new(format!("message {}", i)), + timestamp: Annotated::new(timestamp.into()), + ..Default::default() + }; breadcrumbs.push(Annotated::new(breadcrumb)); } - let mut event = Event::default(); - event.breadcrumbs = Annotated::new(Values { - values: Annotated::new(breadcrumbs), - other: Object::default(), - }); + let event = Event { + breadcrumbs: Annotated::new(Values { + values: Annotated::new(breadcrumbs), + other: Object::default(), + }), + ..Default::default() + }; let ourlogs = breadcrumbs_to_ourlogs(&event, 3); assert_eq!(ourlogs.len(), 3, "Limited to 3 breadcrumbs"); @@ -458,11 +466,6 @@ mod tests { }"#; let timestamp = Utc.with_ymd_and_hms(2025, 1, 30, 4, 0, 57).unwrap(); - let mut breadcrumb = Breadcrumb::default(); - breadcrumb.ty = Annotated::new("http".to_string()); - breadcrumb.category = Annotated::new("fetch".to_string()); - breadcrumb.timestamp = Annotated::new(timestamp.into()); - let mut data = Object::new(); data.insert( "__span".to_string(), @@ -479,13 +482,22 @@ mod tests { "/api/0/organizations/sentry/issues/".to_string(), )), ); - breadcrumb.data = Annotated::new(data); - let mut event = Event::default(); - event.breadcrumbs = Annotated::new(Values { - values: Annotated::new(vec![Annotated::new(breadcrumb)]), - other: Object::default(), - }); + let breadcrumb = Breadcrumb { + ty: Annotated::new("http".to_string()), + category: Annotated::new("fetch".to_string()), + timestamp: Annotated::new(timestamp.into()), + data: Annotated::new(data), + ..Default::default() + }; + + let event = Event { + breadcrumbs: Annotated::new(Values { + values: Annotated::new(vec![Annotated::new(breadcrumb)]), + other: Object::default(), + }), + ..Default::default() + }; let ourlogs = breadcrumbs_to_ourlogs(&event, 100); assert_eq!(ourlogs.len(), 1); From 3be9e9ce0ab555a86ac4d84d1f2e1564368a4610 Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 30 Jan 2025 23:48:43 -0500 Subject: [PATCH 
3/5] Address PR comments --- CHANGELOG.md | 5 +- relay-config/src/config.rs | 11 - relay-dynamic-config/src/feature.rs | 5 + relay-dynamic-config/src/global.rs | 11 + relay-ourlogs/src/ourlog.rs | 223 +++++++++--------- relay-server/src/envelope.rs | 20 -- relay-server/src/services/processor.rs | 24 +- relay-server/src/services/processor/event.rs | 35 +-- relay-server/src/services/processor/ourlog.rs | 49 +++- tests/integration/test_ourlogs.py | 23 +- 10 files changed, 196 insertions(+), 210 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c56b6f2a73..6a249cf322 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,11 +2,10 @@ ## Unreleased -- Preliminary breadcrumb to log conversion. ([#4479](https://github.com/getsentry/relay/pull/4479)) -- Allow log ingestion behind a flag, only for internal use currently. ([#4471](https://github.com/getsentry/relay/pull/4471)) - **Features**: +- Preliminary breadcrumb to log conversion. ([#4479](https://github.com/getsentry/relay/pull/4479)) +- Allow log ingestion behind a flag, only for internal use currently. ([#4471](https://github.com/getsentry/relay/pull/4471)) - Add configuration option to limit the amount of concurrent http connections. ([#4453](https://github.com/getsentry/relay/pull/4453)) - Add flags context to event schema. ([#4458](https://github.com/getsentry/relay/pull/4458)) - Add support for view hierarchy attachment scrubbing. ([#4452](https://github.com/getsentry/relay/pull/4452)) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 7ede36779c..fa41ef8b1d 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -667,11 +667,6 @@ pub struct Limits { /// /// Defaults to `1024`, a value [google has been using for a long time](https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=19f92a030ca6d772ab44b22ee6a01378a8cb32d4). pub tcp_listen_backlog: u32, - /// The maximum number of breadcrumbs to convert to OurLogs. 
- /// - /// When converting breadcrumbs to OurLogs, only up to this many breadcrumbs will be converted. - /// Defaults to 100. - pub max_breadcrumbs_converted: usize, } impl Default for Limits { @@ -704,7 +699,6 @@ impl Default for Limits { idle_timeout: None, max_connections: None, tcp_listen_backlog: 1024, - max_breadcrumbs_converted: 100, } } } @@ -2516,11 +2510,6 @@ impl Config { let forward = self.values.routing.accept_unknown_items; forward.unwrap_or_else(|| !self.processing_enabled()) } - - /// Returns the maximum number of breadcrumbs that should be converted to OurLogs. - pub fn max_breadcrumbs_converted(&self) -> usize { - self.values.limits.max_breadcrumbs_converted - } } impl Default for Config { diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index 2210a1b1e7..dcd0146b0f 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -107,6 +107,11 @@ pub enum Feature { /// Serialized as `organizations:ourlogs-ingestion`. #[serde(rename = "organizations:ourlogs-ingestion")] OurLogsIngestion, + + /// Enables extracting logs from breadcrumbs for our log product. + #[serde(rename = "projects:ourlogs-breadcrumb-extraction")] + OurLogsBreadcrumbExtraction, + /// This feature has graduated and is hard-coded for external Relays. #[doc(hidden)] #[serde(rename = "projects:profiling-ingest-unsampled-profiles")] diff --git a/relay-dynamic-config/src/global.rs b/relay-dynamic-config/src/global.rs index 1caf7b92fd..1200c631e5 100644 --- a/relay-dynamic-config/src/global.rs +++ b/relay-dynamic-config/src/global.rs @@ -186,6 +186,17 @@ pub struct Options { )] pub ourlogs_breadcrumb_extraction_sample_rate: Option, + /// The maximum number of breadcrumbs to convert to OurLogs. + /// + /// When converting breadcrumbs to OurLogs, only up to this many breadcrumbs will be converted. + /// Defaults to 100. 
+ #[serde( + rename = "relay.ourlogs-breadcrumb-extraction.max-breadcrumbs-converted", + deserialize_with = "default_on_error", + skip_serializing_if = "is_default" + )] + pub ourlogs_breadcrumb_extraction_max_breadcrumbs_converted: usize, + /// List of values on span description that are allowed to be sent to Sentry without being scrubbed. /// /// At this point, it doesn't accept IP addresses in CIDR format.. yet. diff --git a/relay-ourlogs/src/ourlog.rs b/relay-ourlogs/src/ourlog.rs index 1263468c18..1bbc811379 100644 --- a/relay-ourlogs/src/ourlog.rs +++ b/relay-ourlogs/src/ourlog.rs @@ -1,9 +1,8 @@ +use crate::OtelLog; use opentelemetry_proto::tonic::common::v1::any_value::Value as OtelValue; use relay_event_schema::protocol::{ - AttributeValue, Breadcrumb, Event, OurLog, SpanId, TraceContext, TraceId, + AttributeValue, Breadcrumb, Event, Level, OurLog, SpanId, TraceContext, TraceId, }; - -use crate::OtelLog; use relay_protocol::{Annotated, Object, Value}; /// Transform an OtelLog to a Sentry log. @@ -75,129 +74,117 @@ pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { /// Transform event breadcrumbs to OurLogs. /// /// Only converts up to `max_breadcrumbs` breadcrumbs. -pub fn breadcrumbs_to_ourlogs(event: &Event, max_breadcrumbs: usize) -> Vec { +pub fn breadcrumbs_to_ourlogs(event: &Event, max_breadcrumbs: usize) -> Option> { let event_trace_id = event .context::() - .and_then(|trace_ctx| trace_ctx.trace_id.value()) - .cloned(); - - let breadcrumbs = match event.breadcrumbs.value() { - Some(breadcrumbs) => breadcrumbs, - None => return Vec::new(), - }; - - let values = match breadcrumbs.values.value() { - Some(values) => values, - None => return Vec::new(), - }; - - values - .iter() - .take(max_breadcrumbs) - .filter_map(|breadcrumb| { - let breadcrumb = breadcrumb.value()?; - - // Convert to nanoseconds - let timestamp_nanos = breadcrumb - .timestamp - .value()? 
- .into_inner() - .timestamp_nanos_opt() - .unwrap() as u64; - let mut attribute_data = Object::new(); - - if let Some(category) = breadcrumb.category.value() { - // Add category as sentry.category attribute if present, since the protocol doesn't have an equivalent field. - attribute_data.insert( - "sentry.category".to_string(), - Annotated::new(AttributeValue::StringValue(category.to_string())), - ); - } - - // Get span_id from data field if it exists and we have a trace_id from context, otherwise ignore it. - let span_id = if event_trace_id.is_some() { - breadcrumb - .data - .value() - .and_then(|data| data["__span"].value()) - .and_then(|span| match span { - Value::String(s) => Some(Annotated::new(SpanId(s.clone()))), - _ => None, - }) - .unwrap_or_else(Annotated::empty) - } else { - Annotated::empty() - }; + .and_then(|trace_ctx| trace_ctx.trace_id.value()); + + let breadcrumbs = event.breadcrumbs.value()?; + let values = breadcrumbs.values.value()?; + + Some( + values + .iter() + .take(max_breadcrumbs) + .filter_map(|breadcrumb| { + let breadcrumb = breadcrumb.value()?; + + // Convert to nanoseconds + let timestamp_nanos = breadcrumb + .timestamp + .value()? + .into_inner() + .timestamp_nanos_opt() + .map(|t| t as u64)?; + + let mut attribute_data = Object::new(); + + if let Some(category) = breadcrumb.category.value() { + // Add category as sentry.category attribute if present, since the protocol doesn't have an equivalent field. 
+ attribute_data.insert( + "sentry.category".to_string(), + Annotated::new(AttributeValue::StringValue(category.to_string())), + ); + } - // Convert breadcrumb data fields to primitive attributes - if let Some(data) = breadcrumb.data.value() { - for (key, value) in data.iter() { - if let Some(value) = value.value() { - let attribute = match value { - Value::String(s) => Some(AttributeValue::StringValue(s.clone())), - Value::Bool(b) => Some(AttributeValue::BoolValue(*b)), - Value::I64(i) => Some(AttributeValue::IntValue(*i)), - Value::F64(f) => Some(AttributeValue::DoubleValue(*f)), - _ => None, // Complex types will be supported once consumers are updated to ingest them. - }; - - if let Some(attr) = attribute { - attribute_data.insert(key.clone(), Annotated::new(attr)); + // Get span_id from data field if it exists and we have a trace_id from context, otherwise ignore it. + let span_id = if event_trace_id.is_some() { + breadcrumb + .data + .value() + .and_then(|data| data.get("__span").and_then(|span| span.value())) + .and_then(|span| match span { + Value::String(s) => Some(Annotated::new(SpanId(s.clone()))), + _ => None, + }) + .unwrap_or_else(Annotated::empty) + } else { + Annotated::empty() + }; + + // Convert breadcrumb data fields to primitive attributes + if let Some(data) = breadcrumb.data.value() { + for (key, value) in data.iter() { + if let Some(value) = value.value() { + let attribute = match value { + Value::String(s) => Some(AttributeValue::StringValue(s.clone())), + Value::Bool(b) => Some(AttributeValue::BoolValue(*b)), + Value::I64(i) => Some(AttributeValue::IntValue(*i)), + Value::F64(f) => Some(AttributeValue::DoubleValue(*f)), + _ => None, // Complex types will be supported once consumers are updated to ingest them. 
+ }; + + if let Some(attr) = attribute { + attribute_data.insert(key.clone(), Annotated::new(attr)); + } } } } - } - let (body, level) = match breadcrumb.ty.value().map(|ty| ty.as_str()) { - Some("http") => format_http_breadcrumb(breadcrumb)?, - Some(_) | None => format_default_breadcrumb(breadcrumb)?, - }; - - Some(OurLog { - timestamp_nanos: Annotated::new(timestamp_nanos), - observed_timestamp_nanos: Annotated::new(timestamp_nanos), - trace_id: event_trace_id - .clone() - .map(Annotated::new) - .unwrap_or_else(Annotated::empty), - span_id, - trace_flags: Annotated::new(0), - severity_text: level, - severity_number: Annotated::empty(), - body, - attributes: Annotated::new(attribute_data), - ..Default::default() + let (body, level) = match breadcrumb.ty.value().map(|ty| ty.as_str()) { + Some("http") => { + let (body, level) = format_http_breadcrumb(breadcrumb)?; + (Annotated::new(body), Annotated::new(level)) + } + Some(_) | None => format_default_breadcrumb(breadcrumb)?, + }; + + Some(OurLog { + timestamp_nanos: Annotated::new(timestamp_nanos), + observed_timestamp_nanos: Annotated::new(timestamp_nanos), + trace_id: event_trace_id + .cloned() + .map(Annotated::new) + .unwrap_or_else(Annotated::empty), + span_id, + trace_flags: Annotated::new(0), + severity_text: level, + severity_number: Annotated::empty(), + body, + attributes: Annotated::new(attribute_data), + ..Default::default() + }) }) - }) - .collect() + .collect(), + ) } -fn format_http_breadcrumb( - breadcrumb: &Breadcrumb, -) -> Option<(Annotated, Annotated)> { +fn format_http_breadcrumb(breadcrumb: &Breadcrumb) -> Option<(String, String)> { let data = breadcrumb.data.value().cloned().unwrap_or_default(); + let method = data.get("method").and_then(|v| v.value())?; + let url = data.get("url").and_then(|v| v.value())?; + let status_code = data + .get("status_code") + .and_then(|v| v.value()) + .and_then(|v| match v { + Value::I64(i) => Some(i), + _ => None, + })?; - match ( - 
data.get("method").and_then(|v| v.value()), - data.get("status_code").and_then(|v| v.value()), - data.get("url").and_then(|v| v.value()), - ) { - (Some(Value::String(method)), Some(Value::I64(status)), Some(Value::String(url))) => { - Some(( - Annotated::new(format!("[{}] - {} {}", status, method, url)), - Annotated::new("info".to_string()), - )) - } - _ => { - relay_log::trace!( - "Missing body in log when converting breadcrumb missing required fields: method={}, status={}, url={}", - data.contains_key("method"), - data.contains_key("status_code"), - data.contains_key("url") - ); - None - } - } + Some(( + format!("[{}] - {} {}", status_code, method.as_str()?, url.as_str()?), + Level::Info.to_string(), + )) } fn format_default_breadcrumb( @@ -401,7 +388,7 @@ mod tests { ..Default::default() }; - let ourlogs = breadcrumbs_to_ourlogs(&event, 100); + let ourlogs = breadcrumbs_to_ourlogs(&event, 100).unwrap(); assert_eq!(ourlogs.len(), 1); let annotated_log = Annotated::new(ourlogs[0].clone()); @@ -429,11 +416,11 @@ mod tests { ..Default::default() }; - let ourlogs = breadcrumbs_to_ourlogs(&event, 3); + let ourlogs = breadcrumbs_to_ourlogs(&event, 3).unwrap(); assert_eq!(ourlogs.len(), 3, "Limited to 3 breadcrumbs"); assert_eq!(ourlogs[2].body.value().unwrap(), "message 2"); - let ourlogs = breadcrumbs_to_ourlogs(&event, 10); + let ourlogs = breadcrumbs_to_ourlogs(&event, 10).unwrap(); assert_eq!(ourlogs.len(), 5, "No limit"); assert_eq!(ourlogs[4].body.value().unwrap(), "message 4"); } @@ -499,7 +486,7 @@ mod tests { ..Default::default() }; - let ourlogs = breadcrumbs_to_ourlogs(&event, 100); + let ourlogs = breadcrumbs_to_ourlogs(&event, 100).unwrap(); assert_eq!(ourlogs.len(), 1); let annotated_log = Annotated::new(ourlogs[0].clone()); diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 2c07d37e26..135cdf54fa 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -568,15 +568,6 @@ pub struct ItemHeaders { 
#[serde(default, skip_serializing_if = "is_false")] spans_extracted: bool, - /// Whether or not logs have been extracted from the item. - /// - /// In order to only extract logs once from an item while through a - /// chain of Relays, a Relay that extracts logs from an item (typically - /// the first Relay) MUST set this flag to true so that upstream Relays do - /// not extract the logs again causing double counting of the logs. - #[serde(default, skip_serializing_if = "is_false")] - ourlogs_extracted: bool, - /// Whether the event has been _fully_ normalized. /// /// If the event has been partially normalized, this flag is false. By @@ -674,7 +665,6 @@ impl Item { other: BTreeMap::new(), metrics_extracted: false, spans_extracted: false, - ourlogs_extracted: false, sampled: true, fully_normalized: false, ingest_span_in_eap: false, @@ -876,16 +866,6 @@ impl Item { self.headers.spans_extracted = spans_extracted; } - /// Returns the ourlogs extracted flag. - pub fn ourlogs_extracted(&self) -> bool { - self.headers.ourlogs_extracted - } - - /// Sets the ourlogs extracted flag. - pub fn set_ourlogs_extracted(&mut self, ourlogs_extracted: bool) { - self.headers.ourlogs_extracted = ourlogs_extracted; - } - /// Returns the fully normalized flag. pub fn fully_normalized(&self) -> bool { self.headers.fully_normalized diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 8b117b674c..4fa0934ffc 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -805,10 +805,6 @@ struct EventMetricsExtracted(bool); #[derive(Debug, Copy, Clone)] struct SpansExtracted(bool); -/// New type representing whether logs were extracted. -#[derive(Debug, Copy, Clone)] -struct OurLogsExtracted(bool); - /// The result of the envelope processing containing the processed envelope along with the partial /// result. 
#[derive(Debug)] @@ -1550,7 +1546,6 @@ impl EnvelopeProcessorService { &mut metrics, event_fully_normalized, &self.inner.config, - &self.inner.global_config.current(), )?; let mut event = extraction_result.event; @@ -1606,6 +1601,14 @@ impl EnvelopeProcessorService { )?; }); + if_processing!(self.inner.config, { + ourlog::extract_from_event( + managed_envelope, + &event, + &self.inner.global_config.current(), + ); + }); + if event.value().is_some() { event::scrub(&mut event, project_info.clone())?; event::serialize( @@ -1614,7 +1617,6 @@ impl EnvelopeProcessorService { event_fully_normalized, EventMetricsExtracted(false), SpansExtracted(false), - OurLogsExtracted(false), )?; event::emit_feedback_metrics(managed_envelope.envelope()); } @@ -1648,7 +1650,6 @@ impl EnvelopeProcessorService { let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope()); let mut event_metrics_extracted = EventMetricsExtracted(false); let mut spans_extracted = SpansExtracted(false); - let mut ourlogs_extracted = OurLogsExtracted(false); let mut metrics = Metrics::default(); let mut extracted_metrics = ProcessingExtractedMetrics::new(); @@ -1662,7 +1663,6 @@ impl EnvelopeProcessorService { &mut metrics, event_fully_normalized, &self.inner.config, - &global_config, )?; // If metrics were extracted we mark that. @@ -1672,9 +1672,6 @@ impl EnvelopeProcessorService { if let Some(inner_spans_extracted) = extraction_result.spans_extracted { spans_extracted = inner_spans_extracted; }; - if let Some(inner_ourlogs_extracted) = extraction_result.ourlogs_extracted { - ourlogs_extracted = inner_ourlogs_extracted; - }; // We take the main event out of the result. 
let mut event = extraction_result.event; @@ -1829,6 +1826,10 @@ impl EnvelopeProcessorService { ); } + if project_info.has_feature(Feature::OurLogsBreadcrumbExtraction) { + ourlog::extract_from_event(managed_envelope, &event, &global_config); + } + event = self.enforce_quotas( managed_envelope, event, @@ -1848,7 +1849,6 @@ impl EnvelopeProcessorService { event_fully_normalized, event_metrics_extracted, spans_extracted, - ourlogs_extracted, )?; } diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 702723250f..0769e5c8b8 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -14,7 +14,6 @@ use relay_event_schema::protocol::{ Breadcrumb, Csp, Event, ExpectCt, ExpectStaple, Hpkp, LenientString, Metrics, NetworkReportError, OtelContext, RelayInfo, SecurityReportType, Values, }; -use relay_ourlogs::breadcrumbs_to_ourlogs; use relay_pii::PiiProcessor; use relay_protocol::{Annotated, Array, Empty, Object, Value}; use relay_quotas::DataCategory; @@ -26,7 +25,7 @@ use crate::extractors::RequestMeta; use crate::services::outcome::Outcome; use crate::services::processor::{ event_category, event_type, EventFullyNormalized, EventMetricsExtracted, EventProcessing, - ExtractedEvent, OurLogsExtracted, ProcessingError, SpansExtracted, MINIMUM_CLOCK_DRIFT, + ExtractedEvent, ProcessingError, SpansExtracted, MINIMUM_CLOCK_DRIFT, }; use crate::services::projects::project::ProjectInfo; use crate::statsd::{PlatformTag, RelayCounters, RelayHistograms, RelayTimers}; @@ -38,7 +37,6 @@ pub struct ExtractionResult { pub event: Annotated, pub event_metrics_extracted: Option, pub spans_extracted: Option, - pub ourlogs_extracted: Option, } /// Extracts the primary event payload from an envelope. 
@@ -54,7 +52,6 @@ pub fn extract( metrics: &mut Metrics, event_fully_normalized: EventFullyNormalized, config: &Config, - global_config: &GlobalConfig, ) -> Result { let envelope = managed_envelope.envelope_mut(); @@ -86,7 +83,6 @@ pub fn extract( let mut event_metrics_extracted = None; let mut spans_extracted = None; - let mut ourlogs_extracted = None; let (event, event_len) = if let Some(item) = event_item.or(security_item) { relay_log::trace!("processing json event"); metric!(timer(RelayTimers::EventProcessingDeserialize), { @@ -149,37 +145,10 @@ pub fn extract( metrics.bytes_ingested_event = Annotated::new(event_len as u64); - if let Some(event_value) = event.value() { - let convert_breadcrumbs_to_logs = utils::sample( - global_config - .options - .ourlogs_breadcrumb_extraction_sample_rate - .unwrap_or(0.0), - ); - - if convert_breadcrumbs_to_logs { - relay_log::trace!("extracting breadcrumbs to logs"); - let ourlogs: Vec = - breadcrumbs_to_ourlogs(event_value, config.max_breadcrumbs_converted()); - - if !ourlogs.is_empty() { - for ourlog in ourlogs { - let mut log_item = Item::new(ItemType::Log); - if let Ok(payload) = Annotated::new(ourlog).to_json() { - log_item.set_payload(ContentType::Json, payload); - envelope.add_item(log_item); - } - } - ourlogs_extracted = Some(OurLogsExtracted(true)); - } - } - } - Ok(ExtractionResult { event, event_metrics_extracted, spans_extracted, - ourlogs_extracted, }) } @@ -408,7 +377,6 @@ pub fn serialize( event_fully_normalized: EventFullyNormalized, event_metrics_extracted: EventMetricsExtracted, spans_extracted: SpansExtracted, - ourlogs_extracted: OurLogsExtracted, ) -> Result<(), ProcessingError> { if event.is_empty() { relay_log::error!("Cannot serialize empty event"); @@ -428,7 +396,6 @@ pub fn serialize( event_item.set_metrics_extracted(event_metrics_extracted.0); event_item.set_spans_extracted(spans_extracted.0); event_item.set_fully_normalized(event_fully_normalized.0); - 
event_item.set_ourlogs_extracted(ourlogs_extracted.0); managed_envelope.envelope_mut().add_item(event_item); diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs index d4a6ad8bbf..147e3b6f6c 100644 --- a/relay-server/src/services/processor/ourlog.rs +++ b/relay-server/src/services/processor/ourlog.rs @@ -14,11 +14,12 @@ use { crate::envelope::ContentType, crate::envelope::{Item, ItemType}, crate::services::outcome::{DiscardReason, Outcome}, - crate::services::processor::ProcessingError, - relay_dynamic_config::ProjectConfig, + crate::services::processor::{EventProcessing, ProcessingError}, + crate::utils::{self}, + relay_dynamic_config::{GlobalConfig, ProjectConfig}, relay_event_schema::processor::{process_value, ProcessingState}, - relay_event_schema::protocol::OurLog, - relay_ourlogs::OtelLog, + relay_event_schema::protocol::{Event, OurLog}, + relay_ourlogs::{breadcrumbs_to_ourlogs, OtelLog}, relay_pii::PiiProcessor, relay_protocol::Annotated, }; @@ -103,3 +104,43 @@ fn scrub( Ok(()) } + +/// Extract breadcrumbs from an event and convert them to logs. 
+#[cfg(feature = "processing")] +pub fn extract_from_event( + managed_envelope: &mut TypedEnvelope, + event: &Annotated, + global_config: &GlobalConfig, +) { + let Some(event) = event.value() else { + return; + }; + + let convert_breadcrumbs_to_logs = utils::sample( + global_config + .options + .ourlogs_breadcrumb_extraction_sample_rate + .unwrap_or(0.0), + ); + + if convert_breadcrumbs_to_logs { + relay_log::trace!("extracting breadcrumbs to logs"); + let ourlogs = breadcrumbs_to_ourlogs( + event, + global_config + .options + .ourlogs_breadcrumb_extraction_max_breadcrumbs_converted, + ); + + if let Some(ourlogs) = ourlogs { + for ourlog in ourlogs { + let mut log_item = Item::new(ItemType::Log); + if let Ok(payload) = Annotated::new(ourlog).to_json() { + log_item.set_payload(ContentType::Json, payload); + relay_log::trace!("Adding log to envelope"); + managed_envelope.envelope_mut().add_item(log_item); + } + } + } + } +} diff --git a/tests/integration/test_ourlogs.py b/tests/integration/test_ourlogs.py index 0ad2a9c17a..11b535753d 100644 --- a/tests/integration/test_ourlogs.py +++ b/tests/integration/test_ourlogs.py @@ -143,13 +143,20 @@ def test_ourlog_breadcrumb_extraction_sample_rate( ourlogs_consumer = ourlogs_consumer() project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "projects:ourlogs-breadcrumb-extraction", + "organizations:ourlogs-ingestion", + ] + mini_sentry.global_config["options"] = { - "relay.ourlogs-breadcrumb-extraction.sample-rate": sample_rate + "relay.ourlogs-breadcrumb-extraction.sample-rate": sample_rate, + "relay.ourlogs-breadcrumb-extraction.max-breadcrumbs-converted": 100, } - def send_event_with_breadcrumb(relay): - event = make_transaction({"event_id": "cbf6960622e14a45abc1f03b2055b186"}) - event["breadcrumbs"] = [ + def send_transaction_with_breadcrumb(upstream): + transaction = make_transaction({"event_id": "cbf6960622e14a45abc1f03b2055b186"}) + 
transaction["breadcrumbs"] = [ { "timestamp": datetime.now(timezone.utc).isoformat(), "message": "Test breadcrumb", @@ -157,11 +164,11 @@ def send_event_with_breadcrumb(relay): "level": "info", } ] - relay.send_event(project_id, event) + envelope = Envelope() + envelope.add_transaction(transaction) + upstream.send_envelope(project_id, envelope) relay = relay_with_processing(options=TEST_CONFIG) - project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"]["features"] = ["organizations:ourlogs-ingestion"] - send_event_with_breadcrumb(relay) + send_transaction_with_breadcrumb(relay) assert len(ourlogs_consumer.get_ourlogs()) == expected_ourlogs ourlogs_consumer.assert_empty() From da7633aacb7a56a410657eb8b66586981e2b5d2a Mon Sep 17 00:00:00 2001 From: Kev Date: Fri, 31 Jan 2025 00:35:17 -0500 Subject: [PATCH 4/5] Address deduplication concerns --- .../src/protocol/contexts/mod.rs | 5 ++ .../src/protocol/contexts/our_logs.rs | 68 ++++++++++++++++++ relay-ourlogs/src/ourlog.rs | 71 ++++++++++++++++++- tests/integration/test_ourlogs.py | 4 ++ 4 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 relay-event-schema/src/protocol/contexts/our_logs.rs diff --git a/relay-event-schema/src/protocol/contexts/mod.rs b/relay-event-schema/src/protocol/contexts/mod.rs index 2306b4a060..78bdfcf6f8 100644 --- a/relay-event-schema/src/protocol/contexts/mod.rs +++ b/relay-event-schema/src/protocol/contexts/mod.rs @@ -8,6 +8,7 @@ mod monitor; mod nel; mod os; mod otel; +mod our_logs; mod performance_score; mod profile; mod replay; @@ -25,6 +26,7 @@ pub use monitor::*; pub use nel::*; pub use os::*; pub use otel::*; +pub use our_logs::*; pub use performance_score::*; pub use profile::*; pub use replay::*; @@ -90,6 +92,9 @@ pub enum Context { Nel(Box), /// Performance score information. PerformanceScore(Box), + /// Ourlogs (logs product) information. 
+ #[metastructure(tag = "sentry_logs")] + OurLogs(Box), /// Additional arbitrary fields for forwards compatibility. #[metastructure(fallback_variant)] Other(#[metastructure(pii = "true")] Object), diff --git a/relay-event-schema/src/protocol/contexts/our_logs.rs b/relay-event-schema/src/protocol/contexts/our_logs.rs new file mode 100644 index 0000000000..8c20a35f2b --- /dev/null +++ b/relay-event-schema/src/protocol/contexts/our_logs.rs @@ -0,0 +1,68 @@ +use relay_protocol::{Annotated, Empty, FromValue, IntoValue, Object, Value}; + +use crate::processor::ProcessValue; + +/// Our Logs context. +/// +/// The Sentry Logs context contains information about our logging product (ourlogs) for an event. +#[derive(Clone, Debug, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)] +pub struct OurLogsContext { + /// Whether breadcrumbs are being deduplicated. + pub deduplicated_breadcrumbs: Annotated, + + /// Additional arbitrary fields for forwards compatibility. + #[metastructure(additional_properties, retain = true)] + pub other: Object, +} + +impl super::DefaultContext for OurLogsContext { + fn default_key() -> &'static str { + "sentry_logs" // Ourlogs is an internal name, and 'logs' likely has conflicts with user contexts. 
+ } + + fn from_context(context: super::Context) -> Option { + match context { + super::Context::OurLogs(c) => Some(*c), + _ => None, + } + } + + fn cast(context: &super::Context) -> Option<&Self> { + match context { + super::Context::OurLogs(c) => Some(c), + _ => None, + } + } + + fn cast_mut(context: &mut super::Context) -> Option<&mut Self> { + match context { + super::Context::OurLogs(c) => Some(c), + _ => None, + } + } + + fn into_context(self) -> super::Context { + super::Context::OurLogs(Box::new(self)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::protocol::Context; + + #[test] + fn test_our_logs_context() { + let json = r#"{ + "deduplicated_breadcrumbs": true, + "type": "sentry_logs" +}"#; + let context = Annotated::new(Context::OurLogs(Box::new(OurLogsContext { + deduplicated_breadcrumbs: Annotated::new(true), + ..OurLogsContext::default() + }))); + + assert_eq!(context, Annotated::from_json(json).unwrap()); + assert_eq!(json, context.to_json_pretty().unwrap()); + } +} diff --git a/relay-ourlogs/src/ourlog.rs b/relay-ourlogs/src/ourlog.rs index 1bbc811379..3a88e223a7 100644 --- a/relay-ourlogs/src/ourlog.rs +++ b/relay-ourlogs/src/ourlog.rs @@ -1,7 +1,7 @@ use crate::OtelLog; use opentelemetry_proto::tonic::common::v1::any_value::Value as OtelValue; use relay_event_schema::protocol::{ - AttributeValue, Breadcrumb, Event, Level, OurLog, SpanId, TraceContext, TraceId, + AttributeValue, Breadcrumb, Event, Level, OurLog, OurLogsContext, SpanId, TraceContext, TraceId, }; use relay_protocol::{Annotated, Object, Value}; @@ -75,6 +75,14 @@ pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { /// /// Only converts up to `max_breadcrumbs` breadcrumbs. pub fn breadcrumbs_to_ourlogs(event: &Event, max_breadcrumbs: usize) -> Option> { + let deduplicated_breadcrumbs = event + .context::() + .and_then(|ctx| ctx.deduplicated_breadcrumbs.value())?; + if !deduplicated_breadcrumbs { + // Only deduplicated breadcrumbs are supported. 
+ return None; + } + let event_trace_id = event .context::() .and_then(|trace_ctx| trace_ctx.trace_id.value()); @@ -204,7 +212,7 @@ fn format_default_breadcrumb( mod tests { use super::*; use chrono::{TimeZone, Utc}; - use relay_event_schema::protocol::{Breadcrumb, Level, Values}; + use relay_event_schema::protocol::{Breadcrumb, Contexts, Level, Values}; use relay_protocol::{get_path, get_value}; #[test] @@ -380,11 +388,18 @@ mod tests { ..Default::default() }; + let mut contexts = Contexts::new(); + contexts.add(OurLogsContext { + deduplicated_breadcrumbs: Annotated::new(true), + other: Object::default(), + }); + let event = Event { breadcrumbs: Annotated::new(Values { values: Annotated::new(vec![Annotated::new(breadcrumb)]), other: Object::default(), }), + contexts: Annotated::new(contexts), ..Default::default() }; @@ -408,11 +423,18 @@ mod tests { breadcrumbs.push(Annotated::new(breadcrumb)); } + let mut contexts = Contexts::new(); + contexts.add(OurLogsContext { + deduplicated_breadcrumbs: Annotated::new(true), + other: Object::default(), + }); + let event = Event { breadcrumbs: Annotated::new(Values { values: Annotated::new(breadcrumbs), other: Object::default(), }), + contexts: Annotated::new(contexts), ..Default::default() }; @@ -478,11 +500,18 @@ mod tests { ..Default::default() }; + let mut contexts = Contexts::new(); + contexts.add(OurLogsContext { + deduplicated_breadcrumbs: Annotated::new(true), + other: Object::default(), + }); + let event = Event { breadcrumbs: Annotated::new(Values { values: Annotated::new(vec![Annotated::new(breadcrumb)]), other: Object::default(), }), + contexts: Annotated::new(contexts), ..Default::default() }; @@ -492,4 +521,42 @@ mod tests { let annotated_log = Annotated::new(ourlogs[0].clone()); assert_eq!(json, annotated_log.to_json_pretty().unwrap()); } + + #[test] + fn test_no_breadcrumbs_without_deduplicated_flag() { + let timestamp = Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(); + let breadcrumb = Breadcrumb { + 
message: Annotated::new("test message".to_string()), + timestamp: Annotated::new(timestamp.into()), + ..Default::default() + }; + + let mut contexts = Contexts::new(); + contexts.add(OurLogsContext { + deduplicated_breadcrumbs: Annotated::new(false), + other: Object::default(), + }); + + let event = Event { + breadcrumbs: Annotated::new(Values { + values: Annotated::new(vec![Annotated::new(breadcrumb.clone())]), + other: Object::default(), + }), + contexts: Annotated::new(contexts), + ..Default::default() + }; + + assert!(breadcrumbs_to_ourlogs(&event, 100).is_none()); + + // Check unset + let event = Event { + breadcrumbs: Annotated::new(Values { + values: Annotated::new(vec![Annotated::new(breadcrumb)]), + other: Object::default(), + }), + ..Default::default() + }; + + assert!(breadcrumbs_to_ourlogs(&event, 100).is_none()); + } } diff --git a/tests/integration/test_ourlogs.py b/tests/integration/test_ourlogs.py index 11b535753d..7f21cb9843 100644 --- a/tests/integration/test_ourlogs.py +++ b/tests/integration/test_ourlogs.py @@ -164,6 +164,10 @@ def send_transaction_with_breadcrumb(upstream): "level": "info", } ] + transaction["contexts"]["sentry_logs"] = { + "deduplicated_breadcrumbs": True, + "type": "sentry_logs", + } envelope = Envelope() envelope.add_transaction(transaction) upstream.send_envelope(project_id, envelope) From c53455391cf166970595021072b6d13c3839d00a Mon Sep 17 00:00:00 2001 From: Kev Date: Fri, 31 Jan 2025 11:47:45 -0500 Subject: [PATCH 5/5] Couple more things --- .../src/protocol/contexts/mod.rs | 2 +- relay-server/src/services/processor.rs | 22 ++++++++++--------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/relay-event-schema/src/protocol/contexts/mod.rs b/relay-event-schema/src/protocol/contexts/mod.rs index 78bdfcf6f8..6dd48f4ae2 100644 --- a/relay-event-schema/src/protocol/contexts/mod.rs +++ b/relay-event-schema/src/protocol/contexts/mod.rs @@ -114,7 +114,7 @@ impl From for ContextInner { /// name is `contexts`. 
/// /// The `contexts` type can be used to define arbitrary contextual data on the event. It accepts an -/// object of key/value pairs. The key is the “alias” of the context and can be freely chosen. +/// object of key/value pairs. The key is the "alias" of the context and can be freely chosen. /// However, as per policy, it should match the type of the context unless there are two values for /// a type. You can omit `type` if the key name is the type. /// diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 4fa0934ffc..8f89ea4055 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1601,14 +1601,6 @@ impl EnvelopeProcessorService { )?; }); - if_processing!(self.inner.config, { - ourlog::extract_from_event( - managed_envelope, - &event, - &self.inner.global_config.current(), - ); - }); - if event.value().is_some() { event::scrub(&mut event, project_info.clone())?; event::serialize( @@ -1621,7 +1613,17 @@ impl EnvelopeProcessorService { event::emit_feedback_metrics(managed_envelope.envelope()); } - attachment::scrub(managed_envelope, project_info); + attachment::scrub(managed_envelope, project_info.clone()); + + if_processing!(self.inner.config, { + if project_info.has_feature(Feature::OurLogsBreadcrumbExtraction) { + ourlog::extract_from_event( + managed_envelope, + &event, + &self.inner.global_config.current(), + ); + } + }); if self.inner.config.processing_enabled() && !event_fully_normalized.0 { relay_log::error!( @@ -1914,7 +1916,7 @@ impl EnvelopeProcessorService { }); report::process_user_reports(managed_envelope); - attachment::scrub(managed_envelope, project_info); + attachment::scrub(managed_envelope, project_info.clone()); Ok(Some(extracted_metrics)) }