Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 76 additions & 2 deletions cmd/serviceoffer-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,27 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/ObolNetwork/obol-stack/internal/serviceoffercontroller"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
)

const (
defaultLockNamespace = "x402"
leaseName = "serviceoffer-controller"
leaseDuration = 30 * time.Second
renewDeadline = 20 * time.Second
retryPeriod = 5 * time.Second
)

func main() {
kubeconfig := flag.String("kubeconfig", "", "Path to kubeconfig for out-of-cluster runs")
workers := flag.Int("workers", 1, "Number of reconcile workers")
leaderElect := flag.Bool("leader-elect", true, "Acquire a Lease before running the reconcile loop (disable for local dev)")
flag.Parse()

cfg, err := loadConfig(*kubeconfig)
Expand All @@ -31,9 +43,71 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

if err := controller.Run(ctx, *workers); err != nil {
log.Fatalf("run controller: %v", err)
if !*leaderElect {
if err := controller.Run(ctx, *workers); err != nil {
log.Fatalf("run controller: %v", err)
}
return
}

runWithLeaderElection(ctx, cfg, controller, *workers)
}

func runWithLeaderElection(ctx context.Context, cfg *rest.Config, controller *serviceoffercontroller.Controller, workers int) {
podName := os.Getenv("POD_NAME")
if podName == "" {
// Fall back so local dev (go run ./cmd/serviceoffer-controller --leader-elect=false)
// still works if someone forgets the flag. Identity must be unique across
// candidates — in real deployments the downward API supplies the pod name.
podName = "serviceoffer-controller-local"
}

lockNamespace := os.Getenv("POD_NAMESPACE")
if lockNamespace == "" {
lockNamespace = defaultLockNamespace
}

lock, err := resourcelock.NewFromKubeconfig(
resourcelock.LeasesResourceLock,
lockNamespace,
leaseName,
resourcelock.ResourceLockConfig{
Identity: podName,
},
cfg,
renewDeadline,
)
if err != nil {
log.Fatalf("create lease lock: %v", err)
}

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
log.Printf("serviceoffer-controller: became leader %s", podName)
if err := controller.Run(ctx, workers); err != nil {
log.Printf("controller run: %v", err)
}
},
OnStoppedLeading: func() {
// On lost leadership exit non-zero so the kubelet restarts the
// pod and the next election starts from a clean state. Trying
// to keep running without the lease would race the new leader.
log.Printf("serviceoffer-controller: lost leadership %s", podName)
os.Exit(1)
},
OnNewLeader: func(identity string) {
if identity != podName {
log.Printf("serviceoffer-controller: new leader is %s", identity)
}
},
},
})
}

func loadConfig(kubeconfig string) (*rest.Config, error) {
Expand Down
81 changes: 81 additions & 0 deletions cmd/serviceoffer-controller/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package main

import (
"os"
"path/filepath"
"testing"
)

// TestLoadConfig_FromKubeconfigFile asserts loadConfig parses an explicit
// kubeconfig path. This is the local-dev codepath used when --leader-elect=false.
func TestLoadConfig_FromKubeconfigFile(t *testing.T) {
dir := t.TempDir()
kc := filepath.Join(dir, "kubeconfig")
if err := os.WriteFile(kc, []byte(minimalKubeconfig), 0o600); err != nil {
t.Fatalf("write kubeconfig: %v", err)
}

cfg, err := loadConfig(kc)
if err != nil {
t.Fatalf("loadConfig: %v", err)
}
if cfg.Host != "https://example.invalid:6443" {
t.Fatalf("unexpected host: %q", cfg.Host)
}
}

// TestLoadConfig_FromKubeconfigEnv mirrors the path used when KUBECONFIG is set
// (e.g. obol kubectl/helm passthrough during local dev).
func TestLoadConfig_FromKubeconfigEnv(t *testing.T) {
dir := t.TempDir()
kc := filepath.Join(dir, "kubeconfig")
if err := os.WriteFile(kc, []byte(minimalKubeconfig), 0o600); err != nil {
t.Fatalf("write kubeconfig: %v", err)
}

t.Setenv("KUBECONFIG", kc)
cfg, err := loadConfig("")
if err != nil {
t.Fatalf("loadConfig: %v", err)
}
if cfg.Host != "https://example.invalid:6443" {
t.Fatalf("unexpected host: %q", cfg.Host)
}
}

// TestLeaderElectionDefaults locks in the lease parameters chosen for fast
// failover on single-node k3d. If you tune these for a multi-zone deployment,
// update this test and the PR-description rationale.
func TestLeaderElectionDefaults(t *testing.T) {
if leaseDuration <= renewDeadline {
t.Fatalf("leaseDuration (%s) must exceed renewDeadline (%s)", leaseDuration, renewDeadline)
}
if renewDeadline <= retryPeriod {
t.Fatalf("renewDeadline (%s) must exceed retryPeriod (%s)", renewDeadline, retryPeriod)
}
if leaseName != "serviceoffer-controller" {
t.Fatalf("leaseName drifted from RBAC + Deployment expectation: %q", leaseName)
}
if defaultLockNamespace != "x402" {
t.Fatalf("defaultLockNamespace drifted from infrastructure manifest: %q", defaultLockNamespace)
}
}

const minimalKubeconfig = `apiVersion: v1
kind: Config
clusters:
- name: test
cluster:
server: https://example.invalid:6443
insecure-skip-tls-verify: true
contexts:
- name: test
context:
cluster: test
user: test
current-context: test
users:
- name: test
user:
token: test-token
`
9 changes: 8 additions & 1 deletion internal/embed/infrastructure/base/templates/x402.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,10 @@ metadata:
labels:
app: serviceoffer-controller
spec:
replicas: 1 # Do not scale — multiple replicas race on ERC-8004 on-chain registration
# Single replica by default; bumping to 2+ is now safe — leader election
# (client-go Lease in this namespace) prevents split-brain on the reconcile
# loop and the resulting double on-chain ERC-8004 registration.
replicas: 1
selector:
matchLabels:
app: serviceoffer-controller
Expand All @@ -286,6 +289,10 @@ spec:
image: ghcr.io/obolnetwork/serviceoffer-controller:b13254e
imagePullPolicy: IfNotPresent
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
Expand Down
Loading