Skip to content

Commit 44b00c4

Browse files
committed
Implemented Vit's comments
1 parent d87cff5 commit 44b00c4

6 files changed

Lines changed: 115 additions & 132 deletions

File tree

DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,13 @@ struct Data {
8585
return positive ? magnitude : -magnitude;
8686
}
8787

88-
// Encode from float: clamps magnitude to 15 bits, range ±255.992
88+
// Encode from float: truncates magnitude to 15 bits, range ±255.992
8989
void setCMVFloat(float value)
9090
{
9191
const bool positive = (value >= 0.f);
92-
const uint16_t magnitude = static_cast<uint16_t>(std::abs(value) * 128.f + 0.5f) & 0x7FFF;
92+
const uint16_t magnitude = static_cast<uint16_t>(
93+
std::lround(std::abs(value) * 128.f)) &
94+
0x7FFF;
9395
cmv = (positive ? 0x8000 : 0x0000) | magnitude;
9496
}
9597
};
@@ -119,4 +121,4 @@ struct Container {
119121

120122
} // namespace o2::tpc::cmv
121123

122-
#endif
124+
#endif

Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ namespace o2::tpc
2323

2424
/// create a processor spec
2525
/// convert CMV raw values to a vector in a CRU
26-
o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector<uint32_t> const& crus);
26+
o2::framework::DataProcessorSpec getCMVToVectorSpec(std::string const& inputSpec, std::vector<uint32_t> const& crus);
2727

2828
} // end namespace o2::tpc
2929

Detectors/TPC/workflow/include/TPCWorkflow/TPCAggregateCMVSpec.h

Lines changed: 50 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,6 @@
4949
#include "CommonUtils/StringUtils.h"
5050
#include "DetectorsCommonDataFormats/FileMetaData.h"
5151

52-
using namespace o2::framework;
53-
using o2::header::gDataOriginTPC;
54-
5552
namespace o2::tpc
5653
{
5754

@@ -114,7 +111,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
114111
initIntervalTree();
115112
}
116113

117-
void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) final
114+
void finaliseCCDB(o2::framework::ConcreteDataMatcher& matcher, void* obj) final
118115
{
119116
o2::base::GRPGeomHelper::instance().finaliseCCDB(matcher, obj);
120117
}
@@ -136,7 +133,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
136133
}
137134

138135
if (mSetDataTakingCont) {
139-
mDataTakingContext = pc.services().get<DataTakingContext>();
136+
mDataTakingContext = pc.services().get<o2::framework::DataTakingContext>();
140137
mSetDataTakingCont = false;
141138
}
142139

@@ -147,7 +144,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
147144
const auto currTF = processing_helpers::getCurrentTF(pc);
148145

149146
if (mTFFirst == -1) {
150-
for (auto& ref : InputRecordWalker(pc.inputs(), mFirstTFFilter)) {
147+
for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFirstTFFilter)) {
151148
mTFFirst = pc.inputs().get<long>(ref);
152149
mIntervalFirstTF = mTFFirst;
153150
mHasIntervalFirstTF = true;
@@ -203,7 +200,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
203200
// Capture orbit info first so setTimestampCCDB can use the measured stride
204201
if (!mOrbitInfoSeen[relTF]) {
205202
// all CRUs within a batch carry identical timing, so the first one is sufficient
206-
for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) {
203+
for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) {
207204
mOrbitInfo[relTF] = pc.inputs().get<uint64_t>(ref);
208205
const auto batchFirstOrbit = static_cast<uint32_t>(mOrbitInfo[relTF] >> 32);
209206
// TimingInfo.firstTForbit is the orbit of the last real TF in the batch (the TF that triggered the FLP to send).
@@ -222,8 +219,8 @@ class TPCAggregateCMVDevice : public o2::framework::Task
222219
setTimestampCCDB(relTF, mOrbitStep[relTF], pc);
223220
}
224221

225-
for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
226-
auto const* hdr = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
222+
for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) {
223+
auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
227224
const unsigned int cru = hdr->subSpecification;
228225
if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
229226
LOGP(debug, "Received CMV data from CRU {} which is not part of this aggregate lane", cru);
@@ -233,7 +230,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
233230
continue;
234231
}
235232

236-
auto cmvVec = pc.inputs().get<pmr::vector<uint16_t>>(ref);
233+
auto cmvVec = pc.inputs().get<o2::pmr::vector<uint16_t>>(ref);
237234
mRawCMVs[relTF][cru] = std::vector<uint16_t>(cmvVec.begin(), cmvVec.end());
238235
mProcessedCRUs[relTF][cru] = true;
239236
++mProcessedCRU[relTF];
@@ -257,7 +254,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
257254
materializeBufferedTFs(true);
258255
materializeEOSBuffer();
259256
sendOutput(ec.outputs());
260-
ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
257+
ec.services().get<o2::framework::ControlService>().readyToQuit(o2::framework::QuitRequest::Me);
261258
}
262259

263260
static constexpr header::DataDescription getDataDescriptionCCDBCMV() { return header::DataDescription{"TPC_CMV"}; }
@@ -314,18 +311,18 @@ class TPCAggregateCMVDevice : public o2::framework::Task
314311
std::unique_ptr<TTree> mIntervalTree{}; ///< in-memory TTree accumulating one entry per real TF; serialised to CCDB/disk at interval end
315312
CMVPerTF mCurrentTF{}; ///< staging object written to the TTree branch for the uncompressed path
316313
CMVPerTFCompressed mCurrentCompressedTF{}; ///< staging object written to the TTree branch when any compression flags are set
317-
const std::vector<InputSpec> mFilter{
314+
const std::vector<o2::framework::InputSpec> mFilter{
318315
{"cmvagg",
319-
ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(mLaneId)},
320-
Lifetime::Sporadic}};
321-
const std::vector<InputSpec> mOrbitFilter{
316+
o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(mLaneId)},
317+
o2::framework::Lifetime::Sporadic}};
318+
const std::vector<o2::framework::InputSpec> mOrbitFilter{
322319
{"cmvorbit",
323-
ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(mLaneId), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(mLaneId)}},
324-
Lifetime::Sporadic}};
325-
const std::vector<InputSpec> mFirstTFFilter{
320+
o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(mLaneId), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(mLaneId)}},
321+
o2::framework::Lifetime::Sporadic}};
322+
const std::vector<o2::framework::InputSpec> mFirstTFFilter{
326323
{"firstTF",
327-
ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(mLaneId)}},
328-
Lifetime::Sporadic}};
324+
o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(mLaneId)}},
325+
o2::framework::Lifetime::Sporadic}};
329326

330327
uint8_t buildCompressionFlags() const
331328
{
@@ -360,21 +357,21 @@ class TPCAggregateCMVDevice : public o2::framework::Task
360357
void collectEOSInputs(o2::framework::ProcessingContext& pc)
361358
{
362359
if (mEOSFirstOrbit == 0) {
363-
for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) {
360+
for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) {
364361
const auto orbitBC = pc.inputs().get<uint64_t>(ref);
365362
mEOSFirstOrbit = static_cast<uint32_t>(orbitBC >> 32);
366363
mEOSFirstBC = static_cast<uint16_t>(orbitBC & 0xFFFFu);
367364
break;
368365
}
369366
}
370367

371-
for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
372-
auto const* hdr = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
368+
for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) {
369+
auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
373370
const unsigned int cru = hdr->subSpecification;
374371
if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
375372
continue;
376373
}
377-
auto cmvVec = pc.inputs().get<pmr::vector<uint16_t>>(ref);
374+
auto cmvVec = pc.inputs().get<o2::pmr::vector<uint16_t>>(ref);
378375
auto& buffer = mEOSRawCMVs[cru];
379376
buffer.insert(buffer.end(), cmvVec.begin(), cmvVec.end());
380377
}
@@ -548,7 +545,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
548545
}
549546
}
550547

551-
void sendOutput(DataAllocator& output)
548+
void sendOutput(o2::framework::DataAllocator& output)
552549
{
553550
using timer = std::chrono::high_resolution_clock;
554551

@@ -619,8 +616,8 @@ class TPCAggregateCMVDevice : public o2::framework::Task
619616
}
620617

621618
LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {}", ccdbInfoCMV.getPath(), ccdbInfoCMV.getFileName(), image->size(), ccdbInfoCMV.getStartValidityTimestamp(), ccdbInfoCMV.getEndValidityTimestamp());
622-
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBCMV(), 0}, *image);
623-
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBCMV(), 0}, ccdbInfoCMV);
619+
output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBCMV(), 0}, *image);
620+
output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBCMV(), 0}, ccdbInfoCMV);
624621

625622
auto stop = timer::now();
626623
std::chrono::duration<float> elapsed = stop - start;
@@ -666,25 +663,25 @@ class TPCAggregateCMVDevice : public o2::framework::Task
666663

667664
/// Build a DataProcessorSpec for one aggregate lane
668665
/// Each lane receives CMV data from one distribute output lane (matched by lane index) and expects the full CRU list — the distribute stage already routes per-CRU data to the correct lane
669-
inline DataProcessorSpec getTPCAggregateCMVSpec(const int lane,
670-
const std::vector<uint32_t>& crus,
671-
const unsigned int timeframes,
672-
const bool sendCCDB,
673-
const bool usePreciseTimestamp,
674-
const int nTFsBuffer = 1)
666+
inline o2::framework::DataProcessorSpec getTPCAggregateCMVSpec(const int lane,
667+
const std::vector<uint32_t>& crus,
668+
const unsigned int timeframes,
669+
const bool sendCCDB,
670+
const bool usePreciseTimestamp,
671+
const int nTFsBuffer = 1)
675672
{
676-
std::vector<OutputSpec> outputSpecs;
673+
std::vector<o2::framework::OutputSpec> outputSpecs;
677674
if (sendCCDB) {
678-
outputSpecs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, Lifetime::Sporadic);
679-
outputSpecs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, Lifetime::Sporadic);
675+
outputSpecs.emplace_back(o2::framework::ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, o2::framework::Lifetime::Sporadic);
676+
outputSpecs.emplace_back(o2::framework::ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, o2::framework::Lifetime::Sporadic);
680677
}
681678

682-
std::vector<InputSpec> inputSpecs;
683-
inputSpecs.emplace_back(InputSpec{"cmvagg", ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, Lifetime::Sporadic});
684-
inputSpecs.emplace_back(InputSpec{"cmvorbit", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, Lifetime::Sporadic});
685-
inputSpecs.emplace_back(InputSpec{"firstTF", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, Lifetime::Sporadic});
679+
std::vector<o2::framework::InputSpec> inputSpecs;
680+
inputSpecs.emplace_back(o2::framework::InputSpec{"cmvagg", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, o2::framework::Lifetime::Sporadic});
681+
inputSpecs.emplace_back(o2::framework::InputSpec{"cmvorbit", o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, o2::framework::Lifetime::Sporadic});
682+
inputSpecs.emplace_back(o2::framework::InputSpec{"firstTF", o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, o2::framework::Lifetime::Sporadic});
686683
if (usePreciseTimestamp) {
687-
inputSpecs.emplace_back(InputSpec{"orbitreset", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, Lifetime::Sporadic});
684+
inputSpecs.emplace_back(o2::framework::InputSpec{"orbitreset", o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, o2::framework::Lifetime::Sporadic});
688685
}
689686

690687
// Request GRPECS from CCDB so that GRPGeomHelper::getNHBFPerTF() is valid in this (separate) process
@@ -696,21 +693,21 @@ inline DataProcessorSpec getTPCAggregateCMVSpec(const int lane,
696693
o2::base::GRPGeomRequest::None, // geometry
697694
inputSpecs);
698695

699-
DataProcessorSpec spec{
696+
o2::framework::DataProcessorSpec spec{
700697
fmt::format("tpc-aggregate-cmv-{:02}", lane).data(),
701698
inputSpecs,
702699
outputSpecs,
703-
AlgorithmSpec{adaptFromTask<TPCAggregateCMVDevice>(lane, crus, timeframes, sendCCDB, usePreciseTimestamp, nTFsBuffer, ccdbRequest)},
704-
Options{{"output-dir", VariantType::String, "/dev/null", {"CMV output directory, must exist (if not /dev/null)"}},
705-
{"meta-output-dir", VariantType::String, "/dev/null", {"calibration metadata output directory, must exist (if not /dev/null)"}},
706-
{"nthreads-compression", VariantType::Int, 1, {"Number of threads used for CMV per timeframe preprocessing and compression"}},
707-
{"use-sparse", VariantType::Bool, false, {"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}},
708-
{"use-compression-varint", VariantType::Bool, false, {"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}},
709-
{"use-compression-huffman", VariantType::Bool, false, {"Huffman encoding. Combined with --use-sparse: sparse positions + Huffman-encoded exact CMV values"}},
710-
{"cmv-zero-threshold", VariantType::Float, 0.f, {"Zero out CMV values whose float magnitude is below this threshold after optional integer rounding and before compression; 0 disables"}},
711-
{"cmv-round-integers-threshold", VariantType::Int, 0, {"Round values to nearest integer ADC for |v| <= N ADC before compression; 0 disables"}},
712-
{"cmv-dynamic-precision-mean", VariantType::Float, 1.f, {"Gaussian centre in |CMV| ADC where the strongest fractional bit trimming is applied"}},
713-
{"cmv-dynamic-precision-sigma", VariantType::Float, 0.f, {"Gaussian width in ADC for smooth CMV fractional bit trimming; 0 disables"}}}};
700+
o2::framework::AlgorithmSpec{o2::framework::adaptFromTask<TPCAggregateCMVDevice>(lane, crus, timeframes, sendCCDB, usePreciseTimestamp, nTFsBuffer, ccdbRequest)},
701+
o2::framework::Options{{"output-dir", o2::framework::VariantType::String, "/dev/null", {"CMV output directory, must exist (if not /dev/null)"}},
702+
{"meta-output-dir", o2::framework::VariantType::String, "/dev/null", {"calibration metadata output directory, must exist (if not /dev/null)"}},
703+
{"nthreads-compression", o2::framework::VariantType::Int, 1, {"Number of threads used for CMV per timeframe preprocessing and compression"}},
704+
{"use-sparse", o2::framework::VariantType::Bool, false, {"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}},
705+
{"use-compression-varint", o2::framework::VariantType::Bool, false, {"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}},
706+
{"use-compression-huffman", o2::framework::VariantType::Bool, false, {"Huffman encoding. Combined with --use-sparse: sparse positions + Huffman-encoded exact CMV values"}},
707+
{"cmv-zero-threshold", o2::framework::VariantType::Float, 0.f, {"Zero out CMV values whose float magnitude is below this threshold after optional integer rounding and before compression; 0 disables"}},
708+
{"cmv-round-integers-threshold", o2::framework::VariantType::Int, 0, {"Round values to nearest integer ADC for |v| <= N ADC before compression; 0 disables"}},
709+
{"cmv-dynamic-precision-mean", o2::framework::VariantType::Float, 1.f, {"Gaussian centre in |CMV| ADC where the strongest fractional bit trimming is applied"}},
710+
{"cmv-dynamic-precision-sigma", o2::framework::VariantType::Float, 0.f, {"Gaussian width in ADC for smooth CMV fractional bit trimming; 0 disables"}}}};
714711
spec.rank = lane;
715712
return spec;
716713
}

0 commit comments

Comments
 (0)