
Fix event-loop blocking in one-step-off async save/export paths#1446

Open
taivu1998 wants to merge 2 commits into NovaSky-AI:main from taivu1998:tdv/issue-188-async-offload

Conversation

@taivu1998

@taivu1998 taivu1998 commented Apr 2, 2026

Summary

  • fix the one-step-off async trainer so checkpoint saves and HF exports are offloaded with asyncio.to_thread(...)
  • add a focused regression test that fails if the async example calls save/export inline on the event loop
  • document the async-safe pattern in the one-step-off async tutorial

Why

This addresses issue #188.

The async example in examples/train/async/async_trainer.py runs generation in a background coroutine, but its periodic and final save_checkpoints() / save_models() calls were still executed directly from async def train(). Those methods perform heavyweight synchronous work, so running them inline blocks the event loop and stalls the background generation coroutine during save windows.

Changes

  • replace the four blocking save/export call sites in the one-step-off async trainer with:
    • await asyncio.to_thread(self.save_checkpoints)
    • await asyncio.to_thread(self.save_models)
  • add tests/test_async_trainer_example.py to verify the periodic and final save/export paths are offloaded
  • add a short note in the one-step-off async tutorial warning that heavyweight synchronous trainer work should be kicked off the event loop
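The offloading pattern the change sites use can be sketched in isolation. This is a hypothetical, self-contained reduction (the `save_checkpoints` and `generate_loop` stand-ins are not the trainer's real methods): a blocking save run via asyncio.to_thread lets a background coroutine keep making progress, whereas calling it inline would stall the loop for the whole save.

```python
import asyncio
import time

# Hypothetical stand-in for the trainer's heavyweight synchronous save.
def save_checkpoints():
    time.sleep(0.2)  # represents blocking serialization/disk work

async def generate_loop(stop: asyncio.Event) -> int:
    # Stand-in for the background generation coroutine: counts loop turns.
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks

async def train() -> int:
    stop = asyncio.Event()
    gen = asyncio.create_task(generate_loop(stop))
    # Offloaded: the event loop stays free while the save runs in a thread.
    # Calling save_checkpoints() directly here would freeze generate_loop.
    await asyncio.to_thread(save_checkpoints)
    stop.set()
    return await gen

ticks = asyncio.run(train())
print(ticks)
```

With the inline call, `ticks` would be near zero because the loop never runs during the sleep; with `to_thread` it accumulates throughout the save window.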

Validation

  • PYTHONPATH=/tmp/skyrl-issue-188 /Users/vuductai/Documents/Projects/SkyRL/.venv/bin/pytest tests/test_async_trainer_example.py -q
  • PYTHONPATH=/tmp/skyrl-issue-188 /Users/vuductai/Documents/Projects/SkyRL/.venv/bin/python -m py_compile examples/train/async/async_trainer.py tests/test_async_trainer_example.py
  • git diff --check
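One way the regression test could verify offloading (this is a sketch of the idea, not the actual contents of tests/test_async_trainer_example.py): record which thread a fake save runs on and assert it differs from the event-loop thread.

```python
import asyncio
import threading

# Hypothetical regression-test idea: a save routed through asyncio.to_thread
# must execute on a worker thread, not the event-loop thread.
save_thread = {}

def fake_save():
    save_thread["ident"] = threading.get_ident()

async def train_step():
    await asyncio.to_thread(fake_save)

loop_thread = threading.get_ident()  # asyncio.run uses the calling thread
asyncio.run(train_step())
offloaded = save_thread["ident"] != loop_thread
print(offloaded)  # True
```

An inline `fake_save()` call would run on the loop thread and make `offloaded` False, so the test fails exactly when the offloading regresses.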

Notes

  • I kept this change intentionally narrow so it closes issue #188 (Blocking functions in async example) without broadening into a larger async trainer refactor.
  • There is still an existing timing-state nuance in save_checkpoints() because it updates self.all_timings internally during cleanup timing; that pattern already exists in the fully async trainer as well, so I left it out of scope for this patch.


@taivu1998 taivu1998 closed this Apr 3, 2026
@taivu1998 taivu1998 reopened this Apr 4, 2026
@taivu1998 taivu1998 marked this pull request as ready for review April 5, 2026 00:07

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request offloads heavyweight synchronous operations, such as saving checkpoints and models, to separate threads using asyncio.to_thread within the AsyncRayPPOTrainer. This change is intended to prevent blocking the event loop and is accompanied by updated documentation and a new test case. Feedback focuses on potential thread-safety issues introduced by this concurrency, specifically regarding the dataloader's state access during checkpointing and a race condition when resetting the shared all_timings dictionary.

 if self.cfg.trainer.ckpt_interval > 0 and self.global_step % self.cfg.trainer.ckpt_interval == 0:
     with Timer("save_checkpoints", self.all_timings):
-        self.save_checkpoints()
+        await asyncio.to_thread(self.save_checkpoints)

Severity: medium

Offloading save_checkpoints to a separate thread while the background generation loop is running introduces potential thread-safety concerns. Specifically, self.save_checkpoints calls self.train_dataloader.state_dict(), which may conflict with the concurrent iteration of the same dataloader in _run_generate_loop. It is important to ensure that the dataloader implementation used here supports concurrent state access while an iteration is in progress.
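One way to address this concern (a sketch only; the real dataloader class and its locking needs are not shown in this PR) is to serialize state access with a lock so a threaded state_dict() snapshot cannot interleave with iteration:

```python
import asyncio
import threading

# Hypothetical loader: both batch advancement and state snapshots take the
# same lock, so a checkpoint thread never reads state mid-update.
class GuardedLoader:
    def __init__(self, items):
        self._items = list(items)
        self._idx = 0
        self._lock = threading.Lock()

    def next_batch(self):
        with self._lock:
            batch = self._items[self._idx]
            self._idx += 1
            return batch

    def state_dict(self):
        with self._lock:
            return {"idx": self._idx}

async def main():
    loader = GuardedLoader(range(10))
    loader.next_batch()
    # Snapshot from a worker thread while iteration may continue elsewhere.
    return await asyncio.to_thread(loader.state_dict)

state = asyncio.run(main())
print(state)  # {'idx': 1}
```

Whether the actual dataloader needs such a guard depends on its implementation; the point is that to_thread alone does not make shared-state reads safe.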

-        self.save_models()
+        await asyncio.to_thread(self.save_models)
         self.tracker.log({"timing/" + k: v for k, v in self.all_timings.items()}, step=self.global_step)
         self.all_timings = {}

Severity: medium

Resetting self.all_timings by assignment (self.all_timings = {}) while the background _run_generate_loop coroutine is active creates a race condition. If the background loop is currently inside a Timer block, it holds a reference to the old dictionary; when it finishes, it will write its timing data to that discarded dictionary, causing the metrics to be lost. While this pattern existed previously, offloading more work to threads increases the window for such concurrency issues. Consider using a thread-safe mechanism to aggregate and clear timings.
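A minimal thread-safe alternative, sketched here with hypothetical names (the trainer's Timer and tracker APIs are not reproduced): clear the shared dict in place under a lock instead of rebinding it, so any writer still holding a reference always writes into the live container.

```python
import threading

# Hypothetical aggregator: record() and drain() share one lock, and drain()
# clears in place rather than rebinding, so concurrent writers never end up
# targeting a discarded dict (the failure mode of `self.all_timings = {}`).
class Timings:
    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}

    def record(self, key, value):
        with self._lock:
            self._data[key] = self._data.get(key, 0.0) + value

    def drain(self):
        with self._lock:
            snapshot = dict(self._data)  # atomic snapshot for logging
            self._data.clear()           # in-place clear, same object
            return snapshot

timings = Timings()
timings.record("save_checkpoints", 1.5)
logged = timings.drain()
print(logged)  # {'save_checkpoints': 1.5}
```

A later `record()` from a background thread lands in the same (now empty) dict and is picked up by the next drain, instead of vanishing into an orphaned object.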


@devin-ai-integration devin-ai-integration bot left a comment


✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no potential bugs to report.

View in Devin Review to see 4 additional findings.




Development

Successfully merging this pull request may close these issues.

Blocking functions in async example
