Сервис асинхронной обработки и нормализации собранных данных.
- читает
source.raw.v1из RabbitMQ; - валидирует raw-события;
- нормализует payload до формата
source.normalized.v1; - отправляет ingest mutation
ingestNormalizedItemвbackend-apiс заголовкомx-ingest-token; - после успешного ingest публикует нормализованные события в
source.normalized.v1; - при transient ошибках отправляет сообщение в retry queue, а poison messages складывает в DLQ.
- consumer RabbitMQ (
src/messaging/queue-client.ts); - нормализатор (
src/normalize.ts); - валидация raw/normalized схем через Ajv;
- GraphQL-клиент отправки в backend (
src/backend-client.ts); - Dockerfile и CI workflow.
RABBITMQ_URL- адрес RabbitMQ.QUEUE_RAW_EVENT- основная очередь входящих raw-событий, по умолчаниюsource.raw.v1.QUEUE_RETRY_EVENT- очередь retry, по умолчаниюsource.raw.retry.v1.QUEUE_DEAD_LETTER_EVENT- очередь dead-letter, по умолчаниюsource.raw.dlq.v1.QUEUE_NORMALIZED_EVENT- очередь нормализованных событий, по умолчаниюsource.normalized.v1.API_BASE_URL- базовый URLbackend-api, по умолчаниюhttp://localhost:3000.GRAPHQL_PATH- GraphQL path, по умолчанию/graphql.API_GRAPHQL_URL- явный GraphQL URL. Если задан, имеет приоритет надAPI_BASE_URL + GRAPHQL_PATH.API_INGEST_TOKEN- токен для заголовкаx-ingest-token. Должен совпадать сINGEST_API_TOKENвbackend-api.SHARED_CONTRACTS_DIR- путь кshared-contracts.RETRY_ATTEMPTS,RETRY_BASE_DELAY_MS,PREFETCH,LOG_LEVEL- параметры обработки и логирования.
cp .env.example .env
npm install
npm run start:devДля Docker Compose сервис ожидает:
- RabbitMQ на
RABBITMQ_URL; backend-apiнаAPI_BASE_URLилиAPI_GRAPHQL_URL;- смонтированный
shared-contractsвSHARED_CONTRACTS_DIR.
source.raw.v1- входящие события.source.raw.retry.v1- delayed retry queue.source.raw.dlq.v1- dead-letter queue.source.normalized.v1- успешно обработанные нормализованные события.
- Опубликуй валидное сообщение в
source.raw.v1. - В логах worker должны появиться строки
connected to rabbitmq,consuming queue,raw event validated,normalized event created,ingest success,published normalized event,message acknowledged. - Проверь, что сообщение появилось в
source.normalized.v1, а запись создалась вbackend-api. - Для transient ошибки временно выключи
backend-api: worker должен логироватьingest failedиretry scheduled, а не падать. - Для poison message отправь невалидный JSON или payload, нарушающий schema: worker должен логировать
message dead-lettered, сообщение должно уйти вsource.raw.dlq.v1.
- получает события от
scraper-service; - использует схемы из
shared-contracts; - отправляет результаты в
backend-api.