
cumulative_to_delta_processor: Add Cumulative to Delta Conversion logic #4216


Merged
3 changes: 3 additions & 0 deletions processor/cumulativetodeltaprocessor/go.mod
@@ -3,8 +3,11 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumul
go 1.16

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.30.2-0.20210727185145-88b2935343aa
go.opentelemetry.io/collector/model v0.30.2-0.20210727185145-88b2935343aa
go.uber.org/zap v1.18.1
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics => ./../../internal/aws/metrics
76 changes: 69 additions & 7 deletions processor/cumulativetodeltaprocessor/processor.go
@@ -16,35 +16,97 @@ package cumulativetodeltaprocessor

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"

awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
)

type cumulativeToDeltaProcessor struct {
-	metrics []string
-	logger  *zap.Logger
+	metrics         map[string]bool
+	logger          *zap.Logger
+	deltaCalculator awsmetrics.MetricCalculator
}

func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor {
inputMetricSet := make(map[string]bool, len(config.Metrics))
for _, name := range config.Metrics {
inputMetricSet[name] = true
}

return &cumulativeToDeltaProcessor{
-	metrics: config.Metrics,
-	logger:  logger,
+	metrics:         inputMetricSet,
+	logger:          logger,
+	deltaCalculator: newDeltaCalculator(),
}
}

// Start is invoked during service startup.
-func (mgp *cumulativeToDeltaProcessor) Start(context.Context, component.Host) error {
+func (ctdp *cumulativeToDeltaProcessor) Start(context.Context, component.Host) error {
return nil
}

// processMetrics implements the ProcessMetricsFunc type.
-func (mgp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
+func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
resourceMetricsSlice := md.ResourceMetrics()
for i := 0; i < resourceMetricsSlice.Len(); i++ {
rm := resourceMetricsSlice.At(i)
ilms := rm.InstrumentationLibraryMetrics()
for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
metricSlice := ilm.Metrics()
for k := 0; k < metricSlice.Len(); k++ {
metric := metricSlice.At(k)
if ctdp.metrics[metric.Name()] {
if metric.DataType() == pdata.MetricDataTypeSum && metric.Sum().AggregationTemporality() == pdata.AggregationTemporalityCumulative {
dataPoints := metric.Sum().DataPoints()

for l := 0; l < dataPoints.Len(); l++ {
fromDataPoint := dataPoints.At(l)
labelMap := make(map[string]string)

fromDataPoint.LabelsMap().Range(func(k string, v string) bool {
Member: Should LabelsMap be sorted here? Do we consider labels that arrive in a different order to be equivalent?

Contributor Author: I am not sure. Is there any such possibility?

Member: For sure they can come in any order since a "Map" (LabelMap) does not guarantee ordering. But you add them to another Map so there should be no problem. Correct?
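As the reviewer notes, copying the labels into a Go map makes arrival order irrelevant, since map identity does not depend on insertion order. For illustration only, here is a minimal sketch (not part of this PR; canonicalKey is a hypothetical helper) of how a label set could be serialized into an order-independent string key, should the calculator ever need one:

package main

import (
	"fmt"
	"sort"
	"strings"
)

// canonicalKey sorts the label names before joining them, so two label
// maps with the same entries yield the same key regardless of the order
// in which the labels arrived.
func canonicalKey(labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, k+"="+labels[k])
	}
	return strings.Join(parts, ",")
}

func main() {
	a := map[string]string{"host": "h1", "region": "us-east"}
	b := map[string]string{"region": "us-east", "host": "h1"}
	// Both lines print host=h1,region=us-east.
	fmt.Println(canonicalKey(a))
	fmt.Println(canonicalKey(b))
}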

labelMap[k] = v
return true
})

result, _ := ctdp.deltaCalculator.Calculate(metric.Name(), labelMap, fromDataPoint.Value(), fromDataPoint.Timestamp().AsTime())
Member: Since the delta calculator operates only on metric name and label map, what happens if the same metricName + labelMap is sent across resources / instrumentation libraries? Do metrics compute deltas without taking into account the resource / instrumentation library?

Contributor Author: Good point. For my use case, I don't think there would be a scenario where the metric name and labels cannot uniquely identify a metric.

However, if that's a requirement, we can update the awsmetrics calculator in a separate PR and update this processor accordingly.

Member: We cannot implement "logic" for our use-cases. I think this problem needs to be taken care of.

Contributor Author: Hi @bogdandrutu, I pushed an update where we use Resource attributes and Metric labels together to create the mapping.

As @a-feld mentioned, we had a meeting over Zoom. They had a couple more requirements and will send a follow-up PR after this to address those needs.
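The update described here amounts to folding the resource attributes into the same map as the data point labels before calling the calculator, so that an identical metric name + labels under two different resources keeps separate delta state. The fragment below is an assumed sketch, not the PR's exact diff: attributeToString stands in for whatever AttributeValue-to-string conversion the pdata version in use provides, and the resource_ prefix is a hypothetical guard against label-name clashes.

labelMap := make(map[string]string)

// Resource attributes first, under a hypothetical prefix so they cannot
// collide with data point label names.
rm.Resource().Attributes().Range(func(k string, v pdata.AttributeValue) bool {
	labelMap["resource_"+k] = attributeToString(v)
	return true
})

// Then the data point's own labels, exactly as in the diff above.
fromDataPoint.LabelsMap().Range(func(k string, v string) bool {
	labelMap[k] = v
	return true
})

result, _ := ctdp.deltaCalculator.Calculate(metric.Name(), labelMap, fromDataPoint.Value(), fromDataPoint.Timestamp().AsTime())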


fromDataPoint.SetValue(result.(delta).value)
fromDataPoint.SetStartTimestamp(pdata.TimestampFromTime(result.(delta).prevTimestamp))
}
metric.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta)
}
}
}
}
}
return md, nil
}

// Shutdown is invoked during service shutdown.
-func (mgp *cumulativeToDeltaProcessor) Shutdown(context.Context) error {
+func (ctdp *cumulativeToDeltaProcessor) Shutdown(context.Context) error {
return nil
}

func newDeltaCalculator() awsmetrics.MetricCalculator {
return awsmetrics.NewMetricCalculator(func(prev *awsmetrics.MetricValue, val interface{}, timestamp time.Time) (interface{}, bool) {
result := delta{value: val.(float64), prevTimestamp: timestamp}

if prev != nil {
deltaValue := val.(float64) - prev.RawValue.(float64)
Contributor: I had to look through awsmetrics to see what the calculator does here. This looks good to me, although you might consider for monotonic cumulatives to insert a "reset heuristic" the way Prometheus has. The current logic looks like it will deliver a negative delta when a cumulative value resets, and you might want to interpret that as a zero depending on your knowledge.

See https://github.com/lightstep/opentelemetry-prometheus-sidecar/blob/main/retrieval/series_cache.go#L331 for a similar handler used to convert Prometheus cumulatives w/o a start time to cumulatives with a start time. If you pass through similar logic first, you won't see the negative delta (for monotonic cumulatives).

Approving this PR because the implied logic fix probably belongs in awsmetrics (?).

Contributor Author: Thanks Josh. Will check and update here.

Member (@a-feld, Jul 20, 2021): Deleting this comment; on further review, it looks like the initial value is reported.

Member: @jmacd in order to determine a reset, should we initially use "startTime" and, if that is missing (0), fall back to the Prometheus way to detect a reset?
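For contrast with the unmodified lines that follow, here is a hedged sketch of what the reset handling discussed above could look like inside the calculator closure. This is not part of this PR; it assumes a monotonic series where a backwards step means the counter restarted from zero, and a non-zero StartTimestamp (when available) would be a more reliable reset signal than the value-drop check used as the fallback here.

if prev != nil {
	deltaValue := val.(float64) - prev.RawValue.(float64)
	if deltaValue < 0 {
		// The cumulative went backwards: treat it as a counter reset and
		// report the new raw value as the delta from zero, instead of
		// emitting a negative delta.
		deltaValue = val.(float64)
	}
	result.value = deltaValue
	result.prevTimestamp = prev.Timestamp
	return result, true
}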

result.value = deltaValue
result.prevTimestamp = prev.Timestamp
return result, true
}
return result, false
})
}

type delta struct {
value float64
prevTimestamp time.Time
}
175 changes: 175 additions & 0 deletions processor/cumulativetodeltaprocessor/processor_test.go
@@ -0,0 +1,175 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cumulativetodeltaprocessor

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/model/pdata"
)

type testMetric struct {
metricNames []string
metricValues [][]float64
isCumulative []bool
}

type cumulativeToDeltaTest struct {
name string
metrics []string
inMetrics pdata.Metrics
outMetrics pdata.Metrics
}

var (
testCases = []cumulativeToDeltaTest{
{
name: "cumulative_to_delta_expect_same",
metrics: nil,
inMetrics: generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
isCumulative: []bool{true, true},
}),
outMetrics: generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
isCumulative: []bool{true, true},
}),
},
{
name: "cumulative_to_delta_one_positive",
metrics: []string{"metric_1"},
inMetrics: generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100, 200, 500}, {4}},
isCumulative: []bool{true, true},
}),
outMetrics: generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100, 100, 300}, {4}},
isCumulative: []bool{false, true},
}),
},
}
)

func TestCumulativeToDeltaProcessor(t *testing.T) {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
// next stores the results of the cumulative-to-delta processor
next := new(consumertest.MetricsSink)
cfg := &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
Metrics: test.metrics,
}
factory := NewFactory()
mgp, err := factory.CreateMetricsProcessor(
context.Background(),
componenttest.NewNopProcessorCreateSettings(),
cfg,
next,
)
assert.NotNil(t, mgp)
assert.Nil(t, err)

caps := mgp.Capabilities()
assert.True(t, caps.MutatesData)
ctx := context.Background()
require.NoError(t, mgp.Start(ctx, nil))

cErr := mgp.ConsumeMetrics(context.Background(), test.inMetrics)
assert.Nil(t, cErr)
got := next.AllMetrics()

require.Equal(t, 1, len(got))
require.Equal(t, test.outMetrics.ResourceMetrics().Len(), got[0].ResourceMetrics().Len())

expectedMetrics := test.outMetrics.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
actualMetrics := got[0].ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()

require.Equal(t, expectedMetrics.Len(), actualMetrics.Len())

for i := 0; i < expectedMetrics.Len(); i++ {
eM := expectedMetrics.At(i)
aM := actualMetrics.At(i)

require.Equal(t, eM.Name(), aM.Name())

if eM.DataType() == pdata.MetricDataTypeGauge {
eDataPoints := eM.Gauge().DataPoints()
aDataPoints := aM.Gauge().DataPoints()
require.Equal(t, eDataPoints.Len(), aDataPoints.Len())

for j := 0; j < eDataPoints.Len(); j++ {
require.Equal(t, eDataPoints.At(j).Value(), aDataPoints.At(j).Value())
}
}

if eM.DataType() == pdata.MetricDataTypeSum {
eDataPoints := eM.Sum().DataPoints()
aDataPoints := aM.Sum().DataPoints()

require.Equal(t, eDataPoints.Len(), aDataPoints.Len())
require.Equal(t, eM.Sum().AggregationTemporality(), aM.Sum().AggregationTemporality())

for j := 0; j < eDataPoints.Len(); j++ {
require.Equal(t, eDataPoints.At(j).Value(), aDataPoints.At(j).Value())
}
}

}

require.NoError(t, mgp.Shutdown(ctx))
})
}
}

func generateTestMetrics(tm testMetric) pdata.Metrics {
md := pdata.NewMetrics()
now := time.Now()

rm := md.ResourceMetrics().AppendEmpty()
ms := rm.InstrumentationLibraryMetrics().AppendEmpty().Metrics()
for i, name := range tm.metricNames {
m := ms.AppendEmpty()
m.SetName(name)
m.SetDataType(pdata.MetricDataTypeSum)

sum := m.Sum()
sum.SetIsMonotonic(true)

if tm.isCumulative[i] {
sum.SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
} else {
sum.SetAggregationTemporality(pdata.AggregationTemporalityDelta)
}

for _, value := range tm.metricValues[i] {
dp := m.Sum().DataPoints().AppendEmpty()
dp.SetTimestamp(pdata.TimestampFromTime(now.Add(10 * time.Second)))
dp.SetValue(value)
}
}

return md
}