diff --git a/internal/metadatadb/memory.go b/internal/metadatadb/memory.go index 9a32a37..b42af46 100644 --- a/internal/metadatadb/memory.go +++ b/internal/metadatadb/memory.go @@ -46,8 +46,13 @@ func (m *MemoryBackend) Apply(_ context.Context, namespace string, ops ...Op) er func (m *MemoryBackend) Query(_ context.Context, namespace string, q ReadOp, target any) error { m.mu.RLock() - defer m.mu.RUnlock() - result := queryState(m.ns(namespace), q) + // Do not lazy-create the namespace under a read lock — that races with + // concurrent Apply on a different namespace. queryState handles a nil + // map by returning zero values, which is the right answer for a + // namespace that has never been written. + ns := m.state[namespace] + result := queryState(ns, q) + m.mu.RUnlock() return errors.Wrap(jsonUnmarshalInto(result, target), "memory query") } diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index 261b3d3..d1f6fd3 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -49,8 +49,11 @@ type Config struct { RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run full repack. 0 disables." default:"0"` ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression. 0 = all CPU cores; useful for short-lived CLI invocations but risky on a long-running server where multiple snapshot/restore operations can run concurrently." default:"4"` BundleCacheTTL time.Duration `hcl:"bundle-cache-ttl,optional" help:"TTL of cached server-side git bundles." default:"2h"` + PrewarmMostClonedRepos int `hcl:"prewarm-most-cloned-repos,optional" help:"Number of most-cloned repositories to warm in the background after startup. 0 disables." default:"0"` } +const prewarmPopularityWindowDays = 14 + type Strategy struct { config Config cache cache.Cache @@ -69,6 +72,7 @@ type Strategy struct { metrics *gitMetrics repoCounts *RepoCounts ready atomic.Bool + existingWarmedCh chan struct{} // closed when warmExistingRepos returns; used to sequence background prewarm } func New( @@ -129,15 +133,16 @@ func New( m := newGitMetrics() s := &Strategy{ - config: config, - cache: cache, - cloneManager: cloneManager, - httpClient: http.DefaultClient, - ctx: ctx, - scheduler: scheduler.WithQueuePrefix("git"), - spools: make(map[string]*RepoSpools), - tokenManager: tokenManager, - metrics: m, + config: config, + cache: cache, + cloneManager: cloneManager, + httpClient: http.DefaultClient, + ctx: ctx, + scheduler: scheduler.WithQueuePrefix("git"), + spools: make(map[string]*RepoSpools), + tokenManager: tokenManager, + metrics: m, + existingWarmedCh: make(chan struct{}), } // Run startup fetches in the background so the HTTP listener (and // /_liveness) come up immediately. /_readiness gates on Ready() so the @@ -148,6 +153,7 @@ func New( logger.WarnContext(warmCtx, "Failed to warm existing repos", "error", err) } s.ready.Store(true) + close(s.existingWarmedCh) logger.InfoContext(warmCtx, "Git strategy ready") }() @@ -199,12 +205,22 @@ func (s *Strategy) Ready() bool { // SetMetadataStore enables the per-repo clone histogram and schedules its // daily reaper. Called by config.Load after the metadata backend is built. +// When PrewarmMostClonedRepos > 0, also kicks off a background pass that +// warms the top-N most-cloned repos so that subsequent requests (including +// cold-S3-cache snapshot generation and wire-protocol traffic for those +// repos) hit a warm local mirror. The pass does not gate Ready(): cold-path +// fallbacks already serve traffic correctly while warming runs. func (s *Strategy) SetMetadataStore(store *metadatadb.Store) { + logger := logging.FromContext(s.ctx) if store == nil { + if s.config.PrewarmMostClonedRepos > 0 { + logger.WarnContext(s.ctx, "prewarm-most-cloned-repos is set but no metadata store was provided; "+ + "background prewarm is disabled (misconfiguration)") + } return } s.repoCounts = NewRepoCounts(store.Namespace("git")) - logging.FromContext(s.ctx).InfoContext(s.ctx, "Per-repo clone histogram enabled", + logger.InfoContext(s.ctx, "Per-repo clone histogram enabled", "retention_days", s.repoCounts.retentionDays) s.scheduler.SubmitPeriodicJob("repo-counts-reaper", "reap-repo-counts", defaultRepoCountsReapInterval, func(ctx context.Context) error { if deleted := s.repoCounts.Reap(); deleted > 0 { @@ -212,6 +228,71 @@ func (s *Strategy) SetMetadataStore(store *metadatadb.Store) { } return nil }) + if s.config.PrewarmMostClonedRepos > 0 { + go func() { + warmCtx := context.WithoutCancel(s.ctx) + s.prewarmMostCloned(warmCtx) + logging.FromContext(warmCtx).InfoContext(warmCtx, "Background prewarm pass complete") + }() + } +} + +// prewarmMostCloned warms the most-cloned repositories in the background after +// startup. Runs after warmExistingRepos so the two passes don't race on the +// same mirror directories. +// +// Per-repo failures and an empty histogram are logged and otherwise ignored: +// this is an optimization, not a correctness invariant. Cold-path fallbacks in +// the snapshot/bundle/wire-protocol handlers already serve traffic correctly +// for un-warmed repos. +// +// TODO: bound concurrency (currently serial). +func (s *Strategy) prewarmMostCloned(ctx context.Context) { + logger := logging.FromContext(ctx) + select { + case <-ctx.Done(): + return + case <-s.existingWarmedCh: + } + top := s.repoCounts.TopRepos(prewarmPopularityWindowDays, s.config.PrewarmMostClonedRepos) + if len(top) == 0 { + logger.WarnContext(ctx, "prewarm-most-cloned-repos is set but the clone histogram is empty; nothing to warm") + return + } + logger.InfoContext(ctx, "Prewarming most-cloned repositories", "count", len(top)) + for _, rc := range top { + if ctx.Err() != nil { + return + } + start := time.Now() + if err := s.prewarmRepo(ctx, rc.Repo); err != nil { + logger.ErrorContext(ctx, "Failed to prewarm repo", + "upstream", rc.Repo, "clone_count", rc.Count, "duration", time.Since(start), "error", err) + continue + } + logger.InfoContext(ctx, "Prewarmed repo", + "upstream", rc.Repo, "clone_count", rc.Count, "duration", time.Since(start)) + } +} + +// prewarmRepo ensures the mirror for upstreamURL exists and has been freshened +// once. Uses FetchTimeout (not CloneTimeout) on the fetch path so a slow +// upstream on an already-cloned mirror cannot extend startup by an hour. +func (s *Strategy) prewarmRepo(ctx context.Context, upstreamURL string) error { + repo, err := s.cloneManager.GetOrCreate(ctx, upstreamURL) + if err != nil { + return errors.Wrapf(err, "resolve clone for %s", upstreamURL) + } + if repo.State() != gitclone.StateReady { + if err := s.ensureCloneReady(ctx, repo); err != nil { + return errors.Wrapf(err, "clone %s", upstreamURL) + } + return nil + } + if err := repo.FetchLenient(ctx, s.cloneManager.Config().FetchTimeout); err != nil { + return errors.Wrapf(err, "fetch %s", upstreamURL) + } + return nil } func (s *Strategy) warmExistingRepos(ctx context.Context) error {