From 20b0ef4a2adcc3718c00c892058005e1e68df415 Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Mon, 9 Jun 2025 14:44:42 +0530 Subject: [PATCH] fix: reinitialize locks after fork to prevent deadlocks in child processes Summary This commit adds post-fork reinitialization of threading locks across multiple components in the OpenTelemetry Python SDK and API. It ensures that threading.Lock() instances are safely reinitialized in child processes after a fork(), preventing potential deadlocks and undefined behavior. Details Introduced usage of register_at_fork(after_in_child=...) from the os module to reinitialize thread locks. Used weakref.WeakMethod() to safely refer to bound instance methods in register_at_fork. Added _at_fork_reinit() methods to classes using threading locks and registered them to run in child processes post-fork. Applied this to all usages of Lock, RLock Rationale Forked child processes inherit thread state from the parent, including the internal state of locks. This can cause deadlocks or runtime errors if a lock was held at the time of the fork. By reinitializing locks using the register_at_fork mechanism, we ensure child processes start with clean lock states. This is especially relevant for WSGI servers and environments that use pre-fork models (e.g., gunicorn, uWSGI), where instrumentation and telemetry components may misbehave without this precaution. --- .../exporter/otlp/proto/grpc/exporter.py | 8 +++++++- .../src/opentelemetry/attributes/__init__.py | 7 +++++++ .../metrics/_internal/__init__.py | 13 +++++++++++- .../src/opentelemetry/util/_once.py | 7 +++++++ .../sdk/_logs/_internal/__init__.py | 18 ++++++++++++++++- .../export/in_memory_log_exporter.py | 7 +++++++ .../sdk/metrics/_internal/__init__.py | 20 ++++++++++++++++++- .../_internal/_view_instrument_match.py | 7 +++++++ .../sdk/metrics/_internal/aggregation.py | 7 +++++++ .../mapping/exponent_mapping.py | 9 +++++++++ .../mapping/logarithm_mapping.py | 9 +++++++++ .../sdk/metrics/_internal/export/__init__.py | 8 +++++++- .../metrics/_internal/measurement_consumer.py | 7 +++++++ .../_internal/metric_reader_storage.py | 7 +++++++ .../src/opentelemetry/sdk/trace/__init__.py | 18 ++++++++++++++++- .../trace/export/in_memory_span_exporter.py | 7 +++++++ .../src/opentelemetry/sdk/util/__init__.py | 12 +++++++++++ 17 files changed, 165 insertions(+), 6 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 259f1ddb91b..5a6849d606f 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -15,10 +15,11 @@ """OTLP Exporter""" import threading +import weakref from abc import ABC, abstractmethod from collections.abc import Sequence # noqa: F401 from logging import getLogger -from os import environ +from os import environ, register_at_fork from time import sleep from typing import ( # noqa: F401 Any, @@ -261,8 +262,13 @@ def __init__( self._client = self._stub(self._channel) self._export_lock = threading.Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) self._shutdown = False + def _at_fork_reinit(self): + self._export_lock._at_fork_reinit() + @abstractmethod def _translate_data( self, data: TypingSequence[SDKDataT] diff --git a/opentelemetry-api/src/opentelemetry/attributes/__init__.py b/opentelemetry-api/src/opentelemetry/attributes/__init__.py index fc3d494631a..8d2fafe02ec 100644 --- a/opentelemetry-api/src/opentelemetry/attributes/__init__.py +++ b/opentelemetry-api/src/opentelemetry/attributes/__init__.py @@ -14,9 +14,11 @@ import logging import threading +import weakref from collections import OrderedDict from collections.abc import MutableMapping from typing import Mapping, Optional, Sequence, Tuple, Union +from os import register_at_fork from opentelemetry.util import types @@ -263,6 +265,11 @@ def __init__( for key, value in attributes.items(): self[key] = value self._immutable = immutable + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def __repr__(self) -> str: return f"{dict(self._dict)}" diff --git a/opentelemetry-api/src/opentelemetry/metrics/_internal/__init__.py b/opentelemetry-api/src/opentelemetry/metrics/_internal/__init__.py index 2319d8d1f90..e5042936d43 100644 --- a/opentelemetry-api/src/opentelemetry/metrics/_internal/__init__.py +++ b/opentelemetry-api/src/opentelemetry/metrics/_internal/__init__.py @@ -41,10 +41,11 @@ """ import warnings +import weakref from abc import ABC, abstractmethod from dataclasses import dataclass from logging import getLogger -from os import environ +from os import environ, register_at_fork from threading import Lock from typing import Dict, List, Optional, Sequence, Union, cast @@ -156,6 +157,11 @@ def __init__(self) -> None: self._lock = Lock() self._meters: List[_ProxyMeter] = [] self._real_meter_provider: Optional[MeterProvider] = None + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def get_meter( self, @@ -510,6 +516,11 @@ def __init__( self._lock = Lock() self._instruments: List[_ProxyInstrumentT] = [] self._real_meter: Optional[Meter] = None + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def on_set_meter_provider(self, meter_provider: MeterProvider) -> None: """Called when a real meter provider is set on the creating _ProxyMeterProvider diff --git a/opentelemetry-api/src/opentelemetry/util/_once.py b/opentelemetry-api/src/opentelemetry/util/_once.py index c0cee43a174..33e5e8e365f 100644 --- a/opentelemetry-api/src/opentelemetry/util/_once.py +++ b/opentelemetry-api/src/opentelemetry/util/_once.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import weakref +from os import register_at_fork from threading import Lock from typing import Callable @@ -25,6 +27,11 @@ class Once: def __init__(self) -> None: self._lock = Lock() self._done = False + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def do_once(self, func: Callable[[], None]) -> bool: """Execute ``func`` if it hasn't been executed or return. diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py index 9060e49aac4..7fd6ee1432a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py @@ -21,7 +21,8 @@ import threading import traceback import warnings -from os import environ +import weakref +from os import environ, register_at_fork from threading import Lock from time import time_ns from typing import Any, Callable, Tuple, Union, cast # noqa @@ -317,6 +318,11 @@ def __init__(self): # iterating through it on "emit". self._log_record_processors = () # type: Tuple[LogRecordProcessor, ...] self._lock = threading.Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def add_log_record_processor( self, log_record_processor: LogRecordProcessor @@ -379,6 +385,11 @@ def __init__(self, max_workers: int = 2): self._executor = concurrent.futures.ThreadPoolExecutor( max_workers=max_workers ) + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def add_log_record_processor( self, log_record_processor: LogRecordProcessor @@ -633,6 +644,11 @@ def __init__( self._at_exit_handler = atexit.register(self.shutdown) self._logger_cache = {} self._logger_cache_lock = Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._logger_cache_lock._at_fork_reinit() @property def resource(self): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py index 68cb6b7389a..48ba3bd3357 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py @@ -14,6 +14,8 @@ import threading import typing +import weakref +from os import register_at_fork from opentelemetry.sdk._logs import LogData from opentelemetry.sdk._logs.export import LogExporter, LogExportResult @@ -30,8 +32,13 @@ class InMemoryLogExporter(LogExporter): def __init__(self): self._logs = [] self._lock = threading.Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) self._stopped = False + def _at_fork_reinit(self): + self._lock._at_fork_reinit() + def clear(self) -> None: with self._lock: self._logs.clear() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index faa0959fce2..0b3524675ed 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -15,7 +15,7 @@ import weakref from atexit import register, unregister from logging import getLogger -from os import environ +from os import environ, register_at_fork from threading import Lock from time import time_ns from typing import Optional, Sequence @@ -88,6 +88,11 @@ def __init__( self._measurement_consumer = measurement_consumer self._instrument_id_instrument = {} self._instrument_id_instrument_lock = Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._instrument_id_instrument_lock._at_fork_reinit() def create_counter(self, name, unit="", description="") -> APICounter: status = self._register_instrument(name, _Counter, unit, description) @@ -421,6 +426,8 @@ def __init__( ): self._lock = Lock() self._meter_lock = Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) self._atexit_handler = None if resource is None: resource = Resource.create({}) @@ -463,6 +470,14 @@ def __init__( self._measurement_consumer.collect ) + def _at_fork_reinit(self): + self._lock._at_fork_reinit() + self._meter_lock._at_fork_reinit() + + @classmethod + def _register_fork_handlers(cls): + register_at_fork(after_in_child=cls._all_metric_readers_lock._at_fork_reinit) + def force_flush(self, timeout_millis: float = 10_000) -> bool: deadline_ns = time_ns() + timeout_millis * 10**6 @@ -580,3 +595,6 @@ def get_meter( self._measurement_consumer, ) return self._meters[info] + +# Call the method after the class is fully defined +MeterProvider._register_fork_handlers() \ No newline at end of file diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py index be81d70e5cd..216fa9d7ef2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py @@ -13,7 +13,9 @@ # limitations under the License. +import weakref from logging import getLogger +from os import register_at_fork from threading import Lock from time import time_ns from typing import Dict, List, Optional, Sequence @@ -44,6 +46,8 @@ def __init__( self._instrument = instrument self._attributes_aggregation: Dict[frozenset, _Aggregation] = {} self._lock = Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) self._instrument_class_aggregation = instrument_class_aggregation self._name = self._view._name or self._instrument.name self._description = ( @@ -66,6 +70,9 @@ def __init__( 0, ) + def _at_fork_reinit(self): + self._lock._at_fork_reinit() + def conflicts(self, other: "_ViewInstrumentMatch") -> bool: # pylint: disable=protected-access diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py index 1779dac0bba..9b829dd0312 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py @@ -14,12 +14,14 @@ # pylint: disable=too-many-lines +import weakref from abc import ABC, abstractmethod from bisect import bisect_left from enum import IntEnum from functools import partial from logging import getLogger from math import inf +from os import register_at_fork from threading import Lock from typing import ( Callable, @@ -98,9 +100,14 @@ def __init__( ): self._lock = Lock() self._attributes = attributes + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) self._reservoir = reservoir_builder() self._previous_point = None + def _at_fork_reinit(self): + self._lock._at_fork_reinit() + @abstractmethod def aggregate( self, measurement: Measurement, should_sample_exemplar: bool = True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/exponent_mapping.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/exponent_mapping.py index 297bb7a4831..d66e965f7f3 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/exponent_mapping.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/exponent_mapping.py @@ -13,6 +13,7 @@ # limitations under the License. from math import ldexp +from os import register_at_fork from threading import Lock from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import ( @@ -42,6 +43,11 @@ class ExponentMapping(Mapping): _min_scale = -10 _max_scale = 0 + # Add a class method for initialization that includes fork handler registration + @classmethod + def _register_fork_handlers(cls): + register_at_fork(after_in_child=cls._mappings_lock._at_fork_reinit) + def _get_min_scale(self): # _min_scale defines the point at which the exponential mapping # function becomes useless for 64-bit floats. With scale -10, ignoring @@ -139,3 +145,6 @@ def get_lower_boundary(self, index: int) -> float: @property def scale(self) -> int: return self._scale + +# Call the method after the class is fully defined +ExponentMapping._register_fork_handlers() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/logarithm_mapping.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/logarithm_mapping.py index e73f3a81e23..34d59da6381 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/logarithm_mapping.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/logarithm_mapping.py @@ -13,6 +13,7 @@ # limitations under the License. from math import exp, floor, ldexp, log +from os import register_at_fork from threading import Lock from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import ( @@ -41,6 +42,11 @@ class LogarithmMapping(Mapping): _min_scale = 1 _max_scale = 20 + # Add a class method for initialization that includes fork handler registration + @classmethod + def _register_fork_handlers(cls): + register_at_fork(after_in_child=cls._mappings_lock._at_fork_reinit) + def _get_min_scale(self): # _min_scale ensures that ExponentMapping is used for zero and negative # scale values. @@ -136,3 +142,6 @@ def get_lower_boundary(self, index: int) -> float: @property def scale(self) -> int: return self._scale + +# Call the method after the class is fully defined +LogarithmMapping._register_fork_handlers() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 2cb587f2f65..3a96b0b27b1 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -19,7 +19,7 @@ from abc import ABC, abstractmethod from enum import Enum from logging import getLogger -from os import environ, linesep +from os import environ, linesep, register_at_fork from sys import stdout from threading import Event, Lock, RLock, Thread from time import time_ns @@ -409,6 +409,11 @@ def __init__( self._metrics_data: "opentelemetry.sdk.metrics.export.MetricsData" = ( None ) + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def get_metrics_data( self, @@ -510,6 +515,7 @@ def __init__( ) def _at_fork_reinit(self): + self._export_lock._at_fork_reinit() self._daemon_thread = Thread( name="OtelPeriodicExportingMetricReader", target=self._ticker, diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index c651033051a..9865b69cd47 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -14,7 +14,9 @@ # pylint: disable=unused-import +import weakref from abc import ABC, abstractmethod +from os import register_at_fork from threading import Lock from time import time_ns from typing import Iterable, List, Mapping, Optional @@ -61,6 +63,8 @@ def __init__( sdk_config: "opentelemetry.sdk.metrics._internal.SdkConfiguration", ) -> None: self._lock = Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) self._sdk_config = sdk_config # should never be mutated self._reader_storages: Mapping[ @@ -77,6 +81,9 @@ def __init__( "opentelemetry.sdk.metrics._internal.instrument._Asynchronous" ] = [] + def _at_fork_reinit(self): + self._lock._at_fork_reinit() + def consume_measurement(self, measurement: Measurement) -> None: should_sample_exemplar = ( self._sdk_config.exemplar_filter.should_sample( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py index f5121811ebc..f9da33feb0e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import weakref from logging import getLogger +from os import register_at_fork from threading import RLock from time import time_ns from typing import Dict, List, Optional @@ -74,6 +76,11 @@ def __init__( ] = {} self._instrument_class_temporality = instrument_class_temporality self._instrument_class_aggregation = instrument_class_aggregation + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def _get_or_init_view_instrument_match( self, instrument: Instrument diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index a1c0576520e..b82ba3dd6f3 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -21,7 +21,8 @@ import threading import traceback import typing -from os import environ +import weakref +from os import environ, register_at_fork from time import time_ns from types import MappingProxyType, TracebackType from typing import ( @@ -156,6 +157,11 @@ def __init__(self): # iterating through it on "on_start" and "on_end". self._span_processors = () self._lock = threading.Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def add_span_processor(self, span_processor: SpanProcessor) -> None: """Adds a SpanProcessor to the list handled by this instance.""" @@ -226,6 +232,11 @@ def __init__(self, num_threads: int = 2): self._executor = concurrent.futures.ThreadPoolExecutor( max_workers=num_threads ) + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def add_span_processor(self, span_processor: SpanProcessor) -> None: """Adds a SpanProcessor to the list handled by this instance.""" @@ -815,6 +826,11 @@ def __init__( self._events.append(event) self._links = self._new_links(links) + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def __repr__(self): return f'{type(self).__name__}(name="{self._name}", context={self._context})' diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py index c28ecfd214f..6181f6f8a1f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py @@ -14,6 +14,8 @@ import threading import typing +import weakref +from os import register_at_fork from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult @@ -31,6 +33,11 @@ def __init__(self) -> None: self._finished_spans: typing.List[ReadableSpan] = [] self._stopped = False self._lock = threading.Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def clear(self) -> None: """Clear list of collected spans.""" diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py index 72f92fc25cc..ef2ff329e4f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py @@ -14,8 +14,10 @@ import datetime import threading +import weakref from collections import deque from collections.abc import MutableMapping, Sequence +from os import register_at_fork from typing import Optional from typing_extensions import deprecated @@ -54,6 +56,11 @@ def __init__(self, maxlen: Optional[int]): self.dropped = 0 self._dq = deque(maxlen=maxlen) # type: deque self._lock = threading.Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def __repr__(self): return f"{type(self).__name__}({list(self._dq)}, maxlen={self._dq.maxlen})" @@ -111,6 +118,11 @@ def __init__(self, maxlen: Optional[int]): self.dropped = 0 self._dict = {} # type: dict self._lock = threading.Lock() # type: threading.Lock + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def __repr__(self): return (