diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 94764571840f4..397a6f5113d13 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -13,17 +13,13 @@ #include "Framework/AODReaderHelpers.h" #include "Framework/ArrowContext.h" #include "Framework/ArrowTableSlicingCache.h" -#include "Framework/SliceCache.h" #include "Framework/DataProcessor.h" #include "Framework/DataProcessingStats.h" #include "Framework/ServiceRegistry.h" #include "Framework/ConfigContext.h" -#include "Framework/CommonDataProcessors.h" #include "Framework/DataSpecUtils.h" #include "Framework/DataSpecViews.h" #include "Framework/DeviceSpec.h" -#include "Framework/EndOfStreamContext.h" -#include "Framework/Tracing.h" #include "Framework/DeviceMetricsInfo.h" #include "Framework/DeviceMetricsHelper.h" #include "Framework/DeviceInfo.h" @@ -41,7 +37,6 @@ #include "CommonMessageBackendsHelpers.h" #include #include "Headers/DataHeader.h" -#include "Headers/DataHeaderHelpers.h" #include #include @@ -108,6 +103,135 @@ uint64_t calculateAvailableSharedMemory(ServiceRegistryRef registry) return registry.get().maxMemory; } +struct ResourceState { + int64_t available; + int64_t offered = 0; + int64_t lastDeviceOffered = 0; +}; +struct ResourceStats { + int64_t enoughCount; /// How many times the resources were enough + int64_t lowCount; /// How many times the resources were not enough +}; +struct ResourceSpec { + char const* name; + char const* unit; + char const* api; /// The callback to give resources to a device + int64_t maxAvailable; /// Maximum available quantity for a resource + int64_t maxQuantum; /// Largest offer which can be given + int64_t minQuantum; /// Smallest offer which can be given + int64_t metricOfferScaleFactor; /// The scale factor between the metric accounting and offers accounting +}; + +auto offerResources(ResourceState& resourceState, + ResourceSpec const& resourceSpec, + ResourceStats& resourceStats, + 
std::vector const& specs, + std::vector const& infos, + DevicesManager& manager, + int64_t offerConsumedCurrentValue, + int64_t offerExpiredCurrentValue, + int64_t acquiredResourceCurrentValue, + int64_t disposedResourceCurrentValue, + size_t timestamp, + DeviceMetricsInfo& driverMetrics, + std::function& availableResourceMetric, + std::function& unusedOfferedResourceMetric, + std::function& offeredResourceMetric, + void* signpostId) -> void +{ + O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, signpostId); + /// We loop over the devices, starting from where we stopped last time + /// offering the minimum offer to each one + int64_t lastCandidate = -1; + int64_t possibleOffer = resourceSpec.minQuantum; + + for (size_t di = 0; di < specs.size(); di++) { + if (resourceState.available < possibleOffer) { + if (resourceStats.lowCount == 0) { + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough", + "We do not have enough %{public}s (%llu %{public}s) to offer %llu %{public}s. Total offerings %{bytes}llu %{string}s.", + resourceSpec.name, resourceState.available, resourceSpec.unit, + possibleOffer, resourceSpec.unit, + resourceState.offered, resourceSpec.unit); + } + resourceStats.lowCount++; + resourceStats.enoughCount = 0; + break; + } else { + if (resourceStats.enoughCount == 0) { + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough", + "We are back in a state where we enough %{public}s: %llu %{public}s", + resourceSpec.name, + resourceState.available, + resourceSpec.unit); + } + resourceStats.lowCount = 0; + resourceStats.enoughCount++; + } + size_t candidate = (resourceState.lastDeviceOffered + di) % specs.size(); + + auto& info = infos[candidate]; + // Do not bother for inactive devices + // FIXME: there is probably a race condition if the device died and we did not + // take notice yet...
+ if (info.active == false || info.readyToQuit) { + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Device %s is inactive not offering %{public}s to it.", + specs[candidate].name.c_str(), resourceSpec.name); + continue; + } + if (specs[candidate].name != "internal-dpl-aod-reader") { + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Device %s is not a reader. Not offering %{public}s to it.", + specs[candidate].name.c_str(), + resourceSpec.name); + continue; + } + possibleOffer = std::min(resourceSpec.maxQuantum, resourceState.available); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Offering %llu %{public}s out of %llu to %{public}s", + possibleOffer, resourceSpec.unit, resourceState.available, specs[candidate].id.c_str()); + manager.queueMessage(specs[candidate].id.c_str(), fmt::format(fmt::runtime(resourceSpec.api), possibleOffer).data()); + resourceState.available -= possibleOffer; + resourceState.offered += possibleOffer; + lastCandidate = candidate; + } + // We had at least a valid candidate, so + // next time we offer to the next device. + if (lastCandidate >= 0) { + resourceState.lastDeviceOffered = lastCandidate + 1; + } + + // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was + // not used so far. So we need to account for the amount which got actually read (readerBytesCreated) + // and the amount which we know was given back. 
+ static int64_t lastShmOfferConsumed = 0; + static int64_t lastUnusedOfferedMemory = 0; + if (offerConsumedCurrentValue != lastShmOfferConsumed) { + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Offer consumed so far %llu", offerConsumedCurrentValue); + lastShmOfferConsumed = offerConsumedCurrentValue; + } + int unusedOfferedMemory = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor); + if (lastUnusedOfferedMemory != unusedOfferedMemory) { + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli", + unusedOfferedMemory, resourceState.offered, + offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor, + offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor, + resourceSpec.metricOfferScaleFactor); + lastUnusedOfferedMemory = unusedOfferedMemory; + } + // availableSharedMemory is the amount of memory which we know is available to be offered. + // We subtract the amount which we know was already offered but it's unused and we then balance how + // much was created with how much was destroyed. 
+ resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedMemory; + availableResourceMetric(driverMetrics, resourceState.available, timestamp); + unusedOfferedResourceMetric(driverMetrics, unusedOfferedMemory, timestamp); + + offeredResourceMetric(driverMetrics, resourceState.offered, timestamp); +}; + o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() { using o2::monitoring::Metric; @@ -138,7 +262,6 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto &allDeviceMetrics = sm.deviceMetricsInfos; auto &specs = sm.deviceSpecs; auto &infos = sm.deviceInfos; - O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &sm); static auto stateMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "rate-limit-state"); static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-bytes-created"); @@ -288,112 +411,28 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) { return; } - struct ResourceState { - int64_t available; - int64_t offered = 0; - int64_t lastDeviceOffered = 0; - }; - struct ResourceStats { - int64_t enoughCount; - int64_t lowCount; - }; - struct ResourceSpec{ - int64_t maxAvailable; - int64_t maxQuantum; - int64_t minQuantum; - }; - static const ResourceSpec resourceSpec{ + static const ResourceSpec shmResourceSpec{ + .name = "shared memory", + .unit = "MB", + .api = "/shm-offer {}", .maxAvailable = (int64_t)calculateAvailableSharedMemory(registry), .maxQuantum = 100, .minQuantum = 50, + .metricOfferScaleFactor = 1000000, }; - static ResourceState resourceState{ - .available = resourceSpec.maxAvailable, + static ResourceState shmResourceState{ + .available = shmResourceSpec.maxAvailable, }; - static ResourceStats resourceStats{ - .enoughCount = resourceState.available - 
resourceSpec.minQuantum > 0 ? 1 : 0, - .lowCount = resourceState.available - resourceSpec.minQuantum > 0 ? 0 : 1 + static ResourceStats shmResourceStats{ + .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0, + .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1 }; - /// We loop over the devices, starting from where we stopped last time - /// offering MIN_QUANTUM_SHARED_MEMORY of shared memory to each reader. - int64_t lastCandidate = -1; - int64_t possibleOffer = resourceSpec.minQuantum; - - for (size_t di = 0; di < specs.size(); di++) { - if (resourceState.available < possibleOffer) { - if (resourceStats.lowCount == 0) { - O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough", - "We do not have enough shared memory (%{bytes}llu MB) to offer %{bytes}llu MB. Total offerings %{bytes}llu", - resourceState.available, possibleOffer, resourceState.offered); - } - resourceStats.lowCount++; - resourceStats.enoughCount = 0; - break; - } else { - if (resourceStats.enoughCount == 0) { - O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough", - "We are back in a state where we enough shared memory: %{bytes}llu MB", resourceState.available); - } - resourceStats.lowCount = 0; - resourceStats.enoughCount++; - } - size_t candidate = (resourceState.lastDeviceOffered + di) % specs.size(); - - auto& info = infos[candidate]; - // Do not bother for inactive devices - // FIXME: there is probably a race condition if the device died and we did not - // took notice yet... - if (info.active == false || info.readyToQuit) { - O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", - "Device %s is inactive not offering memory to it.", specs[candidate].name.c_str()); - continue; - } - if (specs[candidate].name != "internal-dpl-aod-reader") { - O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", - "Device %s is not a reader. 
Not offering memory to it.", specs[candidate].name.c_str()); - continue; - } - possibleOffer = std::min(resourceSpec.maxQuantum, resourceState.available); - O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", - "Offering %{bytes}llu MB out of %{bytes}llu to %{public}s", - possibleOffer, resourceState.available, specs[candidate].id.c_str()); - manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", possibleOffer).data()); - resourceState.available -= possibleOffer; - resourceState.offered += possibleOffer; - lastCandidate = candidate; - } - // We had at least a valid candidate, so - // next time we offer to the next device. - if (lastCandidate >= 0) { - resourceState.lastDeviceOffered = lastCandidate + 1; - } - - // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was - // not used so far. So we need to account for the amount which got actually read (readerBytesCreated) - // and the amount which we know was given back. - static int64_t lastShmOfferConsumed = 0; - static int64_t lastUnusedOfferedMemory = 0; - if (shmOfferBytesConsumed != lastShmOfferConsumed) { - O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", - "Offer consumed so far %{bytes}llu", shmOfferBytesConsumed); - lastShmOfferConsumed = shmOfferBytesConsumed; - } - int unusedOfferedMemory = (resourceState.offered - (totalBytesExpired + shmOfferBytesConsumed) / 1000000); - if (lastUnusedOfferedMemory != unusedOfferedMemory) { - O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", - "unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / 1000000", - unusedOfferedMemory, resourceState.offered, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000); - lastUnusedOfferedMemory = unusedOfferedMemory; - } - // availableSharedMemory is the amount of memory which we know is available to be offered. 
- // We subtract the amount which we know was already offered but it's unused and we then balance how - // much was created with how much was destroyed. - resourceState.available = resourceSpec.maxAvailable + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory; - availableSharedMemoryMetric(driverMetrics, resourceState.available, timestamp); - unusedOfferedSharedMemoryMetric(driverMetrics, unusedOfferedMemory, timestamp); - - offeredSharedMemoryMetric(driverMetrics, resourceState.offered, timestamp); }, + offerResources(shmResourceState, shmResourceSpec, shmResourceStats, + specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired, + totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics, + availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric, + (void*)&sm); }, .postDispatching = [](ProcessingContext& ctx, void* service) { using DataHeader = o2::header::DataHeader; auto* arrow = reinterpret_cast(service); diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 092e8340a934a..06e920112649e 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -52,7 +52,7 @@ #endif #include #include -#include +#include #include using namespace o2::framework::data_matcher; @@ -191,7 +191,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector gsl::span { + auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> std::span { auto offset = li * numInputTypes; assert(cache.size() >= offset + numInputTypes); auto const start = cache.data() + offset; @@ -710,7 +710,7 @@ void DataRelayer::getReadyToProcess(std::vector& comp // // We use this to bail out early from the check as soon as we find something // which we know is not complete. 
- auto getPartialRecord = [&cache, &numInputTypes](int li) -> gsl::span { + auto getPartialRecord = [&cache, &numInputTypes](int li) -> std::span { auto offset = li * numInputTypes; assert(cache.size() >= offset + numInputTypes); auto const start = cache.data() + offset;