Schedule periodic tasks, cron jobs, and asynchronous method execution with the PyFly scheduling module.
- Introduction
- The @scheduled Decorator
- CronExpression
- TaskScheduler
- TaskExecutorPort
- AsyncIOTaskExecutor
- ThreadPoolTaskExecutor
- The @async_method Decorator
- Configuration
- Auto-Configuration
- Complete Example
Most non-trivial applications need to run work on a schedule: syncing data from an upstream API every five minutes, purging stale records at midnight, or publishing health-check heartbeats every ten seconds. The PyFly scheduling module gives you a declarative, decorator-driven way to define these tasks without manually managing threads, event loops, or timer wheels.
The module is built around a hexagonal architecture:
- Decorators (@scheduled, @async_method) mark methods for scheduling.
- CronExpression provides next-fire-time calculations via standard 5-field cron syntax.
- TaskScheduler is the engine that discovers decorated methods, creates execution loops, and manages their lifecycle.
- TaskExecutorPort is the outbound port abstraction, allowing you to swap execution strategies.
- AsyncIOTaskExecutor and ThreadPoolTaskExecutor are the built-in adapters.
All public types are available from a single import:
```python
from pyfly.scheduling import (
    scheduled,
    async_method,
    CronExpression,
    TaskScheduler,
    TaskExecutorPort,
)
from pyfly.scheduling.adapters.asyncio_executor import AsyncIOTaskExecutor
from pyfly.scheduling.adapters.thread_executor import ThreadPoolTaskExecutor
```

@scheduled marks a bean method for periodic execution. It is a keyword-only decorator that accepts exactly one trigger parameter: fixed_rate, fixed_delay, or cron. Providing zero or more than one trigger raises a ValueError at decoration time.
```python
from pyfly.scheduling import scheduled
```

fixed_rate runs the method at a fixed interval, measured from the start of each invocation. The scheduler submits the method to the executor and then sleeps for the full interval without waiting for the run to finish. If a run takes longer than the interval, the next run can therefore start while the previous one is still executing; use fixed_delay if runs must never overlap.

The parameter accepts a datetime.timedelta:
```python
from datetime import timedelta

from pyfly.scheduling import scheduled


class MetricsCollector:
    @scheduled(fixed_rate=timedelta(seconds=30))
    async def collect(self):
        """Collect system metrics every 30 seconds."""
        await self.scrape_metrics()
```

fixed_delay runs the method repeatedly with a fixed delay between the end of one execution and the start of the next. This guarantees a minimum gap between runs, regardless of how long each execution takes.
```python
class DataSyncer:
    @scheduled(fixed_delay=timedelta(minutes=5))
    async def sync(self):
        """Sync data, then wait 5 minutes before the next sync."""
        await self.pull_upstream_changes()
```

The key difference from fixed_rate: with fixed_delay, the scheduler waits for the task to complete (await task), then sleeps for the full delay before running again. With fixed_rate, the scheduler fires and forgets the task, sleeps for the interval, then fires again.
cron runs the method according to a standard 5-field cron expression. The scheduler calculates seconds_until_next() via CronExpression, sleeps that long, then executes the method.

```python
class ReportGenerator:
    @scheduled(cron="0 2 * * 1")  # Every Monday at 02:00
    async def generate_weekly_report(self):
        await self.build_and_email_report()
```

initial_delay is an optional timedelta that delays the very first execution. It applies to both fixed_rate and fixed_delay triggers and is ignored for cron triggers (the first execution always waits for the next matching cron time).
```python
class CacheWarmer:
    @scheduled(fixed_rate=timedelta(minutes=10), initial_delay=timedelta(seconds=30))
    async def warm_cache(self):
        """Wait 30 seconds after startup, then warm cache every 10 minutes."""
        await self.preload_hot_keys()
```

Under the hood, @scheduled attaches metadata attributes to the decorated function:
| Attribute | Value |
|---|---|
| `__pyfly_scheduled__` | `True` |
| `__pyfly_scheduled_cron__` | The cron expression string, or `None` |
| `__pyfly_scheduled_fixed_rate__` | The `timedelta`, or `None` |
| `__pyfly_scheduled_fixed_delay__` | The `timedelta`, or `None` |
| `__pyfly_scheduled_initial_delay__` | The `timedelta`, or `None` |
The TaskScheduler reads these attributes during its discovery phase.
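To illustrate how the decorator and the discovery phase fit together, here is a minimal stand-alone sketch. It is not PyFly's source: scheduled_sketch and discover_sketch are hypothetical names, but they use the attribute names from the table above, the exactly-one-trigger rule, and the "public attributes only" discovery rule described in this guide.

```python
from __future__ import annotations

from datetime import timedelta
from typing import Any, Callable


def scheduled_sketch(
    *,
    cron: str | None = None,
    fixed_rate: timedelta | None = None,
    fixed_delay: timedelta | None = None,
    initial_delay: timedelta | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical re-creation of @scheduled's validation and metadata."""
    triggers = [t for t in (cron, fixed_rate, fixed_delay) if t is not None]
    if len(triggers) != 1:
        # Raised when the decorator expression is evaluated (class definition time).
        raise ValueError("Exactly one of cron, fixed_rate, or fixed_delay is required")

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        func.__pyfly_scheduled__ = True
        func.__pyfly_scheduled_cron__ = cron
        func.__pyfly_scheduled_fixed_rate__ = fixed_rate
        func.__pyfly_scheduled_fixed_delay__ = fixed_delay
        func.__pyfly_scheduled_initial_delay__ = initial_delay
        return func

    return decorator


def discover_sketch(beans: list[Any]) -> list[Callable[..., Any]]:
    """Collect bound methods on public attributes that carry the marker."""
    found = []
    for bean in beans:
        for name in dir(bean):
            if name.startswith("_"):
                continue  # private and dunder attributes are skipped
            attr = getattr(bean, name)
            # Bound methods forward attribute lookups to the underlying function.
            if callable(attr) and getattr(attr, "__pyfly_scheduled__", False):
                found.append(attr)
    return found


class Heartbeat:
    @scheduled_sketch(fixed_rate=timedelta(seconds=10))
    async def beat(self):
        ...

    def helper(self):  # not scheduled, so discovery ignores it
        ...


methods = discover_sketch([Heartbeat()])
print([m.__name__ for m in methods])  # ['beat']

error_message = None
try:
    scheduled_sketch(cron="* * * * *", fixed_rate=timedelta(seconds=1))
except ValueError as exc:
    error_message = str(exc)
print(error_message)
```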
CronExpression is an immutable dataclass that wraps a cron expression string
and provides fire-time calculation methods. It delegates parsing and iteration
to the croniter library.
```python
from pyfly.scheduling import CronExpression
```

PyFly uses the standard 5-field cron format:

```
+------------- minute (0-59)
| +---------- hour (0-23)
| | +------- day of month (1-31)
| | | +---- month (1-12)
| | | | +- day of week (0-6, 0 = Sunday)
| | | | |
* * * * *
```

Special characters: `*` (any), `,` (list), `-` (range), `/` (step).

Invalid expressions raise ValueError during construction:

```python
CronExpression("invalid")  # ValueError: Invalid cron expression: invalid
```

next_fire_time() returns the next datetime after a given reference point (defaults to the current time):
```python
from datetime import datetime

from pyfly.scheduling import CronExpression

cron = CronExpression("0 9 * * *")  # Daily at 09:00
next_run = cron.next_fire_time()
print(next_run)  # e.g., 2026-02-15 09:00:00

# With an explicit reference time
ref = datetime(2026, 3, 1, 8, 0)
next_run = cron.next_fire_time(after=ref)
print(next_run)  # 2026-03-01 09:00:00
```

previous_fire_time() returns the most recent fire time before a given reference point:

```python
cron = CronExpression("0 */6 * * *")  # Every 6 hours
prev = cron.previous_fire_time()
```

next_n_fire_times(n) returns a list of the next n fire times:
```python
cron = CronExpression("30 8 * * 1-5")  # Weekdays at 08:30
upcoming = cron.next_n_fire_times(5)
for t in upcoming:
    print(t)
```

seconds_until_next() returns the number of seconds (as a float) until the next fire time. This is the method the TaskScheduler uses to determine how long to sleep in a cron loop:

```python
cron = CronExpression("0 0 * * *")  # Midnight
delay = cron.seconds_until_next()
print(f"Next midnight in {delay:.0f} seconds")
```

Common expressions:

| Expression | Description |
|---|---|
| `* * * * *` | Every minute |
| `0 * * * *` | Every hour, on the hour |
| `0 0 * * *` | Every day at midnight |
| `0 9 * * 1-5` | Weekdays at 09:00 |
| `30 2 1 * *` | 1st of each month at 02:30 |
| `*/15 * * * *` | Every 15 minutes |
| `0 0 * * 0` | Every Sunday at midnight |
| `0 8,12,18 * * *` | Daily at 08:00, 12:00, and 18:00 |
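PyFly delegates cron parsing to croniter, so you never need to parse expressions yourself. Still, a small sketch helps show what each field syntax means. expand_field and next_fire_time_sketch are hypothetical helpers (not part of PyFly or croniter): the first expands one field into the set of values it matches, the second brute-forces the next matching minute. For simplicity the sketch requires every field to match, and it ignores the classic cron rule that a match on either day of month or day of week is enough when both are restricted.

```python
from datetime import datetime, timedelta

# (min, max) bounds for minute, hour, day of month, month, day of week
FIELD_BOUNDS = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def expand_field(field: str, low: int, high: int) -> set[int]:
    """Expand one cron field (*, lists, ranges, steps) into matching values."""
    values: set[int] = set()
    for part in field.split(","):  # ',' separates list items
        base, _, step_text = part.partition("/")  # '/' introduces a step
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:  # '-' denotes an inclusive range
            start_text, end_text = base.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            end = high if step_text else start  # '5/10' means 5, 15, 25, ...
        if not (low <= start <= end <= high):
            raise ValueError(f"Invalid cron field: {field}")
        values.update(range(start, end + 1, step))
    return values


def next_fire_time_sketch(expr: str, after: datetime) -> datetime:
    """Return the first matching minute strictly after `after`."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"Invalid cron expression: {expr}")
    minutes, hours, days, months, weekdays = (
        expand_field(f, lo, hi) for f, (lo, hi) in zip(fields, FIELD_BOUNDS)
    )
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(366 * 24 * 60):  # search at most about one year ahead
        cron_weekday = (candidate.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0
        if (
            candidate.minute in minutes
            and candidate.hour in hours
            and candidate.day in days
            and candidate.month in months
            and cron_weekday in weekdays
        ):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError(f"No fire time within a year for: {expr}")


print(sorted(expand_field("*/15", 0, 59)))  # [0, 15, 30, 45]
# 1 March 2026 is a Sunday, so the next weekday 09:00 is Monday 2 March.
print(next_fire_time_sketch("0 9 * * 1-5", datetime(2026, 3, 1, 8, 0)))  # 2026-03-02 09:00:00
```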
TaskScheduler is the engine that ties everything together. It scans beans for
@scheduled methods, creates async loops for each, and manages start/stop
lifecycle.
```python
from pyfly.scheduling import TaskScheduler
```

The constructor takes an optional TaskExecutorPort. If none is provided, it defaults to AsyncIOTaskExecutor:

```python
# Default: uses AsyncIOTaskExecutor
scheduler = TaskScheduler()

# Custom: use ThreadPoolTaskExecutor for CPU-bound tasks
from pyfly.scheduling.adapters.thread_executor import ThreadPoolTaskExecutor
scheduler = TaskScheduler(executor=ThreadPoolTaskExecutor(max_workers=8))
```

Call discover() with a list of bean instances. It scans every public attribute (names not starting with _) and records those marked with `__pyfly_scheduled__ = True`. It returns the number of scheduled methods found:
```python
beans = [metrics_collector, data_syncer, report_generator]
count = scheduler.discover(beans)
print(f"Found {count} scheduled methods")
```

start() and stop() are async methods. start() creates an asyncio.Task for each discovered entry. stop() cancels all loop tasks, gathers them, clears the task list, and stops the executor:

```python
await scheduler.start()
# ... application runs ...
await scheduler.stop()
```

Because stopping the executor always waits for pending tasks to complete, stop() performs a graceful shutdown.
Each trigger type has its own loop coroutine inside TaskScheduler:
- Cron loop (`_run_cron_loop`): Calculates seconds_until_next() from a CronExpression, sleeps that duration, submits the method to the executor, then repeats.
- Fixed-rate loop (`_run_fixed_rate_loop`): Optionally sleeps for initial_delay, then enters a loop that submits the method and sleeps for the rate interval.
- Fixed-delay loop (`_run_fixed_delay_loop`): Optionally sleeps for initial_delay, then enters a loop that submits the method, awaits the returned task (waits for completion), sleeps for the delay, then repeats.
Both sync and async methods are supported transparently. The static
_invoke() helper calls the method and, if the result is awaitable, awaits it.
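A helper with this behavior can be written with inspect.isawaitable(). The version below is a sketch of the described behavior, not PyFly's code; invoke is a hypothetical name.

```python
import asyncio
import inspect
from typing import Any, Callable


async def invoke(method: Callable[[], Any]) -> Any:
    """Call a sync or async method; await the result only if it is awaitable."""
    result = method()
    if inspect.isawaitable(result):
        result = await result
    return result


def sync_job() -> str:
    return "sync done"


async def async_job() -> str:
    await asyncio.sleep(0)
    return "async done"


async def main() -> list[Any]:
    return [await invoke(sync_job), await invoke(async_job)]


print(asyncio.run(main()))  # ['sync done', 'async done']
```

In this sketch a sync method runs directly on the event loop thread, so a long-running sync method blocks every other task until it returns.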
TaskExecutorPort is a Protocol (runtime-checkable) that defines the
contract for task execution:
```python
from pyfly.scheduling import TaskExecutorPort
```

Its definition, for reference:

```python
import asyncio
from typing import Any, Coroutine, Protocol, TypeVar, runtime_checkable

T = TypeVar("T")


@runtime_checkable
class TaskExecutorPort(Protocol):
    async def submit(self, coro: Coroutine[Any, Any, T]) -> asyncio.Task[T]: ...
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
```

You can implement this protocol to create custom executors, for example one that publishes tasks to a distributed queue or logs execution metrics.
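Because the protocol is runtime-checkable, isinstance() can confirm that an object provides the three methods, without any inheritance. The sketch below redefines an equivalent protocol locally so it runs without PyFly installed; NoopExecutor and Incomplete are hypothetical classes. Note that isinstance() against a runtime-checkable protocol only checks that the members exist, not their signatures or whether they are coroutines.

```python
import asyncio
from typing import Any, Coroutine, Protocol, TypeVar, runtime_checkable

T = TypeVar("T")


@runtime_checkable
class TaskExecutorPort(Protocol):
    async def submit(self, coro: Coroutine[Any, Any, T]) -> asyncio.Task[T]: ...
    async def start(self) -> None: ...
    async def stop(self) -> None: ...


class NoopExecutor:
    """Structurally satisfies the protocol without inheriting from it."""

    async def submit(self, coro: Coroutine[Any, Any, T]) -> asyncio.Task[T]:
        return asyncio.create_task(coro)

    async def start(self) -> None:
        pass

    async def stop(self) -> None:
        pass


class Incomplete:
    async def submit(self, coro):  # start() and stop() are missing
        return asyncio.create_task(coro)


print(isinstance(NoopExecutor(), TaskExecutorPort))  # True
print(isinstance(Incomplete(), TaskExecutorPort))    # False
```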
AsyncIOTaskExecutor is the default executor. It wraps asyncio.create_task() and tracks running tasks in a set for clean shutdown:

```python
from pyfly.scheduling.adapters.asyncio_executor import AsyncIOTaskExecutor

executor = AsyncIOTaskExecutor()
task = await executor.submit(some_coroutine())
await executor.stop()  # Wait for all pending tasks
```

- submit(): Creates an asyncio.Task via create_task(), adds it to an internal tracking set, and registers a done-callback that removes it.
- start(): No-op (ready after construction).
- stop(): Waits for all pending tasks to complete, then clears the task set.

This executor is ideal for I/O-bound tasks that use async/await.
For blocking or CPU-bound synchronous work, ThreadPoolTaskExecutor wraps a standard concurrent.futures.ThreadPoolExecutor. Pure-Python CPU-bound code still contends for the GIL, so threads help most with blocking I/O and with libraries that release the GIL.

```python
from pyfly.scheduling.adapters.thread_executor import ThreadPoolTaskExecutor

executor = ThreadPoolTaskExecutor(max_workers=4)
```

It exposes two submission methods:

- submit(coro): Works identically to AsyncIOTaskExecutor.submit(), creating an asyncio.Task for async coroutines on the event loop.
- `submit_sync(func, *args)`: Runs a synchronous function in the thread pool via loop.run_in_executor() and wraps the result with asyncio.ensure_future().

```python
# Async coroutine
task = await executor.submit(async_work())

# Sync function in thread pool
task = executor.submit_sync(cpu_heavy_function, arg1, arg2)
```

Constructor:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `max_workers` | `int` | `4` | Number of threads in the pool |

API:

- start(): No-op (ready after construction).
- stop(): Waits for all pending tasks, clears the task set, and shuts down the thread pool.
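submit_sync() builds on the standard loop.run_in_executor() API. The stdlib-only sketch below shows the same pattern outside PyFly; run_blocking and blocking_sum are hypothetical names, and functools.partial is used because run_in_executor() does not accept keyword arguments for the target function.

```python
import asyncio
import functools
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_sum(n: int, *, pause: float = 0.0) -> int:
    """A blocking function that would stall the event loop if called directly."""
    time.sleep(pause)
    return sum(range(n))


async def run_blocking(pool: ThreadPoolExecutor, n: int) -> int:
    loop = asyncio.get_running_loop()
    # A pool thread runs the function; the event loop stays free meanwhile.
    return await loop.run_in_executor(pool, functools.partial(blocking_sum, n, pause=0.05))


async def main() -> list[int]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # The four blocking calls run concurrently in the pool.
        return await asyncio.gather(*(run_blocking(pool, n) for n in (10, 100, 1000, 10000)))


print(asyncio.run(main()))  # [45, 4950, 499500, 49995000]
```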
@async_method marks a method to execute asynchronously via a
TaskExecutorPort. The caller returns immediately -- the actual execution is
offloaded to the executor:
```python
from pyfly.scheduling import async_method


class NotificationService:
    @async_method
    async def send_email(self, to: str, subject: str, body: str):
        """This runs asynchronously -- caller does not wait."""
        await self.email_client.send(to, subject, body)
```

Under the hood, @async_method sets `__pyfly_async__ = True` on the function. The framework picks this up and routes the call through the configured TaskExecutorPort.
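The routing step can be pictured as a wrapper that hands the coroutine to an executor and returns the task without awaiting it. This is a hypothetical sketch, not how PyFly actually wires @async_method: async_method_sketch wraps the method directly and uses asyncio.create_task() in place of a TaskExecutorPort. Since the event loop keeps only weak references to tasks, something (the caller, or an executor's tracking set) should hold on to the returned task.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable


def async_method_sketch(func: Callable[..., Awaitable[Any]]) -> Callable[..., asyncio.Task[Any]]:
    """Schedule the coroutine and return its Task immediately (fire and forget)."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> asyncio.Task[Any]:
        # Requires a running event loop; the caller does not wait for completion.
        return asyncio.create_task(func(*args, **kwargs))

    wrapper.__pyfly_async__ = True  # mirrors the marker attribute described above
    return wrapper


class NotificationService:
    def __init__(self) -> None:
        self.sent: list[str] = []

    @async_method_sketch
    async def send_email(self, to: str) -> None:
        await asyncio.sleep(0.01)  # stand-in for a real email client call
        self.sent.append(to)


async def main() -> NotificationService:
    service = NotificationService()
    task = service.send_email("ops@example.com")  # returns at once
    assert service.sent == []  # the email has not been sent yet
    await task  # optional; awaited here only to observe the result
    return service


print(asyncio.run(main()).sent)  # ['ops@example.com']
```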
Scheduling behavior can be configured in pyfly.yaml:
```yaml
pyfly:
  scheduling:
    enabled: true
    thread-pool:
      max-workers: 4
```

| Key | Description | Default |
|---|---|---|
| `pyfly.scheduling.enabled` | Enable or disable the scheduling subsystem | `true` |
| `pyfly.scheduling.thread-pool.max-workers` | Max threads for ThreadPoolTaskExecutor | `4` |
When enabled is false, the TaskScheduler will not start any loops and
@scheduled methods will be ignored.
Requires: uv add "pyfly[scheduling]" (installs croniter for cron
expression parsing)
When croniter is installed, PyFly automatically registers a TaskScheduler bean through the SchedulingAutoConfiguration class. This eliminates the need to manually create and manage a TaskScheduler instance.
Conditions: croniter library installed.
| Bean | Type | Description |
|---|---|---|
| `task_scheduler` | `TaskScheduler` | Container-managed scheduler that discovers and runs @scheduled methods |
With auto-configuration, you no longer need a SchedulerManager service. The ApplicationContext automatically:
- Creates a TaskScheduler bean (from auto-config, or uses one you provide)
- Discovers all @scheduled methods across all beans
- Starts the scheduler during context startup
- Stops the scheduler during context shutdown
Before (manual lifecycle management):

```python
@service
class SchedulerManager:
    def __init__(self, sync_service: DataSyncService):
        self._scheduler = TaskScheduler()  # Manual creation
        self._beans = [sync_service]

    @post_construct
    async def start(self):
        self._scheduler.discover(self._beans)
        await self._scheduler.start()

    @pre_destroy
    async def stop(self):
        await self._scheduler.stop()
```

After (auto-configuration):

```python
# Just declare your scheduled beans; no SchedulerManager needed
@service
class DataSyncService:
    @scheduled(fixed_rate=timedelta(minutes=5))
    async def sync(self):
        ...
```

The TaskScheduler is auto-wired as a container bean, and the ApplicationContext handles discovery and lifecycle.
Provide your own TaskScheduler bean to override the auto-configured one:
```python
from pyfly.container.bean import bean
from pyfly.container import configuration
from pyfly.scheduling import TaskScheduler
from pyfly.scheduling.adapters.thread_executor import ThreadPoolTaskExecutor


@configuration
class MySchedulingConfig:
    @bean
    def task_scheduler(self) -> TaskScheduler:
        return TaskScheduler(executor=ThreadPoolTaskExecutor(max_workers=4))
```

Source: src/pyfly/scheduling/auto_configuration.py
Below is a full example that demonstrates all three trigger types working together in a single application: a periodic data sync (fixed delay), a cron-based nightly cleanup, and a fixed-rate health heartbeat.
```python
import asyncio
from datetime import timedelta

from pyfly.container import service
from pyfly.context import post_construct, pre_destroy
from pyfly.scheduling import TaskScheduler, scheduled


@service
class DataSyncService:
    """Pulls data from an upstream API with a guaranteed gap between runs."""

    @scheduled(fixed_delay=timedelta(minutes=5))
    async def sync_upstream(self):
        print("Starting data sync...")
        await asyncio.sleep(2)  # Simulate work
        print("Data sync complete.")


@service
class CleanupService:
    """Purges stale records every night at 02:00."""

    @scheduled(cron="0 2 * * *")
    async def purge_stale_records(self):
        print("Running nightly cleanup...")
        # Delete records older than 90 days here, e.g. through a repository
        # injected via __init__ (omitted to keep the example self-contained).
        await asyncio.sleep(1)  # Simulate work
        print("Cleanup done.")


@service
class HealthMonitor:
    """Publishes a heartbeat every 10 seconds, starting after a 5-second delay."""

    @scheduled(fixed_rate=timedelta(seconds=10), initial_delay=timedelta(seconds=5))
    async def heartbeat(self):
        print("Heartbeat: OK")


# With auto-configuration (recommended), no SchedulerManager is needed.
# The ApplicationContext automatically discovers @scheduled methods
# and manages the TaskScheduler lifecycle. To use a custom executor,
# override the task_scheduler bean instead.
#
# The manual SchedulerManager below is for setups without auto-configuration;
# otherwise the ApplicationContext also discovers and runs the same methods.
@service
class SchedulerManager:
    """Manages the lifecycle of the TaskScheduler (manual approach)."""

    def __init__(
        self,
        sync_service: DataSyncService,
        cleanup_service: CleanupService,
        health_monitor: HealthMonitor,
    ):
        self._scheduler = TaskScheduler()  # Uses AsyncIOTaskExecutor by default
        self._beans = [sync_service, cleanup_service, health_monitor]

    @post_construct
    async def start(self):
        count = self._scheduler.discover(self._beans)
        print(f"Discovered {count} scheduled tasks")
        await self._scheduler.start()

    @pre_destroy
    async def stop(self):
        await self._scheduler.stop()
        print("Scheduler stopped.")
```

You can also use CronExpression independently for any cron-related calculation:
```python
from pyfly.scheduling import CronExpression

# When is the next weekday at 09:00?
cron = CronExpression("0 9 * * 1-5")
print(f"Next working-day start: {cron.next_fire_time()}")
print(f"Seconds to wait: {cron.seconds_until_next():.0f}")

# Show the next 5 fire times
for t in cron.next_n_fire_times(5):
    print(f"  {t}")

# What was the last fire time?
print(f"Previous fire: {cron.previous_fire_time()}")
```

Implementing a custom executor is straightforward: just satisfy the TaskExecutorPort protocol.
```python
import asyncio
import logging
from typing import Any, Coroutine, TypeVar

from pyfly.scheduling import TaskExecutorPort, TaskScheduler

T = TypeVar("T")
logger = logging.getLogger(__name__)


class LoggingTaskExecutor:
    """Custom executor that logs every task submission."""

    def __init__(self):
        self._tasks: set[asyncio.Task[Any]] = set()

    async def submit(self, coro: Coroutine[Any, Any, T]) -> asyncio.Task[T]:
        logger.info("Submitting task: %s", coro.__qualname__)
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)  # stop tracking finished tasks
        return task

    async def start(self) -> None:
        pass  # Ready after construction

    async def stop(self) -> None:
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()


# The protocol is runtime-checkable, so conformance can be verified
assert isinstance(LoggingTaskExecutor(), TaskExecutorPort)

# Use it with the scheduler
scheduler = TaskScheduler(executor=LoggingTaskExecutor())
```

This architecture makes it easy to plug in metrics collection, distributed execution, or any other cross-cutting concern without modifying your scheduled tasks.