Сервис-обработчик для управления своими оркестраторами:
- выдача задач оркестраторам;
- прием результатов выполнения;
- отправка изображений на отдельный storage-сервер (
../storage) без конвертации; - сохранение нормализованного артефакта в БД.
Конвертация изображений выполняется на стороне storage.
receiver автоматически отправляет задачи в оркестратор parser по WebSocket (submit_store/status), если включен bridge.
При старте применяется легаси-патч для crawl_tasks.deleted_at/include_images.
Изменения схемы task_runs выполняются вручную.
Используется SQLAlchemy ORM и MySQL. Для локальных тестов поддерживается SQLite.
Таблица crawl_tasks включает обязательные поля:
citystorelast_crawl_atfrequency_hoursinclude_images- запрашивать изображения карточек вsubmit_store
Дополнительно есть служебные поля lease/статуса для безопасной выдачи задач нескольким оркестраторам.
В таблицу task_runs:
dispatch_meta_json- только служебные метаданные bridge (remote_job_id, служебные статусы);processed_images- количество успешно загруженных изображений;status,error_message,finished_at.
Детальный payload скрапера и сырой список загрузки изображений в task_runs больше не сохраняются.
Нормализованные данные сохраняются в run_artifacts*.
В run_artifacts хранится parser_name (какой парсер выполнил обход), сырой payload_json не хранится.
Перед запуском новой версии примените SQL-миграции:
mysql -h127.0.0.1 -P3306 -uUSER -p DBNAME < migrations/manual/20260226_drop_raw_task_run_storage.sql
mysql -h127.0.0.1 -P3306 -uUSER -p DBNAME < migrations/manual/20260226_run_artifacts_parser_name_drop_payload.sql
mysql -h127.0.0.1 -P3306 -uUSER -p DBNAME < migrations/manual/20260226_task_runs_assigned_invariant.sqlDATABASE_URL- URL БД, например:- MySQL:
mysql+pymysql://user:pass@127.0.0.1:3306/receiver - SQLite (по умолчанию):
sqlite:///data/receiver.db
- MySQL:
STORAGE_BASE_URL- base URL storage-сервера (http://127.0.0.1:8000)STORAGE_API_TOKEN- Bearer-токен дляPOST /api/imagesstorage-сервераPARSER_SRC_PATH- путь к../parser/srcдля мягкой интеграции parser-модуляLEASE_TTL_MINUTES- время аренды задачи оркестратором (по умолчанию30)ORCHESTRATOR_MAX_CLAIMS_PER_CYCLE- максимум новых задач за один цикл bridge (по умолчанию5)ORCHESTRATOR_ASSIGNED_PARALLELISM- параллелизм обработкиassignedrun в bridge (по умолчанию4)ORCHESTRATOR_AUTO_DISPATCH_ENABLED- включить авто-диспетчер в parser WebSocket (trueпо умолчанию)ORCHESTRATOR_WS_URL- адрес parser orchestrator WS (по умолчаниюws://127.0.0.1:8765)ORCHESTRATOR_WS_PASSWORD- пароль, если parser запущен с--auth-passwordORCHESTRATOR_POLL_INTERVAL_SEC- интервал poll статуса задач (по умолчанию5)ORCHESTRATOR_MANAGER_NAME- имя внутреннего оркестратора в БД receiver (по умолчаниюparser-ws)ORCHESTRATOR_SUBMIT_INCLUDE_IMAGES- дефолтinclude_imagesдля новых задач (если в API/dashboard явно не задано)ORCHESTRATOR_UPLOAD_ARCHIVE_IMAGES- послеsuccessзагружать изображения изoutput_gzв storage (trueпо умолчанию)ARTIFACT_DOWNLOAD_MAX_BYTES- лимит размера скачиваемого артефакта поdownload_url(по умолчанию268435456)ARTIFACT_JSON_MEMBER_MAX_BYTES- лимит размера JSON файла внутри артефакта/output_json(по умолчанию16777216)IMAGE_ARCHIVE_MAX_FILE_BYTES- лимит размера одной картинки вimages/архива (по умолчанию12582912)IMAGE_ARCHIVE_MAX_FILES- лимит числа картинок вimages/архива (по умолчанию2000)CONVERTER_TRIGGER_URL- URL trigger API конвертера (напримерhttp://127.0.0.1:8090/trigger); если не задан, trigger не отправляетсяCONVERTER_TRIGGER_TOKEN- Bearer token для trigger API конвертераCONVERTER_TRIGGER_TIMEOUT_SEC- timeout trigger-запроса (по умолчанию3)CONVERTER_TRIGGER_RECEIVER_DB- опционально переопределяетreceiver_dbв payload trigger-запросаCONVERTER_TRIGGER_CATALOG_DB- опционально переопределяетcatalog_dbв payload trigger-запросаCONVERTER_TRIGGER_BATCH_SIZE- опционально переопределяетbatch_sizeв payload trigger-запросаCONVERTER_TRIGGER_MAX_BATCHES- опционально переопределяетmax_batchesв payload trigger-запроса
python -m venv .venv
source .venv/bin/activate
pip install -e .
pip install -e .[test]uvicorn app.main:app --host 0.0.0.0 --port 8090Есть отдельный исполняемый Python-скрипт dashboard.py.
Это отдельное web-приложение для:
- управления задачами (
create/update active/frequency/parser/include_images); - просмотра общей статистики (
tasks/runs/orchestrators); - просмотра последних запусков;
- отдельной страницы ошибок валидации, подлежащих устранению (
/validation-errors).
Важно: дашборд не управляет last_crawl_at; это поле обновляет основной скрипт при сохранении результатов обхода.
Запуск:
./dashboard.py --host 127.0.0.1 --port 8091или
python dashboard.py --host 127.0.0.1 --port 8091Открыть в браузере:
http://127.0.0.1:8091
POST /api/orchestrators/register- регистрация оркестратора (возвращает token)POST /api/orchestrators/heartbeat- heartbeat (Bearer token)POST /api/orchestrators/next-task- получить следующую просроченную задачу (Bearer token)POST /api/orchestrators/results- отправить результат run + артефакты архива + изображения (Bearer token)POST /api/tasks- создать задачуGET /api/tasks- список задачPATCH /api/tasks/{task_id}- обновить задачуGET /api/validation-errors- список актуальных ошибок dataclass validation (dashboard; по последнему успешному run каждой задачи)GET /api/runs/{run_id}- получить runGET /healthz- healthcheck
POST /api/orchestrators/results
{
"run_id": "...",
"status": "success",
"payload": {"any": "json"},
"images": [
{
"filename": "shelf.jpg",
"content_base64": "..."
}
],
"upload_images_from_archive": true,
"output_json": "/abs/path/store_20260225.json",
"output_gz": "/abs/path/store_20260225.tar.gz",
"download_url": "http://127.0.0.1:8766/download?...",
"download_sha256": "...",
"download_expires_at": "2026-02-25T12:00:00+00:00"
}images[]- прямые изображения для отправки в storage;upload_images_from_archive=true- дополнительно взять файлы изimages/внутриoutput_gzи отправить в storage.
pytest