
201 ml streaming endpoint #202


Merged · 11 commits · May 3, 2025
4 changes: 2 additions & 2 deletions .env
@@ -2,14 +2,14 @@ PYTHONDONTWRITEBYTECODE=1
PYTHONUNBUFFERED=1

# Postgres
POSTGRES_HOST=db
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=devdb
POSTGRES_USER=devdb
POSTGRES_PASSWORD=secret

# Redis
REDIS_HOST=inmemory
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=2

2 changes: 1 addition & 1 deletion Makefile
@@ -43,7 +43,7 @@ py-upgrade: ## Upgrade project py files with pyupgrade library for python versio

.PHONY: lint
lint: ## Lint project code.
	poetry run ruff check --fix .
	uv run ruff check --fix .

.PHONY: slim-build
slim-build: ## with power of docker-slim build smaller and safer images
20 changes: 20 additions & 0 deletions README.md
@@ -31,6 +31,7 @@
<li><a href="#worker-aware-async-scheduler">Schedule jobs</a></li>
<li><a href="#smtp-setup">Email Configuration</a></li>
<li><a href="#uv-knowledge-and-inspirations">UV knowledge and inspirations</a></li>
<li><a href="#large-language-model">Integration with local LLM</a></li>
</ul>
</li>
<li><a href="#acknowledgments">Acknowledgments</a></li>
@@ -162,6 +163,24 @@ This service supports plaintext and HTML emails, and also allows sending templat
It is implemented as a singleton to ensure that only one SMTP connection is maintained
throughout the application lifecycle, optimizing resource usage.

<p align="right">(<a href="#readme-top">back to top</a>)</p>

### Large Language Model
The `/v1/ml/chat/` endpoint handles chat-based interactions with a large language model (LLM).
It accepts a user prompt and streams responses back in real time.
The endpoint leverages FastAPI's asynchronous capabilities to efficiently manage multiple simultaneous requests,
ensuring low latency and high throughput.

FastAPI's async support is particularly beneficial for reducing I/O bottlenecks when connecting to the LLM server.
By using an asynchronous HTTP client such as `httpx`,
the application can handle multiple I/O-bound tasks concurrently,
such as sending requests to the LLM server and streaming responses back to the client.
This approach minimizes idle time and optimizes resource utilization, making it well suited to high-performance applications.

Install Ollama and run the `llama3.2` model:
```shell
ollama run llama3.2
```
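
With Ollama serving the model and the API running, the stream can be exercised with a short `httpx` client. The snippet below is only an illustrative sketch, not part of this PR; it assumes the app listens on `http://0.0.0.0:8080`, the address used by `tests/chat.py` in this changeset.

```python
import anyio
import httpx


async def ask(prompt: str) -> None:
    # Stream the endpoint's output line by line; each line is one JSON message.
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://0.0.0.0:8080/v1/ml/chat/",
            data={"prompt": prompt},
            timeout=60,
        ) as response:
            async for line in response.aiter_lines():
                if line:
                    print(line)


if __name__ == "__main__":
    anyio.run(ask, "Why is the sky blue?")
```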

<p align="right">(<a href="#readme-top">back to top</a>)</p>

@@ -215,6 +234,7 @@ I've included a few of my favorites to kick things off!
- **[DEC 16 2024]** bump project to Python 3.13 :fast_forward:
- **[JAN 28 2025]** add SMTP setup :email:
- **[MAR 8 2025]** switch from poetry to uv :fast_forward:
- **[MAY 3 2025]** add large language model integration :robot:

<p align="right">(<a href="#readme-top">back to top</a>)</p>

16 changes: 16 additions & 0 deletions app/api/ml.py
@@ -0,0 +1,16 @@
from typing import Annotated

from fastapi import APIRouter, Depends, Form
from fastapi.responses import StreamingResponse

from app.services.llm import get_llm_service
from app.utils.logging import AppLogger

logger = AppLogger().get_logger()

router = APIRouter()


@router.post("/chat/")
async def chat(prompt: Annotated[str, Form()], llm_service=Depends(get_llm_service)):
    return StreamingResponse(llm_service.stream_chat(prompt), media_type="text/plain")
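
Since the route obtains its service via `Depends(get_llm_service)`, the streaming behaviour can be checked without a live Ollama backend by overriding that dependency. The test below is a hypothetical sketch, not part of this PR; `FakeLLMService` and the asserted payload are made up for illustration.

```python
from fastapi import FastAPI
from fastapi.testclient import TestClient

from app.api.ml import router as ml_router
from app.services.llm import get_llm_service


class FakeLLMService:
    """Stands in for StreamLLMService so no Ollama server is needed."""

    async def stream_chat(self, prompt: str):
        # Mimic the real service's framing: one JSON message per line.
        yield b'{"role": "user", "content": "' + prompt.encode() + b'"}\n'
        yield b'{"role": "model", "content": "stubbed reply"}\n'


test_app = FastAPI()
test_app.include_router(ml_router, prefix="/v1/ml")
test_app.dependency_overrides[get_llm_service] = FakeLLMService

client = TestClient(test_app)
response = client.post("/v1/ml/chat/", data={"prompt": "hi"})
assert response.status_code == 200
assert b"stubbed reply" in response.content
```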
4 changes: 3 additions & 1 deletion app/main.py
@@ -7,6 +7,7 @@
from fastapi import Depends, FastAPI

from app.api.health import router as health_router
from app.api.ml import router as ml_router
from app.api.nonsense import router as nonsense_router
from app.api.shakespeare import router as shakespeare_router
from app.api.stuff import router as stuff_router
@@ -45,12 +46,13 @@ async def lifespan(_app: FastAPI):
    await _app.postgres_pool.close()


app = FastAPI(title="Stuff And Nonsense API", version="0.17", lifespan=lifespan)
app = FastAPI(title="Stuff And Nonsense API", version="0.18.0", lifespan=lifespan)

app.include_router(stuff_router)
app.include_router(nonsense_router)
app.include_router(shakespeare_router)
app.include_router(user_router)
app.include_router(ml_router, prefix="/v1/ml", tags=["ML"])


app.include_router(health_router, prefix="/v1/public/health", tags=["Health, Public"])
52 changes: 52 additions & 0 deletions app/services/llm.py
@@ -0,0 +1,52 @@
from collections.abc import AsyncGenerator

import httpx
import orjson


class StreamLLMService:
    def __init__(self, base_url: str = "http://localhost:11434/v1"):
        self.base_url = base_url
        self.model = "llama3.2"

    async def stream_chat(self, prompt: str) -> AsyncGenerator[bytes]:
        """Stream chat completion responses from LLM."""
        # Send the user a message first
        user_msg = {
            "role": "user",
            "content": prompt,
        }
        yield orjson.dumps(user_msg) + b"\n"

        # Open client as context manager and stream responses
        async with httpx.AsyncClient(base_url=self.base_url) as client:
            async with client.stream(
                "POST",
                "/chat/completions",
                json={
                    "model": self.model,
                    "messages": [{"role": "user", "content": prompt}],
                    "stream": True,
                },
                timeout=60.0,
            ) as response:
                async for line in response.aiter_lines():
                    if line.startswith("data: ") and line != "data: [DONE]":
                        try:
                            json_line = line[6:]  # Remove "data: " prefix
                            data = orjson.loads(json_line)
                            content = (
                                data.get("choices", [{}])[0]
                                .get("delta", {})
                                .get("content", "")
                            )
                            if content:
                                model_msg = {"role": "model", "content": content}
                                yield orjson.dumps(model_msg) + b"\n"
                        except Exception:
                            pass
Comment on lines +46 to +47 (Copilot AI, May 3, 2025):

Avoid silently passing exceptions using a bare 'except Exception:' block. Consider logging the error details or handling the exception explicitly to aid in debugging.

Suggested change (replacing the bare `except Exception: pass`):

    except Exception as e:
        logging.exception("Error processing streamed line: %s", line)



# FastAPI dependency
def get_llm_service(base_url: str | None = None) -> StreamLLMService:
    return StreamLLMService(base_url=base_url or "http://localhost:11434/v1")
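
For completeness, here is a small sketch (not part of this PR) of driving `StreamLLMService` directly, outside FastAPI; it assumes an Ollama server is answering on `localhost:11434`, the default `base_url` above.

```python
import anyio

from app.services.llm import get_llm_service


async def main() -> None:
    service = get_llm_service()  # defaults to http://localhost:11434/v1
    async for chunk in service.stream_chat("Tell me a joke"):
        # Each chunk is a newline-terminated JSON message: first the echoed
        # user prompt, then incremental "model" messages with content deltas.
        print(chunk.decode(), end="")


if __name__ == "__main__":
    anyio.run(main)
```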
3 changes: 3 additions & 0 deletions compose.yml
@@ -1,6 +1,7 @@
services:
  app:
    container_name: fsap_app
    network_mode: host
    build: .
    env_file:
      - .env
@@ -22,6 +23,7 @@ services:

  db:
    container_name: fsap_db
    network_mode: host
    build:
      context: ./db
      dockerfile: Dockerfile
@@ -46,6 +48,7 @@ services:

  inmemory:
    image: redis:latest
    network_mode: host
    container_name: fsap_inmemory
    ports:
      - "6379:6379"
40 changes: 20 additions & 20 deletions pyproject.toml
@@ -1,45 +1,45 @@
[project]
name = "fastapi-sqlalchemy-asyncpg"
version = "0.1.0"
version = "0.18.0"
description = "A modern FastAPI application with SQLAlchemy 2.0 and AsyncPG for high-performance async database operations. Features include JWT authentication with Redis token storage, password hashing, connection pooling, data processing with Polars, Rich logging, task scheduling with APScheduler, and Shakespeare datasets integration."
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"fastapi[all]>=0.115.11",
"pydantic[email]>=2.10.6",
"pydantic-settings>=2.8.1",
"sqlalchemy>=2.0.38",
"uvicorn[standard]>=0.34.0",
"fastapi[all]>=0.115.12",
"pydantic[email]>=2.11.4",
"pydantic-settings>=2.9.1",
"sqlalchemy>=2.0.40",
"uvicorn[standard]>=0.34.2",
"asyncpg>=0.30.0",
"alembic>=1.15.1",
"alembic>=1.15.2",
"httpx>=0.28.1",
"pytest>=8.3.5",
"pytest-cov>=6.0.0",
"pytest-cov>=6.1.1",
"uvloop>=0.21.0",
"httptools>=0.6.4",
"rich>=13.9.4",
"rich>=14.0.0",
"pyjwt>=2.10.1",
"redis>=5.2.1",
"redis>=6.0.0",
"bcrypt>=4.3.0",
"polars>=1.24.0",
"polars>=1.29.0",
"python-multipart>=0.0.20",
"fastexcel>=0.13.0",
"inline-snapshot>=0.17.0",
"dirty-equals>=0.8.0",
"polyfactory>=2.18.1",
"granian>=1.7.0",
"apscheduler[redis,sqlalchemy]>=4.0.0a5",
"fastexcel>=0.14.0",
"inline-snapshot>=0.23.0",
"dirty-equals>=0.9.0",
"polyfactory>=2.21.0",
"granian>=2.2.5",
"apscheduler[redis,sqlalchemy]>=4.0.0a6",
]

[tool.uv]
dev-dependencies = [
"ruff>=0.9.10",
"ruff>=0.11.8",
"devtools[pygments]>=0.12.2",
"pyupgrade>=3.19.1",
"ipython>=9.0.2",
"ipython>=9.2.0",
"sqlacodegen>=3.0.0",
"tryceratops>=2.4.1",
"locust>=2.33.0"
"locust>=2.36.2"

]

32 changes: 32 additions & 0 deletions tests/chat.py
@@ -0,0 +1,32 @@
import anyio
import httpx
import orjson


async def chat_with_endpoint():
    async with httpx.AsyncClient() as client:
        while True:
            # Get user input
            prompt = input("\nYou: ")
Copilot AI commented on the `prompt = input("\nYou: ")` line above (May 3, 2025):

Using the synchronous input() call inside an async function may block the event loop. Consider using an asynchronous input strategy or executing the blocking call in a separate thread to avoid potential performance issues.

Suggested change:

    prompt = await anyio.to_thread.run_sync(input, "\nYou: ")

            if prompt.lower() == "exit":
                break

            # Send request to the API
            print("\nModel: ", end="", flush=True)
            async with client.stream(
                "POST",
                "http://0.0.0.0:8080/v1/ml/chat/",
                data={"prompt": prompt},
                timeout=60,
            ) as response:
                async for chunk in response.aiter_lines():
                    if chunk:
                        try:
                            data = orjson.loads(chunk)
                            print(data["content"], end="", flush=True)
                        except Exception as e:
                            print(f"\nError parsing chunk: {e}")


if __name__ == "__main__":
    anyio.run(chat_with_endpoint)