Skip to content

Background workers#83

Draft
Yannicked wants to merge 31 commits into
iterorganization:developfrom
Yannicked:feature/celery-tasks
Draft

Background workers#83
Yannicked wants to merge 31 commits into
iterorganization:developfrom
Yannicked:feature/celery-tasks

Conversation

@Yannicked

Copy link
Copy Markdown
Collaborator

This PR introduces simulation ingestion via Celery background workers.

When simulations are uploaded through the new v1.3 API endpoint, file copying and status tracking are now handled by Celery tasks (copy_files_task chained with complete_ingestion_task) rather than blocking the HTTP request.

The ingestion pipeline tracks status though the following: QUEUED → COPYING → COPIED → VALIDATING → VALIDATED → COMPLETED, with failure variants.

@Yannicked Yannicked force-pushed the feature/celery-tasks branch from 06bf048 to 68938a1 Compare May 28, 2026 08:19
@Yannicked Yannicked requested a review from ioan-alexandra June 2, 2026 14:28
@Yannicked Yannicked mentioned this pull request Jun 8, 2026
Comment thread src/simdb/enums.py


class IngestionStatus(str, Enum):
QUEUED = "queued"

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here the statuses are lowercase but in the alembic migration they are uppercase. im assuming these will fail

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes ...
Suggest to make any (str, Enum) subclass' member values equal to the member name.

Having Pydantic models including such enums will be more pleasant to work with... (e.g. deserializing upon assignment with a matching str)

btw, StrEnumwas introduced in 3.11, before that, use (str, Enum).
btw2. auto() can't be used prior 3.11: it will return 0, 1, 2... not "MEMBERNAME"

)
with op.batch_alter_table("simulations", schema=None) as batch_op:
batch_op.alter_column("ingestion_status", nullable=False)
batch_op.alter_column("ingestion_version", nullable=False)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this ingestion version for?


simulation = database.get_simulation(simulation_uuid.hex)
simulation.ingestion_status = IngestionStatus.COMPLETED
database.session.commit()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should also have a finally block to close the db connection

Comment thread src/simdb/workers/tasks.py Outdated
u.query.set("path", str(imas_path))
return u

if {p.name for p in children} == {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there no other files possible? anything extra will fail this i think. you could check with issubset instead to see if the directory contains those files?

Comment thread src/simdb/remote/models.py Outdated
"""Validation result."""


class SimulationPostResponse3(BaseModel):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dead code?

Comment thread docs/celery.md
simdb_celery worker

# Terminal 2: beat scheduler
simdb_celery beat

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simdb_beat

Comment thread docs/developer_guide.md

```bash
# Worker
simdb_celery worker

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simdb_worker

Comment thread docs/developer_guide.md
simdb_celery worker

# Beat scheduler (if needed)
simdb_celery beat

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simdb_beat


# This job will copy and add the files to the simulation
copy_files = copy_files_task.si(
simulation.uuid,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

python uuid is not json serializable, this should crash. pass the .hex maybe? or string. should maybe have a test for this that calls apply_sync

"""Add ingestion status

Revision ID: b2c52ee8ff12
Revises: 9e9a4a7cd639

@ioan-alexandra ioan-alexandra Jun 8, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for me alembic history

28bee3aa2429 -> b2c52ee8ff12 (head), Add ingestion status
9e9a4a7cd639 -> 28bee3aa2429, convert_metadata_to_json_column
21f2b1287595 -> 9e9a4a7cd639, Make watcher email not nullable
<base> -> 21f2b1287595, create init tables

think # Revises: should be 28bee3aa2429. some merging mismatch?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants