A robust, asynchronous task processing system built with FastAPI, SQLModel, and a generic Background Worker.
This project implements the Producer-Consumer design pattern to handle heavy background operations without blocking the main application thread. It is fully containerized using Docker and supports data persistence.
The system decouples the Producer (API) from the Consumer (Worker) using a thread-safe queue and a persistent database state.
```mermaid
sequenceDiagram
    participant Client
    participant API as FastAPI Server
    participant DB as SQLite DB
    participant Queue as In-Memory Queue
    participant Worker as Background Worker

    Note over Client, API: 1. Submission Phase
    Client->>API: POST /jobs (Create Task)
    API->>DB: Save Job (Status: PENDING)
    API->>Queue: Push Job to Queue
    API-->>Client: Return Job ID (200 OK)
    Note right of Client: Client receives ID immediately (Non-blocking)

    Note over Queue, Worker: 2. Processing Phase
    loop Background Process
        Worker->>Queue: Dequeue Job
        Worker->>DB: Update Status (RUNNING)
        Worker->>Worker: Process Payload (Simulated Heavy Task)
        alt Success
            Worker->>DB: Update Status (DONE) + Result
        else Failure
            Worker->>DB: Update Status (FAILED) + Error Info
        end
    end
```
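The processing phase above boils down to a simple consumer loop. The sketch below is illustrative only: it uses the standard-library `Queue` as a stand-in for the project's queue, and the helpers `set_status` and `process_payload` are hypothetical names, not the project's actual functions (those live in `app/core/`).

```python
import threading
from queue import Empty, Queue

# Illustrative stand-ins only; the real queue, worker, and DB helpers may differ.
job_queue: Queue = Queue()
stop_event = threading.Event()

def set_status(job_id: str, status: str, detail: str = "") -> None:
    # In the real project this updates the Job row via SQLModel; here we just print.
    print(f"job {job_id}: {status} {detail}".strip())

def process_payload(job_id: str) -> str:
    # Placeholder for the simulated heavy task.
    return f"processed {job_id}"

def worker_loop() -> None:
    """Consume jobs until asked to stop, mirroring the processing phase above."""
    while not stop_event.is_set():
        try:
            job_id = job_queue.get(timeout=1)   # dequeue, waking up periodically
        except Empty:
            continue
        try:
            set_status(job_id, "RUNNING")
            result = process_payload(job_id)    # heavy work happens off the API thread
            set_status(job_id, "DONE", result)
        except Exception as exc:                # failure branch from the diagram
            set_status(job_id, "FAILED", str(exc))
        finally:
            job_queue.task_done()

# The API (producer) calls job_queue.put(job_id); a daemon thread runs worker_loop().
```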
- Asynchronous Processing: Non-blocking task submission. The API responds immediately while the worker processes data in the background.
- Persistence: Uses SQLite (via SQLModel) to ensure job states (`PENDING`, `RUNNING`, `DONE`) survive server restarts.
- Thread Safety: Implements `threading.Lock` mechanisms to prevent race conditions when accessing the shared queue (see the sketch after this list).
- Dockerized: Fully ready for production with Docker & Docker Compose.
- Robust Error Handling: Captures worker failures and updates job status with error details in the database.
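To illustrate the thread-safety point, a lock-protected FIFO built on a `deque` could look like the sketch below. The class name `JobQueue` and its methods are assumptions for illustration, not the project's exact API.

```python
import threading
from collections import deque
from typing import Optional

class JobQueue:
    """Minimal thread-safe FIFO built on deque + Lock (illustrative only)."""

    def __init__(self) -> None:
        self._items: deque[str] = deque()
        self._lock = threading.Lock()

    def push(self, job_id: str) -> None:
        # The lock ensures the producer (API) and consumer (worker)
        # never mutate the deque at the same time.
        with self._lock:
            self._items.append(job_id)

    def pop(self) -> Optional[str]:
        with self._lock:
            return self._items.popleft() if self._items else None
```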
- Backend Framework: FastAPI
- Database & ORM: SQLModel (SQLAlchemy wrapper) with SQLite
- Concurrency: Python `threading` module (Daemon Threads)
- Containerization: Docker & Docker Compose
- Validation: Pydantic
The easiest way to run the system is using Docker Compose. This ensures a consistent environment without installing Python dependencies locally.
- Clone the repository:

  ```bash
  git clone https://github.com/BitBOY21/distributed-task-queue.git
  cd distributed-task-queue
  ```

- Run with Docker Compose:

  ```bash
  docker-compose up --build
  ```

- Access the API: Open your browser at http://localhost:8000/docs to see the Swagger UI.
If you prefer running it directly on your machine:
- Install dependencies:

  ```bash
  pip install -r requirements.txt
  ```

- Start the server:

  ```bash
  uvicorn main:app --reload
  ```
You can test the API directly via the Swagger UI (/docs), or use curl.
Create a task to be processed in the background.
Request: POST /jobs
```json
{
  "type": "email_processing",
  "payload": {
    "email": "user@example.com",
    "subject": "Welcome to the platform!"
  }
}
```

Response:
```json
{
  "id": "a1b2c3d4-e5f6-4a5b-...",
  "status": "PENDING",
  "type": "email_processing",
  "created_at": "2025-10-27T10:00:00"
}
```
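A job can also be submitted programmatically. The snippet below is a small usage sketch using the `requests` library against a locally running server; it mirrors the JSON example above and is not part of the project's code.

```python
import requests

# Submit a job to the locally running API (assumes the server from the Quick Start).
payload = {
    "type": "email_processing",
    "payload": {
        "email": "user@example.com",
        "subject": "Welcome to the platform!",
    },
}
response = requests.post("http://localhost:8000/jobs", json=payload, timeout=5)
response.raise_for_status()
job_id = response.json()["id"]   # keep the ID for polling
print("submitted:", job_id)
```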
Poll the status of a specific job using its ID.

Request: GET /jobs/{job_id}
Response (After processing):
```json
{
  "id": "a1b2c3d4-e5f6-4a5b-...",
  "status": "DONE",
  "result": "Processed successfully: {'email': 'user@example.com'}",
  "created_at": "2025-10-27T10:00:00"
}
```
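Because the worker runs asynchronously, a client typically polls until the status leaves `PENDING`/`RUNNING`. A minimal polling loop, reusing the `job_id` from the previous snippet, might look like this (illustrative, not project code):

```python
import time
import requests

def wait_for_job(job_id: str, base_url: str = "http://localhost:8000") -> dict:
    """Poll GET /jobs/{job_id} until the worker marks it DONE or FAILED."""
    while True:
        job = requests.get(f"{base_url}/jobs/{job_id}", timeout=5).json()
        if job["status"] in ("DONE", "FAILED"):
            return job
        time.sleep(1)   # back off between polls

# Example: result = wait_for_job(job_id); print(result["status"], result.get("result"))
```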
```
distributed-task-queue/
├── app/
│   ├── api/               # API Route handlers (Producer logic)
│   ├── core/              # Core logic (DB connection, Queue implementation, Worker)
│   └── models/            # SQLModel Database definitions (Job, JobStatus)
├── main.py                # Application entry point & Lifecycle manager
├── Dockerfile             # Docker build instructions
├── docker-compose.yml     # Container orchestration & Volume mapping
└── requirements.txt       # Python dependencies
```
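Since main.py acts as the entry point and lifecycle manager, a hedged sketch of how such a file might start and stop the daemon worker thread around FastAPI's lifespan hook is shown below; the actual wiring in main.py may differ.

```python
import threading
from contextlib import asynccontextmanager

from fastapi import FastAPI

# Illustrative sketch only: names and structure are not copied from main.py.
stop_event = threading.Event()

def worker_loop() -> None:
    # Stand-in for the real consumer loop (see the sketch near the top of this README).
    while not stop_event.is_set():
        stop_event.wait(1)

@asynccontextmanager
async def lifespan(app: FastAPI):
    worker = threading.Thread(target=worker_loop, daemon=True)
    worker.start()      # start the consumer when the server boots
    yield               # the application serves requests
    stop_event.set()    # signal the worker to exit cleanly on shutdown

app = FastAPI(lifespan=lifespan)
```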
To scale this system for a production environment with multiple servers, the following upgrades are planned:
- Redis / RabbitMQ: Replace the in-memory `deque` with an external message broker to allow multiple worker instances across different machines (see the sketch after this list).
- PostgreSQL: Migrate from SQLite to PostgreSQL for better concurrency handling.
- WebSockets: Implement real-time status updates to the client (push notification) instead of polling.
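To illustrate the first planned upgrade, a broker-backed queue could expose the same push/pop shape as the in-memory one, so the worker loop would not need to change. The sketch below uses `redis-py` list operations and describes the planned direction only; it is not existing project code, and `RedisJobQueue` is a hypothetical name.

```python
import redis

class RedisJobQueue:
    """Illustrative broker-backed queue with the same push/pop shape as the in-memory one."""

    def __init__(self, url: str = "redis://localhost:6379/0", key: str = "jobs") -> None:
        self._redis = redis.Redis.from_url(url)
        self._key = key

    def push(self, job_id: str) -> None:
        # LPUSH + BRPOP gives FIFO semantics shared by every worker instance.
        self._redis.lpush(self._key, job_id)

    def pop(self, timeout: int = 1):
        item = self._redis.brpop(self._key, timeout=timeout)
        return item[1].decode() if item else None
```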