Background workers#83
Conversation
06bf048 to
68938a1
Compare
|
|
||
|
|
||
| class IngestionStatus(str, Enum): | ||
| QUEUED = "queued" |
There was a problem hiding this comment.
here the statuses are lowercase but in the alembic migration they are uppercase. im assuming these will fail
There was a problem hiding this comment.
Yes ...
Suggest to make any (str, Enum) subclass' member values equal to the member name.
Having Pydantic models including such enums will be more pleasant to work with... (e.g. deserializing upon assignment with a matching str)
btw, StrEnumwas introduced in 3.11, before that, use (str, Enum).
btw2. auto() can't be used prior 3.11: it will return 0, 1, 2... not "MEMBERNAME"
| ) | ||
| with op.batch_alter_table("simulations", schema=None) as batch_op: | ||
| batch_op.alter_column("ingestion_status", nullable=False) | ||
| batch_op.alter_column("ingestion_version", nullable=False) |
There was a problem hiding this comment.
what is this ingestion version for?
|
|
||
| simulation = database.get_simulation(simulation_uuid.hex) | ||
| simulation.ingestion_status = IngestionStatus.COMPLETED | ||
| database.session.commit() |
There was a problem hiding this comment.
should also have a finally block to close the db connection
| u.query.set("path", str(imas_path)) | ||
| return u | ||
|
|
||
| if {p.name for p in children} == { |
There was a problem hiding this comment.
are there no other files possible? anything extra will fail this i think. you could check with issubset instead to see if the directory contains those files?
| """Validation result.""" | ||
|
|
||
|
|
||
| class SimulationPostResponse3(BaseModel): |
| simdb_celery worker | ||
|
|
||
| # Terminal 2: beat scheduler | ||
| simdb_celery beat |
|
|
||
| ```bash | ||
| # Worker | ||
| simdb_celery worker |
| simdb_celery worker | ||
|
|
||
| # Beat scheduler (if needed) | ||
| simdb_celery beat |
|
|
||
| # This job will copy and add the files to the simulation | ||
| copy_files = copy_files_task.si( | ||
| simulation.uuid, |
There was a problem hiding this comment.
python uuid is not json serializable, this should crash. pass the .hex maybe? or string. should maybe have a test for this that calls apply_sync
| """Add ingestion status | ||
|
|
||
| Revision ID: b2c52ee8ff12 | ||
| Revises: 9e9a4a7cd639 |
There was a problem hiding this comment.
for me alembic history
28bee3aa2429 -> b2c52ee8ff12 (head), Add ingestion status
9e9a4a7cd639 -> 28bee3aa2429, convert_metadata_to_json_column
21f2b1287595 -> 9e9a4a7cd639, Make watcher email not nullable
<base> -> 21f2b1287595, create init tables
think # Revises: should be 28bee3aa2429. some merging mismatch?
This PR introduces simulation ingestion via Celery background workers.
When simulations are uploaded through the new v1.3 API endpoint, file copying and status tracking are now handled by Celery tasks (copy_files_task chained with complete_ingestion_task) rather than blocking the HTTP request.
The ingestion pipeline tracks status though the following: QUEUED → COPYING → COPIED → VALIDATING → VALIDATED → COMPLETED, with failure variants.