Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func (s *Strategy) SetMetadataStore(store *metadatadb.Store) {
// TODO: bound concurrency (currently serial).
func (s *Strategy) prewarmMostCloned(ctx context.Context) {
logger := logging.FromContext(ctx)
passStart := time.Now()
select {
case <-ctx.Done():
return
Expand All @@ -257,6 +258,7 @@ func (s *Strategy) prewarmMostCloned(ctx context.Context) {
top := s.repoCounts.TopRepos(prewarmPopularityWindowDays, s.config.PrewarmMostClonedRepos)
if len(top) == 0 {
logger.WarnContext(ctx, "prewarm-most-cloned-repos is set but the clone histogram is empty; nothing to warm")
s.metrics.recordPrewarmPass(ctx, "empty_histogram", time.Since(passStart))
return
}
logger.InfoContext(ctx, "Prewarming most-cloned repositories", "count", len(top))
Expand All @@ -265,34 +267,41 @@ func (s *Strategy) prewarmMostCloned(ctx context.Context) {
return
}
start := time.Now()
if err := s.prewarmRepo(ctx, rc.Repo); err != nil {
path, err := s.prewarmRepo(ctx, rc.Repo)
dur := time.Since(start)
status := "success"
if err != nil {
status = "error"
logger.ErrorContext(ctx, "Failed to prewarm repo",
"upstream", rc.Repo, "clone_count", rc.Count, "duration", time.Since(start), "error", err)
continue
"upstream", rc.Repo, "clone_count", rc.Count, "path", path, "duration", dur, "error", err)
} else {
logger.InfoContext(ctx, "Prewarmed repo",
"upstream", rc.Repo, "clone_count", rc.Count, "path", path, "duration", dur)
}
logger.InfoContext(ctx, "Prewarmed repo",
"upstream", rc.Repo, "clone_count", rc.Count, "duration", time.Since(start))
s.metrics.recordPrewarmRepo(ctx, path, status, dur)
}
s.metrics.recordPrewarmPass(ctx, "complete", time.Since(passStart))
}

// prewarmRepo ensures the mirror for upstreamURL exists and has been freshened
// once. Uses FetchTimeout (not CloneTimeout) on the fetch path so a slow
// upstream on an already-cloned mirror cannot extend startup by an hour.
func (s *Strategy) prewarmRepo(ctx context.Context, upstreamURL string) error {
// Returns the path taken ("fetch" or "restore") for metrics labelling.
func (s *Strategy) prewarmRepo(ctx context.Context, upstreamURL string) (string, error) {
repo, err := s.cloneManager.GetOrCreate(ctx, upstreamURL)
if err != nil {
return errors.Wrapf(err, "resolve clone for %s", upstreamURL)
return "restore", errors.Wrapf(err, "resolve clone for %s", upstreamURL)
}
if repo.State() != gitclone.StateReady {
if err := s.ensureCloneReady(ctx, repo); err != nil {
return errors.Wrapf(err, "clone %s", upstreamURL)
return "restore", errors.Wrapf(err, "clone %s", upstreamURL)
}
return nil
return "restore", nil
}
if err := repo.FetchLenient(ctx, s.cloneManager.Config().FetchTimeout); err != nil {
return errors.Wrapf(err, "fetch %s", upstreamURL)
return "fetch", errors.Wrapf(err, "fetch %s", upstreamURL)
}
return nil
return "fetch", nil
}

func (s *Strategy) warmExistingRepos(ctx context.Context) error {
Expand Down
25 changes: 25 additions & 0 deletions internal/strategy/git/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type gitMetrics struct {
spoolFollowerWaitTotal metric.Int64Counter
spoolFollowerWait metric.Float64Histogram
repackPackCount metric.Float64Histogram
prewarmRepoDuration metric.Float64Histogram
prewarmPassDuration metric.Float64Histogram
prewarmPassTotal metric.Int64Counter
}

func newGitMetrics() *gitMetrics {
Expand All @@ -47,6 +50,9 @@ func newGitMetrics() *gitMetrics {
spoolFollowerWaitTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.spool_follower_waits_total", "{waits}", "Snapshot spool follower events, by outcome (served, writer_failed)"),
spoolFollowerWait: metrics.NewHistogram(meter, "cachew.git.spool_follower_wait_seconds", "s", "Time a snapshot spool follower spent waiting for the writer to publish headers", metrics.FastLatencyBuckets()),
repackPackCount: metrics.NewHistogram(meter, "cachew.git.repack_pack_count", "{packs}", "Pack file count observed before and after repack, by stage (before, after)", metrics.SmallCountBuckets()),
prewarmRepoDuration: metrics.NewHistogram(meter, "cachew.git.prewarm_repo_duration_seconds", "s", "Duration of per-repo background prewarm, by path (fetch, restore) and status", metrics.LatencyBuckets()),
prewarmPassDuration: metrics.NewHistogram(meter, "cachew.git.prewarm_pass_duration_seconds", "s", "Total duration of the background prewarm pass, by outcome", metrics.LatencyBuckets()),
prewarmPassTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.prewarm_passes_total", "{passes}", "Background prewarm passes completed, by outcome (complete, empty_histogram)"),
}
}

Expand Down Expand Up @@ -137,3 +143,22 @@ func (m *gitMetrics) recordRepackPackCount(ctx context.Context, repo, stage stri
attribute.String("stage", stage),
))
}

// recordPrewarmRepo records the wall-clock cost of warming a single repo
// during the background prewarm pass. path is "fetch" (mirror already on
// disk) or "restore" (mirror snapshot pulled from S3 or full clone). status
// is "success" or "error".
func (m *gitMetrics) recordPrewarmRepo(ctx context.Context, path, status string, duration time.Duration) {
m.prewarmRepoDuration.Record(ctx, duration.Seconds(), metric.WithAttributes(
attribute.String("path", path),
attribute.String("status", status),
))
}

// recordPrewarmPass records the total prewarm-pass wall time and completion
// outcome. outcome is "complete" or "empty_histogram".
func (m *gitMetrics) recordPrewarmPass(ctx context.Context, outcome string, duration time.Duration) {
attrs := metric.WithAttributes(attribute.String("outcome", outcome))
m.prewarmPassTotal.Add(ctx, 1, attrs)
m.prewarmPassDuration.Record(ctx, duration.Seconds(), attrs)
}
Loading