Существует оркестрационный модуль и суб-модули парсеров.
Структура парсеров:
src/openinflation_parser/parsers/fixprice/src/openinflation_parser/parsers/chizhik/
Структура оркестратора:
src/openinflation_parser/orchestration/cli.py(CLI и запуск)src/openinflation_parser/orchestration/server.py(WS server и диспетчер)src/openinflation_parser/orchestration/worker.py(worker runtime)src/openinflation_parser/orchestration/job_store.py(история задач + sqlite)src/openinflation_parser/orchestration/requests.py(pydantic-валидация WS payload)
Парсеры имеют команды:
- Собрать каталог категорий
- Собрать товаров
- Собрать информацию о магазине
- Собрать информацию о городах
Пайплайн такой:
- На основе доступной ОЗУ и на основе доступных прокси (или их отцуцтвия) оркестратор выбирает кол-во воркеров (управление через командную строку, важно - они НЕ демоны). Основное управление через вебсокет
- Парсеры возвращают данные в шаблонизированном виде библиотеки openinflation-dataclass
- После того как оркестратор получил всю нужную информацию об конкертном магазине, он сохраняет json на диск и запаковывает в .gz архив
openinflation_parser.orchestrator:- выбор числа воркеров по ОЗУ и прокси;
- пул non-daemon воркеров на
multiprocessing.Process; - очередь задач + сбор статусов;
- pydantic-валидация входящих websocket-команд;
- lifecycle history задач (TTL + max history) с опциональной sqlite-персистентностью;
- websocket-управление задачами.
openinflation_parser.parsers.fixprice.FixPriceParser:- работает на библиотеке
fixprice_api; - маппит API-контракты в
openinflation_dataclass(Category,Card,AdministrativeUnit,RetailUnit).
- работает на библиотеке
openinflation_parser.parsers.chizhik.ChizhikParser:- работает на библиотеке
chizhik_api; - маппит API-контракты в
openinflation_dataclass(Category,Card,AdministrativeUnit,RetailUnit).
- работает на библиотеке
openinflation_parser.parsers.perekrestok.PerekrestokParser:- работает на библиотеке
perekrestok_api; - маппит API-контракты в
openinflation_dataclass(Category,Card,AdministrativeUnit,RetailUnit).
- работает на библиотеке
pip install -e .Также можно установить зависимости через requirements.txt.
python -m openinflation_parser.orchestrator \
--host 127.0.0.1 \
--port 8765 \
--parser fixprice \
--output-dir ./output \
--auth-password changeme \
--country-id 2 \
--full-catalog \
--max-pages-per-category 200 \
--api-timeout-ms 120000 \
--ram-per-worker-gb 1.5 \
--log-level INFO \
--proxy-file ./proxies.txtДля подробного трассинга можно использовать --log-level DEBUG.
Для отправки телеметрии в Uptrace укажите --uptrace-dsn или переменную окружения UPTRACE_DSN.
Важно: оркестратор в этом режиме только поднимает воркеры и ждёт задачи по WebSocket (submit_store).
Чтобы запустить парсинг сразу при старте, передайте --bootstrap-store-code <PFM>.
В режиме --full-catalog парсер обходит только leaf-категории (subcategories), а root-категория запрашивается только если у неё нет детей.
Полезные флаги:
--strict-validation— включить строгую pydantic-валидацию mapped моделей (для CI/dev).--jobs-max-history— лимит terminal jobs в памяти/БД.--jobs-retention-sec— TTL terminal jobs.--jobs-db-path— путь к sqlite с состоянием задач (""чтобы отключить).--max-jobs-per-worker— перезапуск воркера после N задач (по умолчанию1) для ограничения роста памяти в долгих прогонах.--auth-password— статический пароль для websocket-команд (если задан, обязателен в каждом payload какpassword).--download-host/--download-port— где поднимать FastAPI endpoint загрузки.--download-url-ttl-sec— время жизни подписанной ссылки.--download-secret— HMAC secret для подписи ссылок (если не передан, генерируется при старте).--uptrace-dsn— DSN для Uptrace (илиUPTRACE_DSN).--uptrace-env— deployment environment для Uptrace (dev/stage/prodи т.д.).--uptrace-orchestrator-service-name— имя сервиса оркестратора в Uptrace.--uptrace-worker-service-name— имя сервиса воркеров в Uptrace.
openinflation-orchestrator \
--host 127.0.0.1 \
--port 8765 \
--parser fixprice \
--bootstrap-store-code C001 \
--log-level DEBUG \
--uptrace-dsn "$UPTRACE_DSN" \
--uptrace-env prod \
--uptrace-orchestrator-service-name openinflation-orchestrator \
--uptrace-worker-service-name openinflation-workerЛоги теперь содержат стабильные поля svc/role/worker/job/store/parser/trace/span,
что упрощает фильтрацию в терминале и корреляцию в Uptrace.
Пример для Чижика:
openinflation-orchestrator \
--parser chizhik \
--host 127.0.0.1 \
--port 8765 \
--output-dir ./output \
--include-images \
--max-pages-per-category 200 \
--bootstrap-store-code moskva \
--log-level DEBUGПример для Перекрёстка:
openinflation-orchestrator \
--parser perekrestok \
--host 127.0.0.1 \
--port 8765 \
--output-dir ./output \
--city-id 81 \
--include-images \
--max-pages-per-category 200 \
--bootstrap-store-code 1 \
--log-level DEBUGПример для FixPrice:
openinflation-orchestrator \
--parser fixprice \
--host 127.0.0.1 \
--port 8765 \
--output-dir ./output \
--city-id 3 \
--include-images \
--max-pages-per-category 200 \
--bootstrap-store-code C001 \
--log-level DEBUGЕсли оркестратор запущен с --auth-password, добавляйте "password":"<your-password>" в каждый payload.
Для submit_store действует правило приоритета:
- значения, переданные в WebSocket запросе, имеют приоритет над значениями из CLI-дефолтов при запуске оркестратора;
- если поле не передано (или
null), используется дефолт запуска.
Пример: include_images в submit_store переопределяет --include-images для конкретной задачи.
{"action":"ping","password":"<your-password>"}{"action":"submit_store","store_code":"C001","city_id":3,"password":"<your-password>"}{"action":"submit_store","store_code":"C001","city_id":3,"full_catalog":true,"max_pages_per_category":200,"products_per_page":27,"api_timeout_ms":120000,"password":"<your-password>"}{"action":"submit_store","parser":"chizhik","store_code":"moskva","full_catalog":true,"max_pages_per_category":200,"api_timeout_ms":120000,"password":"<your-password>"}{"action":"submit_store","parser":"perekrestok","store_code":"1","city_id":81,"full_catalog":true,"max_pages_per_category":200,"api_timeout_ms":120000,"password":"<your-password>"}{"action":"status","password":"<your-password>"}{"action":"status","job_id":"<id>","password":"<your-password>"}{"action":"jobs","password":"<your-password>"}{"action":"workers","password":"<your-password>"}{"action":"stream_job_log","job_id":"<id>","tail_lines":200,"password":"<your-password>"}{"action":"shutdown","password":"<your-password>"}
stream_job_log работает как потоковый websocket-action: после запроса сервер отправляет события
waiting -> snapshot -> append -> end (или error). Параметр tail_lines задает, сколько
последних строк отдать в первом snapshot (по умолчанию 200, максимум 5000).
Для успешной задачи в status/jobs возвращаются:
output_json,output_gzdownload_url(подписанный URL FastAPI/download)download_sha256(контрольная сумма архива.tar.gz)download_expires_at(UTC, до какого момента URL валиден)
После истечения download_expires_at оркестратор удаляет файлы output_json/output_gz/output_worker_log
и очищает download-поля у задачи, чтобы не накапливался файловый кэш.
Результат задачи сохраняется файлами <store_code>_<timestamp>.json и <store_code>_<timestamp>.tar.gz в output_dir.
Архив содержит meta.json, worker.log (лог процесса воркера по этой задаче) и каталог images/; поля изображений в meta.json содержат относительные пути до файлов внутри архива.
При запуске с --include-images воркер сохраняет изображения сразу на диск в cache-директорию задачи и передает оркестратору путь к уже готовому дереву images/ (без накопления base64 изображений в RAM на весь job).
Оркестратор только включает эти файлы в .tar.gz и удаляет временный cache задачи после упаковки.
Расширение файлов изображений определяется по сигнатуре содержимого (jpg/png/webp/...).
Если вместо изображения приходит HTML/ошибка WAF, файл не сохраняется, а в meta.json ставится null/пусто.