feat(schedules): agent run schedules (v1) #335

Open: jromualdez-scale wants to merge 7 commits into main from jerome/scheduled-agents

jromualdez-scale commented Jun 23, 2026

Summary

Adds per-agent run schedules: recurring schedules that fire a task and deliver a configured initial input on a cron/interval cadence. Replaces the prior schedules implementation (a bare-workflow scheduler) on the same API path.

Each schedule is a Postgres row (the source of truth) plus a Temporal Schedule that acts purely as the recurring clock (it carries only the row id). On each fire, a thin, deterministic workflow runs a single activity that creates a task and delivers the initial input via the same path as a manual run — message/send for sync agents, event/send for agentic agents — attributed to the schedule's stored creator principal.

Feature flag

The API is gated behind ENABLE_AGENT_RUN_SCHEDULES (matches the existing ENABLE_HEALTH_CHECK_WORKFLOW pattern), disabled by default in every environment — when off, the routes are not registered at all. Enable per-environment when ready to test (e.g. locally ENABLE_AGENT_RUN_SCHEDULES=true ./dev.sh). The OpenAPI spec/SDK document the endpoints regardless of the runtime default.
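
A minimal sketch of how a boolean flag like this could be parsed and used to gate route registration. `env_flag`, `register_routes`, and the set of accepted truthy strings are assumptions for illustration, not the repository's actual parser or wiring.

```python
import os

# Assumed set of strings treated as "on"; the real parser may differ.
_TRUTHY = {"1", "true", "yes", "on"}


def env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean flag from the environment; unset or empty means default."""
    raw = os.environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    return raw.strip().lower() in _TRUTHY


def register_routes(routes: list) -> list:
    """Add the schedules path only when the flag is on (hypothetical wiring)."""
    if env_flag("ENABLE_AGENT_RUN_SCHEDULES"):
        routes.append("/agents/{agent_id}/schedules")
    return routes
```

With the variable unset, `register_routes([])` returns an empty list, which mirrors the "routes are not registered at all" behaviour described above.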

Removed / breaking changes

This PR deletes the previous schedules feature (routes, schemas, service, use case, and its tests). The old endpoint scheduled a raw Temporal workflow and stored nothing in Postgres; the new one schedules an agent run and is Postgres-backed. Because the API path /agents/{agent_id}/schedules is reused with new semantics, this is breaking for existing consumers of the old endpoint:

  • POST /agents/{agent_id}/schedules — request/response schema changed (schedules an agent run, not a bare workflow)
  • POST …/{name}/unpause → renamed to …/{name}/resume
  • Path param {schedule_name} → {name} (cosmetic)
  • Adds the new agent_run_schedules table (the old scheduler was Temporal-only)

(…/{name}/trigger is preserved — see below.)

Endpoints

/agents/{agent_id}/schedules:

  • POST — create
  • GET — list (served from Postgres; no per-row Temporal call)
  • GET /{name} — get (includes live Temporal state: next/last fire, action count)
  • PATCH /{name} — partial update (cadence, window, input, params, paused; cron/interval stay mutually exclusive)
  • POST /{name}/pause · POST /{name}/resume
  • POST /{name}/trigger — immediate out-of-band run
  • DELETE /{name}
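
The cron/interval rule for create and PATCH could be sketched as follows. The `Cadence` type, its field names, and the choice that a PATCH supplying one cadence clears the other are assumptions for illustration; the PR only states that the two stay mutually exclusive and that create needs at least one.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Cadence:
    cron: Optional[str] = None              # e.g. "0 9 * * 1-5"
    interval_seconds: Optional[int] = None  # e.g. 3600


def validate_create(cadence: Cadence) -> Cadence:
    """Create requires exactly one of cron or interval."""
    if cadence.cron is not None and cadence.interval_seconds is not None:
        raise ValueError("cron and interval are mutually exclusive")
    if cadence.cron is None and cadence.interval_seconds is None:
        raise ValueError("provide either cron or interval")
    return cadence


def apply_patch(current: Cadence, patch: Cadence) -> Cadence:
    """Partial update: an omitted cadence keeps the current one; a supplied
    one replaces it and clears the other (assumed semantics)."""
    if patch.cron is not None and patch.interval_seconds is not None:
        raise ValueError("cron and interval are mutually exclusive")
    if patch.cron is not None:
        return Cadence(cron=patch.cron)
    if patch.interval_seconds is not None:
        return Cadence(interval_seconds=patch.interval_seconds)
    return current
```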

Implementation notes

  • ScheduledAgentRunWorkflow (thin/deterministic) + launch_scheduled_agent_run activity (all side effects live in the activity).
  • Deterministic per-fire task name makes task/create idempotent on activity retry; a delivered marker guards against duplicate input delivery.
  • Fire-time authorization re-check under the stored creator principal — a revoked creator stops firing cleanly.
  • Scheduled tasks get a task_metadata.display_name (Scheduled Message: <name> · <fire time>), stamped with the nominal fire time (stable across retries) so they render with a label instead of "Unnamed task".
  • delete/pause/resume/update tolerate a missing Temporal schedule so a partial failure can't strand an un-cleanable row.
  • New agent_run_schedules table migration (new-table create; schema-only, non-blocking).
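
The two idempotency guards in the notes above (deterministic task name, delivered marker) can be illustrated with an in-memory stand-in. `FakeBackend`, `task_name_for`, the name format, and the `"already_delivered"` status are hypothetical; only the `scheduled_input_delivered` marker and the `launched` status appear in this PR.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    name: str
    metadata: dict = field(default_factory=dict)


class FakeBackend:
    """In-memory stand-in for task/create (get-or-create) and input delivery."""

    def __init__(self) -> None:
        self.tasks: dict = {}
        self.deliveries: list = []

    def get_or_create_task(self, name: str) -> Task:
        return self.tasks.setdefault(name, Task(name=name))

    def deliver(self, task: Task, text: str) -> None:
        self.deliveries.append((task.name, text))


def task_name_for(schedule_id: str, fire_id: str) -> str:
    # The same (schedule, fire) pair always yields the same name,
    # so a retried activity reuses the task instead of creating a new one.
    return f"scheduled-{schedule_id}-{fire_id}"


def launch(backend: FakeBackend, schedule_id: str, fire_id: str, text: str) -> str:
    task = backend.get_or_create_task(task_name_for(schedule_id, fire_id))
    if task.metadata.get("scheduled_input_delivered"):
        return "already_delivered"  # retry after a completed delivery: skip
    backend.deliver(task, text)
    task.metadata["scheduled_input_delivered"] = True
    return "launched"
```

Calling `launch` twice with the same `fire_id` yields one task and one delivery. A crash between `deliver` and the marker write is not covered by this guard.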

Testing

  • 30 unit tests (service, activity, use case, env flag) pass, covering create/list/get/update/pause/resume/trigger/delete, idempotency, validation, and flag parsing.
  • Verified end-to-end locally (flag on): both delivery paths (sync message/send and agentic event/send), plus pause/resume/update/trigger/delete reflected consistently in Postgres and Temporal.
  • Verified on a dev cluster (branch image, flag on): create → Temporal schedule → worker fires on schedule → message/send delivered, with the row persisted and the creator principal captured from real auth.

Deployment dependency (authz provider)

Dev verification surfaced this: on a cluster using the SGP authz provider (AUTH_PROVIDER=sgp), the provider must learn the new schedule resource type before this is usable there. Today its /v1/authz/check returns 422 for a schedule resource, so:

  • Create works (it gates on agent.update, and register of the schedule resource is tolerated).
  • Every op gated on the schedule resource — GET /{name}, pause, resume, trigger, PATCH, DELETE — returns 422 until the provider handles check/grant/revoke/register/deregister/search for schedule (mirroring agent/task/api_key).

This is provider-side work (the schedule type is already part of the documented auth-provider contract); it should land alongside this feature's rollout. Environments with authz disabled or a permissive provider are unaffected.

🤖 Generated with Claude Code

Greptile Summary

This PR replaces the previous bare-workflow scheduler with a Postgres-backed agent run schedule system, reusing the same /agents/{agent_id}/schedules API path with new semantics. Each schedule is a Postgres row (source of truth) backed by a thin Temporal Schedule (recurring clock only), and each fire creates a fresh task and delivers the configured initial input through the same path as a manual agent run.

  • Core loop: ScheduledAgentRunWorkflow → launch_scheduled_agent_run activity → idempotent task/create (deterministic name) → event/send or message/send attributed to the stored creator principal, with a scheduled_input_delivered marker preventing duplicate input delivery on activity retry.
  • Resilience design: delete/pause/resume/update all tolerate a missing Temporal schedule so partial failures leave the Postgres row cleanable; list serves entirely from Postgres (no per-row Temporal RPC fan-out); GET enriches with live Temporal fields best-effort.
  • Rollout gating: ENABLE_AGENT_RUN_SCHEDULES (default false) gates route registration; the Temporal worker always registers the workflow and activity unconditionally so existing schedules keep executing even when the API is disabled.

Confidence Score: 5/5

Safe to merge — the three issues flagged in the prior review have all been addressed, and no new blocking problems were introduced.

The Postgres ↔ Temporal consistency story is solid: create rolls back both the auth registration and the DB row on any failure; delete/pause/resume/update all tolerate a missing Temporal schedule so a stranded row can always be cleaned up; list serves exclusively from Postgres. The activity's idempotency design (deterministic task name + delivered marker) is correct and tested. The factory reuses the GlobalDependencies singleton for the DB engine and the class-level HttpxGateway client, so there is no resource-leak risk on repeated activity fires. Test coverage is thorough across all CRUD paths, the two delivery methods, retry guards, authorization skip, and the environment-variable flag.

agentex/src/adapters/temporal/adapter_temporal.py — the new update_schedule method detects ScheduleNotFoundError via string matching rather than a typed exception, which is fragile; harmless for now but worth hardening if Temporal ever changes its error messages.
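
To make the fragility concrete, here is a small sketch contrasting message matching with a status-code check. `RpcError` and `StatusCode` are stand-ins defined here for illustration, not the Temporal SDK's types; the sketch assumes the client's RPC error carries a structured status that the adapter could inspect.

```python
from enum import Enum


class StatusCode(Enum):
    NOT_FOUND = 5
    UNAVAILABLE = 14


class RpcError(Exception):
    """Stand-in for a client RPC error carrying a structured status."""

    def __init__(self, message: str, status: StatusCode) -> None:
        super().__init__(message)
        self.status = status


def is_not_found_by_message(exc: Exception) -> bool:
    # Pattern described in the review: depends on the server's wording.
    return "not found" in str(exc).lower()


def is_not_found_by_status(exc: Exception) -> bool:
    # Hardened variant: depends only on the structured status code.
    return isinstance(exc, RpcError) and exc.status is StatusCode.NOT_FOUND


reworded = RpcError("no such schedule", StatusCode.NOT_FOUND)
unrelated = RpcError("upstream host not found, retry later", StatusCode.UNAVAILABLE)
```

Here `reworded` is missed by the message check and `unrelated` is misclassified by it, while the status check handles both.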

Important Files Changed

| Filename | Overview |
| --- | --- |
| agentex/src/domain/services/agent_run_schedule_service.py | Core service orchestrating Postgres ↔ Temporal consistency for all CRUD operations; correctly implements rollback on create failure, missing-Temporal tolerance on delete/pause/resume/update, and serves list from Postgres only (no per-row Temporal fan-out). |
| agentex/src/temporal/activities/scheduled_agent_run_activities.py | Activity handles all side effects: idempotent task creation via deterministic name, input delivery guard via scheduled_input_delivered marker, fire-time authz re-check, and nominal fire-time extraction from workflow id to avoid wall-clock drift on retries. |
| agentex/src/temporal/scheduled_agent_run_factory.py | Wires AgentsACPUseCase outside FastAPI DI using the shared GlobalDependencies singleton (connection pool, Redis pool, class-level HttpxGateway client) and a per-fire principal context; follows the established task_retention_factory pattern. |
| agentex/src/adapters/temporal/adapter_temporal.py | Adds timezone, overlap_policy, and update_schedule support. The update_schedule method detects ScheduleNotFoundError via string matching ("not found" in str(e).lower()) rather than catching a typed exception, which is fragile but consistent with the existing codebase pattern for Temporal error handling. |
| agentex/src/temporal/workflows/scheduled_agent_run_workflow.py | Thin deterministic workflow; correctly captures fire_id from workflow.info().workflow_id and delegates all side effects to the activity. Retry policy (5 attempts, 2s/2x backoff) and 120s start-to-close timeout are reasonable for the activity's workload. |
| agentex/src/api/routes/agent_run_schedules.py | Routes correctly gate on agent.update for create (no schedule resource yet), use _check_schedule_or_collapse_to_404 for per-schedule operations, and capture only credential-free creator fields (principal_type, user_id, service_account_id, account_id). |
| agentex/database/migrations/alembic/versions/2026_06_22_1200_add_agent_run_schedules_3b1c9d2e4f6a.py | New-table migration with (agent_id, name) unique index and agent_id list index; non-blocking by construction (indexes target the just-created empty table). ForeignKey to agents.id and all non-nullable columns match the ORM definition. |
| agentex/src/temporal/run_worker.py | ScheduledAgentRunWorkflow and launch_scheduled_agent_run are registered unconditionally (not gated on ENABLE_AGENT_RUN_SCHEDULES), which is intentional so the worker can drain existing schedules even when the API flag is off. |
| agentex/tests/unit/temporal/test_scheduled_agent_run_activity.py | Good coverage: schedule-not-found, paused, async/sync delivery paths, already-delivered idempotency guard, permission-revoked skip, and agent-deleted skip. Auth check order (agent.execute → task.create → task.update) is verified explicitly. |
| agentex/src/domain/use_cases/agent_run_schedules_use_case.py | Thin use case layer enforcing cron/interval mutual-exclusivity before delegating to the service; correct validation that at least one cadence is provided on create. |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant T as Temporal Scheduler
    participant W as ScheduledAgentRunWorkflow
    participant A as launch_scheduled_agent_run activity
    participant DB as Postgres (agent_run_schedules)
    participant Auth as AuthorizationService
    participant ACP as ACP Server

    T->>W: fire (schedule_id)
    W->>W: "fire_id = workflow.info().workflow_id"
    W->>A: execute_activity(schedule_id, fire_id)
    A->>DB: get schedule row
    A->>Auth: check agent.execute (creator principal)
    A->>Auth: check task.create (creator principal)
    A->>ACP: task/create (deterministic name, get-or-create)
    A->>A: check scheduled_input_delivered marker
    A->>Auth: check task.update (creator principal)
    alt "sync agent (ACP_TYPE=SYNC)"
        A->>ACP: message/send
    else async/agentic agent
        A->>ACP: event/send
    end
    A->>DB: update task_metadata (delivered marker)
    A-->>W: "{status: launched, task_id, ...}"

Reviews (2): Last reviewed commit: "feat(schedules): enforce text input, dro..."

jromualdez-scale and others added 3 commits June 23, 2026 14:13
Replace the prior schedules implementation with per-agent "agent run
schedules": recurring schedules backed by a Temporal Schedule that, on
each fire, creates a task and delivers a configured initial input via
the same path as a manual agent run — message/send for sync agents,
event/send for agentic agents — attributed to the schedule's stored
creator principal.

- REST CRUD under /agents/{agent_id}/schedules: create, get, list,
  pause, resume, delete
- Postgres row is the source of truth for the schedule definition;
  the Temporal Schedule is only the recurring clock and carries just
  the row id
- ScheduledAgentRunWorkflow (thin, deterministic) + the
  launch_scheduled_agent_run activity that does all side effects
- deterministic per-fire task name makes task/create idempotent on
  activity retry; a delivered marker guards against re-delivery
- fire-time authz re-check under the creator principal so a revoked
  creator stops firing cleanly
- new agent_run_schedules table migration

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The UI derives a task's display name from task_metadata.display_name
(falling back to params.description), never the task's `name` field, so
scheduled tasks rendered as "Unnamed task".

Set a templated, per-fire display_name on each scheduled task —
"Scheduled Message: {schedule_name} · {fire_time}" — placed first in the
metadata so a caller-supplied display_name in the schedule's task_metadata
still overrides it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
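
The "placed first so a caller-supplied value overrides it" behaviour described in the commit message above amounts to a dict merge in which later keys win. A minimal sketch, where `build_task_metadata` and its parameters are hypothetical names:

```python
from typing import Optional


def build_task_metadata(
    schedule_name: str,
    fire_time: str,
    schedule_metadata: Optional[dict] = None,
) -> dict:
    # The templated default goes first; keys from the schedule's own
    # task_metadata are merged afterwards and therefore take precedence.
    defaults = {"display_name": f"Scheduled Message: {schedule_name} · {fire_time}"}
    return {**defaults, **(schedule_metadata or {})}
```

Passing `{"display_name": "Custom"}` as `schedule_metadata` yields `"Custom"`; omitting it yields the templated label.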
…omments

This repository is public. Strip internal ticket IDs and design-decision
shorthand from code comments and docstrings, keeping the descriptive text.
No behavior change.
jromualdez-scale requested a review from a team as a code owner June 23, 2026 18:16

github-actions Bot commented Jun 23, 2026

✱ Stainless preview builds

This PR will update the agentex-sdk SDKs with the following commit messages.

openapi

feat(api): remove trigger endpoint, rename schedules to run_schedules, update types

python

feat(api): remove retrieve/delete/pause/trigger/unpause, update create/list in schedules

typescript

feat(api): remove retrieve/delete/pause/trigger/unpause from schedules, update types

Edit this comment to update them. They will appear in their respective SDK's changelogs.

⚠️ agentex-sdk-openapi studio · code · diff

Your SDK build had at least one "error" diagnostic, which is a regression from the base state.
generate ❗ (prev: generate ✅)

New diagnostics (5 error, 8 note)
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `get /agents/{agent_id}/schedules/{schedule_name}`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `delete /agents/{agent_id}/schedules/{schedule_name}`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/pause`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/unpause`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/trigger`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `get /agents/{agent_id}/schedules/{name}`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `patch /agents/{agent_id}/schedules/{name}`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `delete /agents/{agent_id}/schedules/{name}`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `post /agents/{agent_id}/schedules/{name}/trigger`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `post /agents/{agent_id}/schedules/{name}/pause`
⚠️ agentex-sdk-typescript studio · code · diff

Your SDK build had at least one "error" diagnostic, which is a regression from the base state.
generate ❗ (prev: generate ⚠️) → build ✅ lint ✅ test ✅

npm install https://pkg.stainless.com/s/agentex-sdk-typescript/2c460208ed25be592da2cb21f9905341cb6ccdb3/dist.tar.gz
New diagnostics (5 error, 8 note): identical to the agentex-sdk-openapi list above.
⚠️ agentex-sdk-python studio · conflict

Your SDK build had at least one new error diagnostic, which is a regression from the base state.

New diagnostics (5 error, 8 note): identical to the agentex-sdk-openapi list above.

This comment is auto-generated by GitHub Actions and is automatically kept up to date as you push.
If you push custom code to the preview branch, re-run this workflow to update the comment.
Last updated: 2026-06-24 16:46:52 UTC

jromualdez-scale marked this pull request as draft June 23, 2026 18:19
Comment thread agentex/src/domain/services/agent_run_schedule_service.py
Comment thread agentex/src/domain/services/agent_run_schedule_service.py
Comment thread agentex/src/temporal/activities/scheduled_agent_run_activities.py
jromualdez-scale and others added 4 commits June 23, 2026 14:46
…igger

- delete/pause/resume tolerate a missing Temporal schedule (treat as
  success / log) so a prior partial delete can't strand an un-cleanable,
  un-toggleable row.
- list no longer fans out a describe RPC per row; live Temporal fields are
  served only on the single-schedule GET (list state comes from the row).
- scheduled task display_name uses the nominal fire time parsed from the
  workflow id (stable across activity retries) instead of wall-clock now().
- add PATCH /agents/{agent_id}/schedules/{name} (partial update of cadence,
  window, input, etc.; cron/interval stay mutually exclusive).
- re-add POST /agents/{agent_id}/schedules/{name}/trigger for an immediate
  out-of-band run (restores parity with the prior scheduler).
- new Temporal adapter update_schedule; regenerated OpenAPI spec; unit tests
  for all of the above.
…_SCHEDULES)

Gate the run schedules router behind a boolean env flag, matching the
existing ENABLE_HEALTH_CHECK_WORKFLOW pattern. Disabled by default in every
environment, so the API surface is absent unless explicitly enabled.

Local dev reads the flag from the shell (defaults false), so you opt in only
when testing: `ENABLE_AGENT_RUN_SCHEDULES=true ./dev.sh`. Deployed envs set the
env var when they want the feature on.

The OpenAPI generator opts the feature on so the endpoints stay documented in
the spec/SDK regardless of the runtime default; live serving remains gated.
…, harden update ordering

Address review follow-ups on agent run schedules:

- ScheduleInitialInput.type is now Literal["text"] (was a free str with a
  "v1 only" comment), so an unsupported content type is rejected at validation
  instead of silently coerced to text.
- Remove the persisted initial_input_method column/entity field. Delivery
  method is always inferred from the agent's ACP type, so the stored value was
  always null and could only go stale relative to the agent's current type. The
  response still exposes the (now always computed) method.
- update_schedule pushes the merged spec to Temporal BEFORE committing the row,
  closing the common divergence: a rejected cron/timezone or transient Temporal
  error now aborts with nothing persisted. A residual window remains (Temporal
  accepts, then the row write fails) since there is no cross-store transaction;
  the row stays the declared source of truth so a later successful update
  re-converges. create holds the analogous invariant via row rollback; update
  has no in-place rollback, so it orders the writes instead.

Regenerate openapi.yaml and add an update-ordering regression test.
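
The write ordering described above can be sketched with in-memory fakes. `FakeTemporal`, `TemporalRejected`, and the dict-based row store are stand-ins for illustration, not the service's real collaborators.

```python
class TemporalRejected(Exception):
    """Stand-in for a rejected spec or a transient Temporal error."""


class FakeTemporal:
    def __init__(self, reject: bool = False) -> None:
        self.reject = reject
        self.specs: dict = {}

    def update_schedule(self, schedule_id: str, spec: dict) -> None:
        if self.reject:
            raise TemporalRejected("invalid cron or timezone")
        self.specs[schedule_id] = dict(spec)


def update_schedule(rows: dict, temporal: FakeTemporal, schedule_id: str, changes: dict) -> dict:
    merged = {**rows[schedule_id], **changes}
    # 1) Push the merged spec to the clock first; if this raises,
    #    the stored row has not been touched.
    temporal.update_schedule(schedule_id, merged)
    # 2) Only then persist the row. A failure at this point is the
    #    residual window: Temporal has the new spec, the row does not.
    rows[schedule_id] = merged
    return merged
```

A rejected cron leaves `rows` unchanged, which is the property the regression test mentioned above would check.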

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
jromualdez-scale marked this pull request as ready for review June 24, 2026 19:43
# Best-effort delivered marker for the retry guard above. A crash between
# delivery and this update is the only window where a retry could
# re-deliver; deterministic task naming still prevents duplicate tasks.
task.task_metadata = {

The deterministic task name prevents a duplicate task, but not a duplicate delivery, right? event/send has no dedupe. For agentic agents, that second event re-runs the whole turn (double reply, double LLM/tool cost, etc.). We could add an idempotency key on the delivery so a replay is dropped.
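
One way such a key could work, as a rough sketch: the receiver remembers keys it has already processed and drops repeats. `EventInbox` and `delivery_key` are hypothetical; event/send has no such parameter today according to the comment above, and a real implementation would need durable storage for seen keys.

```python
class EventInbox:
    """Receiver-side dedupe keyed on a caller-supplied idempotency key."""

    def __init__(self) -> None:
        self._seen: set = set()
        self.processed: list = []

    def send(self, idempotency_key: str, payload: str) -> bool:
        if idempotency_key in self._seen:
            return False  # replay: dropped, the turn does not re-run
        self._seen.add(idempotency_key)
        self.processed.append(payload)
        return True


def delivery_key(schedule_id: str, fire_id: str) -> str:
    # Derived from stable per-fire identifiers, so every retry of the
    # same fire produces the same key.
    return f"{schedule_id}:{fire_id}"
```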

run_schedules_use_case: DAgentRunSchedulesUseCase,
authorization: DAuthorizationService,
) -> DeleteResponse:
await _check_schedule_or_collapse_to_404(

I think we should soft delete the schedule record for audit purposes

message=f"Schedule '{schedule_id}' not found",
detail=str(e),
) from e
logger.error(f"Failed to update schedule {schedule_id}: {e}")

can we log a metric?

['agent_id', 'name'],
unique=True,
)
op.create_index(

can we add an index on updated_at?


def upgrade() -> None:
op.create_table(
'agent_run_schedules',

Should we add a record version field to track every schedule update? Currently, patch, pause, and resume look like blind read-modify-write flows. That means a stale patch could accidentally overwrite a newer pause/resume change and silently reactivate a schedule.
If we add a version number, we can make updates conditional on the version the caller last read. That gives us optimistic concurrency control and also gives us a cleaner audit trail of schedule changes over time.
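
A minimal sketch of that conditional update, using an in-memory SQLite table for illustration. The column set (`paused`, `cron`, `version`) and the helper names are assumptions; the real table lives in Postgres and its columns are not shown in this thread.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE agent_run_schedules ("
        "id TEXT PRIMARY KEY, paused INTEGER NOT NULL, "
        "cron TEXT, version INTEGER NOT NULL)"
    )
    conn.execute("INSERT INTO agent_run_schedules VALUES ('s1', 0, '0 9 * * *', 1)")
    conn.commit()
    return conn


def update_if_current(
    conn: sqlite3.Connection,
    schedule_id: str,
    expected_version: int,
    *,
    paused: bool,
    cron: str,
) -> bool:
    """Apply the write only if the row is still at the version the caller read."""
    cur = conn.execute(
        "UPDATE agent_run_schedules "
        "SET paused = ?, cron = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (int(paused), cron, schedule_id, expected_version),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 rows touched means the caller's read was stale
```

If a pause lands at version 1 and a stale PATCH also carries version 1, the PATCH matches no rows and the schedule stays paused; the caller can re-read and retry.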
