Skip to content

Commit b48d4ad

Browse files
authored
Azure Queue Storage support (#65)
Add support for Azure Queue Storage. Note that the blocking receives are not possible in Azure Queue Storage and as such, calling receive on an empty queue will return immediately with QueueError::NoData.
1 parent a00efe9 commit b48d4ad

File tree

8 files changed

+503
-0
lines changed

8 files changed

+503
-0
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
Cargo.lock
22
target
3+
.vscode

omniqueue/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ edition = "2021"
1313
async-trait = "0.1"
1414
aws-config = { version = "1.1.5", features = ["behavior-version-latest"], optional = true }
1515
aws-sdk-sqs = { version = "1.13.0", optional = true }
16+
azure_storage = { version = "0.19.0", optional = true }
17+
azure_storage_queues = { version = "0.19.0", optional = true }
1618
bb8 = { version = "0.8", optional = true }
1719
bb8-redis = { version = "0.14.0", optional = true }
1820
futures-util = { version = "0.3.28", default-features = false, features = ["async-await", "std"], optional = true }
@@ -46,3 +48,4 @@ rabbitmq-with-message-ids = ["rabbitmq", "dep:time", "dep:svix-ksuid"]
4648
redis = ["dep:bb8", "dep:bb8-redis", "dep:redis", "dep:svix-ksuid"]
4749
redis_cluster = ["redis", "redis/cluster-async"]
4850
sqs = ["dep:aws-config", "dep:aws-sdk-sqs"]
51+
azure_queue_storage = ["dep:azure_storage", "dep:azure_storage_queues"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
use std::time::Duration;
2+
3+
use async_trait::async_trait;
4+
use azure_storage::StorageCredentials;
5+
use azure_storage_queues::{
6+
operations::Message, PopReceipt, QueueClient, QueueServiceClientBuilder,
7+
};
8+
use serde::Serialize;
9+
10+
use crate::{
11+
builder::Static, queue::Acker, Delivery, QueueBackend, QueueBuilder, QueueError, Result,
12+
};
13+
14+
fn get_client(cfg: &AqsConfig) -> QueueClient {
15+
let AqsConfig {
16+
queue_name,
17+
storage_account,
18+
credentials,
19+
cloud_uri,
20+
..
21+
} = cfg;
22+
let mut builder = QueueServiceClientBuilder::new(storage_account, credentials.clone());
23+
if let Some(cloud_uri) = cloud_uri {
24+
builder = builder.cloud_location(azure_storage::CloudLocation::Custom {
25+
account: cfg.storage_account.clone(),
26+
uri: cloud_uri.clone(),
27+
});
28+
}
29+
builder.build().queue_client(queue_name)
30+
}
31+
32+
/// Note that blocking receives are not supported by Azure Queue Storage and
33+
/// that message order is not guaranteed.
34+
#[non_exhaustive]
35+
pub struct AqsBackend;
36+
37+
impl AqsBackend {
38+
/// Creates a new Azure Queue Storage builder with the given
39+
/// configuration.
40+
pub fn builder(cfg: impl Into<AqsConfig>) -> QueueBuilder<Self, Static> {
41+
QueueBuilder::new(cfg.into())
42+
}
43+
}
44+
45+
const DEFAULT_RECV_TIMEOUT: Duration = Duration::from_secs(180);
46+
const DEFAULT_EMPTY_RECV_DELAY: Duration = Duration::from_millis(200);
47+
48+
#[derive(Clone)]
49+
pub struct AqsConfig {
50+
pub queue_name: String,
51+
pub empty_receive_delay: Option<Duration>,
52+
pub message_ttl: Duration,
53+
pub storage_account: String,
54+
pub credentials: StorageCredentials,
55+
pub cloud_uri: Option<String>,
56+
pub receive_timeout: Option<Duration>,
57+
}
58+
59+
impl QueueBackend for AqsBackend {
60+
type Config = AqsConfig;
61+
62+
type PayloadIn = String;
63+
type PayloadOut = String;
64+
65+
type Producer = AqsProducer;
66+
type Consumer = AqsConsumer;
67+
68+
async fn new_pair(config: Self::Config) -> Result<(AqsProducer, AqsConsumer)> {
69+
let client = get_client(&config);
70+
Ok((
71+
AqsProducer {
72+
client: client.clone(),
73+
config: config.clone(),
74+
},
75+
AqsConsumer {
76+
client: client.clone(),
77+
config: config.clone(),
78+
},
79+
))
80+
}
81+
82+
async fn producing_half(config: Self::Config) -> Result<AqsProducer> {
83+
let client = get_client(&config);
84+
Ok(AqsProducer { client, config })
85+
}
86+
87+
async fn consuming_half(config: Self::Config) -> Result<AqsConsumer> {
88+
let client = get_client(&config);
89+
Ok(AqsConsumer { client, config })
90+
}
91+
}
92+
93+
pub struct AqsProducer {
94+
client: QueueClient,
95+
config: AqsConfig,
96+
}
97+
98+
impl AqsProducer {
99+
pub async fn send_raw(&self, payload: &str) -> Result<()> {
100+
self.send_raw_scheduled(payload, Duration::ZERO).await
101+
}
102+
103+
pub async fn send_raw_scheduled(&self, payload: &str, delay: Duration) -> Result<()> {
104+
self.client
105+
.put_message(payload)
106+
.visibility_timeout(delay)
107+
.ttl(self.config.message_ttl)
108+
.await
109+
.map_err(QueueError::generic)
110+
.map(|_| ())
111+
}
112+
113+
pub async fn send_serde_json<P: Serialize + Sync>(&self, payload: &P) -> Result<()> {
114+
let payload = serde_json::to_string(payload)?;
115+
self.send_raw(&payload).await
116+
}
117+
118+
pub async fn send_serde_json_scheduled<P: Serialize + Sync>(
119+
&self,
120+
payload: &P,
121+
delay: Duration,
122+
) -> Result<()> {
123+
let payload = serde_json::to_string(payload)?;
124+
self.send_raw_scheduled(&payload, delay).await
125+
}
126+
}
127+
128+
impl_queue_producer!(AqsProducer, String);
129+
impl_scheduled_queue_producer!(AqsProducer, String);
130+
131+
/// Note that blocking receives are not supported by Azure Queue Storage and
132+
/// that message order is not guaranteed.
133+
pub struct AqsConsumer {
134+
client: QueueClient,
135+
config: AqsConfig,
136+
}
137+
138+
struct AqsAcker {
139+
client: QueueClient,
140+
already_acked_or_nacked: bool,
141+
pop_receipt: PopReceipt,
142+
}
143+
144+
#[async_trait]
145+
impl Acker for AqsAcker {
146+
async fn ack(&mut self) -> Result<()> {
147+
if self.already_acked_or_nacked {
148+
return Err(QueueError::CannotAckOrNackTwice);
149+
}
150+
self.already_acked_or_nacked = true;
151+
self.client
152+
.pop_receipt_client(self.pop_receipt.clone())
153+
.delete()
154+
.await
155+
.map_err(QueueError::generic)
156+
.map(|_| ())
157+
}
158+
159+
async fn nack(&mut self) -> Result<()> {
160+
Ok(())
161+
}
162+
}
163+
164+
impl AqsConsumer {
165+
fn wrap_message(&self, message: &Message) -> Delivery {
166+
Delivery {
167+
acker: Box::new(AqsAcker {
168+
client: self.client.clone(),
169+
pop_receipt: message.pop_receipt(),
170+
already_acked_or_nacked: false,
171+
}),
172+
payload: Some(message.message_text.as_bytes().to_owned()),
173+
}
174+
}
175+
176+
/// Note that blocking receives are not supported by Azure Queue Storage.
177+
/// Calls to this method will return immediately if no messages are
178+
/// available for delivery in the queue.
179+
pub async fn receive(&mut self) -> Result<Delivery> {
180+
self.client
181+
.get_messages()
182+
.visibility_timeout(self.config.receive_timeout.unwrap_or(DEFAULT_RECV_TIMEOUT))
183+
.await
184+
.map_err(QueueError::generic)
185+
.and_then(|m| m.messages.into_iter().next().ok_or(QueueError::NoData))
186+
.map(|m| self.wrap_message(&m))
187+
}
188+
189+
pub async fn receive_all(
190+
&mut self,
191+
max_messages: usize,
192+
deadline: Duration,
193+
) -> Result<Vec<Delivery>> {
194+
let end = std::time::Instant::now() + deadline;
195+
let mut interval = tokio::time::interval(
196+
self.config
197+
.empty_receive_delay
198+
.unwrap_or(DEFAULT_EMPTY_RECV_DELAY),
199+
);
200+
loop {
201+
interval.tick().await;
202+
let msgs = self
203+
.client
204+
.get_messages()
205+
.number_of_messages(max_messages.try_into().unwrap_or(u8::MAX))
206+
.visibility_timeout(self.config.receive_timeout.unwrap_or(DEFAULT_RECV_TIMEOUT))
207+
.await
208+
.map_err(QueueError::generic)
209+
.map(|m| {
210+
m.messages
211+
.iter()
212+
.map(|m| self.wrap_message(m))
213+
.collect::<Vec<_>>()
214+
})?;
215+
if !msgs.is_empty() {
216+
return Ok(msgs);
217+
}
218+
if std::time::Instant::now() > end {
219+
return Ok(vec![]);
220+
}
221+
}
222+
}
223+
}
224+
225+
impl_queue_consumer!(AqsConsumer, String);

omniqueue/src/backends/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#[cfg(feature = "azure_queue_storage")]
2+
pub mod azure_queue_storage;
13
#[cfg(feature = "gcp_pubsub")]
24
pub mod gcp_pubsub;
35
#[cfg(feature = "in_memory")]
@@ -9,6 +11,8 @@ pub mod redis;
911
#[cfg(feature = "sqs")]
1012
pub mod sqs;
1113

14+
#[cfg(feature = "azure_queue_storage")]
15+
pub use azure_queue_storage::{AqsBackend, AqsConfig, AqsConsumer, AqsProducer};
1216
#[cfg(feature = "gcp_pubsub")]
1317
pub use gcp_pubsub::{GcpPubSubBackend, GcpPubSubConfig, GcpPubSubConsumer, GcpPubSubProducer};
1418
#[cfg(feature = "in_memory")]

omniqueue/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
//! * RabbitMQ
1818
//! * Redis
1919
//! * Amazon SQS
20+
//! * Azure Queue Storage
2021
//!
2122
//! ## How to Use Omniqueue
2223
//!

0 commit comments

Comments
 (0)