diff --git a/Framework/Core/include/Framework/DataRelayer.h b/Framework/Core/include/Framework/DataRelayer.h index 012b909096317..1e010fc12f3d4 100644 --- a/Framework/Core/include/Framework/DataRelayer.h +++ b/Framework/Core/include/Framework/DataRelayer.h @@ -114,6 +114,9 @@ class DataRelayer using OnDropCallback = std::function&, TimesliceIndex::OldestOutputInfo info)>; + // Callback for when some messages are about to be owned by the the DataRelayer + using OnInsertionCallback = std::function&)>; + /// Prune all the pending entries in the cache. void prunePending(OnDropCallback); /// Prune the cache for a given slot @@ -135,6 +138,7 @@ class DataRelayer InputInfo const& info, size_t nMessages, size_t nPayloads = 1, + OnInsertionCallback onInsertion = nullptr, OnDropCallback onDrop = nullptr); /// This is to set the oldest possible @a timeslice this relayer can diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 3925359b056b2..343b567d8b852 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -1859,6 +1859,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& input, nMessages, nPayloadsPerHeader, + nullptr, onDrop); switch (relayed.type) { case DataRelayer::RelayChoice::Type::Backpressured: diff --git a/Framework/Core/src/DataProcessingHelpers.cxx b/Framework/Core/src/DataProcessingHelpers.cxx index 2f7a1f65f3bd3..87e7c9bf8962f 100644 --- a/Framework/Core/src/DataProcessingHelpers.cxx +++ b/Framework/Core/src/DataProcessingHelpers.cxx @@ -343,9 +343,7 @@ auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy, const bool copyByDefault, bool consume) -> std::vector { // we collect all messages per forward in a map and send them together - std::vector forwardedParts; - forwardedParts.resize(proxy.getNumForwards()); - std::vector forwardingChoices{}; + std::vector forwardedParts(proxy.getNumForwardChannels()); for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) { auto span = std::span(currentSetOfInputs[ii].messages); diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 01e7a2b29fd35..ea2c4c0b73316 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -436,7 +436,8 @@ DataRelayer::RelayChoice InputInfo const& info, size_t nMessages, size_t nPayloads, - std::function&, TimesliceIndex::OldestOutputInfo)> onDrop) + OnInsertionCallback onInsertion, + OnDropCallback onDrop) { std::scoped_lock lock(mMutex); DataProcessingHeader const* dph = o2::header::get(rawHeader); @@ -482,6 +483,7 @@ DataRelayer::RelayChoice &messages, &nMessages, &nPayloads, + &onInsertion, &cache = mCache, &services = mContext, numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t { @@ -512,7 +514,11 @@ DataRelayer::RelayChoice mi += nPayloads; continue; } - target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1); + auto span = std::span(messages + mi, messages + mi + nPayloads + 1); + if (onInsertion) { + onInsertion(services, span); + } + target.add([&span](size_t i) -> fair::mq::MessagePtr& { return span[i]; }, nPayloads + 1); mi += nPayloads; saved += nPayloads; } diff --git a/Framework/Core/test/test_ForwardInputs.cxx b/Framework/Core/test/test_ForwardInputs.cxx index fe9f70d1daadb..7081d600080b1 100644 --- a/Framework/Core/test/test_ForwardInputs.cxx +++ b/Framework/Core/test/test_ForwardInputs.cxx @@ -616,6 +616,80 @@ TEST_CASE("ForwardInputsSplitPayload") CHECK(result[1].Size() == 3); } +TEST_CASE("ForwardInputsSplitPayloadNoMessageSet") +{ + o2::header::DataHeader dh; + dh.dataOrigin = "TST"; + dh.dataDescription = "A"; + dh.subSpecification = 0; + dh.splitPayloadIndex = 2; + dh.splitPayloadParts = 2; + + o2::header::DataHeader dh2; + dh2.dataOrigin = "TST"; + dh2.dataDescription = "B"; + dh2.subSpecification = 0; + dh2.splitPayloadIndex = 0; + dh2.splitPayloadParts = 1; + + o2::framework::DataProcessingHeader dph{0, 1}; + + std::vector channels{ + fair::mq::Channel("from_A_to_B"), + fair::mq::Channel("from_A_to_C"), + }; + + bool consume = true; + bool copyByDefault = true; + FairMQDeviceProxy proxy; + std::vector routes{ + ForwardRoute{ + .timeslice = 0, + .maxTimeslices = 1, + .matcher = {"binding", ConcreteDataMatcher{"TST", "B", 0}}, + .channel = "from_A_to_B", + .policy = nullptr, + }, + ForwardRoute{ + .timeslice = 0, + .maxTimeslices = 1, + .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}}, + .channel = "from_A_to_C", + .policy = nullptr, + }}; + + auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& { + for (auto& channel : channels) { + if (channel.GetName() == channelName) { + return channel; + } + } + throw std::runtime_error("Channel not found"); + }; + + proxy.bind({}, {}, routes, findChannelByName, nullptr); + + auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq"); + fair::mq::MessagePtr payload1(transport->CreateMessage()); + fair::mq::MessagePtr payload2(transport->CreateMessage()); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); + std::vector> messages; + messages.push_back(std::move(header)); + messages.push_back(std::move(payload1)); + messages.push_back(std::move(payload2)); + auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph}); + messages.push_back(std::move(header2)); + messages.push_back(transport->CreateMessage()); + + std::vector result(2); + auto span = std::span(messages); + o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, span, result, copyByDefault, consume); + REQUIRE(result.size() == 2); // Two routes + CHECK(result[0].Size() == 2); // No messages on this route + CHECK(result[1].Size() == 3); +} + TEST_CASE("ForwardInputEOSSingleRoute") { o2::framework::SourceInfoHeader sih{};