Skip to content

Open-Inflation/parser

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

26 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

parser

Существует оркестрационный модуль и суб-модули парсеров.

Структура парсеров:

  • src/openinflation_parser/parsers/fixprice/
  • src/openinflation_parser/parsers/chizhik/

Структура оркестратора:

  • src/openinflation_parser/orchestration/cli.py (CLI и запуск)
  • src/openinflation_parser/orchestration/server.py (WS server и диспетчер)
  • src/openinflation_parser/orchestration/worker.py (worker runtime)
  • src/openinflation_parser/orchestration/job_store.py (история задач + sqlite)
  • src/openinflation_parser/orchestration/requests.py (pydantic-валидация WS payload)

Парсеры имеют команды:

  1. Собрать каталог категорий
  2. Собрать товаров
  3. Собрать информацию о магазине
  4. Собрать информацию о городах

Пайплайн такой:

  1. На основе доступной ОЗУ и на основе доступных прокси (или их отцуцтвия) оркестратор выбирает кол-во воркеров (управление через командную строку, важно - они НЕ демоны). Основное управление через вебсокет
  2. Парсеры возвращают данные в шаблонизированном виде библиотеки openinflation-dataclass
  3. После того как оркестратор получил всю нужную информацию об конкертном магазине, он сохраняет json на диск и запаковывает в .gz архив

Реализовано

  • openinflation_parser.orchestrator:
    • выбор числа воркеров по ОЗУ и прокси;
    • пул non-daemon воркеров на multiprocessing.Process;
    • очередь задач + сбор статусов;
    • pydantic-валидация входящих websocket-команд;
    • lifecycle history задач (TTL + max history) с опциональной sqlite-персистентностью;
    • websocket-управление задачами.
  • openinflation_parser.parsers.fixprice.FixPriceParser:
    • работает на библиотеке fixprice_api;
    • маппит API-контракты в openinflation_dataclass (Category, Card, AdministrativeUnit, RetailUnit).
  • openinflation_parser.parsers.chizhik.ChizhikParser:
    • работает на библиотеке chizhik_api;
    • маппит API-контракты в openinflation_dataclass (Category, Card, AdministrativeUnit, RetailUnit).
  • openinflation_parser.parsers.perekrestok.PerekrestokParser:
    • работает на библиотеке perekrestok_api;
    • маппит API-контракты в openinflation_dataclass (Category, Card, AdministrativeUnit, RetailUnit).

Установка

pip install -e .

Также можно установить зависимости через requirements.txt.

Запуск оркестратора

python -m openinflation_parser.orchestrator \
  --host 127.0.0.1 \
  --port 8765 \
  --parser fixprice \
  --output-dir ./output \
  --auth-password changeme \
  --country-id 2 \
  --full-catalog \
  --max-pages-per-category 200 \
  --api-timeout-ms 120000 \
  --ram-per-worker-gb 1.5 \
  --log-level INFO \
  --proxy-file ./proxies.txt

Для подробного трассинга можно использовать --log-level DEBUG. Для отправки телеметрии в Uptrace укажите --uptrace-dsn или переменную окружения UPTRACE_DSN.

Важно: оркестратор в этом режиме только поднимает воркеры и ждёт задачи по WebSocket (submit_store). Чтобы запустить парсинг сразу при старте, передайте --bootstrap-store-code <PFM>. В режиме --full-catalog парсер обходит только leaf-категории (subcategories), а root-категория запрашивается только если у неё нет детей.

Полезные флаги:

  • --strict-validation — включить строгую pydantic-валидацию mapped моделей (для CI/dev).
  • --jobs-max-history — лимит terminal jobs в памяти/БД.
  • --jobs-retention-sec — TTL terminal jobs.
  • --jobs-db-path — путь к sqlite с состоянием задач ("" чтобы отключить).
  • --max-jobs-per-worker — перезапуск воркера после N задач (по умолчанию 1) для ограничения роста памяти в долгих прогонах.
  • --auth-password — статический пароль для websocket-команд (если задан, обязателен в каждом payload как password).
  • --download-host / --download-port — где поднимать FastAPI endpoint загрузки.
  • --download-url-ttl-sec — время жизни подписанной ссылки.
  • --download-secret — HMAC secret для подписи ссылок (если не передан, генерируется при старте).
  • --uptrace-dsn — DSN для Uptrace (или UPTRACE_DSN).
  • --uptrace-env — deployment environment для Uptrace (dev/stage/prod и т.д.).
  • --uptrace-orchestrator-service-name — имя сервиса оркестратора в Uptrace.
  • --uptrace-worker-service-name — имя сервиса воркеров в Uptrace.

Uptrace: оркестратор и воркеры как независимые сущности

openinflation-orchestrator \
  --host 127.0.0.1 \
  --port 8765 \
  --parser fixprice \
  --bootstrap-store-code C001 \
  --log-level DEBUG \
  --uptrace-dsn "$UPTRACE_DSN" \
  --uptrace-env prod \
  --uptrace-orchestrator-service-name openinflation-orchestrator \
  --uptrace-worker-service-name openinflation-worker

Логи теперь содержат стабильные поля svc/role/worker/job/store/parser/trace/span, что упрощает фильтрацию в терминале и корреляцию в Uptrace.

Пример для Чижика:

openinflation-orchestrator \
  --parser chizhik \
  --host 127.0.0.1 \
  --port 8765 \
  --output-dir ./output \
  --include-images \
  --max-pages-per-category 200 \
  --bootstrap-store-code moskva \
  --log-level DEBUG

Пример для Перекрёстка:

openinflation-orchestrator \
  --parser perekrestok \
  --host 127.0.0.1 \
  --port 8765 \
  --output-dir ./output \
  --city-id 81 \
  --include-images \
  --max-pages-per-category 200 \
  --bootstrap-store-code 1 \
  --log-level DEBUG

Пример для FixPrice:

openinflation-orchestrator \
  --parser fixprice \
  --host 127.0.0.1 \
  --port 8765 \
  --output-dir ./output \
  --city-id 3 \
  --include-images \
  --max-pages-per-category 200 \
  --bootstrap-store-code C001 \
  --log-level DEBUG

Основные websocket actions

Если оркестратор запущен с --auth-password, добавляйте "password":"<your-password>" в каждый payload.

Для submit_store действует правило приоритета:

  • значения, переданные в WebSocket запросе, имеют приоритет над значениями из CLI-дефолтов при запуске оркестратора;
  • если поле не передано (или null), используется дефолт запуска.

Пример: include_images в submit_store переопределяет --include-images для конкретной задачи.

  • {"action":"ping","password":"<your-password>"}
  • {"action":"submit_store","store_code":"C001","city_id":3,"password":"<your-password>"}
  • {"action":"submit_store","store_code":"C001","city_id":3,"full_catalog":true,"max_pages_per_category":200,"products_per_page":27,"api_timeout_ms":120000,"password":"<your-password>"}
  • {"action":"submit_store","parser":"chizhik","store_code":"moskva","full_catalog":true,"max_pages_per_category":200,"api_timeout_ms":120000,"password":"<your-password>"}
  • {"action":"submit_store","parser":"perekrestok","store_code":"1","city_id":81,"full_catalog":true,"max_pages_per_category":200,"api_timeout_ms":120000,"password":"<your-password>"}
  • {"action":"status","password":"<your-password>"}
  • {"action":"status","job_id":"<id>","password":"<your-password>"}
  • {"action":"jobs","password":"<your-password>"}
  • {"action":"workers","password":"<your-password>"}
  • {"action":"stream_job_log","job_id":"<id>","tail_lines":200,"password":"<your-password>"}
  • {"action":"shutdown","password":"<your-password>"}

stream_job_log работает как потоковый websocket-action: после запроса сервер отправляет события waiting -> snapshot -> append -> end (или error). Параметр tail_lines задает, сколько последних строк отдать в первом snapshot (по умолчанию 200, максимум 5000).

Для успешной задачи в status/jobs возвращаются:

  • output_json, output_gz
  • download_url (подписанный URL FastAPI /download)
  • download_sha256 (контрольная сумма архива .tar.gz)
  • download_expires_at (UTC, до какого момента URL валиден)

После истечения download_expires_at оркестратор удаляет файлы output_json/output_gz/output_worker_log и очищает download-поля у задачи, чтобы не накапливался файловый кэш.

Результат задачи сохраняется файлами <store_code>_<timestamp>.json и <store_code>_<timestamp>.tar.gz в output_dir. Архив содержит meta.json, worker.log (лог процесса воркера по этой задаче) и каталог images/; поля изображений в meta.json содержат относительные пути до файлов внутри архива. При запуске с --include-images воркер сохраняет изображения сразу на диск в cache-директорию задачи и передает оркестратору путь к уже готовому дереву images/ (без накопления base64 изображений в RAM на весь job). Оркестратор только включает эти файлы в .tar.gz и удаляет временный cache задачи после упаковки. Расширение файлов изображений определяется по сигнатуре содержимого (jpg/png/webp/...). Если вместо изображения приходит HTML/ошибка WAF, файл не сохраняется, а в meta.json ставится null/пусто.

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Languages