From 0c3b5af97e0f8e087434086fd8b4d5415a40ff1a Mon Sep 17 00:00:00 2001 From: Josh Friend Date: Fri, 15 May 2026 20:26:05 -0400 Subject: [PATCH] Stop periodic jobs for idle repos Repos that haven't had a client request within the idle timeout (72h by default, configurable via idle-timeout) no longer re-arm their snapshot/repack periodic jobs. When a request arrives for an idle repo, the jobs are re-scheduled. --- cmd/cachewd/main.go | 22 +++++--- cmd/cachewd/main_test.go | 31 +++++++++++ go.mod | 4 ++ internal/gitclone/manager.go | 15 ++++++ internal/jobscheduler/jobs.go | 87 ++++++++++++++++++++++++++---- internal/jobscheduler/jobs_test.go | 32 +++++++++++ internal/strategy/git/git.go | 7 +++ internal/strategy/git/refs.go | 1 + internal/strategy/git/repack.go | 2 +- internal/strategy/git/snapshot.go | 25 +++++---- 10 files changed, 199 insertions(+), 27 deletions(-) create mode 100644 cmd/cachewd/main_test.go diff --git a/cmd/cachewd/main.go b/cmd/cachewd/main.go index 788cada..bcda7f6 100644 --- a/cmd/cachewd/main.go +++ b/cmd/cachewd/main.go @@ -327,15 +327,25 @@ func fatalIfError(ctx context.Context, logger *slog.Logger, err error, msg strin os.Exit(1) } -// extractPathPrefix extracts the strategy name, path prefix from a request path. -// Examples: /git/... -> "git", /gomod/... -> "gomod", /api/v1/... -> "api". +// extractPathPrefix returns the first path segment, or the first two +// non-version segments for /api/ so that object/stats/namespaces aren't +// lumped together (e.g. /api/v1/object/... -> "api/object"). 
func extractPathPrefix(path string) string { - if path == "" || path == "/" { + parts := strings.SplitN(strings.Trim(path, "/"), "/", 4) + if len(parts) == 0 || parts[0] == "" { return "" } - trimmed := strings.TrimPrefix(path, "/") - prefix, _, _ := strings.Cut(trimmed, "/") - return prefix + if parts[0] != "api" || len(parts) < 2 { + return parts[0] + } + i := 1 + if i < len(parts) && len(parts[i]) > 0 && parts[i][0] == 'v' { + i++ + } + if i < len(parts) { + return "api/" + parts[i] + } + return "api" } func newServer( diff --git a/cmd/cachewd/main_test.go b/cmd/cachewd/main_test.go new file mode 100644 index 0000000..336eac5 --- /dev/null +++ b/cmd/cachewd/main_test.go @@ -0,0 +1,31 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestExtractPathPrefix(t *testing.T) { + tests := []struct { + path string + want string + }{ + {"", ""}, + {"/", ""}, + {"/git/github.com/org/repo/info/refs", "git"}, + {"/gomod/proxy.golang.org/x/mod/@latest", "gomod"}, + {"/hermit/packages/go-1.22.0.tar.gz", "hermit"}, + {"/api/v1/object/brew/some-key", "api/object"}, + {"/api/v1/stats", "api/stats"}, + {"/api/v1/namespaces", "api/namespaces"}, + {"/api/object/ns/key", "api/object"}, + {"/api", "api"}, + {"/api/", "api"}, + } + for _, tt := range tests { + t.Run(tt.path, func(t *testing.T) { + assert.Equal(t, tt.want, extractPathPrefix(tt.path)) + }) + } +} diff --git a/go.mod b/go.mod index 8534329..384dec5 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/minio/minio-go/v7 v7.0.100 github.com/open-policy-agent/opa v1.15.2 github.com/prometheus/client_golang v1.23.2 + github.com/stretchr/testify v1.11.1 go.etcd.io/bbolt v1.4.3 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 go.opentelemetry.io/contrib/instrumentation/runtime v0.68.0 @@ -33,6 +34,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // 
indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect github.com/dlclark/regexp2 v1.11.5 // indirect github.com/dustin/go-humanize v1.0.1 // indirect @@ -59,6 +61,7 @@ require ( github.com/minio/md5-simd v1.1.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/philhofer/fwd v1.2.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.67.5 // indirect github.com/prometheus/otlptranslator v1.0.0 // indirect @@ -87,6 +90,7 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect google.golang.org/grpc v1.80.0 // indirect google.golang.org/protobuf v1.36.11 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect sigs.k8s.io/yaml v1.6.0 // indirect ) diff --git a/internal/gitclone/manager.go b/internal/gitclone/manager.go index 90a2bb3..c01de69 100644 --- a/internal/gitclone/manager.go +++ b/internal/gitclone/manager.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -85,6 +86,19 @@ type Repository struct { refCheckValid bool fetchSem chan struct{} credentialProvider CredentialProvider + lastAccessed atomic.Int64 +} + +func (r *Repository) TouchAccessed() { + r.lastAccessed.Store(time.Now().UnixNano()) +} + +func (r *Repository) LastAccessed() time.Time { + ns := r.lastAccessed.Load() + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) } type Manager struct { @@ -199,6 +213,7 @@ func (m *Manager) GetOrCreate(_ context.Context, upstreamURL string) (*Repositor fetchSem: make(chan struct{}, 1), credentialProvider: m.credentialProvider, } + repo.lastAccessed.Store(time.Now().UnixNano()) headFile := filepath.Join(clonePath, "HEAD") if _, err := os.Stat(headFile); err == nil { diff --git a/internal/jobscheduler/jobs.go 
b/internal/jobscheduler/jobs.go index 123b08c..914b1a0 100644 --- a/internal/jobscheduler/jobs.go +++ b/internal/jobscheduler/jobs.go @@ -7,6 +7,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "time" "github.com/alecthomas/errors" @@ -22,6 +23,16 @@ type Config struct { SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"` } +// idledPeriodicJob stores enough information to re-arm a periodic job that was +// stopped due to queue inactivity. +type idledPeriodicJob struct { + queue string + id string + interval time.Duration + run func(ctx context.Context) error + idleTimeout time.Duration +} + type queueJob struct { id string queue string @@ -49,9 +60,14 @@ type Scheduler interface { // Jobs run concurrently across queues, but never within a queue. Submit(queue, id string, run func(ctx context.Context) error) // SubmitPeriodicJob submits a job to the queue that runs immediately, and then periodically after the interval. + // If idleTimeout is provided, the job stops re-arming when the queue has not been touched (via Touch) for + // longer than the timeout. Calling Touch on an idle queue re-arms all its stopped periodic jobs. // // Jobs run concurrently across queues, but never within a queue. - SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) + SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error, idleTimeout ...time.Duration) + // Touch records activity for a queue, resetting its idle timer. If the queue had periodic jobs that were + // stopped due to inactivity, they are re-scheduled. 
+ Touch(queue string) } type prefixedScheduler struct { @@ -63,8 +79,12 @@ func (p *prefixedScheduler) Submit(queue, id string, run func(ctx context.Contex p.scheduler.Submit(queue, p.prefix+id, run) } -func (p *prefixedScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) { - p.scheduler.SubmitPeriodicJob(queue, p.prefix+id, interval, run) +func (p *prefixedScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error, idleTimeout ...time.Duration) { + p.scheduler.SubmitPeriodicJob(queue, p.prefix+id, interval, run, idleTimeout...) +} + +func (p *prefixedScheduler) Touch(queue string) { + p.scheduler.Touch(queue) } func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler { @@ -85,11 +105,13 @@ type RootScheduler struct { // ctx is cancelled when the scheduler is shutting down. Periodic re-arm // goroutines select on it so they exit cleanly instead of submitting to a // dead scheduler. 
- ctx context.Context //nolint:containedctx - cancel context.CancelFunc - wg sync.WaitGroup - store ScheduleStore - metrics *schedulerMetrics + ctx context.Context //nolint:containedctx + cancel context.CancelFunc + wg sync.WaitGroup + store ScheduleStore + metrics *schedulerMetrics + lastTouched sync.Map // queue -> *atomic.Int64 (unix nanos) + idledJobs sync.Map // jobKey -> *idledPeriodicJob } var _ Scheduler = &RootScheduler{} @@ -173,7 +195,18 @@ func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) e q.cond.Signal() } -func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) { +func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error, idleTimeout ...time.Duration) { + var timeout time.Duration + if len(idleTimeout) > 0 { + timeout = idleTimeout[0] + } + if timeout > 0 { + q.touchQueue(queue) + } + q.submitPeriodicJob(queue, id, interval, run, timeout) +} + +func (q *RootScheduler) submitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error, idleTimeout time.Duration) { if q.ctx.Err() != nil { return } @@ -192,7 +225,14 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati // to wake and submit to a dead scheduler. The new pod's // warmExistingRepos re-registers periodic jobs on startup. 
go q.sleepThenSubmit(interval, func() { - q.SubmitPeriodicJob(queue, id, interval, run) + if idleTimeout > 0 && q.isQueueIdle(queue, idleTimeout) { + logging.FromContext(ctx).InfoContext(ctx, "Periodic job idled out", "queue", queue, "job", id) + q.idledJobs.Store(key, &idledPeriodicJob{ + queue: queue, id: id, interval: interval, run: run, idleTimeout: idleTimeout, + }) + return + } + q.submitPeriodicJob(queue, id, interval, run, idleTimeout) }) return errors.WithStack(err) }) @@ -204,6 +244,33 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati go q.sleepThenSubmit(delay, submit) } +// Touch records activity for a queue, resetting its idle timer. If the queue +// had periodic jobs that were stopped due to inactivity, they are re-scheduled. +func (q *RootScheduler) Touch(queue string) { + q.touchQueue(queue) + q.idledJobs.Range(func(key, value any) bool { + job := value.(*idledPeriodicJob) + if job.queue == queue { + q.idledJobs.Delete(key) + q.submitPeriodicJob(job.queue, job.id, job.interval, job.run, job.idleTimeout) + } + return true + }) +} + +func (q *RootScheduler) touchQueue(queue string) { + val, _ := q.lastTouched.LoadOrStore(queue, &atomic.Int64{}) + val.(*atomic.Int64).Store(time.Now().UnixNano()) +} + +func (q *RootScheduler) isQueueIdle(queue string, timeout time.Duration) bool { + val, ok := q.lastTouched.Load(queue) + if !ok { + return true + } + return time.Since(time.Unix(0, val.(*atomic.Int64).Load())) > timeout +} + // sleepThenSubmit waits for d, then runs fn — unless the scheduler is // shutting down, in which case it returns immediately. 
func (q *RootScheduler) sleepThenSubmit(d time.Duration, fn func()) { diff --git a/internal/jobscheduler/jobs_test.go b/internal/jobscheduler/jobs_test.go index 75516b4..ce3e1e6 100644 --- a/internal/jobscheduler/jobs_test.go +++ b/internal/jobscheduler/jobs_test.go @@ -263,6 +263,38 @@ func TestJobSchedulerPeriodicJob(t *testing.T) { }, "periodic job should execute multiple times") } +func TestJobSchedulerPeriodicJobStopsOnIdle(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 1}) + assert.NoError(t, err) + t.Cleanup(func() { scheduler.Close() }) + + var executions atomic.Int32 + done := make(chan struct{}) + + // Submit with a nanosecond idle timeout. The initial SubmitPeriodicJob + // touches the queue so the first execution runs. By the time the re-arm + // check fires, the nanosecond timeout has elapsed and the job stops. + scheduler.SubmitPeriodicJob("queue1", "idle-job", time.Nanosecond, func(_ context.Context) error { + executions.Add(1) + close(done) + return nil + }, time.Nanosecond) + + <-done + + // Cancel and drain the scheduler. If the job was re-armed, the worker + // would execute it before Wait returns because concurrency is 1. + cancel() + scheduler.Wait() + + assert.Equal(t, int32(1), executions.Load(), + "periodic job should not re-arm after idle timeout") +} + func TestJobSchedulerPeriodicJobWithError(t *testing.T) { _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) ctx, cancel := context.WithCancel(ctx) diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index 261b3d3..8eceddb 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -49,6 +49,7 @@ type Config struct { RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run full repack. 
0 disables." default:"0"` ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression. 0 = all CPU cores; useful for short-lived CLI invocations but risky on a long-running server where multiple snapshot/restore operations can run concurrently." default:"4"` BundleCacheTTL time.Duration `hcl:"bundle-cache-ttl,optional" help:"TTL of cached server-side git bundles." default:"2h"` + IdleTimeout time.Duration `hcl:"idle-timeout,optional" help:"Stop periodic jobs for repos with no client requests for this duration. 0 disables." default:"72h"` } type Strategy struct { @@ -197,6 +198,11 @@ func (s *Strategy) Ready() bool { return s.ready.Load() } +func (s *Strategy) touchRepo(repo *gitclone.Repository) { + repo.TouchAccessed() + s.scheduler.Touch(repo.UpstreamURL()) +} + // SetMetadataStore enables the per-repo clone histogram and schedules its // daily reaper. Called by config.Load after the metadata backend is built. func (s *Strategy) SetMetadataStore(store *metadatadb.Store) { @@ -332,6 +338,7 @@ func (s *Strategy) handleGitRequest(w http.ResponseWriter, r *http.Request, host http.Error(w, "Internal server error", http.StatusInternalServerError) return } + s.touchRepo(repo) // Increment after GetOrCreate so unvalidated URLs can't bloat the keyspace. 
if isClone, cerr := RequestIsClone(pathValue, r); cerr != nil { diff --git a/internal/strategy/git/refs.go b/internal/strategy/git/refs.go index e8335a7..1c4cbed 100644 --- a/internal/strategy/git/refs.go +++ b/internal/strategy/git/refs.go @@ -72,6 +72,7 @@ func (s *Strategy) handleEnsureRefs(w http.ResponseWriter, r *http.Request, host http.Error(w, "internal server error", http.StatusInternalServerError) return } + s.touchRepo(repo) if repo.State() != gitclone.StateReady { if err := s.ensureCloneReady(ctx, repo); err != nil { diff --git a/internal/strategy/git/repack.go b/internal/strategy/git/repack.go index 51b8efc..3586dd4 100644 --- a/internal/strategy/git/repack.go +++ b/internal/strategy/git/repack.go @@ -19,5 +19,5 @@ func (s *Strategy) scheduleRepackJobs(repo *gitclone.Repository) { } s.metrics.recordOperation(ctx, "repack", status, time.Since(start)) return errors.Wrap(err, "repack") - }) + }, s.config.IdleTimeout) } diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 926436c..61185dc 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -170,17 +170,17 @@ func (s *Strategy) generateAndUploadMirrorSnapshot(ctx context.Context, repo *gi func (s *Strategy) scheduleSnapshotJobs(repo *gitclone.Repository) { s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "snapshot-periodic", s.config.SnapshotInterval, func(ctx context.Context) error { return s.generateAndUploadSnapshot(ctx, repo) - }) + }, s.config.IdleTimeout) s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "lfs-snapshot-periodic", s.config.SnapshotInterval, func(ctx context.Context) error { return s.generateAndUploadLFSSnapshot(ctx, repo) - }) + }, s.config.IdleTimeout) mirrorInterval := s.config.MirrorSnapshotInterval if mirrorInterval == 0 { mirrorInterval = s.config.SnapshotInterval } s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "mirror-snapshot-periodic", mirrorInterval, func(ctx context.Context) error { return 
s.generateAndUploadMirrorSnapshot(ctx, repo) - }) + }, s.config.IdleTimeout) } func (s *Strategy) snapshotMutexFor(upstreamURL string) *sync.Mutex { @@ -202,6 +202,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, http.Error(w, "Internal server error", http.StatusInternalServerError) return } + s.touchRepo(repo) cacheKey := snapshotCacheKey(upstreamURL) @@ -346,6 +347,7 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h http.Error(w, "Internal server error", http.StatusInternalServerError) return } + s.touchRepo(repo) if cloneErr := s.ensureCloneReady(ctx, repo); cloneErr != nil { logger.ErrorContext(ctx, "Clone unavailable for bundle", "upstream", upstreamURL, "error", cloneErr) http.Error(w, "Repository unavailable", http.StatusServiceUnavailable) @@ -832,13 +834,16 @@ func (s *Strategy) handleLFSSnapshotRequest(w http.ResponseWriter, r *http.Reque // restore + on-demand generation. Kick off a background mirror warm so // the periodic LFS snapshot job can fire once the mirror is ready. 
logger.InfoContext(ctx, "LFS snapshot cache miss, triggering background warm", "upstream", upstreamURL) - if repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL); repoErr == nil && repo.State() != gitclone.StateReady { - s.scheduler.Submit(upstreamURL, "lfs-mirror-warm", func(ctx context.Context) error { - if err := s.startClone(ctx, repo); err != nil { - logger.WarnContext(ctx, "Background mirror warm for LFS failed", "upstream", upstreamURL, "error", err) - } - return nil - }) + if repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL); repoErr == nil { + s.touchRepo(repo) + if repo.State() != gitclone.StateReady { + s.scheduler.Submit(upstreamURL, "lfs-mirror-warm", func(ctx context.Context) error { + if err := s.startClone(ctx, repo); err != nil { + logger.WarnContext(ctx, "Background mirror warm for LFS failed", "upstream", upstreamURL, "error", err) + } + return nil + }) + } } http.Error(w, "LFS snapshot not found", http.StatusNotFound) }