diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 932c1fdacacfb..da00c8db42280 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -33,6 +33,7 @@ #include "Framework/ServiceRegistryRef.h" #include "Framework/ServiceRegistryHelpers.h" #include "Framework/Signpost.h" +#include "Framework/DefaultsHelpers.h" #include "CommonMessageBackendsHelpers.h" #include @@ -65,7 +66,7 @@ enum struct RateLimitingState { struct RateLimitConfig { int64_t maxMemory = 2000; - int64_t maxTimeframes = 1; + int64_t maxTimeframes = 1000; }; struct MetricIndices { @@ -524,7 +525,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) { config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as()); } else { - config->maxTimeframes = readers; + config->maxTimeframes = readers * DefaultsHelpers::pipelineLength(); } static bool once = false; // Until we guarantee this is called only once... diff --git a/Framework/Core/src/CommonDataProcessors.cxx b/Framework/Core/src/CommonDataProcessors.cxx index 4b5f317f58063..5d99fd3db7578 100644 --- a/Framework/Core/src/CommonDataProcessors.cxx +++ b/Framework/Core/src/CommonDataProcessors.cxx @@ -268,12 +268,16 @@ AlgorithmSpec CommonDataProcessors::wrapWithRateLimiting(AlgorithmSpec spec) return PluginManager::wrapAlgorithm(spec, [](AlgorithmSpec::ProcessCallback& original, ProcessingContext& pcx) -> void { auto& raw = pcx.services().get(); static RateLimiter limiter; + O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &pcx); auto limit = std::stoi(raw.device()->fConfig->GetValue("timeframes-rate-limit")); - LOG(detail) << "Rate limiting to " << limit << " timeframes in flight"; + O2_SIGNPOST_EVENT_EMIT_DETAIL(rate_limiting, sid, "rate limiting callback", + "Rate limiting to %d timeframes in flight", limit); limiter.check(pcx, limit, 2000); - LOG(detail) << "Rate limiting passed. Invoking old callback"; + O2_SIGNPOST_EVENT_EMIT_DETAIL(rate_limiting, sid, "rate limiting callback", + "Rate limiting passed. Invoking old callback."); original(pcx); - LOG(detail) << "Rate limited callback done"; + O2_SIGNPOST_EVENT_EMIT_DETAIL(rate_limiting, sid, "rate limiting callback", + "Rate limited callback done."); }); }