diff --git a/sdk/storage/src/core/clients/storage_account_client.rs b/sdk/storage/src/core/clients/storage_account_client.rs index 2e53fade1a..18ef73ece5 100644 --- a/sdk/storage/src/core/clients/storage_account_client.rs +++ b/sdk/storage/src/core/clients/storage_account_client.rs @@ -54,7 +54,7 @@ impl std::fmt::Debug for StorageCredentials { #[derive(Debug, Clone, Copy)] pub enum ServiceType { Blob, - // Queue, + Queue, // File, Table, } diff --git a/sdk/storage_queues/Cargo.toml b/sdk/storage_queues/Cargo.toml index fe25dc7a26..249013081f 100644 --- a/sdk/storage_queues/Cargo.toml +++ b/sdk/storage_queues/Cargo.toml @@ -29,6 +29,7 @@ url = "2.2" [dev-dependencies] tokio = { version = "1.0", features = ["full"] } +uuid = { version = "1.0", features = ["v4"] } [features] default = ["enable_reqwest"] diff --git a/sdk/storage_queues/examples/delete_message.rs b/sdk/storage_queues/examples/delete_message.rs index ee4cd9520a..d8dbfd757b 100644 --- a/sdk/storage_queues/examples/delete_message.rs +++ b/sdk/storage_queues/examples/delete_message.rs @@ -30,7 +30,7 @@ async fn main() -> azure_core::Result<()> { .get_messages() .number_of_messages(2) .visibility_timeout(Duration::from_secs(5)) // the message will become visible again after 5 secs - .execute() + .into_future() .await?; println!("get_response == {:#?}", get_response); @@ -44,7 +44,7 @@ async fn main() -> azure_core::Result<()> { let delete_response = queue .pop_receipt_client(message_to_delete) .delete() - .execute() + .into_future() .await?; println!("delete_response == {:#?}", delete_response); diff --git a/sdk/storage_queues/examples/get_messages.rs b/sdk/storage_queues/examples/get_messages.rs index 5309d06dc6..c1dddb20ff 100644 --- a/sdk/storage_queues/examples/get_messages.rs +++ b/sdk/storage_queues/examples/get_messages.rs @@ -30,7 +30,7 @@ async fn main() -> azure_core::Result<()> { .get_messages() .number_of_messages(2) .visibility_timeout(Duration::from_secs(5)) // the message will become visible again after 5 secs - .execute() + .into_future() .await?; println!("response == {:#?}", response); @@ -39,7 +39,7 @@ async fn main() -> azure_core::Result<()> { .get_messages() .number_of_messages(2) .visibility_timeout(Duration::from_secs(10)) // the message will become visible again after 10 secs - .execute() + .into_future() .await?; println!("get_messages_response == {:#?}", get_messages_response); @@ -50,8 +50,11 @@ async fn main() -> azure_core::Result<()> { let pop_receipt = queue.pop_receipt_client(message_to_update); let response = pop_receipt - .update(Duration::from_secs(4)) - .execute(format!("new body at {}", chrono::Utc::now())) + .update( + format!("new body at {}", chrono::Utc::now()), + Duration::from_secs(4), + ) + .into_future() .await?; println!("response == {:#?}", response); } diff --git a/sdk/storage_queues/examples/list_queues.rs b/sdk/storage_queues/examples/list_queues.rs index c3d0dccb7f..6e9edfc734 100644 --- a/sdk/storage_queues/examples/list_queues.rs +++ b/sdk/storage_queues/examples/list_queues.rs @@ -19,13 +19,16 @@ async fn main() -> azure_core::Result<()> { let queue_service = storage_account.queue_service_client(); println!("getting service stats"); - let response = queue_service.get_queue_service_stats().execute().await?; + let response = queue_service + .get_queue_service_stats() + .into_future() + .await?; println!("get_queue_service_properties.response == {:#?}", response); println!("getting service properties"); let response = queue_service .get_queue_service_properties() - .execute() + .into_future() .await?; println!("get_queue_service_stats.response == {:#?}", response); @@ -35,17 +38,16 @@ async fn main() -> azure_core::Result<()> { .prefix("a") .include_metadata(true) .max_results(NonZeroU32::new(2u32).unwrap()) - .execute() - .await?; + .into_stream() + .next() + .await; println!("response == {:#?}", response); println!("streaming queues"); - let mut stream = Box::pin( - queue_service - .list_queues() - .max_results(NonZeroU32::new(3u32).unwrap()) - .stream(), - ); + let mut stream = queue_service + .list_queues() + .max_results(NonZeroU32::new(3u32).unwrap()) + .into_stream(); while let Some(value) = stream.next().await { let value = value?; diff --git a/sdk/storage_queues/examples/peek_messages.rs b/sdk/storage_queues/examples/peek_messages.rs index 7e3df0d0e4..9bf6033624 100644 --- a/sdk/storage_queues/examples/peek_messages.rs +++ b/sdk/storage_queues/examples/peek_messages.rs @@ -29,7 +29,7 @@ async fn main() -> azure_core::Result<()> { let response = queue .peek_messages() .number_of_messages(2) - .execute() + .into_future() .await?; println!("response == {:#?}", response); diff --git a/sdk/storage_queues/examples/put_message.rs b/sdk/storage_queues/examples/put_message.rs index 0c533db7e8..bed48bb737 100644 --- a/sdk/storage_queues/examples/put_message.rs +++ b/sdk/storage_queues/examples/put_message.rs @@ -25,9 +25,8 @@ async fn main() -> azure_core::Result<()> { trace!("putting message"); let response = queue - .put_message() - .client_request_id("optional correlation token") - .execute(format!("Azure SDK for Rust rocks! {}", chrono::Utc::now())) + .put_message(format!("Azure SDK for Rust rocks! {}", chrono::Utc::now())) + .into_future() .await?; println!("response == {:#?}", response); diff --git a/sdk/storage_queues/examples/queue_create.rs b/sdk/storage_queues/examples/queue_create.rs index 0967e9b3ba..cf072001c0 100644 --- a/sdk/storage_queues/examples/queue_create.rs +++ b/sdk/storage_queues/examples/queue_create.rs @@ -37,7 +37,11 @@ async fn main() -> azure_core::Result<()> { .as_mut() .insert("created".into(), format!("{:?}", Utc::now()).into()); - let response = queue.create().metadata(&metadata).execute().await?; + let response = queue + .create() + .metadata(metadata.clone()) + .into_future() + .await?; println!("response == {:#?}", response); // let's add some more metadata @@ -46,15 +50,15 @@ async fn main() -> azure_core::Result<()> { println!("metadata == {:#?}", metadata); - let response = queue.set_metadata().execute(&metadata).await?; + let response = queue.set_metadata(metadata).into_future().await?; println!("response == {:#?}", response); // let's get back the metadata - let response = queue.get_metadata().execute().await?; + let response = queue.get_metadata().into_future().await?; println!("response == {:#?}", response); // use two queue stored access policies - let queue_stored_acess_policies = vec![ + let policies = vec![ QueueStoredAccessPolicy::new( "first_sap_read_process", Utc::now() - Duration::hours(1), @@ -70,22 +74,15 @@ async fn main() -> azure_core::Result<()> { .enable_all(), ]; - let response = queue - .set_acl() - .execute(&queue_stored_acess_policies) - .await?; + let response = queue.set_acl(policies).into_future().await?; println!("response == {:#?}", response); // get the queue ACL - let response = queue.get_acl().execute().await?; + let response = queue.get_acl().into_future().await?; println!("response == {:#?}", response); // now let's delete it - let response = queue - .delete() - .client_request_id("myclientid") - .execute() - .await?; + let response = queue.delete().into_future().await?; println!("response == {:#?}", response); Ok(()) diff --git a/sdk/storage_queues/src/clients/pop_receipt_client.rs b/sdk/storage_queues/src/clients/pop_receipt_client.rs index f82dc4e9a3..9fb000d4e2 100644 --- a/sdk/storage_queues/src/clients/pop_receipt_client.rs +++ b/sdk/storage_queues/src/clients/pop_receipt_client.rs @@ -1,5 +1,5 @@ -use crate::prelude::*; -use crate::requests::*; +use crate::{operations::*, prelude::*}; +use azure_core::{Context, Request, Response}; use azure_storage::core::clients::StorageClient; use std::sync::Arc; @@ -39,6 +39,14 @@ impl PopReceiptClient { }) } + pub(crate) async fn send( + &self, + context: &mut Context, + request: &mut Request, + ) -> azure_core::Result { + self.queue_client.send(context, request).await + } + pub(crate) fn storage_client(&self) -> &StorageClient { self.queue_client.storage_client() } @@ -56,16 +64,19 @@ impl PopReceiptClient { Ok(url) } - /// Deletes the message. The message must not have been - /// made visible again or this call would fail. + /// Deletes the message. The message must not have been made visible again + /// or this call would fail. pub fn delete(&self) -> DeleteMessageBuilder { - DeleteMessageBuilder::new(self) + DeleteMessageBuilder::new(self.clone()) } - /// Updates the message. - /// The message must not have been - /// made visible again or this call would fail. - pub fn update(&self, visibility_timeout: impl Into) -> UpdateMessageBuilder { - UpdateMessageBuilder::new(self, visibility_timeout) + /// Updates the message. The message must not have been made visible again + /// or this call would fail. + pub fn update( + &self, + body: impl Into, + visibility_timeout: impl Into, + ) -> UpdateMessageBuilder { + UpdateMessageBuilder::new(self.clone(), body.into(), visibility_timeout.into()) } } diff --git a/sdk/storage_queues/src/clients/queue_client.rs b/sdk/storage_queues/src/clients/queue_client.rs index 2eb747383f..4e2d757c41 100644 --- a/sdk/storage_queues/src/clients/queue_client.rs +++ b/sdk/storage_queues/src/clients/queue_client.rs @@ -1,8 +1,13 @@ -use crate::requests::*; -use azure_core::error::{ErrorKind, ResultExt}; -use azure_storage::core::clients::{AsStorageClient, StorageAccountClient, StorageClient}; -use std::fmt::Debug; -use std::sync::Arc; +use crate::{operations::*, QueueStoredAccessPolicy}; +use azure_core::{ + error::{ErrorKind, ResultExt}, + prelude::*, + Context, Request, Response, +}; +use azure_storage::core::clients::{ + AsStorageClient, ServiceType, StorageAccountClient, StorageClient, +}; +use std::{fmt::Debug, sync::Arc}; pub trait AsQueueClient> { fn queue_client(&self, queue_name: QN) -> Arc; @@ -34,6 +39,16 @@ impl QueueClient { }) } + pub(crate) async fn send( + &self, + context: &mut Context, + request: &mut Request, + ) -> azure_core::Result { + self.storage_client + .send(context, request, ServiceType::Queue) + .await + } + pub(crate) fn storage_client(&self) -> &StorageClient { self.storage_client.as_ref() } @@ -53,59 +68,69 @@ impl QueueClient { /// Creates the queue. pub fn create(&self) -> CreateQueueBuilder { - CreateQueueBuilder::new(self) + CreateQueueBuilder::new(self.clone()) } /// Deletes the queue. pub fn delete(&self) -> DeleteQueueBuilder { - DeleteQueueBuilder::new(self) + DeleteQueueBuilder::new(self.clone()) } - /// Sets or clears the queue metadata. The metadata - /// will be passed to the `execute` function of the returned struct. - pub fn set_metadata(&self) -> SetQueueMetadataBuilder { - SetQueueMetadataBuilder::new(self) + /// Sets or clears the queue metadata. + /// + /// Keep in mind that keys present on Azure but not included in the passed + /// metadata parameter will be deleted. If you want to keep the preexisting + /// key-value pairs, retrieve them with GetMetadata first and then + /// update/add to the received Metadata struct. Then pass the Metadata back + /// to SetQueueMetadata. If you just want to clear the metadata, just pass + /// an empty Metadata struct. + pub fn set_metadata(&self, metadata: Metadata) -> SetQueueMetadataBuilder { + SetQueueMetadataBuilder::new(self.clone(), metadata) } /// Get the queue metadata. + pub fn get_metadata(&self) -> GetQueueMetadataBuilder { - GetQueueMetadataBuilder::new(self) + GetQueueMetadataBuilder::new(self.clone()) } /// Get the queue ACL. This call returns /// all the stored access policies associated /// to the current queue. pub fn get_acl(&self) -> GetQueueACLBuilder { - GetQueueACLBuilder::new(self) + GetQueueACLBuilder::new(self.clone()) } - /// Set the queue ACL. You can call this function - /// to change or remove already existing stored - /// access policies by modifying the list returned + /// Set the queue ACL. You can call this function to change or remove + /// already existing stored access policies by modifying the list returned /// by `get_acl`. - pub fn set_acl(&self) -> SetQueueACLBuilder { - SetQueueACLBuilder::new(self) + /// + /// While this SDK does not enforce any limit, keep in mind Azure supports a + /// limited number of stored access policies for each queue. More info here + /// [https://docs.microsoft.com/rest/api/storageservices/set-queue-acl#remarks](https://docs.microsoft.com/rest/api/storageservices/set-queue-acl#remarks). + pub fn set_acl(&self, policies: Vec) -> SetQueueACLBuilder { + SetQueueACLBuilder::new(self.clone(), policies) } /// Puts a message in the queue. The body will be passed /// to the `execute` function of the returned struct. - pub fn put_message(&self) -> PutMessageBuilder { - PutMessageBuilder::new(self) + pub fn put_message>(&self, message: S) -> PutMessageBuilder { + PutMessageBuilder::new(self.clone(), message.into()) } /// Peeks, without removing, one or more messages. pub fn peek_messages(&self) -> PeekMessagesBuilder { - PeekMessagesBuilder::new(self) + PeekMessagesBuilder::new(self.clone()) } /// Gets, shadowing them, one or more messages. pub fn get_messages(&self) -> GetMessagesBuilder { - GetMessagesBuilder::new(self) + GetMessagesBuilder::new(self.clone()) } /// Removes all messages from the queue. pub fn clear_messages(&self) -> ClearMessagesBuilder { - ClearMessagesBuilder::new(self) + ClearMessagesBuilder::new(self.clone()) } } @@ -113,8 +138,7 @@ impl QueueClient { #[cfg(feature = "test_integration")] mod integration_tests { use super::*; - use crate::core::prelude::*; - use crate::queue::clients::AsQueueClient; + use crate::{core::prelude::*, queue::clients::AsQueueClient}; fn get_emulator_client(queue_name: &str) -> Arc { let storage_account = StorageAccountClient::new_emulator_default().storage_client(); diff --git a/sdk/storage_queues/src/clients/queue_service_client.rs b/sdk/storage_queues/src/clients/queue_service_client.rs index 403830f4f7..3a91463460 100644 --- a/sdk/storage_queues/src/clients/queue_service_client.rs +++ b/sdk/storage_queues/src/clients/queue_service_client.rs @@ -1,6 +1,9 @@ -use azure_storage::core::clients::{AsStorageClient, StorageAccountClient, StorageClient}; -use std::fmt::Debug; -use std::sync::Arc; +use crate::{operations::*, QueueServiceProperties}; +use azure_core::{Context, Request, Response}; +use azure_storage::core::clients::{ + AsStorageClient, ServiceType, StorageAccountClient, StorageClient, +}; +use std::{fmt::Debug, sync::Arc}; pub trait AsQueueServiceClient { fn queue_service_client(&self) -> Arc; @@ -20,7 +23,7 @@ impl AsQueueServiceClient for Arc { #[derive(Debug, Clone)] pub struct QueueServiceClient { - storage_client: Arc, + pub(crate) storage_client: Arc, } impl QueueServiceClient { @@ -28,23 +31,36 @@ impl QueueServiceClient { Arc::new(Self { storage_client }) } - pub fn list_queues(&self) -> crate::requests::ListQueuesBuilder { - crate::requests::ListQueuesBuilder::new(&self.storage_client) + pub fn list_queues(&self) -> ListQueuesBuilder { + ListQueuesBuilder::new(self.clone()) } - pub fn get_queue_service_properties( - &self, - ) -> crate::requests::GetQueueServicePropertiesBuilder { - crate::requests::GetQueueServicePropertiesBuilder::new(&self.storage_client) + pub fn get_queue_service_properties(&self) -> GetQueueServicePropertiesBuilder { + GetQueueServicePropertiesBuilder::new(self.clone()) } + /// Set queue service properties. + /// + /// More info here + /// [https://docs.microsoft.com/rest/api/storageservices/set-queue-service-properties](https://docs.microsoft.com/rest/api/storageservices/set-queue-service-properties). pub fn set_queue_service_properties( &self, - ) -> crate::requests::SetQueueServicePropertiesBuilder { - crate::requests::SetQueueServicePropertiesBuilder::new(&self.storage_client) + properties: QueueServiceProperties, + ) -> SetQueueServicePropertiesBuilder { + SetQueueServicePropertiesBuilder::new(self.clone(), properties) + } + + pub fn get_queue_service_stats(&self) -> GetQueueServiceStatsBuilder { + GetQueueServiceStatsBuilder::new(self.clone()) } - pub fn get_queue_service_stats(&self) -> crate::requests::GetQueueServiceStatsBuilder { - crate::requests::GetQueueServiceStatsBuilder::new(&self.storage_client) + pub(crate) async fn send( + &self, + context: &mut Context, + request: &mut Request, + ) -> azure_core::Result { + self.storage_client + .send(context, request, ServiceType::Blob) + .await } } diff --git a/sdk/storage_queues/src/lib.rs b/sdk/storage_queues/src/lib.rs index a17ac2e2d7..05687d3096 100644 --- a/sdk/storage_queues/src/lib.rs +++ b/sdk/storage_queues/src/lib.rs @@ -8,12 +8,11 @@ extern crate azure_core; mod clients; mod message_ttl; mod number_of_messages; +pub mod operations; mod pop_receipt; pub mod prelude; mod queue_service_properties; mod queue_stored_access_policy; -pub mod requests; -pub mod responses; mod visibility_timeout; pub use clients::*; diff --git a/sdk/storage_queues/src/operations/clear_messages.rs b/sdk/storage_queues/src/operations/clear_messages.rs new file mode 100644 index 0000000000..0df4aa2131 --- /dev/null +++ b/sdk/storage_queues/src/operations/clear_messages.rs @@ -0,0 +1,72 @@ +use crate::clients::QueueClient; +use azure_core::{error::Error, prelude::*, Response as AzureResponse}; +use azure_storage::core::headers::CommonStorageResponseHeaders; +use std::convert::TryInto; + +#[derive(Debug)] +pub struct ClearMessagesBuilder { + queue_client: QueueClient, + timeout: Option, + context: Context, +} + +impl ClearMessagesBuilder { + pub(crate) fn new(queue_client: QueueClient) -> Self { + ClearMessagesBuilder { + queue_client, + timeout: None, + context: Context::new(), + } + } + + setters! { + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self.queue_client.url_with_segments(Some("messages"))?; + + self.timeout.append_to_url_query(&mut url); + + let mut request = self.queue_client.storage_client().prepare_request( + url.as_str(), + http::method::Method::DELETE, + None, + )?; + + let response = self + .queue_client + .send(&mut self.context, &mut request) + .await?; + + response.try_into() + }) + } +} + +pub type Response = futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for ClearMessagesBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct ClearMessagesResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, +} + +impl std::convert::TryFrom for ClearMessagesResponse { + type Error = Error; + + fn try_from(response: AzureResponse) -> azure_core::Result { + Ok(ClearMessagesResponse { + common_storage_response_headers: response.headers().try_into()?, + }) + } +} diff --git a/sdk/storage_queues/src/operations/create_queue.rs b/sdk/storage_queues/src/operations/create_queue.rs new file mode 100644 index 0000000000..5eab1b3ee4 --- /dev/null +++ b/sdk/storage_queues/src/operations/create_queue.rs @@ -0,0 +1,81 @@ +use crate::clients::QueueClient; +use azure_core::{error::Error, prelude::*, Response as AzureResponse}; +use azure_storage::core::headers::CommonStorageResponseHeaders; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct CreateQueueBuilder { + queue_client: QueueClient, + timeout: Option, + metadata: Option, + context: Context, +} + +impl CreateQueueBuilder { + pub(crate) fn new(queue_client: QueueClient) -> Self { + CreateQueueBuilder { + queue_client, + timeout: None, + metadata: None, + context: Context::new(), + } + } + + setters! { + metadata: Metadata => Some(metadata), + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self.queue_client.url_with_segments(None)?; + + self.timeout.append_to_url_query(&mut url); + + let mut request = self.queue_client.storage_client().prepare_request( + url.as_str(), + http::method::Method::PUT, + None, + )?; + + if let Some(metadata) = &self.metadata { + for m in metadata.iter() { + request.add_mandatory_header(&m); + } + } + + let response = self + .queue_client + .send(&mut self.context, &mut request) + .await?; + + response.try_into() + }) + } +} + +pub type Response = futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for CreateQueueBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct CreateQueueResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, +} + +impl std::convert::TryFrom for CreateQueueResponse { + type Error = Error; + + fn try_from(response: AzureResponse) -> azure_core::Result { + Ok(CreateQueueResponse { + common_storage_response_headers: response.headers().try_into()?, + }) + } +} diff --git a/sdk/storage_queues/src/operations/delete_message.rs b/sdk/storage_queues/src/operations/delete_message.rs new file mode 100644 index 0000000000..7a39aed811 --- /dev/null +++ b/sdk/storage_queues/src/operations/delete_message.rs @@ -0,0 +1,72 @@ +use crate::clients::PopReceiptClient; +use azure_core::{error::Error, prelude::*, Context, Response as AzureResponse}; +use azure_storage::core::headers::CommonStorageResponseHeaders; +use std::convert::TryInto; + +#[derive(Debug)] +pub struct DeleteMessageBuilder { + pop_receipt_client: PopReceiptClient, + timeout: Option, + context: Context, +} + +impl DeleteMessageBuilder { + pub(crate) fn new(pop_receipt_client: PopReceiptClient) -> Self { + DeleteMessageBuilder { + pop_receipt_client, + timeout: None, + context: Context::new(), + } + } + + setters! { + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self.pop_receipt_client.pop_receipt_url()?; + + self.timeout.append_to_url_query(&mut url); + + let mut request = self.pop_receipt_client.storage_client().prepare_request( + url.as_str(), + http::method::Method::DELETE, + None, + )?; + + let response = self + .pop_receipt_client + .send(&mut self.context, &mut request) + .await?; + + response.try_into() + }) + } +} + +pub type Response = futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for DeleteMessageBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct DeleteMessageResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, +} + +impl std::convert::TryFrom for DeleteMessageResponse { + type Error = Error; + + fn try_from(response: AzureResponse) -> azure_core::Result { + Ok(DeleteMessageResponse { + common_storage_response_headers: response.headers().try_into()?, + }) + } +} diff --git a/sdk/storage_queues/src/operations/delete_queue.rs b/sdk/storage_queues/src/operations/delete_queue.rs new file mode 100644 index 0000000000..26c45b29fa --- /dev/null +++ b/sdk/storage_queues/src/operations/delete_queue.rs @@ -0,0 +1,72 @@ +use crate::clients::QueueClient; +use azure_core::{error::Error, prelude::*, Context, Response as AzureResponse}; +use azure_storage::core::headers::CommonStorageResponseHeaders; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct DeleteQueueBuilder { + queue_client: QueueClient, + timeout: Option, + context: Context, +} + +impl DeleteQueueBuilder { + pub(crate) fn new(queue_client: QueueClient) -> Self { + DeleteQueueBuilder { + queue_client, + timeout: None, + context: Context::new(), + } + } + + setters! { + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self.queue_client.url_with_segments(None)?; + + self.timeout.append_to_url_query(&mut url); + + let mut request = self.queue_client.storage_client().prepare_request( + url.as_str(), + http::method::Method::DELETE, + None, + )?; + + let response = self + .queue_client + .send(&mut self.context, &mut request) + .await?; + + response.try_into() + }) + } +} + +pub type Response = futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for DeleteQueueBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct DeleteQueueResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, +} + +impl std::convert::TryFrom for DeleteQueueResponse { + type Error = Error; + + fn try_from(response: AzureResponse) -> azure_core::Result { + Ok(DeleteQueueResponse { + common_storage_response_headers: response.headers().try_into()?, + }) + } +} diff --git a/sdk/storage_queues/src/operations/get_messages.rs b/sdk/storage_queues/src/operations/get_messages.rs new file mode 100644 index 0000000000..54c725b77a --- /dev/null +++ b/sdk/storage_queues/src/operations/get_messages.rs @@ -0,0 +1,145 @@ +use crate::{clients::QueueClient, prelude::*, PopReceipt}; +use azure_core::{ + collect_pinned_stream, + error::{ErrorKind, ResultExt}, + headers::utc_date_from_rfc2822, + prelude::*, + Context, Response as AzureResponse, +}; +use azure_storage::core::{headers::CommonStorageResponseHeaders, xml::read_xml}; +use chrono::{DateTime, Utc}; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct GetMessagesBuilder { + queue_client: QueueClient, + number_of_messages: Option, + visibility_timeout: Option, + timeout: Option, + context: Context, +} + +impl GetMessagesBuilder { + pub(crate) fn new(queue_client: QueueClient) -> Self { + GetMessagesBuilder { + queue_client, + number_of_messages: None, + visibility_timeout: None, + + timeout: None, + context: Context::new(), + } + } + + setters! { + number_of_messages: NumberOfMessages => Some(number_of_messages), + visibility_timeout: VisibilityTimeout => Some(visibility_timeout), + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self.queue_client.url_with_segments(Some("messages"))?; + + self.visibility_timeout.append_to_url_query(&mut url); + self.number_of_messages.append_to_url_query(&mut url); + self.timeout.append_to_url_query(&mut url); + + let mut request = self.queue_client.storage_client().prepare_request( + url.as_str(), + http::method::Method::GET, + None, + )?; + + let response = self + .queue_client + .send(&mut self.context, &mut request) + .await?; + + GetMessagesResponse::try_from(response).await + }) + } +} + +pub type Response = futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for GetMessagesBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct GetMessagesResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, + pub messages: Vec, +} + +#[derive(Debug, Clone)] +pub struct Message { + pub pop_receipt: PopReceipt, + pub insertion_time: DateTime, + pub expiration_time: DateTime, + pub time_next_visible: DateTime, + pub dequeue_count: u64, + pub message_text: String, +} + +impl From for PopReceipt { + fn from(message: Message) -> Self { + message.pop_receipt + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct MessageInternal { + #[serde(rename = "MessageId")] + pub message_id: String, + #[serde(rename = "InsertionTime")] + pub insertion_time: String, + #[serde(rename = "ExpirationTime")] + pub expiration_time: String, + #[serde(rename = "PopReceipt")] + pub pop_receipt: String, + #[serde(rename = "TimeNextVisible")] + pub time_next_visible: String, + #[serde(rename = "DequeueCount")] + pub dequeue_count: u64, + #[serde(rename = "MessageText")] + pub message_text: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct MessagesInternal { + #[serde(rename = "QueueMessage")] + pub messages: Option>, +} + +impl GetMessagesResponse { + async fn try_from(response: AzureResponse) -> azure_core::Result { + let (_, headers, body) = response.deconstruct(); + let body = collect_pinned_stream(body).await?; + + let response: MessagesInternal = read_xml(&body).map_kind(ErrorKind::DataConversion)?; + + let mut messages = Vec::new(); + for message in response.messages.unwrap_or_default().into_iter() { + messages.push(Message { + pop_receipt: PopReceipt::new(message.message_id, message.pop_receipt), + insertion_time: utc_date_from_rfc2822(&message.insertion_time)?, + expiration_time: utc_date_from_rfc2822(&message.expiration_time)?, + time_next_visible: utc_date_from_rfc2822(&message.time_next_visible)?, + dequeue_count: message.dequeue_count, + message_text: message.message_text, + }) + } + + Ok(GetMessagesResponse { + common_storage_response_headers: (&headers).try_into()?, + messages, + }) + } +} diff --git a/sdk/storage_queues/src/operations/get_queue_acl.rs b/sdk/storage_queues/src/operations/get_queue_acl.rs new file mode 100644 index 0000000000..114abeb6f0 --- /dev/null +++ b/sdk/storage_queues/src/operations/get_queue_acl.rs @@ -0,0 +1,91 @@ +use crate::{clients::QueueClient, QueueStoredAccessPolicy}; +use azure_core::{ + collect_pinned_stream, + error::{ErrorKind, ResultExt}, + prelude::*, + Context, Response as AzureResponse, +}; +use azure_storage::{core::headers::CommonStorageResponseHeaders, StoredAccessPolicyList}; +use http::method::Method; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct GetQueueACLBuilder { + queue_client: QueueClient, + timeout: Option, + context: Context, +} + +impl GetQueueACLBuilder { + pub(crate) fn new(queue_client: QueueClient) -> Self { + Self { + queue_client, + timeout: None, + context: Context::new(), + } + } + + setters! { + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self.queue_client.url_with_segments(None)?; + + url.query_pairs_mut().append_pair("comp", "acl"); + + self.timeout.append_to_url_query(&mut url); + + let mut request = self.queue_client.storage_client().prepare_request( + url.as_str(), + Method::GET, + None, + )?; + + let response = self + .queue_client + .send(&mut self.context, &mut request) + .await?; + + GetQueueACLResponse::try_from(response).await + }) + } +} + +pub type Response = futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for GetQueueACLBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct GetQueueACLResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, + pub stored_access_policies: Vec, +} + +impl GetQueueACLResponse { + async fn try_from(response: AzureResponse) -> azure_core::Result { + let (_, headers, body) = response.deconstruct(); + let body = collect_pinned_stream(body).await?; + + let a: azure_core::Result> = + StoredAccessPolicyList::from_xml(&body) + .map_kind(ErrorKind::DataConversion)? + .stored_access + .into_iter() + .map(|sap| sap.try_into().map_kind(ErrorKind::DataConversion)) + .collect(); + + Ok(GetQueueACLResponse { + common_storage_response_headers: (&headers).try_into()?, + stored_access_policies: a?, + }) + } +} diff --git a/sdk/storage_queues/src/operations/get_queue_metadata.rs b/sdk/storage_queues/src/operations/get_queue_metadata.rs new file mode 100644 index 0000000000..95632a313a --- /dev/null +++ b/sdk/storage_queues/src/operations/get_queue_metadata.rs @@ -0,0 +1,80 @@ +use crate::clients::QueueClient; +use azure_core::{error::Error, prelude::*, Context, Response as AzureResponse}; +use azure_storage::core::headers::CommonStorageResponseHeaders; +use http::method::Method; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct GetQueueMetadataBuilder { + queue_client: QueueClient, + timeout: Option, + context: Context, +} + +impl GetQueueMetadataBuilder { + pub(crate) fn new(queue_client: QueueClient) -> Self { + Self { + queue_client, + timeout: None, + context: Context::new(), + } + } + + setters! { + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self.queue_client.url_with_segments(None)?; + + url.query_pairs_mut().append_pair("comp", "metadata"); + + self.timeout.append_to_url_query(&mut url); + + let mut request = self.queue_client.storage_client().prepare_request( + url.as_str(), + Method::GET, + None, + )?; + + let response = self + .queue_client + .send(&mut self.context, &mut request) + .await?; + + response.try_into() + }) + } +} + +pub type Response = + futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for GetQueueMetadataBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct GetQueueMetadataResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, + pub metadata: Metadata, +} + +impl std::convert::TryFrom for GetQueueMetadataResponse { + type Error = Error; + + fn try_from(response: AzureResponse) -> azure_core::Result { + let headers = response.headers(); + + Ok(GetQueueMetadataResponse { + common_storage_response_headers: headers.try_into()?, + metadata: headers.into(), + }) + } +} diff --git a/sdk/storage_queues/src/operations/get_queue_service_properties.rs b/sdk/storage_queues/src/operations/get_queue_service_properties.rs new file mode 100644 index 0000000000..d06cae8371 --- /dev/null +++ b/sdk/storage_queues/src/operations/get_queue_service_properties.rs @@ -0,0 +1,93 @@ +use crate::{QueueServiceClient, QueueServiceProperties}; +use azure_core::{ + collect_pinned_stream, + error::{ErrorKind, ResultExt}, + prelude::*, + Context, Response as AzureResponse, +}; +use azure_storage::core::{headers::CommonStorageResponseHeaders, xml::read_xml}; +use http::method::Method; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct GetQueueServicePropertiesBuilder { + service_client: QueueServiceClient, + timeout: Option, + context: Context, +} + +impl GetQueueServicePropertiesBuilder { + pub(crate) fn new(service_client: QueueServiceClient) -> Self { + Self { + service_client, + timeout: None, + context: Context::new(), + } + } + + setters! { + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self + .service_client + .storage_client + .storage_account_client() + .queue_storage_url() + .to_owned(); + + url.query_pairs_mut().append_pair("restype", "service"); + url.query_pairs_mut().append_pair("comp", "properties"); + + self.timeout.append_to_url_query(&mut url); + + let mut request = self.service_client.storage_client.prepare_request( + url.as_str(), + Method::GET, + None, + )?; + + let response = self + .service_client + .send(&mut self.context, &mut request) + .await?; + + GetQueueServicePropertiesResponse::try_from(response).await + }) + } +} + +pub type Response = + futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for GetQueueServicePropertiesBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct GetQueueServicePropertiesResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, + pub queue_service_properties: QueueServiceProperties, +} + +impl GetQueueServicePropertiesResponse { + async fn try_from(response: AzureResponse) -> azure_core::Result { + let (_, headers, body) = response.deconstruct(); + let body = collect_pinned_stream(body).await?; + + let queue_service_properties: QueueServiceProperties = + read_xml(&body).map_kind(ErrorKind::DataConversion)?; + + Ok(GetQueueServicePropertiesResponse { + common_storage_response_headers: (&headers).try_into()?, + queue_service_properties, + }) + } +} diff --git a/sdk/storage_queues/src/operations/get_queue_service_stats.rs b/sdk/storage_queues/src/operations/get_queue_service_stats.rs new file mode 100644 index 0000000000..ee3f386ca6 --- /dev/null +++ b/sdk/storage_queues/src/operations/get_queue_service_stats.rs @@ -0,0 +1,126 @@ +use crate::QueueServiceClient; +use azure_core::{ + collect_pinned_stream, + error::{ErrorKind, ResultExt}, + prelude::*, + Response as AzureResponse, +}; +use azure_storage::core::{headers::CommonStorageResponseHeaders, xml::read_xml}; +use chrono::{DateTime, Utc}; +use http::method::Method; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct GetQueueServiceStatsBuilder { + service_client: QueueServiceClient, + timeout: Option, + context: Context, +} + +impl GetQueueServiceStatsBuilder { + pub(crate) fn new(service_client: QueueServiceClient) -> Self { + Self { + service_client, + timeout: None, + context: Context::new(), + } + } + + setters! { + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self + .service_client + .storage_client + .storage_account_client() + .queue_storage_secondary_url() + .to_owned(); + + url.query_pairs_mut().append_pair("restype", "service"); + url.query_pairs_mut().append_pair("comp", "stats"); + + self.timeout.append_to_url_query(&mut url); + + let mut request = self.service_client.storage_client.prepare_request( + url.as_str(), + Method::GET, + None, + )?; + + let response = self + .service_client + .send(&mut self.context, &mut request) + .await?; + + GetQueueServiceStatsResponse::try_from(response).await + }) + } +} + +pub type Response = + futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for GetQueueServiceStatsBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum Status { + Live, + Bootstrap, + Unavailable, +} + +#[derive(Debug, Clone)] +pub struct GetQueueServiceStatsResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, + pub status: Status, + pub last_sync_time: Option>, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "PascalCase")] +struct GetQueueServiceStatsResponseInternal { + pub geo_replication: GeoReplication, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "PascalCase")] +struct GeoReplication { + pub status: Status, + pub last_sync_time: Option, +} + +impl GetQueueServiceStatsResponse { + async fn try_from(response: AzureResponse) -> azure_core::Result { + let (_, headers, body) = response.deconstruct(); + let body = collect_pinned_stream(body).await?; + + debug!("headers == {:?}", headers); + debug!("body == {:#?}", body); + let response: GetQueueServiceStatsResponseInternal = + read_xml(&body).map_kind(ErrorKind::DataConversion)?; + debug!("deserde == {:#?}", response); + + Ok(GetQueueServiceStatsResponse { + common_storage_response_headers: (&headers).try_into()?, + status: response.geo_replication.status, + last_sync_time: response + .geo_replication + .last_sync_time + .map(|t| DateTime::parse_from_rfc2822(&t)) + .transpose() + .context(ErrorKind::DataConversion, "failed to parse last sync time")? + .map(|t| DateTime::from_utc(t.naive_utc(), Utc)), + }) + } +} diff --git a/sdk/storage_queues/src/operations/list_queues.rs b/sdk/storage_queues/src/operations/list_queues.rs new file mode 100644 index 0000000000..bf08eb9383 --- /dev/null +++ b/sdk/storage_queues/src/operations/list_queues.rs @@ -0,0 +1,179 @@ +use crate::QueueServiceClient; +use azure_core::{ + collect_pinned_stream, + error::{Error, ErrorKind, ResultExt}, + prelude::*, + Context, Pageable, Response as AzureResponse, +}; +use azure_storage::{core::headers::CommonStorageResponseHeaders, xml::read_xml}; +use http::method::Method; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct ListQueuesBuilder { + service_client: QueueServiceClient, + prefix: Option, + max_results: Option, + include_metadata: bool, + timeout: Option, + context: Context, +} + +impl ListQueuesBuilder { + pub(crate) fn new(service_client: QueueServiceClient) -> Self { + Self { + service_client, + prefix: None, + max_results: None, + include_metadata: false, + timeout: None, + context: Context::new(), + } + } + + setters! { + prefix: Prefix => Some(prefix), + max_results: MaxResults => Some(max_results), + include_metadata: bool => include_metadata, + timeout: Timeout => Some(timeout), + } + + pub fn into_stream(self) -> Pageable { + let make_request = move |continuation: Option| { + let mut this = self.clone(); + async move { + let mut url = this + .service_client + .storage_client + .storage_account_client() + .queue_storage_url() + .to_owned(); + + url.query_pairs_mut().append_pair("comp", "list"); + + this.prefix.append_to_url_query(&mut url); + + if let Some(continuation) = continuation { + url.query_pairs_mut() + .append_pair("marker", &continuation.into_raw()); + } + + this.max_results.append_to_url_query(&mut url); + + if this.include_metadata { + url.query_pairs_mut().append_pair("include", "metadata"); + } + + this.timeout.append_to_url_query(&mut url); + AppendToUrlQuery::append_to_url_query(&this.timeout, &mut url); + + let mut request = this.service_client.storage_client.prepare_request( + url.as_str(), + Method::GET, + None, + )?; + + let response = this + .service_client + .send(&mut this.context, &mut request) + .await?; + + ListQueuesResponse::try_from(response).await + } + }; + + Pageable::new(make_request) + } +} + +#[derive(Debug, Clone)] +pub struct ListQueuesResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, + pub service_endpoint: String, + pub prefix: Option, + // this seems duplicate :S + pub marker: Option, + pub max_results: Option, + pub queues: Vec, + pub next_marker: Option, +} + +impl Continuable for ListQueuesResponse { + fn continuation(&self) -> Option { + self.next_marker.clone().map(|x| x.as_str().to_owned()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct ListQueuesResponseInternal { + #[serde(rename = "ServiceEndpoint")] + pub service_endpoint: String, + #[serde(rename = "Prefix")] + pub prefix: Option, + #[serde(rename = "Marker")] + pub marker: Option, + #[serde(rename = "MaxResults")] + pub max_results: Option, + + #[serde(rename = "Queues")] + pub queues: Queues, + + #[serde(rename = "NextMarker")] + pub next_marker: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Queues { + #[serde(rename = "Queue")] + pub queues: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Queue { + #[serde(rename = "Name")] + pub name: String, + #[serde(rename = "Metadata")] + pub metadata: Option>, +} + +impl ListQueuesResponse { + async fn try_from(response: AzureResponse) -> azure_core::Result { + let (_, headers, body) = response.deconstruct(); + let body = collect_pinned_stream(body).await?; + + let mut response: ListQueuesResponseInternal = + read_xml(&body).map_kind(ErrorKind::DataConversion)?; + + // get rid of the ugly Some("") empty string + // we use None instead + if let Some(next_marker) = &response.next_marker { + if next_marker.is_empty() { + response.next_marker = None; + } + } + + Ok(ListQueuesResponse { + common_storage_response_headers: (&headers).try_into()?, + service_endpoint: response.service_endpoint, + prefix: response.prefix, + marker: response.marker, + max_results: response.max_results, + queues: response.queues.queues.unwrap_or_default(), + next_marker: response.next_marker.map(|nm| nm.into()), + }) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn try_parse() { + let range = "a2azureiscoolazurerocks"; + + let response: ListQueuesResponseInternal = serde_xml_rs::from_str(range).unwrap(); + + assert_eq!(response.queues.queues.unwrap().len(), 2); + } +} diff --git a/sdk/storage_queues/src/operations/mod.rs b/sdk/storage_queues/src/operations/mod.rs new file mode 100644 index 0000000000..1fc90e2aa2 --- /dev/null +++ b/sdk/storage_queues/src/operations/mod.rs @@ -0,0 +1,32 @@ +mod clear_messages; +mod create_queue; +mod delete_message; +mod delete_queue; +mod get_messages; +mod get_queue_acl; +mod get_queue_metadata; +mod get_queue_service_properties; +mod get_queue_service_stats; +mod list_queues; +mod peek_messages; +mod put_message; +mod set_queue_acl; +mod set_queue_metadata; +mod set_queue_service_properties; +mod update_message; +pub use clear_messages::ClearMessagesBuilder; +pub use create_queue::CreateQueueBuilder; +pub use delete_message::DeleteMessageBuilder; +pub use delete_queue::DeleteQueueBuilder; +pub use get_messages::GetMessagesBuilder; +pub use get_queue_acl::GetQueueACLBuilder; +pub use get_queue_metadata::GetQueueMetadataBuilder; +pub use get_queue_service_properties::GetQueueServicePropertiesBuilder; +pub use get_queue_service_stats::GetQueueServiceStatsBuilder; +pub use list_queues::ListQueuesBuilder; +pub use peek_messages::PeekMessagesBuilder; +pub use put_message::PutMessageBuilder; +pub use set_queue_acl::SetQueueACLBuilder; +pub use set_queue_metadata::SetQueueMetadataBuilder; +pub use set_queue_service_properties::SetQueueServicePropertiesBuilder; +pub use update_message::UpdateMessageBuilder; diff --git a/sdk/storage_queues/src/operations/peek_messages.rs b/sdk/storage_queues/src/operations/peek_messages.rs new file mode 100644 index 0000000000..aca3ffa331 --- /dev/null +++ b/sdk/storage_queues/src/operations/peek_messages.rs @@ -0,0 +1,129 @@ +use crate::{clients::QueueClient, prelude::*}; +use azure_core::{ + collect_pinned_stream, + error::{ErrorKind, ResultExt}, + headers::utc_date_from_rfc2822, + prelude::*, + Context, Response as AzureResponse, +}; +use azure_storage::core::{headers::CommonStorageResponseHeaders, xml::read_xml}; +use chrono::{DateTime, Utc}; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct PeekMessagesBuilder { + queue_client: QueueClient, + number_of_messages: Option, + timeout: Option, + context: Context, +} + +impl PeekMessagesBuilder { + pub(crate) fn new(queue_client: QueueClient) -> Self { + PeekMessagesBuilder { + queue_client, + number_of_messages: None, + timeout: None, + context: Context::new(), + } + } + + setters! { + number_of_messages: NumberOfMessages => Some(number_of_messages), + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self.queue_client.url_with_segments(Some("messages"))?; + + url.query_pairs_mut().append_pair("peekonly", "true"); + self.number_of_messages.append_to_url_query(&mut url); + self.timeout.append_to_url_query(&mut url); + + let mut request = self.queue_client.storage_client().prepare_request( + url.as_str(), + http::method::Method::GET, + None, + )?; + + let response = self + .queue_client + .send(&mut self.context, &mut request) + .await?; + + PeekMessagesResponse::try_from(response).await + }) + } +} + +pub type Response = futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for CreateQueueBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct PeekMessagesResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, + pub messages: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct PeekMessageInternal { + #[serde(rename = "MessageId")] + pub message_id: String, + #[serde(rename = "InsertionTime")] + pub insertion_time: String, + #[serde(rename = "ExpirationTime")] + pub expiration_time: String, + #[serde(rename = "DequeueCount")] + pub dequeue_count: u64, + #[serde(rename = "MessageText")] + pub message_text: String, +} + +#[derive(Debug, Clone)] +pub struct PeekMessage { + pub message_id: String, + pub insertion_time: DateTime, + pub expiration_time: DateTime, + pub dequeue_count: u64, + pub message_text: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct PeekMessagesInternal { + #[serde(rename = "QueueMessage")] + pub messages: Option>, +} + +impl PeekMessagesResponse { + async fn try_from(response: AzureResponse) -> azure_core::Result { + let (_, headers, body) = response.deconstruct(); + let body = collect_pinned_stream(body).await?; + + let response: PeekMessagesInternal = read_xml(&body).map_kind(ErrorKind::DataConversion)?; + + let mut messages = Vec::new(); + for message in response.messages.unwrap_or_default().into_iter() { + messages.push(PeekMessage { + message_id: message.message_id, + insertion_time: utc_date_from_rfc2822(&message.insertion_time)?, + expiration_time: utc_date_from_rfc2822(&message.expiration_time)?, + dequeue_count: message.dequeue_count, + message_text: message.message_text, + }) + } + + Ok(PeekMessagesResponse { + common_storage_response_headers: (&headers).try_into()?, + messages, + }) + } +} diff --git a/sdk/storage_queues/src/operations/put_message.rs b/sdk/storage_queues/src/operations/put_message.rs new file mode 100644 index 0000000000..42683d6d42 --- /dev/null +++ b/sdk/storage_queues/src/operations/put_message.rs @@ -0,0 +1,141 @@ +use crate::prelude::*; +use azure_core::{ + collect_pinned_stream, + error::{ErrorKind, ResultExt}, + headers::utc_date_from_rfc2822, + prelude::*, + Context, Response as AzureResponse, +}; +use azure_storage::{core::headers::CommonStorageResponseHeaders, xml::read_xml}; +use chrono::{DateTime, Utc}; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct PutMessageBuilder { + body: String, + queue_client: QueueClient, + visibility_timeout: Option, + ttl: Option, + timeout: Option, + context: Context, +} + +impl PutMessageBuilder { + pub(crate) fn new(queue_client: QueueClient, body: String) -> Self { + PutMessageBuilder { + body, + queue_client, + visibility_timeout: None, + ttl: None, + timeout: None, + context: Context::new(), + } + } + + setters! { + visibility_timeout: VisibilityTimeout => Some(visibility_timeout), + ttl: MessageTTL => Some(ttl), + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self.queue_client.url_with_segments(Some("messages"))?; + + self.visibility_timeout.append_to_url_query(&mut url); + self.ttl.append_to_url_query(&mut url); + self.timeout.append_to_url_query(&mut url); + + // since the format is fixed we just decorate the message with the tags. + // This could be made optional in the future and/or more + // stringent. + let message = format!( + "{}", + self.body + ); + + let mut request = self.queue_client.storage_client().prepare_request( + url.as_str(), + http::method::Method::POST, + Some(message.into()), + )?; + + let response = self + .queue_client + .send(&mut self.context, &mut request) + .await?; + + PutMessageResponse::try_from(response).await + }) + } +} + +pub type Response = futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for PutMessageBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct PutMessageResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, + pub queue_message: QueueMessage, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct PutMessageResponseInternal { + #[serde(rename = "QueueMessage")] + pub queue_message: QueueMessageInternal, +} + +#[derive(Debug, Clone)] +pub struct QueueMessage { + pub message_id: String, + pub insertion_time: DateTime, + pub expiration_time: DateTime, + pub pop_receipt: String, + pub time_next_visible: DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct QueueMessageInternal { + #[serde(rename = "MessageId")] + pub message_id: String, + #[serde(rename = "InsertionTime")] + pub insertion_time: String, + #[serde(rename = "ExpirationTime")] + pub expiration_time: String, + #[serde(rename = "PopReceipt")] + pub pop_receipt: String, + #[serde(rename = "TimeNextVisible")] + pub time_next_visible: String, +} + +impl PutMessageResponse { + async fn try_from(response: AzureResponse) -> azure_core::Result { + let (_, headers, body) = response.deconstruct(); + let body = collect_pinned_stream(body).await?; + + let response: PutMessageResponseInternal = + read_xml(&body).map_kind(ErrorKind::DataConversion)?; + let queue_message = response.queue_message; + + let queue_message = QueueMessage { + message_id: queue_message.message_id, + insertion_time: utc_date_from_rfc2822(&queue_message.insertion_time)?, + expiration_time: utc_date_from_rfc2822(&queue_message.expiration_time)?, + pop_receipt: queue_message.pop_receipt, + time_next_visible: utc_date_from_rfc2822(&queue_message.time_next_visible)?, + }; + + Ok(Self { + common_storage_response_headers: (&headers).try_into()?, + queue_message, + }) + } +} diff --git a/sdk/storage_queues/src/operations/set_queue_acl.rs b/sdk/storage_queues/src/operations/set_queue_acl.rs new file mode 100644 index 0000000000..dc8d7f4c2f --- /dev/null +++ b/sdk/storage_queues/src/operations/set_queue_acl.rs @@ -0,0 +1,87 @@ +use crate::{clients::QueueClient, QueueStoredAccessPolicy}; +use azure_core::{error::Error, prelude::*, Response as AzureResponse}; +use azure_storage::{core::headers::CommonStorageResponseHeaders, StoredAccessPolicyList}; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct SetQueueACLBuilder { + queue_client: QueueClient, + policies: Vec, + timeout: Option, + context: Context, +} + +impl SetQueueACLBuilder { + pub(crate) fn new(queue_client: QueueClient, policies: Vec) -> Self { + SetQueueACLBuilder { + queue_client, + policies, + timeout: None, + context: Context::new(), + } + } + + setters! { + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self.queue_client.url_with_segments(None)?; + + url.query_pairs_mut().append_pair("comp", "acl"); + self.timeout.append_to_url_query(&mut url); + + // convert the queue_stored_access_policies slice + // in a StoredAccessPolicyList to get its XML + // representation. + let xml_body = { + let mut qapl = StoredAccessPolicyList::new(); + self.policies + .iter() + .for_each(|queue_policy| qapl.stored_access.push(queue_policy.into())); + + qapl.to_xml() + }; + + let mut request = self.queue_client.storage_client().prepare_request( + url.as_str(), + http::method::Method::PUT, + Some(xml_body.into()), + )?; + + let response = self + .queue_client + .send(&mut self.context, &mut request) + .await?; + + response.try_into() + }) + } +} + +pub type Response = futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for SetQueueACLBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct SetQueueACLResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, +} + +impl std::convert::TryFrom for SetQueueACLResponse { + type Error = Error; + + fn try_from(response: AzureResponse) -> azure_core::Result { + Ok(SetQueueACLResponse { + common_storage_response_headers: response.headers().try_into()?, + }) + } +} diff --git a/sdk/storage_queues/src/operations/set_queue_metadata.rs b/sdk/storage_queues/src/operations/set_queue_metadata.rs new file mode 100644 index 0000000000..d6f51a562b --- /dev/null +++ b/sdk/storage_queues/src/operations/set_queue_metadata.rs @@ -0,0 +1,79 @@ +use crate::clients::QueueClient; +use azure_core::{error::Error, prelude::*, Context, Response as AzureResponse}; +use azure_storage::core::headers::CommonStorageResponseHeaders; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct SetQueueMetadataBuilder { + queue_client: QueueClient, + metadata: Metadata, + timeout: Option, + context: Context, +} + +impl SetQueueMetadataBuilder { + pub(crate) fn new(queue_client: QueueClient, metadata: Metadata) -> Self { + SetQueueMetadataBuilder { + queue_client, + metadata, + timeout: None, + context: Context::new(), + } + } + + setters! { + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self.queue_client.url_with_segments(None)?; + + url.query_pairs_mut().append_pair("comp", "metadata"); + self.timeout.append_to_url_query(&mut url); + + let mut request = self.queue_client.storage_client().prepare_request( + url.as_str(), + http::method::Method::PUT, + None, + )?; + for m in self.metadata.iter() { + request.add_mandatory_header(&m); + } + + let response = self + .queue_client + .send(&mut self.context, &mut request) + .await?; + + response.try_into() + }) + } +} + +pub type Response = + futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for SetQueueMetadataBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct SetQueueMetadataResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, +} + +impl std::convert::TryFrom for SetQueueMetadataResponse { + type Error = Error; + + fn try_from(response: AzureResponse) -> azure_core::Result { + Ok(SetQueueMetadataResponse { + common_storage_response_headers: response.headers().try_into()?, + }) + } +} diff --git a/sdk/storage_queues/src/operations/set_queue_service_properties.rs b/sdk/storage_queues/src/operations/set_queue_service_properties.rs new file mode 100644 index 0000000000..7ec7c1071d --- /dev/null +++ b/sdk/storage_queues/src/operations/set_queue_service_properties.rs @@ -0,0 +1,92 @@ +use crate::{QueueServiceClient, QueueServiceProperties}; +use azure_core::{ + error::{Error, ErrorKind, ResultExt}, + prelude::*, + Context, Response as AzureResponse, +}; +use azure_storage::core::headers::CommonStorageResponseHeaders; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct SetQueueServicePropertiesBuilder { + service_client: QueueServiceClient, + properties: QueueServiceProperties, + timeout: Option, + context: Context, +} + +impl SetQueueServicePropertiesBuilder { + pub(crate) fn new( + service_client: QueueServiceClient, + properties: QueueServiceProperties, + ) -> Self { + SetQueueServicePropertiesBuilder { + service_client, + properties, + timeout: None, + context: Context::new(), + } + } + + setters! { + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self + .service_client + .storage_client + .storage_account_client() + .queue_storage_url() + .to_owned(); + + url.query_pairs_mut().append_pair("restype", "service"); + url.query_pairs_mut().append_pair("comp", "properties"); + self.timeout.append_to_url_query(&mut url); + + let xml_body = + serde_xml_rs::to_string(&self.properties).map_kind(ErrorKind::DataConversion)?; + + let mut request = self.service_client.storage_client.prepare_request( + url.as_str(), + http::method::Method::PUT, + Some(xml_body.into()), + )?; + + let response = self + .service_client + .send(&mut self.context, &mut request) + .await?; + + response.try_into() + }) + } +} + +pub type Response = + futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for SetQueueServicePropertiesBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct SetQueueServicePropertiesResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, +} + +impl std::convert::TryFrom for SetQueueServicePropertiesResponse { + type Error = Error; + + fn try_from(response: AzureResponse) -> azure_core::Result { + Ok(SetQueueServicePropertiesResponse { + common_storage_response_headers: response.headers().try_into()?, + }) + } +} diff --git a/sdk/storage_queues/src/operations/update_message.rs b/sdk/storage_queues/src/operations/update_message.rs new file mode 100644 index 0000000000..8260928089 --- /dev/null +++ b/sdk/storage_queues/src/operations/update_message.rs @@ -0,0 +1,106 @@ +use crate::{clients::PopReceiptClient, prelude::*}; +use azure_core::{ + error::Error, + headers::{get_str_from_headers, rfc2822_from_headers_mandatory, HeaderName}, + prelude::*, + Context, Response as AzureResponse, +}; +use azure_storage::core::headers::CommonStorageResponseHeaders; +use chrono::{DateTime, Utc}; +use std::convert::TryInto; + +#[derive(Debug, Clone)] +pub struct UpdateMessageBuilder { + pop_receipt_client: PopReceiptClient, + body: String, + visibility_timeout: VisibilityTimeout, + timeout: Option, + context: Context, +} + +impl UpdateMessageBuilder { + pub(crate) fn new( + pop_receipt_client: PopReceiptClient, + body: String, + visibility_timeout: VisibilityTimeout, + ) -> Self { + UpdateMessageBuilder { + pop_receipt_client, + body, + visibility_timeout, + timeout: None, + context: Context::new(), + } + } + + setters! { + timeout: Timeout => Some(timeout), + } + + pub fn into_future(mut self) -> Response { + Box::pin(async move { + let mut url = self.pop_receipt_client.pop_receipt_url()?; + + self.visibility_timeout.append_to_url_query(&mut url); + self.timeout.append_to_url_query(&mut url); + + // since the format is fixed we just decorate the message with the tags. + // This could be made optional in the future and/or more + // stringent. + let message = format!( + "{}", + self.body + ); + + let mut request = self.pop_receipt_client.storage_client().prepare_request( + url.as_str(), + http::method::Method::PUT, + Some(message.into()), + )?; + + let response = self + .pop_receipt_client + .send(&mut self.context, &mut request) + .await?; + + response.try_into() + }) + } +} + +pub type Response = futures::future::BoxFuture<'static, azure_core::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for UpdateMessageBuilder { + type IntoFuture = Response; + type Output = ::Output; + fn into_future(self) -> Self::IntoFuture { + Self::into_future(self) + } +} + +#[derive(Debug, Clone)] +pub struct UpdateMessageResponse { + pub common_storage_response_headers: CommonStorageResponseHeaders, + pub time_next_visible: DateTime, + pub pop_receipt: String, +} + +impl std::convert::TryFrom for UpdateMessageResponse { + type Error = Error; + + fn try_from(response: AzureResponse) -> azure_core::Result { + Ok(UpdateMessageResponse { + common_storage_response_headers: response.headers().try_into()?, + time_next_visible: rfc2822_from_headers_mandatory( + response.headers(), + &HeaderName::from_static("x-ms-time-next-visible"), + )?, + pop_receipt: get_str_from_headers( + response.headers(), + &HeaderName::from_static("x-ms-popreceipt"), + )? + .to_owned(), + }) + } +} diff --git a/sdk/storage_queues/src/prelude.rs b/sdk/storage_queues/src/prelude.rs index 9ba771623d..c5dbf23d88 100644 --- a/sdk/storage_queues/src/prelude.rs +++ b/sdk/storage_queues/src/prelude.rs @@ -1,5 +1,5 @@ -pub use crate::clients::{AsPopReceiptClient, AsQueueClient}; pub use crate::{ + clients::{AsPopReceiptClient, AsQueueClient}, AsQueueServiceClient, MessageTTL, NumberOfMessages, PopReceipt, QueueClient, QueueServiceClient, QueueStoredAccessPolicy, VisibilityTimeout, }; diff --git a/sdk/storage_queues/src/requests/clear_messages_builder.rs b/sdk/storage_queues/src/requests/clear_messages_builder.rs deleted file mode 100644 index 3ab52207ff..0000000000 --- a/sdk/storage_queues/src/requests/clear_messages_builder.rs +++ /dev/null @@ -1,50 +0,0 @@ -use crate::clients::QueueClient; -use crate::responses::*; -use azure_core::prelude::*; - -use std::convert::TryInto; - -#[derive(Debug)] -pub struct ClearMessagesBuilder<'a> { - queue_client: &'a QueueClient, - timeout: Option, - client_request_id: Option, -} - -impl<'a> ClearMessagesBuilder<'a> { - pub(crate) fn new(queue_client: &'a QueueClient) -> Self { - ClearMessagesBuilder { - queue_client, - timeout: None, - client_request_id: None, - } - } - - setters! { - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - pub async fn execute(&self) -> azure_core::Result { - let mut url = self.queue_client.url_with_segments(Some("messages"))?; - - self.timeout.append_to_url_query(&mut url); - - let mut request = self.queue_client.storage_client().prepare_request( - url.as_str(), - http::method::Method::DELETE, - None, - )?; - request.add_optional_header(&self.client_request_id); - - let response = self - .queue_client - .storage_client() - .storage_account_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/create_queue_builder.rs b/sdk/storage_queues/src/requests/create_queue_builder.rs deleted file mode 100644 index 5c6c2db99f..0000000000 --- a/sdk/storage_queues/src/requests/create_queue_builder.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::clients::QueueClient; -use crate::responses::*; -use azure_core::prelude::*; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct CreateQueueBuilder<'a> { - queue_client: &'a QueueClient, - timeout: Option, - metadata: Option<&'a Metadata>, - client_request_id: Option, -} - -impl<'a> CreateQueueBuilder<'a> { - pub(crate) fn new(queue_client: &'a QueueClient) -> Self { - CreateQueueBuilder { - queue_client, - timeout: None, - metadata: None, - client_request_id: None, - } - } - - setters! { - metadata: &'a Metadata => Some(metadata), - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - pub async fn execute(&self) -> azure_core::Result { - let mut url = self.queue_client.url_with_segments(None)?; - - self.timeout.append_to_url_query(&mut url); - - let mut request = self.queue_client.storage_client().prepare_request( - url.as_str(), - http::method::Method::PUT, - None, - )?; - request.add_optional_header(&self.client_request_id); - if let Some(metadata) = &self.metadata { - for m in metadata.iter() { - request.add_mandatory_header(&m); - } - } - - let response = self - .queue_client - .storage_client() - .storage_account_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/delete_message_builder.rs b/sdk/storage_queues/src/requests/delete_message_builder.rs deleted file mode 100644 index f8033ee9f2..0000000000 --- a/sdk/storage_queues/src/requests/delete_message_builder.rs +++ /dev/null @@ -1,49 +0,0 @@ -use crate::clients::PopReceiptClient; -use crate::responses::*; -use azure_core::prelude::*; - -use std::convert::TryInto; - -#[derive(Debug)] -pub struct DeleteMessageBuilder<'a> { - pop_receipt_client: &'a PopReceiptClient, - timeout: Option, - client_request_id: Option, -} - -impl<'a> DeleteMessageBuilder<'a> { - pub(crate) fn new(pop_receipt_client: &'a PopReceiptClient) -> Self { - DeleteMessageBuilder { - pop_receipt_client, - timeout: None, - client_request_id: None, - } - } - - setters! { - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - pub async fn execute(&self) -> azure_core::Result { - let mut url = self.pop_receipt_client.pop_receipt_url()?; - - self.timeout.append_to_url_query(&mut url); - - let mut request = self.pop_receipt_client.storage_client().prepare_request( - url.as_str(), - http::method::Method::DELETE, - None, - )?; - request.add_optional_header(&self.client_request_id); - - let response = self - .pop_receipt_client - .storage_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/delete_queue_builder.rs b/sdk/storage_queues/src/requests/delete_queue_builder.rs deleted file mode 100644 index 8fef4845e8..0000000000 --- a/sdk/storage_queues/src/requests/delete_queue_builder.rs +++ /dev/null @@ -1,49 +0,0 @@ -use crate::clients::QueueClient; -use crate::responses::*; -use azure_core::prelude::*; - -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct DeleteQueueBuilder<'a> { - queue_client: &'a QueueClient, - timeout: Option, - client_request_id: Option, -} - -impl<'a> DeleteQueueBuilder<'a> { - pub(crate) fn new(queue_client: &'a QueueClient) -> Self { - DeleteQueueBuilder { - queue_client, - timeout: None, - client_request_id: None, - } - } - - setters! { - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - pub async fn execute(&self) -> azure_core::Result { - let mut url = self.queue_client.url_with_segments(None)?; - - self.timeout.append_to_url_query(&mut url); - - let mut request = self.queue_client.storage_client().prepare_request( - url.as_str(), - http::method::Method::DELETE, - None, - )?; - request.add_optional_header(&self.client_request_id); - - let response = self - .queue_client - .storage_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/get_messages_builder.rs b/sdk/storage_queues/src/requests/get_messages_builder.rs deleted file mode 100644 index 80a395e29e..0000000000 --- a/sdk/storage_queues/src/requests/get_messages_builder.rs +++ /dev/null @@ -1,58 +0,0 @@ -use crate::clients::QueueClient; -use crate::prelude::*; -use crate::responses::*; -use azure_core::prelude::*; - -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct GetMessagesBuilder<'a> { - queue_client: &'a QueueClient, - number_of_messages: Option, - visibility_timeout: Option, - timeout: Option, - client_request_id: Option, -} - -impl<'a> GetMessagesBuilder<'a> { - pub(crate) fn new(queue_client: &'a QueueClient) -> Self { - GetMessagesBuilder { - queue_client, - number_of_messages: None, - visibility_timeout: None, - timeout: None, - client_request_id: None, - } - } - - setters! { - number_of_messages: NumberOfMessages => Some(number_of_messages), - visibility_timeout: VisibilityTimeout => Some(visibility_timeout), - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - pub async fn execute(&self) -> azure_core::Result { - let mut url = self.queue_client.url_with_segments(Some("messages"))?; - - self.visibility_timeout.append_to_url_query(&mut url); - self.number_of_messages.append_to_url_query(&mut url); - self.timeout.append_to_url_query(&mut url); - - let mut request = self.queue_client.storage_client().prepare_request( - url.as_str(), - http::method::Method::GET, - None, - )?; - request.add_optional_header(&self.client_request_id); - - let response = self - .queue_client - .storage_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/get_queue_acl_builder.rs b/sdk/storage_queues/src/requests/get_queue_acl_builder.rs deleted file mode 100644 index bbfa8b43a0..0000000000 --- a/sdk/storage_queues/src/requests/get_queue_acl_builder.rs +++ /dev/null @@ -1,50 +0,0 @@ -use crate::clients::QueueClient; -use crate::responses::*; -use azure_core::prelude::*; -use http::method::Method; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct GetQueueACLBuilder<'a> { - queue_client: &'a QueueClient, - timeout: Option, - client_request_id: Option, -} - -impl<'a> GetQueueACLBuilder<'a> { - pub(crate) fn new(queue_client: &'a QueueClient) -> Self { - Self { - queue_client, - timeout: None, - client_request_id: None, - } - } - - setters! { - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - pub async fn execute(&self) -> azure_core::Result { - let mut url = self.queue_client.url_with_segments(None)?; - - url.query_pairs_mut().append_pair("comp", "acl"); - - self.timeout.append_to_url_query(&mut url); - - let mut request = - self.queue_client - .storage_client() - .prepare_request(url.as_str(), Method::GET, None)?; - request.add_optional_header(&self.client_request_id); - - let response = self - .queue_client - .storage_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/get_queue_metadata_builder.rs b/sdk/storage_queues/src/requests/get_queue_metadata_builder.rs deleted file mode 100644 index d401adadd1..0000000000 --- a/sdk/storage_queues/src/requests/get_queue_metadata_builder.rs +++ /dev/null @@ -1,50 +0,0 @@ -use crate::clients::QueueClient; -use crate::responses::*; -use azure_core::prelude::*; -use http::method::Method; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct GetQueueMetadataBuilder<'a> { - queue_client: &'a QueueClient, - timeout: Option, - client_request_id: Option, -} - -impl<'a> GetQueueMetadataBuilder<'a> { - pub(crate) fn new(queue_client: &'a QueueClient) -> Self { - Self { - queue_client, - timeout: None, - client_request_id: None, - } - } - - setters! { - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - pub async fn execute(&self) -> azure_core::Result { - let mut url = self.queue_client.url_with_segments(None)?; - - url.query_pairs_mut().append_pair("comp", "metadata"); - - self.timeout.append_to_url_query(&mut url); - - let mut request = - self.queue_client - .storage_client() - .prepare_request(url.as_str(), Method::GET, None)?; - request.add_optional_header(&self.client_request_id); - - let response = self - .queue_client - .storage_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/get_queue_service_properties_builder.rs b/sdk/storage_queues/src/requests/get_queue_service_properties_builder.rs deleted file mode 100644 index 068578858f..0000000000 --- a/sdk/storage_queues/src/requests/get_queue_service_properties_builder.rs +++ /dev/null @@ -1,54 +0,0 @@ -use crate::responses::*; -use azure_core::prelude::*; -use azure_storage::core::prelude::*; -use http::method::Method; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct GetQueueServicePropertiesBuilder<'a> { - storage_client: &'a StorageClient, - timeout: Option, - client_request_id: Option, -} - -impl<'a> GetQueueServicePropertiesBuilder<'a> { - pub(crate) fn new(storage_client: &'a StorageClient) -> Self { - Self { - storage_client, - timeout: None, - client_request_id: None, - } - } - - setters! { - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - pub async fn execute(&self) -> azure_core::Result { - let mut url = self - .storage_client - .storage_account_client() - .queue_storage_url() - .to_owned(); - - url.query_pairs_mut().append_pair("restype", "service"); - url.query_pairs_mut().append_pair("comp", "properties"); - - self.timeout.append_to_url_query(&mut url); - - let mut request = self - .storage_client - .prepare_request(url.as_str(), Method::GET, None)?; - request.add_optional_header(&self.client_request_id); - - let response = self - .storage_client - .storage_account_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/get_queue_service_stats_builder.rs b/sdk/storage_queues/src/requests/get_queue_service_stats_builder.rs deleted file mode 100644 index 468ce4b02d..0000000000 --- a/sdk/storage_queues/src/requests/get_queue_service_stats_builder.rs +++ /dev/null @@ -1,54 +0,0 @@ -use crate::responses::*; -use azure_core::prelude::*; -use azure_storage::core::prelude::*; -use http::method::Method; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct GetQueueServiceStatsBuilder<'a> { - storage_client: &'a StorageClient, - timeout: Option, - client_request_id: Option, -} - -impl<'a> GetQueueServiceStatsBuilder<'a> { - pub(crate) fn new(storage_client: &'a StorageClient) -> Self { - Self { - storage_client, - timeout: None, - client_request_id: None, - } - } - - setters! { - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - pub async fn execute(&self) -> azure_core::Result { - let mut url = self - .storage_client - .storage_account_client() - .queue_storage_secondary_url() - .to_owned(); - - url.query_pairs_mut().append_pair("restype", "service"); - url.query_pairs_mut().append_pair("comp", "stats"); - - self.timeout.append_to_url_query(&mut url); - - let mut request = self - .storage_client - .prepare_request(url.as_str(), Method::GET, None)?; - request.add_optional_header(&self.client_request_id); - - let response = self - .storage_client - .storage_account_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/list_queues_builder.rs b/sdk/storage_queues/src/requests/list_queues_builder.rs deleted file mode 100644 index cb39c33750..0000000000 --- a/sdk/storage_queues/src/requests/list_queues_builder.rs +++ /dev/null @@ -1,110 +0,0 @@ -use crate::responses::*; -use azure_core::prelude::*; -use azure_storage::core::prelude::*; -use futures::stream::{unfold, Stream}; -use http::method::Method; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct ListQueuesBuilder<'a> { - storage_client: &'a StorageClient, - prefix: Option, - next_marker: Option, - max_results: Option, - include_metadata: bool, - timeout: Option, - client_request_id: Option, -} - -impl<'a> ListQueuesBuilder<'a> { - pub(crate) fn new(storage_client: &'a StorageClient) -> Self { - Self { - storage_client, - prefix: None, - next_marker: None, - max_results: None, - include_metadata: false, - timeout: None, - client_request_id: None, - } - } - - setters! { - prefix: Prefix => Some(prefix), - next_marker: NextMarker => Some(next_marker), - max_results: MaxResults => Some(max_results), - include_metadata: bool => include_metadata, - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - pub async fn execute(&self) -> azure_core::Result { - let mut url = self - .storage_client - .storage_account_client() - .queue_storage_url() - .to_owned(); - - url.query_pairs_mut().append_pair("comp", "list"); - - self.prefix.append_to_url_query(&mut url); - self.next_marker.append_to_url_query(&mut url); - self.max_results.append_to_url_query(&mut url); - - if self.include_metadata { - url.query_pairs_mut().append_pair("include", "metadata"); - } - - self.timeout.append_to_url_query(&mut url); - AppendToUrlQuery::append_to_url_query(&self.timeout, &mut url); - - let mut request = self - .storage_client - .prepare_request(url.as_str(), Method::GET, None)?; - request.add_optional_header(&self.client_request_id); - - let response = self - .storage_client - .storage_account_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } - - pub fn stream(self) -> impl Stream> + 'a { - #[derive(Debug, Clone, PartialEq)] - enum States { - Init, - NextMarker(NextMarker), - } - - unfold(Some(States::Init), move |next_marker: Option| { - let req = self.clone(); - async move { - let response = match next_marker { - Some(States::Init) => req.execute().await, - Some(States::NextMarker(next_marker)) => { - req.next_marker(next_marker).execute().await - } - None => return None, - }; - - // the ? operator does not work in async move (yet?) - // so we have to resort to this boilerplate - let response = match response { - Ok(response) => response, - Err(err) => return Some((Err(err), None)), - }; - - let next_marker = response - .next_marker() - .as_ref() - .map(|next_marker| States::NextMarker(next_marker.to_owned())); - - Some((Ok(response), next_marker)) - } - }) - } -} diff --git a/sdk/storage_queues/src/requests/mod.rs b/sdk/storage_queues/src/requests/mod.rs deleted file mode 100644 index cbfe4fd3c0..0000000000 --- a/sdk/storage_queues/src/requests/mod.rs +++ /dev/null @@ -1,32 +0,0 @@ -mod clear_messages_builder; -mod create_queue_builder; -mod delete_message_builder; -mod delete_queue_builder; -mod get_messages_builder; -mod get_queue_acl_builder; -mod get_queue_metadata_builder; -mod get_queue_service_properties_builder; -mod get_queue_service_stats_builder; -mod list_queues_builder; -mod peek_messages_builder; -mod put_message_builder; -mod set_queue_acl_builder; -mod set_queue_metadata_builder; -mod set_queue_service_properties_builder; -mod update_message_builder; -pub use clear_messages_builder::ClearMessagesBuilder; -pub use create_queue_builder::CreateQueueBuilder; -pub use delete_message_builder::DeleteMessageBuilder; -pub use delete_queue_builder::DeleteQueueBuilder; -pub use get_messages_builder::GetMessagesBuilder; -pub use get_queue_acl_builder::GetQueueACLBuilder; -pub use get_queue_metadata_builder::GetQueueMetadataBuilder; -pub use get_queue_service_properties_builder::GetQueueServicePropertiesBuilder; -pub use get_queue_service_stats_builder::GetQueueServiceStatsBuilder; -pub use list_queues_builder::ListQueuesBuilder; -pub use peek_messages_builder::PeekMessagesBuilder; -pub use put_message_builder::PutMessageBuilder; -pub use set_queue_acl_builder::SetQueueACLBuilder; -pub use set_queue_metadata_builder::SetQueueMetadataBuilder; -pub use set_queue_service_properties_builder::SetQueueServicePropertiesBuilder; -pub use update_message_builder::UpdateMessageBuilder; diff --git a/sdk/storage_queues/src/requests/peek_messages_builder.rs b/sdk/storage_queues/src/requests/peek_messages_builder.rs deleted file mode 100644 index 52bdbdd929..0000000000 --- a/sdk/storage_queues/src/requests/peek_messages_builder.rs +++ /dev/null @@ -1,55 +0,0 @@ -use crate::clients::QueueClient; -use crate::prelude::*; -use crate::responses::*; -use azure_core::prelude::*; - -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct PeekMessagesBuilder<'a> { - queue_client: &'a QueueClient, - number_of_messages: Option, - timeout: Option, - client_request_id: Option, -} - -impl<'a> PeekMessagesBuilder<'a> { - pub(crate) fn new(queue_client: &'a QueueClient) -> Self { - PeekMessagesBuilder { - queue_client, - number_of_messages: None, - timeout: None, - client_request_id: None, - } - } - - setters! { - number_of_messages: NumberOfMessages => Some(number_of_messages), - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - pub async fn execute(&self) -> azure_core::Result { - let mut url = self.queue_client.url_with_segments(Some("messages"))?; - - url.query_pairs_mut().append_pair("peekonly", "true"); - self.number_of_messages.append_to_url_query(&mut url); - self.timeout.append_to_url_query(&mut url); - - let mut request = self.queue_client.storage_client().prepare_request( - url.as_str(), - http::method::Method::GET, - None, - )?; - request.add_optional_header(&self.client_request_id); - - let response = self - .queue_client - .storage_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/put_message_builder.rs b/sdk/storage_queues/src/requests/put_message_builder.rs deleted file mode 100644 index 8486b55cf8..0000000000 --- a/sdk/storage_queues/src/requests/put_message_builder.rs +++ /dev/null @@ -1,65 +0,0 @@ -use crate::prelude::*; -use crate::responses::*; -use azure_core::prelude::*; - -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct PutMessageBuilder<'a> { - queue_client: &'a QueueClient, - visibility_timeout: Option, - ttl: Option, - timeout: Option, - client_request_id: Option, -} - -impl<'a> PutMessageBuilder<'a> { - pub(crate) fn new(queue_client: &'a QueueClient) -> Self { - PutMessageBuilder { - queue_client, - visibility_timeout: None, - ttl: None, - timeout: None, - client_request_id: None, - } - } - - setters! { - visibility_timeout: VisibilityTimeout => Some(visibility_timeout), - ttl: MessageTTL => Some(ttl), - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - pub async fn execute(&self, body: impl AsRef) -> azure_core::Result { - let mut url = self.queue_client.url_with_segments(Some("messages"))?; - - self.visibility_timeout.append_to_url_query(&mut url); - self.ttl.append_to_url_query(&mut url); - self.timeout.append_to_url_query(&mut url); - - // since the format is fixed we just decorate the message with the tags. - // This could be made optional in the future and/or more - // stringent. - let message = format!( - "{}", - body.as_ref() - ); - - let mut request = self.queue_client.storage_client().prepare_request( - url.as_str(), - http::method::Method::POST, - Some(message.into()), - )?; - request.add_optional_header(&self.client_request_id); - - let response = self - .queue_client - .storage_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/set_queue_acl_builder.rs b/sdk/storage_queues/src/requests/set_queue_acl_builder.rs deleted file mode 100644 index 3abdde3272..0000000000 --- a/sdk/storage_queues/src/requests/set_queue_acl_builder.rs +++ /dev/null @@ -1,73 +0,0 @@ -use crate::clients::QueueClient; -use crate::responses::*; -use crate::QueueStoredAccessPolicy; -use azure_core::prelude::*; - -use azure_storage::StoredAccessPolicyList; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct SetQueueACLBuilder<'a> { - queue_client: &'a QueueClient, - timeout: Option, - client_request_id: Option, -} - -impl<'a> SetQueueACLBuilder<'a> { - pub(crate) fn new(queue_client: &'a QueueClient) -> Self { - SetQueueACLBuilder { - queue_client, - timeout: None, - client_request_id: None, - } - } - - setters! { - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - /// Pass the requested polices here. - /// While this SDK does not enforce any limit, - /// keep in mind Azure supports a limited number of - /// stored access policies for each queue. - /// More info here - /// [https://docs.microsoft.com/rest/api/storageservices/set-queue-acl#remarks](https://docs.microsoft.com/rest/api/storageservices/set-queue-acl#remarks). - pub async fn execute( - &self, - queue_stored_access_policies: &[QueueStoredAccessPolicy], - ) -> azure_core::Result { - let mut url = self.queue_client.url_with_segments(None)?; - - url.query_pairs_mut().append_pair("comp", "acl"); - self.timeout.append_to_url_query(&mut url); - - // convert the queue_stored_access_policies slice - // in a StoredAccessPolicyList to get its XML - // representation. - let xml_body = { - let mut qapl = StoredAccessPolicyList::new(); - queue_stored_access_policies - .iter() - .for_each(|queue_policy| qapl.stored_access.push(queue_policy.into())); - - qapl.to_xml() - }; - - let mut request = self.queue_client.storage_client().prepare_request( - url.as_str(), - http::method::Method::PUT, - Some(xml_body.into()), - )?; - request.add_optional_header(&self.client_request_id); - - let response = self - .queue_client - .storage_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/set_queue_metadata_builder.rs b/sdk/storage_queues/src/requests/set_queue_metadata_builder.rs deleted file mode 100644 index 8757cf5404..0000000000 --- a/sdk/storage_queues/src/requests/set_queue_metadata_builder.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::clients::QueueClient; -use crate::responses::*; -use azure_core::prelude::*; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct SetQueueMetadataBuilder<'a> { - queue_client: &'a QueueClient, - timeout: Option, - client_request_id: Option, -} - -impl<'a> SetQueueMetadataBuilder<'a> { - pub(crate) fn new(queue_client: &'a QueueClient) -> Self { - SetQueueMetadataBuilder { - queue_client, - timeout: None, - client_request_id: None, - } - } - - setters! { - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - /// This call sets the metadata. - /// Keep in mind that keys present on Azure but not included in the passed - /// metadata parameter will be deleted. If you want to keep the preexisting - /// key-value pairs, retrieve them with GetMetadata first and - /// then update/add to the received Metadata struct. Then pass the Metadata - /// back to SetQueueMetadata. - /// If you just want to clear the metadata, just pass an empty Metadata - /// struct. - pub async fn execute( - &self, - metadata: &Metadata, - ) -> azure_core::Result { - let mut url = self.queue_client.url_with_segments(None)?; - - url.query_pairs_mut().append_pair("comp", "metadata"); - self.timeout.append_to_url_query(&mut url); - - let mut request = self.queue_client.storage_client().prepare_request( - url.as_str(), - http::method::Method::PUT, - None, - )?; - for m in metadata.iter() { - request.add_mandatory_header(&m); - } - request.add_optional_header(&self.client_request_id); - - let response = self - .queue_client - .storage_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/set_queue_service_properties_builder.rs b/sdk/storage_queues/src/requests/set_queue_service_properties_builder.rs deleted file mode 100644 index 4c7f079efc..0000000000 --- a/sdk/storage_queues/src/requests/set_queue_service_properties_builder.rs +++ /dev/null @@ -1,64 +0,0 @@ -use crate::responses::*; -use crate::QueueServiceProperties; -use azure_core::error::{ErrorKind, ResultExt}; -use azure_core::prelude::*; -use azure_storage::core::clients::StorageClient; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct SetQueueServicePropertiesBuilder<'a> { - storage_client: &'a StorageClient, - timeout: Option, - client_request_id: Option, -} - -impl<'a> SetQueueServicePropertiesBuilder<'a> { - pub(crate) fn new(storage_client: &'a StorageClient) -> Self { - SetQueueServicePropertiesBuilder { - storage_client, - timeout: None, - client_request_id: None, - } - } - - setters! { - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - /// Pass the properties here. - /// More info here - /// [https://docs.microsoft.com/rest/api/storageservices/set-queue-service-properties](https://docs.microsoft.com/rest/api/storageservices/set-queue-service-properties). - pub async fn execute( - &self, - queue_service_properties: &QueueServiceProperties, - ) -> azure_core::Result { - let mut url = self - .storage_client - .storage_account_client() - .queue_storage_url() - .to_owned(); - - url.query_pairs_mut().append_pair("restype", "service"); - url.query_pairs_mut().append_pair("comp", "properties"); - self.timeout.append_to_url_query(&mut url); - - let xml_body = serde_xml_rs::to_string(&queue_service_properties) - .map_kind(ErrorKind::DataConversion)?; - - let mut request = self.storage_client.prepare_request( - url.as_str(), - http::method::Method::PUT, - Some(xml_body.into()), - )?; - request.add_optional_header(&self.client_request_id); - - let response = self - .storage_client - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/requests/update_message_builder.rs b/sdk/storage_queues/src/requests/update_message_builder.rs deleted file mode 100644 index 4fc300a585..0000000000 --- a/sdk/storage_queues/src/requests/update_message_builder.rs +++ /dev/null @@ -1,66 +0,0 @@ -use crate::clients::PopReceiptClient; -use crate::prelude::*; -use crate::responses::*; -use azure_core::prelude::*; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct UpdateMessageBuilder<'a> { - pop_receipt_client: &'a PopReceiptClient, - visibility_timeout: VisibilityTimeout, - timeout: Option, - client_request_id: Option, -} - -impl<'a> UpdateMessageBuilder<'a> { - pub(crate) fn new( - pop_receipt_client: &'a PopReceiptClient, - visibility_timeout: impl Into, - ) -> Self { - UpdateMessageBuilder { - pop_receipt_client, - visibility_timeout: visibility_timeout.into(), - timeout: None, - client_request_id: None, - } - } - - setters! { - timeout: Timeout => Some(timeout), - client_request_id: ClientRequestId => Some(client_request_id), - } - - pub async fn execute( - &self, - new_body: impl AsRef, - ) -> azure_core::Result { - let mut url = self.pop_receipt_client.pop_receipt_url()?; - - self.visibility_timeout.append_to_url_query(&mut url); - self.timeout.append_to_url_query(&mut url); - - // since the format is fixed we just decorate the message with the tags. - // This could be made optional in the future and/or more - // stringent. - let message = format!( - "{}", - new_body.as_ref() - ); - - let mut request = self.pop_receipt_client.storage_client().prepare_request( - url.as_str(), - http::method::Method::PUT, - Some(message.into()), - )?; - request.add_optional_header(&self.client_request_id); - - let response = self - .pop_receipt_client - .storage_client() - .http_client() - .execute_request_check_status(&request) - .await?; - - response.try_into() - } -} diff --git a/sdk/storage_queues/src/responses/clear_messages_response.rs b/sdk/storage_queues/src/responses/clear_messages_response.rs deleted file mode 100644 index 59c1af252c..0000000000 --- a/sdk/storage_queues/src/responses/clear_messages_response.rs +++ /dev/null @@ -1,18 +0,0 @@ -use azure_core::{error::Error, CollectedResponse}; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct ClearMessagesResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, -} - -impl std::convert::TryFrom for ClearMessagesResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - Ok(ClearMessagesResponse { - common_storage_response_headers: response.headers().try_into()?, - }) - } -} diff --git a/sdk/storage_queues/src/responses/create_queue_response.rs b/sdk/storage_queues/src/responses/create_queue_response.rs deleted file mode 100644 index 256466744a..0000000000 --- a/sdk/storage_queues/src/responses/create_queue_response.rs +++ /dev/null @@ -1,18 +0,0 @@ -use azure_core::{error::Error, CollectedResponse}; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct CreateQueueResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, -} - -impl std::convert::TryFrom for CreateQueueResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - Ok(CreateQueueResponse { - common_storage_response_headers: response.headers().try_into()?, - }) - } -} diff --git a/sdk/storage_queues/src/responses/delete_message_response.rs b/sdk/storage_queues/src/responses/delete_message_response.rs deleted file mode 100644 index 6e4073aba6..0000000000 --- a/sdk/storage_queues/src/responses/delete_message_response.rs +++ /dev/null @@ -1,18 +0,0 @@ -use azure_core::{error::Error, CollectedResponse}; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct DeleteMessageResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, -} - -impl std::convert::TryFrom for DeleteMessageResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - Ok(DeleteMessageResponse { - common_storage_response_headers: response.headers().try_into()?, - }) - } -} diff --git a/sdk/storage_queues/src/responses/delete_queue_response.rs b/sdk/storage_queues/src/responses/delete_queue_response.rs deleted file mode 100644 index 0dbf3503d1..0000000000 --- a/sdk/storage_queues/src/responses/delete_queue_response.rs +++ /dev/null @@ -1,18 +0,0 @@ -use azure_core::{error::Error, CollectedResponse}; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct DeleteQueueResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, -} - -impl std::convert::TryFrom for DeleteQueueResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - Ok(DeleteQueueResponse { - common_storage_response_headers: response.headers().try_into()?, - }) - } -} diff --git a/sdk/storage_queues/src/responses/get_messages_response.rs b/sdk/storage_queues/src/responses/get_messages_response.rs deleted file mode 100644 index bfa4409bcd..0000000000 --- a/sdk/storage_queues/src/responses/get_messages_response.rs +++ /dev/null @@ -1,82 +0,0 @@ -use crate::PopReceipt; -use azure_core::error::{Error, ErrorKind, ResultExt}; -use azure_core::headers::utc_date_from_rfc2822; -use azure_core::CollectedResponse; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use azure_storage::core::xml::read_xml; -use chrono::{DateTime, Utc}; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct GetMessagesResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, - pub messages: Vec, -} - -#[derive(Debug, Clone)] -pub struct Message { - pub pop_receipt: PopReceipt, - pub insertion_time: DateTime, - pub expiration_time: DateTime, - pub time_next_visible: DateTime, - pub dequeue_count: u64, - pub message_text: String, -} - -impl From for PopReceipt { - fn from(message: Message) -> Self { - message.pop_receipt - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct MessageInternal { - #[serde(rename = "MessageId")] - pub message_id: String, - #[serde(rename = "InsertionTime")] - pub insertion_time: String, - #[serde(rename = "ExpirationTime")] - pub expiration_time: String, - #[serde(rename = "PopReceipt")] - pub pop_receipt: String, - #[serde(rename = "TimeNextVisible")] - pub time_next_visible: String, - #[serde(rename = "DequeueCount")] - pub dequeue_count: u64, - #[serde(rename = "MessageText")] - pub message_text: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct MessagesInternal { - #[serde(rename = "QueueMessage")] - pub messages: Option>, -} - -impl std::convert::TryFrom for GetMessagesResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - let headers = response.headers(); - let body = response.body(); - - let response: MessagesInternal = read_xml(body).map_kind(ErrorKind::DataConversion)?; - - let mut messages = Vec::new(); - for message in response.messages.unwrap_or_default().into_iter() { - messages.push(Message { - pop_receipt: PopReceipt::new(message.message_id, message.pop_receipt), - insertion_time: utc_date_from_rfc2822(&message.insertion_time)?, - expiration_time: utc_date_from_rfc2822(&message.expiration_time)?, - time_next_visible: utc_date_from_rfc2822(&message.time_next_visible)?, - dequeue_count: message.dequeue_count, - message_text: message.message_text, - }) - } - - Ok(GetMessagesResponse { - common_storage_response_headers: headers.try_into()?, - messages, - }) - } -} diff --git a/sdk/storage_queues/src/responses/get_queue_acl_response.rs b/sdk/storage_queues/src/responses/get_queue_acl_response.rs deleted file mode 100644 index 8de0288fd4..0000000000 --- a/sdk/storage_queues/src/responses/get_queue_acl_response.rs +++ /dev/null @@ -1,34 +0,0 @@ -use crate::QueueStoredAccessPolicy; -use azure_core::error::{Error, ErrorKind, ResultExt}; -use azure_core::CollectedResponse; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use azure_storage::StoredAccessPolicyList; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct GetQueueACLResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, - pub stored_access_policies: Vec, -} - -impl std::convert::TryFrom for GetQueueACLResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - let headers = response.headers(); - let body = response.body(); - - let a: azure_core::Result> = - StoredAccessPolicyList::from_xml(body) - .map_kind(ErrorKind::DataConversion)? - .stored_access - .into_iter() - .map(|sap| sap.try_into().map_kind(ErrorKind::DataConversion)) - .collect(); - - Ok(GetQueueACLResponse { - common_storage_response_headers: headers.try_into()?, - stored_access_policies: a?, - }) - } -} diff --git a/sdk/storage_queues/src/responses/get_queue_metadata_response.rs b/sdk/storage_queues/src/responses/get_queue_metadata_response.rs deleted file mode 100644 index cf92c21e1b..0000000000 --- a/sdk/storage_queues/src/responses/get_queue_metadata_response.rs +++ /dev/null @@ -1,25 +0,0 @@ -use azure_core::error::Error; -use azure_core::{prelude::*, CollectedResponse}; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct GetQueueMetadataResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, - pub metadata: Metadata, -} - -impl std::convert::TryFrom for GetQueueMetadataResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - let headers = response.headers(); - - debug!("headers == {:?}", headers); - - Ok(GetQueueMetadataResponse { - common_storage_response_headers: headers.try_into()?, - metadata: headers.into(), - }) - } -} diff --git a/sdk/storage_queues/src/responses/get_queue_service_properties_response.rs b/sdk/storage_queues/src/responses/get_queue_service_properties_response.rs deleted file mode 100644 index 0ada519817..0000000000 --- a/sdk/storage_queues/src/responses/get_queue_service_properties_response.rs +++ /dev/null @@ -1,29 +0,0 @@ -use crate::QueueServiceProperties; -use azure_core::error::{Error, ErrorKind, ResultExt}; -use azure_core::CollectedResponse; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use azure_storage::core::xml::read_xml; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct GetQueueServicePropertiesResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, - pub queue_service_properties: QueueServiceProperties, -} - -impl std::convert::TryFrom for GetQueueServicePropertiesResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - let headers = response.headers(); - let body = response.body(); - - let queue_service_properties: QueueServiceProperties = - read_xml(body).map_kind(ErrorKind::DataConversion)?; - - Ok(GetQueueServicePropertiesResponse { - common_storage_response_headers: headers.try_into()?, - queue_service_properties, - }) - } -} diff --git a/sdk/storage_queues/src/responses/get_queue_service_stats_response.rs b/sdk/storage_queues/src/responses/get_queue_service_stats_response.rs deleted file mode 100644 index b81d550d7e..0000000000 --- a/sdk/storage_queues/src/responses/get_queue_service_stats_response.rs +++ /dev/null @@ -1,63 +0,0 @@ -use azure_core::{ - error::{Error, ErrorKind, ResultExt}, - CollectedResponse, -}; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use azure_storage::core::xml::read_xml; -use chrono::{DateTime, Utc}; -use std::convert::TryInto; - -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum Status { - Live, - Bootstrap, - Unavailable, -} - -#[derive(Debug, Clone)] -pub struct GetQueueServiceStatsResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, - pub status: Status, - pub last_sync_time: Option>, -} - -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "PascalCase")] -struct GetQueueServiceStatsResponseInternal { - pub geo_replication: GeoReplication, -} - -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "PascalCase")] -struct GeoReplication { - pub status: Status, - pub last_sync_time: Option, -} - -impl std::convert::TryFrom for GetQueueServiceStatsResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - let headers = response.headers(); - let body = response.body(); - - debug!("headers == {:?}", headers); - debug!("body == {:#?}", body); - let response: GetQueueServiceStatsResponseInternal = - read_xml(body).map_kind(ErrorKind::DataConversion)?; - debug!("deserde == {:#?}", response); - - Ok(GetQueueServiceStatsResponse { - common_storage_response_headers: headers.try_into()?, - status: response.geo_replication.status, - last_sync_time: response - .geo_replication - .last_sync_time - .map(|t| DateTime::parse_from_rfc2822(&t)) - .transpose() - .context(ErrorKind::DataConversion, "failed to parse last sync time")? - .map(|t| DateTime::from_utc(t.naive_utc(), Utc)), - }) - } -} diff --git a/sdk/storage_queues/src/responses/list_queues_response.rs b/sdk/storage_queues/src/responses/list_queues_response.rs deleted file mode 100644 index 9e56f851f3..0000000000 --- a/sdk/storage_queues/src/responses/list_queues_response.rs +++ /dev/null @@ -1,100 +0,0 @@ -use azure_core::error::{Error, ErrorKind, ResultExt}; -use azure_core::{prelude::*, CollectedResponse}; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use azure_storage::xml::read_xml; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct ListQueuesResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, - pub service_endpoint: String, - pub prefix: Option, - // this seems duplicate :S - pub marker: Option, - pub max_results: Option, - pub queues: Vec, - pub next_marker: Option, -} - -impl ListQueuesResponse { - pub fn next_marker(&self) -> &Option { - &self.next_marker - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct ListQueuesResponseInternal { - #[serde(rename = "ServiceEndpoint")] - pub service_endpoint: String, - #[serde(rename = "Prefix")] - pub prefix: Option, - #[serde(rename = "Marker")] - pub marker: Option, - #[serde(rename = "MaxResults")] - pub max_results: Option, - - #[serde(rename = "Queues")] - pub queues: Queues, - - #[serde(rename = "NextMarker")] - pub next_marker: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Queues { - #[serde(rename = "Queue")] - pub queues: Option>, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Queue { - #[serde(rename = "Name")] - pub name: String, - #[serde(rename = "Metadata")] - pub metadata: Option>, -} - -impl std::convert::TryFrom for ListQueuesResponse { - type Error = Error; - fn try_from(response: CollectedResponse) -> azure_core::Result { - let headers = response.headers(); - let body = response.body(); - - debug!("headers == {:?}", headers); - debug!("body == {:#?}", body); - let mut response: ListQueuesResponseInternal = - read_xml(body).map_kind(ErrorKind::DataConversion)?; - - // get rid of the ugly Some("") empty string - // we use None instead - if let Some(next_marker) = &response.next_marker { - if next_marker.is_empty() { - response.next_marker = None; - } - } - - Ok(ListQueuesResponse { - common_storage_response_headers: headers.try_into()?, - service_endpoint: response.service_endpoint, - prefix: response.prefix, - marker: response.marker, - max_results: response.max_results, - queues: response.queues.queues.unwrap_or_default(), - next_marker: response.next_marker.map(|nm| nm.into()), - }) - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn try_parse() { - let range = "a2azureiscoolazurerocks"; - - let response: ListQueuesResponseInternal = serde_xml_rs::from_str(range).unwrap(); - - assert_eq!(response.queues.queues.unwrap().len(), 2); - } -} diff --git a/sdk/storage_queues/src/responses/mod.rs b/sdk/storage_queues/src/responses/mod.rs deleted file mode 100644 index b79cef1786..0000000000 --- a/sdk/storage_queues/src/responses/mod.rs +++ /dev/null @@ -1,32 +0,0 @@ -mod clear_messages_response; -mod create_queue_response; -mod delete_message_response; -mod delete_queue_response; -mod get_messages_response; -mod get_queue_acl_response; -mod get_queue_metadata_response; -mod get_queue_service_properties_response; -mod get_queue_service_stats_response; -mod list_queues_response; -mod peek_messages_response; -mod put_message_response; -mod set_queue_acl_response; -mod set_queue_metadata_response; -mod set_queue_service_properties_response; -mod update_message_response; -pub use clear_messages_response::ClearMessagesResponse; -pub use create_queue_response::CreateQueueResponse; -pub use delete_message_response::DeleteMessageResponse; -pub use delete_queue_response::DeleteQueueResponse; -pub use get_messages_response::GetMessagesResponse; -pub use get_queue_acl_response::GetQueueACLResponse; -pub use get_queue_metadata_response::GetQueueMetadataResponse; -pub use get_queue_service_properties_response::GetQueueServicePropertiesResponse; -pub use get_queue_service_stats_response::GetQueueServiceStatsResponse; -pub use list_queues_response::ListQueuesResponse; -pub use peek_messages_response::PeekMessagesResponse; -pub use put_message_response::PutMessageResponse; -pub use set_queue_acl_response::SetQueueACLResponse; -pub use set_queue_metadata_response::SetQueueMetadataResponse; -pub use set_queue_service_properties_response::SetQueueServicePropertiesResponse; -pub use update_message_response::UpdateMessageResponse; diff --git a/sdk/storage_queues/src/responses/peek_messages_response.rs b/sdk/storage_queues/src/responses/peek_messages_response.rs deleted file mode 100644 index 0b88398687..0000000000 --- a/sdk/storage_queues/src/responses/peek_messages_response.rs +++ /dev/null @@ -1,69 +0,0 @@ -use azure_core::error::{Error, ErrorKind, ResultExt}; -use azure_core::headers::utc_date_from_rfc2822; -use azure_core::CollectedResponse; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use azure_storage::core::xml::read_xml; -use chrono::{DateTime, Utc}; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct PeekMessagesResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, - pub messages: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct PeekMessageInternal { - #[serde(rename = "MessageId")] - pub message_id: String, - #[serde(rename = "InsertionTime")] - pub insertion_time: String, - #[serde(rename = "ExpirationTime")] - pub expiration_time: String, - #[serde(rename = "DequeueCount")] - pub dequeue_count: u64, - #[serde(rename = "MessageText")] - pub message_text: String, -} - -#[derive(Debug, Clone)] -pub struct PeekMessage { - pub message_id: String, - pub insertion_time: DateTime, - pub expiration_time: DateTime, - pub dequeue_count: u64, - pub message_text: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct PeekMessagesInternal { - #[serde(rename = "QueueMessage")] - pub messages: Option>, -} - -impl std::convert::TryFrom for PeekMessagesResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - let headers = response.headers(); - let body = response.body(); - - let response: PeekMessagesInternal = read_xml(body).map_kind(ErrorKind::DataConversion)?; - - let mut messages = Vec::new(); - for message in response.messages.unwrap_or_default().into_iter() { - messages.push(PeekMessage { - message_id: message.message_id, - insertion_time: utc_date_from_rfc2822(&message.insertion_time)?, - expiration_time: utc_date_from_rfc2822(&message.expiration_time)?, - dequeue_count: message.dequeue_count, - message_text: message.message_text, - }) - } - - Ok(PeekMessagesResponse { - common_storage_response_headers: headers.try_into()?, - messages, - }) - } -} diff --git a/sdk/storage_queues/src/responses/put_message_response.rs b/sdk/storage_queues/src/responses/put_message_response.rs deleted file mode 100644 index fc32ccd45c..0000000000 --- a/sdk/storage_queues/src/responses/put_message_response.rs +++ /dev/null @@ -1,67 +0,0 @@ -use azure_core::error::{Error, ErrorKind, ResultExt}; -use azure_core::headers::utc_date_from_rfc2822; -use azure_core::CollectedResponse; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use azure_storage::xml::read_xml; -use chrono::{DateTime, Utc}; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct PutMessageResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, - pub queue_message: QueueMessage, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct PutMessageResponseInternal { - #[serde(rename = "QueueMessage")] - pub queue_message: QueueMessageInternal, -} - -#[derive(Debug, Clone)] -pub struct QueueMessage { - pub message_id: String, - pub insertion_time: DateTime, - pub expiration_time: DateTime, - pub pop_receipt: String, - pub time_next_visible: DateTime, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct QueueMessageInternal { - #[serde(rename = "MessageId")] - pub message_id: String, - #[serde(rename = "InsertionTime")] - pub insertion_time: String, - #[serde(rename = "ExpirationTime")] - pub expiration_time: String, - #[serde(rename = "PopReceipt")] - pub pop_receipt: String, - #[serde(rename = "TimeNextVisible")] - pub time_next_visible: String, -} - -impl std::convert::TryFrom for PutMessageResponse { - type Error = Error; - fn try_from(response: CollectedResponse) -> azure_core::Result { - let headers = response.headers(); - let body = response.body(); - - let response: PutMessageResponseInternal = - read_xml(body).map_kind(ErrorKind::DataConversion)?; - let queue_message = response.queue_message; - - let queue_message = QueueMessage { - message_id: queue_message.message_id, - insertion_time: utc_date_from_rfc2822(&queue_message.insertion_time)?, - expiration_time: utc_date_from_rfc2822(&queue_message.expiration_time)?, - pop_receipt: queue_message.pop_receipt, - time_next_visible: utc_date_from_rfc2822(&queue_message.time_next_visible)?, - }; - - Ok(Self { - common_storage_response_headers: headers.try_into()?, - queue_message, - }) - } -} diff --git a/sdk/storage_queues/src/responses/set_queue_acl_response.rs b/sdk/storage_queues/src/responses/set_queue_acl_response.rs deleted file mode 100644 index 9f7be17dfd..0000000000 --- a/sdk/storage_queues/src/responses/set_queue_acl_response.rs +++ /dev/null @@ -1,18 +0,0 @@ -use azure_core::{error::Error, CollectedResponse}; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct SetQueueACLResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, -} - -impl std::convert::TryFrom for SetQueueACLResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - Ok(SetQueueACLResponse { - common_storage_response_headers: response.headers().try_into()?, - }) - } -} diff --git a/sdk/storage_queues/src/responses/set_queue_metadata_response.rs b/sdk/storage_queues/src/responses/set_queue_metadata_response.rs deleted file mode 100644 index 75e0956660..0000000000 --- a/sdk/storage_queues/src/responses/set_queue_metadata_response.rs +++ /dev/null @@ -1,18 +0,0 @@ -use azure_core::{error::Error, CollectedResponse}; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct SetQueueMetadataResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, -} - -impl std::convert::TryFrom for SetQueueMetadataResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - Ok(SetQueueMetadataResponse { - common_storage_response_headers: response.headers().try_into()?, - }) - } -} diff --git a/sdk/storage_queues/src/responses/set_queue_service_properties_response.rs b/sdk/storage_queues/src/responses/set_queue_service_properties_response.rs deleted file mode 100644 index 8eda87f11d..0000000000 --- a/sdk/storage_queues/src/responses/set_queue_service_properties_response.rs +++ /dev/null @@ -1,18 +0,0 @@ -use azure_core::{error::Error, CollectedResponse}; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct SetQueueServicePropertiesResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, -} - -impl std::convert::TryFrom for SetQueueServicePropertiesResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - Ok(SetQueueServicePropertiesResponse { - common_storage_response_headers: response.headers().try_into()?, - }) - } -} diff --git a/sdk/storage_queues/src/responses/update_message_response.rs b/sdk/storage_queues/src/responses/update_message_response.rs deleted file mode 100644 index ad22848f7a..0000000000 --- a/sdk/storage_queues/src/responses/update_message_response.rs +++ /dev/null @@ -1,32 +0,0 @@ -use azure_core::error::Error; -use azure_core::headers::{get_str_from_headers, rfc2822_from_headers_mandatory, HeaderName}; -use azure_core::CollectedResponse; -use azure_storage::core::headers::CommonStorageResponseHeaders; -use chrono::{DateTime, Utc}; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct UpdateMessageResponse { - pub common_storage_response_headers: CommonStorageResponseHeaders, - pub time_next_visible: DateTime, - pub pop_receipt: String, -} - -impl std::convert::TryFrom for UpdateMessageResponse { - type Error = Error; - - fn try_from(response: CollectedResponse) -> azure_core::Result { - Ok(UpdateMessageResponse { - common_storage_response_headers: response.headers().try_into()?, - time_next_visible: rfc2822_from_headers_mandatory( - response.headers(), - &HeaderName::from_static("x-ms-time-next-visible"), - )?, - pop_receipt: get_str_from_headers( - response.headers(), - &HeaderName::from_static("x-ms-popreceipt"), - )? - .to_owned(), - }) - } -} diff --git a/sdk/storage_queues/tests/queue.rs b/sdk/storage_queues/tests/queue.rs index ab1f634159..8e7609704c 100644 --- a/sdk/storage_queues/tests/queue.rs +++ b/sdk/storage_queues/tests/queue.rs @@ -3,7 +3,9 @@ use azure_core::prelude::*; use azure_storage::core::prelude::*; use azure_storage_queues::prelude::*; use chrono::Utc; +use futures::StreamExt; use std::time::Duration; +use uuid::Uuid; #[tokio::test] async fn queue_create_put_and_get() -> azure_core::Result<()> { @@ -12,17 +14,21 @@ async fn queue_create_put_and_get() -> azure_core::Result<()> { let access_key = std::env::var("STORAGE_ACCESS_KEY").expect("Set env variable STORAGE_ACCESS_KEY first!"); - let queue_name = "rustazuree2e"; + let queue_name = format!("sdk-{}", Uuid::new_v4()); + let http_client = azure_core::new_http_client(); let storage_account_client = StorageAccountClient::new_access_key(http_client.clone(), &account, &access_key); + + let queue_service_client = storage_account_client.queue_service_client(); + + println!("creating queue {}", queue_name); + let queue = storage_account_client .storage_client() .queue_client(queue_name); - println!("creating queue {}", queue_name); - // this step is optional but here we show // how to add metadata to a new queue. let mut metadata = Metadata::new(); @@ -33,7 +39,11 @@ async fn queue_create_put_and_get() -> azure_core::Result<()> { .as_mut() .insert("created".into(), format!("{:?}", Utc::now()).into()); - let response = queue.create().metadata(&metadata).execute().await?; + let response = queue + .create() + .metadata(metadata.clone()) + .into_future() + .await?; println!("response == {:#?}", response); // let's add some more metadata @@ -42,16 +52,15 @@ async fn queue_create_put_and_get() -> azure_core::Result<()> { println!("metadata == {:#?}", metadata); - let response = queue.set_metadata().execute(&metadata).await?; + let response = queue.set_metadata(metadata).into_future().await?; println!("response == {:#?}", response); // let's get back the metadata - let response = queue.get_metadata().execute().await?; + let response = queue.get_metadata().into_future().await?; println!("response == {:#?}", response); // create two queue stored access policies - let mut queue_stored_acess_policies = Vec::new(); - queue_stored_acess_policies.push( + let policies = vec![ QueueStoredAccessPolicy::new( "first_sap_read_process", Utc::now() - chrono::Duration::hours(1), @@ -59,33 +68,33 @@ async fn queue_create_put_and_get() -> azure_core::Result<()> { ) .enable_read() .enable_process(), - ); - queue_stored_acess_policies.push( QueueStoredAccessPolicy::new( "sap_admin", Utc::now() - chrono::Duration::hours(1), Utc::now() + chrono::Duration::hours(5), ) .enable_all(), - ); + ]; - let response = queue - .set_acl() - .execute(&queue_stored_acess_policies) - .await?; + let response = queue.set_acl(policies).into_future().await?; println!("response == {:#?}", response); // get the queue ACL - let response = queue.get_acl().execute().await?; + let response = queue.get_acl().into_future().await?; println!("response == {:#?}", response); + let mut stream = queue_service_client.list_queues().into_stream(); + while let Some(entry) = stream.next().await { + let entry = entry?; + println!("entry == {:#?}", entry); + } + for i in 0u32..5 { println!("putting message {}", i); let response = queue - .put_message() - .client_request_id("optional correlation token") - .execute(format!("Azure SDK for Rust rocks! {}", chrono::Utc::now())) + .put_message(format!("Azure SDK for Rust {}", chrono::Utc::now())) + .into_future() .await?; println!("response == {:#?}", response); @@ -95,7 +104,7 @@ async fn queue_create_put_and_get() -> azure_core::Result<()> { .get_messages() .number_of_messages(2) .visibility_timeout(Duration::from_secs(10)) - .execute() + .into_future() .await?; println!("get_messages_response == {:#?}", get_messages_response); @@ -103,8 +112,11 @@ async fn queue_create_put_and_get() -> azure_core::Result<()> { let pop_receipt = queue.pop_receipt_client(message_to_update); let response = pop_receipt - .update(Duration::from_secs(4)) - .execute(format!("new body at {}", chrono::Utc::now())) + .update( + format!("new body at {}", chrono::Utc::now()), + Duration::from_secs(4), + ) + .into_future() .await?; println!("response == {:#?}", response); } @@ -113,7 +125,7 @@ async fn queue_create_put_and_get() -> azure_core::Result<()> { .get_messages() .number_of_messages(2) .visibility_timeout(Duration::from_secs(5)) - .execute() + .into_future() .await?; println!("get_response == {:#?}", get_response); @@ -124,18 +136,14 @@ async fn queue_create_put_and_get() -> azure_core::Result<()> { let delete_response = queue .pop_receipt_client(message_to_delete) .delete() - .execute() + .into_future() .await?; println!("delete_response == {:#?}", delete_response); } // now let's delete the queue - let response = queue - .delete() - .client_request_id("myclientid") - .execute() - .await?; + let response = queue.delete().into_future().await?; println!("response == {:#?}", response); Ok(())