[wip] Add InferencePipeline: 3-stage async pipeline for batch inference #826
Open
Conversation
InferencePipeline wires together preprocessing, batched GPU inference (via DynamicBatcher), and postprocessing with proper thread pool dispatch and backpressure — so users only define three functions:

- preprocess_fn (per-item, sync or async, optional executor)
- inference_fn (batched, async — the DynamicBatcher process_fn)
- postprocess_fn (per-item, receives original input + result)

Key design decisions:

- No new abstraction for batching — reuses DynamicBatcher internally
- Bounded asyncio.Queue between preprocess and batcher for backpressure
- Supports both sync and async stage functions (auto-detected)
- Sync stages can be dispatched to a ThreadPoolExecutor
- postprocess_fn receives the original raw item alongside the inference result so it has full context (filename, metadata, etc.)
- Streaming via async generator (run) or batch collect (run_all)
- Exposes underlying batcher + stats for monitoring/tuning

This pattern emerged from the embed_wikipedia and multimodal-retrieval examples, where every GPU inference use case independently wired the same load → preprocess → batch-infer → postprocess pipeline. The abstraction eliminates that boilerplate while keeping full control over DynamicBatcher tuning knobs (cost budgeting, batch size, timeouts).

Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com>
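To make the three-function contract concrete, here is a hypothetical usage sketch. The names (InferencePipeline, preprocess_fn, inference_fn, postprocess_fn, preprocess_executor, run) are taken from the description above; the import path and exact signatures are assumptions, not verified against the diff.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# from <package> import InferencePipeline  # import path not shown in this PR

def preprocess(path: str) -> bytes:
    # Per-item, sync stage; auto-detected and dispatched to preprocess_executor.
    with open(path, "rb") as f:
        return f.read()

async def infer(batch: list[bytes]) -> list[list[float]]:
    # Batched, async stage: this plays the role of the DynamicBatcher process_fn.
    # Stand-in for a real GPU call; returns one embedding per input.
    return [[0.0] * 768 for _ in batch]

def postprocess(item: str, embedding: list[float]) -> dict:
    # Per-item stage; receives the original raw item plus the inference result.
    return {"path": item, "embedding": embedding}

async def main() -> None:
    pipeline = InferencePipeline(
        preprocess_fn=preprocess,
        inference_fn=infer,
        postprocess_fn=postprocess,
        preprocess_executor=ThreadPoolExecutor(max_workers=8),
    )
    # Streaming variant; per the description, run_all would collect a list instead.
    async for result in pipeline.run(["a.jpg", "b.jpg"]):
        print(result["path"])

asyncio.run(main())
```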
Yields:
    Postprocessed results, one per input item, in order.
"""
if not self._batcher.is_running:
Contributor
Why can't we start the batcher by default rather than requiring the user to do it?
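For reference, lazy auto-start could look roughly like this. A sketch only: `is_running` mirrors the check visible in this diff, but the `start()` method name is an assumption about DynamicBatcher's API.

```python
# Sketch of lazy auto-start (not the PR's actual code); assumes DynamicBatcher
# exposes an async start() to pair with the is_running check above.
async def run(self, items):
    if not self._batcher.is_running:
        await self._batcher.start()  # start on first use instead of erroring
    ...
```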
preprocess_executor:
    Executor for sync ``preprocess_fn`` calls. Pass a
    ``ThreadPoolExecutor`` for CPU-bound preprocessing.
Contributor
I think ThreadPoolExecutor is for I/O-bound tasks, no?
Contributor
Looks like ProcessPoolExecutor is what we should be supporting for CPU-bound tasks.
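For context, a minimal, self-contained illustration of the distinction (standard-library behavior, not part of this PR): the GIL prevents threads from parallelizing pure-Python CPU work, so ThreadPoolExecutor mainly helps I/O-bound calls, while ProcessPoolExecutor sidesteps the GIL for CPU-bound work.

```python
import asyncio
import urllib.request
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def cpu_bound(n: int) -> int:
    # Pure-Python loop: holds the GIL, so threads give no speedup here.
    return sum(i * i for i in range(n))

def io_bound(url: str) -> int:
    # Blocking network read: the GIL is released while waiting on I/O.
    with urllib.request.urlopen(url) as resp:
        return len(resp.read())

async def main() -> None:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as procs, ThreadPoolExecutor() as threads:
        # CPU-bound work goes to processes, I/O-bound work to threads.
        total = await loop.run_in_executor(procs, cpu_bound, 10_000_000)
        size = await loop.run_in_executor(threads, io_bound, "https://example.com")
    print(total, size)

if __name__ == "__main__":  # guard required for ProcessPoolExecutor on spawn platforms
    asyncio.run(main())
```

Given that, the docstring's "CPU-bound preprocessing" wording probably wants to say I/O-bound, or the pipeline should also accept a ProcessPoolExecutor; anything satisfying concurrent.futures.Executor works with run_in_executor.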