Skip to content

refactor(query): refactor flight shuffle#19458

Merged
zhang2014 merged 35 commits intodatabendlabs:mainfrom
zhang2014:refactor/shuffle
Mar 4, 2026
Merged

refactor(query): refactor flight shuffle#19458
zhang2014 merged 35 commits intodatabendlabs:mainfrom
zhang2014:refactor/shuffle

Conversation

@zhang2014
Copy link
Copy Markdown
Member

@zhang2014 zhang2014 commented Feb 13, 2026

I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/

Summary

refactor(query): refactor flight shuffle

Tests

  • Unit Test
  • Logic Test
  • Benchmark Test
  • No Test - Explain why

Type of change

  • Bug Fix (non-breaking change which fixes an issue)
  • New Feature (non-breaking change which adds functionality)
  • Breaking Change (fix or feature that could cause existing functionality not to work as expected)
  • Documentation Update
  • Refactoring
  • Performance Improvement
  • Other (please describe):

This change is Reviewable

zhang2014 and others added 17 commits February 3, 2026 16:21
…cutor

Add ExecutorWaker mechanism that allows processors to be woken up from
external code. This enables external events to trigger processor scheduling
in the pipeline executor.

- Add ExecutorWaker struct with OnceLock-based callback binding
- Pipeline creates waker automatically and exposes via get_waker()
- QueryPipelineExecutor binds waker callback in try_create()
- QueriesPipelineExecutor binds waker for each graph in send_graph()
- Support multiple pipelines by proxying individual wakers to graph-level waker

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add a ping-pong style flight exchange that guarantees at most one request
is in-flight at any time. This enables efficient round-trip time measurement
and flow control for flight data exchange operations.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…ibuted exchange

- Add ExchangeSinkBuffer with ping-pong mode for at most one in-flight request per destination
- Use Vec instead of HashMap for pre-allocated channels and remote instances
- Use ConcurrentQueue for lock-free wake target collection
- Add WakeTarget with Arc and AtomicBool for efficient backpressure wake-up
- Simplify PingPongCallback trait by removing exchange parameter
- Add force_send and ready_send methods to PingPongExchange

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…ibuted exchange

Add ExchangeChannel trait as unified interface for local/remote data exchange.
Implement RemoteChannel for serializing DataBlock to FlightData via Arrow IPC.
Add BroadcastChannel for round-robin distribution across multiple channels.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…onnection backpressure

Introduces a thread-channel based exchange buffer for the do_exchange server side.
Each network connection gets per-connection memory quota with backpressure via
event-listener. Processors prioritize consuming from the connection with highest
memory usage. ThreadChannelReader is a pure sync processor using FlaggedWaker +
RecvFuture (async_channel's try-listen-retry pattern). Also adds Processor::on_id_set
for deferred waker creation, ExchangeChannel::poll_send API, and do_exchange RPC handler.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…name to NetworkInbound*

Move the do_exchange server-side inbound channel from exchange/ to a new
network/ module, since it is a network-layer component. Split structs into
separate files (inbound_channel.rs, inbound_quota.rs) for clarity.

Type renames:
- ThreadChannel → NetworkInboundChannel
- ThreadChannelSet → NetworkInboundChannelSet
- ThreadChannelSender → NetworkInboundSender
- ThreadChannelReceiver → NetworkInboundReceiver

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ng_exchange to network/

Move outbound network components from exchange/ to network/ module:
- exchange_channel.rs → network/outbound_channel.rs
- exchange_sink_buffer.rs → network/outbound_buffer.rs
- ping_pong_exchange.rs → network/ping_pong_exchange.rs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…d_transport

Aligns naming with outbound_buffer/outbound_channel to form a clear layering:
outbound_channel → outbound_buffer → outbound_transport

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ning BoxFuture

Replace the poll-based interface with an async interface for simpler usage
by callers. The poll_fn bridge in RemoteChannel preserves the existing
backpressure semantics of ExchangeSinkBuffer::poll_send.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…to OutboundChannel

- Add InboundChannel trait with recv()/close() methods, implemented by
  NetworkInboundReceiver
- Rename ExchangeChannel to OutboundChannel for symmetry
- Update ThreadChannelReader to use Arc<dyn InboundChannel> instead of
  concrete NetworkInboundReceiver
- RecvFuture now holds Arc<NetworkInboundChannel> directly

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…change_manager

- InboundChannel trait: recv() -> BoxFuture, close()
- OutboundChannel trait (renamed from ExchangeChannel): add_block() -> BoxFuture
- NetworkInboundReceiver implements InboundChannel
- ThreadChannelReader uses Arc<dyn InboundChannel>
- exchange_manager::get_exchange_source_channel returns Arc<dyn InboundChannel>

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…nnel

Pure synchronous sink processor (no async_process) that sends DataBlocks
through an OutboundChannel. Uses FlaggedWaker to poll the BoxFuture from
add_block, matching the same pattern as ThreadChannelReader.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Deduplicate FlaggedWaker from ThreadChannelReader and ThreadChannelWriter
into network/flagged_waker.rs. Both processors now use FlaggedWaker::create()
to wrap the executor waker.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
RecvFuture is now an implementation detail of NetworkInboundReceiver,
no longer exposed through the InboundChannel trait.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…d DataExchange::get_id

- Add DoExchangeParams struct (query_id, exchange_id, num_threads) serialized
  as JSON in x-exchange-params gRPC metadata, replacing individual metadata fields
- Server uses num_threads from coordinator to create NetworkInboundChannelSet
  with correct size, eliminating race condition with max_threads fallback
- Fix tid routing: RemoteChannel prepends tid as 2-byte u16 LE to app_metadata
  matching extract_tid(), instead of unused FlightDescriptor.cmd encoding
- One PingPong per remote node with N tids multiplexed; strip_tid in receiver
- Edge::ExchangeFragment now carries exchange_id + channels per node pair
- Add DataExchange::get_id() with GlobalUniq id on all exchange variants
- Rename num_channels to num_threads throughout for clarity

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@github-actions github-actions Bot added the pr-refactor this PR changes the code base without new features or bugfix label Feb 13, 2026
zhang2014 and others added 12 commits February 15, 2026 22:08
…ions, add tests

- FlaggedWaker now uses swap(true) to coalesce multiple wakes into one
  executor scheduling. reset() at event() start re-enables the next wake.
- Fix backpressure race in ExchangeSinkBuffer::add_data and
  LocalOutboundChannel::add_block: use try-register-retry pattern to
  re-check condition after waker registration, preventing missed wakeups.
- Add 7 unit tests for FlaggedWaker (coalescing, reset, clone sharing,
  concurrent wakes with barrier, concurrent wake-reset cycles).
- Add 7 unit tests for LocalChannel (send/recv, close, backpressure
  trigger, no missed wakeup, concurrent send/recv, multi-channel).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Schema was being serialized into every FlightData's app_metadata on the
send side and deserialized on the receive side. This is redundant since
the schema is known from the exchange plan params.

Now NetworkInboundReceiver stores the schema at creation time (injected
from BroadcastExchangeParams) and uses it for all deserialization,
matching the pattern already used by TransformExchangeDeserializer.
Supports override_block_schema() for dynamic schema cases.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@zhang2014 zhang2014 added the ci-cloud Build docker image for cloud test label Mar 3, 2026
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Mar 3, 2026

Docker Image for PR

  • tag: pr-19458-1397df6-1772507102

note: this image tag is only available for internal use.

zhang2014 and others added 5 commits March 3, 2026 12:24
The struct round-robins blocks across per-thread channels on a single
remote node. The broadcast semantics live in BroadcastSendTransform,
not here. Name it for what it actually does.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Merge multiple small FlightData items into a single batch (up to
max_batch_bytes) on the outbound side, and split them back on the
inbound side using zero-copy Bytes::split_to(). This reduces RTT
overhead when exchanging many small blocks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@zhang2014 zhang2014 marked this pull request as ready for review March 3, 2026 17:34
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 47a3a9b104

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".


let tid = extract_tid(&data);

match self.sub_queues[tid].add_data(data).await {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Validate tid before indexing inbound sub-queues

add_data trusts the first two bytes of app_metadata as a thread id and directly indexes self.sub_queues[tid]. A malformed or mismatched tid (for example, peer sends num_threads=8 during setup but a packet with tid=9) will panic with an out-of-bounds access, aborting the do_exchange handling task and breaking the exchange for that query. This path handles network input, so it should reject invalid ids instead of panicking.

Useful? React with 👍 / 👎.

Comment on lines +326 to +327
let mut cursor = &meta[..ROW_HEADER_SIZE];
let row_count: u32 = cursor
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Check metadata length before slicing row-count header

deserialize_flight_data slices meta[..ROW_HEADER_SIZE] without verifying that meta is at least 4 bytes long after removing the marker. A malformed FlightData frame with marker 0x01 but short metadata will panic on slice bounds instead of returning an error, which again makes exchange handling crash-prone on bad network input.

Useful? React with 👍 / 👎.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Mar 4, 2026

🤖 CI Job Analysis

Workflow: 22650886470

📊 Summary

  • Total Jobs: 85
  • Failed Jobs: 1
  • Retryable: 0
  • Code Issues: 1

NO RETRY NEEDED

All failures appear to be code/test issues requiring manual fixes.

🔍 Job Details

  • linux / test_stateless_cluster: Not retryable (Code/Test)

🤖 About

Automated analysis using job annotations to distinguish infrastructure issues (auto-retried) from code/test issues (manual fixes needed).

@zhang2014 zhang2014 merged commit fbd1d72 into databendlabs:main Mar 4, 2026
87 of 89 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-cloud Build docker image for cloud test pr-refactor this PR changes the code base without new features or bugfix

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant