diff --git a/configresolve/resolve.go b/configresolve/resolve.go index 17af2400..cbc19427 100644 --- a/configresolve/resolve.go +++ b/configresolve/resolve.go @@ -116,6 +116,15 @@ type Resolved struct { K8sWorkerTolerationKey string K8sWorkerTolerationValue string K8sWorkerExclusiveNode bool + K8sIceboomEnabled bool + K8sIceboomImage string + K8sIceboomPort int + K8sIceboomConfigMap string + K8sIceboomCPURequest string + K8sIceboomCPULimit string + K8sIceboomMemoryRequest string + K8sIceboomMemoryLimit string + K8sIceboomImagePullPolicy string AWSRegion string ConfigStoreConn string ConfigPollInterval time.Duration @@ -192,6 +201,11 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu var k8sWorkerCPURequest, k8sWorkerMemoryRequest string var k8sWorkerNodeSelector, k8sWorkerTolerationKey, k8sWorkerTolerationValue string var k8sWorkerExclusiveNode bool + var k8sIceboomEnabled bool + var k8sIceboomImage, k8sIceboomConfigMap, k8sIceboomImagePullPolicy string + var k8sIceboomCPURequest, k8sIceboomCPULimit string + var k8sIceboomMemoryRequest, k8sIceboomMemoryLimit string + var k8sIceboomPort int var awsRegion string var configStoreConn string var configPollInterval time.Duration @@ -836,6 +850,41 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu k8sWorkerExclusiveNode = b } } + if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_ENABLED"); v != "" { + if b, err := strconv.ParseBool(v); err == nil { + k8sIceboomEnabled = b + } else { + warn("Invalid DUCKGRES_K8S_WORKER_ICEBOOM_ENABLED: " + err.Error()) + } + } + if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_IMAGE"); v != "" { + k8sIceboomImage = v + } + if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_PORT"); v != "" { + if n, err := strconv.Atoi(v); err == nil { + k8sIceboomPort = n + } else { + warn("Invalid DUCKGRES_K8S_WORKER_ICEBOOM_PORT: " + err.Error()) + } + } + if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_CONFIGMAP"); v != "" { + k8sIceboomConfigMap = v + } + if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_CPU_REQUEST"); v != "" { + k8sIceboomCPURequest = v + } + if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_CPU_LIMIT"); v != "" { + k8sIceboomCPULimit = v + } + if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_MEMORY_REQUEST"); v != "" { + k8sIceboomMemoryRequest = v + } + if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_MEMORY_LIMIT"); v != "" { + k8sIceboomMemoryLimit = v + } + if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_IMAGE_PULL_POLICY"); v != "" { + k8sIceboomImagePullPolicy = v + } if v := getenv("DUCKGRES_AWS_REGION"); v != "" { awsRegion = v } @@ -1164,6 +1213,15 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu K8sWorkerTolerationKey: k8sWorkerTolerationKey, K8sWorkerTolerationValue: k8sWorkerTolerationValue, K8sWorkerExclusiveNode: k8sWorkerExclusiveNode, + K8sIceboomEnabled: k8sIceboomEnabled, + K8sIceboomImage: k8sIceboomImage, + K8sIceboomPort: k8sIceboomPort, + K8sIceboomConfigMap: k8sIceboomConfigMap, + K8sIceboomCPURequest: k8sIceboomCPURequest, + K8sIceboomCPULimit: k8sIceboomCPULimit, + K8sIceboomMemoryRequest: k8sIceboomMemoryRequest, + K8sIceboomMemoryLimit: k8sIceboomMemoryLimit, + K8sIceboomImagePullPolicy: k8sIceboomImagePullPolicy, AWSRegion: awsRegion, ConfigStoreConn: configStoreConn, ConfigPollInterval: configPollInterval, diff --git a/controlplane/control.go b/controlplane/control.go index 08efcad7..2b905022 100644 --- a/controlplane/control.go +++ b/controlplane/control.go @@ -116,6 +116,24 @@ type K8sConfig struct { WorkerTolerationValue string // Taint value for worker pod NoSchedule toleration WorkerExclusiveNode bool // One worker per node via pod anti-affinity AWSRegion string // AWS region for STS client + Iceboom IceboomConfig +} + +// IceboomConfig configures the optional iceboom sidecar injected into each +// worker pod. iceboom is an Iceberg REST catalog proxy that fronts AWS S3 +// Tables; the control plane reconfigures it per tenant at activation time, +// reusing the same STS-broker path used for DuckLake S3 creds. Chart wiring +// lives in PostHog/charts charts/duckgres. +type IceboomConfig struct { + Enabled bool + Image string + Port int // Loopback port iceboom listens on inside the worker pod + ConfigMap string // ConfigMap mounted at /etc/iceboom (must define config.toml) + CPURequest string // CPU request (e.g., "200m"). Empty = no resources block. + CPULimit string // CPU limit; should equal request for Guaranteed QoS. + MemoryRequest string // Memory request (e.g., "128Mi"). + MemoryLimit string // Memory limit; should equal request for Guaranteed QoS. + ImagePullPolicy string } // ControlPlane manages the TCP listener and routes connections to Flight SQL workers. diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index dcf75ece..f4edf3a8 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -74,6 +74,7 @@ type K8sWorkerPool struct { workerTolerationKey string // taint key for NoSchedule toleration workerTolerationValue string // taint value for NoSchedule toleration workerExclusiveNode bool // one worker per node via anti-affinity + iceboom IceboomPoolConfig // optional iceboom sidecar (Iceberg REST catalog proxy) orgID string // org ID for pod labels (multi-tenant mode) workerIDGenerator func() int // shared ID generator across orgs (nil = internal counter) resolveOrgConfig func(string) (*configstore.OrgConfig, error) // resolve org config for per-tenant image reaping @@ -143,6 +144,20 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, clientset kubernetes.Interface) ( if strings.TrimSpace(cfg.ServiceAccount) == "" { cfg.ServiceAccount = DefaultK8sWorkerServiceAccount } + if cfg.Iceboom.Enabled { + if cfg.Iceboom.Image == "" { + return nil, fmt.Errorf("iceboom.image is required when iceboom is enabled") + } + if cfg.Iceboom.ConfigMap == "" { + return nil, fmt.Errorf("iceboom.configMap is required when iceboom is enabled") + } + if cfg.Iceboom.Port == 0 { + cfg.Iceboom.Port = 8181 + } + if cfg.Iceboom.ImagePullPolicy == "" { + cfg.Iceboom.ImagePullPolicy = cfg.ImagePullPolicy + } + } // Limit concurrent K8s API calls to avoid overwhelming the API server. spawnConcurrency := 3 @@ -170,6 +185,7 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, clientset kubernetes.Interface) ( workerTolerationKey: cfg.WorkerTolerationKey, workerTolerationValue: cfg.WorkerTolerationValue, workerExclusiveNode: cfg.WorkerExclusiveNode, + iceboom: cfg.Iceboom, orgID: cfg.OrgID, workerIDGenerator: cfg.WorkerIDGenerator, resolveOrgConfig: cfg.ResolveOrgConfig, @@ -752,6 +768,24 @@ func (p *K8sWorkerPool) spawnWorker(ctx context.Context, id int, image string, p }) } + // Iceboom sidecar: Iceberg REST catalog proxy on 127.0.0.1:. The + // container starts with a bootable upstream-disabled config; the control + // plane reconfigures it per tenant at activation time via an HTTP RPC + // (iceboom roadmap phase 8). Adding it here keeps the worker pod's QoS + // class consistent — iceboom carries its own request==limit when those + // values are populated. + if p.iceboom.Enabled { + pod.Spec.Containers = append(pod.Spec.Containers, p.buildIceboomContainer()) + pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{ + Name: "iceboom-config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: p.iceboom.ConfigMap}, + }, + }, + }) + } + // Delete stale pod with the same name if it exists (from a previous run) _ = p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, podName, metav1.DeleteOptions{ GracePeriodSeconds: int64Ptr(0), @@ -2624,6 +2658,75 @@ func (p *K8sWorkerPool) workerResources() corev1.ResourceRequirements { return corev1.ResourceRequirements{Requests: requests, Limits: limits} } +// buildIceboomContainer assembles the iceboom sidecar container for a +// worker pod. iceboom listens on 127.0.0.1: (loopback only — the +// duckdb-worker container in the same pod reaches it via localhost) and +// reads its bootable config from the mounted ConfigMap. The control +// plane reconfigures the upstream / per-tenant warehouse ARN / STS creds +// at activation time over a separate RPC. +// +// The container intentionally: +// - runs with allowPrivilegeEscalation=false to match the worker. +// - mounts the iceboom-config volume read-only at /etc/iceboom. +// - takes the same imagePullPolicy as the worker when unset. +// - emits a livenessProbe but no readinessProbe (the worker pod's +// readiness is driven by duckdb-worker; iceboom's /readyz depends on +// the upstream, which isn't configured until activation). +func (p *K8sWorkerPool) buildIceboomContainer() corev1.Container { + c := corev1.Container{ + Name: "iceboom", + Image: p.iceboom.Image, + ImagePullPolicy: corev1.PullPolicy(p.iceboom.ImagePullPolicy), + Args: []string{"--config", "/etc/iceboom/config.toml"}, + Ports: []corev1.ContainerPort{ + { + Name: "iceberg", + ContainerPort: int32(p.iceboom.Port), + Protocol: corev1.ProtocolTCP, + }, + }, + SecurityContext: &corev1.SecurityContext{ + AllowPrivilegeEscalation: boolPtr(false), + }, + VolumeMounts: []corev1.VolumeMount{{ + Name: "iceboom-config", + MountPath: "/etc/iceboom", + ReadOnly: true, + }}, + Resources: p.iceboomResources(), + } + return c +} + +// iceboomResources returns the resource block for the iceboom sidecar. +// Mirrors workerResources's BestEffort-when-empty semantics; when CPU / +// memory limits are set independently from requests, both are honored so +// Guaranteed QoS only kicks in when the operator opts in. +func (p *K8sWorkerPool) iceboomResources() corev1.ResourceRequirements { + requests := corev1.ResourceList{} + if p.iceboom.CPURequest != "" { + requests[corev1.ResourceCPU] = resource.MustParse(p.iceboom.CPURequest) + } + if p.iceboom.MemoryRequest != "" { + requests[corev1.ResourceMemory] = resource.MustParse(p.iceboom.MemoryRequest) + } + limits := corev1.ResourceList{} + if p.iceboom.CPULimit != "" { + limits[corev1.ResourceCPU] = resource.MustParse(p.iceboom.CPULimit) + } + if p.iceboom.MemoryLimit != "" { + limits[corev1.ResourceMemory] = resource.MustParse(p.iceboom.MemoryLimit) + } + out := corev1.ResourceRequirements{} + if len(requests) > 0 { + out.Requests = requests + } + if len(limits) > 0 { + out.Limits = limits + } + return out +} + // --- Helpers --- // allocateWorkerID returns the next worker ID, using the shared generator diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go index 388cbd47..c89d5a96 100644 --- a/controlplane/k8s_pool_test.go +++ b/controlplane/k8s_pool_test.go @@ -2193,6 +2193,140 @@ func assertSpawnedWorkerPod(t *testing.T, pod *corev1.Pod) { } } +// When iceboom is enabled the spawned worker pod must carry the iceboom +// sidecar container, an iceboom-config ConfigMap volume, and a read-only +// mount at /etc/iceboom on the sidecar. Pod-level QoS depends on every +// container carrying matching request/limit pairs, so this also asserts +// that the sidecar resources block is what we configured. +func TestK8sPool_SpawnWorkerInjectsIceboomSidecarWhenEnabled(t *testing.T) { + pool, cs := newTestK8sPool(t, 5) + pool.iceboom = IceboomPoolConfig{ + Enabled: true, + Image: "iceboom:test", + Port: 8181, + ConfigMap: "duckgres-iceboom-config", + CPURequest: "200m", + CPULimit: "200m", + MemoryRequest: "128Mi", + MemoryLimit: "256Mi", + ImagePullPolicy: "IfNotPresent", + } + + var createdWorkerPod *corev1.Pod + cs.PrependReactor("create", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) { + createAction, ok := action.(k8stesting.CreateAction) + if !ok { + return false, nil, nil + } + pod, ok := createAction.GetObject().(*corev1.Pod) + if !ok { + return false, nil, nil + } + if pod.Labels["app"] == "duckgres-worker" { + createdWorkerPod = pod.DeepCopy() + } + return false, nil, nil + }) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = pool.SpawnWorker(ctx, 0, pool.workerImage) + + if createdWorkerPod == nil { + t.Fatal("expected worker pod create to be attempted") + } + + if got, want := len(createdWorkerPod.Spec.Containers), 2; got != want { + t.Fatalf("expected %d containers (worker + iceboom), got %d", want, got) + } + var iceboom *corev1.Container + for i := range createdWorkerPod.Spec.Containers { + if createdWorkerPod.Spec.Containers[i].Name == "iceboom" { + iceboom = &createdWorkerPod.Spec.Containers[i] + break + } + } + if iceboom == nil { + t.Fatal("iceboom container not found in pod spec") + } + if iceboom.Image != "iceboom:test" { + t.Fatalf("expected iceboom image iceboom:test, got %q", iceboom.Image) + } + if string(iceboom.ImagePullPolicy) != "IfNotPresent" { + t.Fatalf("expected pullPolicy IfNotPresent, got %q", iceboom.ImagePullPolicy) + } + if len(iceboom.Args) != 2 || iceboom.Args[0] != "--config" || iceboom.Args[1] != "/etc/iceboom/config.toml" { + t.Fatalf("expected args [--config /etc/iceboom/config.toml], got %v", iceboom.Args) + } + if iceboom.SecurityContext == nil || iceboom.SecurityContext.AllowPrivilegeEscalation == nil || *iceboom.SecurityContext.AllowPrivilegeEscalation { + t.Fatal("expected allowPrivilegeEscalation=false on iceboom container") + } + foundMount := false + for _, vm := range iceboom.VolumeMounts { + if vm.Name == "iceboom-config" && vm.MountPath == "/etc/iceboom" && vm.ReadOnly { + foundMount = true + } + } + if !foundMount { + t.Fatalf("expected read-only iceboom-config mount at /etc/iceboom, got %+v", iceboom.VolumeMounts) + } + if iceboom.Resources.Requests.Cpu().String() != "200m" { + t.Fatalf("expected iceboom cpu request 200m, got %s", iceboom.Resources.Requests.Cpu().String()) + } + if iceboom.Resources.Limits.Memory().String() != "256Mi" { + t.Fatalf("expected iceboom memory limit 256Mi, got %s", iceboom.Resources.Limits.Memory().String()) + } + + foundVol := false + for _, v := range createdWorkerPod.Spec.Volumes { + if v.Name == "iceboom-config" && v.ConfigMap != nil && v.ConfigMap.Name == "duckgres-iceboom-config" { + foundVol = true + } + } + if !foundVol { + t.Fatal("expected iceboom-config ConfigMap volume on pod spec") + } +} + +// Default disabled iceboom must not perturb the spawned pod spec: still +// exactly one container, no iceboom volume. +func TestK8sPool_SpawnWorkerDoesNotInjectIceboomWhenDisabled(t *testing.T) { + pool, cs := newTestK8sPool(t, 5) + // iceboom left zero-valued (Enabled=false). + + var createdWorkerPod *corev1.Pod + cs.PrependReactor("create", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) { + createAction, ok := action.(k8stesting.CreateAction) + if !ok { + return false, nil, nil + } + pod, ok := createAction.GetObject().(*corev1.Pod) + if !ok { + return false, nil, nil + } + if pod.Labels["app"] == "duckgres-worker" { + createdWorkerPod = pod.DeepCopy() + } + return false, nil, nil + }) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = pool.SpawnWorker(ctx, 0, pool.workerImage) + + if createdWorkerPod == nil { + t.Fatal("expected worker pod create to be attempted") + } + if got := len(createdWorkerPod.Spec.Containers); got != 1 { + t.Fatalf("expected 1 container when iceboom disabled, got %d", got) + } + for _, v := range createdWorkerPod.Spec.Volumes { + if v.Name == "iceboom-config" { + t.Fatal("did not expect iceboom-config volume when iceboom is disabled") + } + } +} + func TestK8sPool_RetireWorkerDeletesWorkerRPCSecret(t *testing.T) { pool, cs := newTestK8sPool(t, 5) worker := &ManagedWorker{ID: 1, podName: "duckgres-worker-test-cp-1", done: make(chan struct{})} diff --git a/controlplane/multitenant.go b/controlplane/multitenant.go index 415353b8..93a62e9d 100644 --- a/controlplane/multitenant.go +++ b/controlplane/multitenant.go @@ -185,6 +185,17 @@ func SetupMultiTenant( WorkerTolerationKey: cfg.K8s.WorkerTolerationKey, WorkerTolerationValue: cfg.K8s.WorkerTolerationValue, WorkerExclusiveNode: cfg.K8s.WorkerExclusiveNode, + Iceboom: IceboomPoolConfig{ + Enabled: cfg.K8s.Iceboom.Enabled, + Image: cfg.K8s.Iceboom.Image, + Port: cfg.K8s.Iceboom.Port, + ConfigMap: cfg.K8s.Iceboom.ConfigMap, + CPURequest: cfg.K8s.Iceboom.CPURequest, + CPULimit: cfg.K8s.Iceboom.CPULimit, + MemoryRequest: cfg.K8s.Iceboom.MemoryRequest, + MemoryLimit: cfg.K8s.Iceboom.MemoryLimit, + ImagePullPolicy: cfg.K8s.Iceboom.ImagePullPolicy, + }, ResolveOrgConfig: func(orgID string) (*configstore.OrgConfig, error) { snap := store.Snapshot() if snap == nil { diff --git a/controlplane/worker_pool.go b/controlplane/worker_pool.go index 6c02ab5e..7d4411a4 100644 --- a/controlplane/worker_pool.go +++ b/controlplane/worker_pool.go @@ -72,6 +72,22 @@ type K8sWorkerPoolConfig struct { WorkerIDGenerator func() int // Shared ID generator across orgs (nil = internal counter) ResolveOrgConfig func(string) (*configstore.OrgConfig, error) // Optional: resolve org config for version-aware reaping RuntimeStore RuntimeWorkerStore + Iceboom IceboomPoolConfig // Optional iceboom sidecar; enabled when Iceboom.Enabled is true. +} + +// IceboomPoolConfig mirrors IceboomConfig at the pool layer. Kept as a +// separate type so the pool package doesn't need to reach into the higher- +// level ControlPlaneConfig. +type IceboomPoolConfig struct { + Enabled bool + Image string + Port int + ConfigMap string + CPURequest string + CPULimit string + MemoryRequest string + MemoryLimit string + ImagePullPolicy string } type RuntimeWorkerStore interface { diff --git a/main.go b/main.go index b1e59ca4..6b3e8311 100644 --- a/main.go +++ b/main.go @@ -361,6 +361,17 @@ func main() { WorkerTolerationValue: resolved.K8sWorkerTolerationValue, WorkerExclusiveNode: resolved.K8sWorkerExclusiveNode, AWSRegion: resolved.AWSRegion, + Iceboom: controlplane.IceboomConfig{ + Enabled: resolved.K8sIceboomEnabled, + Image: resolved.K8sIceboomImage, + Port: resolved.K8sIceboomPort, + ConfigMap: resolved.K8sIceboomConfigMap, + CPURequest: resolved.K8sIceboomCPURequest, + CPULimit: resolved.K8sIceboomCPULimit, + MemoryRequest: resolved.K8sIceboomMemoryRequest, + MemoryLimit: resolved.K8sIceboomMemoryLimit, + ImagePullPolicy: resolved.K8sIceboomImagePullPolicy, + }, }, } controlplane.RunControlPlane(cpCfg)