diff --git a/adapter/dynamodb_admin.go b/adapter/dynamodb_admin.go index 63b05897..1b815ce6 100644 --- a/adapter/dynamodb_admin.go +++ b/adapter/dynamodb_admin.go @@ -83,6 +83,16 @@ const ( // future "delete-only" tier reads consistently across the package. func (r AdminRole) canWrite() bool { return r == AdminRoleFull } +// canRead reports whether the role authorises non-destructive but +// sensitive reads — e.g. SQS AdminPeekQueue, which exposes message +// bodies / attributes. Both AdminRoleReadOnly and AdminRoleFull +// satisfy this gate; the zero value (unauthenticated / role-less +// principal) does not. List / Describe paths use the looser +// session-auth gate because their output is queue metadata already +// shown on the SPA list page; peek is divergent because the payload +// is the message bodies themselves. +func (r AdminRole) canRead() bool { return r == AdminRoleReadOnly || r == AdminRoleFull } + // AdminPrincipal is the authentication context every admin write // entrypoint takes. The adapter re-evaluates authorisation against // this principal *itself* — it does not trust the caller to have diff --git a/adapter/sqs_admin_peek.go b/adapter/sqs_admin_peek.go new file mode 100644 index 00000000..047785aa --- /dev/null +++ b/adapter/sqs_admin_peek.go @@ -0,0 +1,543 @@ +package adapter + +import ( + "bytes" + "context" + "encoding/base64" + "strings" + "time" + + "github.com/bootjp/elastickv/store" + "github.com/cockroachdb/errors" + json "github.com/goccy/go-json" +) + +// AdminPeekedAttribute mirrors the typed shape SQS uses for +// MessageAttribute values — DataType (e.g. "String", "Number", +// "Binary", "String.MyCustom") plus the value in the appropriate +// representation. The earlier draft used map[string]string here, +// which would have flattened the typed attribute set stored in +// sqsMessageRecord.MessageAttributes and silently dropped binary +// payloads + the DataType discriminator (Codex r11 on the design +// doc). Operators triaging a DLQ need both — a message routed there +// because of an attribute-encoding mismatch is invisible if peek +// only surfaces stringified values. +type AdminPeekedAttribute struct { + DataType string `json:"data_type"` + StringValue string `json:"string_value,omitempty"` + // BinaryValue carries the raw bytes; the JSON wire form + // base64-encodes (standard Go encoding/json behaviour for + // []byte) so binary payloads survive the SPA round-trip. + BinaryValue []byte `json:"binary_value,omitempty"` +} + +// AdminPeekedMessage is one row in the peek result. JSON tags pin +// the snake_case wire shape the design doc §3.5 specifies; without +// them the encoder would emit Go-style PascalCase field names and +// the SPA's client adapter would silently misparse every row. +// CodeRabbit r4 caught the regression. +type AdminPeekedMessage struct { + MessageID string `json:"message_id"` + Body string `json:"body"` // truncated per opts.BodyMaxBytes + BodyTruncated bool `json:"body_truncated"` // true when Body was cut + BodyOriginalSize int64 `json:"body_original_size"` // bytes in the original body, for display + SentTimestamp time.Time `json:"sent_timestamp"` // SQS SentTimestamp + ReceiveCount int32 `json:"receive_count"` // ApproximateReceiveCount + GroupID string `json:"group_id,omitempty"` // FIFO MessageGroupId, empty for standard + DeduplicationID string `json:"deduplication_id,omitempty"` // FIFO MessageDeduplicationId, empty for standard + Attributes map[string]AdminPeekedAttribute `json:"attributes,omitempty"` // typed SQS message attributes +} + +// AdminPeekMessageOptions controls a peek call. Zero values map to +// the documented defaults: Limit=20, Cursor=empty, BodyMaxBytes=4096. +type AdminPeekMessageOptions struct { + // Limit caps the number of messages returned. Clamped to + // [1, adminPeekMaxLimit]; 0 means "use default + // (adminPeekDefaultLimit)". + Limit int + // Cursor is an opaque continuation token from a prior call; + // empty means "start from the front of the visibility index". + Cursor string + // BodyMaxBytes truncates message bodies at this length to + // bound response size. Clamped to + // [adminPeekMinBodyBytes, sqsMaximumAllowedMaximumMessageSize] + // (= 256 KiB, matching AWS SQS's hard cap on stored message + // size). 0 means "use default (adminPeekDefaultBodyBytes)". + // The full body is always retained on the server; only the + // wire representation is truncated. + BodyMaxBytes int +} + +const ( + // adminPeekDefaultLimit is the row count an empty + // AdminPeekMessageOptions.Limit maps to. Matches the SPA's + // default Messages-tab page size so the cheapest call (default + // opts) is the one the SPA issues most often. + adminPeekDefaultLimit = 20 + // adminPeekMaxLimit caps Limit. The hard ceiling exists so an + // operator script cannot accidentally issue million-row peeks + // against the leader. + adminPeekMaxLimit = 100 + // adminPeekDefaultBodyBytes is the truncation length applied + // when AdminPeekMessageOptions.BodyMaxBytes is zero. 4 KiB + // keeps a default 20-row response well under typical JSON + // budgets while still covering the vast majority of message + // payloads in real DLQ triage workflows. + adminPeekDefaultBodyBytes = 4096 + // adminPeekMinBodyBytes is the smallest legal BodyMaxBytes + // (after the zero-means-default mapping). Anything smaller + // would be a probable client bug: 256 bytes still fits a JSON + // preview the SPA can render, so we round up. + adminPeekMinBodyBytes = 256 + // adminPeekCursorMaxBytes hard-caps the encoded cursor size. + // Anything larger is either client-supplied junk or a sign + // that a future cursor field grew unbounded; either way the + // admin handler returns ErrAdminSQSValidation. + adminPeekCursorMaxBytes = 512 + // peekCursorSchemaV1 pins the cursor wire format. Bumping + // requires a corresponding decoder branch. + peekCursorSchemaV1 = 1 +) + +// peekCursor is the wire shape of the continuation token. JSON-encoded +// then base64url-wrapped so the SPA can pass it back unchanged. The +// version field exists so a future field rename can be handled +// explicitly instead of silently mis-decoding old SPA tabs after a +// rolling deploy. +type peekCursor struct { + V int `json:"v"` // schema version, currently peekCursorSchemaV1 + Generation uint64 `json:"gen"` // queue generation at scan start + StartPartition uint32 `json:"sp,omitempty"` // partition where this peek walk began (partitioned only) + Partition uint32 `json:"p,omitempty"` // current partition (advances during the walk) + LastKey []byte `json:"k,omitempty"` // last scanned visibility-index key +} + +// errPeekCursorTooLarge is the sentinel returned when encodePeekCursor +// would emit a base64 token larger than adminPeekCursorMaxBytes. A +// fresh cursor with a vis-index key tops out around ~150-200 bytes +// encoded, so hitting this is a regression flag rather than an +// expected wire-level failure: surfacing it as an internal error +// keeps the response shape predictable and pinpoints whoever added +// the bloated field. +var errPeekCursorTooLarge = errors.New("admin peek: encoded cursor exceeds maximum size") + +// encodePeekCursor JSON-encodes the cursor then base64url-wraps it. +// Returns the empty string when the walk has fully completed +// (caller-supplied sentinel: an empty cursor means "no more pages"). +func encodePeekCursor(c *peekCursor) (string, error) { + if c == nil { + return "", nil + } + raw, err := json.Marshal(c) + if err != nil { + return "", errors.WithStack(err) + } + out := base64.RawURLEncoding.EncodeToString(raw) + if len(out) > adminPeekCursorMaxBytes { + return "", errors.WithStack(errPeekCursorTooLarge) + } + return out, nil +} + +// decodePeekCursor unwraps the base64url + JSON envelope. Returns +// ErrAdminSQSValidation on any wire-shape error (oversize, malformed +// base64, malformed JSON, unsupported schema version). The empty +// string returns (nil, nil) — callers treat that as "start from the +// front". +func decodePeekCursor(s string) (*peekCursor, error) { + if s == "" { + return nil, nil + } + if len(s) > adminPeekCursorMaxBytes { + return nil, errors.Wrap(ErrAdminSQSValidation, "admin peek: cursor too large") + } + raw, err := base64.RawURLEncoding.DecodeString(s) + if err != nil { + return nil, errors.Wrap(ErrAdminSQSValidation, "admin peek: cursor is not valid base64url") + } + var c peekCursor + if err := json.Unmarshal(raw, &c); err != nil { + return nil, errors.Wrap(ErrAdminSQSValidation, "admin peek: cursor is not valid JSON") + } + if c.V != peekCursorSchemaV1 { + return nil, errors.Wrapf(ErrAdminSQSValidation, "admin peek: cursor schema version %d unsupported", c.V) + } + return &c, nil +} + +// clampPeekLimit folds a user-supplied Limit into the legal +// [1, adminPeekMaxLimit] range, mapping 0 to adminPeekDefaultLimit. +func clampPeekLimit(limit int) int { + if limit <= 0 { + return adminPeekDefaultLimit + } + if limit > adminPeekMaxLimit { + return adminPeekMaxLimit + } + return limit +} + +// clampPeekBodyBytes folds a user-supplied BodyMaxBytes into +// [adminPeekMinBodyBytes, sqsMaximumAllowedMaximumMessageSize], +// mapping 0 to adminPeekDefaultBodyBytes. +func clampPeekBodyBytes(b int) int { + if b <= 0 { + return adminPeekDefaultBodyBytes + } + if b < adminPeekMinBodyBytes { + return adminPeekMinBodyBytes + } + if b > sqsMaximumAllowedMaximumMessageSize { + return sqsMaximumAllowedMaximumMessageSize + } + return b +} + +// AdminPeekQueue returns a non-destructive sample of currently-visible +// messages in name. Receive counts are NOT incremented and visibility +// timers are NOT started — peek is a pure read over the leader's +// visibility index. Returns the rows plus a continuation cursor that +// the caller passes back as opts.Cursor to fetch the next page (empty +// when the walk is complete). +// +// Sentinel errors: +// - ErrAdminForbidden — peek requires read role; nil principal is denied +// - ErrAdminNotLeader — peek runs on the leader (the visibility +// index is leader-only-written; a follower read would race the +// leader's apply) +// - ErrAdminSQSNotFound — queue absent +// - ErrAdminSQSValidation — empty / malformed name, malformed / +// oversized / stale-generation cursor +func (s *SQSServer) AdminPeekQueue( + ctx context.Context, + principal AdminPrincipal, + name string, + opts AdminPeekMessageOptions, +) ([]AdminPeekedMessage, string, error) { + if !principal.Role.canRead() { + return nil, "", ErrAdminForbidden + } + if !isVerifiedSQSLeader(ctx, s.coordinator) { + return nil, "", ErrAdminNotLeader + } + if strings.TrimSpace(name) == "" { + return nil, "", ErrAdminSQSValidation + } + limit := clampPeekLimit(opts.Limit) + bodyMaxBytes := clampPeekBodyBytes(opts.BodyMaxBytes) + cursor, err := decodePeekCursor(opts.Cursor) + if err != nil { + return nil, "", err + } + readTS := s.nextTxnReadTS(ctx) + meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS) + if err != nil { + return nil, "", errors.WithStack(err) + } + if !exists { + return nil, "", ErrAdminSQSNotFound + } + cursor, err = preparePeekCursor(cursor, meta, name, s.peekStartPartition(name, cursor, meta)) + if err != nil { + return nil, "", err + } + rows, nextCursor, err := s.walkPeek(ctx, name, meta, readTS, cursor, limit, bodyMaxBytes) + if err != nil { + return nil, "", err + } + encoded, err := encodePeekCursor(nextCursor) + if err != nil { + return nil, "", err + } + return rows, encoded, nil +} + +// peekStartPartition returns the starting partition for a fresh peek +// walk, or 0 when a starting partition is not needed (continuation +// pages already carry one in the cursor; non-partitioned queues only +// have partition 0). +// +// The fanout counter is consulted ONLY on the first page of a peek +// walk. nextReceiveFanoutStart increments the per-queue counter on +// every call regardless of whether the returned value is consumed; +// advancing it on every continuation page would perturb the same +// counter ReceiveMessage reads for partition fairness, reintroducing +// fixed-stride aliasing (Codex r2 P1: e.g. PartitionCount=4 with 3 +// peek pages between receives lands every receive on the same +// partition, starving the other three). +func (s *SQSServer) peekStartPartition(queueName string, cursor *peekCursor, meta *sqsQueueMeta) uint32 { + if cursor != nil { + return 0 + } + effective := effectivePartitionCount(meta) + if effective <= 1 { + return 0 + } + return s.nextReceiveFanoutStart(queueName, effective) +} + +// preparePeekCursor builds the effective cursor for this call. On the +// first page (cursor==nil) it stamps Generation + a rotated +// StartPartition (partitioned queues only — non-partitioned and +// perQueue-throughput FIFO queues collapse to partition 0). On a +// follow-up page it validates the stored Generation matches the +// queue's current generation AND that StartPartition / Partition are +// within [0, effectivePartitionCount(meta)); a mismatch on either +// returns ErrAdminSQSValidation. +// +// effectivePartitionCount (not meta.PartitionCount) is the +// authoritative iteration bound. perQueue FIFO mode collapses every +// MessageGroupId to partition 0 (see partitionFor / +// effectivePartitionCount), so partitions 1..N-1 are guaranteed empty +// for those queues; walking them would be pointless read +// amplification (Codex r3 P2). Keying validation off the effective +// count keeps peek aligned with the data-plane's actual partition +// usage. +// +// Bounds-check rationale: walkPeek terminates the partitioned +// rotation when `(Partition + 1) % effectivePartitionCount == +// StartPartition`. If a client supplies StartPartition outside +// [0, effectivePartitionCount), that termination condition never +// fires and the call loops ScanAt-by-ScanAt over guaranteed-empty +// partitions forever — a request-amplification DoS against the +// admin endpoint (Codex r1 P1 on PR #794). Rejecting bad cursor +// partition indices up-front closes the vector. Generation mismatch +// separately forces a front-of-stream refresh after a purge. +// +// startPartition is computed by the caller (the SQSServer method's +// peekStartPartition wrapper around nextReceiveFanoutStart) so this +// helper stays method-free for unit-testability. Non-partitioned and +// perQueue-collapsed queues pass 0. +// +// queueName is consumed to validate cursor.LastKey on continuation +// pages: a forged LastKey outside the queue's visibility-index +// prefix would otherwise let an attacker start a ScanAt at any byte +// offset in the leader's keyspace (Codex r4 P2). Continuation +// cursors are admin-supplied and signed only by base64-encoding, so +// every field must be validated against the live queue meta before +// being used as a storage cursor. +func preparePeekCursor(cursor *peekCursor, meta *sqsQueueMeta, queueName string, startPartition uint32) (*peekCursor, error) { + effective := effectivePartitionCount(meta) + if cursor == nil { + out := &peekCursor{ + V: peekCursorSchemaV1, + Generation: meta.Generation, + } + if effective > 1 { + out.StartPartition = startPartition + out.Partition = startPartition + } + return out, nil + } + if cursor.Generation != meta.Generation { + return nil, errors.Wrap(ErrAdminSQSValidation, + "admin peek: cursor is from a prior generation; restart from the front") + } + if cursor.StartPartition >= effective || cursor.Partition >= effective { + return nil, errors.Wrapf(ErrAdminSQSValidation, + "admin peek: cursor partition index out of range (StartPartition=%d, Partition=%d, max=%d)", + cursor.StartPartition, cursor.Partition, effective) + } + if len(cursor.LastKey) > 0 { + expectedPrefix := sqsMsgVisPrefixForQueueDispatch(meta, queueName, cursor.Partition, meta.Generation) + if !bytes.HasPrefix(cursor.LastKey, expectedPrefix) { + return nil, errors.Wrap(ErrAdminSQSValidation, + "admin peek: cursor LastKey is outside the queue's visibility-index prefix") + } + } + return cursor, nil +} + +// walkPeek pages the visibility index, accumulating up to limit rows +// across partitions for partitioned queues (rotated sequential scan) +// or across the single keyspace for non-partitioned queues. Returns +// the rows and the cursor to pass back on the next call (nil when the +// walk has fully completed). +func (s *SQSServer) walkPeek( + ctx context.Context, + queueName string, + meta *sqsQueueMeta, + readTS uint64, + cursor *peekCursor, + limit int, + bodyMaxBytes int, +) ([]AdminPeekedMessage, *peekCursor, error) { + now := time.Now().UnixMilli() + rows := make([]AdminPeekedMessage, 0, limit) + for { + next, exhausted, err := s.walkPeekPartition(ctx, queueName, meta, readTS, now, cursor.Partition, cursor.LastKey, limit-len(rows), bodyMaxBytes, &rows) + if err != nil { + return nil, nil, err + } + if len(rows) >= limit { + cursor.LastKey = next + return rows, cursor, nil + } + if !exhausted { + cursor.LastKey = next + return rows, cursor, nil + } + // Partition exhausted before Limit reached. effectivePartitionCount + // (not meta.PartitionCount) caps the rotation so perQueue-collapsed + // FIFO queues stop after partition 0 instead of walking N-1 + // guaranteed-empty partitions (Codex r3 P2 on PR #794). + effective := effectivePartitionCount(meta) + if effective <= 1 { + return rows, nil, nil + } + nextPart := (cursor.Partition + 1) % effective + if nextPart == cursor.StartPartition { + return rows, nil, nil + } + cursor.Partition = nextPart + cursor.LastKey = nil + } +} + +// walkPeekPartition scans the visibility index for one partition +// (legacy queues invoke this with partition=0 and the legacy +// keyspace). Returns the next cursor key (for resume), whether the +// partition was scanned to completion (`exhausted=true` means no +// further pages), and any storage error. Appends to *rows. +func (s *SQSServer) walkPeekPartition( + ctx context.Context, + queueName string, + meta *sqsQueueMeta, + readTS uint64, + nowMillis int64, + partition uint32, + lastKey []byte, + want int, + bodyMaxBytes int, + rows *[]AdminPeekedMessage, +) ([]byte, bool, error) { + if want <= 0 { + return lastKey, false, nil + } + start, end := sqsMsgVisScanBoundsDispatch(meta, queueName, partition, meta.Generation, nowMillis) + if len(lastKey) > 0 { + start = nextScanCursorAfter(lastKey) + if end != nil && bytes.Compare(start, end) > 0 { + return lastKey, true, nil + } + } + page, err := s.store.ScanAt(ctx, start, end, want, readTS) + if err != nil { + return nil, false, errors.WithStack(err) + } + if len(page) == 0 { + return lastKey, true, nil + } + for _, kvp := range page { + messageID := string(kvp.Value) + rec, ok, err := s.loadPeekMessageRecord(ctx, meta, queueName, partition, messageID, readTS) + if err != nil { + return nil, false, err + } + if !ok { + // The vis-index entry points at a data record that + // has since been deleted (e.g. between our index + // scan and the per-record GetAt). Skip rather than + // fail: a single tombstoned record is not a peek + // failure — the SPA gets a shorter page, which is + // a benign outcome. + continue + } + *rows = append(*rows, projectPeekedMessage(rec, bodyMaxBytes)) + } + lastKey = bytes.Clone(page[len(page)-1].Key) + exhausted := len(page) < want + return lastKey, exhausted, nil +} + +// loadPeekMessageRecord fetches the message record by ID at the +// caller's MVCC snapshot. Returns (nil, false, nil) when the record +// is absent — the caller treats this as a benign skip (the vis-index +// entry may point at a data record that was deleted between the +// index scan and the per-record GetAt; surfacing the absence as an +// error would fail the whole peek call over a single tombstoned row). +// +// The data-record key is partition-aware via sqsMsgDataKeyDispatch. +// Partitioned FIFO queues store the data record under the same +// partition the vis-index entry was found under; using the legacy +// sqsMsgDataKey here would silently miss every row on partitioned +// queues. +func (s *SQSServer) loadPeekMessageRecord(ctx context.Context, meta *sqsQueueMeta, queueName string, partition uint32, messageID string, readTS uint64) (*sqsMessageRecord, bool, error) { + dataKey := sqsMsgDataKeyDispatch(meta, queueName, partition, meta.Generation, messageID) + b, err := s.store.GetAt(ctx, dataKey, readTS) + if err != nil { + if errors.Is(err, store.ErrKeyNotFound) { + return nil, false, nil + } + return nil, false, errors.WithStack(err) + } + if len(b) == 0 { + return nil, false, nil + } + rec, err := decodeSQSMessageRecord(b) + if err != nil { + return nil, false, errors.WithStack(err) + } + return rec, true, nil +} + +// projectPeekedMessage maps the stored record into the wire-side +// AdminPeekedMessage, truncating the body per bodyMaxBytes and +// converting the typed attribute map. +func projectPeekedMessage(rec *sqsMessageRecord, bodyMaxBytes int) AdminPeekedMessage { + body, truncated, originalSize := truncatePeekBody(rec.Body, bodyMaxBytes) + return AdminPeekedMessage{ + MessageID: rec.MessageID, + Body: body, + BodyTruncated: truncated, + BodyOriginalSize: originalSize, + SentTimestamp: time.UnixMilli(rec.SendTimestampMillis).UTC(), + ReceiveCount: clampReceiveCountToInt32(rec.ReceiveCount), + GroupID: rec.MessageGroupId, + DeduplicationID: rec.MessageDeduplicationId, + Attributes: projectPeekedAttributes(rec.MessageAttributes), + } +} + +// clampReceiveCountToInt32 converts the stored int64 ReceiveCount to +// the wire-side int32, saturating at math.MaxInt32 rather than +// overflowing. AWS publishes ApproximateReceiveCount as a uint32 on +// the SQS API, so int32 is sufficient for every realistic queue; +// saturation is the safe fallback if a future bug or corrupted +// record somehow pushes the counter past 2 billion. +func clampReceiveCountToInt32(n int64) int32 { + const maxInt32 = int32(1<<31 - 1) + if n < 0 { + return 0 + } + if n > int64(maxInt32) { + return maxInt32 + } + return int32(n) +} + +// truncatePeekBody returns the wire-side body (truncated to +// bodyMaxBytes), the truncated flag, and the original size in bytes. +func truncatePeekBody(body []byte, bodyMaxBytes int) (string, bool, int64) { + originalSize := int64(len(body)) + if int(originalSize) <= bodyMaxBytes { + return string(body), false, originalSize + } + return string(body[:bodyMaxBytes]), true, originalSize +} + +// projectPeekedAttributes maps the stored sqsMessageAttributeValue map +// into the wire-side AdminPeekedAttribute map. Returns nil when the +// stored map is nil so the JSON encoder's omitempty drops the field +// rather than emitting "attributes":{}. The two structs share the +// same field shape, so the conversion is a direct type cast. +func projectPeekedAttributes(attrs map[string]sqsMessageAttributeValue) map[string]AdminPeekedAttribute { + if len(attrs) == 0 { + return nil + } + out := make(map[string]AdminPeekedAttribute, len(attrs)) + for name, v := range attrs { + out[name] = AdminPeekedAttribute(v) + } + return out +} diff --git a/adapter/sqs_admin_peek_test.go b/adapter/sqs_admin_peek_test.go new file mode 100644 index 00000000..2b36082a --- /dev/null +++ b/adapter/sqs_admin_peek_test.go @@ -0,0 +1,1158 @@ +package adapter + +import ( + "context" + "encoding/base64" + "net/http" + "sort" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/errors" + json "github.com/goccy/go-json" +) + +// TestAdminPeekQueue_HappyPath sends three messages into a standard +// queue and confirms AdminPeekQueue surfaces all three with their +// bodies populated. Pins the basic wiring: meta load, vis-index scan, +// data-key follow-up read, body projection. +func TestAdminPeekQueue_HappyPath(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-basic") + for i := range 3 { + status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "body-" + strconv.Itoa(i), + }) + if status != http.StatusOK { + t.Fatalf("send #%d: %d %v", i, status, out) + } + } + + rows, nextCursor, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-basic", AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("AdminPeekQueue: %v", err) + } + if len(rows) != 3 { + t.Fatalf("rows=%d want 3; rows=%+v", len(rows), rows) + } + if nextCursor != "" { + t.Fatalf("nextCursor=%q want empty (queue drained in one page)", nextCursor) + } + want := map[string]struct{}{"body-0": {}, "body-1": {}, "body-2": {}} + assertPeekRowsAsSet(t, rows, want) +} + +// assertPeekRowsAsSet pins the per-row invariants (MessageID +// populated, SentTimestamp non-zero, no truncation) AND the set of +// observed bodies, without coupling to row order. Vis-index entries +// with the same visible_at millisecond tie-break on message_id which +// is random, so an order-sensitive assertion is flaky under fast +// sends — CodeRabbit r3 caught the earlier ordered-loop version. +func assertPeekRowsAsSet(t *testing.T, rows []AdminPeekedMessage, want map[string]struct{}) { + t.Helper() + got := make(map[string]struct{}, len(rows)) + for i, row := range rows { + if row.BodyTruncated { + t.Fatalf("rows[%d].BodyTruncated=true; want false for short body", i) + } + if row.MessageID == "" { + t.Fatalf("rows[%d].MessageID empty", i) + } + if row.SentTimestamp.IsZero() { + t.Fatalf("rows[%d].SentTimestamp zero", i) + } + got[row.Body] = struct{}{} + } + for w := range want { + if _, ok := got[w]; !ok { + t.Fatalf("missing body %q in observed set %v", w, got) + } + } + if len(got) != len(want) { + t.Fatalf("observed set %v != expected set %v", got, want) + } +} + +// TestAdminPeekQueue_DoesNotChangeReceiveCount sends a message, peeks +// twice, then ReceiveMessage — the receive call must see +// ApproximateReceiveCount=1 (not 3). Pins the design's "peek is a +// pure read" invariant: the visibility timer is not started and no +// receipt-handle record is committed. +func TestAdminPeekQueue_DoesNotChangeReceiveCount(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-no-bump") + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "peek-me", + }) + if status != http.StatusOK { + t.Fatalf("send: %d", status) + } + + for i := range 2 { + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-no-bump", AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("peek #%d: %v", i, err) + } + if len(rows) != 1 || rows[0].ReceiveCount != 0 { + t.Fatalf("peek #%d: rows=%+v want 1 row with ReceiveCount=0", i, rows) + } + } + + status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MaxNumberOfMessages": 1, + "AttributeNames": []string{"All"}, + }) + if status != http.StatusOK { + t.Fatalf("receive: %d %v", status, out) + } + msgs, _ := out["Messages"].([]any) + if len(msgs) != 1 { + t.Fatalf("receive: msgs=%d want 1", len(msgs)) + } + m, _ := msgs[0].(map[string]any) + attrs, _ := m["Attributes"].(map[string]any) + if got, _ := attrs["ApproximateReceiveCount"].(string); got != "1" { + t.Fatalf("ApproximateReceiveCount=%q want \"1\"; peek must not bump it (attrs=%v)", got, attrs) + } +} + +// TestAdminPeekQueue_BodyTruncation sends a long body and peeks with a +// small BodyMaxBytes. The returned Body is exactly BodyMaxBytes +// long; BodyTruncated is true; BodyOriginalSize records the full +// length. +func TestAdminPeekQueue_BodyTruncation(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-trunc") + body := strings.Repeat("x", 4096) + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": body, + }) + if status != http.StatusOK { + t.Fatalf("send: %d", status) + } + + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-trunc", AdminPeekMessageOptions{BodyMaxBytes: 256}) + if err != nil { + t.Fatalf("peek: %v", err) + } + if len(rows) != 1 { + t.Fatalf("rows=%d want 1", len(rows)) + } + if len(rows[0].Body) != 256 { + t.Fatalf("len(Body)=%d want 256", len(rows[0].Body)) + } + if !rows[0].BodyTruncated { + t.Fatalf("BodyTruncated=false want true") + } + if rows[0].BodyOriginalSize != int64(len(body)) { + t.Fatalf("BodyOriginalSize=%d want %d", rows[0].BodyOriginalSize, len(body)) + } +} + +// TestAdminPeekQueue_LimitClamping pins the [1, adminPeekMaxLimit] +// clamp + default-on-zero behaviour. With more rows than the cap in +// the queue, opts.Limit=500 must return adminPeekMaxLimit rows; +// opts.Limit=0 must return adminPeekDefaultLimit. +func TestAdminPeekQueue_LimitClamping(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-limit") + const sent = 150 + for i := range sent { + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "row-" + strconv.Itoa(i), + }) + if status != http.StatusOK { + t.Fatalf("send #%d: %d", i, status) + } + } + + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-limit", AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("default-limit peek: %v", err) + } + if len(rows) != adminPeekDefaultLimit { + t.Fatalf("default-limit peek: rows=%d want %d", len(rows), adminPeekDefaultLimit) + } + + rows, _, err = node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-limit", AdminPeekMessageOptions{Limit: 500}) + if err != nil { + t.Fatalf("over-limit peek: %v", err) + } + if len(rows) != adminPeekMaxLimit { + t.Fatalf("over-limit peek: rows=%d want %d (clamped)", len(rows), adminPeekMaxLimit) + } +} + +// TestAdminPeekQueue_CursorRoundTrip drains a queue across two +// pages: peek returns N rows + nextCursor; peek-with-cursor returns +// the remaining rows; their union is the full queue with no +// overlaps and the final nextCursor is empty. +func TestAdminPeekQueue_CursorRoundTrip(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-cursor") + const sent = 7 + for i := range sent { + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "row-" + strconv.Itoa(i), + }) + if status != http.StatusOK { + t.Fatalf("send #%d: %d", i, status) + } + } + + ctx := context.Background() + pageA, cursor := assertPeekPage(t, ctx, node, "peek-cursor", "", 4, 4, true) + pageB, cursor := assertPeekPage(t, ctx, node, "peek-cursor", cursor, 10, sent-4, false) + if cursor != "" { + t.Fatalf("final cursor=%q want empty", cursor) + } + + seen := make(map[string]int, sent) + for _, row := range append(pageA, pageB...) { + seen[row.MessageID]++ + } + if len(seen) != sent { + t.Fatalf("unique rows=%d want %d (overlap=%v)", len(seen), sent, seen) + } +} + +// assertPeekPage runs one peek call and pins (a) no error, (b) the +// expected row count, (c) whether a non-empty nextCursor is required. +// Pulled into a helper so TestAdminPeekQueue_CursorRoundTrip stays +// under the cyclop budget. +func assertPeekPage(t *testing.T, ctx context.Context, node Node, queue, cursorIn string, limit, wantRows int, wantCursor bool) ([]AdminPeekedMessage, string) { + t.Helper() + rows, cursorOut, err := node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, queue, AdminPeekMessageOptions{Limit: limit, Cursor: cursorIn}) + if err != nil { + t.Fatalf("peek (cursor=%q): %v", cursorIn, err) + } + if len(rows) != wantRows { + t.Fatalf("peek (cursor=%q) rows=%d want %d", cursorIn, len(rows), wantRows) + } + if wantCursor && cursorOut == "" { + t.Fatalf("peek (cursor=%q) returned empty cursor; expected continuation", cursorIn) + } + return rows, cursorOut +} + +// TestAdminPeekQueue_StaleGenerationCursor reproduces the failure +// mode the design doc §3.1 requires the cursor codec to catch: +// peek, then purge, then peek with the now-stale cursor → the +// adapter must reject with ErrAdminSQSValidation so the SPA refreshes +// from the front rather than returning rows from a purged generation. +func TestAdminPeekQueue_StaleGenerationCursor(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-stale") + for i := range 5 { + _, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "row-" + strconv.Itoa(i), + }) + } + + ctx := context.Background() + _, cursor, err := node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, "peek-stale", AdminPeekMessageOptions{Limit: 2}) + if err != nil { + t.Fatalf("initial peek: %v", err) + } + if cursor == "" { + t.Fatalf("cursor empty after partial page") + } + + if _, err := node.sqsServer.AdminPurgeQueue(ctx, fullAdminPrincipal, "peek-stale"); err != nil { + t.Fatalf("purge: %v", err) + } + + _, _, err = node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, "peek-stale", AdminPeekMessageOptions{Cursor: cursor}) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("stale-gen peek: want ErrAdminSQSValidation, got %v", err) + } +} + +// TestAdminPeekQueue_CursorMalformed pins each of the documented +// wire-shape failures (oversize, bad base64, bad JSON, unsupported +// version) onto ErrAdminSQSValidation rather than leaking the raw +// codec error. +func TestAdminPeekQueue_CursorMalformed(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + _ = createSQSQueueForTest(t, node, "peek-cursor-bad") + + cases := map[string]string{ + "oversize": strings.Repeat("A", adminPeekCursorMaxBytes+1), + "not-base64": "!!!not_valid_base64!!!", + "not-json": base64.RawURLEncoding.EncodeToString([]byte("not-json")), + "wrong-version": base64.RawURLEncoding.EncodeToString([]byte(`{"v":42,"gen":1}`)), + } + for name, cursor := range cases { + t.Run(name, func(t *testing.T) { + _, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-cursor-bad", AdminPeekMessageOptions{Cursor: cursor}) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("%s: want ErrAdminSQSValidation, got %v", name, err) + } + }) + } +} + +// TestAdminPeekQueue_ReadOnlyPrincipalAllowed confirms the canRead() +// gate accepts AdminRoleReadOnly. Peek is divergent from +// AdminPurgeQueue's canWrite() gate; this test pins the divergence so +// a future reviewer does not "fix" the inconsistency. +func TestAdminPeekQueue_ReadOnlyPrincipalAllowed(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + queueURL := createSQSQueueForTest(t, node, "peek-readonly") + _, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{"QueueUrl": queueURL, "MessageBody": "x"}) + + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), readOnlyAdminPrincipal, "peek-readonly", AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("read-only peek: %v", err) + } + if len(rows) != 1 { + t.Fatalf("read-only peek rows=%d want 1", len(rows)) + } +} + +// TestAdminPeekQueue_RoleLessForbidden pins the gate's lower bound: +// the zero AdminPrincipal (no role set) is denied. +func TestAdminPeekQueue_RoleLessForbidden(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + _ = createSQSQueueForTest(t, node, "peek-noauth") + + _, _, err := node.sqsServer.AdminPeekQueue(context.Background(), AdminPrincipal{}, "peek-noauth", AdminPeekMessageOptions{}) + if !errors.Is(err, ErrAdminForbidden) { + t.Fatalf("role-less peek: want ErrAdminForbidden, got %v", err) + } +} + +// TestAdminPeekQueue_MissingQueue maps to the structured 404 sentinel. +func TestAdminPeekQueue_MissingQueue(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + _, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "no-such-queue", AdminPeekMessageOptions{}) + if !errors.Is(err, ErrAdminSQSNotFound) { + t.Fatalf("missing peek: want ErrAdminSQSNotFound, got %v", err) + } +} + +// TestAdminPeekQueue_EmptyName pins the up-front guard. +func TestAdminPeekQueue_EmptyName(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + for _, name := range []string{"", " "} { + _, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, name, AdminPeekMessageOptions{}) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("name=%q: want ErrAdminSQSValidation, got %v", name, err) + } + } +} + +// TestAdminPeekQueue_DelayedMessageHidden confirms the visible_at <= +// now filter: a message with DelaySeconds > 0 does NOT appear in +// peek until its delay has elapsed. Matches receiveMessage's +// behaviour and pins the design's "currently-visible only" contract. +func TestAdminPeekQueue_DelayedMessageHidden(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-delayed") + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "future", + "DelaySeconds": 300, + }) + if status != http.StatusOK { + t.Fatalf("send: %d", status) + } + + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-delayed", AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("peek: %v", err) + } + if len(rows) != 0 { + t.Fatalf("rows=%d want 0 (message is delayed 5min)", len(rows)) + } +} + +// TestAdminPeekQueue_FIFO_AttributesPopulated sends to a FIFO queue +// with GroupID + DeduplicationID and confirms the peek row carries +// both. Pins the field projection. +func TestAdminPeekQueue_FIFO_AttributesPopulated(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const name = "peek-fifo.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": name, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + if status != http.StatusOK { + t.Fatalf("create fifo: %d %v", status, out) + } + queueURL, _ := out["QueueUrl"].(string) + + status, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "fifo-body", + "MessageGroupId": "tenant-7", + "MessageDeduplicationId": "dedup-1", + }) + if status != http.StatusOK { + t.Fatalf("send fifo: %d", status) + } + + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, name, AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("peek fifo: %v", err) + } + if len(rows) != 1 { + t.Fatalf("rows=%d want 1", len(rows)) + } + if rows[0].GroupID != "tenant-7" { + t.Fatalf("GroupID=%q want \"tenant-7\"", rows[0].GroupID) + } + if rows[0].DeduplicationID != "dedup-1" { + t.Fatalf("DeduplicationID=%q want \"dedup-1\"", rows[0].DeduplicationID) + } +} + +// TestAdminPeekQueue_AttributesProjected confirms typed +// MessageAttribute round-tripping: a String + a Binary attribute on +// the SendMessage call surface on the peek row with DataType / +// StringValue / BinaryValue preserved. +func TestAdminPeekQueue_AttributesProjected(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queueURL := createSQSQueueForTest(t, node, "peek-attrs") + binaryBytes := []byte{0xDE, 0xAD, 0xBE, 0xEF} + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "x", + "MessageAttributes": map[string]any{ + "Source": map[string]any{"DataType": "String", "StringValue": "checkout"}, + "Blob": map[string]any{"DataType": "Binary", "BinaryValue": binaryBytes}, + }, + }) + if status != http.StatusOK { + t.Fatalf("send: %d", status) + } + + rows, _, err := node.sqsServer.AdminPeekQueue(context.Background(), fullAdminPrincipal, "peek-attrs", AdminPeekMessageOptions{}) + if err != nil { + t.Fatalf("peek: %v", err) + } + if len(rows) != 1 { + t.Fatalf("rows=%d want 1", len(rows)) + } + src, ok := rows[0].Attributes["Source"] + if !ok || src.DataType != "String" || src.StringValue != "checkout" { + t.Fatalf("Source attr=%+v want {String, checkout}", src) + } + blob, ok := rows[0].Attributes["Blob"] + if !ok || blob.DataType != "Binary" || string(blob.BinaryValue) != string(binaryBytes) { + t.Fatalf("Blob attr=%+v want {Binary, %x}", blob, binaryBytes) + } +} + +// TestAdminPeekQueue_PartitionedFIFO_Pagination installs a +// PartitionCount=4 FIFO queue, sends one message per group spread +// across all 4 partitions, then walks the peek cursor across pages +// (Limit=2 per page) until exhausted. Every sent message must +// surface exactly once across the union of pages, regardless of the +// rotated start partition. Pins the design's "rotated sequential +// scanning" contract. +func TestAdminPeekQueue_PartitionedFIFO_Pagination(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const name = "peek-part.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": name, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + if status != http.StatusOK { + t.Fatalf("create: %d %v", status, out) + } + queueURL, _ := out["QueueUrl"].(string) + installPartitionedMetaForTest(t, node, name, 4, htfifoThroughputPerMessageGroupID) + + groups := []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta", "eta", "theta"} + sent := sendFIFOMessagesForPeek(t, node, queueURL, groups) + + ctx := context.Background() + seen := walkPeekUntilDone(t, ctx, node, name, 2) + assertEveryMessageSeenOnce(t, sent, seen) +} + +// sendFIFOMessagesForPeek sends one message per group to a partitioned +// FIFO queue and returns the resulting set of message IDs. Pulled into +// a helper so TestAdminPeekQueue_PartitionedFIFO_Pagination stays +// under the cyclop budget. +func sendFIFOMessagesForPeek(t *testing.T, node Node, queueURL string, groups []string) map[string]struct{} { + t.Helper() + sent := make(map[string]struct{}, len(groups)) + for _, g := range groups { + status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "body-" + g, + "MessageGroupId": g, + "MessageDeduplicationId": "dedup-" + g, + }) + if status != http.StatusOK { + t.Fatalf("send %s: %d %v", g, status, out) + } + msgID, _ := out["MessageId"].(string) + if msgID == "" { + t.Fatalf("send %s: empty MessageId", g) + } + sent[msgID] = struct{}{} + } + return sent +} + +// walkPeekUntilDone pages through every peek result on a queue with +// the given per-page limit, returning a multi-set of how many times +// each MessageID was observed. Loop bound (32) is a safety net so a +// pagination bug terminates rather than runs forever; callers assert +// "saw every message exactly once" against this output. +func walkPeekUntilDone(t *testing.T, ctx context.Context, node Node, queue string, limit int) map[string]int { + t.Helper() + seen := make(map[string]int) + cursor := "" + for page := 0; page < 32; page++ { + rows, next, err := node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, queue, AdminPeekMessageOptions{Limit: limit, Cursor: cursor}) + if err != nil { + t.Fatalf("peek page %d: %v", page, err) + } + for _, row := range rows { + seen[row.MessageID]++ + } + if next == "" { + return seen + } + cursor = next + } + t.Fatalf("peek did not terminate after 32 pages; seen=%v", seen) + return seen +} + +// assertEveryMessageSeenOnce pins the multi-set invariant the +// partitioned-FIFO pagination test cares about: every sent message +// surfaced in exactly one peek page, none missed and none duplicated. +func assertEveryMessageSeenOnce(t *testing.T, sent map[string]struct{}, seen map[string]int) { + t.Helper() + if len(seen) != len(sent) { + want := make([]string, 0, len(sent)) + for id := range sent { + want = append(want, id) + } + sort.Strings(want) + got := make([]string, 0, len(seen)) + for id, count := range seen { + got = append(got, id+"="+strconv.Itoa(count)) + } + sort.Strings(got) + t.Fatalf("seen=%v want %d unique messages (sent=%v)", got, len(sent), want) + } + for id, count := range seen { + if count != 1 { + t.Fatalf("message %q seen %d times (want exactly 1)", id, count) + } + } +} + +// TestPeekCursorCodec_RoundTrip pins the encode/decode contract at +// the unit level so partition / start-partition / last-key all +// survive the base64url + JSON envelope. +func TestPeekCursorCodec_RoundTrip(t *testing.T) { + t.Parallel() + in := &peekCursor{ + V: peekCursorSchemaV1, + Generation: 42, + StartPartition: 3, + Partition: 5, + LastKey: []byte("some-last-key"), + } + encoded, err := encodePeekCursor(in) + if err != nil { + t.Fatalf("encode: %v", err) + } + if encoded == "" { + t.Fatalf("encoded cursor empty") + } + out, err := decodePeekCursor(encoded) + if err != nil { + t.Fatalf("decode: %v", err) + } + if out == nil { + t.Fatalf("decode returned nil") + } + if out.V != in.V || out.Generation != in.Generation || + out.StartPartition != in.StartPartition || out.Partition != in.Partition || + string(out.LastKey) != string(in.LastKey) { + t.Fatalf("round-trip mismatch: in=%+v out=%+v", in, out) + } +} + +// TestPeekCursorCodec_EmptyRoundTrip confirms the (nil, "") sentinel +// behaviour for "no cursor / start from front". +func TestPeekCursorCodec_EmptyRoundTrip(t *testing.T) { + t.Parallel() + encoded, err := encodePeekCursor(nil) + if err != nil { + t.Fatalf("encode(nil): %v", err) + } + if encoded != "" { + t.Fatalf("encode(nil)=%q want empty", encoded) + } + out, err := decodePeekCursor("") + if err != nil { + t.Fatalf("decode(\"\"): %v", err) + } + if out != nil { + t.Fatalf("decode(\"\")=%+v want nil", out) + } +} + +// TestPeekCursorCodec_LengthCap confirms the 512-byte hard cap by +// constructing a deliberately oversized cursor on the encode side +// (the design doc's "hard-capped at 512 bytes after encoding" rule). +// The encode path treats this as an internal regression rather than +// a client-side failure, so the test asserts an error rather than a +// truncated cursor. +func TestPeekCursorCodec_LengthCap(t *testing.T) { + t.Parallel() + bigKey := make([]byte, adminPeekCursorMaxBytes) + for i := range bigKey { + bigKey[i] = 'A' + } + _, err := encodePeekCursor(&peekCursor{V: peekCursorSchemaV1, LastKey: bigKey}) + if err == nil { + t.Fatalf("encode of oversized cursor must error; got nil") + } +} + +// TestClampPeekLimit pins each branch of the clamp function. +func TestClampPeekLimit(t *testing.T) { + t.Parallel() + cases := map[int]int{ + 0: adminPeekDefaultLimit, + -5: adminPeekDefaultLimit, + 1: 1, + adminPeekDefaultLimit: adminPeekDefaultLimit, + adminPeekMaxLimit: adminPeekMaxLimit, + adminPeekMaxLimit + 1: adminPeekMaxLimit, + 1_000_000: adminPeekMaxLimit, + } + for in, want := range cases { + if got := clampPeekLimit(in); got != want { + t.Fatalf("clampPeekLimit(%d)=%d want %d", in, got, want) + } + } +} + +// TestClampPeekBodyBytes pins each branch of the body-size clamp. +func TestClampPeekBodyBytes(t *testing.T) { + t.Parallel() + cases := map[int]int{ + 0: adminPeekDefaultBodyBytes, + -5: adminPeekDefaultBodyBytes, + 1: adminPeekMinBodyBytes, + adminPeekMinBodyBytes - 1: adminPeekMinBodyBytes, + adminPeekMinBodyBytes: adminPeekMinBodyBytes, + adminPeekDefaultBodyBytes: adminPeekDefaultBodyBytes, + sqsMaximumAllowedMaximumMessageSize: sqsMaximumAllowedMaximumMessageSize, + sqsMaximumAllowedMaximumMessageSize + 1: sqsMaximumAllowedMaximumMessageSize, + 2 << 20: sqsMaximumAllowedMaximumMessageSize, + } + for in, want := range cases { + if got := clampPeekBodyBytes(in); got != want { + t.Fatalf("clampPeekBodyBytes(%d)=%d want %d", in, got, want) + } + } +} + +// TestAdminPeekedMessage_JSONWireFormat pins the snake_case wire +// shape the design doc §3.5 specifies. Without explicit JSON tags +// the encoder emits Go-style PascalCase ("MessageID", "BodyTruncated" +// …) and the SPA's client adapter silently misparses every row +// (CodeRabbit r4 caught the regression). +func TestAdminPeekedMessage_JSONWireFormat(t *testing.T) { + t.Parallel() + in := AdminPeekedMessage{ + MessageID: "m1", + Body: "hello", + BodyTruncated: true, + BodyOriginalSize: 42, + SentTimestamp: time.UnixMilli(1_700_000_000_000).UTC(), + ReceiveCount: 3, + GroupID: "g1", + DeduplicationID: "d1", + Attributes: map[string]AdminPeekedAttribute{ + "Source": {DataType: sqsAttributeBaseTypeString, StringValue: "checkout"}, + }, + } + raw, err := json.Marshal(in) + if err != nil { + t.Fatalf("marshal: %v", err) + } + var got map[string]any + if err := json.Unmarshal(raw, &got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + wantKeys := []string{ + "message_id", "body", "body_truncated", "body_original_size", + "sent_timestamp", "receive_count", "group_id", "deduplication_id", + "attributes", + } + for _, k := range wantKeys { + if _, ok := got[k]; !ok { + t.Fatalf("missing key %q in wire JSON; got=%v", k, got) + } + } + // PascalCase keys must NOT appear. + for _, k := range []string{"MessageID", "Body", "BodyTruncated", "GroupID", "Attributes"} { + if _, ok := got[k]; ok { + t.Fatalf("unwanted PascalCase key %q in wire JSON; got=%v", k, got) + } + } + // Nested AdminPeekedAttribute also uses snake_case. + attrs, _ := got["attributes"].(map[string]any) + src, _ := attrs["Source"].(map[string]any) + if _, ok := src["data_type"]; !ok { + t.Fatalf("Source attribute missing 'data_type' key; got=%v", src) + } +} + +// TestAdminPeekedMessage_JSONOmitsEmptyAttributes pins the +// omitempty contract: empty Attributes / GroupID / DeduplicationID +// must NOT appear on the wire (the SPA renders the field's absence, +// not a "null" or "{}" placeholder). +func TestAdminPeekedMessage_JSONOmitsEmptyAttributes(t *testing.T) { + t.Parallel() + in := AdminPeekedMessage{MessageID: "m1", Body: "x"} + raw, err := json.Marshal(in) + if err != nil { + t.Fatalf("marshal: %v", err) + } + var got map[string]any + if err := json.Unmarshal(raw, &got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + for _, k := range []string{"group_id", "deduplication_id", "attributes"} { + if _, ok := got[k]; ok { + t.Fatalf("empty %q must be omitted from wire JSON; got=%v", k, got) + } + } +} + +// TestPreparePeekCursor_FreshCursor confirms the first-call cursor +// stamps Generation and (for partitioned queues) StartPartition. +func TestPreparePeekCursor_FreshCursor(t *testing.T) { + t.Parallel() + + t.Run("non-partitioned", func(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{Generation: 7} + out, err := preparePeekCursor(nil, meta, "test", 0) + if err != nil { + t.Fatalf("err: %v", err) + } + if out.V != peekCursorSchemaV1 || out.Generation != 7 || out.StartPartition != 0 || out.Partition != 0 { + t.Fatalf("cursor=%+v want {V:1 Gen:7 SP:0 P:0}", out) + } + }) + + t.Run("partitioned", func(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{Generation: 11, PartitionCount: 4} + out, err := preparePeekCursor(nil, meta, "test", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + if out.StartPartition != 2 || out.Partition != 2 { + t.Fatalf("cursor=%+v want StartPartition=Partition=2", out) + } + }) +} + +// TestPreparePeekCursor_GenerationMismatch confirms a cursor from a +// prior generation is rejected as ErrAdminSQSValidation. +func TestPreparePeekCursor_GenerationMismatch(t *testing.T) { + t.Parallel() + stale := &peekCursor{V: peekCursorSchemaV1, Generation: 3} + meta := &sqsQueueMeta{Generation: 4} + _, err := preparePeekCursor(stale, meta, "test", 0) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("want ErrAdminSQSValidation, got %v", err) + } +} + +// TestPreparePeekCursor_PartitionOutOfRange pins the Codex r1 P1 +// fix: a cursor with StartPartition or Partition outside +// [0, max(PartitionCount, 1)) must be rejected with +// ErrAdminSQSValidation BEFORE the walk runs. Without the bounds +// check, walkPeek's `nextPart == cursor.StartPartition` termination +// condition never fires for an out-of-range StartPartition, looping +// ScanAt forever (request-amplification DoS against the admin +// endpoint). +func TestPreparePeekCursor_PartitionOutOfRange(t *testing.T) { + t.Parallel() + pc4 := &sqsQueueMeta{Generation: 1, PartitionCount: 4} + pc0 := &sqsQueueMeta{Generation: 1, PartitionCount: 0} + pc1 := &sqsQueueMeta{Generation: 1, PartitionCount: 1} + // foreignKey / mismatchedKey share a "first byte not in the + // admin prefix" property with "aaaa" so the test exercises all + // three forged-LastKey classes the bounds check catches. + foreignKey := append(sqsMsgVisPrefixForQueueDispatch(pc4, "queue-X", 0, 1), 'X') + mismatchedKey := append(sqsMsgVisPrefixForQueueDispatch(pc4, "queue-A", 2, 1), 'X') + + cases := []struct { + name string + meta *sqsQueueMeta + queue string + cursor *peekCursor + }{ + {"partitioned: StartPartition >= PartitionCount", pc4, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 99, Partition: 0}}, + {"partitioned: Partition >= PartitionCount", pc4, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 99}}, + {"non-partitioned: non-zero StartPartition", pc0, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 1, Partition: 0}}, + {"non-partitioned: non-zero Partition", pc1, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 5}}, + {"forged LastKey outside partition prefix (Codex r4 P2)", pc4, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 0, LastKey: []byte("aaaa")}}, + {"LastKey from a different queue", pc4, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 0, LastKey: foreignKey}}, + {"LastKey from a different partition", pc4, "queue-A", + &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 0, LastKey: mismatchedKey}}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + _, err := preparePeekCursor(tc.cursor, tc.meta, tc.queue, 0) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("%s: want ErrAdminSQSValidation, got %v", tc.name, err) + } + }) + } + + t.Run("partitioned: in-range cursor accepted", func(t *testing.T) { + t.Parallel() + cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 3, Partition: 2} + out, err := preparePeekCursor(cursor, pc4, "queue-A", 0) + if err != nil { + t.Fatalf("in-range cursor: got err %v want nil", err) + } + if out.StartPartition != 3 || out.Partition != 2 { + t.Fatalf("in-range cursor: got %+v want StartPartition=3 Partition=2", out) + } + }) +} + +// loadFanoutCounter retrieves the per-queue receive fanout counter +// the server's receive path uses for partition rotation. Returns the +// counter or fails the test if it is absent / has an unexpected +// type. Pulled into a helper so callers that read-then-compare the +// counter value stay under the cyclop budget. +func loadFanoutCounter(t *testing.T, node Node, queueName string) *atomic.Uint32 { + t.Helper() + v, ok := node.sqsServer.receiveFanoutCounters.Load(queueName) + if !ok { + t.Fatalf("fanout counter missing for queue %q (bump-on-first-page contract broken)", queueName) + } + counter, ok := v.(*atomic.Uint32) + if !ok || counter == nil { + t.Fatalf("fanout counter for %q has unexpected type %T (want *atomic.Uint32)", queueName, v) + } + return counter +} + +// TestAdminPeekQueue_ContinuationDoesNotBumpFanoutCounter pins Codex +// r2 P1: AdminPeekQueue must NOT advance the shared +// receiveFanoutCounters counter on continuation pages. The counter +// is consumed by ReceiveMessage's partition-rotation; if peek +// pagination bumps it on every page, a sustained backlog with small +// MaxNumberOfMessages can repeatedly start receives at the same +// partition (fixed-stride aliasing — e.g. PartitionCount=4 + 3 +// peek pages between receives lands every receive on partition 0, +// starving 1-3). Only the FIRST page should advance the counter. +func TestAdminPeekQueue_ContinuationDoesNotBumpFanoutCounter(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const name = "peek-fanout.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": name, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + if status != http.StatusOK { + t.Fatalf("create: %d %v", status, out) + } + queueURL, _ := out["QueueUrl"].(string) + installPartitionedMetaForTest(t, node, name, 4, htfifoThroughputPerMessageGroupID) + // Send across distinct groups so a Limit=2 peek must paginate. + _ = sendFIFOMessagesForPeek(t, node, queueURL, []string{"a", "b", "c", "d", "e", "f"}) + + ctx := context.Background() + rows, cursor, err := node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, name, AdminPeekMessageOptions{Limit: 2}) + if err != nil { + t.Fatalf("first peek: %v", err) + } + if len(rows) == 0 || cursor == "" { + t.Fatalf("first peek: rows=%d cursor=%q — need both populated to exercise continuation", len(rows), cursor) + } + + counter := loadFanoutCounter(t, node, name) + before := counter.Load() + + // Drive 5 continuation calls; each must not move the counter. + for i := 0; i < 5; i++ { + _, next, err := node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, name, AdminPeekMessageOptions{Limit: 2, Cursor: cursor}) + if err != nil { + t.Fatalf("continuation peek #%d: %v", i, err) + } + if next == "" { + break + } + cursor = next + } + after := counter.Load() + if after != before { + t.Fatalf("continuation peeks bumped fanout counter from %d to %d (expected unchanged)", before, after) + } +} + +// TestAdminPeekQueue_PerQueueFIFOCollapsesToOnePartition pins Codex +// r3 P2: a partitioned FIFO queue in perQueue throughput mode +// collapses every MessageGroupId to partition 0 (see +// effectivePartitionCount). Peek must align with the data plane: it +// walks only the effective partitions, not the raw PartitionCount +// count, so a PartitionCount=4 perQueue queue does not waste 3 +// guaranteed-empty ScanAt calls on partitions 1..3 per peek call. +// +// Functional assertion: peek succeeds end-to-end on a perQueue queue +// and surfaces every sent message. The optimization itself is +// internal — a follow-up that re-introduces the meta.PartitionCount +// rotation would still pass this assertion but the (Partition+1) % +// effective termination encoded in walkPeek would still cap the +// scan, so the regression vector is closed by the cursor codec +// validation (effective bounds) being the chokepoint. +func TestAdminPeekQueue_PerQueueFIFOCollapsesToOnePartition(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const name = "peek-perqueue.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": name, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + if status != http.StatusOK { + t.Fatalf("create: %d %v", status, out) + } + queueURL, _ := out["QueueUrl"].(string) + installPartitionedMetaForTest(t, node, name, 4, htfifoThroughputPerQueue) + + groups := []string{"alpha", "beta", "gamma"} + sent := sendFIFOMessagesForPeek(t, node, queueURL, groups) + seen := walkPeekUntilDone(t, context.Background(), node, name, 10) + assertEveryMessageSeenOnce(t, sent, seen) +} + +// TestPreparePeekCursor_PerQueueCollapse confirms the cursor codec's +// validation key off effectivePartitionCount, not the raw +// meta.PartitionCount. A perQueue queue with PartitionCount=4 must +// reject any cursor with Partition > 0 (the effective bound is 1) +// even though meta.PartitionCount itself is 4. +func TestPreparePeekCursor_PerQueueCollapse(t *testing.T) { + t.Parallel() + + t.Run("fresh cursor on perQueue stamps StartPartition=0", func(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue} + out, err := preparePeekCursor(nil, meta, "test", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + if out.StartPartition != 0 || out.Partition != 0 { + t.Fatalf("perQueue fresh cursor=%+v want StartPartition=Partition=0 (effective=1)", out) + } + }) + + t.Run("perQueue rejects cursor.Partition >= 1", func(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue} + cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 2} + _, err := preparePeekCursor(cursor, meta, "test", 0) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("perQueue Partition=2 (raw PartitionCount=4 would allow): want ErrAdminSQSValidation, got %v", err) + } + }) +} + +// TestAdminPeekQueue_HostileCursorBoundedRequest is the end-to-end +// regression: an attacker crafts a wire-level cursor with an +// out-of-range partition and submits it. The call MUST return +// ErrAdminSQSValidation (not loop forever). The test runs with a +// short deadline so a regression terminates the test run rather +// than blocking CI. +func TestAdminPeekQueue_HostileCursorBoundedRequest(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const name = "hostile.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": name, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + if status != http.StatusOK { + t.Fatalf("create: %d %v", status, out) + } + installPartitionedMetaForTest(t, node, name, 4, htfifoThroughputPerMessageGroupID) + + // Encode a cursor that names a partition far outside [0, 4). + hostile, err := encodePeekCursor(&peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 999, Partition: 999}) + if err != nil { + t.Fatalf("encode hostile cursor: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, _, err = node.sqsServer.AdminPeekQueue(ctx, fullAdminPrincipal, name, AdminPeekMessageOptions{Cursor: hostile}) + if !errors.Is(err, ErrAdminSQSValidation) { + t.Fatalf("hostile cursor: want ErrAdminSQSValidation, got %v", err) + } +} + +// TestEncodePeekCursor_DecodesAsValidJSON spot-checks that the +// base64url envelope unwraps to recognisable JSON keys (so a SPA +// debugging session can inspect the cursor without the wire format +// being a black box). +func TestEncodePeekCursor_DecodesAsValidJSON(t *testing.T) { + t.Parallel() + encoded, err := encodePeekCursor(&peekCursor{V: peekCursorSchemaV1, Generation: 9, LastKey: []byte("k")}) + if err != nil { + t.Fatalf("encode: %v", err) + } + raw, err := base64.RawURLEncoding.DecodeString(encoded) + if err != nil { + t.Fatalf("base64 decode: %v", err) + } + var m map[string]any + if err := json.Unmarshal(raw, &m); err != nil { + t.Fatalf("json unmarshal: %v", err) + } + if _, ok := m["v"]; !ok { + t.Fatalf("decoded JSON missing 'v': %v", m) + } + if _, ok := m["gen"]; !ok { + t.Fatalf("decoded JSON missing 'gen': %v", m) + } +} + +// TestProjectPeekedAttributes_NilSafeAndTyped confirms the +// projection preserves DataType and binary payloads, and that an +// empty input yields nil (so JSON omitempty drops the field). +func TestProjectPeekedAttributes_NilSafeAndTyped(t *testing.T) { + t.Parallel() + + if got := projectPeekedAttributes(nil); got != nil { + t.Fatalf("nil input: got %v want nil", got) + } + if got := projectPeekedAttributes(map[string]sqsMessageAttributeValue{}); got != nil { + t.Fatalf("empty input: got %v want nil", got) + } + + in := map[string]sqsMessageAttributeValue{ + "A": {DataType: "String", StringValue: "foo"}, + "B": {DataType: "Binary", BinaryValue: []byte{1, 2, 3}}, + } + out := projectPeekedAttributes(in) + if out["A"].DataType != "String" || out["A"].StringValue != "foo" { + t.Fatalf("A=%+v want {String, foo}", out["A"]) + } + if out["B"].DataType != "Binary" || string(out["B"].BinaryValue) != string([]byte{1, 2, 3}) { + t.Fatalf("B=%+v want {Binary, [1 2 3]}", out["B"]) + } +} + +// TestAdminRole_CanRead pins the new canRead() gate's truth table. +func TestAdminRole_CanRead(t *testing.T) { + t.Parallel() + cases := map[AdminRole]bool{ + "": false, + AdminRoleReadOnly: true, + AdminRoleFull: true, + "bogus": false, + } + for role, want := range cases { + if got := role.canRead(); got != want { + t.Fatalf("AdminRole(%q).canRead()=%v want %v", role, got, want) + } + } +} + +// silence "imported and not used" when only fixtures are referenced. +var _ = time.Now diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index fbcbe099..368f5819 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -1721,6 +1721,8 @@ const ( // the Binary base type; suffix-extended forms ("Binary.gzipped") // share the same type byte. sqsAttributeBaseTypeBinary = "Binary" + sqsAttributeBaseTypeString = "String" + sqsAttributeBaseTypeNumber = "Number" // sqsAttributeTransportByteString applies to String and Number; // sqsAttributeTransportByteBinary applies to Binary. sqsAttributeTransportByteString = byte(0x01) @@ -1845,7 +1847,7 @@ func validateOneMessageAttribute(name string, v sqsMessageAttributeValue) error // function stays under the cyclop budget. func validateMessageAttributeValuePair(name, base string, v sqsMessageAttributeValue) error { switch base { - case "String", "Number": + case sqsAttributeBaseTypeString, sqsAttributeBaseTypeNumber: if v.StringValue == "" { return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "MessageAttribute "+name+" requires StringValue")