diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 259f1ddb91b..5a6849d606f 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -15,10 +15,11 @@ """OTLP Exporter""" import threading +import weakref from abc import ABC, abstractmethod from collections.abc import Sequence # noqa: F401 from logging import getLogger -from os import environ +from os import environ, register_at_fork from time import sleep from typing import ( # noqa: F401 Any, @@ -261,8 +262,13 @@ def __init__( self._client = self._stub(self._channel) self._export_lock = threading.Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) self._shutdown = False + def _at_fork_reinit(self): + self._export_lock._at_fork_reinit() + @abstractmethod def _translate_data( self, data: TypingSequence[SDKDataT] diff --git a/opentelemetry-api/src/opentelemetry/attributes/__init__.py b/opentelemetry-api/src/opentelemetry/attributes/__init__.py index fc3d494631a..8d2fafe02ec 100644 --- a/opentelemetry-api/src/opentelemetry/attributes/__init__.py +++ b/opentelemetry-api/src/opentelemetry/attributes/__init__.py @@ -14,9 +14,11 @@ import logging import threading +import weakref from collections import OrderedDict from collections.abc import MutableMapping from typing import Mapping, Optional, Sequence, Tuple, Union +from os import register_at_fork from opentelemetry.util import types @@ -263,6 +265,11 @@ def __init__( for key, value in attributes.items(): self[key] = value self._immutable = immutable + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def __repr__(self) -> str: return f"{dict(self._dict)}" diff --git a/opentelemetry-api/src/opentelemetry/metrics/_internal/__init__.py b/opentelemetry-api/src/opentelemetry/metrics/_internal/__init__.py index 2319d8d1f90..e5042936d43 100644 --- a/opentelemetry-api/src/opentelemetry/metrics/_internal/__init__.py +++ b/opentelemetry-api/src/opentelemetry/metrics/_internal/__init__.py @@ -41,10 +41,11 @@ """ import warnings +import weakref from abc import ABC, abstractmethod from dataclasses import dataclass from logging import getLogger -from os import environ +from os import environ, register_at_fork from threading import Lock from typing import Dict, List, Optional, Sequence, Union, cast @@ -156,6 +157,11 @@ def __init__(self) -> None: self._lock = Lock() self._meters: List[_ProxyMeter] = [] self._real_meter_provider: Optional[MeterProvider] = None + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def get_meter( self, @@ -510,6 +516,11 @@ def __init__( self._lock = Lock() self._instruments: List[_ProxyInstrumentT] = [] self._real_meter: Optional[Meter] = None + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def on_set_meter_provider(self, meter_provider: MeterProvider) -> None: """Called when a real meter provider is set on the creating _ProxyMeterProvider diff --git a/opentelemetry-api/src/opentelemetry/util/_once.py b/opentelemetry-api/src/opentelemetry/util/_once.py index c0cee43a174..33e5e8e365f 100644 --- a/opentelemetry-api/src/opentelemetry/util/_once.py +++ b/opentelemetry-api/src/opentelemetry/util/_once.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import weakref +from os import register_at_fork from threading import Lock from typing import Callable @@ -25,6 +27,11 @@ class Once: def __init__(self) -> None: self._lock = Lock() self._done = False + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def do_once(self, func: Callable[[], None]) -> bool: """Execute ``func`` if it hasn't been executed or return. diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py index 9060e49aac4..7fd6ee1432a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py @@ -21,7 +21,8 @@ import threading import traceback import warnings -from os import environ +import weakref +from os import environ, register_at_fork from threading import Lock from time import time_ns from typing import Any, Callable, Tuple, Union, cast # noqa @@ -317,6 +318,11 @@ def __init__(self): # iterating through it on "emit". self._log_record_processors = () # type: Tuple[LogRecordProcessor, ...] self._lock = threading.Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def add_log_record_processor( self, log_record_processor: LogRecordProcessor @@ -379,6 +385,11 @@ def __init__(self, max_workers: int = 2): self._executor = concurrent.futures.ThreadPoolExecutor( max_workers=max_workers ) + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def add_log_record_processor( self, log_record_processor: LogRecordProcessor @@ -633,6 +644,11 @@ def __init__( self._at_exit_handler = atexit.register(self.shutdown) self._logger_cache = {} self._logger_cache_lock = Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._logger_cache_lock._at_fork_reinit() @property def resource(self): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py index 68cb6b7389a..48ba3bd3357 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py @@ -14,6 +14,8 @@ import threading import typing +import weakref +from os import register_at_fork from opentelemetry.sdk._logs import LogData from opentelemetry.sdk._logs.export import LogExporter, LogExportResult @@ -30,8 +32,13 @@ class InMemoryLogExporter(LogExporter): def __init__(self): self._logs = [] self._lock = threading.Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) self._stopped = False + def _at_fork_reinit(self): + self._lock._at_fork_reinit() + def clear(self) -> None: with self._lock: self._logs.clear() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index faa0959fce2..0b3524675ed 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -15,7 +15,7 @@ import weakref from atexit import register, unregister from logging import getLogger -from os import environ +from os import environ, register_at_fork from threading import Lock from time import time_ns from typing import Optional, Sequence @@ -88,6 +88,11 @@ def __init__( self._measurement_consumer = measurement_consumer self._instrument_id_instrument = {} self._instrument_id_instrument_lock = Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._instrument_id_instrument_lock._at_fork_reinit() def create_counter(self, name, unit="", description="") -> APICounter: status = self._register_instrument(name, _Counter, unit, description) @@ -421,6 +426,8 @@ def __init__( ): self._lock = Lock() self._meter_lock = Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) self._atexit_handler = None if resource is None: resource = Resource.create({}) @@ -463,6 +470,14 @@ def __init__( self._measurement_consumer.collect ) + def _at_fork_reinit(self): + self._lock._at_fork_reinit() + self._meter_lock._at_fork_reinit() + + @classmethod + def _register_fork_handlers(cls): + register_at_fork(after_in_child=cls._all_metric_readers_lock._at_fork_reinit) + def force_flush(self, timeout_millis: float = 10_000) -> bool: deadline_ns = time_ns() + timeout_millis * 10**6 @@ -580,3 +595,6 @@ def get_meter( self._measurement_consumer, ) return self._meters[info] + +# Call the method after the class is fully defined +MeterProvider._register_fork_handlers() \ No newline at end of file diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py index be81d70e5cd..216fa9d7ef2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py @@ -13,7 +13,9 @@ # limitations under the License. +import weakref from logging import getLogger +from os import register_at_fork from threading import Lock from time import time_ns from typing import Dict, List, Optional, Sequence @@ -44,6 +46,8 @@ def __init__( self._instrument = instrument self._attributes_aggregation: Dict[frozenset, _Aggregation] = {} self._lock = Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) self._instrument_class_aggregation = instrument_class_aggregation self._name = self._view._name or self._instrument.name self._description = ( @@ -66,6 +70,9 @@ def __init__( 0, ) + def _at_fork_reinit(self): + self._lock._at_fork_reinit() + def conflicts(self, other: "_ViewInstrumentMatch") -> bool: # pylint: disable=protected-access diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py index 1779dac0bba..9b829dd0312 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py @@ -14,12 +14,14 @@ # pylint: disable=too-many-lines +import weakref from abc import ABC, abstractmethod from bisect import bisect_left from enum import IntEnum from functools import partial from logging import getLogger from math import inf +from os import register_at_fork from threading import Lock from typing import ( Callable, @@ -98,9 +100,14 @@ def __init__( ): self._lock = Lock() self._attributes = attributes + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) self._reservoir = reservoir_builder() self._previous_point = None + def _at_fork_reinit(self): + self._lock._at_fork_reinit() + @abstractmethod def aggregate( self, measurement: Measurement, should_sample_exemplar: bool = True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/exponent_mapping.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/exponent_mapping.py index 297bb7a4831..d66e965f7f3 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/exponent_mapping.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/exponent_mapping.py @@ -13,6 +13,7 @@ # limitations under the License. from math import ldexp +from os import register_at_fork from threading import Lock from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import ( @@ -42,6 +43,11 @@ class ExponentMapping(Mapping): _min_scale = -10 _max_scale = 0 + # Add a class method for initialization that includes fork handler registration + @classmethod + def _register_fork_handlers(cls): + register_at_fork(after_in_child=cls._mappings_lock._at_fork_reinit) + def _get_min_scale(self): # _min_scale defines the point at which the exponential mapping # function becomes useless for 64-bit floats. With scale -10, ignoring @@ -139,3 +145,6 @@ def get_lower_boundary(self, index: int) -> float: @property def scale(self) -> int: return self._scale + +# Call the method after the class is fully defined +ExponentMapping._register_fork_handlers() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/logarithm_mapping.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/logarithm_mapping.py index e73f3a81e23..34d59da6381 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/logarithm_mapping.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/logarithm_mapping.py @@ -13,6 +13,7 @@ # limitations under the License. from math import exp, floor, ldexp, log +from os import register_at_fork from threading import Lock from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import ( @@ -41,6 +42,11 @@ class LogarithmMapping(Mapping): _min_scale = 1 _max_scale = 20 + # Add a class method for initialization that includes fork handler registration + @classmethod + def _register_fork_handlers(cls): + register_at_fork(after_in_child=cls._mappings_lock._at_fork_reinit) + def _get_min_scale(self): # _min_scale ensures that ExponentMapping is used for zero and negative # scale values. @@ -136,3 +142,6 @@ def get_lower_boundary(self, index: int) -> float: @property def scale(self) -> int: return self._scale + +# Call the method after the class is fully defined +LogarithmMapping._register_fork_handlers() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 2cb587f2f65..3a96b0b27b1 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -19,7 +19,7 @@ from abc import ABC, abstractmethod from enum import Enum from logging import getLogger -from os import environ, linesep +from os import environ, linesep, register_at_fork from sys import stdout from threading import Event, Lock, RLock, Thread from time import time_ns @@ -409,6 +409,11 @@ def __init__( self._metrics_data: "opentelemetry.sdk.metrics.export.MetricsData" = ( None ) + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def get_metrics_data( self, @@ -510,6 +515,7 @@ def __init__( ) def _at_fork_reinit(self): + self._export_lock._at_fork_reinit() self._daemon_thread = Thread( name="OtelPeriodicExportingMetricReader", target=self._ticker, diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index c651033051a..9865b69cd47 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -14,7 +14,9 @@ # pylint: disable=unused-import +import weakref from abc import ABC, abstractmethod +from os import register_at_fork from threading import Lock from time import time_ns from typing import Iterable, List, Mapping, Optional @@ -61,6 +63,8 @@ def __init__( sdk_config: "opentelemetry.sdk.metrics._internal.SdkConfiguration", ) -> None: self._lock = Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) self._sdk_config = sdk_config # should never be mutated self._reader_storages: Mapping[ @@ -77,6 +81,9 @@ def __init__( "opentelemetry.sdk.metrics._internal.instrument._Asynchronous" ] = [] + def _at_fork_reinit(self): + self._lock._at_fork_reinit() + def consume_measurement(self, measurement: Measurement) -> None: should_sample_exemplar = ( self._sdk_config.exemplar_filter.should_sample( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py index f5121811ebc..f9da33feb0e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import weakref from logging import getLogger +from os import register_at_fork from threading import RLock from time import time_ns from typing import Dict, List, Optional @@ -74,6 +76,11 @@ def __init__( ] = {} self._instrument_class_temporality = instrument_class_temporality self._instrument_class_aggregation = instrument_class_aggregation + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def _get_or_init_view_instrument_match( self, instrument: Instrument diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index a1c0576520e..b82ba3dd6f3 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -21,7 +21,8 @@ import threading import traceback import typing -from os import environ +import weakref +from os import environ, register_at_fork from time import time_ns from types import MappingProxyType, TracebackType from typing import ( @@ -156,6 +157,11 @@ def __init__(self): # iterating through it on "on_start" and "on_end". self._span_processors = () self._lock = threading.Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def add_span_processor(self, span_processor: SpanProcessor) -> None: """Adds a SpanProcessor to the list handled by this instance.""" @@ -226,6 +232,11 @@ def __init__(self, num_threads: int = 2): self._executor = concurrent.futures.ThreadPoolExecutor( max_workers=num_threads ) + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def add_span_processor(self, span_processor: SpanProcessor) -> None: """Adds a SpanProcessor to the list handled by this instance.""" @@ -815,6 +826,11 @@ def __init__( self._events.append(event) self._links = self._new_links(links) + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def __repr__(self): return f'{type(self).__name__}(name="{self._name}", context={self._context})' diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py index c28ecfd214f..6181f6f8a1f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py @@ -14,6 +14,8 @@ import threading import typing +import weakref +from os import register_at_fork from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult @@ -31,6 +33,11 @@ def __init__(self) -> None: self._finished_spans: typing.List[ReadableSpan] = [] self._stopped = False self._lock = threading.Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def clear(self) -> None: """Clear list of collected spans.""" diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py index 72f92fc25cc..ef2ff329e4f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py @@ -14,8 +14,10 @@ import datetime import threading +import weakref from collections import deque from collections.abc import MutableMapping, Sequence +from os import register_at_fork from typing import Optional from typing_extensions import deprecated @@ -54,6 +56,11 @@ def __init__(self, maxlen: Optional[int]): self.dropped = 0 self._dq = deque(maxlen=maxlen) # type: deque self._lock = threading.Lock() + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def __repr__(self): return f"{type(self).__name__}({list(self._dq)}, maxlen={self._dq.maxlen})" @@ -111,6 +118,11 @@ def __init__(self, maxlen: Optional[int]): self.dropped = 0 self._dict = {} # type: dict self._lock = threading.Lock() # type: threading.Lock + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + register_at_fork(after_in_child=lambda: weak_reinit()()) + + def _at_fork_reinit(self): + self._lock._at_fork_reinit() def __repr__(self): return (