Skip to content

Commit 5ade45c

Browse files
committed
DPL analysis: support timeslice rate limiting in DPL resource manager
Use DPL resource manager rather than the ad-hoc solution for reconstruction.
1 parent a70f2e2 commit 5ade45c

15 files changed

+256
-62
lines changed

Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
145145
stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, 0});
146146
stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, 0});
147147
stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, 0});
148+
stats.updateStats({static_cast<short>(ProcessingStatsId::CONSUMED_TIMEFRAMES), DataProcessingStats::Op::Set, 0});
148149

149150
if (!options.isSet("aod-file-private")) {
150151
LOGP(fatal, "No input file defined!");

Framework/Core/include/Framework/CommonDataProcessors.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ struct CommonDataProcessors {
3737
/// and simply discards them. @a rateLimitingChannelConfig is the configuration
3838
/// for the rate limiting channel, if any required.
3939
static DataProcessorSpec getDummySink(std::vector<InputSpec> const& danglingInputs, std::string rateLimitingChannelConfig);
40+
/// @return a dummy DataProcessorSpec which requires all the passed @a InputSpec
41+
/// and simply discards them. Rate limiting goes through the DPL driver
42+
static DataProcessorSpec getScheduledDummySink(std::vector<InputSpec> const& danglingInputs);
4043
static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec);
4144
};
4245

Framework/Core/include/Framework/ComputingQuotaOffer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ struct ComputingQuotaOffer {
4444
int64_t memory = 0;
4545
/// How much shared memory it can allocate
4646
int64_t sharedMemory = 0;
47+
/// How many timeslices it can process without giving back control
48+
int64_t timeslices = 0;
4749
/// How much runtime it can use before giving back the resource
4850
/// in milliseconds.
4951
int64_t runtime = 0;
@@ -68,8 +70,10 @@ struct ComputingQuotaInfo {
6870
/// Statistics on the offers consumed, expired
6971
struct ComputingQuotaStats {
7072
int64_t totalConsumedBytes = 0;
73+
int64_t totalConsumedTimeslices = 0;
7174
int64_t totalConsumedOffers = 0;
7275
int64_t totalExpiredBytes = 0;
76+
int64_t totalExpiredTimeslices = 0;
7377
int64_t totalExpiredOffers = 0;
7478
};
7579

Framework/Core/include/Framework/DataProcessingStats.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,10 @@ enum struct ProcessingStatsId : short {
6060
ARROW_MESSAGES_CREATED,
6161
ARROW_MESSAGES_DESTROYED,
6262
ARROW_BYTES_EXPIRED,
63+
TIMESLICE_NUMBER_EXPIRED,
6364
RESOURCE_OFFER_EXPIRED,
6465
SHM_OFFER_BYTES_CONSUMED,
66+
TIMESLICE_OFFER_NUMBER_CONSUMED,
6567
RESOURCES_MISSING,
6668
RESOURCES_INSUFFICIENT,
6769
RESOURCES_SATISFACTORY,

Framework/Core/include/Framework/ResourcePolicyHelpers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ namespace o2::framework
2222
struct ResourcePolicyHelpers {
2323
static ResourcePolicy trivialTask(char const* taskMatcher);
2424
static ResourcePolicy cpuBoundTask(char const* taskMatcher, int maxCPUs = 1);
25+
static ResourcePolicy rateLimitedSharedMemoryBoundTask(char const* taskMatcher, int maxMemory, int maxTimeslices);
2526
static ResourcePolicy sharedMemoryBoundTask(char const* taskMatcher, int maxMemory);
2627
};
2728

Framework/Core/src/ArrowSupport.cxx

Lines changed: 72 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ enum struct RateLimitingState {
6565

6666
struct RateLimitConfig {
6767
int64_t maxMemory = 2000;
68-
int64_t maxTimeframes = 0;
68+
int64_t maxTimeframes = 1;
6969
};
7070

7171
struct MetricIndices {
@@ -77,32 +77,28 @@ struct MetricIndices {
7777
size_t shmOfferBytesConsumed = -1;
7878
size_t timeframesRead = -1;
7979
size_t timeframesConsumed = -1;
80+
size_t timeframesExpired = -1;
8081
};
8182

8283
std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
8384
{
8485
std::vector<MetricIndices> results;
8586

8687
for (auto& info : allDevicesMetrics) {
87-
MetricIndices indices;
88-
indices.arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created");
89-
indices.arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed");
90-
indices.arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created");
91-
indices.arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed");
92-
indices.arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired");
93-
indices.shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed");
94-
indices.timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent");
95-
indices.timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes");
96-
results.push_back(indices);
88+
results.emplace_back(MetricIndices{
89+
.arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created"),
90+
.arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed"),
91+
.arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created"),
92+
.arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed"),
93+
.arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired"),
94+
.shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed"),
95+
.timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent"),
96+
.timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes"),
97+
.timeframesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "expired-timeframes")});
9798
}
9899
return results;
99100
}
100101

101-
uint64_t calculateAvailableSharedMemory(ServiceRegistryRef registry)
102-
{
103-
return registry.get<RateLimitConfig>().maxMemory;
104-
}
105-
106102
struct ResourceState {
107103
int64_t available;
108104
int64_t offered = 0;
@@ -205,29 +201,30 @@ auto offerResources(ResourceState& resourceState,
205201
// unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
206202
// not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
207203
// and the amount which we know was given back.
208-
static int64_t lastShmOfferConsumed = 0;
209-
static int64_t lastUnusedOfferedMemory = 0;
210-
if (offerConsumedCurrentValue != lastShmOfferConsumed) {
204+
static int64_t lastResourceOfferConsumed = 0;
205+
static int64_t lastUnusedOfferedResource = 0;
206+
if (offerConsumedCurrentValue != lastResourceOfferConsumed) {
211207
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
212208
"Offer consumed so far %llu", offerConsumedCurrentValue);
213-
lastShmOfferConsumed = offerConsumedCurrentValue;
209+
lastResourceOfferConsumed = offerConsumedCurrentValue;
214210
}
215-
int unusedOfferedMemory = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
216-
if (lastUnusedOfferedMemory != unusedOfferedMemory) {
211+
int unusedOfferedResource = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
212+
if (lastUnusedOfferedResource != unusedOfferedResource) {
217213
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
218-
"unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
219-
unusedOfferedMemory, resourceState.offered,
214+
"unusedOfferedResource(%{public}s):%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
215+
resourceSpec.name,
216+
unusedOfferedResource, resourceState.offered,
220217
offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor,
221218
offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor,
222219
resourceSpec.metricOfferScaleFactor);
223-
lastUnusedOfferedMemory = unusedOfferedMemory;
220+
lastUnusedOfferedResource = unusedOfferedResource;
224221
}
225222
// availableSharedMemory is the amount of memory which we know is available to be offered.
226223
// We subtract the amount which we know was already offered but it's unused and we then balance how
227224
// much was created with how much was destroyed.
228-
resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedMemory;
225+
resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedResource;
229226
availableResourceMetric(driverMetrics, resourceState.available, timestamp);
230-
unusedOfferedResourceMetric(driverMetrics, unusedOfferedMemory, timestamp);
227+
unusedOfferedResourceMetric(driverMetrics, unusedOfferedResource, timestamp);
231228

232229
offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
233230
};
@@ -258,6 +255,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
258255
int64_t totalMessagesDestroyed = 0;
259256
int64_t totalTimeframesRead = 0;
260257
int64_t totalTimeframesConsumed = 0;
258+
int64_t totalTimeframesExpired = 0;
261259
auto &driverMetrics = sm.driverMetricsInfo;
262260
auto &allDeviceMetrics = sm.deviceMetricsInfos;
263261
auto &specs = sm.deviceSpecs;
@@ -266,9 +264,14 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
266264
static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
267265
static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
268266
static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
267+
// These are really to monitor the rate limiting
269268
static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
269+
static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-timeslices");
270270
static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
271+
static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-timeslices");
271272
static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
273+
static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-timeslices");
274+
272275
static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
273276
static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
274277
static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
@@ -390,6 +393,18 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
390393
auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
391394
lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
392395
}
396+
{
397+
size_t index = indices.timeframesExpired;
398+
assert(index < deviceMetrics.metrics.size());
399+
changed |= deviceMetrics.changed[index];
400+
MetricInfo info = deviceMetrics.metrics[index];
401+
assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
402+
auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
403+
auto value = (int64_t)data[(info.pos - 1) % data.size()];
404+
totalTimeframesExpired += value;
405+
auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
406+
lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
407+
}
393408
}
394409
static uint64_t unchangedCount = 0;
395410
if (changed) {
@@ -407,26 +422,46 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
407422
unchangedCount++;
408423
}
409424
changedCountMetric(driverMetrics, unchangedCount, timestamp);
410-
auto maxTimeframes = registry.get<RateLimitConfig>().maxTimeframes;
411-
if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) {
412-
return;
413-
}
425+
414426
static const ResourceSpec shmResourceSpec{
415427
.name = "shared memory",
416428
.unit = "MB",
417429
.api = "/shm-offer {}",
418-
.maxAvailable = (int64_t)calculateAvailableSharedMemory(registry),
430+
.maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxMemory,
419431
.maxQuantum = 100,
420432
.minQuantum = 50,
421433
.metricOfferScaleFactor = 1000000,
422434
};
435+
static const ResourceSpec timesliceResourceSpec{
436+
.name = "timeslice",
437+
.unit = "timeslices",
438+
.api = "/timeslice-offer {}",
439+
.maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxTimeframes,
440+
.maxQuantum = 2,
441+
.minQuantum = 1,
442+
.metricOfferScaleFactor = 1,
443+
};
423444
static ResourceState shmResourceState{
424445
.available = shmResourceSpec.maxAvailable,
425446
};
447+
static ResourceState timesliceResourceState{
448+
.available = timesliceResourceSpec.maxAvailable,
449+
};
426450
static ResourceStats shmResourceStats{
427451
.enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
428452
.lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
429453
};
454+
static ResourceStats timesliceResourceStats{
455+
.enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
456+
.lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
457+
};
458+
459+
O2_LOG_ENABLE(rate_limiting);
460+
offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
461+
specs, infos, manager, totalTimeframesConsumed, totalTimeframesExpired,
462+
totalTimeframesRead, totalTimeframesConsumed, timestamp, driverMetrics,
463+
availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
464+
(void*)&sm);
430465

431466
offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
432467
specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
@@ -487,18 +522,18 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
487522
} else {
488523
config->maxMemory = readers * 500;
489524
}
490-
if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].as<std::string>() == "readers") {
491-
config->maxTimeframes = readers;
492-
} else {
525+
if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) {
493526
config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
527+
} else {
528+
config->maxTimeframes = readers;
494529
}
495530
static bool once = false;
496531
// Until we guarantee this is called only once...
497532
if (!once) {
498533
O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
499534
O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
500-
"Rate limiting set up at %{bytes}llu MB distributed over %d readers",
501-
config->maxMemory, readers);
535+
"Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
536+
config->maxMemory, config->maxTimeframes, readers);
502537
registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
503538
once = true;
504539
} },

0 commit comments

Comments
 (0)