[SPARK-55450][SS][PYTHON][DOCS] Document admission control in PySpark streaming data sources#54807

Closed
jiteshsoni wants to merge 11 commits into apache:master from jiteshsoni:SPARK-55450-admission-control-docs

Conversation

@jiteshsoni
Contributor

@jiteshsoni jiteshsoni commented Mar 14, 2026

What changes were proposed in this pull request?

This PR adds documentation and an example for admission control in PySpark custom streaming data sources (SPARK-55304).

Changes include:

  1. Updated tutorial documentation (python/docs/source/tutorial/sql/python_data_source.rst):

    • Added "Admission Control for Streaming Readers" section
    • Documents getDefaultReadLimit() returning ReadMaxRows(n) to limit batch size
    • Shows how latestOffset(start, limit) respects the ReadLimit parameter
  2. Example file (examples/src/main/python/sql/streaming/structured_blockchain_admission_control.py):

    • Demonstrates admission control via getDefaultReadLimit() and latestOffset()
    • Simulates blockchain data source with controlled batch sizes (20 blocks per batch)
    • Simple, focused example showing backpressure management
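To make the interaction between the two hooks concrete, here is a minimal, self-contained sketch of the pattern the docs and example describe. Stand-in dataclasses are used so the snippet runs on its own; in real code `DataSourceStreamReader`, `ReadLimit`, `ReadMaxRows`, and `ReadAllAvailable` come from `pyspark.sql.datasource` and the methods live on the reader class.

```python
from dataclasses import dataclass


# Stand-ins for the pyspark.sql.datasource ReadLimit classes; the
# snake_case max_rows field follows the PR discussion.
@dataclass
class ReadMaxRows:
    max_rows: int


@dataclass
class ReadAllAvailable:
    pass


CHAIN_HEIGHT = 10_000  # simulated blockchain tip, as in the example


def getDefaultReadLimit():
    # Admission control: ask the engine to cap each micro-batch at 20 rows.
    return ReadMaxRows(20)


def latestOffset(start, limit):
    # Advance the end offset while respecting the ReadLimit the engine
    # passes in (either the default limit above or ReadAllAvailable).
    start_block = start["block_number"]
    if isinstance(limit, ReadMaxRows):
        end_block = min(start_block + limit.max_rows, CHAIN_HEIGHT)
    elif isinstance(limit, ReadAllAvailable):
        end_block = CHAIN_HEIGHT
    else:
        raise ValueError(f"Unexpected ReadLimit: {limit!r}")
    return {"block_number": end_block}
```

For example, `latestOffset({"block_number": 1560}, getDefaultReadLimit())` advances the offset by exactly 20 blocks, matching the sample batch output below.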

Why are the changes needed?

Users need documentation and practical examples to implement admission control in custom streaming sources (introduced in SPARK-55304).

Does this PR introduce any user-facing change?

No. Documentation and examples only.

How was this patch tested?

Testing approach:

  • Ran the example on Databricks Dogfood Staging (DBR 17.3 / Spark 4.0)
  • Used the Spark Streaming UI to verify admission control works correctly

Test notebook: pr_54807_admission_control_notebook

What was verified:

  1. Batch sizes: Each micro-batch processed exactly 20 blocks (admission control working)
  2. Consistent behavior: 79 batches completed in ~28 seconds, all with 20 rows
  3. Stream reader: PythonMicroBatchStreamWithAdmissionControl active in Streaming UI

Sample batch output:

{
  "batchId": 78,
  "numInputRows": 20,
  "sources": [{
    "description": "PythonMicroBatchStreamWithAdmissionControl",
    "startOffset": {"block_number": 1560},
    "endOffset": {"block_number": 1580},
    "numInputRows": 20
  }]
}

Was this patch authored or co-authored using generative AI tooling?

Yes (Claude Opus 4.5)

🤖 Generated with Claude Code

@HeartSaVioR
Contributor

HeartSaVioR commented Mar 15, 2026

Thanks for your contribution! Could you make sure you have run the examples on your own? If you already did that, you can update the PR description (specifically How was this patch tested?) to include it.

Also, we are asked to put the model name, not the tool name, if you use an LLM to generate the code. That is described in the PR template, including the exact string format. Please update it.

@jiteshsoni jiteshsoni force-pushed the SPARK-55450-admission-control-docs branch from a2988f9 to ec89c64 Compare March 16, 2026 20:45
@jiteshsoni
Contributor Author

jiteshsoni commented Mar 16, 2026

✅ Testing Verification - Examples Manually Tested on Databricks

I've successfully tested both examples on Databricks Dogfood Staging. Screenshots attached below showing the streaming query statistics.

Example 1: Continuous Processing

Query Name: blockchain_continuous

  • Trigger: Default trigger with 3-second processing intervals
  • Result: ✅ Admission control successfully limits each batch to 20 blocks
  • Termination: I used a timer to kill the query

Key Observations:

  • Input Rows: Consistently 20 records per batch (admission control working)
  • Batch Duration: Stable around 200-300ms per batch
  • Input Rate histogram shows steady processing across multiple batches

Example 2: Trigger.AvailableNow - Finite Processing

Query Name: blockchain_available_now

  • Batches Completed: 50 batches ✅
  • Total Blocks Processed: 1,000 blocks (50 batches × 20 blocks each)
  • Batch Size: 20 blocks per batch (admission control via getDefaultReadLimit())
  • Trigger: availableNow=True
  • Result: ✅ All 1,000 blocks processed in exactly 50 batches as designed
  • Termination: Automatic completion when all data processed ✅

Key Observations:

  • Input Rows histogram shows 50 batches total (exactly as expected)
  • prepareForTriggerAvailableNow() successfully captured target offset of 1,000 blocks
  • Query automatically terminated after processing all available data

Test Environment

Verified Functionality

Admission Control: getDefaultReadLimit() returns ReadMaxRows(20) successfully limits batch size
Parallel Partitioning: 4 partitions per batch (5 blocks each) working correctly
Continuous Processing: Default trigger processes batches continuously until stopped
Trigger.AvailableNow: Finite processing mode completes all 50 batches and terminates automatically
SupportsTriggerAvailableNow: Mixin implementation working correctly
5 Core Functions: initialOffset(), latestOffset(), partitions(), read(), commit() all working
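For readers unfamiliar with the lifecycle, here is a rough, self-contained skeleton of how those five functions fit together. This uses stand-in classes only (the real example subclasses `pyspark.sql.datasource.DataSourceStreamReader`), with numbers mirroring this test run: 20 blocks per batch across 4 partitions of 5 blocks each.

```python
class RangePartition:
    # Stand-in for pyspark.sql.datasource.InputPartition
    def __init__(self, start, end):
        self.start, self.end = start, end


class BlockchainReaderSketch:
    NUM_PARTITIONS = 4
    CHAIN_HEIGHT = 10_000

    def initialOffset(self):
        # Where the stream starts on first run.
        return {"block_number": 0}

    def latestOffset(self, start, limit):
        # End offset for the next batch; admission control caps the
        # advance at 20 blocks (see getDefaultReadLimit in the example).
        return {"block_number": min(start["block_number"] + 20, self.CHAIN_HEIGHT)}

    def partitions(self, start, end):
        # Split [start, end) evenly so executors read blocks in parallel.
        s, e = start["block_number"], end["block_number"]
        step = max(1, (e - s) // self.NUM_PARTITIONS)
        return [RangePartition(i, min(i + step, e)) for i in range(s, e, step)]

    def read(self, partition):
        # Runs on executors: yield one simulated row per block.
        for block in range(partition.start, partition.end):
            yield (block, f"hash_{block}", 1_700_000_000 + block, block % 50)

    def commit(self, end):
        # Called after a batch is durably committed; nothing to clean up
        # in this simulated source.
        pass
```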


Screenshots below show the Streaming Query Statistics from both runs:


This comment was generated with GitHub MCP.

@jiteshsoni
Contributor Author

jiteshsoni commented Mar 16, 2026

Thanks for your contribution! Could you make sure you have run the examples on your own? If you already did that, you can update the PR description (specifically How was this patch tested?) to include it.

Also we are asked to put the "model name" if you use LLM to generate the code, not the tool name. That is described in the PR template, specifically the form of string as well. Please update it.

Thanks for the feedback! I've addressed both items: #54807 (comment) (screenshot attached)

✅ Testing verification: Updated the PR description to confirm the examples were manually tested on Databricks. I've also added a detailed comment below with screenshots showing the streaming query statistics.

…r.AvailableNow in PySpark streaming data sources

This patch adds comprehensive documentation and examples for the new admission control
and Trigger.AvailableNow features in Python streaming data sources (added in SPARK-55304).

Changes:
- New tutorial: streaming_admission_control.rst with step-by-step guide
- Example: structured_blockchain_admission_control.py demonstrating:
  - Admission control via getDefaultReadLimit() and latestOffset()
  - Parallel partitioning (50 batches × 4 partitions = 200 tasks)
  - Trigger.AvailableNow for finite processing
  - SupportsTriggerAvailableNow mixin implementation

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@jiteshsoni jiteshsoni force-pushed the SPARK-55450-admission-control-docs branch 3 times, most recently from 67a457f to dd0f52f Compare March 16, 2026 21:51
@jiteshsoni jiteshsoni changed the title [SPARK-55450][SS][PYTHON][DOCS] Document admission control and Trigger.AvailableNow in PySpark streaming data sources [SPARK-55450][SS][PYTHON][DOCS] Document admission control in PySpark streaming data sources Mar 16, 2026
under the License.

===========================================
Streaming Admission Control and Parallelism
Contributor

I guess this is more than admission control? Parallelism should have been covered as part of the basic functionality.

If we think the content of streaming is too large to just be a part of python_data_source, we can extract into python_streaming_data_source and combine the content in python_data_source and python_streaming_data_source into the new page. We'd want to deduplicate the content during the merge.

Contributor Author

Kept it in one file and only about admission control

Contributor

@HeartSaVioR HeartSaVioR left a comment

Sorry for the delay. I'm still reviewing the example code. For the doc change, we'd probably be better off rechecking whether we are repetitive with the existing doc. I'm OK with separating streaming out from the existing doc if combining the admission control and Trigger.AvailableNow parts would warrant having a separate (lengthy) doc.


- **Controlling data ingestion rate**: Prevent overwhelming the system with too much data at once
- **Backpressure management**: Handle incoming data at a sustainable rate
- **Parallel processing**: Distribute work across multiple executors efficiently
Contributor

Again if we want to describe the normal streaming source and admission control separately, this and below should go to the normal streaming source.


Additional Functions for Trigger.AvailableNow
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To support batching with ``Trigger.AvailableNow``, implement these additional functions:
Contributor

SupportsTriggerAvailableNow is not a function; if we had already documented Trigger.AvailableNow, we could probably omit this.

Also, getDefaultReadLimit() is for admission control, not for Trigger.AvailableNow. I'd move this, but mention that it's optional and defaults to reading all rows.

Contributor Author

Removed all the Trigger.AvailableNow stuff. The example and docs now focus purely on admission control with getDefaultReadLimit() and latestOffset(start, limit).

- **prepareForTriggerAvailableNow()**: Captures the target offset at query start
- **SupportsTriggerAvailableNow mixin**: Enables deterministic batch processing

Example: Blockchain Streaming with Admission Control
Contributor

We probably don't need to duplicate this - the doc content is probably a good fit for the classdoc of the example class, or just a block comment in the example file. With an IDE it's easier to open the example code and its implementation side by side. With AI, I assume either works (we expect the examples in the Spark repo to be fed as context), so it doesn't really matter where to put it.

Contributor Author

I have made minimal changes to the RST file, but if you want, I can just get rid of all changes in the RST file. I can just write an example.

- reportLatestOffset() for progress monitoring
- Comparing behavior between default trigger and Trigger.AvailableNow

The example runs TWO streaming queries:
Contributor

Maybe it's easier to go with a parameter to define which trigger to use, rather than running two queries?

Contributor

It's even OK to split out files for the data source implementation and the runners. It isn't strictly necessary to be in a single file.

Contributor

@HeartSaVioR HeartSaVioR left a comment

The change for the example looks OK overall - some comments. The doc change may need some refinement against the existing content.


return "blockchain_example"

def schema(self) -> str:
return "block_number INT, block_hash STRING, " "timestamp LONG, transaction_count INT"
Contributor

nit: any reason to split this if it fits on a single line?

Contributor Author

Fixed


df = spark.readStream.format("blockchain_example").load()

blocks_processed: List[Dict[str, Any]] = []
Contributor

Maybe we could just track the summary/stats? I understand no one will run this long enough to hit a memory issue, but just to remove any such concern.

Contributor

I see we are limiting the duration, so this should be OK.

Contributor Author

Removed the result accumulation entirely - just using console sink now.


try:
# Run for a few batches
time.sleep(3)
Contributor

Just 3 seconds? How many batches would be produced? Maybe 10 seconds or even 30 seconds?

Contributor Author

Bumped to 30 seconds.

@jiteshsoni
Contributor Author

jiteshsoni commented Apr 4, 2026

✅ Testing Verification - Simplified Admission Control Example

Tested the updated example on Databricks (DBR 17.3+ / Spark 4.0+). The admission control is working correctly.

Link to the notebook

(screenshot: streaming query statistics)

Test Configuration

  • CHAIN_HEIGHT: 10,000 blocks
  • Batch size: 20 blocks per batch (via getDefaultReadLimit())
  • Run duration: ~30 seconds

Results

  • Completed batches: 79 batches in 28 seconds
  • Input rows per batch: Consistently 20 rows (admission control working)
  • Processing rate: ~60 records/second
  • Batch duration: ~342ms average

Key Evidence from Streaming UI

{
  "id" : "9a6de7cc-941a-44f9-9a68-802eb6980bb1",
  "runId" : "34ad9a8f-061d-4cc9-a348-ca18b1f7d59d",
  "name" : "admission_control_test",
  "timestamp" : "2026-04-04T00:57:33.342Z",
  "batchId" : 92,
  "batchDuration" : 241,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 69.0,
  "processedRowsPerSecond" : 83.0,
  "durationMs" : {
    "addBatch" : 108,
    "collectSourceMetrics" : 0,
    "commitOffsets" : 56,
    "getBatch" : 0,
    "latestOffset" : 1,
    "queryPlanning" : 9,
    "triggerExecution" : 241,
    "walCommit" : 126
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "org.apache.spark.sql.execution.datasources.v2.python.PythonMicroBatchStreamWithAdmissionControl@662a64dd",
    "startOffset" : {
      "block_number" : 1840
    },
    "endOffset" : {
      "block_number" : 1860
    },
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 69.0,
    "processedRowsPerSecond" : 83.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@5970c47a",
    "numOutputRows" : 20
  }
}

Verified Functionality

  • getDefaultReadLimit() returns ReadMaxRows(20) - limits batch size correctly
  • latestOffset(start, limit) respects the ReadLimit parameter
  • ✅ Each micro-batch processes exactly 20 blocks as expected
  • ✅ Streaming query runs continuously with controlled ingestion rate

jiteshsoni and others added 7 commits April 3, 2026 17:41
Changes based on HeartSaVioR's review:
- Remove Trigger.AvailableNow and SupportsTriggerAvailableNow (focus on admission control only)
- Remove parallelism references (basic functionality, not admission control specific)
- Delete standalone streaming_admission_control.rst (duplicates existing docs)
- Add admission control section to python_data_source.rst instead
- Simplify example to single streaming query demonstrating getDefaultReadLimit()
- Increase sleep time from 3s to 10s per reviewer suggestion
- Fix schema string formatting (single line)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- CHAIN_HEIGHT: 100 -> 10000 blocks
- time.sleep: 10 -> 30 seconds

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Remove foreachBatch, use format("console") instead
- Match the tested notebook configuration exactly
- Remove unused List import

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…ted notebook

This updates the example and tutorial to match the manually validated ReadMaxRows(20)
flow and clarifies that full batches process 20 rows while the final batch may be
smaller.
…on control changes

This keeps the admission control documentation aligned with the tested example
while reverting unrelated streaming reader edits so the doc diff stays focused.
… for protobuf codegen check

The upstream apache/spark gen-protos.sh script was updated to use ruff
instead of black for formatting, but the CI workflow was not updated to
install ruff. This caused the "Python CodeGen check" job to fail with
"ruff: command not found".

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…of black for protobuf codegen check"

This reverts commit 649023d.
Contributor

@HeartSaVioR HeartSaVioR left a comment

Almost there, thanks for your work on this and especially your patience! My bad for not looking into this in a timely manner.

I found this PR is now scoped to admission control only. Would you mind helping us get Trigger.AvailableNow documented as well?

Also, Claude found some "existing" issue on docstring and example, as below:

  1. Pre-existing docstring bug (not this PR's fault): The latestOffset docstring in the base class (line 759 of datasource.py) uses limit.maxRows but the actual ReadMaxRows dataclass field is max_rows. The PR correctly uses max_rows. We might want to file a separate fix for the docstring, but that's out of scope.

  2. The existing tutorial FakeStreamReader uses the old latestOffset() signature with no parameters (line 241 of python_data_source.rst), while this PR correctly uses the new latestOffset(self, start, limit) signature. This inconsistency already exists in the codebase and is a separate issue. The PR's usage is the recommended approach.

Would you mind helping fixing this in a separate PR?

Thanks again!
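The docstring discrepancy in item 1 above is easy to trip over. A tiny self-contained check (using a stand-in dataclass matching the discussion, not the real pyspark class) shows why only the snake_case spelling works:

```python
from dataclasses import dataclass


# Stand-in matching the discussion: the ReadMaxRows dataclass field is
# snake_case max_rows, while the base-class docstring says limit.maxRows.
@dataclass
class ReadMaxRows:
    max_rows: int


limit = ReadMaxRows(20)
assert limit.max_rows == 20            # correct spelling, as this PR uses
assert not hasattr(limit, "maxRows")   # the docstring's spelling doesn't exist
```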

class MyStreamReader(DataSourceStreamReader):

    def getDefaultReadLimit(self) -> ReadLimit:
        """Limit each micro-batch to at most 20 rows."""
Contributor

Probably good to mention that this is just an example, and in practice they'd want to set this up based on the given source options. We probably don't need to demonstrate the enclosing code as well, since it'd be much more verbose.
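One way to follow this suggestion would be to derive the limit from a source option instead of hard-coding it. A minimal sketch, where `maxRowsPerBatch` is a hypothetical option name (not something the PR defines) and `ReadMaxRows` is a stand-in dataclass for the pyspark class:

```python
from dataclasses import dataclass


@dataclass
class ReadMaxRows:  # stand-in for pyspark.sql.datasource.ReadMaxRows
    max_rows: int


def read_limit_from_options(options: dict) -> ReadMaxRows:
    # "maxRowsPerBatch" is a hypothetical option name used for
    # illustration; fall back to 20 rows when it isn't set.
    return ReadMaxRows(int(options.get("maxRowsPerBatch", 20)))
```

A getDefaultReadLimit() implemented this way would let users tune the batch size via `.option("maxRowsPerBatch", "100")` at query time.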

    return {"offset": end}

When Spark uses the default ``ReadMaxRows(20)`` limit, each full micro-batch
reads 20 rows, and the final micro-batch reads only the remaining rows when
Contributor

We can remove the part ", and the final micro-batch reads only the remaining rows when ..." since conceptually there is no final micro-batch.

Also, maybe we could say "reads at most 20 rows, depending on the available rows". The current wording might give the impression that the data source should return exactly 20 rows.

elif isinstance(limit, ReadAllAvailable):
    end_block = self.CHAIN_HEIGHT
else:
    end_block = min(start_block + 20, self.CHAIN_HEIGHT)
Contributor

nit: we can throw an exception - the engine shouldn't provide such a limit parameter. It should be either the one specified in the default read limit, or ReadAllAvailable.

- Add docstring note that ReadMaxRows(20) is just an example; in practice
  configure the limit based on source options
- Reword explanatory text: "reads at most 20 rows, depending on the
  available rows" instead of implying a final micro-batch
- Replace silent fallback else branch with ValueError in both the tutorial
  snippet and the example file, since the engine should only provide the
  default read limit or ReadAllAvailable
Contributor

@HeartSaVioR HeartSaVioR left a comment

+1

@HeartSaVioR
Contributor

https://github.com/jiteshsoni/spark/runs/70175734795

Only failed in protobuf breaking change CI which is unrelated.

@HeartSaVioR
Contributor

Thanks! Merging to master.
