fix: use the closed window in the eof response#360
Conversation
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #360 +/- ##
==========================================
- Coverage 92.74% 92.74% -0.01%
==========================================
Files 67 67
Lines 3514 3540 +26
Branches 228 232 +4
==========================================
+ Hits 3259 3283 +24
- Misses 190 191 +1
- Partials 65 66 +1 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
| generator_response = accumulator_stub.AccumulateFn( | ||
| request_iterator=request_generator(count=5, request=request, send_close=False) | ||
| ) | ||
| except grpc.RpcError as e: |
There was a problem hiding this comment.
Don't ignore these errors. If there's an error test should fail right?
There was a problem hiding this comment.
I'll mark the test as fail in the newly added tests.
| _result_queue: NonBlockingIterator | ||
| _consumer_future: Task | ||
| _latest_watermark: datetime | ||
| _close_window: KeyedWindow | None |
There was a problem hiding this comment.
I think you can do like this:
_close_window: KeyedWindow | None = field(default=None, init=False)This value is not known/set at initialization of AccumulatorResult right? With above, it's not part of the constructor.
There was a problem hiding this comment.
updated ty!
| """ | ||
| return self._close_window | ||
|
|
||
| def set_close_window(self, window: KeyedWindow): |
There was a problem hiding this comment.
close_window is already a read-only property. Let's make this as well property setter ?
@close_window.setter
def close_window(self, window: KeyedWindow):
if not isinstance(window, KeyedWindow):
raise TypeError("window must be a KeyedWindow object")
self._close_window = windowThere was a problem hiding this comment.
updated ty!
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
… grpc server fails Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Based off of numaproj/numaflow-rs#177
Testing
Updated the stream sorter example's handler to the following:
Created a jitter source, which pauses emission of events with certain keys for 45 seconds. Total 5 keys in the system. Timeout set for the accumulator is 30s
Logged into stream sorter pod to track disk usage across 4 hours (I should've probably installed prometheus and metrics server to gather this detail in my local cluster but I was too lazy to do that for some reason) :
Disk usage
Memory usage
Overall values seem mostly consistent across time.