From f5838bddb7ac3a468c3220954764b17e8559774b Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 20 May 2026 15:31:20 +0900 Subject: [PATCH 1/5] adapter/sqs: AdminPeekQueue backend (non-destructive message sample) Phase 3 of the Admin Queue Peek and Purge design (docs/design/2026_05_16_proposed_admin_purge_queue.md). Implements the read-only counterpart to AdminPurgeQueue: AdminPeekQueue(ctx, principal, name, opts) ([]AdminPeekedMessage, nextCursor string, error) Peek walks the leader's visibility index at the next read timestamp, returns up to opts.Limit currently-visible messages (visible_at less than or equal to now), and bumps NO receive counts. No receipt handle is minted; no visibility timer is started. Pure read. New surface: - AdminPeekedAttribute mirrors the typed SQS MessageAttribute shape so binary payloads and the DataType discriminator survive the SPA round-trip (Codex r11 on the design doc). - AdminPeekedMessage carries body (truncated per BodyMaxBytes), BodyTruncated, BodyOriginalSize, SentTimestamp, ReceiveCount, GroupID, DeduplicationID, and the typed attribute map. - AdminPeekMessageOptions{Limit, Cursor, BodyMaxBytes} with documented defaults (20, "", 4096) and bounds ([1, 100], 512-byte cursor cap, [256, 262144] body cap). - peekCursor (versioned JSON, base64url-wrapped) so the SPA can page across partition boundaries deterministically. Cursor pins Generation so a purge between pages forces the SPA to refresh from the front (ErrAdminSQSValidation rather than silently surfacing rows from a purged generation). - AdminRole.canRead() - peek requires non-zero role; both read-only and full principals satisfy the gate. Purge stays on canWrite() so role-only principals can triage but cannot drop the queue. Partitioned FIFO support: peek uses rotated sequential scanning over partitions, picking the start partition via the same receiveFanoutCounters the receive path uses for fairness. The cursor encodes both StartPartition (for the wrap detection) and the current Partition + LastKey so subsequent pages continue deterministically. Caller audit (semantic-change rule): no existing function signatures changed. canRead is a new method; AdminPeekQueue is a new entrypoint. The sqsAttributeBaseType{String,Number} constant extraction in sqs_messages.go fixes a pre-existing goconst regression that the larger test surface tripped; the validator switch now reads canonical constants instead of inline literals. Tests (adapter/sqs_admin_peek_test.go) cover: - happy path with body / MessageID / SentTimestamp assertions - peek does NOT bump ApproximateReceiveCount across two calls - body truncation reports BodyTruncated + BodyOriginalSize - Limit clamping (0 to default 20, 500 to max 100) - cursor round-trip across two pages, no overlap, complete coverage - stale-generation cursor after purge to ErrAdminSQSValidation - oversize / bad-base64 / bad-JSON / wrong-version cursors all map to ErrAdminSQSValidation - read-only principal allowed; role-less denied - missing queue to ErrAdminSQSNotFound - empty / whitespace name to ErrAdminSQSValidation - DelaySeconds > 0 hides the message until visible_at elapses - FIFO MessageGroupId / MessageDeduplicationId both projected - typed MessageAttribute (String + Binary) round-trips - partitioned FIFO (PartitionCount=4): every sent message surfaces exactly once across the union of paginated peek pages, regardless of the rotated start partition - cursor codec unit tests: round-trip, empty-cursor sentinel, length cap, generation mismatch - clampPeekLimit / clampPeekBodyBytes truth tables - projectPeekedAttributes nil-safety and DataType preservation - AdminRole.canRead() truth table Out of scope (deferred to a follow-up): - Throttle integration (bucketActionAdminPeek + dedicated per-queue admin-peek bucket per design doc 3.1). Leader-only + Limit cap already bound the cost; the dedicated bucket adds a steady-rate cap that should land alongside the SPA wiring so the rate-limit metric has a real consumer. - HTTP handler + bridge - Phase 4. - principalForReadSensitive live RoleStore re-check - depends on the wider RoleStore plumbing landing first (design doc Goal 8, noted as a future direction). --- adapter/dynamodb_admin.go | 10 + adapter/sqs_admin_peek.go | 469 +++++++++++++++++++ adapter/sqs_admin_peek_test.go | 832 +++++++++++++++++++++++++++++++++ adapter/sqs_messages.go | 4 +- 4 files changed, 1314 insertions(+), 1 deletion(-) create mode 100644 adapter/sqs_admin_peek.go create mode 100644 adapter/sqs_admin_peek_test.go diff --git a/adapter/dynamodb_admin.go b/adapter/dynamodb_admin.go index 63b05897..1b815ce6 100644 --- a/adapter/dynamodb_admin.go +++ b/adapter/dynamodb_admin.go @@ -83,6 +83,16 @@ const ( // future "delete-only" tier reads consistently across the package. func (r AdminRole) canWrite() bool { return r == AdminRoleFull } +// canRead reports whether the role authorises non-destructive but +// sensitive reads — e.g. SQS AdminPeekQueue, which exposes message +// bodies / attributes. Both AdminRoleReadOnly and AdminRoleFull +// satisfy this gate; the zero value (unauthenticated / role-less +// principal) does not. List / Describe paths use the looser +// session-auth gate because their output is queue metadata already +// shown on the SPA list page; peek is divergent because the payload +// is the message bodies themselves. +func (r AdminRole) canRead() bool { return r == AdminRoleReadOnly || r == AdminRoleFull } + // AdminPrincipal is the authentication context every admin write // entrypoint takes. The adapter re-evaluates authorisation against // this principal *itself* — it does not trust the caller to have diff --git a/adapter/sqs_admin_peek.go b/adapter/sqs_admin_peek.go new file mode 100644 index 00000000..0ca9e37b --- /dev/null +++ b/adapter/sqs_admin_peek.go @@ -0,0 +1,469 @@ +package adapter + +import ( + "bytes" + "context" + "encoding/base64" + "strings" + "time" + + "github.com/bootjp/elastickv/store" + "github.com/cockroachdb/errors" + json "github.com/goccy/go-json" +) + +// AdminPeekedAttribute mirrors the typed shape SQS uses for +// MessageAttribute values — DataType (e.g. "String", "Number", +// "Binary", "String.MyCustom") plus the value in the appropriate +// representation. The earlier draft used map[string]string here, +// which would have flattened the typed attribute set stored in +// sqsMessageRecord.MessageAttributes and silently dropped binary +// payloads + the DataType discriminator (Codex r11 on the design +// doc). Operators triaging a DLQ need both — a message routed there +// because of an attribute-encoding mismatch is invisible if peek +// only surfaces stringified values. +type AdminPeekedAttribute struct { + DataType string `json:"data_type"` + StringValue string `json:"string_value,omitempty"` + // BinaryValue carries the raw bytes; the JSON wire form + // base64-encodes (standard Go encoding/json behaviour for + // []byte) so binary payloads survive the SPA round-trip. + BinaryValue []byte `json:"binary_value,omitempty"` +} + +// AdminPeekedMessage is one row in the peek result. +type AdminPeekedMessage struct { + MessageID string + Body string // truncated per opts.BodyMaxBytes + BodyTruncated bool // true when Body was cut + BodyOriginalSize int64 // bytes in the original body, for display + SentTimestamp time.Time // SQS SentTimestamp + ReceiveCount int32 // ApproximateReceiveCount + GroupID string // FIFO MessageGroupId, empty for standard + DeduplicationID string // FIFO MessageDeduplicationId, empty for standard + Attributes map[string]AdminPeekedAttribute // typed SQS message attributes +} + +// AdminPeekMessageOptions controls a peek call. Zero values map to +// the documented defaults: Limit=20, Cursor=empty, BodyMaxBytes=4096. +type AdminPeekMessageOptions struct { + // Limit caps the number of messages returned. Clamped to + // [1, adminPeekMaxLimit]; 0 means "use default + // (adminPeekDefaultLimit)". + Limit int + // Cursor is an opaque continuation token from a prior call; + // empty means "start from the front of the visibility index". + Cursor string + // BodyMaxBytes truncates message bodies at this length to + // bound response size. Clamped to + // [adminPeekMinBodyBytes, sqsMaximumAllowedMaximumMessageSize] + // (= 256 KiB, matching AWS SQS's hard cap on stored message + // size). 0 means "use default (adminPeekDefaultBodyBytes)". + // The full body is always retained on the server; only the + // wire representation is truncated. + BodyMaxBytes int +} + +const ( + // adminPeekDefaultLimit is the row count an empty + // AdminPeekMessageOptions.Limit maps to. Matches the SPA's + // default Messages-tab page size so the cheapest call (default + // opts) is the one the SPA issues most often. + adminPeekDefaultLimit = 20 + // adminPeekMaxLimit caps Limit. The hard ceiling exists so an + // operator script cannot accidentally issue million-row peeks + // against the leader. + adminPeekMaxLimit = 100 + // adminPeekDefaultBodyBytes is the truncation length applied + // when AdminPeekMessageOptions.BodyMaxBytes is zero. 4 KiB + // keeps a default 20-row response well under typical JSON + // budgets while still covering the vast majority of message + // payloads in real DLQ triage workflows. + adminPeekDefaultBodyBytes = 4096 + // adminPeekMinBodyBytes is the smallest legal BodyMaxBytes + // (after the zero-means-default mapping). Anything smaller + // would be a probable client bug: 256 bytes still fits a JSON + // preview the SPA can render, so we round up. + adminPeekMinBodyBytes = 256 + // adminPeekCursorMaxBytes hard-caps the encoded cursor size. + // Anything larger is either client-supplied junk or a sign + // that a future cursor field grew unbounded; either way the + // admin handler returns ErrAdminSQSValidation. + adminPeekCursorMaxBytes = 512 + // peekCursorSchemaV1 pins the cursor wire format. Bumping + // requires a corresponding decoder branch. + peekCursorSchemaV1 = 1 +) + +// peekCursor is the wire shape of the continuation token. JSON-encoded +// then base64url-wrapped so the SPA can pass it back unchanged. The +// version field exists so a future field rename can be handled +// explicitly instead of silently mis-decoding old SPA tabs after a +// rolling deploy. +type peekCursor struct { + V int `json:"v"` // schema version, currently peekCursorSchemaV1 + Generation uint64 `json:"gen"` // queue generation at scan start + StartPartition uint32 `json:"sp,omitempty"` // partition where this peek walk began (partitioned only) + Partition uint32 `json:"p,omitempty"` // current partition (advances during the walk) + LastKey []byte `json:"k,omitempty"` // last scanned visibility-index key +} + +// errPeekCursorTooLarge is the sentinel returned when encodePeekCursor +// would emit a base64 token larger than adminPeekCursorMaxBytes. A +// fresh cursor with a vis-index key tops out around ~150-200 bytes +// encoded, so hitting this is a regression flag rather than an +// expected wire-level failure: surfacing it as an internal error +// keeps the response shape predictable and pinpoints whoever added +// the bloated field. +var errPeekCursorTooLarge = errors.New("admin peek: encoded cursor exceeds maximum size") + +// encodePeekCursor JSON-encodes the cursor then base64url-wraps it. +// Returns the empty string when the walk has fully completed +// (caller-supplied sentinel: an empty cursor means "no more pages"). +func encodePeekCursor(c *peekCursor) (string, error) { + if c == nil { + return "", nil + } + raw, err := json.Marshal(c) + if err != nil { + return "", errors.WithStack(err) + } + out := base64.RawURLEncoding.EncodeToString(raw) + if len(out) > adminPeekCursorMaxBytes { + return "", errors.WithStack(errPeekCursorTooLarge) + } + return out, nil +} + +// decodePeekCursor unwraps the base64url + JSON envelope. Returns +// ErrAdminSQSValidation on any wire-shape error (oversize, malformed +// base64, malformed JSON, unsupported schema version). The empty +// string returns (nil, nil) — callers treat that as "start from the +// front". +func decodePeekCursor(s string) (*peekCursor, error) { + if s == "" { + return nil, nil + } + if len(s) > adminPeekCursorMaxBytes { + return nil, errors.Wrap(ErrAdminSQSValidation, "admin peek: cursor too large") + } + raw, err := base64.RawURLEncoding.DecodeString(s) + if err != nil { + return nil, errors.Wrap(ErrAdminSQSValidation, "admin peek: cursor is not valid base64url") + } + var c peekCursor + if err := json.Unmarshal(raw, &c); err != nil { + return nil, errors.Wrap(ErrAdminSQSValidation, "admin peek: cursor is not valid JSON") + } + if c.V != peekCursorSchemaV1 { + return nil, errors.Wrapf(ErrAdminSQSValidation, "admin peek: cursor schema version %d unsupported", c.V) + } + return &c, nil +} + +// clampPeekLimit folds a user-supplied Limit into the legal +// [1, adminPeekMaxLimit] range, mapping 0 to adminPeekDefaultLimit. +func clampPeekLimit(limit int) int { + if limit <= 0 { + return adminPeekDefaultLimit + } + if limit > adminPeekMaxLimit { + return adminPeekMaxLimit + } + return limit +} + +// clampPeekBodyBytes folds a user-supplied BodyMaxBytes into +// [adminPeekMinBodyBytes, sqsMaximumAllowedMaximumMessageSize], +// mapping 0 to adminPeekDefaultBodyBytes. +func clampPeekBodyBytes(b int) int { + if b <= 0 { + return adminPeekDefaultBodyBytes + } + if b < adminPeekMinBodyBytes { + return adminPeekMinBodyBytes + } + if b > sqsMaximumAllowedMaximumMessageSize { + return sqsMaximumAllowedMaximumMessageSize + } + return b +} + +// AdminPeekQueue returns a non-destructive sample of currently-visible +// messages in name. Receive counts are NOT incremented and visibility +// timers are NOT started — peek is a pure read over the leader's +// visibility index. Returns the rows plus a continuation cursor that +// the caller passes back as opts.Cursor to fetch the next page (empty +// when the walk is complete). +// +// Sentinel errors: +// - ErrAdminForbidden — peek requires read role; nil principal is denied +// - ErrAdminNotLeader — peek runs on the leader (the visibility +// index is leader-only-written; a follower read would race the +// leader's apply) +// - ErrAdminSQSNotFound — queue absent +// - ErrAdminSQSValidation — empty / malformed name, malformed / +// oversized / stale-generation cursor +func (s *SQSServer) AdminPeekQueue( + ctx context.Context, + principal AdminPrincipal, + name string, + opts AdminPeekMessageOptions, +) ([]AdminPeekedMessage, string, error) { + if !principal.Role.canRead() { + return nil, "", ErrAdminForbidden + } + if !isVerifiedSQSLeader(ctx, s.coordinator) { + return nil, "", ErrAdminNotLeader + } + if strings.TrimSpace(name) == "" { + return nil, "", ErrAdminSQSValidation + } + limit := clampPeekLimit(opts.Limit) + bodyMaxBytes := clampPeekBodyBytes(opts.BodyMaxBytes) + cursor, err := decodePeekCursor(opts.Cursor) + if err != nil { + return nil, "", err + } + readTS := s.nextTxnReadTS(ctx) + meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS) + if err != nil { + return nil, "", errors.WithStack(err) + } + if !exists { + return nil, "", ErrAdminSQSNotFound + } + cursor, err = preparePeekCursor(cursor, meta, s.nextReceiveFanoutStart(name, meta.PartitionCount)) + if err != nil { + return nil, "", err + } + rows, nextCursor, err := s.walkPeek(ctx, name, meta, readTS, cursor, limit, bodyMaxBytes) + if err != nil { + return nil, "", err + } + encoded, err := encodePeekCursor(nextCursor) + if err != nil { + return nil, "", err + } + return rows, encoded, nil +} + +// preparePeekCursor builds the effective cursor for this call. On the +// first page (cursor==nil) it stamps Generation + a rotated +// StartPartition (partitioned queues only — non-partitioned start at +// 0). On a follow-up page it validates the stored Generation matches +// the queue's current generation; a mismatch returns +// ErrAdminSQSValidation so the SPA refreshes from the front rather +// than silently surfacing messages from a purged generation. +// +// startPartition is computed by the caller (the SQSServer method's +// nextReceiveFanoutStart) so this helper stays method-free for +// unit-testability. Non-partitioned queues pass 0. +func preparePeekCursor(cursor *peekCursor, meta *sqsQueueMeta, startPartition uint32) (*peekCursor, error) { + if cursor == nil { + out := &peekCursor{ + V: peekCursorSchemaV1, + Generation: meta.Generation, + } + if meta.PartitionCount > 1 { + out.StartPartition = startPartition + out.Partition = startPartition + } + return out, nil + } + if cursor.Generation != meta.Generation { + return nil, errors.Wrap(ErrAdminSQSValidation, + "admin peek: cursor is from a prior generation; restart from the front") + } + return cursor, nil +} + +// walkPeek pages the visibility index, accumulating up to limit rows +// across partitions for partitioned queues (rotated sequential scan) +// or across the single keyspace for non-partitioned queues. Returns +// the rows and the cursor to pass back on the next call (nil when the +// walk has fully completed). +func (s *SQSServer) walkPeek( + ctx context.Context, + queueName string, + meta *sqsQueueMeta, + readTS uint64, + cursor *peekCursor, + limit int, + bodyMaxBytes int, +) ([]AdminPeekedMessage, *peekCursor, error) { + now := time.Now().UnixMilli() + rows := make([]AdminPeekedMessage, 0, limit) + for { + next, exhausted, err := s.walkPeekPartition(ctx, queueName, meta, readTS, now, cursor.Partition, cursor.LastKey, limit-len(rows), bodyMaxBytes, &rows) + if err != nil { + return nil, nil, err + } + if len(rows) >= limit { + cursor.LastKey = next + return rows, cursor, nil + } + if !exhausted { + cursor.LastKey = next + return rows, cursor, nil + } + // Partition exhausted before Limit reached. + if meta.PartitionCount <= 1 { + return rows, nil, nil + } + nextPart := (cursor.Partition + 1) % meta.PartitionCount + if nextPart == cursor.StartPartition { + return rows, nil, nil + } + cursor.Partition = nextPart + cursor.LastKey = nil + } +} + +// walkPeekPartition scans the visibility index for one partition +// (legacy queues invoke this with partition=0 and the legacy +// keyspace). Returns the next cursor key (for resume), whether the +// partition was scanned to completion (`exhausted=true` means no +// further pages), and any storage error. Appends to *rows. +func (s *SQSServer) walkPeekPartition( + ctx context.Context, + queueName string, + meta *sqsQueueMeta, + readTS uint64, + nowMillis int64, + partition uint32, + lastKey []byte, + want int, + bodyMaxBytes int, + rows *[]AdminPeekedMessage, +) ([]byte, bool, error) { + if want <= 0 { + return lastKey, false, nil + } + start, end := sqsMsgVisScanBoundsDispatch(meta, queueName, partition, meta.Generation, nowMillis) + if len(lastKey) > 0 { + start = nextScanCursorAfter(lastKey) + if end != nil && bytes.Compare(start, end) > 0 { + return lastKey, true, nil + } + } + page, err := s.store.ScanAt(ctx, start, end, want, readTS) + if err != nil { + return nil, false, errors.WithStack(err) + } + if len(page) == 0 { + return lastKey, true, nil + } + for _, kvp := range page { + messageID := string(kvp.Value) + rec, ok, err := s.loadPeekMessageRecord(ctx, meta, queueName, partition, messageID, readTS) + if err != nil { + return nil, false, err + } + if !ok { + // The vis-index entry points at a data record that + // has since been deleted (e.g. between our index + // scan and the per-record GetAt). Skip rather than + // fail: a single tombstoned record is not a peek + // failure — the SPA gets a shorter page, which is + // a benign outcome. + continue + } + *rows = append(*rows, projectPeekedMessage(rec, bodyMaxBytes)) + } + lastKey = bytes.Clone(page[len(page)-1].Key) + exhausted := len(page) < want + return lastKey, exhausted, nil +} + +// loadPeekMessageRecord fetches the message record by ID at the +// caller's MVCC snapshot. Returns (nil, false, nil) when the record +// is absent — the caller treats this as a benign skip (the vis-index +// entry may point at a data record that was deleted between the +// index scan and the per-record GetAt; surfacing the absence as an +// error would fail the whole peek call over a single tombstoned row). +// +// The data-record key is partition-aware via sqsMsgDataKeyDispatch. +// Partitioned FIFO queues store the data record under the same +// partition the vis-index entry was found under; using the legacy +// sqsMsgDataKey here would silently miss every row on partitioned +// queues. +func (s *SQSServer) loadPeekMessageRecord(ctx context.Context, meta *sqsQueueMeta, queueName string, partition uint32, messageID string, readTS uint64) (*sqsMessageRecord, bool, error) { + dataKey := sqsMsgDataKeyDispatch(meta, queueName, partition, meta.Generation, messageID) + b, err := s.store.GetAt(ctx, dataKey, readTS) + if err != nil { + if errors.Is(err, store.ErrKeyNotFound) { + return nil, false, nil + } + return nil, false, errors.WithStack(err) + } + if len(b) == 0 { + return nil, false, nil + } + rec, err := decodeSQSMessageRecord(b) + if err != nil { + return nil, false, errors.WithStack(err) + } + return rec, true, nil +} + +// projectPeekedMessage maps the stored record into the wire-side +// AdminPeekedMessage, truncating the body per bodyMaxBytes and +// converting the typed attribute map. +func projectPeekedMessage(rec *sqsMessageRecord, bodyMaxBytes int) AdminPeekedMessage { + body, truncated, originalSize := truncatePeekBody(rec.Body, bodyMaxBytes) + return AdminPeekedMessage{ + MessageID: rec.MessageID, + Body: body, + BodyTruncated: truncated, + BodyOriginalSize: originalSize, + SentTimestamp: time.UnixMilli(rec.SendTimestampMillis).UTC(), + ReceiveCount: clampReceiveCountToInt32(rec.ReceiveCount), + GroupID: rec.MessageGroupId, + DeduplicationID: rec.MessageDeduplicationId, + Attributes: projectPeekedAttributes(rec.MessageAttributes), + } +} + +// clampReceiveCountToInt32 converts the stored int64 ReceiveCount to +// the wire-side int32, saturating at math.MaxInt32 rather than +// overflowing. AWS publishes ApproximateReceiveCount as a uint32 on +// the SQS API, so int32 is sufficient for every realistic queue; +// saturation is the safe fallback if a future bug or corrupted +// record somehow pushes the counter past 2 billion. +func clampReceiveCountToInt32(n int64) int32 { + const maxInt32 = int32(1<<31 - 1) + if n < 0 { + return 0 + } + if n > int64(maxInt32) { + return maxInt32 + } + return int32(n) +} + +// truncatePeekBody returns the wire-side body (truncated to +// bodyMaxBytes), the truncated flag, and the original size in bytes. +func truncatePeekBody(body []byte, bodyMaxBytes int) (string, bool, int64) { + originalSize := int64(len(body)) + if int(originalSize) <= bodyMaxBytes { + return string(body), false, originalSize + } + return string(body[:bodyMaxBytes]), true, originalSize +} + +// projectPeekedAttributes maps the stored sqsMessageAttributeValue map +// into the wire-side AdminPeekedAttribute map. Returns nil when the +// stored map is nil so the JSON encoder's omitempty drops the field +// rather than emitting "attributes":{}. The two structs share the +// same field shape, so the conversion is a direct type cast. +func projectPeekedAttributes(attrs map[string]sqsMessageAttributeValue) map[string]AdminPeekedAttribute { + if len(attrs) == 0 { + return nil + } + out := make(map[string]AdminPeekedAttribute, len(attrs)) + for name, v := range attrs { + out[name] = AdminPeekedAttribute(v) + } + return out +} diff --git a/adapter/sqs_admin_peek_test.go b/adapter/sqs_admin_peek_test.go new file mode 100644 index 00000000..2b63415d --- /dev/null +++ b/adapter/sqs_admin_peek_test.go @@ -0,0 +1,832 @@ +package adapter + +import ( + "context" + "encoding/base64" + "net/http" + "sort" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/errors" + json "github.com/goccy/go-json" +) + +// TestAdminPeekQueue_HappyPath sends three messages into a standard +// queue and confirms AdminPeekQueue surfaces all three with their +// bodies populated. Pins the basic wiring: meta load, vis-index scan, +// data-key follow-up read, body projection. +func TestAdminPeekQueue_HappyPath(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-basic") + for i := range 3 { + status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "body-" + strconv.Itoa(i), + }) + if status != http.StatusOK { + t.Fatalf("send #%d: %d %v", i, status, out) + } + } + + rows, nextCursor, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-basic", AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("AdminPeekQueue: %v", err) + } + if len(rows) != 3 { + t.Fatalf("rows=%d want 3; rows=%+v", len(rows), rows) + } + if nextCursor != "" { + t.Fatalf("nextCursor=%q want empty (queue drained in one page)", nextCursor) + } + for i, row := range rows { + assertPeekRowMatchesIndexedBody(t, i, row, "body-") + } +} + +// assertPeekRowMatchesIndexedBody pins the per-row invariants the +// happy path expects: the body matches bodyPrefix+i, no truncation +// fired (the bodies are short), MessageID is populated, and +// SentTimestamp is non-zero. Pulled into a helper so +// TestAdminPeekQueue_HappyPath stays under the cyclop budget. +func assertPeekRowMatchesIndexedBody(t *testing.T, idx int, row AdminPeekedMessage, bodyPrefix string) { + t.Helper() + want := bodyPrefix + strconv.Itoa(idx) + if row.Body != want { + t.Fatalf("rows[%d].Body=%q want %q", idx, row.Body, want) + } + if row.BodyTruncated { + t.Fatalf("rows[%d].BodyTruncated=true; want false for %d-byte body", idx, len(row.Body)) + } + if row.MessageID == "" { + t.Fatalf("rows[%d].MessageID empty", idx) + } + if row.SentTimestamp.IsZero() { + t.Fatalf("rows[%d].SentTimestamp zero", idx) + } +} + +// TestAdminPeekQueue_DoesNotChangeReceiveCount sends a message, peeks +// twice, then ReceiveMessage — the receive call must see +// ApproximateReceiveCount=1 (not 3). Pins the design's "peek is a +// pure read" invariant: the visibility timer is not started and no +// receipt-handle record is committed. +func TestAdminPeekQueue_DoesNotChangeReceiveCount(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-no-bump") + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "peek-me", + }) + if status != http.StatusOK { + t.Fatalf("send: %d", status) + } + + for i := range 2 { + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-no-bump", AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("peek #%d: %v", i, err) + } + if len(rows) != 1 || rows[0].ReceiveCount != 0 { + t.Fatalf("peek #%d: rows=%+v want 1 row with ReceiveCount=0", i, rows) + } + } + + status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MaxNumberOfMessages": 1, + "AttributeNames": []string{"All"}, + }) + if status != http.StatusOK { + t.Fatalf("receive: %d %v", status, out) + } + msgs, _ := out["Messages"].([]any) + if len(msgs) != 1 { + t.Fatalf("receive: msgs=%d want 1", len(msgs)) + } + m, _ := msgs[0].(map[string]any) + attrs, _ := m["Attributes"].(map[string]any) + if got, _ := attrs["ApproximateReceiveCount"].(string); got != "1" { + t.Fatalf("ApproximateReceiveCount=%q want \"1\"; peek must not bump it (attrs=%v)", got, attrs) + } +} + +// TestAdminPeekQueue_BodyTruncation sends a long body and peeks with a +// small BodyMaxBytes. The returned Body is exactly BodyMaxBytes +// long; BodyTruncated is true; BodyOriginalSize records the full +// length. +func TestAdminPeekQueue_BodyTruncation(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-trunc") + body := strings.Repeat("x", 4096) + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": body, + }) + if status != http.StatusOK { + t.Fatalf("send: %d", status) + } + + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-trunc", AdminPeekMessageOptions{BodyMaxBytes: 256}) + if err != nil { + t.Fatalf("peek: %v", err) + } + if len(rows) != 1 { + t.Fatalf("rows=%d want 1", len(rows)) + } + if len(rows[0].Body) != 256 { + t.Fatalf("len(Body)=%d want 256", len(rows[0].Body)) + } + if !rows[0].BodyTruncated { + t.Fatalf("BodyTruncated=false want true") + } + if rows[0].BodyOriginalSize != int64(len(body)) { + t.Fatalf("BodyOriginalSize=%d want %d", rows[0].BodyOriginalSize, len(body)) + } +} + +// TestAdminPeekQueue_LimitClamping pins the [1, adminPeekMaxLimit] +// clamp + default-on-zero behaviour. With more rows than the cap in +// the queue, opts.Limit=500 must return adminPeekMaxLimit rows; +// opts.Limit=0 must return adminPeekDefaultLimit. +func TestAdminPeekQueue_LimitClamping(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-limit") + const sent = 150 + for i := range sent { + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "row-" + strconv.Itoa(i), + }) + if status != http.StatusOK { + t.Fatalf("send #%d: %d", i, status) + } + } + + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-limit", AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("default-limit peek: %v", err) + } + if len(rows) != adminPeekDefaultLimit { + t.Fatalf("default-limit peek: rows=%d want %d", len(rows), adminPeekDefaultLimit) + } + + rows, _, err = node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-limit", AdminPeekMessageOptions{Limit: 500}) + if err != nil { + t.Fatalf("over-limit peek: %v", err) + } + if len(rows) != adminPeekMaxLimit { + t.Fatalf("over-limit peek: rows=%d want %d (clamped)", len(rows), adminPeekMaxLimit) + } +} + +// TestAdminPeekQueue_CursorRoundTrip drains a queue across two +// pages: peek returns N rows + nextCursor; peek-with-cursor returns +// the remaining rows; their union is the full queue with no +// overlaps and the final nextCursor is empty. +func TestAdminPeekQueue_CursorRoundTrip(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-cursor") + const sent = 7 + for i := range sent { + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "row-" + strconv.Itoa(i), + }) + if status != http.StatusOK { + t.Fatalf("send #%d: %d", i, status) + } + } + + ctx := context.Background() + pageA, cursor := assertPeekPage(t, ctx, node, "peek-cursor", "", 4, 4, true) + pageB, cursor := assertPeekPage(t, ctx, node, "peek-cursor", cursor, 10, sent-4, false) + if cursor != "" { + t.Fatalf("final cursor=%q want empty", cursor) + } + + seen := make(map[string]int, sent) + for _, row := range append(pageA, pageB...) { + seen[row.MessageID]++ + } + if len(seen) != sent { + t.Fatalf("unique rows=%d want %d (overlap=%v)", len(seen), sent, seen) + } +} + +// assertPeekPage runs one peek call and pins (a) no error, (b) the +// expected row count, (c) whether a non-empty nextCursor is required. +// Pulled into a helper so TestAdminPeekQueue_CursorRoundTrip stays +// under the cyclop budget. +func assertPeekPage(t *testing.T, ctx context.Context, node Node, queue, cursorIn string, limit, wantRows int, wantCursor bool) ([]AdminPeekedMessage, string) { + t.Helper() + rows, cursorOut, err := node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, queue, AdminPeekMessageOptions{Limit: limit, Cursor: cursorIn}) + if err != nil { + t.Fatalf("peek (cursor=%q): %v", cursorIn, err) + } + if len(rows) != wantRows { + t.Fatalf("peek (cursor=%q) rows=%d want %d", cursorIn, len(rows), wantRows) + } + if wantCursor && cursorOut == "" { + t.Fatalf("peek (cursor=%q) returned empty cursor; expected continuation", cursorIn) + } + return rows, cursorOut +} + +// TestAdminPeekQueue_StaleGenerationCursor reproduces the failure +// mode the design doc §3.1 requires the cursor codec to catch: +// peek, then purge, then peek with the now-stale cursor → the +// adapter must reject with ErrAdminSQSValidation so the SPA refreshes +// from the front rather than returning rows from a purged generation. +func TestAdminPeekQueue_StaleGenerationCursor(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-stale") + for i := range 5 { + _, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "row-" + strconv.Itoa(i), + }) + } + + ctx := context.Background() + _, cursor, err := node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, "peek-stale", AdminPeekMessageOptions{Limit: 2}) + if err != nil { + t.Fatalf("initial peek: %v", err) + } + if cursor == "" { + t.Fatalf("cursor empty after partial page") + } + + if _, err := node.sqsServer.AdminPurgeQueue(ctx, fullAdminPrincipal, "peek-stale"); err != nil { + t.Fatalf("purge: %v", err) + } + + _, _, err = node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, "peek-stale", AdminPeekMessageOptions{Cursor: cursor}) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("stale-gen peek: want ErrAdminSQSValidation, got %v", err) + } +} + +// TestAdminPeekQueue_CursorMalformed pins each of the documented +// wire-shape failures (oversize, bad base64, bad JSON, unsupported +// version) onto ErrAdminSQSValidation rather than leaking the raw +// codec error. +func TestAdminPeekQueue_CursorMalformed(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + _ = createSQSQueueForTest(t, node, "peek-cursor-bad") + + cases := map[string]string{ + "oversize": strings.Repeat("A", adminPeekCursorMaxBytes+1), + "not-base64": "!!!not_valid_base64!!!", + "not-json": base64.RawURLEncoding.EncodeToString([]byte("not-json")), + "wrong-version": base64.RawURLEncoding.EncodeToString([]byte(`{"v":42,"gen":1}`)), + } + for name, cursor := range cases { + t.Run(name, func(t *testing.T) { + _, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-cursor-bad", AdminPeekMessageOptions{Cursor: cursor}) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("%s: want ErrAdminSQSValidation, got %v", name, err) + } + }) + } +} + +// TestAdminPeekQueue_ReadOnlyPrincipalAllowed confirms the canRead() +// gate accepts AdminRoleReadOnly. Peek is divergent from +// AdminPurgeQueue's canWrite() gate; this test pins the divergence so +// a future reviewer does not "fix" the inconsistency. +func TestAdminPeekQueue_ReadOnlyPrincipalAllowed(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + queueURL := createSQSQueueForTest(t, node, "peek-readonly") + _, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{"QueueUrl": queueURL, "MessageBody": "x"}) + + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), readOnlyAdminPrincipal, "peek-readonly", AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("read-only peek: %v", err) + } + if len(rows) != 1 { + t.Fatalf("read-only peek rows=%d want 1", len(rows)) + } +} + +// TestAdminPeekQueue_RoleLessForbidden pins the gate's lower bound: +// the zero AdminPrincipal (no role set) is denied. +func TestAdminPeekQueue_RoleLessForbidden(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + _ = createSQSQueueForTest(t, node, "peek-noauth") + + _, _, err := node.sqsServer.AdminPeekQueue(context.Background(), AdminPrincipal{}, "peek-noauth", AdminPeekMessageOptions{}) + if !errors.Is(err, ErrAdminForbidden) { + t.Fatalf("role-less peek: want ErrAdminForbidden, got %v", err) + } +} + +// TestAdminPeekQueue_MissingQueue maps to the structured 404 sentinel. +func TestAdminPeekQueue_MissingQueue(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + _, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "no-such-queue", AdminPeekMessageOptions{}) + if !errors.Is(err, ErrAdminSQSNotFound) { + t.Fatalf("missing peek: want ErrAdminSQSNotFound, got %v", err) + } +} + +// TestAdminPeekQueue_EmptyName pins the up-front guard. +func TestAdminPeekQueue_EmptyName(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + for _, name := range []string{"", " "} { + _, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, name, AdminPeekMessageOptions{}) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("name=%q: want ErrAdminSQSValidation, got %v", name, err) + } + } +} + +// TestAdminPeekQueue_DelayedMessageHidden confirms the visible_at <= +// now filter: a message with DelaySeconds > 0 does NOT appear in +// peek until its delay has elapsed. Matches receiveMessage's +// behaviour and pins the design's "currently-visible only" contract. +func TestAdminPeekQueue_DelayedMessageHidden(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-delayed") + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "future", + "DelaySeconds": 300, + }) + if status != http.StatusOK { + t.Fatalf("send: %d", status) + } + + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-delayed", AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("peek: %v", err) + } + if len(rows) != 0 { + t.Fatalf("rows=%d want 0 (message is delayed 5min)", len(rows)) + } +} + +// TestAdminPeekQueue_FIFO_AttributesPopulated sends to a FIFO queue +// with GroupID + DeduplicationID and confirms the peek row carries +// both. Pins the field projection. +func TestAdminPeekQueue_FIFO_AttributesPopulated(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const name = "peek-fifo.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": name, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + if status != http.StatusOK { + t.Fatalf("create fifo: %d %v", status, out) + } + queueURL, _ := out["QueueUrl"].(string) + + status, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "fifo-body", + "MessageGroupId": "tenant-7", + "MessageDeduplicationId": "dedup-1", + }) + if status != http.StatusOK { + t.Fatalf("send fifo: %d", status) + } + + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, name, AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("peek fifo: %v", err) + } + if len(rows) != 1 { + t.Fatalf("rows=%d want 1", len(rows)) + } + if rows[0].GroupID != "tenant-7" { + t.Fatalf("GroupID=%q want \"tenant-7\"", rows[0].GroupID) + } + if rows[0].DeduplicationID != "dedup-1" { + t.Fatalf("DeduplicationID=%q want \"dedup-1\"", rows[0].DeduplicationID) + } +} + +// TestAdminPeekQueue_AttributesProjected confirms typed +// MessageAttribute round-tripping: a String + a Binary attribute on +// the SendMessage call surface on the peek row with DataType / +// StringValue / BinaryValue preserved. +func TestAdminPeekQueue_AttributesProjected(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-attrs") + binaryBytes := []byte{0xDE, 0xAD, 0xBE, 0xEF} + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "x", + "MessageAttributes": map[string]any{ + "Source": map[string]any{"DataType": "String", "StringValue": "checkout"}, + "Blob": map[string]any{"DataType": "Binary", "BinaryValue": binaryBytes}, + }, + }) + if status != http.StatusOK { + t.Fatalf("send: %d", status) + } + + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-attrs", AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("peek: %v", err) + } + if len(rows) != 1 { + t.Fatalf("rows=%d want 1", len(rows)) + } + src, ok := rows[0].Attributes["Source"] + if !ok || src.DataType != "String" || src.StringValue != "checkout" { + t.Fatalf("Source attr=%+v want {String, checkout}", src) + } + blob, ok := rows[0].Attributes["Blob"] + if !ok || blob.DataType != "Binary" || string(blob.BinaryValue) != string(binaryBytes) { + t.Fatalf("Blob attr=%+v want {Binary, %x}", blob, binaryBytes) + } +} + +// TestAdminPeekQueue_PartitionedFIFO_Pagination installs a +// PartitionCount=4 FIFO queue, sends one message per group spread +// across all 4 partitions, then walks the peek cursor across pages +// (Limit=2 per page) until exhausted. Every sent message must +// surface exactly once across the union of pages, regardless of the +// rotated start partition. Pins the design's "rotated sequential +// scanning" contract. +func TestAdminPeekQueue_PartitionedFIFO_Pagination(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const name = "peek-part.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": name, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + if status != http.StatusOK { + t.Fatalf("create: %d %v", status, out) + } + queueURL, _ := out["QueueUrl"].(string) + installPartitionedMetaForTest(t, node, name, 4, htfifoThroughputPerMessageGroupID) + + groups := []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta", "eta", "theta"} + sent := sendFIFOMessagesForPeek(t, node, queueURL, groups) + + ctx := context.Background() + seen := walkPeekUntilDone(t, ctx, node, name, 2) + assertEveryMessageSeenOnce(t, sent, seen) +} + +// sendFIFOMessagesForPeek sends one message per group to a partitioned +// FIFO queue and returns the resulting set of message IDs. Pulled into +// a helper so TestAdminPeekQueue_PartitionedFIFO_Pagination stays +// under the cyclop budget. +func sendFIFOMessagesForPeek(t *testing.T, node Node, queueURL string, groups []string) map[string]struct{} { + t.Helper() + sent := make(map[string]struct{}, len(groups)) + for _, g := range groups { + status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "body-" + g, + "MessageGroupId": g, + "MessageDeduplicationId": "dedup-" + g, + }) + if status != http.StatusOK { + t.Fatalf("send %s: %d %v", g, status, out) + } + msgID, _ := out["MessageId"].(string) + if msgID == "" { + t.Fatalf("send %s: empty MessageId", g) + } + sent[msgID] = struct{}{} + } + return sent +} + +// walkPeekUntilDone pages through every peek result on a queue with +// the given per-page limit, returning a multi-set of how many times +// each MessageID was observed. Loop bound (32) is a safety net so a +// pagination bug terminates rather than runs forever; callers assert +// "saw every message exactly once" against this output. +func walkPeekUntilDone(t *testing.T, ctx context.Context, node Node, queue string, limit int) map[string]int { + t.Helper() + seen := make(map[string]int) + cursor := "" + for page := 0; page < 32; page++ { + rows, next, err := node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, queue, AdminPeekMessageOptions{Limit: limit, Cursor: cursor}) + if err != nil { + t.Fatalf("peek page %d: %v", page, err) + } + for _, row := range rows { + seen[row.MessageID]++ + } + if next == "" { + return seen + } + cursor = next + } + t.Fatalf("peek did not terminate after 32 pages; seen=%v", seen) + return seen +} + +// assertEveryMessageSeenOnce pins the multi-set invariant the +// partitioned-FIFO pagination test cares about: every sent message +// surfaced in exactly one peek page, none missed and none duplicated. +func assertEveryMessageSeenOnce(t *testing.T, sent map[string]struct{}, seen map[string]int) { + t.Helper() + if len(seen) != len(sent) { + want := make([]string, 0, len(sent)) + for id := range sent { + want = append(want, id) + } + sort.Strings(want) + got := make([]string, 0, len(seen)) + for id, count := range seen { + got = append(got, id+"="+strconv.Itoa(count)) + } + sort.Strings(got) + t.Fatalf("seen=%v want %d unique messages (sent=%v)", got, len(sent), want) + } + for id, count := range seen { + if count != 1 { + t.Fatalf("message %q seen %d times (want exactly 1)", id, count) + } + } +} + +// TestPeekCursorCodec_RoundTrip pins the encode/decode contract at +// the unit level so partition / start-partition / last-key all +// survive the base64url + JSON envelope. +func TestPeekCursorCodec_RoundTrip(t *testing.T) { + t.Parallel() + in := &peekCursor{ + V: peekCursorSchemaV1, + Generation: 42, + StartPartition: 3, + Partition: 5, + LastKey: []byte("some-last-key"), + } + encoded, err := encodePeekCursor(in) + if err != nil { + t.Fatalf("encode: %v", err) + } + if encoded == "" { + t.Fatalf("encoded cursor empty") + } + out, err := decodePeekCursor(encoded) + if err != nil { + t.Fatalf("decode: %v", err) + } + if out == nil { + t.Fatalf("decode returned nil") + } + if out.V != in.V || out.Generation != in.Generation || + out.StartPartition != in.StartPartition || out.Partition != in.Partition || + string(out.LastKey) != string(in.LastKey) { + t.Fatalf("round-trip mismatch: in=%+v out=%+v", in, out) + } +} + +// TestPeekCursorCodec_EmptyRoundTrip confirms the (nil, "") sentinel +// behaviour for "no cursor / start from front". +func TestPeekCursorCodec_EmptyRoundTrip(t *testing.T) { + t.Parallel() + encoded, err := encodePeekCursor(nil) + if err != nil { + t.Fatalf("encode(nil): %v", err) + } + if encoded != "" { + t.Fatalf("encode(nil)=%q want empty", encoded) + } + out, err := decodePeekCursor("") + if err != nil { + t.Fatalf("decode(\"\"): %v", err) + } + if out != nil { + t.Fatalf("decode(\"\")=%+v want nil", out) + } +} + +// TestPeekCursorCodec_LengthCap confirms the 512-byte hard cap by +// constructing a deliberately oversized cursor on the encode side +// (the design doc's "hard-capped at 512 bytes after encoding" rule). +// The encode path treats this as an internal regression rather than +// a client-side failure, so the test asserts an error rather than a +// truncated cursor. +func TestPeekCursorCodec_LengthCap(t *testing.T) { + t.Parallel() + bigKey := make([]byte, adminPeekCursorMaxBytes) + for i := range bigKey { + bigKey[i] = 'A' + } + _, err := encodePeekCursor(&peekCursor{V: peekCursorSchemaV1, LastKey: bigKey}) + if err == nil { + t.Fatalf("encode of oversized cursor must error; got nil") + } +} + +// TestClampPeekLimit pins each branch of the clamp function. +func TestClampPeekLimit(t *testing.T) { + t.Parallel() + cases := map[int]int{ + 0: adminPeekDefaultLimit, + -5: adminPeekDefaultLimit, + 1: 1, + adminPeekDefaultLimit: adminPeekDefaultLimit, + adminPeekMaxLimit: adminPeekMaxLimit, + adminPeekMaxLimit + 1: adminPeekMaxLimit, + 1_000_000: adminPeekMaxLimit, + } + for in, want := range cases { + if got := clampPeekLimit(in); got != want { + t.Fatalf("clampPeekLimit(%d)=%d want %d", in, got, want) + } + } +} + +// TestClampPeekBodyBytes pins each branch of the body-size clamp. +func TestClampPeekBodyBytes(t *testing.T) { + t.Parallel() + cases := map[int]int{ + 0: adminPeekDefaultBodyBytes, + -5: adminPeekDefaultBodyBytes, + 1: adminPeekMinBodyBytes, + adminPeekMinBodyBytes - 1: adminPeekMinBodyBytes, + adminPeekMinBodyBytes: adminPeekMinBodyBytes, + adminPeekDefaultBodyBytes: adminPeekDefaultBodyBytes, + sqsMaximumAllowedMaximumMessageSize: sqsMaximumAllowedMaximumMessageSize, + sqsMaximumAllowedMaximumMessageSize + 1: sqsMaximumAllowedMaximumMessageSize, + 2 << 20: sqsMaximumAllowedMaximumMessageSize, + } + for in, want := range cases { + if got := clampPeekBodyBytes(in); got != want { + t.Fatalf("clampPeekBodyBytes(%d)=%d want %d", in, got, want) + } + } +} + +// TestPreparePeekCursor_FreshCursor confirms the first-call cursor +// stamps Generation and (for partitioned queues) StartPartition. +func TestPreparePeekCursor_FreshCursor(t *testing.T) { + t.Parallel() + + t.Run("non-partitioned", func(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{Generation: 7} + out, err := preparePeekCursor(nil, meta, 0) + if err != nil { + t.Fatalf("err: %v", err) + } + if out.V != peekCursorSchemaV1 || out.Generation != 7 || out.StartPartition != 0 || out.Partition != 0 { + t.Fatalf("cursor=%+v want {V:1 Gen:7 SP:0 P:0}", out) + } + }) + + t.Run("partitioned", func(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{Generation: 11, PartitionCount: 4} + out, err := preparePeekCursor(nil, meta, 2) + if err != nil { + t.Fatalf("err: %v", err) + } + if out.StartPartition != 2 || out.Partition != 2 { + t.Fatalf("cursor=%+v want StartPartition=Partition=2", out) + } + }) +} + +// TestPreparePeekCursor_GenerationMismatch confirms a cursor from a +// prior generation is rejected as ErrAdminSQSValidation. +func TestPreparePeekCursor_GenerationMismatch(t *testing.T) { + t.Parallel() + stale := &peekCursor{V: peekCursorSchemaV1, Generation: 3} + meta := &sqsQueueMeta{Generation: 4} + _, err := preparePeekCursor(stale, meta, 0) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("want ErrAdminSQSValidation, got %v", err) + } +} + +// TestEncodePeekCursor_DecodesAsValidJSON spot-checks that the +// base64url envelope unwraps to recognisable JSON keys (so a SPA +// debugging session can inspect the cursor without the wire format +// being a black box). +func TestEncodePeekCursor_DecodesAsValidJSON(t *testing.T) { + t.Parallel() + encoded, err := encodePeekCursor(&peekCursor{V: peekCursorSchemaV1, Generation: 9, LastKey: []byte("k")}) + if err != nil { + t.Fatalf("encode: %v", err) + } + raw, err := base64.RawURLEncoding.DecodeString(encoded) + if err != nil { + t.Fatalf("base64 decode: %v", err) + } + var m map[string]any + if err := json.Unmarshal(raw, &m); err != nil { + t.Fatalf("json unmarshal: %v", err) + } + if _, ok := m["v"]; !ok { + t.Fatalf("decoded JSON missing 'v': %v", m) + } + if _, ok := m["gen"]; !ok { + t.Fatalf("decoded JSON missing 'gen': %v", m) + } +} + +// TestProjectPeekedAttributes_NilSafeAndTyped confirms the +// projection preserves DataType and binary payloads, and that an +// empty input yields nil (so JSON omitempty drops the field). +func TestProjectPeekedAttributes_NilSafeAndTyped(t *testing.T) { + t.Parallel() + + if got := projectPeekedAttributes(nil); got != nil { + t.Fatalf("nil input: got %v want nil", got) + } + if got := projectPeekedAttributes(map[string]sqsMessageAttributeValue{}); got != nil { + t.Fatalf("empty input: got %v want nil", got) + } + + in := map[string]sqsMessageAttributeValue{ + "A": {DataType: "String", StringValue: "foo"}, + "B": {DataType: "Binary", BinaryValue: []byte{1, 2, 3}}, + } + out := projectPeekedAttributes(in) + if out["A"].DataType != "String" || out["A"].StringValue != "foo" { + t.Fatalf("A=%+v want {String, foo}", out["A"]) + } + if out["B"].DataType != "Binary" || string(out["B"].BinaryValue) != string([]byte{1, 2, 3}) { + t.Fatalf("B=%+v want {Binary, [1 2 3]}", out["B"]) + } +} + +// TestAdminRole_CanRead pins the new canRead() gate's truth table. +func TestAdminRole_CanRead(t *testing.T) { + t.Parallel() + cases := map[AdminRole]bool{ + "": false, + AdminRoleReadOnly: true, + AdminRoleFull: true, + "bogus": false, + } + for role, want := range cases { + if got := role.canRead(); got != want { + t.Fatalf("AdminRole(%q).canRead()=%v want %v", role, got, want) + } + } +} + +// silence "imported and not used" when only fixtures are referenced. +var _ = time.Now diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index fbcbe099..368f5819 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -1721,6 +1721,8 @@ const ( // the Binary base type; suffix-extended forms ("Binary.gzipped") // share the same type byte. sqsAttributeBaseTypeBinary = "Binary" + sqsAttributeBaseTypeString = "String" + sqsAttributeBaseTypeNumber = "Number" // sqsAttributeTransportByteString applies to String and Number; // sqsAttributeTransportByteBinary applies to Binary. sqsAttributeTransportByteString = byte(0x01) @@ -1845,7 +1847,7 @@ func validateOneMessageAttribute(name string, v sqsMessageAttributeValue) error // function stays under the cyclop budget. func validateMessageAttributeValuePair(name, base string, v sqsMessageAttributeValue) error { switch base { - case "String", "Number": + case sqsAttributeBaseTypeString, sqsAttributeBaseTypeNumber: if v.StringValue == "" { return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "MessageAttribute "+name+" requires StringValue") From 9bfaacfa4f5b351e8f14fdfd137e1c6a13250729 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 20 May 2026 20:43:12 +0900 Subject: [PATCH 2/5] adapter/sqs: reject out-of-range partition indices in peek cursor Codex r1 P1 on PR #794: walkPeek's partition-rotation termination condition is "(Partition + 1) mod PartitionCount == StartPartition". With a client-supplied cursor whose StartPartition lies outside [0, PartitionCount), the modulo cycle never matches and the call loops ScanAt over guaranteed-empty partitions forever - a request-amplification DoS against the admin endpoint. preparePeekCursor now bounds-checks both StartPartition and Partition against max(meta.PartitionCount, 1) and rejects out-of-range values with ErrAdminSQSValidation. Non-partitioned queues (PartitionCount <= 1) require both fields to be 0. Caller audit: peek cursors are constructed in exactly two places - preparePeekCursor itself (now validated) and walkPeek's partition-advance ((Partition+1) mod PartitionCount), which produces values in [0, PartitionCount) by construction. No other caller. Generation validation already handles the purge-between- pages case separately. Tests: regression test feeds a wire-level cursor with StartPartition=999 / Partition=999 against a PartitionCount=4 queue and asserts ErrAdminSQSValidation under a 5-second deadline so any future loss of the bounds check terminates rather than hanging CI. Four preparePeekCursor unit tests pin the truth table (partitioned out-of-range / non-partitioned non-zero / in-range accepted). --- adapter/sqs_admin_peek.go | 29 +++++++++- adapter/sqs_admin_peek_test.go | 101 +++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 3 deletions(-) diff --git a/adapter/sqs_admin_peek.go b/adapter/sqs_admin_peek.go index 0ca9e37b..c6e03925 100644 --- a/adapter/sqs_admin_peek.go +++ b/adapter/sqs_admin_peek.go @@ -252,9 +252,19 @@ func (s *SQSServer) AdminPeekQueue( // first page (cursor==nil) it stamps Generation + a rotated // StartPartition (partitioned queues only — non-partitioned start at // 0). On a follow-up page it validates the stored Generation matches -// the queue's current generation; a mismatch returns -// ErrAdminSQSValidation so the SPA refreshes from the front rather -// than silently surfacing messages from a purged generation. +// the queue's current generation AND that StartPartition / Partition +// are within [0, max(PartitionCount, 1)); a mismatch on either +// returns ErrAdminSQSValidation. +// +// Bounds-check rationale: walkPeek terminates the partitioned +// rotation when `(Partition + 1) % PartitionCount == StartPartition`. +// If a client supplies StartPartition outside [0, PartitionCount), +// that termination condition never fires and the call loops +// ScanAt-by-ScanAt over guaranteed-empty partitions forever — a +// request-amplification DoS against the admin endpoint (Codex r1 +// P1 on PR #794). Rejecting bad cursor partition indices up-front +// closes the vector. Generation mismatch separately forces a +// front-of-stream refresh after a purge. // // startPartition is computed by the caller (the SQSServer method's // nextReceiveFanoutStart) so this helper stays method-free for @@ -275,6 +285,19 @@ func preparePeekCursor(cursor *peekCursor, meta *sqsQueueMeta, startPartition ui return nil, errors.Wrap(ErrAdminSQSValidation, "admin peek: cursor is from a prior generation; restart from the front") } + maxPartition := meta.PartitionCount + if maxPartition <= 1 { + // Non-partitioned queues only have partition 0. A non-zero + // Partition or StartPartition would also fail the rotation + // termination check; reject explicitly so the error is the + // documented 400, not a silent O(infty) loop. + maxPartition = 1 + } + if cursor.StartPartition >= maxPartition || cursor.Partition >= maxPartition { + return nil, errors.Wrapf(ErrAdminSQSValidation, + "admin peek: cursor partition index out of range (StartPartition=%d, Partition=%d, max=%d)", + cursor.StartPartition, cursor.Partition, maxPartition) + } return cursor, nil } diff --git a/adapter/sqs_admin_peek_test.go b/adapter/sqs_admin_peek_test.go index 2b63415d..715e22d3 100644 --- a/adapter/sqs_admin_peek_test.go +++ b/adapter/sqs_admin_peek_test.go @@ -760,6 +760,107 @@ func TestPreparePeekCursor_GenerationMismatch(t *testing.T) { } } +// TestPreparePeekCursor_PartitionOutOfRange pins the Codex r1 P1 +// fix: a cursor with StartPartition or Partition outside +// [0, max(PartitionCount, 1)) must be rejected with +// ErrAdminSQSValidation BEFORE the walk runs. Without the bounds +// check, walkPeek's `nextPart == cursor.StartPartition` termination +// condition never fires for an out-of-range StartPartition, looping +// ScanAt forever (request-amplification DoS against the admin +// endpoint). +func TestPreparePeekCursor_PartitionOutOfRange(t *testing.T) { + t.Parallel() + + t.Run("partitioned: StartPartition >= PartitionCount", func(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4} + cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 99, Partition: 0} + _, err := preparePeekCursor(cursor, meta, 0) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("StartPartition=99/PC=4: want ErrAdminSQSValidation, got %v", err) + } + }) + + t.Run("partitioned: Partition >= PartitionCount", func(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4} + cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 99} + _, err := preparePeekCursor(cursor, meta, 0) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("Partition=99/PC=4: want ErrAdminSQSValidation, got %v", err) + } + }) + + t.Run("non-partitioned: non-zero StartPartition", func(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{Generation: 1, PartitionCount: 0} + cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 1, Partition: 0} + _, err := preparePeekCursor(cursor, meta, 0) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("non-partitioned StartPartition=1: want ErrAdminSQSValidation, got %v", err) + } + }) + + t.Run("non-partitioned: non-zero Partition", func(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{Generation: 1, PartitionCount: 1} + cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 5} + _, err := preparePeekCursor(cursor, meta, 0) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("non-partitioned Partition=5: want ErrAdminSQSValidation, got %v", err) + } + }) + + t.Run("partitioned: in-range cursor accepted", func(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4} + cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 3, Partition: 2} + out, err := preparePeekCursor(cursor, meta, 0) + if err != nil { + t.Fatalf("in-range cursor: got err %v want nil", err) + } + if out.StartPartition != 3 || out.Partition != 2 { + t.Fatalf("in-range cursor: got %+v want StartPartition=3 Partition=2", out) + } + }) +} + +// TestAdminPeekQueue_HostileCursorBoundedRequest is the end-to-end +// regression: an attacker crafts a wire-level cursor with an +// out-of-range partition and submits it. The call MUST return +// ErrAdminSQSValidation (not loop forever). The test runs with a +// short deadline so a regression terminates the test run rather +// than blocking CI. +func TestAdminPeekQueue_HostileCursorBoundedRequest(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const name = "hostile.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": name, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + if status != http.StatusOK { + t.Fatalf("create: %d %v", status, out) + } + installPartitionedMetaForTest(t, node, name, 4, htfifoThroughputPerMessageGroupID) + + // Encode a cursor that names a partition far outside [0, 4). + hostile, err := encodePeekCursor(&peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 999, Partition: 999}) + if err != nil { + t.Fatalf("encode hostile cursor: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, _, err = node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, name, AdminPeekMessageOptions{Cursor: hostile}) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("hostile cursor: want ErrAdminSQSValidation, got %v", err) + } +} + // TestEncodePeekCursor_DecodesAsValidJSON spot-checks that the // base64url envelope unwraps to recognisable JSON keys (so a SPA // debugging session can inspect the cursor without the wire format From 3e6841e0c52bc0fdb2d136abf44b0417816f3957 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 21 May 2026 04:38:20 +0900 Subject: [PATCH 3/5] adapter/sqs: only advance receive-fanout counter on first peek page Codex r2 P1 on PR #794: AdminPeekQueue called nextReceiveFanoutStart unconditionally - on every continuation page as well as the first. The counter is shared with ReceiveMessage's partition-rotation; bumping it on every peek page perturbs the stride ReceiveMessage reads for fairness. Under sustained backlog with small MaxNumberOfMessages, a fixed peek-to-receive interleave (e.g. PartitionCount=4 + 3 peek pages between receives) re-aligns every receive to the same partition, starving the others. Extract a peekStartPartition helper that returns 0 (no consultation needed) when either (a) the call carries a continuation cursor or (b) the queue is non-partitioned. The fanout counter advances only on the first page of a partitioned-queue peek walk. Caller audit: nextReceiveFanoutStart has two callers - the receive fanout (unchanged, still bumps per receive call) and now peekStartPartition (bumps only on first peek page). preparePeekCursor only USES startPartition when stamping a fresh cursor, so passing 0 on continuations is equivalent to passing the unused fanout value except it does not perturb the shared counter. Tests: TestAdminPeekQueue_ContinuationDoesNotBumpFanoutCounter exercises a partitioned FIFO queue, reads the counter value directly from receiveFanoutCounters after the first page, drives five continuation peek calls, and asserts the counter is unchanged. Regression of this property reintroduces the receive starvation Codex flagged. --- adapter/sqs_admin_peek.go | 22 +++++++++++- adapter/sqs_admin_peek_test.go | 62 ++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/adapter/sqs_admin_peek.go b/adapter/sqs_admin_peek.go index c6e03925..cb775ba1 100644 --- a/adapter/sqs_admin_peek.go +++ b/adapter/sqs_admin_peek.go @@ -233,7 +233,7 @@ func (s *SQSServer) AdminPeekQueue( if !exists { return nil, "", ErrAdminSQSNotFound } - cursor, err = preparePeekCursor(cursor, meta, s.nextReceiveFanoutStart(name, meta.PartitionCount)) + cursor, err = preparePeekCursor(cursor, meta, s.peekStartPartition(name, cursor, meta)) if err != nil { return nil, "", err } @@ -248,6 +248,26 @@ func (s *SQSServer) AdminPeekQueue( return rows, encoded, nil } +// peekStartPartition returns the starting partition for a fresh peek +// walk, or 0 when a starting partition is not needed (continuation +// pages already carry one in the cursor; non-partitioned queues only +// have partition 0). +// +// The fanout counter is consulted ONLY on the first page of a peek +// walk. nextReceiveFanoutStart increments the per-queue counter on +// every call regardless of whether the returned value is consumed; +// advancing it on every continuation page would perturb the same +// counter ReceiveMessage reads for partition fairness, reintroducing +// fixed-stride aliasing (Codex r2 P1: e.g. PartitionCount=4 with 3 +// peek pages between receives lands every receive on the same +// partition, starving the other three). +func (s *SQSServer) peekStartPartition(queueName string, cursor *peekCursor, meta *sqsQueueMeta) uint32 { + if cursor != nil || meta.PartitionCount <= 1 { + return 0 + } + return s.nextReceiveFanoutStart(queueName, meta.PartitionCount) +} + // preparePeekCursor builds the effective cursor for this call. On the // first page (cursor==nil) it stamps Generation + a rotated // StartPartition (partitioned queues only — non-partitioned start at diff --git a/adapter/sqs_admin_peek_test.go b/adapter/sqs_admin_peek_test.go index 715e22d3..a4cd543a 100644 --- a/adapter/sqs_admin_peek_test.go +++ b/adapter/sqs_admin_peek_test.go @@ -7,6 +7,7 @@ import ( "sort" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -825,6 +826,67 @@ func TestPreparePeekCursor_PartitionOutOfRange(t *testing.T) { }) } +// TestAdminPeekQueue_ContinuationDoesNotBumpFanoutCounter pins Codex +// r2 P1: AdminPeekQueue must NOT advance the shared +// receiveFanoutCounters counter on continuation pages. The counter +// is consumed by ReceiveMessage's partition-rotation; if peek +// pagination bumps it on every page, a sustained backlog with small +// MaxNumberOfMessages can repeatedly start receives at the same +// partition (fixed-stride aliasing — e.g. PartitionCount=4 + 3 +// peek pages between receives lands every receive on partition 0, +// starving 1-3). Only the FIRST page should advance the counter. +func TestAdminPeekQueue_ContinuationDoesNotBumpFanoutCounter(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const name = "peek-fanout.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": name, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + if status != http.StatusOK { + t.Fatalf("create: %d %v", status, out) + } + queueURL, _ := out["QueueUrl"].(string) + installPartitionedMetaForTest(t, node, name, 4, htfifoThroughputPerMessageGroupID) + // Send across distinct groups so a Limit=2 peek must paginate. + _ = sendFIFOMessagesForPeek(t, node, queueURL, []string{"a", "b", "c", "d", "e", "f"}) + + ctx := context.Background() + rows, cursor, err := node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, name, AdminPeekMessageOptions{Limit: 2}) + if err != nil { + t.Fatalf("first peek: %v", err) + } + if len(rows) == 0 || cursor == "" { + t.Fatalf("first peek: rows=%d cursor=%q — need both populated to exercise continuation", len(rows), cursor) + } + + v, ok := node.sqsServer.receiveFanoutCounters.Load(name) + if !ok { + t.Fatalf("first peek did not create the fanout counter; the bump-on-first-page contract is broken") + } + counter, _ := v.(*atomic.Uint32) + before := counter.Load() + + // Drive 5 continuation calls; each must not move the counter. + for i := 0; i < 5; i++ { + _, next, err := node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, name, AdminPeekMessageOptions{Limit: 2, Cursor: cursor}) + if err != nil { + t.Fatalf("continuation peek #%d: %v", i, err) + } + if next == "" { + break + } + cursor = next + } + after := counter.Load() + if after != before { + t.Fatalf("continuation peeks bumped fanout counter from %d to %d (expected unchanged)", before, after) + } +} + // TestAdminPeekQueue_HostileCursorBoundedRequest is the end-to-end // regression: an attacker crafts a wire-level cursor with an // out-of-range partition and submits it. The call MUST return From fc35ee517410fa21d6faadfcbb8962b74602aec9 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 21 May 2026 14:28:36 +0900 Subject: [PATCH 4/5] adapter/sqs: align peek partition iteration with effectivePartitionCount MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three r3 findings on PR #794: 1. Codex P2 (sqs_admin_peek.go:268). peekStartPartition and walkPeek's partition advance / termination keyed off meta.PartitionCount directly. In perQueue throughput mode (FifoThroughputLimit=perQueue) the data plane collapses every MessageGroupId to partition 0 (see effectivePartitionCount in sqs_keys_dispatch.go); peek still rotated over all N partitions, wasting up to N-1 guaranteed-empty ScanAt calls per peek (31 extra scans at PartitionCount=32). Switch to effectivePartitionCount(meta) in peekStartPartition, walkPeek's partition advance, and preparePeekCursor's bounds check. Caller audit: effectivePartitionCount is the same helper receiveMessage uses for its fanout loop (sqs_messages.go:885), so peek and receive now agree on partition usage. 2. CodeRabbit minor (test order-coupling). HappyPath asserted row ordering matched send order, but vis-index entries with the same visible_at millisecond tie-break on message_id (random hex) — a timing-sensitive flake. Replace ordered loop with set-based assertion via assertPeekRowsAsSet. 3. CodeRabbit minor (counter type assertion panic). The fanout counter test used `counter, _ := v.(*atomic.Uint32)` then counter.Load(), which panics if the assertion fails. Extract loadFanoutCounter helper that fails the test explicitly on bad type / nil rather than panicking. Tests: TestAdminPeekQueue_PerQueueFIFOCollapsesToOnePartition pins the perQueue functional contract end-to-end. TestPreparePeekCursor_PerQueueCollapse pins the cursor codec's effective-bounds validation (a Partition=2 cursor against a perQueue PartitionCount=4 queue is rejected — even though the raw PartitionCount would otherwise allow it). --- adapter/sqs_admin_peek.go | 72 ++++++++++------- adapter/sqs_admin_peek_test.go | 139 +++++++++++++++++++++++++++------ 2 files changed, 157 insertions(+), 54 deletions(-) diff --git a/adapter/sqs_admin_peek.go b/adapter/sqs_admin_peek.go index cb775ba1..996ad57a 100644 --- a/adapter/sqs_admin_peek.go +++ b/adapter/sqs_admin_peek.go @@ -262,40 +262,56 @@ func (s *SQSServer) AdminPeekQueue( // peek pages between receives lands every receive on the same // partition, starving the other three). func (s *SQSServer) peekStartPartition(queueName string, cursor *peekCursor, meta *sqsQueueMeta) uint32 { - if cursor != nil || meta.PartitionCount <= 1 { + if cursor != nil { return 0 } - return s.nextReceiveFanoutStart(queueName, meta.PartitionCount) + effective := effectivePartitionCount(meta) + if effective <= 1 { + return 0 + } + return s.nextReceiveFanoutStart(queueName, effective) } // preparePeekCursor builds the effective cursor for this call. On the // first page (cursor==nil) it stamps Generation + a rotated -// StartPartition (partitioned queues only — non-partitioned start at -// 0). On a follow-up page it validates the stored Generation matches -// the queue's current generation AND that StartPartition / Partition -// are within [0, max(PartitionCount, 1)); a mismatch on either +// StartPartition (partitioned queues only — non-partitioned and +// perQueue-throughput FIFO queues collapse to partition 0). On a +// follow-up page it validates the stored Generation matches the +// queue's current generation AND that StartPartition / Partition are +// within [0, effectivePartitionCount(meta)); a mismatch on either // returns ErrAdminSQSValidation. // +// effectivePartitionCount (not meta.PartitionCount) is the +// authoritative iteration bound. perQueue FIFO mode collapses every +// MessageGroupId to partition 0 (see partitionFor / +// effectivePartitionCount), so partitions 1..N-1 are guaranteed empty +// for those queues; walking them would be pointless read +// amplification (Codex r3 P2). Keying validation off the effective +// count keeps peek aligned with the data-plane's actual partition +// usage. +// // Bounds-check rationale: walkPeek terminates the partitioned -// rotation when `(Partition + 1) % PartitionCount == StartPartition`. -// If a client supplies StartPartition outside [0, PartitionCount), -// that termination condition never fires and the call loops -// ScanAt-by-ScanAt over guaranteed-empty partitions forever — a -// request-amplification DoS against the admin endpoint (Codex r1 -// P1 on PR #794). Rejecting bad cursor partition indices up-front -// closes the vector. Generation mismatch separately forces a -// front-of-stream refresh after a purge. +// rotation when `(Partition + 1) % effectivePartitionCount == +// StartPartition`. If a client supplies StartPartition outside +// [0, effectivePartitionCount), that termination condition never +// fires and the call loops ScanAt-by-ScanAt over guaranteed-empty +// partitions forever — a request-amplification DoS against the +// admin endpoint (Codex r1 P1 on PR #794). Rejecting bad cursor +// partition indices up-front closes the vector. Generation mismatch +// separately forces a front-of-stream refresh after a purge. // // startPartition is computed by the caller (the SQSServer method's -// nextReceiveFanoutStart) so this helper stays method-free for -// unit-testability. Non-partitioned queues pass 0. +// peekStartPartition wrapper around nextReceiveFanoutStart) so this +// helper stays method-free for unit-testability. Non-partitioned and +// perQueue-collapsed queues pass 0. func preparePeekCursor(cursor *peekCursor, meta *sqsQueueMeta, startPartition uint32) (*peekCursor, error) { + effective := effectivePartitionCount(meta) if cursor == nil { out := &peekCursor{ V: peekCursorSchemaV1, Generation: meta.Generation, } - if meta.PartitionCount > 1 { + if effective > 1 { out.StartPartition = startPartition out.Partition = startPartition } @@ -305,18 +321,10 @@ func preparePeekCursor(cursor *peekCursor, meta *sqsQueueMeta, startPartition ui return nil, errors.Wrap(ErrAdminSQSValidation, "admin peek: cursor is from a prior generation; restart from the front") } - maxPartition := meta.PartitionCount - if maxPartition <= 1 { - // Non-partitioned queues only have partition 0. A non-zero - // Partition or StartPartition would also fail the rotation - // termination check; reject explicitly so the error is the - // documented 400, not a silent O(infty) loop. - maxPartition = 1 - } - if cursor.StartPartition >= maxPartition || cursor.Partition >= maxPartition { + if cursor.StartPartition >= effective || cursor.Partition >= effective { return nil, errors.Wrapf(ErrAdminSQSValidation, "admin peek: cursor partition index out of range (StartPartition=%d, Partition=%d, max=%d)", - cursor.StartPartition, cursor.Partition, maxPartition) + cursor.StartPartition, cursor.Partition, effective) } return cursor, nil } @@ -350,11 +358,15 @@ func (s *SQSServer) walkPeek( cursor.LastKey = next return rows, cursor, nil } - // Partition exhausted before Limit reached. - if meta.PartitionCount <= 1 { + // Partition exhausted before Limit reached. effectivePartitionCount + // (not meta.PartitionCount) caps the rotation so perQueue-collapsed + // FIFO queues stop after partition 0 instead of walking N-1 + // guaranteed-empty partitions (Codex r3 P2 on PR #794). + effective := effectivePartitionCount(meta) + if effective <= 1 { return rows, nil, nil } - nextPart := (cursor.Partition + 1) % meta.PartitionCount + nextPart := (cursor.Partition + 1) % effective if nextPart == cursor.StartPartition { return rows, nil, nil } diff --git a/adapter/sqs_admin_peek_test.go b/adapter/sqs_admin_peek_test.go index a4cd543a..681c2f16 100644 --- a/adapter/sqs_admin_peek_test.go +++ b/adapter/sqs_admin_peek_test.go @@ -46,30 +46,38 @@ func TestAdminPeekQueue_HappyPath(t *testing.T) { if nextCursor != "" { t.Fatalf("nextCursor=%q want empty (queue drained in one page)", nextCursor) } - for i, row := range rows { - assertPeekRowMatchesIndexedBody(t, i, row, "body-") - } + want := map[string]struct{}{"body-0": {}, "body-1": {}, "body-2": {}} + assertPeekRowsAsSet(t, rows, want) } -// assertPeekRowMatchesIndexedBody pins the per-row invariants the -// happy path expects: the body matches bodyPrefix+i, no truncation -// fired (the bodies are short), MessageID is populated, and -// SentTimestamp is non-zero. Pulled into a helper so -// TestAdminPeekQueue_HappyPath stays under the cyclop budget. -func assertPeekRowMatchesIndexedBody(t *testing.T, idx int, row AdminPeekedMessage, bodyPrefix string) { +// assertPeekRowsAsSet pins the per-row invariants (MessageID +// populated, SentTimestamp non-zero, no truncation) AND the set of +// observed bodies, without coupling to row order. Vis-index entries +// with the same visible_at millisecond tie-break on message_id which +// is random, so an order-sensitive assertion is flaky under fast +// sends — CodeRabbit r3 caught the earlier ordered-loop version. +func assertPeekRowsAsSet(t *testing.T, rows []AdminPeekedMessage, want map[string]struct{}) { t.Helper() - want := bodyPrefix + strconv.Itoa(idx) - if row.Body != want { - t.Fatalf("rows[%d].Body=%q want %q", idx, row.Body, want) - } - if row.BodyTruncated { - t.Fatalf("rows[%d].BodyTruncated=true; want false for %d-byte body", idx, len(row.Body)) + got := make(map[string]struct{}, len(rows)) + for i, row := range rows { + if row.BodyTruncated { + t.Fatalf("rows[%d].BodyTruncated=true; want false for short body", i) + } + if row.MessageID == "" { + t.Fatalf("rows[%d].MessageID empty", i) + } + if row.SentTimestamp.IsZero() { + t.Fatalf("rows[%d].SentTimestamp zero", i) + } + got[row.Body] = struct{}{} } - if row.MessageID == "" { - t.Fatalf("rows[%d].MessageID empty", idx) + for w := range want { + if _, ok := got[w]; !ok { + t.Fatalf("missing body %q in observed set %v", w, got) + } } - if row.SentTimestamp.IsZero() { - t.Fatalf("rows[%d].SentTimestamp zero", idx) + if len(got) != len(want) { + t.Fatalf("observed set %v != expected set %v", got, want) } } @@ -826,6 +834,24 @@ func TestPreparePeekCursor_PartitionOutOfRange(t *testing.T) { }) } +// loadFanoutCounter retrieves the per-queue receive fanout counter +// the server's receive path uses for partition rotation. Returns the +// counter or fails the test if it is absent / has an unexpected +// type. Pulled into a helper so callers that read-then-compare the +// counter value stay under the cyclop budget. +func loadFanoutCounter(t *testing.T, node Node, queueName string) *atomic.Uint32 { + t.Helper() + v, ok := node.sqsServer.receiveFanoutCounters.Load(queueName) + if !ok { + t.Fatalf("fanout counter missing for queue %q (bump-on-first-page contract broken)", queueName) + } + counter, ok := v.(*atomic.Uint32) + if !ok || counter == nil { + t.Fatalf("fanout counter for %q has unexpected type %T (want *atomic.Uint32)", queueName, v) + } + return counter +} + // TestAdminPeekQueue_ContinuationDoesNotBumpFanoutCounter pins Codex // r2 P1: AdminPeekQueue must NOT advance the shared // receiveFanoutCounters counter on continuation pages. The counter @@ -863,11 +889,7 @@ func TestAdminPeekQueue_ContinuationDoesNotBumpFanoutCounter(t *testing.T) { t.Fatalf("first peek: rows=%d cursor=%q — need both populated to exercise continuation", len(rows), cursor) } - v, ok := node.sqsServer.receiveFanoutCounters.Load(name) - if !ok { - t.Fatalf("first peek did not create the fanout counter; the bump-on-first-page contract is broken") - } - counter, _ := v.(*atomic.Uint32) + counter := loadFanoutCounter(t, node, name) before := counter.Load() // Drive 5 continuation calls; each must not move the counter. @@ -887,6 +909,75 @@ func TestAdminPeekQueue_ContinuationDoesNotBumpFanoutCounter(t *testing.T) { } } +// TestAdminPeekQueue_PerQueueFIFOCollapsesToOnePartition pins Codex +// r3 P2: a partitioned FIFO queue in perQueue throughput mode +// collapses every MessageGroupId to partition 0 (see +// effectivePartitionCount). Peek must align with the data plane: it +// walks only the effective partitions, not the raw PartitionCount +// count, so a PartitionCount=4 perQueue queue does not waste 3 +// guaranteed-empty ScanAt calls on partitions 1..3 per peek call. +// +// Functional assertion: peek succeeds end-to-end on a perQueue queue +// and surfaces every sent message. The optimization itself is +// internal — a follow-up that re-introduces the meta.PartitionCount +// rotation would still pass this assertion but the (Partition+1) % +// effective termination encoded in walkPeek would still cap the +// scan, so the regression vector is closed by the cursor codec +// validation (effective bounds) being the chokepoint. +func TestAdminPeekQueue_PerQueueFIFOCollapsesToOnePartition(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const name = "peek-perqueue.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": name, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + if status != http.StatusOK { + t.Fatalf("create: %d %v", status, out) + } + queueURL, _ := out["QueueUrl"].(string) + installPartitionedMetaForTest(t, node, name, 4, htfifoThroughputPerQueue) + + groups := []string{"alpha", "beta", "gamma"} + sent := sendFIFOMessagesForPeek(t, node, queueURL, groups) + seen := walkPeekUntilDone(t, context.Background(), node, name, 10) + assertEveryMessageSeenOnce(t, sent, seen) +} + +// TestPreparePeekCursor_PerQueueCollapse confirms the cursor codec's +// validation key off effectivePartitionCount, not the raw +// meta.PartitionCount. A perQueue queue with PartitionCount=4 must +// reject any cursor with Partition > 0 (the effective bound is 1) +// even though meta.PartitionCount itself is 4. +func TestPreparePeekCursor_PerQueueCollapse(t *testing.T) { + t.Parallel() + + t.Run("fresh cursor on perQueue stamps StartPartition=0", func(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue} + out, err := preparePeekCursor(nil, meta, 2) + if err != nil { + t.Fatalf("err: %v", err) + } + if out.StartPartition != 0 || out.Partition != 0 { + t.Fatalf("perQueue fresh cursor=%+v want StartPartition=Partition=0 (effective=1)", out) + } + }) + + t.Run("perQueue rejects cursor.Partition >= 1", func(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue} + cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 2} + _, err := preparePeekCursor(cursor, meta, 0) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("perQueue Partition=2 (raw PartitionCount=4 would allow): want ErrAdminSQSValidation, got %v", err) + } + }) +} + // TestAdminPeekQueue_HostileCursorBoundedRequest is the end-to-end // regression: an attacker crafts a wire-level cursor with an // out-of-range partition and submits it. The call MUST return From ddb4e6ec7b0f18f4b0342b37829be545efe1f7a7 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 21 May 2026 22:28:58 +0900 Subject: [PATCH 5/5] adapter/sqs: validate peek cursor LastKey + add JSON tags to wire types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two r4 findings on PR #794: 1. Codex P2 (sqs_admin_peek.go:401). preparePeekCursor only validated Generation and partition indices; cursor.LastKey was trusted directly as the ScanAt start key, checked only against the upper bound. A forged LastKey below the queue's visibility-index prefix would let ScanAt walk unrelated key ranges, triggering up to Limit extra GetAt misses per call - read amplification against the leader. preparePeekCursor now rejects with ErrAdminSQSValidation when cursor.LastKey is not prefixed by sqsMsgVisPrefixForQueueDispatch(meta, queueName, Partition, Generation). The check catches three classes of forgery: completely arbitrary bytes (e.g. "aaaa"), a valid LastKey from a different queue (prefix encodes queue name), and a valid LastKey from a different partition of the same queue. preparePeekCursor's signature gains queueName; the only existing caller (AdminPeekQueue) and the existing unit tests were updated. 2. CodeRabbit Major (sqs_admin_peek.go:35-45, outside diff). AdminPeekedMessage lacked JSON tags, so json.Marshal emitted Go-style PascalCase ("MessageID", "BodyTruncated", ...) instead of the snake_case wire shape the design doc §3.5 specifies (the SPA's client adapter expects the spec'd form). Also empty Attributes / GroupID / DeduplicationID would serialize as "null" / "" instead of being omitted. JSON tags added with appropriate omitempty. Caller audit (semantic-change rule): preparePeekCursor signature changed from (cursor, meta, startPartition) to (cursor, meta, queueName, startPartition). One non-test caller (AdminPeekQueue at sqs_admin_peek.go:240) and 10 test call sites updated. The new queueName parameter is purely additive validation; it does not alter the cursor's stamping or generation-mismatch behavior. Tests: TestAdminPeekedMessage_JSONWireFormat / TestAdminPeekedMessage_JSONOmitsEmptyAttributes pin the JSON wire shape. TestPreparePeekCursor_PartitionOutOfRange gains three new sub-cases (forged LastKey, foreign-queue LastKey, mismatched- partition LastKey). The test function was rewritten as a table-driven loop so it stays under the cyclop budget. --- adapter/sqs_admin_peek.go | 43 ++++++--- adapter/sqs_admin_peek_test.go | 166 +++++++++++++++++++++++---------- 2 files changed, 150 insertions(+), 59 deletions(-) diff --git a/adapter/sqs_admin_peek.go b/adapter/sqs_admin_peek.go index 996ad57a..047785aa 100644 --- a/adapter/sqs_admin_peek.go +++ b/adapter/sqs_admin_peek.go @@ -31,17 +31,21 @@ type AdminPeekedAttribute struct { BinaryValue []byte `json:"binary_value,omitempty"` } -// AdminPeekedMessage is one row in the peek result. +// AdminPeekedMessage is one row in the peek result. JSON tags pin +// the snake_case wire shape the design doc §3.5 specifies; without +// them the encoder would emit Go-style PascalCase field names and +// the SPA's client adapter would silently misparse every row. +// CodeRabbit r4 caught the regression. type AdminPeekedMessage struct { - MessageID string - Body string // truncated per opts.BodyMaxBytes - BodyTruncated bool // true when Body was cut - BodyOriginalSize int64 // bytes in the original body, for display - SentTimestamp time.Time // SQS SentTimestamp - ReceiveCount int32 // ApproximateReceiveCount - GroupID string // FIFO MessageGroupId, empty for standard - DeduplicationID string // FIFO MessageDeduplicationId, empty for standard - Attributes map[string]AdminPeekedAttribute // typed SQS message attributes + MessageID string `json:"message_id"` + Body string `json:"body"` // truncated per opts.BodyMaxBytes + BodyTruncated bool `json:"body_truncated"` // true when Body was cut + BodyOriginalSize int64 `json:"body_original_size"` // bytes in the original body, for display + SentTimestamp time.Time `json:"sent_timestamp"` // SQS SentTimestamp + ReceiveCount int32 `json:"receive_count"` // ApproximateReceiveCount + GroupID string `json:"group_id,omitempty"` // FIFO MessageGroupId, empty for standard + DeduplicationID string `json:"deduplication_id,omitempty"` // FIFO MessageDeduplicationId, empty for standard + Attributes map[string]AdminPeekedAttribute `json:"attributes,omitempty"` // typed SQS message attributes } // AdminPeekMessageOptions controls a peek call. Zero values map to @@ -233,7 +237,7 @@ func (s *SQSServer) AdminPeekQueue( if !exists { return nil, "", ErrAdminSQSNotFound } - cursor, err = preparePeekCursor(cursor, meta, s.peekStartPartition(name, cursor, meta)) + cursor, err = preparePeekCursor(cursor, meta, name, s.peekStartPartition(name, cursor, meta)) if err != nil { return nil, "", err } @@ -304,7 +308,15 @@ func (s *SQSServer) peekStartPartition(queueName string, cursor *peekCursor, met // peekStartPartition wrapper around nextReceiveFanoutStart) so this // helper stays method-free for unit-testability. Non-partitioned and // perQueue-collapsed queues pass 0. -func preparePeekCursor(cursor *peekCursor, meta *sqsQueueMeta, startPartition uint32) (*peekCursor, error) { +// +// queueName is consumed to validate cursor.LastKey on continuation +// pages: a forged LastKey outside the queue's visibility-index +// prefix would otherwise let an attacker start a ScanAt at any byte +// offset in the leader's keyspace (Codex r4 P2). Continuation +// cursors are admin-supplied and signed only by base64-encoding, so +// every field must be validated against the live queue meta before +// being used as a storage cursor. +func preparePeekCursor(cursor *peekCursor, meta *sqsQueueMeta, queueName string, startPartition uint32) (*peekCursor, error) { effective := effectivePartitionCount(meta) if cursor == nil { out := &peekCursor{ @@ -326,6 +338,13 @@ func preparePeekCursor(cursor *peekCursor, meta *sqsQueueMeta, startPartition ui "admin peek: cursor partition index out of range (StartPartition=%d, Partition=%d, max=%d)", cursor.StartPartition, cursor.Partition, effective) } + if len(cursor.LastKey) > 0 { + expectedPrefix := sqsMsgVisPrefixForQueueDispatch(meta, queueName, cursor.Partition, meta.Generation) + if !bytes.HasPrefix(cursor.LastKey, expectedPrefix) { + return nil, errors.Wrap(ErrAdminSQSValidation, + "admin peek: cursor LastKey is outside the queue's visibility-index prefix") + } + } return cursor, nil } diff --git a/adapter/sqs_admin_peek_test.go b/adapter/sqs_admin_peek_test.go index 681c2f16..2b36082a 100644 --- a/adapter/sqs_admin_peek_test.go +++ b/adapter/sqs_admin_peek_test.go @@ -727,6 +727,80 @@ func TestClampPeekBodyBytes(t *testing.T) { } } +// TestAdminPeekedMessage_JSONWireFormat pins the snake_case wire +// shape the design doc §3.5 specifies. Without explicit JSON tags +// the encoder emits Go-style PascalCase ("MessageID", "BodyTruncated" +// …) and the SPA's client adapter silently misparses every row +// (CodeRabbit r4 caught the regression). +func TestAdminPeekedMessage_JSONWireFormat(t *testing.T) { + t.Parallel() + in := AdminPeekedMessage{ + MessageID: "m1", + Body: "hello", + BodyTruncated: true, + BodyOriginalSize: 42, + SentTimestamp: time.UnixMilli(1_700_000_000_000).UTC(), + ReceiveCount: 3, + GroupID: "g1", + DeduplicationID: "d1", + Attributes: map[string]AdminPeekedAttribute{ + "Source": {DataType: sqsAttributeBaseTypeString, StringValue: "checkout"}, + }, + } + raw, err := json.Marshal(in) + if err != nil { + t.Fatalf("marshal: %v", err) + } + var got map[string]any + if err := json.Unmarshal(raw, &got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + wantKeys := []string{ + "message_id", "body", "body_truncated", "body_original_size", + "sent_timestamp", "receive_count", "group_id", "deduplication_id", + "attributes", + } + for _, k := range wantKeys { + if _, ok := got[k]; !ok { + t.Fatalf("missing key %q in wire JSON; got=%v", k, got) + } + } + // PascalCase keys must NOT appear. + for _, k := range []string{"MessageID", "Body", "BodyTruncated", "GroupID", "Attributes"} { + if _, ok := got[k]; ok { + t.Fatalf("unwanted PascalCase key %q in wire JSON; got=%v", k, got) + } + } + // Nested AdminPeekedAttribute also uses snake_case. + attrs, _ := got["attributes"].(map[string]any) + src, _ := attrs["Source"].(map[string]any) + if _, ok := src["data_type"]; !ok { + t.Fatalf("Source attribute missing 'data_type' key; got=%v", src) + } +} + +// TestAdminPeekedMessage_JSONOmitsEmptyAttributes pins the +// omitempty contract: empty Attributes / GroupID / DeduplicationID +// must NOT appear on the wire (the SPA renders the field's absence, +// not a "null" or "{}" placeholder). +func TestAdminPeekedMessage_JSONOmitsEmptyAttributes(t *testing.T) { + t.Parallel() + in := AdminPeekedMessage{MessageID: "m1", Body: "x"} + raw, err := json.Marshal(in) + if err != nil { + t.Fatalf("marshal: %v", err) + } + var got map[string]any + if err := json.Unmarshal(raw, &got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + for _, k := range []string{"group_id", "deduplication_id", "attributes"} { + if _, ok := got[k]; ok { + t.Fatalf("empty %q must be omitted from wire JSON; got=%v", k, got) + } + } +} + // TestPreparePeekCursor_FreshCursor confirms the first-call cursor // stamps Generation and (for partitioned queues) StartPartition. func TestPreparePeekCursor_FreshCursor(t *testing.T) { @@ -735,7 +809,7 @@ func TestPreparePeekCursor_FreshCursor(t *testing.T) { t.Run("non-partitioned", func(t *testing.T) { t.Parallel() meta := &sqsQueueMeta{Generation: 7} - out, err := preparePeekCursor(nil, meta, 0) + out, err := preparePeekCursor(nil, meta, "test", 0) if err != nil { t.Fatalf("err: %v", err) } @@ -747,7 +821,7 @@ func TestPreparePeekCursor_FreshCursor(t *testing.T) { t.Run("partitioned", func(t *testing.T) { t.Parallel() meta := &sqsQueueMeta{Generation: 11, PartitionCount: 4} - out, err := preparePeekCursor(nil, meta, 2) + out, err := preparePeekCursor(nil, meta, "test", 2) if err != nil { t.Fatalf("err: %v", err) } @@ -763,7 +837,7 @@ func TestPreparePeekCursor_GenerationMismatch(t *testing.T) { t.Parallel() stale := &peekCursor{V: peekCursorSchemaV1, Generation: 3} meta := &sqsQueueMeta{Generation: 4} - _, err := preparePeekCursor(stale, meta, 0) + _, err := preparePeekCursor(stale, meta, "test", 0) if !errors.Is(err, ErrAdminSQSValidation) { t.Fatalf("want ErrAdminSQSValidation, got %v", err) } @@ -779,52 +853,50 @@ func TestPreparePeekCursor_GenerationMismatch(t *testing.T) { // endpoint). func TestPreparePeekCursor_PartitionOutOfRange(t *testing.T) { t.Parallel() - - t.Run("partitioned: StartPartition >= PartitionCount", func(t *testing.T) { - t.Parallel() - meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4} - cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 99, Partition: 0} - _, err := preparePeekCursor(cursor, meta, 0) - if !errors.Is(err, ErrAdminSQSValidation) { - t.Fatalf("StartPartition=99/PC=4: want ErrAdminSQSValidation, got %v", err) - } - }) - - t.Run("partitioned: Partition >= PartitionCount", func(t *testing.T) { - t.Parallel() - meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4} - cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 99} - _, err := preparePeekCursor(cursor, meta, 0) - if !errors.Is(err, ErrAdminSQSValidation) { - t.Fatalf("Partition=99/PC=4: want ErrAdminSQSValidation, got %v", err) - } - }) - - t.Run("non-partitioned: non-zero StartPartition", func(t *testing.T) { - t.Parallel() - meta := &sqsQueueMeta{Generation: 1, PartitionCount: 0} - cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 1, Partition: 0} - _, err := preparePeekCursor(cursor, meta, 0) - if !errors.Is(err, ErrAdminSQSValidation) { - t.Fatalf("non-partitioned StartPartition=1: want ErrAdminSQSValidation, got %v", err) - } - }) - - t.Run("non-partitioned: non-zero Partition", func(t *testing.T) { - t.Parallel() - meta := &sqsQueueMeta{Generation: 1, PartitionCount: 1} - cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 5} - _, err := preparePeekCursor(cursor, meta, 0) - if !errors.Is(err, ErrAdminSQSValidation) { - t.Fatalf("non-partitioned Partition=5: want ErrAdminSQSValidation, got %v", err) - } - }) + pc4 := &sqsQueueMeta{Generation: 1, PartitionCount: 4} + pc0 := &sqsQueueMeta{Generation: 1, PartitionCount: 0} + pc1 := &sqsQueueMeta{Generation: 1, PartitionCount: 1} + // foreignKey / mismatchedKey share a "first byte not in the + // admin prefix" property with "aaaa" so the test exercises all + // three forged-LastKey classes the bounds check catches. + foreignKey := append(sqsMsgVisPrefixForQueueDispatch(pc4, "queue-X", 0, 1), 'X') + mismatchedKey := append(sqsMsgVisPrefixForQueueDispatch(pc4, "queue-A", 2, 1), 'X') + + cases := []struct { + name string + meta *sqsQueueMeta + queue string + cursor *peekCursor + }{ + {"partitioned: StartPartition >= PartitionCount", pc4, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 99, Partition: 0}}, + {"partitioned: Partition >= PartitionCount", pc4, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 99}}, + {"non-partitioned: non-zero StartPartition", pc0, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 1, Partition: 0}}, + {"non-partitioned: non-zero Partition", pc1, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 5}}, + {"forged LastKey outside partition prefix (Codex r4 P2)", pc4, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 0, LastKey: []byte("aaaa")}}, + {"LastKey from a different queue", pc4, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 0, LastKey: foreignKey}}, + {"LastKey from a different partition", pc4, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 0, LastKey: mismatchedKey}}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + _, err := preparePeekCursor(tc.cursor, tc.meta, tc.queue, 0) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("%s: want ErrAdminSQSValidation, got %v", tc.name, err) + } + }) + } t.Run("partitioned: in-range cursor accepted", func(t *testing.T) { t.Parallel() - meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4} cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 3, Partition: 2} - out, err := preparePeekCursor(cursor, meta, 0) + out, err := preparePeekCursor(cursor, pc4, "queue-A", 0) if err != nil { t.Fatalf("in-range cursor: got err %v want nil", err) } @@ -958,7 +1030,7 @@ func TestPreparePeekCursor_PerQueueCollapse(t *testing.T) { t.Run("fresh cursor on perQueue stamps StartPartition=0", func(t *testing.T) { t.Parallel() meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue} - out, err := preparePeekCursor(nil, meta, 2) + out, err := preparePeekCursor(nil, meta, "test", 2) if err != nil { t.Fatalf("err: %v", err) } @@ -971,7 +1043,7 @@ func TestPreparePeekCursor_PerQueueCollapse(t *testing.T) { t.Parallel() meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue} cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 2} - _, err := preparePeekCursor(cursor, meta, 0) + _, err := preparePeekCursor(cursor, meta, "test", 0) if !errors.Is(err, ErrAdminSQSValidation) { t.Fatalf("perQueue Partition=2 (raw PartitionCount=4 would allow): want ErrAdminSQSValidation, got %v", err) }