
Handling Incorrect Error Type in validate_structured_result Causes Context Detachment Issue #674


Open
YanSte opened this issue Jan 13, 2025 · 7 comments
Labels
bug Something isn't working OpenTelemetry

Comments

@YanSte
Contributor

YanSte commented Jan 13, 2025

When using agent.run_stream and trying to catch validation errors from validate_structured_result with an incorrect exception type, the code doesn't handle the error correctly.
Instead, it fails with a context detachment issue.

The exception isn't caught, and the stream fails.

I’m unsure whether this issue falls under PydanticAI’s responsibility or if it’s expected to be handled by the developer.
👉 However, I’m reporting it in case it can help identify a solution for the next developer.

(You can close this issue if needed.)

Here is a minimal example to reproduce the issue:

async with agent.run_stream(...) as result:
    async for message, last in result.stream_structured():
        try:
            profile = await result.validate_structured_result(
                message,
                allow_partial=not last,
            )
        except ValidationError:  # <<< The error isn't caught here as expected (incorrect exception type)
            continue
        # Nothing more

The above code fails to handle errors properly and throws the following traceback:

Failed to detach context
Traceback (most recent call last):
  File "..../pydantic_ai/result.py", line 302, in stream_structured
    yield msg, False
GeneratorExit

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "..../opentelemetry/context/__init__.py", line 152, in detach
    _RUNTIME_CONTEXT.detach(token)
  File "..../opentelemetry/context/contextvars_context.py", line 50, in detach
    self._current_context.reset(token)  # type: ignore
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: <Token var=<ContextVar name='current_context' default={} at xxx> at xxx> was created in a different Context
@samuelcolvin samuelcolvin added the bug Something isn't working label Jan 16, 2025
@samuelcolvin
Member

Thanks for reporting.

@alexmojaki is this a bug with logfire, opentelemetry or how I've done something in pydantic-ai?

@alexmojaki
Contributor

https://logfire.pydantic.dev/docs/reference/advanced/generators/

with _logfire.span('response stream structured') as lf_span:
    # if the message currently has any parts with content, yield before streaming
    msg = self._stream_response.get()
    for part in msg.parts:
        if part.has_content():
            yield msg, False
            break
    async with _utils.group_by_temporal(usage_checking_stream, debounce_by) as group_iter:
        async for _events in group_iter:
            msg = self._stream_response.get()
            yield msg, False

This happens when a `with span:` block contains a `yield`.
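A standalone sketch (no pydantic-ai or logfire involved) of why a span wrapped around a `yield` is fragile: a generator suspended at a yield only runs its cleanup (here a `finally` block standing in for the span's `__exit__`) when the generator is closed, which may happen late, via garbage collection, in a different context.

```python
events = []

def gen():
    try:
        yield 1
        yield 2
    finally:
        events.append("cleanup")  # stands in for the span's __exit__

g = gen()
next(g)              # suspend at the first yield
assert events == []  # cleanup has NOT run yet
g.close()            # throws GeneratorExit into the generator at the yield
assert events == ["cleanup"]
```

If `g.close()` is never called explicitly, the `finally` block only runs when the garbage collector finalizes the generator, at an arbitrary point and in an arbitrary context.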

@martgra

martgra commented Mar 17, 2025

@alexmojaki what would be the correct work-around currently?

I'm slowly moving to pydantic-ai, but as of now I'm not planning to use logfire at all.

I'm getting this exception when the stream is cancelled from the client application.

from collections.abc import AsyncIterator

async def get_streaming_response(query: str) -> AsyncIterator[str]:
    """Uses the Pydantic AI agent to process the query in streaming mode."""
    async with agent.run_stream(query) as stream:
        async for chunk in stream.stream_text(delta=True):
            yield chunk

Failed to detach context
Traceback (most recent call last):
  File "/home/user/.cache/pypoetry/virtualenvs/api-2E0zB5zf-py3.11/lib/python3.11/site-packages/opentelemetry/trace/__init__.py", line 587, in use_span
    yield span
  File "/home/user/.cache/pypoetry/virtualenvs/api-2E0zB5zf-py3.11/lib/python3.11/site-packages/pydantic_graph/graph.py", line 264, in iter
    yield GraphRun[StateT, DepsT, T](
  File "/home/user/.cache/pypoetry/virtualenvs/api-2E0zB5zf-py3.11/lib/python3.11/site-packages/pydantic_ai/agent.py", line 481, in iter
    yield AgentRun(graph_run)
  File "/home/user/.cache/pypoetry/virtualenvs/api-2E0zB5zf-py3.11/lib/python3.11/site-packages/pydantic_ai/agent.py", line 723, in run_stream
    yield StreamedRunResult(
  File "/home/user/dev/rag-backend/api/consumers/streaming_agent.py", line 30, in get_streaming_response
    yield chunk
GeneratorExit

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/.cache/pypoetry/virtualenvs/api-2E0zB5zf-py3.11/lib/python3.11/site-packages/opentelemetry/context/__init__.py", line 155, in detach
    _RUNTIME_CONTEXT.detach(token)
  File "/home/user/.cache/pypoetry/virtualenvs/api-2E0zB5zf-py3.11/lib/python3.11/site-packages/opentelemetry/context/contextvars_context.py", line 53, in detach
    self._current_context.reset(token)
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x7a941f8a9120> at 0x7a94103e4c80> was created in a different Context

@alexmojaki
Contributor

Trying to look into this, things got weirder:

import asyncio

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')


async def main():
    async with agent.run_stream('Hello') as stream:
        async for _ in stream.stream_text():
            break


asyncio.run(main())

logs:

Task exception was never retrieved
future: <Task finished name='Task-10' coro=<<async_generator_athrow without __name__>()> exception=ReadError('')>
Traceback (most recent call last):
  ...
httpcore.ReadError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/result.py", line 444, in _stream_text_deltas
    async with _utils.group_by_temporal(_stream_text_deltas_ungrouped(), debounce_by) as group_iter:
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/contextlib.py", line 231, in __aexit__
    await self.gen.athrow(value)
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/_utils.py", line 180, in group_by_temporal
    await task
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/result.py", line 429, in _stream_text_deltas_ungrouped
    async for event in self._stream_response:
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/models/openai.py", line 467, in _get_event_iterator
    async for chunk in self._response:
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/_utils.py", line 270, in __anext__
    return await self._source_iter.__anext__()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alex/work/pydantic-ai/.venv/lib/python3.12/site-packages/openai/_streaming.py", line 147, in __aiter__
    async for item in self._iterator:
  ...
httpx.ReadError

then raises:

Traceback (most recent call last):
  File "/Users/alex/Library/Application Support/JetBrains/PyCharm2024.3/scratches/scratch_1223.py", line 20, in <module>
    asyncio.run(main())
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/alex/Library/Application Support/JetBrains/PyCharm2024.3/scratches/scratch_1223.py", line 15, in main
    async with agent.run_stream('Hello') as stream:
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/contextlib.py", line 217, in __aexit__
    await anext(self.gen)
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/agent.py", line 670, in run_stream
    async with node._stream(graph_ctx) as streamed_response:  # pyright: ignore[reportPrivateUsage]
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/contextlib.py", line 217, in __aexit__
    await anext(self.gen)
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/_agent_graph.py", line 289, in _stream
    async for _ in streamed_response:
RuntimeError: anext(): asynchronous generator is already running

Narrowing things down:

async def main():
    async with agent.run_stream('Hello') as stream:
        async for _ in stream._stream_response_text(debounce_by=0.1):
            break
        async for _ in stream._stream_response._event_iterator:
            assert not 'reached'

throws similarly:

Traceback (most recent call last):
  File "/Users/alex/Library/Application Support/JetBrains/PyCharm2024.3/scratches/scratch_1226.py", line 21, in <module>
    asyncio.run(main())
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/alex/Library/Application Support/JetBrains/PyCharm2024.3/scratches/scratch_1226.py", line 17, in main
    async for _ in stream._stream_response._event_iterator:
RuntimeError: anext(): asynchronous generator is already running

Changing to `debounce_by=None` fixes things, i.e. it gets past the `async for _ in stream._stream_response._event_iterator:` line, and if you remove that second loop then the whole thing cleans up and finishes properly.

@alexmojaki
Contributor

Trying to narrow things further:

import asyncio

from pydantic_ai._utils import group_by_temporal


async def gen():
    yield 1
    await asyncio.sleep(0.2)
    yield 2


async def main():
    g = gen()
    async with group_by_temporal(g, 0.1) as t:
        async for x in t:
            print(x)
            break
        async for x in t:
            print(x)
            break


asyncio.run(main())

gives:

[1]
[2]
Traceback (most recent call last):
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/_utils.py", line 180, in group_by_temporal
    await task
StopAsyncIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/alex/Library/Application Support/JetBrains/PyCharm2024.3/scratches/scratch_1227.py", line 23, in <module>
    asyncio.run(main())
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/alex/Library/Application Support/JetBrains/PyCharm2024.3/scratches/scratch_1227.py", line 14, in main
    async with group_by_temporal(g, 0.1) as t:
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/contextlib.py", line 217, in __aexit__
    await anext(self.gen)
RuntimeError: async generator raised StopAsyncIteration

which isn't quite the same, but I did see similar errors sometimes when playing around with the previous example, e.g. when using a TestModel.
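The `async generator raised StopAsyncIteration` error above is general Python behavior, not specific to pydantic-ai: per PEP 525, a StopAsyncIteration that escapes an async generator's body is replaced with a RuntimeError. A hypothetical minimal sketch of how `group_by_temporal` can hit this when `await task` re-raises the StopAsyncIteration from an exhausted source stream (names here are illustrative, not the real implementation):

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def leaky():
    async def inner():
        raise StopAsyncIteration  # stand-in for the exhausted source iterator
    task = asyncio.create_task(inner())
    yield
    await task  # re-raises StopAsyncIteration inside the async generator body

async def main():
    try:
        async with leaky():
            pass
    except RuntimeError as e:  # PEP 525: StopAsyncIteration is converted
        return str(e)

print(asyncio.run(main()))  # async generator raised StopAsyncIteration
```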

@martgra

martgra commented Mar 17, 2025

@alexmojaki weird. Don't think I have anything to bring to the table here, but I will test properly if you get to the bottom of this. `debounce_by` is something I need to set to None anyhow, as Azure is already giving me a hard time with smooth streams. If that solves it, I'm good.

In my case the stream would be interrupted by asyncio.CancelledError, as it's used to proxy the stream to the client through FastAPI.

@alexmojaki
Contributor

It looks like debounce_by=None doesn't solve the context detaching error.

You should know that the traceback comes from this code in the OpenTelemetry SDK:

    try:
        _RUNTIME_CONTEXT.detach(token)
    except Exception:
        logger.exception("Failed to detach context")

This means you don't have to worry that anything is really failing or crashing; it's just logging. Things should still work normally.
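If the log noise is unwanted, it can be silenced at the logging level. A sketch under one assumption: based on the module path in the traceback (`opentelemetry/context/__init__.py`), the record is emitted by a logger named `opentelemetry.context`; this name is inferred, not documented API.

```python
import logging

# Assumption: the "Failed to detach context" record comes from the module
# logger in opentelemetry/context/__init__.py, i.e. "opentelemetry.context".
logging.getLogger("opentelemetry.context").setLevel(logging.CRITICAL)
```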

This is fine:

    async with agent.run_stream(query) as stream:
        async for chunk in stream.stream_text(debounce_by=None):
            break

This is fine:

async def get_streaming_response(stream):
    async for chunk in stream.stream_text(debounce_by=None):
        yield chunk

async def main():
    async with agent.run_stream(query) as stream:
        async for chunk in get_streaming_response(stream):
            break

But this logs the error:

async def get_streaming_response(query):
    async with agent.run_stream(query) as stream:
        async for chunk in stream.stream_text(debounce_by=None):
            yield chunk

async def main():
    async for _ in get_streaming_response('Hello'):
        break

The reason is that agent.run_stream opens the span and relies on `with` to ensure it closes cleanly. That doesn't happen in the last case because the span lives in a generator that gets suspended when the loop breaks and never resumes. Instead the generator is closed by garbage collection in a different context, hence the error.

So the short answer is to keep agent.run_stream in a function that doesn't have yield. The longer answer is to see https://logfire.pydantic.dev/docs/reference/advanced/generators/ and mentally replace logfire.span with agent.run_stream.
