[FLINK-39103][runtime] Tolerate errors when recycling buffer from input channel notification if it had errors already #27624
rkhachatryan wants to merge 3 commits into apache:master
| @@ -216,6 +221,10 @@ public void recycle(MemorySegment segment) { | |||
| if (inputChannel.isReleased()) { | |||
| globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment)); | |||
| return; | |||
| } else if (inputChannel.hasError()) { | |||
| LOG.warn("Input channel has errors - recycling the buffer"); | |||
Can this warn result in e2e test failure as "unexpected error" or do we only filter for ERROR level there?
IIRC the script greps errors, not warnings, so it should be fine.
Izeren left a comment
Thank you @rkhachatryan, I have left a couple of comments for better understanding of the problem
| @@ -346,6 +346,11 @@ public long unsynchronizedGetSizeOfQueuedBuffers() { | |||
| */ | |||
| public void notifyRequiredSegmentId(int subpartitionId, int segmentId) throws IOException {} | |||
| |||
| /** Whether this input channel has encountered error. */ | |||
| public boolean hasError() { | |||
| return cause.get() != null; | |||
If it is not too complicated, it would be good to write a dedicated unit test for the new public method.
I agree, I just wanted to agree on the approach first.
Other approaches are passing ignoreErrors to notifyBufferAvailable or ignoring errors coming from it.
| @@ -216,6 +221,10 @@ public void recycle(MemorySegment segment) { | |||
| if (inputChannel.isReleased()) { | |||
| globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment)); | |||
| return; | |||
| } else if (inputChannel.hasError()) { | |||
| LOG.warn("Input channel has errors - recycling the buffer"); | |||
| globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment)); | |||
For my understanding, does every segment have only 1 input channel?
Can it happen that we would call recycleUnpooledMemorySegments on the same segment twice? If yes, is this call idempotent?
Yes, at any given time each memory segment has only one owner, and recycling it twice would cause an error.
Force-pushed from 0042631 to 7d35eec.
| final boolean hadError = | ||
| checkpointedInputGate.getChannel(channelInfo.getInputChannelIdx()).hasError(); |
In theory, recycling the buffer can also fail for other reasons:
if (inputChannel.isReleased()) {
globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment));
But I think global pool failure shouldn't be ignored.
I share the same concern you mentioned.
The current catch (Exception e) + if (hadError) would swallow all exceptions from releaseDeserializer, not just the channel error. If e.g. globalPool.recycleUnpooledMemorySegments fails while hadError=true, that failure is silently lost.
It is also dangerous if new exceptions are introduced in the future.
CI failures seem unrelated.
…in StreamTaskNetworkInput
…orresponding channel already had errors
On cleanup, the deserializer recycles its buffers, potentially notifying the input channel. However, if the input channel has encountered an error (such as RemoteTransportException), the notification will fail, which might cause the whole cleanup to fail and lead to TM shutdown.
Izeren left a comment
LGTM, I have added a few more comments, PTAL
| LOG.warn( | ||
| "Ignoring deserializer release failure - the channel has encountered an error before"); | ||
| } else { | ||
| err = e; |
nit: if multiple channels throw, we will only capture the last one. Maybe it would make sense to chain them, e.g. err = ExceptionUtils.firstOrSuppressed(e, err); or similar. WDYT?
If we do so, it may make sense to add a unit test covering multiple failures.
Good point, I'll use firstOrSuppressed.
Covering this with unit tests is overkill though, I think; this is not critical information IMO.
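For readers unfamiliar with the chaining idea in this thread, here is a minimal, self-contained sketch. The helper below is a hypothetical stand-in written for illustration; it mirrors the behaviour the thread attributes to ExceptionUtils.firstOrSuppressed (keep the first failure, attach later ones as suppressed) and is not the code from this PR. The class name SuppressedChaining and the method closeAll are likewise assumptions.

```java
import java.util.List;

public class SuppressedChaining {

    /**
     * Hypothetical stand-in for the helper named in the review:
     * keeps the earliest exception and records later ones as suppressed.
     */
    public static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        if (previous == null || previous == newException) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    /** Closes every resource; returns the first failure (others suppressed) or null. */
    public static Exception closeAll(List<AutoCloseable> resources) {
        Exception err = null;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                // With plain "err = e" only the last failure would survive.
                err = firstOrSuppressed(e, err);
            }
        }
        return err;
    }

    public static void main(String[] args) {
        AutoCloseable failing0 = () -> { throw new IllegalStateException("channel 0"); };
        AutoCloseable healthy = () -> { };
        AutoCloseable failing1 = () -> { throw new IllegalStateException("channel 1"); };

        Exception err = closeAll(List.of(failing0, healthy, failing1));
        System.out.println(err.getMessage() + ", suppressed: " + err.getSuppressed().length);
    }
}
```

With this shape, the exception that is eventually rethrown still points at the first failing channel, and the remaining failures stay visible in its suppressed list.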
| } catch (Exception e) { | ||
| if (hadError) { | ||
| LOG.warn( | ||
| "Ignoring deserializer release failure - the channel has encountered an error before"); |
Is there any channel info/task info we would like to log, so that we know where this error was coming from?
Yeah, there's actually channelInfo available - I'll add it to the log message.
| @@ -316,8 +322,23 @@ public CompletableFuture<?> getAvailableFuture() { | |||
| @Override | |||
| public void close() throws IOException { | |||
| // release the deserializers . this part should not ever fail | |||
🤔 This part shouldn't ever fail, and yet we put a try-catch right after it. Should we update this comment to explain how it can fail?
Good point, updated the comment.
| public void close() throws IOException { | ||
| // release the deserializers . this part should not ever fail | ||
| // WARN: throwing an exception from this method might fail the Task close procedure and | ||
| // terminate the TM |
1996fanrui left a comment
According to the inline comment, I wonder if we should fix this at a different layer. Tracing the call chain:
BufferManager.recycle(segment)
  → buffer added back to queue ← already recycled successfully
  → inputChannel.notifyBufferAvailable(1) ← credit notification
    → notifyCreditAvailable()
      → checkPartitionRequestQueueInitialized()
        → checkError() → throws
The buffer is already recycled at that point. The failure comes from notifying credit to a producer whose connection is already dead — which is unnecessary.
What about skipping credit notification for errored channels instead?
// RemoteInputChannel.java
public void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
    if (hasError()) {
        return;
    }
    if (numAvailableBuffers > 0 && unannouncedCredit.getAndAdd(numAvailableBuffers) == 0) {
        notifyCreditAvailable();
    }
}
This way: (1) buffer recycling is not affected, (2) global pool errors are not affected (they happen before notifyBufferAvailable in BufferManager.recycle), and (3) all code paths that recycle buffers are covered, not just close().
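To make the proposal above easier to try in isolation, here is a toy model of it. Everything in this sketch is an assumption made for illustration: ToyChannel, setError and the creditNotifications counter are hypothetical names, and the class only imitates the shape of the call chain traced above (notifyBufferAvailable, then notifyCreditAvailable, then checkError). It is not the Flink implementation.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class ToyChannel {
    // First recorded failure of the channel, null while healthy (hypothetical model).
    private final AtomicReference<Throwable> cause = new AtomicReference<>();
    private final AtomicInteger unannouncedCredit = new AtomicInteger();
    // Counts how many times a credit announcement was actually sent.
    public int creditNotifications = 0;

    public void setError(Throwable t) {
        cause.compareAndSet(null, t);
    }

    public boolean hasError() {
        return cause.get() != null;
    }

    private void checkError() throws IOException {
        Throwable t = cause.get();
        if (t != null) {
            throw new IOException("channel already failed", t);
        }
    }

    private void notifyCreditAvailable() throws IOException {
        checkError(); // this is the step that throws for an errored channel
        creditNotifications++;
    }

    public void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
        if (hasError()) {
            return; // the producer connection is dead, so announcing credit is pointless
        }
        if (numAvailableBuffers > 0 && unannouncedCredit.getAndAdd(numAvailableBuffers) == 0) {
            notifyCreditAvailable();
        }
    }

    public static void main(String[] args) throws IOException {
        ToyChannel channel = new ToyChannel();
        channel.notifyBufferAvailable(1);
        channel.setError(new RuntimeException("remote transport failure"));
        channel.notifyBufferAvailable(1); // returns quietly instead of throwing
        System.out.println("credit notifications sent: " + channel.creditNotifications);
    }
}
```

One caveat of this sketch: an error recorded between the hasError() check and checkError() would still surface as an exception, so the guard narrows the failure window rather than closing it completely.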
The first 3 commits are from #27619 - please ignore them while reviewing.
Example failure stack trace: