Skip to content

LangGraph streaming with workflow streams#1500

Open
brianstrauch wants to merge 21 commits intomainfrom
langgraph-streaming
Open

LangGraph streaming with workflow streams#1500
brianstrauch wants to merge 21 commits intomainfrom
langgraph-streaming

Conversation

@brianstrauch
Copy link
Copy Markdown
Contributor

@brianstrauch brianstrauch commented May 1, 2026

  • Wire LangGraph's get_stream_writer() to Temporal: inside activity-wrapped nodes, the writer is backed by a WorkflowStreamClient that signals chunks back to the owning workflow's WorkflowStream on a topic, letting external subscribers tail node output via WorkflowStreamClient.
  • Thread an optional stream_writer through set_langgraph_config so the activity wrapper can install one per invocation; default behavior (no writer) is unchanged.

Test plan

  • test_streaming_via_workflow_streams — node calls get_stream_writer() inside an activity; external WorkflowStreamClient receives {token: a/b/c} then {done: True}, and the workflow result reflects the accumulated state.
  • test_workflow_publishes_astream_chunks — workflow iterates app.astream(...) and republishes each chunk on its own topic; subscriber sees per-node progress.

@brianstrauch brianstrauch requested a review from a team as a code owner May 1, 2026 20:27
@brianstrauch brianstrauch requested a review from jssmith May 1, 2026 20:28
@brianstrauch brianstrauch requested a review from DABH May 1, 2026 21:33
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds an experimental integration path between LangGraph’s get_stream_writer() streaming mechanism and Temporal “Workflow Streams”, so activity-wrapped LangGraph nodes can publish incremental chunks that external subscribers can tail via WorkflowStreamClient.

Changes:

  • Add streaming_topic and streaming_batch_interval options to LangGraphPlugin and thread them into the activity wrapper.
  • Extend LangGraph runtime config restoration to accept an optional stream_writer, enabling activity invocations to install a per-call writer.
  • Add tests covering (1) activity-side get_stream_writer() streaming to workflow streams and (2) workflow-side forwarding of astream() chunks to an external topic.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
tests/contrib/langgraph/test_streaming.py Adds workflow-stream-backed streaming tests (activity-side writer + workflow-side republish of astream() chunks).
temporalio/contrib/langgraph/_plugin.py Introduces plugin-level streaming configuration and passes it into the activity wrapper.
temporalio/contrib/langgraph/_langgraph_config.py Adds stream_writer plumbing to set_langgraph_config so activities can inject a writer into the reconstructed Runtime.
temporalio/contrib/langgraph/_activity.py Uses WorkflowStreamClient.from_within_activity() to back the injected writer with workflow-stream topic publishes when streaming is enabled.
temporalio/contrib/langgraph/__init__.py Minor export ordering change (no functional changes).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread temporalio/contrib/langgraph/__init__.py
Comment thread temporalio/contrib/langgraph/_plugin.py
Comment thread temporalio/contrib/langgraph/_activity.py
Comment thread temporalio/contrib/langgraph/_activity.py
Comment thread temporalio/contrib/langgraph/_plugin.py
Comment thread tests/contrib/langgraph/test_streaming.py
Comment thread temporalio/contrib/langgraph/_activity.py
Comment thread temporalio/contrib/langgraph/_activity.py
Comment thread temporalio/contrib/langgraph/README.md Outdated
Comment thread temporalio/contrib/langgraph/_plugin.py
The LangGraph interceptor now checks at workflow start that a
WorkflowStream has been registered (via the publish signal handler)
when the plugin was configured with streaming_topic. Misconfigured
workflows fail fast with a clear error pointing at @workflow.init,
instead of silently producing no-op streams.
Wrap execute_in='workflow' nodes with wrap_workflow(), which mirrors
wrap_activity() and (when streaming_topic is set) overrides the
LangGraph Runtime's stream_writer to publish synchronously to the
in-workflow WorkflowStream — no signal round-trip. Parametrized the
streaming test over execute_in to cover both paths.
Expand the README streaming section with a self-contained snippet
(plugin, WorkflowStream in __init__, external subscriber loop), an
explicit callout that streaming_topic only covers stream_mode='custom'
with an astream() bridge example for other modes, and at-least-once
retry semantics. Add an Args section to LangGraphPlugin's docstring
covering all constructor parameters.
Pick the raw user function from runnable.func instead of LangGraph's
async runnable.afunc adapter, which wraps sync nodes in
loop.run_in_executor — that's incompatible with the workflow event
loop. wrap_activity now schedules sync funcs on a thread via
asyncio.to_thread so the activity loop stays free for the streaming
flusher, with stream_writer calls marshaled back to the loop thread
to keep the workflow_streams client's asyncio.Event safe. Parametrize
the streaming test over (execute_in, sync/async).
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread tests/contrib/langgraph/test_streaming.py Outdated
Comment thread tests/contrib/langgraph/test_streaming.py Outdated
Comment thread temporalio/contrib/langgraph/_activity.py
Copy link
Copy Markdown
Contributor

@DABH DABH left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All my critiques have been addressed - thanks!

The workflow was publishing chunk_b and the done marker in the same
workflow task as its return, leaving no chance for the subscriber's
next poll to land on a running workflow. Add an ack_done signal the
subscriber sends after seeing done; the workflow waits for it before
returning. Also hoist a signature() lookup out of the activity wrapper
hot path.
LangGraph's astream uses asyncio.create_task internally, and Python
3.10 doesn't propagate contextvars through new tasks. As a result
get_stream_writer() returns "outside of a runnable context" when the
node executes in-workflow under streaming_topic. Activity-side
streaming is unaffected because the activity wrapper sets the runtime
contextvar explicitly within the same task as the user node.

This matches the existing 3.10 limitation already documented on the
plugin (interrupts and the Functional API are also gated on 3.11+).
In the async-workflow case the node runs inline in the workflow with
no awaits, so ainvoke and the workflow return in the same task as the
publishes. The subscriber's first poll lands after completion and gets
zero items. Add an ack_done signal the subscriber sends after seeing
done; the workflow waits for it before returning. Mirrors 32818b1 for
AstreamPublishWorkflow.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants