# Handling Incorrect Error Type in `validate_structured_result` Causes Context Detachment Issue #674
Thanks for reporting. @alexmojaki is this a bug in logfire, in opentelemetry, or in something I've done in pydantic-ai?

---
https://logfire.pydantic.dev/docs/reference/advanced/generators/

See `pydantic-ai/pydantic_ai_slim/pydantic_ai/result.py`, lines 291 to 302 at `f01f099`.

This happens when

---
@alexmojaki what would be the correct workaround currently? I'm slowly moving into pydantic-ai, but as of now I'm not planning to use logfire at all. I get this exception when the stream is cancelled from the client application:

```python
async def get_streaming_response(query: str) -> AsyncIterator[str]:
    """Uses the Pydantic AI agent to process the query in streaming mode."""
    async with agent.run_stream(query) as stream:
        async for chunk in stream.stream_text(delta=True):
            yield chunk
```

```
Failed to detach context
Traceback (most recent call last):
  File "/home/user/.cache/pypoetry/virtualenvs/api-2E0zB5zf-py3.11/lib/python3.11/site-packages/opentelemetry/trace/__init__.py", line 587, in use_span
    yield span
  File "/home/user/.cache/pypoetry/virtualenvs/api-2E0zB5zf-py3.11/lib/python3.11/site-packages/pydantic_graph/graph.py", line 264, in iter
    yield GraphRun[StateT, DepsT, T](
  File "/home/user/.cache/pypoetry/virtualenvs/api-2E0zB5zf-py3.11/lib/python3.11/site-packages/pydantic_ai/agent.py", line 481, in iter
    yield AgentRun(graph_run)
  File "/home/user/.cache/pypoetry/virtualenvs/api-2E0zB5zf-py3.11/lib/python3.11/site-packages/pydantic_ai/agent.py", line 723, in run_stream
    yield StreamedRunResult(
  File "/home/user/dev/rag-backend/api/consumers/streaming_agent.py", line 30, in get_streaming_response
    yield chunk
GeneratorExit

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/.cache/pypoetry/virtualenvs/api-2E0zB5zf-py3.11/lib/python3.11/site-packages/opentelemetry/context/__init__.py", line 155, in detach
    _RUNTIME_CONTEXT.detach(token)
  File "/home/user/.cache/pypoetry/virtualenvs/api-2E0zB5zf-py3.11/lib/python3.11/site-packages/opentelemetry/context/contextvars_context.py", line 53, in detach
    self._current_context.reset(token)
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x7a941f8a9120> at 0x7a94103e4c80> was created in a different Context
Trying to look into this, things got weirder:

```python
import asyncio

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')


async def main():
    async with agent.run_stream('Hello') as stream:
        async for _ in stream.stream_text():
            break


asyncio.run(main())
```

logs:

```
Task exception was never retrieved
future: <Task finished name='Task-10' coro=<<async_generator_athrow without __name__>()> exception=ReadError('')>
Traceback (most recent call last):
  ...
httpcore.ReadError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/result.py", line 444, in _stream_text_deltas
    async with _utils.group_by_temporal(_stream_text_deltas_ungrouped(), debounce_by) as group_iter:
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/contextlib.py", line 231, in __aexit__
    await self.gen.athrow(value)
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/_utils.py", line 180, in group_by_temporal
    await task
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/result.py", line 429, in _stream_text_deltas_ungrouped
    async for event in self._stream_response:
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/models/openai.py", line 467, in _get_event_iterator
    async for chunk in self._response:
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/_utils.py", line 270, in __anext__
    return await self._source_iter.__anext__()
  File "/Users/alex/work/pydantic-ai/.venv/lib/python3.12/site-packages/openai/_streaming.py", line 147, in __aiter__
    async for item in self._iterator:
  ...
httpx.ReadError
```

then raises:

```
Traceback (most recent call last):
  File "/Users/alex/Library/Application Support/JetBrains/PyCharm2024.3/scratches/scratch_1223.py", line 20, in <module>
    asyncio.run(main())
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
  File "/Users/alex/Library/Application Support/JetBrains/PyCharm2024.3/scratches/scratch_1223.py", line 15, in main
    async with agent.run_stream('Hello') as stream:
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/contextlib.py", line 217, in __aexit__
    await anext(self.gen)
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/agent.py", line 670, in run_stream
    async with node._stream(graph_ctx) as streamed_response:  # pyright: ignore[reportPrivateUsage]
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/contextlib.py", line 217, in __aexit__
    await anext(self.gen)
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/_agent_graph.py", line 289, in _stream
    async for _ in streamed_response:
RuntimeError: anext(): asynchronous generator is already running
```

Narrowing things down:

```python
async def main():
    async with agent.run_stream('Hello') as stream:
        async for _ in stream._stream_response_text(debounce_by=0.1):
            break
        async for _ in stream._stream_response._event_iterator:
            assert not 'reached'
```

throws similarly:

```
Traceback (most recent call last):
  File "/Users/alex/Library/Application Support/JetBrains/PyCharm2024.3/scratches/scratch_1226.py", line 21, in <module>
    asyncio.run(main())
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
  File "/Users/alex/Library/Application Support/JetBrains/PyCharm2024.3/scratches/scratch_1226.py", line 17, in main
    async for _ in stream._stream_response._event_iterator:
RuntimeError: anext(): asynchronous generator is already running
```

Changing to

---
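[Editor's note] The `anext(): asynchronous generator is already running` error itself is easy to trigger outside pydantic-ai: it occurs whenever a second consumer calls `__anext__` on an async generator while an earlier `__anext__` is still suspended in a background task, which appears to be what the leftover `group_by_temporal` consumer task does here. A minimal sketch (not pydantic-ai code):

```python
import asyncio


async def gen():
    yield 1
    await asyncio.sleep(10)  # the next __anext__ will park here
    yield 2


async def main():
    g = gen()
    assert await g.__anext__() == 1
    # A background consumer, like the internal task group_by_temporal spawns:
    pending = asyncio.ensure_future(g.__anext__())
    await asyncio.sleep(0)  # let it start and suspend inside the generator
    try:
        await g.__anext__()  # second consumer while the first is mid-flight
    except RuntimeError as e:
        return str(e)
    finally:
        pending.cancel()


msg = asyncio.run(main())
print(msg)
```

The generator is marked as running for the whole duration of the pending `__anext__`, including while it is suspended at an internal `await`, so any concurrent iteration fails immediately.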
Trying to narrow things further:

```python
import asyncio

from pydantic_ai._utils import group_by_temporal


async def gen():
    yield 1
    await asyncio.sleep(0.2)
    yield 2


async def main():
    g = gen()
    async with group_by_temporal(g, 0.1) as t:
        async for x in t:
            print(x)
            break
        async for x in t:
            print(x)
            break


asyncio.run(main())
```

gives:

```
[1]
[2]
Traceback (most recent call last):
  File "/Users/alex/work/pydantic-ai/pydantic_ai_slim/pydantic_ai/_utils.py", line 180, in group_by_temporal
    await task
StopAsyncIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/alex/Library/Application Support/JetBrains/PyCharm2024.3/scratches/scratch_1227.py", line 23, in <module>
    asyncio.run(main())
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
  File "/Users/alex/Library/Application Support/JetBrains/PyCharm2024.3/scratches/scratch_1227.py", line 14, in main
    async with group_by_temporal(g, 0.1) as t:
  File "/Users/alex/.pyenv/versions/3.12.6/lib/python3.12/contextlib.py", line 217, in __aexit__
    await anext(self.gen)
RuntimeError: async generator raised StopAsyncIteration
```

which isn't quite the same, but I did see similar errors sometimes when playing around with the previous example, e.g. when using a TestModel.

---
@alexmojaki weird. Don't think I have anything to bring to the table here, but I will properly test if you get to the bottom of this. `debounce_by` is something I need to set to `None` anyway, as Azure is already giving me a hard time with smooth streams; if that solves it, I'm good. In my case the stream would be interrupted by `asyncio.CancelledError`, as it's used to proxy the stream to the client through FastAPI.

---
It looks like

You should know that the traceback comes from this code in the OpenTelemetry SDK:

```python
try:
    _RUNTIME_CONTEXT.detach(token)
except Exception:
    logger.exception("Failed to detach context")
```

This means you don't have to worry that anything is really failing or crashing; it's just logging. Things should still be working normally.

This is fine:

```python
async with agent.run_stream(query) as stream:
    async for chunk in stream.stream_text(debounce_by=None):
        break
```

This is fine:

```python
async def get_streaming_response(stream):
    async for chunk in stream.stream_text(debounce_by=None):
        yield chunk


async def main():
    async with agent.run_stream(query) as stream:
        async for chunk in get_streaming_response(stream):
            break
```

But this logs the error:

```python
async def get_streaming_response(query):
    async with agent.run_stream(query) as stream:
        async for chunk in stream.stream_text(debounce_by=None):
            yield chunk


async def main():
    async for _ in get_streaming_response('Hello'):
        break
```

The reason is that

So the short answer is to keep
When using `agent.run_stream` and attempting to catch an incorrect exception type during validation with `validate_structured_result`, the code doesn't handle the error correctly. Instead, it fails with a context-detachment issue. The exception doesn't get caught, and the run fails.

I'm unsure whether this issue falls under PydanticAI's responsibility or whether it's expected to be handled by the developer. 👉 However, I'm reporting it in case it helps identify a solution for the next developer. (You can close this issue if needed.)
Here is a minimal example to reproduce the issue:
The above code fails to handle errors properly and throws the following traceback: