feat(taskbroker): Add Push Mode to Taskbroker #573
james-mcnulty wants to merge 20 commits into main from
Conversation
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for all 3 issues found in the latest run.
- ✅ Fixed: gRPC server address ignores configurable `grpc_addr` field
  - Updated gRPC bind address construction to use `config.grpc_addr` together with `config.grpc_port` instead of hardcoding `0.0.0.0`.
- ✅ Fixed: Callback URL missing protocol scheme for worker callbacks
  - Changed push callback URL formatting to include the `http://` scheme so workers receive a valid URI.
- ✅ Fixed: `push_threads=0` causes deadlock, unlike guarded `fetch_threads`
  - Aligned push worker spawning with fetch behavior by using `self.config.push_threads.max(1)` to guarantee at least one consumer.
Or push these changes by commenting:
@cursor push effe733488
Preview (effe733488)
```diff
diff --git a/src/main.rs b/src/main.rs
--- a/src/main.rs
+++ b/src/main.rs
@@ -196,7 +196,7 @@
         let config = config.clone();
         async move {
-            let addr = format!("0.0.0.0:{}", config.grpc_port)
+            let addr = format!("{}:{}", config.grpc_addr, config.grpc_port)
                 .parse()
                 .expect("Failed to parse address");
diff --git a/src/push.rs b/src/push.rs
--- a/src/push.rs
+++ b/src/push.rs
@@ -57,11 +57,11 @@
     pub async fn start(&self) -> Result<()> {
         let mut handles = vec![];
-        for _ in 0..self.config.push_threads {
+        for _ in 0..self.config.push_threads.max(1) {
             let endpoint = self.config.worker_endpoint.clone();
             let callback_url = format!(
-                "{}:{}",
+                "http://{}:{}",
                 self.config.callback_addr, self.config.callback_port
             );
```
evanh left a comment
There is now a potential deadlock scenario where the push channels get full and the fetch activations block trying to send. Is that tracked somewhere with a metric?
src/fetch.rs (outdated)

```rust
debug!("Fetching next pending activation...");

match store.get_pending_activation(None, None).await {
```
This will need to use the namespace and application parameters passed into it in order to work correctly.
This is actually something I wanted to bring up. Right now, namespace and application come from the get_task request body. In push mode, there is no easy way to know what values to use here. Should they be provided in the configuration?
If a broker is handling multiple applications (like in local development, and in smaller environments) we'll need different worker pools to push to. Perhaps we need a mapping between application -> worker pools?
> Should they be provided in the configuration?

Yes, this is how it will have to work.
Done! Config now takes an optional application and an optional list of namespaces.
src/fetch.rs (outdated)

```rust
Err(e) => {
    error!("Failed to fetch pending activation - {:?}", e);
    sleep(Duration::from_millis(100)).await;
```
There is no need for a sleep here.
I had a sleep there because if this fails, it's either because the store is having issues (e.g. due to scaling AlloyDB up) or because the push queue is full. In both cases it makes sense to wait a little, no?
Can we tell the difference between the two? I would handle those two scenarios differently. For the queue being full I would wait, but for an actual error we might want to take other actions (e.g. crash the entire producer).
Yes, definitely. Any idea what should happen if it's the queue being full versus a store error? For now, I can just distinguish between the two and simply log which one it was without doing anything else until we decide for sure.
```bash
# Run unit/integration tests
make test
make unit-test
```
It's `make unit-test`, not `make test` - that doesn't do anything right now.
Or rather, there is no test target in the Makefile for doing both unit and integration tests, which seemed to be the intention here.
```rust
fetch_threads: 1,
push_threads: 1,
push_queue_size: 1,
worker_endpoint: "http://127.0.0.1:50052".into(),
```
Is this the port that we'll be using for the worker in self-hosted and local dev? Ideally local development 'just works' and doesn't require additional configuration.
fpacifici left a comment
Will do a more in-depth review later.
Though please consider refactoring the config first, to separate the push attributes from the pull ones.
That would require its own PR.
src/config.rs (outdated)

```rust
/// Run the taskbroker in push mode (as opposed to pull mode).
pub push_mode: bool,
```
Please change the `push_mode` boolean into a `delivery_mode`.
Now I think this configuration will be very hard to set. We have more than ten push-specific config elements and more than ten pull-specific ones. We need some way to make it more intuitive.
One option would be to give it a structure, though I do not know whether we rely on these fields being simple fields. @evanh may know better.
Otherwise, a common pattern for these scenarios is to prefix the poll-specific parameters with `poll_` and the push-specific ones with `push_`.
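A `delivery_mode` along these lines could be sketched roughly as follows. This is a minimal illustration of the suggestion, not the actual taskbroker config; the names (`DeliveryMode`, `from_env_value`, the default to pull mode) are hypothetical:

```rust
// Hypothetical sketch of replacing `push_mode: bool` with a
// `delivery_mode` enum, as suggested in the review above.
#[derive(Debug, PartialEq)]
enum DeliveryMode {
    Pull,
    Push,
}

impl DeliveryMode {
    // Parse something like a TASKBROKER_DELIVERY_MODE value,
    // defaulting to pull mode for any unrecognized input.
    fn from_env_value(value: &str) -> Self {
        match value.to_ascii_lowercase().as_str() {
            "push" => DeliveryMode::Push,
            _ => DeliveryMode::Pull,
        }
    }
}

fn main() {
    assert_eq!(DeliveryMode::from_env_value("push"), DeliveryMode::Push);
    assert_eq!(DeliveryMode::from_env_value("PULL"), DeliveryMode::Pull);
}
```

An enum also gives the prefix convention a natural home: the push-only fields can live in a struct carried by the `Push` variant, so they cannot even be set in pull mode.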
Please avoid a helper module. It risks becoming a god object without a clear responsibility. If we need a function to spawn pools, have a tokio module for this.
Created a tokio module as suggested.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
src/fetch/mod.rs (outdated)

```rust
/// - `Ok(true)` if an activation was found
/// - `Ok(false)` if none pending
/// - `Err` if fetching failed.
pub async fn fetch_activation<T: TaskPusher>(
```
Why is this a standalone function instead of part of `TaskPusher`? That would cut down on the number of `clone` calls (which are effectively memcpy operations).
I separated it before so I could test it. I removed the function and moved the logic back into the loop, because you're right about the clones.
src/fetch/mod.rs (outdated)

```rust
// Instead of returning when `fetch_activation` fails, we just try again
match fetch_activation(store.clone(), pusher.clone(), config.clone()).await {
    Ok(false) | Err(_) => {
```
We should separate these cases. I would move any error handling (logging etc.) up to this function. That way we also handle any unexpected errors.
Added a new PushError enum so we can handle errors better.
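The thread above does not show the actual `PushError` definition, but an enum separating the two cases discussed earlier (a full push queue versus a store failure) might look like this. Variant names and the `handle` helper are illustrative assumptions:

```rust
use std::fmt;

// Hypothetical sketch of a PushError enum that distinguishes a full
// push queue (retryable, worth a short wait) from a store error
// (potentially fatal). Not the PR's actual definition.
#[derive(Debug)]
enum PushError {
    QueueFull,
    Store(String),
}

impl fmt::Display for PushError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PushError::QueueFull => write!(f, "push queue is full"),
            PushError::Store(e) => write!(f, "store error: {e}"),
        }
    }
}

// The fetch loop can then branch on the variant: back off when the
// queue is full, escalate (e.g. crash the producer) on a store error.
fn handle(err: PushError) -> &'static str {
    match err {
        PushError::QueueFull => "backoff",
        PushError::Store(_) => "escalate",
    }
}

fn main() {
    assert_eq!(handle(PushError::QueueFull), "backoff");
    assert_eq!(handle(PushError::Store("timeout".into())), "escalate");
}
```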
```rust
async move {
    let mut worker = match WorkerServiceClient::connect(endpoint).await {
        Ok(w) => w,
        Err(e) => {
            error!("Failed to connect to worker - {:?}", e);
            return;
        }
    };
```
Bug: If a push worker fails to connect on startup, it exits silently. The PushPool then reports a successful start, leading to task loss as no tasks can be pushed.
Severity: HIGH
Suggested Fix
Modify the push worker's connection logic to propagate connection errors instead of returning a unit type (). The PushPool::start() method should then check for these errors from its worker threads and return an error if any of them failed to connect, preventing the system from entering a state where tasks are processed but cannot be pushed.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.
Location: src/push/mod.rs#L80-L87
Potential issue: In push mode, if a worker service is unreachable on startup, the corresponding push thread will fail to connect and exit silently without propagating an error. The `PushPool::start()` method incorrectly interprets this as a successful completion. Consequently, the main application continues, and fetch threads begin marking tasks as `Processing`. When these tasks are sent to the push pool, the operation fails because the receiving end of the channel has been dropped. After several failed retry attempts, the tasks are marked as `Failure` and discarded, leading to data loss, while the application logs indicate a successful push operation.
Linear
Completes STREAM-820
Description
Currently, taskworkers pull tasks from taskbrokers via RPC. This approach works, but has some drawbacks. Therefore, we want taskbrokers to push tasks to taskworkers instead. Read this page on Notion for more information.
This PR allows users to run the taskbroker in push mode, which can be adjusted using several new configuration parameters:

| Parameter | Type | Default |
| --- | --- | --- |
| `TASKBROKER_PUSH_MODE` | `bool` | `false` |
| `TASKBROKER_FETCH_THREADS` | `usize` | `1` |
| `TASKBROKER_PUSH_THREADS` | `usize` | `1` |
| `TASKBROKER_PUSH_QUEUE_SIZE` | `usize` | `1` |
| `TASKBROKER_WORKER_ENDPOINT` | `String` | `http://127.0.0.1:50052` |
| `TASKBROKER_CALLBACK_ADDR` | `String` | `0.0.0.0` |
| `TASKBROKER_CALLBACK_PORT` | `usize` | `50051` |
| `TASKBROKER_FETCH_WAIT_MS` | `u64` | `100` |
| `TASKBROKER_PUSH_TIMEOUT_MS` | `u64` | `5000` |

Push Threads
On startup, the taskbroker now creates a "push pool," which is a pool of push threads. All of them wait to receive activations from the same MPMC channel provided by the `flume` crate. When a push thread receives an activation, it sends it to the worker service. Note that each push thread has its own connection to the worker service. Push threads are grouped together by the `PushPool` data structure, which exposes a `start` method to actually spawn the threads and a `submit` method to receive activations.

Fetch Threads
On startup, the taskbroker also creates a "fetch pool," which is a pool of fetch threads. Each one retrieves a pending activation from the store, passes it to the push pool (waiting until it accepts), and repeats.
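The hand-off between the two pools can be sketched with standard-library primitives only. The real implementation uses `flume`'s clonable async receiver and Tokio tasks; this blocking-thread version approximates the MPMC channel with an `Arc<Mutex<Receiver>>`, and everything except the `PushPool`/`start`/`submit` names is illustrative:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Std-only sketch of the push pool described above (not the PR's code).
struct PushPool {
    tx: mpsc::SyncSender<String>,
    handles: Vec<thread::JoinHandle<()>>,
}

impl PushPool {
    fn start(push_threads: usize, queue_size: usize) -> Self {
        // Bounded queue: `submit` blocks when it is full, which is how
        // fetch threads "wait until the pool accepts" an activation.
        let (tx, rx) = mpsc::sync_channel::<String>(queue_size);
        let rx = Arc::new(Mutex::new(rx));
        let handles = (0..push_threads.max(1)) // guard against 0, as in the autofix
            .map(|i| {
                let rx = Arc::clone(&rx);
                thread::spawn(move || loop {
                    // Hold the lock while waiting for the next activation
                    // (a simplification; flume needs no lock at all).
                    match rx.lock().unwrap().recv() {
                        // Real code: push the activation to the worker
                        // service over this thread's own connection.
                        Ok(activation) => println!("push thread {i}: {activation}"),
                        // Channel closed: all senders dropped, shut down.
                        Err(_) => break,
                    }
                })
            })
            .collect();
        PushPool { tx, handles }
    }

    fn submit(&self, activation: String) {
        self.tx.send(activation).expect("push pool stopped");
    }
}

fn main() {
    let pool = PushPool::start(2, 4);
    for n in 0..4 {
        pool.submit(format!("activation-{n}"));
    }
    drop(pool.tx); // close the channel so the workers can exit
    for h in pool.handles {
        h.join().unwrap();
    }
}
```

In the sketch, a fetch thread is just any caller of `submit`; backpressure falls out of the bounded channel rather than from explicit coordination.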
Notes on Naming
Fetch threads and push threads are actually asynchronous tasks provided by the Tokio crate. They are not real threads.
Details
Dependencies
- Added `flume` 0.12.0 as a dependency (I didn't want to add any dependencies, but Tokio does not provide an asynchronous MPMC queue - only MPSC)
- Bumped `sentry-protos` from 0.4.11 to 0.8.5 (to use the new worker service schema)
- Bumped `tonic`, `tonic-health`, `prost`, and `prost-types` to 0.14 (to match the version used by `sentry-protos`)

Additions

- `FetchPool` abstraction in `src/fetch.rs`
- `PushPool` abstraction in `src/push.rs`
- `src/main.rs`

Modifications

- `get_task` when operating in push mode

Future Changes