Skip to content

feat(go): Add PollMessagesInto#3499

Open
matanper wants to merge 10 commits into
apache:masterfrom
matanper:go/response-buffer
Open

feat(go): Add PollMessagesInto#3499
matanper wants to merge 10 commits into
apache:masterfrom
matanper:go/response-buffer

Conversation

@matanper

@matanper matanper commented Jun 16, 2026

Copy link
Copy Markdown
Contributor

Which issue does this PR address?

Closes #3452

Rationale

PollMessages allocated a fresh []byte for every response body. For consumers polling at high rates this is the largest remaining source of allocation churn on the read side.

What changed?

Each PollMessages call passed through do()sendWireAndFetchResponse()make([]byte, length), allocating a new buffer on every RPC regardless of message size.

PollMessagesInto lets the caller supply a reusable buffer that grows on demand and is returned for the next call. Once the buffer reaches steady-state capacity, the response-body read is zero-alloc. PollMessages delegates to PollMessagesInto(nil) so existing callers are unchanged. The same delegation pattern is applied down the stack: dodoInto, sendWireAndFetchResponsesendWireAndFetchResponseInto, sendLockedsendLockedInto.

Local Execution

  • Passed
  • Pre-commit hooks ran

AI Usage

  1. Claude
  2. Entire implementation
  3. Ran it against iggy server
  4. Yes

@matanper matanper changed the title Go/response buffer feat(go): Add PollMessagesInto Jun 16, 2026
@matanper matanper marked this pull request as ready for review June 16, 2026 09:47
@github-actions github-actions Bot added the S-waiting-on-review PR is waiting on a reviewer label Jun 16, 2026
@codecov

codecov Bot commented Jun 16, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 80.00000% with 7 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.77%. Comparing base (4a48008) to head (98eb5ff).

Files with missing lines Patch % Lines
foreign/go/client/tcp/tcp_core.go 77.77% 5 Missing and 1 partial ⚠️
foreign/go/client/tcp/tcp_messaging.go 87.50% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3499      +/-   ##
============================================
+ Coverage     74.27%   74.77%   +0.50%     
  Complexity      937      937              
============================================
  Files          1259     1248      -11     
  Lines        125969   122533    -3436     
  Branches     101644    99003    -2641     
============================================
- Hits          93558    91627    -1931     
+ Misses        29396    27945    -1451     
+ Partials       3015     2961      -54     
Components Coverage Δ
Rust Core 75.91% <ø> (+0.75%) ⬆️
Java SDK 58.57% <ø> (ø)
C# SDK 69.82% <ø> (-2.29%) ⬇️
Python SDK 88.88% <ø> (ø)
PHP SDK 83.57% <ø> (-0.72%) ⬇️
Node SDK 91.35% <ø> (+0.12%) ⬆️
Go SDK 40.45% <80.00%> (+0.08%) ⬆️
Files with missing lines Coverage Δ
foreign/go/client/tcp/tcp_messaging.go 87.09% <87.50%> (+1.91%) ⬆️
foreign/go/client/tcp/tcp_core.go 57.93% <77.77%> (+0.68%) ⬆️

... and 96 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@chengxilo

chengxilo commented Jun 16, 2026

Copy link
Copy Markdown
Contributor

The readInto/doInto may make sense but I don't think it's a good idea to provide a PollMessagesInto. I'm not a fan of this kind of change. If I'm not mistaken, it require the caller to manually manage the buffer and I don't think it actually worth it.

@matanper

matanper commented Jun 17, 2026

Copy link
Copy Markdown
Contributor Author

As I understand, high performance is central to what Iggy offers. Why not give advanced users an opt-in way to maximize it?

@chengxilo

Copy link
Copy Markdown
Contributor

As I understand, high performance is central to what Iggy offers. Why not give advanced users an opt-in way to maximize it?

I agree performance matters, but I don't think exposing buffer management to callers is the right way to achieve it. No major SDK I've seen asks users to manually pass buffers for RPC calls.

We should do it internally (e.g. a pool or a reusable buffer inside the client) so callers get the benefit transparently. The internal refactoring in this PR could still be useful tho.

@matanper

Copy link
Copy Markdown
Contributor Author

Sure, I'm open to any suggestions how to do it better.

IIUC an internal buffer approach doesn't work here because deserialization is zero-copy.
So as long as the caller holds the returned *PolledMessage, the buffer is live. There's no hook to know when to reclaim it — no destructor, no finalizer worth trusting, no Release() call (which would impose more discipline on the caller than PollMessagesInto does, with less compile-time enforcement).

I can give an example of fasthttp which does this:

// Get returns the status code and body of url.
//
// The contents of dst will be replaced by the body and returned, if the dst
// is too small a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
	return defaultClient.Get(dst, url)
}

@chengxilo

chengxilo commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Sure, I'm open to any suggestions how to do it better.

IIUC an internal buffer approach doesn't work here because deserialization is zero-copy. So as long as the caller holds the returned *PolledMessage, the buffer is live. There's no hook to know when to reclaim it — no destructor, no finalizer worth trusting, no Release() call (which would impose more discipline on the caller than PollMessagesInto does, with less compile-time enforcement).

I can give an example of fasthttp which does this:

// Get returns the status code and body of url.
//
// The contents of dst will be replaced by the body and returned, if the dst
// is too small a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
	return defaultClient.Get(dst, url)
}

Since this is zero-copy, if the caller modifies the payload in a polled message (or reuses the buffer), it silently corrupts messages returned by earlier calls. The only guard is a doc comment -- easy to miss, and nothing in the type system prevents it.

Beyond that, only PollMessages gets the buffer-reuse variant. If the goal is avoiding response allocations, we would hope the same argument applies to every API call. Offering it for one method makes the surface inconsistent. But if we offer the buffer-reuse variant to other methods, some of them need to pay attention to the zero-copy silent corruption while some doesn't, it need the user to understand everything behind the Go SDK.+

I benchmarked the new xxxxInto function, it's true that it improves the allocation, and in some cases it halved the cost, but compare to the net I/O, it's a very small improvement compare to what caller lose.

Also, there are many features that is supported by Rust but not supported by Go, we can pay more attention to fill the gap instead.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the server returns an empty body (length <= 1), sendLockedInto returns buf unchanged -- still holding data from the previous call. DeserializeFetchMessagesResponse then re-parses that stale data and returns ghost messages. The old code returned []byte{} here. I think you need return buf[:0], nil to preserve the reusable backing array without leaking old contents.

I reproduced this locally: first poll returns 1 message, second poll has an empty server response, but caller still sees 1 stale message from the first call.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, added regression test

Comment on lines +119 to +129
PollMessagesInto(
ctx context.Context,
streamId Identifier,
topicId Identifier,
consumer Consumer,
strategy PollingStrategy,
count uint32,
autoCommit bool,
partitionId *uint32,
buf []byte,
) (*PolledMessage, []byte, error)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding PollMessagesInto to the Client interface is a compile-breaking change for anyone implementing or mocking it (test doubles, alternative transports). Would it make sense to keep this as a concrete method on IggyTcpClient only, or put it on a separate optional interface like BufferedPoller that callers can type-assert to?

@github-actions github-actions Bot added S-waiting-on-author PR is waiting on author response and removed S-waiting-on-review PR is waiting on a reviewer labels Jun 18, 2026
@atharvalade

Copy link
Copy Markdown
Contributor

I agree with @chengxilo here. I tried reproducing the zero-copy footgun and hit a concrete bug: if the server ever returns an empty body (length <= 1), the caller gets ghost messages from the previous poll because sendLockedInto hands back the stale buffer as-is. That's exactly the kind of silent corruption that a doc comment won't save you from. The internal refactoring (doInto/sendLockedInto) is solid work, but exposing the buffer to callers adds a foot-shotgun for marginal gain over net I/O cost.

@matanper

Copy link
Copy Markdown
Contributor Author

@chengxilo @atharvalade OK I understand your view 👍

The internal refactoring (doInto/sendLockedInto) is solid work

I'm not sure what the value of the internal refactoring without PollMessagesInto, it is the only caller of the new Into variants

@chengxilo

Copy link
Copy Markdown
Contributor

@chengxilo @atharvalade OK I understand your view 👍

The internal refactoring (doInto/sendLockedInto) is solid work

I'm not sure what the value of the internal refactoring without PollMessagesInto, it is the only caller of the new Into variants

Yeah, I believe so. We are not going to reuse the buffer for response anyway so all the xxxInto method won't be useful.

@hubcio

hubcio commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

TBH i'm fine with the optimization idea proposed by @matanper, but i'd like to see a full-blown approach (with design approved by current golang folks: @chengxilo @atharvalade) to the problem, not just polled messages. we don't even have reuseable buffers in Rust SDK, so it would be interesting to see. there are quite a few challenges with the API itself, and even in Rust safety is hard. can't imagine that in go :)

@chengxilo

chengxilo commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

TBH i'm fine with the optimization idea proposed by @matanper, but i'd like to see a full-blown approach (with design approved by current golang folks: @chengxilo @atharvalade) to the problem, not just polled messages. we don't even have reuseable buffers in Rust SDK, so it would be interesting to see. there are quite a few challenges with the API itself, and even in Rust safety is hard. can't imagine that in go :)

Yeah I believe if we can make it safe and don't introduce complexity, it would be an amazing improvement. Re-use the buffer is possible, but we have to remove the zero-copy logic in poll-message response deserialization method so caller will not be able to poision the byte slice we are re-using. Then we can provide a sync.Pool to protect the byte slice we re-use, make sure they won't be used by other RPC during the process of reading from TCP connect and deserialization.

I think with this approach, the buf []bytes won't be necessary, and we can apply this to all methods in the SDK. Theoratically, it won't cause regression to PollMessages since while we give up zero-copy, we avoid the allocation of a new bytes slice for the response from PollMessages. And for other methods it is pure improvement.

@matanper what do you think? I just come up with this idea like a few minutes ago, to be honest, I'm not sure if it actually works.

@matanper

matanper commented Jun 19, 2026

Copy link
Copy Markdown
Contributor Author

As I see it the rest of the SDK functions (GetStats, CreateStream, GetTopic, etc.) are called occasionally and are not performance sensitive.
So if we will pool the response buffer for all methods, but keep allocating for PollMessages I don't think the added complexity worth the gains

@chengxilo

chengxilo commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

As I see it the rest of the SDK functions (GetStats, CreateStream, GetTopic, etc.) are called occasionally and are not performance sensitive. So if we will pool the response buffer for all methods, but keep allocating for PollMessages I don't think the added complexity worth the gains

It's totally fine if we can handle the deserialization of all the response type in one single function. For example, we can implment the do method like this (It need us to implement the UnmashalBinary for each response type, which is actually planned):

// do sends the command and returns the response body. Commands implementing
// the appender interface encode directly into a pooled buffer.
func (c *IggyTcpClient) do(ctx context.Context, cmd command.Command, resp encoding.BinaryUnmarshaler) error {
	bp := acquireRequestBuf()
	buf, err := encodeWireRequest(*bp, cmd)
	if err != nil {
		releaseRequestBuf(bp)
		return err
	}
	*bp = buf

	// we can use a re-usable buffer to receive it to improve performance.
	respBuf, err := c.sendWireAndFetchResponse(ctx, buf)
	releaseRequestBuf(bp)

	return resp.UnmarshalBinary(respBuf)
}

To support it we need to provide UnmarshalBinary function for requests. Example:

Update the PollMessages in iggcon package (I basically copied the logic from binary_serialization)

type PolledMessage struct {
	// This field should be set before unmarshalling the message.
	// It indicates the compression algorithm used for the message payload.
	Compression IggyMessageCompression
	PartitionId uint32
	CurrentOffset      uint64
	MessageCount       uint32
	Messages           []IggyMessage
}

func (m *PolledMessage) UnmarshalBinary(data []byte) error {
	if len(data) == 0 {
		return nil
	}

	length := len(data)
	m.PartitionId  = binary.LittleEndian.Uint32(data[0:4])
	m.CurrentOffset = binary.LittleEndian.Uint64(data[4:12])
	m.MessageCount = binary.LittleEndian.Uint32(data[12:16])
	position := 16
	var messages = make([]IggyMessage, 0)
	for position < length {
		if position+MessageHeaderSize >= length {
			// body needs to be at least 1 byte
			break
		}
		header, err := MessageHeaderFromBytes(data[position : position+MessageHeaderSize])
		if err != nil {
			return err
		}
		position += MessageHeaderSize
		payloadEnd := position + int(header.PayloadLength)
		if payloadEnd > length {
			break
		}
		payloadSlice := data[position:payloadEnd]
		position = payloadEnd

		var userHeaders []byte = nil
		if header.UserHeaderLength > 0 {
			userHeaders = data[position : position+int(header.UserHeaderLength)]
		}
		position += int(header.UserHeaderLength)

		switch m.Compression {
		case MESSAGE_COMPRESSION_S2, MESSAGE_COMPRESSION_S2_BETTER, MESSAGE_COMPRESSION_S2_BEST:
			if length < 32 {
				break
			}
			payloadSlice, err = s2.Decode(nil, payloadSlice)
			if err != nil {
				return fmt.Errorf("failed to decode s2 payload: %w", err)
			}
		}

		m.Messages = append(messages, IggyMessage{
			Header:      *header,
			Payload:     payloadSlice,
			UserHeaders: userHeaders,
		})
	}

	// !TODO: Add message offset ordering
	return nil
} 

Caller just need to provide a pointer of the response type they want here. Example:

func (c *IggyTcpClient) PollMessages(
	ctx context.Context,
	streamId iggcon.Identifier,
	topicId iggcon.Identifier,
	consumer iggcon.Consumer,
	strategy iggcon.PollingStrategy,
	count uint32,
	autoCommit bool,
	partitionId *uint32,
) (*iggcon.PolledMessage, error) {
	resp := &iggcon.PolledMessage{
		// Provide the compression to the deserializer so that it can decompress the messages if needed
		MessageCompression: c.MessageCompression,
	}
	if err := c.do(ctx, &command.PollMessages{
		StreamId:    streamId,
		TopicId:     topicId,
		Consumer:    consumer,
		AutoCommit:  autoCommit,
		Strategy:    strategy,
		Count:       count,
		PartitionId: partitionId,
	}, resp); err != nil {
		return nil, err
	}

	return resp, nil
}

I write these examples manually, so it's error-prone. But they should be enough to describe my idea.

@matanper What is your opinion to this approach. It will, instead of introduce complexity, make the code more clear since the deserialization functions for each different type will be replaced by standarized encoder.Unmarshaller interface.

It may need an extra PR to implement the Marshaller and get rid of binary_deserialization directory, since this is a huge change.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

S-waiting-on-author PR is waiting on author response

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(go): response body buffer allocates fresh on every poll

4 participants