feat(schedules): agent run schedules (v1) #335
Replace the prior schedules implementation with per-agent "agent run
schedules": recurring schedules backed by a Temporal Schedule that, on
each fire, creates a task and delivers a configured initial input via
the same path as a manual agent run — message/send for sync agents,
event/send for agentic agents — attributed to the schedule's stored
creator principal.
- REST CRUD under /agents/{agent_id}/schedules: create, get, list,
pause, resume, delete
- Postgres row is the source of truth for the schedule definition;
the Temporal Schedule is only the recurring clock and carries just
the row id
- ScheduledAgentRunWorkflow (thin, deterministic) + the
launch_scheduled_agent_run activity that does all side effects
- deterministic per-fire task name makes task/create idempotent on
activity retry; a delivered marker guards against re-delivery
- fire-time authz re-check under the creator principal so a revoked
creator stops firing cleanly
- new agent_run_schedules table migration
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
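For illustration, here is a minimal sketch of how a deterministic per-fire task name and a delivered marker can make an activity attempt safe to repeat. The naming scheme, `FakeTaskStore`, and `launch` are hypothetical stand-ins rather than the PR's code; only the idea (get-or-create by a stable name, then check a marker before delivering) comes from the commit message above.

```python
import hashlib
from dataclasses import dataclass, field


def task_name_for_fire(schedule_id: str, fire_id: str) -> str:
    """Derive a stable task name from the schedule and fire identity (hypothetical scheme)."""
    digest = hashlib.sha256(f"{schedule_id}:{fire_id}".encode()).hexdigest()[:16]
    return f"scheduled-{digest}"


@dataclass
class FakeTaskStore:
    """In-memory stand-in for task/create with get-or-create semantics."""
    tasks: dict = field(default_factory=dict)

    def get_or_create(self, name: str) -> dict:
        return self.tasks.setdefault(name, {"name": name, "task_metadata": {}})


def launch(store: FakeTaskStore, deliveries: list, schedule_id: str, fire_id: str, text: str) -> dict:
    """One activity attempt; calling it again with the same arguments is harmless."""
    task = store.get_or_create(task_name_for_fire(schedule_id, fire_id))
    if task["task_metadata"].get("scheduled_input_delivered"):
        return task  # an earlier attempt already delivered the input
    deliveries.append((task["name"], text))  # stands in for message/send or event/send
    task["task_metadata"]["scheduled_input_delivered"] = True
    return task


store, deliveries = FakeTaskStore(), []
launch(store, deliveries, "sched-1", "wf-2026-06-24T16:00:00Z", "daily report")
launch(store, deliveries, "sched-1", "wf-2026-06-24T16:00:00Z", "daily report")  # simulated retry
print(len(store.tasks), len(deliveries))  # prints: 1 1
```

In this toy version the marker write cannot fail separately from the delivery. In a real system those are two steps, which is the window discussed in the review thread further down.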
The UI derives a task's display name from task_metadata.display_name
(falling back to params.description), never the task's `name` field, so
scheduled tasks rendered as "Unnamed task".
Set a templated, per-fire display_name on each scheduled task —
"Scheduled Message: {schedule_name} · {fire_time}" — placed first in the
metadata so a caller-supplied display_name in the schedule's task_metadata
still overrides it.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
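The "placed first so the caller can override" behavior comes down to dict merge order. A small sketch, assuming a hypothetical helper `build_task_metadata` (the name, signature, and fire-time string format are illustrative, not taken from the PR):

```python
from typing import Optional


def build_task_metadata(schedule_name: str, fire_time: str, schedule_task_metadata: Optional[dict]) -> dict:
    """Put the templated display_name first; later keys from the schedule win on conflict."""
    default = {"display_name": f"Scheduled Message: {schedule_name} · {fire_time}"}
    return {**default, **(schedule_task_metadata or {})}


# No caller-supplied display_name: the templated default is used.
print(build_task_metadata("daily-report", "2026-06-24 16:00 UTC", None))

# Caller-supplied display_name in the schedule's task_metadata overrides the default.
print(build_task_metadata("daily-report", "2026-06-24 16:00 UTC", {"display_name": "Custom"}))
```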
…omments This repository is public. Strip internal ticket IDs and design-decision shorthand from code comments and docstrings, keeping the descriptive text. No behavior change.
✱ Stainless preview builds
This PR will update the openapi, python, and typescript SDKs. Edit this comment to update them. They will appear in their respective SDK's changelogs.
| ❗ Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `get /agents/{agent_id}/schedules/{schedule_name}` |
| ❗ Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `delete /agents/{agent_id}/schedules/{schedule_name}` |
| ❗ Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/pause` |
| ❗ Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/unpause` |
| ❗ Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/trigger` |
| 💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `get /agents/{agent_id}/schedules/{name}` |
| 💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `patch /agents/{agent_id}/schedules/{name}` |
| 💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `delete /agents/{agent_id}/schedules/{name}` |
| 💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `post /agents/{agent_id}/schedules/{name}/trigger` |
| 💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `post /agents/{agent_id}/schedules/{name}/pause` |
⚠️ agentex-sdk-typescript studio · code · diff
Your SDK build had at least one "error" diagnostic, which is a regression from the base state.
generate ❗ (prev: generate ⚠️) → build ✅ → lint ✅ → test ✅
npm install https://pkg.stainless.com/s/agentex-sdk-typescript/2c460208ed25be592da2cb21f9905341cb6ccdb3/dist.tar.gz
New diagnostics (5 error, 8 note)
⚠️ agentex-sdk-python studio · conflict
Your SDK build had at least one new error diagnostic, which is a regression from the base state.
New diagnostics (5 error, 8 note)
This comment is auto-generated by GitHub Actions and is automatically kept up to date as you push.
If you push custom code to the preview branch, re-run this workflow to update the comment.
Last updated: 2026-06-24 16:46:52 UTC
…igger
- delete/pause/resume tolerate a missing Temporal schedule (treat as
success / log) so a prior partial delete can't strand an un-cleanable,
un-toggleable row.
- list no longer fans out a describe RPC per row; live Temporal fields are
served only on the single-schedule GET (list state comes from the row).
- scheduled task display_name uses the nominal fire time parsed from the
workflow id (stable across activity retries) instead of wall-clock now().
- add PATCH /agents/{agent_id}/schedules/{name} (partial update of cadence,
window, input, etc.; cron/interval stay mutually exclusive).
- re-add POST /agents/{agent_id}/schedules/{name}/trigger for an immediate
out-of-band run (restores parity with the prior scheduler).
- new Temporal adapter update_schedule; regenerated OpenAPI spec; unit tests
for all of the above.
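To illustrate the display-name bullet above (nominal fire time taken from the workflow id rather than `now()`), here is a hedged sketch. It assumes the workflow id of a schedule-started run ends in an ISO-8601 UTC timestamp such as `-2026-06-24T16:00:00Z`; the exact id layout this PR relies on is not shown in the conversation, and `nominal_fire_time` is a hypothetical helper name.

```python
import re
from datetime import datetime, timezone
from typing import Optional

# Assumed suffix format: "<base-id>-YYYY-MM-DDTHH:MM:SSZ"
_FIRE_TIME_SUFFIX = re.compile(r"-(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z$")


def nominal_fire_time(workflow_id: str) -> Optional[datetime]:
    """Return the fire time encoded in the workflow id, or None if there is none."""
    match = _FIRE_TIME_SUFFIX.search(workflow_id)
    if match is None:
        return None
    parsed = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S")
    return parsed.replace(tzinfo=timezone.utc)


# The same id yields the same time on every activity retry, unlike datetime.now().
print(nominal_fire_time("agent-run-sched-1-2026-06-24T16:00:00Z"))
print(nominal_fire_time("manually-started-run"))
```

A caller would still need a fallback (for example wall-clock time) when the id carries no timestamp.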
…_SCHEDULES)
Gate the run schedules router behind a boolean env flag, matching the existing ENABLE_HEALTH_CHECK_WORKFLOW pattern. Disabled by default in every environment, so the API surface is absent unless explicitly enabled.
Local dev reads the flag from the shell (defaults false), so you opt in only when testing: `ENABLE_AGENT_RUN_SCHEDULES=true ./dev.sh`. Deployed envs set the env var when they want the feature on.
The OpenAPI generator opts the feature on so the endpoints stay documented in the spec/SDK regardless of the runtime default; live serving remains gated.
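A minimal sketch of this kind of gating, for readers unfamiliar with the pattern. Only the variable name `ENABLE_AGENT_RUN_SCHEDULES` comes from the commit; the accepted truthy spellings, `env_flag`, `registered_routers`, and the router names are assumptions for illustration.

```python
import os
from typing import Mapping, Optional

_TRUTHY = {"1", "true", "yes", "on"}  # assumed set of accepted spellings


def env_flag(name: str, default: bool = False, environ: Optional[Mapping[str, str]] = None) -> bool:
    """Read a boolean flag from the environment; unset means `default`."""
    env = os.environ if environ is None else environ
    raw = env.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in _TRUTHY


def registered_routers(environ: Optional[Mapping[str, str]] = None) -> list:
    """Return router names to mount; the schedules router is absent unless enabled."""
    routers = ["agents", "tasks"]  # hypothetical always-on routers
    if env_flag("ENABLE_AGENT_RUN_SCHEDULES", environ=environ):
        routers.append("agent_run_schedules")
    return routers


print(registered_routers({}))
print(registered_routers({"ENABLE_AGENT_RUN_SCHEDULES": "true"}))
```

Because the router is never registered when the flag is off, requests to the schedule paths would get the framework's ordinary "no such route" response rather than a feature-specific error.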
…, harden update ordering
Address review follow-ups on agent run schedules:
- ScheduleInitialInput.type is now Literal["text"] (was a free str with a "v1 only" comment), so an unsupported content type is rejected at validation instead of silently coerced to text.
- Remove the persisted initial_input_method column/entity field. Delivery method is always inferred from the agent's ACP type, so the stored value was always null and could only go stale relative to the agent's current type. The response still exposes the (now always computed) method.
- update_schedule pushes the merged spec to Temporal BEFORE committing the row, closing the common divergence: a rejected cron/timezone or transient Temporal error now aborts with nothing persisted. A residual window remains (Temporal accepts, then the row write fails) since there is no cross-store transaction; the row stays the declared source of truth so a later successful update re-converges. create holds the analogous invariant via row rollback; update has no in-place rollback, so it orders the writes instead.
Regenerate openapi.yaml and add an update-ordering regression test.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
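The write ordering described in this commit can be sketched with in-memory fakes. `FakeTemporal`, `FakeRepo`, `TemporalRejected`, and this `update_schedule` function are hypothetical stand-ins, and the "empty cron is invalid" rule exists only to trigger a rejection; the point is that the external call runs before anything is persisted.

```python
class TemporalRejected(Exception):
    """Raised by the fake when Temporal would refuse a spec."""


class FakeTemporal:
    def __init__(self):
        self.specs = {}

    def update_schedule(self, schedule_id, spec):
        if not spec.get("cron"):
            raise TemporalRejected("invalid cron")
        self.specs[schedule_id] = dict(spec)


class FakeRepo:
    def __init__(self, rows):
        self.rows = {key: dict(value) for key, value in rows.items()}

    def get(self, schedule_id):
        return dict(self.rows[schedule_id])

    def save(self, schedule_id, row):
        self.rows[schedule_id] = dict(row)


def update_schedule(repo, temporal, schedule_id, patch):
    """Merge the patch, push to Temporal first, and only then persist the row."""
    merged = {**repo.get(schedule_id), **patch}
    temporal.update_schedule(schedule_id, merged)  # may raise; nothing is persisted yet
    repo.save(schedule_id, merged)
    return merged


repo = FakeRepo({"s1": {"cron": "0 9 * * *", "timezone": "UTC"}})
temporal = FakeTemporal()
try:
    update_schedule(repo, temporal, "s1", {"cron": ""})
except TemporalRejected:
    pass
print(repo.rows["s1"]["cron"])  # prints: 0 9 * * *
```

The residual window mentioned in the commit corresponds to `repo.save` failing after `temporal.update_schedule` succeeded, which this sketch does not attempt to repair.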
# Best-effort delivered marker for the retry guard above. A crash between
# delivery and this update is the only window where a retry could
# re-deliver; deterministic task naming still prevents duplicate tasks.
task.task_metadata = {
The deterministic task name prevents a duplicate task, but not a duplicate delivery, right? event/send has no dedupe. For agentic agents, that second event re-runs the whole turn (double reply, double LLM/tool cost, etc.). We could add an idempotency key on the delivery so a replay is dropped.
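One possible shape for that suggestion, as a sketch only: the receiver remembers keys it has already accepted and drops repeats. `EventInbox`, `delivery_key`, and the `idempotency_key` parameter are hypothetical; nothing in this thread shows that event/send accepts such a key today.

```python
class EventInbox:
    """In-memory stand-in for the receiving side of event/send with dedupe."""

    def __init__(self):
        self._seen_keys = set()
        self.events = []

    def send(self, task_id: str, content: str, idempotency_key: str) -> bool:
        """Accept the event once per key; return False when a replay is dropped."""
        if idempotency_key in self._seen_keys:
            return False
        self._seen_keys.add(idempotency_key)
        self.events.append((task_id, content))
        return True


def delivery_key(schedule_id: str, fire_id: str) -> str:
    """Stable per-fire key, so every retry of the same fire sends the same key."""
    return f"{schedule_id}:{fire_id}:initial-input"


inbox = EventInbox()
key = delivery_key("sched-1", "wf-2026-06-24T16:00:00Z")
first = inbox.send("task-1", "daily report", key)
replay = inbox.send("task-1", "daily report", key)  # retry after a crash before the marker write
print(first, replay, len(inbox.events))  # prints: True False 1
```

A production version would need the seen-key check and the event insert to be atomic (for example a unique constraint), which a Python set does not model.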
    run_schedules_use_case: DAgentRunSchedulesUseCase,
    authorization: DAuthorizationService,
) -> DeleteResponse:
    await _check_schedule_or_collapse_to_404(
I think we should soft delete the schedule record for audit purposes
    message=f"Schedule '{schedule_id}' not found",
    detail=str(e),
) from e
logger.error(f"Failed to update schedule {schedule_id}: {e}")
    ['agent_id', 'name'],
    unique=True,
)
op.create_index(
can we add an index on updated_at?
def upgrade() -> None:
    op.create_table(
        'agent_run_schedules',
Should we add a record version field to track every schedule update? Currently, patch, pause, and resume look like blind read-modify-write flows. That means a stale patch could accidentally overwrite a newer pause/resume change and silently reactivate a schedule.
If we add a version number, we can make updates conditional on the version the caller last read. That gives us optimistic concurrency control and also gives us a cleaner audit trail of schedule changes over time.
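A sketch of the conditional update this would enable, using an in-memory SQLite table as a stand-in for Postgres. The `paused`, `cron`, and `version` columns and the `update_if_version` helper are hypothetical; only the table name comes from the PR.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agent_run_schedules ("
    "id TEXT PRIMARY KEY, paused INTEGER NOT NULL, cron TEXT, version INTEGER NOT NULL)"
)
conn.execute("INSERT INTO agent_run_schedules VALUES ('s1', 0, '0 9 * * *', 1)")


def update_if_version(conn, schedule_id, expected_version, paused, cron) -> bool:
    """Apply the write only if the row is still at the version the caller read."""
    cursor = conn.execute(
        "UPDATE agent_run_schedules "
        "SET paused = ?, cron = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (int(paused), cron, schedule_id, expected_version),
    )
    return cursor.rowcount == 1


# Two callers both read version 1. The pause lands first and bumps the row to version 2.
pause_ok = update_if_version(conn, "s1", 1, paused=True, cron="0 9 * * *")
# The stale patch still carries paused=False from its old read; it is rejected.
stale_ok = update_if_version(conn, "s1", 1, paused=False, cron="0 10 * * *")

row = conn.execute("SELECT paused, cron, version FROM agent_run_schedules WHERE id = 's1'").fetchone()
print(pause_ok, stale_ok, row)  # prints: True False (1, '0 9 * * *', 2)
```

The rejected caller would typically re-read the row and retry, or the API could surface the mismatch as a conflict response.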
Summary
Adds per-agent run schedules: recurring schedules that fire a task and deliver a configured initial input on a cron/interval cadence. Replaces the prior `schedules` implementation (a bare-workflow scheduler) on the same API path.

Each schedule is a Postgres row (the source of truth) plus a Temporal Schedule that acts purely as the recurring clock (it carries only the row id). On each fire, a thin, deterministic workflow runs a single activity that creates a task and delivers the initial input via the same path as a manual run (`message/send` for sync agents, `event/send` for agentic agents), attributed to the schedule's stored creator principal.

Feature flag
The API is gated behind `ENABLE_AGENT_RUN_SCHEDULES` (matches the existing `ENABLE_HEALTH_CHECK_WORKFLOW` pattern), disabled by default in every environment; when off, the routes are not registered at all. Enable per-environment when ready to test (e.g. locally `ENABLE_AGENT_RUN_SCHEDULES=true ./dev.sh`). The OpenAPI spec/SDK document the endpoints regardless of the runtime default.

Removed / breaking changes
This PR deletes the previous `schedules` feature (routes, schemas, service, use case, and its tests). The old endpoint scheduled a raw Temporal workflow and stored nothing in Postgres; the new one schedules an agent run and is Postgres-backed. Because the API path `/agents/{agent_id}/schedules` is reused with new semantics, this is breaking for existing consumers of the old endpoint:
- `POST /agents/{agent_id}/schedules`: request/response schema changed (schedules an agent run, not a bare workflow)
- `POST …/{name}/unpause` → renamed to `…/{name}/resume`
- `{schedule_name}` → `{name}` (cosmetic)
- New `agent_run_schedules` table (the old scheduler was Temporal-only)
(`…/{name}/trigger` is preserved; see below.)

Endpoints
`/agents/{agent_id}/schedules`:
- `POST`: create
- `GET`: list (served from Postgres; no per-row Temporal call)
- `GET /{name}`: get (includes live Temporal state: next/last fire, action count)
- `PATCH /{name}`: partial update (cadence, window, input, params, paused; cron/interval stay mutually exclusive)
- `POST /{name}/pause` · `POST /{name}/resume`
- `POST /{name}/trigger`: immediate out-of-band run
- `DELETE /{name}`

Implementation notes
- `ScheduledAgentRunWorkflow` (thin/deterministic) + `launch_scheduled_agent_run` activity (all side effects live in the activity).
- Deterministic per-fire task name makes `task/create` idempotent on activity retry; a delivered marker guards against duplicate input delivery.
- Scheduled tasks set `task_metadata.display_name` (`Scheduled Message: <name> · <fire time>`), stamped with the nominal fire time (stable across retries) so they render with a label instead of "Unnamed task".
- `delete`/`pause`/`resume`/`update` tolerate a missing Temporal schedule so a partial failure can't strand an un-cleanable row.
- New `agent_run_schedules` table migration (new-table create; schema-only, non-blocking).

Testing
- … `message/send` and agentic `event/send`), plus pause/resume/update/trigger/delete reflected consistently in Postgres and Temporal.
- … `message/send` delivered, with the row persisted and the creator principal captured from real auth.

Deployment dependency (authz provider)
Dev verification surfaced this: on a cluster using the SGP authz provider (`AUTH_PROVIDER=sgp`), the provider must learn the new `schedule` resource type before this is usable there. Today its `/v1/authz/check` returns 422 for a `schedule` resource, so:
- … `agent.update`, and `register` of the `schedule` resource is tolerated).
- `GET /{name}`, `pause`, `resume`, `trigger`, `PATCH`, `DELETE`: returns 422 until the provider handles `check`/`grant`/`revoke`/`register`/`deregister`/`search` for `schedule` (mirroring `agent`/`task`/`api_key`).

This is provider-side work (the `schedule` type is already part of the documented auth-provider contract); it should land alongside this feature's rollout. Environments with authz disabled or a permissive provider are unaffected.

🤖 Generated with Claude Code
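As a hedged illustration of the "tolerate a missing Temporal schedule" note in the description above, here is a sketch of a delete path that treats "already gone in Temporal" as success so the row can still be removed. `ScheduleNotFound`, `FakeTemporal`, and this `delete_schedule` function are hypothetical stand-ins, not the PR's adapter or use case.

```python
import logging

logger = logging.getLogger("schedules-sketch")


class ScheduleNotFound(Exception):
    """Raised by the fake when the Temporal schedule does not exist."""


class FakeTemporal:
    def __init__(self, schedule_ids):
        self.schedule_ids = set(schedule_ids)

    def delete_schedule(self, schedule_id):
        if schedule_id not in self.schedule_ids:
            raise ScheduleNotFound(schedule_id)
        self.schedule_ids.remove(schedule_id)


def delete_schedule(rows: dict, temporal: FakeTemporal, schedule_id: str) -> bool:
    """Delete the clock, then the row; a missing clock is logged, not fatal."""
    if schedule_id not in rows:
        return False
    try:
        temporal.delete_schedule(schedule_id)
    except ScheduleNotFound:
        logger.warning("Temporal schedule %s already absent; removing row anyway", schedule_id)
    del rows[schedule_id]
    return True


# The row exists, but an earlier partial delete already removed the Temporal schedule.
rows = {"s1": {"name": "daily"}}
temporal = FakeTemporal(schedule_ids=[])
print(delete_schedule(rows, temporal, "s1"), rows)  # prints: True {}
```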
Greptile Summary
This PR replaces the previous bare-workflow scheduler with a Postgres-backed agent run schedule system, reusing the same `/agents/{agent_id}/schedules` API path with new semantics. Each schedule is a Postgres row (source of truth) backed by a thin Temporal Schedule (recurring clock only), and each fire creates a fresh task and delivers the configured initial input through the same path as a manual agent run.
- `ScheduledAgentRunWorkflow` → `launch_scheduled_agent_run` activity → idempotent `task/create` (deterministic name) → `event/send` or `message/send` attributed to the stored creator principal, with a `scheduled_input_delivered` marker preventing duplicate input delivery on activity retry.
- `ENABLE_AGENT_RUN_SCHEDULES` (default `false`) gates route registration; the Temporal worker always registers the workflow and activity unconditionally so existing schedules keep executing even when the API is disabled.
Confidence Score: 5/5
Safe to merge — the three issues flagged in the prior review have all been addressed, and no new blocking problems were introduced.
The Postgres ↔ Temporal consistency story is solid: create rolls back both the auth registration and the DB row on any failure; delete/pause/resume/update all tolerate a missing Temporal schedule so a stranded row can always be cleaned up; list serves exclusively from Postgres. The activity's idempotency design (deterministic task name + delivered marker) is correct and tested. The factory reuses the GlobalDependencies singleton for the DB engine and the class-level HttpxGateway client, so there is no resource-leak risk on repeated activity fires. Test coverage is thorough across all CRUD paths, the two delivery methods, retry guards, authorization skip, and the environment-variable flag.
agentex/src/adapters/temporal/adapter_temporal.py — the new update_schedule method detects ScheduleNotFoundError via string matching rather than a typed exception, which is fragile; harmless for now but worth hardening if Temporal ever changes its error messages.
Important Files Changed
… `scheduled_input_delivered` marker, fire-time authz re-check, and nominal fire-time extraction from workflow id to avoid wall-clock drift on retries.
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant T as Temporal Scheduler
    participant W as ScheduledAgentRunWorkflow
    participant A as launch_scheduled_agent_run activity
    participant DB as Postgres (agent_run_schedules)
    participant Auth as AuthorizationService
    participant ACP as ACP Server
    T->>W: fire (schedule_id)
    W->>W: "fire_id = workflow.info().workflow_id"
    W->>A: execute_activity(schedule_id, fire_id)
    A->>DB: get schedule row
    A->>Auth: check agent.execute (creator principal)
    A->>Auth: check task.create (creator principal)
    A->>ACP: task/create (deterministic name, get-or-create)
    A->>A: check scheduled_input_delivered marker
    A->>Auth: check task.update (creator principal)
    alt "sync agent (ACP_TYPE=SYNC)"
        A->>ACP: message/send
    else async/agentic agent
        A->>ACP: event/send
    end
    A->>DB: update task_metadata (delivered marker)
    A-->>W: "{status: launched, task_id, ...}"
Reviews (2): Last reviewed commit: "feat(schedules): enforce text input, dro..."