From 3a25c5897b0affc894d25b7612e2c1dc6715ab7c Mon Sep 17 00:00:00 2001 From: shahoian Date: Fri, 5 Sep 2025 01:48:56 +0200 Subject: [PATCH] With -condition-use-slice-for-prescaling use TF slice instead of TFcounter for CCDB cache validation is N!=0 If --condition-tf-per-query-multiplier value is negative, the prescaling is simply applied to tfCounter%|query_rate| (or timeslice%|query_rate| if --condition-use-slice-for-prescaling is asked) If N>0, then enforce a check if the abs difference between the last checked and current TFCounters (not slices!) exceeds N, even if the slices difference is less than the requested check rate. --- .../CCDBSupport/src/CCDBFetcherHelper.cxx | 28 +++++++++++-- Framework/CCDBSupport/src/CCDBFetcherHelper.h | 2 + Framework/CCDBSupport/src/CCDBHelpers.cxx | 41 +++++++++++++++---- Framework/Core/src/WorkflowHelpers.cxx | 6 ++- 4 files changed, 62 insertions(+), 15 deletions(-) diff --git a/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx b/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx index 14c3fefb31024..92aff08a26032 100644 --- a/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx +++ b/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx @@ -51,10 +51,20 @@ void CCDBFetcherHelper::initialiseHelper(CCDBFetcherHelper& helper, ConfigParamR auto defHost = options.get("condition-backend"); auto checkRate = options.get("condition-tf-per-query"); auto checkMult = options.get("condition-tf-per-query-multiplier"); + helper.useTFSlice = options.get("condition-use-slice-for-prescaling"); helper.timeToleranceMS = options.get("condition-time-tolerance"); helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits::max(); - helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1; - LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper.queryPeriodGlo, helper.queryPeriodFactor == 1 ? std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor)); + helper.queryPeriodFactor = checkMult == 0 ? 1 : checkMult; + std::string extraCond{}; + if (helper.useTFSlice) { + extraCond = ". Use TFSlice"; + if (helper.useTFSlice > 0) { + extraCond += fmt::format(" + max TFcounter jump <= {}", helper.useTFSlice); + } + } + LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}{}", defHost, helper.queryPeriodGlo, + helper.queryPeriodFactor == 1 ? std::string{} : (helper.queryPeriodFactor > 0 ? fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor) : fmt::format(", (query downscaled as TFcounter%{})", -helper.queryPeriodFactor)), + extraCond); LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace); auto remapString = options.get("condition-remap"); ParserResult result = parseRemappings(remapString.c_str()); @@ -205,12 +215,21 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr con // If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again // when online. bool cacheExpired = (validUntil <= timestampToUse) || (op.timestamp < cachePopulatedAt); - checkValidity = (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired); + if (isOnline || cacheExpired) { + if (!helper->useTFSlice) { + checkValidity = chRate > 0 ? (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) : (timingInfo.tfCounter % -chRate) == 0; + } else { + checkValidity = chRate > 0 ? (std::abs(int(timingInfo.timeslice - url2uuid->second.lastCheckedSlice)) >= chRate) : (timingInfo.timeslice % -chRate) == 0; + if (!checkValidity && helper->useTFSlice > std::abs(chRate)) { // make sure the interval is tolerated unless the check rate itself is too large + checkValidity = std::abs(int(timingInfo.tfCounter) - url2uuid->second.lastCheckedTF) > helper->useTFSlice; + } + } + } } else { checkValidity = true; // never skip check if the cache is empty } - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data()); + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tf%{public}s %d of %{public}s", checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter, path.data()); const auto& api = helper->getAPI(path); if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once @@ -226,6 +245,7 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr con LOGP(detail, "******** Default entry used for {} ********", path); } helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter; + helper->mapURL2UUID[path].lastCheckedSlice = timingInfo.timeslice; if (etag.empty()) { helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; diff --git a/Framework/CCDBSupport/src/CCDBFetcherHelper.h b/Framework/CCDBSupport/src/CCDBFetcherHelper.h index e3453b48bf156..1778712f45002 100644 --- a/Framework/CCDBSupport/src/CCDBFetcherHelper.h +++ b/Framework/CCDBSupport/src/CCDBFetcherHelper.h @@ -33,6 +33,7 @@ struct CCDBFetcherHelper { size_t minSize = -1ULL; size_t maxSize = 0; int lastCheckedTF = 0; + int lastCheckedSlice = 0; }; struct RemapMatcher { @@ -94,6 +95,7 @@ struct CCDBFetcherHelper { int queryPeriodGlo = 1; int queryPeriodFactor = 1; int64_t timeToleranceMS = 5000; + int useTFSlice = 0; // if non-zero, use TFslice instead of TFcounter for the validity check. If > requested checking rate, add additional check on |lastTFchecked - TCcounter|<=useTFSlice o2::ccdb::CcdbApi& getAPI(const std::string& path); static void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& options); diff --git a/Framework/CCDBSupport/src/CCDBHelpers.cxx b/Framework/CCDBSupport/src/CCDBHelpers.cxx index acf8b782f8f06..d303308df0c82 100644 --- a/Framework/CCDBSupport/src/CCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/CCDBHelpers.cxx @@ -39,6 +39,7 @@ struct CCDBFetcherHelper { size_t minSize = -1ULL; size_t maxSize = 0; int lastCheckedTF = 0; + int lastCheckedSlice = 0; }; struct RemapMatcher { @@ -60,6 +61,7 @@ struct CCDBFetcherHelper { int queryPeriodGlo = 1; int queryPeriodFactor = 1; int64_t timeToleranceMS = 5000; + int useTFSlice = 0; // if non-zero, use TFslice instead of TFcounter for the validity check. If > requested checking rate, add additional check on |lastTFchecked - TCcounter|<=useTFSlice o2::ccdb::CcdbApi& getAPI(const std::string& path) { @@ -165,10 +167,20 @@ void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& opti auto defHost = options.get("condition-backend"); auto checkRate = options.get("condition-tf-per-query"); auto checkMult = options.get("condition-tf-per-query-multiplier"); + helper.useTFSlice = options.get("condition-use-slice-for-prescaling"); helper.timeToleranceMS = options.get("condition-time-tolerance"); helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits::max(); - helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1; - LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper.queryPeriodGlo, helper.queryPeriodFactor == 1 ? std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor)); + helper.queryPeriodFactor = checkMult == 0 ? 1 : checkMult; + std::string extraCond{}; + if (helper.useTFSlice) { + extraCond = ". Use TFSlice"; + if (helper.useTFSlice > 0) { + extraCond += fmt::format(" + max TFcounter jump <= {}", helper.useTFSlice); + } + } + LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}{}", defHost, helper.queryPeriodGlo, + helper.queryPeriodFactor == 1 ? std::string{} : (helper.queryPeriodFactor > 0 ? fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor) : fmt::format(", (query downscaled as TFcounter%{})", -helper.queryPeriodFactor)), + extraCond); LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace); auto remapString = options.get("condition-remap"); CCDBHelpers::ParserResult result = CCDBHelpers::parseRemappings(remapString.c_str()); @@ -276,7 +288,7 @@ auto populateCacheWith(std::shared_ptr const& helper, O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", key.data(), value.data()); metadata[key] = value; } else if (meta.name == "ccdb-query-rate") { - chRate = meta.defaultValue.get() * helper->queryPeriodFactor; + chRate = std::max(1, meta.defaultValue.get()) * helper->queryPeriodFactor; } } const auto url2uuid = helper->mapURL2UUID.find(path); @@ -289,12 +301,21 @@ auto populateCacheWith(std::shared_ptr const& helper, // If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again // when online. bool cacheExpired = (validUntil <= timestampToUse) || (timestamp < cachePopulatedAt); - checkValidity = (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired); + if (isOnline || cacheExpired) { + if (!helper->useTFSlice) { + checkValidity = chRate > 0 ? (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) : (timingInfo.tfCounter % -chRate) == 0; + } else { + checkValidity = chRate > 0 ? (std::abs(int(timingInfo.timeslice - url2uuid->second.lastCheckedSlice)) >= chRate) : (timingInfo.timeslice % -chRate) == 0; + if (!checkValidity && helper->useTFSlice > std::abs(chRate)) { // make sure the interval is tolerated unless the check rate itself is too large + checkValidity = std::abs(int(timingInfo.tfCounter) - url2uuid->second.lastCheckedTF) > helper->useTFSlice; + } + } + } } else { checkValidity = true; // never skip check if the cache is empty } - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data()); + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tf%{public}s %d of %{public}s", checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter, path.data()); const auto& api = helper->getAPI(path); if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once @@ -310,6 +331,7 @@ auto populateCacheWith(std::shared_ptr const& helper, LOGP(detail, "******** Default entry used for {} ********", path); } helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter; + helper->mapURL2UUID[path].lastCheckedSlice = timingInfo.timeslice; if (etag.empty()) { helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; @@ -382,21 +404,22 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() std::map metadata; std::map headers; std::string etag; - bool checkValidity = std::abs(int(timingInfo.tfCounter - helper->lastCheckedTFCounterOrbReset)) >= helper->queryPeriodGlo; + int32_t counter = helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter; + bool checkValidity = std::abs(int(counter - helper->lastCheckedTFCounterOrbReset)) >= helper->queryPeriodGlo; const auto url2uuid = helper->mapURL2UUID.find(path); if (url2uuid != helper->mapURL2UUID.end()) { etag = url2uuid->second.etag; } else { checkValidity = true; // never skip check if the cache is empty } - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "checkValidity is %{public}s for tfID %d of %{public}s", - checkValidity ? "true" : "false", timingInfo.tfCounter, path.data()); + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "checkValidity is %{public}s for tf%{public}s %d of %{public}s", + checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", counter, path.data()); Output output{"CTP", "OrbitReset", 0}; Long64_t newOrbitResetTime = orbitResetTime; auto&& v = allocator.makeVector(output); const auto& api = helper->getAPI(path); if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once - helper->lastCheckedTFCounterOrbReset = timingInfo.tfCounter; + helper->lastCheckedTFCounterOrbReset = counter; api.loadFileToMemory(v, path, metadata, timingInfo.creation, &headers, etag, helper->createdNotAfter, helper->createdNotBefore); if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) { LOGP(fatal, "Unable to find CCDB object {}/{}", path, timingInfo.creation); diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 6eda838070f6d..36583035c41ff 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -177,7 +177,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}}, {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}}, {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}}, - {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks"}}, + {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}}, + {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}}, {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}}, {"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}}, {"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}}, @@ -195,7 +196,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}}, {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}}, {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}}, - {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks"}}, + {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}}, + {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}}, {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}}, {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}}, {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},