
Natively support Memory<byte> #2311


Open · verdie-g wants to merge 13 commits into master from produce-memory-byte

Conversation


@verdie-g commented Sep 13, 2024

Closes #1238, #1603, #1725, #2177, #2219, #1782, #2367.

Problem

The current serializer interface, ISerializer, forces the user to return a byte[]. This is a significant performance problem for two reasons:

  1. It prevents sending a slice of a byte[]. In the following example, a protobuf object is serialized into a MemoryStream. To send those bytes to the Kafka client, MemoryStream.GetBuffer, which returns the underlying buffer without copying, can't be used because the buffer may contain extra bytes past the written data. So MemoryStream.ToArray has to be used, which copies the entire buffer.
ProducerConfig config = new() { BootstrapServers = "localhost:9092" };
IProducer<Null, byte[]> producer = new ProducerBuilder<Null, byte[]>(config).Build();

MemoryStream ms = new();
Data.WriteTo(ms);
byte[] buffer = ms.ToArray(); // Copy the whole thing :/
producer.Produce("demo", new Message<Null, byte[]> { Value = buffer });
  2. It prevents memory pooling. ArrayPool<byte>.Shared.Rent can return a buffer larger than requested, and the Kafka client doesn't support sending a slice of it. That's a corollary of the first problem, but it's worth calling out because each buffer passed to Produce gets pinned, which puts significant pressure on the GC. If pooling were possible, we could create a pool of buffers on the POH (pinned object heap) using GC.AllocateArray; see the sketch below this list.
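
Here is a minimal sketch of that limitation; the broker address, topic name, sizes, and placeholder payload are illustrative and not from the PR. With today's byte[]-only API, the filled prefix of a rented buffer has to be copied before producing.

using System;
using System.Buffers;
using Confluent.Kafka;

ProducerConfig config = new() { BootstrapServers = "localhost:9092" };
using IProducer<Null, byte[]> producer = new ProducerBuilder<Null, byte[]>(config).Build();

// Rent may hand back a buffer larger than requested (e.g. 16384 bytes for a 10000-byte request).
byte[] rented = ArrayPool<byte>.Shared.Rent(10_000);
int written = 10_000; // pretend a serializer wrote exactly this many bytes into `rented`

// A byte[]-only API has no way to express "send only the first `written` bytes",
// so the filled prefix must be copied into a right-sized array before producing.
byte[] copy = rented.AsSpan(0, written).ToArray();
producer.Produce("demo", new Message<Null, byte[]> { Value = copy });

ArrayPool<byte>.Shared.Return(rented);
producer.Flush(TimeSpan.FromSeconds(5));

// If a slice could be produced directly, the pool could instead hand out pre-pinned arrays
// (GC.AllocateArray<byte>(length, pinned: true)), avoiding per-message pinning.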

Solution

There are several open PRs aiming to extend the API to allow low-allocation produce, but they all have caveats.

Instead of adding a third type of serializer (after ISerializer and IAsyncSerializer), this PR special-cases Memory<byte> (and ReadOnlyMemory<byte>) so that, from the user's point of view, they work just like byte[]: the value can be assigned directly to Message.Key and Message.Value.

So in the end, you can write

ProducerConfig config = new() { BootstrapServers = "localhost:9092" };
IProducer<Null, ReadOnlyMemory<byte>> producer = new ProducerBuilder<Null, ReadOnlyMemory<byte>>(config).Build();

MemoryStream ms = new();
Data.WriteTo(ms);
ReadOnlyMemory<byte> mem = ms.GetBuffer().AsMemory(0, (int)ms.Length);
producer.Produce("demo", new Message<Null, ReadOnlyMemory<byte>> { Value = mem });
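
For intuition, and not necessarily how the PR implements it internally, an array-backed ReadOnlyMemory<byte> already carries the (array, offset, count) triple a produce call ultimately needs, so no intermediate copy is required:

using System;
using System.Runtime.InteropServices;

byte[] backing = { 0, 1, 2, 3, 4, 5, 6, 7 };
ReadOnlyMemory<byte> slice = backing.AsMemory(2, 4);

// TryGetArray succeeds for array-backed memory and exposes the original array plus the slice bounds.
if (MemoryMarshal.TryGetArray(slice, out ArraySegment<byte> segment))
{
    Console.WriteLine($"array length={segment.Array!.Length}, offset={segment.Offset}, count={segment.Count}");
    // Prints: array length=8, offset=2, count=4
}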

Benchmark

Here is a benchmark using BenchmarkDotNet which serializes a protobuf object of ~10 KB from open-telemetry/opentelemetry-proto and produces it.

  • ProduceByteArray copies the MemoryStream into a byte[].
  • ProduceMemory takes a slice of the MemoryStream's buffer with no copy. It's 35% faster and uses 20% less memory.
  • ProducePooledMemory is similar to ProduceMemory but uses Microsoft.IO.RecyclableMemoryStream to show how pooling could work. It's 32% faster and uses 83% less memory.

In the real world, the benefits could be even greater because the benchmark doesn't capture the reduced GC pressure from the avoided allocations, nor the benefit of reusing pinned buffers.

Results

| Method              | Mean     | Error    | StdDev    | Ratio | RatioSD | Allocated | Alloc Ratio |
|---------------------|----------|----------|-----------|-------|---------|-----------|-------------|
| ProduceByteArray    | 88.26 us | 5.763 us | 14.774 us | 1.02  | 0.23    | 44.22 KB  | 1.00        |
| ProduceMemory       | 57.74 us | 1.987 us | 5.540 us  | 0.67  | 0.11    | 35.37 KB  | 0.80        |
| ProducePooledMemory | 60.36 us | 2.582 us | 7.449 us  | 0.70  | 0.13    | 7.51 KB   | 0.17        |

Code

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using Confluent.Kafka;
using Google.Protobuf;
using Microsoft.IO;
using OpenTelemetry.Proto.Common.V1;
using OpenTelemetry.Proto.Metrics.V1;

BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly)
    .Run(Array.Empty<string>(), new BenchmarkDotNet.Configs.DebugInProcessConfig());

[MemoryDiagnoser]
public class ProduceBench
{
    private static readonly MetricsData Data = GenerateData();

    private static readonly RecyclableMemoryStreamManager MemoryManager = new(new RecyclableMemoryStreamManager.Options
    {
        BlockSize = 10 * 1024,
    });

    private static MetricsData GenerateData()
    {
        return new MetricsData
        {
            ResourceMetrics =
            {
                new ResourceMetrics
                {
                    ScopeMetrics =
                    {
                        new ScopeMetrics
                        {
                            Metrics =
                            {
                                Enumerable.Range(0, 100).Select(_ => new Metric
                                {
                                    Name = "example_gauge",
                                    Description = "An example of a gauge metric",
                                    Unit = "ms",
                                    Gauge = new Gauge
                                    {
                                        DataPoints =
                                        {
                                            new NumberDataPoint
                                            {
                                                StartTimeUnixNano = 1,
                                                TimeUnixNano = 2,
                                                AsDouble = 123.45,
                                                Attributes =
                                                {
                                                    new KeyValue
                                                    {
                                                        Key = "key2",
                                                        Value = new AnyValue { StringValue = "value2" },
                                                    }
                                                }
                                            }
                                        }
                                    }
                                })
                            }
                        }
                    }
                }
            }
        };
    }

    private IProducer<Null, byte[]> _producer1 = default!;
    private IProducer<Null, ReadOnlyMemory<byte>> _producer2 = default!;

    [GlobalSetup]
    public void GlobalSetup()
    {
        ProducerConfig config = new() { BootstrapServers = "localhost:9092" };
        _producer1 = new ProducerBuilder<Null, byte[]>(config).Build();
        _producer2 = new ProducerBuilder<Null, ReadOnlyMemory<byte>>(config).Build();
    }

    [GlobalCleanup]
    public void GlobalCleanup()
    {
        _producer1.Dispose();
        _producer2.Dispose();
    }

    [IterationSetup]
    public void IterationSetup()
    {
        // Drain any messages still queued from the previous iteration so they don't skew the next measurement.
        _producer1.Flush();
        _producer2.Flush();
    }

    [Benchmark(Baseline = true)]
    public void ProduceByteArray()
    {
        MemoryStream ms = new();
        Data.WriteTo(ms);
        _producer1.Produce("demo", new Message<Null, byte[]> { Value = ms.ToArray() });
    }

    [Benchmark]
    public void ProduceMemory()
    {
        MemoryStream ms = new();
        Data.WriteTo(ms);
        ReadOnlyMemory<byte> mem = ms.GetBuffer().AsMemory(0, (int)ms.Length);
        _producer2.Produce("demo", new Message<Null, ReadOnlyMemory<byte>> { Value = mem });
    }

    [Benchmark]
    public void ProducePooledMemory()
    {
        using var ms = MemoryManager.GetStream();
        // The cast makes the protobuf WriteTo(Stream) overload explicit (RecyclableMemoryStream also implements IBufferWriter<byte>).
        Data.WriteTo((Stream)ms);
        ReadOnlyMemory<byte> mem = ms.GetBuffer().AsMemory(0, (int)ms.Length);
        _producer2.Produce("demo", new Message<Null, ReadOnlyMemory<byte>> { Value = mem });
    }
}

Production test

We also load-tested a production service that uses Kafka extensively.

Before: [latency graph under stepped load]

After: [latency graph under stepped load]

What can be observed is that, with the change, latency at the third load step is dramatically better, whereas before the change some requests at that step even fail.


cla-assistant bot commented Sep 13, 2024

CLA assistant check
All committers have signed the CLA.


@verdie-g verdie-g marked this pull request as ready for review September 16, 2024 15:36
@verdie-g verdie-g requested review from a team as code owners September 16, 2024 15:36
@bmesetovic

There seems to be no progress on any of the several approaches to reducing allocations on the publishing side of the Kafka client. All of them are stuck at the review phase.

We are affected by the issue as well and would really like to see how we can push this topic so that it finally gets the attention it deserves.

@verdie-g Did you end up forking the project and adding your changes?

@verdie-g
Author

We used a fork with #2219 just to confirm the significant performance benefits, but we were not ready to maintain a fork.

We are still waiting for a review on that PR.

@verdie-g verdie-g force-pushed the produce-memory-byte branch from f0a45e6 to 79bb726 Compare November 18, 2024 09:37
@confluent-cla-assistant

confluent-cla-assistant bot commented Nov 18, 2024

🎉 All Contributor License Agreements have been signed. Ready to merge.
✅ verdie-g
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@verdie-g verdie-g force-pushed the produce-memory-byte branch from edb8d39 to caf216a Compare April 10, 2025 00:52
@verdie-g
Author

@Claimundefine @rayokota could you have a look at this PR 🙏 It's been sitting for a while and I believe it fixes an important performance issue in the client.

@verdie-g
Author

or @anchitj @emasab 🥺

@Scylin232

I would also love to see this feature. @anchitj @emasab, give this Dino some hugs from your side.

@verdie-g verdie-g force-pushed the produce-memory-byte branch from 63a7bbc to 30146df Compare May 26, 2025 19:44