[SPARK-55450][SS][PYTHON][DOCS] Document admission control in PySpark streaming data sources #54807
jiteshsoni wants to merge 11 commits into apache:master
Conversation
Thanks for your contribution! Could you make sure you have run the examples on your own? If you already did that, you can update the PR description accordingly. Also, we are asked to put the "model name" if you use an LLM to generate the code, not the tool name. That is described in the PR template, including the specific form of the string. Please update it.
Force-pushed a2988f9 to ec89c64
✅ Testing Verification - Examples Manually Tested on Databricks

I've successfully tested both examples on Databricks Dogfood Staging. Screenshots attached below showing the streaming query statistics.

Example 1: Continuous Processing
Query Name:
Key Observations:

Example 2: Trigger.AvailableNow - Finite Processing
Query Name:
Key Observations:

Test Environment

Verified Functionality
✅ Admission Control: Screenshots below show the Streaming Query Statistics from both runs.

This comment was generated with GitHub MCP.
Thanks for the feedback! I've addressed both items: #54807 (comment) (screenshot attached) ✅ Testing verification: Updated the PR description to confirm the examples were manually tested on Databricks. I've also added a detailed comment below with screenshots.
…r.AvailableNow in PySpark streaming data sources

This patch adds comprehensive documentation and examples for the new admission control and Trigger.AvailableNow features in Python streaming data sources (added in SPARK-55304).

Changes:
- New tutorial: streaming_admission_control.rst with step-by-step guide
- Example: structured_blockchain_admission_control.py demonstrating:
  - Admission control via getDefaultReadLimit() and latestOffset()
  - Parallel partitioning (50 batches × 4 partitions = 200 tasks)
  - Trigger.AvailableNow for finite processing
  - SupportsTriggerAvailableNow mixin implementation

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
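The commit message above mentions splitting each micro-batch across 4 partitions. As a rough, standalone sketch of that idea (stand-in logic only — the real example would return `pyspark.sql.datasource.InputPartition` objects from `partitions()`):

```python
# Hypothetical sketch: split an offset range into evenly sized partitions,
# mirroring the "4 partitions per batch" idea from the example.

def plan_partitions(start: int, end: int, num_partitions: int):
    """Split [start, end) into num_partitions contiguous sub-ranges."""
    total = end - start
    base, extra = divmod(total, num_partitions)
    ranges = []
    cursor = start
    for i in range(num_partitions):
        size = base + (1 if i < extra else 0)  # spread the remainder evenly
        ranges.append((cursor, cursor + size))
        cursor += size
    return ranges

# A 20-row micro-batch split across 4 partitions -> 5 rows per partition.
parts = plan_partitions(0, 20, 4)
```

With 50 such batches this yields the 50 × 4 = 200 tasks the commit refers to.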
Force-pushed 67a457f to dd0f52f
| under the License.
|
| ===========================================
| Streaming Admission Control and Parallelism
I guess this is more than admission control? Parallelism should have been handled in basic functionality.
If we think the content of streaming is too large to just be a part of python_data_source, we can extract into python_streaming_data_source and combine the content in python_data_source and python_streaming_data_source into the new page. We'd want to deduplicate the content during the merge.
Kept it in one file, covering only admission control.
HeartSaVioR left a comment
Sorry for the delay. I'm still reviewing the example code. For the doc change, we'd probably be better off rechecking whether we are repetitive with the existing doc. I'm OK with separating streaming out from the existing doc if combining the admission control and Trigger.AvailableNow parts would warrant a separate doc (lengthy).
| - **Controlling data ingestion rate**: Prevent overwhelming the system with too much data at once
| - **Backpressure management**: Handle incoming data at a sustainable rate
| - **Parallel processing**: Distribute work across multiple executors efficiently |
Again, if we want to describe the normal streaming source and admission control separately, this and the content below should go to the normal streaming source section.
| Additional Functions for Trigger.AvailableNow
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| To support batching with ``Trigger.AvailableNow``, implement these additional functions: |
SupportsTriggerAvailableNow is not a function; if we had already documented Trigger.AvailableNow, we could probably omit it.
Also, getDefaultReadLimit() is for admission control, not for Trigger.AvailableNow. I'd move this, but mention that it's optional and defaults to reading all rows.
Removed all the Trigger.AvailableNow stuff. The example and docs now focus purely on admission control with getDefaultReadLimit() and latestOffset(start, limit).
| - **prepareForTriggerAvailableNow()**: Captures the target offset at query start
| - **SupportsTriggerAvailableNow mixin**: Enables deterministic batch processing | ||
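To make the `prepareForTriggerAvailableNow()` flow above concrete, here is a minimal sketch. The method names follow `pyspark.sql.datasource`, but the class below is a pure-Python stand-in (no Spark session), and the 20-row batch size and chain-height source are illustrative assumptions:

```python
# Stand-in sketch of the Trigger.AvailableNow pattern: capture the target
# offset once at query start, then batch up to it and stop.

class AvailableNowStreamReader:
    """Reader that caps processing at the offset seen when the query starts."""

    def __init__(self, chain_height: int):
        self.chain_height = chain_height  # latest data available in the source
        self.target = None                # fixed at query start by AvailableNow

    def prepareForTriggerAvailableNow(self) -> None:
        # Capture the target offset once; later batches never go past it.
        self.target = self.chain_height

    def latestOffset(self, start: dict, limit) -> dict:
        # Advance at most 20 rows per micro-batch, never past the target.
        end = min(start["offset"] + 20, self.target)
        return {"offset": end}

reader = AvailableNowStreamReader(chain_height=50)
reader.prepareForTriggerAvailableNow()

offsets = []
offset = {"offset": 0}
while offset["offset"] < reader.target:
    offset = reader.latestOffset(offset, None)
    offsets.append(offset["offset"])
```

The query processes two full 20-row batches plus one 10-row remainder, then terminates — the deterministic finite behavior the mixin enables.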
| Example: Blockchain Streaming with Admission Control |
We probably don't need to duplicate this - the doc is probably good as the classdoc of the example class, or just a wall of text in a comment at the top of the example file. With an IDE it's easier to open the example code and the implementation side by side. With AI, I assume either works (we expect the examples in the Spark repo to be fed as context), so it doesn't really matter where to put it.
I have made minimal changes to the RST file, but if you want, I can drop all the RST changes and just write an example.
| - reportLatestOffset() for progress monitoring
| - Comparing behavior between default trigger and Trigger.AvailableNow | ||
| The example runs TWO streaming queries: |
Maybe it's easier to go with a param to define which trigger to use, rather than running two queries?
It's even OK if we split out files for the data source implementation and the runners. It isn't strictly necessary to be in a single file.
HeartSaVioR left a comment
The change for the example looks OK overall - some comments. The doc change may need some refinement against the existing content.
| return "blockchain_example"
|
| def schema(self) -> str:
| return "block_number INT, block_hash STRING, " "timestamp LONG, transaction_count INT" |
nit: any reason to split this if it fits on a single line?
| df = spark.readStream.format("blockchain_example").load()
|
| blocks_processed: List[Dict[str, Any]] = [] |
Maybe we could just track the summary/stats? I understand no one will run this long enough to hit a memory issue, but just to remove such a concern.
I see we are limiting the duration, so this would be OK.
Removed the result accumulation entirely - just using console sink now.
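The reviewer's suggestion above (track summary stats rather than every row) can be sketched without Spark. The callback shape loosely mimics `foreachBatch`'s `(batch, batch_id)` signature, but the batch is stood in by a plain row count; all names here are illustrative, not the example's actual code:

```python
# Sketch: bounded summary stats instead of appending every row to a list.
# Only counters grow, so memory stays O(1) no matter how long the query runs.

stats = {"batches": 0, "rows": 0, "max_batch": 0}

def track_batch(num_rows: int, batch_id: int) -> None:
    """foreachBatch-style callback that records only aggregates."""
    stats["batches"] += 1
    stats["rows"] += num_rows
    stats["max_batch"] = max(stats["max_batch"], num_rows)

# e.g. two full 20-row batches plus a 13-row remainder
for batch_id, n in enumerate([20, 20, 13]):
    track_batch(n, batch_id)
```

The merged example sidesteps the question entirely by writing to the console sink, which keeps nothing in driver memory.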
| try:
| # Run for a few batches | ||
| time.sleep(3) |
Just 3 seconds? How many batches would be produced? Maybe 10 seconds or even 30 seconds?
Bumped to 30 seconds.
Changes based on HeartSaVioR's review:
- Remove Trigger.AvailableNow and SupportsTriggerAvailableNow (focus on admission control only)
- Remove parallelism references (basic functionality, not admission control specific)
- Delete standalone streaming_admission_control.rst (duplicates existing docs)
- Add admission control section to python_data_source.rst instead
- Simplify example to single streaming query demonstrating getDefaultReadLimit()
- Increase sleep time from 3s to 10s per reviewer suggestion
- Fix schema string formatting (single line)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- CHAIN_HEIGHT: 100 -> 10000 blocks
- time.sleep: 10 -> 30 seconds

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Remove foreachBatch, use format("console") instead
- Match the tested notebook configuration exactly
- Remove unused List import
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…ted notebook This updates the example and tutorial to match the manually validated ReadMaxRows(20) flow and clarifies that full batches process 20 rows while the final batch may be smaller.
…on control changes This keeps the admission control documentation aligned with the tested example while reverting unrelated streaming reader edits so the doc diff stays focused.
… for protobuf codegen check The upstream apache/spark gen-protos.sh script was updated to use ruff instead of black for formatting, but the CI workflow was not updated to install ruff. This caused the "Python CodeGen check" job to fail with "ruff: command not found". Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…of black for protobuf codegen check" This reverts commit 649023d.
HeartSaVioR left a comment
Almost there, thanks for your work on this and especially your patience! My bad for not looking into this in a timely manner.
This PR is now scoped to admission control only. Would you mind helping us get Trigger.AvailableNow documented as well?
Also, Claude found some "existing" issue on docstring and example, as below:
Pre-existing docstring bug (not this PR's fault): The latestOffset docstring in the base class (line 759 of datasource.py) uses limit.maxRows but the actual ReadMaxRows dataclass field is max_rows. The PR correctly uses max_rows. We might want to file a separate fix for the docstring, but that's out of scope.
The existing tutorial FakeStreamReader uses the old latestOffset() signature with no parameters (line 241 of python_data_source.rst), while this PR correctly uses the new latestOffset(self, start, limit) signature. This inconsistency already exists in the codebase and is a separate issue. The PR's usage is the recommended approach.
Would you mind helping fixing this in a separate PR?
Thanks again!
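The docstring bug mentioned above (`limit.maxRows` vs. the actual field `max_rows`) is easy to demonstrate with a stand-in dataclass. The class below mirrors the shape of `pyspark.sql.datasource.ReadMaxRows` for illustration; it is not the real import:

```python
from dataclasses import dataclass

# Stand-in mirroring pyspark's ReadMaxRows: the dataclass field is the
# snake_case max_rows, so the camelCase maxRows from the old docstring
# simply does not exist as an attribute.

@dataclass
class ReadMaxRows:
    max_rows: int

limit = ReadMaxRows(max_rows=20)
cap = limit.max_rows                    # correct spelling, as this PR uses
has_camel = hasattr(limit, "maxRows")   # the docstring's spelling -> absent
```

Using `limit.maxRows` as the docstring suggests would raise `AttributeError` at runtime, which is why the PR's `max_rows` usage is the one to follow.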
| class MyStreamReader(DataSourceStreamReader):
|
|     def getDefaultReadLimit(self) -> ReadLimit:
|         """Limit each micro-batch to at most 20 rows."""
Probably good to mention that this is just an example, and in practice users would want to set this up based on the given source options. We probably don't need to demonstrate the enclosing code as well, since it'd be much more verbose.
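The reviewer's point — derive the limit from source options instead of hard-coding 20 — could look like the sketch below. The option name `maxRowsPerBatch` and the stand-in `ReadMaxRows` dataclass are both illustrative assumptions, not the documented API:

```python
from dataclasses import dataclass

# Stand-in for pyspark's ReadMaxRows, used to sketch an options-driven
# default read limit rather than a hard-coded constant.

@dataclass
class ReadMaxRows:
    max_rows: int

def default_read_limit(options: dict) -> ReadMaxRows:
    """Build the read limit from source options, falling back to 20 rows."""
    return ReadMaxRows(max_rows=int(options.get("maxRowsPerBatch", 20)))

configured = default_read_limit({"maxRowsPerBatch": "100"})  # user override
fallback = default_read_limit({})                            # documented default
```

In a real reader, `options` would be the dict passed to the `DataSource` constructor, so users can tune the batch size per query without editing the source.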
| return {"offset": end}
|
| When Spark uses the default ``ReadMaxRows(20)`` limit, each full micro-batch
| reads 20 rows, and the final micro-batch reads only the remaining rows when
We can remove the part ", and the final micro-batch reads only the remaining rows when..." since conceptually there is no final micro-batch.
Also, maybe we could say "reads at most 20 rows, depending on the available rows". Otherwise it might give the impression that the data source must return exactly 20 rows.
| elif isinstance(limit, ReadAllAvailable):
|     end_block = self.CHAIN_HEIGHT
| else:
|     end_block = min(start_block + 20, self.CHAIN_HEIGHT)
nit: we can throw an exception here - the engine shouldn't provide such a limit parameter. It should be either the one specified in the default read limit, or ReadAllAvailable.
- Add docstring note that ReadMaxRows(20) is just an example; in practice configure the limit based on source options - Reword explanatory text: "reads at most 20 rows, depending on the available rows" instead of implying a final micro-batch - Replace silent fallback else branch with ValueError in both the tutorial snippet and the example file, since the engine should only provide the default read limit or ReadAllAvailable
https://github.com/jiteshsoni/spark/runs/70175734795 Only the protobuf breaking-change CI check failed, which is unrelated.
Thanks! Merging to master.



What changes were proposed in this pull request?
This PR adds documentation and an example for admission control in PySpark custom streaming data sources (SPARK-55304).
Changes include:
Updated tutorial documentation (python/docs/source/tutorial/sql/python_data_source.rst):
- getDefaultReadLimit() returning ReadMaxRows(n) to limit batch size
- latestOffset(start, limit) respects the ReadLimit parameter

Example file (examples/src/main/python/sql/streaming/structured_blockchain_admission_control.py):
- Demonstrates getDefaultReadLimit() and latestOffset()

Why are the changes needed?
Users need documentation and practical examples to implement admission control in custom streaming sources (introduced in SPARK-55304).
Does this PR introduce any user-facing change?
No. Documentation and examples only.
How was this patch tested?
Testing approach:
Test notebook: pr_54807_admission_control_notebook
What was verified:
- PythonMicroBatchStreamWithAdmissionControl active in the Streaming UI

Sample batch output:
{
  "batchId": 78,
  "numInputRows": 20,
  "sources": [{
    "description": "PythonMicroBatchStreamWithAdmissionControl",
    "startOffset": {"block_number": 1560},
    "endOffset": {"block_number": 1580},
    "numInputRows": 20
  }]
}
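The batch output above is internally consistent: with `ReadMaxRows(20)`, each source's `numInputRows` should equal `endOffset - startOffset`. A quick sanity check on that invariant (using the same JSON as the sample):

```python
import json

# Verify the admission-control invariant in the reported progress:
# rows read per source == endOffset - startOffset == the 20-row limit.

progress = json.loads("""
{
  "batchId": 78,
  "numInputRows": 20,
  "sources": [{
    "description": "PythonMicroBatchStreamWithAdmissionControl",
    "startOffset": {"block_number": 1560},
    "endOffset": {"block_number": 1580},
    "numInputRows": 20
  }]
}
""")

src = progress["sources"][0]
rows = src["endOffset"]["block_number"] - src["startOffset"]["block_number"]
```

In a real run, the same check could be applied to `query.lastProgress` to confirm the read limit is being honored batch after batch.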
Yes (Claude Opus 4.5)
🤖 Generated with Claude Code