Skip to content

Commit 8b91537

Browse files
author
Marek Pavelka
authored
add service bus topic support (#1155)
1 parent 9c37953 commit 8b91537

File tree

8 files changed

+255
-32
lines changed

8 files changed

+255
-32
lines changed

sdk/messaging_servicebus/examples/service_bus00.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ async fn main() {
1616

1717
let http_client = azure_core::new_http_client();
1818

19-
let client = Client::new(
19+
let client = QueueClient::new(
2020
http_client,
2121
service_bus_namespace,
2222
queue_name,
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use azure_messaging_servicebus::prelude::*;
2+
3+
#[tokio::main]
4+
async fn main() {
5+
let service_bus_namespace = std::env::var("AZURE_SERVICE_BUS_NAMESPACE")
6+
.expect("Please set AZURE_SERVICE_BUS_NAMESPACE env variable first!");
7+
8+
let topic_name =
9+
std::env::var("AZURE_TOPIC_NAME").expect("Please set AZURE_TOPIC_NAME env variable first!");
10+
11+
let subscription_name =
12+
std::env::var("SUBSCRIPTION").expect("Please set SUBSCRIPTION env variable first!");
13+
14+
let policy_name = std::env::var("AZURE_POLICY_NAME")
15+
.expect("Please set AZURE_POLICY_NAME env variable first!");
16+
17+
let policy_key =
18+
std::env::var("AZURE_POLICY_KEY").expect("Please set AZURE_POLICY_KEY env variable first!");
19+
20+
let http_client = azure_core::new_http_client();
21+
22+
let client = TopicClient::new(
23+
http_client,
24+
service_bus_namespace,
25+
topic_name,
26+
policy_name,
27+
policy_key,
28+
)
29+
.expect("Failed to create client");
30+
31+
let sender = client.topic_sender();
32+
let receiver = client.subscription_receiver(&subscription_name);
33+
34+
let message_to_send = "hello, world!";
35+
36+
sender
37+
.send_message(message_to_send)
38+
.await
39+
.expect("Failed to send message while testing receive");
40+
41+
println!("Sent Message: {}", message_to_send);
42+
43+
let received_message = receiver
44+
.receive_and_delete_message()
45+
.await
46+
.expect("Failed to receive message");
47+
48+
println!("Received Message: {}", received_message);
49+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
pub use crate::service_bus::Client;
1+
pub use crate::service_bus::{QueueClient, SubscriptionReceiver, TopicClient, TopicSender};

sdk/messaging_servicebus/src/service_bus/mod.rs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ use std::{ops::Add, sync::Arc};
66
use time::OffsetDateTime;
77
use url::form_urlencoded::{self, Serializer};
88

9-
mod client;
9+
mod queue_client;
10+
mod topic_client;
1011

11-
use crate::utils::craft_peek_lock_url;
12+
use crate::utils::{craft_peek_lock_url, get_head_url};
1213

13-
pub use self::client::Client;
14+
pub use self::queue_client::QueueClient;
15+
pub use self::topic_client::{SubscriptionReceiver, TopicClient, TopicSender};
1416

1517
/// Default duration for the SAS token in days — We might want to make this configurable at some point
1618
const DEFAULT_SAS_DURATION: u64 = 3_600; // seconds = 1 hour
@@ -77,18 +79,18 @@ fn generate_signature(
7779
)
7880
}
7981

80-
/// Sends a message to the queue
82+
/// Sends a message to the queue or topic
8183
async fn send_message(
8284
http_client: &Arc<dyn HttpClient>,
8385
namespace: &str,
84-
queue: &str,
86+
queue_or_topic: &str,
8587
policy_name: &str,
8688
signing_key: &hmac::Key,
8789
msg: &str,
8890
) -> azure_core::Result<()> {
8991
let url = format!(
9092
"https://{}.servicebus.windows.net/{}/messages",
91-
namespace, queue
93+
namespace, queue_or_topic
9294
);
9395

9496
let req = finalize_request(
@@ -110,15 +112,12 @@ async fn send_message(
110112
async fn receive_and_delete_message(
111113
http_client: &Arc<dyn HttpClient>,
112114
namespace: &str,
113-
queue: &str,
115+
queue_or_topic: &str,
114116
policy_name: &str,
115117
signing_key: &hmac::Key,
118+
subscription: Option<&str>,
116119
) -> azure_core::Result<CollectedResponse> {
117-
let url = format!(
118-
"https://{}.servicebus.windows.net/{}/messages/head",
119-
namespace, queue
120-
);
121-
120+
let url = get_head_url(namespace, queue_or_topic, subscription);
122121
let req = finalize_request(&url, Method::Delete, None, policy_name, signing_key)?;
123122

124123
http_client
@@ -138,12 +137,13 @@ async fn receive_and_delete_message(
138137
async fn peek_lock_message(
139138
http_client: &Arc<dyn HttpClient>,
140139
namespace: &str,
141-
queue: &str,
140+
queue_or_topic: &str,
142141
policy_name: &str,
143142
signing_key: &hmac::Key,
144143
lock_expiry: Option<Duration>,
144+
subscription: Option<&str>,
145145
) -> azure_core::Result<CollectedResponse> {
146-
let url = craft_peek_lock_url(namespace, queue, lock_expiry)?;
146+
let url = craft_peek_lock_url(namespace, queue_or_topic, lock_expiry, subscription)?;
147147

148148
let req = finalize_request(url.as_ref(), Method::Post, None, policy_name, signing_key)?;
149149

@@ -160,12 +160,13 @@ async fn peek_lock_message(
160160
async fn peek_lock_message2(
161161
http_client: &Arc<dyn HttpClient>,
162162
namespace: &str,
163-
queue: &str,
163+
queue_or_topic: &str,
164164
policy_name: &str,
165165
signing_key: &hmac::Key,
166166
lock_expiry: Option<Duration>,
167+
subscription: Option<&str>,
167168
) -> azure_core::Result<PeekLockResponse> {
168-
let url = craft_peek_lock_url(namespace, queue, lock_expiry)?;
169+
let url = craft_peek_lock_url(namespace, queue_or_topic, lock_expiry, subscription)?;
169170

170171
let req = finalize_request(url.as_ref(), Method::Post, None, policy_name, signing_key)?;
171172

sdk/messaging_servicebus/src/service_bus/client.rs renamed to sdk/messaging_servicebus/src/service_bus/queue_client.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,23 @@ use azure_core::{error::Error, HttpClient};
1414

1515
/// Client object that allows interaction with the ServiceBus API
1616
#[derive(Debug, Clone)]
17-
pub struct Client {
17+
pub struct QueueClient {
1818
http_client: Arc<dyn HttpClient>,
1919
namespace: String,
2020
queue: String,
2121
policy_name: String,
2222
signing_key: Key,
2323
}
2424

25-
impl Client {
26-
/// Creates a new client instance
25+
impl QueueClient {
26+
/// Creates a new queue client instance
2727
pub fn new<N, Q, P, K>(
2828
http_client: Arc<dyn HttpClient>,
2929
namespace: N,
3030
queue: Q,
3131
policy_name: P,
3232
policy_key: K,
33-
) -> Result<Client, Error>
33+
) -> Result<QueueClient, Error>
3434
where
3535
N: Into<String>,
3636
Q: Into<String>,
@@ -39,7 +39,7 @@ impl Client {
3939
{
4040
let signing_key = Key::new(ring::hmac::HMAC_SHA256, policy_key.as_ref().as_bytes());
4141

42-
Ok(Client {
42+
Ok(QueueClient {
4343
http_client,
4444
namespace: namespace.into(),
4545
queue: queue.into(),
@@ -70,6 +70,7 @@ impl Client {
7070
&self.queue,
7171
&self.policy_name,
7272
&self.signing_key,
73+
None,
7374
)
7475
.await?
7576
.body(),
@@ -93,6 +94,7 @@ impl Client {
9394
&self.policy_name,
9495
&self.signing_key,
9596
lock_expiry,
97+
None,
9698
)
9799
.await?
98100
.body(),
@@ -114,6 +116,7 @@ impl Client {
114116
&self.policy_name,
115117
&self.signing_key,
116118
timeout,
119+
None,
117120
)
118121
.await
119122
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
use std::sync::Arc;
2+
3+
use crate::{
4+
service_bus::{
5+
peek_lock_message, peek_lock_message2, receive_and_delete_message, send_message,
6+
PeekLockResponse,
7+
},
8+
utils::body_bytes_to_utf8,
9+
};
10+
use ring::hmac::Key;
11+
use std::time::Duration;
12+
13+
use azure_core::{error::Error, HttpClient};
14+
15+
/// Client object that allows interaction with the ServiceBus API
16+
#[derive(Debug, Clone)]
17+
pub struct TopicClient {
18+
http_client: Arc<dyn HttpClient>,
19+
namespace: String,
20+
topic: String,
21+
policy_name: String,
22+
signing_key: Key,
23+
}
24+
25+
#[derive(Debug, Clone)]
26+
pub struct TopicSender {
27+
topic_client: TopicClient,
28+
}
29+
30+
#[derive(Debug, Clone)]
31+
pub struct SubscriptionReceiver {
32+
topic_client: TopicClient,
33+
subscription: String,
34+
}
35+
36+
impl TopicClient {
37+
/// Creates a new topic client instance
38+
pub fn new<N, T, P, K>(
39+
http_client: Arc<dyn HttpClient>,
40+
namespace: N,
41+
topic: T,
42+
policy_name: P,
43+
policy_key: K,
44+
) -> Result<TopicClient, Error>
45+
where
46+
N: Into<String>,
47+
T: Into<String>,
48+
P: Into<String>,
49+
K: AsRef<str>,
50+
{
51+
let signing_key = Key::new(ring::hmac::HMAC_SHA256, policy_key.as_ref().as_bytes());
52+
53+
Ok(Self {
54+
http_client,
55+
namespace: namespace.into(),
56+
topic: topic.into(),
57+
policy_name: policy_name.into(),
58+
signing_key,
59+
})
60+
}
61+
62+
pub fn topic_sender(&self) -> TopicSender {
63+
TopicSender::new(self.clone())
64+
}
65+
66+
pub fn subscription_receiver(&self, subscription: &str) -> SubscriptionReceiver {
67+
SubscriptionReceiver::new(self.clone(), subscription)
68+
}
69+
}
70+
71+
impl TopicSender {
72+
pub fn new(topic_client: TopicClient) -> TopicSender {
73+
Self { topic_client }
74+
}
75+
/// Sends a message to the topic
76+
pub async fn send_message(&self, msg: &str) -> Result<(), Error> {
77+
send_message(
78+
&self.topic_client.http_client,
79+
&self.topic_client.namespace,
80+
&self.topic_client.topic,
81+
&self.topic_client.policy_name,
82+
&self.topic_client.signing_key,
83+
msg,
84+
)
85+
.await
86+
}
87+
}
88+
89+
impl SubscriptionReceiver {
90+
pub fn new<S>(topic_client: TopicClient, subscription: S) -> SubscriptionReceiver
91+
where
92+
S: Into<String>,
93+
{
94+
Self {
95+
topic_client,
96+
subscription: subscription.into(),
97+
}
98+
}
99+
100+
/// Receive and delete a message
101+
pub async fn receive_and_delete_message(&self) -> Result<String, Error> {
102+
body_bytes_to_utf8(
103+
receive_and_delete_message(
104+
&self.topic_client.http_client,
105+
&self.topic_client.namespace,
106+
&self.topic_client.topic,
107+
&self.topic_client.policy_name,
108+
&self.topic_client.signing_key,
109+
Some(&self.subscription),
110+
)
111+
.await?
112+
.body(),
113+
)
114+
}
115+
116+
/// Non-destructively read a message
117+
///
118+
/// Note: This function does not return the delete location
119+
/// of the message, so, after reading, you will lose
120+
/// "track" of it until the lock expiry runs out and
121+
/// the message can be consumed by others. If you want to keep
122+
/// track of this message (i.e., have the possibility of deletion),
123+
/// use `peek_lock_message2`.
124+
pub async fn peek_lock_message(&self, lock_expiry: Option<Duration>) -> Result<String, Error> {
125+
body_bytes_to_utf8(
126+
peek_lock_message(
127+
&self.topic_client.http_client,
128+
&self.topic_client.namespace,
129+
&self.topic_client.topic,
130+
&self.topic_client.policy_name,
131+
&self.topic_client.signing_key,
132+
lock_expiry,
133+
Some(&self.subscription),
134+
)
135+
.await?
136+
.body(),
137+
)
138+
}
139+
140+
/// Non-destructively read a message but track it
141+
///
142+
/// Note: This function returns a `PeekLockResponse`
143+
/// that contains a helper `delete_message` function.
144+
pub async fn peek_lock_message2(
145+
&self,
146+
timeout: Option<Duration>,
147+
) -> Result<PeekLockResponse, Error> {
148+
peek_lock_message2(
149+
&self.topic_client.http_client,
150+
&self.topic_client.namespace,
151+
&self.topic_client.topic,
152+
&self.topic_client.policy_name,
153+
&self.topic_client.signing_key,
154+
timeout,
155+
Some(&self.subscription),
156+
)
157+
.await
158+
}
159+
}

0 commit comments

Comments
 (0)