diff --git a/plugins/slice/Config.cc b/plugins/slice/Config.cc index e60d894b7a4..b95ace3bab0 100644 --- a/plugins/slice/Config.cc +++ b/plugins/slice/Config.cc @@ -36,6 +36,7 @@ Config::~Config() if (nullptr != m_regex) { delete m_regex; } + prefetchCleanup(); } int64_t @@ -387,3 +388,36 @@ Config::sizeCacheRemove(std::string_view url) m_oscache->remove(url); } } + +std::pair +Config::prefetchAcquire(const std::string &key) +{ + std::lock_guard const guard(m_prefetch_mutex); + auto [it, inserted] = m_prefetch_active.insert(key); + + if (!inserted) { + return {false, nullptr}; + } + + BgBlockFetch *bg = nullptr; + + if (!m_prefetch_freelist.empty()) { + bg = m_prefetch_freelist.back(); + m_prefetch_freelist.pop_back(); + } + + return {true, bg}; +} + +#if defined(UNITTEST) +// Stubs for unit tests that don't link prefetch.cc +void +Config::prefetchRelease(BgBlockFetch *) +{ +} + +void +Config::prefetchCleanup() +{ +} +#endif diff --git a/plugins/slice/Config.h b/plugins/slice/Config.h index 4d57ea10cc2..4fd15a2506b 100644 --- a/plugins/slice/Config.h +++ b/plugins/slice/Config.h @@ -25,6 +25,10 @@ #include #include +#include +#include + +struct BgBlockFetch; // Data Structures and Classes struct Config { @@ -79,6 +83,10 @@ struct Config { // Did we cache this internally as a small object? bool isKnownLargeObj(std::string_view url); + // Prefetch dedup and freelist + std::pair prefetchAcquire(const std::string &key); + void prefetchRelease(BgBlockFetch *bg); + // Metadata cache stats std::string stat_prefix{}; int stat_TP{0}, stat_TN{0}, stat_FP{0}, stat_FN{0}, stat_no_cl{0}, stat_bad_cl{0}, stat_no_url{0}; @@ -89,4 +97,9 @@ struct Config { std::mutex m_mutex; std::optional m_oscache; void setCacheSize(size_t entries); + + std::mutex m_prefetch_mutex; + std::unordered_set m_prefetch_active; + std::vector m_prefetch_freelist; + void prefetchCleanup(); }; diff --git a/plugins/slice/prefetch.cc b/plugins/slice/prefetch.cc index 5e5f229566c..50f7875c734 100644 --- a/plugins/slice/prefetch.cc +++ b/plugins/slice/prefetch.cc @@ -25,16 +25,61 @@ #include "prefetch.h" bool -BgBlockFetch::schedule(Data *const data, int blocknum) +BgBlockFetch::schedule(Data *const data, int blocknum, std::string_view url) { - bool ret = false; - BgBlockFetch *bg = new BgBlockFetch(blocknum); + std::string key = std::string(url) + ':' + std::to_string(blocknum); + auto [acquired, bg] = data->m_config->prefetchAcquire(key); + + if (!acquired) { + DEBUG_LOG("Prefetch already in flight for block %d, skipping", blocknum); + return false; + } + + // Nothing on the freelist, so make a new object + if (!bg) { + bg = new BgBlockFetch(); + } + + bg->m_blocknum = blocknum; + bg->m_config = data->m_config; + bg->m_key = std::move(key); + if (bg->fetch(data)) { - ret = true; + return true; } else { + bg->m_config->prefetchRelease(bg); + return false; + } +} + +void +BgBlockFetch::clear() +{ + m_blocknum = 0; + m_cont = nullptr; + m_config = nullptr; + m_key.clear(); +} + +void +Config::prefetchRelease(BgBlockFetch *bg) +{ + std::lock_guard const guard(m_prefetch_mutex); + + m_prefetch_active.erase(bg->m_key); + bg->clear(); + m_prefetch_freelist.push_back(bg); +} + +void +Config::prefetchCleanup() +{ + std::lock_guard const guard(m_prefetch_mutex); + + for (auto *bg : m_prefetch_freelist) { delete bg; } - return ret; + m_prefetch_freelist.clear(); } /** @@ -132,15 +177,15 @@ BgBlockFetch::handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */ case TS_EVENT_ERROR: bg->m_stream.abort(); TSContDataSet(contp, nullptr); - delete bg; TSContDestroy(contp); + bg->m_config->prefetchRelease(bg); break; case TS_EVENT_VCONN_READ_COMPLETE: case TS_EVENT_VCONN_EOS: bg->m_stream.close(); TSContDataSet(contp, nullptr); - delete bg; TSContDestroy(contp); + bg->m_config->prefetchRelease(bg); break; default: DEBUG_LOG("Unhandled bg fetch event:%s (%d)", TSHttpEventNameLookup(event), event); diff --git a/plugins/slice/prefetch.h b/plugins/slice/prefetch.h index 261d1db700e..4a0ce972b6d 100644 --- a/plugins/slice/prefetch.h +++ b/plugins/slice/prefetch.h @@ -23,7 +23,8 @@ #pragma once -#include +#include +#include #include "ts/ts.h" #include "Data.h" @@ -33,15 +34,18 @@ * @brief Represents a single background fetch. */ struct BgBlockFetch { - static bool schedule(Data *const data, int blocknum); + static bool schedule(Data *const data, int blocknum, std::string_view url); - explicit BgBlockFetch(int blocknum) : m_blocknum(blocknum) {} + BgBlockFetch() = default; bool fetch(Data *const data); static int handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */); + void clear(); /* This is for the actual background fetch / NetVC */ - Stage m_stream; - int m_blocknum; - TSCont m_cont = nullptr; + Stage m_stream; + int m_blocknum{0}; + TSCont m_cont{nullptr}; + Config *m_config{nullptr}; + std::string m_key; }; diff --git a/plugins/slice/server.cc b/plugins/slice/server.cc index f723af38d0f..a9f3e3e7340 100644 --- a/plugins/slice/server.cc +++ b/plugins/slice/server.cc @@ -603,6 +603,8 @@ handle_server_resp(TSCont contp, TSEvent event, Data *const data) data->m_blockskip = data->m_req_range.skipBytesForBlock(data->m_config->m_blockbytes, data->m_blocknum); } break; } + + schedule_prefetch(data); } transfer_content_bytes(data); diff --git a/plugins/slice/unit-tests/test_config.cc b/plugins/slice/unit-tests/test_config.cc index 4610f409071..ac565824721 100644 --- a/plugins/slice/unit-tests/test_config.cc +++ b/plugins/slice/unit-tests/test_config.cc @@ -80,6 +80,26 @@ TEST_CASE("config bytesfrom invalid parsing", "[AWS][slice][utility]") } } +TEST_CASE("prefetchAcquire deduplication", "[AWS][slice][utility]") +{ + Config config; + + // Acquiring a new key should succeed with no freelist item. + auto [acquired1, bg1] = config.prefetchAcquire("http://example.com/file:0"); + CHECK(acquired1 == true); + CHECK(bg1 == nullptr); + + // Acquiring the same key again should fail (dedup). + auto [acquired2, bg2] = config.prefetchAcquire("http://example.com/file:0"); + CHECK(acquired2 == false); + CHECK(bg2 == nullptr); + + // A distinct key should succeed independently. + auto [acquired3, bg3] = config.prefetchAcquire("http://example.com/file:1"); + CHECK(acquired3 == true); + CHECK(bg3 == nullptr); +} + TEST_CASE("config fromargs validate sizes", "[AWS][slice][utility]") { char const *const appname = "slice.so"; diff --git a/plugins/slice/util.cc b/plugins/slice/util.cc index 579aac447a2..6792a1969b5 100644 --- a/plugins/slice/util.cc +++ b/plugins/slice/util.cc @@ -45,6 +45,41 @@ abort(TSCont const contp, Data *const data) TSContDestroy(contp); } +void +schedule_prefetch(Data *const data) +{ + if (!data->m_prefetchable || data->m_config->m_prefetchcount <= 0) { + return; + } + + int urllen = 0; + char *const urlstr = TSUrlStringGet(data->m_urlbuf, data->m_urlloc, &urllen); + + if (urlstr == nullptr || urllen <= 0) { + TSfree(urlstr); + return; + } + + std::string_view const url(urlstr, urllen); + int nextblocknum = data->m_blocknum + 1; + + if (data->m_blocknum > data->m_req_range.firstBlockFor(data->m_config->m_blockbytes) + 1) { + nextblocknum = data->m_blocknum + data->m_config->m_prefetchcount; + } + + for (int i = nextblocknum; i <= data->m_blocknum + data->m_config->m_prefetchcount; i++) { + if (data->m_req_range.blockIsInside(data->m_config->m_blockbytes, i)) { + if (BgBlockFetch::schedule(data, i, url)) { + DEBUG_LOG("Background fetch requested"); + } else { + DEBUG_LOG("Background fetch not requested"); + } + } + } + + TSfree(urlstr); +} + // create and issue a block request bool request_block(TSCont contp, Data *const data) @@ -151,22 +186,11 @@ request_block(TSCont contp, Data *const data) DEBUG_LOG("Headers\n%s", headerstr.c_str()); } - // if prefetch config set, schedule next block requests in background - if (data->m_prefetchable && data->m_config->m_prefetchcount > 0) { - int nextblocknum = data->m_blocknum + 1; - if (data->m_blocknum > data->m_req_range.firstBlockFor(data->m_config->m_blockbytes) + 1) { - nextblocknum = data->m_blocknum + data->m_config->m_prefetchcount; - } - for (int i = nextblocknum; i <= data->m_blocknum + data->m_config->m_prefetchcount; i++) { - if (data->m_req_range.blockIsInside(data->m_config->m_blockbytes, i)) { - if (BgBlockFetch::schedule(data, i)) { - DEBUG_LOG("Background fetch requested"); - } else { - DEBUG_LOG("Background fetch not requested"); - } - } - } + // Extend prefetch sliding window past the initial burst + if (data->m_blocknum > data->m_req_range.firstBlockFor(data->m_config->m_blockbytes) + 1) { + schedule_prefetch(data); } + // get ready for data back from the server data->m_upstream.setupVioRead(contp, INT64_MAX); diff --git a/plugins/slice/util.h b/plugins/slice/util.h index 9da6f368a3b..7c0c57e5cd9 100644 --- a/plugins/slice/util.h +++ b/plugins/slice/util.h @@ -33,4 +33,6 @@ void abort(TSCont const contp, Data *const data); bool request_block(TSCont contp, Data *const data); +void schedule_prefetch(Data *const data); + bool reader_avail_more_than(TSIOBufferReader const reader, int64_t bytes);