From 55452dd34eabca44616b23b296ef86543f313100 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 14 Nov 2025 14:58:58 +0100 Subject: [PATCH] DPL: fix timeslice rate limiting issues --- .../src/AODJAlienReaderHelpers.cxx | 6 ++- .../include/Framework/CommonDataProcessors.h | 1 + .../include/Framework/DataProcessingStats.h | 8 ++- Framework/Core/src/ArrowSupport.cxx | 51 +++++++++++++++++-- Framework/Core/src/CommonDataProcessors.cxx | 40 +++++++++++++-- Framework/Core/src/CommonServices.cxx | 20 +++++++- .../Core/src/ComputingQuotaEvaluator.cxx | 15 +++--- Framework/Core/src/DataProcessingDevice.cxx | 4 +- Framework/Core/src/DataProcessingStats.cxx | 10 ++++ Framework/Core/src/WorkflowHelpers.cxx | 5 +- 10 files changed, 140 insertions(+), 20 deletions(-) diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx index 1d4ae5a4d3c49..b532c51b8d307 100644 --- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx @@ -199,7 +199,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const numTF, watchdog, maxRate, - didir, reportTFN, reportTFFileName](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device) { + didir, reportTFN, reportTFFileName](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device, DataProcessingStats& dpstats) { // Each parallel reader device.inputTimesliceId reads the files fileCounter*device.maxInputTimeslices+device.inputTimesliceId // the TF to read is numTF assert(device.inputTimesliceId < device.maxInputTimeslices); @@ -302,6 +302,10 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const } } totalDFSent++; + + // Use the new API for sending TIMESLICE_NUMBER_STARTED + dpstats.updateStats({(int)ProcessingStatsId::TIMESLICE_NUMBER_STARTED, DataProcessingStats::Op::Add, 1}); + dpstats.processCommandQueue(); monitoring.send(Metric{(uint64_t)totalDFSent, "df-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); monitoring.send(Metric{(uint64_t)totalSizeUncompressed / 1000, "aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); monitoring.send(Metric{(uint64_t)totalSizeCompressed / 1000, "aod-bytes-read-compressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); diff --git a/Framework/Core/include/Framework/CommonDataProcessors.h b/Framework/Core/include/Framework/CommonDataProcessors.h index d3ef596a0c0e0..48e240c59e5d2 100644 --- a/Framework/Core/include/Framework/CommonDataProcessors.h +++ b/Framework/Core/include/Framework/CommonDataProcessors.h @@ -41,6 +41,7 @@ struct CommonDataProcessors { /// and simply discards them. Rate limiting goes through the DPL driver static DataProcessorSpec getScheduledDummySink(std::vector const& danglingInputs); static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec); + static AlgorithmSpec wrapWithTimesliceConsumption(AlgorithmSpec spec); }; } // namespace o2::framework diff --git a/Framework/Core/include/Framework/DataProcessingStats.h b/Framework/Core/include/Framework/DataProcessingStats.h index e115e5d12b58f..e32523c9abb08 100644 --- a/Framework/Core/include/Framework/DataProcessingStats.h +++ b/Framework/Core/include/Framework/DataProcessingStats.h @@ -57,13 +57,15 @@ enum struct ProcessingStatsId : short { CPU_USAGE_FRACTION, ARROW_BYTES_CREATED, ARROW_BYTES_DESTROYED, + ARROW_BYTES_EXPIRED, ARROW_MESSAGES_CREATED, ARROW_MESSAGES_DESTROYED, - ARROW_BYTES_EXPIRED, + TIMESLICE_OFFER_NUMBER_CONSUMED, + TIMESLICE_NUMBER_STARTED, TIMESLICE_NUMBER_EXPIRED, + TIMESLICE_NUMBER_DONE, RESOURCE_OFFER_EXPIRED, SHM_OFFER_BYTES_CONSUMED, - TIMESLICE_OFFER_NUMBER_CONSUMED, RESOURCES_MISSING, RESOURCES_INSUFFICIENT, RESOURCES_SATISFACTORY, @@ -172,9 +174,11 @@ struct DataProcessingStats { }; void registerMetric(MetricSpec const& spec); + // Update some stats as specified by the @cmd cmd void updateStats(CommandSpec cmd); + char const* findMetricNameById(ProcessingStatsId id) const; /// This will process the queue of commands required to update the stats. /// It is meant to be called periodically by a single thread. void processCommandQueue(); diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index da00c8db42280..329af960f8bfb 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -79,6 +79,10 @@ struct MetricIndices { size_t timeframesRead = -1; size_t timeframesConsumed = -1; size_t timeframesExpired = -1; + // Timeslices counting + size_t timeslicesStarted = -1; + size_t timeslicesExpired = -1; + size_t timeslicesDone = -1; }; std::vector createDefaultIndices(std::vector& allDevicesMetrics) @@ -95,7 +99,11 @@ std::vector createDefaultIndices(std::vector& .shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric(info, "shm-offer-bytes-consumed"), .timeframesRead = DeviceMetricsHelper::bookNumericMetric(info, "df-sent"), .timeframesConsumed = DeviceMetricsHelper::bookNumericMetric(info, "consumed-timeframes"), - .timeframesExpired = DeviceMetricsHelper::bookNumericMetric(info, "expired-timeframes")}); + .timeframesExpired = DeviceMetricsHelper::bookNumericMetric(info, "expired-timeframes"), + .timeslicesStarted = DeviceMetricsHelper::bookNumericMetric(info, "timeslices-started"), + .timeslicesExpired = DeviceMetricsHelper::bookNumericMetric(info, "timeslices-expired"), + .timeslicesDone = DeviceMetricsHelper::bookNumericMetric(info, "timeslices-done"), + }); } return results; } @@ -230,6 +238,19 @@ auto offerResources(ResourceState& resourceState, offeredResourceMetric(driverMetrics, resourceState.offered, timestamp); }; +auto processTimeslices = [](size_t index, DeviceMetricsInfo& deviceMetrics, bool& changed, + int64_t& totalMetricValue, size_t& lastTimestamp) { + assert(index < deviceMetrics.metrics.size()); + changed |= deviceMetrics.changed[index]; + MetricInfo info = deviceMetrics.metrics[index]; + assert(info.storeIdx < deviceMetrics.uint64Metrics.size()); + auto& data = deviceMetrics.uint64Metrics[info.storeIdx]; + auto value = (int64_t)data[(info.pos - 1) % data.size()]; + totalMetricValue += value; + auto const& timestamps = DeviceMetricsHelper::getTimestampsStore(deviceMetrics)[info.storeIdx]; + lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]); +}; + o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() { using o2::monitoring::Metric; @@ -257,11 +278,22 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() int64_t totalTimeframesRead = 0; int64_t totalTimeframesConsumed = 0; int64_t totalTimeframesExpired = 0; + int64_t totalTimeslicesStarted = 0; + int64_t totalTimeslicesDone = 0; + int64_t totalTimeslicesExpired = 0; auto &driverMetrics = sm.driverMetricsInfo; auto &allDeviceMetrics = sm.deviceMetricsInfos; auto &specs = sm.deviceSpecs; auto &infos = sm.deviceInfos; + // Aggregated driver metrics for timeslice rate limiting + auto createUint64DriverMetric = [&driverMetrics](char const*name) -> auto { + return DeviceMetricsHelper::createNumericMetric(driverMetrics, name); + }; + auto createIntDriverMetric = [&driverMetrics](char const*name) -> auto { + return DeviceMetricsHelper::createNumericMetric(driverMetrics, name); + }; + static auto stateMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "rate-limit-state"); static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-bytes-created"); static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-shm-offer-bytes-consumed"); @@ -280,6 +312,12 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-timeframes-read"); static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-timeframes-consumed"); static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-timeframes-in-fly"); + + static auto totalTimeslicesStartedMetric = createUint64DriverMetric("total-timeslices-started"); + static auto totalTimeslicesExpiredMetric = createUint64DriverMetric("total-timeslices-expired"); + static auto totalTimeslicesDoneMetric = createUint64DriverMetric("total-timeslices-done"); + static auto totalTimeslicesInFlyMetric = createIntDriverMetric("total-timeslices-in-fly"); + static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "arrow-bytes-delta"); static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "changed-metrics-count"); static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "aod-reader-signals"); @@ -406,6 +444,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto const& timestamps = DeviceMetricsHelper::getTimestampsStore(deviceMetrics)[info.storeIdx]; lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]); } + processTimeslices(indices.timeslicesStarted, deviceMetrics, changed, totalTimeslicesStarted, lastTimestamp); + processTimeslices(indices.timeslicesExpired, deviceMetrics, changed, totalTimeslicesExpired, lastTimestamp); + processTimeslices(indices.timeslicesDone, deviceMetrics, changed, totalTimeslicesDone, lastTimestamp); } static uint64_t unchangedCount = 0; if (changed) { @@ -418,6 +459,10 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp); totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp); totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp); + totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp); + totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp); + totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp); + totalTimeslicesInFlyMetric(driverMetrics, (int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp); totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp); } else { unchangedCount++; @@ -458,8 +503,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() }; offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats, - specs, infos, manager, totalTimeframesConsumed, totalTimeframesExpired, - totalTimeframesRead, totalTimeframesConsumed, timestamp, driverMetrics, + specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired, + totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics, availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric, (void*)&sm); diff --git a/Framework/Core/src/CommonDataProcessors.cxx b/Framework/Core/src/CommonDataProcessors.cxx index 5d99fd3db7578..67c6314de1c34 100644 --- a/Framework/Core/src/CommonDataProcessors.cxx +++ b/Framework/Core/src/CommonDataProcessors.cxx @@ -45,6 +45,7 @@ using namespace o2::framework::data_matcher; // Special log to track callbacks we know about O2_DECLARE_DYNAMIC_LOG(callbacks); O2_DECLARE_DYNAMIC_LOG(rate_limiting); +O2_DECLARE_DYNAMIC_LOG(quota); namespace o2::framework { @@ -212,7 +213,7 @@ DataProcessorSpec CommonDataProcessors::getDummySink(std::vector cons auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value; auto& stats = services.get(); stats.updateStats({(int)ProcessingStatsId::CONSUMED_TIMEFRAMES, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice}); - stats.updateStats({(int)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice}); + stats.updateStats({(int)ProcessingStatsId::TIMESLICE_NUMBER_DONE, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice}); stats.processCommandQueue(); }; callbacks.set(domainInfoUpdated); @@ -247,7 +248,7 @@ DataProcessorSpec CommonDataProcessors::getScheduledDummySink(std::vector(domainInfoUpdated); @@ -257,7 +258,8 @@ DataProcessorSpec CommonDataProcessors::getScheduledDummySink(std::vector void { + original(pcx); + + auto disposeResources = [](int taskId, + std::array& offers, + ComputingQuotaStats& stats, + std::function accountDisposed) { + ComputingQuotaOffer disposed; + disposed.sharedMemory = 0; + // When invoked, we have processed one timeslice by construction. + int64_t timeslicesProcessed = 1; + for (auto& offer : offers) { + if (offer.user != taskId) { + continue; + } + int64_t toRemove = std::min((int64_t)timeslicesProcessed, offer.timeslices); + offer.timeslices -= toRemove; + timeslicesProcessed -= toRemove; + disposed.timeslices += toRemove; + if (timeslicesProcessed <= 0) { + break; + } + } + return accountDisposed(disposed, stats); + }; + pcx.services().get().offerConsumers.emplace_back(disposeResources); + }); +} + } // namespace o2::framework diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index aedd96dab41ca..3aa46269bdd7e 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -1080,6 +1080,14 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats() .minPublishInterval = 0, .maxRefreshLatency = 10000, .sendInitialValue = true}, + MetricSpec{.name = "timeslice-offer-number-consumed", + .enabled = arrowAndResourceLimitingMetrics, + .metricId = static_cast(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED), + .kind = Kind::UInt64, + .scope = Scope::DPL, + .minPublishInterval = 0, + .maxRefreshLatency = 10000, + .sendInitialValue = true}, MetricSpec{.name = "timeslices-expired", .enabled = arrowAndResourceLimitingMetrics, .metricId = static_cast(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), @@ -1088,9 +1096,17 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats() .minPublishInterval = 0, .maxRefreshLatency = 10000, .sendInitialValue = true}, - MetricSpec{.name = "timeslices-consumed", + MetricSpec{.name = "timeslices-started", .enabled = arrowAndResourceLimitingMetrics, - .metricId = static_cast(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED), + .metricId = static_cast(ProcessingStatsId::TIMESLICE_NUMBER_STARTED), + .kind = Kind::UInt64, + .scope = Scope::DPL, + .minPublishInterval = 0, + .maxRefreshLatency = 10000, + .sendInitialValue = true}, + MetricSpec{.name = "timeslices-done", + .enabled = arrowAndResourceLimitingMetrics, + .metricId = static_cast(ProcessingStatsId::TIMESLICE_NUMBER_DONE), .kind = Kind::UInt64, .scope = Scope::DPL, .minPublishInterval = 0, diff --git a/Framework/Core/src/ComputingQuotaEvaluator.cxx b/Framework/Core/src/ComputingQuotaEvaluator.cxx index 85a5e6be467a7..3f5bff2b53fab 100644 --- a/Framework/Core/src/ComputingQuotaEvaluator.cxx +++ b/Framework/Core/src/ComputingQuotaEvaluator.cxx @@ -246,7 +246,7 @@ void ComputingQuotaEvaluator::dispose(int taskId) void ComputingQuotaEvaluator::updateOffers(std::vector& pending, uint64_t now) { O2_SIGNPOST_ID_GENERATE(oid, quota); - O2_SIGNPOST_START(quota, oid, "updateOffers", "Starting to processe received offers"); + O2_SIGNPOST_START(quota, oid, "updateOffers", "Starting to process %zu received offers", pending.size()); int lastValid = -1; for (size_t oi = 0; oi < mOffers.size(); oi++) { auto& storeOffer = mOffers[oi]; @@ -283,7 +283,9 @@ void ComputingQuotaEvaluator::updateOffers(std::vector& pen lastValidOffer.runtime = std::max(lastValidOffer.runtime, stillPending.runtime); } pending.clear(); - O2_SIGNPOST_END(quota, oid, "updateOffers", "Remaining offers cohalesced to %d", lastValid); + auto& updatedOffer = mOffers[lastValid]; + O2_SIGNPOST_END(quota, oid, "updateOffers", "Remaining offers cohalesced to %d. New values: Cpu%d, Shared Memory %lli, Timeslices %lli", + lastValid, updatedOffer.cpu, updatedOffer.sharedMemory, updatedOffer.timeslices); } void ComputingQuotaEvaluator::handleExpired(std::function expirator) @@ -304,8 +306,8 @@ void ComputingQuotaEvaluator::handleExpired(std::function= 0); - mStats.totalExpiredBytes += offer.sharedMemory; + mStats.totalExpiredBytes += std::max(offer.sharedMemory, 0); + mStats.totalExpiredTimeslices += std::max(offer.timeslices, 0); mStats.totalExpiredOffers++; expirator(offer, mStats); // driverClient.tell("expired shmem {}", offer.sharedMemory); // driverClient.tell("expired cpu {}", offer.cpu); offer.sharedMemory = -1; + offer.timeslices = -1; offer.valid = false; offer.score = OfferScore::Unneeded; } diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 42206e160d726..3b430378dc0b0 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -247,7 +247,9 @@ void run_completion(uv_work_t* handle, int status) static std::function reportConsumedOffer = [ref](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) { auto& dpStats = ref.get(); stats.totalConsumedBytes += accumulatedConsumed.sharedMemory; - stats.totalConsumedTimeslices += accumulatedConsumed.timeslices; + // For now we give back the offer if we did not use it completely. + // In principle we should try to run until the offer is fully consumed. + stats.totalConsumedTimeslices += std::min(accumulatedConsumed.timeslices, 1); dpStats.updateStats({static_cast(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes}); dpStats.updateStats({static_cast(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedTimeslices}); diff --git a/Framework/Core/src/DataProcessingStats.cxx b/Framework/Core/src/DataProcessingStats.cxx index 3b02a0aacdd70..8349af62acdc2 100644 --- a/Framework/Core/src/DataProcessingStats.cxx +++ b/Framework/Core/src/DataProcessingStats.cxx @@ -29,6 +29,16 @@ DataProcessingStats::DataProcessingStats(std::function