@@ -65,7 +65,7 @@ enum struct RateLimitingState {
6565
6666struct RateLimitConfig {
6767 int64_t maxMemory = 2000;
68- int64_t maxTimeframes = 0 ;
68+ int64_t maxTimeframes = 1 ;
6969};
7070
7171struct MetricIndices {
@@ -77,32 +77,28 @@ struct MetricIndices {
7777 size_t shmOfferBytesConsumed = -1;
7878 size_t timeframesRead = -1;
7979 size_t timeframesConsumed = -1;
80+ size_t timeframesExpired = -1;
8081};
8182
8283std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
8384{
8485 std::vector<MetricIndices> results;
8586
8687 for (auto& info : allDevicesMetrics) {
87- MetricIndices indices;
88- indices .arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created");
89- indices .arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed");
90- indices .arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created");
91- indices .arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed");
92- indices .arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired");
93- indices .shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed");
94- indices .timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent");
95- indices .timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes");
96- results.push_back(indices );
88+ results.emplace_back( MetricIndices{
89+ .arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created"),
90+ .arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed"),
91+ .arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created"),
92+ .arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed"),
93+ .arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired"),
94+ .shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed"),
95+ .timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent"),
96+ .timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes"),
97+ .timeframesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "expired-timeframes")} );
9798 }
9899 return results;
99100}
100101
101- uint64_t calculateAvailableSharedMemory(ServiceRegistryRef registry)
102- {
103- return registry.get<RateLimitConfig>().maxMemory;
104- }
105-
106102struct ResourceState {
107103 int64_t available;
108104 int64_t offered = 0;
@@ -205,29 +201,30 @@ auto offerResources(ResourceState& resourceState,
205201 // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
206202 // not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
207203 // and the amount which we know was given back.
208- static int64_t lastShmOfferConsumed = 0;
209- static int64_t lastUnusedOfferedMemory = 0;
210- if (offerConsumedCurrentValue != lastShmOfferConsumed ) {
204+ static int64_t lastResourceOfferConsumed = 0;
205+ static int64_t lastUnusedOfferedResource = 0;
206+ if (offerConsumedCurrentValue != lastResourceOfferConsumed ) {
211207 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
212208 "Offer consumed so far %llu", offerConsumedCurrentValue);
213- lastShmOfferConsumed = offerConsumedCurrentValue;
209+ lastResourceOfferConsumed = offerConsumedCurrentValue;
214210 }
215- int unusedOfferedMemory = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
216- if (lastUnusedOfferedMemory != unusedOfferedMemory ) {
211+ int unusedOfferedResource = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
212+ if (lastUnusedOfferedResource != unusedOfferedResource ) {
217213 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
218- "unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
219- unusedOfferedMemory, resourceState.offered,
214+ "unusedOfferedResource(%{public}s):%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
215+ resourceSpec.name,
216+ unusedOfferedResource, resourceState.offered,
220217 offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor,
221218 offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor,
222219 resourceSpec.metricOfferScaleFactor);
223- lastUnusedOfferedMemory = unusedOfferedMemory ;
220+ lastUnusedOfferedResource = unusedOfferedResource ;
224221 }
225222 // availableSharedMemory is the amount of memory which we know is available to be offered.
226223 // We subtract the amount which we know was already offered but it's unused and we then balance how
227224 // much was created with how much was destroyed.
228- resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedMemory ;
225+ resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedResource ;
229226 availableResourceMetric(driverMetrics, resourceState.available, timestamp);
230- unusedOfferedResourceMetric(driverMetrics, unusedOfferedMemory , timestamp);
227+ unusedOfferedResourceMetric(driverMetrics, unusedOfferedResource , timestamp);
231228
232229 offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
233230};
@@ -258,6 +255,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
258255 int64_t totalMessagesDestroyed = 0;
259256 int64_t totalTimeframesRead = 0;
260257 int64_t totalTimeframesConsumed = 0;
258+ int64_t totalTimeframesExpired = 0;
261259 auto &driverMetrics = sm.driverMetricsInfo;
262260 auto &allDeviceMetrics = sm.deviceMetricsInfos;
263261 auto &specs = sm.deviceSpecs;
@@ -266,9 +264,14 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
266264 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
267265 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
268266 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
267+ // These are really to monitor the rate limiting
269268 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
269+ static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-timeslices");
270270 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
271+ static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-timeslices");
271272 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
273+ static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-timeslices");
274+
272275 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
273276 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
274277 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
@@ -390,6 +393,18 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
390393 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
391394 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
392395 }
396+ {
397+ size_t index = indices.timeframesExpired;
398+ assert(index < deviceMetrics.metrics.size());
399+ changed |= deviceMetrics.changed[index];
400+ MetricInfo info = deviceMetrics.metrics[index];
401+ assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
402+ auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
403+ auto value = (int64_t)data[(info.pos - 1) % data.size()];
404+ totalTimeframesExpired += value;
405+ auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
406+ lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
407+ }
393408 }
394409 static uint64_t unchangedCount = 0;
395410 if (changed) {
@@ -407,26 +422,46 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
407422 unchangedCount++;
408423 }
409424 changedCountMetric(driverMetrics, unchangedCount, timestamp);
410- auto maxTimeframes = registry.get<RateLimitConfig>().maxTimeframes;
411- if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) {
412- return;
413- }
425+
414426 static const ResourceSpec shmResourceSpec{
415427 .name = "shared memory",
416428 .unit = "MB",
417429 .api = "/shm-offer {}",
418- .maxAvailable = (int64_t)calculateAvailableSharedMemory( registry) ,
430+ .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxMemory ,
419431 .maxQuantum = 100,
420432 .minQuantum = 50,
421433 .metricOfferScaleFactor = 1000000,
422434 };
435+ static const ResourceSpec timesliceResourceSpec{
436+ .name = "timeslice",
437+ .unit = "timeslices",
438+ .api = "/timeslice-offer {}",
439+ .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxTimeframes,
440+ .maxQuantum = 1,
441+ .minQuantum = 1,
442+ .metricOfferScaleFactor = 1,
443+ };
423444 static ResourceState shmResourceState{
424445 .available = shmResourceSpec.maxAvailable,
425446 };
447+ static ResourceState timesliceResourceState{
448+ .available = timesliceResourceSpec.maxAvailable,
449+ };
426450 static ResourceStats shmResourceStats{
427451 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
428452 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
429453 };
454+ static ResourceStats timesliceResourceStats{
455+ .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
456+ .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
457+ };
458+
459+ O2_LOG_ENABLE(rate_limiting);
460+ offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
461+ specs, infos, manager, totalTimeframesConsumed, totalTimeframesExpired,
462+ totalTimeframesRead, totalTimeframesConsumed, timestamp, driverMetrics,
463+ availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
464+ (void*)&sm);
430465
431466 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
432467 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
@@ -487,18 +522,18 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
487522 } else {
488523 config->maxMemory = readers * 500;
489524 }
490- if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].as<std::string>() == "readers") {
491- config->maxTimeframes = readers;
492- } else {
525+ if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) {
493526 config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
527+ } else {
528+ config->maxTimeframes = readers;
494529 }
495530 static bool once = false;
496531 // Until we guarantee this is called only once...
497532 if (!once) {
498533 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
499534 O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
500- "Rate limiting set up at %{bytes}llu MB distributed over %d readers",
501- config->maxMemory, readers);
535+ "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
536+ config->maxMemory, config->maxTimeframes, readers);
502537 registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
503538 once = true;
504539 } },
0 commit comments