Ext_proc filter#12792
Conversation
…sActivatedImmediately to use real dataPlaneChannel and ServerInterceptor
…entToExtProc to use real dataPlaneChannel
…henMessagesAreDiscarded to use real dataPlaneChannel
…ExtProcAndSuperHalfCloseIsDeferred to use real dataPlaneChannel
…nSuperHalfCloseIsCalled to use real dataPlaneChannel
…False to use real dataPlaneChannel
…sTrue to use real dataPlaneChannel
…OnReady to use real dataPlaneChannel
…Ready to use real dataPlaneChannel
…stsAreForwardedImmediately to use real dataPlaneChannel
…sHeadersAndCallIsBuffered to use real dataPlaneChannel
…henMutationsAreAppliedAndCallIsActivated to use real dataPlaneChannel
…henMutatedBodyIsForwardedToDataPlane to use real dataPlaneChannel
…eCallFailsOpen to use real dataPlaneChannel
…lizingExecutor to use real dataPlaneChannel
…ntToExtProc to use real dataPlaneChannel
…thenMutatedBodyIsDeliveredToClient to use real dataPlaneChannel
…thenClientListenerCloseIsPropagated to use real dataPlaneChannel
…Errored to use real dataPlaneChannel
Reviewed approximately 500 LOC of source (not tests), which currently mostly covers only the config part of things. Sending comments incrementally to kick-start progress, since it seems I'll need about 3 days to review all of the source.
Reviewed about another 300 LOC, covering the static utilities and the interceptors. Progress was slower than expected, but I will try to catch up.
Force-pushed from c75ba11 to 64010fd.
```java
}
headersToModify.add(HeaderValueOption.create(
    headerValue,
    HeaderValueOption.HeaderAppendAction.valueOf(protoOption.getAppendAction().name()),
```
This may be slightly risky IMO.
I am not sure we want to couple our internal enum to the xDS proto enum, so I'd prefer an explicit switch case if possible.
`valueOf` will fail with an `IllegalArgumentException` if an enum constant with the specified name is not present, so a future mismatch will be caught by unit test failures.
Besides, with backward compatibility in mind, enum names are not subject to change.
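To make the trade-off concrete, here is a minimal, self-contained sketch of the name-based mapping being discussed. The two enums are hypothetical stand-ins for the generated xDS proto enum and the internal enum; the point is that `Enum.valueOf` fails loudly on any name drift, which is what makes the coupling testable.

```java
public class EnumMappingSketch {
  // Hypothetical stand-in for the generated xDS proto enum.
  enum ProtoAppendAction { APPEND_IF_EXISTS_OR_ADD, ADD_IF_ABSENT, OVERWRITE_IF_EXISTS_OR_ADD }

  // Hypothetical stand-in for the internal enum with intentionally matching names.
  enum HeaderAppendAction { APPEND_IF_EXISTS_OR_ADD, ADD_IF_ABSENT, OVERWRITE_IF_EXISTS_OR_ADD }

  // Name-based mapping: throws IllegalArgumentException if the names ever diverge,
  // so a mismatch surfaces as a unit-test failure rather than silent corruption.
  static HeaderAppendAction convert(ProtoAppendAction protoAction) {
    return HeaderAppendAction.valueOf(protoAction.name());
  }

  public static void main(String[] args) {
    System.out.println(convert(ProtoAppendAction.ADD_IF_ABSENT));
  }
}
```

An explicit switch would instead fail to compile when a constant is removed, at the cost of boilerplate per constant; the `valueOf` form defers that check to runtime.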
Noted, I'm fine with that. For my education: do we want this to be a runtime exception instead, or do we not care because even if we catch it we want to terminate the RPC anyway? If we terminate the RPC, do we want it gracefully terminated, or is just throwing an exception reasonable?
```java
@Override
public void beforeStart(ClientCallStreamObserver<ProcessingRequest> requestStream) {
  synchronized (streamLock) {
    extProcClientCallRequestObserver = requestStream;
```
A couple of things here.
- It seems like we aren't respecting flow control before calling onNext on the stream and processing the pending buffer.
- It seems like this is dead code: beforeStart is called synchronously, so pendingRequests should in theory always be empty.
Why buffering is required: buffering is still required when gRPC creates a DelayedClientCall for the ext_proc call. While the interceptor execution starts on the application thread, the extProcStub is created using a channel from cachedChannelManager.getChannel(...). If the external processor's gRPC channel is not yet connected, or name resolution/load balancing is still in progress, gRPC-Java wraps the call in a DelayedClientCall, which buffers and delays the actual stream start. In this case, stub.process(...) will return before the stream is created and before beforeStart() is called.
The application thread can then resume execution and immediately start calling sendMessage() or halfClose().
At this point, extProcClientCallRequestObserver is still null. Without the pendingProcessingRequests buffer, all request headers, body messages, and half-closes would be lost entirely or trigger a crash.
About respecting ext_proc flow control:
The isReady behavior of the data plane call also uses the ext_proc call's isReady state, and that applies in this buffering case as well. In fact, any buffering into pendingProcessingRequests would already be a flow-control violation by the calling application, because isReady would be returning false during that period (extProcClientCallRequestObserver would be null).
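The buffer-then-flush behavior described above can be sketched without any gRPC dependency. This is a minimal model, not the real filter: `Consumer<String>` stands in for the request observer, and all names (`sendMessage`, `beforeStart`, `pendingRequests`, `streamLock`) mirror the discussion but are simplified.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class BufferingSketch {
  private final Object streamLock = new Object();
  private Consumer<String> requestObserver; // null until beforeStart runs
  private final Queue<String> pendingRequests = new ArrayDeque<>();

  void sendMessage(String msg) {
    synchronized (streamLock) {
      if (requestObserver == null) {
        // Stream not started yet (e.g. wrapped in a DelayedClientCall):
        // buffer instead of losing the message.
        pendingRequests.add(msg);
      } else {
        requestObserver.accept(msg);
      }
    }
  }

  void beforeStart(Consumer<String> observer) {
    synchronized (streamLock) {
      requestObserver = observer;
      // Flush everything buffered before the observer was attached, in FIFO order.
      while (!pendingRequests.isEmpty()) {
        observer.accept(pendingRequests.poll());
      }
    }
  }

  static List<String> demo() {
    BufferingSketch call = new BufferingSketch();
    List<String> wire = new ArrayList<>();
    call.sendMessage("headers"); // buffered: observer not attached yet
    call.sendMessage("body-1");  // buffered
    call.beforeStart(wire::add); // attach observer, flush buffer
    call.sendMessage("body-2");  // forwarded directly
    return wire;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [headers, body-1, body-2]
  }
}
```

Because both paths hold `streamLock`, a message can never race past the flush and arrive out of order.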
Can you help me understand this a bit?
I don't understand how DelayedClientCall or startCall is involved here. The invocation of beforeStart happens at construction time, which should be before a delayed call can buffer things?
```java
public void beforeStart(ClientCallStreamObserver<ProcessingRequest> requestStream) {
  synchronized (streamLock) {
    extProcClientCallRequestObserver = requestStream;
    while (!pendingProcessingRequests.isEmpty()) {
```
I have some questions on why we need pendingProcessing.... The design doc seems to mention a couple of cases:
- observability mode ext_proc slowness
- request drain
But the gRFC for observability mode requires flow control over buffering: https://github.com/markdroth/proposal/blob/6ee533dac7d9d1b71ad46ba8826d2a3b3cdba313/A93-xds-ext-proc.md#flow-control . This should be somewhat easy to enforce, where we push the problem to the ext_proc server if our caller doesn't respect flow control.
Similarly, flow control is the recommended solution for draining as well: https://github.com/markdroth/proposal/blob/6ee533dac7d9d1b71ad46ba8826d2a3b3cdba313/A93-xds-ext-proc.md#early-termination-of-ext_proc-stream . I don't really have a good solution for this if the caller doesn't respect flow control; we'll have to buffer to avoid out-of-order messages. Maybe let's discuss this with Mark on the gRFC.
Flow control is already implemented via the isReady and onReady overrides of the data plane RPC, which take the ext_proc stream's existence and readiness into account.
I have now also implemented buffering of request messages during the ext_proc draining period.
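The readiness composition described here can be illustrated with a small stand-alone sketch. The field names are hypothetical simplifications of the real state; the point is that the data plane call reports ready only when the ext_proc request observer exists and both streams can accept writes, so any buffering window is exactly a window where `isReady()` is false.

```java
public class ReadinessSketch {
  private volatile boolean extProcObserverAttached; // beforeStart() has run
  private volatile boolean extProcStreamReady;      // ext_proc call's isReady
  private volatile boolean upstreamReady;           // raw data plane call's isReady

  boolean isReady() {
    // An app that honors flow control never writes while this is false,
    // which is what keeps the pending buffers from growing unboundedly.
    return extProcObserverAttached && extProcStreamReady && upstreamReady;
  }

  public static void main(String[] args) {
    ReadinessSketch call = new ReadinessSketch();
    System.out.println(call.isReady()); // false: observer not attached yet
    call.extProcObserverAttached = true;
    call.extProcStreamReady = true;
    call.upstreamReady = true;
    System.out.println(call.isReady()); // true
  }
}
```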
Reviewed the ext_proc response handling. Will be reviewing the rest of the client call private methods today.
…lers
- Separate internally triggered protocol errors from server-side gRPC stream onError callbacks.
- Introduce `internalOnError(Throwable)` to propagate client cancellation only when the error is triggered internally by the client filter.
- Avoid calling `extProcClientCallRequestObserver.onError(t)` inside the StreamObserver's `onError(t)` callback, since the stream is already terminated by the gRPC framework.
- Remove the unused and dead `response.hasRequestTrailers()` handling block.
… machines
Migrates the ExternalProcessorFilter's state management in DataPlaneClientCall from 10+ independent AtomicBoolean flags (such as activated, notifiedApp, extProcStreamCompleted, extProcStreamFailed, and drainingExtProcStream) to two disciplined AtomicReference state machines. This eliminates invalid concurrent state combinations, prevents race conditions during cleanup, and simplifies overall lifecycle readability.
- Introduced ExtProcStreamState (IDLE, DRAINING, COMPLETED, FAILED) and DataPlaneCallState (IDLE, ACTIVE, CLOSED) enums.
- Implemented atomic CAS transition helpers (markExtProcStreamCompleted, markExtProcStreamFailed, markDataPlaneCallClosed) and query methods.
- Updated activateCall, internalOnError, and onClose callbacks to coordinate lifecycle events and seamlessly support fail-open (failureModeAllow) behavior without redundant cancellations.
1. Introduced buffering queue: added pendingDrainingMessages inside DataPlaneClientCall.
2. Buffered during draining: updated sendMessage(InputStream message) to detect whether isExtProcStreamDraining() is true, buffering any outgoing application messages into pendingDrainingMessages rather than forwarding them to ext_proc.
3. Drained upon completion: added drainPendingDrainingMessages(), which drains the queue directly to the upstream raw call. This method is invoked synchronously during stream completion (handleFailOpen).
Added unit test givenDrainingStream_whenAppSends_thenBufferedAndDelivered, which verifies that:
1. Outgoing data plane messages sent while the ext_proc stream is in the DRAINING state are not forwarded to the ext_proc observer.
2. Once the ext_proc stream reaches completion, all buffered messages are successfully drained and received by the upstream data plane receiver.
# Conflicts:
#   xds/src/main/java/io/grpc/xds/Filter.java
#   xds/src/test/java/io/grpc/xds/StatefulFilter.java
…anges for parseFilterConfig.
…eClientInterceptor
Prior to this change, `ExternalProcessorInterceptor` manually wrapped method descriptors with raw byte marshallers to operate on InputStream payloads. This created redundant wrapping overhead and coupled HTTP filters directly to payload serialization concerns. This commit introduces `RawMessageClientInterceptor` at the framework level in `XdsNameResolver`, which acts as a centralized serialization/deserialization boundary for the entire HTTP filter chain.
Key changes:
- Implements `RawMessageClientInterceptor` to convert `<ReqT, RespT>` method descriptors to `<InputStream, InputStream>` before passing calls down the xDS HTTP filter chain.
- Conditionally injects `RawMessageClientInterceptor` and `ExternalProcessorFilter` into the chain, guarded by the `GRPC_EXPERIMENTAL_XDS_EXT_PROC_ON_CLIENT` system property.
- Simplifies `ExternalProcessorInterceptor` to operate cleanly on `InputStream` payloads without performing manual marshaller wrapping.
- Updates `ExternalProcessorFilterTest` to explicitly chain `RawMessageClientInterceptor` when testing the interceptor standalone.
I identified the following synchronization issues by analyzing the potential race conditions between the thread handling external processor responses (or stream completion) and the application thread calling sendMessage().
Specifically, here is how the two major race conditions occurred during state transitions:
1. The isExtProcStreamCompleted() Check Bypass Race
In the previous implementation, sendMessage(InputStream message) started with:
```java
if (passThroughMode.get() || isExtProcStreamCompleted()) {
  super.sendMessage(message);
  return;
}
```
When the external processor stream terminated (e.g., via onCompleted()), the event listener immediately called markExtProcStreamCompleted(). This updated the extProcStreamState atomic reference to COMPLETED before handleFailOpen() ran drainPendingDrainingMessages(). If an application thread called sendMessage() in that exact window, isExtProcStreamCompleted() returned true. The application thread bypassed pendingDrainingMessages and sent the message directly over the wire (super.sendMessage). Shortly after, handleFailOpen() flushed the queue, delivering previously buffered draining messages to the wire after the newer message, resulting in out-of-order (FIFO violation) message delivery.
2. The Queue Flushed / Preemption Race
If an application thread entered sendMessage() while the stream was in the DRAINING state, it evaluated:
```java
if (isExtProcStreamDraining()) {
  pendingDrainingMessages.add(message);
  return;
}
```
If the application thread was preempted right before executing pendingDrainingMessages.add(message), the external processor stream could complete in the background on another thread. E.g., handleFailOpen() would run, invoke drainPendingDrainingMessages() (which found the queue empty), set passThroughMode = true, and finish. When the application thread resumed, it added its message to pendingDrainingMessages. Because drainPendingDrainingMessages() had already executed, that message sat in the queue indefinitely and was dropped.
The Solution
By wrapping the state checks and queue additions in sendMessage() with synchronized (streamLock), and similarly synchronizing drainPendingDrainingMessages() (where passThroughMode = true is updated atomically with the queue flush), we eliminate both race windows and ensure robust, FIFO-ordered delivery across all stream transitions.
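The fix can be condensed into a runnable single-threaded sketch. This is a simplified model, not the real filter: booleans stand in for the atomic state, and all names mirror the discussion above. The essential property is that `passThroughMode` flips to true only after the queue is flushed, inside the same `streamLock` critical section that `sendMessage` takes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class DrainSketch {
  private final Object streamLock = new Object();
  private final Queue<String> pendingDrainingMessages = new ArrayDeque<>();
  private boolean draining = true;      // simplified ExtProcStreamState.DRAINING
  private boolean passThroughMode = false;
  final List<String> wire = new ArrayList<>();

  void sendMessage(String msg) {
    synchronized (streamLock) {
      if (passThroughMode) { wire.add(msg); return; } // post-drain fast path
      if (draining) { pendingDrainingMessages.add(msg); return; } // buffer
      wire.add(msg);
    }
  }

  void drainPendingDrainingMessages() {
    synchronized (streamLock) {
      // Flush first, then flip the mode: a racing sendMessage either sees
      // draining==true (and buffers before we flush) or passThroughMode==true
      // (and writes after the flush). Neither window allows reordering or loss.
      while (!pendingDrainingMessages.isEmpty()) {
        wire.add(pendingDrainingMessages.poll());
      }
      draining = false;
      passThroughMode = true;
    }
  }

  static List<String> demo() {
    DrainSketch call = new DrainSketch();
    call.sendMessage("m1"); // buffered while draining
    call.sendMessage("m2"); // buffered
    call.drainPendingDrainingMessages();
    call.sendMessage("m3"); // pass-through after the drain
    return call.wire;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [m1, m2, m3]
  }
}
```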
```java
@Override
public void onNext(ProcessingResponse response) {
  try {
    if (response.hasImmediateResponse()) {
```
I rechecked and couldn't find it. Could you help point me to the section of the ext_proc spec that talks about it?
https://github.com/envoyproxy/envoy/blob/4c4ba5c91e9d2d0932cddf939f1781b6fd36f966/source/extensions/filters/http/ext_proc/ext_proc.cc#L1786-L1791 - the Envoy implementation seems to indicate otherwise, unless I am missing something here.
```java
if (mutation.hasStreamedResponse()) {
  StreamedBodyResponse streamed = mutation.getStreamedResponse();
  if (!streamed.getBody().isEmpty()) {
    super.sendMessage(streamed.getBody().newInput());
```
Do we need to respect flow control before sending messages somehow, and maintain some communication via onReady and isReady between the ext_proc listener and the data plane call?
I believe this comes down from the ext_proc response observer. How are we ensuring that we appropriately push back on responses from ext_proc to the data plane call when the data plane is not ready?
```java
BodyMutation mutation = bodyResponse.getResponse().getBodyMutation();
if (mutation.hasStreamedResponse()) {
  StreamedBodyResponse streamed = mutation.getStreamedResponse();
  if (!streamed.getBody().isEmpty()) {
```
Does the gRFC talk about excluding zero-byte messages, or anything about handling zero-byte messages?
IIUC, there are valid empty body messages, google.protobuf.Empty being a popular one that's widely used.
Not entirely sure, but it might be that protos with all fields unset also have an empty serialized body.
```java
  super.cancel(message, cause);
}

private void handleRequestBodyResponse(BodyResponse bodyResponse) {
```
Do we need to handle the end_of_stream (and end_of_stream_without_message) field here as well?
```java
}

private void handleFailOpen(DataPlaneListener listener) {
  activateCall();
```
Probably some race condition here; not 100% sure. But I believe that if this function has already run once, the next time it's a no-op.
So, in the draining state the sidecar isn't ready, and the client may end up doing request(n), which can result in a bunch of pending requests. Then when we run activateCall, we expect it to call drainPendingRequests, which doesn't happen because the call has become a no-op.
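A hedged sketch of the hazard being described, with hypothetical simplified names: if `activateCall()` guards everything behind a single one-shot CAS, the second invocation skips the drain and any `request(n)` tokens queued in between are never forwarded. Splitting the one-shot notification from an idempotent drain avoids that.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ActivationSketch {
  private final AtomicBoolean activated = new AtomicBoolean();
  private final AtomicInteger pendingRequestTokens = new AtomicInteger();
  int forwardedTokens;

  void request(int n) {
    pendingRequestTokens.addAndGet(n);
  }

  void activateCall() {
    if (activated.compareAndSet(false, true)) {
      // One-shot work (e.g. notifying the app listener) goes here only.
    }
    // The drain is idempotent and runs on every invocation, so tokens that
    // arrived after the first activation are still forwarded.
    forwardedTokens += pendingRequestTokens.getAndSet(0);
  }

  static int demo() {
    ActivationSketch call = new ActivationSketch();
    call.activateCall(); // first activation; nothing pending yet
    call.request(3);     // tokens arrive during the draining window
    call.activateCall(); // CAS is a no-op, but the drain still runs
    return call.forwardedTokens;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // 3
  }
}
```

If the whole body sat inside the CAS branch instead, `demo()` would return 0 and the three tokens would be stranded, which is the bug described above.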
I think I've reviewed all of the Filter changes now, excluding metrics. I'd want to go over it again (metrics, plus any sections that I reviewed in the past and that have slightly changed since); a shorter pass this time, now that I am familiar with the code and don't need to go section by section. I should be able to wrap this up by Monday, so we can request Eric's review starting early next week and get this submitted.
ejona86
left a comment
This was a very small glance, as I wondered how some things turned out when reviewing the design doc.
```java
private void drainPendingDrainingMessages() {
  synchronized (streamLock) {
    passThroughMode.set(true);
```
This should only be set after the queue is drained, otherwise racing messages will be sent before those in the queue and we'll be writing to the stream from two threads simultaneously, which is not thread-safe.
```java
  return;
}

if (isExtProcStreamDraining() || isExtProcStreamCompleted()) {
```
I highly discourage reading the same atomic multiple times within conditions like this. It is subtle that isExtProcStreamCompleted() || isExtProcStreamDraining() would be broken. I tend to make it very clear that atomics are being read for conditions. I might have isExtProcStreamDraining() require the current value be passed in, so callers would do isExtProcStreamDraining(extProcStreamState.get()) and then here you'd do a single extProcStreamState.get() and pass the same value to both helpers.
Obviously, that makes your helper functions a bit less helpful. I think all you need in this case is to move the helper methods to ExtProcStreamState itself, and then it'd all feel pretty natural. markExtProcStreamCompleted() would remain separate like you have it now; that's a state transition, and not just used in conditions like the other methods are.
(If you only mutated extProcStreamState while streamLock was held, then reading the volatile multiple times within the lock is less of a concern.)
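The suggestion of moving the predicates onto the state enum can be sketched as follows. The enum constants mirror the ones named in the earlier commit message; the helper methods and `shouldBuffer` are hypothetical illustrations, not the actual code. The key property is that the condition reads the `AtomicReference` exactly once, so both checks see a single consistent snapshot.

```java
import java.util.concurrent.atomic.AtomicReference;

public class StateReadSketch {
  enum ExtProcStreamState {
    IDLE, DRAINING, COMPLETED, FAILED;

    // Pure predicates on a value, not on the atomic: callers decide how
    // many times the shared state is actually read.
    boolean isDraining()  { return this == DRAINING; }
    boolean isCompleted() { return this == COMPLETED; }
  }

  static boolean shouldBuffer(AtomicReference<ExtProcStreamState> state) {
    // Single read; both predicates evaluate the same snapshot, so the
    // "completed-then-draining ordering" subtlety cannot arise.
    ExtProcStreamState snapshot = state.get();
    return snapshot.isDraining() || snapshot.isCompleted();
  }

  public static void main(String[] args) {
    AtomicReference<ExtProcStreamState> state =
        new AtomicReference<>(ExtProcStreamState.DRAINING);
    System.out.println(shouldBuffer(state)); // true
    state.set(ExtProcStreamState.IDLE);
    System.out.println(shouldBuffer(state)); // false
  }
}
```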
…xed them:
1. Client-to-Server EOS & Cardinality Decoupling (handleRequestBodyResponse)
The Issue: Previously, the filter tracked application-initiated half-closes using an internal halfClosed atomic boolean. Once set, it assumed the very next ProcessingResponse from the sidecar was the final one and immediately half-closed the upstream RPC, violating gRFC A93's 1-to-N / M-to-N streaming body specification.
The Fix:
- Stateless Half-Close: Completely eliminated halfClosed state tracking; upstream half-closing is now stateless and driven exclusively by sidecar commands.
- Explicit A93 Protocol Boundaries: In handleRequestBodyResponse(), proceedWithHalfClose() is triggered strictly when the sidecar sends a ProcessingResponse explicitly marked with end_of_stream = true (piggybacked on a body chunk) or end_of_stream_without_message = true.
- Streaming Queue Decoupling: Refined the expectedResponses queue to enforce 1-to-1 synchronous ordering only for headers and trailers, allowing asynchronous streaming body chunks to be exchanged freely without triggering cardinality mismatch errors.
2. Strict proceedWithClose() Lifecycle on Trailers Response
The Issue: handleResponseBodyResponse() previously triggered premature call closure during body processing. Furthermore, there was ambiguity around whether server-to-client body EOS indicators should trigger call completion.
The Fix:
- Removed Body-Triggered Closure: Completely removed proceedWithClose() from server-to-client body processing (handleResponseBodyResponse()), ensuring body EOS indicators do not terminate the RPC.
- Trailers-Driven Completion: Enforced that client call completion (proceedWithClose()) relies strictly on the receipt of response trailers from the sidecar (hasResponseTrailers()).
- Clean Handshake for Skipped Trailers: If response_trailer_mode is set to SKIP (or default), the filter notifies the sidecar that the server stream is finished via an empty body carrying end_of_stream_without_message = true, and immediately invokes proceedWithClose() without waiting for a response.
Implements ext_proc filter from A93 (internal design doc)
Includes commits from unmerged channel caching PRs.
Only ExternalProcessorFilter.java, ExternalProcessorFilterTest.java, and the Envoy xDS proto import and generated code need to be reviewed.
Rebasing the commit history caused all received and merged commits to show my name as the committer; ignore all commits for which I'm not shown as the author.