    INTRODUCTION

    While working on an enterprise customer service platform, our engineering team was tasked with architecting the backend for an AI-powered voice-bot. Because conversational AI requires near-instantaneous responses to feel natural, latency was our primary constraint. The architecture relied heavily on a multi-agent workflow: intermediate agents handled routing, context retrieval, and business logic, while the final, user-facing agent generated the conversational response to be piped into a Text-to-Speech (TTS) engine.

    During a recent project phase, we hit a critical bottleneck. When we tested the user-facing agent in isolation, it streamed token-by-token perfectly, allowing our TTS engine to begin synthesizing speech almost immediately. However, the moment we integrated this same agent into our LangGraph StateGraph workflow, it lost its ability to stream. The workflow began batching the tokens, waiting for the entire LLM response to complete before passing the output downstream. In a voice-bot scenario, this multi-second delay results in dead silence, an unacceptable user experience.

    We encountered a situation where standard synchronous execution mechanisms obscured the real-time event stream of nested computational graphs. This challenge inspired this article, as understanding the nuances of nested graph execution and streaming modes is critical for any team building high-throughput AI systems. Here is how we diagnosed and resolved the issue.

    PROBLEM CONTEXT

    In our architecture, the voice-bot backend required LangGraph to manage complex state transitions. We utilized a StateGraph to act as the primary orchestration layer. Some nodes executed background tasks—where streaming didn’t matter because the user wouldn’t see or hear the output. However, the final node was responsible for the actual conversation generation.

    To retain built-in middleware, memory capabilities, and standard tool-binding features, we used LangGraph’s prebuilt agent functions (like create_agent or create_react_agent). The initial implementation looked similar to this:

    def agent_node(state: MessagesState):
        # Synchronous invocation blocks the stream
        return agent.invoke({"messages": state["messages"]})

    Even when we attempted to bypass the wrapper function and add the compiled agent directly as a node (workflow.add_node("agent", agent)), calling the synchronous stream() method on the compiled workflow yielded the same result: chunks were batched, and the token-by-token real-time flow was destroyed.

    WHAT WENT WRONG

    To understand why this happened, we must look at how LangGraph handles nested computation graphs. An agent created with LangGraph’s prebuilt methods is, under the hood, a CompiledGraph itself.

    When you place a CompiledGraph inside another CompiledGraph (the outer StateGraph workflow), calling a synchronous method like .invoke() inside a node forces the Python thread to wait for the inner graph to finish its execution entirely before returning control to the outer graph.

    Furthermore, when using the outer graph’s synchronous .stream(..., stream_mode=["messages"]) method, LangGraph relies on the event loop to surface internal message chunks. In a synchronous context, the internal iterations of the nested agent’s LLM calls do not properly yield their intermediate token events to the outer graph’s stream generator. The inner graph essentially acts as a black box, swallowing the intermediate token events and only emitting the final aggregated message state.
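A plain-Python analogy (no LangGraph involved) makes this black-box behavior concrete: a blocking wrapper that exhausts an inner token generator can only hand its caller one aggregated result, exactly like calling .invoke() inside a node. The function names here are illustrative stand-ins, not LangGraph internals.

```python
def inner_llm():
    """Stands in for the nested agent's token stream."""
    for token in ["Hello", ", ", "world", "!"]:
        yield token

def blocking_node():
    # Like agent.invoke(): exhausts the inner stream entirely
    # before returning anything to the caller.
    return "".join(inner_llm())

def outer_stream():
    # The outer graph can only re-emit what the node returned:
    # one aggregated chunk, never token-by-token.
    yield blocking_node()

chunks = list(outer_stream())
print(chunks)  # ['Hello, world!'] -- a single batched message
```

No matter how the outer generator is consumed, the intermediate tokens are already gone by the time it yields.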

    For a text-based chatbot, a slightly delayed whole-message response might be annoying. For a voice-bot relying on sub-second Time-To-First-Token (TTFT), it is a catastrophic architectural failure.

    HOW WE APPROACHED THE SOLUTION

    We knew we needed to achieve two things without compromising the architecture:

    1. Preserve the prebuilt agent functionalities: We did not want to revert to a custom LLM loop with bind_tools, as that would mean losing the robust middleware, checkpointing, and memory management provided by the built-in agent factory.
    2. Pierce the nested graph boundary: We needed the innermost LLM token chunks to bubble up through the parent StateGraph in real-time.

    Our diagnostic process led us to evaluate asynchronous execution. In Python, yielding real-time chunks from deeply nested asynchronous generators requires the entire stack to be asynchronous. By migrating from the synchronous .stream() to the asynchronous .astream() method, we could leverage LangGraph’s native async event propagation.
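The principle is easy to demonstrate in pure asyncio, independent of LangGraph: when every layer re-yields chunks with `async for` instead of awaiting the finished result, tokens from the innermost generator surface to the outermost consumer one at a time. This is a minimal sketch of the mechanism, not LangGraph's actual internals.

```python
import asyncio

async def inner_llm():
    # Stands in for the nested agent's token-by-token LLM output.
    for token in ["Hi", " ", "there"]:
        await asyncio.sleep(0)  # simulate per-token network latency
        yield token

async def agent_node():
    # Re-yield each chunk as it arrives -- no aggregation.
    async for token in inner_llm():
        yield token

async def workflow_stream():
    # The outer layer does the same, so nothing is batched.
    async for token in agent_node():
        yield token

async def main():
    received = []
    async for token in workflow_stream():
        received.append(token)
    return received

print(asyncio.run(main()))  # ['Hi', ' ', 'there']
```

Each token arrives at the top of the stack individually, which is exactly the propagation behavior astream gives us across nested graphs.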

    We decided to inject the compiled agent directly into the workflow rather than wrapping it in a blocking Python function. Then, by invoking the workflow asynchronously with stream_mode=["messages"], LangGraph would be able to capture the AIMessageChunk events emitted by the nested agent and yield them instantly to our TTS consumer.

    FINAL IMPLEMENTATION

    Here is the sanitized, refactored solution that successfully enabled token-by-token streaming through the nested workflow.

    import asyncio
    from langgraph.graph import MessagesState, StateGraph, START
    from langgraph.checkpoint.memory import MemorySaver
    from langchain_openai import ChatOpenAI
    from langgraph.prebuilt import create_react_agent

    # 1. Ensure the underlying model supports streaming natively
    model = ChatOpenAI(model="gpt-4o-mini", temperature=0.1, streaming=True)

    # 2. Instantiate the agent using built-in methods to preserve middleware
    agent = create_react_agent(model=model, tools=[])
    workflow = StateGraph(MessagesState)

    # 3. Add the agent directly as a node rather than wrapping it in a sync function
    workflow.add_node("agent", agent)
    workflow.add_edge(START, "agent")

    checkpointer = MemorySaver()
    compiled_workflow = workflow.compile(checkpointer=checkpointer)

    async def stream_voice_response(user_input, workflow, config):
        state_input = {"messages": [("human", user_input)]}

        # 4. Use asynchronous streaming (astream) with message mode
        async for mode, chunk in workflow.astream(state_input, config, stream_mode=["messages"]):
            if mode == "messages":
                token, metadata = chunk

                # 5. Filter for chunks explicitly originating from our user-facing node
                # This prevents echoing user input or intermediate internal reasoning
                if token.content and metadata.get("langgraph_node") == "agent":
                    # Flush immediately for the TTS engine
                    print(token.content, end="", flush=True)

    if __name__ == "__main__":
        config = {"configurable": {"thread_id": "voice-session-101"}}
        # Execute within an asyncio event loop
        asyncio.run(stream_voice_response("Hello, how can I help?", compiled_workflow, config))
    

    Validation and Performance Considerations

    By switching to astream and adding the compiled agent directly as a node, the token streaming was immediately restored. We validated the fix by measuring the TTFT, which dropped from ~3.5 seconds (batched) to ~300 milliseconds (streamed). Furthermore, by filtering via metadata.get("langgraph_node") == "agent", we ensured our intermediate database-fetching nodes remained silent to the user.
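A simple way to reproduce this kind of measurement is a helper that timestamps the first chunk of any async stream. The measure_ttft function below is a hypothetical utility (not part of our production code), demonstrated against a fake token stream rather than the real workflow; in practice you would pass it the astream iterator.

```python
import asyncio
import time

async def measure_ttft(stream):
    """Return (time-to-first-token, total time) for an async chunk stream."""
    start = time.perf_counter()
    ttft = None
    async for _chunk in stream:
        if ttft is None:
            # First chunk observed: record the TTFT once.
            ttft = time.perf_counter() - start
    total = time.perf_counter() - start
    return ttft, total

# Demo against a fake token stream with simulated per-token latency.
async def fake_tokens():
    for token in ["one", "two", "three"]:
        await asyncio.sleep(0.01)
        yield token

ttft, total = asyncio.run(measure_ttft(fake_tokens()))
print(f"TTFT={ttft:.3f}s, total={total:.3f}s")
```

With batched output, TTFT converges toward the total time; with true streaming, it collapses to roughly one token's latency, which is the gap we observed between ~3.5 s and ~300 ms.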

    LESSONS FOR ENGINEERING TEAMS

    Building high-performance AI architectures exposes edge cases that local testing often misses. Here are key takeaways other engineering teams should apply:

    • Embrace Async-First AI Development: Synchronous Python wrappers are the enemy of high-throughput AI backends. Design real-time systems with asynchronous patterns from day one, and when you hire AI developers for production deployment, make sure they understand Python’s asyncio inside and out.
    • Understand Nested Graph Boundaries: In modern orchestration frameworks like LangGraph, agents are graphs themselves. Treating them as simple functions obscures their internal event loops.
    • Utilize Metadata for Stream Filtering: In a complex workflow, not all text should be spoken or displayed. Leverage the metadata emitted during streaming to filter tokens by their originating node.
    • Avoid Premature Customization: It is tempting to write custom LLM invocation loops to force streaming. However, relying on native methods (like adding the agent directly as a node) preserves built-in checkpointing and middleware.
    • Align Infrastructure with Expertise: Complex stateful applications demand robust architectures. Whether you hire Python developers for scalable data systems or .NET developers for enterprise modernization, make sure the team recognizes how framework constraints impact end-user latency.

    WRAP UP

    Fixing the token-by-token streaming issue in our LangGraph workflow came down to respecting the framework’s asynchronous event architecture. By removing blocking synchronous wrappers and leveraging astream with targeted metadata filtering, we achieved the sub-second latency required for a production-grade voice-bot while maintaining the integrity of our underlying agent tools.

    Building resilient, low-latency AI workflows requires deep framework knowledge and rigorous architectural standards. If you are looking to scale your engineering efforts with pre-vetted, dedicated experts, you can efficiently hire software developer teams through our platform. Contact us to learn how we can help you accelerate your next enterprise deployment.

    Hashtags

    #LangGraph #AIStreaming #PythonAsyncIO #VoiceAI #GenerativeAI #OpenAI #AIEngineering #LLM #ChatbotDevelopment #LangChain #MachineLearning #AITools #SoftwareDevelopment #AIAgents #TechInnovation
