-
Notifications
You must be signed in to change notification settings - Fork 6
Fix server streaming handler not cancelled on client disconnect #175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,7 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import struct | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| import pytest | ||
|
|
@@ -23,6 +25,8 @@ | |
| if TYPE_CHECKING: | ||
| from collections.abc import AsyncIterator, Iterator | ||
|
|
||
| from asgiref.typing import HTTPDisconnectEvent, HTTPRequestEvent, HTTPScope | ||
|
|
||
|
|
||
| @pytest.mark.parametrize("proto_json", [False, True]) | ||
| @pytest.mark.parametrize("compression_name", ["gzip", "br", "zstd", "identity"]) | ||
|
|
@@ -280,3 +284,76 @@ async def request_stream(): | |
| else: | ||
| assert len(requests) == 2 | ||
| assert len(responses) == 1 | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_server_stream_client_disconnect() -> None: | ||
| """Server streaming generator should be closed when the client disconnects. | ||
|
|
||
| Regression test for https://github.com/connectrpc/connect-python/issues/174. | ||
| """ | ||
| generator_closed = asyncio.Event() | ||
|
|
||
| class InfiniteHaberdasher(Haberdasher): | ||
| async def make_similar_hats(self, request, ctx): | ||
| try: | ||
| while True: | ||
| yield Hat(size=request.inches, color="green") | ||
| await asyncio.sleep(0) # yield control to event loop | ||
| finally: | ||
| generator_closed.set() | ||
|
|
||
| app = HaberdasherASGIApplication(InfiniteHaberdasher()) | ||
|
|
||
| # Encode a Connect protocol (application/connect+proto) request for Size(inches=10). | ||
| request_bytes = Size(inches=10).SerializeToString() | ||
| request_body = struct.pack(">BI", 0, len(request_bytes)) + request_bytes | ||
|
|
||
| # We invoke the ASGI app directly rather than using a real client with a | ||
| # short timeout because a real client could trigger the disconnect before the | ||
| # request body has been fully read, which would be a different code path. | ||
| disconnect_trigger = asyncio.Event() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think a client with a timeout of some tens of Ms should be non-flaky, but maybe the windows runner would try extra hard to prove otherwise. Manually invoking the app seems fine too but let's add a comment then about potential flakiness if using a client with a timeout (we need it to happen after the request has been fully read)? |
||
| response_count = 0 | ||
| call_count = 0 | ||
|
|
||
| async def receive() -> HTTPRequestEvent | HTTPDisconnectEvent: | ||
| nonlocal call_count | ||
| call_count += 1 | ||
| if call_count == 1: | ||
| return {"type": "http.request", "body": request_body, "more_body": False} | ||
| # Block until the test is ready to simulate a disconnect. | ||
| await disconnect_trigger.wait() | ||
| return {"type": "http.disconnect"} | ||
|
|
||
| async def send(message): | ||
| nonlocal response_count | ||
| if message.get("type") == "http.response.body" and message.get( | ||
| "more_body", False | ||
| ): | ||
| response_count += 1 | ||
| if response_count >= 3: | ||
| disconnect_trigger.set() | ||
|
|
||
| scope: HTTPScope = { | ||
| "type": "http", | ||
| "asgi": {"spec_version": "2.0", "version": "3.0"}, | ||
| "http_version": "1.1", | ||
| "method": "POST", | ||
| "scheme": "http", | ||
| "path": "/connectrpc.example.Haberdasher/MakeSimilarHats", | ||
| "raw_path": b"/connectrpc.example.Haberdasher/MakeSimilarHats", | ||
| "query_string": b"", | ||
| "root_path": "", | ||
| "headers": [(b"content-type", b"application/connect+proto")], | ||
| "client": None, | ||
| "server": None, | ||
| "extensions": None, | ||
| } | ||
|
|
||
| # Without the fix the app hangs forever (generator never stopped), causing a | ||
| # TimeoutError here. With the fix it terminates promptly after the disconnect. | ||
| await asyncio.wait_for(app(scope, receive, send), timeout=5.0) | ||
|
|
||
| assert generator_closed.is_set(), ( | ||
| "generator should be closed after client disconnect" | ||
| ) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC it's possible for this to throw since it's user code and then monitor task would be leaked. When thinking about letting it propagate vs catch and maybe log, I guess a finally in a user generator is still part of the request handler and makes sense to allow to affect the response. How about just reordering then if that makes sense?