diff --git a/cmd/obol/sell.go b/cmd/obol/sell.go index 379fb2d9..b06508f7 100644 --- a/cmd/obol/sell.go +++ b/cmd/obol/sell.go @@ -2331,8 +2331,25 @@ Examples: func sellStopCommand(cfg *config.Config) *cli.Command { return &cli.Command{ Name: "stop", - Usage: "Pause a ServiceOffer without deleting it", + Usage: "Drain a ServiceOffer gracefully (advertises wind-down via discovery, then tears down the route)", ArgsUsage: "", + Description: `Marks a ServiceOffer as draining. While draining: + - The offer stays in /skill.md and /.well-known/agent-registration.json + with available=false and a drainEndsAt timestamp, so external + discovery (and ERC-8004 reputation scorers) can see the wind-down. + - The HTTPRoute and x402 payment gate STAY UP for the grace period + so buyers can complete in-flight payments. + - When the grace period elapses, the controller tears down the route + and marks PaymentGateReady/RoutePublished False with reason=Drained. + +The ServiceOffer CR itself is preserved — use 'obol sell delete' to +remove it entirely (which also tombstones the ERC-8004 record). + +Flags: + --grace 30m Override the grace period (default 1h). + --force Skip the drain window (equivalent to --grace 0). Use + this when the abrupt-teardown behavior of the old + pause annotation is required for behavior parity.`, Flags: []cli.Flag{ &cli.StringFlag{ Name: "namespace", @@ -2340,6 +2357,16 @@ func sellStopCommand(cfg *config.Config) *cli.Command { Usage: "Namespace of the ServiceOffer", Required: true, }, + &cli.DurationFlag{ + Name: "grace", + Usage: "Drain grace period (e.g. 30m, 2h). 
Defaults to 1h.", + Value: monetizeapi.DefaultDrainGracePeriod, + }, + &cli.BoolFlag{ + Name: "force", + Aliases: []string{"now"}, + Usage: "Skip the drain window and tear the route down on the next reconcile (alias: --now)", + }, }, Action: func(ctx context.Context, cmd *cli.Command) error { u := getUI(cmd) @@ -2352,19 +2379,37 @@ func sellStopCommand(cfg *config.Config) *cli.Command { return err } ns := cmd.String("namespace") + grace := cmd.Duration("grace") + if cmd.Bool("force") { + grace = 0 + } + if grace < 0 { + return errors.New("--grace must be >= 0") + } - u.Infof("Stopping the service offering %s/%s...", ns, name) - - removePricingRoute(cfg, u, name) - - patchJSON := `{"status":{"conditions":[{"type":"Ready","status":"False","reason":"Stopped","message":"Offer stopped by user"}]}}` - err := kubectlRun(cfg, "patch", "serviceoffers.obol.org", name, "-n", ns, - "--type=merge", "-p", patchJSON) - if err != nil { - return fmt.Errorf("failed to pause serviceoffer: %w", err) + now := time.Now().UTC() + drainEndsAt := now.Add(grace) + + // metav1.Duration JSON-marshals as the string form (e.g. + // "1h0m0s"), and metav1.Time marshals as RFC3339. We can + // emit a tiny JSON merge patch (--type=merge) directly without + // importing the meta types into the CLI. + patchJSON := fmt.Sprintf( + `{"spec":{"drainAt":%q,"drainGracePeriod":%q}}`, + now.Format(time.RFC3339), + grace.String(), + ) + if err := kubectlRun(cfg, "patch", "serviceoffers.obol.org", name, "-n", ns, + "--type=merge", "-p", patchJSON); err != nil { + return fmt.Errorf("failed to drain serviceoffer: %w", err) } - u.Successf("Service offering %s/%s stopped.", ns, name) + if grace == 0 { + u.Successf("ServiceOffer %s/%s draining; route will be removed on the next reconcile (--force).", ns, name) + } else { + u.Successf("ServiceOffer %s/%s draining; route will be removed at %s.", ns, name, drainEndsAt.Format(time.RFC3339)) + } + u.Infof("In-flight buyers can complete payments until then. 
Run `obol sell delete %s -n %s` to fully remove.", name, ns) return nil }, } @@ -2518,8 +2563,6 @@ func sellDeleteCommand(cfg *config.Config) *cli.Command { } } - removePricingRoute(cfg, u, name) - // Identity-level registration ownership lives in the AgentIdentity // CR and is managed by the controller. The CLI no longer patches // the registration ConfigMap here; deleting the ServiceOffer is @@ -4126,7 +4169,3 @@ func manifestNSName(manifest map[string]any) (string, string) { return ns, name } -// removePricingRoute is a no-op retained for compatibility. -// The serviceoffer-controller now manages pricing routes via the ServiceOffer -// informer; static ConfigMap routes are no longer used. -func removePricingRoute(_ *config.Config, _ *ui.UI, _ string) {} diff --git a/cmd/obol/sell_test.go b/cmd/obol/sell_test.go index 9f3991ea..4055fa2c 100644 --- a/cmd/obol/sell_test.go +++ b/cmd/obol/sell_test.go @@ -626,9 +626,20 @@ func TestSellStop_Structure(t *testing.T) { stop := findSubcommand(t, cmd, "stop") flags := flagMap(stop) - requireFlags(t, flags, "namespace") + requireFlags(t, flags, "namespace", "grace", "force") assertFlagRequired(t, flags, "namespace") assertFlagHasAlias(t, flags, "namespace", "n") + // --now is the documented alias for --force; if it disappears, + // scripted operators that rely on it break silently. 
+ assertFlagHasAlias(t, flags, "force", "now") + + graceFlag, ok := flags["grace"].(*cli.DurationFlag) + if !ok { + t.Fatalf("--grace should be *cli.DurationFlag, got %T", flags["grace"]) + } + if graceFlag.Value != monetizeapi.DefaultDrainGracePeriod { + t.Errorf("--grace default = %v, want %v", graceFlag.Value, monetizeapi.DefaultDrainGracePeriod) + } } func TestSellDelete_Structure(t *testing.T) { diff --git a/docs/guides/monetize-inference.md b/docs/guides/monetize-inference.md index 2fdf3680..775ba812 100644 --- a/docs/guides/monetize-inference.md +++ b/docs/guides/monetize-inference.md @@ -572,15 +572,34 @@ obol sell status my-qwen --namespace llm obol sell status ``` -### Pausing +### Draining -Pause an offer without deleting it: +Stop an offer gracefully so buyers can wind down before the route disappears: ```bash -obol sell stop my-qwen --namespace llm +obol sell stop my-qwen --namespace llm # default: 1h grace +obol sell stop my-qwen --namespace llm --grace 30m # custom grace +obol sell stop my-qwen --namespace llm --force # tear down immediately ``` -The CR and any ERC-8004 registration remain intact. Re-create the offer with the same name to restart. +`obol sell stop` sets `spec.drainAt` on the ServiceOffer. While the offer is +draining: + +- `/skill.md` and `/.well-known/agent-registration.json` advertise the offer + with `available: false` and `drainEndsAt: <RFC3339 timestamp>`, so external discovery + (and ERC-8004 reputation scorers) can react before traffic disappears. +- The HTTPRoute and x402 payment gate stay up so in-flight buyers can complete + payments. +- When the grace period elapses, the controller tears down the route and marks + `Draining=False` reason=Drained. + +The ServiceOffer CR and any ERC-8004 registration remain intact. Use +`obol sell delete` to remove the offer entirely. 
+ +`--force` (alias: `--now`) skips the drain window — useful when you want the +abrupt-teardown behavior of the legacy `obol.org/paused` annotation, for +example to reclaim the path immediately. Note that abrupt teardown is a worse +reputation signal for on-chain buyers than a graceful drain. ### Cleanup @@ -815,7 +834,7 @@ manifest. Do not paper over smoke-test failures with an ad hoc patch. | `obol sell http --wallet ... --chain ... --per-request ... --upstream ... --port ...` | Create a ServiceOffer and register by default | | `obol sell list` | List all ServiceOffers | | `obol sell status -n ` | Show conditions for an offer | -| `obol sell stop -n ` | Pause an offer without deleting it | +| `obol sell stop -n [--grace 1h] [--force]` | Drain an offer (advertise wind-down via discovery, then tear down the route after the grace period). `--force`/`--now` skips the grace window. | | `obol sell delete -n ` | Delete an offer and cleanup | | `obol sell status` | Show cluster pricing and registration | | `obol sell register --private-key-file ...` | Advanced/manual registration or repair path | diff --git a/internal/embed/infrastructure/base/templates/serviceoffer-crd.yaml b/internal/embed/infrastructure/base/templates/serviceoffer-crd.yaml index 5b37ba23..86f2a446 100644 --- a/internal/embed/infrastructure/base/templates/serviceoffer-crd.yaml +++ b/internal/embed/infrastructure/base/templates/serviceoffer-crd.yaml @@ -239,6 +239,24 @@ spec: type: string description: "URL path prefix for the HTTPRoute, defaults to /services/." pattern: "^/[a-zA-Z0-9/_.-]*$" + drainAt: + type: string + format: date-time + description: >- + When set, marks the offer as draining. Discovery surfaces + (/skill.md and /.well-known/agent-registration.json) advertise + the offer with available=false and drainEndsAt set, so external + observers can react before the route is removed. The HTTPRoute + and payment gate stay up until drainAt + drainGracePeriod so + in-flight buyers can settle. 
Set by `obol sell stop`. + drainGracePeriod: + type: string + description: >- + How long after drainAt the HTTPRoute remains up. Go duration + format (e.g. "1h", "30m", "0s"). Defaults to "1h" when unset. + A zero duration tears the route down on the next reconcile + (the `obol sell stop --force` path). + pattern: "^([0-9]+([.][0-9]+)?(ns|us|µs|ms|s|m|h))+$" registration: type: object description: >- @@ -312,7 +330,9 @@ spec: type: array description: >- Condition types: ModelReady, UpstreamHealthy, PaymentGateReady, - RoutePublished, Registered, Ready. + RoutePublished, Registered, Ready, Draining. Draining is True + while spec.drainAt is set and the grace window has not elapsed; + transitions to False reason=Drained once the route is torn down. items: type: object required: diff --git a/internal/embed/skills/sell/references/serviceoffer-spec.md b/internal/embed/skills/sell/references/serviceoffer-spec.md index a2b6183e..5ead12d7 100644 --- a/internal/embed/skills/sell/references/serviceoffer-spec.md +++ b/internal/embed/skills/sell/references/serviceoffer-spec.md @@ -180,7 +180,13 @@ Each condition contains: ## Lifecycle Notes -- Pausing is represented via the `obol.org/paused: "true"` annotation. +- Graceful stop is represented via `spec.drainAt` (RFC3339 timestamp) and + the optional `spec.drainGracePeriod` (Go duration, e.g. `"30m"`, defaults + to `1h`). While draining, discovery surfaces advertise the offer with + `available: false` + `drainEndsAt`, and the HTTPRoute/payment gate stay + up until the grace period expires so in-flight buyers can settle. + `obol sell stop --force` is the equivalent of `drainGracePeriod: 0s` — + abrupt teardown with no advertised wind-down. - Deleting a `ServiceOffer` cascades owned `Middleware` and `HTTPRoute` resources via `ownerReferences`. 
- Registration side effects are isolated in a child `RegistrationRequest` diff --git a/internal/monetizeapi/drain_test.go b/internal/monetizeapi/drain_test.go new file mode 100644 index 00000000..0241a49a --- /dev/null +++ b/internal/monetizeapi/drain_test.go @@ -0,0 +1,113 @@ +package monetizeapi + +import ( + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestServiceOffer_IsDraining(t *testing.T) { + t.Run("nil drainAt", func(t *testing.T) { + o := &ServiceOffer{} + if o.IsDraining() { + t.Errorf("IsDraining() = true, want false for nil drainAt") + } + }) + t.Run("set drainAt", func(t *testing.T) { + now := metav1.Now() + o := &ServiceOffer{Spec: ServiceOfferSpec{DrainAt: &now}} + if !o.IsDraining() { + t.Errorf("IsDraining() = false, want true for non-nil drainAt") + } + }) +} + +func TestServiceOffer_DrainEndsAt(t *testing.T) { + base := time.Date(2026, time.May, 1, 12, 0, 0, 0, time.UTC) + baseMeta := metav1.NewTime(base) + + cases := []struct { + name string + drain *metav1.Time + grace *metav1.Duration + want time.Time + }{ + { + name: "nil drainAt returns zero", + drain: nil, + grace: nil, + want: time.Time{}, + }, + { + name: "nil grace applies default 1h", + drain: &baseMeta, + grace: nil, + want: base.Add(time.Hour), + }, + { + name: "explicit zero grace honored", + drain: &baseMeta, + grace: &metav1.Duration{Duration: 0}, + want: base, + }, + { + name: "custom grace honored", + drain: &baseMeta, + grace: &metav1.Duration{Duration: 30 * time.Minute}, + want: base.Add(30 * time.Minute), + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + o := &ServiceOffer{Spec: ServiceOfferSpec{DrainAt: tc.drain, DrainGracePeriod: tc.grace}} + if got := o.DrainEndsAt(); !got.Equal(tc.want) { + t.Errorf("DrainEndsAt() = %v, want %v", got, tc.want) + } + }) + } +} + +func TestServiceOffer_DrainExpired(t *testing.T) { + now := time.Date(2026, time.May, 1, 12, 0, 0, 0, time.UTC) + + t.Run("not draining returns 
false", func(t *testing.T) { + o := &ServiceOffer{} + if o.DrainExpired(now) { + t.Errorf("DrainExpired() = true, want false for non-draining offer") + } + }) + + t.Run("mid-drain returns false", func(t *testing.T) { + drainAt := metav1.NewTime(now.Add(-10 * time.Minute)) + o := &ServiceOffer{Spec: ServiceOfferSpec{ + DrainAt: &drainAt, + DrainGracePeriod: &metav1.Duration{Duration: time.Hour}, + }} + if o.DrainExpired(now) { + t.Errorf("DrainExpired() = true, want false for mid-drain offer") + } + }) + + t.Run("expired returns true", func(t *testing.T) { + drainAt := metav1.NewTime(now.Add(-2 * time.Hour)) + o := &ServiceOffer{Spec: ServiceOfferSpec{ + DrainAt: &drainAt, + DrainGracePeriod: &metav1.Duration{Duration: time.Hour}, + }} + if !o.DrainExpired(now) { + t.Errorf("DrainExpired() = false, want true for expired drain") + } + }) + + t.Run("force path zero grace tears down on next reconcile", func(t *testing.T) { + drainAt := metav1.NewTime(now) + o := &ServiceOffer{Spec: ServiceOfferSpec{ + DrainAt: &drainAt, + DrainGracePeriod: &metav1.Duration{Duration: 0}, + }} + if !o.DrainExpired(now) { + t.Errorf("DrainExpired() = false at now == drainAt with zero grace, want true") + } + }) +} diff --git a/internal/monetizeapi/types.go b/internal/monetizeapi/types.go index 6e905eee..30f0ca0d 100644 --- a/internal/monetizeapi/types.go +++ b/internal/monetizeapi/types.go @@ -3,11 +3,18 @@ package monetizeapi import ( "fmt" "strings" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" ) +// DefaultDrainGracePeriod is the grace period applied to a draining +// ServiceOffer when spec.drainGracePeriod is unset. Buyers using the +// offer can complete in-flight payments and migrate to alternative +// providers within this window before the HTTPRoute is torn down. 
+const DefaultDrainGracePeriod = time.Hour + const ( Group = "obol.org" Version = "v1alpha1" @@ -29,8 +36,6 @@ const ( AgentIdentityDefaultNamespace = "x402" AgentIdentityDefaultName = "default" - PausedAnnotation = "obol.org/paused" - AgentRuntimeHermes = "hermes" AgentPhasePending = "Pending" @@ -77,6 +82,21 @@ type ServiceOfferSpec struct { Path string `json:"path,omitempty"` Provenance map[string]string `json:"provenance,omitempty"` Registration ServiceOfferRegistration `json:"registration,omitempty"` + + // DrainAt marks the offer as draining when non-nil. While the offer + // is in the drain window, discovery surfaces (/skill.md and + // /.well-known/agent-registration.json) advertise the offer with + // available=false and drainEndsAt set, so buyers can migrate before + // the route is torn down. The route + payment gate stay up until + // DrainEndsAt() so in-flight payments can complete. Replaces the + // legacy obol.org/paused annotation. + DrainAt *metav1.Time `json:"drainAt,omitempty"` + + // DrainGracePeriod is how long after DrainAt the HTTPRoute remains + // up. Defaults to DefaultDrainGracePeriod when nil. A zero duration + // is honored as "tear down immediately on the next reconcile" (the + // equivalent of `obol sell stop --force`). + DrainGracePeriod *metav1.Duration `json:"drainGracePeriod,omitempty"` } // ServiceOfferAgent is populated when Spec.Type == "agent". The controller @@ -241,8 +261,41 @@ func (o *ServiceOffer) IsAgent() bool { return o.Spec.Type == "agent" } -func (o *ServiceOffer) IsPaused() bool { - return o.Annotations != nil && o.Annotations[PausedAnnotation] == "true" +// IsDraining reports whether spec.drainAt has been set. Drained offers +// transition through three phases: pre-drain (DrainAt nil), draining +// (DrainAt set, now < DrainEndsAt), and drain-expired (DrainAt set, +// now >= DrainEndsAt). The controller keeps the route up during +// "draining" and tears it down once "drain-expired" is reached. 
+func (o *ServiceOffer) IsDraining() bool { + return o.Spec.DrainAt != nil +} + +// DrainEndsAt returns DrainAt + DrainGracePeriod. When DrainAt is nil +// the zero time is returned (caller should gate on IsDraining first). +// When DrainGracePeriod is nil the default grace period is applied; a +// zero grace period is honored as "drain ends at DrainAt", i.e. tear +// down on the next reconcile (the --force/--now path). +func (o *ServiceOffer) DrainEndsAt() time.Time { + if o.Spec.DrainAt == nil { + return time.Time{} + } + grace := DefaultDrainGracePeriod + if o.Spec.DrainGracePeriod != nil { + grace = o.Spec.DrainGracePeriod.Duration + } + return o.Spec.DrainAt.Time.Add(grace) +} + +// DrainExpired reports whether the drain grace period has elapsed. +// Returns false when the offer is not draining at all. Callers should +// use this rather than IsDraining when deciding whether to tear down +// the HTTPRoute or filter the offer from the live x402 verifier rules. +func (o *ServiceOffer) DrainExpired(now time.Time) bool { + if !o.IsDraining() { + return false + } + end := o.DrainEndsAt() + return !now.Before(end) } // ── PurchaseRequest ───────────────────────────────────────────────────────── diff --git a/internal/schemas/service-catalog.schema.json b/internal/schemas/service-catalog.schema.json index 6f7578ba..95e79077 100644 --- a/internal/schemas/service-catalog.schema.json +++ b/internal/schemas/service-catalog.schema.json @@ -149,6 +149,14 @@ }, "isDemo": { "type": "boolean" + }, + "registrationPending": { + "type": "boolean" + }, + "drainEndsAt": { + "type": "string", + "format": "date-time", + "description": "RFC3339 timestamp at which the offer's HTTPRoute will be torn down. Presence = draining." 
} } } diff --git a/internal/schemas/service_catalog.go b/internal/schemas/service_catalog.go index e53085b6..525b0047 100644 --- a/internal/schemas/service_catalog.go +++ b/internal/schemas/service_catalog.go @@ -39,6 +39,15 @@ type ServiceCatalogEntry struct { // know the offer is usable for x402 payments today, even though // ERC-8004 discovery via the chain still resolves to the prior state. RegistrationPending bool `json:"registrationPending,omitempty"` + + // DrainEndsAt is the RFC3339 timestamp at which the offer's + // HTTPRoute will be removed. Set ONLY while the offer is draining; + // absent (empty) on active offers. Presence of this field IS the + // draining signal — catalog consumers detect drain with + // `if (entry.drainEndsAt) { /* draining, migrate before this time */ }`. + // Drain is purely additive: active offers serialize identically to + // pre-drain releases (no `available` field). + DrainEndsAt string `json:"drainEndsAt,omitempty"` } // ServiceCatalogAsset describes the settlement token resolved for a catalog diff --git a/internal/serviceoffercontroller/controller.go b/internal/serviceoffercontroller/controller.go index fac586b0..7cbd759b 100644 --- a/internal/serviceoffercontroller/controller.go +++ b/internal/serviceoffercontroller/controller.go @@ -296,7 +296,7 @@ func (c *Controller) enqueueOfferFromRegistration(obj any) { log.Printf("serviceoffer-controller: decode offer for registration fan-out: %v", err) continue } - if offer.DeletionTimestamp != nil || offer.IsPaused() || !offer.Spec.Registration.Enabled { + if offer.DeletionTimestamp != nil || !offer.Spec.Registration.Enabled { continue } c.offerQueue.Add(offer.Namespace + "/" + offer.Name) @@ -455,13 +455,49 @@ func (c *Controller) reconcileOffer(ctx context.Context, key string) error { return err } - if offer.IsPaused() { - if err := c.deleteRouteChildren(ctx, offer); err != nil { - return err + if offer.IsDraining() { + now := time.Now() + drainEndsAt := offer.DrainEndsAt() + if 
offer.DrainExpired(now) { + // Drain grace period elapsed: tear down the HTTPRoute + + // payment gate. The CR itself stays (delete is the canonical + // removal path); the catalog renderers drop drain-expired + // offers, so the wind-down is recorded by the Drained conditions. + if err := c.deleteRouteChildren(ctx, offer); err != nil { + return err + } + setCondition(&status, "Draining", "False", "Drained", fmt.Sprintf("Drain ended at %s; route torn down", drainEndsAt.UTC().Format(time.RFC3339))) + setCondition(&status, "PaymentGateReady", "False", "Drained", "Offer drained; payment gate removed") + setCondition(&status, "RoutePublished", "False", "Drained", "Offer drained; route removed") + } else { + // Still in the drain window: keep the route + payment gate + // up so in-flight buyers can finish, but mark Draining=True + // so discovery surfaces can advertise available=false. + if upstreamHealthy && isConditionTrue(status, "ModelReady") { + if err := c.reconcilePaymentGate(ctx, &status, offer); err != nil { + return err + } + if isConditionTrue(status, "PaymentGateReady") { + if err := c.reconcileRoute(ctx, &status, offer); err != nil { + return err + } + } + } else { + setCondition(&status, "PaymentGateReady", "False", "WaitingForUpstream", "Waiting for upstream health before publishing payment gate") + setCondition(&status, "RoutePublished", "False", "WaitingForPaymentGate", "Waiting for payment gate before publishing route") + } + setCondition(&status, "Draining", "True", "Draining", fmt.Sprintf("Drain ends at %s", drainEndsAt.UTC().Format(time.RFC3339))) + // Requeue at the drain expiry so the route is torn down on + // time even without any spec change in the interim. Add a + // small slack so the comparison in DrainExpired clears. 
+ if delay := time.Until(drainEndsAt) + time.Second; delay > 0 { + c.offerQueue.AddAfter(offer.Namespace+"/"+offer.Name, delay) + } else { + c.offerQueue.Add(offer.Namespace + "/" + offer.Name) + } } - setCondition(&status, "PaymentGateReady", "False", "Paused", "Offer is paused") - setCondition(&status, "RoutePublished", "False", "Paused", "Offer is paused") } else if upstreamHealthy && isConditionTrue(status, "ModelReady") { + setCondition(&status, "Draining", "False", "Active", "Offer is active") if err := c.reconcilePaymentGate(ctx, &status, offer); err != nil { return err } @@ -471,6 +507,7 @@ func (c *Controller) reconcileOffer(ctx context.Context, key string) error { } } } else { + setCondition(&status, "Draining", "False", "Active", "Offer is active") setCondition(&status, "PaymentGateReady", "False", "WaitingForUpstream", "Waiting for upstream health before publishing payment gate") setCondition(&status, "RoutePublished", "False", "WaitingForPaymentGate", "Waiting for payment gate before publishing route") } @@ -1114,7 +1151,7 @@ func (c *Controller) reconcileSkillCatalog(ctx context.Context, override *moneti } readyOffers := 0 for _, offer := range offers { - if offer != nil && offer.DeletionTimestamp == nil && !offer.IsPaused() && isConditionTrue(offer.Status, "Ready") { + if offer != nil && offer.DeletionTimestamp == nil && isConditionTrue(offer.Status, "Ready") { readyOffers++ } } diff --git a/internal/serviceoffercontroller/identity_controller.go b/internal/serviceoffercontroller/identity_controller.go index 030bf013..92fcd03e 100644 --- a/internal/serviceoffercontroller/identity_controller.go +++ b/internal/serviceoffercontroller/identity_controller.go @@ -386,7 +386,10 @@ func (c *Controller) registrationOffersForIdentity(key agentIdentityKey, exclude if offer.Namespace == excludeNamespace && offer.Name == excludeName { continue } - if offer.DeletionTimestamp != nil || offer.IsPaused() || !offer.Spec.Registration.Enabled { + // Draining offers stay 
in the registration candidate list so + // the registration document continues to advertise them with + // available=false until the drain grace period expires. + if offer.DeletionTimestamp != nil || !offer.Spec.Registration.Enabled { continue } if !isConditionTrue(offer.Status, "UpstreamHealthy") { diff --git a/internal/serviceoffercontroller/render.go b/internal/serviceoffercontroller/render.go index a733213b..aaf18bf4 100644 --- a/internal/serviceoffercontroller/render.go +++ b/internal/serviceoffercontroller/render.go @@ -712,8 +712,17 @@ func buildRegistrationServices(owner *monetizeapi.ServiceOffer, offers []*moneti return services } +// offerPublishedForRegistration reports whether an offer should appear +// in the operator's ERC-8004 registration document as a live, gated +// service. Draining offers stay in the document with available=false +// so external observers can see the wind-down — this function filters +// them out only after the drain window has fully expired (i.e. the +// HTTPRoute is gone and there is no payment surface to advertise). func offerPublishedForRegistration(offer *monetizeapi.ServiceOffer) bool { - if offer == nil || offer.DeletionTimestamp != nil || offer.IsPaused() || !offer.Spec.Registration.Enabled { + if offer == nil || offer.DeletionTimestamp != nil || !offer.Spec.Registration.Enabled { + return false + } + if offer.DrainExpired(time.Now()) { return false } return isConditionTrue(offer.Status, "ModelReady") && @@ -731,9 +740,18 @@ func buildSkillCatalogMarkdown(offers []*monetizeapi.ServiceOffer, baseURL strin // both /skill.md and /api/services.json, with the on-chain ERC-8004 // registration treated as informational metadata rather than a gating // signal. See offerOperationallyReady's doc comment for the rationale. 
+ now := time.Now() var ready []*monetizeapi.ServiceOffer for _, offer := range offers { - if offer == nil || offer.DeletionTimestamp != nil || offer.IsPaused() { + if offer == nil || offer.DeletionTimestamp != nil { + continue + } + // Drained offers (post-grace-period) have no live route — drop + // them from the catalog entirely. Draining offers (pre-expiry) + // stay in the catalog with available=false + drainEndsAt set so + // buyers can see the wind-down via discovery before the route + // disappears. + if offer.DrainExpired(now) { continue } if offerOperationallyReady(offer) { @@ -762,20 +780,28 @@ func buildSkillCatalogMarkdown(offers []*monetizeapi.ServiceOffer, baseURL strin } lines = append(lines, "## Services", "") - lines = append(lines, "| Service | Type | Model | Price | Endpoint |") - lines = append(lines, "|---------|------|-------|-------|----------|") + lines = append(lines, "| Service | Type | Model | Price | Status | Endpoint |") + lines = append(lines, "|---------|------|-------|-------|--------|----------|") for _, offer := range ready { modelName := offer.Spec.Model.Name if modelName == "" { modelName = "—" } + // Status column: empty (rendered as "—") for active offers so + // the catalog visually matches the pre-drain layout, "draining" + // only for offers in their grace window. 
+ status := "—" + if offer.IsDraining() { + status = fmt.Sprintf("draining · ends %s", offer.DrainEndsAt().UTC().Format(time.RFC3339)) + } lines = append(lines, fmt.Sprintf( - "| [%s](#%s) | %s | %s | %s | `%s%s` |", + "| [%s](#%s) | %s | %s | %s | %s | `%s%s` |", offer.Name, offer.Name, fallbackOfferType(offer), modelName, describeOfferPrice(offer), + status, baseURL, offer.EffectivePath(), )) @@ -792,6 +818,13 @@ func buildSkillCatalogMarkdown(offers []*monetizeapi.ServiceOffer, baseURL strin lines = append(lines, fmt.Sprintf("- **Price**: %s", describeOfferPrice(offer))) lines = append(lines, fmt.Sprintf("- **Pay To**: `%s`", firstNonEmpty(offer.Spec.Payment.PayTo, "—"))) lines = append(lines, fmt.Sprintf("- **Network**: %s", firstNonEmpty(offer.Spec.Payment.Network, "—"))) + // Drain is purely additive: emit nothing on active offers so + // the detail block matches the pre-drain layout. Only draining + // offers get a Drain ends at bullet — its presence IS the + // signal. + if offer.IsDraining() { + lines = append(lines, fmt.Sprintf("- **Drain ends at**: %s", offer.DrainEndsAt().UTC().Format(time.RFC3339))) + } description := offer.Spec.Registration.Description if description == "" { description = fmt.Sprintf("x402 payment-gated %s service", fallbackOfferType(offer)) @@ -868,9 +901,18 @@ func offerAwaitingRegistration(offer *monetizeapi.ServiceOffer) bool { func buildServiceCatalogJSON(offers []*monetizeapi.ServiceOffer, baseURL string) string { baseURL = strings.TrimRight(baseURL, "/") + now := time.Now() var ready []*monetizeapi.ServiceOffer for _, offer := range offers { - if offer == nil || offer.DeletionTimestamp != nil || offer.IsPaused() { + if offer == nil || offer.DeletionTimestamp != nil { + continue + } + // Drained offers (post-grace-period) have no live route — drop + // them from the catalog entirely. Draining offers (pre-expiry) + // stay in the catalog with drainEndsAt set so buyers can react + // before the route disappears. 
Presence of drainEndsAt IS the + // drain signal; no separate "available" boolean. + if offer.DrainExpired(now) { continue } if offerOperationallyReady(offer) { @@ -895,6 +937,14 @@ func buildServiceCatalogJSON(offers []*monetizeapi.ServiceOffer, baseURL string) modelName = offer.Status.AgentResolution.Model } + // Drain is purely additive: only set drainEndsAt on draining + // offers. Active offers serialize identically to pre-drain + // releases. Consumers detect drain with `if (entry.drainEndsAt)`. + drainEndsAt := "" + if offer.IsDraining() { + drainEndsAt = offer.DrainEndsAt().UTC().Format(time.RFC3339) + } + svc := schemas.ServiceCatalogEntry{ Name: offer.Name, Namespace: offer.Namespace, @@ -907,6 +957,7 @@ func buildServiceCatalogJSON(offers []*monetizeapi.ServiceOffer, baseURL string) Description: desc, IsDemo: offer.Namespace == "demo", RegistrationPending: offerAwaitingRegistration(offer), + DrainEndsAt: drainEndsAt, } raw, unit := offerPriceRawAndUnit(offer) diff --git a/internal/serviceoffercontroller/render_test.go b/internal/serviceoffercontroller/render_test.go index eb71891c..a781f453 100644 --- a/internal/serviceoffercontroller/render_test.go +++ b/internal/serviceoffercontroller/render_test.go @@ -719,18 +719,27 @@ func TestBuildServiceCatalogJSON_AgentOfferUsesResolvedModel(t *testing.T) { } // TestBuildServiceCatalogJSON_ExcludesNonReady locks in the filter pipeline: -// nil offers, paused offers, and offers with a DeletionTimestamp must never -// leak onto the public storefront, even if they carry Ready=True. +// nil offers, drain-expired offers, and offers with a DeletionTimestamp +// must never leak onto the public storefront, even if they carry +// Ready=True. Mid-drain offers DO stay in the catalog, flagged only by +// drainEndsAt (no `available` key) — that's the whole point of the drain replacement. 
func TestBuildServiceCatalogJSON_ExcludesNonReady(t *testing.T) { readyCond := []monetizeapi.Condition{{Type: "Ready", Status: "True"}} deleting := metav1.Now() + drainedAt := metav1.NewTime(time.Now().Add(-2 * time.Hour)) + zeroGrace := metav1.Duration{Duration: 0} + offers := []*monetizeapi.ServiceOffer{ nil, { - ObjectMeta: metav1.ObjectMeta{ - Name: "paused-svc", Namespace: "llm", - Annotations: map[string]string{monetizeapi.PausedAnnotation: "true"}, + ObjectMeta: metav1.ObjectMeta{Name: "drained-svc", Namespace: "llm"}, + Spec: monetizeapi.ServiceOfferSpec{ + DrainAt: &drainedAt, + DrainGracePeriod: &zeroGrace, + Payment: monetizeapi.ServiceOfferPayment{ + Price: monetizeapi.ServiceOfferPriceTable{PerRequest: "0.001"}, + }, }, Status: monetizeapi.ServiceOfferStatus{Conditions: readyCond}, }, @@ -773,6 +782,171 @@ func TestBuildServiceCatalogJSON_ExcludesNonReady(t *testing.T) { if services[0].Name != "ready-svc" { t.Errorf("got %q, want ready-svc — filter pipeline leaked another offer", services[0].Name) } + if services[0].DrainEndsAt != "" { + t.Errorf("ready-svc.drainEndsAt = %q, want empty (active offers must not carry drain metadata)", services[0].DrainEndsAt) + } + // Drain is purely additive: the active-offer JSON must not contain + // `available` or `drainEndsAt` keys at all. Walk the raw map so the + // assertion catches a stray field even if it's the zero value. 
+ rawBytes, _ := json.Marshal(services[0]) + var rawMap map[string]any + if err := json.Unmarshal(rawBytes, &rawMap); err != nil { + t.Fatalf("re-marshal ready-svc: %v", err) + } + if _, ok := rawMap["available"]; ok { + t.Errorf("ready-svc JSON contains unexpected `available` key (drain must be purely additive): %s", rawBytes) + } + if _, ok := rawMap["drainEndsAt"]; ok { + t.Errorf("ready-svc JSON contains `drainEndsAt` key on active offer: %s", rawBytes) + } +} + +// TestBuildServiceCatalogJSON_DrainLifecycle covers the three drain +// states explicitly: pre-drain (no drainEndsAt key in JSON), mid-drain +// (in catalog with drainEndsAt populated), and drain-expired (filtered +// out of the catalog because the controller has torn down the route). +// Drain is purely additive: there is no `available` field. +func TestBuildServiceCatalogJSON_DrainLifecycle(t *testing.T) { + readyCond := []monetizeapi.Condition{{Type: "Ready", Status: "True"}} + mkOffer := func(name string) monetizeapi.ServiceOffer { + return monetizeapi.ServiceOffer{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "llm"}, + Spec: monetizeapi.ServiceOfferSpec{ + Type: "http", + Payment: monetizeapi.ServiceOfferPayment{ + Network: "base", + PayTo: "0x1111111111111111111111111111111111111111", + Price: monetizeapi.ServiceOfferPriceTable{PerRequest: "0.001"}, + }, + }, + Status: monetizeapi.ServiceOfferStatus{Conditions: readyCond}, + } + } + + // Pre-drain. + pre := mkOffer("pre") + + // Mid-drain: drainAt = now, grace = 1h → ends ~1h from now. + midDrainAt := metav1.NewTime(time.Now()) + midGrace := metav1.Duration{Duration: time.Hour} + mid := mkOffer("mid") + mid.Spec.DrainAt = &midDrainAt + mid.Spec.DrainGracePeriod = &midGrace + + // Drain-expired. 
+ expDrainAt := metav1.NewTime(time.Now().Add(-2 * time.Hour)) + expGrace := metav1.Duration{Duration: time.Hour} + exp := mkOffer("expired") + exp.Spec.DrainAt = &expDrainAt + exp.Spec.DrainGracePeriod = &expGrace + + jsonStr := buildServiceCatalogJSON([]*monetizeapi.ServiceOffer{&pre, &mid, &exp}, "https://example.com") + var services []schemas.ServiceCatalogEntry + if err := json.Unmarshal([]byte(jsonStr), &services); err != nil { + t.Fatalf("invalid JSON: %v\n%s", err, jsonStr) + } + if len(services) != 2 { + t.Fatalf("expected 2 services (pre + mid; expired filtered out), got %d: %+v", len(services), services) + } + + byName := map[string]schemas.ServiceCatalogEntry{} + for _, s := range services { + byName[s.Name] = s + } + if pre, ok := byName["pre"]; !ok { + t.Fatal("pre-drain offer missing from catalog") + } else if pre.DrainEndsAt != "" { + t.Errorf("pre.drainEndsAt = %q, want empty (active offers carry no drain metadata)", pre.DrainEndsAt) + } + if mid, ok := byName["mid"]; !ok { + t.Fatal("mid-drain offer missing from catalog") + } else { + if mid.DrainEndsAt == "" { + t.Errorf("mid.drainEndsAt is empty, want RFC3339 timestamp") + } + if _, err := time.Parse(time.RFC3339, mid.DrainEndsAt); err != nil { + t.Errorf("mid.drainEndsAt = %q is not RFC3339: %v", mid.DrainEndsAt, err) + } + } + if _, ok := byName["expired"]; ok { + t.Error("drain-expired offer leaked into catalog; should be filtered") + } + + // Pure-additivity invariant on the raw JSON: no entry has an + // `available` field, and only mid has a `drainEndsAt` field. 
+ var rawEntries []map[string]any + if err := json.Unmarshal([]byte(jsonStr), &rawEntries); err != nil { + t.Fatalf("re-unmarshal catalog JSON as raw maps: %v\n%s", err, jsonStr) + } + for _, entry := range rawEntries { + if _, ok := entry["available"]; ok { + t.Errorf("entry %q has unexpected `available` field; drain must be purely additive: %+v", entry["name"], entry) + } + switch entry["name"] { + case "pre": + if _, ok := entry["drainEndsAt"]; ok { + t.Errorf("pre-drain entry has `drainEndsAt` key: %+v", entry) + } + case "mid": + if _, ok := entry["drainEndsAt"]; !ok { + t.Errorf("mid-drain entry missing `drainEndsAt` key: %+v", entry) + } + } + } +} + +// TestBuildSkillCatalogMarkdown_DrainAdditiveDetail asserts the per-service +// detail block in /skill.md is purely additive: active offers contain no +// "Available" or "Drain ends at" bullets, draining offers contain ONLY a +// "Drain ends at" bullet (no separate Available marker). +func TestBuildSkillCatalogMarkdown_DrainAdditiveDetail(t *testing.T) { + readyCond := []monetizeapi.Condition{{Type: "Ready", Status: "True"}} + mkOffer := func(name string) monetizeapi.ServiceOffer { + return monetizeapi.ServiceOffer{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "llm"}, + Spec: monetizeapi.ServiceOfferSpec{ + Type: "http", + Payment: monetizeapi.ServiceOfferPayment{ + Network: "base", + PayTo: "0x1111111111111111111111111111111111111111", + Price: monetizeapi.ServiceOfferPriceTable{PerRequest: "0.001"}, + }, + }, + Status: monetizeapi.ServiceOfferStatus{Conditions: readyCond}, + } + } + + active := mkOffer("alpha-active") + midDrainAt := metav1.NewTime(time.Now()) + midGrace := metav1.Duration{Duration: time.Hour} + mid := mkOffer("beta-draining") + mid.Spec.DrainAt = &midDrainAt + mid.Spec.DrainGracePeriod = &midGrace + + md := buildSkillCatalogMarkdown([]*monetizeapi.ServiceOffer{&active, &mid}, "https://example.com") + + // The whole-doc level must never contain an "Available" bullet — + // neither 
for active nor for draining offers (draining surfaces only + // the drain-ends-at line). + if strings.Contains(md, "- **Available**:") { + t.Errorf("skill catalog markdown contains `- **Available**:` bullet; drain must be purely additive\n%s", md) + } + + // Split by service detail section so we can assert per-offer. + activeIdx := strings.Index(md, "### alpha-active") + midIdx := strings.Index(md, "### beta-draining") + if activeIdx < 0 || midIdx < 0 { + t.Fatalf("expected detail sections for both offers, got md=\n%s", md) + } + activeBlock := md[activeIdx:midIdx] + midBlock := md[midIdx:] + + if strings.Contains(activeBlock, "Drain ends at") { + t.Errorf("active detail block contains drain bullet: %s", activeBlock) + } + if !strings.Contains(midBlock, "- **Drain ends at**:") { + t.Errorf("draining detail block missing `- **Drain ends at**:` bullet: %s", midBlock) + } } // TestBuildServiceCatalogJSON_SortOrder ensures offers render in diff --git a/internal/x402/serviceoffer_source.go b/internal/x402/serviceoffer_source.go index f0b1999a..0c08c491 100644 --- a/internal/x402/serviceoffer_source.go +++ b/internal/x402/serviceoffer_source.go @@ -7,6 +7,7 @@ import ( "log" "sort" "strings" + "time" "github.com/ObolNetwork/obol-stack/internal/monetizeapi" "github.com/ObolNetwork/obol-stack/internal/schemas" @@ -85,7 +86,12 @@ func routesFromStore(offerItems, secretItems []any) ([]RouteRule, error) { if offer.Spec.Upstream.Namespace == "" { offer.Spec.Upstream.Namespace = offer.Namespace } - if offer.IsPaused() || !offerConditionTrue(offer.Status, "RoutePublished") { + // Draining offers keep their route up until the grace period + // expires so in-flight payments can settle. Only skip after the + // drain window has elapsed — at that point the controller has + // also torn down the HTTPRoute, so the verifier rule would + // gate traffic against a non-existent backend. 
+ if offer.DrainExpired(time.Now()) || !offerConditionTrue(offer.Status, "RoutePublished") { continue } diff --git a/internal/x402/serviceoffer_source_test.go b/internal/x402/serviceoffer_source_test.go index 9733095e..6c825cda 100644 --- a/internal/x402/serviceoffer_source_test.go +++ b/internal/x402/serviceoffer_source_test.go @@ -3,6 +3,7 @@ package x402 import ( "encoding/base64" "testing" + "time" "github.com/ObolNetwork/obol-stack/internal/monetizeapi" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -37,11 +38,16 @@ func TestRoutesFromStore(t *testing.T) { Conditions: []monetizeapi.Condition{{Type: "RoutePublished", Status: "True"}}, }, }), + // Drain-expired offer: drainAt 2h ago + 1h grace period (window + // ended 1h ago) → route should already be torn down; the verifier rule + // should be filtered out even though RoutePublished is still + // True in the cached status snapshot. mustOfferObject(t, monetizeapi.ServiceOffer{ - ObjectMeta: metav1.ObjectMeta{Name: "paused", Namespace: "alpha", Annotations: map[string]string{ - monetizeapi.PausedAnnotation: "true", - }}, + ObjectMeta: metav1.ObjectMeta{Name: "drained", Namespace: "alpha"}, Spec: monetizeapi.ServiceOfferSpec{ + Upstream: monetizeapi.ServiceOfferUpstream{Service: "httpbin"}, + DrainAt: &metav1.Time{Time: time.Now().Add(-2 * time.Hour)}, + DrainGracePeriod: &metav1.Duration{Duration: time.Hour}, Payment: monetizeapi.ServiceOfferPayment{ Price: monetizeapi.ServiceOfferPriceTable{PerRequest: "1"}, }, @@ -50,6 +56,23 @@ func TestRoutesFromStore(t *testing.T) { Conditions: []monetizeapi.Condition{{Type: "RoutePublished", Status: "True"}}, }, }), + // Mid-drain offer: drainAt = now, grace = 1h → still within the + // drain window, route stays up so in-flight buyers can settle. + // Should appear in the verifier rules. 
+ mustOfferObject(t, monetizeapi.ServiceOffer{ + ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: "alpha"}, + Spec: monetizeapi.ServiceOfferSpec{ + Upstream: monetizeapi.ServiceOfferUpstream{Service: "httpbin"}, + DrainAt: &metav1.Time{Time: time.Now()}, + DrainGracePeriod: &metav1.Duration{Duration: time.Hour}, + Payment: monetizeapi.ServiceOfferPayment{ + Price: monetizeapi.ServiceOfferPriceTable{PerRequest: "0.1"}, + }, + }, + Status: monetizeapi.ServiceOfferStatus{ + Conditions: []monetizeapi.Condition{{Type: "RoutePublished", Status: "True"}}, + }, + }), } secrets := []any{ mustSecretObject(t, "alpha", "litellm-secrets", map[string]string{ @@ -62,11 +85,16 @@ func TestRoutesFromStore(t *testing.T) { t.Fatalf("routesFromStore: %v", err) } - if len(routes) != 2 { - t.Fatalf("len(routes) = %d, want 2", len(routes)) + if len(routes) != 3 { + t.Fatalf("len(routes) = %d, want 3", len(routes)) + } + // Expected sort order: alpha/a, alpha/c, beta/b. + // "drained" must be filtered out because its drain window expired. 
+ if routes[0].OfferName != "a" || routes[1].OfferName != "c" || routes[2].OfferName != "b" { + t.Fatalf("routes not sorted by offer identity (drained leaked?): %+v", routes) } - if routes[0].OfferName != "a" || routes[1].OfferName != "b" { - t.Fatalf("routes not sorted by offer identity: %+v", routes) + if routes[0].OfferNamespace != "alpha" || routes[1].OfferNamespace != "alpha" || routes[2].OfferNamespace != "beta" { + t.Fatalf("unexpected route namespaces: %+v", routes) } if routes[0].Pattern != "/services/a/*" { t.Fatalf("routes[0].Pattern = %q, want /services/a/*", routes[0].Pattern) @@ -83,11 +111,16 @@ func TestRoutesFromStore(t *testing.T) { if routes[0].StripPrefix != "/services/a" { t.Fatalf("routes[0].StripPrefix = %q, want /services/a", routes[0].StripPrefix) } - if routes[1].UpstreamAuth != "" { - t.Fatalf("routes[1].UpstreamAuth = %q, want empty", routes[1].UpstreamAuth) + if routes[2].UpstreamAuth != "" { + t.Fatalf("routes[2].UpstreamAuth = %q, want empty", routes[2].UpstreamAuth) + } + if routes[2].UpstreamURL != "http://httpbin.beta.svc.cluster.local:11434" { + t.Fatalf("routes[2].UpstreamURL = %q, want httpbin upstream URL", routes[2].UpstreamURL) } - if routes[1].UpstreamURL != "http://httpbin.beta.svc.cluster.local:11434" { - t.Fatalf("routes[1].UpstreamURL = %q, want httpbin upstream URL", routes[1].UpstreamURL) + // Mid-drain offer "c" stays in the rules but tracks its own + // upstream — verifies the drain window keeps the route alive. + if routes[1].UpstreamURL != "http://httpbin.alpha.svc.cluster.local:11434" { + t.Fatalf("routes[1] (mid-drain) UpstreamURL = %q, want httpbin upstream URL", routes[1].UpstreamURL) } }