diff --git a/cmd/obol/network.go b/cmd/obol/network.go index 8c8940b3..41a206e9 100644 --- a/cmd/obol/network.go +++ b/cmd/obol/network.go @@ -38,11 +38,11 @@ func networkCommand(cfg *config.Config) *cli.Command { }, { Name: "sync", - Usage: "Deploy or update network configuration to cluster", - ArgsUsage: "/ or -", + Usage: "Deploy or update network configuration to cluster (no args = sync all)", + ArgsUsage: "[/]", Action: func(c *cli.Context) error { if c.NArg() == 0 { - return fmt.Errorf("deployment identifier required (e.g., ethereum/knowing-wahoo or ethereum-knowing-wahoo)") + return network.SyncAll(cfg) } deploymentIdentifier := c.Args().First() return network.Sync(cfg, deploymentIdentifier) diff --git a/internal/embed/infrastructure/values/erpc.yaml.gotmpl b/internal/embed/infrastructure/values/erpc.yaml.gotmpl index e187651a..40269862 100644 --- a/internal/embed/infrastructure/values/erpc.yaml.gotmpl +++ b/internal/embed/infrastructure/values/erpc.yaml.gotmpl @@ -65,6 +65,16 @@ config: |- evm: chainId: 1 alias: mainnet + selectionPolicy: + evalInterval: 1m + evalPerMethod: true + evalFunction: | + (upstreams, method) => { + if (method === 'eth_sendRawTransaction') { + return upstreams.filter(u => u.config.id === 'obol-rpc-mainnet'); + } + return upstreams; + } failsafe: timeout: duration: 30s diff --git a/internal/embed/networks/ethereum/helmfile.yaml.gotmpl b/internal/embed/networks/ethereum/helmfile.yaml.gotmpl index 8593c500..198351d8 100644 --- a/internal/embed/networks/ethereum/helmfile.yaml.gotmpl +++ b/internal/embed/networks/ethereum/helmfile.yaml.gotmpl @@ -85,6 +85,7 @@ releases: metadata.json: | { "network": "{{ .Values.network }}", + "chainId": {{ if eq .Values.network "mainnet" }}1{{ else if eq .Values.network "hoodi" }}560048{{ else if eq .Values.network "sepolia" }}11155111{{ else }}0{{ end }}, "execution": { "client": "{{ .Values.executionClient }}", "endpoints": { diff --git a/internal/network/erpc.go b/internal/network/erpc.go new file mode 100644 index 00000000..d18a2824 --- /dev/null +++ b/internal/network/erpc.go @@ -0,0 +1,252 @@ +package network + +import ( + "bytes" + "encoding/json" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + + "github.com/ObolNetwork/obol-stack/internal/config" + "gopkg.in/yaml.v3" +) + +const ( + erpcNamespace = "erpc" + erpcConfigMapName = "erpc-config" + erpcConfigKey = "erpc.yaml" + erpcDeployment = "erpc" +) + +// networkChainIDs maps network names to EVM chain IDs. +var networkChainIDs = map[string]int{ + "mainnet": 1, + "hoodi": 560048, + "sepolia": 11155111, +} + +// RegisterERPCUpstream reads the deployed network's RPC endpoint and adds +// it as an upstream in the eRPC ConfigMap. The local node becomes the +// primary upstream (group: "primary") with automatic fallback to existing +// remote upstreams. +func RegisterERPCUpstream(cfg *config.Config, networkType, id string) error { + // Read values.yaml to get the network name (mainnet, hoodi, etc.) + deploymentDir := filepath.Join(cfg.ConfigDir, "networks", networkType, id) + valuesPath := filepath.Join(deploymentDir, "values.yaml") + + valuesContent, err := os.ReadFile(valuesPath) + if err != nil { + return fmt.Errorf("could not read values.yaml: %w", err) + } + + var values struct { + Network string `yaml:"network"` + } + if err := yaml.Unmarshal(valuesContent, &values); err != nil { + return fmt.Errorf("could not parse values.yaml: %w", err) + } + + chainID, ok := networkChainIDs[values.Network] + if !ok { + return fmt.Errorf("unknown network %q — no chain ID mapping", values.Network) + } + + // Build the internal RPC endpoint for this network's execution client + namespace := fmt.Sprintf("%s-%s", networkType, id) + endpoint := fmt.Sprintf("http://ethereum-execution.%s.svc.cluster.local:8545", namespace) + upstreamID := fmt.Sprintf("local-%s-%s", networkType, id) + + return patchERPCUpstream(cfg, upstreamID, endpoint, chainID, true) +} + +// DeregisterERPCUpstream removes a previously registered local upstream +// from the eRPC ConfigMap. +func DeregisterERPCUpstream(cfg *config.Config, networkType, id string) error { + upstreamID := fmt.Sprintf("local-%s-%s", networkType, id) + return patchERPCUpstream(cfg, upstreamID, "", 0, false) +} + +// patchERPCUpstream adds or removes an upstream in the eRPC ConfigMap and +// restarts the eRPC deployment. When add is true, it adds/updates the +// upstream. When false, it removes it. +func patchERPCUpstream(cfg *config.Config, upstreamID, endpoint string, chainID int, add bool) error { + kubectlBin := filepath.Join(cfg.BinDir, "kubectl") + kubeconfigPath := filepath.Join(cfg.ConfigDir, "kubeconfig.yaml") + + if _, err := os.Stat(kubeconfigPath); os.IsNotExist(err) { + return fmt.Errorf("cluster not running") + } + + // Read current eRPC config from ConfigMap + configYAML, err := kubectlOutput(kubectlBin, kubeconfigPath, + "get", "configmap", erpcConfigMapName, "-n", erpcNamespace, + "-o", fmt.Sprintf("jsonpath={.data.%s}", strings.ReplaceAll(erpcConfigKey, ".", "\\."))) + if err != nil { + return fmt.Errorf("could not read eRPC config: %w", err) + } + + // Parse the YAML config + var erpcConfig map[string]interface{} + if err := yaml.Unmarshal([]byte(configYAML), &erpcConfig); err != nil { + return fmt.Errorf("could not parse eRPC config: %w", err) + } + + // Navigate to projects[0].upstreams + projects, ok := erpcConfig["projects"].([]interface{}) + if !ok || len(projects) == 0 { + return fmt.Errorf("eRPC config has no projects") + } + project, ok := projects[0].(map[string]interface{}) + if !ok { + return fmt.Errorf("eRPC config project[0] is not a map") + } + + upstreams, _ := project["upstreams"].([]interface{}) + + // Remove existing upstream with this ID (idempotent) + filtered := make([]interface{}, 0, len(upstreams)) + for _, u := range upstreams { + um, ok := u.(map[string]interface{}) + if !ok { + filtered = append(filtered, u) + continue + } + if um["id"] == upstreamID { + continue // remove it + } + filtered = append(filtered, u) + } + + if add { + // Add the new upstream at the front of the array. eRPC tries + // upstreams in order, so position 0 = highest priority. This gives + // local-first routing with automatic fallback to remote RPCs. + // + // Write methods are blocked on local nodes so transactions are + // always routed through the designated write upstream (e.g. + // obol-rpc-mainnet) rather than leaking to the public mempool. + newUpstream := map[string]interface{}{ + "id": upstreamID, + "endpoint": endpoint, + "evm": map[string]interface{}{ + "chainId": chainID, + }, + "ignoreMethods": []interface{}{ + "eth_sendRawTransaction", + "eth_sendTransaction", + }, + } + filtered = append([]interface{}{newUpstream}, filtered...) + } + + project["upstreams"] = filtered + + // If no network entry exists for this chainID yet, add one so eRPC + // knows how to route requests for this chain. We do NOT touch existing + // selectionPolicy entries — they may contain method-specific routing + // (e.g. write-only upstreams like blink). eRPC tries upstreams in + // array order, so inserting the local node at position 0 is sufficient + // for local-first routing with automatic remote fallback. + if add { + networks, _ := project["networks"].([]interface{}) + found := false + for _, n := range networks { + nm, ok := n.(map[string]interface{}) + if !ok { + continue + } + evm, _ := nm["evm"].(map[string]interface{}) + if evm == nil { + continue + } + if cid, _ := evm["chainId"].(int); cid == chainID { + found = true + break + } + } + if !found { + newNetwork := map[string]interface{}{ + "architecture": "evm", + "evm": map[string]interface{}{"chainId": chainID}, + "failsafe": map[string]interface{}{ + "timeout": map[string]interface{}{"duration": "30s"}, + "retry": map[string]interface{}{"maxAttempts": 2, "delay": "100ms"}, + }, + } + networks = append(networks, newNetwork) + project["networks"] = networks + } + } + + // Serialize back to YAML + updatedYAML, err := yaml.Marshal(erpcConfig) + if err != nil { + return fmt.Errorf("could not serialize eRPC config: %w", err) + } + + // Patch the ConfigMap + patchData := map[string]interface{}{ + "data": map[string]string{ + erpcConfigKey: string(updatedYAML), + }, + } + patchJSON, err := json.Marshal(patchData) + if err != nil { + return fmt.Errorf("could not marshal patch: %w", err) + } + + if err := kubectl(kubectlBin, kubeconfigPath, + "patch", "configmap", erpcConfigMapName, "-n", erpcNamespace, + "-p", string(patchJSON), "--type=merge"); err != nil { + return fmt.Errorf("could not patch eRPC ConfigMap: %w", err) + } + + // Restart eRPC to pick up new config + if err := kubectl(kubectlBin, kubeconfigPath, + "rollout", "restart", fmt.Sprintf("deployment/%s", erpcDeployment), "-n", erpcNamespace); err != nil { + return fmt.Errorf("could not restart eRPC: %w", err) + } + + if add { + fmt.Printf("✓ Registered local upstream %s with eRPC (chainId: %d)\n", upstreamID, chainID) + } else { + fmt.Printf("✓ Deregistered upstream %s from eRPC\n", upstreamID) + } + + return nil +} + +// kubectl runs a kubectl command, capturing stderr for error messages. +func kubectl(binary, kubeconfig string, args ...string) error { + cmd := exec.Command(binary, args...) + cmd.Env = append(os.Environ(), fmt.Sprintf("KUBECONFIG=%s", kubeconfig)) + var stderr bytes.Buffer + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + errMsg := strings.TrimSpace(stderr.String()) + if errMsg != "" { + return fmt.Errorf("%w: %s", err, errMsg) + } + return err + } + return nil +} + +// kubectlOutput runs a kubectl command and returns stdout. +func kubectlOutput(binary, kubeconfig string, args ...string) (string, error) { + cmd := exec.Command(binary, args...) + cmd.Env = append(os.Environ(), fmt.Sprintf("KUBECONFIG=%s", kubeconfig)) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + errMsg := strings.TrimSpace(stderr.String()) + if errMsg != "" { + return "", fmt.Errorf("%w: %s", err, errMsg) + } + return "", err + } + return stdout.String(), nil +} diff --git a/internal/network/erpc_test.go b/internal/network/erpc_test.go new file mode 100644 index 00000000..01ec3e17 --- /dev/null +++ b/internal/network/erpc_test.go @@ -0,0 +1,274 @@ +package network + +import ( + "strings" + "testing" + + "gopkg.in/yaml.v3" +) + +func TestPatchERPCConfig_AddUpstream(t *testing.T) { + // Simulate the eRPC config YAML that lives in the ConfigMap + configYAML := `logLevel: debug +projects: + - id: rpc + upstreams: + - id: obol-rpc-mainnet + endpoint: https://erpc.gcp.obol.tech/mainnet/evm/1 + evm: + chainId: 1 + networks: + - architecture: evm + evm: + chainId: 1 + alias: mainnet + failsafe: + timeout: + duration: 30s +` + + var erpcConfig map[string]interface{} + if err := yaml.Unmarshal([]byte(configYAML), &erpcConfig); err != nil { + t.Fatalf("failed to parse test config: %v", err) + } + + projects := erpcConfig["projects"].([]interface{}) + project := projects[0].(map[string]interface{}) + + // Add a local upstream at position 0 (highest priority) + upstreams := project["upstreams"].([]interface{}) + newUpstream := map[string]interface{}{ + "id": "local-ethereum-test", + "endpoint": "http://ethereum-execution.ethereum-test.svc.cluster.local:8545", + "evm": map[string]interface{}{"chainId": 1}, + "ignoreMethods": []interface{}{ + "eth_sendRawTransaction", + "eth_sendTransaction", + }, + } + upstreams = append([]interface{}{newUpstream}, upstreams...) + project["upstreams"] = upstreams + + // Verify local upstream is first (eRPC tries in order for reads) + if len(project["upstreams"].([]interface{})) != 2 { + t.Errorf("expected 2 upstreams, got %d", len(project["upstreams"].([]interface{}))) + } + + first := project["upstreams"].([]interface{})[0].(map[string]interface{}) + if first["id"] != "local-ethereum-test" { + t.Errorf("first upstream should be local, got %v", first["id"]) + } + // Local upstream must block write methods + ignored, ok := first["ignoreMethods"].([]interface{}) + if !ok || len(ignored) != 2 { + t.Fatal("local upstream must have ignoreMethods for write methods") + } + if ignored[0] != "eth_sendRawTransaction" { + t.Errorf("ignoreMethods[0] = %v, want eth_sendRawTransaction", ignored[0]) + } + + second := project["upstreams"].([]interface{})[1].(map[string]interface{}) + if second["id"] != "obol-rpc-mainnet" { + t.Errorf("second upstream should be remote (write-capable), got %v", second["id"]) + } +} + +func TestPatchERPCConfig_RemoveUpstream(t *testing.T) { + configYAML := `projects: + - id: rpc + upstreams: + - id: local-ethereum-test + endpoint: http://localhost:8545 + evm: + chainId: 1 + - id: obol-rpc-mainnet + endpoint: https://erpc.gcp.obol.tech/mainnet/evm/1 + evm: + chainId: 1 +` + + var erpcConfig map[string]interface{} + if err := yaml.Unmarshal([]byte(configYAML), &erpcConfig); err != nil { + t.Fatalf("failed to parse: %v", err) + } + + projects := erpcConfig["projects"].([]interface{}) + project := projects[0].(map[string]interface{}) + upstreams := project["upstreams"].([]interface{}) + + // Filter out the local upstream + var filtered []interface{} + for _, u := range upstreams { + um := u.(map[string]interface{}) + if um["id"] == "local-ethereum-test" { + continue + } + filtered = append(filtered, u) + } + project["upstreams"] = filtered + + if len(filtered) != 1 { + t.Errorf("expected 1 upstream after removal, got %d", len(filtered)) + } + remaining := filtered[0].(map[string]interface{}) + if remaining["id"] != "obol-rpc-mainnet" { + t.Errorf("remaining upstream should be obol-rpc-mainnet, got %v", remaining["id"]) + } +} + +func TestPatchERPCConfig_Idempotent(t *testing.T) { + configYAML := `projects: + - id: rpc + upstreams: + - id: local-ethereum-test + endpoint: http://old-endpoint:8545 + evm: + chainId: 1 + - id: obol-rpc-mainnet + endpoint: https://erpc.gcp.obol.tech/mainnet/evm/1 + evm: + chainId: 1 +` + + var erpcConfig map[string]interface{} + if err := yaml.Unmarshal([]byte(configYAML), &erpcConfig); err != nil { + t.Fatalf("failed to parse: %v", err) + } + + projects := erpcConfig["projects"].([]interface{}) + project := projects[0].(map[string]interface{}) + upstreams := project["upstreams"].([]interface{}) + + // Remove existing, then add updated (same as patchERPCUpstream logic) + var filtered []interface{} + for _, u := range upstreams { + um := u.(map[string]interface{}) + if um["id"] == "local-ethereum-test" { + continue + } + filtered = append(filtered, u) + } + + newUpstream := map[string]interface{}{ + "id": "local-ethereum-test", + "endpoint": "http://new-endpoint:8545", + "evm": map[string]interface{}{"chainId": 1}, + "ignoreMethods": []interface{}{ + "eth_sendRawTransaction", + "eth_sendTransaction", + }, + } + filtered = append([]interface{}{newUpstream}, filtered...) + project["upstreams"] = filtered + + // Should still be 2 upstreams (not 3) + if len(filtered) != 2 { + t.Errorf("expected 2 upstreams (idempotent), got %d", len(filtered)) + } + first := filtered[0].(map[string]interface{}) + if first["endpoint"] != "http://new-endpoint:8545" { + t.Errorf("endpoint should be updated, got %v", first["endpoint"]) + } +} + +func TestPatchERPCConfig_PreservesWriteOnlySelectionPolicy(t *testing.T) { + // The obol-stack eRPC config routes eth_sendRawTransaction exclusively + // to obol-rpc-mainnet. When a local node is registered, the selection + // policy must be preserved — writes still go to obol-rpc-mainnet only, + // while reads use the local node (first in array order). + configYAML := `projects: + - id: rpc + upstreams: + - id: obol-rpc-mainnet + endpoint: https://erpc.gcp.obol.tech/mainnet/evm/1 + evm: + chainId: 1 + networks: + - architecture: evm + evm: + chainId: 1 + selectionPolicy: + evalInterval: 1m + evalPerMethod: true + evalFunction: | + (upstreams, method) => { + if (method === 'eth_sendRawTransaction') { + return upstreams.filter(u => u.config.id === 'obol-rpc-mainnet'); + } + return upstreams; + } +` + + var erpcConfig map[string]interface{} + if err := yaml.Unmarshal([]byte(configYAML), &erpcConfig); err != nil { + t.Fatalf("failed to parse: %v", err) + } + + projects := erpcConfig["projects"].([]interface{}) + project := projects[0].(map[string]interface{}) + + // Simulate what patchERPCUpstream does: add local upstream at front + // with write methods blocked + upstreams := project["upstreams"].([]interface{}) + newUpstream := map[string]interface{}{ + "id": "local-ethereum-prod", + "endpoint": "http://ethereum-execution.ethereum-prod.svc.cluster.local:8545", + "evm": map[string]interface{}{"chainId": 1}, + "ignoreMethods": []interface{}{ + "eth_sendRawTransaction", + "eth_sendTransaction", + }, + } + upstreams = append([]interface{}{newUpstream}, upstreams...) + project["upstreams"] = upstreams + + // Verify: selectionPolicy must be UNTOUCHED + networks := project["networks"].([]interface{}) + mainnet := networks[0].(map[string]interface{}) + sp, ok := mainnet["selectionPolicy"].(map[string]interface{}) + if !ok { + t.Fatal("selectionPolicy was removed") + } + if sp["evalPerMethod"] != true { + t.Error("evalPerMethod should still be true") + } + fn, ok := sp["evalFunction"].(string) + if !ok || !strings.Contains(fn, "obol-rpc-mainnet") { + t.Error("evalFunction should still route writes to obol-rpc-mainnet") + } + + // Verify: 2 upstreams (local + obol), local first for reads + if len(project["upstreams"].([]interface{})) != 2 { + t.Errorf("expected 2 upstreams, got %d", len(project["upstreams"].([]interface{}))) + } + first := project["upstreams"].([]interface{})[0].(map[string]interface{}) + if first["id"] != "local-ethereum-prod" { + t.Errorf("local upstream should be first (for reads), got %v", first["id"]) + } + second := project["upstreams"].([]interface{})[1].(map[string]interface{}) + if second["id"] != "obol-rpc-mainnet" { + t.Errorf("obol-rpc-mainnet should be second (protected write target), got %v", second["id"]) + } +} + +func TestNetworkChainIDs(t *testing.T) { + tests := []struct { + network string + chainID int + }{ + {"mainnet", 1}, + {"hoodi", 560048}, + {"sepolia", 11155111}, + } + + for _, tt := range tests { + got, ok := networkChainIDs[tt.network] + if !ok { + t.Errorf("networkChainIDs missing %q", tt.network) + continue + } + if got != tt.chainID { + t.Errorf("networkChainIDs[%q] = %d, want %d", tt.network, got, tt.chainID) + } + } +} diff --git a/internal/network/network.go b/internal/network/network.go index f7dc8912..6b838b71 100644 --- a/internal/network/network.go +++ b/internal/network/network.go @@ -159,6 +159,50 @@ func Install(cfg *config.Config, network string, overrides map[string]string, fo return nil } +// SyncAll syncs all installed network deployments found in the config directory. +func SyncAll(cfg *config.Config) error { + networksDir := filepath.Join(cfg.ConfigDir, "networks") + networkDirs, err := os.ReadDir(networksDir) + if err != nil { + if os.IsNotExist(err) { + fmt.Println("No networks installed.") + return nil + } + return fmt.Errorf("could not read networks directory: %w", err) + } + + var synced int + for _, networkDir := range networkDirs { + if !networkDir.IsDir() { + continue + } + deployments, err := os.ReadDir(filepath.Join(networksDir, networkDir.Name())) + if err != nil { + continue + } + for _, deployment := range deployments { + if !deployment.IsDir() { + continue + } + identifier := fmt.Sprintf("%s/%s", networkDir.Name(), deployment.Name()) + fmt.Printf("─── Syncing %s ───\n", identifier) + if err := Sync(cfg, identifier); err != nil { + fmt.Printf(" Warning: failed to sync %s: %v\n", identifier, err) + continue + } + synced++ + fmt.Println() + } + } + + if synced == 0 { + fmt.Println("No networks installed. Use 'obol network install ' first.") + } else { + fmt.Printf("✓ Synced %d network deployment(s)\n", synced) + } + return nil +} + // Sync deploys or updates a network configuration to the cluster using helmfile func Sync(cfg *config.Config, deploymentIdentifier string) error { // Parse deployment identifier (supports both "ethereum/knowing-wahoo" and "ethereum-knowing-wahoo") @@ -242,6 +286,12 @@ func Sync(cfg *config.Config, deploymentIdentifier string) error { fmt.Printf("\nDeployment synced successfully!\n") fmt.Printf("Namespace: %s-%s\n", networkName, deploymentID) + + // Register local node as eRPC upstream so the gateway routes through it + if err := RegisterERPCUpstream(cfg, networkName, deploymentID); err != nil { + fmt.Printf(" Warning: could not register eRPC upstream: %v\n", err) + } + fmt.Printf("\nTo check status: obol kubectl get all -n %s-%s\n", networkName, deploymentID) fmt.Printf("To view logs: obol kubectl logs -n %s-%s \n", networkName, deploymentID) fmt.Printf("To access dashboard: obol k9s -n %s-%s\n", networkName, deploymentID) @@ -316,6 +366,11 @@ func Delete(cfg *config.Config, deploymentIdentifier string) error { return fmt.Errorf("deployment not found: %s", deploymentIdentifier) } + // Deregister from eRPC before deleting the namespace + if err := DeregisterERPCUpstream(cfg, networkName, deploymentID); err != nil { + fmt.Printf(" Warning: could not deregister eRPC upstream: %v\n", err) + } + // Delete Kubernetes namespace if namespaceExists { fmt.Printf("\nDeleting namespace %s...\n", namespaceName)