Skip to content

Commit f6a2a45

Browse files
committed
Add unit tests
1 parent a071b35 commit f6a2a45

File tree

11 files changed

+479
-25
lines changed

11 files changed

+479
-25
lines changed

Aspire.sln

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Consumer", "samples\KafkaBa
170170
EndProject
171171
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Producer", "samples\KafkaBasic\Producer\Producer.csproj", "{8463BB20-C998-4318-8265-4D9601DA7D1E}"
172172
EndProject
173+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Kafka.Producer.Tests", "tests\Aspire.Kafka.Producer.Tests\Aspire.Kafka.Producer.Tests.csproj", "{7F0BAF43-46D7-42B2-B9E8-3716167FDC78}"
174+
EndProject
175+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Kafka.Consumer.Tests", "tests\Aspire.Kafka.Consumer.Tests\Aspire.Kafka.Consumer.Tests.csproj", "{1452CE38-284F-4845-814F-5BC828F2EE25}"
176+
EndProject
173177
Global
174178
GlobalSection(SolutionConfigurationPlatforms) = preSolution
175179
Debug|Any CPU = Debug|Any CPU
@@ -456,6 +460,14 @@ Global
456460
{8463BB20-C998-4318-8265-4D9601DA7D1E}.Debug|Any CPU.Build.0 = Debug|Any CPU
457461
{8463BB20-C998-4318-8265-4D9601DA7D1E}.Release|Any CPU.ActiveCfg = Release|Any CPU
458462
{8463BB20-C998-4318-8265-4D9601DA7D1E}.Release|Any CPU.Build.0 = Release|Any CPU
463+
{7F0BAF43-46D7-42B2-B9E8-3716167FDC78}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
464+
{7F0BAF43-46D7-42B2-B9E8-3716167FDC78}.Debug|Any CPU.Build.0 = Debug|Any CPU
465+
{7F0BAF43-46D7-42B2-B9E8-3716167FDC78}.Release|Any CPU.ActiveCfg = Release|Any CPU
466+
{7F0BAF43-46D7-42B2-B9E8-3716167FDC78}.Release|Any CPU.Build.0 = Release|Any CPU
467+
{1452CE38-284F-4845-814F-5BC828F2EE25}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
468+
{1452CE38-284F-4845-814F-5BC828F2EE25}.Debug|Any CPU.Build.0 = Debug|Any CPU
469+
{1452CE38-284F-4845-814F-5BC828F2EE25}.Release|Any CPU.ActiveCfg = Release|Any CPU
470+
{1452CE38-284F-4845-814F-5BC828F2EE25}.Release|Any CPU.Build.0 = Release|Any CPU
459471
EndGlobalSection
460472
GlobalSection(SolutionProperties) = preSolution
461473
HideSolutionNode = FALSE
@@ -536,6 +548,8 @@ Global
536548
{A6DAFDA3-4AD5-4F06-8582-B9D0928DD933} = {C6A650C8-256E-49DF-B7B7-C001255A3688}
537549
{AF581BA0-60F0-4DC0-956A-3C211DF3BC3C} = {C6A650C8-256E-49DF-B7B7-C001255A3688}
538550
{8463BB20-C998-4318-8265-4D9601DA7D1E} = {C6A650C8-256E-49DF-B7B7-C001255A3688}
551+
{7F0BAF43-46D7-42B2-B9E8-3716167FDC78} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
552+
{1452CE38-284F-4845-814F-5BC828F2EE25} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
539553
EndGlobalSection
540554
GlobalSection(ExtensibilityGlobals) = postSolution
541555
SolutionGuid = {6DCEDFEC-988E-4CB3-B45B-191EB5086E0C}

src/Components/Aspire.Kafka.Consumer/AspireKafkaConsumerExtensions.cs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ private static void AddKafkaConsumer<TKey, TValue>(
4646
Action<ConsumerBuilder<TKey, TValue>>? configureConsumerBuilder,
4747
Action<ConsumerConfig>? configureConsumerConfig,
4848
string connectionName,
49-
object? serviceKey)
49+
string? serviceKey)
5050
{
5151
ArgumentNullException.ThrowIfNull(builder);
5252

@@ -62,26 +62,39 @@ private static void AddKafkaConsumer<TKey, TValue>(
6262

6363
configureConsumerConfig?.Invoke(config);
6464

65-
ConsumerBuilder<TKey, TValue> CreateConsumerBuilder(IServiceProvider sp)
66-
{
67-
// Create and configure the consumer builder
68-
var consumerBuilder = new ConsumerBuilder<TKey, TValue>(config);
69-
configureConsumerBuilder?.Invoke(consumerBuilder);
70-
71-
return consumerBuilder;
72-
}
73-
7465
if (serviceKey is null)
7566
{
76-
builder.Services.AddSingleton<ConsumerBuilder<TKey, TValue>>(CreateConsumerBuilder);
67+
builder.Services.AddSingleton<ConsumerConfig>(config);
68+
if (configureConsumerBuilder is not null)
69+
{
70+
builder.Services.AddSingleton<Action<ConsumerBuilder<TKey, TValue>>>(configureConsumerBuilder);
71+
}
72+
builder.Services.AddSingleton<ConsumerBuilder<TKey, TValue>>(sp => CreateConsumerBuilder<TKey, TValue>(sp, null));
7773
builder.Services.AddSingleton<IConsumer<TKey, TValue>>(sp => CreateConsumer(sp.GetRequiredService<ConsumerBuilder<TKey, TValue>>()));
7874
}
7975
else
8076
{
81-
builder.Services.AddKeyedSingleton<ConsumerBuilder<TKey, TValue>>(serviceKey, (sp, _) => CreateConsumerBuilder(sp));
77+
builder.Services.AddKeyedSingleton<ConsumerConfig>(serviceKey, config);
78+
if (configureConsumerBuilder is not null)
79+
{
80+
builder.Services.AddKeyedSingleton<Action<ConsumerBuilder<TKey, TValue>>>(serviceKey, configureConsumerBuilder);
81+
}
82+
builder.Services.AddKeyedSingleton<ConsumerBuilder<TKey, TValue>>(serviceKey, (sp, key) => CreateConsumerBuilder<TKey, TValue>(sp, key as string));
8283
builder.Services.AddKeyedSingleton<IConsumer<TKey, TValue>>(serviceKey, (sp, key) => CreateConsumer(sp.GetRequiredKeyedService<ConsumerBuilder<TKey, TValue>>(key)));
8384
}
8485
}
8586

87+
private static ConsumerBuilder<TKey, TValue> CreateConsumerBuilder<TKey, TValue>(IServiceProvider sp, string? key)
88+
{
89+
// Create and configure the consumer builder
90+
(ConsumerConfig config, Action<ConsumerBuilder<TKey, TValue>>? configureConsumerBuilder) = key is null
91+
? (sp.GetRequiredService<ConsumerConfig>(), sp.GetService<Action<ConsumerBuilder<TKey, TValue>>>())
92+
: (sp.GetRequiredKeyedService<ConsumerConfig>(key), sp.GetKeyedService<Action<ConsumerBuilder<TKey, TValue>>>(key));
93+
var consumerBuilder = new ConsumerBuilder<TKey, TValue>(config);
94+
configureConsumerBuilder?.Invoke(consumerBuilder);
95+
96+
return consumerBuilder;
97+
}
98+
8699
private static IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(ConsumerBuilder<TKey, TValue> consumerBuilder) => consumerBuilder.Build();
87100
}

src/Components/Aspire.Kafka.Producer/AspireKafkaProducerExtensions.cs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace Microsoft.Extensions.Hosting;
1010
/// <summary>
1111
/// Extension methods for connecting to a Kafka broker.
1212
/// </summary>
13-
public static partial class AspireKafkaProducerExtensions
13+
public static class AspireKafkaProducerExtensions
1414
{
1515
private const string DefaultConfigSectionName = "Aspire:Kafka:Producer";
1616

@@ -46,7 +46,7 @@ private static void AddKafkaProducer<TKey, TValue>(
4646
Action<ProducerBuilder<TKey, TValue>>? configureProducerBuilder,
4747
Action<ProducerConfig>? configureProducerConfig,
4848
string connectionName,
49-
object? serviceKey)
49+
string? serviceKey)
5050
{
5151
ArgumentNullException.ThrowIfNull(builder);
5252

@@ -62,26 +62,39 @@ private static void AddKafkaProducer<TKey, TValue>(
6262

6363
configureProducerConfig?.Invoke(config);
6464

65-
ProducerBuilder<TKey, TValue> CreateProducerBuilder(IServiceProvider sp)
66-
{
67-
// Create and configure the producer builder
68-
var producerBuilder = new ProducerBuilder<TKey, TValue>(config);
69-
configureProducerBuilder?.Invoke(producerBuilder);
70-
71-
return producerBuilder;
72-
}
73-
7465
if (serviceKey is null)
7566
{
76-
builder.Services.AddSingleton<ProducerBuilder<TKey, TValue>>(CreateProducerBuilder);
67+
builder.Services.AddSingleton<ProducerConfig>(config);
68+
if (configureProducerBuilder is not null)
69+
{
70+
builder.Services.AddSingleton<Action<ProducerBuilder<TKey, TValue>>>(configureProducerBuilder);
71+
}
72+
builder.Services.AddSingleton<ProducerBuilder<TKey, TValue>>(sp => CreateProducerBuilder<TKey, TValue>(sp, null));
7773
builder.Services.AddSingleton<IProducer<TKey, TValue>>(sp => CreateProducer(sp.GetRequiredService<ProducerBuilder<TKey, TValue>>()));
7874
}
7975
else
8076
{
81-
builder.Services.AddKeyedSingleton<ProducerBuilder<TKey, TValue>>(serviceKey, (sp, _) => CreateProducerBuilder(sp));
77+
builder.Services.AddKeyedSingleton<ProducerConfig>(serviceKey, config);
78+
if (configureProducerBuilder is not null)
79+
{
80+
builder.Services.AddKeyedSingleton<Action<ProducerBuilder<TKey, TValue>>>(serviceKey, configureProducerBuilder);
81+
}
82+
builder.Services.AddKeyedSingleton<ProducerBuilder<TKey, TValue>>(serviceKey, (sp, key) => CreateProducerBuilder<TKey, TValue>(sp, key as string));
8283
builder.Services.AddKeyedSingleton<IProducer<TKey, TValue>>(serviceKey, (sp, key) => CreateProducer(sp.GetRequiredKeyedService<ProducerBuilder<TKey, TValue>>(key)));
8384
}
8485
}
8586

87+
private static ProducerBuilder<TKey, TValue> CreateProducerBuilder<TKey, TValue>(IServiceProvider sp, string? key)
88+
{
89+
// Create and configure the producer builder
90+
(ProducerConfig config, Action<ProducerBuilder<TKey, TValue>>? configureProducerBuilder) = key is null
91+
? (sp.GetRequiredService<ProducerConfig>(), sp.GetService<Action<ProducerBuilder<TKey, TValue>>>())
92+
: (sp.GetRequiredKeyedService<ProducerConfig>(key), sp.GetKeyedService<Action<ProducerBuilder<TKey, TValue>>>(key));
93+
var producerBuilder = new ProducerBuilder<TKey, TValue>(config);
94+
configureProducerBuilder?.Invoke(producerBuilder);
95+
96+
return producerBuilder;
97+
}
98+
8699
private static IProducer<TKey, TValue> CreateProducer<TKey, TValue>(ProducerBuilder<TKey, TValue> producerBuilder) => producerBuilder.Build();
87100
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>$(NetCurrent)</TargetFramework>
5+
</PropertyGroup>
6+
7+
<ItemGroup>
8+
<ProjectReference Include="..\..\src\Components\Aspire.Kafka.Consumer\Aspire.Kafka.Consumer.csproj" />
9+
<ProjectReference Include="..\Aspire.Components.Common.Tests\Aspire.Components.Common.Tests.csproj" />
10+
</ItemGroup>
11+
12+
</Project>
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Text;
5+
using Confluent.Kafka;
6+
using Microsoft.Extensions.Configuration;
7+
using Microsoft.Extensions.DependencyInjection;
8+
using Microsoft.Extensions.Hosting;
9+
using Xunit;
10+
11+
namespace Aspire.Kafka.Consumer.Tests;
12+
13+
public class AspireKafkaConsumerExtensionsTests
14+
{
15+
[ConditionalTheory]
16+
[InlineData(true)]
17+
[InlineData(false)]
18+
public void ReadsFromConnectionStringsCorrectly(bool useKeyed)
19+
{
20+
var builder = Host.CreateEmptyApplicationBuilder(null);
21+
builder.Configuration.AddInMemoryCollection([
22+
new KeyValuePair<string, string?>("ConnectionStrings:messaging", AspireKafkaConsumerHelpers.TestingEndpoint)
23+
]);
24+
25+
if (useKeyed)
26+
{
27+
builder.AddKeyedKafkaConsumer<string, string>("messaging");
28+
}
29+
else
30+
{
31+
builder.AddKafkaConsumer<string, string>("messaging");
32+
}
33+
34+
var host = builder.Build();
35+
var ConsumerConfig = useKeyed ?
36+
host.Services.GetRequiredKeyedService<ConsumerConfig>("messaging") :
37+
host.Services.GetRequiredService<ConsumerConfig>();
38+
39+
Assert.Equal(AspireKafkaConsumerHelpers.TestingEndpoint, ConsumerConfig.BootstrapServers);
40+
}
41+
42+
[ConditionalTheory]
43+
[InlineData(true)]
44+
[InlineData(false)]
45+
public void ConnectionStringCanBeSetInCode(bool useKeyed)
46+
{
47+
var builder = Host.CreateEmptyApplicationBuilder(null);
48+
builder.Configuration.AddInMemoryCollection([
49+
new KeyValuePair<string, string?>("ConnectionStrings:messaging", "unused")
50+
]);
51+
52+
static void SetConnectionString(ConsumerConfig config) => config.BootstrapServers = AspireKafkaConsumerHelpers.TestingEndpoint;
53+
if (useKeyed)
54+
{
55+
builder.AddKeyedKafkaConsumer<string, string>("messaging", configureConsumerConfig: SetConnectionString);
56+
}
57+
else
58+
{
59+
builder.AddKafkaConsumer<string, string>("messaging", configureConsumerConfig: SetConnectionString);
60+
}
61+
62+
var host = builder.Build();
63+
var config = useKeyed ?
64+
host.Services.GetRequiredKeyedService<ConsumerConfig>("messaging") :
65+
host.Services.GetRequiredService<ConsumerConfig>();
66+
67+
Assert.Equal(AspireKafkaConsumerHelpers.TestingEndpoint, config.BootstrapServers);
68+
}
69+
70+
[ConditionalTheory]
71+
[InlineData(true)]
72+
[InlineData(false)]
73+
public void ConnectionNameWinsOverConfigSection(bool useKeyed)
74+
{
75+
var builder = Host.CreateEmptyApplicationBuilder(null);
76+
77+
var key = useKeyed ? "redis" : null;
78+
builder.Configuration.AddInMemoryCollection([
79+
new KeyValuePair<string, string?>(ConformanceTests.CreateConfigKey("Aspire:Kafka:Consumer", key, "ConnectionString"), "unused"),
80+
new KeyValuePair<string, string?>("ConnectionStrings:messaging", AspireKafkaConsumerHelpers.TestingEndpoint)
81+
]);
82+
83+
if (useKeyed)
84+
{
85+
builder.AddKeyedKafkaConsumer<string, string>("messaging");
86+
}
87+
else
88+
{
89+
builder.AddKafkaConsumer<string, string>("messaging");
90+
}
91+
92+
var host = builder.Build();
93+
var config = useKeyed ?
94+
host.Services.GetRequiredKeyedService<ConsumerConfig>("messaging") :
95+
host.Services.GetRequiredService<ConsumerConfig>();
96+
97+
Assert.Equal(AspireKafkaConsumerHelpers.TestingEndpoint, config.BootstrapServers);
98+
}
99+
100+
[Fact]
101+
public void ConsumerConfigOptionsFromConfig()
102+
{
103+
static Stream CreateStreamFromString(string data) => new MemoryStream(Encoding.UTF8.GetBytes(data));
104+
105+
using var jsonStream = CreateStreamFromString("""
106+
{
107+
"Aspire": {
108+
"Kafka": {
109+
"Consumer": {
110+
"AutoOffsetReset": "Earliest",
111+
"SaslUsername": "user",
112+
"SaslPassword": "password",
113+
"SaslMechanism": "Plain",
114+
"SecurityProtocol": "Plaintext"
115+
}
116+
}
117+
}
118+
}
119+
""");
120+
121+
var builder = Host.CreateEmptyApplicationBuilder(null);
122+
123+
builder.Configuration.AddJsonStream(jsonStream);
124+
125+
builder.AddKafkaConsumer<string, string>("messaging");
126+
127+
var host = builder.Build();
128+
var config = (ConsumerConfig)host.Services.GetRequiredService<ConsumerConfig>();
129+
130+
Assert.Equal(AutoOffsetReset.Earliest, config.AutoOffsetReset);
131+
Assert.Equal("user", config.SaslUsername);
132+
Assert.Equal("password", config.SaslPassword);
133+
Assert.Equal(SaslMechanism.Plain, config.SaslMechanism);
134+
Assert.Equal(SecurityProtocol.Plaintext, config.SecurityProtocol);
135+
}
136+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
namespace Aspire.Kafka.Consumer.Tests;
5+
6+
internal sealed class AspireKafkaConsumerHelpers
7+
{
8+
public const string TestingEndpoint = "localhost:9092";
9+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using Aspire.Components.ConformanceTests;
5+
using Confluent.Kafka;
6+
using Microsoft.Extensions.Configuration;
7+
using Microsoft.Extensions.DependencyInjection;
8+
using Microsoft.Extensions.Hosting;
9+
10+
namespace Aspire.Kafka.Consumer.Tests;
11+
internal sealed class ConformanceTests : ConformanceTests<ConsumerConfig, ConsumerConfig>
12+
{
13+
protected override ServiceLifetime ServiceLifetime => throw new NotImplementedException();
14+
15+
protected override string ActivitySourceName => throw new NotImplementedException();
16+
17+
protected override string JsonSchemaPath => throw new NotImplementedException();
18+
19+
protected override string[] RequiredLogCategories => throw new NotImplementedException();
20+
21+
protected override void PopulateConfiguration(ConfigurationManager configuration, string? key = null)
22+
{
23+
throw new NotImplementedException();
24+
}
25+
26+
protected override void RegisterComponent(HostApplicationBuilder builder, Action<ConsumerConfig>? configure = null, string? key = null)
27+
{
28+
throw new NotImplementedException();
29+
}
30+
31+
protected override void SetHealthCheck(ConsumerConfig options, bool enabled)
32+
{
33+
throw new NotImplementedException();
34+
}
35+
36+
protected override void SetMetrics(ConsumerConfig options, bool enabled)
37+
{
38+
throw new NotImplementedException();
39+
}
40+
41+
protected override void SetTracing(ConsumerConfig options, bool enabled)
42+
{
43+
throw new NotImplementedException();
44+
}
45+
46+
protected override void TriggerActivity(ConsumerConfig service)
47+
{
48+
throw new NotImplementedException();
49+
}
50+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>$(NetCurrent)</TargetFramework>
5+
</PropertyGroup>
6+
7+
<ItemGroup>
8+
<ProjectReference Include="..\..\src\Components\Aspire.Kafka.Producer\Aspire.Kafka.Producer.csproj" />
9+
<ProjectReference Include="..\Aspire.Components.Common.Tests\Aspire.Components.Common.Tests.csproj" />
10+
</ItemGroup>
11+
12+
</Project>

0 commit comments

Comments
 (0)