
Integrate prefetcher engine with zonal buckets #805

Draft
googlyrahman wants to merge 1 commit into fsspec:main from ankitaluthra1:zonal
Conversation

@googlyrahman (Collaborator) commented Apr 13, 2026

This PR integrates the prefetcher engine with Zonal Buckets to improve read throughput and introduces concurrency support for fetching data. To maximize performance and avoid Python's memory and concurrency bottlenecks, this PR also introduces a highly optimized DirectMemmoveBuffer and an MRDPool for managing connection states.

Zero-Copy, GIL-Free Memory Management (DirectMemmoveBuffer)

Standard Python buffers (such as bytearray or io.BytesIO) were GIL-bound in my experiments and require multiple memory copies. For example, assembling fifty 2 MB chunks into a 100 MB object via a bytearray forces a completely separate 100 MB copy when the final bytes object is retrieved, because the bytearray the data was written into is mutable and must be converted. Furthermore, these standard I/O objects hold the Global Interpreter Lock heavily (e.g., holding the GIL for 20 s during 60 s of high I/O).

To solve this, we introduced DirectMemmoveBuffer. It bypasses these limitations by using ctypes.memmove to write data directly into pre-allocated memory off the main thread, avoiding GIL contention and eliminating the final copy overhead.
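The core idea can be sketched as follows. This is a minimal illustration, not the PR's actual implementation: the class name matches, but the internals (create_string_buffer, the executor, the method names) are assumptions for the sketch.

```python
import ctypes
from concurrent.futures import ThreadPoolExecutor


class DirectMemmoveBuffer:
    """Sketch: pre-allocate the final buffer once and fill it in place."""

    def __init__(self, length: int, max_workers: int = 4):
        # Allocate the destination up front; chunks are written directly
        # into it, so no assembly copy is needed at the end.
        self._result = ctypes.create_string_buffer(length)
        self._base = ctypes.addressof(self._result)
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    def write_at(self, offset: int, data: bytes):
        # ctypes foreign calls release the GIL around the raw C copy,
        # so writer threads do not serialize on the interpreter lock.
        return self._executor.submit(
            ctypes.memmove, self._base + offset, data, len(data)
        )

    def getvalue(self) -> bytes:
        # Wait for all pending writes, then hand back the assembled bytes.
        self._executor.shutdown(wait=True)
        return self._result.raw
```

Each write lands at its final offset, so retrieving the result is a single `.raw` read rather than a second 100 MB copy.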

Concurrency Support for Zonal Buckets

  • Implemented _concurrent_mrd_fetch: a new method to download multiple data chunks concurrently.
  • Refactored _fetch_range_split: upgraded this method (used by the readahead_chunked cache) to fully support concurrent chunk fetching.
  • Refactored _cat_file: updated to support concurrency out of the box.
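The splitting-and-gathering pattern these methods share can be sketched like this. `fetch_chunk` is a hypothetical coroutine standing in for a single MRD range read; the function name and chunk size are illustrative, not the PR's code.

```python
import asyncio

CHUNK_SIZE = 2 * 1024 * 1024  # illustrative chunk size


async def concurrent_range_fetch(fetch_chunk, start: int, end: int) -> bytes:
    """Split [start, end) into fixed-size chunks and download them in parallel.

    asyncio.gather preserves argument order, so the parts can simply be
    joined to reassemble the requested range.
    """
    ranges = [
        (off, min(CHUNK_SIZE, end - off))
        for off in range(start, end, CHUNK_SIZE)
    ]
    parts = await asyncio.gather(*(fetch_chunk(o, n) for o, n in ranges))
    return b"".join(parts)
```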

Connection Pooling (MRDPool)

Introduced MRDPool to manage a pool of AsyncMultiRangeDownloader (MRD) instances. This pool enables safe and efficient concurrent fetching by reusing connections and scaling downloaders on demand.
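A minimal sketch of such a pool is below. The class name matches the PR, but the internals are assumptions: `make_mrd` is a hypothetical async factory standing in for AsyncMultiRangeDownloader construction, and the queue-based scaling logic is illustrative.

```python
import asyncio


class MRDPool:
    """Sketch: reuse downloader instances across concurrent fetches."""

    def __init__(self, make_mrd, max_size: int = 8):
        self._make_mrd = make_mrd
        self._idle: asyncio.Queue = asyncio.Queue()
        self._created = 0
        self._max_size = max_size

    async def acquire(self):
        # Prefer an idle downloader; otherwise grow on demand up to
        # max_size; otherwise wait for one to be released.
        if not self._idle.empty():
            return self._idle.get_nowait()
        if self._created < self._max_size:
            self._created += 1
            return await self._make_mrd()
        return await self._idle.get()

    def release(self, mrd):
        self._idle.put_nowait(mrd)
```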

Prefetcher Engine Integration & Heuristic Tuning

  • Zonal Bucket Integration: Fully integrated the prefetcher engine into the zonal file read path.

  • Tuned prefetching trigger (MIN_STREAKS_FOR_PREFETCHING): changed the prefetching heuristic to start on the 3rd sequential read (>= 3) instead of the 2nd (>= 2). Why? The readahead_chunked cache typically issues two requests to the prefetch engine simultaneously (the required chunk and its readahead chunk). Previously, this immediately and falsely signaled sequential reads to the prefetcher, triggering aggressive prefetching too early. Raising the threshold to 3 prevents these false positives.
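The streak heuristic can be sketched as a counter over contiguous offsets. The constant name matches the PR; the class and method names are illustrative assumptions.

```python
MIN_STREAKS_FOR_PREFETCHING = 3  # value chosen in this PR


class SequentialReadDetector:
    """Sketch of the streak heuristic.

    readahead_chunked issues the required chunk and its readahead chunk
    together, so the first two offsets always look sequential; requiring
    a third contiguous read filters out that built-in pair.
    """

    def __init__(self):
        self._streak = 0
        self._next_expected = None

    def record(self, offset: int, length: int) -> bool:
        # A read is "sequential" if it starts where the last one ended.
        if offset == self._next_expected:
            self._streak += 1
        else:
            self._streak = 1
        self._next_expected = offset + length
        return self._streak >= MIN_STREAKS_FOR_PREFETCHING
```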

@googlyrahman googlyrahman force-pushed the zonal branch 5 times, most recently from 2469216 to 675b731 Compare April 14, 2026 03:29

codecov bot commented Apr 14, 2026

Codecov Report

❌ Patch coverage is 98.88889% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.41%. Comparing base (5596df9) to head (6f6600d).

Files with missing lines Patch % Lines
gcsfs/zb_hns_utils.py 97.70% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #805      +/-   ##
==========================================
+ Coverage   79.55%   81.41%   +1.86%     
==========================================
  Files          16       16              
  Lines        3042     3261     +219     
==========================================
+ Hits         2420     2655     +235     
+ Misses        622      606      -16     


Comment thread gcsfs/extended_gcsfs.py
```python
result_bytes = PyBytes_FromStringAndSize(None, length)
buffer_ptr = PyBytes_AsString(result_bytes)

part_size = length // concurrency
```
Collaborator:
When length is less than concurrency, part_size becomes 0, which triggers an unnecessary _download operation with a size of 0; the MRD treats a 0-byte read as EOF.

Member:

The part size should never be too small anyway, because at that point you're just adding overhead. So we need a reasonable minimum size here.
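One way to implement the suggested floor is below. This is a sketch only: the function name and MIN_PART_SIZE value are illustrative assumptions, and the real minimum is a tuning choice.

```python
MIN_PART_SIZE = 256 * 1024  # illustrative floor, not the PR's value


def plan_parts(length: int, concurrency: int) -> list[tuple[int, int]]:
    """Split `length` bytes into (offset, size) parts for concurrent fetch.

    Clamping part_size to MIN_PART_SIZE avoids both the 0-byte part
    (which the MRD treats as EOF) and tiny parts whose per-request
    overhead outweighs the parallelism.
    """
    part_size = max(length // concurrency, MIN_PART_SIZE)
    parts = []
    offset = 0
    while offset < length:
        size = min(part_size, length - offset)
        parts.append((offset, size))
        offset += size
    return parts
```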

Comment thread gcsfs/zb_hns_utils.py
```python
if self._pending_count == 0:
    self._done_event.clear()
self._pending_count += 1
return self.executor.submit(self._do_memmove, dest, data_bytes, size)
```
Collaborator:

If the submit raises an error, _do_memmove will never execute, so the pending count is never decremented and the close operation hangs indefinitely.
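One way to address this is to roll back the bookkeeping when submit fails. The sketch below is self-contained and uses illustrative names around the PR's counter/event fields; the in-memory copy stands in for ctypes.memmove, and the counter updates are unsynchronized here just as in the quoted snippet.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SafeMemmoveSubmitter:
    """Sketch: undo the pending counter if executor.submit raises."""

    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=2)
        self._pending_count = 0
        self._done_event = threading.Event()
        self._done_event.set()

    def _do_memmove(self, dest, data_bytes, size):
        try:
            dest[:size] = data_bytes[:size]  # stand-in for ctypes.memmove
        finally:
            self._pending_count -= 1
            if self._pending_count == 0:
                self._done_event.set()

    def submit(self, dest, data_bytes, size):
        if self._pending_count == 0:
            self._done_event.clear()
        self._pending_count += 1
        try:
            return self.executor.submit(self._do_memmove, dest, data_bytes, size)
        except Exception:
            # submit never scheduled _do_memmove, so undo the bookkeeping
            # here; otherwise close() would wait on the event forever.
            self._pending_count -= 1
            if self._pending_count == 0:
                self._done_event.set()
            raise

    def close(self, timeout=None):
        self._done_event.wait(timeout)
        self.executor.shutdown()
```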

Comment thread gcsfs/extended_gcsfs.py
```python
self._memmove_executor = ThreadPoolExecutor(
    max_workers=kwargs.get("memmove_max_workers", 8)
)
weakref.finalize(self, self._memmove_executor.shutdown)
```
Collaborator:

Does the _Cached metaclass hold a strong reference to the GCSFileSystem? If so, weakref.finalize will never fire, correct?

Member:

It is a strong reference by default, weak by option.
This will still run, but only at interpreter shutdown.
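This behavior can be demonstrated with a small standalone script. All names here are illustrative: `_cache` stands in for the _Cached metaclass instance cache, and the assumption is CPython's reference-counting collection.

```python
import weakref

class Resource:
    pass

_cache = {}   # stands in for the _Cached metaclass instance cache
closed = []   # records when the finalizer actually runs

def make():
    r = Resource()
    _cache["key"] = r                          # strong reference keeps r alive
    weakref.finalize(r, closed.append, "shutdown")
    return r

r = make()
del r
# The cache still holds the object, so the finalizer has not fired:
assert closed == []
# Dropping the cached strong reference lets it run immediately (CPython):
del _cache["key"]
assert closed == ["shutdown"]
```

With the cache in place, the finalizer only runs when the interpreter exits (weakref.finalize callbacks are invoked at shutdown if still pending), which is the point made above.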

@zhixiangli (Collaborator):

/gcbrun
