Skip to content

Commit 79bb726

Browse files
committed
Improve tests
1 parent 6940abe commit 79bb726

File tree

7 files changed

+228
-31
lines changed

7 files changed

+228
-31
lines changed

test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,71 @@ public void Producer_Produce(string bootstrapServers)
107107

108108
Assert.Equal(2, count);
109109

110+
// Memory<byte> case.
111+
112+
count = 0;
113+
Action<DeliveryReport<Memory<byte>, ReadOnlyMemory<byte>>> dh3 = dr =>
114+
{
115+
Assert.Equal(ErrorCode.NoError, dr.Error.Code);
116+
Assert.Equal(PersistenceStatus.Persisted, dr.Status);
117+
Assert.Equal((Partition)0, dr.Partition);
118+
Assert.Equal(singlePartitionTopic, dr.Topic);
119+
Assert.True(dr.Offset >= 0);
120+
Assert.Equal($"test key {count + 42}", Encoding.UTF8.GetString(dr.Message.Key.Span));
121+
Assert.Equal($"test val {count + 42}", Encoding.UTF8.GetString(dr.Message.Value.Span));
122+
Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type);
123+
Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
124+
count += 1;
125+
};
126+
127+
using (var producer = new TestProducerBuilder<Memory<byte>, ReadOnlyMemory<byte>>(producerConfig).Build())
128+
{
129+
producer.Produce(
130+
new TopicPartition(singlePartitionTopic, 0),
131+
new Message<Memory<byte>, ReadOnlyMemory<byte>> { Key = Encoding.UTF8.GetBytes("test key 42"), Value = Encoding.UTF8.GetBytes("test val 42") }, dh3);
132+
133+
producer.Produce(
134+
singlePartitionTopic,
135+
new Message<Memory<byte>, ReadOnlyMemory<byte>> { Key = Encoding.UTF8.GetBytes("test key 43"), Value = Encoding.UTF8.GetBytes("test val 43") }, dh3);
136+
137+
producer.Flush(TimeSpan.FromSeconds(10));
138+
}
139+
140+
Assert.Equal(2, count);
141+
142+
// Memory<byte>? case.
143+
144+
count = 0;
145+
Action<DeliveryReport<ReadOnlyMemory<byte>?, Memory<byte>?>> dh4 = dr =>
146+
{
147+
Assert.Equal(ErrorCode.NoError, dr.Error.Code);
148+
Assert.Equal(PersistenceStatus.Persisted, dr.Status);
149+
Assert.Equal((Partition)0, dr.Partition);
150+
Assert.Equal(singlePartitionTopic, dr.Topic);
151+
Assert.True(dr.Offset >= 0);
152+
Assert.True(dr.Message.Key.HasValue);
153+
Assert.Equal($"test key {count + 42}", Encoding.UTF8.GetString(dr.Message.Key.Value.Span));
154+
Assert.True(dr.Message.Value.HasValue);
155+
Assert.Equal($"test val {count + 42}", Encoding.UTF8.GetString(dr.Message.Value.Value.Span));
156+
Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type);
157+
Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
158+
count += 1;
159+
};
160+
161+
using (var producer = new TestProducerBuilder<ReadOnlyMemory<byte>?, Memory<byte>?>(producerConfig).Build())
162+
{
163+
producer.Produce(
164+
new TopicPartition(singlePartitionTopic, 0),
165+
new Message<ReadOnlyMemory<byte>?, Memory<byte>?> { Key = Encoding.UTF8.GetBytes("test key 42"), Value = Encoding.UTF8.GetBytes("test val 42") }, dh4);
166+
167+
producer.Produce(
168+
singlePartitionTopic,
169+
new Message<ReadOnlyMemory<byte>?, Memory<byte>?> { Key = Encoding.UTF8.GetBytes("test key 43"), Value = Encoding.UTF8.GetBytes("test val 43") }, dh4);
170+
171+
producer.Flush(TimeSpan.FromSeconds(10));
172+
}
173+
174+
Assert.Equal(2, count);
110175

111176
Assert.Equal(0, Library.HandleCount);
112177
LogToFile("end Producer_Produce");

test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Error.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,40 @@ public void Producer_ProduceAsync_Error(string bootstrapServers)
106106
Assert.Equal(TimestampType.NotAvailable, dr.Message.Timestamp.Type);
107107
}
108108

109+
// byte[] case
110+
111+
Task<DeliveryResult<ReadOnlyMemory<byte>, Memory<byte>>> drt3;
112+
using (var producer = new TestProducerBuilder<ReadOnlyMemory<byte>, Memory<byte>>(producerConfig).Build())
113+
{
114+
drt3 = producer.ProduceAsync(
115+
new TopicPartition(partitionedTopic, 42),
116+
new Message<ReadOnlyMemory<byte>, Memory<byte>> { Key = new byte[] { 100 }, Value = new byte[] { 101 } });
117+
Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10)));
118+
}
119+
120+
Assert.Throws<AggregateException>(() => { drt.Wait(); });
121+
122+
try
123+
{
124+
_ = drt3.Result;
125+
}
126+
catch (AggregateException e)
127+
{
128+
var inner = e.InnerException;
129+
Assert.IsType<ProduceException<ReadOnlyMemory<byte>, Memory<byte>>>(inner);
130+
var dr = ((ProduceException<ReadOnlyMemory<byte>, Memory<byte>>)inner).DeliveryResult;
131+
var err = ((ProduceException<ReadOnlyMemory<byte>, Memory<byte>>)inner).Error;
132+
133+
Assert.True(err.IsError);
134+
Assert.False(err.IsFatal);
135+
Assert.Equal(partitionedTopic, dr.Topic);
136+
Assert.Equal(Offset.Unset, dr.Offset);
137+
Assert.True(dr.Partition == 42);
138+
Assert.Equal(new byte[] { 100 }, dr.Message.Key.ToArray());
139+
Assert.Equal(new byte[] { 101 }, dr.Message.Value.ToArray());
140+
Assert.Equal(TimestampType.NotAvailable, dr.Message.Timestamp.Type);
141+
}
142+
109143
Assert.Equal(0, Library.HandleCount);
110144
LogToFile("end Producer_ProduceAsync_Error");
111145
}

test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Null_Task.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,29 @@ public void Producer_ProduceAsync_Null_Task(string bootstrapServers)
9090

9191
Assert.Equal((Partition)1, drs2[0].Result.Partition);
9292

93+
// Memory<byte>? case
94+
95+
var drs3 = new List<Task<DeliveryResult<Memory<byte>?, Memory<byte>?>>>();
96+
using (var producer = new TestProducerBuilder<Memory<byte>?, Memory<byte>?>(producerConfig).Build())
97+
{
98+
drs3.Add(producer.ProduceAsync(new TopicPartition(partitionedTopic, 1), new Message<Memory<byte>?, Memory<byte>?>()));
99+
drs3.Add(producer.ProduceAsync(partitionedTopic, new Message<Memory<byte>?, Memory<byte>?>()));
100+
Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10)));
101+
}
102+
103+
for (int i = 0; i < 2; ++i)
104+
{
105+
var dr = drs3[i].Result;
106+
Assert.True(dr.Partition == 0 || dr.Partition == 1);
107+
Assert.Equal(partitionedTopic, dr.Topic);
108+
Assert.True(dr.Offset >= 0);
109+
Assert.Null(dr.Message.Key);
110+
Assert.Null(dr.Message.Value);
111+
Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type);
112+
Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
113+
}
114+
115+
Assert.Equal((Partition)1, drs3[0].Result.Partition);
93116

94117
Assert.Equal(0, Library.HandleCount);
95118
LogToFile("end Producer_ProduceAsync_Null_Task");

test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Task.cs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,64 @@ public void Producer_ProduceAsync_Task(string bootstrapServers)
9898

9999
Assert.Equal((Partition)1, drs2[0].Result.Partition);
100100

101+
// Memory<byte> case
102+
103+
var drs3 = new List<Task<DeliveryResult<Memory<byte>, ReadOnlyMemory<byte>>>>();
104+
using (var producer = new TestProducerBuilder<Memory<byte>, ReadOnlyMemory<byte>>(producerConfig).Build())
105+
{
106+
drs3.Add(producer.ProduceAsync(
107+
new TopicPartition(partitionedTopic, 1),
108+
new Message<Memory<byte>, ReadOnlyMemory<byte>> { Key = Encoding.UTF8.GetBytes("test key 2"), Value = Encoding.UTF8.GetBytes("test val 2") }));
109+
drs3.Add(producer.ProduceAsync(
110+
partitionedTopic,
111+
new Message<Memory<byte>, ReadOnlyMemory<byte>> { Key = Encoding.UTF8.GetBytes("test key 3"), Value = Encoding.UTF8.GetBytes("test val 3") }));
112+
Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10)));
113+
}
114+
115+
for (int i = 0; i < 2; ++i)
116+
{
117+
var dr = drs3[i].Result;
118+
Assert.Equal(partitionedTopic, dr.Topic);
119+
Assert.True(dr.Offset >= 0);
120+
Assert.True(dr.Partition == 0 || dr.Partition == 1);
121+
Assert.Equal($"test key {i+2}", Encoding.UTF8.GetString(dr.Message.Key.Span));
122+
Assert.Equal($"test val {i+2}", Encoding.UTF8.GetString(dr.Message.Value.Span));
123+
Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type);
124+
Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
125+
}
126+
127+
Assert.Equal((Partition)1, drs3[0].Result.Partition);
128+
129+
// Memory<byte>? case
130+
131+
var drs4 = new List<Task<DeliveryResult<ReadOnlyMemory<byte>?, Memory<byte>?>>>();
132+
using (var producer = new TestProducerBuilder<ReadOnlyMemory<byte>?, Memory<byte>?>(producerConfig).Build())
133+
{
134+
drs4.Add(producer.ProduceAsync(
135+
new TopicPartition(partitionedTopic, 1),
136+
new Message<ReadOnlyMemory<byte>?, Memory<byte>?> { Key = Encoding.UTF8.GetBytes("test key 2"), Value = Encoding.UTF8.GetBytes("test val 2") }));
137+
drs4.Add(producer.ProduceAsync(
138+
partitionedTopic,
139+
new Message<ReadOnlyMemory<byte>?, Memory<byte>?> { Key = Encoding.UTF8.GetBytes("test key 3"), Value = Encoding.UTF8.GetBytes("test val 3") }));
140+
Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10)));
141+
}
142+
143+
for (int i = 0; i < 2; ++i)
144+
{
145+
var dr = drs4[i].Result;
146+
Assert.Equal(partitionedTopic, dr.Topic);
147+
Assert.True(dr.Offset >= 0);
148+
Assert.True(dr.Partition == 0 || dr.Partition == 1);
149+
Assert.True(dr.Message.Key.HasValue);
150+
Assert.Equal($"test key {i+2}", Encoding.UTF8.GetString(dr.Message.Key.Value.Span));
151+
Assert.True(dr.Message.Value.HasValue);
152+
Assert.Equal($"test val {i+2}", Encoding.UTF8.GetString(dr.Message.Value.Value.Span));
153+
Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type);
154+
Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
155+
}
156+
157+
Assert.Equal((Partition)1, drs4[0].Result.Partition);
158+
101159
Assert.Equal(0, Library.HandleCount);
102160
LogToFile("end Producer_ProduceAsync_Task");
103161
}

test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -60,36 +60,5 @@ public void Producer_Produce_Async(string bootstrapServers)
6060
Assert.Equal(0, Library.HandleCount);
6161
LogToFile("end Producer_Produce_Async");
6262
}
63-
64-
[Theory, MemberData(nameof(KafkaParameters))]
65-
public void Producer_Produce_Memory_Async(string bootstrapServers)
66-
{
67-
LogToFile("start Producer_Produce_Memory_Async");
68-
69-
var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
70-
71-
using (var testTopic = new TemporaryTopic(bootstrapServers, 1))
72-
using (var producer = new TestProducerBuilder<Memory<byte>?, Memory<byte>>(producerConfig)
73-
.Build())
74-
using (var dProducer = new DependentProducerBuilder<ReadOnlyMemory<byte>, ReadOnlyMemory<byte>?>(producer.Handle)
75-
.Build())
76-
{
77-
Memory<byte> data = new byte[] { 1, 2, 3, 4 };
78-
Assert.Throws<ProduceException<Memory<byte>?, Memory<byte>>>(
79-
() => producer.Produce(testTopic.Name, new Message<Memory<byte>?, Memory<byte>> { Value = data }));
80-
81-
Assert.Throws<ProduceException<Memory<byte>?, Memory<byte>>>(
82-
() => producer.Produce(testTopic.Name, new Message<Memory<byte>?, Memory<byte>> { Value = data }, dr => { Assert.True(false); }));
83-
84-
Assert.Throws<ProduceException<ReadOnlyMemory<byte>, ReadOnlyMemory<byte>?>>(
85-
() => dProducer.Produce(testTopic.Name, new Message<ReadOnlyMemory<byte>, ReadOnlyMemory<byte>?> { Key = data }));
86-
87-
Assert.Throws<ProduceException<ReadOnlyMemory<byte>, ReadOnlyMemory<byte>?>>(
88-
() => dProducer.Produce(testTopic.Name, new Message<ReadOnlyMemory<byte>, ReadOnlyMemory<byte>?> { Key = data }, dr => { Assert.True(false); }));
89-
}
90-
91-
Assert.Equal(0, Library.HandleCount);
92-
LogToFile("end Producer_Produce_Memory_Async");
93-
}
9463
}
9564
}

test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Error.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,29 @@ public void Producer_Produce_Error(string bootstrapServers)
8989

9090
Assert.Equal(1, count);
9191

92+
// Memory<byte> case.
93+
94+
count = 0;
95+
Action<DeliveryReport<Memory<byte>, ReadOnlyMemory<byte>?>> dh3 = dr =>
96+
{
97+
Assert.Equal(ErrorCode.Local_UnknownPartition, dr.Error.Code);
98+
Assert.Equal((Partition)42, dr.Partition);
99+
Assert.Equal(singlePartitionTopic, dr.Topic);
100+
Assert.Equal(Offset.Unset, dr.Offset);
101+
Assert.Equal(new byte[] { 11 }, dr.Message.Key.ToArray());
102+
Assert.Null(dr.Message.Value);
103+
Assert.Equal(TimestampType.NotAvailable, dr.Message.Timestamp.Type);
104+
count += 1;
105+
};
106+
107+
using (var producer = new TestProducerBuilder<Memory<byte>, ReadOnlyMemory<byte>?>(producerConfig).Build())
108+
{
109+
producer.Produce(new TopicPartition(singlePartitionTopic, 42), new Message<Memory<byte>, ReadOnlyMemory<byte>?> { Key = new byte[] { 11 } }, dh3);
110+
producer.Flush(TimeSpan.FromSeconds(10));
111+
}
112+
113+
Assert.Equal(1, count);
114+
92115
Assert.Equal(0, Library.HandleCount);
93116
LogToFile("end Producer_Produce_Error");
94117
}

test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Null.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,31 @@ public void Producer_Produce_Null(string bootstrapServers)
8989

9090
Assert.Equal(2, count);
9191

92+
// Memory<byte>? case.
93+
94+
count = 0;
95+
Action<DeliveryReport<ReadOnlyMemory<byte>?, Memory<byte>?>> dh3 = dr =>
96+
{
97+
Assert.Equal(ErrorCode.NoError, dr.Error.Code);
98+
Assert.Equal((Partition)0, dr.Partition);
99+
Assert.Equal(singlePartitionTopic, dr.Topic);
100+
Assert.True(dr.Offset >= 0);
101+
Assert.Null(dr.Message.Key);
102+
Assert.Null(dr.Message.Value);
103+
Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type);
104+
Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
105+
count += 1;
106+
};
107+
108+
using (var producer = new TestProducerBuilder<ReadOnlyMemory<byte>?, Memory<byte>?>(producerConfig).Build())
109+
{
110+
producer.Produce(new TopicPartition(singlePartitionTopic, 0), new Message<ReadOnlyMemory<byte>?, Memory<byte>?>(), dh3);
111+
producer.Produce(singlePartitionTopic, new Message<ReadOnlyMemory<byte>?, Memory<byte>?>(), dh3);
112+
producer.Flush(TimeSpan.FromSeconds(10));
113+
}
114+
115+
Assert.Equal(2, count);
116+
92117
Assert.Equal(0, Library.HandleCount);
93118
LogToFile("end Producer_Produce_Null");
94119
}

0 commit comments

Comments
 (0)