diff --git a/internal/metrics/helper.go b/internal/metrics/helper.go index 203927b..82be9b8 100644 --- a/internal/metrics/helper.go +++ b/internal/metrics/helper.go @@ -56,6 +56,15 @@ func SmallCountBuckets() []float64 { } } +// BandwidthMbpsBuckets is for per-request throughput in MiB/s, covering +// everything from slow long-tail clients (a few MiB/s) up through saturated +// 10 GbE links (~1.2 GiB/s) and the parallel-stream-on-localhost ceiling. +func BandwidthMbpsBuckets() []float64 { + return []float64{ + 1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, + } +} + // NewHistogram creates a Float64Histogram with explicit bucket boundaries. // Prefer this over NewMetric for histograms: the OTel SDK default boundaries // only go up to 10 seconds, which is far too narrow for most cachew metrics. diff --git a/internal/strategy/git/metrics.go b/internal/strategy/git/metrics.go index fec3e19..b95d806 100644 --- a/internal/strategy/git/metrics.go +++ b/internal/strategy/git/metrics.go @@ -7,6 +7,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" "github.com/block/cachew/internal/metrics" ) @@ -27,6 +28,9 @@ type gitMetrics struct { spoolFollowerWaitTotal metric.Int64Counter spoolFollowerWait metric.Float64Histogram repackPackCount metric.Float64Histogram + snapshotServeBandwidth metric.Float64Histogram + lfsPhaseDuration metric.Float64Histogram + lfsPhaseBytes metric.Float64Histogram } func newGitMetrics() *gitMetrics { @@ -47,6 +51,9 @@ func newGitMetrics() *gitMetrics { spoolFollowerWaitTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.spool_follower_waits_total", "{waits}", "Snapshot spool follower events, by outcome (served, writer_failed)"), spoolFollowerWait: metrics.NewHistogram(meter, "cachew.git.spool_follower_wait_seconds", "s", "Time a snapshot spool follower spent waiting for the writer to publish headers", metrics.FastLatencyBuckets()), repackPackCount: metrics.NewHistogram(meter, "cachew.git.repack_pack_count", "{packs}", "Pack file count observed before and after repack, by stage (before, after)", metrics.SmallCountBuckets()), + snapshotServeBandwidth: metrics.NewHistogram(meter, "cachew.git.snapshot_serve_bandwidth_mbps", "MiBy/s", "Per-request snapshot serve throughput in MiB/s, by source and repository", metrics.BandwidthMbpsBuckets()), + lfsPhaseDuration: metrics.NewHistogram(meter, "cachew.git.lfs_phase_duration_seconds", "s", "Duration of an LFS-snapshot generation phase (discover, clone, fetch, archive_upload), by status and repository", metrics.LatencyBuckets()), + lfsPhaseBytes: metrics.NewHistogram(meter, "cachew.git.lfs_phase_bytes", "By", "Bytes processed in an LFS-snapshot generation phase, by phase and repository (e.g. .git/lfs size after fetch)", metrics.ByteBuckets()), } } @@ -64,7 +71,12 @@ func (m *gitMetrics) recordRequest(ctx context.Context, requestType string) { m.requestTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("type", requestType))) } -// recordSnapshotServe records a snapshot serve event with its source, repository, size and wall-clock duration. +// recordSnapshotServe records a snapshot serve event with its source, +// repository, size and wall-clock duration. Also records per-request +// throughput (cachew.git.snapshot_serve_bandwidth_mbps) for non-empty, +// non-zero-duration serves so we can see the distribution of MiB/s instead +// of relying on aggregate-over-time of bytes/duration sums. +// // Source is one of: "cache", "cold_cache", "spool", "generated". func (m *gitMetrics) recordSnapshotServe(ctx context.Context, source, repo string, sizeBytes int64, duration time.Duration) { attrs := metric.WithAttributes( @@ -78,6 +90,13 @@ func (m *gitMetrics) recordSnapshotServe(ctx context.Context, source, repo strin if duration > 0 { m.snapshotServeDuration.Record(ctx, duration.Seconds(), attrs) } + if sizeBytes > 0 && duration > 0 { + mbps := float64(sizeBytes) / (1 << 20) / duration.Seconds() + m.snapshotServeBandwidth.Record(ctx, mbps, attrs) + trace.SpanFromContext(ctx).SetAttributes( + attribute.Float64("cachew.snapshot.bandwidth_mbps", mbps), + ) + } } // recordBundleServe records a bundle serve event. Source is one of: @@ -137,3 +156,27 @@ func (m *gitMetrics) recordRepackPackCount(ctx context.Context, repo, stage stri attribute.String("stage", stage), )) } + +// recordLFSPhase records the duration of one phase of LFS-snapshot +// generation. Phase is one of "discover", "clone", "fetch", +// "archive_upload". Status is "success" or "error". +func (m *gitMetrics) recordLFSPhase(ctx context.Context, repo, phase, status string, duration time.Duration) { + m.lfsPhaseDuration.Record(ctx, duration.Seconds(), metric.WithAttributes( + attribute.String("repository", repo), + attribute.String("phase", phase), + attribute.String("status", status), + )) +} + +// recordLFSPhaseBytes records the byte size associated with one phase of +// LFS-snapshot generation (e.g. .git/lfs total size observed after a +// fetch). +func (m *gitMetrics) recordLFSPhaseBytes(ctx context.Context, repo, phase string, sizeBytes int64) { + if sizeBytes <= 0 { + return + } + m.lfsPhaseBytes.Record(ctx, float64(sizeBytes), metric.WithAttributes( + attribute.String("repository", repo), + attribute.String("phase", phase), + )) +} diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 5f7fcdf..214a67f 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -871,12 +871,24 @@ func (s *Strategy) generateAndUploadLFSSnapshot(ctx context.Context, repo *gitcl // Check if any .gitattributes file at HEAD declares filter=lfs. This searches // the root and all nested .gitattributes, avoiding false negatives for repos // that only configure LFS in subdirectories. + discoverStart := time.Now() repoPath := repo.Path() grepCmd := exec.CommandContext(ctx, "git", "-C", repoPath, "grep", "-q", "filter=lfs", "HEAD", "--", "*.gitattributes") //nolint:gosec if err := grepCmd.Run(); err != nil { - logger.DebugContext(ctx, "No LFS filter in any .gitattributes, skipping LFS snapshot", "upstream", upstream) - return nil + // git grep exits 1 for "no match" (legitimate "no LFS in this repo"); + // any other non-zero exit (invalid HEAD, repo corruption, command + // failure) is a real error that should propagate so we don't silently + // skip LFS snapshot generation for repos that actually use LFS. + var exitErr *exec.ExitError + if errors.As(err, &exitErr) && exitErr.ExitCode() == 1 { + s.metrics.recordLFSPhase(ctx, upstream, "discover", "skipped", time.Since(discoverStart)) + logger.DebugContext(ctx, "No LFS filter in any .gitattributes, skipping LFS snapshot", "upstream", upstream) + return nil + } + s.metrics.recordLFSPhase(ctx, upstream, "discover", "error", time.Since(discoverStart)) + return errors.Wrap(err, "git grep for LFS filter") } + s.metrics.recordLFSPhase(ctx, upstream, "discover", "success", time.Since(discoverStart)) start := time.Now() logger.InfoContext(ctx, "LFS snapshot generation started", "upstream", upstream) @@ -887,7 +899,12 @@ func (s *Strategy) generateAndUploadLFSSnapshot(ctx context.Context, repo *gitcl cacheKey := lfsSnapshotCacheKey(upstream) excludePatterns := []string{"*.lock"} + cloneStart := time.Now() + cloneRecorded := false if err := s.withSnapshotClone(ctx, repo, "lfs", func(workDir string) error { + s.metrics.recordLFSPhase(ctx, upstream, "clone", "success", time.Since(cloneStart)) + cloneRecorded = true + // Set up LFS in the snapshot clone. cloneForSnapshot already restores // remote.origin.url to the upstream URL, so LFS will fetch from GitHub. // #nosec G204 @@ -898,23 +915,43 @@ func (s *Strategy) generateAndUploadLFSSnapshot(ctx context.Context, repo *gitcl } // Fetch only the LFS objects referenced by HEAD (the default branch). + fetchStart := time.Now() fetchCmd, err := repo.GitCommand(ctx, "-C", workDir, "lfs", "fetch", "origin", "HEAD") if err != nil { + s.metrics.recordLFSPhase(ctx, upstream, "fetch", "error", time.Since(fetchStart)) return errors.Wrap(err, "create git lfs fetch command") } if output, err := fetchCmd.CombinedOutput(); err != nil { + s.metrics.recordLFSPhase(ctx, upstream, "fetch", "error", time.Since(fetchStart)) return errors.Wrapf(err, "git lfs fetch: %s", string(output)) } + s.metrics.recordLFSPhase(ctx, upstream, "fetch", "success", time.Since(fetchStart)) lfsDir := filepath.Join(workDir, ".git", "lfs") if _, err := os.Stat(lfsDir); os.IsNotExist(err) { logger.InfoContext(ctx, "No LFS objects in repository, skipping LFS snapshot", "upstream", upstream) return nil } + // Record .git/lfs size as a proxy for "bytes fetched". Best-effort: + // surface 0 on error so we don't fail the snapshot for a stat walk. + if size, walkErr := dirSizeBytes(lfsDir); walkErr == nil { + s.metrics.recordLFSPhaseBytes(ctx, upstream, "fetch", size) + } else { + logger.DebugContext(ctx, "Failed to size .git/lfs after fetch", "upstream", upstream, "error", walkErr) + } gitDir := filepath.Join(workDir, ".git") - return snapshot.CreatePaths(ctx, s.cache, cacheKey, gitDir, "lfs", []string{"lfs"}, 0, excludePatterns, s.config.ZstdThreads) + archiveStart := time.Now() + if err := snapshot.CreatePaths(ctx, s.cache, cacheKey, gitDir, "lfs", []string{"lfs"}, 0, excludePatterns, s.config.ZstdThreads); err != nil { + s.metrics.recordLFSPhase(ctx, upstream, "archive_upload", "error", time.Since(archiveStart)) + return err //nolint:wrapcheck // wrapped by caller + } + s.metrics.recordLFSPhase(ctx, upstream, "archive_upload", "success", time.Since(archiveStart)) + return nil }); err != nil { + if !cloneRecorded { + s.metrics.recordLFSPhase(ctx, upstream, "clone", "error", time.Since(cloneStart)) + } s.metrics.recordOperation(ctx, "lfs-snapshot", "error", time.Since(start)) return errors.Wrap(err, "create LFS snapshot") } @@ -924,6 +961,31 @@ func (s *Strategy) generateAndUploadLFSSnapshot(ctx context.Context, repo *gitcl return nil } +// dirSizeBytes returns the total size in bytes of regular files under root. +// Per-entry stat or walk errors are deliberately swallowed so a transient +// failure (e.g. a file removed mid-walk during snapshot prep) doesn't fail +// the surrounding snapshot operation; the returned sum is best-effort. +func dirSizeBytes(root string) (int64, error) { + var total int64 + err := filepath.WalkDir(root, func(_ string, d os.DirEntry, err error) error { + if err != nil { + return nil //nolint:nilerr // best-effort: skip unreadable entries + } + // Stat first so we don't drop files on filesystems where DirEntry.Type() + // reports "unknown" (e.g. some NFS/FUSE setups) and IsRegular() returns false. + info, infoErr := d.Info() + if infoErr != nil { + return nil //nolint:nilerr // best-effort: skip un-stat-able entries + } + if !info.Mode().IsRegular() { + return nil + } + total += info.Size() + return nil + }) + return total, errors.WithStack(err) +} + func (s *Strategy) handleLFSSnapshotRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) { ctx := r.Context() logger := logging.FromContext(ctx)