Skip to content
5 changes: 2 additions & 3 deletions api/v1alpha1/seinode_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,8 @@ type SeiNodeStatus struct {
// +optional
Plan *TaskPlan `json:"plan,omitempty"`

// ResolvedPeers is the current set of peer DNS hostnames discovered
// from label-based peer sources. Reconciled continuously so that
// future peer-update plans can detect drift.
// ResolvedPeers carries `<node_id>@<host>:<port>` entries resolved
// from label-based peer sources, ready for CometBFT's persistent_peers.
// +optional
ResolvedPeers []string `json:"resolvedPeers,omitempty"`

Expand Down
5 changes: 2 additions & 3 deletions config/crd/sei.io_seinodes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -957,9 +957,8 @@ spec:
type: object
resolvedPeers:
description: |-
ResolvedPeers is the current set of peer DNS hostnames discovered
from label-based peer sources. Reconciled continuously so that
future peer-update plans can detect drift.
ResolvedPeers carries `<node_id>@<host>:<port>` entries resolved
from label-based peer sources, ready for CometBFT's persistent_peers.
items:
type: string
type: array
Expand Down
65 changes: 60 additions & 5 deletions internal/controller/node/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,23 @@ package node

import (
"context"
"errors"
"fmt"
"slices"
"strings"

seiconfig "github.com/sei-protocol/sei-config"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1"
"github.com/sei-protocol/sei-k8s-controller/internal/task"
)

// errNoSidecarFactory is the per-peer error used when
// r.Planner.BuildSidecarClient is nil. It honors planner.NodeResolver's
// nilable-factory contract: resolveLabelPeers seeds its per-peer error
// with this value and only overwrites it when a factory is present, so a
// missing factory follows the same "preserve prior entry or skip" path as
// any other transient peer failure instead of calling a nil function.
var errNoSidecarFactory = errors.New("sidecar client factory is nil")

func (r *SeiNodeReconciler) reconcilePeers(ctx context.Context, node *seiv1alpha1.SeiNode) error {
var resolved []string
for _, src := range node.Spec.Peers {
Expand All @@ -32,13 +41,16 @@ func (r *SeiNodeReconciler) reconcilePeers(ctx context.Context, node *seiv1alpha
return nil
}

// resolveLabelPeers lists SeiNode resources matching the label selector
// and returns their stable headless Service DNS hostnames.
// resolveLabelPeers returns fully-composed `<node_id>@<host>:<port>`
// strings for SeiNodes matching the selector. Per-peer sidecar failures
// preserve the prior entry from Status.ResolvedPeers (so transients
// don't wedge fleet-wide reconciles) or skip with a log line.
func (r *SeiNodeReconciler) resolveLabelPeers(
ctx context.Context,
node *seiv1alpha1.SeiNode,
src *seiv1alpha1.LabelPeerSource,
) ([]string, error) {
logger := log.FromContext(ctx)
ns := node.Namespace
if src.Namespace != "" {
ns = src.Namespace
Expand All @@ -52,15 +64,58 @@ func (r *SeiNodeReconciler) resolveLabelPeers(
return nil, fmt.Errorf("listing peers by label: %w", err)
}

prior := indexResolvedPeersByHost(node.Status.ResolvedPeers)
var endpoints []string
for i := range nodeList.Items {
peer := &nodeList.Items[i]
if peer.Name == node.Name && peer.Namespace == node.Namespace {
continue
}
dns := fmt.Sprintf("%s-0.%s.%s.svc.cluster.local",
peer.Name, peer.Name, peer.Namespace)
endpoints = append(endpoints, dns)

address := peerAddress(peer)
var sc task.SidecarClient
err := errNoSidecarFactory
if r.Planner.BuildSidecarClient != nil {
sc, err = r.Planner.BuildSidecarClient(peer)
}
if err == nil {
var nodeID string
nodeID, err = sc.GetNodeID(ctx)
if err == nil {
endpoints = append(endpoints, fmt.Sprintf("%s@%s", nodeID, address))
continue
}
}
if existing, ok := prior[address]; ok {
logger.Info("preserving prior peer entry; node_id fetch failed", "peer", peer.Name, "err", err)
endpoints = append(endpoints, existing)
continue
}
logger.Info("skipping peer until node_id is resolvable", "peer", peer.Name, "err", err)
}
return endpoints, nil
}

// indexResolvedPeersByHost builds a lookup table from the `host:port` part
// of each `<node_id>@host:port` entry to the full entry, so the previously
// composed string can be recovered when a peer's node_id cannot be fetched.
//
// The split happens at the first "@". Entries with no "@", an empty node ID,
// or an empty address are ignored. If several entries share an address, the
// last one in peers wins. The returned map is never nil.
func indexResolvedPeersByHost(peers []string) map[string]string {
	byAddress := make(map[string]string, len(peers))
	for _, entry := range peers {
		nodeID, address, found := strings.Cut(entry, "@")
		if !found || nodeID == "" || address == "" {
			continue
		}
		byAddress[address] = entry
	}
	return byAddress
}

// peerAddress yields the dialable address for peer.
//
// A non-empty Spec.ExternalAddress is returned verbatim; it is expected to
// already be in host:port form and is not validated here. Otherwise the
// address is the pod-0 headless Service DNS name for the peer, suffixed
// with seiconfig.PortP2P.
func peerAddress(peer *seiv1alpha1.SeiNode) string {
	if external := peer.Spec.ExternalAddress; external != "" {
		return external
	}
	name, namespace := peer.Name, peer.Namespace
	return fmt.Sprintf("%s-0.%s.%s.svc.cluster.local:%d",
		name, name, namespace, seiconfig.PortP2P)
}
212 changes: 206 additions & 6 deletions internal/controller/node/peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import (
seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1"
)

// Shared fixture values for the reconcilePeers tests.
const (
	// testRoleLabel is the label key used both in peer selectors and on
	// the peer objects they are meant to match.
	testRoleLabel = "role"
	// testRoleValue is the label value paired with testRoleLabel in the
	// sidecar-failure and nil-factory tests.
	testRoleValue = "validator"
	// testConsumerName names the SeiNode whose peers are being resolved.
	testConsumerName = "consumer"
	// testPeer1ResolvedID is the `<node_id>@<host>:<port>` entry the tests
	// expect for a peer named "peer-1" in namespace "default"; it is also
	// used to pre-seed Status.ResolvedPeers as a prior entry.
	testPeer1ResolvedID = "mock-node-id@peer-1-0.peer-1.default.svc.cluster.local:26656"
)

// errStub is a string-backed error used to inject fixed failures into
// test doubles without allocating via errors.New.
type errStub string

// Error implements the error interface by returning the underlying string.
func (stub errStub) Error() string {
	return string(stub)
}

func TestReconcilePeers_ResolvesLabelSource(t *testing.T) {
node := &seiv1alpha1.SeiNode{
ObjectMeta: metav1.ObjectMeta{Name: "my-node", Namespace: "default"},
Expand Down Expand Up @@ -49,8 +60,8 @@ func TestReconcilePeers_ResolvesLabelSource(t *testing.T) {
t.Fatalf("expected 2 resolved peers, got %d: %v", len(node.Status.ResolvedPeers), node.Status.ResolvedPeers)
}
want := []string{
"peer-1-0.peer-1.default.svc.cluster.local",
"peer-2-0.peer-2.default.svc.cluster.local",
testPeer1ResolvedID,
"mock-node-id@peer-2-0.peer-2.default.svc.cluster.local:26656",
}
for i, w := range want {
if node.Status.ResolvedPeers[i] != w {
Expand All @@ -59,6 +70,46 @@ func TestReconcilePeers_ResolvesLabelSource(t *testing.T) {
}
}

// TestReconcilePeers_PrefersExternalAddress verifies that a label-matched
// peer with Spec.ExternalAddress set is resolved to
// `<node_id>@<ExternalAddress>` rather than to its headless Service DNS name.
func TestReconcilePeers_PrefersExternalAddress(t *testing.T) {
	const want = "mock-node-id@pub-peer-p2p.test-1.example.com:26656"

	consumer := &seiv1alpha1.SeiNode{
		ObjectMeta: metav1.ObjectMeta{Name: testConsumerName, Namespace: "default"},
		Spec: seiv1alpha1.SeiNodeSpec{
			ChainID: "test-1",
			Image:   "sei:latest",
			Peers: []seiv1alpha1.PeerSource{
				{Label: &seiv1alpha1.LabelPeerSource{
					Selector: map[string]string{testRoleLabel: "publishable"},
				}},
			},
			FullNode: &seiv1alpha1.FullNodeSpec{},
		},
	}
	// The peer advertises an explicit host:port that must win over DNS.
	publisher := &seiv1alpha1.SeiNode{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "pub-peer",
			Namespace: "default",
			Labels:    map[string]string{testRoleLabel: "publishable"},
		},
		Spec: seiv1alpha1.SeiNodeSpec{
			ChainID:         "test-1",
			Image:           "sei:latest",
			ExternalAddress: "pub-peer-p2p.test-1.example.com:26656",
			FullNode:        &seiv1alpha1.FullNodeSpec{},
		},
	}

	reconciler, _ := newNodeReconciler(t, consumer, publisher)
	err := reconciler.reconcilePeers(context.Background(), consumer)
	if err != nil {
		t.Fatalf("reconcilePeers: %v", err)
	}

	resolved := consumer.Status.ResolvedPeers
	if len(resolved) != 1 {
		t.Fatalf("expected 1 peer, got %d: %v", len(resolved), resolved)
	}
	if resolved[0] != want {
		t.Errorf("resolvedPeers[0] = %q, want %q", resolved[0], want)
	}
}

func TestReconcilePeers_ExcludesSelf(t *testing.T) {
node := &seiv1alpha1.SeiNode{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -94,7 +145,7 @@ func TestReconcilePeers_ExcludesSelf(t *testing.T) {
if len(node.Status.ResolvedPeers) != 1 {
t.Fatalf("expected 1 resolved peer (self excluded), got %d: %v", len(node.Status.ResolvedPeers), node.Status.ResolvedPeers)
}
if node.Status.ResolvedPeers[0] != "other-node-0.other-node.default.svc.cluster.local" {
if node.Status.ResolvedPeers[0] != "mock-node-id@other-node-0.other-node.default.svc.cluster.local:26656" {
t.Errorf("resolvedPeers[0] = %q", node.Status.ResolvedPeers[0])
}
}
Expand All @@ -107,7 +158,7 @@ func TestReconcilePeers_CrossNamespace_DoesNotExcludeMatchingName(t *testing.T)
Image: "sei:latest",
Peers: []seiv1alpha1.PeerSource{
{Label: &seiv1alpha1.LabelPeerSource{
Selector: map[string]string{"role": "peer"},
Selector: map[string]string{testRoleLabel: "peer"},
Namespace: "ns-b",
}},
},
Expand All @@ -118,7 +169,7 @@ func TestReconcilePeers_CrossNamespace_DoesNotExcludeMatchingName(t *testing.T)
peerSameName := &seiv1alpha1.SeiNode{
ObjectMeta: metav1.ObjectMeta{
Name: "shared-name", Namespace: "ns-b",
Labels: map[string]string{"role": "peer"},
Labels: map[string]string{testRoleLabel: "peer"},
},
Spec: seiv1alpha1.SeiNodeSpec{ChainID: "test-1", Image: "sei:latest", FullNode: &seiv1alpha1.FullNodeSpec{}},
}
Expand Down Expand Up @@ -149,7 +200,7 @@ func TestReconcilePeers_NoPatchWhenUnchanged(t *testing.T) {
FullNode: &seiv1alpha1.FullNodeSpec{},
},
Status: seiv1alpha1.SeiNodeStatus{
ResolvedPeers: []string{"peer-1-0.peer-1.default.svc.cluster.local"},
ResolvedPeers: []string{testPeer1ResolvedID},
},
}
peer := &seiv1alpha1.SeiNode{
Expand Down Expand Up @@ -189,6 +240,155 @@ func TestReconcilePeers_NoLabelSources_NoPatch(t *testing.T) {
// No label sources means no resolved peers, no patch — just verifying no error
}

// TestReconcilePeers_PreservesPriorEntryOnTransientFailure covers a peer
// whose sidecar GetNodeID call fails while the consumer already holds a
// composed entry for that peer's address: reconcilePeers must succeed and
// keep the existing entry unchanged.
func TestReconcilePeers_PreservesPriorEntryOnTransientFailure(t *testing.T) {
	consumer := &seiv1alpha1.SeiNode{
		ObjectMeta: metav1.ObjectMeta{Name: testConsumerName, Namespace: "default"},
		Spec: seiv1alpha1.SeiNodeSpec{
			ChainID: "test-1",
			Image:   "sei:latest",
			Peers: []seiv1alpha1.PeerSource{
				{Label: &seiv1alpha1.LabelPeerSource{
					Selector: map[string]string{testRoleLabel: testRoleValue},
				}},
			},
			FullNode: &seiv1alpha1.FullNodeSpec{},
		},
		// Prior reconcile result that should survive the failure.
		Status: seiv1alpha1.SeiNodeStatus{
			ResolvedPeers: []string{
				testPeer1ResolvedID,
			},
		},
	}
	validator := &seiv1alpha1.SeiNode{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "peer-1",
			Namespace: "default",
			Labels:    map[string]string{testRoleLabel: testRoleValue},
		},
		Spec: seiv1alpha1.SeiNodeSpec{
			ChainID:  "test-1",
			Image:    "sei:latest",
			FullNode: &seiv1alpha1.FullNodeSpec{},
		},
	}

	// The sidecar reports an error, standing in for a peer mid-restart.
	failing := &mockSidecarClient{nodeIDErr: errStub("sidecar unreachable")}
	reconciler, _ := newNodeReconcilerWithSidecar(t, failing, consumer, validator)

	err := reconciler.reconcilePeers(context.Background(), consumer)
	if err != nil {
		t.Fatalf("reconcilePeers errored on transient peer failure: %v", err)
	}

	resolved := consumer.Status.ResolvedPeers
	if len(resolved) != 1 {
		t.Fatalf("expected prior entry preserved, got %d: %v", len(resolved), resolved)
	}
	if want := testPeer1ResolvedID; resolved[0] != want {
		t.Errorf("resolvedPeers[0] = %q, want preserved %q", resolved[0], want)
	}
}

// TestReconcilePeers_SkipsNewPeerOnSidecarFailure covers a peer with no
// prior entry whose sidecar GetNodeID call fails: the peer is omitted from
// Status.ResolvedPeers and reconcilePeers still returns nil, so one
// unreachable peer does not wedge the reconcile.
func TestReconcilePeers_SkipsNewPeerOnSidecarFailure(t *testing.T) {
	consumer := &seiv1alpha1.SeiNode{
		ObjectMeta: metav1.ObjectMeta{Name: testConsumerName, Namespace: "default"},
		Spec: seiv1alpha1.SeiNodeSpec{
			ChainID: "test-1",
			Image:   "sei:latest",
			Peers: []seiv1alpha1.PeerSource{
				{Label: &seiv1alpha1.LabelPeerSource{
					Selector: map[string]string{testRoleLabel: testRoleValue},
				}},
			},
			FullNode: &seiv1alpha1.FullNodeSpec{},
		},
	}
	validator := &seiv1alpha1.SeiNode{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "peer-1",
			Namespace: "default",
			Labels:    map[string]string{testRoleLabel: testRoleValue},
		},
		Spec: seiv1alpha1.SeiNodeSpec{
			ChainID:  "test-1",
			Image:    "sei:latest",
			FullNode: &seiv1alpha1.FullNodeSpec{},
		},
	}

	failing := &mockSidecarClient{nodeIDErr: errStub("sidecar unreachable")}
	reconciler, _ := newNodeReconcilerWithSidecar(t, failing, consumer, validator)

	err := reconciler.reconcilePeers(context.Background(), consumer)
	if err != nil {
		t.Fatalf("reconcilePeers errored on new-peer sidecar failure: %v", err)
	}
	if resolved := consumer.Status.ResolvedPeers; len(resolved) != 0 {
		t.Fatalf("expected new unresolvable peer to be skipped, got %d: %v", len(resolved), resolved)
	}
}

// TestReconcilePeers_NilSidecarFactorySkipsNewPeer covers a reconciler whose
// Planner.BuildSidecarClient is nil and a peer with no prior entry: the peer
// is skipped, nothing panics, and reconcilePeers returns nil.
func TestReconcilePeers_NilSidecarFactorySkipsNewPeer(t *testing.T) {
	consumer := &seiv1alpha1.SeiNode{
		ObjectMeta: metav1.ObjectMeta{Name: testConsumerName, Namespace: "default"},
		Spec: seiv1alpha1.SeiNodeSpec{
			ChainID: "test-1",
			Image:   "sei:latest",
			Peers: []seiv1alpha1.PeerSource{
				{Label: &seiv1alpha1.LabelPeerSource{
					Selector: map[string]string{testRoleLabel: testRoleValue},
				}},
			},
			FullNode: &seiv1alpha1.FullNodeSpec{},
		},
	}
	validator := &seiv1alpha1.SeiNode{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "peer-1",
			Namespace: "default",
			Labels:    map[string]string{testRoleLabel: testRoleValue},
		},
		Spec: seiv1alpha1.SeiNodeSpec{
			ChainID:  "test-1",
			Image:    "sei:latest",
			FullNode: &seiv1alpha1.FullNodeSpec{},
		},
	}

	reconciler, _ := newNodeReconciler(t, consumer, validator)
	// Drop the factory so no sidecar client can be built for any peer.
	reconciler.Planner.BuildSidecarClient = nil

	err := reconciler.reconcilePeers(context.Background(), consumer)
	if err != nil {
		t.Fatalf("reconcilePeers errored on nil factory: %v", err)
	}
	if resolved := consumer.Status.ResolvedPeers; len(resolved) != 0 {
		t.Fatalf("expected unresolvable peer to be skipped, got %d: %v", len(resolved), resolved)
	}
}

// TestReconcilePeers_NilSidecarFactoryPreservesPriorEntry covers a reconciler
// whose Planner.BuildSidecarClient is nil while the consumer already holds a
// composed entry for the peer's address: the existing entry must be kept.
func TestReconcilePeers_NilSidecarFactoryPreservesPriorEntry(t *testing.T) {
	consumer := &seiv1alpha1.SeiNode{
		ObjectMeta: metav1.ObjectMeta{Name: testConsumerName, Namespace: "default"},
		Spec: seiv1alpha1.SeiNodeSpec{
			ChainID: "test-1",
			Image:   "sei:latest",
			Peers: []seiv1alpha1.PeerSource{
				{Label: &seiv1alpha1.LabelPeerSource{
					Selector: map[string]string{testRoleLabel: testRoleValue},
				}},
			},
			FullNode: &seiv1alpha1.FullNodeSpec{},
		},
		// Prior reconcile result that should be carried forward.
		Status: seiv1alpha1.SeiNodeStatus{
			ResolvedPeers: []string{testPeer1ResolvedID},
		},
	}
	validator := &seiv1alpha1.SeiNode{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "peer-1",
			Namespace: "default",
			Labels:    map[string]string{testRoleLabel: testRoleValue},
		},
		Spec: seiv1alpha1.SeiNodeSpec{
			ChainID:  "test-1",
			Image:    "sei:latest",
			FullNode: &seiv1alpha1.FullNodeSpec{},
		},
	}

	reconciler, _ := newNodeReconciler(t, consumer, validator)
	// Drop the factory so no sidecar client can be built for any peer.
	reconciler.Planner.BuildSidecarClient = nil

	err := reconciler.reconcilePeers(context.Background(), consumer)
	if err != nil {
		t.Fatalf("reconcilePeers errored on nil factory: %v", err)
	}

	resolved := consumer.Status.ResolvedPeers
	preserved := len(resolved) == 1 && resolved[0] == testPeer1ResolvedID
	if !preserved {
		t.Fatalf("expected prior entry preserved, got %v", resolved)
	}
}

func TestReconcilePeers_DeduplicatesOverlappingSources(t *testing.T) {
node := &seiv1alpha1.SeiNode{
ObjectMeta: metav1.ObjectMeta{Name: "my-node", Namespace: "default"},
Expand Down
Loading
Loading