Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions internal/metadatadb/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,13 @@ func (m *MemoryBackend) Apply(_ context.Context, namespace string, ops ...Op) er

// Query evaluates the read operation q against the in-memory state of
// namespace and decodes the result into target.
//
// The namespace is looked up directly in m.state instead of through m.ns:
// lazily creating the namespace under a read lock races with a concurrent
// Apply on a different namespace. queryState handles a nil map by returning
// zero values, which is the right answer for a namespace that has never been
// written.
//
// Any decode failure is returned wrapped with "memory query".
func (m *MemoryBackend) Query(_ context.Context, namespace string, q ReadOp, target any) error {
	m.mu.RLock()
	// Release exactly once, via defer. Unlocking both explicitly and in a
	// defer would RUnlock an already-unlocked RWMutex, which is a fatal
	// runtime error. Deferring also keeps the read lock held while result is
	// decoded into target, in case result aliases namespace state (not
	// visible from here), and releases it if queryState panics.
	defer m.mu.RUnlock()

	ns := m.state[namespace] // nil for a never-written namespace; never created here
	result := queryState(ns, q)
	return errors.Wrap(jsonUnmarshalInto(result, target), "memory query")
}

Expand Down
101 changes: 91 additions & 10 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ type Config struct {
RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run full repack. 0 disables." default:"0"`
ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression. 0 = all CPU cores; useful for short-lived CLI invocations but risky on a long-running server where multiple snapshot/restore operations can run concurrently." default:"4"`
BundleCacheTTL time.Duration `hcl:"bundle-cache-ttl,optional" help:"TTL of cached server-side git bundles." default:"2h"`
PrewarmMostClonedRepos int `hcl:"prewarm-most-cloned-repos,optional" help:"Number of most-cloned repositories to warm in the background after startup. 0 disables." default:"0"`
}

// prewarmPopularityWindowDays is the look-back window, in days, that
// prewarmMostCloned passes to RepoCounts.TopRepos when picking the
// most-cloned repositories to warm.
const prewarmPopularityWindowDays = 14

type Strategy struct {
config Config
cache cache.Cache
Expand All @@ -69,6 +72,7 @@ type Strategy struct {
metrics *gitMetrics
repoCounts *RepoCounts
ready atomic.Bool
existingWarmedCh chan struct{} // closed when warmExistingRepos returns; used to sequence background prewarm
}

func New(
Expand Down Expand Up @@ -129,15 +133,16 @@ func New(
m := newGitMetrics()

s := &Strategy{
config: config,
cache: cache,
cloneManager: cloneManager,
httpClient: http.DefaultClient,
ctx: ctx,
scheduler: scheduler.WithQueuePrefix("git"),
spools: make(map[string]*RepoSpools),
tokenManager: tokenManager,
metrics: m,
config: config,
cache: cache,
cloneManager: cloneManager,
httpClient: http.DefaultClient,
ctx: ctx,
scheduler: scheduler.WithQueuePrefix("git"),
spools: make(map[string]*RepoSpools),
tokenManager: tokenManager,
metrics: m,
existingWarmedCh: make(chan struct{}),
}
// Run startup fetches in the background so the HTTP listener (and
// /_liveness) come up immediately. /_readiness gates on Ready() so the
Expand All @@ -148,6 +153,7 @@ func New(
logger.WarnContext(warmCtx, "Failed to warm existing repos", "error", err)
}
s.ready.Store(true)
close(s.existingWarmedCh)
logger.InfoContext(warmCtx, "Git strategy ready")
}()

Expand Down Expand Up @@ -199,19 +205,94 @@ func (s *Strategy) Ready() bool {

// SetMetadataStore enables the per-repo clone histogram and schedules its
// daily reaper. Called by config.Load after the metadata backend is built.
// When PrewarmMostClonedRepos > 0, also kicks off a background pass that
// warms the top-N most-cloned repos so that subsequent requests (including
// cold-S3-cache snapshot generation and wire-protocol traffic for those
// repos) hit a warm local mirror. The pass does not gate Ready(): cold-path
// fallbacks already serve traffic correctly while warming runs.
func (s *Strategy) SetMetadataStore(store *metadatadb.Store) {
logger := logging.FromContext(s.ctx)
if store == nil {
if s.config.PrewarmMostClonedRepos > 0 {
logger.WarnContext(s.ctx, "prewarm-most-cloned-repos is set but no metadata store was provided; "+
"background prewarm is disabled (misconfiguration)")
}
return
}
s.repoCounts = NewRepoCounts(store.Namespace("git"))
logging.FromContext(s.ctx).InfoContext(s.ctx, "Per-repo clone histogram enabled",
logger.InfoContext(s.ctx, "Per-repo clone histogram enabled",
"retention_days", s.repoCounts.retentionDays)
s.scheduler.SubmitPeriodicJob("repo-counts-reaper", "reap-repo-counts", defaultRepoCountsReapInterval, func(ctx context.Context) error {
if deleted := s.repoCounts.Reap(); deleted > 0 {
logging.FromContext(ctx).InfoContext(ctx, "Reaped stale repo clone counts", "deleted", deleted)
}
return nil
})
if s.config.PrewarmMostClonedRepos > 0 {
go func() {
warmCtx := context.WithoutCancel(s.ctx)
s.prewarmMostCloned(warmCtx)
logging.FromContext(warmCtx).InfoContext(warmCtx, "Background prewarm pass complete")
}()
}
}

// prewarmMostCloned is the background pass that warms the most-cloned
// repositories after startup. It first blocks until existingWarmedCh is
// closed (i.e. warmExistingRepos has returned) so the two passes never work
// on the same mirror directories at once, or returns early if ctx is done.
//
// The candidates are the top PrewarmMostClonedRepos entries of the clone
// histogram over the last prewarmPopularityWindowDays days. Repos are warmed
// one at a time, stopping as soon as ctx reports an error.
//
// This is an optimization, not a correctness invariant: an empty histogram
// and per-repo failures are logged and otherwise ignored, because the
// cold-path fallbacks in the snapshot/bundle/wire-protocol handlers already
// serve un-warmed repos correctly.
//
// TODO: bound concurrency (currently serial).
func (s *Strategy) prewarmMostCloned(ctx context.Context) {
	lg := logging.FromContext(ctx)

	// Sequence behind the startup warm pass.
	select {
	case <-s.existingWarmedCh:
	case <-ctx.Done():
		return
	}

	popular := s.repoCounts.TopRepos(prewarmPopularityWindowDays, s.config.PrewarmMostClonedRepos)
	if len(popular) == 0 {
		lg.WarnContext(ctx, "prewarm-most-cloned-repos is set but the clone histogram is empty; nothing to warm")
		return
	}
	lg.InfoContext(ctx, "Prewarming most-cloned repositories", "count", len(popular))

	for _, entry := range popular {
		if ctx.Err() != nil {
			return
		}
		began := time.Now()
		err := s.prewarmRepo(ctx, entry.Repo)
		if err == nil {
			lg.InfoContext(ctx, "Prewarmed repo",
				"upstream", entry.Repo, "clone_count", entry.Count, "duration", time.Since(began))
			continue
		}
		// Best effort: record the failure and move on to the next repo.
		lg.ErrorContext(ctx, "Failed to prewarm repo",
			"upstream", entry.Repo, "clone_count", entry.Count, "duration", time.Since(began), "error", err)
	}
}

// prewarmRepo makes sure a local mirror of upstreamURL exists and has been
// refreshed once.
//
// A mirror that is not yet in StateReady is brought up via ensureCloneReady
// and is not fetched again afterwards. A mirror that is already ready gets a
// single lenient fetch bounded by the clone manager's FetchTimeout (not
// CloneTimeout), so a slow upstream on an already-cloned mirror cannot extend
// startup by an hour.
//
// The returned error is wrapped with the failing step ("resolve clone for",
// "clone", or "fetch") and the upstream URL; nil means the mirror is warm.
func (s *Strategy) prewarmRepo(ctx context.Context, upstreamURL string) error {
	mirror, err := s.cloneManager.GetOrCreate(ctx, upstreamURL)
	if err != nil {
		return errors.Wrapf(err, "resolve clone for %s", upstreamURL)
	}

	// Already cloned: one bounded fetch is all that is needed.
	if mirror.State() == gitclone.StateReady {
		fetchTimeout := s.cloneManager.Config().FetchTimeout
		if fetchErr := mirror.FetchLenient(ctx, fetchTimeout); fetchErr != nil {
			return errors.Wrapf(fetchErr, "fetch %s", upstreamURL)
		}
		return nil
	}

	// Not ready yet: bring the clone up.
	if cloneErr := s.ensureCloneReady(ctx, mirror); cloneErr != nil {
		return errors.Wrapf(cloneErr, "clone %s", upstreamURL)
	}
	return nil
}

func (s *Strategy) warmExistingRepos(ctx context.Context) error {
Expand Down