Skip to content

move storage queues to pipeline architecture #851

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/storage/src/core/clients/storage_account_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl std::fmt::Debug for StorageCredentials {
#[derive(Debug, Clone, Copy)]
pub enum ServiceType {
Blob,
// Queue,
Queue,
// File,
Table,
}
Expand Down
1 change: 1 addition & 0 deletions sdk/storage_queues/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ url = "2.2"

[dev-dependencies]
tokio = { version = "1.0", features = ["full"] }
uuid = { version = "1.0", features = ["v4"] }

[features]
default = ["enable_reqwest"]
Expand Down
4 changes: 2 additions & 2 deletions sdk/storage_queues/examples/delete_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn main() -> azure_core::Result<()> {
.get_messages()
.number_of_messages(2)
.visibility_timeout(Duration::from_secs(5)) // the message will become visible again after 5 secs
.execute()
.into_future()
.await?;

println!("get_response == {:#?}", get_response);
Expand All @@ -44,7 +44,7 @@ async fn main() -> azure_core::Result<()> {
let delete_response = queue
.pop_receipt_client(message_to_delete)
.delete()
.execute()
.into_future()
.await?;

println!("delete_response == {:#?}", delete_response);
Expand Down
11 changes: 7 additions & 4 deletions sdk/storage_queues/examples/get_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn main() -> azure_core::Result<()> {
.get_messages()
.number_of_messages(2)
.visibility_timeout(Duration::from_secs(5)) // the message will become visible again after 5 secs
.execute()
.into_future()
.await?;

println!("response == {:#?}", response);
Expand All @@ -39,7 +39,7 @@ async fn main() -> azure_core::Result<()> {
.get_messages()
.number_of_messages(2)
.visibility_timeout(Duration::from_secs(10)) // the message will become visible again after 10 secs
.execute()
.into_future()
.await?;
println!("get_messages_response == {:#?}", get_messages_response);

Expand All @@ -50,8 +50,11 @@ async fn main() -> azure_core::Result<()> {
let pop_receipt = queue.pop_receipt_client(message_to_update);

let response = pop_receipt
.update(Duration::from_secs(4))
.execute(format!("new body at {}", chrono::Utc::now()))
.update(
format!("new body at {}", chrono::Utc::now()),
Duration::from_secs(4),
)
.into_future()
.await?;
println!("response == {:#?}", response);
}
Expand Down
22 changes: 12 additions & 10 deletions sdk/storage_queues/examples/list_queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ async fn main() -> azure_core::Result<()> {
let queue_service = storage_account.queue_service_client();

println!("getting service stats");
let response = queue_service.get_queue_service_stats().execute().await?;
let response = queue_service
.get_queue_service_stats()
.into_future()
.await?;
println!("get_queue_service_properties.response == {:#?}", response);

println!("getting service properties");
let response = queue_service
.get_queue_service_properties()
.execute()
.into_future()
.await?;
println!("get_queue_service_stats.response == {:#?}", response);

Expand All @@ -35,17 +38,16 @@ async fn main() -> azure_core::Result<()> {
.prefix("a")
.include_metadata(true)
.max_results(NonZeroU32::new(2u32).unwrap())
.execute()
.await?;
.into_stream()
.next()
.await;
println!("response == {:#?}", response);

println!("streaming queues");
let mut stream = Box::pin(
queue_service
.list_queues()
.max_results(NonZeroU32::new(3u32).unwrap())
.stream(),
);
let mut stream = queue_service
.list_queues()
.max_results(NonZeroU32::new(3u32).unwrap())
.into_stream();

while let Some(value) = stream.next().await {
let value = value?;
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage_queues/examples/peek_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn main() -> azure_core::Result<()> {
let response = queue
.peek_messages()
.number_of_messages(2)
.execute()
.into_future()
.await?;

println!("response == {:#?}", response);
Expand Down
5 changes: 2 additions & 3 deletions sdk/storage_queues/examples/put_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ async fn main() -> azure_core::Result<()> {

trace!("putting message");
let response = queue
.put_message()
.client_request_id("optional correlation token")
.execute(format!("Azure SDK for Rust rocks! {}", chrono::Utc::now()))
.put_message(format!("Azure SDK for Rust rocks! {}", chrono::Utc::now()))
.into_future()
.await?;

println!("response == {:#?}", response);
Expand Down
25 changes: 11 additions & 14 deletions sdk/storage_queues/examples/queue_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ async fn main() -> azure_core::Result<()> {
.as_mut()
.insert("created".into(), format!("{:?}", Utc::now()).into());

let response = queue.create().metadata(&metadata).execute().await?;
let response = queue
.create()
.metadata(metadata.clone())
.into_future()
.await?;
println!("response == {:#?}", response);

// let's add some more metadata
Expand All @@ -46,15 +50,15 @@ async fn main() -> azure_core::Result<()> {

println!("metadata == {:#?}", metadata);

let response = queue.set_metadata().execute(&metadata).await?;
let response = queue.set_metadata(metadata).into_future().await?;
println!("response == {:#?}", response);

// let's get back the metadata
let response = queue.get_metadata().execute().await?;
let response = queue.get_metadata().into_future().await?;
println!("response == {:#?}", response);

// use two queue stored access policies
let queue_stored_acess_policies = vec![
let policies = vec![
QueueStoredAccessPolicy::new(
"first_sap_read_process",
Utc::now() - Duration::hours(1),
Expand All @@ -70,22 +74,15 @@ async fn main() -> azure_core::Result<()> {
.enable_all(),
];

let response = queue
.set_acl()
.execute(&queue_stored_acess_policies)
.await?;
let response = queue.set_acl(policies).into_future().await?;
println!("response == {:#?}", response);

// get the queue ACL
let response = queue.get_acl().execute().await?;
let response = queue.get_acl().into_future().await?;
println!("response == {:#?}", response);

// now let's delete it
let response = queue
.delete()
.client_request_id("myclientid")
.execute()
.await?;
let response = queue.delete().into_future().await?;
println!("response == {:#?}", response);

Ok(())
Expand Down
31 changes: 21 additions & 10 deletions sdk/storage_queues/src/clients/pop_receipt_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::prelude::*;
use crate::requests::*;
use crate::{operations::*, prelude::*};
use azure_core::{Context, Request, Response};
use azure_storage::core::clients::StorageClient;
use std::sync::Arc;

Expand Down Expand Up @@ -39,6 +39,14 @@ impl PopReceiptClient {
})
}

pub(crate) async fn send(
&self,
context: &mut Context,
request: &mut Request,
) -> azure_core::Result<Response> {
self.queue_client.send(context, request).await
}

pub(crate) fn storage_client(&self) -> &StorageClient {
self.queue_client.storage_client()
}
Expand All @@ -56,16 +64,19 @@ impl PopReceiptClient {
Ok(url)
}

/// Deletes the message. The message must not have been
/// made visible again or this call would fail.
/// Deletes the message. The message must not have been made visible again
/// or this call would fail.
Comment on lines +67 to +68
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Deletes the message. The message must not have been made visible again
/// or this call would fail.
/// Deletes the message.
///
/// The message must not have been made visible again or this call would fail.

pub fn delete(&self) -> DeleteMessageBuilder {
DeleteMessageBuilder::new(self)
DeleteMessageBuilder::new(self.clone())
}

/// Updates the message.
/// The message must not have been
/// made visible again or this call would fail.
pub fn update(&self, visibility_timeout: impl Into<VisibilityTimeout>) -> UpdateMessageBuilder {
UpdateMessageBuilder::new(self, visibility_timeout)
/// Updates the message. The message must not have been made visible again
/// or this call would fail.
Comment on lines +73 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Updates the message. The message must not have been made visible again
/// or this call would fail.
/// Updates the message.
///
/// The message must not have been made visible again or this call would fail.

Also, it might be nice to place this above delete. In Cosmos, we generally follow the order: get, create, update, delete for ordering operations.

pub fn update(
&self,
body: impl Into<String>,
visibility_timeout: impl Into<VisibilityTimeout>,
) -> UpdateMessageBuilder {
UpdateMessageBuilder::new(self.clone(), body.into(), visibility_timeout.into())
}
}
74 changes: 49 additions & 25 deletions sdk/storage_queues/src/clients/queue_client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use crate::requests::*;
use azure_core::error::{ErrorKind, ResultExt};
use azure_storage::core::clients::{AsStorageClient, StorageAccountClient, StorageClient};
use std::fmt::Debug;
use std::sync::Arc;
use crate::{operations::*, QueueStoredAccessPolicy};
use azure_core::{
error::{ErrorKind, ResultExt},
prelude::*,
Context, Request, Response,
};
use azure_storage::core::clients::{
AsStorageClient, ServiceType, StorageAccountClient, StorageClient,
};
use std::{fmt::Debug, sync::Arc};

pub trait AsQueueClient<QN: Into<String>> {
fn queue_client(&self, queue_name: QN) -> Arc<QueueClient>;
Expand Down Expand Up @@ -34,6 +39,16 @@ impl QueueClient {
})
}

pub(crate) async fn send(
&self,
context: &mut Context,
request: &mut Request,
) -> azure_core::Result<Response> {
self.storage_client
.send(context, request, ServiceType::Queue)
.await
}

pub(crate) fn storage_client(&self) -> &StorageClient {
self.storage_client.as_ref()
}
Expand All @@ -53,68 +68,77 @@ impl QueueClient {

/// Creates the queue.
pub fn create(&self) -> CreateQueueBuilder {
CreateQueueBuilder::new(self)
CreateQueueBuilder::new(self.clone())
}

/// Deletes the queue.
pub fn delete(&self) -> DeleteQueueBuilder {
DeleteQueueBuilder::new(self)
DeleteQueueBuilder::new(self.clone())
}

/// Sets or clears the queue metadata. The metadata
/// will be passed to the `execute` function of the returned struct.
pub fn set_metadata(&self) -> SetQueueMetadataBuilder {
SetQueueMetadataBuilder::new(self)
/// Sets or clears the queue metadata.
///
/// Keep in mind that keys present on Azure but not included in the passed
/// metadata parameter will be deleted. If you want to keep the preexisting
/// key-value pairs, retrieve them with GetMetadata first and then
/// update/add to the received Metadata struct. Then pass the Metadata back
/// to SetQueueMetadata. If you just want to clear the metadata, just pass
/// an empty Metadata struct.
pub fn set_metadata(&self, metadata: Metadata) -> SetQueueMetadataBuilder {
SetQueueMetadataBuilder::new(self.clone(), metadata)
}

/// Get the queue metadata.

pub fn get_metadata(&self) -> GetQueueMetadataBuilder {
GetQueueMetadataBuilder::new(self)
GetQueueMetadataBuilder::new(self.clone())
}

/// Get the queue ACL. This call returns
/// all the stored access policies associated
/// to the current queue.
pub fn get_acl(&self) -> GetQueueACLBuilder {
GetQueueACLBuilder::new(self)
GetQueueACLBuilder::new(self.clone())
}

/// Set the queue ACL. You can call this function
/// to change or remove already existing stored
/// access policies by modifying the list returned
/// Set the queue ACL. You can call this function to change or remove
/// already existing stored access policies by modifying the list returned
/// by `get_acl`.
pub fn set_acl(&self) -> SetQueueACLBuilder {
SetQueueACLBuilder::new(self)
///
/// While this SDK does not enforce any limit, keep in mind Azure supports a
/// limited number of stored access policies for each queue. More info here
/// [https://docs.microsoft.com/rest/api/storageservices/set-queue-acl#remarks](https://docs.microsoft.com/rest/api/storageservices/set-queue-acl#remarks).
pub fn set_acl(&self, policies: Vec<QueueStoredAccessPolicy>) -> SetQueueACLBuilder {
SetQueueACLBuilder::new(self.clone(), policies)
}

/// Puts a message in the queue. The body will be passed
/// to the `execute` function of the returned struct.
Comment on lines 115 to 116
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Puts a message in the queue. The body will be passed
/// to the `execute` function of the returned struct.
/// Puts a message in the queue.
///
/// The body will be passed to the `execute` function of the returned struct.

pub fn put_message(&self) -> PutMessageBuilder {
PutMessageBuilder::new(self)
pub fn put_message<S: Into<String>>(&self, message: S) -> PutMessageBuilder {
PutMessageBuilder::new(self.clone(), message.into())
}

/// Peeks, without removing, one or more messages.
pub fn peek_messages(&self) -> PeekMessagesBuilder {
PeekMessagesBuilder::new(self)
PeekMessagesBuilder::new(self.clone())
}

/// Gets, shadowing them, one or more messages.
pub fn get_messages(&self) -> GetMessagesBuilder {
GetMessagesBuilder::new(self)
GetMessagesBuilder::new(self.clone())
}

/// Removes all messages from the queue.
pub fn clear_messages(&self) -> ClearMessagesBuilder {
ClearMessagesBuilder::new(self)
ClearMessagesBuilder::new(self.clone())
}
}

#[cfg(test)]
#[cfg(feature = "test_integration")]
mod integration_tests {
use super::*;
use crate::core::prelude::*;
use crate::queue::clients::AsQueueClient;
use crate::{core::prelude::*, queue::clients::AsQueueClient};

fn get_emulator_client(queue_name: &str) -> Arc<QueueClient> {
let storage_account = StorageAccountClient::new_emulator_default().storage_client();
Expand Down
Loading