Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 140 additions & 3 deletions internal/kubectl/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,156 @@ import (
"os/exec"
"path/filepath"
"strings"
"time"

"github.com/ObolNetwork/obol-stack/internal/config"
)

// EnsureCluster checks that the kubeconfig file exists, returning a
// descriptive error when the cluster is not running.
// probeAPIServerFn is a package-level hook so tests can replace the live
// `kubectl version` probe with an in-process stub. It returns the kubectl
// stderr text (used by wrapClusterDown to classify the failure) and the
// underlying error. nil error means the API server is reachable.
var probeAPIServerFn = probeAPIServerExec

// refreshKubeconfigFn is a package-level hook so tests can replace the
// k3d kubeconfig-write step without shelling out. It must overwrite the
// kubeconfig file at cfg.ConfigDir/kubeconfig.yaml in-place.
var refreshKubeconfigFn = refreshK3dKubeconfig

// clusterProbeTimeout bounds the live API server probe and the post-refresh
// retry. Three seconds is enough for a healthy k3d API server on loopback
// (typical response is sub-100ms) and short enough that a stopped cluster
// surfaces an actionable error before the user wonders if the CLI hung.
const clusterProbeTimeout = 3 * time.Second

// EnsureCluster verifies that the Kubernetes API server is reachable using
// the kubeconfig under cfg.ConfigDir. It checks the kubeconfig file exists,
// then actively probes the API server (kubectl is the source of truth — file
// presence alone does not mean ops will succeed). When the probe fails with a
// "cluster down"-shaped error AND the deployment looks like a k3d stack, it
// attempts ONE kubeconfig refresh via `k3d kubeconfig write` and retries the
// probe before giving up. This recovers from the common port-drift case after
// `k3d cluster stop && k3d cluster start`, where the kubeconfig on disk still
// points at the previous (now-defunct) API server port.
func EnsureCluster(cfg *config.Config) error {
kubeconfig := filepath.Join(cfg.ConfigDir, "kubeconfig.yaml")
if _, err := os.Stat(kubeconfig); os.IsNotExist(err) {
return errors.New("cluster not running. Run 'obol stack up' first")
}

return nil
bin, kc := Paths(cfg)
stderr, err := probeAPIServerFn(bin, kc, clusterProbeTimeout)
if err == nil {
return nil
}

// If the failure does not look like a cluster-down condition (e.g. the
// kubectl binary is missing), surface the original error verbatim rather
// than misleading the user with the "cluster appears to be stopped" hint.
if wrapped := wrapClusterDown(err, stderr); !errors.Is(wrapped, ErrClusterDown) {
return wrapped
}

// Attempt a single best-effort k3d kubeconfig refresh, then re-probe.
// This handles the port-drift case after `k3d cluster stop && start`.
if refreshed := refreshKubeconfigFn(cfg); refreshed {
stderr, err := probeAPIServerFn(bin, kc, clusterProbeTimeout)
if err == nil {
return nil
}
return wrapClusterDown(err, stderr)
}

return ErrClusterDown
}

// probeAPIServerExec runs `kubectl version --request-timeout` against the
// API server pointed at by kubeconfig, returning (stderr, error). A successful
// response means the K8s control plane is reachable and serving the discovery
// endpoint — the same RTT a real CLI command would experience.
func probeAPIServerExec(binary, kubeconfig string, timeout time.Duration) (string, error) {
// --request-timeout bounds the HTTP request; we still bound the overall
// exec via the same timeout in case the kubectl binary hangs on DNS.
args := []string{
"version",
"-o", "json",
"--request-timeout=" + timeout.String(),
}

cmd := exec.Command(binary, args...)
cmd.Env = append(os.Environ(), "KUBECONFIG="+kubeconfig)

var stderr bytes.Buffer
cmd.Stderr = &stderr

done := make(chan error, 1)
if err := cmd.Start(); err != nil {
return "", err
}
go func() { done <- cmd.Wait() }()

select {
case err := <-done:
return strings.TrimSpace(stderr.String()), err
case <-time.After(timeout + time.Second):
_ = cmd.Process.Kill()
<-done
// Synthesise an "Unable to connect" stderr so wrapClusterDown
// classifies a hung kubectl as a cluster-down condition.
return "Unable to connect to the server: timed out waiting for API server",
fmt.Errorf("kubectl version timed out after %s", timeout)
}
}

// refreshK3dKubeconfig attempts to overwrite cfg.ConfigDir/kubeconfig.yaml
// from `k3d kubeconfig write` for the persisted stack ID. Returns true when
// the refresh actually ran (regardless of whether it changed the file).
// Returns false when prerequisites are missing — in that case the caller
// should treat the original probe failure as authoritative.
//
// Prerequisites:
// - cfg.BinDir/k3d exists and is executable
// - cfg.ConfigDir/.stack-id is present (records the petname)
// - cfg.ConfigDir/.stack-backend is "k3d" (or missing, which we treat as k3d
// because that is the historical default — see internal/stack.LoadBackend
// which falls back to k3d when the file is absent)
func refreshK3dKubeconfig(cfg *config.Config) bool {
if cfg == nil {
return false
}

k3dBin := filepath.Join(cfg.BinDir, "k3d")
if _, err := os.Stat(k3dBin); err != nil {
return false
}

backendPath := filepath.Join(cfg.ConfigDir, ".stack-backend")
if data, err := os.ReadFile(backendPath); err == nil {
if name := strings.TrimSpace(string(data)); name != "" && name != "k3d" {
return false
}
}

stackIDPath := filepath.Join(cfg.ConfigDir, ".stack-id")
stackIDBytes, err := os.ReadFile(stackIDPath)
if err != nil {
return false
}
stackID := strings.TrimSpace(string(stackIDBytes))
if stackID == "" {
return false
}

kubeconfig := filepath.Join(cfg.ConfigDir, "kubeconfig.yaml")
clusterName := "obol-stack-" + stackID

cmd := exec.Command(k3dBin, "kubeconfig", "write", clusterName,
"-o", kubeconfig, "--overwrite")
if err := cmd.Run(); err != nil {
return false
}

return true
}

// ErrClusterDown indicates the Kubernetes API server is unreachable,
Expand Down
205 changes: 201 additions & 4 deletions internal/kubectl/kubectl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,45 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"github.com/ObolNetwork/obol-stack/internal/config"
)

// stubProbe returns a probeAPIServerFn that returns the given (stderr, err)
// and counts how many times it was called.
func stubProbe(stderr string, err error, calls *int) func(string, string, time.Duration) (string, error) {
return func(string, string, time.Duration) (string, error) {
if calls != nil {
*calls++
}
return stderr, err
}
}

// withProbe swaps probeAPIServerFn for the duration of the test.
func withProbe(t *testing.T, fn func(string, string, time.Duration) (string, error)) {
t.Helper()
orig := probeAPIServerFn
probeAPIServerFn = fn
t.Cleanup(func() { probeAPIServerFn = orig })
}

// withRefresh swaps refreshKubeconfigFn for the duration of the test.
func withRefresh(t *testing.T, fn func(*config.Config) bool) {
t.Helper()
orig := refreshKubeconfigFn
refreshKubeconfigFn = fn
t.Cleanup(func() { refreshKubeconfigFn = orig })
}

func writeKubeconfig(t *testing.T, dir string) {
t.Helper()
if err := os.WriteFile(filepath.Join(dir, "kubeconfig.yaml"), []byte("test"), 0o600); err != nil {
t.Fatal(err)
}
}

func TestEnsureCluster_Missing(t *testing.T) {
cfg := &config.Config{ConfigDir: t.TempDir()}

Expand All @@ -19,15 +54,177 @@ func TestEnsureCluster_Missing(t *testing.T) {
}
}

func TestEnsureCluster_Exists(t *testing.T) {
func TestEnsureCluster_ProbeSucceeds(t *testing.T) {
dir := t.TempDir()
if err := os.WriteFile(filepath.Join(dir, "kubeconfig.yaml"), []byte("test"), 0o644); err != nil {
t.Fatal(err)
writeKubeconfig(t, dir)
cfg := &config.Config{ConfigDir: dir}

var probeCalls int
withProbe(t, stubProbe("", nil, &probeCalls))
withRefresh(t, func(*config.Config) bool {
t.Fatal("refresh should not run when initial probe succeeds")
return false
})

if err := EnsureCluster(cfg); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if probeCalls != 1 {
t.Errorf("expected 1 probe call, got %d", probeCalls)
}
}

// Authoritative-probe regression: kubeconfig is stale (e.g. after `k3d cluster
// stop && start` and a port change), so `kubectl version` reports connection
// refused, AND k3d refresh recovers it. EnsureCluster must report success
// instead of telling the user the cluster is stopped.
func TestEnsureCluster_RefreshRecoversFromPortDrift(t *testing.T) {
dir := t.TempDir()
writeKubeconfig(t, dir)
cfg := &config.Config{ConfigDir: dir}

var probeCalls, refreshCalls int
withProbe(t, func(string, string, time.Duration) (string, error) {
probeCalls++
if probeCalls == 1 {
return "dial tcp 127.0.0.1:50839: connect: connection refused",
errors.New("exit status 1")
}
// Post-refresh probe succeeds.
return "", nil
})
withRefresh(t, func(*config.Config) bool {
refreshCalls++
return true
})

if err := EnsureCluster(cfg); err != nil {
t.Fatalf("unexpected error: %v", err)
t.Fatalf("expected nil after kubeconfig refresh, got: %v", err)
}
if probeCalls != 2 {
t.Errorf("expected 2 probe calls (initial + post-refresh), got %d", probeCalls)
}
if refreshCalls != 1 {
t.Errorf("expected exactly 1 refresh call, got %d", refreshCalls)
}
}

// When the refresh helper cannot run (no k3d binary / no stack id),
// EnsureCluster must still surface ErrClusterDown for a connection-refused
// stderr — i.e. fall back to current behavior, not crash.
func TestEnsureCluster_RefreshSkippedReturnsClusterDown(t *testing.T) {
dir := t.TempDir()
writeKubeconfig(t, dir)
cfg := &config.Config{ConfigDir: dir}

withProbe(t, stubProbe(
"Unable to connect to the server: dial tcp 127.0.0.1:6443: connection refused",
errors.New("exit status 1"),
nil,
))
withRefresh(t, func(*config.Config) bool { return false })

err := EnsureCluster(cfg)
if !errors.Is(err, ErrClusterDown) {
t.Fatalf("expected ErrClusterDown, got: %v", err)
}
}

// When the refresh runs but the post-refresh probe still cannot reach the
// API server, EnsureCluster must report ErrClusterDown (do not loop forever).
func TestEnsureCluster_RefreshRanButProbeStillFails(t *testing.T) {
dir := t.TempDir()
writeKubeconfig(t, dir)
cfg := &config.Config{ConfigDir: dir}

var probeCalls int
withProbe(t, func(string, string, time.Duration) (string, error) {
probeCalls++
return "Unable to connect to the server: dial tcp 127.0.0.1:6443: connection refused",
errors.New("exit status 1")
})
withRefresh(t, func(*config.Config) bool { return true })

err := EnsureCluster(cfg)
if !errors.Is(err, ErrClusterDown) {
t.Fatalf("expected ErrClusterDown after refresh+retry, got: %v", err)
}
if probeCalls != 2 {
t.Errorf("expected exactly 2 probe calls, got %d", probeCalls)
}
}

// Non-cluster-down probe failures (e.g. kubectl binary missing) must NOT be
// reported as "cluster appears to be stopped". That message has misled
// debugging in the past — the original error should pass through verbatim.
func TestEnsureCluster_NonClusterDownErrorPassesThrough(t *testing.T) {
dir := t.TempDir()
writeKubeconfig(t, dir)
cfg := &config.Config{ConfigDir: dir}

want := errors.New("fork/exec /bin/kubectl: no such file or directory")
withProbe(t, stubProbe("", want, nil))
withRefresh(t, func(*config.Config) bool {
t.Fatal("refresh should not run for non-cluster-down errors")
return false
})

err := EnsureCluster(cfg)
if err == nil || !strings.Contains(err.Error(), "no such file or directory") {
t.Fatalf("expected original error to pass through, got: %v", err)
}
if errors.Is(err, ErrClusterDown) {
t.Fatalf("non-cluster-down error must not be wrapped as ErrClusterDown")
}
}

func TestRefreshK3dKubeconfig_MissingPrerequisites(t *testing.T) {
// No k3d binary, no stack id → refresh must decline (return false) rather
// than panic or shell out to a binary that does not exist.
dir := t.TempDir()
cfg := &config.Config{
ConfigDir: dir,
BinDir: filepath.Join(dir, "bin"),
}
if refreshK3dKubeconfig(cfg) {
t.Fatal("expected false when prerequisites are missing")
}

// k3d binary present but no stack id file → still declines.
if err := os.MkdirAll(cfg.BinDir, 0o700); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(cfg.BinDir, "k3d"), []byte("#!/bin/sh\nexit 0"), 0o700); err != nil { //nolint:gosec // test fixture
t.Fatal(err)
}
if refreshK3dKubeconfig(cfg) {
t.Fatal("expected false when .stack-id is missing")
}
}

func TestRefreshK3dKubeconfig_NonK3dBackendDeclines(t *testing.T) {
// When the persisted backend is something other than k3d (e.g. k3s),
// the refresh must not run — the k3d CLI does not own this cluster.
dir := t.TempDir()
cfg := &config.Config{
ConfigDir: dir,
BinDir: filepath.Join(dir, "bin"),
}
if err := os.MkdirAll(cfg.BinDir, 0o700); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(cfg.BinDir, "k3d"), []byte("#!/bin/sh\nexit 0"), 0o700); err != nil { //nolint:gosec // test fixture
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(dir, ".stack-id"), []byte("fancy-yak"), 0o600); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(dir, ".stack-backend"), []byte("k3s"), 0o600); err != nil {
t.Fatal(err)
}

if refreshK3dKubeconfig(cfg) {
t.Fatal("expected false when backend is not k3d")
}
}

Expand Down
Loading