From cfed85d313fd804fd0b47761bd5aece9199e04a6 Mon Sep 17 00:00:00 2001 From: srothh Date: Fri, 11 Jul 2025 10:49:35 +0200 Subject: [PATCH 01/15] ref(transport): Added shared sync/async transport superclass and created a sync transport HTTP subclass Moved shared sync/async logic into a new superclass (HttpTransportCore), and moved sync transport specific code into a new subclass(BaseSyncHttpTransport), from which the current transport implementations inherit Fixes GH-4568 --- sentry_sdk/client.py | 4 +- sentry_sdk/transport.py | 151 ++++++++++++++++++++++++---------------- 2 files changed, 92 insertions(+), 63 deletions(-) diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index 98553d8993..917701f39c 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -25,7 +25,7 @@ ) from sentry_sdk.serializer import serialize from sentry_sdk.tracing import trace -from sentry_sdk.transport import BaseHttpTransport, make_transport +from sentry_sdk.transport import HttpTransportCore, make_transport from sentry_sdk.consts import ( SPANDATA, DEFAULT_MAX_VALUE_LENGTH, @@ -406,7 +406,7 @@ def _capture_envelope(envelope: Envelope) -> None: self.monitor or self.log_batcher or has_profiling_enabled(self.options) - or isinstance(self.transport, BaseHttpTransport) + or isinstance(self.transport, HttpTransportCore) ): # If we have anything on that could spawn a background thread, we # need to check if it's safe to use them. diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index ac7a8c3522..3a0d4ec991 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -162,7 +162,7 @@ def _parse_rate_limits( continue -class BaseHttpTransport(Transport): +class HttpTransportCore(Transport): """The base HTTP transport.""" TIMEOUT = 30 # seconds @@ -286,12 +286,8 @@ def _update_rate_limits( seconds=retry_after ) - def _send_request( - self: Self, - body: bytes, - headers: Dict[str, str], - endpoint_type: EndpointType = EndpointType.ENVELOPE, - envelope: Optional[Envelope] = None, + def _handle_request_error( + self: Self, envelope: Optional[Envelope], loss_reason: str = "network" ) -> None: def record_loss(reason: str) -> None: if envelope is None: @@ -300,45 +296,45 @@ def record_loss(reason: str) -> None: for item in envelope.items: self.record_lost_event(reason, item=item) + self.on_dropped_event(loss_reason) + record_loss("network_error") + + def _handle_response( + self: Self, + response: Union[urllib3.BaseHTTPResponse, httpcore.Response], + envelope: Optional[Envelope], + ) -> None: + self._update_rate_limits(response) + + if response.status == 429: + # if we hit a 429. Something was rate limited but we already + # acted on this in `self._update_rate_limits`. Note that we + # do not want to record event loss here as we will have recorded + # an outcome in relay already. 
+ self.on_dropped_event("status_429") + pass + + elif response.status >= 300 or response.status < 200: + logger.error( + "Unexpected status code: %s (body: %s)", + response.status, + getattr(response, "data", getattr(response, "content", None)), + ) + self._handle_request_error( + envelope=envelope, loss_reason="status_{}".format(response.status) + ) + + def _update_headers( + self: Self, + headers: Dict[str, str], + ) -> None: + headers.update( { "User-Agent": str(self._auth.client), "X-Sentry-Auth": str(self._auth.to_header()), } ) - try: - response = self._request( - "POST", - endpoint_type, - body, - headers, - ) - except Exception: - self.on_dropped_event("network") - record_loss("network_error") - raise - - try: - self._update_rate_limits(response) - - if response.status == 429: - # if we hit a 429. Something was rate limited but we already - # acted on this in `self._update_rate_limits`. Note that we - # do not want to record event loss here as we will have recorded - # an outcome in relay already. - self.on_dropped_event("status_429") - pass - - elif response.status >= 300 or response.status < 200: - logger.error( - "Unexpected status code: %s (body: %s)", - response.status, - getattr(response, "data", getattr(response, "content", None)), - ) - self.on_dropped_event("status_{}".format(response.status)) - record_loss("network_error") - finally: - response.close() def on_dropped_event(self: Self, _reason: str) -> None: return None @@ -375,11 +371,6 @@ def _fetch_pending_client_report( type="client_report", ) - def _flush_client_reports(self: Self, force: bool = False) -> None: - client_report = self._fetch_pending_client_report(force=force, interval=60) - if client_report is not None: - self.capture_envelope(Envelope(items=[client_report])) - def _check_disabled(self: Self, category: EventDataCategory) -> bool: def _disabled(bucket: Optional[EventDataCategory]) -> bool: ts = self._disabled_until.get(bucket) @@ -398,9 +389,9 @@ def _is_worker_full(self: Self) -> bool: def is_healthy(self: Self) -> bool: return not (self._is_worker_full() or self._is_rate_limited()) - def _send_envelope(self: Self, envelope: Envelope) -> None: - - # remove all items from the envelope which are over quota + def _prepare_envelope( + self: Self, envelope: Envelope + ) -> Optional[Tuple[Envelope, io.BytesIO, Dict[str, str]]]: new_items = [] for item in envelope.items: if self._check_disabled(item.data_category): @@ -442,13 +433,7 @@ def _send_envelope(self: Self, envelope: Envelope) -> None: if content_encoding: headers["Content-Encoding"] = content_encoding - self._send_request( - body.getvalue(), - headers=headers, - endpoint_type=EndpointType.ENVELOPE, - envelope=envelope, - ) - return None + return envelope, body, headers def _serialize_envelope( self: Self, envelope: Envelope @@ -506,6 +491,54 @@ def _request( ) -> Union[urllib3.BaseHTTPResponse, httpcore.Response]: raise NotImplementedError() + def kill(self: Self) -> None: + logger.debug("Killing HTTP transport") + self._worker.kill() + + +class BaseSyncHttpTransport(HttpTransportCore): + + def _send_envelope(self: Self, envelope: Envelope) -> None: + _prepared_envelope = self._prepare_envelope(envelope) + if _prepared_envelope is None: # TODO: check this behaviour in detail + return None + envelope, body, headers = _prepared_envelope + self._send_request( + body.getvalue(), + headers=headers, + endpoint_type=EndpointType.ENVELOPE, + envelope=envelope, + ) + return None + + def _send_request( + self: Self, + body: bytes, + headers: Dict[str, str], + 
endpoint_type: EndpointType, + envelope: Optional[Envelope], + ) -> None: + self._update_headers(headers) + try: + response = self._request( + "POST", + endpoint_type, + body, + headers, + ) + except Exception: + self._handle_request_error(envelope=envelope, loss_reason="network") + raise + try: + self._handle_response(response=response, envelope=envelope) + finally: + response.close() + + def _flush_client_reports(self: Self, force: bool = False) -> None: + client_report = self._fetch_pending_client_report(force=force, interval=60) + if client_report is not None: + self.capture_envelope(Envelope(items=[client_report])) + def capture_envelope(self: Self, envelope: Envelope) -> None: def send_envelope_wrapper() -> None: with capture_internal_exceptions(): @@ -528,12 +561,8 @@ def flush( self._worker.submit(lambda: self._flush_client_reports(force=True)) self._worker.flush(timeout, callback) - def kill(self: Self) -> None: - logger.debug("Killing HTTP transport") - self._worker.kill() - -class HttpTransport(BaseHttpTransport): +class HttpTransport(BaseSyncHttpTransport): if TYPE_CHECKING: _pool: Union[PoolManager, ProxyManager] @@ -650,7 +679,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: else: - class Http2Transport(BaseHttpTransport): # type: ignore + class Http2Transport(BaseSyncHttpTransport): # type: ignore """The HTTP2 transport based on httpcore.""" TIMEOUT = 15 From 3fbb6da7ba7e1a967b8c7b32e4b68cbcdbd22cd0 Mon Sep 17 00:00:00 2001 From: srothh Date: Fri, 11 Jul 2025 11:30:29 +0200 Subject: [PATCH 02/15] ref(transport) Removed Todo and reverted class name change Removed an unnecessary TODO message and reverted a class name change for BaseHTTPTransport. GH-4568 --- sentry_sdk/client.py | 4 ++-- sentry_sdk/transport.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index 917701f39c..98553d8993 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -25,7 +25,7 @@ ) from sentry_sdk.serializer import serialize from sentry_sdk.tracing import trace -from sentry_sdk.transport import HttpTransportCore, make_transport +from sentry_sdk.transport import BaseHttpTransport, make_transport from sentry_sdk.consts import ( SPANDATA, DEFAULT_MAX_VALUE_LENGTH, @@ -406,7 +406,7 @@ def _capture_envelope(envelope: Envelope) -> None: self.monitor or self.log_batcher or has_profiling_enabled(self.options) - or isinstance(self.transport, HttpTransportCore) + or isinstance(self.transport, BaseHttpTransport) ): # If we have anything on that could spawn a background thread, we # need to check if it's safe to use them. 
diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 3a0d4ec991..89651f355b 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -162,7 +162,7 @@ def _parse_rate_limits( continue -class HttpTransportCore(Transport): +class BaseHttpTransport(Transport): """The base HTTP transport.""" TIMEOUT = 30 # seconds @@ -496,11 +496,11 @@ def kill(self: Self) -> None: self._worker.kill() -class BaseSyncHttpTransport(HttpTransportCore): +class BaseSyncHttpTransport(BaseHttpTransport): def _send_envelope(self: Self, envelope: Envelope) -> None: _prepared_envelope = self._prepare_envelope(envelope) - if _prepared_envelope is None: # TODO: check this behaviour in detail + if _prepared_envelope is None: return None envelope, body, headers = _prepared_envelope self._send_request( From 2f2cfdff85b3673a2532c864fbe01c0470b87f0f Mon Sep 17 00:00:00 2001 From: srothh Date: Fri, 11 Jul 2025 12:34:28 +0200 Subject: [PATCH 03/15] test(transport): Add test for HTTP error status handling Adds test coverage for the error handling path when HTTP requests return error status codes. GH-4568 --- tests/test_transport.py | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/tests/test_transport.py b/tests/test_transport.py index 7e0cc6383c..76db2b8e82 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -619,9 +619,9 @@ def test_record_lost_event_transaction_item(capturing_server, make_client, span_ transport.record_lost_event(reason="test", item=transaction_item) client.flush() - (captured,) = capturing_server.captured # Should only be one envelope + (captured,) = capturing_server.captured envelope = captured.envelope - (item,) = envelope.items # Envelope should only have one item + (item,) = envelope.items assert item.type == "client_report" @@ -641,3 +641,38 @@ def test_record_lost_event_transaction_item(capturing_server, make_client, span_ "reason": "test", "quantity": span_count + 1, } in discarded_events + + +def test_handle_unexpected_status_invokes_handle_request_error( + make_client, monkeypatch +): + client = make_client() + transport = client.transport + + monkeypatch.setattr(transport._worker, "submit", lambda fn: fn() or True) + + def stub_request(method, endpoint, body=None, headers=None): + class MockResponse: + def __init__(self): + self.status = 500 # Integer + self.data = b"server error" + self.headers = {} + + def close(self): + pass + + return MockResponse() + + monkeypatch.setattr(transport, "_request", stub_request) + + seen = [] + monkeypatch.setattr( + transport, + "_handle_request_error", + lambda envelope, loss_reason: seen.append(loss_reason), + ) + + client.capture_event({"message": "test"}) + client.flush() + + assert seen == ["status_500"] From 356e905d315f5679105393eaf2a79f455c29aa16 Mon Sep 17 00:00:00 2001 From: srothh Date: Fri, 11 Jul 2025 14:48:30 +0200 Subject: [PATCH 04/15] test(transport): Restore accidentally removed comments Restore comments accidentally removed during a previous commit. 
---
 tests/test_transport.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/test_transport.py b/tests/test_transport.py
index 76db2b8e82..bd87728962 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -619,9 +619,9 @@ def test_record_lost_event_transaction_item(capturing_server, make_client, span_
     transport.record_lost_event(reason="test", item=transaction_item)
     client.flush()

-    (captured,) = capturing_server.captured
+    (captured,) = capturing_server.captured  # Should only be one envelope
     envelope = captured.envelope
-    (item,) = envelope.items
+    (item,) = envelope.items  # Envelope should only have one item

     assert item.type == "client_report"

From 781f7d22e859fce3bf12646e17b74fa8af9dbcb3 Mon Sep 17 00:00:00 2001
From: srothh
Date: Mon, 14 Jul 2025 09:27:57 +0200
Subject: [PATCH 05/15] ref(transport): Refactor class names to reflect previous functionality

Refactored class names such that BaseHttpTransport now has the same
functionality as before the hierarchy refactor.

GH-4568
---
 sentry_sdk/client.py    | 4 ++--
 sentry_sdk/transport.py | 8 ++++----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py
index 98553d8993..917701f39c 100644
--- a/sentry_sdk/client.py
+++ b/sentry_sdk/client.py
@@ -25,7 +25,7 @@
 )
 from sentry_sdk.serializer import serialize
 from sentry_sdk.tracing import trace
-from sentry_sdk.transport import BaseHttpTransport, make_transport
+from sentry_sdk.transport import HttpTransportCore, make_transport
 from sentry_sdk.consts import (
     SPANDATA,
     DEFAULT_MAX_VALUE_LENGTH,
@@ -406,7 +406,7 @@ def _capture_envelope(envelope: Envelope) -> None:
             self.monitor
             or self.log_batcher
             or has_profiling_enabled(self.options)
-            or isinstance(self.transport, BaseHttpTransport)
+            or isinstance(self.transport, HttpTransportCore)
         ):
             # If we have anything on that could spawn a background thread, we
             # need to check if it's safe to use them.
diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py
index 89651f355b..e0e9694b90 100644
--- a/sentry_sdk/transport.py
+++ b/sentry_sdk/transport.py
@@ -162,7 +162,7 @@ def _parse_rate_limits(
             continue


-class BaseHttpTransport(Transport):
+class HttpTransportCore(Transport):
     """The base HTTP transport."""

@@ -496,7 +496,7 @@ def kill(self: Self) -> None:
         self._worker.kill()


-class BaseSyncHttpTransport(BaseHttpTransport):
+class BaseHttpTransport(HttpTransportCore):

     def _send_envelope(self: Self, envelope: Envelope) -> None:
         _prepared_envelope = self._prepare_envelope(envelope)
@@ -562,7 +562,7 @@ def flush(
         self._worker.flush(timeout, callback)


-class HttpTransport(BaseSyncHttpTransport):
+class HttpTransport(BaseHttpTransport):
     if TYPE_CHECKING:
         _pool: Union[PoolManager, ProxyManager]

@@ -679,7 +679,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:

 else:

-    class Http2Transport(BaseSyncHttpTransport):  # type: ignore
+    class Http2Transport(BaseHttpTransport):  # type: ignore
         """The HTTP2 transport based on httpcore."""

         TIMEOUT = 15

From 2135cd6fcc0ab9e007b88af546f53dc0c002719f Mon Sep 17 00:00:00 2001
From: srothh
Date: Thu, 17 Jul 2025 11:48:11 +0200
Subject: [PATCH 06/15] ref(transport): Add flush_async to the Transport ABC

Add a new flush_async method in the Transport ABC. This is needed for the
async transport, as calling it from the client while preserving execution
order in close will require flush to be a coroutine, not a function.
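For illustration, a minimal sketch of the call pattern this enables (the
client-side coroutine here is an assumed example; only flush_async and the
pre-existing kill are part of this patch):

    # Sketch: shutdown ordering that a fire-and-forget flush cannot guarantee.
    # Awaiting flush_async ensures queued envelopes are sent (or the timeout
    # expires) before the transport is torn down.
    async def close_transport(transport, timeout: float) -> None:
        await transport.flush_async(timeout)  # suspends this coroutine, not the loop
        transport.kill()  # safe: the queue has already been drained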
GH-4568
---
 sentry_sdk/transport.py | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py
index e0e9694b90..60c6ae794a 100644
--- a/sentry_sdk/transport.py
+++ b/sentry_sdk/transport.py
@@ -107,6 +107,19 @@ def flush(
         """
         return None

+    async def flush_async(
+        self: Self,
+        timeout: float,
+        callback: Optional[Any] = None,
+    ) -> None:
+        """
+        Send out current events within `timeout` seconds. This method needs to be awaited for blocking behavior.
+
+        The default implementation is a no-op, since this method may only be relevant to some transports.
+        Subclasses should override this method if necessary.
+        """
+        return None
+
     def kill(self: Self) -> None:
         """
         Forcefully kills the transport.

From 7916ab24e18fefa0bdf1aa59c0ed21bd32cfc276 Mon Sep 17 00:00:00 2001
From: srothh
Date: Thu, 17 Jul 2025 12:33:26 +0200
Subject: [PATCH 07/15] ref(transport): Move flush_async from ABC

Move flush_async down to the specific async transport subclass. This makes
more sense anyway, as it will only be required by the async transport. If
more async transports are expected, another shared superclass can be created.

GH-4568
---
 sentry_sdk/transport.py | 13 -------------
 1 file changed, 13 deletions(-)

diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py
index 60c6ae794a..e0e9694b90 100644
--- a/sentry_sdk/transport.py
+++ b/sentry_sdk/transport.py
@@ -107,19 +107,6 @@ def flush(
         """
         return None

-    async def flush_async(
-        self: Self,
-        timeout: float,
-        callback: Optional[Any] = None,
-    ) -> None:
-        """
-        Send out current events within `timeout` seconds. This method needs to be awaited for blocking behavior.
-
-        The default implementation is a no-op, since this method may only be relevant to some transports.
-        Subclasses should override this method if necessary.
-        """
-        return None
-
     def kill(self: Self) -> None:
         """
         Forcefully kills the transport.

From cb3c2a9e276c285d7f93f09fbca4f91ae9730f50 Mon Sep 17 00:00:00 2001
From: srothh
Date: Wed, 23 Jul 2025 15:51:58 +0200
Subject: [PATCH 08/15] ref(transport): Add async type annotations to HttpTransportCore

Add necessary type annotations to HttpTransportCore to accommodate the
async transport.
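As a rough sketch of what the widened return type permits (the subclass here
is hypothetical; the real async transport only lands later in this series):

    # Sketch: an async subclass can now satisfy the shared _make_pool
    # annotation with one of httpcore's async pool types.
    import httpcore

    class _AsyncTransportSketch(HttpTransportCore):
        def _make_pool(self) -> httpcore.AsyncConnectionPool:
            # allowed by the widened Union on HttpTransportCore._make_pool
            return httpcore.AsyncConnectionPool()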
GH-4568 --- sentry_sdk/transport.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index e0e9694b90..c9aa3e15ea 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -479,6 +479,9 @@ def _make_pool( httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool, + httpcore.AsyncSOCKSProxy, + httpcore.AsyncHTTPProxy, + httpcore.AsyncConnectionPool, ]: raise NotImplementedError() From 5fd43178c5700bc21c3a73cff2578ac7786d4df3 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 24 Jul 2025 14:21:17 +0200 Subject: [PATCH 09/15] ref(transport): Refactor transport class to restore former comments and cleaner return paths GH-4568 --- sentry_sdk/transport.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index c9aa3e15ea..d612028250 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -163,7 +163,7 @@ def _parse_rate_limits( class HttpTransportCore(Transport): - """The base HTTP transport.""" + """Shared base class for sync and async transports.""" TIMEOUT = 30 # seconds @@ -392,6 +392,8 @@ def is_healthy(self: Self) -> bool: def _prepare_envelope( self: Self, envelope: Envelope ) -> Optional[Tuple[Envelope, io.BytesIO, Dict[str, str]]]: + + # remove all items from the envelope which are over quota new_items = [] for item in envelope.items: if self._check_disabled(item.data_category): @@ -500,18 +502,18 @@ def kill(self: Self) -> None: class BaseHttpTransport(HttpTransportCore): + """The base HTTP transport.""" def _send_envelope(self: Self, envelope: Envelope) -> None: _prepared_envelope = self._prepare_envelope(envelope) - if _prepared_envelope is None: - return None - envelope, body, headers = _prepared_envelope - self._send_request( - body.getvalue(), - headers=headers, - endpoint_type=EndpointType.ENVELOPE, - envelope=envelope, - ) + if _prepared_envelope is not None: + envelope, body, headers = _prepared_envelope + self._send_request( + body.getvalue(), + headers=headers, + endpoint_type=EndpointType.ENVELOPE, + envelope=envelope, + ) return None def _send_request( From 4e56e5c2348d7235f40002e82f58ed948f8a3fbe Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 24 Jul 2025 14:28:44 +0200 Subject: [PATCH 10/15] ref(transport): Add test for record_loss GH-4568 --- tests/test_transport.py | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/tests/test_transport.py b/tests/test_transport.py index bd87728962..e612bfcaa5 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -676,3 +676,44 @@ def close(self): client.flush() assert seen == ["status_500"] + + +def test_handle_request_error_basic_coverage(make_client, monkeypatch): + client = make_client() + transport = client.transport + + monkeypatch.setattr(transport._worker, "submit", lambda fn: fn() or True) + + # Track method calls + calls = [] + + def mock_on_dropped_event(reason): + calls.append(("on_dropped_event", reason)) + + def mock_record_lost_event(reason, data_category=None, item=None): + calls.append(("record_lost_event", reason, data_category, item)) + + monkeypatch.setattr(transport, "on_dropped_event", mock_on_dropped_event) + monkeypatch.setattr(transport, "record_lost_event", mock_record_lost_event) + + # Test case 1: envelope is None + transport._handle_request_error(envelope=None, loss_reason="test_reason") + + assert len(calls) == 2 + assert calls[0] == ("on_dropped_event", "test_reason") + assert 
calls[1] == ("record_lost_event", "network_error", "error", None) + + # Reset + calls.clear() + + # Test case 2: envelope with items + envelope = Envelope() + envelope.add_item(mock.MagicMock()) # Simple mock item + envelope.add_item(mock.MagicMock()) # Another mock item + + transport._handle_request_error(envelope=envelope, loss_reason="connection_error") + + assert len(calls) == 3 + assert calls[0] == ("on_dropped_event", "connection_error") + assert calls[1][0:2] == ("record_lost_event", "network_error") + assert calls[2][0:2] == ("record_lost_event", "network_error") From c4a986b5153c2cd816b117cbd2757e3d65211e21 Mon Sep 17 00:00:00 2001 From: Simon Roth <39389607+srothh@users.noreply.github.com> Date: Tue, 12 Aug 2025 15:13:47 +0200 Subject: [PATCH 11/15] Add transport worker factory and abstract base class for background worker (#4580) Add a new abstract base class for the background worker implementations in the transport. This allows for implementation of the upcoming async task-based worker in addition to the current thread based worker used in the sync context. Additionally, add a factory method in the HttpTransportCore shared class, to allow the worker methods to live higher up in the class hierarchy regardless of specific implementation. GH-4578 --- sentry_sdk/transport.py | 8 ++++-- sentry_sdk/worker.py | 63 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 67 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index d612028250..f8328cac12 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -28,7 +28,7 @@ from sentry_sdk.consts import EndpointType from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions -from sentry_sdk.worker import BackgroundWorker +from sentry_sdk.worker import BackgroundWorker, Worker from sentry_sdk.envelope import Envelope, Item, PayloadRef from typing import TYPE_CHECKING @@ -173,7 +173,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: Transport.__init__(self, options) assert self.parsed_dsn is not None self.options: Dict[str, Any] = options - self._worker = BackgroundWorker(queue_size=options["transport_queue_size"]) + self._worker = self._create_worker(options) self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION) self._disabled_until: Dict[Optional[str], datetime] = {} # We only use this Retry() class for the `get_retry_after` method it exposes @@ -224,6 +224,10 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: elif self._compression_algo == "br": self._compression_level = 4 + def _create_worker(self: Self, options: Dict[str, Any]) -> Worker: + # For now, we only support the threaded sync background worker. + return BackgroundWorker(queue_size=options["transport_queue_size"]) + def record_lost_event( self: Self, reason: str, diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index d911e15623..555539dc3a 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -1,4 +1,5 @@ from __future__ import annotations +from abc import ABC, abstractmethod import os import threading @@ -16,7 +17,65 @@ _TERMINATOR = object() -class BackgroundWorker: +class Worker(ABC): + """ + Base class for all workers. + + A worker is used to process events in the background and send them to Sentry. + """ + + @property + @abstractmethod + def is_alive(self) -> bool: + """ + Checks whether the worker is alive and running. + + Returns True if the worker is alive, False otherwise. 
+ """ + pass + + @abstractmethod + def kill(self) -> None: + """ + Kills the worker. + + This method is used to kill the worker. The queue will be drained up to the point where the worker is killed. + The worker will not be able to process any more events. + """ + pass + + def flush( + self, timeout: float, callback: Optional[Callable[[int, float], Any]] = None + ) -> None: + """ + Flush the worker. + + This method blocks until the worker has flushed all events or the specified timeout is reached. + Default implementation is a no-op, since this method may only be relevant to some workers. + Subclasses should override this method if necessary. + """ + return None + + @abstractmethod + def full(self) -> bool: + """ + Checks whether the worker's queue is full. + + Returns True if the queue is full, False otherwise. + """ + pass + + @abstractmethod + def submit(self, callback: Callable[[], Any]) -> bool: + """ + Schedule a callback to be executed by the worker. + + Returns True if the callback was scheduled, False if the queue is full. + """ + pass + + +class BackgroundWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: self._queue: Queue = Queue(queue_size) self._lock = threading.Lock() @@ -106,7 +165,7 @@ def _wait_flush(self, timeout: float, callback: Optional[Any]) -> None: pending = self._queue.qsize() + 1 logger.error("flush timed out, dropped %s events", pending) - def submit(self, callback: Callable[[], None]) -> bool: + def submit(self, callback: Callable[[], Any]) -> bool: self._ensure_thread() try: self._queue.put_nowait(callback) From 78890748f8878ee099988a03b23c4f9f68df9226 Mon Sep 17 00:00:00 2001 From: Simon Roth <39389607+srothh@users.noreply.github.com> Date: Tue, 12 Aug 2025 17:30:42 +0200 Subject: [PATCH 12/15] Add async task background worker (#4591) Add a new implementation of the transport background worker based on an async task. This worker mostly mirrors the same functionality as the thread-based worker, with the exception that it exposes a non-blocking async flush (which can be awaited from an async context). Furthermore, the worker itself is not thread-safe and should be called using [run_coroutine_threadsafe](https://docs.python.org/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe) or similar when called from another thread (this is fixed and handled by the transport). I have kept the fork check from the threaded worker, but I am not sure if it is necessary as forking in an async application would also break the event loop. 
GH-4581 --------- Co-authored-by: Neel Shah --- sentry_sdk/worker.py | 132 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 555539dc3a..c8dbbb2d73 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -2,6 +2,7 @@ from abc import ABC, abstractmethod import os import threading +import asyncio from time import sleep, time from sentry_sdk._queue import Queue, FullError @@ -186,3 +187,134 @@ def _target(self) -> None: finally: self._queue.task_done() sleep(0) + + +class AsyncWorker(Worker): + def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: + self._queue: Optional[asyncio.Queue[Any]] = None + self._queue_size = queue_size + self._task: Optional[asyncio.Task[None]] = None + # Event loop needs to remain in the same process + self._task_for_pid: Optional[int] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + # Track active callback tasks so they have a strong reference and can be cancelled on kill + self._active_tasks: set[asyncio.Task[None]] = set() + + @property + def is_alive(self) -> bool: + if self._task_for_pid != os.getpid(): + return False + if not self._task or not self._loop: + return False + return self._loop.is_running() and not self._task.done() + + def kill(self) -> None: + if self._task: + if self._queue is not None: + try: + self._queue.put_nowait(_TERMINATOR) + except asyncio.QueueFull: + logger.debug("async worker queue full, kill failed") + # Also cancel any active callback tasks + # Avoid modifying the set while cancelling tasks + tasks_to_cancel = set(self._active_tasks) + for task in tasks_to_cancel: + task.cancel() + self._active_tasks.clear() + self._loop = None + self._task = None + self._task_for_pid = None + + def start(self) -> None: + if not self.is_alive: + try: + self._loop = asyncio.get_running_loop() + if self._queue is None: + self._queue = asyncio.Queue(maxsize=self._queue_size) + self._task = self._loop.create_task(self._target()) + self._task_for_pid = os.getpid() + except RuntimeError: + # There is no event loop running + logger.warning("No event loop running, async worker not started") + self._loop = None + self._task = None + self._task_for_pid = None + + def full(self) -> bool: + if self._queue is None: + return True + return self._queue.full() + + def _ensure_task(self) -> None: + if not self.is_alive: + self.start() + + async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None: + if not self._loop or not self._loop.is_running() or self._queue is None: + return + + initial_timeout = min(0.1, timeout) + + # Timeout on the join + try: + await asyncio.wait_for(self._queue.join(), timeout=initial_timeout) + except asyncio.TimeoutError: + pending = self._queue.qsize() + len(self._active_tasks) + logger.debug("%d event(s) pending on flush", pending) + if callback is not None: + callback(pending, timeout) + + try: + remaining_timeout = timeout - initial_timeout + await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout) + except asyncio.TimeoutError: + pending = self._queue.qsize() + len(self._active_tasks) + logger.error("flush timed out, dropped %s events", pending) + + def flush(self, timeout: float, callback: Optional[Any] = None) -> Optional[asyncio.Task[None]]: # type: ignore[override] + if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running(): + return self._loop.create_task(self._wait_flush(timeout, callback)) + return None + + def submit(self, callback: 
Callable[[], Any]) -> bool: + self._ensure_task() + if self._queue is None: + return False + try: + self._queue.put_nowait(callback) + return True + except asyncio.QueueFull: + return False + + async def _target(self) -> None: + if self._queue is None: + return + while True: + callback = await self._queue.get() + if callback is _TERMINATOR: + self._queue.task_done() + break + # Firing tasks instead of awaiting them allows for concurrent requests + task = asyncio.create_task(self._process_callback(callback)) + # Create a strong reference to the task so it can be cancelled on kill + # and does not get garbage collected while running + self._active_tasks.add(task) + task.add_done_callback(self._on_task_complete) + # Yield to let the event loop run other tasks + await asyncio.sleep(0) + + async def _process_callback(self, callback: Callable[[], Any]) -> None: + # Callback is an async coroutine, need to await it + await callback() + + def _on_task_complete(self, task: asyncio.Task[None]) -> None: + try: + task.result() + except Exception: + logger.error("Failed processing job", exc_info=True) + finally: + # Mark the task as done and remove it from the active tasks set + # This happens only after the task has completed + if self._queue is not None: + self._queue.task_done() + self._active_tasks.discard(task) From 35d7078f6f5b47e1379f0abac8944509c92e9e6a Mon Sep 17 00:00:00 2001 From: Simon Roth <39389607+srothh@users.noreply.github.com> Date: Wed, 13 Aug 2025 18:21:17 +0200 Subject: [PATCH 13/15] Add async transport (#4614) Add async implementation of the abstract Transport class. This transport utilizes the async task worker as well as the httpcore async functionality. Thread Safety: As capture_envelope is registered by the client as a callback for several background threads in the sdk, which are not running the event loop, capture_envelope in the transport is made to be thread safe and allow for execution on the event loop from other threads. The same is currently not the case for flush, as there does not seem to be a usage from background threads, however if necessary, it can also be added. HTTP2 support: Currently not activated, but from the looks of the [httpcore docs](https://www.encode.io/httpcore/http2/) it should be as simple as setting the http2 in the init of the pool to true. This likely makes sense to support, as HTTP2 shows great performance improvements with concurrent requests. Kill: The kill method is sync, but the pool needs to be closed asynchronously. Currently, this is done by launching a task. However, the task cannot be awaited in sync code without deadlocking, therefore kill followed by an immediate loop shutdown could technically lead to resource leakage. Therefore, I decided to make kill optionally return the async task, so it can be awaited if called from an async context. Note also that parts of the code are very similar to the HTTP2 integration, as they both use the httpcore library. Maybe in a later PR there could be a shared superclass to avoid code duplication? 
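A short usage sketch of the kill contract described above (the transport
variable is assumed to be an AsyncHttpTransport instance):

    # Sketch: kill() schedules pool.aclose() on the loop and returns the
    # task, so async callers can await full teardown; sync callers cannot.
    async def shutdown(transport) -> None:
        cleanup = transport.kill()
        if cleanup is not None:
            await cleanup  # guarantees the httpcore pool is closed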
GH-4582 --------- Co-authored-by: Neel Shah --- sentry_sdk/transport.py | 251 +++++++++++++++++++++++++++++++++++++++- setup.py | 1 + 2 files changed, 246 insertions(+), 6 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index f8328cac12..eec4025048 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -6,6 +6,7 @@ import socket import ssl import time +import asyncio from datetime import datetime, timedelta, timezone from collections import defaultdict from urllib.request import getproxies @@ -17,18 +18,27 @@ try: import httpcore +except ImportError: + httpcore = None # type: ignore + +try: import h2 # noqa: F401 - HTTP2_ENABLED = True + HTTP2_ENABLED = httpcore is not None except ImportError: HTTP2_ENABLED = False +try: + ASYNC_TRANSPORT_ENABLED = httpcore is not None +except ImportError: + ASYNC_TRANSPORT_ENABLED = False + import urllib3 import certifi from sentry_sdk.consts import EndpointType from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions -from sentry_sdk.worker import BackgroundWorker, Worker +from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker from sentry_sdk.envelope import Envelope, Item, PayloadRef from typing import TYPE_CHECKING @@ -224,9 +234,8 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: elif self._compression_algo == "br": self._compression_level = 4 - def _create_worker(self: Self, options: Dict[str, Any]) -> Worker: - # For now, we only support the threaded sync background worker. - return BackgroundWorker(queue_size=options["transport_queue_size"]) + def _create_worker(self, options: dict[str, Any]) -> Worker: + raise NotImplementedError() def record_lost_event( self: Self, @@ -543,6 +552,9 @@ def _send_request( finally: response.close() + def _create_worker(self: Self, options: dict[str, Any]) -> Worker: + return BackgroundWorker(queue_size=options["transport_queue_size"]) + def _flush_client_reports(self: Self, force: bool = False) -> None: client_report = self._fetch_pending_client_report(force=force, interval=60) if client_report is not None: @@ -571,6 +583,222 @@ def flush( self._worker.flush(timeout, callback) +if not ASYNC_TRANSPORT_ENABLED: + # Sorry, no AsyncHttpTransport for you + AsyncHttpTransport = BaseHttpTransport + +else: + + class AsyncHttpTransport(HttpTransportCore): # type: ignore + def __init__(self: Self, options: Dict[str, Any]) -> None: + super().__init__(options) + # Requires event loop at init time + self.loop = asyncio.get_running_loop() + + def _create_worker(self: Self, options: dict[str, Any]) -> Worker: + return AsyncWorker(queue_size=options["transport_queue_size"]) + + def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]: + return next( + ( + val.decode("ascii") + for key, val in response.headers + if key.decode("ascii").lower() == header + ), + None, + ) + + async def _send_envelope(self: Self, envelope: Envelope) -> None: + _prepared_envelope = self._prepare_envelope(envelope) + if _prepared_envelope is not None: + envelope, body, headers = _prepared_envelope + await self._send_request( + body.getvalue(), + headers=headers, + endpoint_type=EndpointType.ENVELOPE, + envelope=envelope, + ) + return None + + async def _send_request( + self: Self, + body: bytes, + headers: Dict[str, str], + endpoint_type: EndpointType, + envelope: Optional[Envelope], + ) -> None: + self._update_headers(headers) + try: + response = await self._request( + "POST", + endpoint_type, + body, + headers, + ) + except Exception: + 
self._handle_request_error(envelope=envelope, loss_reason="network") + raise + try: + self._handle_response(response=response, envelope=envelope) + finally: + await response.aclose() + + async def _request( # type: ignore[override] + self: Self, + method: str, + endpoint_type: EndpointType, + body: Any, + headers: Mapping[str, str], + ) -> httpcore.Response: + return await self._pool.request( + method, + self._auth.get_api_url(endpoint_type), + content=body, + headers=headers, # type: ignore + extensions={ + "timeout": { + "pool": self.TIMEOUT, + "connect": self.TIMEOUT, + "write": self.TIMEOUT, + "read": self.TIMEOUT, + } + }, + ) + + async def _flush_client_reports(self: Self, force: bool = False) -> None: + client_report = self._fetch_pending_client_report(force=force, interval=60) + if client_report is not None: + self.capture_envelope(Envelope(items=[client_report])) + + def _capture_envelope(self: Self, envelope: Envelope) -> None: + async def send_envelope_wrapper() -> None: + with capture_internal_exceptions(): + await self._send_envelope(envelope) + await self._flush_client_reports() + + if not self._worker.submit(send_envelope_wrapper): + self.on_dropped_event("full_queue") + for item in envelope.items: + self.record_lost_event("queue_overflow", item=item) + + def capture_envelope(self: Self, envelope: Envelope) -> None: + # Synchronous entry point + if self.loop and self.loop.is_running(): + self.loop.call_soon_threadsafe(self._capture_envelope, envelope) + else: + # The event loop is no longer running + logger.warning("Async Transport is not running in an event loop.") + self.on_dropped_event("internal_sdk_error") + for item in envelope.items: + self.record_lost_event("internal_sdk_error", item=item) + + def flush( # type: ignore[override] + self: Self, + timeout: float, + callback: Optional[Callable[[int, float], None]] = None, + ) -> Optional[asyncio.Task[None]]: + logger.debug("Flushing HTTP transport") + + if timeout > 0: + self._worker.submit(lambda: self._flush_client_reports(force=True)) + return self._worker.flush(timeout, callback) # type: ignore[func-returns-value] + return None + + def _get_pool_options(self: Self) -> Dict[str, Any]: + options: Dict[str, Any] = { + "http2": False, # no HTTP2 for now + "retries": 3, + } + + socket_options = ( + self.options["socket_options"] + if self.options["socket_options"] is not None + else [] + ) + + used_options = {(o[0], o[1]) for o in socket_options} + for default_option in KEEP_ALIVE_SOCKET_OPTIONS: + if (default_option[0], default_option[1]) not in used_options: + socket_options.append(default_option) + + options["socket_options"] = socket_options + + ssl_context = ssl.create_default_context() + ssl_context.load_verify_locations( + self.options["ca_certs"] # User-provided bundle from the SDK init + or os.environ.get("SSL_CERT_FILE") + or os.environ.get("REQUESTS_CA_BUNDLE") + or certifi.where() + ) + cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE") + key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE") + if cert_file is not None: + ssl_context.load_cert_chain(cert_file, key_file) + + options["ssl_context"] = ssl_context + + return options + + def _make_pool( + self: Self, + ) -> Union[ + httpcore.AsyncSOCKSProxy, + httpcore.AsyncHTTPProxy, + httpcore.AsyncConnectionPool, + ]: + if self.parsed_dsn is None: + raise ValueError("Cannot create HTTP-based transport without valid DSN") + proxy = None + no_proxy = self._in_no_proxy(self.parsed_dsn) + + # try HTTPS first + https_proxy = 
self.options["https_proxy"] + if self.parsed_dsn.scheme == "https" and (https_proxy != ""): + proxy = https_proxy or (not no_proxy and getproxies().get("https")) + + # maybe fallback to HTTP proxy + http_proxy = self.options["http_proxy"] + if not proxy and (http_proxy != ""): + proxy = http_proxy or (not no_proxy and getproxies().get("http")) + + opts = self._get_pool_options() + + if proxy: + proxy_headers = self.options["proxy_headers"] + if proxy_headers: + opts["proxy_headers"] = proxy_headers + + if proxy.startswith("socks"): + try: + if "socket_options" in opts: + socket_options = opts.pop("socket_options") + if socket_options: + logger.warning( + "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options." + ) + return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts) + except RuntimeError: + logger.warning( + "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.", + proxy, + ) + else: + return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts) + + return httpcore.AsyncConnectionPool(**opts) + + def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore + + logger.debug("Killing HTTP transport") + self._worker.kill() + try: + # Return the pool cleanup task so caller can await it if needed + return self.loop.create_task(self._pool.aclose()) # type: ignore + except RuntimeError: + logger.warning("Event loop not running, aborting kill.") + return None + + class HttpTransport(BaseHttpTransport): if TYPE_CHECKING: _pool: Union[PoolManager, ProxyManager] @@ -816,11 +1044,22 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]: ref_transport = options["transport"] use_http2_transport = options.get("_experiments", {}).get("transport_http2", False) - + use_async_transport = options.get("_experiments", {}).get("transport_async", False) # By default, we use the http transport class transport_cls: Type[Transport] = ( Http2Transport if use_http2_transport else HttpTransport ) + if use_async_transport and ASYNC_TRANSPORT_ENABLED: + try: + asyncio.get_running_loop() + transport_cls = AsyncHttpTransport + except RuntimeError: + # No event loop running, fall back to sync transport + logger.warning("No event loop running, falling back to sync transport.") + elif use_async_transport: + logger.warning( + "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport." + ) if isinstance(ref_transport, Transport): return ref_transport diff --git a/setup.py b/setup.py index e4a29d858a..d2d0bf7bac 100644 --- a/setup.py +++ b/setup.py @@ -60,6 +60,7 @@ def get_file_text(file_name): "flask": ["flask>=0.11", "blinker>=1.1", "markupsafe"], "grpcio": ["grpcio>=1.21.1", "protobuf>=3.8.0"], "http2": ["httpcore[http2]==1.*"], + "asyncio": ["httpcore[asyncio]==1.*"], "httpx": ["httpx>=0.16.0"], "huey": ["huey>=2"], "huggingface_hub": ["huggingface_hub>=0.22"], From 9a2c80c400528fd9aa649fbec66b9282806ef903 Mon Sep 17 00:00:00 2001 From: Simon Roth <39389607+srothh@users.noreply.github.com> Date: Thu, 14 Aug 2025 15:31:34 +0200 Subject: [PATCH 14/15] Integrate async transport with SDK (#4615) Integrate the async transport with the rest of the SDK. Provide a new experimental option `transport_async` that enables the async transport if an event loop is running. Otherwise, fall back to the sync transport. Furthermore, adapt the client to work with the async transport. 
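For reference, a minimal opt-in sketch under the new experimental option
(placeholder DSN; per this patch it requires a running event loop and the
AsyncioIntegration, otherwise it falls back to the sync transport):

    # Sketch: enabling the async transport and awaiting an async flush.
    import asyncio
    import sentry_sdk
    from sentry_sdk.integrations.asyncio import AsyncioIntegration

    async def main() -> None:
        sentry_sdk.init(
            dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder
            integrations=[AsyncioIntegration()],
            _experiments={"transport_async": True},
        )
        sentry_sdk.capture_message("hello from the async transport")
        await sentry_sdk.get_client().flush_async(timeout=2.0)

    asyncio.run(main())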
To this end, flush and close were changed to be non blocking and awaitable in an async context to avoid deadlocks, however close enforces a completed flush before shutdown. As there are to my knowledge no background threads running flush/close, these methods are currently not thread-safe/loop-aware for async, which can be changed if necessary. Atexit issue: The atexit integration used by the SDK runs after the event loop has already closed if asyncio.run() is used. This makes it impossible for the async flush to happen, as atexit calls client.close(), but a loop is no longer present. I attempted to apply [this fix](https://discuss.python.org/t/atexit-for-asyncio/13911) by patching the loop close in the asyncio integration, but I am unsure if I did it correctly/put it in the correct spot, or if this is a good idea. From my SDK test however, it seems to fix the flush issue. Note also that this will apparently be patched in Python 3.14, as per the discussion in the linked thread. As a final note, I added event loop checking. Whenever the event loop is used, the transport/client catch RuntimeErrors, which would be thrown in case the event loop was already shut down. I am not sure if this is a case we need to consider, but I added it for now because I did not want the transport to potentially throw RuntimeError if the event loop is shutdown during a program. If this should be left out currently for simplicity, I can remove it again. I added the [httpcore[asyncio] ](https://www.encode.io/httpcore/async/) dependency to requirements-testing, as it is needed for the async httpcore functionality. GH-4601 --------- Co-authored-by: Neel Shah --- requirements-testing.txt | 2 +- scripts/populate_tox/config.py | 3 +- scripts/populate_tox/tox.jinja | 2 +- sentry_sdk/api.py | 8 + sentry_sdk/client.py | 103 ++++++- sentry_sdk/consts.py | 1 + sentry_sdk/integrations/asyncio.py | 41 +++ sentry_sdk/transport.py | 228 ++++++++------- tests/integrations/asyncio/test_asyncio.py | 49 ++++ tests/test_client.py | 325 ++++++++++++++++++++- tests/test_transport.py | 222 ++++++++++++++ tox.ini | 65 +++-- 12 files changed, 891 insertions(+), 158 deletions(-) diff --git a/requirements-testing.txt b/requirements-testing.txt index 8e7bc47be0..e9a972680c 100644 --- a/requirements-testing.txt +++ b/requirements-testing.txt @@ -11,7 +11,7 @@ asttokens responses pysocks socksio -httpcore[http2] +httpcore[http2,asyncio] setuptools freezegun Brotli diff --git a/scripts/populate_tox/config.py b/scripts/populate_tox/config.py index 78bed91475..06eac2aa83 100644 --- a/scripts/populate_tox/config.py +++ b/scripts/populate_tox/config.py @@ -96,7 +96,7 @@ "pytest-asyncio", "python-multipart", "requests", - "anyio<4", + "anyio>=3,<5", ], # There's an incompatibility between FastAPI's TestClient, which is # actually Starlette's TestClient, which is actually httpx's Client. @@ -106,6 +106,7 @@ # FastAPI versions we use older httpx which still supports the # deprecated argument. 
"<0.110.1": ["httpx<0.28.0"], + "<0.80": ["anyio<4"], "py3.6": ["aiocontextvars"], }, }, diff --git a/scripts/populate_tox/tox.jinja b/scripts/populate_tox/tox.jinja index 66b1d7885a..514566ea46 100644 --- a/scripts/populate_tox/tox.jinja +++ b/scripts/populate_tox/tox.jinja @@ -207,7 +207,7 @@ deps = httpx-v0.25: pytest-httpx==0.25.0 httpx: pytest-httpx # anyio is a dep of httpx - httpx: anyio<4.0.0 + httpx: anyio>=3,<5 httpx-v0.16: httpx~=0.16.0 httpx-v0.18: httpx~=0.18.0 httpx-v0.20: httpx~=0.20.0 diff --git a/sentry_sdk/api.py b/sentry_sdk/api.py index 3aefc57f69..3252622746 100644 --- a/sentry_sdk/api.py +++ b/sentry_sdk/api.py @@ -226,6 +226,14 @@ def flush( return get_client().flush(timeout=timeout, callback=callback) +@clientmethod +async def flush_async( + timeout: Optional[float] = None, + callback: Optional[Callable[[int, float], None]] = None, +) -> None: + return await get_client().flush_async(timeout=timeout, callback=callback) + + def start_span(**kwargs: Any) -> Span: """ Start and return a span. diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index 917701f39c..b9d07e1402 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -25,7 +25,7 @@ ) from sentry_sdk.serializer import serialize from sentry_sdk.tracing import trace -from sentry_sdk.transport import HttpTransportCore, make_transport +from sentry_sdk.transport import HttpTransportCore, make_transport, AsyncHttpTransport from sentry_sdk.consts import ( SPANDATA, DEFAULT_MAX_VALUE_LENGTH, @@ -214,6 +214,12 @@ def close(self, *args: Any, **kwargs: Any) -> None: def flush(self, *args: Any, **kwargs: Any) -> None: return None + async def close_async(self, *args: Any, **kwargs: Any) -> None: + return None + + async def flush_async(self, *args: Any, **kwargs: Any) -> None: + return None + def __enter__(self) -> BaseClient: return self @@ -917,6 +923,14 @@ def get_integration( return self.integrations.get(integration_name) + def _close_components(self) -> None: + """Kill all client components in the correct order.""" + self.session_flusher.kill() + if self.log_batcher is not None: + self.log_batcher.kill() + if self.monitor: + self.monitor.kill() + def close( self, timeout: Optional[float] = None, @@ -927,19 +941,43 @@ def close( semantics as :py:meth:`Client.flush`. """ if self.transport is not None: + if isinstance(self.transport, AsyncHttpTransport) and hasattr( + self.transport, "loop" + ): + logger.debug( + "close() used with AsyncHttpTransport, aborting. Please use close_async() instead." + ) + return self.flush(timeout=timeout, callback=callback) - - self.session_flusher.kill() - - if self.log_batcher is not None: - self.log_batcher.kill() - - if self.monitor: - self.monitor.kill() - + self._close_components() self.transport.kill() self.transport = None + async def close_async( + self, + timeout: Optional[float] = None, + callback: Optional[Callable[[int, float], None]] = None, + ) -> None: + """ + Asynchronously close the client and shut down the transport. Arguments have the same + semantics as :py:meth:`Client.flush_async`. + """ + if self.transport is not None: + if not ( + isinstance(self.transport, AsyncHttpTransport) + and hasattr(self.transport, "loop") + ): + logger.debug( + "close_async() used with non-async transport, aborting. Please use close() instead." 
+ ) + return + await self.flush_async(timeout=timeout, callback=callback) + self._close_components() + kill_task = self.transport.kill() # type: ignore + if kill_task is not None: + await kill_task + self.transport = None + def flush( self, timeout: Optional[float] = None, @@ -953,15 +991,52 @@ def flush( :param callback: Is invoked with the number of pending events and the configured timeout. """ if self.transport is not None: + if isinstance(self.transport, AsyncHttpTransport) and hasattr( + self.transport, "loop" + ): + logger.debug( + "flush() used with AsyncHttpTransport, aborting. Please use flush_async() instead." + ) + return if timeout is None: timeout = self.options["shutdown_timeout"] - self.session_flusher.flush() - - if self.log_batcher is not None: - self.log_batcher.flush() + self._flush_components() self.transport.flush(timeout=timeout, callback=callback) + async def flush_async( + self, + timeout: Optional[float] = None, + callback: Optional[Callable[[int, float], None]] = None, + ) -> None: + """ + Asynchronously wait for the current events to be sent. + + :param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used. + + :param callback: Is invoked with the number of pending events and the configured timeout. + """ + if self.transport is not None: + if not ( + isinstance(self.transport, AsyncHttpTransport) + and hasattr(self.transport, "loop") + ): + logger.debug( + "flush_async() used with non-async transport, aborting. Please use flush() instead." + ) + return + if timeout is None: + timeout = self.options["shutdown_timeout"] + self._flush_components() + flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore + if flush_task is not None: + await flush_task + + def _flush_components(self) -> None: + self.session_flusher.flush() + if self.log_batcher is not None: + self.log_batcher.flush() + def __enter__(self) -> _Client: return self diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 2b81fc4a2b..643cbb871a 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -75,6 +75,7 @@ class CompressionAlgo(Enum): "transport_compression_algo": Optional[CompressionAlgo], "transport_num_pools": Optional[int], "transport_http2": Optional[bool], + "transport_async": Optional[bool], "enable_logs": Optional[bool], "before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]], }, diff --git a/sentry_sdk/integrations/asyncio.py b/sentry_sdk/integrations/asyncio.py index 719cbba1a8..b515345f1a 100644 --- a/sentry_sdk/integrations/asyncio.py +++ b/sentry_sdk/integrations/asyncio.py @@ -5,6 +5,7 @@ from sentry_sdk.consts import OP from sentry_sdk.integrations import Integration, DidNotEnable from sentry_sdk.utils import event_from_exception, logger, reraise +from sentry_sdk.transport import AsyncHttpTransport try: import asyncio @@ -29,6 +30,45 @@ def get_name(coro: Any) -> str: ) +def patch_loop_close() -> None: + """Patch loop.close to flush pending events before shutdown.""" + # Atexit shutdown hook happens after the event loop is closed. + # Therefore, it is necessary to patch the loop.close method to ensure + # that pending events are flushed before the interpreter shuts down. 
+ try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop → cannot patch now + return + + if getattr(loop, "_sentry_flush_patched", False): + return + + async def _flush() -> None: + client = sentry_sdk.get_client() + if not client: + return + + try: + if not isinstance(client.transport, AsyncHttpTransport): + return + + await client.close_async() + except Exception: + logger.warning("Sentry flush failed during loop shutdown", exc_info=True) + + orig_close = loop.close + + def _patched_close() -> None: + try: + loop.run_until_complete(_flush()) + finally: + orig_close() + + loop.close = _patched_close # type: ignore + loop._sentry_flush_patched = True # type: ignore + + def patch_asyncio() -> None: orig_task_factory = None try: @@ -124,3 +164,4 @@ class AsyncioIntegration(Integration): @staticmethod def setup_once() -> None: patch_asyncio() + patch_loop_close() diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index eec4025048..5c2c864198 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -29,6 +29,8 @@ HTTP2_ENABLED = False try: + import anyio # noqa: F401 + ASYNC_TRANSPORT_ENABLED = httpcore is not None except ImportError: ASYNC_TRANSPORT_ENABLED = False @@ -583,9 +585,115 @@ def flush( self._worker.flush(timeout, callback) +class HttpTransport(BaseHttpTransport): + if TYPE_CHECKING: + _pool: Union[PoolManager, ProxyManager] + + def _get_pool_options(self: Self) -> Dict[str, Any]: + + num_pools = self.options.get("_experiments", {}).get("transport_num_pools") + options = { + "num_pools": 2 if num_pools is None else int(num_pools), + "cert_reqs": "CERT_REQUIRED", + "timeout": urllib3.Timeout(total=self.TIMEOUT), + } + + socket_options: Optional[List[Tuple[int, int, int | bytes]]] = None + + if self.options["socket_options"] is not None: + socket_options = self.options["socket_options"] + + if self.options["keep_alive"]: + if socket_options is None: + socket_options = [] + + used_options = {(o[0], o[1]) for o in socket_options} + for default_option in KEEP_ALIVE_SOCKET_OPTIONS: + if (default_option[0], default_option[1]) not in used_options: + socket_options.append(default_option) + + if socket_options is not None: + options["socket_options"] = socket_options + + options["ca_certs"] = ( + self.options["ca_certs"] # User-provided bundle from the SDK init + or os.environ.get("SSL_CERT_FILE") + or os.environ.get("REQUESTS_CA_BUNDLE") + or certifi.where() + ) + + options["cert_file"] = self.options["cert_file"] or os.environ.get( + "CLIENT_CERT_FILE" + ) + options["key_file"] = self.options["key_file"] or os.environ.get( + "CLIENT_KEY_FILE" + ) + + return options + + def _make_pool(self: Self) -> Union[PoolManager, ProxyManager]: + if self.parsed_dsn is None: + raise ValueError("Cannot create HTTP-based transport without valid DSN") + + proxy = None + no_proxy = self._in_no_proxy(self.parsed_dsn) + + # try HTTPS first + https_proxy = self.options["https_proxy"] + if self.parsed_dsn.scheme == "https" and (https_proxy != ""): + proxy = https_proxy or (not no_proxy and getproxies().get("https")) + + # maybe fallback to HTTP proxy + http_proxy = self.options["http_proxy"] + if not proxy and (http_proxy != ""): + proxy = http_proxy or (not no_proxy and getproxies().get("http")) + + opts = self._get_pool_options() + + if proxy: + proxy_headers = self.options["proxy_headers"] + if proxy_headers: + opts["proxy_headers"] = proxy_headers + + if proxy.startswith("socks"): + use_socks_proxy = True + try: + # Check if PySocks dependency is 
available + from urllib3.contrib.socks import SOCKSProxyManager + except ImportError: + use_socks_proxy = False + logger.warning( + "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support. Please add `PySocks` (or `urllib3` with the `[socks]` extra) to your dependencies.", + proxy, + ) + + if use_socks_proxy: + return SOCKSProxyManager(proxy, **opts) + else: + return urllib3.PoolManager(**opts) + else: + return urllib3.ProxyManager(proxy, **opts) + else: + return urllib3.PoolManager(**opts) + + def _request( + self: Self, + method: str, + endpoint_type: EndpointType, + body: Any, + headers: Mapping[str, str], + ) -> urllib3.BaseHTTPResponse: + return self._pool.request( + method, + self._auth.get_api_url(endpoint_type), + body=body, + headers=headers, + ) + + if not ASYNC_TRANSPORT_ENABLED: # Sorry, no AsyncHttpTransport for you - AsyncHttpTransport = BaseHttpTransport + AsyncHttpTransport = HttpTransport else: @@ -799,112 +907,6 @@ def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore return None -class HttpTransport(BaseHttpTransport): - if TYPE_CHECKING: - _pool: Union[PoolManager, ProxyManager] - - def _get_pool_options(self: Self) -> Dict[str, Any]: - - num_pools = self.options.get("_experiments", {}).get("transport_num_pools") - options = { - "num_pools": 2 if num_pools is None else int(num_pools), - "cert_reqs": "CERT_REQUIRED", - "timeout": urllib3.Timeout(total=self.TIMEOUT), - } - - socket_options: Optional[List[Tuple[int, int, int | bytes]]] = None - - if self.options["socket_options"] is not None: - socket_options = self.options["socket_options"] - - if self.options["keep_alive"]: - if socket_options is None: - socket_options = [] - - used_options = {(o[0], o[1]) for o in socket_options} - for default_option in KEEP_ALIVE_SOCKET_OPTIONS: - if (default_option[0], default_option[1]) not in used_options: - socket_options.append(default_option) - - if socket_options is not None: - options["socket_options"] = socket_options - - options["ca_certs"] = ( - self.options["ca_certs"] # User-provided bundle from the SDK init - or os.environ.get("SSL_CERT_FILE") - or os.environ.get("REQUESTS_CA_BUNDLE") - or certifi.where() - ) - - options["cert_file"] = self.options["cert_file"] or os.environ.get( - "CLIENT_CERT_FILE" - ) - options["key_file"] = self.options["key_file"] or os.environ.get( - "CLIENT_KEY_FILE" - ) - - return options - - def _make_pool(self: Self) -> Union[PoolManager, ProxyManager]: - if self.parsed_dsn is None: - raise ValueError("Cannot create HTTP-based transport without valid DSN") - - proxy = None - no_proxy = self._in_no_proxy(self.parsed_dsn) - - # try HTTPS first - https_proxy = self.options["https_proxy"] - if self.parsed_dsn.scheme == "https" and (https_proxy != ""): - proxy = https_proxy or (not no_proxy and getproxies().get("https")) - - # maybe fallback to HTTP proxy - http_proxy = self.options["http_proxy"] - if not proxy and (http_proxy != ""): - proxy = http_proxy or (not no_proxy and getproxies().get("http")) - - opts = self._get_pool_options() - - if proxy: - proxy_headers = self.options["proxy_headers"] - if proxy_headers: - opts["proxy_headers"] = proxy_headers - - if proxy.startswith("socks"): - use_socks_proxy = True - try: - # Check if PySocks dependency is available - from urllib3.contrib.socks import SOCKSProxyManager - except ImportError: - use_socks_proxy = False - logger.warning( - "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. 
Disabling proxy support. Please add `PySocks` (or `urllib3` with the `[socks]` extra) to your dependencies.", - proxy, - ) - - if use_socks_proxy: - return SOCKSProxyManager(proxy, **opts) - else: - return urllib3.PoolManager(**opts) - else: - return urllib3.ProxyManager(proxy, **opts) - else: - return urllib3.PoolManager(**opts) - - def _request( - self: Self, - method: str, - endpoint_type: EndpointType, - body: Any, - headers: Mapping[str, str], - ) -> urllib3.BaseHTTPResponse: - return self._pool.request( - method, - self._auth.get_api_url(endpoint_type), - body=body, - headers=headers, - ) - - if not HTTP2_ENABLED: # Sorry, no Http2Transport for you class Http2Transport(HttpTransport): @@ -1045,6 +1047,11 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]: use_http2_transport = options.get("_experiments", {}).get("transport_http2", False) use_async_transport = options.get("_experiments", {}).get("transport_async", False) + async_integration = any( + integration.__class__.__name__ == "AsyncioIntegration" + for integration in options.get("integrations") or [] + ) + # By default, we use the http transport class transport_cls: Type[Transport] = ( Http2Transport if use_http2_transport else HttpTransport @@ -1052,7 +1059,12 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]: if use_async_transport and ASYNC_TRANSPORT_ENABLED: try: asyncio.get_running_loop() - transport_cls = AsyncHttpTransport + if async_integration: + transport_cls = AsyncHttpTransport + else: + logger.warning( + "You tried to use AsyncHttpTransport but the AsyncioIntegration is not enabled. Falling back to sync transport." + ) except RuntimeError: # No event loop running, fall back to sync transport logger.warning("No event loop running, falling back to sync transport.") diff --git a/tests/integrations/asyncio/test_asyncio.py b/tests/integrations/asyncio/test_asyncio.py index 2ae71f8f43..5c329f8185 100644 --- a/tests/integrations/asyncio/test_asyncio.py +++ b/tests/integrations/asyncio/test_asyncio.py @@ -377,3 +377,52 @@ async def test_span_origin( assert event["contexts"]["trace"]["origin"] == "manual" assert event["spans"][0]["origin"] == "auto.function.asyncio" + + +@minimum_python_38 +def test_loop_close_patching(sentry_init): + sentry_init(integrations=[AsyncioIntegration()]) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + with patch("asyncio.get_running_loop", return_value=loop): + assert not hasattr(loop, "_sentry_flush_patched") + AsyncioIntegration.setup_once() + assert hasattr(loop, "_sentry_flush_patched") + assert loop._sentry_flush_patched is True + + finally: + if not loop.is_closed(): + loop.close() + + +@minimum_python_38 +def test_loop_close_flushes_async_transport(sentry_init): + from sentry_sdk.transport import AsyncHttpTransport + from unittest.mock import Mock, AsyncMock + + sentry_init(integrations=[AsyncioIntegration()]) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + with patch("asyncio.get_running_loop", return_value=loop): + AsyncioIntegration.setup_once() + + mock_client = Mock() + mock_transport = Mock(spec=AsyncHttpTransport) + mock_client.transport = mock_transport + mock_client.close = AsyncMock(return_value=None) + + with patch("sentry_sdk.get_client", return_value=mock_client): + loop.close() + + mock_client.close.assert_called_once() + mock_client.close.assert_awaited_once() + + except Exception: + if not loop.is_closed(): + loop.close() diff --git a/tests/test_client.py 
b/tests/test_client.py index 8290c8e575..25a3a8ab00 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -23,9 +23,12 @@ from sentry_sdk.spotlight import DEFAULT_SPOTLIGHT_URL from sentry_sdk.utils import capture_internal_exception from sentry_sdk.integrations.executing import ExecutingIntegration -from sentry_sdk.transport import Transport +from sentry_sdk.integrations.asyncio import AsyncioIntegration + +from sentry_sdk.transport import Transport, AsyncHttpTransport from sentry_sdk.serializer import MAX_DATABAG_BREADTH from sentry_sdk.consts import DEFAULT_MAX_BREADCRUMBS, DEFAULT_MAX_VALUE_LENGTH +from sentry_sdk._compat import PY38 from typing import TYPE_CHECKING @@ -1498,3 +1501,323 @@ def test_keep_alive(env_value, arg_value, expected_value): ) assert transport_cls.options["keep_alive"] is expected_value + + +@pytest.mark.parametrize( + "testcase", + [ + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "https://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": None, + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": "", + "arg_https_proxy": "", + "expected_proxy_scheme": None, + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + 
"arg_http_proxy": None, + "arg_https_proxy": "", + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": "", + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": "", + "expected_proxy_scheme": None, + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + # NO_PROXY testcases + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": None, + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": "https://localhost/123", + "env_no_proxy": "example.com,sentry.io", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": None, + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": None, + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": None, + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "https", + "arg_proxy_headers": {"Test-Header": "foo-bar"}, + }, + ], +) +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_proxy(monkeypatch, testcase): + # These are just the same tests as the sync ones, but they need to be run in an event loop + # and respect the shutdown behavior of the async transport + if testcase["env_http_proxy"] is not None: + monkeypatch.setenv("HTTP_PROXY", testcase["env_http_proxy"]) + if testcase["env_https_proxy"] is not None: + monkeypatch.setenv("HTTPS_PROXY", testcase["env_https_proxy"]) + if testcase.get("env_no_proxy") is not None: + monkeypatch.setenv("NO_PROXY", testcase["env_no_proxy"]) + + kwargs = { + "_experiments": {"transport_async": True}, + "integrations": [AsyncioIntegration()], + } + + if testcase["arg_http_proxy"] is not None: + kwargs["http_proxy"] = testcase["arg_http_proxy"] + if testcase["arg_https_proxy"] is not None: + kwargs["https_proxy"] = testcase["arg_https_proxy"] + if testcase.get("arg_proxy_headers") is not None: + kwargs["proxy_headers"] = testcase["arg_proxy_headers"] + + client = Client(testcase["dsn"], **kwargs) + assert isinstance(client.transport, AsyncHttpTransport) + + proxy = getattr( + client.transport._pool, + "proxy", + getattr(client.transport._pool, "_proxy_url", None), + ) + if testcase["expected_proxy_scheme"] is None: + assert proxy is None + else: + scheme = ( + proxy.scheme.decode("ascii") + if isinstance(proxy.scheme, bytes) + else proxy.scheme + ) + assert scheme == 
testcase["expected_proxy_scheme"] + + if testcase.get("arg_proxy_headers") is not None: + proxy_headers = dict( + (k.decode("ascii"), v.decode("ascii")) + for k, v in client.transport._pool._proxy_headers + ) + assert proxy_headers == testcase["arg_proxy_headers"] + + await client.close_async() + + +@pytest.mark.parametrize( + "testcase", + [ + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": False, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks4a://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks4://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks5h://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks5://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks4a://localhost/123", + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks4://localhost/123", + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks5h://localhost/123", + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks5://localhost/123", + "should_be_socks_proxy": True, + }, + ], +) +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_socks_proxy(testcase): + # These are just the same tests as the sync ones, but they need to be run in an event loop + # and respect the shutdown behavior of the async transport + + kwargs = { + "_experiments": {"transport_async": True}, + "integrations": [AsyncioIntegration()], + } + + if testcase["arg_http_proxy"] is not None: + kwargs["http_proxy"] = testcase["arg_http_proxy"] + if testcase["arg_https_proxy"] is not None: + kwargs["https_proxy"] = testcase["arg_https_proxy"] + + client = Client(testcase["dsn"], **kwargs) + assert isinstance(client.transport, AsyncHttpTransport) + + assert ("socks" in str(type(client.transport._pool)).lower()) == testcase[ + "should_be_socks_proxy" + ], ( + f"Expected {kwargs} to result in SOCKS == {testcase['should_be_socks_proxy']}" + f"but got {str(type(client.transport._pool))}" + ) + + await client.close_async() diff --git a/tests/test_transport.py b/tests/test_transport.py index e612bfcaa5..53426795c6 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -3,6 +3,8 @@ import os import socket import sys +import asyncio +import threading from collections import defaultdict from datetime import datetime, timedelta, timezone from unittest import mock @@ -28,8 +30,10 @@ from sentry_sdk.transport import ( KEEP_ALIVE_SOCKET_OPTIONS, _parse_rate_limits, + AsyncHttpTransport, ) from sentry_sdk.integrations.logging import LoggingIntegration, ignore_logger +from sentry_sdk.integrations.asyncio import AsyncioIntegration server = None @@ -146,6 +150,89 @@ def test_transport_works( assert any("Sending envelope" in record.msg for record in caplog.records) == debug +@pytest.mark.asyncio +@pytest.mark.parametrize("debug", (True, 
False)) +@pytest.mark.parametrize("client_flush_method", ["close", "flush"]) +@pytest.mark.parametrize("use_pickle", (True, False)) +@pytest.mark.parametrize("compression_level", (0, 9, None)) +@pytest.mark.parametrize("compression_algo", ("gzip", "br", "", None)) +@pytest.mark.skipif(not PY38, reason="Async transport only supported in Python 3.8+") +async def test_transport_works_async( + capturing_server, + request, + capsys, + caplog, + debug, + make_client, + client_flush_method, + use_pickle, + compression_level, + compression_algo, +): + caplog.set_level(logging.DEBUG) + + experiments = {} + if compression_level is not None: + experiments["transport_compression_level"] = compression_level + + if compression_algo is not None: + experiments["transport_compression_algo"] = compression_algo + + # Enable async transport + experiments["transport_async"] = True + + client = make_client( + debug=debug, + _experiments=experiments, + integrations=[AsyncioIntegration()], + ) + + if use_pickle: + client = pickle.loads(pickle.dumps(client)) + + # Verify we're using async transport + assert isinstance( + client.transport, AsyncHttpTransport + ), "Expected AsyncHttpTransport" + + sentry_sdk.get_global_scope().set_client(client) + request.addfinalizer(lambda: sentry_sdk.get_global_scope().set_client(None)) + + add_breadcrumb( + level="info", message="i like bread", timestamp=datetime.now(timezone.utc) + ) + capture_message("löl") + + if client_flush_method == "close": + await client.close_async(timeout=2.0) + if client_flush_method == "flush": + await client.flush_async(timeout=2.0) + + out, err = capsys.readouterr() + assert not err and not out + assert capturing_server.captured + should_compress = ( + # default is to compress with brotli if available, gzip otherwise + (compression_level is None) + or ( + # setting compression level to 0 means don't compress + compression_level + > 0 + ) + ) and ( + # if we couldn't resolve to a known algo, we don't compress + compression_algo + != "" + ) + + assert capturing_server.captured[0].compressed == should_compress + # After flush, the worker task is still running, but the end of the test will shut down the event loop + # Therefore, we need to explicitly close the client to clean up the worker task + assert any("Sending envelope" in record.msg for record in caplog.records) == debug + if client_flush_method == "flush": + await client.close_async(timeout=2.0) + + @pytest.mark.parametrize( "num_pools,expected_num_pools", ( @@ -717,3 +804,138 @@ def mock_record_lost_event(reason, data_category=None, item=None): assert calls[0] == ("on_dropped_event", "connection_error") assert calls[1][0:2] == ("record_lost_event", "network_error") assert calls[2][0:2] == ("record_lost_event", "network_error") + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_background_thread_capture( + capturing_server, make_client, caplog +): + """Test capture_envelope from background threads uses run_coroutine_threadsafe""" + caplog.set_level(logging.DEBUG) + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + assert isinstance(client.transport, AsyncHttpTransport) + sentry_sdk.get_global_scope().set_client(client) + captured_from_thread = [] + exception_from_thread = [] + + def background_thread_work(): + try: + # This should use run_coroutine_threadsafe path + capture_message("from background thread") + 
captured_from_thread.append(True) + except Exception as e: + exception_from_thread.append(e) + + thread = threading.Thread(target=background_thread_work) + thread.start() + thread.join() + assert not exception_from_thread + assert captured_from_thread + await client.close_async(timeout=2.0) + assert capturing_server.captured + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_event_loop_closed_scenario( + capturing_server, make_client, caplog +): + """Test behavior when trying to capture after event loop context ends""" + caplog.set_level(logging.DEBUG) + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + sentry_sdk.get_global_scope().set_client(client) + original_loop = client.transport.loop + + with mock.patch("asyncio.get_running_loop", side_effect=RuntimeError("no loop")): + with mock.patch.object(client.transport.loop, "is_running", return_value=False): + with mock.patch("sentry_sdk.transport.logger") as mock_logger: + # This should trigger the "no_async_context" path + capture_message("after loop closed") + + mock_logger.warning.assert_called_with( + "Async Transport is not running in an event loop." + ) + + client.transport.loop = original_loop + await client.close_async(timeout=2.0) + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_concurrent_requests( + capturing_server, make_client, caplog +): + """Test multiple simultaneous envelope submissions""" + caplog.set_level(logging.DEBUG) + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + assert isinstance(client.transport, AsyncHttpTransport) + sentry_sdk.get_global_scope().set_client(client) + + num_messages = 15 + + async def send_message(i): + capture_message(f"concurrent message {i}") + + tasks = [send_message(i) for i in range(num_messages)] + await asyncio.gather(*tasks) + await client.close_async(timeout=2.0) + assert len(capturing_server.captured) == num_messages + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_rate_limiting_with_concurrency( + capturing_server, make_client, request +): + """Test async transport rate limiting with concurrent requests""" + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + + assert isinstance(client.transport, AsyncHttpTransport) + sentry_sdk.get_global_scope().set_client(client) + request.addfinalizer(lambda: sentry_sdk.get_global_scope().set_client(None)) + capturing_server.respond_with( + code=429, headers={"X-Sentry-Rate-Limits": "60:error:organization"} + ) + + # Send one request first to trigger rate limiting + capture_message("initial message") + await asyncio.sleep(0.1) # Wait for request to execute + assert client.transport._check_disabled("error") is True + capturing_server.clear_captured() + + async def send_message(i): + capture_message(f"message {i}") + await asyncio.sleep(0.01) + + await asyncio.gather(*[send_message(i) for i in range(5)]) + await asyncio.sleep(0.1) + # New request should be dropped due to rate limiting + assert len(capturing_server.captured) == 0 + await client.close_async(timeout=2.0) + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 
3.8+") +async def test_async_two_way_ssl_authentication(): + current_dir = os.path.dirname(__file__) + cert_file = f"{current_dir}/test.pem" + key_file = f"{current_dir}/test.key" + + client = Client( + "https://foo@sentry.io/123", + cert_file=cert_file, + key_file=key_file, + _experiments={"transport_async": True}, + integrations=[AsyncioIntegration()], + ) + assert isinstance(client.transport, AsyncHttpTransport) + + options = client.transport._get_pool_options() + assert options["ssl_context"] is not None + + await client.close_async() diff --git a/tox.ini b/tox.ini index fd52035fac..d6f5e173eb 100644 --- a/tox.ini +++ b/tox.ini @@ -10,7 +10,7 @@ # The file (and all resulting CI YAMLs) then need to be regenerated via # "scripts/generate-test-files.sh". # -# Last generated: 2025-07-23T07:24:30.467173+00:00 +# Last generated: 2025-07-30T13:59:12.959550+00:00 [tox] requires = @@ -125,16 +125,16 @@ envlist = # ~~~ Common ~~~ {py3.7,py3.8,py3.9}-common-v1.4.1 - {py3.7,py3.8,py3.9,py3.10,py3.11}-common-v1.14.0 - {py3.8,py3.9,py3.10,py3.11}-common-v1.24.0 - {py3.9,py3.10,py3.11,py3.12,py3.13}-common-v1.35.0 + {py3.7,py3.8,py3.9,py3.10,py3.11}-common-v1.15.0 + {py3.8,py3.9,py3.10,py3.11,py3.12}-common-v1.26.0 + {py3.9,py3.10,py3.11,py3.12,py3.13}-common-v1.36.0 # ~~~ AI ~~~ {py3.8,py3.11,py3.12}-anthropic-v0.16.0 - {py3.8,py3.11,py3.12}-anthropic-v0.30.1 - {py3.8,py3.11,py3.12}-anthropic-v0.44.0 - {py3.8,py3.12,py3.13}-anthropic-v0.58.2 + {py3.8,py3.11,py3.12}-anthropic-v0.31.2 + {py3.8,py3.11,py3.12}-anthropic-v0.46.0 + {py3.8,py3.12,py3.13}-anthropic-v0.60.0 {py3.9,py3.10,py3.11}-cohere-v5.4.0 {py3.9,py3.11,py3.12}-cohere-v5.9.4 @@ -143,12 +143,13 @@ envlist = {py3.10,py3.11,py3.12}-openai_agents-v0.0.19 {py3.10,py3.12,py3.13}-openai_agents-v0.1.0 - {py3.10,py3.12,py3.13}-openai_agents-v0.2.3 + {py3.10,py3.12,py3.13}-openai_agents-v0.2.4 {py3.8,py3.10,py3.11}-huggingface_hub-v0.22.2 {py3.8,py3.11,py3.12}-huggingface_hub-v0.26.5 {py3.8,py3.12,py3.13}-huggingface_hub-v0.30.2 - {py3.8,py3.12,py3.13}-huggingface_hub-v0.33.4 + {py3.8,py3.12,py3.13}-huggingface_hub-v0.34.3 + {py3.8,py3.12,py3.13}-huggingface_hub-v0.35.0rc0 # ~~~ DBs ~~~ @@ -164,7 +165,7 @@ envlist = {py3.7,py3.8,py3.9}-sqlalchemy-v1.3.24 {py3.7,py3.11,py3.12}-sqlalchemy-v1.4.54 - {py3.7,py3.12,py3.13}-sqlalchemy-v2.0.41 + {py3.7,py3.12,py3.13}-sqlalchemy-v2.0.42 # ~~~ Flags ~~~ @@ -179,7 +180,7 @@ envlist = {py3.7,py3.12,py3.13}-statsig-v0.55.3 {py3.7,py3.12,py3.13}-statsig-v0.57.3 {py3.7,py3.12,py3.13}-statsig-v0.59.1 - {py3.7,py3.12,py3.13}-statsig-v0.60.0 + {py3.7,py3.12,py3.13}-statsig-v0.61.0 {py3.8,py3.12,py3.13}-unleash-v6.0.1 {py3.8,py3.12,py3.13}-unleash-v6.1.0 @@ -210,8 +211,7 @@ envlist = {py3.7,py3.8}-grpc-v1.32.0 {py3.7,py3.9,py3.10}-grpc-v1.46.5 {py3.7,py3.11,py3.12}-grpc-v1.60.2 - {py3.9,py3.12,py3.13}-grpc-v1.73.1 - {py3.9,py3.12,py3.13}-grpc-v1.74.0rc1 + {py3.9,py3.12,py3.13}-grpc-v1.74.0 # ~~~ Tasks ~~~ @@ -262,7 +262,7 @@ envlist = {py3.7}-aiohttp-v3.4.4 {py3.7,py3.8,py3.9}-aiohttp-v3.7.4 {py3.8,py3.12,py3.13}-aiohttp-v3.10.11 - {py3.9,py3.12,py3.13}-aiohttp-v3.12.14 + {py3.9,py3.12,py3.13}-aiohttp-v3.12.15 {py3.7}-bottle-v0.12.25 {py3.8,py3.12,py3.13}-bottle-v0.13.4 @@ -378,7 +378,7 @@ deps = httpx-v0.25: pytest-httpx==0.25.0 httpx: pytest-httpx # anyio is a dep of httpx - httpx: anyio<4.0.0 + httpx: anyio>=3,<5 httpx-v0.16: httpx~=0.16.0 httpx-v0.18: httpx~=0.18.0 httpx-v0.20: httpx~=0.20.0 @@ -484,9 +484,9 @@ deps = # ~~~ Common ~~~ common-v1.4.1: opentelemetry-sdk==1.4.1 - common-v1.14.0: 
opentelemetry-sdk==1.14.0 - common-v1.24.0: opentelemetry-sdk==1.24.0 - common-v1.35.0: opentelemetry-sdk==1.35.0 + common-v1.15.0: opentelemetry-sdk==1.15.0 + common-v1.26.0: opentelemetry-sdk==1.26.0 + common-v1.36.0: opentelemetry-sdk==1.36.0 common: pytest common: pytest-asyncio py3.7-common: pytest<7.0.0 @@ -495,13 +495,13 @@ deps = # ~~~ AI ~~~ anthropic-v0.16.0: anthropic==0.16.0 - anthropic-v0.30.1: anthropic==0.30.1 - anthropic-v0.44.0: anthropic==0.44.0 - anthropic-v0.58.2: anthropic==0.58.2 + anthropic-v0.31.2: anthropic==0.31.2 + anthropic-v0.46.0: anthropic==0.46.0 + anthropic-v0.60.0: anthropic==0.60.0 anthropic: pytest-asyncio anthropic-v0.16.0: httpx<0.28.0 - anthropic-v0.30.1: httpx<0.28.0 - anthropic-v0.44.0: httpx<0.28.0 + anthropic-v0.31.2: httpx<0.28.0 + anthropic-v0.46.0: httpx<0.28.0 cohere-v5.4.0: cohere==5.4.0 cohere-v5.9.4: cohere==5.9.4 @@ -510,13 +510,14 @@ deps = openai_agents-v0.0.19: openai-agents==0.0.19 openai_agents-v0.1.0: openai-agents==0.1.0 - openai_agents-v0.2.3: openai-agents==0.2.3 + openai_agents-v0.2.4: openai-agents==0.2.4 openai_agents: pytest-asyncio huggingface_hub-v0.22.2: huggingface_hub==0.22.2 huggingface_hub-v0.26.5: huggingface_hub==0.26.5 huggingface_hub-v0.30.2: huggingface_hub==0.30.2 - huggingface_hub-v0.33.4: huggingface_hub==0.33.4 + huggingface_hub-v0.34.3: huggingface_hub==0.34.3 + huggingface_hub-v0.35.0rc0: huggingface_hub==0.35.0rc0 # ~~~ DBs ~~~ @@ -533,7 +534,7 @@ deps = sqlalchemy-v1.3.24: sqlalchemy==1.3.24 sqlalchemy-v1.4.54: sqlalchemy==1.4.54 - sqlalchemy-v2.0.41: sqlalchemy==2.0.41 + sqlalchemy-v2.0.42: sqlalchemy==2.0.42 # ~~~ Flags ~~~ @@ -548,7 +549,7 @@ deps = statsig-v0.55.3: statsig==0.55.3 statsig-v0.57.3: statsig==0.57.3 statsig-v0.59.1: statsig==0.59.1 - statsig-v0.60.0: statsig==0.60.0 + statsig-v0.61.0: statsig==0.61.0 statsig: typing_extensions unleash-v6.0.1: UnleashClient==6.0.1 @@ -592,8 +593,7 @@ deps = grpc-v1.32.0: grpcio==1.32.0 grpc-v1.46.5: grpcio==1.46.5 grpc-v1.60.2: grpcio==1.60.2 - grpc-v1.73.1: grpcio==1.73.1 - grpc-v1.74.0rc1: grpcio==1.74.0rc1 + grpc-v1.74.0: grpcio==1.74.0 grpc: protobuf grpc: mypy-protobuf grpc: types-protobuf @@ -686,10 +686,11 @@ deps = fastapi: pytest-asyncio fastapi: python-multipart fastapi: requests - fastapi: anyio<4 + fastapi: anyio>=3,<5 fastapi-v0.79.1: httpx<0.28.0 fastapi-v0.91.0: httpx<0.28.0 fastapi-v0.103.2: httpx<0.28.0 + fastapi-v0.79.1: anyio<4 py3.6-fastapi: aiocontextvars @@ -697,10 +698,10 @@ deps = aiohttp-v3.4.4: aiohttp==3.4.4 aiohttp-v3.7.4: aiohttp==3.7.4 aiohttp-v3.10.11: aiohttp==3.10.11 - aiohttp-v3.12.14: aiohttp==3.12.14 + aiohttp-v3.12.15: aiohttp==3.12.15 aiohttp: pytest-aiohttp aiohttp-v3.10.11: pytest-asyncio - aiohttp-v3.12.14: pytest-asyncio + aiohttp-v3.12.15: pytest-asyncio bottle-v0.12.25: bottle==0.12.25 bottle-v0.13.4: bottle==0.13.4 From 371ca965168ec2b13df7f57dc8b5b3a0550b5964 Mon Sep 17 00:00:00 2001 From: Simon Roth <39389607+srothh@users.noreply.github.com> Date: Thu, 14 Aug 2025 16:30:45 +0200 Subject: [PATCH 15/15] fix(asyncio integration): Filter SDK internal tasks from span creation (#4700) Filter SDK internal tasks from span creation. Implements a new contextmanager for tasks spawned internally by the SDK, which filters it in the asyncio integration. 
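
Rough usage shape, as introduced by this patch (an illustrative sketch;
`internal_coroutine` stands in for any SDK-internal coroutine):

    from sentry_sdk.utils import mark_sentry_task_internal

    # Tasks created inside this context are flagged via a ContextVar,
    # so the patched asyncio task factory skips span creation for them.
    with mark_sentry_task_internal():
        loop.create_task(internal_coroutine())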
GH-4699 --------- Co-authored-by: Neel Shah --- sentry_sdk/integrations/asyncio.py | 64 ++++++++++++++------- sentry_sdk/transport.py | 10 +++- sentry_sdk/utils.py | 21 +++++++ sentry_sdk/worker.py | 11 ++-- tests/integrations/asyncio/test_asyncio.py | 67 ++++++++++++++++++++-- 5 files changed, 143 insertions(+), 30 deletions(-) diff --git a/sentry_sdk/integrations/asyncio.py b/sentry_sdk/integrations/asyncio.py index b515345f1a..f6f1d57c8b 100644 --- a/sentry_sdk/integrations/asyncio.py +++ b/sentry_sdk/integrations/asyncio.py @@ -4,7 +4,12 @@ import sentry_sdk from sentry_sdk.consts import OP from sentry_sdk.integrations import Integration, DidNotEnable -from sentry_sdk.utils import event_from_exception, logger, reraise +from sentry_sdk.utils import ( + event_from_exception, + logger, + reraise, + is_internal_task, +) from sentry_sdk.transport import AsyncHttpTransport try: @@ -69,6 +74,33 @@ def _patched_close() -> None: loop._sentry_flush_patched = True # type: ignore +def _create_task_with_factory( + orig_task_factory: Any, + loop: asyncio.AbstractEventLoop, + coro: Coroutine[Any, Any, Any], + **kwargs: Any, +) -> asyncio.Task[Any]: + task = None + + # Trying to use user set task factory (if there is one) + if orig_task_factory: + task = orig_task_factory(loop, coro, **kwargs) + + if task is None: + # The default task factory in `asyncio` does not have its own function + # but is just a couple of lines in `asyncio.base_events.create_task()` + # Those lines are copied here. + + # WARNING: + # If the default behavior of the task creation in asyncio changes, + # this will break! + task = Task(coro, loop=loop, **kwargs) + if task._source_traceback: # type: ignore + del task._source_traceback[-1] # type: ignore + + return task + + def patch_asyncio() -> None: orig_task_factory = None try: @@ -81,6 +113,14 @@ def _sentry_task_factory( **kwargs: Any, ) -> asyncio.Future[Any]: + # Check if this is an internal Sentry task + is_internal = is_internal_task() + + if is_internal: + return _create_task_with_factory( + orig_task_factory, loop, coro, **kwargs + ) + async def _task_with_sentry_span_creation() -> Any: result = None @@ -98,25 +138,9 @@ async def _task_with_sentry_span_creation() -> Any: return result - task = None - - # Trying to use user set task factory (if there is one) - if orig_task_factory: - task = orig_task_factory( - loop, _task_with_sentry_span_creation(), **kwargs - ) - - if task is None: - # The default task factory in `asyncio` does not have its own function - # but is just a couple of lines in `asyncio.base_events.create_task()` - # Those lines are copied here. - - # WARNING: - # If the default behavior of the task creation in asyncio changes, - # this will break! 
- task = Task(_task_with_sentry_span_creation(), loop=loop, **kwargs) - if task._source_traceback: # type: ignore - del task._source_traceback[-1] # type: ignore + task = _create_task_with_factory( + orig_task_factory, loop, _task_with_sentry_span_creation(), **kwargs + ) # Set the task name to include the original coroutine's name try: diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 5c2c864198..6d7e4c4f84 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -39,7 +39,12 @@ import certifi from sentry_sdk.consts import EndpointType -from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions +from sentry_sdk.utils import ( + Dsn, + logger, + capture_internal_exceptions, + mark_sentry_task_internal, +) from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker from sentry_sdk.envelope import Envelope, Item, PayloadRef @@ -901,7 +906,8 @@ def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore self._worker.kill() try: # Return the pool cleanup task so caller can await it if needed - return self.loop.create_task(self._pool.aclose()) # type: ignore + with mark_sentry_task_internal(): + return self.loop.create_task(self._pool.aclose()) # type: ignore except RuntimeError: logger.warning("Event loop not running, aborting kill.") return None diff --git a/sentry_sdk/utils.py b/sentry_sdk/utils.py index 0842749baf..ee2cbfb5f1 100644 --- a/sentry_sdk/utils.py +++ b/sentry_sdk/utils.py @@ -1,5 +1,6 @@ from __future__ import annotations import base64 +import contextvars import json import linecache import logging @@ -12,6 +13,7 @@ import threading import time from collections import namedtuple +from contextlib import contextmanager from datetime import datetime, timezone from decimal import Decimal from functools import partial, partialmethod, wraps @@ -72,6 +74,25 @@ _installed_modules = None +_is_sentry_internal_task = contextvars.ContextVar( + "is_sentry_internal_task", default=False +) + + +def is_internal_task(): + return _is_sentry_internal_task.get() + + +@contextmanager +def mark_sentry_task_internal(): + """Context manager to mark a task as Sentry internal.""" + token = _is_sentry_internal_task.set(True) + try: + yield + finally: + _is_sentry_internal_task.reset(token) + + BASE64_ALPHABET = re.compile(r"^[a-zA-Z0-9/+=]*$") FALSY_ENV_VALUES = frozenset(("false", "f", "n", "no", "off", "0")) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index c8dbbb2d73..8a23fa3ee1 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -6,7 +6,7 @@ from time import sleep, time from sentry_sdk._queue import Queue, FullError -from sentry_sdk.utils import logger +from sentry_sdk.utils import logger, mark_sentry_task_internal from sentry_sdk.consts import DEFAULT_QUEUE_SIZE from typing import TYPE_CHECKING @@ -231,7 +231,8 @@ def start(self) -> None: self._loop = asyncio.get_running_loop() if self._queue is None: self._queue = asyncio.Queue(maxsize=self._queue_size) - self._task = self._loop.create_task(self._target()) + with mark_sentry_task_internal(): + self._task = self._loop.create_task(self._target()) self._task_for_pid = os.getpid() except RuntimeError: # There is no event loop running @@ -273,7 +274,8 @@ async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> N def flush(self, timeout: float, callback: Optional[Any] = None) -> Optional[asyncio.Task[None]]: # type: ignore[override] if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running(): - return 
self._loop.create_task(self._wait_flush(timeout, callback)) + with mark_sentry_task_internal(): + return self._loop.create_task(self._wait_flush(timeout, callback)) return None def submit(self, callback: Callable[[], Any]) -> bool: @@ -295,7 +297,8 @@ async def _target(self) -> None: self._queue.task_done() break # Firing tasks instead of awaiting them allows for concurrent requests - task = asyncio.create_task(self._process_callback(callback)) + with mark_sentry_task_internal(): + task = asyncio.create_task(self._process_callback(callback)) # Create a strong reference to the task so it can be cancelled on kill # and does not get garbage collected while running self._active_tasks.add(task) diff --git a/tests/integrations/asyncio/test_asyncio.py b/tests/integrations/asyncio/test_asyncio.py index 5c329f8185..42d8626ff3 100644 --- a/tests/integrations/asyncio/test_asyncio.py +++ b/tests/integrations/asyncio/test_asyncio.py @@ -8,6 +8,8 @@ import sentry_sdk from sentry_sdk.consts import OP from sentry_sdk.integrations.asyncio import AsyncioIntegration, patch_asyncio +from sentry_sdk.utils import mark_sentry_task_internal + try: from contextvars import Context, ContextVar @@ -379,6 +381,55 @@ async def test_span_origin( assert event["spans"][0]["origin"] == "auto.function.asyncio" +@minimum_python_38 +@pytest.mark.asyncio(loop_scope="module") +async def test_internal_tasks_not_wrapped(sentry_init, capture_events): + + sentry_init(integrations=[AsyncioIntegration()], traces_sample_rate=1.0) + events = capture_events() + + # Create a user task that should be wrapped + async def user_task(): + await asyncio.sleep(0.01) + return "user_result" + + # Create an internal task that should NOT be wrapped + async def internal_task(): + await asyncio.sleep(0.01) + return "internal_result" + + with sentry_sdk.start_transaction(name="test_transaction"): + user_task_obj = asyncio.create_task(user_task()) + + with mark_sentry_task_internal(): + internal_task_obj = asyncio.create_task(internal_task()) + + user_result = await user_task_obj + internal_result = await internal_task_obj + + assert user_result == "user_result" + assert internal_result == "internal_result" + + assert len(events) == 1 + transaction = events[0] + + user_spans = [] + internal_spans = [] + + for span in transaction.get("spans", []): + if "user_task" in span.get("description", ""): + user_spans.append(span) + elif "internal_task" in span.get("description", ""): + internal_spans.append(span) + + assert ( + len(user_spans) > 0 + ), f"User task should have been traced. All spans: {[s.get('description') for s in transaction.get('spans', [])]}" + assert ( + len(internal_spans) == 0 + ), f"Internal task should NOT have been traced. 
All spans: {[s.get('description') for s in transaction.get('spans', [])]}" + + @minimum_python_38 def test_loop_close_patching(sentry_init): sentry_init(integrations=[AsyncioIntegration()]) @@ -405,6 +456,12 @@ def test_loop_close_flushes_async_transport(sentry_init): sentry_init(integrations=[AsyncioIntegration()]) + # Save the current event loop to restore it later + try: + original_loop = asyncio.get_event_loop() + except RuntimeError: + original_loop = None + loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) @@ -415,14 +472,16 @@ def test_loop_close_flushes_async_transport(sentry_init): mock_client = Mock() mock_transport = Mock(spec=AsyncHttpTransport) mock_client.transport = mock_transport - mock_client.close = AsyncMock(return_value=None) + mock_client.close_async = AsyncMock(return_value=None) with patch("sentry_sdk.get_client", return_value=mock_client): loop.close() - mock_client.close.assert_called_once() - mock_client.close.assert_awaited_once() + mock_client.close_async.assert_called_once() + mock_client.close_async.assert_awaited_once() - except Exception: + finally: if not loop.is_closed(): loop.close() + if original_loop: + asyncio.set_event_loop(original_loop)