From a305e36027c001abede2f9d79adf486dd3f9e8e0 Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Sun, 28 Sep 2025 22:08:52 -0400 Subject: [PATCH] update --- src/oss/langgraph/streaming.mdx | 302 ++++++++++++++++---------------- 1 file changed, 149 insertions(+), 153 deletions(-) diff --git a/src/oss/langgraph/streaming.mdx b/src/oss/langgraph/streaming.mdx index e8c71c97f..a88a75f05 100644 --- a/src/oss/langgraph/streaming.mdx +++ b/src/oss/langgraph/streaming.mdx @@ -84,15 +84,15 @@ for await (const chunk of await graph.stream(inputs, { .compile() ) - for chunk in graph.stream( # (1)! + # The stream() method returns an iterator that yields streamed outputs + for chunk in graph.stream( {"topic": "ice cream"}, - stream_mode="updates", # (2)! + # Set stream_mode="updates" to stream only the updates to the graph state after each node + # Other stream modes are also available. See supported stream modes for details + stream_mode="updates", ): print(chunk) ``` - - 1. The `stream()` method returns an iterator that yields streamed outputs. - 2. Set `stream_mode="updates"` to stream only the updates to the graph state after each node. Other stream modes are also available. See [supported stream modes](#supported-stream-modes) for details. ::: :::js @@ -119,13 +119,13 @@ for await (const chunk of await graph.stream(inputs, { for await (const chunk of await graph.stream( { topic: "ice cream" }, - { streamMode: "updates" } // (1)! + // Set streamMode: "updates" to stream only the updates to the graph state after each node + // Other stream modes are also available. See supported stream modes for details + { streamMode: "updates" } )) { console.log(chunk); } ``` - - 1. Set `streamMode: "updates"` to stream only the updates to the graph state after each node. Other stream modes are also available. See [supported stream modes](#supported-stream-modes) for details. ::: ```output @@ -283,13 +283,12 @@ The outputs will be streamed as tuples `(namespace, data)`, where `namespace` is ```python {highlight={3}} for chunk in graph.stream( {"foo": "foo"}, - subgraphs=True, # (1)! + # Set subgraphs=True to stream outputs from subgraphs + subgraphs=True, stream_mode="updates", ): print(chunk) ``` - -1. Set `subgraphs=True` to stream outputs from subgraphs. ::: :::js @@ -301,15 +300,14 @@ The outputs will be streamed as tuples `[namespace, data]`, where `namespace` is for await (const chunk of await graph.stream( { foo: "foo" }, { - subgraphs: true, // (1)! + // Set subgraphs: true to stream outputs from subgraphs + subgraphs: true, streamMode: "updates", } )) { console.log(chunk); } ``` - -1. Set `subgraphs: true` to stream outputs from subgraphs. ::: @@ -353,12 +351,11 @@ for await (const chunk of await graph.stream( for chunk in graph.stream( {"foo": "foo"}, stream_mode="updates", - subgraphs=True, # (1)! + # Set subgraphs=True to stream outputs from subgraphs + subgraphs=True, ): print(chunk) ``` - - 1. Set `subgraphs=True` to stream outputs from subgraphs. ::: :::js @@ -401,14 +398,13 @@ for await (const chunk of await graph.stream( { foo: "foo" }, { streamMode: "updates", - subgraphs: true, // (1)! + // Set subgraphs: true to stream outputs from subgraphs + subgraphs: true, } )) { console.log(chunk); } ``` - - 1. Set `subgraphs: true` to stream outputs from subgraphs. ::: :::python @@ -493,7 +489,8 @@ llm = init_chat_model(model="openai:gpt-4o-mini") def call_model(state: MyState): """Call the LLM to generate a joke about a topic""" - llm_response = llm.invoke( # (1)! + # Note that message events are emitted even when the LLM is run using .invoke rather than .stream + llm_response = llm.invoke( [ {"role": "user", "content": f"Generate a joke about {state.topic}"} ] @@ -507,16 +504,16 @@ graph = ( .compile() ) -for message_chunk, metadata in graph.stream( # (2)! +# The "messages" stream mode returns an iterator of tuples (message_chunk, metadata) +# where message_chunk is the token streamed by the LLM and metadata is a dictionary +# with information about the graph node where the LLM was called and other information +for message_chunk, metadata in graph.stream( {"topic": "ice cream"}, stream_mode="messages", ): if message_chunk.content: print(message_chunk.content, end="|", flush=True) ``` - -1. Note that the message events are emitted even when the LLM is run using `.invoke` rather than `.stream`. -2. The "messages" stream mode returns an iterator of tuples `(message_chunk, metadata)` where `message_chunk` is the token streamed by the LLM and `metadata` is a dictionary with information about the graph node where the LLM was called and other information. ::: :::js @@ -541,9 +538,10 @@ const llm = new ChatOpenAI({ model: "gpt-4o-mini" }); const callModel = async (state: z.infer) => { // Call the LLM to generate a joke about a topic + // Note that message events are emitted even when the LLM is run using .invoke rather than .stream const llmResponse = await llm.invoke([ { role: "user", content: `Generate a joke about ${state.topic}` }, - ]); // (1)! + ]); return { joke: llmResponse.content }; }; @@ -552,8 +550,10 @@ const graph = new StateGraph(MyState) .addEdge(START, "callModel") .compile(); +// The "messages" stream mode returns an iterator of tuples [messageChunk, metadata] +// where messageChunk is the token streamed by the LLM and metadata is a dictionary +// with information about the graph node where the LLM was called and other information for await (const [messageChunk, metadata] of await graph.stream( - // (2)! { topic: "ice cream" }, { streamMode: "messages" } )) { @@ -562,9 +562,6 @@ for await (const [messageChunk, metadata] of await graph.stream( } } ``` - -1. Note that the message events are emitted even when the LLM is run using `.invoke` rather than `.stream`. -2. The "messages" stream mode returns an iterator of tuples `[messageChunk, metadata]` where `messageChunk` is the token streamed by the LLM and `metadata` is a dictionary with information about the graph node where the LLM was called and other information. ::: #### Filter by LLM invocation @@ -575,54 +572,56 @@ You can associate `tags` with LLM invocations to filter the streamed tokens by L ```python {highlight={10}} from langchain.chat_models import init_chat_model -llm_1 = init_chat_model(model="openai:gpt-4o-mini", tags=['joke']) # (1)! -llm_2 = init_chat_model(model="openai:gpt-4o-mini", tags=['poem']) # (2)! +# llm_1 is tagged with "joke" +llm_1 = init_chat_model(model="openai:gpt-4o-mini", tags=['joke']) +# llm_2 is tagged with "poem" +llm_2 = init_chat_model(model="openai:gpt-4o-mini", tags=['poem']) graph = ... # define a graph that uses these LLMs -async for msg, metadata in graph.astream( # (3)! +# The stream_mode is set to "messages" to stream LLM tokens +# The metadata contains information about the LLM invocation, including the tags +async for msg, metadata in graph.astream( {"topic": "cats"}, stream_mode="messages", ): - if metadata["tags"] == ["joke"]: # (4)! + # Filter the streamed tokens by the tags field in the metadata to only include + # the tokens from the LLM invocation with the "joke" tag + if metadata["tags"] == ["joke"]: print(msg.content, end="|", flush=True) ``` - -1. llm_1 is tagged with "joke". -2. llm_2 is tagged with "poem". -3. The `stream_mode` is set to "messages" to stream LLM tokens. The `metadata` contains information about the LLM invocation, including the tags. -4. Filter the streamed tokens by the `tags` field in the metadata to only include the tokens from the LLM invocation with the "joke" tag. ::: :::js ```typescript import { ChatOpenAI } from "@langchain/openai"; +// llm1 is tagged with "joke" const llm1 = new ChatOpenAI({ model: "gpt-4o-mini", - tags: ['joke'] // (1)! + tags: ['joke'] }); +// llm2 is tagged with "poem" const llm2 = new ChatOpenAI({ model: "gpt-4o-mini", - tags: ['poem'] // (2)! + tags: ['poem'] }); const graph = // ... define a graph that uses these LLMs -for await (const [msg, metadata] of await graph.stream( // (3)! +// The streamMode is set to "messages" to stream LLM tokens +// The metadata contains information about the LLM invocation, including the tags +for await (const [msg, metadata] of await graph.stream( { topic: "cats" }, { streamMode: "messages" } )) { - if (metadata.tags?.includes("joke")) { // (4)! + // Filter the streamed tokens by the tags field in the metadata to only include + // the tokens from the LLM invocation with the "joke" tag + if (metadata.tags?.includes("joke")) { console.log(msg.content + "|"); } } ``` - -1. llm1 is tagged with "joke". -2. llm2 is tagged with "poem". -3. The `streamMode` is set to "messages" to stream LLM tokens. The `metadata` contains information about the LLM invocation, including the tags. -4. Filter the streamed tokens by the `tags` field in the metadata to only include the tokens from the LLM invocation with the "joke" tag. ::: @@ -633,8 +632,10 @@ for await (const [msg, metadata] of await graph.stream( // (3)! from langchain.chat_models import init_chat_model from langgraph.graph import START, StateGraph - joke_model = init_chat_model(model="openai:gpt-4o-mini", tags=["joke"]) # (1)! - poem_model = init_chat_model(model="openai:gpt-4o-mini", tags=["poem"]) # (2)! + # The joke_model is tagged with "joke" + joke_model = init_chat_model(model="openai:gpt-4o-mini", tags=["joke"]) + # The poem_model is tagged with "poem" + poem_model = init_chat_model(model="openai:gpt-4o-mini", tags=["poem"]) class State(TypedDict): @@ -648,14 +649,16 @@ for await (const [msg, metadata] of await graph.stream( // (3)! print("Writing joke...") # Note: Passing the config through explicitly is required for python < 3.11 # Since context var support wasn't added before then: https://docs.python.org/3/library/asyncio-task.html#creating-tasks + # The config is passed through explicitly to ensure the context vars are propagated correctly + # This is required for Python < 3.11 when using async code. Please see the async section for more details joke_response = await joke_model.ainvoke( [{"role": "user", "content": f"Write a joke about {topic}"}], - config, # (3)! + config, ) print("\n\nWriting poem...") poem_response = await poem_model.ainvoke( [{"role": "user", "content": f"Write a short poem about {topic}"}], - config, # (3)! + config, ) return {"joke": joke_response.content, "poem": poem_response.content} @@ -667,18 +670,15 @@ for await (const [msg, metadata] of await graph.stream( // (3)! .compile() ) + # The stream_mode is set to "messages" to stream LLM tokens + # The metadata contains information about the LLM invocation, including the tags async for msg, metadata in graph.astream( {"topic": "cats"}, - stream_mode="messages", # (4)! + stream_mode="messages", ): - if metadata["tags"] == ["joke"]: # (4)! + if metadata["tags"] == ["joke"]: print(msg.content, end="|", flush=True) ``` - - 1. The `joke_model` is tagged with "joke". - 2. The `poem_model` is tagged with "poem". - 3. The `config` is passed through explicitly to ensure the context vars are propagated correctly. This is required for Python < 3.11 when using async code. Please see the [async section](#async) for more details. - 4. The `stream_mode` is set to "messages" to stream LLM tokens. The `metadata` contains information about the LLM invocation, including the tags. ::: :::js @@ -687,13 +687,15 @@ for await (const [msg, metadata] of await graph.stream( // (3)! import { StateGraph, START } from "@langchain/langgraph"; import * as z from "zod"; + // The jokeModel is tagged with "joke" const jokeModel = new ChatOpenAI({ model: "gpt-4o-mini", - tags: ["joke"] // (1)! + tags: ["joke"] }); + // The poemModel is tagged with "poem" const poemModel = new ChatOpenAI({ model: "gpt-4o-mini", - tags: ["poem"] // (2)! + tags: ["poem"] }); const State = z.object({ @@ -724,20 +726,19 @@ for await (const [msg, metadata] of await graph.stream( // (3)! .addEdge(START, "callModel") .compile(); + // The streamMode is set to "messages" to stream LLM tokens + // The metadata contains information about the LLM invocation, including the tags for await (const [msg, metadata] of await graph.stream( { topic: "cats" }, - { streamMode: "messages" } // (3)! + { streamMode: "messages" } )) { - if (metadata.tags?.includes("joke")) { // (4)! + // Filter the streamed tokens by the tags field in the metadata to only include + // the tokens from the LLM invocation with the "joke" tag + if (metadata.tags?.includes("joke")) { console.log(msg.content + "|"); } } ``` - - 1. The `jokeModel` is tagged with "joke". - 2. The `poemModel` is tagged with "poem". - 3. The `streamMode` is set to "messages" to stream LLM tokens. The `metadata` contains information about the LLM invocation, including the tags. - 4. Filter the streamed tokens by the `tags` field in the metadata to only include the tokens from the LLM invocation with the "joke" tag. ::: @@ -747,34 +748,36 @@ To stream tokens only from specific nodes, use `stream_mode="messages"` and filt :::python ```python {highlight={3,5}} -for msg, metadata in graph.stream( # (1)! +# The "messages" stream mode returns a tuple of (message_chunk, metadata) +# where message_chunk is the token streamed by the LLM and metadata is a dictionary +# with information about the graph node where the LLM was called and other information +for msg, metadata in graph.stream( inputs, stream_mode="messages", ): - if msg.content and metadata["langgraph_node"] == "some_node_name": # (2)! + # Filter the streamed tokens by the langgraph_node field in the metadata + # to only include the tokens from the specified node + if msg.content and metadata["langgraph_node"] == "some_node_name": ... ``` - -1. The "messages" stream mode returns a tuple of `(message_chunk, metadata)` where `message_chunk` is the token streamed by the LLM and `metadata` is a dictionary with information about the graph node where the LLM was called and other information. -2. Filter the streamed tokens by the `langgraph_node` field in the metadata to only include the tokens from the `write_poem` node. ::: :::js ```typescript +// The "messages" stream mode returns a tuple of [messageChunk, metadata] +// where messageChunk is the token streamed by the LLM and metadata is a dictionary +// with information about the graph node where the LLM was called and other information for await (const [msg, metadata] of await graph.stream( - // (1)! inputs, { streamMode: "messages" } )) { + // Filter the streamed tokens by the langgraph_node field in the metadata + // to only include the tokens from the specified node if (msg.content && metadata.langgraph_node === "some_node_name") { - // (2)! // ... } } ``` - -1. The "messages" stream mode returns a tuple of `[messageChunk, metadata]` where `messageChunk` is the token streamed by the LLM and `metadata` is a dictionary with information about the graph node where the LLM was called and other information. -2. Filter the streamed tokens by the `langgraph_node` field in the metadata to only include the tokens from the `writePoem` node. ::: @@ -819,16 +822,18 @@ for await (const [msg, metadata] of await graph.stream( .compile() ) - for msg, metadata in graph.stream( # (1)! + # The "messages" stream mode returns a tuple of (message_chunk, metadata) + # where message_chunk is the token streamed by the LLM and metadata is a dictionary + # with information about the graph node where the LLM was called and other information + for msg, metadata in graph.stream( {"topic": "cats"}, stream_mode="messages", ): - if msg.content and metadata["langgraph_node"] == "write_poem": # (2)! + # Filter the streamed tokens by the langgraph_node field in the metadata + # to only include the tokens from the write_poem node + if msg.content and metadata["langgraph_node"] == "write_poem": print(msg.content, end="|", flush=True) ``` - - 1. The "messages" stream mode returns a tuple of `(message_chunk, metadata)` where `message_chunk` is the token streamed by the LLM and `metadata` is a dictionary with information about the graph node where the LLM was called and other information. - 2. Filter the streamed tokens by the `langgraph_node` field in the metadata to only include the tokens from the `write_poem` node. ::: :::js @@ -865,18 +870,20 @@ for await (const [msg, metadata] of await graph.stream( .addEdge(START, "writePoem") .compile(); - for await (const [msg, metadata] of await graph.stream( // (1)! + // The "messages" stream mode returns a tuple of [messageChunk, metadata] + // where messageChunk is the token streamed by the LLM and metadata is a dictionary + // with information about the graph node where the LLM was called and other information + for await (const [msg, metadata] of await graph.stream( { topic: "cats" }, { streamMode: "messages" } )) { - if (msg.content && metadata.langgraph_node === "writePoem") { // (2)! + // Filter the streamed tokens by the langgraph_node field in the metadata + // to only include the tokens from the writePoem node + if (msg.content && metadata.langgraph_node === "writePoem") { console.log(msg.content + "|"); } } ``` - - 1. The "messages" stream mode returns a tuple of `[messageChunk, metadata]` where `messageChunk` is the token streamed by the LLM and `metadata` is a dictionary with information about the graph node where the LLM was called and other information. - 2. Filter the streamed tokens by the `langgraph_node` field in the metadata to only include the tokens from the `writePoem` node. ::: @@ -907,8 +914,10 @@ See [Async with Python < 3.11](#async) for usage examples. answer: str def node(state: State): - writer = get_stream_writer() # (1)! - writer({"custom_key": "Generating custom data inside node"}) # (2)! + # Get the stream writer to send custom data + writer = get_stream_writer() + # Emit a custom key-value pair (e.g., progress update) + writer({"custom_key": "Generating custom data inside node"}) return {"answer": "some data"} graph = ( @@ -920,14 +929,10 @@ See [Async with Python < 3.11](#async) for usage examples. inputs = {"query": "example"} - # Usage - for chunk in graph.stream(inputs, stream_mode="custom"): # (3)! + # Set stream_mode="custom" to receive the custom data in the stream + for chunk in graph.stream(inputs, stream_mode="custom"): print(chunk) ``` - - 1. Get the stream writer to send custom data. - 2. Emit a custom key-value pair (e.g., progress update). - 3. Set `stream_mode="custom"` to receive the custom data in the stream. ```python {highlight={8,10}} @@ -937,23 +942,22 @@ See [Async with Python < 3.11](#async) for usage examples. @tool def query_database(query: str) -> str: """Query the database.""" - writer = get_stream_writer() # (1)! - writer({"data": "Retrieved 0/100 records", "type": "progress"}) # (2)! + # Access the stream writer to send custom data + writer = get_stream_writer() + # Emit a custom key-value pair (e.g., progress update) + writer({"data": "Retrieved 0/100 records", "type": "progress"}) # perform query - writer({"data": "Retrieved 100/100 records", "type": "progress"}) # (3)! + # Emit another custom key-value pair + writer({"data": "Retrieved 100/100 records", "type": "progress"}) return "some-answer" graph = ... # define a graph that uses this tool - for chunk in graph.stream(inputs, stream_mode="custom"): # (4)! + # Set stream_mode="custom" to receive the custom data in the stream + for chunk in graph.stream(inputs, stream_mode="custom"): print(chunk) ``` - - 1. Access the stream writer to send custom data. - 2. Emit a custom key-value pair (e.g., progress update). - 3. Emit another custom key-value pair. - 4. Set `stream_mode="custom"` to receive the custom data in the stream. ::: @@ -977,7 +981,8 @@ To send **custom user-defined data** from inside a LangGraph node or tool, follo const graph = new StateGraph(State) .addNode("node", async (state, config) => { - config.writer({ custom_key: "Generating custom data inside node" }); // (1)! + // Use the writer to emit a custom key-value pair (e.g., progress update) + config.writer({ custom_key: "Generating custom data inside node" }); return { answer: "some data" }; }) .addEdge(START, "node") @@ -985,14 +990,11 @@ To send **custom user-defined data** from inside a LangGraph node or tool, follo const inputs = { query: "example" }; - // Usage - for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) { // (2)! + // Set streamMode: "custom" to receive the custom data in the stream + for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) { console.log(chunk); } ``` - - 1. Use the writer to emit a custom key-value pair (e.g., progress update). - 2. Set `streamMode: "custom"` to receive the custom data in the stream. ```typescript @@ -1002,9 +1004,11 @@ To send **custom user-defined data** from inside a LangGraph node or tool, follo const queryDatabase = tool( async (input, config: LangGraphRunnableConfig) => { - config.writer({ data: "Retrieved 0/100 records", type: "progress" }); // (1)! + // Use the writer to emit a custom key-value pair (e.g., progress update) + config.writer({ data: "Retrieved 0/100 records", type: "progress" }); // perform query - config.writer({ data: "Retrieved 100/100 records", type: "progress" }); // (2)! + // Emit another custom key-value pair + config.writer({ data: "Retrieved 100/100 records", type: "progress" }); return "some-answer"; }, { @@ -1018,14 +1022,11 @@ To send **custom user-defined data** from inside a LangGraph node or tool, follo const graph = // ... define a graph that uses this tool - for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) { // (3)! + // Set streamMode: "custom" to receive the custom data in the stream + for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) { console.log(chunk); } ``` - - 1. Use the writer to emit a custom key-value pair (e.g., progress update). - 2. Emit another custom key-value pair. - 3. Set `streamMode: "custom"` to receive the custom data in the stream. ::: @@ -1042,10 +1043,13 @@ from langgraph.config import get_stream_writer def call_arbitrary_model(state): """Example node that calls an arbitrary model and streams the output""" - writer = get_stream_writer() # (1)! + # Get the stream writer to send custom data + writer = get_stream_writer() # Assume you have a streaming client that yields chunks - for chunk in your_custom_streaming_client(state["topic"]): # (2)! - writer({"custom_llm_chunk": chunk}) # (3)! + # Generate LLM tokens using your custom streaming client + for chunk in your_custom_streaming_client(state["topic"]): + # Use the writer to send custom data to the stream + writer({"custom_llm_chunk": chunk}) return {"result": "completed"} graph = ( @@ -1055,18 +1059,14 @@ graph = ( .compile() ) +# Set stream_mode="custom" to receive the custom data in the stream for chunk in graph.stream( {"topic": "cats"}, - stream_mode="custom", # (4)! + stream_mode="custom", ): # The chunk will contain the custom data streamed from the llm print(chunk) ``` - -1. Get the stream writer to send custom data. -2. Generate LLM tokens using your custom streaming client. -3. Use the writer to send custom data to the stream. -4. Set `stream_mode="custom"` to receive the custom data in the stream. ::: :::js @@ -1083,9 +1083,10 @@ const callArbitraryModel = async ( ) => { // Example node that calls an arbitrary model and streams the output // Assume you have a streaming client that yields chunks + // Generate LLM tokens using your custom streaming client for await (const chunk of yourCustomStreamingClient(state.topic)) { - // (1)! - config.writer({ custom_llm_chunk: chunk }); // (2)! + // Use the writer to send custom data to the stream + config.writer({ custom_llm_chunk: chunk }); } return { result: "completed" }; }; @@ -1095,18 +1096,15 @@ const graph = new StateGraph(State) // Add other nodes and edges as needed .compile(); +// Set streamMode: "custom" to receive the custom data in the stream for await (const chunk of await graph.stream( { topic: "cats" }, - { streamMode: "custom" } // (3)! + { streamMode: "custom" } )) { // The chunk will contain the custom data streamed from the llm console.log(chunk); } ``` - -1. Generate LLM tokens using your custom streaming client. -2. Use the writer to send custom data to the stream. -3. Set `streamMode: "custom"` to receive the custom data in the stream. ::: @@ -1368,20 +1366,18 @@ Set `disable_streaming=True` when initializing the model. model = init_chat_model( "anthropic:claude-3-7-sonnet-latest", - disable_streaming=True # (1)! + # Set disable_streaming=True to disable streaming for the chat model + disable_streaming=True ) ``` - - 1. Set `disable_streaming=True` to disable streaming for the chat model. ```python from langchain_openai import ChatOpenAI - llm = ChatOpenAI(model="o1-preview", disable_streaming=True) # (1)! + # Set disable_streaming=True to disable streaming for the chat model + llm = ChatOpenAI(model="o1-preview", disable_streaming=True) ``` - - 1. Set `disable_streaming=True` to disable streaming for the chat model. ::: @@ -1394,7 +1390,8 @@ import { ChatOpenAI } from "@langchain/openai"; const model = new ChatOpenAI({ model: "o1-preview", - streaming: false, // (1)! + // Set streaming: false to disable streaming for the chat model + streaming: false, }); ``` ::: @@ -1421,12 +1418,14 @@ This limits LangGraph ability to automatically propagate context, and affects La topic: str joke: str - async def call_model(state, config): # (1)! + # Accept config as an argument in the async node function + async def call_model(state, config): topic = state["topic"] print("Generating joke...") + # Pass config to llm.ainvoke() to ensure proper context propagation joke_response = await llm.ainvoke( [{"role": "user", "content": f"Write a joke about {topic}"}], - config, # (2)! + config, ) return {"joke": joke_response.content} @@ -1437,17 +1436,14 @@ This limits LangGraph ability to automatically propagate context, and affects La .compile() ) + # Set stream_mode="messages" to stream LLM tokens async for chunk, metadata in graph.astream( {"topic": "ice cream"}, - stream_mode="messages", # (3)! + stream_mode="messages", ): if chunk.content: print(chunk.content, end="|", flush=True) ``` - - 1. Accept `config` as an argument in the async node function. - 2. Pass `config` to `llm.ainvoke()` to ensure proper context propagation. - 3. Set `stream_mode="messages"` to stream LLM tokens. @@ -1459,7 +1455,9 @@ This limits LangGraph ability to automatically propagate context, and affects La topic: str joke: str - async def generate_joke(state: State, writer: StreamWriter): # (1)! + # Add writer as an argument in the function signature of the async node or tool + # LangGraph will automatically pass the stream writer to the function + async def generate_joke(state: State, writer: StreamWriter): writer({"custom_key": "Streaming custom data while generating a joke"}) return {"joke": f"This is a joke about {state['topic']}"} @@ -1470,14 +1468,12 @@ This limits LangGraph ability to automatically propagate context, and affects La .compile() ) + # Set stream_mode="custom" to receive the custom data in the stream async for chunk in graph.astream( {"topic": "ice cream"}, - stream_mode="custom", # (2)! + stream_mode="custom", ): print(chunk) ``` - - 1. Add `writer` as an argument in the function signature of the async node or tool. LangGraph will automatically pass the stream writer to the function. - 2. Set `stream_mode="custom"` to receive the custom data in the stream. :::