Skip to content

Commit 0131399

Browse files
committed
DPL Analysis: write HistogramRegistry incrementally
This should reduce the big spikes at the end of processing that occur when a large HistogramRegistry is serialised all at once.
1 parent 95da11c commit 0131399

File tree

6 files changed

+146
-134
lines changed

6 files changed

+146
-134
lines changed

Framework/AnalysisSupport/src/AODWriterHelpers.cxx

Lines changed: 70 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,20 @@
2222
#include "Framework/DataOutputDirector.h"
2323
#include "Framework/TableTreeHelpers.h"
2424
#include "Framework/Monitoring.h"
25+
#include "Framework/Signpost.h"
2526

2627
#include <Monitoring/Monitoring.h>
28+
#include <TDirectory.h>
2729
#include <TFile.h>
2830
#include <TFile.h>
2931
#include <TTree.h>
3032
#include <TMap.h>
3133
#include <TObjString.h>
3234
#include <arrow/table.h>
35+
#include <chrono>
36+
#include <ios>
37+
38+
O2_DECLARE_DYNAMIC_LOG(histogram_registry);
3339

3440
namespace o2::framework::writers
3541
{
@@ -46,6 +52,7 @@ struct InputObjectRoute {
4652
struct InputObject {
4753
TClass* kind = nullptr;
4854
void* obj = nullptr;
55+
std::string container;
4956
std::string name;
5057
int count = -1;
5158
};
@@ -273,24 +280,30 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
273280
callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
274281
return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
275282
auto mergePart = [&inputObjects, &objmap, &tskmap, &pc](DataRef const& ref) {
283+
O2_SIGNPOST_ID_GENERATE(hid, histogram_registry);
284+
O2_SIGNPOST_START(histogram_registry, hid, "mergePart", "Merging histogram");
276285
if (!ref.header) {
277-
LOG(error) << "Header not found";
286+
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "Header not found.");
278287
return;
279288
}
280289
auto datah = o2::header::get<o2::header::DataHeader*>(ref.header);
281290
if (!datah) {
282-
LOG(error) << "No data header in stack";
291+
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No data header in stack");
283292
return;
284293
}
285294

286295
if (!ref.payload) {
287-
LOGP(error, "Payload not found for {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
296+
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "Payload not found for %{public}s/%{public}s/%d",
297+
datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
298+
datah->subSpecification);
288299
return;
289300
}
290301

291302
auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header);
292303
if (!objh) {
293-
LOGP(error, "No output object header in stack of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
304+
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No output object header in stack of %{public}s/%{public}s/%d.",
305+
datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
306+
datah->subSpecification);
294307
return;
295308
}
296309

@@ -301,7 +314,9 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
301314
tm.SetBufferOffset(0);
302315
tm.ResetMap();
303316
if (obj.kind == nullptr) {
304-
LOGP(error, "Cannot read class info from buffer of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
317+
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "Cannot read class info from buffer of %{public}s/%{public}s/%d.",
318+
datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
319+
datah->subSpecification);
305320
return;
306321
}
307322

@@ -312,20 +327,29 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
312327
obj.obj = tm.ReadObjectAny(obj.kind);
313328
auto* named = static_cast<TNamed*>(obj.obj);
314329
obj.name = named->GetName();
330+
// If we have a folder, we assume the first element of the path
331+
// to be the name of the registry.
332+
if (sourceType == HistogramRegistrySource) {
333+
obj.container = objh->containerName;
334+
} else {
335+
obj.container = obj.name;
336+
}
315337
auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; });
316338
if (hpos == tskmap.end()) {
317-
LOG(error) << "No task found for hash " << hash;
339+
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No task found for hash %d.", hash);
318340
return;
319341
}
320342
auto taskname = hpos->name;
321343
auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; });
322344
if (opos == objmap.end()) {
323-
LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")";
345+
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No object list found for task %{public}s (hash=%d).",
346+
taskname.c_str(), hash);
324347
return;
325348
}
326349
auto objects = opos->bindings;
327-
if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) {
328-
LOG(error) << "No object " << obj.name << " in map for task " << taskname;
350+
if (std::find(objects.begin(), objects.end(), obj.container) == objects.end()) {
351+
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No container %{public}s in map for task %{public}s.",
352+
obj.container.c_str(), taskname.c_str());
329353
return;
330354
}
331355
auto nameHash = runtime_hash(obj.name.c_str());
@@ -334,14 +358,14 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
334358
// If it's the first one, we just add it to the list.
335359
if (existing == inputObjects->end()) {
336360
obj.count = objh->mPipelineSize;
337-
inputObjects->push_back(std::make_pair(key, obj));
361+
inputObjects->emplace_back(key, obj);
338362
existing = inputObjects->end() - 1;
339363
} else {
340364
obj.count = existing->second.count;
341365
// Otherwise, we merge it with the existing one.
342366
auto merger = existing->second.kind->GetMerge();
343367
if (!merger) {
344-
LOG(error) << "Already one unmergeable object found for " << obj.name;
368+
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "Already one unmergeable object found for %{public}s", obj.name.c_str());
345369
return;
346370
}
347371
TList coll;
@@ -353,13 +377,15 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
353377
existing->second.count -= 1;
354378

355379
if (existing->second.count != 0) {
380+
O2_SIGNPOST_END(histogram_registry, hid, "mergePart", "Pipeline lanes still missing.");
356381
return;
357382
}
358383
// Write the object here.
359384
auto route = existing->first;
360385
auto entry = existing->second;
361386
auto file = ROOTfileNames.find(route.policy);
362387
if (file == ROOTfileNames.end()) {
388+
O2_SIGNPOST_END(histogram_registry, hid, "mergePart", "Not matching any file.");
363389
return;
364390
}
365391
auto filename = file->second;
@@ -375,53 +401,51 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
375401
currentFile = filename;
376402
}
377403

378-
// translate the list-structure created by the registry into a directory structure within the file
379-
std::function<void(TList*, TDirectory*)> writeListToFile;
380-
writeListToFile = [&](TList* list, TDirectory* parentDir) {
381-
TIter next(list);
382-
TObject* object = nullptr;
383-
while ((object = next())) {
384-
if (object->InheritsFrom(TList::Class())) {
385-
writeListToFile(static_cast<TList*>(object), parentDir->mkdir(object->GetName(), object->GetName(), true));
386-
} else {
387-
int objSize = parentDir->WriteObjectAny(object, object->Class(), object->GetName());
388-
static int maxSizeWritten = 0;
389-
if (objSize > maxSizeWritten) {
390-
auto& monitoring = pc.services().get<Monitoring>();
391-
maxSizeWritten = objSize;
392-
monitoring.send(Metric{fmt::format("{}/{}:{}", object->ClassName(), object->GetName(), objSize), "aod-largest-object-written"}.addTag(tags::Key::Subsystem, tags::Value::DPL));
393-
}
394-
auto* written = list->Remove(object);
395-
delete written;
396-
}
404+
// FIXME: handle folders
405+
f[route.policy]->cd("/");
406+
auto* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());
407+
// The name contains a path...
408+
if (sourceType == HistogramRegistrySource) {
409+
TDirectory* currentFolder = currentDir;
410+
O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Toplevel folder is %{public}s.",
411+
currentDir->GetName());
412+
std::string objName = entry.name;
413+
auto lastSlash = entry.name.rfind('/');
414+
auto containerName = obj.container;
415+
if (lastSlash != std::string::npos) {
416+
auto dirname = entry.name.substr(0, lastSlash);
417+
objName = entry.name.substr(lastSlash + 1);
418+
containerName += "/" + dirname;
397419
}
398-
};
399-
400-
TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());
401-
if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) {
402-
auto* outputList = static_cast<TList*>(entry.obj);
403-
outputList->SetOwner(false);
404-
405-
// if registry should live in dedicated folder a TNamed object is appended to the list
406-
if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) {
407-
delete outputList->Last();
408-
outputList->RemoveLast();
409-
currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true);
420+
currentFolder = currentDir->GetDirectory(containerName.c_str());
421+
if (!currentFolder) {
422+
O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Creating folder %{public}s",
423+
containerName.c_str());
424+
currentFolder = currentDir->mkdir(containerName.c_str(), "", kTRUE);
425+
} else {
426+
O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Folder %{public}s already there.",
427+
currentFolder->GetName());
410428
}
411-
412-
writeListToFile(outputList, currentDir);
413-
outputList->SetOwner();
414-
delete outputList;
429+
O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Writing %{public}s of kind %{public}s in %{public}s",
430+
entry.name.c_str(), entry.kind->GetName(), currentDir->GetName());
431+
currentFolder->WriteObjectAny(entry.obj, entry.kind, objName.c_str());
432+
delete (TObject*)entry.obj;
415433
entry.obj = nullptr;
416434
} else {
435+
O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Writing %{public}s of kind %{public}s in %{public}s",
436+
entry.name.c_str(), entry.kind->GetName(), currentDir->GetName());
417437
currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());
418438
delete (TObject*)entry.obj;
419439
entry.obj = nullptr;
420440
}
441+
O2_SIGNPOST_END(histogram_registry, hid, "mergePart", "Done merging.");
421442
};
443+
O2_SIGNPOST_ID_GENERATE(rid, histogram_registry);
444+
O2_SIGNPOST_START(histogram_registry, rid, "process", "Start merging %zu parts received together.", pc.inputs().getNofParts(0));
422445
for (int pi = 0; pi < pc.inputs().getNofParts(0); ++pi) {
423446
mergePart(pc.inputs().get("x", pi));
424447
}
448+
O2_SIGNPOST_END(histogram_registry, rid, "process", "Done histograms in multipart message.");
425449
};
426450
}};
427451
}

Framework/Core/include/Framework/AnalysisManagers.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#ifndef FRAMEWORK_ANALYSISMANAGERS_H
1313
#define FRAMEWORK_ANALYSISMANAGERS_H
14+
#include "DataAllocator.h"
1415
#include "Framework/AnalysisHelpers.h"
1516
#include "Framework/DataSpecUtils.h"
1617
#include "Framework/GroupedCombinations.h"
@@ -247,7 +248,10 @@ template <is_histogram_registry T>
247248
bool postRunOutput(EndOfStreamContext& context, T& hr)
248249
{
249250
auto& deviceSpec = context.services().get<o2::framework::DeviceSpec const>();
250-
context.outputs().snapshot(hr.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *(hr.getListOfHistograms()));
251+
auto sendHistos = [deviceSpec, &context](HistogramRegistry const& self, TNamed* obj) mutable {
252+
context.outputs().snapshot(self.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *obj);
253+
};
254+
hr.apply(sendHistos);
251255
hr.clean();
252256
return true;
253257
}

Framework/Core/include/Framework/HistogramRegistry.h

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -173,16 +173,15 @@ class HistogramRegistry
173173
template <typename T>
174174
std::shared_ptr<T> operator()(const HistName& histName);
175175

176+
// Apply @a callback on every single entry in the registry
177+
void apply(std::function<void(HistogramRegistry const&, TNamed* named)> callback) const;
176178
// return the OutputSpec associated to the HistogramRegistry
177179
OutputSpec const spec();
178180

179-
OutputRef ref(uint16_t idx, uint16_t pipelineSize);
181+
OutputRef ref(uint16_t idx, uint16_t pipelineSize) const;
180182

181183
void setHash(uint32_t hash);
182184

183-
/// returns the list of histograms, properly sorted for writing.
184-
TList* getListOfHistograms();
185-
186185
/// deletes all the histograms from the registry
187186
void clean();
188187

@@ -220,16 +219,13 @@ class HistogramRegistry
220219

221220
// helper function to find the histogram position in the registry
222221
template <typename T>
223-
uint32_t getHistIndex(const T& histName);
222+
uint32_t getHistIndex(const T& histName) const;
224223

225224
constexpr uint32_t imask(uint32_t i) const
226225
{
227226
return i & REGISTRY_BITMASK;
228227
}
229228

230-
// helper function to create resp. find the subList defined by path
231-
TList* getSubList(TList* list, std::deque<std::string>& path);
232-
233229
// helper function to split user defined path/to/hist/name string
234230
std::deque<std::string> splitPath(const std::string& pathAndNameUser);
235231

@@ -431,7 +427,7 @@ std::shared_ptr<T> HistogramRegistry::operator()(const HistName& histName)
431427
}
432428

433429
template <typename T>
434-
uint32_t HistogramRegistry::getHistIndex(const T& histName)
430+
uint32_t HistogramRegistry::getHistIndex(const T& histName) const
435431
{
436432
if (O2_BUILTIN_LIKELY(histName.hash == mRegistryKey[histName.idx])) {
437433
return histName.idx;

Framework/Core/include/Framework/OutputObjHeader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ struct OutputObjHeader : public BaseHeader {
4444
uint32_t mTaskHash;
4545
uint16_t mPipelineIndex = 0;
4646
uint16_t mPipelineSize = 1;
47+
// Name of the actual container for the object, e.g. the HistogramRegistry name
48+
char containerName[64] = {0};
4749

4850
constexpr OutputObjHeader()
4951
: BaseHeader(sizeof(OutputObjHeader), sHeaderType, sSerializationMethod, sVersion),

0 commit comments

Comments
 (0)