Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cmd/x402-verifier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// File-sourced routes are populated synchronously by LoadConfig above,
// so they are "loaded" as soon as NewVerifier returns. The kube branch
// below flips this flag only after the first informer apply succeeds.
if *routeSource == "file" {
v.MarkRoutesLoaded()
}

if *watch {
switch *routeSource {
case "file":
Expand All @@ -76,7 +83,7 @@ func main() {
log.Fatalf("load kube route source config: %v", err)
}
go func() {
if err := x402verifier.WatchServiceOffers(ctx, kubeCfg, accumulator.SetRoutes); err != nil {
if err := x402verifier.WatchServiceOffers(ctx, kubeCfg, accumulator.SetRoutes, v.MarkRoutesLoaded); err != nil {
log.Printf("x402-serviceoffer-source: stopped: %v", err)
}
}()
Expand Down
18 changes: 13 additions & 5 deletions internal/x402/serviceoffer_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ import (
"k8s.io/client-go/tools/cache"
)

func WatchServiceOffers(ctx context.Context, cfg *rest.Config, apply func([]RouteRule) error) error {
// WatchServiceOffers runs the ServiceOffer + litellm-secrets informers and
// pushes rendered RouteRules to apply on every change. The optional
// onFirstApply callback is invoked at most once, and only if the refresh that
// runs immediately after cache sync succeeds; it is the signal that the route
// source has produced its first usable snapshot. If that refresh fails, the
// callback is never invoked, even if a later refresh succeeds. Pass nil to
// skip.
func WatchServiceOffers(ctx context.Context, cfg *rest.Config, apply func([]RouteRule) error, onFirstApply func()) error {
client, err := dynamic.NewForConfig(cfg)
if err != nil {
return fmt.Errorf("create dynamic client: %w", err)
Expand All @@ -33,17 +38,18 @@ func WatchServiceOffers(ctx context.Context, cfg *rest.Config, apply func([]Rout
offers := offerFactory.ForResource(monetizeapi.ServiceOfferGVR).Informer()
secrets := secretFactory.ForResource(monetizeapi.SecretGVR).Informer()

refresh := func() {
refresh := func() (ok bool) {
routes, err := routesFromStore(offers.GetStore().List(), secrets.GetStore().List())
if err != nil {
log.Printf("x402-serviceoffer-source: render routes: %v", err)
return
return false
}
if err := apply(routes); err != nil {
log.Printf("x402-serviceoffer-source: apply routes: %v", err)
return
return false
}
log.Printf("x402-serviceoffer-source: routes reloaded (%d routes)", len(routes))
return true
}

handler := cache.ResourceEventHandlerFuncs{
Expand All @@ -60,7 +66,9 @@ func WatchServiceOffers(ctx context.Context, cfg *rest.Config, apply func([]Rout
return fmt.Errorf("wait for serviceoffer informer sync")
}

refresh()
if refresh() && onFirstApply != nil {
onFirstApply()
}
<-ctx.Done()
return nil
}
Expand Down
23 changes: 21 additions & 2 deletions internal/x402/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,19 @@ type Verifier struct {
chain atomic.Pointer[ChainInfo]
chains atomic.Pointer[map[string]ChainInfo] // pre-resolved: chain name → config
metrics *verifierMetrics

// routesLoaded is set true after the first route source apply completes.
// Until then HandleReadyz returns 503 so kubelet keeps the pod out of
// the Service Endpoints, preventing the "no rule -> 200 free pass"
// window during informer warmup (CLAUDE.md pitfall #14).
routesLoaded atomic.Bool
}

// MarkRoutesLoaded records that the route source has completed its first
// successful apply. It only ever stores true, so calling it repeatedly is
// harmless. Once the flag is set, HandleReadyz no longer rejects requests
// with "routes not loaded"; the pricing config must still be loaded as well.
func (v *Verifier) MarkRoutesLoaded() {
	v.routesLoaded.Store(true)
}

// NewVerifier creates a Verifier with the given initial configuration.
func NewVerifier(cfg *PricingConfig) (*Verifier, error) {
v := &Verifier{metrics: newVerifierMetrics()}
Expand Down Expand Up @@ -224,10 +235,18 @@ func (v *Verifier) HandleHealthz(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `{"status":"ok"}`)
}

// HandleReadyz returns 200 OK if pricing config is loaded, 503 otherwise.
// HandleReadyz returns 200 OK once BOTH pricing config and the first route
// source apply have completed. Until then it returns 503 with a cause-specific
// body so kubelet keeps the pod out of Service Endpoints, preventing the
// "no rule -> 200 free pass" window during informer warmup
// (CLAUDE.md pitfall #14).
func (v *Verifier) HandleReadyz(w http.ResponseWriter, r *http.Request) {
if v.config.Load() == nil {
http.Error(w, "not ready", http.StatusServiceUnavailable)
http.Error(w, "not ready: config not loaded", http.StatusServiceUnavailable)
return
}
if !v.routesLoaded.Load() {
http.Error(w, "not ready: routes not loaded", http.StatusServiceUnavailable)
return
}

Expand Down
52 changes: 52 additions & 0 deletions internal/x402/verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func testPaymentHeaderFor(t *testing.T, payTo, amount string) string {
}

// newTestVerifier creates a Verifier backed by the given facilitator URL.
// It also marks routes as loaded so /readyz returns 200 immediately, which
// matches what the production wire-up does once the route source warms up.
func newTestVerifier(t *testing.T, facilitatorURL string, routes []RouteRule) *Verifier {
t.Helper()
v, err := NewVerifier(&PricingConfig{
Expand All @@ -112,6 +114,7 @@ func newTestVerifier(t *testing.T, facilitatorURL string, routes []RouteRule) *V
if err != nil {
t.Fatalf("NewVerifier: %v", err)
}
v.MarkRoutesLoaded()
return v
}

Expand Down Expand Up @@ -488,6 +491,55 @@ func TestVerifier_ReadyzNotReady(t *testing.T) {
if w.Code != http.StatusServiceUnavailable {
t.Errorf("expected 503 when config is nil, got %d", w.Code)
}
if got := w.Body.String(); !strings.Contains(got, "config not loaded") {
t.Errorf("expected body to mention %q, got %q", "config not loaded", got)
}
}

// TestVerifier_Readyz_BlocksUntilRoutesLoaded asserts the fix for
// CLAUDE.md pitfall #14: /readyz must return 503 between "config loaded"
// and "first route source apply completed" so kubelet keeps the pod out
// of the Service Endpoints during informer warm-up.
func TestVerifier_Readyz_BlocksUntilRoutesLoaded(t *testing.T) {
v, err := NewVerifier(&PricingConfig{
Wallet: "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
Chain: "base-sepolia",
FacilitatorURL: "http://example.invalid",
})
if err != nil {
t.Fatalf("NewVerifier: %v", err)
}

// Config is loaded by NewVerifier, but routes have NOT been marked
// loaded yet — /readyz must still 503 with a routes-specific message
// so kubectl describe pod surfaces the actual cause.
req := httptest.NewRequest(http.MethodGet, "/readyz", nil)
w := httptest.NewRecorder()
v.HandleReadyz(w, req)

if w.Code != http.StatusServiceUnavailable {
t.Fatalf("expected 503 before routes loaded, got %d", w.Code)
}
if got := w.Body.String(); !strings.Contains(got, "routes not loaded") {
t.Errorf("expected body to mention %q, got %q", "routes not loaded", got)
}

// After the route source signals first apply, /readyz flips to 200.
v.MarkRoutesLoaded()

w = httptest.NewRecorder()
v.HandleReadyz(w, req)
if w.Code != http.StatusOK {
t.Fatalf("expected 200 after MarkRoutesLoaded, got %d (body=%q)", w.Code, w.Body.String())
}

// MarkRoutesLoaded is idempotent — calling it again must not regress.
v.MarkRoutesLoaded()
w = httptest.NewRecorder()
v.HandleReadyz(w, req)
if w.Code != http.StatusOK {
t.Fatalf("expected 200 after second MarkRoutesLoaded, got %d", w.Code)
}
}

// ── Per-route PayTo / Network override tests ─────────────────────────────────
Expand Down