From 22a38403a859fd28d910b41b280b0e00fc2add3c Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Tue, 4 Nov 2025 12:29:38 +0100 Subject: [PATCH 1/2] DPL: add signposts to debug task scheduling --- Framework/Core/src/DataProcessingDevice.cxx | 29 +++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index aa194b525ca5d..86e2aab53791d 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -10,6 +10,7 @@ // or submit itself to any jurisdiction. #include "Framework/AsyncQueue.h" #include "Framework/DataProcessingDevice.h" +#include #include "Framework/ControlService.h" #include "Framework/ComputingQuotaEvaluator.h" #include "Framework/DataProcessingHeader.h" @@ -99,6 +100,8 @@ O2_DECLARE_DYNAMIC_LOG(async_queue); O2_DECLARE_DYNAMIC_LOG(forwarding); // Special log to track CCDB related requests O2_DECLARE_DYNAMIC_LOG(ccdb); +// Special log to track task scheduling +O2_DECLARE_DYNAMIC_LOG(scheduling); using namespace o2::framework; using ConfigurationInterface = o2::configuration::ConfigurationInterface; @@ -1551,10 +1554,22 @@ void DataProcessingDevice::Run() auto& spec = ref.get(); bool enough = ref.get().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop)); + struct SchedulingStats { + std::atomic lastScheduled = 0; + std::atomic numberOfUnscheduledSinceLastScheduled = 0; + std::atomic numberOfUnscheduled = 0; + std::atomic numberOfScheduled = 0; + }; + static SchedulingStats schedulingStats; + O2_SIGNPOST_ID_GENERATE(sid, scheduling); if (enough) { stream.id = streamRef; stream.running = true; stream.registry = &mServiceRegistry; + schedulingStats.lastScheduled = uv_now(state.loop); + schedulingStats.numberOfScheduled++; + schedulingStats.numberOfUnscheduledSinceLastScheduled = 0; + O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run", "Enough resources to schedule computation on stream %d", streamRef.index); if (dplEnableMultithreding) [[unlikely]] { stream.task = &handle; uv_queue_work(state.loop, stream.task, run_callback, run_completion); @@ -1563,6 +1578,20 @@ void DataProcessingDevice::Run() run_completion(&handle, 0); } } else { + if (schedulingStats.numberOfUnscheduledSinceLastScheduled > 100 || + (uv_now(state.loop) - schedulingStats.lastScheduled) > 30000) { + O2_SIGNPOST_EVENT_EMIT_WARN(scheduling, sid, "Run", + "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.", + schedulingStats.numberOfUnscheduledSinceLastScheduled.load(), + schedulingStats.lastScheduled.load()); + } else { + O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run", + "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.", + schedulingStats.numberOfUnscheduledSinceLastScheduled.load(), + schedulingStats.lastScheduled.load()); + } + schedulingStats.numberOfUnscheduled++; + schedulingStats.numberOfUnscheduledSinceLastScheduled++; auto ref = ServiceRegistryRef{mServiceRegistry}; ref.get().handleExpired(reportExpiredOffer); } From 233a34ec5ad401a7dd4d5d92b30641e58d9eb7e6 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Tue, 4 Nov 2025 12:29:38 +0100 Subject: [PATCH 2/2] DPL analysis: support timeslice rate limiting in DPL resource manager Use DPL resource manager rather than the ad-hoc solution for reconstruction. 
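For reference, the scheduling signposts introduced in the first patch follow a simple throttling pattern: count how many times a computation could not be scheduled and escalate the signpost to a warning only after more than 100 consecutive skips or more than 30 seconds without anything being scheduled. The following is a minimal, standalone sketch of that pattern, not part of the patch: std::printf and a caller-provided millisecond timestamp stand in for the O2 signpost macros and uv_now(state.loop).

#include <atomic>
#include <cstdint>
#include <cstdio>

struct SchedulingStats {
  std::atomic<uint64_t> lastScheduled{0};
  std::atomic<uint64_t> numberOfUnscheduledSinceLastScheduled{0};
  std::atomic<uint64_t> numberOfUnscheduled{0};
  std::atomic<uint64_t> numberOfScheduled{0};
};

// Record one scheduling decision; `enough` mirrors the result of selectOffer().
void recordSchedulingDecision(SchedulingStats& stats, bool enough, uint64_t nowMs)
{
  if (enough) {
    stats.lastScheduled = nowMs;
    stats.numberOfScheduled++;
    stats.numberOfUnscheduledSinceLastScheduled = 0;
    std::printf("Enough resources to schedule computation\n");
    return;
  }
  // Escalate to a warning only after many consecutive skips or a long silence.
  bool warn = stats.numberOfUnscheduledSinceLastScheduled > 100 ||
              (nowMs - stats.lastScheduled) > 30000;
  std::printf("%s: not enough resources to schedule computation. %llu skipped so far. Last scheduled at %llu.\n",
              warn ? "warning" : "debug",
              (unsigned long long)stats.numberOfUnscheduledSinceLastScheduled.load(),
              (unsigned long long)stats.lastScheduled.load());
  stats.numberOfUnscheduled++;
  stats.numberOfUnscheduledSinceLastScheduled++;
}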
--- .../src/AODJAlienReaderHelpers.cxx | 1 + .../include/Framework/CommonDataProcessors.h | 3 + .../include/Framework/ComputingQuotaOffer.h | 4 + .../include/Framework/DataProcessingStats.h | 2 + .../include/Framework/ResourcePolicyHelpers.h | 1 + Framework/Core/src/ArrowSupport.cxx | 108 ++++++++++++------ Framework/Core/src/CommonDataProcessors.cxx | 39 +++++++ Framework/Core/src/CommonServices.cxx | 16 +++ .../Core/src/ComputingQuotaEvaluator.cxx | 53 ++++++--- Framework/Core/src/DataProcessingDevice.cxx | 5 + Framework/Core/src/ResourcePolicy.cxx | 3 +- Framework/Core/src/ResourcePolicyHelpers.cxx | 34 +++++- Framework/Core/src/WSDriverClient.cxx | 33 +++++- Framework/Core/src/WorkflowHelpers.cxx | 16 ++- .../src/FrameworkGUIDeviceInspector.cxx | 4 + 15 files changed, 255 insertions(+), 67 deletions(-) diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx index 85ed9cd573d8a..1d4ae5a4d3c49 100644 --- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx @@ -145,6 +145,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const stats.updateStats({static_cast(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, 0}); stats.updateStats({static_cast(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, 0}); stats.updateStats({static_cast(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, 0}); + stats.updateStats({static_cast(ProcessingStatsId::CONSUMED_TIMEFRAMES), DataProcessingStats::Op::Set, 0}); if (!options.isSet("aod-file-private")) { LOGP(fatal, "No input file defined!"); diff --git a/Framework/Core/include/Framework/CommonDataProcessors.h b/Framework/Core/include/Framework/CommonDataProcessors.h index 824386c4d5921..d3ef596a0c0e0 100644 --- a/Framework/Core/include/Framework/CommonDataProcessors.h +++ b/Framework/Core/include/Framework/CommonDataProcessors.h @@ -37,6 +37,9 @@ struct CommonDataProcessors { /// and simply discards them. @a rateLimitingChannelConfig is the configuration /// for the rate limiting channel, if any required. static DataProcessorSpec getDummySink(std::vector const& danglingInputs, std::string rateLimitingChannelConfig); + /// @return a dummy DataProcessorSpec which requires all the passed @a InputSpec + /// and simply discards them. Rate limiting goes through the DPL driver + static DataProcessorSpec getScheduledDummySink(std::vector const& danglingInputs); static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec); }; diff --git a/Framework/Core/include/Framework/ComputingQuotaOffer.h b/Framework/Core/include/Framework/ComputingQuotaOffer.h index f457f46eef774..b7707613bc19d 100644 --- a/Framework/Core/include/Framework/ComputingQuotaOffer.h +++ b/Framework/Core/include/Framework/ComputingQuotaOffer.h @@ -44,6 +44,8 @@ struct ComputingQuotaOffer { int64_t memory = 0; /// How much shared memory it can allocate int64_t sharedMemory = 0; + /// How many timeslices it can process without giving back control + int64_t timeslices = 0; /// How much runtime it can use before giving back the resource /// in milliseconds. 
int64_t runtime = 0; @@ -68,8 +70,10 @@ struct ComputingQuotaInfo { /// Statistics on the offers consumed, expired struct ComputingQuotaStats { int64_t totalConsumedBytes = 0; + int64_t totalConsumedTimeslices = 0; int64_t totalConsumedOffers = 0; int64_t totalExpiredBytes = 0; + int64_t totalExpiredTimeslices = 0; int64_t totalExpiredOffers = 0; }; diff --git a/Framework/Core/include/Framework/DataProcessingStats.h b/Framework/Core/include/Framework/DataProcessingStats.h index d42f9a9d26610..e115e5d12b58f 100644 --- a/Framework/Core/include/Framework/DataProcessingStats.h +++ b/Framework/Core/include/Framework/DataProcessingStats.h @@ -60,8 +60,10 @@ enum struct ProcessingStatsId : short { ARROW_MESSAGES_CREATED, ARROW_MESSAGES_DESTROYED, ARROW_BYTES_EXPIRED, + TIMESLICE_NUMBER_EXPIRED, RESOURCE_OFFER_EXPIRED, SHM_OFFER_BYTES_CONSUMED, + TIMESLICE_OFFER_NUMBER_CONSUMED, RESOURCES_MISSING, RESOURCES_INSUFFICIENT, RESOURCES_SATISFACTORY, diff --git a/Framework/Core/include/Framework/ResourcePolicyHelpers.h b/Framework/Core/include/Framework/ResourcePolicyHelpers.h index abee264d75104..17599f9afb1a7 100644 --- a/Framework/Core/include/Framework/ResourcePolicyHelpers.h +++ b/Framework/Core/include/Framework/ResourcePolicyHelpers.h @@ -22,6 +22,7 @@ namespace o2::framework struct ResourcePolicyHelpers { static ResourcePolicy trivialTask(char const* taskMatcher); static ResourcePolicy cpuBoundTask(char const* taskMatcher, int maxCPUs = 1); + static ResourcePolicy rateLimitedSharedMemoryBoundTask(char const* taskMatcher, int maxMemory, int maxTimeslices); static ResourcePolicy sharedMemoryBoundTask(char const* taskMatcher, int maxMemory); }; diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 397a6f5113d13..932c1fdacacfb 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -65,7 +65,7 @@ enum struct RateLimitingState { struct RateLimitConfig { int64_t maxMemory = 2000; - int64_t maxTimeframes = 0; + int64_t maxTimeframes = 1; }; struct MetricIndices { @@ -77,6 +77,7 @@ struct MetricIndices { size_t shmOfferBytesConsumed = -1; size_t timeframesRead = -1; size_t timeframesConsumed = -1; + size_t timeframesExpired = -1; }; std::vector createDefaultIndices(std::vector& allDevicesMetrics) @@ -84,25 +85,20 @@ std::vector createDefaultIndices(std::vector& std::vector results; for (auto& info : allDevicesMetrics) { - MetricIndices indices; - indices.arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric(info, "arrow-bytes-created"); - indices.arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric(info, "arrow-bytes-destroyed"); - indices.arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric(info, "arrow-messages-created"); - indices.arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric(info, "arrow-messages-destroyed"); - indices.arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric(info, "arrow-bytes-expired"); - indices.shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric(info, "shm-offer-bytes-consumed"); - indices.timeframesRead = DeviceMetricsHelper::bookNumericMetric(info, "df-sent"); - indices.timeframesConsumed = DeviceMetricsHelper::bookNumericMetric(info, "consumed-timeframes"); - results.push_back(indices); + results.emplace_back(MetricIndices{ + .arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric(info, "arrow-bytes-created"), + .arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric(info, "arrow-bytes-destroyed"), + .arrowMessagesCreated = 
DeviceMetricsHelper::bookNumericMetric(info, "arrow-messages-created"), + .arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric(info, "arrow-messages-destroyed"), + .arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric(info, "arrow-bytes-expired"), + .shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric(info, "shm-offer-bytes-consumed"), + .timeframesRead = DeviceMetricsHelper::bookNumericMetric(info, "df-sent"), + .timeframesConsumed = DeviceMetricsHelper::bookNumericMetric(info, "consumed-timeframes"), + .timeframesExpired = DeviceMetricsHelper::bookNumericMetric(info, "expired-timeframes")}); } return results; } -uint64_t calculateAvailableSharedMemory(ServiceRegistryRef registry) -{ - return registry.get().maxMemory; -} - struct ResourceState { int64_t available; int64_t offered = 0; @@ -205,29 +201,30 @@ auto offerResources(ResourceState& resourceState, // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was // not used so far. So we need to account for the amount which got actually read (readerBytesCreated) // and the amount which we know was given back. - static int64_t lastShmOfferConsumed = 0; - static int64_t lastUnusedOfferedMemory = 0; - if (offerConsumedCurrentValue != lastShmOfferConsumed) { + static int64_t lastResourceOfferConsumed = 0; + static int64_t lastUnusedOfferedResource = 0; + if (offerConsumedCurrentValue != lastResourceOfferConsumed) { O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", "Offer consumed so far %llu", offerConsumedCurrentValue); - lastShmOfferConsumed = offerConsumedCurrentValue; + lastResourceOfferConsumed = offerConsumedCurrentValue; } - int unusedOfferedMemory = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor); - if (lastUnusedOfferedMemory != unusedOfferedMemory) { + int unusedOfferedResource = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor); + if (lastUnusedOfferedResource != unusedOfferedResource) { O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", - "unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli", - unusedOfferedMemory, resourceState.offered, + "unusedOfferedResource(%{public}s):%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli", + resourceSpec.name, + unusedOfferedResource, resourceState.offered, offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor, offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor, resourceSpec.metricOfferScaleFactor); - lastUnusedOfferedMemory = unusedOfferedMemory; + lastUnusedOfferedResource = unusedOfferedResource; } // availableSharedMemory is the amount of memory which we know is available to be offered. // We subtract the amount which we know was already offered but it's unused and we then balance how // much was created with how much was destroyed. 
- resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedMemory; + resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedResource; availableResourceMetric(driverMetrics, resourceState.available, timestamp); - unusedOfferedResourceMetric(driverMetrics, unusedOfferedMemory, timestamp); + unusedOfferedResourceMetric(driverMetrics, unusedOfferedResource, timestamp); offeredResourceMetric(driverMetrics, resourceState.offered, timestamp); }; @@ -258,6 +255,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() int64_t totalMessagesDestroyed = 0; int64_t totalTimeframesRead = 0; int64_t totalTimeframesConsumed = 0; + int64_t totalTimeframesExpired = 0; auto &driverMetrics = sm.driverMetricsInfo; auto &allDeviceMetrics = sm.deviceMetricsInfos; auto &specs = sm.deviceSpecs; @@ -266,9 +264,14 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() static auto stateMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "rate-limit-state"); static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-bytes-created"); static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-shm-offer-bytes-consumed"); + // These are really to monitor the rate limiting static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-unused-offered-shared-memory"); + static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-unused-offered-timeslices"); static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-available-shared-memory"); + static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-available-timeslices"); static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-offered-shared-memory"); + static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-offered-timeslices"); + static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-bytes-destroyed"); static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-bytes-expired"); static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-messages-created"); @@ -390,6 +393,18 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto const& timestamps = DeviceMetricsHelper::getTimestampsStore(deviceMetrics)[info.storeIdx]; lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]); } + { + size_t index = indices.timeframesExpired; + assert(index < deviceMetrics.metrics.size()); + changed |= deviceMetrics.changed[index]; + MetricInfo info = deviceMetrics.metrics[index]; + assert(info.storeIdx < deviceMetrics.uint64Metrics.size()); + auto& data = deviceMetrics.uint64Metrics[info.storeIdx]; + auto value = (int64_t)data[(info.pos - 1) % data.size()]; + totalTimeframesExpired += value; + auto const& timestamps = DeviceMetricsHelper::getTimestampsStore(deviceMetrics)[info.storeIdx]; + lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]); + } } static uint64_t 
unchangedCount = 0; if (changed) { @@ -407,26 +422,45 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() unchangedCount++; } changedCountMetric(driverMetrics, unchangedCount, timestamp); - auto maxTimeframes = registry.get().maxTimeframes; - if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) { - return; - } + static const ResourceSpec shmResourceSpec{ .name = "shared memory", .unit = "MB", .api = "/shm-offer {}", - .maxAvailable = (int64_t)calculateAvailableSharedMemory(registry), + .maxAvailable = (int64_t)registry.get().maxMemory, .maxQuantum = 100, .minQuantum = 50, .metricOfferScaleFactor = 1000000, }; + static const ResourceSpec timesliceResourceSpec{ + .name = "timeslice", + .unit = "timeslices", + .api = "/timeslice-offer {}", + .maxAvailable = (int64_t)registry.get().maxTimeframes, + .maxQuantum = 1, + .minQuantum = 1, + .metricOfferScaleFactor = 1, + }; static ResourceState shmResourceState{ .available = shmResourceSpec.maxAvailable, }; + static ResourceState timesliceResourceState{ + .available = timesliceResourceSpec.maxAvailable, + }; static ResourceStats shmResourceStats{ .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0, .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1 }; + static ResourceStats timesliceResourceStats{ + .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0, + .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1 + }; + + offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats, + specs, infos, manager, totalTimeframesConsumed, totalTimeframesExpired, + totalTimeframesRead, totalTimeframesConsumed, timestamp, driverMetrics, + availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric, + (void*)&sm); offerResources(shmResourceState, shmResourceSpec, shmResourceStats, specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired, @@ -487,18 +521,18 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() } else { config->maxMemory = readers * 500; } - if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].as() == "readers") { - config->maxTimeframes = readers; - } else { + if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) { config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as()); + } else { + config->maxTimeframes = readers; } static bool once = false; // Until we guarantee this is called only once... 
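// Illustrative sketch (not part of the patch): the bookkeeping that offerResources()
// above applies to each resource, spelled out here for the new timeslice resource with
// made-up numbers and a metricOfferScaleFactor of 1:
//
//   unusedOffered = offered - (expired + consumed) / scaleFactor
//   available     = maxAvailable + (disposed - acquired) / scaleFactor - unusedOffered
//
// e.g. with maxTimeframes = 4, 3 timeslices offered so far, 1 expired, 1 consumed,
// 2 acquired (read by the readers) and 1 disposed (reported consumed by the sink):
//
//   unusedOffered = 3 - (1 + 1) / 1 = 1
//   available     = 4 + (1 - 2) / 1 - 1 = 2
//
// so the driver can still offer up to 2 timeslices before it has to wait for more
// consumed-timeframes updates from the sink.
// static_assert(4 + (1 - 2) / 1 - (3 - (1 + 1) / 1) == 2); // same arithmetic, compile-time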
if (!once) { O2_SIGNPOST_ID_GENERATE(sid, rate_limiting); O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup", - "Rate limiting set up at %{bytes}llu MB distributed over %d readers", - config->maxMemory, readers); + "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers", + config->maxMemory, config->maxTimeframes, readers); registry.registerService(ServiceRegistryHelpers::handleForService(config)); once = true; } }, diff --git a/Framework/Core/src/CommonDataProcessors.cxx b/Framework/Core/src/CommonDataProcessors.cxx index c2431b3ab068d..4d82cb7124e64 100644 --- a/Framework/Core/src/CommonDataProcessors.cxx +++ b/Framework/Core/src/CommonDataProcessors.cxx @@ -44,6 +44,7 @@ using namespace o2::framework::data_matcher; // Special log to track callbacks we know about O2_DECLARE_DYNAMIC_LOG(callbacks); +O2_DECLARE_DYNAMIC_LOG(rate_limiting); namespace o2::framework { @@ -211,6 +212,8 @@ DataProcessorSpec CommonDataProcessors::getDummySink(std::vector cons auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value; auto& stats = services.get(); stats.updateStats({(int)ProcessingStatsId::CONSUMED_TIMEFRAMES, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice}); + stats.updateStats({(int)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice}); + stats.processCommandQueue(); }; callbacks.set(domainInfoUpdated); @@ -224,6 +227,42 @@ DataProcessorSpec CommonDataProcessors::getDummySink(std::vector cons .labels = {{"resilient"}}}; } +// For the cases where the driver is guaranteed to be there (e.g. in analysis) we can use a +// more sophisticated controller which can get offers for timeslices so that we can rate limit +// across multiple input devices and rate limit shared memory usage without race conditions +DataProcessorSpec CommonDataProcessors::getScheduledDummySink(std::vector const& danglingOutputInputs) +{ + return DataProcessorSpec{ + .name = "internal-dpl-injected-dummy-sink", + .inputs = danglingOutputInputs, + .algorithm = AlgorithmSpec{adaptStateful([](CallbackService& callbacks, DeviceState& deviceState, InitContext& ic) { + // We update the number of consumed timeframes based on the oldestPossingTimeslice. + // This information will be aggregated in the driver, which will then decide whether or not a new offer for + // a timeslice should be made and to which device. + auto domainInfoUpdated = [](ServiceRegistryRef services, size_t timeslice, ChannelIndex channelIndex) { + LOGP(info, "Domain info updated with timeslice {}", timeslice); + auto& timesliceIndex = services.get(); + auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value; + auto& stats = services.get(); + O2_SIGNPOST_ID_GENERATE(sid, rate_limiting); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "run", "Consumed timeframes (domain info updated) to be set to %zu.", oldestPossingTimeslice); + stats.updateStats({(int)ProcessingStatsId::CONSUMED_TIMEFRAMES, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice}); + stats.updateStats({(int)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice}); + stats.processCommandQueue(); + }; + callbacks.set(domainInfoUpdated); + + return adaptStateless([](DataProcessingStats& stats, TimesliceIndex& timesliceIndex) { + O2_SIGNPOST_ID_GENERATE(sid, rate_limiting); + auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value; +
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "run", "Consumed timeframes (processing) to be set to %zu.", oldestPossingTimeslice); + stats.updateStats({(int)ProcessingStatsId::CONSUMED_TIMEFRAMES, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice}); + stats.updateStats({(int)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice}); + }); + })}, + .labels = {{"resilient"}}}; +} + AlgorithmSpec CommonDataProcessors::wrapWithRateLimiting(AlgorithmSpec spec) { return PluginManager::wrapAlgorithm(spec, [](AlgorithmSpec::ProcessCallback& original, ProcessingContext& pcx) -> void { diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index 091cd9d4ed0a5..aedd96dab41ca 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -1080,6 +1080,22 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats() .minPublishInterval = 0, .maxRefreshLatency = 10000, .sendInitialValue = true}, + MetricSpec{.name = "timeslices-expired", + .enabled = arrowAndResourceLimitingMetrics, + .metricId = static_cast(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), + .kind = Kind::UInt64, + .scope = Scope::DPL, + .minPublishInterval = 0, + .maxRefreshLatency = 10000, + .sendInitialValue = true}, + MetricSpec{.name = "timeslices-consumed", + .enabled = arrowAndResourceLimitingMetrics, + .metricId = static_cast(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED), + .kind = Kind::UInt64, + .scope = Scope::DPL, + .minPublishInterval = 0, + .maxRefreshLatency = 10000, + .sendInitialValue = true}, MetricSpec{.name = "resources-missing", .enabled = enableDebugMetrics, .metricId = static_cast(ProcessingStatsId::RESOURCES_MISSING), diff --git a/Framework/Core/src/ComputingQuotaEvaluator.cxx b/Framework/Core/src/ComputingQuotaEvaluator.cxx index 717a59f5f5372..aa566ccb4d549 100644 --- a/Framework/Core/src/ComputingQuotaEvaluator.cxx +++ b/Framework/Core/src/ComputingQuotaEvaluator.cxx @@ -36,14 +36,14 @@ ComputingQuotaEvaluator::ComputingQuotaEvaluator(ServiceRegistryRef ref) // so this will only work with some device which does not require // any CPU. Notice this will have troubles if a given DPL process // runs for more than a year. - mOffers[0] = { - 0, - 0, - 0, - -1, - -1, - OfferScore::Unneeded, - true}; + mOffers[0] = ComputingQuotaOffer{ + .cpu = 0, + .memory = 0, + .sharedMemory = 0, + .timeslices = 0, + .runtime = -1, + .score = OfferScore::Unneeded, + .valid = true}; mInfos[0] = { uv_now(state.loop), 0, @@ -97,7 +97,7 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& result.size(), totalOffer.cpu, totalOffer.memory, totalOffer.sharedMemory); for (auto& offer : result) { // We pretend each offer id is a pointer, to have a unique id. - O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(offer*8)); + O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(offer * 8)); O2_SIGNPOST_START(quota, oid, "offers", "Offer %d has been selected.", offer); } dpStats.updateStats({static_cast(ProcessingStatsId::RESOURCES_SATISFACTORY), DataProcessingStats::Op::Add, 1}); @@ -132,6 +132,7 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& auto& offer = mOffers[i]; auto& info = mInfos[i]; if (enough) { + O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "We have enough offers. 
We can continue for computation."); break; } // Ignore: @@ -139,24 +140,26 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& // - Offers which belong to another task // - Expired offers if (offer.valid == false) { + O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d is not valid. Skipping", i); stats.invalidOffers.push_back(i); continue; } if (offer.user != -1 && offer.user != task) { + O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d already offered to some other user", i); stats.otherUser.push_back(i); continue; } if (offer.runtime < 0) { stats.unexpiring.push_back(i); } else if (offer.runtime + info.received < now) { - O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d expired since %llu milliseconds and holds %llu MB", - i, now - offer.runtime - info.received, offer.sharedMemory / 1000000); + O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d expired since %llu milliseconds and holds %llu MB and %llu timeslices", + i, now - offer.runtime - info.received, offer.sharedMemory / 1000000, offer.timeslices); mExpiredOffers.push_back(ComputingQuotaOfferRef{i}); stats.expired.push_back(i); continue; } else { - O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d still valid for %llu milliseconds, providing %llu MB", - i, offer.runtime + info.received - now, offer.sharedMemory / 1000000); + O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d still valid for %llu milliseconds, providing %llu MB and %llu timeslices", + i, offer.runtime + info.received - now, offer.sharedMemory / 1000000, offer.timeslices); if (minValidity == 0) { minValidity = offer.runtime + info.received - now; } @@ -168,22 +171,29 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& tmp.cpu += offer.cpu; tmp.memory += offer.memory; tmp.sharedMemory += offer.sharedMemory; - offer.score = selector(offer, tmp); + tmp.timeslices += offer.timeslices; + offer.score = selector(offer, accumulated); switch (offer.score) { case OfferScore::Unneeded: + O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d considered not needed. Skipping", i); continue; case OfferScore::Unsuitable: + O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d considered Unsuitable. Skipping", i); continue; case OfferScore::More: selectOffer(i, now); accumulated = tmp; stats.selectedOffers.push_back(i); + O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d selected but not enough. 
%llu MB, %d cores and %llu timeslices are not enough.", + i, tmp.sharedMemory / 1000000, tmp.cpu, tmp.timeslices); continue; case OfferScore::Enough: selectOffer(i, now); accumulated = tmp; stats.selectedOffers.push_back(i); enough = true; + O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Selected %zu offers providing %llu MB, %d cores and %llu timeslices are deemed enough.", + stats.selectedOffers.size(), tmp.sharedMemory / 1000000, tmp.cpu, tmp.timeslices); break; }; } @@ -224,7 +234,7 @@ void ComputingQuotaEvaluator::dispose(int taskId) continue; } if (offer.sharedMemory <= 0) { - O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(oi*8)); + O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(oi * 8)); O2_SIGNPOST_END(quota, oid, "offers", "Offer %d back to not needed.", oi); offer.valid = false; offer.score = OfferScore::Unneeded; @@ -235,21 +245,28 @@ void ComputingQuotaEvaluator::dispose(int taskId) /// Move offers from the pending list to the actual available offers void ComputingQuotaEvaluator::updateOffers(std::vector& pending, uint64_t now) { + O2_SIGNPOST_ID_GENERATE(oid, quota); + O2_SIGNPOST_START(quota, oid, "updateOffers", "Starting to process received offers"); for (size_t oi = 0; oi < mOffers.size(); oi++) { auto& storeOffer = mOffers[oi]; auto& info = mInfos[oi]; if (pending.empty()) { + O2_SIGNPOST_END(quota, oid, "updateOffers", "No more pending offers to process"); return; } if (storeOffer.valid == true) { + O2_SIGNPOST_EVENT_EMIT(quota, oid, "updateOffers", "Skipping update of offer %zu because it's still valid", oi); continue; } info.received = now; auto& offer = pending.back(); + O2_SIGNPOST_EVENT_EMIT(quota, oid, "updateOffers", "Updating offer %zu at %llu. Cpu: %d, Shared Memory %lli, Timeslices: %lli", + oi, now, offer.cpu, offer.sharedMemory, offer.timeslices); storeOffer = offer; storeOffer.valid = true; pending.pop_back(); } + O2_SIGNPOST_END_WITH_ERROR(quota, oid, "updateOffers", "Some of the pending offers were not treated"); } void ComputingQuotaEvaluator::handleExpired(std::function expirator) @@ -269,7 +286,7 @@ void ComputingQuotaEvaluator::handleExpired(std::function= 0); mStats.totalExpiredBytes += offer.sharedMemory; mStats.totalExpiredOffers++; diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 86e2aab53791d..5868557f2c80f 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -304,16 +304,20 @@ void run_completion(uv_work_t* handle, int status) static std::function reportConsumedOffer = [ref](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) { auto& dpStats = ref.get(); stats.totalConsumedBytes += accumulatedConsumed.sharedMemory; + stats.totalConsumedTimeslices += accumulatedConsumed.timeslices; dpStats.updateStats({static_cast(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes}); + dpStats.updateStats({static_cast(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedTimeslices}); dpStats.processCommandQueue(); assert(stats.totalConsumedBytes == dpStats.metrics[(short)ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED]); + assert(stats.totalConsumedTimeslices == dpStats.metrics[(short)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED]); }; static std::function reportExpiredOffer = [ref](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) { auto& dpStats = ref.get();
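// As with reportConsumedOffer above, each counter in ComputingQuotaStats feeds one
// driver-facing metric: totalExpiredOffers -> RESOURCE_OFFER_EXPIRED,
// totalExpiredBytes -> ARROW_BYTES_EXPIRED and, new with this patch,
// totalExpiredTimeslices -> TIMESLICE_NUMBER_EXPIRED, so expired timeslice offers
// are accounted for in the same way as expired shared memory.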
dpStats.updateStats({static_cast(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers}); dpStats.updateStats({static_cast(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes}); + dpStats.updateStats({static_cast(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices}); dpStats.processCommandQueue(); }; @@ -1544,6 +1548,7 @@ void DataProcessingDevice::Run() auto& dpStats = ref.get(); dpStats.updateStats({static_cast(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers}); dpStats.updateStats({static_cast(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes}); + dpStats.updateStats({static_cast(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices}); dpStats.processCommandQueue(); }; auto ref = ServiceRegistryRef{mServiceRegistry}; diff --git a/Framework/Core/src/ResourcePolicy.cxx b/Framework/Core/src/ResourcePolicy.cxx index 9076a87d547fa..18ff15e372657 100644 --- a/Framework/Core/src/ResourcePolicy.cxx +++ b/Framework/Core/src/ResourcePolicy.cxx @@ -18,8 +18,9 @@ namespace o2::framework std::vector ResourcePolicy::createDefaultPolicies() { + // FIXME: we should have better logic to decide if we can process something. return { - ResourcePolicyHelpers::sharedMemoryBoundTask("internal-dpl-aod-reader.*", 100000000), + ResourcePolicyHelpers::rateLimitedSharedMemoryBoundTask("internal-dpl-aod-reader.*", 100000000, 1), ResourcePolicyHelpers::trivialTask(".*")}; } diff --git a/Framework/Core/src/ResourcePolicyHelpers.cxx b/Framework/Core/src/ResourcePolicyHelpers.cxx index aad783cdc1f60..2c5c4f54dd9b5 100644 --- a/Framework/Core/src/ResourcePolicyHelpers.cxx +++ b/Framework/Core/src/ResourcePolicyHelpers.cxx @@ -11,9 +11,7 @@ #include "Framework/ResourcePolicyHelpers.h" #include "Framework/DeviceSpec.h" -#include "ResourcesMonitoringHelper.h" -#include #include namespace o2::framework @@ -41,6 +39,36 @@ ResourcePolicy ResourcePolicyHelpers::cpuBoundTask(char const* s, int requestedC [requestedCPUs](ComputingQuotaOffer const& offer, ComputingQuotaOffer const& accumulated) -> OfferScore { return accumulated.cpu >= requestedCPUs ? OfferScore::Enough : OfferScore::More; }}; } +ResourcePolicy ResourcePolicyHelpers::rateLimitedSharedMemoryBoundTask(char const* s, int requestedSharedMemory, int requestedTimeslices) +{ + return ResourcePolicy{ + "ratelimited-shm-bound", + [matcher = std::regex(s)](DeviceSpec const& spec) -> bool { + return std::regex_match(spec.name, matcher); + }, + [requestedSharedMemory, requestedTimeslices](ComputingQuotaOffer const& offer, ComputingQuotaOffer const& accumulated) -> OfferScore { + // If we have enough memory and not enough timeslices, + // ignore further shared memory. + if (accumulated.sharedMemory >= requestedSharedMemory && offer.timeslices == 0) { + return OfferScore::Unneeded; + } + // If we have enough timeslices and not enough shared memory + // ignore further timeslices. + if (accumulated.timeslices >= requestedTimeslices && offer.sharedMemory == 0) { + return OfferScore::Unneeded; + } + // If it does not offer neither shared memory nor timeslices, mark it as unneeded. + if (offer.sharedMemory == 0 && offer.timeslices == 0) { + return OfferScore::Unneeded; + } + // We have enough to process. 
+ if ((accumulated.sharedMemory + offer.sharedMemory) >= requestedSharedMemory && (accumulated.timeslices + offer.timeslices) >= requestedTimeslices) { + return OfferScore::Enough; + } + // We need more resources + return OfferScore::More; }}; +} + ResourcePolicy ResourcePolicyHelpers::sharedMemoryBoundTask(char const* s, int requestedSharedMemory) { return ResourcePolicy{ @@ -52,7 +80,7 @@ ResourcePolicy ResourcePolicyHelpers::sharedMemoryBoundTask(char const* s, int r if (offer.sharedMemory == 0) { return OfferScore::Unneeded; } - return accumulated.sharedMemory >= requestedSharedMemory ? OfferScore::Enough : OfferScore::More; }}; + return (accumulated.sharedMemory + offer.sharedMemory)>= requestedSharedMemory ? OfferScore::Enough : OfferScore::More; }}; } } // namespace o2::framework diff --git a/Framework/Core/src/WSDriverClient.cxx b/Framework/Core/src/WSDriverClient.cxx index 179b13bf91d76..43a407536cb59 100644 --- a/Framework/Core/src/WSDriverClient.cxx +++ b/Framework/Core/src/WSDriverClient.cxx @@ -26,6 +26,7 @@ O2_DECLARE_DYNAMIC_LOG(completion); O2_DECLARE_DYNAMIC_LOG(monitoring_service); O2_DECLARE_DYNAMIC_LOG(data_processor_context); O2_DECLARE_DYNAMIC_LOG(stream_context); +O2_DECLARE_DYNAMIC_LOG(ws_client); namespace o2::framework { @@ -49,8 +50,8 @@ struct ClientWebSocketHandler : public WebSocketHandler { mClient.dispatch(std::string_view(frame, s)); } - void endFragmentation() override{}; - void control(char const* frame, size_t s) override{}; + void endFragmentation() override {}; + void control(char const* frame, size_t s) override {}; /// Invoked at the beginning of some incoming data. We simply /// reset actions which need to happen on a per chunk basis. @@ -119,6 +120,34 @@ void on_connect(uv_connect_t* connection, int status) state.pendingOffers.push_back(offer); }); + client->observe("/timeslice-offer", [ref = context->ref](std::string_view cmd) { + O2_SIGNPOST_ID_GENERATE(wid, ws_client); + O2_SIGNPOST_START(ws_client, wid, "timeslice-offer", "Received timeslice offer."); + auto& state = ref.get(); + static constexpr int prefixSize = std::string_view{"/timeslice-offer "}.size(); + if (prefixSize > cmd.size()) { + O2_SIGNPOST_END_WITH_ERROR(ws_client, wid, "timeslice-offer", "Malformed timeslice offer"); + return; + } + cmd.remove_prefix(prefixSize); + int64_t offerSize; + auto offerSizeError = std::from_chars(cmd.data(), cmd.data() + cmd.size(), offerSize); + if (offerSizeError.ec != std::errc()) { + O2_SIGNPOST_END_WITH_ERROR(ws_client, wid, "timeslice-offer", "Unexpected timeslice offer size"); + return; + } + ComputingQuotaOffer offer{ + .cpu = 0, + .memory = 0, + .sharedMemory = 0, + .timeslices = offerSize, + .runtime = 10000, + .user = -1, + .valid = true}; + state.pendingOffers.push_back(offer); + O2_SIGNPOST_END(ws_client, wid, "timeslice-offer", "Received %lli timeslices offer. 
Total pending offers %zu.", + offerSize, state.pendingOffers.size()); + }); client->observe("/quit", [ref = context->ref](std::string_view) { auto& state = ref.get(); diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 36583035c41ff..d27753848d544 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -465,11 +465,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext if (mctracks2aod == workflow.end()) { // add normal reader auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx); - if (internalRateLimiting) { - aodReader.algorithm = CommonDataProcessors::wrapWithRateLimiting(algo); - } else { - aodReader.algorithm = algo; - } + aodReader.algorithm = algo; aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"}); aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"}); } else { @@ -699,7 +695,15 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext ignoredInput.lifetime = Lifetime::Sporadic; } - extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput)); + // Use the new dummy sink when the AOD reader is there + O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers); + if (aodReader.outputs.empty() == false) { + O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink"); + extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored)); + } else { + O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink"); + extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput)); + } } workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end()); diff --git a/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx b/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx index b8c9cc50f0770..aa546b8a9ab49 100644 --- a/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx +++ b/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx @@ -329,6 +329,10 @@ void displayDeviceInspector(DeviceSpec const& spec, control.controller->write("/shm-offer 1000", strlen("/shm-offer 1000")); } + if (ImGui::Button("Offer timeslices")) { + control.controller->write("/timeslice-offer 1", strlen("/timeslice-offer 1")); + } + if (control.requestedState > info.providedState) { ImGui::Text(ICON_FA_CLOCK_O); } else {
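As a closing illustration, the timeslice offers introduced here travel as plain-text control messages of the form "/timeslice-offer <n>", whether they come from the driver's resource manager or from the new "Offer timeslices" button in the device inspector. Below is a self-contained sketch of the parsing performed by the new WSDriverClient observer; the Offer struct, the bool return value and main() are simplified stand-ins for ComputingQuotaOffer, the signpost calls and the DeviceState pending-offer queue, so treat it as illustrative rather than the actual framework code.

#include <charconv>
#include <cstdint>
#include <cstdio>
#include <string_view>
#include <vector>

// Simplified stand-in for o2::framework::ComputingQuotaOffer.
struct Offer {
  int64_t timeslices = 0;
  int64_t runtimeMs = 0;
  bool valid = false;
};

// Parse a "/timeslice-offer <n>" control message and queue the resulting offer.
bool handleTimesliceOffer(std::string_view cmd, std::vector<Offer>& pendingOffers)
{
  constexpr std::string_view prefix{"/timeslice-offer "};
  if (cmd.size() < prefix.size() || cmd.substr(0, prefix.size()) != prefix) {
    return false; // malformed offer
  }
  cmd.remove_prefix(prefix.size());
  int64_t offerSize = 0;
  auto result = std::from_chars(cmd.data(), cmd.data() + cmd.size(), offerSize);
  if (result.ec != std::errc()) {
    return false; // offer size is not a number
  }
  // The offer stays valid for 10 seconds, like the shared-memory offers.
  pendingOffers.push_back(Offer{.timeslices = offerSize, .runtimeMs = 10000, .valid = true});
  return true;
}

int main()
{
  std::vector<Offer> pending;
  // This is what the "Offer timeslices" button in the device inspector sends.
  handleTimesliceOffer("/timeslice-offer 1", pending);
  std::printf("pending offers: %zu, timeslices in last offer: %lld\n",
              pending.size(), (long long)pending.back().timeslices);
}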