Skip to content

Commit 7b721e3

Browse files
enable the ability for buffering and aggregation to work at the same (#851)
* enable the ability for buffering and aggregation to work at the same time
1 parent 2bc6b36 commit 7b721e3

File tree

3 files changed

+94
-113
lines changed

3 files changed

+94
-113
lines changed

datadog/__init__.py

+8-7
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ def initialize(
3737
api_host=None, # type: Optional[str]
3838
statsd_host=None, # type: Optional[str]
3939
statsd_port=None, # type: Optional[int]
40-
statsd_disable_aggregator=True, # type: bool
40+
statsd_disable_aggregation=True, # type: bool
4141
statsd_disable_buffering=True, # type: bool
42-
statsd_aggregation_flush_interval=2, # type: float
42+
statsd_aggregation_flush_interval=0.3, # type: float
4343
statsd_use_default_route=False, # type: bool
4444
statsd_socket_path=None, # type: Optional[str]
4545
statsd_namespace=None, # type: Optional[str]
@@ -78,12 +78,13 @@ def initialize(
7878
(default: True).
7979
:type statsd_disable_buffering: boolean
8080
81-
:param statsd_disable_aggregator: Enable/disable statsd client aggregation support
81+
:param statsd_disable_aggregation: Enable/disable statsd client aggregation support
8282
(default: True).
83-
:type statsd_disable_aggregator: boolean
83+
:type statsd_disable_aggregation: boolean
8484
85-
:param statsd_aggregation_flush_interval: Sets the flush interval for aggregation
86-
(default: 2 seconds)
85+
:param statsd_aggregation_flush_interval: If aggregation is enabled, set the flush interval for
86+
aggregation/buffering
87+
(default: 0.3 seconds)
8788
:type statsd_aggregation_flush_interval: float
8889
8990
:param statsd_use_default_route: Dynamically set the statsd host to the default route
@@ -138,7 +139,7 @@ def initialize(
138139
if statsd_constant_tags:
139140
statsd.constant_tags += statsd_constant_tags
140141

141-
if statsd_disable_aggregator:
142+
if statsd_disable_aggregation:
142143
statsd.disable_aggregation()
143144
else:
144145
statsd.enable_aggregation(statsd_aggregation_flush_interval)

datadog/dogstatsd/base.py

+55-103
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,9 @@
5050
DEFAULT_PORT = 8125
5151

5252
# Buffering-related values (in seconds)
53-
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
53+
DEFAULT_FLUSH_INTERVAL = 0.3
5454
MIN_FLUSH_INTERVAL = 0.0001
5555

56-
# Aggregation-related values (in seconds)
57-
DEFAULT_AGGREGATION_FLUSH_INTERVAL = 2
5856
# Env var to enable/disable sending the container ID field
5957
ORIGIN_DETECTION_ENABLED = "DD_ORIGIN_DETECTION_ENABLED"
6058

@@ -147,8 +145,8 @@ def __init__(
147145
host=DEFAULT_HOST, # type: Text
148146
port=DEFAULT_PORT, # type: int
149147
max_buffer_size=None, # type: None
150-
flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, # type: float
151-
disable_aggregating=True, # type: bool
148+
flush_interval=DEFAULT_FLUSH_INTERVAL, # type: float
149+
disable_aggregation=True, # type: bool
152150
disable_buffering=True, # type: bool
153151
namespace=None, # type: Optional[Text]
154152
constant_tags=None, # type: Optional[List[str]]
@@ -238,8 +236,8 @@ def __init__(
238236
it overrides the default value.
239237
:type flush_interval: float
240238
241-
:disable_aggregating: If true, metrics (Count, Gauge, Set) are no longered aggregated by the client
242-
:type disable_aggregating: bool
239+
:disable_aggregation: If true, metrics (Count, Gauge, Set) are no longered aggregated by the client
240+
:type disable_aggregation: bool
243241
244242
:disable_buffering: If set, metrics are no longered buffered by the client and
245243
all data is sent synchronously to the server
@@ -447,34 +445,24 @@ def __init__(
447445
self._config_lock = RLock()
448446

449447
self._disable_buffering = disable_buffering
450-
self._disable_aggregating = disable_aggregating
448+
self._disable_aggregation = disable_aggregation
451449

452450
self._flush_interval = flush_interval
453451
self._flush_thread = None
454452
self._flush_thread_stop = threading.Event()
455453
self.aggregator = Aggregator()
456454
# Indicates if the process is about to fork, so we shouldn't start any new threads yet.
457455
self._forking = False
458-
# Currently, we do not allow both aggregation and buffering, we may revisit this in the future
459-
if self._disable_buffering and self._disable_aggregating:
460-
self._send = self._send_to_server
461-
log.debug("Statsd buffering and aggregation is disabled")
462-
elif self._disable_aggregating:
463-
# Start the flush thread if buffering is enabled and the interval is above
464-
# a reasonable range. This both prevents thrashing and allow us to use "0.0"
465-
# as a value for disabling the automatic flush timer as well.
456+
457+
if not self._disable_buffering:
466458
self._send = self._send_to_buffer
467-
self._start_flush_thread(
468-
self._flush_interval,
469-
self.flush_buffered_metrics,
470-
)
471459
else:
472460
self._send = self._send_to_server
473-
self._disable_buffering = True
474-
self._start_flush_thread(
475-
self._flush_interval,
476-
self.flush_aggregated_metrics,
477-
)
461+
462+
if not self._disable_aggregation or not self._disable_buffering:
463+
self._start_flush_thread()
464+
else:
465+
log.debug("Statsd buffering and aggregation is disabled")
478466

479467
self._queue = None
480468
self._sender_thread = None
@@ -551,30 +539,14 @@ def enable_telemetry(self):
551539
self._telemetry = True
552540

553541
# Note: Invocations of this method should be thread-safe
554-
def _start_flush_thread(
555-
self,
556-
flush_interval,
557-
flush_function,
558-
):
559-
if (self._disable_buffering or not self._disable_aggregating) and flush_function == self.flush_buffered_metrics:
560-
log.debug("Statsd periodic buffer flush is disabled")
542+
def _start_flush_thread(self):
543+
if self._disable_aggregation and self.disable_buffering:
544+
log.debug("Statsd periodic buffer and aggregation flush is disabled")
561545
return
562-
if (
563-
self._disable_aggregating
564-
and flush_function == self.flush_aggregated_metrics
565-
):
566-
log.debug("Statsd periodic aggregating flush is disabled")
567-
return
568-
569-
flush_type = ""
570-
if self._disable_buffering:
571-
flush_type = "aggregation"
572-
else:
573-
flush_type = "buffering"
574546

575-
if flush_interval <= MIN_FLUSH_INTERVAL:
547+
if self._flush_interval <= MIN_FLUSH_INTERVAL:
576548
log.debug(
577-
"the set flush interval for %s is less then the minimum", flush_type
549+
"the set flush interval is less then the minimum"
578550
)
579551
return
580552

@@ -587,30 +559,31 @@ def _start_flush_thread(
587559
def _flush_thread_loop(self, flush_interval):
588560
while not self._flush_thread_stop.is_set():
589561
time.sleep(flush_interval)
590-
flush_function()
591-
562+
if not self._disable_aggregation:
563+
self.flush_aggregated_metrics()
564+
if not self._disable_buffering:
565+
self.flush_buffered_metrics()
592566
self._flush_thread = threading.Thread(
593567
name="{}_flush_thread".format(self.__class__.__name__),
594568
target=_flush_thread_loop,
595-
args=(self, flush_interval,),
569+
args=(self, self._flush_interval,),
596570
)
597571
self._flush_thread.daemon = True
598572
self._flush_thread.start()
599573
log.debug(
600-
"Statsd %s flush thread registered with period of %s",
601-
flush_type,
602-
flush_interval,
574+
"Statsd flush thread registered with period of %s",
575+
self._flush_interval,
603576
)
604577

605578
# Note: Invocations of this method should be thread-safe
606579
def _stop_flush_thread(self):
607580
if not self._flush_thread:
608581
return
609582
try:
610-
if self._disable_aggregating:
611-
self.flush_buffered_metrics()
612-
else:
583+
if not self._disable_aggregation:
613584
self.flush_aggregated_metrics()
585+
if not self.disable_buffering:
586+
self.flush_buffered_metrics()
614587
finally:
615588
pass
616589

@@ -645,43 +618,40 @@ def disable_buffering(self, is_disabled):
645618

646619
self._disable_buffering = is_disabled
647620

648-
# If buffering has been disabled, flush and kill the background thread
621+
# If buffering (and aggregation) has been disabled, flush and kill the background thread
649622
# otherwise start up the flushing thread and enable the buffering.
650623
if is_disabled:
651624
self._send = self._send_to_server
652-
self._stop_flush_thread()
625+
if self._disable_aggregation and self.disable_buffering:
626+
self._stop_flush_thread()
653627
log.debug("Statsd buffering is disabled")
654628
else:
655629
self._send = self._send_to_buffer
656-
self._start_flush_thread(
657-
self._flush_interval,
658-
self.flush_buffered_metrics,
659-
)
630+
self._start_flush_thread()
660631

661632
def disable_aggregation(self):
662633
with self._config_lock:
663634
# If the toggle didn't change anything, this method is a noop
664-
if self._disable_aggregating:
635+
if self._disable_aggregation:
665636
return
666637

667-
self._disable_aggregating = True
638+
self._disable_aggregation = True
668639

669-
# If aggregation has been disabled, flush and kill the background thread
640+
# If aggregation and buffering has been disabled, flush and kill the background thread
670641
# otherwise start up the flushing thread and enable aggregation.
671-
self._stop_flush_thread()
642+
if self._disable_aggregation and self.disable_buffering:
643+
self._stop_flush_thread()
672644
log.debug("Statsd aggregation is disabled")
673645

674-
def enable_aggregation(self, aggregation_flush_interval=DEFAULT_AGGREGATION_FLUSH_INTERVAL):
646+
def enable_aggregation(self, flush_interval=DEFAULT_FLUSH_INTERVAL):
675647
with self._config_lock:
676-
if not self._disable_aggregating:
648+
if not self._disable_aggregation:
677649
return
678-
self._disable_aggregating = False
679-
self._flush_interval = aggregation_flush_interval
680-
self._send = self._send_to_server
681-
self._start_flush_thread(
682-
self._flush_interval,
683-
self.flush_aggregated_metrics,
684-
)
650+
self._disable_aggregation = False
651+
self._flush_interval = flush_interval
652+
if self._disable_buffering:
653+
self._send = self._send_to_server
654+
self._start_flush_thread()
685655

686656
@staticmethod
687657
def resolve_host(host, use_default_route):
@@ -867,7 +837,7 @@ def gauge(
867837
>>> statsd.gauge("users.online", 123)
868838
>>> statsd.gauge("active.connections", 1001, tags=["protocol:http"])
869839
"""
870-
if self._disable_aggregating:
840+
if self._disable_aggregation:
871841
self._report(metric, "g", value, tags, sample_rate)
872842
else:
873843
self.aggregator.gauge(metric, value, tags, sample_rate)
@@ -890,7 +860,7 @@ def gauge_with_timestamp(
890860
>>> statsd.gauge("users.online", 123, 1713804588)
891861
>>> statsd.gauge("active.connections", 1001, 1713804588, tags=["protocol:http"])
892862
"""
893-
if self._disable_aggregating:
863+
if self._disable_aggregation:
894864
self._report(metric, "g", value, tags, sample_rate, timestamp)
895865
else:
896866
self.aggregator.gauge(metric, value, tags, sample_rate, timestamp)
@@ -908,7 +878,7 @@ def count(
908878
909879
>>> statsd.count("page.views", 123)
910880
"""
911-
if self._disable_aggregating:
881+
if self._disable_aggregation:
912882
self._report(metric, "c", value, tags, sample_rate)
913883
else:
914884
self.aggregator.count(metric, value, tags, sample_rate)
@@ -930,7 +900,7 @@ def count_with_timestamp(
930900
931901
>>> statsd.count("files.transferred", 124, timestamp=1713804588)
932902
"""
933-
if self._disable_aggregating:
903+
if self._disable_aggregation:
934904
self._report(metric, "c", value, tags, sample_rate, timestamp)
935905
else:
936906
self.aggregator.count(metric, value, tags, sample_rate, timestamp)
@@ -949,7 +919,7 @@ def increment(
949919
>>> statsd.increment("page.views")
950920
>>> statsd.increment("files.transferred", 124)
951921
"""
952-
if self._disable_aggregating:
922+
if self._disable_aggregation:
953923
self._report(metric, "c", value, tags, sample_rate)
954924
else:
955925
self.aggregator.count(metric, value, tags, sample_rate)
@@ -969,7 +939,7 @@ def decrement(
969939
>>> statsd.decrement("active.connections", 2)
970940
"""
971941
metric_value = -value if value else value
972-
if self._disable_aggregating:
942+
if self._disable_aggregation:
973943
self._report(metric, "c", metric_value, tags, sample_rate)
974944
else:
975945
self.aggregator.count(metric, metric_value, tags, sample_rate)
@@ -1080,7 +1050,7 @@ def set(self, metric, value, tags=None, sample_rate=None):
10801050
10811051
>>> statsd.set("visitors.uniques", 999)
10821052
"""
1083-
if self._disable_aggregating:
1053+
if self._disable_aggregation:
10841054
self._report(metric, "s", value, tags, sample_rate)
10851055
else:
10861056
self.aggregator.set(metric, value, tags, sample_rate)
@@ -1533,16 +1503,7 @@ def pre_fork(self):
15331503

15341504
def post_fork_parent(self):
15351505
"""Restore the client state after a fork in the parent process."""
1536-
if self._disable_aggregating:
1537-
self._start_flush_thread(
1538-
self._flush_interval,
1539-
self.flush_buffered_metrics,
1540-
)
1541-
else:
1542-
self._start_flush_thread(
1543-
self._flush_interval,
1544-
self.flush_aggregated_metrics,
1545-
)
1506+
self._start_flush_thread()
15461507
self._start_sender_thread()
15471508
self._config_lock.release()
15481509

@@ -1565,16 +1526,7 @@ def post_fork_child(self):
15651526
self.close_socket()
15661527

15671528
with self._config_lock:
1568-
if self._disable_aggregating:
1569-
self._start_flush_thread(
1570-
self._flush_interval,
1571-
self.flush_buffered_metrics,
1572-
)
1573-
else:
1574-
self._start_flush_thread(
1575-
self._flush_interval,
1576-
self.flush_aggregated_metrics,
1577-
)
1529+
self._start_flush_thread()
15781530
self._start_sender_thread()
15791531

15801532
def stop(self):
@@ -1587,9 +1539,9 @@ def stop(self):
15871539

15881540
self.disable_background_sender()
15891541
self._disable_buffering = True
1590-
self._disable_aggregating = True
1591-
self.flush_buffered_metrics()
1542+
self._disable_aggregation = True
15921543
self.flush_aggregated_metrics()
1544+
self.flush_buffered_metrics()
15931545
self.close_socket()
15941546

15951547

0 commit comments

Comments
 (0)