Skip to content

Commit 3808c4f

Browse files
authored
Sync upstream spec - all APIs implemented (#228)
* types updated from spec * fix compilation errors in examples * add assistants streaming APIs * updated readme * updated readme * updates from spec * MessageContentInput for CreateMessageRequest * update message content types * updated spec * update crate dependencies * update examples dependencies * cleanup types * add file search assistants example * cleanup * updated types * message delta types to not have explicit 'type' field, instead it's part of enum variant * retrieve file contents as Bytes instead of string * fix types * add code interpreter assistant example * updated examples and their data files * fix chat types for ser-de * update step types for ser-de * assistant stream event non_exhaustive * update lib doc * support for assistant streaming * assistant function call with streaming example (partially complete) * update type * working assistants-fun-call-stream * cargo test fix * update assistant example * cargo fix * cargo fmt * better message * fix
1 parent 6d01594 commit 3808c4f

File tree

62 files changed

+15442
-13470
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+15442
-13470
lines changed

async-openai/Cargo.toml

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
[package]
22
name = "async-openai"
33
version = "0.21.0"
4-
authors = [
5-
"Himanshu Neema"
6-
]
4+
authors = ["Himanshu Neema"]
75
categories = ["api-bindings", "web-programming", "asynchronous"]
86
keywords = ["openai", "async", "openapi", "ai"]
97
description = "Rust library for OpenAI"
@@ -26,23 +24,28 @@ native-tls = ["reqwest/native-tls"]
2624
native-tls-vendored = ["reqwest/native-tls-vendored"]
2725

2826
[dependencies]
29-
backoff = {version = "0.4.0", features = ["tokio"] }
30-
base64 = "0.22.0"
31-
futures = "0.3.26"
27+
backoff = { version = "0.4.0", features = ["tokio"] }
28+
base64 = "0.22.1"
29+
futures = "0.3.30"
3230
rand = "0.8.5"
33-
reqwest = { version = "0.12.0", features = ["json", "stream", "multipart"],default-features = false }
31+
reqwest = { version = "0.12.4", features = [
32+
"json",
33+
"stream",
34+
"multipart",
35+
], default-features = false }
3436
reqwest-eventsource = "0.6.0"
35-
serde = { version = "1.0.152", features = ["derive", "rc"] }
36-
serde_json = "1.0.93"
37-
thiserror = "1.0.38"
38-
tokio = { version = "1.25.0", features = ["fs", "macros"] }
39-
tokio-stream = "0.1.11"
40-
tokio-util = { version = "0.7.7", features = ["codec", "io-util"] }
41-
tracing = "0.1.37"
37+
serde = { version = "1.0.203", features = ["derive", "rc"] }
38+
serde_json = "1.0.117"
39+
thiserror = "1.0.61"
40+
tokio = { version = "1.38.0", features = ["fs", "macros"] }
41+
tokio-stream = "0.1.15"
42+
tokio-util = { version = "0.7.11", features = ["codec", "io-util"] }
43+
tracing = "0.1.40"
4244
derive_builder = "0.20.0"
4345
async-convert = "1.0.0"
44-
secrecy = { version = "0.8.0", features=["serde"] }
45-
bytes = "1.5.0"
46+
secrecy = { version = "0.8.0", features = ["serde"] }
47+
bytes = "1.6.0"
48+
eventsource-stream = "0.2.3"
4649

4750
[dev-dependencies]
48-
tokio-test = "0.4.2"
51+
tokio-test = "0.4.4"

async-openai/README.md

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@
2323

2424
- It's based on [OpenAI OpenAPI spec](https://github.com/openai/openai-openapi)
2525
- Current features:
26-
- [x] Assistants v2
27-
- [ ] Assistants v2 streaming
26+
- [x] Assistants (v2)
2827
- [x] Audio
2928
- [x] Batch
3029
- [x] Chat
@@ -33,14 +32,12 @@
3332
- [x] Files
3433
- [x] Fine-Tuning
3534
- [x] Images
36-
- [x] Microsoft Azure OpenAI Service
3735
- [x] Models
3836
- [x] Moderations
39-
- Support SSE streaming on available APIs
40-
- All requests including form submissions (except SSE streaming) are retried with exponential backoff when [rate limited](https://platform.openai.com/docs/guides/rate-limits) by the API server.
37+
- SSE streaming on all available APIs
38+
- Requests (except SSE streaming) including form submissions are retried with exponential backoff when [rate limited](https://platform.openai.com/docs/guides/rate-limits).
4139
- Ergonomic builder pattern for all request objects.
42-
43-
**Note on Azure OpenAI Service (AOS)**: `async-openai` primarily implements OpenAI spec, and doesn't try to maintain parity with spec of AOS.
40+
- Microsoft Azure OpenAI Service (only APIs matching OpenAI spec)
4441

4542
## Usage
4643

@@ -95,7 +92,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
9592
Ok(())
9693
}
9794
```
98-
95+
`
9996
<div align="center">
10097
<img width="315" src="https://raw.githubusercontent.com/64bit/async-openai/assets/create-image/img-1.png" />
10198
<img width="315" src="https://raw.githubusercontent.com/64bit/async-openai/assets/create-image/img-2.png" />

async-openai/src/batches.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ impl<'c, C: Config> Batches<'c, C> {
3737
self.client.get(&format!("/batches/{batch_id}")).await
3838
}
3939

40-
/// Cancels an in-progress batch.
40+
/// Cancels an in-progress batch. The batch will be in status `cancelling` for up to 10 minutes, before changing to `cancelled`, where it will have partial results (if any) available in the output file.
4141
pub async fn cancel(&self, batch_id: &str) -> Result<Batch, OpenAIError> {
4242
self.client
4343
.post(

async-openai/src/client.rs

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,20 @@ impl<C: Config> Client<C> {
196196
self.execute(request_maker).await
197197
}
198198

199+
/// Make a GET request to {path} and return the response body
200+
pub(crate) async fn get_raw(&self, path: &str) -> Result<Bytes, OpenAIError> {
201+
let request_maker = || async {
202+
Ok(self
203+
.http_client
204+
.get(self.config.url(path))
205+
.query(&self.config.query())
206+
.headers(self.config.headers())
207+
.build()?)
208+
};
209+
210+
self.execute_raw(request_maker).await
211+
}
212+
199213
/// Make a POST request to {path} and return the response body
200214
pub(crate) async fn post_raw<I>(&self, path: &str, request: I) -> Result<Bytes, OpenAIError>
201215
where
@@ -369,8 +383,30 @@ impl<C: Config> Client<C> {
369383
stream(event_source).await
370384
}
371385

386+
pub(crate) async fn post_stream_mapped_raw_events<I, O>(
387+
&self,
388+
path: &str,
389+
request: I,
390+
event_mapper: impl Fn(eventsource_stream::Event) -> Result<O, OpenAIError> + Send + 'static,
391+
) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
392+
where
393+
I: Serialize,
394+
O: DeserializeOwned + std::marker::Send + 'static,
395+
{
396+
let event_source = self
397+
.http_client
398+
.post(self.config.url(path))
399+
.query(&self.config.query())
400+
.headers(self.config.headers())
401+
.json(&request)
402+
.eventsource()
403+
.unwrap();
404+
405+
stream_mapped_raw_events(event_source, event_mapper).await
406+
}
407+
372408
/// Make HTTP GET request to receive SSE
373-
pub(crate) async fn get_stream<Q, O>(
409+
pub(crate) async fn _get_stream<Q, O>(
374410
&self,
375411
path: &str,
376412
query: &Q,
@@ -437,3 +473,51 @@ where
437473

438474
Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx))
439475
}
476+
477+
pub(crate) async fn stream_mapped_raw_events<O>(
478+
mut event_source: EventSource,
479+
event_mapper: impl Fn(eventsource_stream::Event) -> Result<O, OpenAIError> + Send + 'static,
480+
) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
481+
where
482+
O: DeserializeOwned + std::marker::Send + 'static,
483+
{
484+
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
485+
486+
tokio::spawn(async move {
487+
while let Some(ev) = event_source.next().await {
488+
match ev {
489+
Err(e) => {
490+
if let Err(_e) = tx.send(Err(OpenAIError::StreamError(e.to_string()))) {
491+
// rx dropped
492+
break;
493+
}
494+
}
495+
Ok(event) => match event {
496+
Event::Message(message) => {
497+
let mut done = false;
498+
499+
if message.data == "[DONE]" {
500+
done = true;
501+
}
502+
503+
let response = event_mapper(message);
504+
505+
if let Err(_e) = tx.send(response) {
506+
// rx dropped
507+
break;
508+
}
509+
510+
if done {
511+
break;
512+
}
513+
}
514+
Event::Open => continue,
515+
},
516+
}
517+
}
518+
519+
event_source.close();
520+
});
521+
522+
Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx))
523+
}

async-openai/src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub enum OpenAIError {
2828
}
2929

3030
/// OpenAI API returns error object on failure
31-
#[derive(Debug, Deserialize)]
31+
#[derive(Debug, Deserialize, Clone)]
3232
pub struct ApiError {
3333
pub message: String,
3434
pub r#type: Option<String>,

async-openai/src/file.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use bytes::Bytes;
12
use serde::Serialize;
23

34
use crate::{
@@ -17,9 +18,13 @@ impl<'c, C: Config> Files<'c, C> {
1718
Self { client }
1819
}
1920

20-
/// Upload a file that can be used across various endpoints. The size of all the files uploaded by one organization can be up to 100 GB.
21+
/// Upload a file that can be used across various endpoints. Individual files can be up to 512 MB, and the size of all files uploaded by one organization can be up to 100 GB.
2122
///
22-
/// The size of individual files can be a maximum of 512 MB or 2 million tokens for Assistants. See the [Assistants Tools guide](https://platform.openai.com/docs/assistants/tools) to learn more about the types of files supported. The Fine-tuning API only supports `.jsonl` files.
23+
/// The Assistants API supports files up to 2 million tokens and of specific file types. See the [Assistants Tools guide](https://platform.openai.com/docs/assistants/tools) for details.
24+
///
25+
/// The Fine-tuning API only supports `.jsonl` files. The input also has certain required formats for fine-tuning [chat](https://platform.openai.com/docs/api-reference/fine-tuning/chat-input) or [completions](https://platform.openai.com/docs/api-reference/fine-tuning/completions-input) models.
26+
///
27+
///The Batch API only supports `.jsonl` files up to 100 MB in size. The input also has a specific required [format](https://platform.openai.com/docs/api-reference/batch/request-input).
2328
///
2429
/// Please [contact us](https://help.openai.com/) if you need to increase these storage limits.
2530
pub async fn create(&self, request: CreateFileRequest) -> Result<OpenAIFile, OpenAIError> {
@@ -47,16 +52,19 @@ impl<'c, C: Config> Files<'c, C> {
4752
}
4853

4954
/// Returns the contents of the specified file
50-
pub async fn retrieve_content(&self, file_id: &str) -> Result<String, OpenAIError> {
55+
pub async fn content(&self, file_id: &str) -> Result<Bytes, OpenAIError> {
5156
self.client
52-
.get(format!("/files/{file_id}/content").as_str())
57+
.get_raw(format!("/files/{file_id}/content").as_str())
5358
.await
5459
}
5560
}
5661

5762
#[cfg(test)]
5863
mod tests {
59-
use crate::{types::CreateFileRequestArgs, Client};
64+
use crate::{
65+
types::{CreateFileRequestArgs, FilePurpose},
66+
Client,
67+
};
6068

6169
#[tokio::test]
6270
async fn test_file_mod() {
@@ -72,7 +80,7 @@ mod tests {
7280

7381
let request = CreateFileRequestArgs::default()
7482
.file(test_file_path)
75-
.purpose("fine-tune")
83+
.purpose(FilePurpose::FineTune)
7684
.build()
7785
.unwrap();
7886

async-openai/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@
3838
//!
3939
//! let client = Client::with_config(config);
4040
//!
41-
//! // Note that Azure OpenAI service does not support all APIs and `async-openai`
42-
//! // doesn't restrict and still allows calls to all of the APIs as OpenAI.
41+
//! // Note that `async-openai` only implements OpenAI spec
42+
//! // and doesn't maintain parity with the spec of Azure OpenAI service.
4343
//!
4444
//! ```
4545
//!

async-openai/src/runs.rs

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use crate::{
55
error::OpenAIError,
66
steps::Steps,
77
types::{
8-
CreateRunRequest, ListRunsResponse, ModifyRunRequest, RunObject,
9-
SubmitToolOutputsRunRequest,
8+
AssistantEventStream, AssistantStreamEvent, CreateRunRequest, ListRunsResponse,
9+
ModifyRunRequest, RunObject, SubmitToolOutputsRunRequest,
1010
},
1111
Client,
1212
};
@@ -39,6 +39,29 @@ impl<'c, C: Config> Runs<'c, C> {
3939
.await
4040
}
4141

42+
/// Create a run.
43+
pub async fn create_stream(
44+
&self,
45+
mut request: CreateRunRequest,
46+
) -> Result<AssistantEventStream, OpenAIError> {
47+
if request.stream.is_some() && !request.stream.unwrap() {
48+
return Err(OpenAIError::InvalidArgument(
49+
"When stream is false, use Runs::create".into(),
50+
));
51+
}
52+
53+
request.stream = Some(true);
54+
55+
Ok(self
56+
.client
57+
.post_stream_mapped_raw_events(
58+
&format!("/threads/{}/runs", self.thread_id),
59+
request,
60+
AssistantStreamEvent::try_from,
61+
)
62+
.await)
63+
}
64+
4265
/// Retrieves a run.
4366
pub async fn retrieve(&self, run_id: &str) -> Result<RunObject, OpenAIError> {
4467
self.client
@@ -87,6 +110,32 @@ impl<'c, C: Config> Runs<'c, C> {
87110
.await
88111
}
89112

113+
pub async fn submit_tool_outputs_stream(
114+
&self,
115+
run_id: &str,
116+
mut request: SubmitToolOutputsRunRequest,
117+
) -> Result<AssistantEventStream, OpenAIError> {
118+
if request.stream.is_some() && !request.stream.unwrap() {
119+
return Err(OpenAIError::InvalidArgument(
120+
"When stream is false, use Runs::submit_tool_outputs".into(),
121+
));
122+
}
123+
124+
request.stream = Some(true);
125+
126+
Ok(self
127+
.client
128+
.post_stream_mapped_raw_events(
129+
&format!(
130+
"/threads/{}/runs/{run_id}/submit_tool_outputs",
131+
self.thread_id
132+
),
133+
request,
134+
AssistantStreamEvent::try_from,
135+
)
136+
.await)
137+
}
138+
90139
/// Cancels a run that is `in_progress`
91140
pub async fn cancel(&self, run_id: &str) -> Result<RunObject, OpenAIError> {
92141
self.client

async-openai/src/threads.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use crate::{
22
config::Config,
33
error::OpenAIError,
44
types::{
5-
CreateThreadAndRunRequest, CreateThreadRequest, DeleteThreadResponse, ModifyThreadRequest,
6-
RunObject, ThreadObject,
5+
AssistantEventStream, AssistantStreamEvent, CreateThreadAndRunRequest, CreateThreadRequest,
6+
DeleteThreadResponse, ModifyThreadRequest, RunObject, ThreadObject,
77
},
88
Client, Messages, Runs,
99
};
@@ -38,6 +38,25 @@ impl<'c, C: Config> Threads<'c, C> {
3838
self.client.post("/threads/runs", request).await
3939
}
4040

41+
/// Create a thread and run it in one request (streaming).
42+
pub async fn create_and_run_stream(
43+
&self,
44+
mut request: CreateThreadAndRunRequest,
45+
) -> Result<AssistantEventStream, OpenAIError> {
46+
if request.stream.is_some() && !request.stream.unwrap() {
47+
return Err(OpenAIError::InvalidArgument(
48+
"When stream is false, use Threads::create_and_run".into(),
49+
));
50+
}
51+
52+
request.stream = Some(true);
53+
54+
Ok(self
55+
.client
56+
.post_stream_mapped_raw_events("/threads/runs", request, AssistantStreamEvent::try_from)
57+
.await)
58+
}
59+
4160
/// Create a thread.
4261
pub async fn create(&self, request: CreateThreadRequest) -> Result<ThreadObject, OpenAIError> {
4362
self.client.post("/threads", request).await

0 commit comments

Comments
 (0)