
feat: rewrite CLI for ADR-002 direct pub/sub #100

Open
swarna1101 wants to merge 11 commits into main from feat/adr-002-direct-pubsub

Conversation


@swarna1101 swarna1101 commented Mar 8, 2026

Summary by CodeRabbit

  • New Features

    • Session-based publish/subscribe with multi-node selection, automatic failover, node-prefixed debug output, --expose-amount control, and a new direct command-stream protocol.
  • Documentation

    • README and guides rewritten for authentication-first CLI flows and updated examples (login/whoami/subscribe/publish), proxy endpoints, persistence/webhook usage, and debugging guidance.
  • Chores

    • Default service URL updated to a proxy endpoint; legacy protocol flags and help target removed; WebSocket dependency dropped.
  • Tests / CI

    • End-to-end and fuzz test suites and helpers removed; fuzz CI job disabled/commented out; related Makefile targets removed.

coderabbitai bot commented Mar 8, 2026

📝 Walkthrough

Walkthrough

Replaces direct per-request HTTP/gRPC publish and protocol-specific subscribe flows with a session-based control plane: adds session negotiation (HTTP), a file-backed session cache, and a node gRPC command-stream client. publish/subscribe CLI commands now request a session from a proxy, select node(s) from the session, and use gRPC bidi command streams to Publish/Subscribe with node tickets. Removes the legacy gRPC proxy client, websocket dependency, most e2e test harness, related Makefile/CI fuzz targets, and updates docs/README and CLI flags (adds --expose-amount, removes --grpc).

Sequence Diagram(s)

sequenceDiagram
    participant User as User/CLI
    participant CLI as CLI (publish)
    participant SessionSvc as Session Client
    participant Proxy as Proxy Service
    participant Node as Node gRPC

    User->>CLI: publish --topic "test" --message "hi"
    activate CLI
    CLI->>SessionSvc: CreateSession(proxyURL, clientID, [topic], capabilities, exposeAmount)
    activate SessionSvc
    SessionSvc->>Proxy: POST /api/v1/session {client_id, topics, expose_amount...}
    activate Proxy
    Proxy-->>SessionSvc: 200 {session_id, nodes:[{address,ticket,...}]}
    deactivate Proxy
    SessionSvc-->>CLI: Session{nodes}
    deactivate SessionSvc
    CLI->>CLI: select best node
    CLI->>Node: gRPC ListenCommands() (bidi stream) -> send Publish Request{topic,data,ticket}
    activate Node
    Node-->>CLI: Response{status,metadata}
    deactivate Node
    CLI-->>User: Publish successful
    deactivate CLI
sequenceDiagram
    participant User as User/CLI
    participant CLI as CLI (subscribe)
    participant SessionSvc as Session Client
    participant Proxy as Proxy Service
    participant Node as Node gRPC

    User->>CLI: subscribe --topic "test"
    activate CLI
    CLI->>SessionSvc: CreateSession(proxyURL, clientID, [topic], ..., exposeAmount)
    activate SessionSvc
    SessionSvc->>Proxy: POST /api/v1/session
    activate Proxy
    Proxy-->>SessionSvc: {session_id, nodes}
    deactivate Proxy
    SessionSvc-->>CLI: Session{nodes}
    deactivate SessionSvc
    CLI->>CLI: select best node
    CLI->>Node: gRPC ListenCommands() (bidi stream) -> send Subscribe Request{topic,jwt_token}
    activate Node
    loop messages
      Node-->>CLI: Response{data}
      CLI->>CLI: decodeMessage & format/display or webhook/persist
    end
    deactivate Node
    deactivate CLI

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Suggested reviewers

  • hpsing
  • Walentalien
🚥 Pre-merge checks | ✅ 4 | ❌ 6

❌ Failed checks (5 warnings, 1 inconclusive)

  • Docstring Coverage ⚠️ Warning — Docstring coverage is 11.54%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
  • Go Build And Test Rationale ⚠️ Warning — The PR introduces ~413 lines of new code across the internal/session and internal/node packages and rewrites cmd/publish.go and cmd/subscribe.go (~327 lines changed), but the repository has zero unit tests and the entire e2e suite was deleted (16 files, 1,426 lines). Resolution: implement unit tests for the new packages (session client/store concurrency safety, node client gRPC logic) and integration tests for the publish/subscribe rewrites before merge.
  • Concurrency Safety ⚠️ Warning — cmd/subscribe.go lines 259-294 leak goroutines: unbounded goroutines are spawned with independent contexts that are never cancelled, causing resource leaks and incomplete deliveries. Resolution: track goroutines with a sync.WaitGroup, derive the webhook timeout context from the main context, and call wg.Wait() before program exit to ensure clean shutdown.
  • Public Api Changes ⚠️ Warning — Breaking changes to the public CLI interface (removed flags, changed service URL semantics) ship without a CHANGELOG or migration documentation. Resolution: create CHANGELOG.md documenting the breaking changes, deprecated flags, proxy-mediated access shift, and new parameters, and update the README with a migration guide including before/after examples.
  • Security Considerations ⚠️ Warning — Multiple critical vulnerabilities: unencrypted gRPC communications, unencrypted HTTP session creation, missing input validation, unverified session tickets, TOCTOU race conditions in session caching, and unsafe file locking. Resolution: enable mandatory TLS for gRPC and HTTPS for HTTP calls, validate topics/messages and session tickets, refactor session caching under the lock, implement timeout-based lock cleanup, and document trust boundaries.
  • Title check ❓ Inconclusive — The title has a valid type (feat) and a concise imperative summary under 72 characters, but omits the <domain/pkg> component required by the format. Resolution: add the affected package, e.g. 'feat(cmd): rewrite CLI for ADR-002 direct pub/sub' or 'feat(cli): rewrite CLI for ADR-002 direct pub/sub'.
✅ Passed checks (4 passed)
  • Description Check ✅ Passed — Check skipped; CodeRabbit’s high-level summary is enabled.
  • Linked Issues check ✅ Passed — Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check ✅ Passed — Check skipped because no linked issues were found for this pull request.
  • Rust Best Practices ✅ Passed — Not applicable; this is a Go project, not a Rust project.


@swarna1101 swarna1101 linked an issue Mar 8, 2026 that may be closed by this pull request

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 12

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.github/workflows/ci.yml:
- Around line 58-68: The inline "NOTE:" comment sits at the same indentation as
the 'fuzz' job and will become an invalid job property if the block is
uncommented; move the NOTE line out of the job block by placing it as a
top-level comment (no leading indentation) above the commented 'fuzz' job block
or keep it indented as a normal comment inside the block body (e.g., under the
steps) so that the 'fuzz' job name and keys remain valid when re-enabled—update
the commented lines around the 'fuzz' job and the "NOTE:" so the 'fuzz' job key
and its children (steps, runs-on) remain correctly indented.

In `@cmd/publish.go`:
- Around line 146-149: Replace the unbounded context used for the one-shot
Publish RPC with a context that has a short timeout: instead of ctx :=
context.Background() create a cancellable timed context via
context.WithTimeout(context.Background(), <duration>) (e.g., 5-15s), defer the
cancel, and pass that ctx into nodeClient.Publish(bestNode.Ticket, pubTopic,
publishData); also ensure the time package is imported. This bounds
connect/send/recv and causes the command to fail fast on dead or black-holed
nodes.

In `@cmd/subscribe.go`:
- Around line 164-190: The code only dials sess.Nodes[0] so failures never fall
back to other exposed nodes; modify the logic that currently picks bestNode :=
sess.Nodes[0] and immediately calls node.NewClient(...) and
nodeClient.Subscribe(...) to instead iterate over the candidate nodes (limit the
iteration by the configured expose amount flag/variable and the length of
sess.Nodes), attempting to create a client with node.NewClient(address) and then
Subscribe(ctx, node.Ticket, subTopic, 100) for each node, closing any client on
error and continuing to the next node, and only returning an error if all
attempts fail; update references to sess.Nodes, bestNode.Ticket, node.NewClient,
nodeClient.Subscribe and ensure ctx/cancel are created once and canceled on
final exit.
- Around line 199-233: The current webhook dispatcher launches a new goroutine
for every dequeued message (inside the anonymous func reading from wq), causing
unbounded concurrency; replace this with a fixed worker pool so the queue
provides backpressure: create a workerCount constant (or use
webhookQueueSize/workers flag), start that many goroutines that each range over
wq and process messages inline (call webhookFormatter.FormatMessage, create
request with http.NewRequestWithContext using webhookTimeoutSecs, do the HTTP
POST, handle resp.Body.Close and status codes) instead of spawning a new
goroutine per message; keep existing symbols webhookMsg, wq, webhookQueueSize,
webhookURL, webhookTimeoutSecs, webhookFormatter, subTopic, and clientIDToUse to
locate and modify the code.
- Around line 153-159: The session is requesting both "publish" and "subscribe"
rights but the command only needs subscribe; update the call to
session.CreateSession so the capabilities slice contains only "subscribe"
(replace []string{"publish","subscribe"} with a single-entry slice containing
"subscribe") while keeping proxyURL, clientIDToUse, []string{subTopic}, and
subExposeAmount unchanged so the session requests least-privilege.

In `@internal/node/client.go`:
- Around line 58-80: The goroutine that reads from stream.Recv currently
swallows non-EOF errors and only closes ch, so callers (e.g., cmd/subscribe.go)
cannot distinguish EOF from a stream failure; change the implementation to
propagate receive errors to the caller by adding an error propagation mechanism
(either an additional errCh chan error or change ch to carry a result struct
{resp *pb.Response; err error}), send non-nil errors (except io.EOF) on that
channel before returning, and ensure callers of the stream reader (subscribe
logic) check this error channel/result.err to detect broken streams and trigger
retries; keep EOF behavior as a clean close with no error sent.
- Around line 106-109: The current handler in internal/node/client.go treats an
io.EOF from stream.Recv() as success by returning (nil, nil), which lets
cmd/publish.go record a successful publish with no ack; change the EOF branch to
return a non-nil error (e.g., wrap or return fmt.Errorf/io.ErrUnexpectedEOF with
context) so callers like the publish flow see the failure; update the error
message to mention "EOF before response" and ensure the function that calls
stream.Recv() (the code handling resp, err := stream.Recv()) propagates that
error instead of treating nil response as success.
- Around line 26-31: The gRPC client setup in the grpc.NewClient call currently
uses insecure.NewCredentials() and unbounded message sizes
(grpc.MaxCallRecvMsgSize/grpc.MaxCallSendMsgSize set to math.MaxInt), which
exposes JwtToken in cleartext and risks unbounded memory; update the transport
to use TLS with proper certificate validation (e.g., use
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(...)) or system
root TLS creds) and replace the math.MaxInt limits in
grpc.WithDefaultCallOptions with a reasonable cap such as 10 * 1024 * 1024
(10 MiB); ensure these changes are applied where nodeAddr is used to create
the connection so JwtToken isn't sent over plaintext.

In `@internal/session/client.go`:
- Around line 49-57: The code currently builds a POST to proxyURL (variable
proxyURL) and sends it over plain HTTP; to avoid leaking node tickets require
the proxyURL scheme to be https by default: validate proxyURL (parse via
net/url.Parse) and if url.Scheme != "https" return an error unless an explicit
insecure opt-in flag (e.g., allowInsecureProxy or an env/config boolean) is set;
update the function that constructs the request (where http.NewRequest and
client := &http.Client{Timeout: 10 * time.Second} are used) to perform this
check before creating the request and document/propagate the opt-in flag so
callers can deliberately enable non-HTTPS for dev only.
- Around line 12-18: The session selection never declares the desired transport,
so the proxy can return a non-gRPC node; update the code that asks the proxy for
a node to explicitly request gRPC-capable nodes by setting Node.Transport =
"grpc" (or adding a "transport":"grpc" field to the proxy request payload)
before sending the request—look for the Node struct and the functions that call
the proxy (e.g., the session creation / node request routines) and ensure they
populate Node.Transport with "grpc" so callers that immediately instantiate a
gRPC client only receive gRPC-capable nodes.
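The https-by-default check suggested for internal/session/client.go above could be implemented as a small validator run before building the request. `validateProxyURL` and the `allowInsecure` opt-in are hypothetical names mirroring the review comment, not the repository's actual API.

```go
package main

import (
	"fmt"
	"net/url"
)

// validateProxyURL rejects non-https proxy URLs unless the caller explicitly
// opts in to insecure transport (intended for local development only).
func validateProxyURL(raw string, allowInsecure bool) error {
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("invalid proxy URL %q: %w", raw, err)
	}
	if u.Scheme != "https" && !allowInsecure {
		return fmt.Errorf("proxy URL %q must use https (set the insecure opt-in for dev only)", raw)
	}
	return nil
}

func main() {
	fmt.Println(validateProxyURL("https://proxy.example.com", false)) // accepted
	fmt.Println(validateProxyURL("http://proxy.example.com", false))  // rejected
	fmt.Println(validateProxyURL("http://localhost:8080", true))      // accepted via opt-in
}
```

The check runs before http.NewRequest, so node tickets never leave the process over plaintext unless the operator deliberately asked for it.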

In `@README.md`:
- Around line 33-38: Replace the hardcoded Auth0 subject
"google-oauth2|116937893938826513819" used in the Authentication Status examples
with a clearly fake placeholder (for example "AUTH0_SUBJECT_PLACEHOLDER" or
"auth0|<USER_ID>"); update every occurrence of that literal string (including
the example block under the "Authentication Status" header and the other
examples referenced) so public docs no longer expose a stable user identifier.
- Around line 13-24: Update the fenced code blocks that show CLI/plain output
(for example the block starting with "Initiating authentication..." and the
other blocks referenced in the comment) to include a language tag of "text"
(i.e., change ``` to ```text) so markdownlint MD040 is satisfied; apply the same
change to the other output blocks noted (lines 32-46, 54-56, 66-73, 83-88,
111-115, 129-137, 145-158, 166-172, 180-183) to consistently mark these CLI
output samples as text.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9a4f87b5-b447-4d3a-ae91-25e365440999

📥 Commits

Reviewing files that changed from the base of the PR and between 4f76630 and 85783ab.

⛔ Files ignored due to path filters (7)
  • go.sum is excluded by !**/*.sum
  • proto/p2p_stream.pb.go is excluded by !**/*.pb.go
  • proto/p2p_stream_grpc.pb.go is excluded by !**/*.pb.go
  • proto/proxy_stream.pb.go is excluded by !**/*.pb.go
  • proto/proxy_stream_grpc.pb.go is excluded by !**/*.pb.go
  • proto/session.pb.go is excluded by !**/*.pb.go
  • proto/session_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (27)
  • .github/workflows/ci.yml
  • Makefile
  • README.md
  • cmd/publish.go
  • cmd/subscribe.go
  • e2e/cli_runner.go
  • e2e/commands_test.go
  • e2e/config.go
  • e2e/cross_node_test.go
  • e2e/failure_test.go
  • e2e/fuzz_test.go
  • e2e/integration_test.go
  • e2e/publish_test.go
  • e2e/ratelimit_scenarios_test.go
  • e2e/ratelimit_test.go
  • e2e/setup.go
  • e2e/smoke_cases.go
  • e2e/subscribe_test.go
  • e2e/suite_test.go
  • e2e/token.go
  • e2e/validators.go
  • go.mod
  • internal/grpc/proxy_client.go
  • internal/node/client.go
  • internal/session/client.go
  • proto/p2p_stream.proto
  • proto/session.proto
💤 Files with no reviewable changes (18)
  • e2e/ratelimit_scenarios_test.go
  • go.mod
  • e2e/ratelimit_test.go
  • e2e/token.go
  • e2e/cli_runner.go
  • e2e/integration_test.go
  • e2e/smoke_cases.go
  • e2e/subscribe_test.go
  • e2e/suite_test.go
  • e2e/publish_test.go
  • e2e/setup.go
  • e2e/commands_test.go
  • e2e/validators.go
  • e2e/fuzz_test.go
  • e2e/failure_test.go
  • e2e/config.go
  • internal/grpc/proxy_client.go
  • e2e/cross_node_test.go


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (3)
cmd/subscribe.go (3)

153-159: ⚠️ Potential issue | 🟠 Major

Request only subscribe capability for this session.

This path never publishes, but it still asks the proxy for "publish". That widens the node ticket unnecessarily.

Possible fix
 		sess, err := session.CreateSession(
 			proxyURL,
 			clientIDToUse,
 			[]string{subTopic},
-			[]string{"publish", "subscribe"},
+			[]string{"subscribe"},
 			subExposeAmount,
 		)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/subscribe.go` around lines 153 - 159, The session is requesting both
"publish" and "subscribe" scopes though this command only needs subscribe;
update the session.CreateSession call (the call site that passes the scopes
slice) to request only "subscribe" (i.e. replace []string{"publish",
"subscribe"} with a single-scope slice containing "subscribe") so the proxy
ticket is limited to subscribe capability only.

164-190: ⚠️ Potential issue | 🟠 Major

--expose-amount still doesn't provide failover.

Only sess.Nodes[0] is ever dialed/subscribed. If that first connect or stream setup fails, the command exits and ignores the remaining exposed nodes.

Implementation sketch
-		bestNode := sess.Nodes[0]
-		fmt.Printf("Session: %s | Node: %s (%s, score: %.2f)\n",
-			sess.SessionID, bestNode.Address, bestNode.Region, bestNode.Score)
+		var (
+			bestNode   session.Node
+			nodeClient *node.Client
+			msgChan    <-chan *node.SubscribeResponse
+			lastErr    error
+		)
+
+		for _, candidate := range sess.Nodes {
+			fmt.Printf("Trying node: %s (%s, score: %.2f)\n", candidate.Address, candidate.Region, candidate.Score)
+
+			c, err := node.NewClient(candidate.Address)
+			if err != nil {
+				lastErr = err
+				continue
+			}
+
+			ch, err := c.Subscribe(ctx, candidate.Ticket, subTopic, 100)
+			if err != nil {
+				c.Close()
+				lastErr = err
+				continue
+			}
+
+			bestNode = candidate
+			nodeClient = c
+			msgChan = ch
+			break
+		}
+
+		if nodeClient == nil {
+			return fmt.Errorf("subscribe failed on all exposed nodes: %v", lastErr)
+		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/subscribe.go` around lines 164 - 190, The code only dials sess.Nodes[0]
and fails fast; change Subscribe command to iterate over sess.Nodes and attempt
to connect/subscribe to each until one succeeds: for each node in sess.Nodes
(use bestNode variable name when selecting), call node.NewClient(node.Address),
handle error by closing client if non-nil and continue to next node, then call
nodeClient.Subscribe(ctx, node.Ticket, subTopic, 100) and on subscribe error
close the client and continue; when subscribe returns a valid msgChan, break and
use that channel; ensure any created nodeClient is deferred/closed only for the
successful subscription lifecycle and that ctx/cancel are set appropriately.

203-233: ⚠️ Potential issue | 🟠 Major

The webhook queue still doesn't cap concurrency.

Each dequeued message spins up another goroutine, so a hot subscription can fan out to unbounded concurrent POSTs even when --webhook-queue-size is small.

Preferred direction

Process inline inside a fixed set of workers, or inline inside the queue reader itself, so the queue size actually provides backpressure.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/subscribe.go` around lines 203 - 233, The webhook handler currently
spawns a new goroutine per message (inside the loop that ranges over wq), which
allows unbounded concurrent POSTs and defeats --webhook-queue-size; change it to
use a fixed worker pool or process inline so the queue size exerts backpressure:
create a bounded set of workers (e.g., spawn N goroutines once, where N is the
configured webhook worker count or webhookQueueSize) that each range over wq and
perform webhookFormatter.FormatMessage, http.NewRequestWithContext, and
http.DefaultClient.Do with the existing timeout logic (use webhookTimeoutSecs,
webhookFormatter.FormatMessage, webhookURL, wq, etc.), and remove the
per-message inner goroutine so concurrency is limited.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/subscribe.go`:
- Around line 216-221: The code always sets Content-Type: application/json on
the webhook request even when forwarding raw payloads; modify the logic around
the http.NewRequestWithContext call so that req.Header.Set("Content-Type",
"application/json") is only performed when a schema/JSON payload is intended
(e.g., when webhookSchema (or the variable representing --webhook-schema) is
non-empty or you have explicitly formatted JSON), otherwise do not set the
Content-Type header so raw/text/binary payloads are forwarded unchanged; update
the code paths around http.NewRequestWithContext, req, formattedPayload,
webhookURL and wctx accordingly.
- Around line 202-203: The channel for webhook messages is created
unconditionally with make(chan webhookMsg, webhookQueueSize) which will panic if
webhookQueueSize is negative even when webhookURL is empty; modify the code so
validation of webhookQueueSize occurs before channel creation and only create wq
when webhookURL != "" and webhookQueueSize is a non-negative integer (validate
webhookQueueSize and return or set a sane default on invalid values). Locate
uses of webhookQueueSize, webhookMsg, and wq in the subscribe command to add the
check and conditional creation.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 41911699-f36a-43ba-81db-0a9eeb8d7650

📥 Commits

Reviewing files that changed from the base of the PR and between 85783ab and 50761a1.

📒 Files selected for processing (2)
  • cmd/publish.go
  • cmd/subscribe.go


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@README.md`:
- Around line 191-199: The statement "All commands support `--disable-auth
--client-id <id>`" is too broad; change it to explicitly state which commands
support the flag (e.g., the data-plane CLI commands used in the examples) and
avoid implying auth lifecycle or admin commands are covered. Update the README
text to mention that `--disable-auth` applies to the data-plane commands such as
the mump2p CLI commands shown (mump2p subscribe, mump2p publish, mump2p
list-topics) and, if needed, add a short note clarifying that auth
lifecycle/admin commands do not support that flag.
- Around line 40-44: Update the README sample so the two identical "Publish
Rate" rows are disambiguated: change the first "Publish Rate:  50000 per hour"
to "Publish Rate (per hour): 50000 per hour" (or mirror the CLI label, e.g.,
"Publish Rate (hourly): 50000") and change the second to "Publish Rate (per
second): 600 per second" (or the CLI equivalent) so the two limits are clearly
distinct in the output example.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e5a6789e-e858-42ee-9b86-8d08a3f6c932

📥 Commits

Reviewing files that changed from the base of the PR and between 50761a1 and e9d52aa.

📒 Files selected for processing (1)
  • README.md

@swarna1101 swarna1101 requested a review from hpsing March 9, 2026 08:35

@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (9)
cmd/subscribe.go (4)

202-202: ⚠️ Potential issue | 🟡 Minor

Validate --webhook-queue-size before channel creation.

make(chan webhookMsg, webhookQueueSize) panics for negative values. The channel is created unconditionally before checking webhookURL != "".

Suggested fix
+		if webhookQueueSize < 0 {
+			return fmt.Errorf("--webhook-queue-size must be >= 0")
+		}
+
 		type webhookMsg struct {
 			data []byte
 		}
-		wq := make(chan webhookMsg, webhookQueueSize)
+		var wq chan webhookMsg
+		if webhookURL != "" {
+			wq = make(chan webhookMsg, webhookQueueSize)
+		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/subscribe.go` at line 202, Validate webhookQueueSize before creating the
channel: ensure the integer webhookQueueSize is non-negative and return an error
(or set a default) if it is negative, and only call make(chan webhookMsg,
webhookQueueSize) after confirming webhookURL != "" so the channel is not
created unconditionally; update the logic around webhookQueueSize and webhookURL
checks (referencing webhookQueueSize, webhookURL, webhookMsg and the make(chan
webhookMsg, webhookQueueSize) call) to perform validation first and then create
the buffered channel when needed.

164-176: ⚠️ Potential issue | 🟠 Major

--expose-amount does not provide actual failover.

The code only ever connects to sess.Nodes[0]. If the first node fails, the command exits without trying other exposed nodes. The help text at line 297 claims failover support, but the implementation doesn't deliver it.

Consider iterating over available nodes on connection/stream failure before giving up.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/subscribe.go` around lines 164 - 176, The code currently always uses
sess.Nodes[0] (bestNode) for connections and prints other nodes but never
attempts failover; change the connection/stream establishment to iterate over
sess.Nodes (using the existing bestNode/sess.Nodes identifiers) and attempt to
connect to each node in order, logging each attempt and moving to the next node
on connection or stream failure, and only exit with error after all nodes have
been tried; implement this by replacing the single-use connection logic with a
retry loop that calls the existing connection/stream functions (the code that
dials/opens the stream) per node, breaks on success, and ensures errors from
each failed node are logged so failover actually occurs.

221-221: ⚠️ Potential issue | 🟠 Major

Don't force application/json for raw webhook payloads.

When --webhook-schema is empty, the raw message body is forwarded but still labeled as JSON. Binary or plain-text subscriptions may be rejected or misparsed by receivers.

Suggested fix
-						req.Header.Set("Content-Type", "application/json")
+						contentType := "application/octet-stream"
+						if webhookSchema != "" {
+							contentType = "application/json"
+						}
+						req.Header.Set("Content-Type", contentType)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/subscribe.go` at line 221, The code currently unconditionally sets
req.Header.Set("Content-Type", "application/json"), which forces JSON for raw
webhook payloads; change this so the Content-Type header is only set when a
webhook schema is provided (i.e., when the --webhook-schema value is non-empty).
Update the code that builds the outbound request (the spot using
req.Header.Set("Content-Type", "application/json")) to check the webhook schema
variable (e.g., webhookSchema or whatever flag/field holds --webhook-schema) and
set Content-Type to "application/json" (or to the explicit schema value, if you
accept a MIME type) only when that variable is not empty; otherwise do not set
the Content-Type header so binary/plain-text payloads are forwarded raw. Ensure
you modify the same request-building logic that references req.Header.Set to be
conditional.

199-233: ⚠️ Potential issue | 🟠 Major

Webhook dispatcher spawns unbounded concurrent goroutines.

Each dequeued message launches a new goroutine (line 206), so a high-throughput subscription can fan out to unbounded concurrent HTTP POSTs regardless of --webhook-queue-size. The queue only limits buffered messages, not in-flight requests.

Use a fixed worker pool or process messages inline within the single consumer goroutine to enforce real backpressure.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/subscribe.go` around lines 199 - 233, The current consumer goroutine for
wq (channel of webhookMsg) spawns a new goroutine per message (the inner go
func(payload []byte) { ... }(msg.data)), allowing unbounded in-flight HTTP
POSTs; replace that unbounded spawn with a bounded worker model or inline
processing to enforce backpressure. Concretely, remove the per-message go spawn
inside the for msg := range wq loop and either (a) create a fixed pool of N
worker goroutines that each read from wq and execute
webhookFormatter.FormatMessage(...), create the http.Request and call
http.DefaultClient.Do(...), or (b) perform the HTTP POST directly in the single
consumer goroutine so only one in-flight request exists; use a semaphore/channel
(size configurable, e.g., webhookWorkers) if you need limited concurrency.
Ensure error handling around webhookFormatter.FormatMessage and resp.Body.Close
remains, and use webhookQueueSize only for buffering while webhookWorkers
controls concurrent requests.
README.md (3)

40-45: ⚠️ Potential issue | 🟡 Minor

Disambiguate the two publish-rate rows.

Lines 42-43 use the same Publish Rate label for different limits (per hour vs per second), making the output confusing.

Suggested fix
 Rate Limits:
 ------------
-Publish Rate:  50000 per hour
-Publish Rate:  600 per second
+Publish Rate (hourly):   50000
+Publish Rate (per sec):  600
 Max Message Size:  10.00 MB
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 40 - 45, The two identical "Publish Rate" rows in the
README are ambiguous; update the labels to clearly distinguish the time windows
(e.g., "Publish Rate (per hour): 50000" and "Publish Rate (per second): 600") so
readers know which limit applies to hourly vs per-second quotas—modify the lines
that currently read "Publish Rate" to include the time unit (per hour / per
second) and keep the numeric values and other rows (Max Message Size, Daily
Quota) unchanged.

13-24: ⚠️ Potential issue | 🟡 Minor

Add language tags to plain-output code blocks.

markdownlint MD040 flags fenced code blocks without language specifiers. Use `text` for CLI output samples to satisfy the linter and improve rendering.

Example fix
-```
+```text
 Initiating authentication...

Also applies to: 32-46, 54-56, 66-73, 83-88, 111-115, 129-137, 145-158, 166-172, 180-183

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 13 - 24, The fenced code blocks showing CLI/plain
output (for example the block starting with "Initiating authentication..." and
other blocks at ranges you listed) lack a language tag; update each
triple-backtick fence from ``` to ```text so markdownlint MD040 is satisfied and
the output renders as plain text — apply this change to the example block and
all other occurrences noted (lines 32-46, 54-56, 66-73, 83-88, 111-115, 129-137,
145-158, 166-172, 180-183).

191-199: ⚠️ Potential issue | 🟡 Minor

Narrow the --disable-auth scope claim.

"All commands support --disable-auth" is too broad. Auth lifecycle commands (login, logout, whoami) don't need this flag. Clarify that it applies to data-plane commands.

Suggested fix
-All commands support `--disable-auth --client-id <id>` to skip Auth0.
+Data-plane commands (`subscribe`, `publish`, `list-topics`) support `--disable-auth --client-id <id>` to skip Auth0 for testing.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 191 - 199, The README incorrectly states “All
commands support --disable-auth”; narrow that scope by clarifying the flag only
applies to data-plane commands (e.g., subscribe, publish, list-topics) and not
to auth lifecycle commands (login, logout, whoami). Update the prose around the
example block to explicitly state that --disable-auth --client-id <id> is
accepted for data-plane CLI commands (list the example commands
subscribe/publish/list-topics) and note that auth commands (login/logout/whoami)
do not accept or require this flag.
internal/node/client.go (2)

58-80: ⚠️ Potential issue | 🟠 Major

Stream failures are not propagated to the caller.

The goroutine prints receive errors (line 70) and closes the channel, but cmd/subscribe.go cannot distinguish a broken stream from a clean EOF. The command exits successfully without any signal to retry another exposed node.

Consider returning an error channel or a struct with both response and error to enable proper retry logic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/node/client.go` around lines 58 - 80, The goroutine reading from
stream (using stream.Recv and sending into ch) swallows non-EOF errors by
printing and closing the response channel, so callers like cmd/subscribe.go
cannot tell EOF vs fatal stream error; change the pattern to propagate errors by
replacing ch (chan *pb.Response) with a result channel (e.g., chan result { resp
*pb.Response; err error }) or add a separate error channel, send any non-nil err
(including io.EOF if desired) into that channel before closing, and have the
receiver check the error field (or error channel) to distinguish clean EOF from
a broken stream; update usages around ch, the anonymous goroutine, and consumers
in cmd/subscribe.go to handle the new result/error signal and trigger retries on
real errors.

25-40: ⚠️ Potential issue | 🔴 Critical

Harden gRPC transport before connecting to internet-facing nodes.

The client uses insecure.NewCredentials() (cleartext) and math.MaxInt message size bounds. Combined with public proxy URLs in the README, this means:

  • Bearer tokens (JwtToken field) transmit in plaintext
  • A malicious node can force unbounded memory allocation

Enable TLS with proper certificate validation and set reasonable message size limits (e.g., 10 MiB).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/node/client.go` around lines 25 - 40, The NewClient function
currently uses insecure.NewCredentials() and unbounded message sizes; update
NewClient to use real TLS credentials (replace insecure.NewCredentials() with
grpc/credentials.NewClientTLSFromCert or credentials.NewClientTLSFromFile and
ensure proper server name verification) and constrain gRPC message sizes in the
grpc.WithDefaultCallOptions (replace math.MaxInt with a reasonable limit such as
10*1024*1024 for grpc.MaxCallRecvMsgSize and grpc.MaxCallSendMsgSize); ensure
any JwtToken usage continues to be sent over the secured channel and document or
load TLS certs rather than disabling validation.
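gRPC's `credentials.NewTLS` wraps a standard `*tls.Config`, so the hardening can be sketched with only the stdlib. `clientTLS` and the 10 MiB constant are illustrative; the real change would pass them through `grpc.WithTransportCredentials` and `grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(...), grpc.MaxCallSendMsgSize(...))`:

```go
package main

import (
	"crypto/tls"
	"fmt"
)

// clientTLS builds a TLS config suitable for wrapping in gRPC transport
// credentials (credentials.NewTLS in google.golang.org/grpc/credentials).
// Leaving RootCAs nil uses the system trust store; ServerName pins
// certificate verification to the node's hostname.
func clientTLS(serverName string) *tls.Config {
	return &tls.Config{
		ServerName: serverName, // verified against the node certificate
		MinVersion: tls.VersionTLS12,
		// InsecureSkipVerify is deliberately left false.
	}
}

const maxMsgBytes = 10 * 1024 * 1024 // 10 MiB cap instead of math.MaxInt

func main() {
	cfg := clientTLS("node-1.example.com")
	fmt.Println(cfg.ServerName, cfg.InsecureSkipVerify, maxMsgBytes)
}
```

This keeps the `JwtToken` field on an encrypted channel and bounds the memory a malicious node can force the client to allocate.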
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@cmd/subscribe.go`:
- Line 202: Validate webhookQueueSize before creating the channel: ensure the
integer webhookQueueSize is non-negative and return an error (or set a default)
if it is negative, and only call make(chan webhookMsg, webhookQueueSize) after
confirming webhookURL != "" so the channel is not created unconditionally;
update the logic around webhookQueueSize and webhookURL checks (referencing
webhookQueueSize, webhookURL, webhookMsg and the make(chan webhookMsg,
webhookQueueSize) call) to perform validation first and then create the buffered
channel when needed.
- Around line 164-176: The code currently always uses sess.Nodes[0] (bestNode)
for connections and prints other nodes but never attempts failover; change the
connection/stream establishment to iterate over sess.Nodes (using the existing
bestNode/sess.Nodes identifiers) and attempt to connect to each node in order,
logging each attempt and moving to the next node on connection or stream
failure, and only exit with error after all nodes have been tried; implement
this by replacing the single-use connection logic with a retry loop that calls
the existing connection/stream functions (the code that dials/opens the stream)
per node, breaks on success, and ensures errors from each failed node are logged
so failover actually occurs.
- Line 221: The code currently unconditionally sets
req.Header.Set("Content-Type", "application/json"), which forces JSON for raw
webhook payloads; change this so the Content-Type header is only set when a
webhook schema is provided (i.e., when the --webhook-schema value is non-empty).
Update the code that builds the outbound request (the spot using
req.Header.Set("Content-Type", "application/json")) to check the webhook schema
variable (e.g., webhookSchema or whatever flag/field holds --webhook-schema) and
set Content-Type to "application/json" (or to the explicit schema value, if you
accept a MIME type) only when that variable is not empty; otherwise do not set
the Content-Type header so binary/plain-text payloads are forwarded raw. Ensure
you modify the same request-building logic that references req.Header.Set to be
conditional.
- Around line 199-233: The current consumer goroutine for wq (channel of
webhookMsg) spawns a new goroutine per message (the inner go func(payload
[]byte) { ... }(msg.data)), allowing unbounded in-flight HTTP POSTs; replace
that unbounded spawn with a bounded worker model or inline processing to enforce
backpressure. Concretely, remove the per-message go spawn inside the for msg :=
range wq loop and either (a) create a fixed pool of N worker goroutines that
each read from wq and execute webhookFormatter.FormatMessage(...), create the
http.Request and call http.DefaultClient.Do(...), or (b) perform the HTTP POST
directly in the single consumer goroutine so only one in-flight request exists;
use a semaphore/channel (size configurable, e.g., webhookWorkers) if you need
limited concurrency. Ensure error handling around webhookFormatter.FormatMessage
and resp.Body.Close remains, and use webhookQueueSize only for buffering while
webhookWorkers controls concurrent requests.

In `@internal/node/client.go`:
- Around line 58-80: The goroutine reading from stream (using stream.Recv and
sending into ch) swallows non-EOF errors by printing and closing the response
channel, so callers like cmd/subscribe.go cannot tell EOF vs fatal stream error;
change the pattern to propagate errors by replacing ch (chan *pb.Response) with
a result channel (e.g., chan result { resp *pb.Response; err error }) or add a
separate error channel, send any non-nil err (including io.EOF if desired) into
that channel before closing, and have the receiver check the error field (or
error channel) to distinguish clean EOF from a broken stream; update usages
around ch, the anonymous goroutine, and consumers in cmd/subscribe.go to handle
the new result/error signal and trigger retries on real errors.
- Around line 25-40: The NewClient function currently uses
insecure.NewCredentials() and unbounded message sizes; update NewClient to use
real TLS credentials (replace insecure.NewCredentials() with
grpc/credentials.NewClientTLSFromCert or credentials.NewClientTLSFromFile and
ensure proper server name verification) and constrain gRPC message sizes in the
grpc.WithDefaultCallOptions (replace math.MaxInt with a reasonable limit such as
10*1024*1024 for grpc.MaxCallRecvMsgSize and grpc.MaxCallSendMsgSize); ensure
any JwtToken usage continues to be sent over the secured channel and document or
load TLS certs rather than disabling validation.

In `@README.md`:
- Around line 40-45: The two identical "Publish Rate" rows in the README are
ambiguous; update the labels to clearly distinguish the time windows (e.g.,
"Publish Rate (per hour): 50000" and "Publish Rate (per second): 600") so
readers know which limit applies to hourly vs per-second quotas—modify the lines
that currently read "Publish Rate" to include the time unit (per hour / per
second) and keep the numeric values and other rows (Max Message Size, Daily
Quota) unchanged.
- Around line 13-24: The fenced code blocks showing CLI/plain output (for
example the block starting with "Initiating authentication..." and other blocks
at ranges you listed) lack a language tag; update each triple-backtick fence
from ``` to ```text so markdownlint MD040 is satisfied and the output renders as
plain text — apply this change to the example block and all other occurrences
noted (lines 32-46, 54-56, 66-73, 83-88, 111-115, 129-137, 145-158, 166-172,
180-183).
- Around line 191-199: The README incorrectly states “All commands support
--disable-auth”; narrow that scope by clarifying the flag only applies to
data-plane commands (e.g., subscribe, publish, list-topics) and not to auth
lifecycle commands (login, logout, whoami). Update the prose around the example
block to explicitly state that --disable-auth --client-id <id> is accepted for
data-plane CLI commands (list the example commands
subscribe/publish/list-topics) and note that auth commands (login/logout/whoami)
do not accept or require this flag.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7a5a34d8-d2da-4b28-839d-1f7b3c5e9f6f

📥 Commits

Reviewing files that changed from the base of the PR and between e9d52aa and c27546d.

📒 Files selected for processing (5)
  • Makefile
  • README.md
  • cmd/publish.go
  • cmd/subscribe.go
  • internal/node/client.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • Makefile


@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (4)
internal/session/store.go (3)

159-164: Silently falling through on lock failure may cause duplicate sessions.

When acquireLock fails, the code proceeds to create a new session without caching. This could lead to multiple concurrent processes each creating their own sessions. Consider logging the lock error for observability.

Log lock acquisition failure
 	lf, lockErr := acquireLock()
 	if lockErr != nil {
-		// If locking fails, fall through to create without cache.
+		// If locking fails, fall through to create without cache.
+		fmt.Printf("Warning: could not acquire session lock: %v\n", lockErr)
 		sess, err := CreateSession(proxyURL, clientID, topics, capabilities, exposeAmount)
 		return sess, false, err
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/session/store.go` around lines 159 - 164, When acquireLock() returns
an error before creating the session, log the lockErr for observability before
falling back to creating a session without cache; update the branch in the code
that calls acquireLock() to emit a descriptive error (including the error value)
via the module's logger (or processLogger) so failures to acquire the lock are
visible, then continue to call CreateSession(proxyURL, clientID, topics,
capabilities, exposeAmount) and return as before.

107-124: Topic matching is asymmetric—cached sessions with extra topics will match.

The matches method checks that all requested topics exist in the cached session, but doesn't verify the reverse. A cached session with topics ["A", "B"] will match a request for ["A"], which may be intentional for topic superset reuse. However, this differs from capabilities matching (line 111) which requires exact equality.

If this asymmetry is intentional (reuse a session that covers more topics), consider adding a comment to clarify. Otherwise, align with capabilities:

Option: Enforce exact topic match like capabilities
 func (c *CachedSession) matches(proxyURL, clientID string, topics, capabilities []string) bool {
 	if c.ProxyURL != proxyURL || c.ClientID != clientID {
 		return false
 	}
 	if sortedKey(c.Capabilities) != sortedKey(capabilities) {
 		return false
 	}
-	cached := make(map[string]bool, len(c.Topics))
-	for _, t := range c.Topics {
-		cached[t] = true
-	}
-	for _, t := range topics {
-		if !cached[t] {
-			return false
-		}
-	}
-	return true
+	return sortedKey(c.Topics) == sortedKey(topics)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/session/store.go` around lines 107 - 124, The matches method on
CachedSession currently treats topics asymmetrically (checks request topics are
subset of c.Topics); change this to require exact topic equality to match
capabilities behavior: in CachedSession.matches replace the manual subset logic
over c.Topics and topics with a sortedKey equality check (use
sortedKey(c.Topics) == sortedKey(topics)), ensuring topic comparison is
symmetric and mirrors the capabilities check; if asymmetry was intended instead,
add a brief clarifying comment above CachedSession.matches stating that cached
sessions may be reused when they are a superset of requested topics.

191-198: InvalidateSession silently ignores errors.

The function swallows errors from both cachePath() and os.Remove(). While ignoring "file not found" is reasonable, other errors (permission denied, I/O errors) might warrant logging for debugging purposes.

Log non-trivial removal errors
 func InvalidateSession() {
 	p, err := cachePath()
 	if err != nil {
+		fmt.Printf("Warning: could not get cache path for invalidation: %v\n", err)
 		return
 	}
-	os.Remove(p)
+	if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
+		fmt.Printf("Warning: could not remove session cache: %v\n", err)
+	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/session/store.go` around lines 191 - 198, InvalidateSession
currently swallows errors from cachePath() and os.Remove(); change it to check
and handle errors: if cachePath() returns an error, log that error (don’t
silently return), and when calling os.Remove(p) detect and ignore
os.IsNotExist(err) but log any other errors (permission denied, I/O, etc.) with
sufficient context. Use the project’s logger or the standard log package to emit
a clear message referencing InvalidateSession, cachePath and os.Remove so
non-trivial failures are visible for debugging.
cmd/subscribe.go (1)

62-69: isReadable checks byte range but doesn't fully validate UTF-8 control characters.

The function allows any byte ≥0x20 plus newline/carriage-return/tab, but UTF-8 has control characters in the 0x7F-0x9F range that might not be "readable." For most use cases this is fine, but consider documenting the heuristic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/subscribe.go` around lines 62 - 69, The isReadable function currently
tests raw bytes and permits any byte >= 0x20 (plus \n, \r, \t) but misses UTF‑8
control codepoints (e.g. U+007F–U+009F); update isReadable to decode the byte
slice as runes and reject any rune where unicode.IsControl(r) is true except for
allowed whitespace runes ('\n','\r','\t'), and ensure utf8.Valid(b) is still
checked; also add a short comment above isReadable documenting this heuristic
and why control runes are rejected.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: f6e0a15b-0ca5-47d9-b3e9-1cbef3a7700d

📥 Commits

Reviewing files that changed from the base of the PR and between c27546d and 89f5de9.

📒 Files selected for processing (4)
  • README.md
  • cmd/publish.go
  • cmd/subscribe.go
  • internal/session/store.go

@coderabbitai coderabbitai bot requested a review from Walentalien April 12, 2026 16:32

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

♻️ Duplicate comments (1)
cmd/subscribe.go (1)

256-287: ⚠️ Potential issue | 🟠 Major

Webhook dispatcher still spawns unbounded goroutines — this was flagged before.

Line 258 spawns a new goroutine for every dequeued message. A hot subscription will fan out to unbounded concurrent POSTs regardless of --webhook-queue-size. The queue provides zero backpressure.

Additionally, the wq channel is never closed, so the outer goroutine (line 256) leaks on shutdown.

🔒️ Fixed worker pool with proper shutdown
 	var wq chan webhookMsg
 	if webhookURL != "" && webhookQueueSize > 0 {
 		wq = make(chan webhookMsg, webhookQueueSize)
+		const webhookWorkers = 4
+		for w := 0; w < webhookWorkers; w++ {
 		go func() {
 			for msg := range wq {
-				go func(payload []byte) {
 					wctx, wcancel := context.WithTimeout(context.Background(), time.Duration(webhookTimeoutSecs)*time.Second)
-					defer wcancel()
 
-					formattedPayload, fmtErr := webhookFormatter.FormatMessage(payload, subTopic, clientIDToUse, "grpc-msg")
+					formattedPayload, fmtErr := webhookFormatter.FormatMessage(msg.data, subTopic, clientIDToUse, "grpc-msg")
 					if fmtErr != nil {
 						fmt.Printf("Failed to format webhook payload: %v\n", fmtErr)
+						wcancel()
-						return
+						continue
 					}
 
 					req, reqErr := http.NewRequestWithContext(wctx, "POST", webhookURL, bytes.NewBuffer(formattedPayload))
 					if reqErr != nil {
 						fmt.Printf("Failed to create webhook request: %v\n", reqErr)
+						wcancel()
-						return
+						continue
 					}
 					if webhookSchema != "" {
 						req.Header.Set("Content-Type", "application/json")
 					}
 					resp, doErr := http.DefaultClient.Do(req)
+					wcancel()
 					if doErr != nil {
 						fmt.Printf("Webhook request error: %v\n", doErr)
-						return
+						continue
 					}
-					defer resp.Body.Close()
+					resp.Body.Close()
 					if resp.StatusCode >= 400 {
 						fmt.Printf("Webhook responded with status code: %d\n", resp.StatusCode)
 					}
-				}(msg.data)
 			}
 		}()
+		}
 	}

Then close wq after the message loop exits to signal workers to terminate:

// After the select block that waits for sigChan/doneChan:
if wq != nil {
    close(wq)
}

Based on learnings: "Watch for goroutine leaks, unbounded channels, and racy access; ensure contexts are plumbed and respected."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/subscribe.go` around lines 256 - 287, The webhook dispatcher currently
spawns an unbounded goroutine per message (the inner go func around msg.data)
and never closes wq, causing unbounded concurrent POSTs and a leak; replace the
per-message goroutine with a fixed worker pool: spawn N worker goroutines once
(workers read from wq and perform webhookFormatter.FormatMessage,
http.NewRequestWithContext using webhookTimeoutSecs, http.DefaultClient.Do, and
resp.Body.Close) and use a sync.WaitGroup to wait for workers to finish; on
shutdown (after the select that waits for sigChan/doneChan) close(wq) to signal
workers to exit and wait for the WaitGroup so all inflight requests finish
before process exit.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/publish.go`:
- Around line 214-223: The debug block re-parses resp.Data causing redundant
JSON unmarshalling; modify shortMsgID(resp) to return both the short and full
message ID (e.g., change signature to return (shortID, fullID string)) or add a
helper extractMsgID(resp) that unmarshals resp.Data once and returns both
values, then use the returned fullID for the IsDebugMode() print instead of
calling json.Unmarshal again; update callers (including where shortMsgID is used
for suffix) to accept the new return and remove the second json.Unmarshal in the
debug block.

In `@cmd/subscribe.go`:
- Around line 310-337: The block handling debug vs non-debug printing has
inconsistent indentation (around the IsDebugMode() branch using messageCount,
printDebugReceiveInfo, p2pMsg handling) causing a visual merge artifact; fix it
by reindenting the entire if/else so the inner lines (the IsDebugMode() true
branch from the atomic.AddInt32(&messageCount, 1) through the p2pMsg prints and
the else branch using isReadable(decodedMsg), atomic.AddInt32(&messageCount, 1),
displayTopic selection (subTopic/msgTopic) and fmt.Printf) align with the
surrounding code block level and ensure matching braces for the if IsDebugMode()
{ ... } else { ... } structure.

In `@internal/session/store.go`:
- Around line 184-186: The warning is being printed to stdout with fmt.Printf;
change the call in the cache-save error branch (where saveCached(c) is checked)
to write to stderr using fmt.Fprintf(os.Stderr, "Warning: could not cache
session: %v\n", saveErr) and ensure the "os" package is imported in the file so
the code builds.
- Around line 88-98: The saveCached function currently writes directly with
os.WriteFile which can corrupt the cache if the process crashes mid-write;
change saveCached to perform an atomic write by writing the marshaled data to a
temporary file in the same directory (use the same cachePath() directory and a
unique temp name), fsync the temp file, close it, then rename (os.Rename) the
temp file to the final cachePath so the replace is atomic; ensure file mode 0600
is applied to the temp file and propagate any errors from tempfile creation,
write, fsync, close, and rename.
- Around line 60-64: Replace the Unix-only syscall.Flock call in
internal/session/store.go with a cross-platform locker: add
github.com/gofrs/flock to imports, create a flock.Flock for the session lock
path instead of calling syscall.Flock(int(f.Fd()), ...), and use
flock.Lock()/TryLock()/Unlock() (or LockContext) to acquire and release the
lock; remove syscall.Flock usage and any direct fd-based locking so the session
acquisition logic (the code currently calling syscall.Flock) works on Windows
and Unix alike. Ensure to handle lock failures and close the underlying file as
before.

🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: d2c0e93a-b89c-436d-82f1-d7f5ca86b2d9

📥 Commits

Reviewing files that changed from the base of the PR and between c27546d and b8968d1.

📒 Files selected for processing (5)
  • README.md
  • cmd/publish.go
  • cmd/root.go
  • cmd/subscribe.go
  • internal/session/store.go

Comment thread cmd/publish.go
Comment on lines +214 to 223
if IsDebugMode() && resp != nil && msgID != "" {
	var trace map[string]interface{}
	if json.Unmarshal(resp.Data, &trace) == nil {
		if mid, ok := trace["messageID"].(string); ok && mid != "" {
			fmt.Printf(" message-id: %s\n", mid)
		} else if mid, ok := trace["message_id"].(string); ok && mid != "" {
			fmt.Printf(" message-id: %s\n", mid)
		}
	}
}


🧹 Nitpick | 🔵 Trivial

Redundant JSON unmarshal — resp.Data is parsed twice.

shortMsgID(resp) already unmarshals resp.Data at line 199. This block parses it again just to print the full message ID. Consider caching the parsed trace or extracting both values in one pass.

♻️ Avoid double-parsing
-func shortMsgID(resp *pb.Response) string {
+func extractMsgID(resp *pb.Response) (short, full string) {
 	if resp == nil || len(resp.Data) == 0 {
-		return ""
+		return "", ""
 	}
 	var trace map[string]interface{}
 	if err := json.Unmarshal(resp.Data, &trace); err != nil {
-		return ""
+		return "", ""
 	}
-	if mid, ok := trace["messageID"].(string); ok && mid != "" {
-		if len(mid) > 8 {
-			return mid[:8]
-		}
-		return mid
-	}
-	if mid, ok := trace["message_id"].(string); ok && mid != "" {
-		if len(mid) > 8 {
-			return mid[:8]
+	for _, key := range []string{"messageID", "message_id"} {
+		if mid, ok := trace[key].(string); ok && mid != "" {
+			full = mid
+			short = mid
+			if len(short) > 8 {
+				short = short[:8]
+			}
+			return short, full
 		}
-		return mid
 	}
-	return ""
+	return "", ""
 }

Then at call site:

shortID, fullID := extractMsgID(resp)
// use shortID for suffix, fullID for debug output
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/publish.go` around lines 214 - 223, The debug block re-parses resp.Data
causing redundant JSON unmarshalling; modify shortMsgID(resp) to return both the
short and full message ID (e.g., change signature to return (shortID, fullID
string)) or add a helper extractMsgID(resp) that unmarshals resp.Data once and
returns both values, then use the returned fullID for the IsDebugMode() print
instead of calling json.Unmarshal again; update callers (including where
shortMsgID is used for suffix) to accept the new return and remove the second
json.Unmarshal in the debug block.
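A single-pass variant, runnable standalone; `extractMsgID` takes raw bytes here instead of a `*pb.Response` so the sketch stays dependency-free:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// extractMsgID parses the response payload once and returns both the
// 8-character short form (used as the printed suffix) and the full ID (used
// only in debug output). Both camelCase and snake_case keys are accepted.
func extractMsgID(data []byte) (short, full string) {
	var trace map[string]interface{}
	if err := json.Unmarshal(data, &trace); err != nil {
		return "", ""
	}
	for _, key := range []string{"messageID", "message_id"} {
		if mid, ok := trace[key].(string); ok && mid != "" {
			full = mid
			short = mid
			if len(short) > 8 {
				short = short[:8]
			}
			return short, full
		}
	}
	return "", ""
}

func main() {
	short, full := extractMsgID([]byte(`{"message_id":"0123456789abcdef"}`))
	fmt.Println(short, full)
}
```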

Comment thread cmd/subscribe.go Outdated
Comment thread internal/session/store.go
Comment thread internal/session/store.go
Comment thread internal/session/store.go

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

♻️ Duplicate comments (10)
internal/session/client.go (2)

29-42: ⚠️ Potential issue | 🟠 Major

Request still lacks explicit protocol/transport field.

The session request doesn't tell the proxy which transport the CLI needs. Both cmd/publish.go and cmd/subscribe.go immediately use gRPC node clients, so receiving a non-gRPC node causes a failure after session creation rather than at request time.

Proposed fix
 type sessionRequest struct {
 	ClientID     string   `json:"client_id"`
 	Topics       []string `json:"topics"`
+	Protocol     string   `json:"protocol"`
 	Capabilities []string `json:"capabilities"`
 	ExposeAmount uint32   `json:"expose_amount"`
 }

 func CreateSession(proxyURL, clientID, accessToken string, topics, capabilities []string, exposeAmount uint32) (*Session, error) {
 	reqData := sessionRequest{
 		ClientID:     clientID,
 		Topics:       topics,
+		Protocol:     "grpc",
 		Capabilities: capabilities,
 		ExposeAmount: exposeAmount,
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/session/client.go` around lines 29 - 42, The sessionRequest struct
and CreateSession function must include an explicit transport/protocol field so
the proxy knows which node type to return; update the sessionRequest definition
to add a Transport (or Protocol) string field and set it in CreateSession (e.g.,
"grpc") based on the CLI usage (see cmd/publish.go and cmd/subscribe.go which
expect gRPC node clients), ensuring CreateSession populates that field when
building reqData so the proxy can reject non-gRPC nodes at request time.

49-57: ⚠️ Potential issue | 🟠 Major

Plaintext proxy URLs expose bearer tokens on the wire.

The session response carries node tickets, and the Authorization: Bearer header is sent on every request. Accepting http:// URLs risks leaking credentials. Consider requiring https:// unless an explicit insecure flag is set.
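A minimal sketch of that validation, assuming a hypothetical `allowInsecure` boolean is threaded in from a CLI flag:

```go
package main

import (
	"fmt"
	"net/url"
)

// validateProxyURL rejects plaintext proxy URLs unless the caller has
// explicitly opted in, so the Authorization header is never sent over
// an unencrypted connection by accident.
func validateProxyURL(proxyURL string, allowInsecure bool) error {
	u, err := url.Parse(proxyURL)
	if err != nil {
		return fmt.Errorf("invalid proxy URL: %w", err)
	}
	if u.Scheme == "https" {
		return nil
	}
	if allowInsecure {
		// Caller accepted the risk (e.g. a local development proxy).
		return nil
	}
	return fmt.Errorf("refusing to send bearer token over %q; use https or pass the insecure flag", u.Scheme)
}

func main() {
	fmt.Println(validateProxyURL("http://localhost:12080", false))
	fmt.Println(validateProxyURL("https://proxy.example.com", false))
}
```

Run this check before building the request that sets the Authorization header, so the token never leaves the process over plaintext.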

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/session/client.go` around lines 49 - 57, Validate the proxyURL
before creating the POST request in internal/session/client.go: parse proxyURL
(e.g., via url.Parse) and require the scheme to be "https" unless an explicit
insecure option/flag is provided (e.g., a boolean like InsecureSkipVerify or
AllowInsecureProxy passed into the client). If the scheme is not "https" and the
insecure flag is false, return an error instead of proceeding to call
http.NewRequest; if insecure is allowed, proceed but document the risk. Ensure
this check is done before building the request that sets the Authorization
header.
internal/session/store.go (3)

192-194: ⚠️ Potential issue | 🟡 Minor

Warning should go to stderr, not stdout.

Mixing warnings into stdout corrupts piped/scripted output. Use fmt.Fprintf(os.Stderr, ...).

Proposed fix
 	if saveErr := saveCached(c); saveErr != nil {
-		fmt.Printf("Warning: could not cache session: %v\n", saveErr)
+		fmt.Fprintf(os.Stderr, "Warning: could not cache session: %v\n", saveErr)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/session/store.go` around lines 192 - 194, Replace the stdout warning
print for caching errors with a stderr write: in the block that calls
saveCached(c) (the "if saveErr := saveCached(c); saveErr != nil" branch) change
the fmt.Printf to use fmt.Fprintf with os.Stderr so the warning is written to
stderr (e.g., fmt.Fprintf(os.Stderr, "...", saveErr)); ensure you import "os" if
not already imported.

88-98: 🧹 Nitpick | 🔵 Trivial

Consider atomic write to prevent cache corruption on crash.

os.WriteFile overwrites in place. A crash mid-write leaves truncated/invalid JSON. While the fallback to session creation is safe, write-to-temp + rename provides atomicity if cache integrity matters.

Atomic write pattern
 func saveCached(c *CachedSession) error {
 	p, err := cachePath()
 	if err != nil {
 		return err
 	}
 	data, err := json.MarshalIndent(c, "", "  ")
 	if err != nil {
 		return err
 	}
-	return os.WriteFile(p, data, 0600)
+	tmp := p + ".tmp"
+	if err := os.WriteFile(tmp, data, 0600); err != nil {
+		return err
+	}
+	return os.Rename(tmp, p)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/session/store.go` around lines 88 - 98, The saveCached function
currently uses os.WriteFile which can leave a truncated JSON file if the process
crashes mid-write; change saveCached to perform an atomic write: call
cachePath() to get the final path, create a temp file in the same directory
(using os.CreateTemp), write the marshaled data to the temp file, fsync the file
(or call File.Sync), close it, set file mode to 0600 if needed, then os.Rename
the temp file to the final path; ensure any errors remove the temp file and
propagate the error. Use the existing saveCached and cachePath identifiers to
locate where to implement this pattern.

60-64: ⚠️ Potential issue | 🔴 Critical

syscall.Flock is Unix-only, so Windows builds break.

The syscall.Flock function and the LOCK_* constants don't exist in the syscall package on Windows. Since there are no build tags, the session package fails to compile for GOOS=windows, and publish/subscribe cannot acquire the session lock there at all.

Options:

  1. Add //go:build !windows and provide a Windows-specific stub
  2. Use github.com/gofrs/flock for cross-platform file locking
  3. Document Windows as unsupported with explicit build constraints
#!/bin/bash
# Check if Windows is a supported target in CI or build configs
rg -i 'windows|GOOS.*windows' .github/ Makefile 2>/dev/null || echo "No Windows targets found"

# Check for any build tags in session package
rg '//go:build|//\+build' internal/session/
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/session/store.go` around lines 60 - 64, The code currently calls
syscall.Flock (and uses LOCK_EX) which is Unix-only; replace this with a
cross-platform file lock using github.com/gofrs/flock: add the module import,
create a flock.Flock for the session lock path (instead of calling syscall.Flock
on f.Fd()), call TryLock/Lock (or TryLockContext/LockContext) to acquire the
lock and release it via Unlock, and remove direct uses of syscall.Flock and
LOCK_EX; update the function that currently performs the syscall.Flock call (the
code around the f, f.Fd() and return f) to return the open file after securing
the flock or return the flock handle as needed, ensuring calls that expected the
old behavior (publish/subscribe session lock code) now use the flock API. Ensure
go.mod is updated and imports are added.
README.md (3)

40-46: ⚠️ Potential issue | 🟡 Minor

Disambiguate duplicate "Publish Rate" labels.

Lines 42-43 both use Publish Rate: which is confusing. Update to distinct labels:

 Rate Limits:
 ------------
-Publish Rate:  50000 per hour
-Publish Rate:  600 per second
+Publish Rate (hour):    50000
+Publish Rate (second):  600
 Max Message Size:  10.00 MB
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 40 - 46, Update the two identical "Publish Rate:"
labels in README.md so they are distinct and unambiguous (e.g., "Publish Rate
(hourly): 50000 per hour" and "Publish Rate (per second): 600 per second");
locate the block containing the Rate Limits table and rename the labels
accordingly to clarify which value is hourly vs per-second.

227-235: ⚠️ Potential issue | 🟡 Minor

Narrow the --disable-auth scope claim.

"All commands support..." is too broad. Auth lifecycle commands (login, refresh) don't meaningfully use this flag. Scope the statement to data-plane commands.

-All commands support `--disable-auth --client-id <id>` to skip Auth0.
+Data-plane commands (`publish`, `subscribe`, `list-topics`) support `--disable-auth --client-id <id>` to skip Auth0 for testing.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 227 - 235, Update the README wording to narrow the
scope of the --disable-auth flag: change the sentence "All commands support
`--disable-auth --client-id <id>`..." to specify that only data-plane commands
(e.g., subscribe, publish, list-topics) support `--disable-auth --client-id
<id>`, and explicitly exclude auth lifecycle commands (e.g., login, refresh)
from that claim so readers know those auth commands do not meaningfully use the
flag.

13-24: ⚠️ Potential issue | 🟡 Minor

Add text language tag to plain-output fences.

Fenced code blocks showing CLI output should specify a language tag to satisfy markdownlint MD040. Use ```text for these output samples.

Also applies to lines: 32, 54, 66, 82, 91, 118, 136, 152, 165, 181, 202, 216

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 13 - 24, The fenced code blocks that show CLI output
(for example the block starting with "Initiating authentication..." and ending
with "Token expires at: 07 Apr 26 23:51 IST") must use the text language tag to
satisfy markdownlint MD040; update those backticks from ``` to ```text for that
block and the other similar output samples referenced (the CLI/output samples
listed in the comment) so all plain-output fences use ```text.
cmd/publish.go (1)

217-226: 🧹 Nitpick | 🔵 Trivial

Redundant JSON unmarshal — resp.Data is parsed twice.

shortMsgID(resp) at line 202 already unmarshals resp.Data. This debug block parses it again just to print the full message ID. Extract both values in one pass.

Refactor to avoid double-parsing
-func shortMsgID(resp *pb.Response) string {
+func extractMsgID(resp *pb.Response) (short, full string) {
 	if resp == nil || len(resp.Data) == 0 {
-		return ""
+		return "", ""
 	}
 	var trace map[string]interface{}
 	if err := json.Unmarshal(resp.Data, &trace); err != nil {
-		return ""
+		return "", ""
 	}
-	if mid, ok := trace["messageID"].(string); ok && mid != "" {
-		if len(mid) > 8 {
-			return mid[:8]
-		}
-		return mid
-	}
-	if mid, ok := trace["message_id"].(string); ok && mid != "" {
-		if len(mid) > 8 {
-			return mid[:8]
+	for _, key := range []string{"messageID", "message_id"} {
+		if mid, ok := trace[key].(string); ok && mid != "" {
+			full = mid
+			short = mid
+			if len(short) > 8 {
+				short = short[:8]
+			}
+			return short, full
 		}
-		return mid
 	}
-	return ""
+	return "", ""
 }

Then at call site:

shortID, fullID := extractMsgID(resp)
// use shortID for suffix, fullID for debug output
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/publish.go` around lines 217 - 226, The debug block is re-unmarshaling
resp.Data which shortMsgID(resp) already parsed; refactor to parse once by
extending shortMsgID (or adding extractMsgID) to return both the shortID and
fullID (e.g., shortID, fullID := shortMsgID(resp) or extractMsgID(resp)), then
use the returned fullID in the IsDebugMode() branch instead of re-running
json.Unmarshal on resp.Data; update the call site where shortMsgID is used
(around line 202) to receive both values and print the fullID in the debug
fmt.Printf while keeping the shortID use for suffix logic.
cmd/subscribe.go (1)

259-261: ⚠️ Potential issue | 🟠 Major

The webhook queue still does not bound concurrency.

Dequeuing from wq immediately starts another goroutine, so --webhook-queue-size no longer provides real backpressure under load. Process inline in a fixed worker pool instead of re-fanning out after dequeue.

Based on learnings: Applies to **/*.go : Watch for goroutine leaks, unbounded channels, racy access; ensure contexts are plumbed and respected

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/subscribe.go` around lines 259 - 261, The current loop ranges over wq and
starts a new goroutine per message (the inner anonymous func with signature
func(payload []byte)), which defeats the --webhook-queue-size backpressure and
can leak goroutines; replace this fan-out with a fixed worker pool that reads
from wq and processes payloads inline (i.e., remove the inner go call and have N
worker goroutines created once) or implement N long-lived workers that call a
single processing function (e.g., processWebhook(payload []byte)) so dequeueing
blocks when wq is full; ensure workers respect the request context/cancellation
and that shutdown closes wq or cancels context to avoid leaks.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/subscribe.go`:
- Around line 329-332: The code currently does "if !isReadable(decodedMsg) {
continue }" which drops unreadable (binary/empty) messages before persistence
and webhook delivery; instead remove that continue and only use
isReadable(decodedMsg) to decide whether to render to stdout. Ensure the
persistence logic (the block that writes messages to storage/database) and the
webhook dispatch code still run for all messages (including when isReadable is
false), preserving the raw or original payload and any metadata; keep the
stdout/console print gated by isReadable so only printable payloads are
rendered.
- Around line 361-365: The shutdown currently calls cancel() on SIGINT and
returns immediately while background goroutines (the reader that writes to
persistFile and the webhook worker goroutines that enqueue/process via wq) may
still be active; modify the SIGINT branch to, after calling cancel(), wait for
doneChan to be closed (ensuring the reader goroutine has finished), then close
the work queue (wq) and join/stop the webhook workers before returning so no
goroutines are still writing to persistFile or enqueuing work; ensure the
deferred cleanup that closes persistFile runs after workers have exited and that
contexts passed into the reader and webhook worker functions respect the
canceled context.
- Around line 182-183: The subscription and webhook contexts are being created
from context.Background(), severing cancellation from the Cobra command; change
the root to use cmd.Context() instead: replace
context.WithCancel(context.Background()) used to create ctx/cancel for the
subscription with context.WithCancel(cmd.Context()), and ensure any webhook
timeout contexts (the places creating timeouts for webhook goroutines) are
derived from cmd.Context() (e.g., use context.WithTimeout(cmd.Context(), ...) or
context.WithCancel(cmd.Context())) so webhook goroutines will respect the
command's cancellation and shutdown semantics.

In `@docs/guide.md`:
- Around line 67-74: Markdown code fences in the docs/guide.md snippet (the
fenced block showing MUMP2P_AUTH_PATH and mump2p commands) lack blank lines
before and after the ``` markers, which triggers MD031; add a single blank line
immediately before the opening ``` and immediately after the closing ``` for
that block and for the other fenced blocks referenced (the blocks at lines
132-135, 195-197, 200-202, and 205-207) so each code fence is surrounded by one
blank line on both sides to satisfy markdownlint.

In `@internal/session/client.go`:
- Around line 84-86: CreateSession currently only checks for len(sess.Nodes) ==
0; update it to validate that len(sess.Nodes) >= exposeAmount (the same
condition used by isUsable in store.go) before returning the session. Locate the
CreateSession function and replace the simple empty check on sess.Nodes with a
check comparing to exposeAmount (or the variable/parameter that represents the
requested exposeAmount), and return an error like "not enough nodes to satisfy
exposeAmount" when the condition fails so cache validity and session creation
are consistent.

In `@Makefile`:
- Line 10: The Makefile default SERVICE_URL now points to the production proxy
which forces local builds to target production unless overridden; update project
docs and/or the Makefile to make local development easier by either (A)
documenting the need to override SERVICE_URL in README.md (add a short note
under "Development" showing the recommended make invocation such as make build
SERVICE_URL=http://localhost:12080) and referencing the SERVICE_URL variable, or
(B) add a new Makefile target named build-local that sets SERVICE_URL to
http://localhost:12080 and delegates to the existing build target (so developers
can run make build-local), and ensure the README references the new build-local
target and the SERVICE_URL variable.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 928dd644-0877-460f-b1a4-b368111332d2

📥 Commits

Reviewing files that changed from the base of the PR and between c27546d and 0e7c5d1.

📒 Files selected for processing (9)
  • Makefile
  • README.md
  • cmd/auth.go
  • cmd/publish.go
  • cmd/root.go
  • cmd/subscribe.go
  • docs/guide.md
  • internal/session/client.go
  • internal/session/store.go

Comment on lines +329 to +332
} else {
if !isReadable(decodedMsg) {
continue
}


⚠️ Potential issue | 🟠 Major

Do not drop unreadable payloads here.

This continue skips the persistence and webhook blocks below, so valid binary or empty messages are silently lost in non-debug mode. Gate only the stdout rendering on isReadable, not the rest of the delivery pipeline.
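Restructured, the delivery path might read as below; `isReadable`, and the persist/webhook callbacks here, are simplified stand-ins for the real helpers in cmd/subscribe.go:

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// Stand-in for the real readability check: non-empty, valid UTF-8.
func isReadable(b []byte) bool {
	return len(b) > 0 && utf8.Valid(b)
}

// handleMessage renders readable payloads to stdout, but persists and
// dispatches every message, including binary or empty ones.
func handleMessage(msg []byte, persist func([]byte), webhook func([]byte)) {
	if isReadable(msg) {
		fmt.Printf("Received: %s\n", msg)
	}
	// Persistence and webhook delivery are unconditional: only the
	// console rendering is gated on readability.
	persist(msg)
	webhook(msg)
}

func main() {
	var persisted, hooked int
	persist := func([]byte) { persisted++ }
	webhook := func([]byte) { hooked++ }
	handleMessage([]byte("hello"), persist, webhook)
	handleMessage([]byte{0xff, 0x00}, persist, webhook) // binary: not printed, still delivered
	fmt.Println(persisted, hooked)
}
```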

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/subscribe.go` around lines 329 - 332, The code currently does "if
!isReadable(decodedMsg) { continue }" which drops unreadable (binary/empty)
messages before persistence and webhook delivery; instead remove that continue
and only use isReadable(decodedMsg) to decide whether to render to stdout.
Ensure the persistence logic (the block that writes messages to
storage/database) and the webhook dispatch code still run for all messages
(including when isReadable is false), preserving the raw or original payload and
any metadata; keep the stdout/console print gated by isReadable so only
printable payloads are rendered.


@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
cmd/subscribe.go (1)

222-245: ⚠️ Potential issue | 🟠 Major

Backup nodes are displayed but never used for stream reconnection.

The code builds and displays a list of backup nodes ("1 backup node ready"), but if the active stream fails after initial connection, the command exits without attempting reconnection. The UX suggests failover capability that doesn't actually exist at runtime.

Either remove the "backup nodes ready" messaging, or implement reconnection logic that attempts backup nodes when msgChan closes unexpectedly.

Option A: Remove misleading messaging
-		backupSuffix := ""
-		if len(backupNodes) == 1 {
-			backupSuffix = " — 1 backup node ready"
-		} else if len(backupNodes) > 1 {
-			backupSuffix = fmt.Sprintf(" — %d backup nodes ready", len(backupNodes))
-		}
-
-		fmt.Printf("Subscribed to '%s' on %s (%s) in %s%s\n",
-			subTopic, connectedNode.Address, region, humanDuration(connectDur), backupSuffix)
-
-		for _, bn := range backupNodes {
-			r := bn.Region
-			if r == "" {
-				r = "unknown"
-			}
-			fmt.Printf("  backup: %s (%s)\n", bn.Address, r)
-		}
+		fmt.Printf("Subscribed to '%s' on %s (%s) in %s\n",
+			subTopic, connectedNode.Address, region, humanDuration(connectDur))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/subscribe.go` around lines 222 - 245, The code currently builds and
prints backupNodes but never uses them on stream failure; update the subscriber
to actually attempt failover: when msgChan closes/unblocks (indicating the
active stream ended), iterate over backupNodes and try to reconnect to each node
(updating connectedNode and re-establishing the message stream/channel), break
on first successful reconnection and continue normal processing, and if all
backups fail then exit with an error; ensure printed status reflects the new
connectedNode and that the previously-built backupNodes slice is reused for
retries.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: ae7445cb-79bd-4c82-96ac-008f2fb89be6

📥 Commits

Reviewing files that changed from the base of the PR and between 0e7c5d1 and a3e9150.

📒 Files selected for processing (1)
  • cmd/subscribe.go


Development

Successfully merging this pull request may close these issues.

Migrate the CLI based on ADR 0002
