diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 343b567d8b852..f33db316ca9de 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -617,6 +617,20 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done"); }; +static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector& currentSetOfInputs, + TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) { + auto& proxy = registry.get(); + + O2_SIGNPOST_ID_GENERATE(sid, forwarding); + O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Cleaning up slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s", + slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : ""); + // Always copy them, because we do not want to actually send them. + // We merely need the side effect of the consume, if applicable. + auto forwardedParts = DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, true, consume); + + O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done"); +}; + extern volatile int region_read_global_dummy_variable; volatile int region_read_global_dummy_variable; @@ -1680,6 +1694,53 @@ struct WaitBackpressurePolicy { } }; +auto forwardOnInsertion(ServiceRegistryRef& ref, std::span& messages) -> void +{ + O2_LOG_ENABLE(forwarding); + O2_SIGNPOST_ID_GENERATE(sid, forwarding); + + auto& spec = ref.get(); + auto& context = ref.get(); + if (!context.canForwardEarly || spec.forwards.empty()) { + O2_SIGNPOST_EVENT_EMIT(device, sid, "device", "Early forwardinding not enabled / needed."); + return; + } + + O2_SIGNPOST_EVENT_EMIT(device, sid, "device", "Early forwardinding before injecting data into relayer."); + auto& timesliceIndex = ref.get(); + auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput(); + + auto& proxy = ref.get(); + + O2_SIGNPOST_START(forwarding, sid, "forwardInputs", + "Starting forwarding for incoming messages with oldestTimeslice %zu with copy", + oldestTimeslice.timeslice.value); + std::vector forwardedParts(proxy.getNumForwardChannels()); + DataProcessingHelpers::routeForwardedMessages(proxy, messages, forwardedParts, true, false); + + for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) { + if (forwardedParts[fi].Size() == 0) { + continue; + } + ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi}); + auto& parts = forwardedParts[fi]; + if (info.policy == nullptr) { + O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi); + continue; + } + O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi); + info.policy->forward(parts, ChannelIndex{fi}, ref); + } + auto& asyncQueue = ref.get(); + auto& decongestion = ref.get(); + O2_SIGNPOST_ID_GENERATE(aid, async_queue); + O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value); + AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate} + .user({.ref = ref, .oldestTimeslice = oldestTimeslice})); + O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done"); + O2_LOG_DISABLE(forwarding); +}; + /// This is the inner loop of our framework. The actual implementation /// is divided in two parts. In the first one we define a set of lambdas /// which describe what is actually going to happen, hiding all the state @@ -1854,12 +1915,13 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& VariableContextHelpers::getTimeslice(variables); forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true); }; + auto relayed = relayer.relay(parts.At(headerIndex)->GetData(), &parts.At(headerIndex), input, nMessages, nPayloadsPerHeader, - nullptr, + forwardOnInsertion, onDrop); switch (relayed.type) { case DataRelayer::RelayChoice::Type::Backpressured: @@ -2274,9 +2336,16 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting; if (context.canForwardEarly && hasForwards && consumeSomething) { - O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str()); + // We used to do fowarding here, however we now do it much earlier. + // We still need to clean the inputs which were already consumed + // via ConsumeExisting and which still have an header to hold the slot. + // FIXME: do we? This should really happen when we do the forwarding on + // insertion, because otherwise we lose the relevant information on how to + // navigate the set of headers. We could actually rely on the messageset index, + // is that the right thing to do though? + O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "cleaning early forwarding: %{public}s.", fmt::format("{}", action.op).c_str()); auto& timesliceIndex = ref.get(); - forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume); + cleanEarlyForward(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume); } markInputsAsDone(action.slot); diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index ea2c4c0b73316..d9c340cd9c225 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -499,6 +499,12 @@ DataRelayer::RelayChoice // DataRelayer::relay assert(nPayloads > 0); size_t saved = 0; + // It's guaranteed we will see all these messages only once, so we can + // do the forwarding here. + auto allMessages = std::span(messages, messages + nMessages); + if (onInsertion) { + onInsertion(services, allMessages); + } for (size_t mi = 0; mi < nMessages; ++mi) { assert(mi + nPayloads < nMessages); // We are in calibration mode and the data does not have the calibration bit set. @@ -515,9 +521,6 @@ DataRelayer::RelayChoice continue; } auto span = std::span(messages + mi, messages + mi + nPayloads + 1); - if (onInsertion) { - onInsertion(services, span); - } target.add([&span](size_t i) -> fair::mq::MessagePtr& { return span[i]; }, nPayloads + 1); mi += nPayloads; saved += nPayloads;