Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions src/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import sys
import time
from collections import defaultdict
from typing import Callable, Optional
from typing import Callable, Dict, Optional

from celery import Celery
from celery.events.state import State # type: ignore
from celery.states import READY_STATES # type: ignore
from celery.utils import nodesplit # type: ignore
from celery.utils.time import utcoffset # type: ignore
from kombu.exceptions import ChannelError # type: ignore
Expand All @@ -34,6 +35,7 @@ def __init__(
):
self.registry = CollectorRegistry(auto_describe=True)
self.queue_cache = set(initial_queues or [])
self.task_queue_map: Dict[str, str] = {}
self.worker_last_seen = {}
self.worker_timeout_seconds = worker_timeout_seconds
self.purge_offline_worker_metrics_after_seconds = (
Expand Down Expand Up @@ -273,17 +275,32 @@ def track_queue_metrics(self):
def track_task_event(self, event):
self.state.event(event)
task = self.state.tasks.get(event["uuid"])
if task is None:
logger.warning("Received event='%s' for unknown task", event["type"])
return
logger.debug("Received event='{}' for task='{}'", event["type"], task.name)

if event["type"] not in self.state_counters:
logger.warning("No counter matches task state='{}'", task.state)

queue_name = (
event.get("queue")
or event.get("routing_key")
or event.get("delivery_info", {}).get("routing_key")
or getattr(task, "queue", None)
or self.task_queue_map.get(task.uuid)
or self.default_queue_name
)
self.task_queue_map[task.uuid] = queue_name

labels = {
"name": task.name,
"hostname": get_hostname(task.hostname),
"queue_name": getattr(task, "queue", self.default_queue_name),
"queue_name": queue_name,
**self.static_label,
}
if queue_name:
self.queue_cache.add(queue_name)
if event["type"] == "task-sent" and self.generic_hostname_task_sent_metric:
labels["hostname"] = "generic"

Expand Down Expand Up @@ -317,6 +334,9 @@ def track_task_event(self, event):
task.runtime,
)

if task.state in READY_STATES:
self.task_queue_map.pop(task.uuid, None)

def track_worker_status(self, event, is_online):
value = 1 if is_online else 0
event_name = "worker-online" if is_online else "worker-offline"
Expand Down