Skip to content

Commit 285904f

Browse files
author
Jon Abaunza
committed
feat: provide support for Kafka message keys different than "string?"
Implements #313 Signed-off-by: Jon Abaunza <[email protected]>
1 parent b4632f4 commit 285904f

File tree

7 files changed

+259
-39
lines changed

7 files changed

+259
-39
lines changed

src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
44
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0</TargetFrameworks>
55
<Description>Kafka extensions for CloudNative.CloudEvents</Description>
66
<PackageTags>cncf;cloudnative;cloudevents;events;kafka</PackageTags>
7-
<LangVersion>8.0</LangVersion>
7+
<LangVersion>9.0</LangVersion>
88
<Nullable>enable</Nullable>
99
</PropertyGroup>
1010

src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
// Copyright (c) Cloud Native Foundation.
1+
// Copyright (c) Cloud Native Foundation.
22
// Licensed under the Apache 2.0 license.
33
// See LICENSE file in the project root for full license information.
44

55
using CloudNative.CloudEvents.Core;
66
using CloudNative.CloudEvents.Extensions;
7+
using CloudNative.CloudEvents.Kafka.PartitionKeyAdapters;
78
using Confluent.Kafka;
89
using System;
910
using System.Collections.Generic;
@@ -32,7 +33,7 @@ public static class KafkaExtensions
3233
/// </remarks>
3334
/// <param name="message">The message to check for the presence of a CloudEvent. Must not be null.</param>
3435
/// <returns>true, if the request is a CloudEvent</returns>
35-
public static bool IsCloudEvent(this Message<string?, byte[]> message) =>
36+
public static bool IsCloudEvent<TKey>(this Message<TKey, byte[]> message) =>
3637
GetHeaderValue(message, SpecVersionKafkaHeader) is object ||
3738
MimeUtilities.IsCloudEventsContentType(GetHeaderValue(message, KafkaContentTypeAttributeName));
3839

@@ -56,6 +57,21 @@ public static CloudEvent ToCloudEvent(this Message<string?, byte[]> message,
5657
/// <returns>A reference to a validated CloudEvent instance.</returns>
5758
public static CloudEvent ToCloudEvent(this Message<string?, byte[]> message,
5859
CloudEventFormatter formatter, IEnumerable<CloudEventAttribute>? extensionAttributes)
60+
{
61+
return ToCloudEvent(message, formatter, extensionAttributes, new StringPartitionKeyAdapter());
62+
}
63+
64+
/// <summary>
65+
/// Converts this Kafka message into a CloudEvent object.
66+
/// </summary>
67+
/// <param name="message">The Kafka message to convert. Must not be null.</param>
68+
/// <param name="formatter">The event formatter to use to parse the CloudEvent. Must not be null.</param>
69+
/// <param name="extensionAttributes">The extension attributes to use when parsing the CloudEvent. May be null.</param>
70+
/// <param name="partitionKeyAdapter">The PartitionKey Adapter responsible for determining wether to set the partitionKey attribute and its value.</param>
71+
/// <typeparam name="TKey">The type of key of the Kafka message.</typeparam>
72+
/// <returns>A reference to a validated CloudEvent instance.</returns>
73+
public static CloudEvent ToCloudEvent<TKey>(this Message<TKey, byte[]> message,
74+
CloudEventFormatter formatter, IEnumerable<CloudEventAttribute>? extensionAttributes, IPartitionKeyAdapter<TKey> partitionKeyAdapter)
5975
{
6076
Validation.CheckNotNull(message, nameof(message));
6177
Validation.CheckNotNull(formatter, nameof(formatter));
@@ -109,16 +125,11 @@ public static CloudEvent ToCloudEvent(this Message<string?, byte[]> message,
109125
formatter.DecodeBinaryModeEventData(message.Value, cloudEvent);
110126
}
111127

112-
InitPartitioningKey(message, cloudEvent);
113-
return Validation.CheckCloudEventArgument(cloudEvent, nameof(message));
114-
}
115-
116-
private static void InitPartitioningKey(Message<string?, byte[]> message, CloudEvent cloudEvent)
117-
{
118-
if (!string.IsNullOrEmpty(message.Key))
128+
if (partitionKeyAdapter.ConvertKeyToPartitionKeyAttributeValue(message.Key, out var partitionKeyAttributeValue))
119129
{
120-
cloudEvent[Partitioning.PartitionKeyAttribute] = message.Key;
130+
cloudEvent[Partitioning.PartitionKeyAttribute] = partitionKeyAttributeValue;
121131
}
132+
return Validation.CheckCloudEventArgument(cloudEvent, nameof(message));
122133
}
123134

124135
/// <summary>
@@ -136,12 +147,22 @@ private static void InitPartitioningKey(Message<string?, byte[]> message, CloudE
136147
/// <param name="contentMode">Content mode. Structured or binary.</param>
137148
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
138149
public static Message<string?, byte[]> ToKafkaMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
150+
=> ToKafkaMessage(cloudEvent, contentMode, formatter, new StringPartitionKeyAdapter());
151+
152+
/// <summary>
153+
/// Converts a CloudEvent to a Kafka message.
154+
/// </summary>
155+
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
156+
/// <param name="contentMode">Content mode. Structured or binary.</param>
157+
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
158+
/// <param name="partitionKeyAdapter">The partition key adapter responsible for transforming the cloud event partitioning key into the desired Kafka key type.</param>
159+
/// <typeparam name="TKey">The Kafka Key type to be used </typeparam>
160+
public static Message<TKey, byte[]> ToKafkaMessage<TKey>(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter, IPartitionKeyAdapter<TKey> partitionKeyAdapter)
139161
{
140162
Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
141163
Validation.CheckNotNull(formatter, nameof(formatter));
142164

143165
var headers = MapHeaders(cloudEvent);
144-
string? key = (string?) cloudEvent[Partitioning.PartitionKeyAttribute];
145166
byte[] value;
146167
string? contentTypeHeaderValue;
147168

@@ -163,12 +184,17 @@ private static void InitPartitioningKey(Message<string?, byte[]> message, CloudE
163184
{
164185
headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(contentTypeHeaderValue));
165186
}
166-
return new Message<string?, byte[]>
187+
var message = new Message<TKey, byte[]>
167188
{
168189
Headers = headers,
169-
Value = value,
170-
Key = key
190+
Value = value
171191
};
192+
if (partitionKeyAdapter.ConvertPartitionKeyAttributeValueToKey((string?)cloudEvent[Partitioning.PartitionKeyAttribute], out var keyValue)
193+
&& keyValue != null)
194+
{
195+
message.Key = keyValue;
196+
}
197+
return message;
172198
}
173199

174200
private static Headers MapHeaders(CloudEvent cloudEvent)
@@ -191,4 +217,4 @@ private static Headers MapHeaders(CloudEvent cloudEvent)
191217
return headers;
192218
}
193219
}
194-
}
220+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using System;
2+
3+
namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters
4+
{
5+
/// <summary>
6+
/// Partion Key Adapter that converts to and from Guids in binary representation.
7+
/// </summary>
8+
public class BinaryGuidPartitionKeyAdapter : IPartitionKeyAdapter<byte[]?>
9+
{
10+
/// <inheritdoc/>
11+
public bool ConvertKeyToPartitionKeyAttributeValue(byte[]? keyValue, out string? attributeValue)
12+
{
13+
if (keyValue == null)
14+
{
15+
attributeValue = null;
16+
return false;
17+
}
18+
19+
attributeValue = new Guid(keyValue).ToString();
20+
return true;
21+
}
22+
23+
/// <inheritdoc/>
24+
public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out byte[]? keyValue)
25+
{
26+
if (string.IsNullOrEmpty(attributeValue))
27+
{
28+
keyValue = default;
29+
return false;
30+
}
31+
32+
keyValue = Guid.Parse(attributeValue).ToByteArray();
33+
return true;
34+
}
35+
}
36+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters
2+
{
3+
/// <summary>
4+
/// Defines the methods of the adapters responsible for transforming from cloud event
5+
/// PartitionKey Attribute to Kafka Message Key.
6+
/// </summary>
7+
/// <typeparam name="TKey"></typeparam>
8+
public interface IPartitionKeyAdapter<TKey>
9+
{
10+
/// <summary>
11+
/// Converts a Message Key to PartionKey Attribute Value.
12+
/// </summary>
13+
/// <param name="keyValue">The key value to transform.</param>
14+
/// <param name="attributeValue">The transformed attribute value (output).</param>
15+
/// <returns>Whether the attribute should be set.</returns>
16+
bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue);
17+
18+
/// <summary>
19+
/// Converts a PartitionKey Attribute value to a Message Key.
20+
/// </summary>
21+
/// <param name="attributeValue">The attribute value to transform.</param>
22+
/// <param name="keyValue">The transformed key value (output)</param>
23+
/// <returns>Whether the key should be set.</returns>
24+
bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue);
25+
}
26+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters
2+
{
3+
/// <summary>
4+
/// Partion Key Adapter that skips handling the key.
5+
/// </summary>
6+
/// <typeparam name="TKey">The type of Kafka Message Key</typeparam>
7+
public class NullPartitionKeyAdapter<TKey> : IPartitionKeyAdapter<TKey>
8+
{
9+
/// <inheritdoc/>
10+
public bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue)
11+
{
12+
attributeValue = null;
13+
return false;
14+
}
15+
16+
/// <inheritdoc/>
17+
public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue)
18+
{
19+
keyValue = default;
20+
return false;
21+
}
22+
}
23+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters
2+
{
3+
/// <summary>
4+
/// Partion Key Adapter that skips handling the key.
5+
/// </summary>
6+
public class StringPartitionKeyAdapter : IPartitionKeyAdapter<string?>
7+
{
8+
/// <inheritdoc/>
9+
public bool ConvertKeyToPartitionKeyAttributeValue(string? keyValue, out string? attributeValue)
10+
{
11+
attributeValue = keyValue;
12+
return true;
13+
}
14+
15+
/// <inheritdoc/>
16+
public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out string? keyValue)
17+
{
18+
keyValue = attributeValue;
19+
return true;
20+
}
21+
}
22+
}

test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs

Lines changed: 109 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,9 @@ public void IsCloudEvent(string headerName, string headerValue, bool expectedRes
3737
public void IsCloudEvent_NoHeaders() =>
3838
Assert.False(new Message<string?, byte[]>().IsCloudEvent());
3939

40-
[Fact]
41-
public void KafkaStructuredMessageTest()
40+
private static CloudEvent CreateTestCloudEvent()
4241
{
43-
// Kafka doesn't provide any way to get to the message transport level to do the test properly
44-
// and it doesn't have an embedded version of a server for .Net so the lowest we can get is
45-
// the `Message<T, K>`
46-
47-
var jsonEventFormatter = new JsonEventFormatter();
48-
49-
var cloudEvent = new CloudEvent
42+
return new CloudEvent
5043
{
5144
Type = "com.github.pull.create",
5245
Source = new Uri("https://github.com/cloudevents/spec/pull"),
@@ -55,21 +48,12 @@ public void KafkaStructuredMessageTest()
5548
Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero),
5649
DataContentType = MediaTypeNames.Text.Xml,
5750
Data = "<much wow=\"xml\"/>",
58-
["comexampleextension1"] = "value"
51+
["comexampleextension1"] = "value",
5952
};
53+
}
6054

61-
var message = cloudEvent.ToKafkaMessage(ContentMode.Structured, new JsonEventFormatter());
62-
63-
Assert.True(message.IsCloudEvent());
64-
65-
// Using serialization to create fully independent copy thus simulating message transport.
66-
// The real transport will work in a similar way.
67-
var serialized = JsonConvert.SerializeObject(message, new HeaderConverter());
68-
var messageCopy = JsonConvert.DeserializeObject<Message<string?, byte[]>>(serialized, new HeadersConverter(), new HeaderConverter())!;
69-
70-
Assert.True(messageCopy.IsCloudEvent());
71-
var receivedCloudEvent = messageCopy.ToCloudEvent(jsonEventFormatter);
72-
55+
private static void VerifyTestCloudEvent(CloudEvent receivedCloudEvent)
56+
{
7357
Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
7458
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
7559
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull"), receivedCloudEvent.Source);
@@ -82,6 +66,109 @@ public void KafkaStructuredMessageTest()
8266
Assert.Equal("value", (string?) receivedCloudEvent["comexampleextension1"]);
8367
}
8468

69+
private static Message<TKey, byte[]>? SimulateMessageTransport<TKey>(Message<TKey, byte[]> message)
70+
{
71+
// Using serialization to create fully independent copy thus simulating message transport.
72+
// The real transport will work in a similar way.
73+
var serialized = JsonConvert.SerializeObject(message, new HeaderConverter());
74+
var messageCopy = JsonConvert.DeserializeObject<Message<TKey, byte[]>>(serialized, new HeadersConverter(), new HeaderConverter())!;
75+
return messageCopy;
76+
}
77+
78+
[Fact]
79+
public void KafkaStructuredMessageTest()
80+
{
81+
// Kafka doesn't provide any way to get to the message transport level to do the test properly
82+
// and it doesn't have an embedded version of a server for .Net so the lowest we can get is
83+
// the `Message<T, K>`
84+
85+
var jsonEventFormatter = new JsonEventFormatter();
86+
var key = "Test";
87+
var cloudEvent = CreateTestCloudEvent();
88+
cloudEvent[Partitioning.PartitionKeyAttribute] = key;
89+
90+
var message = cloudEvent.ToKafkaMessage(ContentMode.Structured, jsonEventFormatter);
91+
92+
Assert.True(message.IsCloudEvent());
93+
94+
var messageCopy = SimulateMessageTransport(message);
95+
96+
Assert.NotNull(messageCopy);
97+
Assert.Equal(key, messageCopy.Key);
98+
Assert.True(messageCopy.IsCloudEvent());
99+
var receivedCloudEvent = messageCopy.ToCloudEvent(jsonEventFormatter, null);
100+
101+
VerifyTestCloudEvent(receivedCloudEvent);
102+
}
103+
104+
[Fact]
105+
public void KafkaBinaryGuidKeyedStructuredMessageTest()
106+
{
107+
// In order to test the most extreme case of key management we will simulate
108+
// using Guid Keys serialized in their binary form in kafka that are converted
109+
// back to their string representation in the cloudEvent.
110+
var partitionKeyAdapter = new PartitionKeyAdapters.BinaryGuidPartitionKeyAdapter();
111+
var jsonEventFormatter = new JsonEventFormatter();
112+
var key = Guid.NewGuid();
113+
var cloudEvent = CreateTestCloudEvent();
114+
cloudEvent[Partitioning.PartitionKeyAttribute] = key.ToString();
115+
116+
var message = cloudEvent.ToKafkaMessage<byte[]?>(
117+
ContentMode.Structured,
118+
jsonEventFormatter,
119+
partitionKeyAdapter);
120+
121+
Assert.True(message.IsCloudEvent());
122+
123+
var messageCopy = SimulateMessageTransport(message);
124+
125+
Assert.NotNull(messageCopy);
126+
Assert.True(messageCopy.IsCloudEvent());
127+
128+
var receivedCloudEvent = messageCopy.ToCloudEvent<byte[]?>(
129+
jsonEventFormatter,
130+
null,
131+
partitionKeyAdapter);
132+
133+
Assert.NotNull(message.Key);
134+
// The key should be the original Guid in the binary representation.
135+
Assert.Equal(key, new Guid(messageCopy.Key!));
136+
VerifyTestCloudEvent(receivedCloudEvent);
137+
}
138+
139+
[Fact]
140+
public void KafkaNullKeyedStructuredMessageTest()
141+
{
142+
// It will test the serialization using Confluent's Confluent.Kafka.Null type for the key.
143+
// As the default behavior without adapter is to skip the key it will work properly.
144+
var partitionKeyAdapter = new PartitionKeyAdapters.NullPartitionKeyAdapter<Confluent.Kafka.Null>();
145+
var jsonEventFormatter = new JsonEventFormatter();
146+
var cloudEvent = CreateTestCloudEvent();
147+
// Even if the key is established in the cloud event it won't flow.
148+
cloudEvent[Partitioning.PartitionKeyAttribute] = "Test";
149+
150+
var message = cloudEvent.ToKafkaMessage<Confluent.Kafka.Null>(
151+
ContentMode.Structured,
152+
jsonEventFormatter,
153+
partitionKeyAdapter);
154+
155+
Assert.True(message.IsCloudEvent());
156+
157+
var messageCopy = SimulateMessageTransport(message);
158+
159+
Assert.NotNull(messageCopy);
160+
Assert.True(messageCopy.IsCloudEvent());
161+
162+
var receivedCloudEvent = messageCopy.ToCloudEvent<Confluent.Kafka.Null>(
163+
jsonEventFormatter,
164+
null,
165+
partitionKeyAdapter);
166+
167+
//The Message key will continue to be null.
168+
Assert.Null(message.Key);
169+
VerifyTestCloudEvent(receivedCloudEvent);
170+
}
171+
85172
[Fact]
86173
public void KafkaBinaryMessageTest()
87174
{

0 commit comments

Comments
 (0)