Skip to content

Commit f2b2fbb

Browse files
committed
[Host.AmazonSQS] Add SNS support
Signed-off-by: Tomasz Maruszak <[email protected]>
1 parent e18d412 commit f2b2fbb

File tree

7 files changed

+59
-32
lines changed

7 files changed

+59
-32
lines changed

src/SlimMessageBus.Host.AmazonSQS/Config/SqsConsumerBuilderExtensions.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ public static TConsumerBuilder SubscribeToTopic<TConsumerBuilder>(this TConsumer
2828
if (consumerBuilder is null) throw new ArgumentNullException(nameof(consumerBuilder));
2929
if (topic is null) throw new ArgumentNullException(nameof(topic));
3030

31-
SqsProperties.SubscribeToTopic.Set(consumerBuilder.Settings, topic);
32-
SqsProperties.SubscribeToTopicFilterPolicy.Set(consumerBuilder.Settings, filterPolicy);
31+
SqsProperties.SubscribeToTopic.Set(consumerBuilder.ConsumerSettings, topic);
32+
SqsProperties.SubscribeToTopicFilterPolicy.Set(consumerBuilder.ConsumerSettings, filterPolicy);
3333
return consumerBuilder;
3434
}
3535

@@ -48,7 +48,7 @@ public static ConsumerBuilder<T> VisibilityTimeout<T>(this ConsumerBuilder<T> co
4848
if (consumerBuilder is null) throw new ArgumentNullException(nameof(consumerBuilder));
4949
if (visibilityTimeoutSeconds <= 0) throw new ArgumentOutOfRangeException(nameof(visibilityTimeoutSeconds));
5050

51-
SqsProperties.VisibilityTimeout.Set(consumerBuilder.Settings, visibilityTimeoutSeconds);
51+
SqsProperties.VisibilityTimeout.Set(consumerBuilder.ConsumerSettings, visibilityTimeoutSeconds);
5252
return consumerBuilder;
5353
}
5454

@@ -67,7 +67,7 @@ public static ConsumerBuilder<T> MaxMessages<T>(this ConsumerBuilder<T> consumer
6767
if (consumerBuilder is null) throw new ArgumentNullException(nameof(consumerBuilder));
6868
if (maxMessages <= 0 || maxMessages > 10) throw new ArgumentOutOfRangeException(nameof(maxMessages));
6969

70-
SqsProperties.MaxMessages.Set(consumerBuilder.Settings, maxMessages);
70+
SqsProperties.MaxMessages.Set(consumerBuilder.ConsumerSettings, maxMessages);
7171
return consumerBuilder;
7272
}
7373

@@ -83,7 +83,7 @@ public static ConsumerBuilder<T> FetchMessageAttributes<T>(this ConsumerBuilder<
8383
{
8484
if (consumerBuilder is null) throw new ArgumentNullException(nameof(consumerBuilder));
8585

86-
SqsProperties.MessageAttributes.Set(consumerBuilder.Settings, messageAttributeNames);
86+
SqsProperties.MessageAttributes.Set(consumerBuilder.ConsumerSettings, messageAttributeNames);
8787
return consumerBuilder;
8888
}
8989
}

src/SlimMessageBus.Host.AmazonSQS/Config/SqsProperties.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ static internal class SqsProperties
1616
static readonly internal ProviderExtensionProperty<int?> WaitTimeSeconds = new("Sqs_WaitTimeSeconds");
1717
static readonly internal ProviderExtensionProperty<string[]> MessageAttributes = new("Sqs_MessageAttributes");
1818
static readonly internal ProviderExtensionProperty<string> SubscribeToTopic = new("Sqs_SubscribeToTopic");
19-
static readonly internal ProviderExtensionProperty<string?> SubscribeToTopicFilterPolicy = new("Sqs_SubscribeToTopic_FilterPolicy");
19+
static readonly internal ProviderExtensionProperty<string> SubscribeToTopicFilterPolicy = new("Sqs_SubscribeToTopic_FilterPolicy");
2020
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
namespace SlimMessageBus.Host.AmazonSQS;
2+
3+
internal class SnsEnvelope
4+
{
5+
public string Type { get; set; }
6+
public string MessageId { get; set; }
7+
public string TopicArn { get; set; }
8+
public string Message { get; set; }
9+
public string Timestamp { get; set; }
10+
public Dictionary<string, SnsMessageAttribute> MessageAttributes { get; set; }
11+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace SlimMessageBus.Host.AmazonSQS;
2+
3+
internal class SnsMessageAttribute
4+
{
5+
public string Type { get; set; }
6+
public string Value { get; set; }
7+
}

src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,28 @@
11
namespace SlimMessageBus.Host.AmazonSQS;
2-
internal abstract class SqsBaseConsumer : AbstractConsumer
2+
3+
abstract internal class SqsBaseConsumer : AbstractConsumer
34
{
45
private readonly ISqsClientProvider _clientProvider;
56

67
// consumer settings
78
private readonly int _maxMessages;
89
private readonly int _visibilityTimeout;
910
private readonly List<string> _messageAttributeNames;
11+
private readonly bool _isSubscribedToTopic;
1012

1113
private Task _task;
1214

1315
public SqsMessageBus MessageBus { get; }
1416
protected IMessageProcessor<SqsTransportMessageWithPayload> MessageProcessor { get; }
1517
protected ISqsHeaderSerializer<Amazon.SQS.Model.MessageAttributeValue> HeaderSerializer { get; }
18+
protected IMessageSerializer<string> MessageSerializer { get; }
1619

1720
protected SqsBaseConsumer(
1821
SqsMessageBus messageBus,
1922
ISqsClientProvider clientProvider,
2023
string path,
2124
IMessageProcessor<SqsTransportMessageWithPayload> messageProcessor,
25+
IMessageSerializer<string> messageSerializer,
2226
IEnumerable<AbstractConsumerSettings> consumerSettings,
2327
ILogger logger)
2428
: base(logger,
@@ -30,6 +34,8 @@ protected SqsBaseConsumer(
3034
MessageBus = messageBus;
3135
MessageProcessor = messageProcessor ?? throw new ArgumentNullException(nameof(messageProcessor));
3236
HeaderSerializer = messageBus.SqsHeaderSerializer;
37+
MessageSerializer = messageSerializer ?? throw new ArgumentNullException(nameof(messageSerializer));
38+
3339
T GetSingleValue<T>(Func<AbstractConsumerSettings, T> selector, string settingName, T defaultValue = default)
3440
{
3541
var set = consumerSettings.Select(x => selector(x)).Where(x => x is not null && !x.Equals(defaultValue)).ToHashSet();
@@ -43,6 +49,7 @@ T GetSingleValue<T>(Func<AbstractConsumerSettings, T> selector, string settingNa
4349
_maxMessages = GetSingleValue(x => x.GetOrDefault(SqsProperties.MaxMessages), nameof(SqsConsumerBuilderExtensions.MaxMessages)) ?? messageBus.ProviderSettings.MaxMessageCount;
4450
_visibilityTimeout = GetSingleValue(x => x.GetOrDefault(SqsProperties.VisibilityTimeout), nameof(SqsConsumerBuilderExtensions.VisibilityTimeout)) ?? 30;
4551
_messageAttributeNames = [.. GetSingleValue(x => x.GetOrDefault(SqsProperties.MessageAttributes), nameof(SqsConsumerBuilderExtensions.FetchMessageAttributes)) ?? ["All"]];
52+
_isSubscribedToTopic = consumerSettings.Any(x => x.GetOrDefault(SqsProperties.SubscribeToTopic) is not null);
4653
}
4754

4855
private async Task<IReadOnlyCollection<Message>> ReceiveMessagesByUrl(string queueUrl)
@@ -111,13 +118,9 @@ protected async Task Run()
111118
var messages = await ReceiveMessagesByUrl(queueUrl).ConfigureAwait(false);
112119
foreach (var message in messages)
113120
{
114-
// ToDo: Check if SNS message and if so unwrap envelope and use inner body and attributes
121+
Logger.LogDebug("Received message on Queue: {Queue}, MessageId: {MessageId}, Payload: {MessagePayload}", Path, message.MessageId, message.Body);
115122

116-
var messageHeaders = message
117-
.MessageAttributes
118-
.ToDictionary(x => x.Key, x => HeaderSerializer.Deserialize(x.Key, x.Value));
119-
120-
var messagePayload = message.Body;
123+
GetPayloadAndHeadersFromMessage(message, out var messagePayload, out var messageHeaders);
121124

122125
var r = await MessageProcessor.ProcessMessage(new(message, messagePayload), messageHeaders, cancellationToken: CancellationToken).ConfigureAwait(false);
123126
if (r.Exception != null)
@@ -146,20 +149,23 @@ protected async Task Run()
146149
}
147150
}
148151
}
149-
}
150152

151-
internal class SnsEnvelope
152-
{
153-
public string Type { get; set; }
154-
public string MessageId { get; set; }
155-
public string TopicArn { get; set; }
156-
public string Message { get; set; }
157-
public string Timestamp { get; set; }
158-
public Dictionary<string, SnsMessageAttribute> MessageAttributes { get; set; }
159-
}
153+
private void GetPayloadAndHeadersFromMessage(Message message, out string messagePayload, out Dictionary<string, object> messageHeaders)
154+
{
155+
if (_isSubscribedToTopic)
156+
{
157+
// Note: Messages from SNS topics are wrapped in an envelope like SnsEnvelope type. We need to get the actual message and headers from it.
158+
var snsEnvelope = (SnsEnvelope)MessageSerializer.Deserialize(typeof(SnsEnvelope), message.Body);
160159

161-
internal class SnsMessageAttribute
162-
{
163-
public string Type { get; set; }
164-
public string Value { get; set; }
165-
}
160+
messagePayload = snsEnvelope.Message ?? throw new ConsumerMessageBusException("Message of the SNS Envelope was null");
161+
messageHeaders = (snsEnvelope.MessageAttributes ?? throw new ConsumerMessageBusException("Message of the SNS Envelope was null"))
162+
.ToDictionary(x => x.Key, x => HeaderSerializer.Deserialize(x.Key, new MessageAttributeValue { DataType = x.Value.Type, StringValue = x.Value.Value }));
163+
}
164+
else
165+
{
166+
messagePayload = message.Body;
167+
messageHeaders = message.MessageAttributes
168+
.ToDictionary(x => x.Key, x => HeaderSerializer.Deserialize(x.Key, x.Value));
169+
}
170+
}
171+
}

src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsQueueConsumer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ internal class SqsQueueConsumer(
55
string path,
66
ISqsClientProvider clientProvider,
77
IMessageProcessor<SqsTransportMessageWithPayload> messageProcessor,
8+
IMessageSerializer<string> messageSerializer,
89
IEnumerable<AbstractConsumerSettings> consumerSettings)
910
: SqsBaseConsumer(messageBus,
1011
clientProvider,
1112
path,
1213
messageProcessor,
14+
messageSerializer,
1315
consumerSettings,
1416
messageBus.LoggerFactory.CreateLogger<SqsQueueConsumer>())
1517
{

src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ protected override async Task CreateConsumers()
4141
{
4242
await base.CreateConsumers();
4343

44-
void AddConsumerFrom(string path, PathKind pathKind, IMessageProcessor<SqsTransportMessageWithPayload> messageProcessor, IEnumerable<AbstractConsumerSettings> consumerSettings)
44+
void AddConsumerFrom(string path, PathKind pathKind, IMessageProcessor<SqsTransportMessageWithPayload> messageProcessor, IEnumerable<AbstractConsumerSettings> consumerSettings, IMessageSerializer<string> messageSerializer)
4545
{
4646
if (pathKind == PathKind.Queue)
4747
{
4848
_logger.LogInformation("Creating consumer for Queue: {Queue}", path);
49-
var consumer = new SqsQueueConsumer(this, path, _clientProviderSqs, messageProcessor, consumerSettings);
49+
var consumer = new SqsQueueConsumer(this, path, _clientProviderSqs, messageProcessor, messageSerializer, consumerSettings);
5050
AddConsumer(consumer);
5151
}
5252
}
@@ -69,7 +69,7 @@ void AddConsumerFrom(string path, PathKind pathKind, IMessageProcessor<SqsTransp
6969
consumerContextInitializer: InitConsumerContext,
7070
consumerErrorHandlerOpenGenericType: typeof(ISqsConsumerErrorHandler<>));
7171

72-
AddConsumerFrom(path, pathKind, messageProcessor, consumerSettings);
72+
AddConsumerFrom(path, pathKind, messageProcessor, consumerSettings, messageSerializer);
7373
}
7474

7575
if (Settings.RequestResponse != null)
@@ -90,7 +90,8 @@ void AddConsumerFrom(string path, PathKind pathKind, IMessageProcessor<SqsTransp
9090
path,
9191
Settings.RequestResponse.PathKind,
9292
messageProcessor,
93-
[Settings.RequestResponse]);
93+
[Settings.RequestResponse],
94+
messageSerializer);
9495
}
9596
}
9697

0 commit comments

Comments
 (0)