Skip to content

Commit 29e1c7e

Browse files
authored
Add custom ring implementation to the BatchProcessor (#5237)
1 parent baeb560 commit 29e1c7e

File tree

4 files changed

+201
-5
lines changed

4 files changed

+201
-5
lines changed

sdk/log/batch.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package log // import "go.opentelemetry.io/otel/sdk/log"
55

66
import (
7-
"container/ring"
87
"context"
98
"errors"
109
"slices"
@@ -255,11 +254,11 @@ type queue struct {
255254
sync.Mutex
256255

257256
cap, len int
258-
read, write *ring.Ring
257+
read, write *ring
259258
}
260259

261260
func newQueue(size int) *queue {
262-
r := ring.New(size)
261+
r := newRing(size)
263262
return &queue{
264263
cap: size,
265264
read: r,
@@ -304,7 +303,7 @@ func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int {
304303

305304
n := min(len(buf), q.len)
306305
for i := 0; i < n; i++ {
307-
buf[i] = q.read.Value.(Record)
306+
buf[i] = q.read.Value
308307
q.read = q.read.Next()
309308
}
310309

@@ -324,7 +323,7 @@ func (q *queue) Flush() []Record {
324323

325324
out := make([]Record, q.len)
326325
for i := range out {
327-
out[i] = q.read.Value.(Record)
326+
out[i] = q.read.Value
328327
q.read = q.read.Next()
329328
}
330329
q.len = 0

sdk/log/batch_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sync"
1111
"testing"
1212
"time"
13+
"unsafe"
1314

1415
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
@@ -560,3 +561,31 @@ func TestQueue(t *testing.T) {
560561
assert.Len(t, out, goRoutines, "flushed Records")
561562
})
562563
}
564+
565+
func BenchmarkBatchProcessorOnEmit(b *testing.B) {
566+
var r Record
567+
body := log.BoolValue(true)
568+
r.SetBody(body)
569+
570+
rSize := unsafe.Sizeof(r) + unsafe.Sizeof(body)
571+
ctx := context.Background()
572+
bp := NewBatchProcessor(
573+
defaultNoopExporter,
574+
WithMaxQueueSize(b.N+1),
575+
WithExportMaxBatchSize(b.N+1),
576+
WithExportInterval(time.Hour),
577+
WithExportTimeout(time.Hour),
578+
)
579+
b.Cleanup(func() { _ = bp.Shutdown(ctx) })
580+
581+
b.SetBytes(int64(rSize))
582+
b.ReportAllocs()
583+
b.ResetTimer()
584+
b.RunParallel(func(pb *testing.PB) {
585+
var err error
586+
for pb.Next() {
587+
err = bp.OnEmit(ctx, r)
588+
}
589+
_ = err
590+
})
591+
}

sdk/log/ring.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// Copyright 2009 The Go Authors. All rights reserved.
5+
// Use of this source code is governed by a BSD-style
6+
// license that can be found in the LICENSE file.
7+
8+
package log // import "go.opentelemetry.io/otel/sdk/log"
9+
10+
// A ring is an element of a circular list, or ring. Rings do not have a
11+
// beginning or end; a pointer to any ring element serves as reference to the
12+
// entire ring. Empty rings are represented as nil ring pointers. The zero
13+
// value for a ring is a one-element ring with a nil Value.
14+
//
15+
// This is copied from the "container/ring" package. It uses a Record type for
16+
// Value instead of any to avoid allocations.
17+
type ring struct {
18+
next, prev *ring
19+
Value Record
20+
}
21+
22+
func (r *ring) init() *ring {
23+
r.next = r
24+
r.prev = r
25+
return r
26+
}
27+
28+
// Next returns the next ring element. r must not be empty.
29+
func (r *ring) Next() *ring {
30+
if r.next == nil {
31+
return r.init()
32+
}
33+
return r.next
34+
}
35+
36+
// Prev returns the previous ring element. r must not be empty.
37+
func (r *ring) Prev() *ring {
38+
if r.next == nil {
39+
return r.init()
40+
}
41+
return r.prev
42+
}
43+
44+
// newRing creates a ring of n elements.
45+
func newRing(n int) *ring {
46+
if n <= 0 {
47+
return nil
48+
}
49+
r := new(ring)
50+
p := r
51+
for i := 1; i < n; i++ {
52+
p.next = &ring{prev: p}
53+
p = p.next
54+
}
55+
p.next = r
56+
r.prev = p
57+
return r
58+
}
59+
60+
// Len computes the number of elements in ring r. It executes in time
61+
// proportional to the number of elements.
62+
func (r *ring) Len() int {
63+
n := 0
64+
if r != nil {
65+
n = 1
66+
for p := r.Next(); p != r; p = p.next {
67+
n++
68+
}
69+
}
70+
return n
71+
}
72+
73+
// Do calls function f on each element of the ring, in forward order. The
74+
// behavior of Do is undefined if f changes *r.
75+
func (r *ring) Do(f func(Record)) {
76+
if r != nil {
77+
f(r.Value)
78+
for p := r.Next(); p != r; p = p.next {
79+
f(p.Value)
80+
}
81+
}
82+
}

sdk/log/ring_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// Copyright 2009 The Go Authors. All rights reserved.
5+
// Use of this source code is governed by a BSD-style
6+
// license that can be found in the LICENSE file.
7+
8+
package log
9+
10+
import (
11+
"testing"
12+
13+
"github.com/stretchr/testify/assert"
14+
15+
"go.opentelemetry.io/otel/log"
16+
)
17+
18+
func verifyRing(t *testing.T, r *ring, N int, sum int) {
19+
// Length.
20+
assert.Equal(t, N, r.Len(), "r.Len()")
21+
22+
// Iteration.
23+
var n, s int
24+
r.Do(func(v Record) {
25+
n++
26+
body := v.Body()
27+
if body.Kind() != log.KindEmpty {
28+
s += int(body.AsInt64())
29+
}
30+
})
31+
assert.Equal(t, N, n, "number of forward iterations")
32+
if sum >= 0 {
33+
assert.Equal(t, sum, s, "forward ring sum")
34+
}
35+
36+
if r == nil {
37+
return
38+
}
39+
40+
// Connections.
41+
if r.next != nil {
42+
var p *ring // previous element.
43+
for q := r; p == nil || q != r; q = q.next {
44+
if p != nil {
45+
assert.Equalf(t, p, q.prev, "prev = %p, expected q.prev = %p", p, q.prev)
46+
}
47+
p = q
48+
}
49+
assert.Equalf(t, p, r.prev, "prev = %p, expected r.prev = %p", p, r.prev)
50+
}
51+
52+
// Next, Prev.
53+
assert.Equal(t, r.next, r.Next(), "r.Next() != r.next")
54+
assert.Equal(t, r.prev, r.Prev(), "r.Prev() != r.prev")
55+
}
56+
57+
func TestNewRing(t *testing.T) {
58+
for i := 0; i < 10; i++ {
59+
// Empty value.
60+
r := newRing(i)
61+
verifyRing(t, r, i, -1)
62+
}
63+
64+
for n := 0; n < 10; n++ {
65+
r := newRing(n)
66+
for i := 1; i <= n; i++ {
67+
var rec Record
68+
rec.SetBody(log.IntValue(i))
69+
r.Value = rec
70+
r = r.Next()
71+
}
72+
73+
sum := (n*n + n) / 2
74+
verifyRing(t, r, n, sum)
75+
}
76+
}
77+
78+
func TestEmptyRing(t *testing.T) {
79+
var rNext, rPrev ring
80+
verifyRing(t, rNext.Next(), 1, 0)
81+
verifyRing(t, rPrev.Prev(), 1, 0)
82+
83+
var rLen, rDo *ring
84+
assert.Equal(t, rLen.Len(), 0, "Len()")
85+
rDo.Do(func(Record) { assert.Fail(t, "Do func arg called") })
86+
}

0 commit comments

Comments
 (0)