Skip to content

Commit be4cb11

Browse files
committed
WIP DPL Analysis: centralised CCDB support in analysis
Thanks to the newly added binary view columns we can finally support proper CCDB integration in analysis. In order to do so, the user needs to create a TIMESTAMPED table, i.e. a table which is an extension of another one where the timestamps for each rows are provided. The extra columns of such timestamped table will be CCDB columns where the iterator of each provides access for one specified CCDB object.
1 parent 94e1b94 commit be4cb11

File tree

11 files changed

+247
-6
lines changed

11 files changed

+247
-6
lines changed

Framework/CCDBSupport/CMakeLists.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019-2020 CERN and copyright holders of ALICE O2.
1+
# Copyright 2019-2025 CERN and copyright holders of ALICE O2.
22
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
33
# All rights not expressly granted are reserved.
44
#
@@ -9,9 +9,10 @@
99
# granted to it by virtue of its status as an Intergovernmental Organization
1010
# or submit itself to any jurisdiction.
1111
o2_add_library(FrameworkCCDBSupport
12-
SOURCES
12+
SOURCES
1313
src/Plugin.cxx
1414
src/CCDBHelpers.cxx
15+
src/AnalysisCCDBHelpers.cxx
1516
PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src
1617
PUBLIC_LINK_LIBRARIES O2::Framework O2::CCDB)
1718

Framework/CCDBSupport/src/Plugin.cxx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
// or submit itself to any jurisdiction.
1111
#include "Framework/Plugins.h"
1212
#include "Framework/AlgorithmSpec.h"
13+
#include "AnalysisCCDBHelpers.h"
1314
#include "CCDBHelpers.h"
1415

1516
struct CCDBFetcherPlugin : o2::framework::AlgorithmPlugin {
@@ -19,6 +20,14 @@ struct CCDBFetcherPlugin : o2::framework::AlgorithmPlugin {
1920
}
2021
};
2122

23+
struct AnalysisCCDBFetcherPlugin : o2::framework::AlgorithmPlugin {
24+
o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const&) final
25+
{
26+
return o2::framework::AnalysisCCDBHelpers::fetchFromCCDB();
27+
}
28+
};
29+
2230
DEFINE_DPL_PLUGINS_BEGIN
2331
DEFINE_DPL_PLUGIN_INSTANCE(CCDBFetcherPlugin, CustomAlgorithm);
32+
DEFINE_DPL_PLUGIN_INSTANCE(AnalysisCCDBFetcherPlugin, CustomAlgorithm);
2433
DEFINE_DPL_PLUGINS_END

Framework/Core/include/Framework/ASoA.h

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2248,11 +2248,14 @@ ColumnGetterFunction<R, typename T::iterator> getColumnGetterByLabel(const std::
22482248

22492249
namespace o2::aod
22502250
{
2251+
// If you get an error about not satisfying is_origin_hash, you need to add
2252+
// an entry here.
22512253
O2ORIGIN("AOD");
22522254
O2ORIGIN("AOD1");
22532255
O2ORIGIN("AOD2");
22542256
O2ORIGIN("DYN");
22552257
O2ORIGIN("IDX");
2258+
O2ORIGIN("TIM");
22562259
O2ORIGIN("JOIN");
22572260
O2HASH("JOIN/0");
22582261
O2ORIGIN("CONC");
@@ -2313,6 +2316,43 @@ consteval static std::string_view namespace_prefix()
23132316
}; \
23142317
[[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, _Name_::hash, o2::framework::expressions::selectArrowType<_Type_>() }
23152318

2319+
#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \
2320+
struct _Name_ : o2::soa::Column<std::span<std::byte>, _Name_> { \
2321+
static constexpr const char* mLabel = _Label_; \
2322+
static constexpr const char* query = _CCDBQuery_; \
2323+
static constexpr const uint32_t hash = crc32(namespace_prefix<_Name_>(), std::string_view{#_Getter_}); \
2324+
using base = o2::soa::Column<std::span<std::byte>, _Name_>; \
2325+
using type = std::span<std::byte>; \
2326+
using column_t = _Name_; \
2327+
_Name_(arrow::ChunkedArray const* column) \
2328+
: o2::soa::Column<std::span<std::byte>, _Name_>(o2::soa::ColumnIterator<std::span<std::byte>>(column)) \
2329+
{ \
2330+
} \
2331+
\
2332+
_Name_() = default; \
2333+
_Name_(_Name_ const& other) = default; \
2334+
_Name_& operator=(_Name_ const& other) = default; \
2335+
\
2336+
decltype(auto) _Getter_() const \
2337+
{ \
2338+
static std::byte* payload = nullptr; \
2339+
static _ConcreteType_* deserialised = nullptr; \
2340+
auto span = *mColumnIterator; \
2341+
if (payload != (std::byte*)span.data()) { \
2342+
payload = (std::byte*)span.data(); \
2343+
TBufferFile f(TBufferFile::EMode::kRead, span.size(), (char*)span.data(), kFALSE); \
2344+
deserialised = (_ConcreteType_*)f.ReadObjectAny(TClass::GetClass(#_ConcreteType_)); \
2345+
} \
2346+
return *deserialised; \
2347+
} \
2348+
\
2349+
decltype(auto) \
2350+
get() const \
2351+
{ \
2352+
return _Getter_(); \
2353+
} \
2354+
};
2355+
23162356
#define DECLARE_SOA_COLUMN(_Name_, _Getter_, _Type_) \
23172357
DECLARE_SOA_COLUMN_FULL(_Name_, _Getter_, _Type_, "f" #_Name_)
23182358

@@ -3188,6 +3228,36 @@ consteval auto getIndexTargets()
31883228
using metadata = _Name_##Metadata; \
31893229
};
31903230

3231+
// Declare were each row is associated to a timestamp column of an _TimestampSource_
3232+
// table.
3233+
//
3234+
// The columns of this table have to be CCDB_COLUMNS so that for each timestamp, we get a row
3235+
// which points to the specified CCDB objectes described by those columns.
3236+
#define DECLARE_SOA_TIMESTAMPED_TABLE_FULL(_Name_, _Label_, _TimestampSource_, _TimestampColumn_, _Origin_, _Version_, _Desc_, ...) \
3237+
O2HASH(_Desc_ "/" #_Version_); \
3238+
template <typename O> \
3239+
using _Name_##TimestampFrom = soa::Table<o2::aod::Hash<_Label_ ""_h>, o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, O>; \
3240+
using _Name_##Timestamp = _Name_##TimestampFrom<o2::aod::Hash<_Origin_ ""_h>>; \
3241+
template <typename O = o2::aod::Hash<_Origin_ ""_h>> \
3242+
struct _Name_##TimestampMetadataFrom : TableMetadata<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, __VA_ARGS__> { \
3243+
using base_table_t = _TimestampSource_; \
3244+
using extension_table_t = _Name_##TimestampFrom<O>; \
3245+
using ccdb_pack_t = framework::pack<__VA_ARGS__>; \
3246+
/*static constexpr auto timestampColumn = _TimestampColumn_;*/ \
3247+
}; \
3248+
using _Name_##TimestampMetadata = _Name_##TimestampMetadataFrom<o2::aod::Hash<_Origin_ ""_h>>; \
3249+
template <> \
3250+
struct MetadataTrait<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>> { \
3251+
using metadata = _Name_##TimestampMetadata; \
3252+
}; \
3253+
template <typename O> \
3254+
using _Name_##From = o2::soa::JoinFull<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, _TimestampSource_, _Name_##TimestampFrom<O>>; \
3255+
using _Name_ = _Name_##From<o2::aod::Hash<_Origin_ ""_h>>;
3256+
3257+
#define DECLARE_SOA_TIMESTAMPED_TABLE(_Name_, _TimestampSource_, _TimestampColumn_, _Version_, _Desc_, ...) \
3258+
O2HASH(#_Name_ "Timestamped"); \
3259+
DECLARE_SOA_TIMESTAMPED_TABLE_FULL(_Name_, #_Name_ "Timestamped", _TimestampSource_, _TimestampColumn_, "TIM", _Version_, _Desc_, __VA_ARGS__)
3260+
31913261
#define DECLARE_SOA_INDEX_TABLE(_Name_, _Key_, _Description_, ...) \
31923262
DECLARE_SOA_INDEX_TABLE_FULL(_Name_, _Key_, "IDX", 0, _Description_, false, __VA_ARGS__)
31933263

Framework/Core/include/Framework/AnalysisContext.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,24 @@ struct OutputObjectInfo {
2929
std::vector<std::string> bindings;
3030
};
3131

32-
//
32+
// This will keep track of the inputs which have
33+
// been requested and for which we will need to inject
34+
// some source device.
3335
struct AnalysisContext {
3436
std::vector<InputSpec> requestedAODs;
3537
std::vector<OutputSpec> providedAODs;
3638
std::vector<InputSpec> requestedDYNs;
3739
std::vector<OutputSpec> providedDYNs;
3840
std::vector<InputSpec> requestedIDXs;
41+
std::vector<OutputSpec> providedTIMs;
42+
std::vector<InputSpec> requestedTIMs;
3943
std::vector<OutputSpec> providedOutputObjHist;
4044
std::vector<InputSpec> spawnerInputs;
4145

46+
// These are the timestamped tables which are required to
47+
// inject the the CCDB objecs.
48+
std::vector<InputSpec> analysisCCDBInputs;
49+
4250
// Needed to created the hist writer
4351
std::vector<OutputTaskInfo> outTskMap;
4452
std::vector<OutputObjectInfo> outObjHistMap;

Framework/Core/include/Framework/AnalysisSupportHelpers.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,13 @@ struct AnalysisSupportHelpers {
4343
std::vector<InputSpec>& requestedAODs,
4444
std::vector<InputSpec>& requestedDYNs,
4545
DataProcessorSpec& publisher);
46+
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector<OutputSpec> const& providedTimestampedCCDBObjecs,
47+
std::vector<InputSpec> const& requestedTimestampedCCDBObjects,
48+
DataProcessorSpec& publisher);
4649

47-
/// Match all inputs of kind ATSK and write them to a ROOT file,
48-
/// one root file per originating task.
49-
static DataProcessorSpec getOutputObjHistSink(ConfigContext const&);
50+
/// Match all inputs of kind ATSK and write them to a ROOT file,
51+
/// one root file per originating task.
52+
static DataProcessorSpec getOutputObjHistSink(ConfigContext const&);
5053
/// writes inputs of kind AOD to file
5154
static DataProcessorSpec getGlobalAODSink(ConfigContext const&);
5255
/// Get the data director

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "Framework/TypeIdHelpers.h"
2727
#include "Framework/ArrowTableSlicingCache.h"
2828
#include "Framework/AnalysisDataModel.h"
29+
#include <iostream>
2930

3031
#include <arrow/compute/kernel.h>
3132
#include <arrow/table.h>
@@ -133,6 +134,7 @@ struct AnalysisDataProcessorBuilder {
133134
template <soa::is_table... As>
134135
static void addInputsAndExpressions(uint32_t hash, const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
135136
{
137+
std::cout << "name: " << name << std::endl;
136138
int ai = -1;
137139
([&ai, &hash, &eInfos, &name, &value, &inputs]() mutable {
138140
++ai;

Framework/Core/src/AnalysisSupportHelpers.cxx

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,34 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector<InputSpec> c
207207
}
208208
}
209209

210+
void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher(
211+
std::vector<OutputSpec> const& providedTimestampedCDBObjects,
212+
std::vector<InputSpec> const& requestedTimestampedCCDBObjects,
213+
DataProcessorSpec& publisher)
214+
{
215+
for (auto& input : requestedTimestampedCCDBObjects) {
216+
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
217+
publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
218+
// FIXME: good enough for now...
219+
for (auto& i : input.metadata) {
220+
if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
221+
auto value = i.defaultValue.get<std::string>();
222+
std::cout << "XXX " << value << std::endl;
223+
// auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
224+
// auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; });
225+
// if (j == publisher.inputs.end()) {
226+
// publisher.inputs.push_back(spec);
227+
// }
228+
// if (DataSpecUtils::partialMatch(spec, AODOrigins)) {
229+
// DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
230+
// } else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) {
231+
// DataSpecUtils::updateInputList(requestedDYNs, std::move(spec));
232+
// }
233+
}
234+
}
235+
}
236+
}
237+
210238
// =============================================================================
211239
DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(ConfigContext const& ctx)
212240
{

Framework/Core/src/ArrowSupport.cxx

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#include "Framework/ServiceRegistry.h"
2020
#include "Framework/ConfigContext.h"
2121
#include "Framework/CommonDataProcessors.h"
22+
#include "Framework/DataSpecUtils.h"
23+
#include "Framework/DataSpecViews.h"
2224
#include "Framework/DeviceSpec.h"
2325
#include "Framework/EndOfStreamContext.h"
2426
#include "Framework/Tracing.h"
@@ -27,6 +29,7 @@
2729
#include "Framework/DeviceInfo.h"
2830
#include "Framework/DevicesManager.h"
2931
#include "Framework/DeviceConfig.h"
32+
#include "Framework/PluginManager.h"
3033
#include "Framework/ServiceMetricsInfo.h"
3134
#include "WorkflowHelpers.h"
3235
#include "Framework/WorkflowSpecNode.h"
@@ -441,13 +444,16 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
441444
.adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
442445
auto& workflow = node.specs;
443446
auto spawner = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-spawner"; });
447+
auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-ccdb"; });
444448
auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; });
445449
auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
446450
auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; });
447451
auto &ac = ctx.services().get<AnalysisContext>();
448452
ac.requestedAODs.clear();
449453
ac.requestedDYNs.clear();
450454
ac.providedDYNs.clear();
455+
ac.providedTIMs.clear();
456+
ac.requestedTIMs.clear();
451457

452458

453459
auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
@@ -511,6 +517,26 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
511517
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
512518
}
513519

520+
if (analysisCCDB != workflow.end()) {
521+
for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
522+
d.inputs | views::partial_match_filter(header::DataOrigin{"TIM"}) | sinks::update_input_list(ac.requestedTIMs);
523+
d.outputs | views::partial_match_filter(header::DataOrigin{"TIM"}) | sinks::append_to(ac.providedTIMs);
524+
}
525+
std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
526+
std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
527+
// Use ranges::to<std::vector<>> in C++23...
528+
ac.analysisCCDBInputs.clear();
529+
ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to(ac.analysisCCDBInputs);
530+
531+
// recreate inputs and outputs
532+
analysisCCDB->outputs.clear();
533+
analysisCCDB->inputs.clear();
534+
// replace AlgorithmSpec
535+
// FIXME: it should be made more generic, so it does not need replacement...
536+
analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
537+
AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher(ac.providedTIMs, ac.requestedTIMs, *analysisCCDB);
538+
}
539+
514540
if (writer != workflow.end()) {
515541
workflow.erase(writer);
516542
}
@@ -538,6 +564,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
538564
}
539565
}
540566

567+
568+
541569
// replace writer as some outputs may have become dangling and some are now consumed
542570
auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
543571

0 commit comments

Comments
 (0)