fix: Respect flush_queue_size in Worker.flush() #64
Conversation
bugbot run

Skipping Bugbot: Bugbot is disabled for this repository
Mercy811 left a comment:
Thanks @sojingle, LGTM! Let's wait for bugbot run. I'm requesting access at https://amplitude.slack.com/archives/C08PQE45N68/p1763513616629679
bugbot run
Hi @dsaxton-1password, are you able to test this branch in a real environment? If testing is difficult, I will just proceed with the release.
I could try, but it would likely take a while to actually trigger the issue (it has been happening maybe once a week), and you may not want to wait that long to release. If you're confident this doesn't introduce breaking changes, I would go ahead and release it, and we can start running the new version in a staging environment (and then in production if everything runs smoothly).
Summary
Fix: respect `flush_queue_size` in `Worker.flush()` to prevent payload size errors
Problem
The current `flush()` implementation uses `pull_all()` and sends all events in a single HTTP request, completely ignoring the `flush_queue_size` configuration. This causes payload-too-large errors: the processor reduces `flush_queue_size` when payloads are too large, but `flush()` bypassed this entirely.
Solution
Thanks to @dsaxton-1password for raising this issue and submitting the PR #63. Based on that, I made further optimizations and fixed some problems caused by the modification.
- Modified `Worker.flush()` to batch events. Using `pull_all` to fetch all the data first and then splitting it reduces the number and duration of locks.
- Added a `_create_combined_future()` helper. Changing the return type of `flush()` to a list of Futures would be an API change, so this helper merges the batch futures back into a single Future.
- Guarded the `flush_queue_size` reduction. When multiple batches from the same `flush()` fail, the old code would reduce the divider for every failure, causing exponential over-reduction.
Checklist
Note

Batch flushes by `flush_queue_size` with a combined future, and guard flush-divider increases on payload-too-large; update tests accordingly.

- Worker (`src/amplitude/worker.py`): batch `flush()` by `configuration.flush_queue_size`, submitting each batch to the thread pool. Add `_create_combined_future()` that waits for all batch futures, logs failures, and raises on error; early-return when no storage or no events.
- Processor (`src/amplitude/processor.py`): on `PAYLOAD_TOO_LARGE`, only call `configuration._increase_flush_divider()` when `len(events) <= configuration.flush_queue_size` to avoid multiple reductions; re-queue events as before.
- Tests (`src/test/test_worker.py`): on `stop`, verify storage is flushed and HTTP sends occur; cover batched `flush()` and the guarded divider increase under `PAYLOAD_TOO_LARGE`.

Written by Cursor Bugbot for commit 954e288.
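The guarded divider increase in the processor can be illustrated with a minimal sketch. `Configuration`, `_increase_flush_divider`, and `handle_payload_too_large` below are simplified stand-ins for the real SDK internals, assumed only for illustration.

```python
class Configuration:
    """Hypothetical simplification of the SDK configuration object."""

    def __init__(self, flush_queue_size=200):
        self.flush_queue_size = flush_queue_size
        self._flush_divider = 1

    def _increase_flush_divider(self):
        # Doubling the divider halves the effective batch size.
        self._flush_divider *= 2

    @property
    def effective_queue_size(self):
        return max(1, self.flush_queue_size // self._flush_divider)


def handle_payload_too_large(configuration, events, requeue):
    # Guard: only shrink the batch size when this batch is already at or
    # below the configured size. Without this check, several oversized
    # batches from one flush() would each shrink it, over-reducing the
    # queue size exponentially.
    if len(events) <= configuration.flush_queue_size:
        configuration._increase_flush_divider()
    requeue(events)  # re-queue events as before
```

With the guard, a single oversized `flush()` that fans out into several failing batches reduces the effective queue size at most once per actual over-limit batch, instead of once per failure.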