Skip to content

Commit 30fba83

Browse files
committed
Add Aspire.Confluent.Kafka component
apply pr suggestions apply pr suggestions apply pr suggestions Sort ConfigurationSchema.json properties Update ConfigurationSchema.json using ConfigSchemaGenerator apply pr suggestions apply pr suggestions apply pr suggesstions apply pr suggesstions apply pr suggestions apply pr suggestions drop kafka sample from this repo apply pr suggestions
1 parent b44ff4b commit 30fba83

36 files changed

+3170
-3
lines changed

Aspire.sln

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Oracle.EntityFramewo
176176
EndProject
177177
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Oracle.EntityFrameworkCore.Tests", "tests\Aspire.Oracle.EntityFrameworkCore.Tests\Aspire.Oracle.EntityFrameworkCore.Tests.csproj", "{A331C123-35A5-4E81-9999-354159821374}"
178178
EndProject
179+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Confluent.Kafka", "src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj", "{174E0507-3BB0-4CDC-829E-9CA75DA66473}"
180+
EndProject
181+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Confluent.Kafka.Tests", "tests\Aspire.Confluent.Kafka.Tests\Aspire.Confluent.Kafka.Tests.csproj", "{A8CB331A-1247-41D9-8118-538E5A2CC9DF}"
182+
EndProject
179183
Global
180184
GlobalSection(SolutionConfigurationPlatforms) = preSolution
181185
Debug|Any CPU = Debug|Any CPU
@@ -470,6 +474,14 @@ Global
470474
{A331C123-35A5-4E81-9999-354159821374}.Debug|Any CPU.Build.0 = Debug|Any CPU
471475
{A331C123-35A5-4E81-9999-354159821374}.Release|Any CPU.ActiveCfg = Release|Any CPU
472476
{A331C123-35A5-4E81-9999-354159821374}.Release|Any CPU.Build.0 = Release|Any CPU
477+
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
478+
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Debug|Any CPU.Build.0 = Debug|Any CPU
479+
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Release|Any CPU.ActiveCfg = Release|Any CPU
480+
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Release|Any CPU.Build.0 = Release|Any CPU
481+
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
482+
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Debug|Any CPU.Build.0 = Debug|Any CPU
483+
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Release|Any CPU.ActiveCfg = Release|Any CPU
484+
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Release|Any CPU.Build.0 = Release|Any CPU
473485
EndGlobalSection
474486
GlobalSection(SolutionProperties) = preSolution
475487
HideSolutionNode = FALSE
@@ -551,6 +563,8 @@ Global
551563
{00FEA181-84C9-42A7-AC81-29A9F176A1A0} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
552564
{A778F29A-6C40-4C53-A793-F23F20679ADE} = {27381127-6C45-4B4C-8F18-41FF48DFE4B2}
553565
{A331C123-35A5-4E81-9999-354159821374} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
566+
{174E0507-3BB0-4CDC-829E-9CA75DA66473} = {27381127-6C45-4B4C-8F18-41FF48DFE4B2}
567+
{A8CB331A-1247-41D9-8118-538E5A2CC9DF} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
554568
EndGlobalSection
555569
GlobalSection(ExtensibilityGlobals) = postSolution
556570
SolutionGuid = {6DCEDFEC-988E-4CB3-B45B-191EB5086E0C}

Directory.Packages.props

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Storage.Blobs" Version="8.0.0" />
3838
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Storage.Queues" Version="8.0.0" />
3939
<PackageVersion Include="AspNetCore.HealthChecks.AzureServiceBus" Version="8.0.0" />
40+
<PackageVersion Include="AspNetCore.HealthChecks.Kafka" Version="8.0.0" />
4041
<PackageVersion Include="AspNetCore.HealthChecks.MongoDb" Version="8.0.0" />
4142
<PackageVersion Include="AspNetCore.HealthChecks.MySql" Version="8.0.0" />
4243
<PackageVersion Include="AspNetCore.HealthChecks.NpgSql" Version="8.0.0" />
@@ -64,6 +65,7 @@
6465
<PackageVersion Include="Microsoft.Extensions.Primitives" Version="$(MicrosoftExtensionsPrimitivesPackageVersion)" />
6566
<PackageVersion Include="Microsoft.Extensions.Http.Resilience" Version="$(MicrosoftExtensionsHttpResiliencePackageVersion)" />
6667
<!-- external dependencies -->
68+
<PackageVersion Include="Confluent.Kafka" Version="2.3.0" />
6769
<PackageVersion Include="Dapr.AspNetCore" Version="1.12.0" />
6870
<PackageVersion Include="DnsClient" Version="1.7.0" />
6971
<PackageVersion Include="Grpc.AspNetCore" Version="2.59.0" />
@@ -105,5 +107,7 @@
105107
<PackageVersion Include="Microsoft.DotNet.Build.Tasks.Workloads" Version="8.0.0-beta.23564.4" />
106108
<PackageVersion Include="Microsoft.Signed.Wix" Version="1.0.0-v3.14.0.5722" />
107109
<PackageVersion Include="Microsoft.DotNet.Build.Tasks.Installers" Version="8.0.0-beta.23564.4" />
110+
<!-- unit test dependencies -->
111+
<PackageVersion Include="Microsoft.Extensions.Diagnostics.Testing" Version="8.0.0" />
108112
</ItemGroup>
109113
</Project>
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using Aspire.Hosting.ApplicationModel;
5+
using Aspire.Hosting.Publishing;
6+
7+
namespace Aspire.Hosting;
8+
9+
public static class KafkaBuilderExtensions
10+
{
11+
private const int KafkaBrokerPort = 9092;
12+
/// <summary>
13+
/// Adds a Kafka broker container to the application.
14+
/// </summary>
15+
/// <param name="builder">The <see cref="IDistributedApplicationBuilder"/>.</param>
16+
/// <param name="name">The name of the resource. This name will be used as the connection string name when referenced in a dependency.</param>
17+
/// <param name="port">The host port of Kafka broker.</param>
18+
/// <returns>A reference to the <see cref="IResourceBuilder{KafkaContainerResource}"/></returns>
19+
public static IResourceBuilder<KafkaContainerResource> AddKafkaContainer(this IDistributedApplicationBuilder builder, string name, int? port = null)
20+
{
21+
var kafka = new KafkaContainerResource(name);
22+
return builder.AddResource(kafka)
23+
.WithEndpoint(hostPort: port, containerPort: KafkaBrokerPort)
24+
.WithAnnotation(new ContainerImageAnnotation { Image = "confluentinc/confluent-local", Tag = "latest" })
25+
.WithManifestPublishingCallback(context => WriteKafkaContainerToManifest(context, kafka))
26+
.WithEnvironment(context => ConfigureKafkaContainer(context, kafka));
27+
28+
static void WriteKafkaContainerToManifest(ManifestPublishingContext context, KafkaContainerResource resource)
29+
{
30+
context.WriteContainer(resource);
31+
context.Writer.WriteString("connectionString", $"{{{resource.Name}.bindings.tcp.host}}:{{{resource.Name}.bindings.tcp.port}}");
32+
}
33+
}
34+
35+
/// <summary>
36+
/// Adds a Kafka resource to the application. A container is used for local development.
37+
/// </summary>
38+
/// <param name="builder">The <see cref="IDistributedApplicationBuilder"/>.</param>
39+
/// <param name="name">The name of the resource. This name will be used as the connection string name when referenced in a dependency</param>
40+
/// <returns>A reference to the <see cref="IResourceBuilder{KafkaServerResource}"/>.</returns>
41+
public static IResourceBuilder<KafkaServerResource> AddKafka(this IDistributedApplicationBuilder builder, string name)
42+
{
43+
var kafka = new KafkaServerResource(name);
44+
return builder.AddResource(kafka)
45+
.WithEndpoint(containerPort: KafkaBrokerPort)
46+
.WithAnnotation(new ContainerImageAnnotation{ Image = "confluentinc/confluent-local", Tag = "latest" })
47+
.WithManifestPublishingCallback(WriteKafkaServerToManifest)
48+
.WithEnvironment(context => ConfigureKafkaContainer(context, kafka));
49+
50+
static void WriteKafkaServerToManifest(ManifestPublishingContext context)
51+
{
52+
context.Writer.WriteString("type", "kafka.server.v0");
53+
}
54+
}
55+
56+
private static void ConfigureKafkaContainer(EnvironmentCallbackContext context, IResource resource)
57+
{
58+
// confluentinc/confluent-local is a docker image that contains a Kafka broker started with KRaft to avoid pulling a separate image for ZooKeeper.
59+
// See https://github.com/confluentinc/kafka-images/blob/master/local/README.md.
60+
// When not explicitly set default configuration is applied.
61+
// See https://github.com/confluentinc/kafka-images/blob/master/local/include/etc/confluent/docker/configureDefaults for more details.
62+
63+
var hostPort = context.PublisherName == "manifest"
64+
? KafkaBrokerPort
65+
: GetResourcePort(resource);
66+
context.EnvironmentVariables.Add("KAFKA_ADVERTISED_LISTENERS",
67+
$"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{hostPort}");
68+
69+
static int GetResourcePort(IResource resource)
70+
{
71+
if (!resource.TryGetAllocatedEndPoints(out var allocatedEndpoints))
72+
{
73+
throw new DistributedApplicationException(
74+
$"Kafka resource \"{resource.Name}\" does not have endpoint annotation.");
75+
}
76+
77+
return allocatedEndpoints.Single().Port;
78+
}
79+
}
80+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using Aspire.Hosting.ApplicationModel;
5+
6+
namespace Aspire.Hosting;
7+
8+
/// <summary>
9+
/// A resource that represents a Kafka broker container.
10+
/// </summary>
11+
/// <param name="name"></param>
12+
public class KafkaContainerResource(string name) : ContainerResource(name), IResourceWithConnectionString, IResourceWithEnvironment
13+
{
14+
/// <summary>
15+
/// Gets the connection string for Kafka broker.
16+
/// </summary>
17+
/// <returns>A connection string for the Kafka in the form "host:port" to be passed as <see href="https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ClientConfig.html#Confluent_Kafka_ClientConfig_BootstrapServers">BootstrapServers</see>.</returns>
18+
public string? GetConnectionString()
19+
{
20+
if (!this.TryGetAllocatedEndPoints(out var allocatedEndpoints))
21+
{
22+
throw new DistributedApplicationException($"Kafka resource \"{Name}\" does not have endpoint annotation.");
23+
}
24+
25+
return allocatedEndpoints.SingleOrDefault()?.EndPointString;
26+
}
27+
28+
internal int GetPort()
29+
{
30+
if (!this.TryGetAllocatedEndPoints(out var allocatedEndpoints))
31+
{
32+
throw new DistributedApplicationException($"Kafka resource \"{Name}\" does not have endpoint annotation.");
33+
}
34+
35+
return allocatedEndpoints.Single().Port;
36+
}
37+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
namespace Aspire.Hosting.ApplicationModel;
5+
6+
/// <summary>
7+
/// A resource that represents a Kafka broker.
8+
/// </summary>
9+
/// <param name="name">The name of the resource.</param>
10+
public class KafkaServerResource(string name) : Resource(name), IResourceWithConnectionString, IResourceWithEnvironment
11+
{
12+
/// <summary>
13+
/// Gets the connection string for Kafka broker.
14+
/// </summary>
15+
/// <returns>A connection string for the Kafka in the form "host:port" to be passed as <see href="https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ClientConfig.html#Confluent_Kafka_ClientConfig_BootstrapServers">BootstrapServers</see>.</returns>
16+
public string? GetConnectionString()
17+
{
18+
if (!this.TryGetAllocatedEndPoints(out var allocatedEndpoints))
19+
{
20+
throw new DistributedApplicationException($"Kafka resource \"{Name}\" does not have endpoint annotation.");
21+
}
22+
23+
return allocatedEndpoints.SingleOrDefault()?.EndPointString;
24+
}
25+
26+
internal int GetPort()
27+
{
28+
if (!this.TryGetAllocatedEndPoints(out var allocatedEndpoints))
29+
{
30+
throw new DistributedApplicationException($"Kafka resource \"{Name}\" does not have endpoint annotation.");
31+
}
32+
33+
return allocatedEndpoints.Single().Port;
34+
}
35+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>$(NetCurrent)</TargetFramework>
5+
<IsPackable>true</IsPackable>
6+
<PackageTags>$(ComponentCommonPackageTags) kafka</PackageTags>
7+
<Description>Confluent.Kafka based Kafka generic consumer and producer that integrates with Aspire, including healthchecks and metrics.</Description>
8+
<NoWarn>$(NoWarn);SYSLIB1100</NoWarn>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<Compile Include="..\Common\HealthChecksExtensions.cs" Link="HealthChecksExtensions.cs" />
13+
</ItemGroup>
14+
15+
<ItemGroup>
16+
<PackageReference Include="AspNetCore.HealthChecks.Kafka" />
17+
<PackageReference Include="Confluent.Kafka" />
18+
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
19+
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
20+
<PackageReference Include="OpenTelemetry.Extensions.Hosting" />
21+
</ItemGroup>
22+
23+
</Project>

0 commit comments

Comments
 (0)