Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ All notable changes to this project will be documented in this file.
- Add `twamp-debug` diagnostic tool for testing kernel timestamping support on switches; sends real TWAMP probes to verify which SO_TIMESTAMPING modes (RX/TX software/hardware/sched) actually deliver timestamps, and reports RTT statistics comparing userspace vs kernel timestamp sources
- E2E Tests
- Switch backward compatibility test to install versioned CLI binaries from GitHub releases instead of Cloudsmith apt repos; version enumeration now uses the GitHub API directly from Go rather than querying apt-cache inside the container
- Client
- Add `doublezero_connection_info` Prometheus metric exposing connection metadata (user_type, network, current_device, metro, tunnel_name, tunnel_src, tunnel_dst) ([#3201](https://github.com/malbeclabs/doublezero/pull/3201))
- Add `doublezero_connection_rtt_nanoseconds` and `doublezero_connection_loss_percentage` Prometheus metrics reporting RTT and packet loss to the currently connected device

## [v0.15.0](https://github.com/malbeclabs/doublezero/compare/client/v0.14.0...client/v0.15.0) - 2026-03-27

Expand Down
68 changes: 64 additions & 4 deletions client/doublezerod/internal/manager/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
// V2ServiceStatus wraps a StatusResponse with enriched fields.
//
// CurrentDeviceRttNanoseconds and CurrentDeviceLossPercentage carry the
// latency measurement for the current device; they are tagged omitempty, so a
// zero value (including "no measurement") is left out of the JSON output.
// Each field is declared exactly once: Go rejects duplicate field names within
// a struct.
type V2ServiceStatus struct {
	*api.StatusResponse
	CurrentDevice               string  `json:"current_device"`
	CurrentDeviceRttNanoseconds int64   `json:"current_device_rtt_nanoseconds,omitempty"`
	CurrentDeviceLossPercentage float64 `json:"current_device_loss_percentage,omitempty"`
	LowestLatencyDevice         string  `json:"lowest_latency_device"`
	Metro                       string  `json:"metro"`
	Tenant                      string  `json:"tenant"`
}

// V2StatusResponse is the response for the /v2/status endpoint.
Expand Down Expand Up @@ -68,6 +70,7 @@ func (n *NetlinkManager) ServeProvision(w http.ResponseWriter, r *http.Request)
return
}

n.updateConnectionInfoMetric()
_, _ = w.Write([]byte(`{"status": "ok"}`))
}

Expand Down Expand Up @@ -97,6 +100,7 @@ func (n *NetlinkManager) ServeRemove(w http.ResponseWriter, r *http.Request) {
return
}

n.updateConnectionInfoMetric()
_, _ = w.Write([]byte(`{"status": "ok"}`))
}

Expand Down Expand Up @@ -167,6 +171,55 @@ func (n *NetlinkManager) ServeV2Status(w http.ResponseWriter, _ *http.Request) {
})
}

// updateConnectionInfoMetric rebuilds the connection gauges from scratch.
//
// It first clears doublezero_connection_info,
// doublezero_connection_rtt_nanoseconds and
// doublezero_connection_loss_percentage, then publishes one info series (value
// 1) per enriched service status. RTT and loss series are published only for
// services whose current-device RTT is positive. If Status returns an error or
// no services, the gauges are left empty.
func (n *NetlinkManager) updateConnectionInfoMetric() {
	metricConnectionInfo.Reset()
	metricConnectionRttNanoseconds.Reset()
	metricConnectionLossPercentage.Reset()

	current, statusErr := n.Status()
	if statusErr != nil {
		return
	}
	if len(current) == 0 {
		return
	}

	for _, service := range n.enrichStatuses(current) {
		// Label values shared by all three gauges for this service.
		userType := service.UserType.String()
		device := service.CurrentDevice
		metro := service.Metro

		infoGauge := metricConnectionInfo.WithLabelValues(
			userType,
			n.network,
			device,
			metro,
			service.TunnelName,
			ipString(service.TunnelSrc),
			ipString(service.TunnelDst),
		)
		infoGauge.Set(1)

		// Without a positive RTT there is nothing to report for RTT or loss.
		rtt := service.CurrentDeviceRttNanoseconds
		if rtt <= 0 {
			continue
		}

		rttGauge := metricConnectionRttNanoseconds.WithLabelValues(userType, n.network, device, metro)
		rttGauge.Set(float64(rtt))

		lossGauge := metricConnectionLossPercentage.WithLabelValues(userType, n.network, device, metro)
		lossGauge.Set(service.CurrentDeviceLossPercentage)
	}
}

// ipString returns the string representation of an IP, or the empty string if
// ip is nil or has zero length.
//
// net.IP.String renders a zero-length address as "<nil>"; checking the length
// rather than comparing against nil keeps that placeholder out of metric label
// values for empty-but-non-nil slices as well.
func ipString(ip net.IP) string {
	if len(ip) == 0 {
		return ""
	}
	return ip.String()
}

// latencyToleranceNS matches the CLI's LATENCY_TOLERANCE_NS (5ms).
// The value is expressed in nanoseconds: 5_000_000 ns = 5 ms.
const latencyToleranceNS int64 = 5_000_000

Expand Down Expand Up @@ -284,6 +337,13 @@ func (n *NetlinkManager) enrichStatuses(statuses []*api.StatusResponse) []V2Serv
if exch, ok := exchangesByPK[exchPK]; ok {
es.Metro = exch.Name
}
for _, r := range latencyResults {
if r.Device.PubKey == matchedDevice.PubKey && r.Reachable {
es.CurrentDeviceRttNanoseconds = r.Avg
es.CurrentDeviceLossPercentage = r.Loss
break
}
}
}

if matchedUser != nil {
Expand Down
5 changes: 5 additions & 0 deletions client/doublezerod/internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,9 @@ func (n *NetlinkManager) reconcilerTeardown() {
}
// Clear cached tunnel src so a fresh lookup is done on next enable.
n.tunnelSrcCache = make(map[string]net.IP)
metricConnectionInfo.Reset()
metricConnectionRttNanoseconds.Reset()
metricConnectionLossPercentage.Reset()
}

func (n *NetlinkManager) reconcile(ctx context.Context) {
Expand Down Expand Up @@ -535,6 +538,8 @@ func (n *NetlinkManager) reconcile(ctx context.Context) {
// Reconcile unicast and multicast services
n.reconcileService(wantUnicast, n.HasUnicastService(), serviceUnicast, api.UserTypeIBRL, devicesByPK, mcastGroupsByPK, allPrefixes, data.Config)
n.reconcileService(wantMulticast, n.HasMulticastService(), serviceMulticast, api.UserTypeMulticast, devicesByPK, mcastGroupsByPK, allPrefixes, data.Config)

n.updateConnectionInfoMetric()
}

func (n *NetlinkManager) reconcileService(
Expand Down
24 changes: 24 additions & 0 deletions client/doublezerod/internal/manager/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,28 @@ var (
},
[]string{labelServiceType},
)

// metricConnectionInfo is the doublezero_connection_info gauge vector. Its
// labels carry connection metadata: user type, network, current device, metro,
// and the tunnel name, source and destination.
metricConnectionInfo = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "doublezero_connection_info",
Help: "Connection metadata for active DoubleZero services",
},
[]string{"user_type", "network", "current_device", "metro", "tunnel_name", "tunnel_src", "tunnel_dst"},
)

// metricConnectionRttNanoseconds is the doublezero_connection_rtt_nanoseconds
// gauge vector, labelled by user type, network, current device and metro.
metricConnectionRttNanoseconds = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "doublezero_connection_rtt_nanoseconds",
Help: "Average round-trip time to the current connected DoubleZero device in nanoseconds",
},
[]string{"user_type", "network", "current_device", "metro"},
)

// metricConnectionLossPercentage is the doublezero_connection_loss_percentage
// gauge vector, labelled by user type, network, current device and metro.
metricConnectionLossPercentage = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "doublezero_connection_loss_percentage",
Help: "Packet loss percentage to the current connected DoubleZero device",
},
[]string{"user_type", "network", "current_device", "metro"},
)
)
108 changes: 108 additions & 0 deletions client/doublezerod/internal/manager/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (

"github.com/malbeclabs/doublezero/client/doublezerod/internal/api"
"github.com/malbeclabs/doublezero/client/doublezerod/internal/bgp"
"github.com/malbeclabs/doublezero/client/doublezerod/internal/latency"
"github.com/malbeclabs/doublezero/client/doublezerod/internal/pim"
"github.com/malbeclabs/doublezero/client/doublezerod/internal/routing"
"github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability"
"github.com/prometheus/client_golang/prometheus/testutil"
)

// --- test mocks ---
Expand Down Expand Up @@ -81,6 +83,14 @@ func (m *mockHeartbeatSender) Start(string, net.IP, []net.IP, int, time.Duration
// UpdateGroups is a no-op on the mock: the argument is ignored and nil is returned.
func (m *mockHeartbeatSender) UpdateGroups([]net.IP) error { return nil }
// Close is a no-op on the mock and always returns nil.
func (m *mockHeartbeatSender) Close() error { return nil }

// mockLatencyProvider is a test double that serves a fixed set of latency
// results.
type mockLatencyProvider struct {
// results is handed back unchanged by GetResultsCache.
results []latency.LatencyResult
}

// GetResultsCache returns the canned results slice as-is (no copy is made).
func (m *mockLatencyProvider) GetResultsCache() []latency.LatencyResult {
return m.results
}

// --- test helpers ---

func newTestNLM(fetcher Fetcher, opts ...Option) *NetlinkManager {
Expand Down Expand Up @@ -1387,6 +1397,104 @@ func TestServeV2Status_Enrichment(t *testing.T) {
}
}

// TestConnectionInfoMetric checks that reconcile populates the connection
// info, RTT and loss gauges for a single IBRL service, and that
// reconcilerTeardown clears all three again.
//
// The subtests share one NetlinkManager and run in order: the teardown subtest
// relies on the state left behind by the reconcile subtest.
func TestConnectionInfoMetric(t *testing.T) {
	devicePK := [32]byte{1}
	exchangePK := [32]byte{2}
	clientIP := net.IPv4(1, 2, 3, 4).To4()

	device := testDevice(devicePK, [4]uint8{5, 6, 7, 8}, [][5]uint8{{10, 0, 0, 0, 24}})
	device.ExchangePubKey = exchangePK
	device.Code = "dz1"
	device.Status = serviceability.DeviceStatusActivated

	user := testUser([4]uint8{1, 2, 3, 4}, devicePK, serviceability.UserTypeIBRL, serviceability.UserStatusActivated)

	fetcher := &mockFetcher{
		data: &serviceability.ProgramData{
			Config:  testConfig(),
			Devices: []serviceability.Device{device},
			Users:   []serviceability.User{user},
			Exchanges: []serviceability.Exchange{
				{PubKey: exchangePK, Name: "Amsterdam"},
			},
		},
	}

	latencyProvider := &mockLatencyProvider{
		results: []latency.LatencyResult{
			{
				Device:    latency.DeviceInfo{PubKey: devicePK, Code: "dz1"},
				Avg:       5_000_000,
				Loss:      1.5,
				Reachable: true,
			},
		},
	}

	dir := t.TempDir()
	n := newTestNLM(fetcher,
		WithClientIP(clientIP),
		WithPollInterval(time.Hour),
		WithStateDir(dir),
		WithEnabled(true),
		WithNetwork("testnet"),
		WithLatencyProvider(latencyProvider),
	)

	// Reset metrics to avoid cross-test pollution from promauto global state.
	// All three gauges are cleared (the loss gauge included), both before the
	// test and again on exit so a failing subtest cannot leak series into
	// later tests.
	resetConnectionMetrics := func() {
		metricConnectionInfo.Reset()
		metricConnectionRttNanoseconds.Reset()
		metricConnectionLossPercentage.Reset()
	}
	resetConnectionMetrics()
	t.Cleanup(resetConnectionMetrics)

	t.Run("populated_after_reconcile", func(t *testing.T) {
		n.reconcile(context.Background())

		// Count series before any WithLabelValues lookup below, since a
		// lookup creates the series if it does not exist yet.
		count := testutil.CollectAndCount(metricConnectionInfo)
		if count != 1 {
			t.Fatalf("expected 1 metric series, got %d", count)
		}
		if rttCount := testutil.CollectAndCount(metricConnectionRttNanoseconds); rttCount != 1 {
			t.Fatalf("expected 1 RTT metric series, got %d", rttCount)
		}
		if lossCount := testutil.CollectAndCount(metricConnectionLossPercentage); lossCount != 1 {
			t.Fatalf("expected 1 loss metric series, got %d", lossCount)
		}

		val := testutil.ToFloat64(metricConnectionInfo.WithLabelValues(
			"IBRL", "testnet", "dz1", "Amsterdam", "doublezero0", "1.2.3.4", "5.6.7.8",
		))
		if val != 1 {
			t.Fatalf("expected metric value 1, got %f", val)
		}

		rttVal := testutil.ToFloat64(metricConnectionRttNanoseconds.WithLabelValues(
			"IBRL", "testnet", "dz1", "Amsterdam",
		))
		if rttVal != 5_000_000 {
			t.Fatalf("expected RTT metric value 5000000, got %f", rttVal)
		}

		lossVal := testutil.ToFloat64(metricConnectionLossPercentage.WithLabelValues(
			"IBRL", "testnet", "dz1", "Amsterdam",
		))
		if lossVal != 1.5 {
			t.Fatalf("expected loss metric value 1.5, got %f", lossVal)
		}
	})

	t.Run("cleared_after_teardown", func(t *testing.T) {
		n.reconcilerTeardown()

		count := testutil.CollectAndCount(metricConnectionInfo)
		if count != 0 {
			t.Fatalf("expected 0 metric series after teardown, got %d", count)
		}

		rttCount := testutil.CollectAndCount(metricConnectionRttNanoseconds)
		if rttCount != 0 {
			t.Fatalf("expected 0 RTT metric series after teardown, got %d", rttCount)
		}

		lossCount := testutil.CollectAndCount(metricConnectionLossPercentage)
		if lossCount != 0 {
			t.Fatalf("expected 0 loss metric series after teardown, got %d", lossCount)
		}
	})
}

func TestServeV2Status_NoFetcher(t *testing.T) {
dir := t.TempDir()
// Create NLM with a nil-data fetcher to verify enrichment handles missing data gracefully.
Expand Down
Loading