
[wip] Add InferencePipeline: 3-stage async pipeline for batch inference #826

Open
kumare3 wants to merge 1 commit into main from inference-pipeline

Conversation

@kumare3 (Contributor) commented Mar 23, 2026

InferencePipeline wires together preprocessing, batched GPU inference (via DynamicBatcher), and postprocessing with proper thread pool dispatch and backpressure — so users only define three functions:

  preprocess_fn  (per-item, sync or async, optional executor)
  inference_fn   (batched, async — the DynamicBatcher process_fn)
  postprocess_fn (per-item, receives original input + result)
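
A minimal usage sketch, assuming the constructor and ``run_all`` signatures implied above; ``tokenize``, ``model``, and ``items`` are hypothetical placeholders, not part of this PR:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def preprocess(item):                 # sync, per-item
    return tokenize(item["text"])     # hypothetical tokenizer call

async def infer(batch):               # async, batched (the DynamicBatcher process_fn)
    return await model.embed(batch)   # hypothetical async model call

def postprocess(item, result):        # per-item: original input + inference result
    return {"id": item["id"], "embedding": result}

pipeline = InferencePipeline(
    preprocess_fn=preprocess,
    inference_fn=infer,
    postprocess_fn=postprocess,
    preprocess_executor=ThreadPoolExecutor(max_workers=8),
)
results = asyncio.run(pipeline.run_all(items))  # or stream: async for r in pipeline.run(items)
```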

Key design decisions:

  • No new abstraction for batching — reuses DynamicBatcher internally
  • Bounded asyncio.Queue between preprocess and batcher for backpressure (sketched after this list)
  • Supports both sync and async stage functions (auto-detected)
  • Sync stages can be dispatched to a ThreadPoolExecutor
  • postprocess_fn receives the original raw item alongside the inference result so it has full context (filename, metadata, etc.)
  • Streaming via async generator (run) or batch collect (run_all)
  • Exposes underlying batcher + stats for monitoring/tuning
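
A rough sketch of how the queue-based backpressure and sync/async auto-detection might compose internally; the ``queue``, ``_dispatch``, and ``_produce`` names and the ``maxsize`` value are illustrative, not the actual implementation:

```python
import asyncio
import inspect

# Bounded handoff between preprocess and the batcher: put() blocks when
# the batcher falls behind, throttling preprocessing end-to-end.
queue: asyncio.Queue = asyncio.Queue(maxsize=64)

async def _dispatch(fn, *args, executor=None):
    """Run a stage function: await it if it's async; otherwise optionally
    offload it to an executor so a sync stage doesn't block the loop."""
    if inspect.iscoroutinefunction(fn):
        return await fn(*args)
    if executor is not None:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(executor, fn, *args)
    return fn(*args)

async def _produce(items, preprocess_fn, executor=None):
    # Keep the original item alongside the preprocessed payload so
    # postprocess_fn can later receive both.
    for item in items:
        payload = await _dispatch(preprocess_fn, item, executor=executor)
        await queue.put((item, payload))  # blocks when the queue is full
```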

This pattern emerged from the embed_wikipedia and multimodal-retrieval examples, where every GPU inference use case independently wired the same load → preprocess → batch-infer → postprocess pipeline. The abstraction eliminates that boilerplate while keeping full control over DynamicBatcher tuning knobs (cost budgeting, batch size, timeouts).

    Yields:
        Postprocessed results, one per input item, in order.
    """
    if not self._batcher.is_running:
Why can't we start the batcher by default rather than requiring the user to do it?
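
An auto-start at this check might look roughly like the following; it assumes ``DynamicBatcher`` exposes an async ``start()`` to pair with ``is_running``, which isn't shown in this hunk:

```python
if not self._batcher.is_running:
    # Lazily start the batcher on first use instead of raising,
    # so callers don't need an explicit start step before run().
    await self._batcher.start()
```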


    preprocess_executor:
        Executor for sync ``preprocess_fn`` calls. Pass a
        ``ThreadPoolExecutor`` for CPU-bound preprocessing.
I think ThreadPoolExecutor is for I/O-bound tasks, no?

Looks like ProcessPoolExecutor is what we should be supporting for CPU-bound tasks.
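
Assuming sync stages are dispatched via ``loop.run_in_executor``, a ``ProcessPoolExecutor`` should drop in directly for CPU-bound preprocessing; the caveat is that the stage function and its arguments must be picklable (module-level function, no closures). The constructor arguments below follow the PR description, and ``expensive_cpu_transform``, ``infer``, and ``postprocess`` are placeholders:

```python
from concurrent.futures import ProcessPoolExecutor

def preprocess(item):
    # Must be defined at module level so it can be pickled
    # and sent to worker processes.
    return expensive_cpu_transform(item)  # hypothetical CPU-bound work

pipeline = InferencePipeline(
    preprocess_fn=preprocess,
    inference_fn=infer,          # as in the earlier sketch
    postprocess_fn=postprocess,  # as in the earlier sketch
    preprocess_executor=ProcessPoolExecutor(max_workers=4),
)
```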
