feat: rewrite CLI for ADR-002 direct pub/sub #100
Conversation
📝 Walkthrough

Replaces direct per-request HTTP/gRPC publish and protocol-specific subscribe flows with a session-based control plane: adds session negotiation (HTTP), a file-backed session cache, and a node gRPC command-stream client. The publish/subscribe CLI commands now request a session from a proxy, select node(s) from the session, and use gRPC bidi command streams to Publish/Subscribe with node tickets. Removes the legacy gRPC proxy client, the websocket dependency, most of the e2e test harness, and the related Makefile/CI fuzz targets, and updates the docs/README and CLI flags (adds --expose-amount, removes --grpc).

Sequence Diagram(s)

sequenceDiagram
participant User as User/CLI
participant CLI as CLI (publish)
participant SessionSvc as Session Client
participant Proxy as Proxy Service
participant Node as Node gRPC
User->>CLI: publish --topic "test" --message "hi"
activate CLI
CLI->>SessionSvc: CreateSession(proxyURL, clientID, [topic], capabilities, exposeAmount)
activate SessionSvc
SessionSvc->>Proxy: POST /api/v1/session {client_id, topics, expose_amount...}
activate Proxy
Proxy-->>SessionSvc: 200 {session_id, nodes:[{address,ticket,...}]}
deactivate Proxy
SessionSvc-->>CLI: Session{nodes}
deactivate SessionSvc
CLI->>CLI: select best node
CLI->>Node: gRPC ListenCommands() (bidi stream) -> send Publish Request{topic,data,ticket}
activate Node
Node-->>CLI: Response{status,metadata}
deactivate Node
CLI-->>User: Publish successful
deactivate CLI
sequenceDiagram
participant User as User/CLI
participant CLI as CLI (subscribe)
participant SessionSvc as Session Client
participant Proxy as Proxy Service
participant Node as Node gRPC
User->>CLI: subscribe --topic "test"
activate CLI
CLI->>SessionSvc: CreateSession(proxyURL, clientID, [topic], ..., exposeAmount)
activate SessionSvc
SessionSvc->>Proxy: POST /api/v1/session
activate Proxy
Proxy-->>SessionSvc: {session_id, nodes}
deactivate Proxy
SessionSvc-->>CLI: Session{nodes}
deactivate SessionSvc
CLI->>CLI: select best node
CLI->>Node: gRPC ListenCommands() (bidi stream) -> send Subscribe Request{topic,jwt_token}
activate Node
loop messages
Node-->>CLI: Response{data}
CLI->>CLI: decodeMessage & format/display or webhook/persist
end
deactivate Node
deactivate CLI
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 6

❌ Failed checks (5 warnings, 1 inconclusive)
✅ Passed checks (4 passed)
Actionable comments posted: 12
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@.github/workflows/ci.yml`:
- Around line 58-68: The inline "NOTE:" comment sits at the same indentation as
the 'fuzz' job and will become an invalid job property if the block is
uncommented; move the NOTE line out of the job block by placing it as a
top-level comment (no leading indentation) above the commented 'fuzz' job block
or keep it indented as a normal comment inside the block body (e.g., under the
steps) so that the 'fuzz' job name and keys remain valid when re-enabled—update
the commented lines around the 'fuzz' job and the "NOTE:" so the 'fuzz' job key
and its children (steps, runs-on) remain correctly indented.
In `@cmd/publish.go`:
- Around line 146-149: Replace the unbounded context used for the one-shot
Publish RPC with a context that has a short timeout: instead of ctx :=
context.Background() create a cancellable timed context via
context.WithTimeout(context.Background(), <duration>) (e.g., 5-15s), defer the
cancel, and pass that ctx into nodeClient.Publish(bestNode.Ticket, pubTopic,
publishData); also ensure the time package is imported. This bounds
connect/send/recv and causes the command to fail fast on dead or black-holed
nodes.
In `@cmd/subscribe.go`:
- Around line 164-190: The code only dials sess.Nodes[0] so failures never fall
back to other exposed nodes; modify the logic that currently picks bestNode :=
sess.Nodes[0] and immediately calls node.NewClient(...) and
nodeClient.Subscribe(...) to instead iterate over the candidate nodes (limit the
iteration by the configured expose amount flag/variable and the length of
sess.Nodes), attempting to create a client with node.NewClient(address) and then
Subscribe(ctx, node.Ticket, subTopic, 100) for each node, closing any client on
error and continuing to the next node, and only returning an error if all
attempts fail; update references to sess.Nodes, bestNode.Ticket, node.NewClient,
nodeClient.Subscribe and ensure ctx/cancel are created once and canceled on
final exit.
- Around line 199-233: The current webhook dispatcher launches a new goroutine
for every dequeued message (inside the anonymous func reading from wq), causing
unbounded concurrency; replace this with a fixed worker pool so the queue
provides backpressure: create a workerCount constant (or use
webhookQueueSize/workers flag), start that many goroutines that each range over
wq and process messages inline (call webhookFormatter.FormatMessage, create
request with http.NewRequestWithContext using webhookTimeoutSecs, do the HTTP
POST, handle resp.Body.Close and status codes) instead of spawning a new
goroutine per message; keep existing symbols webhookMsg, wq, webhookQueueSize,
webhookURL, webhookTimeoutSecs, webhookFormatter, subTopic, and clientIDToUse to
locate and modify the code.
- Around line 153-159: The session is requesting both "publish" and "subscribe"
rights but the command only needs subscribe; update the call to
session.CreateSession so the capabilities slice contains only "subscribe"
(replace []string{"publish","subscribe"} with a single-entry slice containing
"subscribe") while keeping proxyURL, clientIDToUse, []string{subTopic}, and
subExposeAmount unchanged so the session requests least-privilege.
In `@internal/node/client.go`:
- Around line 58-80: The goroutine that reads from stream.Recv currently
swallows non-EOF errors and only closes ch, so callers (e.g., cmd/subscribe.go)
cannot distinguish EOF from a stream failure; change the implementation to
propagate receive errors to the caller by adding an error propagation mechanism
(either an additional errCh chan error or change ch to carry a result struct
{resp *pb.Response; err error}), send non-nil errors (except io.EOF) on that
channel before returning, and ensure callers of the stream reader (subscribe
logic) check this error channel/result.err to detect broken streams and trigger
retries; keep EOF behavior as a clean close with no error sent.
- Around line 106-109: The current handler in internal/node/client.go treats an
io.EOF from stream.Recv() as success by returning (nil, nil), which lets
cmd/publish.go record a successful publish with no ack; change the EOF branch to
return a non-nil error (e.g., wrap or return fmt.Errorf/io.ErrUnexpectedEOF with
context) so callers like the publish flow see the failure; update the error
message to mention "EOF before response" and ensure the function that calls
stream.Recv() (the code handling resp, err := stream.Recv()) propagates that
error instead of treating nil response as success.
- Around line 26-31: The gRPC client setup in the grpc.NewClient call currently
uses insecure.NewCredentials() and unbounded sizes
(grpc.MaxCallRecvMsgSize/math.MaxCallSendMsgSize), which exposes JwtToken in
cleartext and risks unbounded memory; update the transport to use TLS with
proper certificate validation (e.g., use
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(...) or system
root TLS creds) and replace math.MaxInt limits in grpc.WithDefaultCallOptions
(grpc.MaxCallRecvMsgSize/grpc.MaxCallSendMsgSize) with a reasonable cap such as
10 * 1024 * 1024 (10 MiB); ensure these changes are applied where nodeAddr is
used to create the connection so JwtToken isn't sent over plaintext.
In `@internal/session/client.go`:
- Around line 49-57: The code currently builds a POST to proxyURL (variable
proxyURL) and sends it over plain HTTP; to avoid leaking node tickets require
the proxyURL scheme to be https by default: validate proxyURL (parse via
net/url.Parse) and if url.Scheme != "https" return an error unless an explicit
insecure opt-in flag (e.g., allowInsecureProxy or an env/config boolean) is set;
update the function that constructs the request (where http.NewRequest and
client := &http.Client{Timeout: 10 * time.Second} are used) to perform this
check before creating the request and document/propagate the opt-in flag so
callers can deliberately enable non-HTTPS for dev only.
- Around line 12-18: The session selection never declares the desired transport,
so the proxy can return a non-gRPC node; update the code that asks the proxy for
a node to explicitly request gRPC-capable nodes by setting Node.Transport =
"grpc" (or adding a "transport":"grpc" field to the proxy request payload)
before sending the request—look for the Node struct and the functions that call
the proxy (e.g., the session creation / node request routines) and ensure they
populate Node.Transport with "grpc" so callers that immediately instantiate a
gRPC client only receive gRPC-capable nodes.
In `@README.md`:
- Around line 33-38: Replace the hardcoded Auth0 subject
"google-oauth2|116937893938826513819" used in the Authentication Status examples
with a clearly fake placeholder (for example "AUTH0_SUBJECT_PLACEHOLDER" or
"auth0|<USER_ID>"); update every occurrence of that literal string (including
the example block under the "Authentication Status" header and the other
examples referenced) so public docs no longer expose a stable user identifier.
- Around line 13-24: Update the fenced code blocks that show CLI/plain output
(for example the block starting with "Initiating authentication..." and the
other blocks referenced in the comment) to include a language tag of "text"
(i.e., change ``` to ```text) so markdownlint MD040 is satisfied; apply the same
change to the other output blocks noted (lines 32-46, 54-56, 66-73, 83-88,
111-115, 129-137, 145-158, 166-172, 180-183) to consistently mark these CLI
output samples as text.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9a4f87b5-b447-4d3a-ae91-25e365440999
⛔ Files ignored due to path filters (7)
- go.sum is excluded by !**/*.sum
- proto/p2p_stream.pb.go is excluded by !**/*.pb.go
- proto/p2p_stream_grpc.pb.go is excluded by !**/*.pb.go
- proto/proxy_stream.pb.go is excluded by !**/*.pb.go
- proto/proxy_stream_grpc.pb.go is excluded by !**/*.pb.go
- proto/session.pb.go is excluded by !**/*.pb.go
- proto/session_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (27)
.github/workflows/ci.yml, Makefile, README.md, cmd/publish.go, cmd/subscribe.go, e2e/cli_runner.go, e2e/commands_test.go, e2e/config.go, e2e/cross_node_test.go, e2e/failure_test.go, e2e/fuzz_test.go, e2e/integration_test.go, e2e/publish_test.go, e2e/ratelimit_scenarios_test.go, e2e/ratelimit_test.go, e2e/setup.go, e2e/smoke_cases.go, e2e/subscribe_test.go, e2e/suite_test.go, e2e/token.go, e2e/validators.go, go.mod, internal/grpc/proxy_client.go, internal/node/client.go, internal/session/client.go, proto/p2p_stream.proto, proto/session.proto
💤 Files with no reviewable changes (18)
- e2e/ratelimit_scenarios_test.go
- go.mod
- e2e/ratelimit_test.go
- e2e/token.go
- e2e/cli_runner.go
- e2e/integration_test.go
- e2e/smoke_cases.go
- e2e/subscribe_test.go
- e2e/suite_test.go
- e2e/publish_test.go
- e2e/setup.go
- e2e/commands_test.go
- e2e/validators.go
- e2e/fuzz_test.go
- e2e/failure_test.go
- e2e/config.go
- internal/grpc/proxy_client.go
- e2e/cross_node_test.go
Actionable comments posted: 2
♻️ Duplicate comments (3)
cmd/subscribe.go (3)
153-159: ⚠️ Potential issue | 🟠 Major

Request only the subscribe capability for this session. This path never publishes, but it still asks the proxy for "publish". That widens the node ticket unnecessarily.

Possible fix:

```diff
 sess, err := session.CreateSession(
 	proxyURL,
 	clientIDToUse,
 	[]string{subTopic},
-	[]string{"publish", "subscribe"},
+	[]string{"subscribe"},
 	subExposeAmount,
 )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/subscribe.go` around lines 153 - 159, The session is requesting both "publish" and "subscribe" scopes though this command only needs subscribe; update the session.CreateSession call (the call site that passes the scopes slice) to request only "subscribe" (i.e. replace []string{"publish", "subscribe"} with a single-scope slice containing "subscribe") so the proxy ticket is limited to subscribe capability only.
164-190: ⚠️ Potential issue | 🟠 Major

`--expose-amount` still doesn't provide failover. Only `sess.Nodes[0]` is ever dialed/subscribed. If that first connect or stream setup fails, the command exits and ignores the remaining exposed nodes.

Implementation sketch:

```diff
-	bestNode := sess.Nodes[0]
-	fmt.Printf("Session: %s | Node: %s (%s, score: %.2f)\n",
-		sess.SessionID, bestNode.Address, bestNode.Region, bestNode.Score)
+	var (
+		bestNode   session.Node
+		nodeClient *node.Client
+		msgChan    <-chan *node.SubscribeResponse
+		lastErr    error
+	)
+
+	for _, candidate := range sess.Nodes {
+		fmt.Printf("Trying node: %s (%s, score: %.2f)\n", candidate.Address, candidate.Region, candidate.Score)
+
+		c, err := node.NewClient(candidate.Address)
+		if err != nil {
+			lastErr = err
+			continue
+		}
+
+		ch, err := c.Subscribe(ctx, candidate.Ticket, subTopic, 100)
+		if err != nil {
+			c.Close()
+			lastErr = err
+			continue
+		}
+
+		bestNode = candidate
+		nodeClient = c
+		msgChan = ch
+		break
+	}
+
+	if nodeClient == nil {
+		return fmt.Errorf("subscribe failed on all exposed nodes: %v", lastErr)
+	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/subscribe.go` around lines 164 - 190, The code only dials sess.Nodes[0] and fails fast; change Subscribe command to iterate over sess.Nodes and attempt to connect/subscribe to each until one succeeds: for each node in sess.Nodes (use bestNode variable name when selecting), call node.NewClient(node.Address), handle error by closing client if non-nil and continue to next node, then call nodeClient.Subscribe(ctx, node.Ticket, subTopic, 100) and on subscribe error close the client and continue; when subscribe returns a valid msgChan, break and use that channel; ensure any created nodeClient is deferred/closed only for the successful subscription lifecycle and that ctx/cancel are set appropriately.
203-233: ⚠️ Potential issue | 🟠 Major

The webhook queue still doesn't cap concurrency. Each dequeued message spins up another goroutine, so a hot subscription can fan out to unbounded concurrent POSTs even when `--webhook-queue-size` is small.

Preferred direction: process messages inside a fixed set of workers, or inline inside the queue reader itself, so the queue size actually provides backpressure.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/subscribe.go` around lines 203 - 233, The webhook handler currently spawns a new goroutine per message (inside the loop that ranges over wq), which allows unbounded concurrent POSTs and defeats --webhook-queue-size; change it to use a fixed worker pool or process inline so the queue size exerts backpressure: create a bounded set of workers (e.g., spawn N goroutines once, where N is the configured webhook worker count or webhookQueueSize) that each range over wq and perform webhookFormatter.FormatMessage, http.NewRequestWithContext, and http.DefaultClient.Do with the existing timeout logic (use webhookTimeoutSecs, webhookFormatter.FormatMessage, webhookURL, wq, etc.), and remove the per-message inner goroutine so concurrency is limited.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmd/subscribe.go`:
- Around line 216-221: The code always sets Content-Type: application/json on
the webhook request even when forwarding raw payloads; modify the logic around
the http.NewRequestWithContext call so that req.Header.Set("Content-Type",
"application/json") is only performed when a schema/JSON payload is intended
(e.g., when webhookSchema (or the variable representing --webhook-schema) is
non-empty or you have explicitly formatted JSON), otherwise do not set the
Content-Type header so raw/text/binary payloads are forwarded unchanged; update
the code paths around http.NewRequestWithContext, req, formattedPayload,
webhookURL and wctx accordingly.
- Around line 202-203: The channel for webhook messages is created
unconditionally with make(chan webhookMsg, webhookQueueSize) which will panic if
webhookQueueSize is negative even when webhookURL is empty; modify the code so
validation of webhookQueueSize occurs before channel creation and only create wq
when webhookURL != "" and webhookQueueSize is a non-negative integer (validate
webhookQueueSize and return or set a sane default on invalid values). Locate
uses of webhookQueueSize, webhookMsg, and wq in the subscribe command to add the
check and conditional creation.
---
Duplicate comments:
In `@cmd/subscribe.go`:
- Around line 153-159: The session is requesting both "publish" and "subscribe"
scopes though this command only needs subscribe; update the
session.CreateSession call (the call site that passes the scopes slice) to
request only "subscribe" (i.e. replace []string{"publish", "subscribe"} with a
single-scope slice containing "subscribe") so the proxy ticket is limited to
subscribe capability only.
- Around line 164-190: The code only dials sess.Nodes[0] and fails fast; change
Subscribe command to iterate over sess.Nodes and attempt to connect/subscribe to
each until one succeeds: for each node in sess.Nodes (use bestNode variable name
when selecting), call node.NewClient(node.Address), handle error by closing
client if non-nil and continue to next node, then call nodeClient.Subscribe(ctx,
node.Ticket, subTopic, 100) and on subscribe error close the client and
continue; when subscribe returns a valid msgChan, break and use that channel;
ensure any created nodeClient is deferred/closed only for the successful
subscription lifecycle and that ctx/cancel are set appropriately.
- Around line 203-233: The webhook handler currently spawns a new goroutine per
message (inside the loop that ranges over wq), which allows unbounded concurrent
POSTs and defeats --webhook-queue-size; change it to use a fixed worker pool or
process inline so the queue size exerts backpressure: create a bounded set of
workers (e.g., spawn N goroutines once, where N is the configured webhook worker
count or webhookQueueSize) that each range over wq and perform
webhookFormatter.FormatMessage, http.NewRequestWithContext, and
http.DefaultClient.Do with the existing timeout logic (use webhookTimeoutSecs,
webhookFormatter.FormatMessage, webhookURL, wq, etc.), and remove the
per-message inner goroutine so concurrency is limited.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 41911699-f36a-43ba-81db-0a9eeb8d7650
📒 Files selected for processing (2)
cmd/publish.gocmd/subscribe.go
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@README.md`:
- Around line 191-199: The statement "All commands support `--disable-auth
--client-id <id>`" is too broad; change it to explicitly state which commands
support the flag (e.g., the data-plane CLI commands used in the examples) and
avoid implying auth lifecycle or admin commands are covered. Update the README
text to mention that `--disable-auth` applies to the data-plane commands such as
the mump2p CLI commands shown (mump2p subscribe, mump2p publish, mump2p
list-topics) and, if needed, add a short note clarifying that auth
lifecycle/admin commands do not support that flag.
- Around line 40-44: Update the README sample so the two identical "Publish
Rate" rows are disambiguated: change the first "Publish Rate: 50000 per hour"
to "Publish Rate (per hour): 50000 per hour" (or mirror the CLI label, e.g.,
"Publish Rate (hourly): 50000") and change the second to "Publish Rate (per
second): 600 per second" (or the CLI equivalent) so the two limits are clearly
distinct in the output example.
♻️ Duplicate comments (9)
cmd/subscribe.go (4)
202-202: ⚠️ Potential issue | 🟡 Minor

Validate `--webhook-queue-size` before channel creation. `make(chan webhookMsg, webhookQueueSize)` panics for negative values, and the channel is created unconditionally before checking `webhookURL != ""`.

Suggested fix:

```diff
+	if webhookQueueSize < 0 {
+		return fmt.Errorf("--webhook-queue-size must be >= 0")
+	}
+
 	type webhookMsg struct {
 		data []byte
 	}
-	wq := make(chan webhookMsg, webhookQueueSize)
+	var wq chan webhookMsg
+	if webhookURL != "" {
+		wq = make(chan webhookMsg, webhookQueueSize)
+	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/subscribe.go` at line 202, Validate webhookQueueSize before creating the channel: ensure the integer webhookQueueSize is non-negative and return an error (or set a default) if it is negative, and only call make(chan webhookMsg, webhookQueueSize) after confirming webhookURL != "" so the channel is not created unconditionally; update the logic around webhookQueueSize and webhookURL checks (referencing webhookQueueSize, webhookURL, webhookMsg and the make(chan webhookMsg, webhookQueueSize) call) to perform validation first and then create the buffered channel when needed.
164-176: ⚠️ Potential issue | 🟠 Major

`--expose-amount` does not provide actual failover. The code only ever connects to `sess.Nodes[0]`. If the first node fails, the command exits without trying other exposed nodes. The help text at line 297 claims failover support, but the implementation doesn't deliver it.

Consider iterating over available nodes on connection/stream failure before giving up.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/subscribe.go` around lines 164 - 176, The code currently always uses sess.Nodes[0] (bestNode) for connections and prints other nodes but never attempts failover; change the connection/stream establishment to iterate over sess.Nodes (using the existing bestNode/sess.Nodes identifiers) and attempt to connect to each node in order, logging each attempt and moving to the next node on connection or stream failure, and only exit with error after all nodes have been tried; implement this by replacing the single-use connection logic with a retry loop that calls the existing connection/stream functions (the code that dials/opens the stream) per node, breaks on success, and ensures errors from each failed node are logged so failover actually occurs.
221-221: ⚠️ Potential issue | 🟠 Major

Don't force `application/json` for raw webhook payloads. When `--webhook-schema` is empty, the raw message body is forwarded but still labeled as JSON; binary or plain-text subscriptions may be rejected or misparsed by receivers.

Suggested fix:

```diff
-	req.Header.Set("Content-Type", "application/json")
+	contentType := "application/octet-stream"
+	if webhookSchema != "" {
+		contentType = "application/json"
+	}
+	req.Header.Set("Content-Type", contentType)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/subscribe.go` at line 221, The code currently unconditionally sets req.Header.Set("Content-Type", "application/json"), which forces JSON for raw webhook payloads; change this so the Content-Type header is only set when a webhook schema is provided (i.e., when the --webhook-schema value is non-empty). Update the code that builds the outbound request (the spot using req.Header.Set("Content-Type", "application/json")) to check the webhook schema variable (e.g., webhookSchema or whatever flag/field holds --webhook-schema) and set Content-Type to "application/json" (or to the explicit schema value, if you accept a MIME type) only when that variable is not empty; otherwise do not set the Content-Type header so binary/plain-text payloads are forwarded raw. Ensure you modify the same request-building logic that references req.Header.Set to be conditional.
199-233: ⚠️ Potential issue | 🟠 Major

Webhook dispatcher spawns unbounded concurrent goroutines. Each dequeued message launches a new goroutine (line 206), so a high-throughput subscription can fan out to unbounded concurrent HTTP POSTs regardless of `--webhook-queue-size`. The queue only limits buffered messages, not in-flight requests.

Use a fixed worker pool or process messages inline within the single consumer goroutine to enforce real backpressure.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/subscribe.go` around lines 199 - 233, The current consumer goroutine for wq (channel of webhookMsg) spawns a new goroutine per message (the inner go func(payload []byte) { ... }(msg.data)), allowing unbounded in-flight HTTP POSTs; replace that unbounded spawn with a bounded worker model or inline processing to enforce backpressure. Concretely, remove the per-message go spawn inside the for msg := range wq loop and either (a) create a fixed pool of N worker goroutines that each read from wq and execute webhookFormatter.FormatMessage(...), create the http.Request and call http.DefaultClient.Do(...), or (b) perform the HTTP POST directly in the single consumer goroutine so only one in-flight request exists; use a semaphore/channel (size configurable, e.g., webhookWorkers) if you need limited concurrency. Ensure error handling around webhookFormatter.FormatMessage and resp.Body.Close remains, and use webhookQueueSize only for buffering while webhookWorkers controls concurrent requests.

README.md (3)
40-45: ⚠️ Potential issue | 🟡 Minor

Disambiguate the two publish-rate rows. Lines 42-43 use the same `Publish Rate` label for different limits (per hour vs per second), making the output confusing.

Suggested fix:

```diff
 Rate Limits:
 ------------
-Publish Rate: 50000 per hour
-Publish Rate: 600 per second
+Publish Rate (hourly): 50000
+Publish Rate (per sec): 600
 Max Message Size: 10.00 MB
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@README.md` around lines 40 - 45, The two identical "Publish Rate" rows in the README are ambiguous; update the labels to clearly distinguish the time windows (e.g., "Publish Rate (per hour): 50000" and "Publish Rate (per second): 600") so readers know which limit applies to hourly vs per-second quotas—modify the lines that currently read "Publish Rate" to include the time unit (per hour / per second) and keep the numeric values and other rows (Max Message Size, Daily Quota) unchanged.
13-24: ⚠️ Potential issue | 🟡 Minor

Add language tags to plain-output code blocks. markdownlint MD040 flags fenced code blocks without language specifiers. Use `text` for CLI output samples to satisfy the linter and improve rendering.

Example fix:

````diff
-```
+```text
 Initiating authentication...
````

Also applies to: 32-46, 54-56, 66-73, 83-88, 111-115, 129-137, 145-158, 166-172, 180-183
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@README.md` around lines 13 - 24, The fenced code blocks showing CLI/plain output (for example the block starting with "Initiating authentication..." and other blocks at ranges you listed) lack a language tag; update each triple-backtick fence from ``` to ```text so markdownlint MD040 is satisfied and the output renders as plain text — apply this change to the example block and all other occurrences noted (lines 32-46, 54-56, 66-73, 83-88, 111-115, 129-137, 145-158, 166-172, 180-183).
191-199: ⚠️ Potential issue | 🟡 Minor

Narrow the `--disable-auth` scope claim. "All commands support `--disable-auth`" is too broad. Auth lifecycle commands (`login`, `logout`, `whoami`) don't need this flag. Clarify that it applies to data-plane commands.

Suggested fix:

```diff
-All commands support `--disable-auth --client-id <id>` to skip Auth0.
+Data-plane commands (`subscribe`, `publish`, `list-topics`) support `--disable-auth --client-id <id>` to skip Auth0 for testing.
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@README.md` around lines 191 - 199, The README incorrectly states "All commands support --disable-auth"; narrow that scope by clarifying the flag only applies to data-plane commands (e.g., subscribe, publish, list-topics) and not to auth lifecycle commands (login, logout, whoami). Update the prose around the example block to explicitly state that --disable-auth --client-id <id> is accepted for data-plane CLI commands (list the example commands subscribe/publish/list-topics) and note that auth commands (login/logout/whoami) do not accept or require this flag.

internal/node/client.go (2)
58-80: ⚠️ Potential issue | 🟠 Major

Stream failures are not propagated to the caller. The goroutine prints receive errors (line 70) and closes the channel, but `cmd/subscribe.go` cannot distinguish a broken stream from a clean EOF. The command exits successfully without any signal to retry another exposed node.

Consider returning an error channel or a struct with both response and error to enable proper retry logic.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/node/client.go` around lines 58 - 80, The goroutine reading from stream (using stream.Recv and sending into ch) swallows non-EOF errors by printing and closing the response channel, so callers like cmd/subscribe.go cannot tell EOF vs fatal stream error; change the pattern to propagate errors by replacing ch (chan *pb.Response) with a result channel (e.g., chan result { resp *pb.Response; err error }) or add a separate error channel, send any non-nil err (including io.EOF if desired) into that channel before closing, and have the receiver check the error field (or error channel) to distinguish clean EOF from a broken stream; update usages around ch, the anonymous goroutine, and consumers in cmd/subscribe.go to handle the new result/error signal and trigger retries on real errors.
25-40: ⚠️ Potential issue | 🔴 Critical

Harden gRPC transport before connecting to internet-facing nodes. The client uses `insecure.NewCredentials()` (cleartext) and `math.MaxInt` message size bounds. Combined with public proxy URLs in the README, this means:

- Bearer tokens (the `JwtToken` field) transmit in plaintext
- A malicious node can force unbounded memory allocation

Enable TLS with proper certificate validation and set reasonable message size limits (e.g., 10 MiB).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/node/client.go` around lines 25 - 40, The NewClient function currently uses insecure.NewCredentials() and unbounded message sizes; update NewClient to use real TLS credentials (replace insecure.NewCredentials() with grpc/credentials.NewClientTLSFromCert or credentials.NewClientTLSFromFile and ensure proper server name verification) and constrain gRPC message sizes in the grpc.WithDefaultCallOptions (replace math.MaxInt with a reasonable limit such as 10*1024*1024 for grpc.MaxCallRecvMsgSize and grpc.MaxCallSendMsgSize); ensure any JwtToken usage continues to be sent over the secured channel and document or load TLS certs rather than disabling validation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@cmd/subscribe.go`:
- Line 202: Validate webhookQueueSize before creating the channel: ensure the
integer webhookQueueSize is non-negative and return an error (or set a default)
if it is negative, and only call make(chan webhookMsg, webhookQueueSize) after
confirming webhookURL != "" so the channel is not created unconditionally;
update the logic around webhookQueueSize and webhookURL checks (referencing
webhookQueueSize, webhookURL, webhookMsg and the make(chan webhookMsg,
webhookQueueSize) call) to perform validation first and then create the buffered
channel when needed.
- Around line 164-176: The code currently always uses sess.Nodes[0] (bestNode)
for connections and prints other nodes but never attempts failover; change the
connection/stream establishment to iterate over sess.Nodes (using the existing
bestNode/sess.Nodes identifiers) and attempt to connect to each node in order,
logging each attempt and moving to the next node on connection or stream
failure, and only exit with error after all nodes have been tried; implement
this by replacing the single-use connection logic with a retry loop that calls
the existing connection/stream functions (the code that dials/opens the stream)
per node, breaks on success, and ensures errors from each failed node are logged
so failover actually occurs.
- Line 221: The code currently unconditionally sets
req.Header.Set("Content-Type", "application/json"), which forces JSON for raw
webhook payloads; change this so the Content-Type header is only set when a
webhook schema is provided (i.e., when the --webhook-schema value is non-empty).
Update the code that builds the outbound request (the spot using
req.Header.Set("Content-Type", "application/json")) to check the webhook schema
variable (e.g., webhookSchema or whatever flag/field holds --webhook-schema) and
set Content-Type to "application/json" (or to the explicit schema value, if you
accept a MIME type) only when that variable is not empty; otherwise do not set
the Content-Type header so binary/plain-text payloads are forwarded raw. Ensure
you modify the same request-building logic that references req.Header.Set to be
conditional.
- Around line 199-233: The current consumer goroutine for wq (channel of
webhookMsg) spawns a new goroutine per message (the inner go func(payload
[]byte) { ... }(msg.data)), allowing unbounded in-flight HTTP POSTs; replace
that unbounded spawn with a bounded worker model or inline processing to enforce
backpressure. Concretely, remove the per-message go spawn inside the for msg :=
range wq loop and either (a) create a fixed pool of N worker goroutines that
each read from wq and execute webhookFormatter.FormatMessage(...), create the
http.Request and call http.DefaultClient.Do(...), or (b) perform the HTTP POST
directly in the single consumer goroutine so only one in-flight request exists;
use a semaphore/channel (size configurable, e.g., webhookWorkers) if you need
limited concurrency. Ensure error handling around webhookFormatter.FormatMessage
and resp.Body.Close remains, and use webhookQueueSize only for buffering while
webhookWorkers controls concurrent requests.
In `@internal/node/client.go`:
- Around line 58-80: The goroutine reading from stream (using stream.Recv and
sending into ch) swallows non-EOF errors by printing and closing the response
channel, so callers like cmd/subscribe.go cannot tell EOF vs fatal stream error;
change the pattern to propagate errors by replacing ch (chan *pb.Response) with
a result channel (e.g., chan result { resp *pb.Response; err error }) or add a
separate error channel, send any non-nil err (including io.EOF if desired) into
that channel before closing, and have the receiver check the error field (or
error channel) to distinguish clean EOF from a broken stream; update usages
around ch, the anonymous goroutine, and consumers in cmd/subscribe.go to handle
the new result/error signal and trigger retries on real errors.
- Around line 25-40: The NewClient function currently uses
insecure.NewCredentials() and unbounded message sizes; update NewClient to use
real TLS credentials (replace insecure.NewCredentials() with
grpc/credentials.NewClientTLSFromCert or credentials.NewClientTLSFromFile and
ensure proper server name verification) and constrain gRPC message sizes in the
grpc.WithDefaultCallOptions (replace math.MaxInt with a reasonable limit such as
10*1024*1024 for grpc.MaxCallRecvMsgSize and grpc.MaxCallSendMsgSize); ensure
any JwtToken usage continues to be sent over the secured channel and document or
load TLS certs rather than disabling validation.
In `@README.md`:
- Around line 40-45: The two identical "Publish Rate" rows in the README are
ambiguous; update the labels to clearly distinguish the time windows (e.g.,
"Publish Rate (per hour): 50000" and "Publish Rate (per second): 600") so
readers know which limit applies to hourly vs per-second quotas—modify the lines
that currently read "Publish Rate" to include the time unit (per hour / per
second) and keep the numeric values and other rows (Max Message Size, Daily
Quota) unchanged.
- Around line 13-24: The fenced code blocks showing CLI/plain output (for
example the block starting with "Initiating authentication..." and other blocks
at ranges you listed) lack a language tag; update each triple-backtick fence
from ``` to ```text so markdownlint MD040 is satisfied and the output renders as
plain text — apply this change to the example block and all other occurrences
noted (lines 32-46, 54-56, 66-73, 83-88, 111-115, 129-137, 145-158, 166-172,
180-183).
- Around line 191-199: The README incorrectly states “All commands support
--disable-auth”; narrow that scope by clarifying the flag only applies to
data-plane commands (e.g., subscribe, publish, list-topics) and not to auth
lifecycle commands (login, logout, whoami). Update the prose around the example
block to explicitly state that --disable-auth --client-id <id> is accepted for
data-plane CLI commands (list the example commands
subscribe/publish/list-topics) and note that auth commands (login/logout/whoami)
do not accept or require this flag.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7a5a34d8-d2da-4b28-839d-1f7b3c5e9f6f
📒 Files selected for processing (5)
Makefile, README.md, cmd/publish.go, cmd/subscribe.go, internal/node/client.go
🚧 Files skipped from review as they are similar to previous changes (1)
- Makefile
🧹 Nitpick comments (4)
internal/session/store.go (3)
159-164: Silently falling through on lock failure may cause duplicate sessions.

When `acquireLock` fails, the code proceeds to create a new session without caching. This could lead to multiple concurrent processes each creating their own sessions. Consider logging the lock error for observability.

Log lock acquisition failure

```diff
 lf, lockErr := acquireLock()
 if lockErr != nil {
-	// If locking fails, fall through to create without cache.
+	// If locking fails, fall through to create without cache.
+	fmt.Printf("Warning: could not acquire session lock: %v\n", lockErr)
 	sess, err := CreateSession(proxyURL, clientID, topics, capabilities, exposeAmount)
 	return sess, false, err
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/session/store.go` around lines 159 - 164, When acquireLock() returns an error before creating the session, log the lockErr for observability before falling back to creating a session without cache; update the branch in the code that calls acquireLock() to emit a descriptive error (including the error value) via the module's logger (or processLogger) so failures to acquire the lock are visible, then continue to call CreateSession(proxyURL, clientID, topics, capabilities, exposeAmount) and return as before.
107-124: Topic matching is asymmetric — cached sessions with extra topics will match.

The `matches` method checks that all requested `topics` exist in the cached session, but doesn't verify the reverse. A cached session with topics `["A", "B"]` will match a request for `["A"]`, which may be intentional for topic superset reuse. However, this differs from capabilities matching (line 111), which requires exact equality.

If this asymmetry is intentional (reuse a session that covers more topics), consider adding a comment to clarify. Otherwise, align with capabilities:

Option: Enforce exact topic match like capabilities

```diff
 func (c *CachedSession) matches(proxyURL, clientID string, topics, capabilities []string) bool {
 	if c.ProxyURL != proxyURL || c.ClientID != clientID {
 		return false
 	}
 	if sortedKey(c.Capabilities) != sortedKey(capabilities) {
 		return false
 	}
-	cached := make(map[string]bool, len(c.Topics))
-	for _, t := range c.Topics {
-		cached[t] = true
-	}
-	for _, t := range topics {
-		if !cached[t] {
-			return false
-		}
-	}
-	return true
+	return sortedKey(c.Topics) == sortedKey(topics)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/session/store.go` around lines 107 - 124, The matches method on CachedSession currently treats topics asymmetrically (checks request topics are subset of c.Topics); change this to require exact topic equality to match capabilities behavior: in CachedSession.matches replace the manual subset logic over c.Topics and topics with a sortedKey equality check (use sortedKey(c.Topics) == sortedKey(topics)), ensuring topic comparison is symmetric and mirrors the capabilities check; if asymmetry was intended instead, add a brief clarifying comment above CachedSession.matches stating that cached sessions may be reused when they are a superset of requested topics.
191-198: `InvalidateSession` silently ignores errors.

The function swallows errors from both `cachePath()` and `os.Remove()`. While ignoring "file not found" is reasonable, other errors (permission denied, I/O errors) might warrant logging for debugging purposes.

Log non-trivial removal errors

```diff
 func InvalidateSession() {
 	p, err := cachePath()
 	if err != nil {
+		fmt.Printf("Warning: could not get cache path for invalidation: %v\n", err)
 		return
 	}
-	os.Remove(p)
+	if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
+		fmt.Printf("Warning: could not remove session cache: %v\n", err)
+	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/session/store.go` around lines 191 - 198, InvalidateSession currently swallows errors from cachePath() and os.Remove(); change it to check and handle errors: if cachePath() returns an error, log that error (don't silently return), and when calling os.Remove(p) detect and ignore os.IsNotExist(err) but log any other errors (permission denied, I/O, etc.) with sufficient context. Use the project's logger or the standard log package to emit a clear message referencing InvalidateSession, cachePath and os.Remove so non-trivial failures are visible for debugging.

cmd/subscribe.go (1)

62-69: `isReadable` checks byte range but doesn't fully validate UTF-8 control characters.

The function allows any byte ≥ 0x20 plus newline/carriage-return/tab, but UTF-8 has control characters in the 0x7F-0x9F range that might not be "readable." For most use cases this is fine, but consider documenting the heuristic.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/subscribe.go` around lines 62 - 69, The isReadable function currently tests raw bytes and permits any byte >= 0x20 (plus \n, \r, \t) but misses UTF‑8 control codepoints (e.g. U+007F–U+009F); update isReadable to decode the byte slice as runes and reject any rune where unicode.IsControl(r) is true except for allowed whitespace runes ('\n','\r','\t'), and ensure utf8.Valid(b) is still checked; also add a short comment above isReadable documenting this heuristic and why control runes are rejected.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@cmd/subscribe.go`:
- Around line 62-69: The isReadable function currently tests raw bytes and
permits any byte >= 0x20 (plus \n, \r, \t) but misses UTF‑8 control codepoints
(e.g. U+007F–U+009F); update isReadable to decode the byte slice as runes and
reject any rune where unicode.IsControl(r) is true except for allowed whitespace
runes ('\n','\r','\t'), and ensure utf8.Valid(b) is still checked; also add a
short comment above isReadable documenting this heuristic and why control runes
are rejected.
In `@internal/session/store.go`:
- Around line 159-164: When acquireLock() returns an error before creating the
session, log the lockErr for observability before falling back to creating a
session without cache; update the branch in the code that calls acquireLock() to
emit a descriptive error (including the error value) via the module's logger (or
processLogger) so failures to acquire the lock are visible, then continue to
call CreateSession(proxyURL, clientID, topics, capabilities, exposeAmount) and
return as before.
- Around line 107-124: The matches method on CachedSession currently treats
topics asymmetrically (checks request topics are subset of c.Topics); change
this to require exact topic equality to match capabilities behavior: in
CachedSession.matches replace the manual subset logic over c.Topics and topics
with a sortedKey equality check (use sortedKey(c.Topics) == sortedKey(topics)),
ensuring topic comparison is symmetric and mirrors the capabilities check; if
asymmetry was intended instead, add a brief clarifying comment above
CachedSession.matches stating that cached sessions may be reused when they are a
superset of requested topics.
- Around line 191-198: InvalidateSession currently swallows errors from
cachePath() and os.Remove(); change it to check and handle errors: if
cachePath() returns an error, log that error (don’t silently return), and when
calling os.Remove(p) detect and ignore os.IsNotExist(err) but log any other
errors (permission denied, I/O, etc.) with sufficient context. Use the project’s
logger or the standard log package to emit a clear message referencing
InvalidateSession, cachePath and os.Remove so non-trivial failures are visible
for debugging.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f6e0a15b-0ca5-47d9-b3e9-1cbef3a7700d
📒 Files selected for processing (4)
README.md, cmd/publish.go, cmd/subscribe.go, internal/session/store.go
Actionable comments posted: 5
♻️ Duplicate comments (1)
cmd/subscribe.go (1)
256-287: ⚠️ Potential issue | 🟠 Major

Webhook dispatcher still spawns unbounded goroutines — this was flagged before.

Line 258 spawns a new goroutine for every dequeued message. A hot subscription will fan out to unbounded concurrent POSTs regardless of `--webhook-queue-size`. The queue provides zero backpressure.

Additionally, the `wq` channel is never closed, so the outer goroutine (line 256) leaks on shutdown.

🔒️ Fixed worker pool with proper shutdown

```diff
 var wq chan webhookMsg
 if webhookURL != "" && webhookQueueSize > 0 {
 	wq = make(chan webhookMsg, webhookQueueSize)
+	const webhookWorkers = 4
+	for w := 0; w < webhookWorkers; w++ {
 	go func() {
 		for msg := range wq {
-			go func(payload []byte) {
 			wctx, wcancel := context.WithTimeout(context.Background(), time.Duration(webhookTimeoutSecs)*time.Second)
-			defer wcancel()
-			formattedPayload, fmtErr := webhookFormatter.FormatMessage(payload, subTopic, clientIDToUse, "grpc-msg")
+			formattedPayload, fmtErr := webhookFormatter.FormatMessage(msg.data, subTopic, clientIDToUse, "grpc-msg")
 			if fmtErr != nil {
 				fmt.Printf("Failed to format webhook payload: %v\n", fmtErr)
+				wcancel()
-				return
+				continue
 			}
 			req, reqErr := http.NewRequestWithContext(wctx, "POST", webhookURL, bytes.NewBuffer(formattedPayload))
 			if reqErr != nil {
 				fmt.Printf("Failed to create webhook request: %v\n", reqErr)
+				wcancel()
-				return
+				continue
 			}
 			if webhookSchema != "" {
 				req.Header.Set("Content-Type", "application/json")
 			}
 			resp, doErr := http.DefaultClient.Do(req)
+			wcancel()
 			if doErr != nil {
 				fmt.Printf("Webhook request error: %v\n", doErr)
-				return
+				continue
 			}
-			defer resp.Body.Close()
+			resp.Body.Close()
 			if resp.StatusCode >= 400 {
 				fmt.Printf("Webhook responded with status code: %d\n", resp.StatusCode)
 			}
-			}(msg.data)
 		}
 	}()
+	}
 }
```

Then close `wq` after the message loop exits to signal workers to terminate:

```go
// After the select block that waits for sigChan/doneChan:
if wq != nil {
	close(wq)
}
```

Based on learnings: "Watch for goroutine leaks, unbounded channels, and racy access; ensure contexts are plumbed and respected."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/subscribe.go` around lines 256 - 287, The webhook dispatcher currently spawns an unbounded goroutine per message (the inner go func around msg.data) and never closes wq, causing unbounded concurrent POSTs and a leak; replace the per-message goroutine with a fixed worker pool: spawn N worker goroutines once (workers read from wq and perform webhookFormatter.FormatMessage, http.NewRequestWithContext using webhookTimeoutSecs, http.DefaultClient.Do, and resp.Body.Close) and use a sync.WaitGroup to wait for workers to finish; on shutdown (after the select that waits for sigChan/doneChan) close(wq) to signal workers to exit and wait for the WaitGroup so all inflight requests finish before process exit.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmd/publish.go`:
- Around line 214-223: The debug block re-parses resp.Data causing redundant
JSON unmarshalling; modify shortMsgID(resp) to return both the short and full
message ID (e.g., change signature to return (shortID, fullID string)) or add a
helper extractMsgID(resp) that unmarshals resp.Data once and returns both
values, then use the returned fullID for the IsDebugMode() print instead of
calling json.Unmarshal again; update callers (including where shortMsgID is used
for suffix) to accept the new return and remove the second json.Unmarshal in the
debug block.
In `@cmd/subscribe.go`:
- Around line 310-337: The block handling debug vs non-debug printing has
inconsistent indentation (around the IsDebugMode() branch using messageCount,
printDebugReceiveInfo, p2pMsg handling) causing a visual merge artifact; fix it
by reindenting the entire if/else so the inner lines (the IsDebugMode() true
branch from the atomic.AddInt32(&messageCount, 1) through the p2pMsg prints and
the else branch using isReadable(decodedMsg), atomic.AddInt32(&messageCount, 1),
displayTopic selection (subTopic/msgTopic) and fmt.Printf) align with the
surrounding code block level and ensure matching braces for the if IsDebugMode()
{ ... } else { ... } structure.
In `@internal/session/store.go`:
- Around line 184-186: The warning is being printed to stdout with fmt.Printf;
change the call in the cache-save error branch (where saveCached(c) is checked)
to write to stderr using fmt.Fprintf(os.Stderr, "Warning: could not cache
session: %v\n", saveErr) and ensure the "os" package is imported in the file so
the code builds.
- Around line 88-98: The saveCached function currently writes directly with
os.WriteFile which can corrupt the cache if the process crashes mid-write;
change saveCached to perform an atomic write by writing the marshaled data to a
temporary file in the same directory (use the same cachePath() directory and a
unique temp name), fsync the temp file, close it, then rename (os.Rename) the
temp file to the final cachePath so the replace is atomic; ensure file mode 0600
is applied to the temp file and propagate any errors from tempfile creation,
write, fsync, close, and rename.
- Around line 60-64: Replace the Unix-only syscall.Flock call in
internal/session/store.go with a cross-platform locker: add
github.com/gofrs/flock to imports, create a flock.Flock for the session lock
path instead of calling syscall.Flock(int(f.Fd()), ...), and use
flock.Lock()/TryLock()/Unlock() (or LockContext) to acquire and release the
lock; remove syscall.Flock usage and any direct fd-based locking so the session
acquisition logic (the code currently calling syscall.Flock) works on Windows
and Unix alike. Ensure to handle lock failures and close the underlying file as
before.
---
Duplicate comments:
In `@cmd/subscribe.go`:
- Around line 256-287: The webhook dispatcher currently spawns an unbounded
goroutine per message (the inner go func around msg.data) and never closes wq,
causing unbounded concurrent POSTs and a leak; replace the per-message goroutine
with a fixed worker pool: spawn N worker goroutines once (workers read from wq
and perform webhookFormatter.FormatMessage, http.NewRequestWithContext using
webhookTimeoutSecs, http.DefaultClient.Do, and resp.Body.Close) and use a
sync.WaitGroup to wait for workers to finish; on shutdown (after the select that
waits for sigChan/doneChan) close(wq) to signal workers to exit and wait for the
WaitGroup so all inflight requests finish before process exit.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: d2c0e93a-b89c-436d-82f1-d7f5ca86b2d9
📒 Files selected for processing (5)
README.md, cmd/publish.go, cmd/root.go, cmd/subscribe.go, internal/session/store.go
```go
if IsDebugMode() && resp != nil && msgID != "" {
	var trace map[string]interface{}
	if json.Unmarshal(resp.Data, &trace) == nil {
		if mid, ok := trace["messageID"].(string); ok && mid != "" {
			fmt.Printf(" message-id: %s\n", mid)
		} else if mid, ok := trace["message_id"].(string); ok && mid != "" {
			fmt.Printf(" message-id: %s\n", mid)
		}
	}
}
```
🧹 Nitpick | 🔵 Trivial
Redundant JSON unmarshal — resp.Data is parsed twice.
shortMsgID(resp) already unmarshals resp.Data at line 199. This block parses it again just to print the full message ID. Consider caching the parsed trace or extracting both values in one pass.
♻️ Avoid double-parsing
-func shortMsgID(resp *pb.Response) string {
+func extractMsgID(resp *pb.Response) (short, full string) {
if resp == nil || len(resp.Data) == 0 {
- return ""
+ return "", ""
}
var trace map[string]interface{}
if err := json.Unmarshal(resp.Data, &trace); err != nil {
- return ""
+ return "", ""
}
- if mid, ok := trace["messageID"].(string); ok && mid != "" {
- if len(mid) > 8 {
- return mid[:8]
- }
- return mid
- }
- if mid, ok := trace["message_id"].(string); ok && mid != "" {
- if len(mid) > 8 {
- return mid[:8]
+ for _, key := range []string{"messageID", "message_id"} {
+ if mid, ok := trace[key].(string); ok && mid != "" {
+ full = mid
+ short = mid
+ if len(short) > 8 {
+ short = short[:8]
+ }
+ return short, full
}
- return mid
}
- return ""
+ return "", ""
 }

Then at call site:
shortID, fullID := extractMsgID(resp)
// use shortID for suffix, fullID for debug output🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/publish.go` around lines 214 - 223, The debug block re-parses resp.Data
causing redundant JSON unmarshalling; modify shortMsgID(resp) to return both the
short and full message ID (e.g., change signature to return (shortID, fullID
string)) or add a helper extractMsgID(resp) that unmarshals resp.Data once and
returns both values, then use the returned fullID for the IsDebugMode() print
instead of calling json.Unmarshal again; update callers (including where
shortMsgID is used for suffix) to accept the new return and remove the second
json.Unmarshal in the debug block.
Actionable comments posted: 6
♻️ Duplicate comments (10)
internal/session/client.go (2)
29-42: ⚠️ Potential issue | 🟠 Major

Request still lacks explicit protocol/transport field.

The session request doesn't tell the proxy which transport the CLI needs. Both `cmd/publish.go` and `cmd/subscribe.go` immediately use gRPC node clients, so receiving a non-gRPC node causes a failure after session creation rather than at request time.

Proposed fix

```diff
 type sessionRequest struct {
 	ClientID     string   `json:"client_id"`
 	Topics       []string `json:"topics"`
+	Protocol     string   `json:"protocol"`
 	Capabilities []string `json:"capabilities"`
 	ExposeAmount uint32   `json:"expose_amount"`
 }

 func CreateSession(proxyURL, clientID, accessToken string, topics, capabilities []string, exposeAmount uint32) (*Session, error) {
 	reqData := sessionRequest{
 		ClientID:     clientID,
 		Topics:       topics,
+		Protocol:     "grpc",
 		Capabilities: capabilities,
 		ExposeAmount: exposeAmount,
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/session/client.go` around lines 29 - 42, The sessionRequest struct and CreateSession function must include an explicit transport/protocol field so the proxy knows which node type to return; update the sessionRequest definition to add a Transport (or Protocol) string field and set it in CreateSession (e.g., "grpc") based on the CLI usage (see cmd/publish.go and cmd/subscribe.go which expect gRPC node clients), ensuring CreateSession populates that field when building reqData so the proxy can reject non-gRPC nodes at request time.
49-57: ⚠️ Potential issue | 🟠 Major

Plaintext proxy URLs expose bearer tokens on the wire.

The session response carries node tickets, and `Authorization: Bearer` is sent over the connection. Accepting `http://` URLs risks leaking credentials. Consider requiring `https://` unless an explicit insecure flag is set.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@internal/session/client.go` around lines 49 - 57, Validate the proxyURL before creating the POST request in internal/session/client.go: parse proxyURL (e.g., via url.Parse) and require the scheme to be "https" unless an explicit insecure option/flag is provided (e.g., a boolean like InsecureSkipVerify or AllowInsecureProxy passed into the client). If the scheme is not "https" and the insecure flag is false, return an error instead of proceeding to call http.NewRequest; if insecure is allowed, proceed but document the risk. Ensure this check is done before building the request that sets the Authorization header.

internal/session/store.go (3)
192-194: ⚠️ Potential issue | 🟡 Minor

Warning should go to stderr, not stdout.

Mixing warnings into stdout corrupts piped/scripted output. Use `fmt.Fprintf(os.Stderr, ...)`.

Proposed fix

```diff
 if saveErr := saveCached(c); saveErr != nil {
-	fmt.Printf("Warning: could not cache session: %v\n", saveErr)
+	fmt.Fprintf(os.Stderr, "Warning: could not cache session: %v\n", saveErr)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/session/store.go` around lines 192 - 194, Replace the stdout warning print for caching errors with a stderr write: in the block that calls saveCached(c) (the "if saveErr := saveCached(c); saveErr != nil" branch) change the fmt.Printf to use fmt.Fprintf with os.Stderr so the warning is written to stderr (e.g., fmt.Fprintf(os.Stderr, "...", saveErr)); ensure you import "os" if not already imported.
88-98: 🧹 Nitpick | 🔵 Trivial

Consider atomic write to prevent cache corruption on crash.

`os.WriteFile` overwrites in place. A crash mid-write leaves truncated/invalid JSON. While the fallback to session creation is safe, write-to-temp + rename provides atomicity if cache integrity matters.

Atomic write pattern

```diff
 func saveCached(c *CachedSession) error {
 	p, err := cachePath()
 	if err != nil {
 		return err
 	}
 	data, err := json.MarshalIndent(c, "", "  ")
 	if err != nil {
 		return err
 	}
-	return os.WriteFile(p, data, 0600)
+	tmp := p + ".tmp"
+	if err := os.WriteFile(tmp, data, 0600); err != nil {
+		return err
+	}
+	return os.Rename(tmp, p)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/session/store.go` around lines 88 - 98, The saveCached function currently uses os.WriteFile which can leave a truncated JSON file if the process crashes mid-write; change saveCached to perform an atomic write: call cachePath() to get the final path, create a temp file in the same directory (using os.CreateTemp), write the marshaled data to the temp file, fsync the file (or call File.Sync), close it, set file mode to 0600 if needed, then os.Rename the temp file to the final path; ensure any errors remove the temp file and propagate the error. Use the existing saveCached and cachePath identifiers to locate where to implement this pattern.
60-64: ⚠️ Potential issue | 🔴 Critical

`syscall.Flock` is Unix-only — Windows builds will break.

`syscall.Flock` and its `LOCK_*` constants don't exist on Windows, and since there are no build tags, the package won't build for `GOOS=windows` once `publish` or `subscribe` needs to acquire the session lock.

Options:

- Add `//go:build !windows` and provide a Windows-specific stub
- Use `github.com/gofrs/flock` for cross-platform file locking
- Document Windows as unsupported with explicit build constraints

```bash
#!/bin/bash
# Check if Windows is a supported target in CI or build configs
rg -i 'windows|GOOS.*windows' .github/ Makefile 2>/dev/null || echo "No Windows targets found"

# Check for any build tags in session package
rg '//go:build|//\+build' internal/session/
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/session/store.go` around lines 60 - 64, The code currently calls syscall.Flock (and uses LOCK_EX) which is Unix-only; replace this with a cross-platform file lock using github.com/gofrs/flock: add the module import, create a flock.Flock for the session lock path (instead of calling syscall.Flock on f.Fd()), call TryLock/Lock (or TryLockContext/LockContext) to acquire the lock and release it via Unlock, and remove direct uses of syscall.Flock and LOCK_EX; update the function that currently performs the syscall.Flock call (the code around the f, f.Fd() and return f) to return the open file after securing the flock or return the flock handle as needed, ensuring calls that expected the old behavior (publish/subscribe session lock code) now use the flock API. Ensure go.mod is updated and imports are added.README.md (3)
40-46: ⚠️ Potential issue | 🟡 Minor

Disambiguate duplicate "Publish Rate" labels.

Lines 42-43 both use `Publish Rate:`, which is confusing. Update to distinct labels:

```diff
 Rate Limits:
 ------------
-Publish Rate: 50000 per hour
-Publish Rate: 600 per second
+Publish Rate (hour): 50000
+Publish Rate (second): 600
 Max Message Size: 10.00 MB
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@README.md` around lines 40 - 46, Update the two identical "Publish Rate:" labels in README.md so they are distinct and unambiguous (e.g., "Publish Rate (hourly): 50000 per hour" and "Publish Rate (per second): 600 per second"); locate the block containing the Rate Limits table and rename the labels accordingly to clarify which value is hourly vs per-second.
227-235: ⚠️ Potential issue | 🟡 Minor

Narrow the `--disable-auth` scope claim.

"All commands support..." is too broad. Auth lifecycle commands (`login`, `refresh`) don't meaningfully use this flag. Scope the statement to data-plane commands.

```diff
-All commands support `--disable-auth --client-id <id>` to skip Auth0.
+Data-plane commands (`publish`, `subscribe`, `list-topics`) support `--disable-auth --client-id <id>` to skip Auth0 for testing.
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@README.md` around lines 227 - 235, Update the README wording to narrow the scope of the --disable-auth flag: change the sentence "All commands support `--disable-auth --client-id <id>`..." to specify that only data-plane commands (e.g., subscribe, publish, list-topics) support `--disable-auth --client-id <id>`, and explicitly exclude auth lifecycle commands (e.g., login, refresh) from that claim so readers know those auth commands do not meaningfully use the flag.
13-24: ⚠️ Potential issue | 🟡 Minor

Add a `text` language tag to plain-output fences.

Fenced code blocks showing CLI output should specify a language tag to satisfy markdownlint MD040. Use ```text for these output samples.

Also applies to lines: 32, 54, 66, 82, 91, 118, 136, 152, 165, 181, 202, 216
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@README.md` around lines 13 - 24, The fenced code blocks that show CLI output (for example the block starting with "Initiating authentication..." and ending with "Token expires at: 07 Apr 26 23:51 IST") must use the text language tag to satisfy markdownlint MD040; update those backticks from ``` to ```text for that block and the other similar output samples referenced (the CLI/output samples listed in the comment) so all plain-output fences use ```text.

cmd/publish.go (1)
217-226: 🧹 Nitpick | 🔵 Trivial

Redundant JSON unmarshal — `resp.Data` is parsed twice.

`shortMsgID(resp)` at line 202 already unmarshals `resp.Data`. This debug block parses it again just to print the full message ID. Extract both values in one pass.

Refactor to avoid double-parsing:

```diff
-func shortMsgID(resp *pb.Response) string {
+func extractMsgID(resp *pb.Response) (short, full string) {
 	if resp == nil || len(resp.Data) == 0 {
-		return ""
+		return "", ""
 	}
 	var trace map[string]interface{}
 	if err := json.Unmarshal(resp.Data, &trace); err != nil {
-		return ""
+		return "", ""
 	}
-	if mid, ok := trace["messageID"].(string); ok && mid != "" {
-		if len(mid) > 8 {
-			return mid[:8]
-		}
-		return mid
-	}
-	if mid, ok := trace["message_id"].(string); ok && mid != "" {
-		if len(mid) > 8 {
-			return mid[:8]
+	for _, key := range []string{"messageID", "message_id"} {
+		if mid, ok := trace[key].(string); ok && mid != "" {
+			full = mid
+			short = mid
+			if len(short) > 8 {
+				short = short[:8]
+			}
+			return short, full
 		}
-		return mid
 	}
-	return ""
+	return "", ""
 }
```

Then at call site:

```go
shortID, fullID := extractMsgID(resp) // use shortID for suffix, fullID for debug output
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/publish.go` around lines 217 - 226, The debug block is re-unmarshaling resp.Data which shortMsgID(resp) already parsed; refactor to parse once by extending shortMsgID (or adding extractMsgID) to return both the shortID and fullID (e.g., shortID, fullID := shortMsgID(resp) or extractMsgID(resp)), then use the returned fullID in the IsDebugMode() branch instead of re-running json.Unmarshal on resp.Data; update the call site where shortMsgID is used (around line 202) to receive both values and print the fullID in the debug fmt.Printf while keeping the shortID use for suffix logic.

cmd/subscribe.go (1)
259-261: ⚠️ Potential issue | 🟠 Major

The webhook queue still does not bound concurrency.

Dequeuing from `wq` immediately starts another goroutine, so `--webhook-queue-size` no longer provides real backpressure under load. Process inline in a fixed worker pool instead of re-fanning out after dequeue.

Based on learnings: Applies to **/*.go : Watch for goroutine leaks, unbounded channels, racy access; ensure contexts are plumbed and respected
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/subscribe.go` around lines 259 - 261, The current loop ranges over wq and starts a new goroutine per message (the inner anonymous func with signature func(payload []byte)), which defeats the --webhook-queue-size backpressure and can leak goroutines; replace this fan-out with a fixed worker pool that reads from wq and processes payloads inline (i.e., remove the inner go call and have N worker goroutines created once) or implement N long-lived workers that call a single processing function (e.g., processWebhook(payload []byte)) so dequeueing blocks when wq is full; ensure workers respect the request context/cancellation and that shutdown closes wq or cancels context to avoid leaks.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmd/subscribe.go`:
- Around line 329-332: The code currently does "if !isReadable(decodedMsg) {
continue }" which drops unreadable (binary/empty) messages before persistence
and webhook delivery; instead remove that continue and only use
isReadable(decodedMsg) to decide whether to render to stdout. Ensure the
persistence logic (the block that writes messages to storage/database) and the
webhook dispatch code still run for all messages (including when isReadable is
false), preserving the raw or original payload and any metadata; keep the
stdout/console print gated by isReadable so only printable payloads are
rendered.
- Around line 361-365: The shutdown currently calls cancel() on SIGINT and
returns immediately while background goroutines (the reader that writes to
persistFile and the webhook worker goroutines that enqueue/process via wq) may
still be active; modify the SIGINT branch to, after calling cancel(), wait for
doneChan to be closed (ensuring the reader goroutine has finished), then close
the work queue (wq) and join/stop the webhook workers before returning so no
goroutines are still writing to persistFile or enqueuing work; ensure the
deferred cleanup that closes persistFile runs after workers have exited and that
contexts passed into the reader and webhook worker functions respect the
canceled context.
- Around line 182-183: The subscription and webhook contexts are being created
from context.Background(), severing cancellation from the Cobra command; change
the root to use cmd.Context() instead: replace
context.WithCancel(context.Background()) used to create ctx/cancel for the
subscription with context.WithCancel(cmd.Context()), and ensure any webhook
timeout contexts (the places creating timeouts for webhook goroutines) are
derived from cmd.Context() (e.g., use context.WithTimeout(cmd.Context(), ...) or
context.WithCancel(cmd.Context())) so webhook goroutines will respect the
command's cancellation and shutdown semantics.
In `@docs/guide.md`:
- Around line 67-74: Markdown code fences in the docs/guide.md snippet (the
fenced block showing MUMP2P_AUTH_PATH and mump2p commands) lack blank lines
before and after the ``` markers, which triggers MD031; add a single blank line
immediately before the opening ``` and immediately after the closing ``` for
that block and for the other fenced blocks referenced (the blocks at lines
132-135, 195-197, 200-202, and 205-207) so each code fence is surrounded by one
blank line on both sides to satisfy markdownlint.
In `@internal/session/client.go`:
- Around line 84-86: CreateSession currently only checks for len(sess.Nodes) ==
0; update it to validate that len(sess.Nodes) >= exposeAmount (the same
condition used by isUsable in store.go) before returning the session. Locate the
CreateSession function and replace the simple empty check on sess.Nodes with a
check comparing to exposeAmount (or the variable/parameter that represents the
requested exposeAmount), and return an error like "not enough nodes to satisfy
exposeAmount" when the condition fails so cache validity and session creation
are consistent.
In `@Makefile`:
- Line 10: The Makefile default SERVICE_URL now points to the production proxy
which forces local builds to target production unless overridden; update project
docs and/or the Makefile to make local development easier by either (A)
documenting the need to override SERVICE_URL in README.md (add a short note
under "Development" showing the recommended make invocation such as make build
SERVICE_URL=http://localhost:12080) and referencing the SERVICE_URL variable, or
(B) add a new Makefile target named build-local that sets SERVICE_URL to
http://localhost:12080 and delegates to the existing build target (so developers
can run make build-local), and ensure the README references the new build-local
target and the SERVICE_URL variable.
---
Duplicate comments:
In `@internal/session/client.go`:
- Around line 29-42: The sessionRequest struct and CreateSession function must
include an explicit transport/protocol field so the proxy knows which node type
to return; update the sessionRequest definition to add a Transport (or Protocol)
string field and set it in CreateSession (e.g., "grpc") based on the CLI usage
(see cmd/publish.go and cmd/subscribe.go which expect gRPC node clients),
ensuring CreateSession populates that field when building reqData so the proxy
can reject non-gRPC nodes at request time.
- Around line 49-57: Validate the proxyURL before creating the POST request in
internal/session/client.go: parse proxyURL (e.g., via url.Parse) and require the
scheme to be "https" unless an explicit insecure option/flag is provided (e.g.,
a boolean like InsecureSkipVerify or AllowInsecureProxy passed into the client).
If the scheme is not "https" and the insecure flag is false, return an error
instead of proceeding to call http.NewRequest; if insecure is allowed, proceed
but document the risk. Ensure this check is done before building the request
that sets the Authorization header.
In `@internal/session/store.go`:
- Around line 192-194: Replace the stdout warning print for caching errors with
a stderr write: in the block that calls saveCached(c) (the "if saveErr :=
saveCached(c); saveErr != nil" branch) change the fmt.Printf to use fmt.Fprintf
with os.Stderr so the warning is written to stderr (e.g., fmt.Fprintf(os.Stderr,
"...", saveErr)); ensure you import "os" if not already imported.
- Around line 88-98: The saveCached function currently uses os.WriteFile which
can leave a truncated JSON file if the process crashes mid-write; change
saveCached to perform an atomic write: call cachePath() to get the final path,
create a temp file in the same directory (using os.CreateTemp), write the
marshaled data to the temp file, fsync the file (or call File.Sync), close it,
set file mode to 0600 if needed, then os.Rename the temp file to the final path;
ensure any errors remove the temp file and propagate the error. Use the existing
saveCached and cachePath identifiers to locate where to implement this pattern.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 928dd644-0877-460f-b1a4-b368111332d2
📒 Files selected for processing (9)

- Makefile
- README.md
- cmd/auth.go
- cmd/publish.go
- cmd/root.go
- cmd/subscribe.go
- docs/guide.md
- internal/session/client.go
- internal/session/store.go
```go
} else {
	if !isReadable(decodedMsg) {
		continue
	}
```
Do not drop unreadable payloads here.
This continue skips the persistence and webhook blocks below, so valid binary or empty messages are silently lost in non-debug mode. Gate only the stdout rendering on isReadable, not the rest of the delivery pipeline.
♻️ Duplicate comments (1)
cmd/subscribe.go (1)
222-245: ⚠️ Potential issue | 🟠 Major

Backup nodes are displayed but never used for stream reconnection.

The code builds and displays a list of backup nodes ("1 backup node ready"), but if the active stream fails after initial connection, the command exits without attempting reconnection. The UX suggests failover capability that doesn't actually exist at runtime.

Either remove the "backup nodes ready" messaging, or implement reconnection logic that attempts backup nodes when `msgChan` closes unexpectedly.

Option A: Remove misleading messaging

```diff
-	backupSuffix := ""
-	if len(backupNodes) == 1 {
-		backupSuffix = " — 1 backup node ready"
-	} else if len(backupNodes) > 1 {
-		backupSuffix = fmt.Sprintf(" — %d backup nodes ready", len(backupNodes))
-	}
-
-	fmt.Printf("Subscribed to '%s' on %s (%s) in %s%s\n",
-		subTopic, connectedNode.Address, region, humanDuration(connectDur), backupSuffix)
-
-	for _, bn := range backupNodes {
-		r := bn.Region
-		if r == "" {
-			r = "unknown"
-		}
-		fmt.Printf("  backup: %s (%s)\n", bn.Address, r)
-	}
+	fmt.Printf("Subscribed to '%s' on %s (%s) in %s\n",
+		subTopic, connectedNode.Address, region, humanDuration(connectDur))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/subscribe.go` around lines 222 - 245, The code currently builds and prints backupNodes but never uses them on stream failure; update the subscriber to actually attempt failover: when msgChan closes/unblocks (indicating the active stream ended), iterate over backupNodes and try to reconnect to each node (updating connectedNode and re-establishing the message stream/channel), break on first successful reconnection and continue normal processing, and if all backups fail then exit with an error; ensure printed status reflects the new connectedNode and that the previously-built backupNodes slice is reused for retries.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: ae7445cb-79bd-4c82-96ac-008f2fb89be6
📒 Files selected for processing (1)
cmd/subscribe.go
Summary by CodeRabbit
New Features
Documentation
Chores
Tests / CI