How to use streaming in sync? #1502

Open
vitalik opened this issue Apr 16, 2025 · 2 comments

vitalik commented Apr 16, 2025

Question

I can't find any example of how to consume streaming text chunks in a non-async context.

I'm trying something like this:

import asyncio
from datetime import datetime
from pydantic_ai import Agent


def get_event_loop():
    try:
        event_loop = asyncio.get_event_loop()
    except RuntimeError:
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
    return event_loop


agent = Agent(
    'openai:gpt-4o',
    system_prompt=(
        'You are a helpful assistant that always answers with too many words, sometimes losing context but finally getting the right answer. '
        'Also always call the tool get_current_time to find the current time to check that the answer makes sense. '
        'Basically be a bit annoying but always correct. The answer must be at least three paragraphs long.'
    ),
)


@agent.tool
def get_current_time(ctx) -> str:
    """Get the current time."""
    print(' --> call get_current_time <--')
    return str(datetime.now())


async def agent_stream_deltas(agent):
    async with agent.run_stream('What is the capital of the UK?') as response:
        async for chunk in response.stream_text(delta=True):
            yield chunk


def agent_stream_sync(agent):
    loop = get_event_loop()
    gen = agent_stream_deltas(agent)
    while True:
        try:
            chunk = loop.run_until_complete(gen.__anext__())
            yield chunk
        except StopAsyncIteration:
            pass


if __name__ == '__main__':
    for chunk in agent_stream_sync(agent):
        print(chunk, end='', flush=True)

It prints chunks as they arrive, but at the end it crashes/freezes:

 --> call get_current_time <--
The capital of the United Kingdom
,...
. The city is an exuberant mix of old and new, tradition and innovation, known for its diverse communities and vibrant urban fabric. Therefore, while my explanation may have taken you on a slightly winding path filled with relevant details, London stands firmly as the city's heart and soul of the United Kingdom.
Failed to detach context

Traceback (most recent call last):
  File "/private/tmp/aa_differencingly_kusti/.venv/lib/python3.12/site-packages/opentelemetry/context/__init__.py", line 155, in detach
    _RUNTIME_CONTEXT.detach(token)
  File "/private/tmp/aa_differencingly_kusti/.venv/lib/python3.12/site-packages/opentelemetry/context/contextvars_context.py", line 53, in detach
    self._current_context.reset(token)
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x1020d87c0> at 0x104550c00> was created in a different Context

Additional Context

No response

vitalik added the question label Apr 16, 2025
Kludex added the asyncio label Apr 17, 2025
Kludex self-assigned this Apr 17, 2025
DouweM commented Apr 17, 2025

@vitalik Thanks for reporting this!

There are two things going on here:

  1. The script freezes because it never breaks out of the while True loop: in except StopAsyncIteration, can you change pass to break? (See the sketch after this list.)

  2. agent.run_stream automatically wraps the work in a logfire span (even if you're not using logfire), and these cannot contain the yield keyword as explained on https://logfire.pydantic.dev/docs/reference/advanced/generators/.

    As @alexmojaki wrote in a comment on #674 (Handling Incorrect Error Type in validate_structured_result Causes Context Detachment Issue), which hits the same error:

    The reason is that agent.run_stream opens the span and relies on with to ensure it closes nicely. That doesn't happen in the last case because it's in a generator that gets suspended when the loop breaks and never resumes. Instead it gets closed by garbage collection in a different context, hence the error.

    There's more context on the OTel issue tracker as well: Runtime context fails to detach token (open-telemetry/opentelemetry-python#2606)
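
Putting both fixes together, here's a minimal sketch of your wrapper (same names as in your example, not an official API) with the break in place and the async generator closed explicitly on the same loop, so the span is exited in the context it was created in:

def agent_stream_sync(agent):
    loop = get_event_loop()
    gen = agent_stream_deltas(agent)
    try:
        while True:
            try:
                # Drive the async generator one step at a time on the loop.
                chunk = loop.run_until_complete(gen.__anext__())
                yield chunk
            except StopAsyncIteration:
                break  # the stream is exhausted; leave the loop
    finally:
        # Finalize the generator on the same loop instead of leaving it to
        # garbage collection in a different context, which is what produces
        # the "Failed to detach context" error.
        loop.run_until_complete(gen.aclose())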

The error is harmless, but obviously distracting and a bit scary-looking, so we're going to do two things:

  1. Stop creating a logfire span unless necessary. This will make your example work without any issues, but it's obviously not a complete solution because it'd still affect those using logfire, which we want to be everyone.
  2. Natively support running PydanticAI in a synchronous context, so you don't have to manually do things like loop.run_until_complete(gen.__anext__()): see Feature Request: Synchronous Calls (#934).

@alexmojaki I think you mentioned you're going to look at 1, and @Kludex is already looking into 2.
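
In the meantime, if you don't need to interleave the chunks with other synchronous work, the simplest workaround is to keep the whole stream inside a single coroutine and drive it with one asyncio.run call, so everything runs and is finalized in the same context. Here's a sketch using the same API as in your example:

import asyncio

async def stream_and_print(agent):
    # Everything, including the span opened by run_stream, starts and
    # finishes inside this one coroutine, on one event loop.
    async with agent.run_stream('What is the capital of the UK?') as response:
        async for chunk in response.stream_text(delta=True):
            print(chunk, end='', flush=True)

asyncio.run(stream_and_print(agent))  # `agent` as defined in your example

(For non-streaming runs, agent.run_sync already covers the synchronous case.)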

cspiecker commented

I’m also running into this issue. Would love to see proper sync support or a clean fix for the span issue. Thanks for looking into it!
