Background workers #83
base: develop
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| """Add ingestion status | ||
|
|
||
| Revision ID: b2c52ee8ff12 | ||
| Revises: 28bee3aa2429 | ||
| Create Date: 2026-05-11 16:16:03.768893 | ||
|
|
||
| """ | ||
|
|
||
| from typing import Sequence, Union | ||
|
|
||
| import sqlalchemy as sa | ||
|
|
||
| from alembic import op | ||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision: str = "b2c52ee8ff12" | ||
| down_revision: Union[str, Sequence[str], None] = "28bee3aa2429" | ||
| branch_labels: Union[str, Sequence[str], None] = None | ||
| depends_on: Union[str, Sequence[str], None] = None | ||
|
|
||
|
|
||
| def upgrade() -> None: | ||
| """Upgrade schema.""" | ||
| conn = op.get_bind() | ||
| dialect = conn.dialect.name | ||
| if dialect == "postgresql": | ||
| op.execute( | ||
| "CREATE TYPE ingestionstatus AS ENUM ('QUEUED', 'COPYING', 'COPIED', " | ||
| "'VALIDATING', 'VALIDATED', 'COMPLETED', 'COPY_FAILED', " | ||
| "'VALIDATION_FAILED')" | ||
| ) | ||
| with op.batch_alter_table("simulations", schema=None) as batch_op: | ||
| batch_op.add_column( | ||
| sa.Column( | ||
| "ingestion_status", | ||
| sa.Enum( | ||
| "QUEUED", | ||
| "COPYING", | ||
| "COPIED", | ||
| "VALIDATING", | ||
| "VALIDATED", | ||
| "COMPLETED", | ||
| "COPY_FAILED", | ||
| "VALIDATION_FAILED", | ||
| name="ingestionstatus", | ||
| ), | ||
| nullable=True, | ||
| ) | ||
| ) | ||
| batch_op.add_column(sa.Column("ingestion_version", sa.Integer(), nullable=True)) | ||
| op.execute( | ||
| "UPDATE simulations SET ingestion_status = 'COMPLETED' WHERE ingestion_status " | ||
| "IS NULL" | ||
| ) | ||
| op.execute( | ||
| "UPDATE simulations SET ingestion_version = 0 WHERE ingestion_version IS NULL" | ||
| ) | ||
| with op.batch_alter_table("simulations", schema=None) as batch_op: | ||
| batch_op.alter_column("ingestion_status", nullable=False) | ||
| batch_op.alter_column("ingestion_version", nullable=False) | ||
|
|
||
|
|
||
| def downgrade() -> None: | ||
| """Downgrade schema.""" | ||
| with op.batch_alter_table("simulations", schema=None) as batch_op: | ||
| batch_op.drop_column("ingestion_version") | ||
| batch_op.drop_column("ingestion_status") | ||
| conn = op.get_bind() | ||
| dialect = conn.dialect.name | ||
| if dialect == "postgresql": | ||
| op.execute("DROP TYPE ingestionstatus") | ||
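The upgrade above follows a three-step pattern: add the columns as nullable, backfill existing rows, then tighten the columns to NOT NULL. The following is a minimal sketch of the first two steps plus the check that makes the third step safe, using the standard-library `sqlite3` module instead of Alembic. The `alias` column and the sample rows are hypothetical; only the table name and the two new column names come from the migration.

```python
import sqlite3

# In-memory stand-in for the real database. The "alias" column and the two
# sample rows are hypothetical and exist only to give the backfill some data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE simulations (id INTEGER PRIMARY KEY, alias TEXT)")
conn.executemany("INSERT INTO simulations (alias) VALUES (?)", [("a",), ("b",)])

# Step 1: add the new columns as nullable so existing rows stay valid.
conn.execute("ALTER TABLE simulations ADD COLUMN ingestion_status TEXT")
conn.execute("ALTER TABLE simulations ADD COLUMN ingestion_version INTEGER")

# Step 2: backfill rows that predate the columns, as the migration does.
conn.execute(
    "UPDATE simulations SET ingestion_status = 'COMPLETED' "
    "WHERE ingestion_status IS NULL"
)
conn.execute(
    "UPDATE simulations SET ingestion_version = 0 WHERE ingestion_version IS NULL"
)

# Step 3 precondition: no NULLs may remain before the columns become NOT NULL.
remaining_nulls = conn.execute(
    "SELECT COUNT(*) FROM simulations "
    "WHERE ingestion_status IS NULL OR ingestion_version IS NULL"
).fetchone()[0]
rows = conn.execute(
    "SELECT alias, ingestion_status, ingestion_version FROM simulations ORDER BY id"
).fetchall()
```

SQLite has no `ALTER COLUMN ... SET NOT NULL`, which is presumably why the migration wraps the final step in `op.batch_alter_table`.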
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,77 @@ | ||
| # Celery async task processing | ||
|
|
||
| SimDB uses [Celery](https://docs.celeryproject.org/) to run asynchronous background | ||
| tasks such as copying simulation files and completing the ingestion pipeline. | ||
|
|
||
| ## Overview | ||
|
|
||
| When simulations are uploaded via the REST API, the server offloads heavy operations | ||
| to Celery workers instead of blocking the HTTP request. Tasks are defined in | ||
| `src/simdb/workers/tasks.py`: | ||
|
|
||
| - `copy_files_task` — copies input/output files from source locations to the server's | ||
| upload folder and updates the simulation's ingestion status. | ||
| - `complete_ingestion_task` — marks a simulation as fully ingested. | ||
| - `validate_imas_task` — runs validation checks on IMAS data (placeholder). | ||
| - `send_email_task` — sends email notifications. | ||
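For orientation, the ingestion status that `copy_files_task` updates can be mirrored in Python as an enum. This is a sketch only: the eight member names are taken from the `ingestionstatus` type in the migration in this PR, while the class name `IngestionStatus` and the helper `has_failed` are hypothetical and may not match the actual model code.

```python
from enum import Enum


class IngestionStatus(str, Enum):
    """Hypothetical mirror of the ingestionstatus database enum."""

    QUEUED = "QUEUED"
    COPYING = "COPYING"
    COPIED = "COPIED"
    VALIDATING = "VALIDATING"
    VALIDATED = "VALIDATED"
    COMPLETED = "COMPLETED"
    COPY_FAILED = "COPY_FAILED"
    VALIDATION_FAILED = "VALIDATION_FAILED"


# Failure states are derived from the naming convention, not from project code.
FAILURE_STATES = frozenset(
    status for status in IngestionStatus if status.name.endswith("_FAILED")
)


def has_failed(status: IngestionStatus) -> bool:
    """Return True if the status is one of the *_FAILED values."""
    return status in FAILURE_STATES
```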
|
|
||
| Tasks can be chained in the API endpoint: | ||
|
|
||
| ```python | ||
| copy_files = copy_files_task.si(simulation.uuid, ...) | ||
| complete = complete_ingestion_task.si(simulation.uuid) | ||
| _ = (copy_files | complete).apply_async() | ||
| ``` | ||
|
|
||
| ## Configuration | ||
|
|
||
| Celery is configured via `app.cfg`: | ||
|
|
||
| | Section | Option | Required | Description | | ||
| |---------|----------------|----------|--------------------------------------------------| | ||
| | celery | broker_url | no | Redis URL for the message broker. Defaults to `redis://localhost:6379/0` | | ||
| | celery | result_backend | no | Redis URL for results storage. Defaults to `redis://localhost:6379/0` | | ||
|
|
||
| Example: | ||
|
|
||
| ```ini | ||
| [celery] | ||
| broker_url = redis://localhost:6379/0 | ||
| result_backend = redis://localhost:6379/0 | ||
| ``` | ||
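Because both options are optional, whatever reads `app.cfg` has to fall back to the documented default when the option or the whole `[celery]` section is missing. The sketch below shows one way to do that with the standard-library `configparser`. The function name `load_celery_settings` is hypothetical, and SimDB's own configuration loader may work differently.

```python
from configparser import ConfigParser

# Default taken from the configuration table above.
DEFAULT_REDIS_URL = "redis://localhost:6379/0"


def load_celery_settings(cfg_text: str) -> dict[str, str]:
    """Parse INI text and return the two Celery URLs with defaults applied."""
    parser = ConfigParser()
    parser.read_string(cfg_text)
    # `fallback` covers both a missing option and a missing [celery] section.
    return {
        "broker_url": parser.get(
            "celery", "broker_url", fallback=DEFAULT_REDIS_URL
        ),
        "result_backend": parser.get(
            "celery", "result_backend", fallback=DEFAULT_REDIS_URL
        ),
    }
```

Calling `load_celery_settings("")` returns the default URL for both keys, while a config that sets only `broker_url` keeps the default for `result_backend`.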
|
|
||
| ## Running workers | ||
|
|
||
| ### Standalone worker | ||
|
|
||
| Start a Celery worker using the built-in CLI: | ||
|
|
||
| ```bash | ||
| simdb_celery worker | ||
|
Collaborator
This should be simdb_worker. I tried simdb_celery worker but it didn't work. simdb_worker works, and I also see it registered under that name in pyproject.toml.
||
| ``` | ||
|
|
||
| ### Worker with beat scheduler | ||
|
|
||
| For periodic tasks (e.g. cleanup, reports), run both the worker and beat: | ||
|
|
||
| ```bash | ||
| # Terminal 1: worker | ||
| simdb_celery worker | ||
|
Collaborator
simdb_worker
||
|
|
||
| # Terminal 2: beat scheduler | ||
| simdb_celery beat | ||
|
Collaborator
simdb_beat
||
| ``` | ||
|
|
||
| ### Flower monitoring | ||
|
|
||
| [Flower](https://flower.readthedocs.io/) provides a web UI for monitoring Celery | ||
| workers and tasks: | ||
|
|
||
| ```bash | ||
| celery -A simdb.workers.celery flower --port=5555 | ||
| ``` | ||
|
|
||
| ## Testing with eager mode | ||
|
|
||
| In tests, set `task_always_eager = True` to run tasks synchronously without a | ||
| broker. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,6 +45,22 @@ simdb_server | |
|
|
||
| This will start a server on port 5000. You can test this server is running by opening http://localhost:5000 in a browser. | ||
|
|
||
| ## Running Celery workers | ||
|
|
||
| For development, you typically want to run Celery tasks synchronously. This is | ||
| enabled by setting `task_always_eager = True` in tests (see `tests/remote/api/v1.3/test_simulations3.py`). | ||
|
|
||
| To run actual background workers during development: | ||
|
|
||
| ```bash | ||
| # Worker | ||
| simdb_celery worker | ||
|
Collaborator
simdb_worker
||
|
|
||
| # Beat scheduler (if needed) | ||
| simdb_celery beat | ||
|
Collaborator
simdb_beat
||
| ``` | ||
|
|
||
| See the [Celery documentation](celery.md) for full details. | ||
| ## Swagger API documentation | ||
|
|
||
| SimDB provides interactive Swagger API documentation for each API version. The documentation is automatically generated and accessible at different endpoints depending on the API version you want to explore. | ||
|
|
||
what is this ingestion version for?