feat(go): Add PollMessagesInto#3499
Codecov Report❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #3499 +/- ##
============================================
+ Coverage 74.27% 74.77% +0.50%
Complexity 937 937
============================================
Files 1259 1248 -11
Lines 125969 122533 -3436
Branches 101644 99003 -2641
============================================
- Hits 93558 91627 -1931
+ Misses 29396 27945 -1451
+ Partials 3015 2961 -54
|
|
The |
|
As I understand, high performance is central to what Iggy offers. Why not give advanced users an opt-in way to maximize it? |
I agree performance matters, but I don't think exposing buffer management to callers is the right way to achieve it. No major SDK I've seen asks users to manually pass buffers for RPC calls. We should do it internally (e.g. a pool or a reusable buffer inside the client) so callers get the benefit transparently. The internal refactoring in this PR could still be useful tho. |
|
Sure, I'm open to any suggestions how to do it better. IIUC an internal buffer approach doesn't work here because deserialization is zero-copy. I can give an example of

// Get returns the status code and body of url.
//
// The contents of dst will be replaced by the body and returned, if the dst
// is too small a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
	return defaultClient.Get(dst, url)
}
|
Since this is zero-copy, if the caller modifies the payload in a polled message (or reuses the buffer), it silently corrupts messages returned by earlier calls. The only guard is a doc comment -- easy to miss, and nothing in the type system prevents it.
Beyond that, only PollMessages gets the buffer-reuse variant. If the goal is avoiding response allocations, the same argument should apply to every API call. Offering it for one method makes the surface inconsistent. But if we offer the buffer-reuse variant for other methods, some of them have to watch out for the zero-copy silent corruption while others don't, which requires the user to understand everything behind the Go SDK.
I benchmarked the new xxxxInto function. It's true that it reduces allocations, and in some cases it halved the cost, but compared to the network I/O it's a very small improvement relative to what the caller loses.
Also, there are many features that are supported by Rust but not by Go; we could pay more attention to filling that gap instead. |
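The aliasing hazard raised in this comment can be reproduced in a few lines. This is a minimal sketch, not the SDK's code: `message`, `parseZeroCopy` and `pollInto` are hypothetical stand-ins that only assume the parsed payload is a sub-slice of the receive buffer.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a polled message whose payload
// aliases the receive buffer (zero-copy).
type message struct {
	Payload []byte
}

// parseZeroCopy returns a message whose Payload points into buf; nothing is copied.
func parseZeroCopy(buf []byte) message {
	return message{Payload: buf}
}

// pollInto simulates one poll: it writes body into the caller's buffer,
// reusing the backing array when capacity allows, and parses it in place.
func pollInto(buf []byte, body string) (message, []byte) {
	buf = append(buf[:0], body...)
	return parseZeroCopy(buf), buf
}

func main() {
	var buf []byte
	first, buf := pollInto(buf, "hello")
	fmt.Printf("first before reuse: %s\n", first.Payload) // hello

	// The second poll reuses the same backing array, so the payload held
	// by the first message is overwritten without any error.
	_, buf = pollInto(buf, "WORLD")
	fmt.Printf("first after reuse:  %s\n", first.Payload) // WORLD
}
```

Nothing in the types distinguishes a payload that is safe to keep from one that will be overwritten by the next call, which is the point being made about the doc comment being the only guard.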
When the server returns an empty body (length <= 1), sendLockedInto returns buf unchanged -- still holding data from the previous call. DeserializeFetchMessagesResponse then re-parses that stale data and returns ghost messages. The old code returned []byte{} here. I think you need return buf[:0], nil to preserve the reusable backing array without leaking old contents.
I reproduced this locally: first poll returns 1 message, second poll has an empty server response, but caller still sees 1 stale message from the first call.
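The stale-buffer behaviour and the suggested `buf[:0]` fix can be sketched as follows. `readBodyInto` is a hypothetical stand-in for `sendLockedInto`, and the `keepStale` flag exists only to show both behaviours side by side; the empty-body check is simplified to `len(body) == 0` rather than the `length <= 1` condition mentioned above.

```go
package main

import "fmt"

// readBodyInto copies a response body into buf, growing buf when needed.
// With keepStale set it reproduces the bug: an empty body returns buf
// untouched, so the caller still sees the previous response.
func readBodyInto(buf, body []byte, keepStale bool) []byte {
	if len(body) == 0 {
		if keepStale {
			return buf // bug: still holds the previous response bytes
		}
		return buf[:0] // fix: zero length, backing array kept for reuse
	}
	if cap(buf) < len(body) {
		buf = make([]byte, len(body))
	}
	buf = buf[:len(body)]
	copy(buf, body)
	return buf
}

func main() {
	buf := readBodyInto(nil, []byte("msg-1"), false)

	stale := readBodyInto(buf, nil, true)  // empty response, buggy path
	fixed := readBodyInto(buf, nil, false) // empty response, fixed path

	fmt.Println(len(stale), len(fixed), cap(fixed) >= 5) // 5 0 true
}
```

Returning `buf[:0]` keeps the capacity for the next call while making sure a deserializer sees zero bytes instead of the earlier response.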
fixed, added regression test
PollMessagesInto(
	ctx context.Context,
	streamId Identifier,
	topicId Identifier,
	consumer Consumer,
	strategy PollingStrategy,
	count uint32,
	autoCommit bool,
	partitionId *uint32,
	buf []byte,
) (*PolledMessage, []byte, error)
Adding PollMessagesInto to the Client interface is a compile-breaking change for anyone implementing or mocking it (test doubles, alternative transports). Would it make sense to keep this as a concrete method on IggyTcpClient only, or put it on a separate optional interface like BufferedPoller that callers can type-assert to?
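The optional-interface idea from this comment could look roughly like the sketch below. The method signatures are deliberately trimmed down and every type here (`Client`, `BufferedPoller`, `tcpClient`, `mockClient`, `poll`) is illustrative only, not the SDK's actual definition.

```go
package main

import "fmt"

// Client is a trimmed-down, hypothetical version of the SDK interface.
// It stays unchanged, so existing mocks and transports keep compiling.
type Client interface {
	PollMessages(count uint32) ([]byte, error)
}

// BufferedPoller is the optional capability that callers can type-assert to.
type BufferedPoller interface {
	PollMessagesInto(count uint32, buf []byte) ([]byte, error)
}

// tcpClient implements both interfaces.
type tcpClient struct{}

func (c tcpClient) PollMessages(count uint32) ([]byte, error) {
	return c.PollMessagesInto(count, nil)
}

func (tcpClient) PollMessagesInto(count uint32, buf []byte) ([]byte, error) {
	return append(buf[:0], "payload"...), nil
}

// mockClient implements only Client, like a pre-existing test double.
type mockClient struct{}

func (mockClient) PollMessages(count uint32) ([]byte, error) {
	return []byte("mock"), nil
}

// poll takes the buffer-reusing path when the client supports it and
// falls back to the plain method otherwise.
func poll(c Client, buf []byte) ([]byte, error) {
	if bp, ok := c.(BufferedPoller); ok {
		return bp.PollMessagesInto(1, buf)
	}
	return c.PollMessages(1)
}

func main() {
	a, _ := poll(tcpClient{}, nil)
	b, _ := poll(mockClient{}, nil)
	fmt.Printf("%s %s\n", a, b) // payload mock
}
```

With this shape, adding the buffered variant is not a breaking change: only code that wants it performs the type assertion.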
|
I agree with @chengxilo here. I tried reproducing the zero-copy footgun and hit a concrete bug: if the server ever returns an empty body (length <= 1), the caller gets ghost messages from the previous poll because |
…into go/response-buffer
|
@chengxilo @atharvalade OK I understand your view 👍
I'm not sure what the value of the internal refactoring without |
Yeah, I believe so. We are not going to reuse the buffer for responses anyway, so none of the xxxInto methods will be useful. |
|
TBH i'm fine with the optimization idea proposed by @matanper, but i'd like to see a full-blown approach (with design approved by current golang folks: @chengxilo @atharvalade) to the problem, not just polled messages. we don't even have reusable buffers in Rust SDK, so it would be interesting to see. there are quite a few challenges with the API itself, and even in Rust safety is hard. can't imagine that in go :) |
Yeah I believe if we can make it safe without introducing complexity, it would be an amazing improvement. Reusing the buffer is possible, but we have to remove the zero-copy logic in the poll-message response deserialization method so the caller will not be able to poison the byte slice we are reusing. Then we can provide a I think with this approach, the @matanper what do you think? I just came up with this idea a few minutes ago; to be honest, I'm not sure if it actually works. |
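One way to picture the idea in this comment is an internal pooled receive buffer combined with a deserializer that copies payloads out before the buffer goes back to the pool. This is only a sketch under those assumptions; `respPool`, `fetch`, `decodeCopy` and `poll` are hypothetical names and the "wire format" is just the raw payload.

```go
package main

import (
	"fmt"
	"sync"
)

// respPool holds reusable receive buffers that stay internal to the client.
var respPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, 4096)
		return &b
	},
}

// fetch simulates reading a response body into buf, reusing its capacity.
func fetch(buf []byte, body string) []byte {
	return append(buf[:0], body...)
}

// decodeCopy copies the payload out of the wire buffer, so the result
// never aliases memory that will be reused by a later call.
func decodeCopy(wire []byte) []byte {
	out := make([]byte, len(wire))
	copy(out, wire)
	return out
}

// poll borrows a pooled buffer, decodes a copy, and returns the buffer
// to the pool before handing the payload to the caller.
func poll(body string) []byte {
	bp := respPool.Get().(*[]byte)
	*bp = fetch(*bp, body)
	payload := decodeCopy(*bp)
	respPool.Put(bp)
	return payload
}

func main() {
	first := poll("hello")
	second := poll("WORLD")
	fmt.Printf("%s %s\n", first, second) // hello WORLD
}
```

The trade-off is that each payload still costs one allocation for the copy, so this gives up part of the saving in exchange for results the caller can keep and modify freely.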
|
As I see it the rest of the SDK functions ( |
It's totally fine if we can handle the deserialization of all the response types in one single function. For example, we can implement the

// do sends the command and unmarshals the response body into resp. Commands
// implementing the appender interface encode directly into a pooled buffer.
func (c *IggyTcpClient) do(ctx context.Context, cmd command.Command, resp encoding.BinaryUnmarshaler) error {
	bp := acquireRequestBuf()
	buf, err := encodeWireRequest(*bp, cmd)
	if err != nil {
		releaseRequestBuf(bp)
		return err
	}
	*bp = buf
	// we can use a re-usable buffer to receive it to improve performance.
	respBuf, err := c.sendWireAndFetchResponse(ctx, buf)
	releaseRequestBuf(bp)
	if err != nil {
		// Do not try to unmarshal a response we never received.
		return err
	}
	return resp.UnmarshalBinary(respBuf)
}

To support it we need to provide an UnmarshalBinary function for each response type. Example: Update the

type PolledMessage struct {
	// This field should be set before unmarshalling the message.
	// It indicates the compression algorithm used for the message payload.
	Compression   IggyMessageCompression
	PartitionId   uint32
	CurrentOffset uint64
	MessageCount  uint32
	Messages      []IggyMessage
}

func (m *PolledMessage) UnmarshalBinary(data []byte) error {
	if len(data) == 0 {
		return nil
	}
	length := len(data)
	m.PartitionId = binary.LittleEndian.Uint32(data[0:4])
	m.CurrentOffset = binary.LittleEndian.Uint64(data[4:12])
	m.MessageCount = binary.LittleEndian.Uint32(data[12:16])
	position := 16
	messages := make([]IggyMessage, 0)
	for position < length {
		if position+MessageHeaderSize >= length {
			// body needs to be at least 1 byte
			break
		}
		header, err := MessageHeaderFromBytes(data[position : position+MessageHeaderSize])
		if err != nil {
			return err
		}
		position += MessageHeaderSize
		payloadEnd := position + int(header.PayloadLength)
		if payloadEnd > length {
			break
		}
		payloadSlice := data[position:payloadEnd]
		position = payloadEnd
		var userHeaders []byte
		if header.UserHeaderLength > 0 {
			userHeaders = data[position : position+int(header.UserHeaderLength)]
		}
		position += int(header.UserHeaderLength)
		switch m.Compression {
		case MESSAGE_COMPRESSION_S2, MESSAGE_COMPRESSION_S2_BETTER, MESSAGE_COMPRESSION_S2_BEST:
			if length < 32 {
				// break only exits the switch, so the payload is kept as received
				break
			}
			payloadSlice, err = s2.Decode(nil, payloadSlice)
			if err != nil {
				return fmt.Errorf("failed to decode s2 payload: %w", err)
			}
		}
		// Accumulate into the local slice; appending to the empty slice and
		// assigning to m.Messages each time would keep only the last message.
		messages = append(messages, IggyMessage{
			Header:      *header,
			Payload:     payloadSlice,
			UserHeaders: userHeaders,
		})
	}
	m.Messages = messages
	// !TODO: Add message offset ordering
	return nil
}

The caller just needs to provide a pointer to the response type they want. Example:

func (c *IggyTcpClient) PollMessages(
	ctx context.Context,
	streamId iggcon.Identifier,
	topicId iggcon.Identifier,
	consumer iggcon.Consumer,
	strategy iggcon.PollingStrategy,
	count uint32,
	autoCommit bool,
	partitionId *uint32,
) (*iggcon.PolledMessage, error) {
	resp := &iggcon.PolledMessage{
		// Provide the compression to the deserializer so that it can decompress the messages if needed.
		// The field name matches the Compression field declared on PolledMessage.
		Compression: c.MessageCompression,
	}
	if err := c.do(ctx, &command.PollMessages{
		StreamId:    streamId,
		TopicId:     topicId,
		Consumer:    consumer,
		AutoCommit:  autoCommit,
		Strategy:    strategy,
		Count:       count,
		PartitionId: partitionId,
	}, resp); err != nil {
		return nil, err
	}
	return resp, nil
}

I wrote these examples by hand, so they may contain errors, but they should be enough to describe my idea. @matanper what is your opinion on this approach? Instead of introducing complexity, it will make the code clearer, since the deserialization functions for each different type will be replaced by standardized It may need an extra PR to implement the Marshaller and get rid of |
Which issue does this PR address?
Closes #3452
Rationale
PollMessages allocated a fresh []byte for every response body. For consumers polling at high rates this is the largest remaining source of allocation churn on the read side.
What changed?
Each PollMessages call passed through do() → sendWireAndFetchResponse() → make([]byte, length), allocating a new buffer on every RPC regardless of message size.
PollMessagesInto lets the caller supply a reusable buffer that grows on demand and is returned for the next call. Once the buffer reaches steady-state capacity, the response-body read is zero-alloc.
PollMessages delegates to PollMessagesInto(nil) so existing callers are unchanged. The same delegation pattern is applied down the stack: do → doInto, sendWireAndFetchResponse → sendWireAndFetchResponseInto, sendLocked → sendLockedInto.
Local Execution
AI Usage