Skip to content

Move get blob to pageable #850

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 27, 2022
  •  
  •  
  •  
10 changes: 6 additions & 4 deletions sdk/core/src/pageable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ where
r#try!(request.await)
}
State::Continuation(token) => {
let request = make_request(Some(Continuation::new(token)));
let request = make_request(Some(token));
r#try!(request.await)
}
State::Done => return None,
State::Done => {
return None;
}
};

let next_state = response
Expand Down Expand Up @@ -82,12 +84,12 @@ impl<T, O> std::fmt::Debug for Pageable<T, O> {

/// A type that can yield an optional continuation token
pub trait Continuable {
fn continuation(&self) -> Option<String>;
fn continuation(&self) -> Option<Continuation>;
Comment on lines 86 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do any endpoints allow for multiple types of continuations? It seems like right now the Continuation header with a continuation token is the happy path and we're using panics to ensure that we don't use a Continuation::Range when we don't meant time.

What if we were to trying the following;

pub trait Continuable {
   type Continuation: crate::AsHeaders;
    fn continuation(&self) -> Option<Self::Continuation>;
}

The make_request function then becomes impl Fn(Option<T::Continuation>) -> F + Clone + 'static + Send

This would make the system more robust. Every continuable response just needs to provide the headers it uses for specifying how it continues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Examples of different Continuation methods:

  • BlobClient.get uses Range header
  • Cosmos's list_attachments uses Continuation header
  • ContainerClient.list_blobs uses marker URI parameter
  • datalake's file_systems_list uses maker URI parameter. Note, this one also accepts next_marker from the builder, which probably shouldn't be available.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok so we couldn't do type Continuation: crate::AsHeaders but we could do type Continuation: crate::AsContinuation and implement AsContinuation for all the appropriate types.

}

#[derive(Debug, Clone, PartialEq)]
enum State {
Init,
Continuation(String),
Continuation(Continuation),
Done,
}
53 changes: 44 additions & 9 deletions sdk/core/src/request_options/continuation.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,50 @@
use crate::{headers, Header};
use crate::{headers, request_options::NextMarker, request_options::Range, Header};
use std::ops::Range as StdRange;

#[derive(Debug, Clone)]
pub struct Continuation(String);
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Continuation {
String(String),
Range(StdRange<u64>),
}

impl Continuation {
pub fn new(c: String) -> Self {
Self(c)
impl From<NextMarker> for Continuation {
fn from(next_marker: NextMarker) -> Self {
Continuation::String(next_marker.as_str().to_string())
}
}

impl From<&str> for Continuation {
fn from(value: &str) -> Self {
Continuation::String(value.to_string())
}
}

impl From<String> for Continuation {
fn from(value: String) -> Self {
Continuation::String(value)
}
}

impl From<StdRange<u64>> for Continuation {
fn from(value: StdRange<u64>) -> Self {
Continuation::Range(value)
}
}

pub fn into_raw(self) -> String {
self.0
impl From<Range> for Continuation {
fn from(value: Range) -> Self {
Continuation::Range(value.start..value.end)
}
}

impl Continuation {
pub fn as_string(&self) -> String {
match self {
Self::String(c) => c.clone(),
Self::Range(_) => {
panic!("unable to convert Continuation::Range to string")
}
}
}
}

Expand All @@ -19,6 +54,6 @@ impl Header for Continuation {
}

fn value(&self) -> headers::HeaderValue {
self.0.to_owned().into()
self.as_string().into()
}
}
2 changes: 1 addition & 1 deletion sdk/core/src/request_options/next_marker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl AppendToUrlQuery for NextMarker {

impl From<Continuation> for NextMarker {
fn from(next_marker: Continuation) -> Self {
Self::new(next_marker.into_raw())
Self::new(next_marker.as_string())
}
}

Expand Down
4 changes: 2 additions & 2 deletions sdk/data_cosmos/src/operations/list_attachments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl ListAttachmentsResponse {
}

impl Continuable for ListAttachmentsResponse {
fn continuation(&self) -> Option<String> {
self.continuation_token.clone()
fn continuation(&self) -> Option<Continuation> {
self.continuation_token.clone().map(Continuation::from)
}
}
4 changes: 2 additions & 2 deletions sdk/data_cosmos/src/operations/list_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl ListCollectionsResponse {
}

impl Continuable for ListCollectionsResponse {
fn continuation(&self) -> Option<String> {
self.continuation_token.clone()
fn continuation(&self) -> Option<Continuation> {
self.continuation_token.clone().map(Continuation::from)
}
}
4 changes: 2 additions & 2 deletions sdk/data_cosmos/src/operations/list_databases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ impl ListDatabasesResponse {
}

impl Continuable for ListDatabasesResponse {
fn continuation(&self) -> Option<String> {
self.continuation_token.clone()
fn continuation(&self) -> Option<Continuation> {
self.continuation_token.clone().map(Continuation::from)
}
}

Expand Down
4 changes: 2 additions & 2 deletions sdk/data_cosmos/src/operations/list_documents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ where
}

impl<T> Continuable for ListDocumentsResponse<T> {
fn continuation(&self) -> Option<String> {
self.continuation_token.clone()
fn continuation(&self) -> Option<Continuation> {
self.continuation_token.clone().map(Continuation::from)
}
}

Expand Down
4 changes: 2 additions & 2 deletions sdk/data_cosmos/src/operations/list_permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl ListPermissionsResponse {
}

impl Continuable for ListPermissionsResponse {
fn continuation(&self) -> Option<String> {
self.continuation_token.clone()
fn continuation(&self) -> Option<Continuation> {
self.continuation_token.clone().map(Continuation::from)
}
}
4 changes: 2 additions & 2 deletions sdk/data_cosmos/src/operations/list_stored_procedures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl ListStoredProceduresResponse {
}

impl Continuable for ListStoredProceduresResponse {
fn continuation(&self) -> Option<String> {
self.continuation_token.clone()
fn continuation(&self) -> Option<Continuation> {
self.continuation_token.clone().map(Continuation::from)
}
}
4 changes: 2 additions & 2 deletions sdk/data_cosmos/src/operations/list_triggers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl ListTriggersResponse {
}

impl Continuable for ListTriggersResponse {
fn continuation(&self) -> Option<String> {
self.continuation_token.clone()
fn continuation(&self) -> Option<Continuation> {
self.continuation_token.clone().map(Continuation::from)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl ListUserDefinedFunctionsResponse {
}

impl Continuable for ListUserDefinedFunctionsResponse {
fn continuation(&self) -> Option<String> {
self.continuation_token.clone()
fn continuation(&self) -> Option<Continuation> {
self.continuation_token.clone().map(Continuation::from)
}
}
4 changes: 2 additions & 2 deletions sdk/data_cosmos/src/operations/list_users.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl IntoIterator for ListUsersResponse {
}

impl Continuable for ListUsersResponse {
fn continuation(&self) -> Option<String> {
self.continuation_token.clone()
fn continuation(&self) -> Option<Continuation> {
self.continuation_token.clone().map(Continuation::from)
}
}
4 changes: 2 additions & 2 deletions sdk/data_cosmos/src/operations/query_documents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ impl<T> std::convert::TryFrom<QueryDocumentsResponse<T>> for QueryDocumentsRespo
}
}
impl<T> Continuable for QueryDocumentsResponse<T> {
fn continuation(&self) -> Option<String> {
self.continuation_token.clone()
fn continuation(&self) -> Option<Continuation> {
self.continuation_token.clone().map(Continuation::from)
}
}
2 changes: 1 addition & 1 deletion sdk/iot_hub/src/service/requests/query_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl<'a> QueryBuilder<'a> {
}

azure_core::setters! {
continuation: String => Some(Continuation::new(continuation)),
continuation: String => Some(Continuation::String(continuation)),
max_item_count: i32 => MaxItemCount::new(max_item_count),
}

Expand Down
7 changes: 5 additions & 2 deletions sdk/storage_blobs/examples/blob_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ async fn main() -> azure_core::Result<()> {

// this is a single call that retrieves the first 1KB of the blob (or less if the blob is
// smaller). The range(...) call is optional.
let response = Box::pin(blob_client.get().range(0u64..1024).stream(1024))
let response = blob_client
.get()
.range(0u64..1024)
.into_stream()
.next()
.await
.expect("stream failed")?;
Expand All @@ -47,7 +50,7 @@ async fn main() -> azure_core::Result<()> {
let mut complete_response = vec![];
// this is how you stream a blob. You can specify the range(...) value as above if necessary.
// In this case we are retrieving the whole blob in 8KB chunks.
let mut stream = Box::pin(blob_client.get().stream(1024 * 8));
let mut stream = blob_client.get().chunk_size(0x2000u64).into_stream();
while let Some(value) = stream.next().await {
let data = value?.data;
println!("received {:?} bytes", data.len());
Expand Down
6 changes: 4 additions & 2 deletions sdk/storage_blobs/examples/blob_01.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ async fn main() -> azure_core::Result<()> {
let container_client = storage_client.container_client(&container_name);
let blob_client = container_client.blob_client("SorgeniaReorganizeRebuildIndexes.zip");

// only get the first 8k chunk
let result = Box::pin(blob_client.get().stream(1024 * 8))
// only get the first chunk
let result = blob_client
.get()
.into_stream()
.next()
.await
.expect("stream failed")?;
Expand Down
18 changes: 14 additions & 4 deletions sdk/storage_blobs/examples/blob_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ async fn main() -> azure_core::Result<()> {

assert_eq!(blob.len(), buf.len());

let chunk0 = Box::pin(blob_client.get().range(0u64..1024).stream(1024))
let chunk0 = blob_client
.get()
.range(0u64..1024)
.into_stream()
.next()
.await
.expect("stream failed")?;
Expand All @@ -50,7 +53,10 @@ async fn main() -> azure_core::Result<()> {
assert_eq!(chunk0.data[i], 71);
}

let chunk1 = Box::pin(blob_client.get().range(1024u64..1536).stream(1024))
let chunk1 = blob_client
.get()
.range(1024u64..1536)
.into_stream()
.next()
.await
.expect("stream failed")?;
Expand All @@ -63,7 +69,11 @@ async fn main() -> azure_core::Result<()> {
// this time, only download them in chunks of 10 bytes
let mut chunk2 = vec![];

let mut stream = Box::pin(blob_client.get().range(1536u64..3584).stream(10));
let mut stream = blob_client
.get()
.range(1536u64..3584)
.chunk_size(10u64)
.into_stream();
while let Some(result) = stream.next().await {
chunk2.extend(result?.data);
}
Expand All @@ -72,7 +82,7 @@ async fn main() -> azure_core::Result<()> {
assert_eq!(chunk2[i], 73);
}

let mut stream = Box::pin(blob_client.get().stream(512));
let mut stream = blob_client.get().chunk_size(512u64).into_stream();

println!("\nStreaming");
let mut chunk: usize = 0;
Expand Down
26 changes: 5 additions & 21 deletions sdk/storage_blobs/examples/list_containers2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,22 @@ async fn main() -> azure_core::Result<()> {
.storage_client();
let blob_service_client = storage_client.blob_service_client();

let response = storage_client
.container_client("azuresdkforrust")
.list_blobs()
.into_stream()
.next()
.await
.expect("stream failed")?;

println!("key response = {:#?}", response);

let response = blob_service_client
.list_containers()
.into_stream()
.next()
.await
.expect("stream failed")?;
println!("key response = {:#?}", response);
println!("response = {:#?}", response);

// let's test a SAS token
// the code is identical
// once instantiated
let sas_token = "?sv=2019-12-12&ss=bfqt&srt=sco&sp=rwdlacupx&se=2020-12-05T20:20:58Z&st=2020-12-05T12:20:58Z&spr=https&sig=vxUuKjQW4%2FmB884f%2BdqCp4h3O%2BYuYgIJN8RVGHFVFpY%3D";
let blob_service_client =
StorageAccountClient::new_sas_token(http_client.clone(), &account, sas_token)?
.blob_service_client();
let response = blob_service_client
.list_containers()
let response = storage_client
.container_client("$logs")
.list_blobs()
.into_stream()
.next()
.await
.expect("stream failed")?;
println!("sas response = {:#?}", response);
println!("response = {:#?}", response);

Ok(())
}
23 changes: 10 additions & 13 deletions sdk/storage_blobs/examples/list_containers_and_blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,18 @@ use futures::StreamExt;

#[tokio::main]
async fn main() -> azure_core::Result<()> {
env_logger::init();
// First we retrieve the account name, container and blob name from command line args

let account = std::env::args()
.nth(1)
.expect("please specify the account name as first command line parameter");

let account_key =
std::env::var("STORAGE_ACCOUNT_KEY").expect("Set env variable STORAGE_ACCOUNT_KEY first!");
// First we retrieve the account name and access key from environment variables.
let account =
std::env::var("STORAGE_ACCOUNT").expect("Set env variable STORAGE_ACCOUNT first!");
let access_key =
std::env::var("STORAGE_ACCESS_KEY").expect("Set env variable STORAGE_ACCESS_KEY first!");

let http_client = azure_core::new_http_client();
let storage_account_client =
StorageAccountClient::new_access_key(http_client, &account, &account_key);

let blob_service_client = storage_account_client.blob_service_client();
let storage_client =
StorageAccountClient::new_access_key(http_client.clone(), &account, &access_key)
.storage_client();
let blob_service_client = storage_client.blob_service_client();

let mut stream = blob_service_client.list_containers().into_stream();

Expand All @@ -27,7 +24,7 @@ async fn main() -> azure_core::Result<()> {
for container in entry.containers {
println!("container: {}", container.name);

let container_client = storage_account_client.container_client(container.name);
let container_client = storage_client.container_client(container.name);

let mut blob_stream = container_client.list_blobs().into_stream();
while let Some(blob_entry) = blob_stream.next().await {
Expand Down
Loading