Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions internal/metrics/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ func SmallCountBuckets() []float64 {
}
}

// BandwidthMbpsBuckets is for per-request throughput in MiB/s, covering
// everything from slow long-tail clients (a few MiB/s) up through saturated
// 10 GbE links (~1.2 GiB/s) and the parallel-stream-on-localhost ceiling.
func BandwidthMbpsBuckets() []float64 {
return []float64{
1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000,
}
}

// NewHistogram creates a Float64Histogram with explicit bucket boundaries.
// Prefer this over NewMetric for histograms: the OTel SDK default boundaries
// only go up to 10 seconds, which is far too narrow for most cachew metrics.
Expand Down
45 changes: 44 additions & 1 deletion internal/strategy/git/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"

"github.com/block/cachew/internal/metrics"
)
Expand All @@ -27,6 +28,9 @@ type gitMetrics struct {
spoolFollowerWaitTotal metric.Int64Counter
spoolFollowerWait metric.Float64Histogram
repackPackCount metric.Float64Histogram
snapshotServeBandwidth metric.Float64Histogram
lfsPhaseDuration metric.Float64Histogram
lfsPhaseBytes metric.Float64Histogram
}

func newGitMetrics() *gitMetrics {
Expand All @@ -47,6 +51,9 @@ func newGitMetrics() *gitMetrics {
spoolFollowerWaitTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.spool_follower_waits_total", "{waits}", "Snapshot spool follower events, by outcome (served, writer_failed)"),
spoolFollowerWait: metrics.NewHistogram(meter, "cachew.git.spool_follower_wait_seconds", "s", "Time a snapshot spool follower spent waiting for the writer to publish headers", metrics.FastLatencyBuckets()),
repackPackCount: metrics.NewHistogram(meter, "cachew.git.repack_pack_count", "{packs}", "Pack file count observed before and after repack, by stage (before, after)", metrics.SmallCountBuckets()),
snapshotServeBandwidth: metrics.NewHistogram(meter, "cachew.git.snapshot_serve_bandwidth_mbps", "MiBy/s", "Per-request snapshot serve throughput in MiB/s, by source and repository", metrics.BandwidthMbpsBuckets()),
lfsPhaseDuration: metrics.NewHistogram(meter, "cachew.git.lfs_phase_duration_seconds", "s", "Duration of an LFS-snapshot generation phase (discover, clone, fetch, archive_upload), by status and repository", metrics.LatencyBuckets()),
lfsPhaseBytes: metrics.NewHistogram(meter, "cachew.git.lfs_phase_bytes", "By", "Bytes processed in an LFS-snapshot generation phase, by phase and repository (e.g. .git/lfs size after fetch)", metrics.ByteBuckets()),
}
}

Expand All @@ -64,7 +71,12 @@ func (m *gitMetrics) recordRequest(ctx context.Context, requestType string) {
m.requestTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("type", requestType)))
}

// recordSnapshotServe records a snapshot serve event with its source, repository, size and wall-clock duration.
// recordSnapshotServe records a snapshot serve event with its source,
// repository, size and wall-clock duration. Also records per-request
// throughput (cachew.git.snapshot_serve_bandwidth_mbps) for non-empty,
// non-zero-duration serves so we can see the distribution of MiB/s instead
// of relying on aggregate-over-time of bytes/duration sums.
//
// Source is one of: "cache", "cold_cache", "spool", "generated".
func (m *gitMetrics) recordSnapshotServe(ctx context.Context, source, repo string, sizeBytes int64, duration time.Duration) {
attrs := metric.WithAttributes(
Expand All @@ -78,6 +90,13 @@ func (m *gitMetrics) recordSnapshotServe(ctx context.Context, source, repo strin
if duration > 0 {
m.snapshotServeDuration.Record(ctx, duration.Seconds(), attrs)
}
if sizeBytes > 0 && duration > 0 {
mbps := float64(sizeBytes) / (1 << 20) / duration.Seconds()
m.snapshotServeBandwidth.Record(ctx, mbps, attrs)
trace.SpanFromContext(ctx).SetAttributes(
attribute.Float64("cachew.snapshot.bandwidth_mbps", mbps),
)
}
}

// recordBundleServe records a bundle serve event. Source is one of:
Expand Down Expand Up @@ -137,3 +156,27 @@ func (m *gitMetrics) recordRepackPackCount(ctx context.Context, repo, stage stri
attribute.String("stage", stage),
))
}

// recordLFSPhase records the duration of one phase of LFS-snapshot
// generation. Phase is one of "discover", "clone", "fetch",
// "archive_upload". Status is "success" or "error".
func (m *gitMetrics) recordLFSPhase(ctx context.Context, repo, phase, status string, duration time.Duration) {
m.lfsPhaseDuration.Record(ctx, duration.Seconds(), metric.WithAttributes(
attribute.String("repository", repo),
attribute.String("phase", phase),
attribute.String("status", status),
))
}

// recordLFSPhaseBytes records the byte size associated with one phase of
// LFS-snapshot generation (e.g. .git/lfs total size observed after a
// fetch).
func (m *gitMetrics) recordLFSPhaseBytes(ctx context.Context, repo, phase string, sizeBytes int64) {
if sizeBytes <= 0 {
return
}
m.lfsPhaseBytes.Record(ctx, float64(sizeBytes), metric.WithAttributes(
attribute.String("repository", repo),
attribute.String("phase", phase),
))
}
68 changes: 65 additions & 3 deletions internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,12 +871,24 @@ func (s *Strategy) generateAndUploadLFSSnapshot(ctx context.Context, repo *gitcl
// Check if any .gitattributes file at HEAD declares filter=lfs. This searches
// the root and all nested .gitattributes, avoiding false negatives for repos
// that only configure LFS in subdirectories.
discoverStart := time.Now()
repoPath := repo.Path()
grepCmd := exec.CommandContext(ctx, "git", "-C", repoPath, "grep", "-q", "filter=lfs", "HEAD", "--", "*.gitattributes") //nolint:gosec
if err := grepCmd.Run(); err != nil {
logger.DebugContext(ctx, "No LFS filter in any .gitattributes, skipping LFS snapshot", "upstream", upstream)
return nil
// git grep exits 1 for "no match" (legitimate "no LFS in this repo");
// any other non-zero exit (invalid HEAD, repo corruption, command
// failure) is a real error that should propagate so we don't silently
// skip LFS snapshot generation for repos that actually use LFS.
var exitErr *exec.ExitError
if errors.As(err, &exitErr) && exitErr.ExitCode() == 1 {
s.metrics.recordLFSPhase(ctx, upstream, "discover", "skipped", time.Since(discoverStart))
logger.DebugContext(ctx, "No LFS filter in any .gitattributes, skipping LFS snapshot", "upstream", upstream)
return nil
}
s.metrics.recordLFSPhase(ctx, upstream, "discover", "error", time.Since(discoverStart))
return errors.Wrap(err, "git grep for LFS filter")
}
s.metrics.recordLFSPhase(ctx, upstream, "discover", "success", time.Since(discoverStart))

start := time.Now()
logger.InfoContext(ctx, "LFS snapshot generation started", "upstream", upstream)
Expand All @@ -887,7 +899,12 @@ func (s *Strategy) generateAndUploadLFSSnapshot(ctx context.Context, repo *gitcl

cacheKey := lfsSnapshotCacheKey(upstream)
excludePatterns := []string{"*.lock"}
cloneStart := time.Now()
cloneRecorded := false
if err := s.withSnapshotClone(ctx, repo, "lfs", func(workDir string) error {
s.metrics.recordLFSPhase(ctx, upstream, "clone", "success", time.Since(cloneStart))
cloneRecorded = true

// Set up LFS in the snapshot clone. cloneForSnapshot already restores
// remote.origin.url to the upstream URL, so LFS will fetch from GitHub.
// #nosec G204
Expand All @@ -898,23 +915,43 @@ func (s *Strategy) generateAndUploadLFSSnapshot(ctx context.Context, repo *gitcl
}

// Fetch only the LFS objects referenced by HEAD (the default branch).
fetchStart := time.Now()
fetchCmd, err := repo.GitCommand(ctx, "-C", workDir, "lfs", "fetch", "origin", "HEAD")
if err != nil {
s.metrics.recordLFSPhase(ctx, upstream, "fetch", "error", time.Since(fetchStart))
return errors.Wrap(err, "create git lfs fetch command")
}
if output, err := fetchCmd.CombinedOutput(); err != nil {
s.metrics.recordLFSPhase(ctx, upstream, "fetch", "error", time.Since(fetchStart))
return errors.Wrapf(err, "git lfs fetch: %s", string(output))
}
s.metrics.recordLFSPhase(ctx, upstream, "fetch", "success", time.Since(fetchStart))

lfsDir := filepath.Join(workDir, ".git", "lfs")
if _, err := os.Stat(lfsDir); os.IsNotExist(err) {
logger.InfoContext(ctx, "No LFS objects in repository, skipping LFS snapshot", "upstream", upstream)
return nil
}
// Record .git/lfs size as a proxy for "bytes fetched". Best-effort:
// surface 0 on error so we don't fail the snapshot for a stat walk.
if size, walkErr := dirSizeBytes(lfsDir); walkErr == nil {
s.metrics.recordLFSPhaseBytes(ctx, upstream, "fetch", size)
} else {
logger.DebugContext(ctx, "Failed to size .git/lfs after fetch", "upstream", upstream, "error", walkErr)
}

gitDir := filepath.Join(workDir, ".git")
return snapshot.CreatePaths(ctx, s.cache, cacheKey, gitDir, "lfs", []string{"lfs"}, 0, excludePatterns, s.config.ZstdThreads)
archiveStart := time.Now()
if err := snapshot.CreatePaths(ctx, s.cache, cacheKey, gitDir, "lfs", []string{"lfs"}, 0, excludePatterns, s.config.ZstdThreads); err != nil {
s.metrics.recordLFSPhase(ctx, upstream, "archive_upload", "error", time.Since(archiveStart))
return err //nolint:wrapcheck // wrapped by caller
}
s.metrics.recordLFSPhase(ctx, upstream, "archive_upload", "success", time.Since(archiveStart))
return nil
}); err != nil {
if !cloneRecorded {
s.metrics.recordLFSPhase(ctx, upstream, "clone", "error", time.Since(cloneStart))
}
s.metrics.recordOperation(ctx, "lfs-snapshot", "error", time.Since(start))
return errors.Wrap(err, "create LFS snapshot")
}
Expand All @@ -924,6 +961,31 @@ func (s *Strategy) generateAndUploadLFSSnapshot(ctx context.Context, repo *gitcl
return nil
}

// dirSizeBytes returns the total size in bytes of regular files under root.
// Per-entry stat or walk errors are deliberately swallowed so a transient
// failure (e.g. a file removed mid-walk during snapshot prep) doesn't fail
// the surrounding snapshot operation; the returned sum is best-effort.
func dirSizeBytes(root string) (int64, error) {
var total int64
err := filepath.WalkDir(root, func(_ string, d os.DirEntry, err error) error {
if err != nil {
return nil //nolint:nilerr // best-effort: skip unreadable entries
}
// Stat first so we don't drop files on filesystems where DirEntry.Type()
// reports "unknown" (e.g. some NFS/FUSE setups) and IsRegular() returns false.
info, infoErr := d.Info()
if infoErr != nil {
return nil //nolint:nilerr // best-effort: skip un-stat-able entries
}
if !info.Mode().IsRegular() {
return nil
}
total += info.Size()
return nil
})
return total, errors.WithStack(err)
}

func (s *Strategy) handleLFSSnapshotRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) {
ctx := r.Context()
logger := logging.FromContext(ctx)
Expand Down