diff --git a/CCDB/include/CCDB/BasicCCDBManager.h b/CCDB/include/CCDB/BasicCCDBManager.h index b76beb2576eb3..6ad645a0a893e 100644 --- a/CCDB/include/CCDB/BasicCCDBManager.h +++ b/CCDB/include/CCDB/BasicCCDBManager.h @@ -55,6 +55,7 @@ class CCDBManagerInstance long endvalidity = -1; long cacheValidFrom = 0; // time for which the object was cached long cacheValidUntil = -1; // object is guaranteed to be valid till this time (modulo new updates) + size_t size = 0; size_t minSize = -1ULL; size_t maxSize = 0; int queries = 0; @@ -229,6 +230,7 @@ class CCDBManagerInstance long mCreatedNotBefore = 0; // lower limit for object creation timestamp (TimeMachine mode) - If-Not-Before HTTP header long mTimerMS = 0; // timer for queries size_t mFetchedSize = 0; // total fetched size + size_t mRequestedSize = 0; // total requested size (fetched + served from cache) int mQueries = 0; // total number of object queries int mFetches = 0; // total number of succesful fetches from CCDB int mFailures = 0; // total number of failed fetches @@ -258,6 +260,7 @@ T* CCDBManagerInstance::getForTimeStamp(std::string const& path, long timestamp, if (sh != mHeaders.end()) { size_t s = atol(sh->second.c_str()); mFetchedSize += s; + mRequestedSize += s; } } @@ -272,6 +275,7 @@ T* CCDBManagerInstance::getForTimeStamp(std::string const& path, long timestamp, if (headers) { *headers = cached.cacheOfHeaders; } + mRequestedSize += cached.size; return reinterpret_cast(cached.noCleanupPtr ? cached.noCleanupPtr : cached.objPtr.get()); } ptr = mCCDBAccessor.retrieveFromTFileAny(path, mMetaData, timestamp, &mHeaders, cached.uuid, @@ -318,6 +322,8 @@ T* CCDBManagerInstance::getForTimeStamp(std::string const& path, long timestamp, if (sh != mHeaders.end()) { size_t s = atol(sh->second.c_str()); mFetchedSize += s; + mRequestedSize += s; + cached.size = s; cached.minSize = std::min(s, cached.minSize); cached.maxSize = std::max(s, cached.minSize); } @@ -342,12 +348,14 @@ T* CCDBManagerInstance::getForTimeStamp(std::string const& path, long timestamp, } auto end = std::chrono::system_clock::now(); mTimerMS += std::chrono::duration_cast(end - start).count(); - auto *ref = o2::framework::ServiceRegistryRef::globalDeviceRef(); + auto* ref = o2::framework::ServiceRegistryRef::globalDeviceRef(); if (ref && ref->active()) { auto& stats = ref->get(); stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_HIT, o2::framework::DataProcessingStats::Op::Set, (int64_t)mQueries - mFailures - mFetches}); stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_MISS, o2::framework::DataProcessingStats::Op::Set, (int64_t)mFetches}); stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_FAILURE, o2::framework::DataProcessingStats::Op::Set, (int64_t)mFailures}); + stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES, o2::framework::DataProcessingStats::Op::Set, (int64_t)mFetchedSize}); + stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES, o2::framework::DataProcessingStats::Op::Set, (int64_t)mRequestedSize}); } return ptr; } diff --git a/CCDB/src/BasicCCDBManager.cxx b/CCDB/src/BasicCCDBManager.cxx index d55fdad960d3a..53055d0d1231d 100644 --- a/CCDB/src/BasicCCDBManager.cxx +++ b/CCDB/src/BasicCCDBManager.cxx @@ -101,7 +101,7 @@ std::pair CCDBManagerInstance::getRunDuration(int runnumber, b std::string CCDBManagerInstance::getSummaryString() const { - std::string res = fmt::format("{} queries, {} bytes", mQueries, fmt::group_digits(mFetchedSize)); + std::string res = fmt::format("{} queries, {} fetched / {} requested bytes", mQueries, fmt::group_digits(mFetchedSize), fmt::group_digits(mRequestedSize)); if (mCachingEnabled) { res += fmt::format(" for {} objects", mCache.size()); } diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx index 413adfddecf04..3892f200645f6 100644 --- a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx @@ -11,6 +11,7 @@ #include "AnalysisCCDBHelpers.h" #include "CCDBFetcherHelper.h" +#include "Framework/DataProcessingStats.h" #include "Framework/DeviceSpec.h" #include "Framework/TimingInfo.h" #include "Framework/ConfigParamRegistry.h" @@ -105,7 +106,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) std::unordered_map bindings; fillValidRoutes(*helper, spec.outputs, bindings); - return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) { + return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) { O2_SIGNPOST_ID_GENERATE(sid, ccdb); O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice); for (auto& schema : schemas) { @@ -182,6 +183,8 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable); } + stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalFetchedBytes}); + stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalRequestedBytes}); O2_SIGNPOST_END(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects"); }); }); diff --git a/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx b/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx index 151703105a1dd..8d50dac63a67b 100644 --- a/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx +++ b/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx @@ -254,6 +254,8 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr con helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); auto size = v.size(); + helper->totalFetchedBytes += size; + helper->totalRequestedBytes += size; api.appendFlatHeader(v, headers); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); helper->mapURL2DPLCache[path] = cacheId; @@ -271,6 +273,8 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr con helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); auto size = v.size(); + helper->totalFetchedBytes += size; + helper->totalRequestedBytes += size; api.appendFlatHeader(v, headers); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); helper->mapURL2DPLCache[path] = cacheId; diff --git a/Framework/CCDBSupport/src/CCDBFetcherHelper.h b/Framework/CCDBSupport/src/CCDBFetcherHelper.h index 1778712f45002..a6cb3f70215af 100644 --- a/Framework/CCDBSupport/src/CCDBFetcherHelper.h +++ b/Framework/CCDBSupport/src/CCDBFetcherHelper.h @@ -84,6 +84,8 @@ struct CCDBFetcherHelper { static ParserResult parseRemappings(char const*); + size_t totalFetchedBytes = 0; + size_t totalRequestedBytes = 0; std::unordered_map mapURL2UUID; std::unordered_map mapURL2DPLCache; std::string createdNotBefore = "0"; diff --git a/Framework/CCDBSupport/src/CCDBHelpers.cxx b/Framework/CCDBSupport/src/CCDBHelpers.cxx index 80a79796f8c1b..fd78594e365bf 100644 --- a/Framework/CCDBSupport/src/CCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/CCDBHelpers.cxx @@ -11,6 +11,7 @@ #include "CCDBHelpers.h" #include "Framework/DeviceSpec.h" +#include "Framework/DataProcessingStats.h" #include "Framework/Logger.h" #include "Framework/TimingInfo.h" #include "Framework/ConfigParamRegistry.h" @@ -28,7 +29,8 @@ O2_DECLARE_DYNAMIC_LOG(ccdb); namespace o2::framework { -namespace { +namespace +{ struct CCDBFetcherHelper { struct CCDBCacheInfo { std::string etag; @@ -36,6 +38,7 @@ struct CCDBFetcherHelper { size_t cachePopulatedAt = 0; size_t cacheMiss = 0; size_t cacheHit = 0; + size_t size = 0; size_t minSize = -1ULL; size_t maxSize = 0; int lastCheckedTF = 0; @@ -50,6 +53,8 @@ struct CCDBFetcherHelper { std::string url; }; + size_t totalFetchedBytes = 0; + size_t totalRequestedBytes = 0; std::unordered_map mapURL2UUID; std::unordered_map mapURL2DPLCache; std::string createdNotBefore = "0"; @@ -80,7 +85,7 @@ struct CCDBFetcherHelper { return apis[entry == remappings.end() ? "" : entry->second]; } }; -} +} // namespace bool isPrefix(std::string_view prefix, std::string_view full) { @@ -336,8 +341,11 @@ auto populateCacheWith(std::shared_ptr const& helper, helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; helper->mapURL2UUID[path].cacheMiss++; + helper->mapURL2UUID[path].size = v.size(); helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); + helper->totalFetchedBytes += v.size(); + helper->totalRequestedBytes += v.size(); api.appendFlatHeader(v, headers); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); helper->mapURL2DPLCache[path] = cacheId; @@ -350,8 +358,11 @@ auto populateCacheWith(std::shared_ptr const& helper, helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]); helper->mapURL2UUID[path].cacheMiss++; + helper->mapURL2UUID[path].size = v.size(); helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); + helper->totalFetchedBytes += v.size(); + helper->totalRequestedBytes += v.size(); api.appendFlatHeader(v, headers); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); helper->mapURL2DPLCache[path] = cacheId; @@ -368,6 +379,7 @@ auto populateCacheWith(std::shared_ptr const& helper, auto cacheId = helper->mapURL2DPLCache[path]; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); helper->mapURL2UUID[path].cacheHit++; + helper->totalRequestedBytes += helper->mapURL2UUID[path].size; allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB); // the outputBuffer was not used, can we destroy it? } @@ -382,13 +394,13 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() /// Add a callback on stop which dumps the statistics for the caching per /// path callbacks.set([helper]() { - LOGP(info, "CCDB cache miss/hit ratio:"); + LOGP(info, "CCDB cache miss/hit ratio ({} fetched / {} requested bytes):", helper->totalFetchedBytes, helper->totalRequestedBytes); for (auto& entry : helper->mapURL2UUID) { LOGP(info, " {}: {}/{} ({}-{} bytes)", entry.first, entry.second.cacheMiss, entry.second.cacheHit, entry.second.minSize, entry.second.maxSize); } }); - return adaptStateless([helper](DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) { + return adaptStateless([helper](DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) { auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice}; O2_SIGNPOST_START(ccdb, sid, "fetchFromCCDB", "Fetching CCDB objects for timeslice %" PRIu64, (uint64_t)timingInfo.timeslice); static Long64_t orbitResetTime = -1; @@ -429,8 +441,11 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() if (etag.empty()) { helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid helper->mapURL2UUID[path].cacheMiss++; + helper->mapURL2UUID[path].size = v.size(); helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); + helper->totalFetchedBytes += v.size(); + helper->totalRequestedBytes += v.size(); newOrbitResetTime = getOrbitResetTime(v); api.appendFlatHeader(v, headers); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone); @@ -440,8 +455,11 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() // somewhere here pruneFromCache should be called helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid helper->mapURL2UUID[path].cacheMiss++; + helper->mapURL2UUID[path].size = v.size(); helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); + helper->totalFetchedBytes += v.size(); + helper->totalRequestedBytes += v.size(); newOrbitResetTime = getOrbitResetTime(v); api.appendFlatHeader(v, headers); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone); @@ -455,6 +473,7 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() auto cacheId = helper->mapURL2DPLCache[path]; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); helper->mapURL2UUID[path].cacheHit++; + helper->totalRequestedBytes += helper->mapURL2UUID[path].size; allocator.adoptFromCache(output, cacheId, header::gSerializationMethodNone); if (newOrbitResetTime != orbitResetTime) { @@ -480,6 +499,8 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() dtc.runNumber.data(), orbitResetTime, timingInfo.creation, timestamp, timingInfo.firstTForbit); populateCacheWith(helper, timestamp, timingInfo, dtc, allocator); + stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalFetchedBytes}); + stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalRequestedBytes}); O2_SIGNPOST_END(ccdb, _o2_signpost_id_t{(int64_t)timingInfo.timeslice}, "fetchFromCCDB", "Fetching CCDB objects"); }); }); } diff --git a/Framework/Core/include/Framework/DataProcessingStats.h b/Framework/Core/include/Framework/DataProcessingStats.h index a1f5c0eec5568..edb04c4c5f752 100644 --- a/Framework/Core/include/Framework/DataProcessingStats.h +++ b/Framework/Core/include/Framework/DataProcessingStats.h @@ -72,6 +72,8 @@ enum struct ProcessingStatsId : short { CCDB_CACHE_HIT, CCDB_CACHE_MISS, CCDB_CACHE_FAILURE, + CCDB_CACHE_FETCHED_BYTES, + CCDB_CACHE_REQUESTED_BYTES, AVAILABLE_MANAGED_SHM_BASE = 512, }; diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index 06bc7969ebf1e..0f53f5a6be5a1 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -1176,6 +1176,22 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats() .scope = Scope::DPL, .minPublishInterval = 1000, .maxRefreshLatency = 10000, + .sendInitialValue = true}, + MetricSpec{.name = "ccdb-cache-fetched-bytes", + .enabled = true, + .metricId = static_cast(ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES), + .kind = Kind::UInt64, + .scope = Scope::DPL, + .minPublishInterval = 1000, + .maxRefreshLatency = 10000, + .sendInitialValue = true}, + MetricSpec{.name = "ccdb-cache-requested-bytes", + .enabled = true, + .metricId = static_cast(ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES), + .kind = Kind::UInt64, + .scope = Scope::DPL, + .minPublishInterval = 1000, + .maxRefreshLatency = 10000, .sendInitialValue = true}}; for (auto& metric : metrics) {