    INTRODUCTION

    While working on a complex conversational AI voice-bot for a FinTech platform, we needed to build an intelligent routing and response system. To handle the complexity of intent classification, data fetching, and user response generation, we adopted a multi-agent architecture using LangGraph. In this setup, several intermediary agents processed background tasks, while dedicated user-facing agents generated the final spoken responses.

    During the initial integration phase, we discovered a critical latency bottleneck. When testing an agent as a standalone entity, it successfully streamed its response token-by-token, offering the near-instantaneous Time-to-First-Token (TTFT) required for natural voice interactions. However, the moment we integrated this exact same agent as a node within a LangGraph StateGraph workflow, it lost its streaming capability. Instead of a fluid token stream, the system buffered the execution, waiting for the entire node operation to complete before delivering the full text block.

    In voice-bot architectures, throughput and TTFT are make-or-break metrics. Waiting seconds for a complete response to generate before synthesizing audio creates an unacceptable user experience. We needed a way to preserve the built-in functionality of standard LangChain/LangGraph agents while forcing the workflow to yield tokens in real time; the synchronous execution path was masking streaming capabilities the underlying model already had. This challenge inspired this article, so that other engineering teams can avoid the same architectural roadblock when scaling multi-agent systems.

    PROBLEM CONTEXT

    In modern AI architectures, especially those built on LangGraph, workflows are typically structured as state machines (StateGraph). Each node represents a discrete operation—often an entire LLM agent—that receives a state, mutates it, and passes it forward.

    Our specific use case required a hybrid approach to streaming:

    • Intermediary Nodes: Agents responsible for internal reasoning, database lookups, or API calls. For these, we did not care about streaming. The system needed the complete output to decide the next routing step anyway.
    • User-Facing Nodes: Agents responsible for crafting the final conversational response. For these, real-time token-by-token output was mandatory to feed the Text-to-Speech (TTS) engine seamlessly.

    We wanted to rely on LangChain’s create_agent (or LangGraph’s prebuilt create_react_agent) helper to retain access to built-in middleware, memory management, and tool-calling validation. Falling back to a raw LLM wrapped with bind_tools() would have given us streaming easily, but at the cost of reinventing the wheel for state management and retry logic. We needed the best of both worlds: robust agent scaffolding and granular streaming.
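
    For contrast, here is a minimal sketch of the raw-model fallback we rejected (the tool stub and prompt are illustrative, not our production code):

    from langchain_core.tools import tool
    from langchain_openai import ChatOpenAI

    @tool
    def lookup_balance(account_id: str) -> str:
        """Hypothetical tool stub; a real version would query the ledger API."""
        return "balance: 42.00"

    llm = ChatOpenAI(model="gpt-4o-mini")
    llm_with_tools = llm.bind_tools([lookup_balance])

    # Streams token-by-token out of the box...
    for chunk in llm_with_tools.stream("What can you do for me?"):
        print(chunk.content, end="", flush=True)
    # ...but the tool-execution loop, retries, and state handling must all be hand-rolled.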

    WHAT WENT WRONG

    To understand the bottleneck, we had to examine how the agent was being encapsulated and invoked within the graph. The initial implementation looked something like this:

    # Initial (broken) node: invoke() blocks until the agent fully completes
    def agent_node(state: MessagesState):
        return agent.invoke({"messages": state["messages"]})
    

    When the workflow was compiled and executed via compiled_workflow.stream(..., stream_mode=["messages"]), the user-facing output arrived as one massive chunk at the very end. The architectural oversights were twofold:

    1. Synchronous Blocking Operations: By using agent.invoke() inside the node function, we forced the Python thread to block until the agent completed its entire generative cycle. invoke() is fundamentally a synchronous, non-yielding command. The graph node could not update the state until invoke() returned the final payload.

    2. Incompatible Stream Modes: The stream_mode=["messages"] parameter works beautifully for raw LLMs or perfectly async-native subgraphs. However, when legacy agents or heavily abstracted components are placed inside a generic Python function node, the internal token generation events are swallowed by the synchronous wrapper. The parent graph only sees the final state update, not the granular LLM events occurring deep within the invocation.
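
    For reference, this is roughly how we were consuming the graph at the time (a sketch; the payload and config are illustrative):

    config = {"configurable": {"thread_id": "voice-session-1"}}
    inputs = {"messages": [("human", "Hello")]}
    # Passing stream_mode as a list yields (mode, payload) tuples
    for mode, payload in compiled_workflow.stream(inputs, config, stream_mode=["messages"]):
        # With the synchronous agent node above, this loop received one large
        # payload after the node completed rather than incremental tokens.
        print(mode, payload)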

    HOW WE APPROACHED THE SOLUTION

    Our diagnostic process focused on unlocking the internal event stream of the LLM without dismantling the agent itself. We considered a few tradeoffs:

    First, we looked at injecting LangChain’s synchronous StreamingStdOutCallbackHandler. While it prints tokens to the console as they are generated, it is virtually useless for a programmatic backend that needs to capture tokens and stream them over WebSockets or gRPC to a frontend TTS engine.
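
    For illustration, the callback wiring we ruled out looks like this; tokens do arrive in real time, but only as stdout side effects:

    from langchain_core.callbacks import StreamingStdOutCallbackHandler
    from langchain_openai import ChatOpenAI

    model = ChatOpenAI(
        model="gpt-4o-mini",
        streaming=True,
        callbacks=[StreamingStdOutCallbackHandler()],
    )
    # Tokens print to the console as they are generated, but invoke() still
    # returns only the final message; there is no clean hook for forwarding
    # tokens to a WebSocket or gRPC stream.
    model.invoke("Say hello")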

    Next, we evaluated converting the entire graph to use raw chat models. We quickly discarded this. When companies look to hire python developers for scalable data systems, they expect reusable, maintainable architectures—not custom-built, brittle replacements for well-tested library primitives.

    Finally, we landed on the optimal path: transitioning the entire execution pipeline to asynchronous operations and utilizing the LangChain/LangGraph V2 Streaming API (astream_events). By shifting from invoke() to ainvoke(), we allowed the underlying event loop to breathe. By using astream_events on the compiled workflow, we could dynamically subscribe to internal execution events—specifically on_chat_model_stream—bypassing the node-level state buffering entirely.

    FINAL IMPLEMENTATION

    The fix required changes at both the node definition level and the execution level. Here is the sanitized, corrected architecture.

    import asyncio

    from langchain.agents import create_agent
    from langchain_openai import ChatOpenAI
    from langgraph.checkpoint.memory import MemorySaver
    from langgraph.graph import MessagesState, StateGraph, START

    # Initialize the model with streaming natively enabled
    model = ChatOpenAI(model="gpt-4o-mini", temperature=0.1, streaming=True)

    # Create the agent using the built-in helper
    agent = create_agent(
        model=model,
        tools=[],
    )

    # CRITICAL FIX 1: Make the node asynchronous and use ainvoke
    async def async_agent_node(state: MessagesState):
        response = await agent.ainvoke({"messages": state["messages"]})
        return response

    checkpointer = MemorySaver()
    workflow = StateGraph(MessagesState)
    workflow.add_node("agent", async_agent_node)
    workflow.add_edge(START, "agent")
    compiled_workflow = workflow.compile(checkpointer=checkpointer)

    # CRITICAL FIX 2: Use astream_events to catch deeply nested tokens
    async def stream_response(user_input, compiled_workflow, config):
        user_input_state = {"messages": [("human", user_input)]}
        # Listen to the V2 event stream
        async for event in compiled_workflow.astream_events(user_input_state, config, version="v2"):
            kind = event["event"]
            # Filter explicitly for token generation events
            if kind == "on_chat_model_stream":
                # We could also filter by event["name"] or metadata to isolate a specific node/agent
                chunk = event["data"]["chunk"]
                if chunk.content:
                    # Yield or print the token for the WebSocket/voice backend
                    print(chunk.content, end="", flush=True)

    if __name__ == "__main__":
        config = {"configurable": {"thread_id": "voice-session-1"}}
        asyncio.run(stream_response("Hello, how can you help me today?", compiled_workflow, config))
    
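    In the production service, the print call above becomes a yield so tokens can be forwarded over the wire. A minimal sketch of that variant (the WebSocket consumer in the comment is illustrative):

    async def token_stream(user_input, compiled_workflow, config):
        """Async generator variant of stream_response: yields tokens instead of printing."""
        state = {"messages": [("human", user_input)]}
        async for event in compiled_workflow.astream_events(state, config, version="v2"):
            if event["event"] == "on_chat_model_stream":
                chunk = event["data"]["chunk"]
                if chunk.content:
                    yield chunk.content

    # A consumer (e.g. a WebSocket handler) then forwards tokens as they arrive:
    #     async for token in token_stream(text, compiled_workflow, config):
    #         await websocket.send_text(token)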

    Validation Steps

    With this in place, intermediary nodes (once we added them) executed fully, and their internal events could be ignored or logged based on the event metadata. When execution reached the user-facing async_agent_node, the on_chat_model_stream events fired immediately, streaming tokens to the TTS service with only milliseconds of added latency.
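
    To keep intermediary agents silent, the filter can also check which node emitted each event. A sketch, assuming LangGraph’s standard event metadata (the langgraph_node key) is present:

    USER_FACING_NODES = {"agent"}  # only these nodes may reach the TTS engine

    async def stream_user_facing(state, compiled_workflow, config):
        async for event in compiled_workflow.astream_events(state, config, version="v2"):
            if event["event"] != "on_chat_model_stream":
                continue
            # LangGraph tags each event with the node that produced it
            if event.get("metadata", {}).get("langgraph_node") not in USER_FACING_NODES:
                continue  # swallow tokens from intermediary/reasoning nodes
            chunk = event["data"]["chunk"]
            if chunk.content:
                print(chunk.content, end="", flush=True)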

    LESSONS FOR ENGINEERING TEAMS

    Solving this architectural quirk reinforced several best practices for building scalable AI pipelines:

    • Embrace Async by Default: When building I/O bound LLM applications, synchronous operations are a liability. Always default to ainvoke and astream to prevent thread blocking and enable real-time event propagation.
    • Understand Workflow State vs. Execution Events: A LangGraph StateGraph tracks the holistic state of a conversation, while astream_events tracks the micro-operations of execution. You must tap into the execution events, not the state updates, to achieve token-level granularity.
    • Filter Events Granularly: The V2 events API emits a massive amount of data (tool starts, chain ends, LLM starts). Implement strict filtering on event["event"] == "on_chat_model_stream" and evaluate the source node to prevent echoing internal thought processes to the user.
    • Preserve Framework Primitives: Avoid the temptation to rewrite standard agents just to get streaming working. Utilizing the correct streaming API allows you to maintain middleware, memory, and tool schemas cleanly. This is a standard our teams strictly adhere to when clients hire AI developers for production deployments.
    • Test TTFT in Production Contexts: Always measure Time-to-First-Token in an environment that mimics production routing. A standalone test script is not representative of how an agent will behave when wrapped in workflow middleware; a minimal measurement harness is sketched below.
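
    Below is an illustrative harness for that last point: it measures TTFT against the compiled workflow itself rather than a bare model, so middleware overhead is included in the number:

    import time

    async def measure_ttft(user_input, compiled_workflow, config):
        """Rough probe: seconds from dispatch to the first streamed token."""
        start = time.perf_counter()
        state = {"messages": [("human", user_input)]}
        async for event in compiled_workflow.astream_events(state, config, version="v2"):
            if event["event"] == "on_chat_model_stream" and event["data"]["chunk"].content:
                return time.perf_counter() - start
        return None  # no tokens streamed: the buffering regression described above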

    WRAP UP

    Integrating pre-built agents into complex LangGraph workflows does not mean sacrificing real-time performance. By shifting away from synchronous node invocations and leveraging the highly granular astream_events API, we successfully unblocked the LLM token stream. This approach allowed our multi-agent architecture to maintain high throughput for user-facing voice interactions while securely delegating complex reasoning to background nodes.

    If your team is facing complex architectural bottlenecks in AI integration, workflow orchestration, or scalable backends, you shouldn’t have to compromise on performance. Whether you need to hire software developers for an immediate fix or assemble a dedicated engineering pod to overhaul your platform’s architecture, we can help. Feel free to contact us to discuss how our pre-vetted teams can accelerate your roadmap.
