Integrate prefetcher engine with zonal buckets #805
googlyrahman wants to merge 1 commit into fsspec:main
Conversation
Force-pushed from 2469216 to 675b731
Codecov Report
❌ Patch coverage is
Additional details and impacted files

@@            Coverage Diff             @@
##             main     #805      +/-   ##
==========================================
+ Coverage   79.55%   81.41%   +1.86%
==========================================
  Files          16       16
  Lines        3042     3261     +219
==========================================
+ Hits         2420     2655     +235
+ Misses        622      606      -16
result_bytes = PyBytes_FromStringAndSize(None, length)
buffer_ptr = PyBytes_AsString(result_bytes)

part_size = length // concurrency
When the length is less than the concurrency, the part_size becomes 0, which triggers an unnecessary _download operation with a size of 0. The MRD treats reading 0 bytes as the EOF.
The part size should never be too small anyway, because at that point you're just adding overhead. So we need a reasonable minimum size here.
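A hedged sketch of the reviewers' suggestion (`plan_parts`, `MIN_PART_SIZE`, and the 1 MiB floor are hypothetical names and values, not from the PR): cap the number of parts so that no part is zero-sized or below a minimum, and skip the zero-length case entirely so the MRD never sees a 0-byte read.

```python
import math

MIN_PART_SIZE = 1 * 1024 * 1024  # hypothetical 1 MiB floor

def plan_parts(length: int, concurrency: int, min_part_size: int = MIN_PART_SIZE):
    """Split `length` bytes into at most `concurrency` (start, size) parts,
    never producing an empty or too-small part."""
    if length <= 0:
        return []  # nothing to download; avoids a 0-byte read (EOF in the MRD)
    # Prefer fewer workers over parts smaller than the floor.
    n_parts = max(1, min(concurrency, math.ceil(length / min_part_size)))
    part_size = math.ceil(length / n_parts)
    return [(start, min(part_size, length - start))
            for start in range(0, length, part_size)]
```

With `length < concurrency` this collapses to a single part instead of many zero-sized ones.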
if self._pending_count == 0:
    self._done_event.clear()
self._pending_count += 1
return self.executor.submit(self._do_memmove, dest, data_bytes, size)
If the submit raises an error, _do_memmove will not execute, causing the close operation to hang indefinitely.
self._memmove_executor = ThreadPoolExecutor(
    max_workers=kwargs.get("memmove_max_workers", 8)
)
weakref.finalize(self, self._memmove_executor.shutdown)
Does the _Cached metaclass hold a strong reference to the GCSFSSystem? If so, weakref.finalize will never fire, correct?
It is a strong reference by default, weak by option.
This will still run, but only at interpreter shutdown.
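A small demonstration of the mechanism under discussion: `weakref.finalize` fires as soon as the last strong reference is dropped, and because it also registers itself with `atexit` (by default), a finalizer on an object kept alive by a cache still runs at interpreter shutdown. The `Resource` class here is illustrative.

```python
import weakref

class Resource:
    """Stand-in for an object owning a thread pool (illustrative)."""

shutdown_calls = []

r = Resource()
fin = weakref.finalize(r, shutdown_calls.append, "shutdown")
assert fin.alive

del r  # last strong reference dropped: the finalizer fires immediately
assert not fin.alive
assert shutdown_calls == ["shutdown"]

# If something (e.g. an instance cache) held `r` strongly instead, the
# finalizer would not fire here, but weakref.finalize is atexit-registered,
# so it would still be called at interpreter shutdown.
```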
/gcbrun
This PR integrates the prefetcher engine with Zonal Buckets to improve read throughput and introduces concurrency support for fetching data. To maximize performance and avoid Python's memory and concurrency bottlenecks, this PR also introduces a highly optimized DirectMemmoveBuffer and an MRDPool for managing connection states.
Zero-Copy, GIL-Free Memory Management (DirectMemmoveBuffer)
Standard Python buffers (like `bytearray` or `io.BytesIO`) were GIL-bound in my experiments and require multiple memory copies. For example, to assemble fifty 2MB chunks into a 100MB object using a `bytearray`, you must create a completely separate 100MB copy when finally retrieving the `bytes` object, because the data was filled into a `bytearray`. Furthermore, these standard I/O objects heavily acquire the Global Interpreter Lock (e.g., holding the GIL for 20s during 60s of high I/O).

To solve this, we introduced `DirectMemmoveBuffer`. It bypasses these limitations by using `ctypes.memmove` to write data directly into pre-allocated memory off the main thread, effectively bypassing the GIL and eliminating the final copy overhead.

Concurrency Support for Zonal Buckets
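Before the concurrency details, a minimal sketch of the zero-copy assembly idea described above (all names are illustrative, not the PR's actual code; note the real implementation avoids even the final `.raw` copy below by allocating the result `bytes` object up front via `PyBytes_FromStringAndSize`, as seen in the reviewed diff):

```python
import ctypes
from concurrent.futures import ThreadPoolExecutor

class MemmoveBufferSketch:
    """Illustrative: pre-allocate the destination once and memmove each
    downloaded chunk into place off the main thread."""

    def __init__(self, total_size: int, max_workers: int = 8):
        # ctypes calls release the GIL, so workers copy concurrently.
        self._buf = ctypes.create_string_buffer(total_size)
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._futures = []

    def write_at(self, offset: int, chunk: bytes):
        dest = ctypes.addressof(self._buf) + offset
        # The executor keeps `chunk` referenced until the copy completes.
        self._futures.append(
            self._pool.submit(ctypes.memmove, dest, chunk, len(chunk))
        )

    def getvalue(self) -> bytes:
        for f in self._futures:
            f.result()          # wait for all pending copies
        self._pool.shutdown()
        return self._buf.raw    # one copy out of the C buffer (sketch only)
```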
- `_concurrent_mrd_fetch`: A new method to handle downloading multiple data chunks concurrently (source).
- `_fetch_range_split`: Upgraded this method (used by the readahead_chunked cache) to fully support concurrent chunk fetching (source).
- `_cat_file`: Updated to support concurrency out of the box (source).

Connection Pooling (MRDPool)
Introduced MRDPool to manage a pool of AsyncMultiRangeDownloader (MRD) instances. This pool enables safe and efficient concurrent fetching by reusing connections and scaling downloaders on demand.
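MRDPool's internals are not shown in this summary; the following is a generic connection-pool pattern matching the description (reuse idle instances, create new ones on demand up to a cap). All names are hypothetical:

```python
import queue
import threading

class ConnectionPoolSketch:
    """Illustrative pool: hand out idle instances first, grow on demand."""

    def __init__(self, factory, max_size: int = 8):
        self._factory = factory          # creates a new downloader/connection
        self._idle = queue.SimpleQueue()
        self._lock = threading.Lock()
        self._created = 0
        self._max_size = max_size

    def acquire(self):
        try:
            return self._idle.get_nowait()   # reuse an idle instance
        except queue.Empty:
            pass
        with self._lock:
            if self._created < self._max_size:
                self._created += 1
                return self._factory()       # scale up on demand
        return self._idle.get()              # at the cap: wait for a release

    def release(self, conn):
        self._idle.put(conn)
```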
Prefetcher Engine Integration & Heuristic Tuning
Zonal Bucket Integration: Fully integrated the prefetcher engine into the zonal file read path.
Tuned Prefetching Trigger (MIN_STREAKS_FOR_PREFETCHING): Modified the prefetching heuristic to start on the 3rd sequential read (>= 3) instead of the 2nd (>= 2) (source). Why? The readahead_chunked cache typically issues two requests to the prefetch engine simultaneously (the required chunk and the readahead chunk). Previously, this pair immediately and falsely signaled to the prefetcher that the user was performing sequential reads, triggering aggressive prefetching too early. Raising the threshold to 3 prevents these false positives.
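The heuristic above can be sketched as a streak counter (a hypothetical simplification; only the MIN_STREAKS_FOR_PREFETCHING = 3 value is taken from this PR):

```python
MIN_STREAKS_FOR_PREFETCHING = 3  # this PR raised the threshold from 2 to 3

class StreakDetectorSketch:
    """Illustrative: only enable prefetching after enough back-to-back
    sequential reads, so a single read+readahead pair cannot trigger it."""

    def __init__(self, min_streaks: int = MIN_STREAKS_FOR_PREFETCHING):
        self._min_streaks = min_streaks
        self._streak = 0
        self._next_expected = None

    def record_read(self, offset: int, size: int) -> bool:
        """Return True once prefetching should kick in."""
        if offset == self._next_expected:
            self._streak += 1          # read continues where the last ended
        else:
            self._streak = 1           # a seek restarts the streak
        self._next_expected = offset + size
        return self._streak >= self._min_streaks
```

With the old threshold of 2, the second of the cache's two simultaneous requests would already have flipped the detector to True.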