Skip to content

fix: reinitialize locks after fork to prevent deadlocks in child processes #4626

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
"""OTLP Exporter"""

import threading
import weakref
from abc import ABC, abstractmethod
from collections.abc import Sequence # noqa: F401
from logging import getLogger
from os import environ
from os import environ, register_at_fork
from time import sleep
from typing import ( # noqa: F401
Any,
Expand Down Expand Up @@ -261,8 +262,13 @@ def __init__(
self._client = self._stub(self._channel)

self._export_lock = threading.Lock()
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())
self._shutdown = False

def _at_fork_reinit(self):
self._export_lock._at_fork_reinit()

@abstractmethod
def _translate_data(
self, data: TypingSequence[SDKDataT]
Expand Down
7 changes: 7 additions & 0 deletions opentelemetry-api/src/opentelemetry/attributes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

import logging
import threading
import weakref
from collections import OrderedDict
from collections.abc import MutableMapping
from typing import Mapping, Optional, Sequence, Tuple, Union
from os import register_at_fork

from opentelemetry.util import types

Expand Down Expand Up @@ -263,6 +265,11 @@ def __init__(
for key, value in attributes.items():
self[key] = value
self._immutable = immutable
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())

def _at_fork_reinit(self):
self._lock._at_fork_reinit()

def __repr__(self) -> str:
return f"{dict(self._dict)}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@
"""

import warnings
import weakref
from abc import ABC, abstractmethod
from dataclasses import dataclass
from logging import getLogger
from os import environ
from os import environ, register_at_fork
from threading import Lock
from typing import Dict, List, Optional, Sequence, Union, cast

Expand Down Expand Up @@ -156,6 +157,11 @@ def __init__(self) -> None:
self._lock = Lock()
self._meters: List[_ProxyMeter] = []
self._real_meter_provider: Optional[MeterProvider] = None
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())

def _at_fork_reinit(self):
self._lock._at_fork_reinit()

def get_meter(
self,
Expand Down Expand Up @@ -510,6 +516,11 @@ def __init__(
self._lock = Lock()
self._instruments: List[_ProxyInstrumentT] = []
self._real_meter: Optional[Meter] = None
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())

def _at_fork_reinit(self):
self._lock._at_fork_reinit()

def on_set_meter_provider(self, meter_provider: MeterProvider) -> None:
"""Called when a real meter provider is set on the creating _ProxyMeterProvider
Expand Down
7 changes: 7 additions & 0 deletions opentelemetry-api/src/opentelemetry/util/_once.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import weakref
from os import register_at_fork
from threading import Lock
from typing import Callable

Expand All @@ -25,6 +27,11 @@ class Once:
def __init__(self) -> None:
self._lock = Lock()
self._done = False
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())

def _at_fork_reinit(self):
self._lock._at_fork_reinit()

def do_once(self, func: Callable[[], None]) -> bool:
"""Execute ``func`` if it hasn't been executed or return.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import threading
import traceback
import warnings
from os import environ
import weakref
from os import environ, register_at_fork
from threading import Lock
from time import time_ns
from typing import Any, Callable, Tuple, Union, cast # noqa
Expand Down Expand Up @@ -317,6 +318,11 @@ def __init__(self):
# iterating through it on "emit".
self._log_record_processors = () # type: Tuple[LogRecordProcessor, ...]
self._lock = threading.Lock()
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())

def _at_fork_reinit(self):
self._lock._at_fork_reinit()

def add_log_record_processor(
self, log_record_processor: LogRecordProcessor
Expand Down Expand Up @@ -379,6 +385,11 @@ def __init__(self, max_workers: int = 2):
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
)
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())

def _at_fork_reinit(self):
self._lock._at_fork_reinit()

def add_log_record_processor(
self, log_record_processor: LogRecordProcessor
Expand Down Expand Up @@ -633,6 +644,11 @@ def __init__(
self._at_exit_handler = atexit.register(self.shutdown)
self._logger_cache = {}
self._logger_cache_lock = Lock()
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())

def _at_fork_reinit(self):
self._logger_cache_lock._at_fork_reinit()

@property
def resource(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import threading
import typing
import weakref
from os import register_at_fork

from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs.export import LogExporter, LogExportResult
Expand All @@ -30,8 +32,13 @@ class InMemoryLogExporter(LogExporter):
def __init__(self):
self._logs = []
self._lock = threading.Lock()
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())
self._stopped = False

def _at_fork_reinit(self):
self._lock._at_fork_reinit()

def clear(self) -> None:
with self._lock:
self._logs.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import weakref
from atexit import register, unregister
from logging import getLogger
from os import environ
from os import environ, register_at_fork
from threading import Lock
from time import time_ns
from typing import Optional, Sequence
Expand Down Expand Up @@ -88,6 +88,11 @@ def __init__(
self._measurement_consumer = measurement_consumer
self._instrument_id_instrument = {}
self._instrument_id_instrument_lock = Lock()
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())

def _at_fork_reinit(self):
self._instrument_id_instrument_lock._at_fork_reinit()

def create_counter(self, name, unit="", description="") -> APICounter:
status = self._register_instrument(name, _Counter, unit, description)
Expand Down Expand Up @@ -421,6 +426,8 @@ def __init__(
):
self._lock = Lock()
self._meter_lock = Lock()
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())
self._atexit_handler = None
if resource is None:
resource = Resource.create({})
Expand Down Expand Up @@ -463,6 +470,14 @@ def __init__(
self._measurement_consumer.collect
)

def _at_fork_reinit(self):
self._lock._at_fork_reinit()
self._meter_lock._at_fork_reinit()

@classmethod
def _register_fork_handlers(cls):
register_at_fork(after_in_child=cls._all_metric_readers_lock._at_fork_reinit)

def force_flush(self, timeout_millis: float = 10_000) -> bool:
deadline_ns = time_ns() + timeout_millis * 10**6

Expand Down Expand Up @@ -580,3 +595,6 @@ def get_meter(
self._measurement_consumer,
)
return self._meters[info]

# Call the method after the class is fully defined
MeterProvider._register_fork_handlers()
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
# limitations under the License.


import weakref
from logging import getLogger
from os import register_at_fork
from threading import Lock
from time import time_ns
from typing import Dict, List, Optional, Sequence
Expand Down Expand Up @@ -44,6 +46,8 @@ def __init__(
self._instrument = instrument
self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
self._lock = Lock()
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())
self._instrument_class_aggregation = instrument_class_aggregation
self._name = self._view._name or self._instrument.name
self._description = (
Expand All @@ -66,6 +70,9 @@ def __init__(
0,
)

def _at_fork_reinit(self):
self._lock._at_fork_reinit()

def conflicts(self, other: "_ViewInstrumentMatch") -> bool:
# pylint: disable=protected-access

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

# pylint: disable=too-many-lines

import weakref
from abc import ABC, abstractmethod
from bisect import bisect_left
from enum import IntEnum
from functools import partial
from logging import getLogger
from math import inf
from os import register_at_fork
from threading import Lock
from typing import (
Callable,
Expand Down Expand Up @@ -98,9 +100,14 @@ def __init__(
):
self._lock = Lock()
self._attributes = attributes
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())
self._reservoir = reservoir_builder()
self._previous_point = None

def _at_fork_reinit(self):
self._lock._at_fork_reinit()

@abstractmethod
def aggregate(
self, measurement: Measurement, should_sample_exemplar: bool = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from math import ldexp
from os import register_at_fork
from threading import Lock

from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import (
Expand Down Expand Up @@ -42,6 +43,11 @@ class ExponentMapping(Mapping):
_min_scale = -10
_max_scale = 0

# Add a class method for initialization that includes fork handler registration
@classmethod
def _register_fork_handlers(cls):
register_at_fork(after_in_child=cls._mappings_lock._at_fork_reinit)

def _get_min_scale(self):
# _min_scale defines the point at which the exponential mapping
# function becomes useless for 64-bit floats. With scale -10, ignoring
Expand Down Expand Up @@ -139,3 +145,6 @@ def get_lower_boundary(self, index: int) -> float:
@property
def scale(self) -> int:
return self._scale

# Call the method after the class is fully defined
ExponentMapping._register_fork_handlers()
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from math import exp, floor, ldexp, log
from os import register_at_fork
from threading import Lock

from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import (
Expand Down Expand Up @@ -41,6 +42,11 @@ class LogarithmMapping(Mapping):
_min_scale = 1
_max_scale = 20

# Add a class method for initialization that includes fork handler registration
@classmethod
def _register_fork_handlers(cls):
register_at_fork(after_in_child=cls._mappings_lock._at_fork_reinit)

def _get_min_scale(self):
# _min_scale ensures that ExponentMapping is used for zero and negative
# scale values.
Expand Down Expand Up @@ -136,3 +142,6 @@ def get_lower_boundary(self, index: int) -> float:
@property
def scale(self) -> int:
return self._scale

# Call the method after the class is fully defined
LogarithmMapping._register_fork_handlers()
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from abc import ABC, abstractmethod
from enum import Enum
from logging import getLogger
from os import environ, linesep
from os import environ, linesep, register_at_fork
from sys import stdout
from threading import Event, Lock, RLock, Thread
from time import time_ns
Expand Down Expand Up @@ -409,6 +409,11 @@ def __init__(
self._metrics_data: "opentelemetry.sdk.metrics.export.MetricsData" = (
None
)
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
register_at_fork(after_in_child=lambda: weak_reinit()())

def _at_fork_reinit(self):
self._lock._at_fork_reinit()

def get_metrics_data(
self,
Expand Down Expand Up @@ -510,6 +515,7 @@ def __init__(
)

def _at_fork_reinit(self):
self._export_lock._at_fork_reinit()
self._daemon_thread = Thread(
name="OtelPeriodicExportingMetricReader",
target=self._ticker,
Expand Down
Loading