Skip to content

Getting as async generator object instead of data being to be received as an output through streamable-Http #784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
koteswaran2186 opened this issue May 22, 2025 · 0 comments

Comments

@koteswaran2186
Copy link

koteswaran2186 commented May 22, 2025

Hi ,

I am working on getting a functionality using mcp tool where i need pass the raw data at client and retrieve it as chunk with configurable Streamable-http. Below is the code for my poc:-

server.py

import asyncio
from typing import AsyncGenerator
from fastmcp import FastMCP

mcp = FastMCP(name="MyServer")

@mcp.tool()
async def generateReport(params: dict = None) -> AsyncGenerator[str, None]:
"""Generate a report with streaming response using yield.

This processes the raw_data and returns it in chunks for streaming.
"""

# Convert the raw data to string format
raw_data_str = str(params)

# Define chunk size
chunk_size = 100

# Stream the content in chunks with index
chunk_index = 0
for i in range(0, len(raw_data_str), chunk_size):
    chunk = raw_data_str[i:i + chunk_size]
    yield f"chunk {chunk_index}: {chunk}\n"
    chunk_index += 1
    # Add delay to simulate processing time
    await asyncio.sleep(0.05)

if name == "main":
mcp.run(transport="streamable-http",
host="127.0.0.1",
port=8000)

democlient.py:-

import asyncio
from fastmcp import Client # type: ignore
from fastmcp.client.transports import StreamableHttpTransport # type: ignore

async def example():
transport = StreamableHttpTransport("http://127.0.0.1:8000/mcp/")
async with Client(transport=transport) as client:
await client.ping()
print("Ping is successful !")

    raw_data = {
        "title": "Sales Performance Report",
        "period": "Q4 2024",
        "metrics": ["revenue", "growth", "customer_satisfaction"],
        "department": ["Sales", "Marketing", "Customer Service"],
        "format": "detailed_analysis"
    }
    
    print(f"\nCalling generateReport with streaming response:")
    print(f"Parameters: {raw_data}")
    print("\nStreaming response...")
    print("-" * 50)
    
    try:
        # Call the streaming tool
        report_stream = await client.call_tool("generateReport", {"params": raw_data})
        
        print("Tool call successful!")
        print(f"Response type: {type(report_stream)}")
        
        # Handle different response types
        if hasattr(report_stream, '__aiter__'):
            print("Detected async iterator - processing stream...")
            chunk_count = 0
            full_output = ""
            async for chunk in report_stream:
                chunk_count += 1
                full_output += str(chunk)
                # Print chunk with better formatting
                print(f"Received: {str(chunk).strip()}")
                
            print(f"\nTotal chunks received: {chunk_count}")
            print(f"Total output length: {len(full_output)} characters")
            
        elif hasattr(report_stream, 'content') and hasattr(report_stream.content, '__aiter__'):
            print("Detected content stream - processing...")
            chunk_count = 0
            full_output = ""
            async for chunk in report_stream.content:
                chunk_count += 1
                full_output += str(chunk)
                print(f"Received: {str(chunk).strip()}")
                
            print(f"\nTotal chunks received: {chunk_count}")
            print(f"Total output length: {len(full_output)} characters")
            
        elif isinstance(report_stream, (list, tuple)):
            print("Received list/tuple response:")
            for i, item in enumerate(report_stream):
                if hasattr(item, 'text'):
                    print(f"Item {i}: {item.text}")
                else:
                    print(f"Item {i}: {str(item)}")
                    
        else:
            # Handle non-streaming response
            response_str = str(report_stream)
            print(f"Received non-streaming response: {len(response_str)} characters")
            print(f"Content: {response_str}")
                
    except Exception as e:
        print(f"Error during streaming: {e}")
        print(f"Error type: {type(e)}")

async def render_to_output():
"""UI Client like function to fetch and render the streamed response."""
transport = StreamableHttpTransport("http://127.0.0.1:8000/mcp/")
async with Client(transport=transport) as client:
print("\n" + "="*60)
print("RENDERING STREAMED RESPONSE TO OUTPUT")
print("="*60)

    # More comprehensive request data
    raw_request_data = {
        "title": "Comprehensive Business Analysis Report",
        "report_type": "comprehensive_analysis",
        "data_sources": ["sales_db", "customer_feedback", "market_research"],
        "time_range": {
            "start": "2024-01-01",
            "end": "2024-12-31"
        },
        "metrics": ["revenue", "customer_acquisition", "market_share", "efficiency"],
        "department": ["Sales", "Marketing", "Operations", "Finance"],
        "include_charts": True,
        "detail_level": "high",
        "output_format": "markdown",
        "format": "detailed_analysis"
    }
    
    print(f"Request parameters: {raw_request_data}")
    print("\nStreaming output:")
    print("-" * 40)
    
    output = ""
    chunk_count = 0
    
    try:
        # Call the streaming tool
        report_stream = await client.call_tool("generateReport", {"params": raw_request_data})
        
        print("Tool call successful!")
        print(f"Response type: {type(report_stream)}")
        
        # Handle different response types
        if hasattr(report_stream, '__aiter__'):
            print("Detected async iterator - processing stream...")
            async for chunk in report_stream:
                chunk_count += 1
                output += str(chunk)
                
                # Print the actual chunk content
                print(f"Received: {str(chunk).strip()}")
                    
        elif hasattr(report_stream, 'content') and hasattr(report_stream.content, '__aiter__'):
            print("Detected content stream - processing...")
            async for chunk in report_stream.content:
                chunk_count += 1
                output += str(chunk)
                
                print(f"Received: {str(chunk).strip()}")
                    
        elif isinstance(report_stream, (list, tuple)):
            print("Received list/tuple response:")
            for i, item in enumerate(report_stream):
                if hasattr(item, 'text'):
                    print(f"Item {i}: {item.text}")
                    output += item.text
                else:
                    print(f"Item {i}: {str(item)}")
                    output += str(item)
                    
        else:
            # Handle non-streaming response
            if isinstance(report_stream, str):
                output = report_stream
            else:
                output = str(report_stream)
            print(f"Non-streaming output received: {len(output)} characters")
            print(f"Content: {output}")
            
    except Exception as e:
        print(f"Error during streaming: {e}")
        print(f"Error type: {type(e)}")
    
    # Final output summary
    print("\n" + "-" * 40)
    print(f"FINAL SUMMARY:")
    print(f"Total chunks: {chunk_count}")
    print(f"Total output length: {len(output)} characters")
    
    if output:
        preview_len = 200
        print(f"\nOutput preview (first {preview_len} chars):")
        print("'" + output[:preview_len] + "'" + ("..." if len(output) > preview_len else ""))

if name == "main":
asyncio.run(example())
asyncio.run(render_to_output())

Getting output as below instead of raw data as incremental text chunks

Received non-streaming response: [TextContext(type="text", text="<async_generator object generateReport" at 0x0001C5809504480", annotations= None]"

Any help would be highly appreciated?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant