Skip to content

Commit e77a853

Browse files
Fix Ai comments
1 parent fb1882c commit e77a853

5 files changed

Lines changed: 311 additions & 29 deletions

File tree

supernode/host_reporter/service.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
const (
2222
defaultPollInterval = 5 * time.Second
2323
defaultDialTimeout = 2 * time.Second
24+
defaultTickTimeout = 30 * time.Second
2425

2526
maxConcurrentTargets = 8
2627
)
@@ -103,52 +104,56 @@ func (s *Service) Run(ctx context.Context) error {
103104
}
104105

105106
func (s *Service) tick(ctx context.Context) {
106-
epochResp, err := s.lumera.Audit().GetCurrentEpoch(ctx)
107+
// Bound each reporting cycle so a slow/hung chain RPC cannot block future ticks indefinitely.
108+
tickCtx, cancel := context.WithTimeout(ctx, defaultTickTimeout)
109+
defer cancel()
110+
111+
epochResp, err := s.lumera.Audit().GetCurrentEpoch(tickCtx)
107112
if err != nil || epochResp == nil {
108113
return
109114
}
110115
epochID := epochResp.EpochId
111116
reachability.SetCurrentEpochID(epochID)
112117

113-
anchorResp, err := s.lumera.Audit().GetEpochAnchor(ctx, epochID)
118+
anchorResp, err := s.lumera.Audit().GetEpochAnchor(tickCtx, epochID)
114119
if err != nil || anchorResp == nil || anchorResp.Anchor.EpochId != epochID {
115120
// Anchor may not be committed yet at the epoch boundary; retry on next tick.
116121
return
117122
}
118123

119124
// Idempotency: if a report exists for this epoch, do nothing.
120-
if _, err := s.lumera.Audit().GetEpochReport(ctx, epochID, s.identity); err == nil {
125+
if _, err := s.lumera.Audit().GetEpochReport(tickCtx, epochID, s.identity); err == nil {
121126
return
122127
} else if status.Code(err) != codes.NotFound {
123128
return
124129
}
125130

126-
assignResp, err := s.lumera.Audit().GetAssignedTargets(ctx, s.identity, epochID)
131+
assignResp, err := s.lumera.Audit().GetAssignedTargets(tickCtx, s.identity, epochID)
127132
if err != nil || assignResp == nil {
128133
return
129134
}
130135

131-
storageChallengeObservations := s.buildStorageChallengeObservations(ctx, epochID, assignResp.RequiredOpenPorts, assignResp.TargetSupernodeAccounts)
136+
storageChallengeObservations := s.buildStorageChallengeObservations(tickCtx, epochID, assignResp.RequiredOpenPorts, assignResp.TargetSupernodeAccounts)
132137

133138
hostReport := audittypes.HostReport{
134139
// Intentionally submit 0% usage for CPU/memory so the chain treats these as "unknown".
135140
// Disk usage is reported accurately (legacy-aligned) so disk-based enforcement can work.
136141
CpuUsagePercent: 0,
137142
MemUsagePercent: 0,
138143
}
139-
if diskUsagePercent, ok := s.diskUsagePercent(ctx); ok {
144+
if diskUsagePercent, ok := s.diskUsagePercent(tickCtx); ok {
140145
hostReport.DiskUsagePercent = diskUsagePercent
141146
}
142147

143-
if _, err := s.lumera.AuditMsg().SubmitEpochReport(ctx, epochID, hostReport, storageChallengeObservations); err != nil {
144-
logtrace.Warn(ctx, "epoch report submit failed", logtrace.Fields{
148+
if _, err := s.lumera.AuditMsg().SubmitEpochReport(tickCtx, epochID, hostReport, storageChallengeObservations); err != nil {
149+
logtrace.Warn(tickCtx, "epoch report submit failed", logtrace.Fields{
145150
"epoch_id": epochID,
146151
"error": err.Error(),
147152
})
148153
return
149154
}
150155

151-
logtrace.Info(ctx, "epoch report submitted", logtrace.Fields{
156+
logtrace.Info(tickCtx, "epoch report submitted", logtrace.Fields{
152157
"epoch_id": epochID,
153158
"storage_challenge_observations_count": len(storageChallengeObservations),
154159
})
@@ -250,12 +255,23 @@ func (s *Service) targetHost(ctx context.Context, supernodeAccount string) (stri
250255
if raw == "" {
251256
return "", fmt.Errorf("empty latest address for %s", supernodeAccount)
252257
}
258+
return normalizeProbeHost(raw), nil
259+
}
253260

261+
func normalizeProbeHost(raw string) string {
254262
// LatestAddress is expected to be an IP/host, but tolerate host:port.
255263
if host, _, splitErr := net.SplitHostPort(raw); splitErr == nil && host != "" {
256-
return host, nil
264+
return host
265+
}
266+
267+
// Handle bracketed IPv6 literals without a port, e.g. "[2001:db8::1]".
268+
if strings.HasPrefix(raw, "[") && strings.HasSuffix(raw, "]") {
269+
if unbracketed := strings.TrimPrefix(strings.TrimSuffix(raw, "]"), "["); unbracketed != "" {
270+
return unbracketed
271+
}
257272
}
258-
return raw, nil
273+
274+
return raw
259275
}
260276

261277
func probeTCP(ctx context.Context, host string, port uint32, timeout time.Duration) audittypes.PortState {
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package host_reporter
2+
3+
import "testing"
4+
5+
func TestNormalizeProbeHost(t *testing.T) {
6+
t.Parallel()
7+
8+
tests := []struct {
9+
name string
10+
in string
11+
want string
12+
}{
13+
{name: "ipv4 host only", in: "203.0.113.1", want: "203.0.113.1"},
14+
{name: "host port", in: "example.com:8080", want: "example.com"},
15+
{name: "ipv6 host only", in: "2001:db8::1", want: "2001:db8::1"},
16+
{name: "bracketed ipv6 host only", in: "[2001:db8::1]", want: "2001:db8::1"},
17+
{name: "bracketed ipv6 host port", in: "[2001:db8::1]:8080", want: "2001:db8::1"},
18+
}
19+
20+
for _, tc := range tests {
21+
tc := tc
22+
t.Run(tc.name, func(t *testing.T) {
23+
t.Parallel()
24+
if got := normalizeProbeHost(tc.in); got != tc.want {
25+
t.Fatalf("normalizeProbeHost(%q)=%q want %q", tc.in, got, tc.want)
26+
}
27+
})
28+
}
29+
}

supernode/storage_challenge/service.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ const (
4949
scMaxSliceBytes = uint64(65_536)
5050

5151
// scResponseTimeout/scAffirmationTimeout are the gRPC timeouts for recipient proof and observer verification.
52-
scResponseTimeout = 30 * time.Second
53-
scAffirmationTimeout = 30 * time.Second
52+
scResponseTimeout = 30 * time.Second
53+
scAffirmationTimeout = 30 * time.Second
54+
scEvidenceSubmitTimeout = 20 * time.Second
5455

5556
// scCandidateKeysLookbackEpochs is how many epochs back we look for candidate local keys.
5657
scCandidateKeysLookbackEpochs = uint32(1)
@@ -608,7 +609,10 @@ func (s *Service) maybeSubmitEvidence(ctx context.Context, params audittypes.Par
608609
return err
609610
}
610611

611-
_, err = s.lumera.AuditMsg().SubmitEvidence(ctx, recipient, audittypes.EvidenceType_EVIDENCE_TYPE_STORAGE_CHALLENGE_FAILURE, "", string(bz))
612+
submitCtx, cancel := context.WithTimeout(ctx, scEvidenceSubmitTimeout)
613+
defer cancel()
614+
615+
_, err = s.lumera.AuditMsg().SubmitEvidence(submitCtx, recipient, audittypes.EvidenceType_EVIDENCE_TYPE_STORAGE_CHALLENGE_FAILURE, "", string(bz))
612616
if err != nil {
613617
return err
614618
}

supernode/transport/grpc/storage_challenge/handler.go

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"lukechampine.com/blake3"
1717
)
1818

19+
const maxServedSliceBytes = uint64(65_536)
20+
1921
type Server struct {
2022
supernode.UnimplementedStorageChallengeServiceServer
2123

@@ -48,17 +50,31 @@ func (s *Server) GetSliceProof(ctx context.Context, req *supernode.GetSliceProof
4850
start := req.RequestedStart
4951
end := req.RequestedEnd
5052
if end <= start {
51-
start = 0
52-
end = uint64(len(data))
53-
}
54-
if start >= uint64(len(data)) {
55-
start = 0
56-
}
57-
if end > uint64(len(data)) {
58-
end = uint64(len(data))
59-
}
60-
if end < start {
61-
end = start
53+
return &supernode.GetSliceProofResponse{
54+
ChallengeId: req.ChallengeId,
55+
EpochId: req.EpochId,
56+
FileKey: req.FileKey,
57+
RecipientId: s.identity,
58+
Ok: false,
59+
Error: "invalid requested range: requested_end must be greater than requested_start",
60+
}, nil
61+
}
62+
dataLen := uint64(len(data))
63+
if start >= dataLen {
64+
return &supernode.GetSliceProofResponse{
65+
ChallengeId: req.ChallengeId,
66+
EpochId: req.EpochId,
67+
FileKey: req.FileKey,
68+
RecipientId: s.identity,
69+
Ok: false,
70+
Error: "invalid requested range: requested_start is out of bounds",
71+
}, nil
72+
}
73+
if end > dataLen {
74+
end = dataLen
75+
}
76+
if end-start > maxServedSliceBytes {
77+
end = start + maxServedSliceBytes
6278
}
6379

6480
slice := make([]byte, int(end-start))
@@ -118,12 +134,12 @@ func (s *Server) persistRecipientProof(ctx context.Context, req *supernode.GetSl
118134

119135
challenge := types.MessageData{
120136
ChallengerID: req.ChallengerId,
121-
RecipientID: req.RecipientId,
137+
RecipientID: s.identity,
122138
Observers: append([]string(nil), req.ObserverIds...),
123139
Challenge: types.ChallengeData{
124140
FileHash: req.FileKey,
125-
StartIndex: int(req.RequestedStart),
126-
EndIndex: int(req.RequestedEnd),
141+
StartIndex: int(resp.Start),
142+
EndIndex: int(resp.End),
127143
Timestamp: time.Now().UTC(),
128144
},
129145
}
@@ -148,7 +164,7 @@ func (s *Server) persistRecipientProof(ctx context.Context, req *supernode.GetSl
148164

149165
response := types.MessageData{
150166
ChallengerID: req.ChallengerId,
151-
RecipientID: req.RecipientId,
167+
RecipientID: s.identity,
152168
Observers: append([]string(nil), req.ObserverIds...),
153169
Response: types.ResponseData{
154170
Hash: resp.ProofHashHex,

0 commit comments

Comments
 (0)