Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, 0});
stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, 0});
stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, 0});
stats.updateStats({static_cast<short>(ProcessingStatsId::CONSUMED_TIMEFRAMES), DataProcessingStats::Op::Set, 0});

if (!options.isSet("aod-file-private")) {
LOGP(fatal, "No input file defined!");
Expand Down
3 changes: 3 additions & 0 deletions Framework/Core/include/Framework/CommonDataProcessors.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ struct CommonDataProcessors {
/// and simply discards them. @a rateLimitingChannelConfig is the configuration
/// for the rate limiting channel, if any required.
static DataProcessorSpec getDummySink(std::vector<InputSpec> const& danglingInputs, std::string rateLimitingChannelConfig);
/// @return a dummy DataProcessorSpec which requires all the passed @a InputSpec
/// and simply discards them. Rate limiting goes through the DPL driver
static DataProcessorSpec getScheduledDummySink(std::vector<InputSpec> const& danglingInputs);
static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec);
};

Expand Down
4 changes: 4 additions & 0 deletions Framework/Core/include/Framework/ComputingQuotaOffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ struct ComputingQuotaOffer {
int64_t memory = 0;
/// How much shared memory it can allocate
int64_t sharedMemory = 0;
/// How many timeslices it can process without giving back control
int64_t timeslices = 0;
/// How much runtime it can use before giving back the resource
/// in milliseconds.
int64_t runtime = 0;
Expand All @@ -68,8 +70,10 @@ struct ComputingQuotaInfo {
/// Statistics on the offers consumed, expired
struct ComputingQuotaStats {
int64_t totalConsumedBytes = 0;
int64_t totalConsumedTimeslices = 0;
int64_t totalConsumedOffers = 0;
int64_t totalExpiredBytes = 0;
int64_t totalExpiredTimeslices = 0;
int64_t totalExpiredOffers = 0;
};

Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/include/Framework/DataProcessingStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ enum struct ProcessingStatsId : short {
ARROW_MESSAGES_CREATED,
ARROW_MESSAGES_DESTROYED,
ARROW_BYTES_EXPIRED,
TIMESLICE_NUMBER_EXPIRED,
RESOURCE_OFFER_EXPIRED,
SHM_OFFER_BYTES_CONSUMED,
TIMESLICE_OFFER_NUMBER_CONSUMED,
RESOURCES_MISSING,
RESOURCES_INSUFFICIENT,
RESOURCES_SATISFACTORY,
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/include/Framework/ResourcePolicyHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace o2::framework
struct ResourcePolicyHelpers {
static ResourcePolicy trivialTask(char const* taskMatcher);
static ResourcePolicy cpuBoundTask(char const* taskMatcher, int maxCPUs = 1);
static ResourcePolicy rateLimitedSharedMemoryBoundTask(char const* taskMatcher, int maxMemory, int maxTimeslices);
static ResourcePolicy sharedMemoryBoundTask(char const* taskMatcher, int maxMemory);
};

Expand Down
108 changes: 71 additions & 37 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ enum struct RateLimitingState {

struct RateLimitConfig {
int64_t maxMemory = 2000;
int64_t maxTimeframes = 0;
int64_t maxTimeframes = 1;
};

struct MetricIndices {
Expand All @@ -77,32 +77,28 @@ struct MetricIndices {
size_t shmOfferBytesConsumed = -1;
size_t timeframesRead = -1;
size_t timeframesConsumed = -1;
size_t timeframesExpired = -1;
};

std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
{
std::vector<MetricIndices> results;

for (auto& info : allDevicesMetrics) {
MetricIndices indices;
indices.arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created");
indices.arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed");
indices.arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created");
indices.arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed");
indices.arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired");
indices.shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed");
indices.timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent");
indices.timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes");
results.push_back(indices);
results.emplace_back(MetricIndices{
.arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created"),
.arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed"),
.arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created"),
.arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed"),
.arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired"),
.shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed"),
.timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent"),
.timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes"),
.timeframesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "expired-timeframes")});
}
return results;
}

uint64_t calculateAvailableSharedMemory(ServiceRegistryRef registry)
{
return registry.get<RateLimitConfig>().maxMemory;
}

struct ResourceState {
int64_t available;
int64_t offered = 0;
Expand Down Expand Up @@ -205,29 +201,30 @@ auto offerResources(ResourceState& resourceState,
// unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
// not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
// and the amount which we know was given back.
static int64_t lastShmOfferConsumed = 0;
static int64_t lastUnusedOfferedMemory = 0;
if (offerConsumedCurrentValue != lastShmOfferConsumed) {
static int64_t lastResourceOfferConsumed = 0;
static int64_t lastUnusedOfferedResource = 0;
if (offerConsumedCurrentValue != lastResourceOfferConsumed) {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Offer consumed so far %llu", offerConsumedCurrentValue);
lastShmOfferConsumed = offerConsumedCurrentValue;
lastResourceOfferConsumed = offerConsumedCurrentValue;
}
int unusedOfferedMemory = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
if (lastUnusedOfferedMemory != unusedOfferedMemory) {
int unusedOfferedResource = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
if (lastUnusedOfferedResource != unusedOfferedResource) {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
unusedOfferedMemory, resourceState.offered,
"unusedOfferedResource(%{public}s):%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
resourceSpec.name,
unusedOfferedResource, resourceState.offered,
offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor,
offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor,
resourceSpec.metricOfferScaleFactor);
lastUnusedOfferedMemory = unusedOfferedMemory;
lastUnusedOfferedResource = unusedOfferedResource;
}
// availableSharedMemory is the amount of memory which we know is available to be offered.
// We subtract the amount which we know was already offered but it's unused and we then balance how
// much was created with how much was destroyed.
resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedMemory;
resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedResource;
availableResourceMetric(driverMetrics, resourceState.available, timestamp);
unusedOfferedResourceMetric(driverMetrics, unusedOfferedMemory, timestamp);
unusedOfferedResourceMetric(driverMetrics, unusedOfferedResource, timestamp);

offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
};
Expand Down Expand Up @@ -258,6 +255,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
int64_t totalMessagesDestroyed = 0;
int64_t totalTimeframesRead = 0;
int64_t totalTimeframesConsumed = 0;
int64_t totalTimeframesExpired = 0;
auto &driverMetrics = sm.driverMetricsInfo;
auto &allDeviceMetrics = sm.deviceMetricsInfos;
auto &specs = sm.deviceSpecs;
Expand All @@ -266,9 +264,14 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
// These are really to monitor the rate limiting
static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-timeslices");
static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-timeslices");
static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-timeslices");

static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
Expand Down Expand Up @@ -390,6 +393,18 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
}
{
size_t index = indices.timeframesExpired;
assert(index < deviceMetrics.metrics.size());
changed |= deviceMetrics.changed[index];
MetricInfo info = deviceMetrics.metrics[index];
assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
auto value = (int64_t)data[(info.pos - 1) % data.size()];
totalTimeframesExpired += value;
auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
}
}
static uint64_t unchangedCount = 0;
if (changed) {
Expand All @@ -407,26 +422,45 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
unchangedCount++;
}
changedCountMetric(driverMetrics, unchangedCount, timestamp);
auto maxTimeframes = registry.get<RateLimitConfig>().maxTimeframes;
if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) {
return;
}

static const ResourceSpec shmResourceSpec{
.name = "shared memory",
.unit = "MB",
.api = "/shm-offer {}",
.maxAvailable = (int64_t)calculateAvailableSharedMemory(registry),
.maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxMemory,
.maxQuantum = 100,
.minQuantum = 50,
.metricOfferScaleFactor = 1000000,
};
static const ResourceSpec timesliceResourceSpec{
.name = "timeslice",
.unit = "timeslices",
.api = "/timeslice-offer {}",
.maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxTimeframes,
.maxQuantum = 1,
.minQuantum = 1,
.metricOfferScaleFactor = 1,
};
static ResourceState shmResourceState{
.available = shmResourceSpec.maxAvailable,
};
static ResourceState timesliceResourceState{
.available = timesliceResourceSpec.maxAvailable,
};
static ResourceStats shmResourceStats{
.enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
.lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
};
static ResourceStats timesliceResourceStats{
.enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
.lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
};

offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
specs, infos, manager, totalTimeframesConsumed, totalTimeframesExpired,
totalTimeframesRead, totalTimeframesConsumed, timestamp, driverMetrics,
availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
(void*)&sm);

offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
Expand Down Expand Up @@ -487,18 +521,18 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
} else {
config->maxMemory = readers * 500;
}
if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].as<std::string>() == "readers") {
config->maxTimeframes = readers;
} else {
if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) {
config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
} else {
config->maxTimeframes = readers;
}
static bool once = false;
// Until we guarantee this is called only once...
if (!once) {
O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
"Rate limiting set up at %{bytes}llu MB distributed over %d readers",
config->maxMemory, readers);
"Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
config->maxMemory, config->maxTimeframes, readers);
registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
once = true;
} },
Expand Down
Loading
Loading