refactor(query): refactor flight shuffle #19458
…cutor

Add ExecutorWaker mechanism that allows processors to be woken up from external code. This enables external events to trigger processor scheduling in the pipeline executor.

- Add ExecutorWaker struct with OnceLock-based callback binding
- Pipeline creates waker automatically and exposes via get_waker()
- QueryPipelineExecutor binds waker callback in try_create()
- QueriesPipelineExecutor binds waker for each graph in send_graph()
- Support multiple pipelines by proxying individual wakers to graph-level waker

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
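To illustrate the OnceLock-based binding described in this commit, here is a minimal sketch using only the standard library. It is not the code from this PR: the `bind` and `wake` method names, the `usize` processor id, and the boolean return values are assumptions made for the example.

```rust
use std::sync::{Arc, OnceLock};

// Hypothetical callback type: receives the id of the processor to schedule.
type WakeFn = Box<dyn Fn(usize) + Send + Sync>;

/// Sketch of a waker that can be handed to processors before any executor exists.
#[derive(Clone, Default)]
pub struct ExecutorWaker {
    callback: Arc<OnceLock<WakeFn>>,
}

impl ExecutorWaker {
    pub fn new() -> Self {
        Self::default()
    }

    /// Binds the executor callback. Returns false if a callback was already bound.
    pub fn bind(&self, f: impl Fn(usize) + Send + Sync + 'static) -> bool {
        self.callback.set(Box::new(f)).is_ok()
    }

    /// Invokes the bound callback. Returns false if no executor is bound yet.
    pub fn wake(&self, processor_id: usize) -> bool {
        match self.callback.get() {
            Some(f) => {
                f(processor_id);
                true
            }
            None => false,
        }
    }
}
```

Because the `OnceLock` sits behind an `Arc`, every clone handed out before binding observes the callback once it is set, which is what lets a pipeline create the waker up front and an executor bind it later.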
Add a ping-pong style flight exchange that guarantees at most one request is in-flight at any time. This enables efficient round-trip time measurement and flow control for flight data exchange operations. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
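The "at most one request in flight" rule can be sketched as a small state machine. This is an illustration under assumptions, not the PingPongExchange from this PR: the type name `PingPong` and the methods `submit`, `on_response`, and `is_in_flight` are hypothetical.

```rust
use std::collections::VecDeque;

/// Sketch of ping-pong flow control: a new item goes on the wire only
/// after the response to the previous one has arrived.
pub struct PingPong<T> {
    pending: VecDeque<T>,
    in_flight: bool,
}

impl<T> PingPong<T> {
    pub fn new() -> Self {
        Self { pending: VecDeque::new(), in_flight: false }
    }

    /// Queues an item. Returns the item to send now if the link is idle.
    pub fn submit(&mut self, item: T) -> Option<T> {
        self.pending.push_back(item);
        self.try_send()
    }

    /// Call when the peer's response arrives. Returns the next item to send, if any.
    pub fn on_response(&mut self) -> Option<T> {
        self.in_flight = false;
        self.try_send()
    }

    pub fn is_in_flight(&self) -> bool {
        self.in_flight
    }

    fn try_send(&mut self) -> Option<T> {
        if self.in_flight {
            return None;
        }
        let next = self.pending.pop_front()?;
        self.in_flight = true;
        Some(next)
    }
}
```

Since each send is paired with exactly one response, timing the gap between `submit`/`on_response` returning an item and the following `on_response` call gives a round-trip time sample.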
…ibuted exchange

- Add ExchangeSinkBuffer with ping-pong mode for at most one in-flight request per destination
- Use Vec instead of HashMap for pre-allocated channels and remote instances
- Use ConcurrentQueue for lock-free wake target collection
- Add WakeTarget with Arc and AtomicBool for efficient backpressure wake-up
- Simplify PingPongCallback trait by removing exchange parameter
- Add force_send and ready_send methods to PingPongExchange

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…ibuted exchange Add ExchangeChannel trait as unified interface for local/remote data exchange. Implement RemoteChannel for serializing DataBlock to FlightData via Arrow IPC. Add BroadcastChannel for round-robin distribution across multiple channels. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…onnection backpressure Introduces a thread-channel based exchange buffer for the do_exchange server side. Each network connection gets per-connection memory quota with backpressure via event-listener. Processors prioritize consuming from the connection with highest memory usage. ThreadChannelReader is a pure sync processor using FlaggedWaker + RecvFuture (async_channel's try-listen-retry pattern). Also adds Processor::on_id_set for deferred waker creation, ExchangeChannel::poll_send API, and do_exchange RPC handler. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
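The "consume from the connection with highest memory usage" policy can be sketched as a simple selection over per-connection byte counters. The function name `pick_most_loaded` and the plain `&[usize]` representation are assumptions for illustration; the PR's quota tracking is not shown here.

```rust
/// Returns the index of the connection holding the most buffered bytes,
/// or None when every connection is empty.
pub fn pick_most_loaded(buffered_bytes: &[usize]) -> Option<usize> {
    buffered_bytes
        .iter()
        .enumerate()
        // Connections with nothing buffered have nothing to consume.
        .filter(|(_, bytes)| **bytes > 0)
        .max_by_key(|(_, bytes)| **bytes)
        .map(|(idx, _)| idx)
}
```

Draining the most loaded connection first frees its quota soonest, so the sender blocked on that connection is the first to be released from backpressure.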
…refactor/shuffle
…name to NetworkInbound*

Move the do_exchange server-side inbound channel from exchange/ to a new network/ module, since it is a network-layer component. Split structs into separate files (inbound_channel.rs, inbound_quota.rs) for clarity.

Type renames:
- ThreadChannel → NetworkInboundChannel
- ThreadChannelSet → NetworkInboundChannelSet
- ThreadChannelSender → NetworkInboundSender
- ThreadChannelReceiver → NetworkInboundReceiver

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ng_exchange to network/

Move outbound network components from exchange/ to network/ module:
- exchange_channel.rs → network/outbound_channel.rs
- exchange_sink_buffer.rs → network/outbound_buffer.rs
- ping_pong_exchange.rs → network/ping_pong_exchange.rs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…d_transport Aligns naming with outbound_buffer/outbound_channel to form a clear layering: outbound_channel → outbound_buffer → outbound_transport Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ning BoxFuture Replace the poll-based interface with an async interface for simpler usage by callers. The poll_fn bridge in RemoteChannel preserves the existing backpressure semantics of ExchangeSinkBuffer::poll_send. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…to OutboundChannel

- Add InboundChannel trait with recv()/close() methods, implemented by NetworkInboundReceiver
- Rename ExchangeChannel to OutboundChannel for symmetry
- Update ThreadChannelReader to use Arc<dyn InboundChannel> instead of concrete NetworkInboundReceiver
- RecvFuture now holds Arc<NetworkInboundChannel> directly

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…change_manager

- InboundChannel trait: recv() -> BoxFuture, close()
- OutboundChannel trait (renamed from ExchangeChannel): add_block() -> BoxFuture
- NetworkInboundReceiver implements InboundChannel
- ThreadChannelReader uses Arc<dyn InboundChannel>
- exchange_manager::get_exchange_source_channel returns Arc<dyn InboundChannel>

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…nnel Pure synchronous sink processor (no async_process) that sends DataBlocks through an OutboundChannel. Uses FlaggedWaker to poll the BoxFuture from add_block, matching the same pattern as ThreadChannelReader. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Deduplicate FlaggedWaker from ThreadChannelReader and ThreadChannelWriter into network/flagged_waker.rs. Both processors now use FlaggedWaker::create() to wrap the executor waker. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
RecvFuture is now an implementation detail of NetworkInboundReceiver, no longer exposed through the InboundChannel trait. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…d DataExchange::get_id

- Add DoExchangeParams struct (query_id, exchange_id, num_threads) serialized as JSON in x-exchange-params gRPC metadata, replacing individual metadata fields
- Server uses num_threads from coordinator to create NetworkInboundChannelSet with correct size, eliminating race condition with max_threads fallback
- Fix tid routing: RemoteChannel prepends tid as 2-byte u16 LE to app_metadata matching extract_tid(), instead of unused FlightDescriptor.cmd encoding
- One PingPong per remote node with N tids multiplexed; strip_tid in receiver
- Edge::ExchangeFragment now carries exchange_id + channels per node pair
- Add DataExchange::get_id() with GlobalUniq id on all exchange variants
- Rename num_channels to num_threads throughout for clarity

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
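The tid framing in this commit (a 2-byte little-endian u16 prepended to app_metadata) can be sketched as a pair of helpers. The names `prepend_tid` and `split_tid` are hypothetical stand-ins; the PR's own functions are extract_tid() and strip_tid, whose exact signatures are not shown in this conversation.

```rust
/// Sender side: prefix the metadata with the thread id as a u16 in little-endian order.
pub fn prepend_tid(tid: u16, app_metadata: &[u8]) -> Vec<u8> {
    let mut framed = Vec::with_capacity(2 + app_metadata.len());
    framed.extend_from_slice(&tid.to_le_bytes());
    framed.extend_from_slice(app_metadata);
    framed
}

/// Receiver side: read the tid and return the remaining metadata.
/// Returns None when fewer than two bytes are available.
pub fn split_tid(framed: &[u8]) -> Option<(u16, &[u8])> {
    if framed.len() < 2 {
        return None;
    }
    let tid = u16::from_le_bytes([framed[0], framed[1]]);
    Some((tid, &framed[2..]))
}
```

Keeping the tid in the payload metadata lets one stream per remote node carry frames for all N threads, with the receiver demultiplexing on the prefix.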
…ions, add tests

- FlaggedWaker now uses swap(true) to coalesce multiple wakes into one executor scheduling. reset() at event() start re-enables the next wake.
- Fix backpressure race in ExchangeSinkBuffer::add_data and LocalOutboundChannel::add_block: use try-register-retry pattern to re-check condition after waker registration, preventing missed wakeups.
- Add 7 unit tests for FlaggedWaker (coalescing, reset, clone sharing, concurrent wakes with barrier, concurrent wake-reset cycles).
- Add 7 unit tests for LocalChannel (send/recv, close, backpressure trigger, no missed wakeup, concurrent send/recv, multi-channel).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
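The swap(true) coalescing can be sketched with `std::task::Wake`. This is an assumption-laden illustration rather than the PR's FlaggedWaker: the `schedules` counter stands in for the call into the executor waker, and `schedule_count` exists only so the behavior can be observed.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Wake;

pub struct FlaggedWaker {
    flag: AtomicBool,
    // Hypothetical stand-in for "schedule this processor on the executor".
    schedules: AtomicUsize,
}

impl FlaggedWaker {
    pub fn create() -> Arc<Self> {
        Arc::new(Self {
            flag: AtomicBool::new(false),
            schedules: AtomicUsize::new(0),
        })
    }

    /// Called at the start of event(): re-enables the next wake.
    pub fn reset(&self) {
        self.flag.store(false, Ordering::Release);
    }

    pub fn schedule_count(&self) -> usize {
        self.schedules.load(Ordering::Relaxed)
    }
}

impl Wake for FlaggedWaker {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        // swap returns the previous value, so only the first wake after
        // a reset observes `false` and triggers a scheduling.
        if !self.flag.swap(true, Ordering::AcqRel) {
            self.schedules.fetch_add(1, Ordering::Relaxed);
        }
    }
}
```

A `std::task::Waker` built with `Waker::from(FlaggedWaker::create())` can then be passed to a future's `poll`; repeated wakes between two `reset()` calls result in a single scheduling.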
Schema was being serialized into every FlightData's app_metadata on the send side and deserialized on the receive side. This is redundant since the schema is known from the exchange plan params. Now NetworkInboundReceiver stores the schema at creation time (injected from BroadcastExchangeParams) and uses it for all deserialization, matching the pattern already used by TransformExchangeDeserializer. Supports override_block_schema() for dynamic schema cases. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The struct round-robins blocks across per-thread channels on a single remote node. The broadcast semantics live in BroadcastSendTransform, not here. Name it for what it actually does. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
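For reference, round-robin selection across per-thread channels amounts to a wrapping cursor. The sketch below is hypothetical (the name `RoundRobin` and its methods are not taken from this PR) and tracks only the channel index, not the channels themselves.

```rust
/// Sketch of a round-robin cursor over `num_channels` per-thread channels.
pub struct RoundRobin {
    next: usize,
    num_channels: usize,
}

impl RoundRobin {
    /// Returns None for zero channels, since there is nothing to pick from.
    pub fn new(num_channels: usize) -> Option<Self> {
        if num_channels == 0 {
            return None;
        }
        Some(Self { next: 0, num_channels })
    }

    /// Returns the channel index for the next block and advances the cursor.
    pub fn next_channel(&mut self) -> usize {
        let picked = self.next;
        self.next = (self.next + 1) % self.num_channels;
        picked
    }
}
```

Each block goes to exactly one channel, which is why this differs from broadcast, where every block goes to every destination.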
Merge multiple small FlightData items into a single batch (up to max_batch_bytes) on the outbound side, and split them back on the inbound side using zero-copy Bytes::split_to(). This reduces RTT overhead when exchanging many small blocks. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
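The merge-and-split idea can be sketched with length-prefixed framing over plain byte buffers. This is not the PR's wire format: the 4-byte little-endian length prefix and the names `pack_batches` and `unpack_batch` are assumptions, and borrowed slices stand in for the zero-copy Bytes::split_to() mentioned above.

```rust
/// Packs frames into batches of at most `max_batch_bytes` each.
/// A single frame larger than the limit is emitted as its own batch.
pub fn pack_batches(frames: &[Vec<u8>], max_batch_bytes: usize) -> Vec<Vec<u8>> {
    let mut batches = Vec::new();
    let mut current: Vec<u8> = Vec::new();
    for frame in frames {
        let needed = 4 + frame.len();
        if !current.is_empty() && current.len() + needed > max_batch_bytes {
            batches.push(std::mem::take(&mut current));
        }
        // Assumed framing: u32 little-endian length, then the frame bytes.
        current.extend_from_slice(&(frame.len() as u32).to_le_bytes());
        current.extend_from_slice(frame);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

/// Splits a batch back into frames without copying the payloads.
/// Returns None if the batch is truncated or malformed.
pub fn unpack_batch(mut batch: &[u8]) -> Option<Vec<&[u8]>> {
    let mut frames = Vec::new();
    while !batch.is_empty() {
        let header = batch.get(..4)?;
        let len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
        let end = 4usize.checked_add(len)?;
        frames.push(batch.get(4..end)?);
        batch = &batch[end..];
    }
    Some(frames)
}
```

With a ping-pong transport, each batch costs one round trip, so packing several small frames together reduces the number of round trips for the same data.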
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 47a3a9b104
    let tid = extract_tid(&data);
    match self.sub_queues[tid].add_data(data).await {
Validate tid before indexing inbound sub-queues
add_data trusts the first two bytes of app_metadata as a thread id and directly indexes self.sub_queues[tid]. A malformed or mismatched tid (for example, peer sends num_threads=8 during setup but a packet with tid=9) will panic with an out-of-bounds access, aborting the do_exchange handling task and breaking the exchange for that query. This path handles network input, so it should reject invalid ids instead of panicking.
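One way to address this, sketched under assumptions: decode the tid through a checked helper and return an error instead of indexing directly. `RouteError` and `checked_tid` are hypothetical names, and the 2-byte little-endian prefix follows the description in the tid routing commit above.

```rust
#[derive(Debug, PartialEq)]
pub enum RouteError {
    /// app_metadata has fewer than the two bytes needed for the tid prefix.
    MetadataTooShort(usize),
    /// The decoded tid does not address any sub-queue.
    TidOutOfRange { tid: usize, num_threads: usize },
}

/// Decodes the tid prefix and verifies it is a valid sub-queue index.
pub fn checked_tid(app_metadata: &[u8], num_threads: usize) -> Result<usize, RouteError> {
    let prefix = match app_metadata.get(..2) {
        Some(prefix) => prefix,
        None => return Err(RouteError::MetadataTooShort(app_metadata.len())),
    };
    let tid = u16::from_le_bytes([prefix[0], prefix[1]]) as usize;
    if tid >= num_threads {
        return Err(RouteError::TidOutOfRange { tid, num_threads });
    }
    Ok(tid)
}
```

The caller could then map the error into a gRPC status for that stream, leaving the handling task alive for well-formed frames.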
    let mut cursor = &meta[..ROW_HEADER_SIZE];
    let row_count: u32 = cursor
Check metadata length before slicing row-count header
deserialize_flight_data slices meta[..ROW_HEADER_SIZE] without verifying that meta is at least 4 bytes long after removing the marker. A malformed FlightData frame with marker 0x01 but short metadata will panic on slice bounds instead of returning an error, which again makes exchange handling crash-prone on bad network input.
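A length check before slicing could look like the following sketch. It assumes the marker byte has already been removed, that ROW_HEADER_SIZE is 4 as stated above, and that the row count is little-endian; the byte order and the function name `read_row_count` are assumptions, not taken from the PR.

```rust
const ROW_HEADER_SIZE: usize = 4;

/// Reads the row-count header and returns it with the remaining metadata.
/// Returns an error instead of panicking when the metadata is too short.
pub fn read_row_count(meta: &[u8]) -> Result<(u32, &[u8]), String> {
    if meta.len() < ROW_HEADER_SIZE {
        return Err(format!(
            "flight metadata too short: expected at least {} bytes, got {}",
            ROW_HEADER_SIZE,
            meta.len()
        ));
    }
    let (header, rest) = meta.split_at(ROW_HEADER_SIZE);
    // Assumed byte order: little-endian.
    let row_count = u32::from_le_bytes([header[0], header[1], header[2], header[3]]);
    Ok((row_count, rest))
}
```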
🤖 CI Job Analysis
📊 Summary
❌ NO RETRY NEEDED: All failures appear to be code/test issues requiring manual fixes.
I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/
Summary
refactor(query): refactor flight shuffle