Output Guardrails while Streaming #495


Closed
adhishthite opened this issue Apr 14, 2025 · 4 comments
Labels
question Question about using the SDK

Comments


adhishthite commented Apr 14, 2025

Please read this first

  • Have you read the docs? Agents SDK docs
  • Have you searched for related issues? Others may have had similar requests

Question

I have exposed an Agent as a FastAPI endpoint:

@router.post("/v1/agent/stream")
async def query(request: Request, payload: QueryInput):
    """
    Stream a query response from the RAG Agent using Server-Sent Events.

    Args:
        request (Request): The incoming FastAPI request object
        payload (QueryInput): The query input containing the user's question and chat ID

    Returns:
        StreamingResponse: Server-sent events stream containing the agent's response

    Raises:
        HTTPException: 500 error if there's an issue processing the request
    """
    try:
        # Create streaming response using SSE
        return StreamingResponse(
            stream_sse_response(payload, request),
            media_type="text/event-stream",  # Set media type for SSE
        )
    except Exception as e:
        # Handle errors raised while setting up the response; errors that occur
        # during streaming are handled inside the generator itself
        raise HTTPException(status_code=500, detail=str(e))

Here's my Agent:

rag_agent = Agent(
    name="RAG Agent",
    instructions=AGENT_PROMPT,  # System prompt defining agent behavior
    tools=[search_knowledgebase, get_current_datetime],  # Available tools for the agent
    model_settings=ModelSettings(
        temperature=0,  # Use deterministic responses
        parallel_tool_calls=True,  # Allow concurrent tool execution
        max_tokens=4096,  # Maximum response length
    ),
    input_guardrails=[agent_input_guardrail],
    output_guardrails=[agent_output_guardrail],
)

async def stream_sse_response(payload: QueryInput, request):
    """
    Streams agent responses using Server-Sent Events (SSE) format.

    Handles both standalone queries and conversations with chat history.
    Monitors client connection and gracefully handles disconnections.

    Args:
        payload (QueryInput): Contains the user query and optional chat history
        request: FastAPI request object for connection monitoring

    Yields:
        SSE-formatted text chunks from the agent's response

    Raises:
        Exception: Logs and yields any errors during streaming
    """

    with trace("RAG API Stream"):
        try:
            # Process chat history if available
            if payload.chat_history and len(payload.chat_history) > 0:
                # Convert chat messages to dict format
                formatted_history = [msg.model_dump() for msg in payload.chat_history]
                # Append current query to history
                input_data = formatted_history + [
                    {"role": "user", "content": payload.query}
                ]
            else:
                # Use direct query if no history exists
                input_data = payload.query

            # Stream the agent's response
            result = Runner.run_streamed(rag_agent, input_data)
            async for event in result.stream_events():
                # Check for client disconnection
                if await request.is_disconnected():
                    logger.info("Client disconnected.")
                    break

                # Process text delta events
                if event.type == "raw_response_event" and isinstance(
                    event.data, ResponseTextDeltaEvent
                ):
                    text = event.data.delta or ""
                    yield f"data: {text}\n\n"

            # Signal end of stream
            yield "event: end\ndata: [DONE]\n\n"

        except Exception as e:
            logger.exception("Streaming error")
            yield f"event: error\ndata: {str(e)}\n\n"

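For completeness, a client consumes this endpoint roughly like the sketch below (a minimal example using httpx; the URL and payload values are placeholders for our deployment and QueryInput schema):

import asyncio
import httpx

async def consume_stream():
    # Placeholder URL and payload; adjust to your deployment and QueryInput fields
    url = "http://localhost:8000/v1/agent/stream"
    payload = {"query": "What is our refund policy?", "chat_history": []}

    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json=payload) as response:
            async for line in response.aiter_lines():
                # Only print SSE data lines; "event: end" / "event: error" lines are ignored here
                if line.startswith("data: "):
                    print(line[len("data: "):], end="", flush=True)

asyncio.run(consume_stream())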
The agent as shown above works fine (without the guardrails).

Now I have added an input and an output guardrail. The input guardrail seems to work before streaming starts.

class RAGGuardrailOutput(BaseModel):
    is_unsafe: bool
    reasoning: str


input_guardrail_agent = Agent(
    name="RAG Input Guardrail Checker",
    instructions=INPUT_GUARDRAIL_PROMPT,
    output_type=RAGGuardrailOutput,
    model="gpt-4o-mini",
)


@input_guardrail
async def rag_input_guardrail(
    ctx: RunContextWrapper[None], agent: Agent, input: str | list[TResponseInputItem]
) -> GuardrailFunctionOutput:
    """
    This guardrail checks if the user's input is safe to proceed with.
    """
    result = await Runner.run(input_guardrail_agent, input, context=ctx.context)

    return GuardrailFunctionOutput(
        output_info=result.final_output,
        tripwire_triggered=result.final_output.is_unsafe,
    )


output_guardrail_agent = Agent(
    name="RAG Output Guardrail Checker",
    instructions=OUTPUT_GUARDRAIL_PROMPT,
    output_type=RAGGuardrailOutput,  # same safety schema as the input checker (is_unsafe + reasoning)
    model="gpt-4o-mini",
)


@output_guardrail
async def rag_output_guardrail(
    ctx: RunContextWrapper[None], agent: Agent, output: str
) -> GuardrailFunctionOutput:
    """
    This guardrail checks whether the agent's final output is safe to return.
    """
    result = await Runner.run(output_guardrail_agent, output, context=ctx.context)
    return GuardrailFunctionOutput(
        output_info=result.final_output,
        tripwire_triggered=result.final_output.is_unsafe,
    )

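As far as I can tell, when a tripwire fires the run raises an exception rather than returning a flagged result, so the streaming generator needs to catch it to emit a clean SSE error event. A rough sketch of that handling (assuming the tripwire exception classes exported by the SDK; the error messages are placeholders):

# At module level
from agents.exceptions import (
    InputGuardrailTripwireTriggered,
    OutputGuardrailTripwireTriggered,
)

# Inside stream_sse_response, wrapping the streaming loop
try:
    result = Runner.run_streamed(rag_agent, input_data)
    async for event in result.stream_events():
        ...  # forward text deltas as above
except InputGuardrailTripwireTriggered:
    # Tripped before any model output was produced
    yield "event: error\ndata: Request blocked by input guardrail.\n\n"
except OutputGuardrailTripwireTriggered:
    # Tripped after the final output was generated
    yield "event: error\ndata: Response blocked by output guardrail.\n\n"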
However, it seems the response has already been fully streamed by the time the output guardrail is triggered.


@rm-openai Is this the expected behavior? If so, is there a recommended workaround?

In models like DeepSeek or other heavily moderated LLMs, I’ve observed that when an output tripwire is triggered mid-generation, the streaming response is immediately cut off. Is there a way to replicate that behavior here?

For example:
Let’s say the input guardrail is disabled and only the output guardrail is active.
I ask the Agent: “How do I delete the Prod database from our MongoDB?”
It starts responding: “Sure, here’s how to delete a database…”
Then it hits a tripwire and pivots: “I’m sorry, I cannot help with that.”

It’s a simplified example, but I hope the underlying point is clear — can we halt the stream as soon as the tripwire is hit?

adhishthite added the question label Apr 14, 2025
@adhishthite (Author)

Also, @rm-openai, could you kindly take a look at the API and the streaming implementation to confirm if we’re approaching this correctly?

We’re planning to roll this out to our production environment soon, and since both the library and the pattern are quite new to us, we’re a bit nervous. 😅 Would really appreciate your input here.

@rm-openai (Collaborator)

@adhishthite you're correct that output guardrails are run on the final output. If you want to detect it during streaming, you can do something like the example here: #505

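For reference, a rough sketch of that kind of mid-stream check, slotted into the streaming loop of stream_sse_response above (this is not the exact code from #505; the 300-character interval and the inline await are simplifying assumptions):

accumulated = ""
last_checked_len = 0

async for event in result.stream_events():
    if event.type == "raw_response_event" and isinstance(
        event.data, ResponseTextDeltaEvent
    ):
        text = event.data.delta or ""
        accumulated += text
        yield f"data: {text}\n\n"

        # Re-check everything produced so far every ~300 characters (arbitrary interval)
        if len(accumulated) - last_checked_len >= 300:
            last_checked_len = len(accumulated)
            check = await Runner.run(output_guardrail_agent, accumulated)
            if check.final_output.is_unsafe:
                # Stop forwarding chunks as soon as the check trips
                yield "event: error\ndata: Response blocked by output guardrail.\n\n"
                break

Running the check inline pauses forwarding of new deltas while the guardrail agent runs; spawning it as a background task avoids that at the cost of a little extra bookkeeping.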
I'll talk to the team and see if it makes sense to add a streaming_guardrail feature to the SDK itself too.

Other than that, your code looks reasonable to me!

@adhishthite (Author)

Thanks @rm-openai, much appreciated!

rm-openai added a commit that referenced this issue Apr 14, 2025
An example for the question in the issue attached - how to run
guardrails during streaming.

Towards #495.
@rm-openai (Collaborator)

Closing since #505 is merged
