-
Notifications
You must be signed in to change notification settings - Fork 2.8k
cumulative_to_delta_processor: Add Cumulative to Delta Conversion logic #4216
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
62d8f56
64de598
9e1bac5
63c02e3
474868e
be9e823
4404e8d
678f7ac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,35 +16,97 @@ package cumulativetodeltaprocessor | |
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/model/pdata" | ||
"go.uber.org/zap" | ||
|
||
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" | ||
) | ||
|
||
type cumulativeToDeltaProcessor struct { | ||
metrics []string | ||
logger *zap.Logger | ||
metrics map[string]bool | ||
logger *zap.Logger | ||
deltaCalculator awsmetrics.MetricCalculator | ||
} | ||
|
||
func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor { | ||
inputMetricSet := make(map[string]bool, len(config.Metrics)) | ||
for _, name := range config.Metrics { | ||
inputMetricSet[name] = true | ||
} | ||
|
||
return &cumulativeToDeltaProcessor{ | ||
metrics: config.Metrics, | ||
logger: logger, | ||
metrics: inputMetricSet, | ||
logger: logger, | ||
deltaCalculator: newDeltaCalculator(), | ||
} | ||
} | ||
|
||
// Start is invoked during service startup. | ||
func (mgp *cumulativeToDeltaProcessor) Start(context.Context, component.Host) error { | ||
func (ctdp *cumulativeToDeltaProcessor) Start(context.Context, component.Host) error { | ||
return nil | ||
} | ||
|
||
// processMetrics implements the ProcessMetricsFunc type. | ||
func (mgp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) { | ||
func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) { | ||
resourceMetricsSlice := md.ResourceMetrics() | ||
for i := 0; i < resourceMetricsSlice.Len(); i++ { | ||
rm := resourceMetricsSlice.At(i) | ||
ilms := rm.InstrumentationLibraryMetrics() | ||
for j := 0; j < ilms.Len(); j++ { | ||
ilm := ilms.At(j) | ||
metricSlice := ilm.Metrics() | ||
for k := 0; k < metricSlice.Len(); k++ { | ||
metric := metricSlice.At(k) | ||
if ctdp.metrics[metric.Name()] { | ||
if metric.DataType() == pdata.MetricDataTypeSum && metric.Sum().AggregationTemporality() == pdata.AggregationTemporalityCumulative { | ||
dataPoints := metric.Sum().DataPoints() | ||
|
||
for l := 0; l < dataPoints.Len(); l++ { | ||
fromDataPoint := dataPoints.At(l) | ||
labelMap := make(map[string]string) | ||
|
||
fromDataPoint.LabelsMap().Range(func(k string, v string) bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure. Is there any such possibility? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For sure they can come in any order since a "Map" (LabelMap) does not guarantee ordering. But you add them to another Map so there should be no problem. Correct? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @a-feld, I think we are already sorting them in the |
||
labelMap[k] = v | ||
return true | ||
}) | ||
|
||
result, _ := ctdp.deltaCalculator.Calculate(metric.Name(), labelMap, fromDataPoint.Value(), fromDataPoint.Timestamp().AsTime()) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since the delta calculator operates only on metric name and label map, what happens if the same metricName + labelMap is sent across resources / instrumentation libraries? Do metrics compute deltas without taking into account the resource / instrumentation library? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good point. For my use case, I don't think there would be a scenario where the metric name and labels cannot uniquely identify a metric. However, if that's a requirement, we can update the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We cannot implement "logic" for our use-cases. I think this problem needs to be taken care of. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hi @bogdandrutu , I pushed an update where we use Resource attributes and Metric labels together to create the mapping. As @a-feld mentioned, we had a meeting over Zoom. They had a couple more requirements. They will send a follow-up PR after this to address those needs. |
||
|
||
fromDataPoint.SetValue(result.(delta).value) | ||
fromDataPoint.SetStartTimestamp(pdata.TimestampFromTime(result.(delta).prevTimestamp)) | ||
} | ||
metric.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
return md, nil | ||
} | ||
|
||
// Shutdown is invoked during service shutdown. | ||
func (mgp *cumulativeToDeltaProcessor) Shutdown(context.Context) error { | ||
func (ctdp *cumulativeToDeltaProcessor) Shutdown(context.Context) error { | ||
return nil | ||
} | ||
|
||
func newDeltaCalculator() awsmetrics.MetricCalculator { | ||
return awsmetrics.NewMetricCalculator(func(prev *awsmetrics.MetricValue, val interface{}, timestamp time.Time) (interface{}, bool) { | ||
result := delta{value: val.(float64), prevTimestamp: timestamp} | ||
|
||
if prev != nil { | ||
deltaValue := val.(float64) - prev.RawValue.(float64) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to look through See https://github.com/lightstep/opentelemetry-prometheus-sidecar/blob/main/retrieval/series_cache.go#L331 for a similar handler used to convert Prometheus cumulatives w/o a start time to cumulatives with a start time. If you pass through similar logic first, you won't see the negative delta (for monotonic cumulatives). Approving this PR because the implied logic fix probably belongs in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks Josh. Will check and update here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @jmacd in order to determine a reset we should initially use "startTime" and if that is missing (0), fall back to the Prometheus way to detect a reset? |
||
result.value = deltaValue | ||
result.prevTimestamp = prev.Timestamp | ||
return result, true | ||
} | ||
return result, false | ||
}) | ||
} | ||
|
||
type delta struct { | ||
value float64 | ||
prevTimestamp time.Time | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package cumulativetodeltaprocessor | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/collector/consumer/consumertest" | ||
"go.opentelemetry.io/collector/model/pdata" | ||
) | ||
|
||
type testMetric struct { | ||
metricNames []string | ||
metricValues [][]float64 | ||
isCumulative []bool | ||
} | ||
|
||
type cumulativeToDeltaTest struct { | ||
name string | ||
metrics []string | ||
inMetrics pdata.Metrics | ||
outMetrics pdata.Metrics | ||
} | ||
|
||
var ( | ||
testCases = []cumulativeToDeltaTest{ | ||
{ | ||
name: "cumulative_to_delta_expect_same", | ||
metrics: nil, | ||
inMetrics: generateTestMetrics(testMetric{ | ||
metricNames: []string{"metric_1", "metric_2"}, | ||
metricValues: [][]float64{{100}, {4}}, | ||
isCumulative: []bool{true, true}, | ||
}), | ||
outMetrics: generateTestMetrics(testMetric{ | ||
metricNames: []string{"metric_1", "metric_2"}, | ||
metricValues: [][]float64{{100}, {4}}, | ||
isCumulative: []bool{true, true}, | ||
}), | ||
}, | ||
{ | ||
name: "cumulative_to_delta_one_positive", | ||
metrics: []string{"metric_1"}, | ||
inMetrics: generateTestMetrics(testMetric{ | ||
metricNames: []string{"metric_1", "metric_2"}, | ||
metricValues: [][]float64{{100, 200, 500}, {4}}, | ||
isCumulative: []bool{true, true}, | ||
}), | ||
outMetrics: generateTestMetrics(testMetric{ | ||
metricNames: []string{"metric_1", "metric_2"}, | ||
metricValues: [][]float64{{100, 100, 300}, {4}}, | ||
isCumulative: []bool{false, true}, | ||
}), | ||
}, | ||
} | ||
) | ||
|
||
func TestCumulativeToDeltaProcessor(t *testing.T) { | ||
for _, test := range testCases { | ||
t.Run(test.name, func(t *testing.T) { | ||
// next stores the results of the filter metric processor | ||
next := new(consumertest.MetricsSink) | ||
cfg := &Config{ | ||
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)), | ||
Metrics: test.metrics, | ||
} | ||
factory := NewFactory() | ||
mgp, err := factory.CreateMetricsProcessor( | ||
context.Background(), | ||
componenttest.NewNopProcessorCreateSettings(), | ||
cfg, | ||
next, | ||
) | ||
assert.NotNil(t, mgp) | ||
assert.Nil(t, err) | ||
|
||
caps := mgp.Capabilities() | ||
assert.True(t, caps.MutatesData) | ||
ctx := context.Background() | ||
require.NoError(t, mgp.Start(ctx, nil)) | ||
|
||
cErr := mgp.ConsumeMetrics(context.Background(), test.inMetrics) | ||
assert.Nil(t, cErr) | ||
got := next.AllMetrics() | ||
|
||
require.Equal(t, 1, len(got)) | ||
require.Equal(t, test.outMetrics.ResourceMetrics().Len(), got[0].ResourceMetrics().Len()) | ||
|
||
expectedMetrics := test.outMetrics.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics() | ||
actualMetrics := got[0].ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics() | ||
|
||
require.Equal(t, expectedMetrics.Len(), actualMetrics.Len()) | ||
|
||
for i := 0; i < expectedMetrics.Len(); i++ { | ||
eM := expectedMetrics.At(i) | ||
aM := actualMetrics.At(i) | ||
|
||
require.Equal(t, eM.Name(), aM.Name()) | ||
|
||
if eM.DataType() == pdata.MetricDataTypeGauge { | ||
eDataPoints := eM.Gauge().DataPoints() | ||
aDataPoints := aM.Gauge().DataPoints() | ||
require.Equal(t, eDataPoints.Len(), aDataPoints.Len()) | ||
|
||
for j := 0; j < eDataPoints.Len(); j++ { | ||
require.Equal(t, eDataPoints.At(j).Value(), aDataPoints.At(j).Value()) | ||
} | ||
} | ||
|
||
if eM.DataType() == pdata.MetricDataTypeSum { | ||
eDataPoints := eM.Sum().DataPoints() | ||
aDataPoints := aM.Sum().DataPoints() | ||
|
||
require.Equal(t, eDataPoints.Len(), aDataPoints.Len()) | ||
require.Equal(t, eM.Sum().AggregationTemporality(), aM.Sum().AggregationTemporality()) | ||
|
||
for j := 0; j < eDataPoints.Len(); j++ { | ||
require.Equal(t, eDataPoints.At(j).Value(), aDataPoints.At(j).Value()) | ||
} | ||
} | ||
|
||
} | ||
|
||
require.NoError(t, mgp.Shutdown(ctx)) | ||
}) | ||
} | ||
} | ||
|
||
func generateTestMetrics(tm testMetric) pdata.Metrics { | ||
md := pdata.NewMetrics() | ||
now := time.Now() | ||
|
||
rm := md.ResourceMetrics().AppendEmpty() | ||
ms := rm.InstrumentationLibraryMetrics().AppendEmpty().Metrics() | ||
for i, name := range tm.metricNames { | ||
m := ms.AppendEmpty() | ||
m.SetName(name) | ||
m.SetDataType(pdata.MetricDataTypeSum) | ||
|
||
sum := m.Sum() | ||
sum.SetIsMonotonic(true) | ||
|
||
if tm.isCumulative[i] { | ||
sum.SetAggregationTemporality(pdata.AggregationTemporalityCumulative) | ||
} else { | ||
sum.SetAggregationTemporality(pdata.AggregationTemporalityDelta) | ||
} | ||
|
||
for _, value := range tm.metricValues[i] { | ||
dp := m.Sum().DataPoints().AppendEmpty() | ||
dp.SetTimestamp(pdata.TimestampFromTime(now.Add(10 * time.Second))) | ||
dp.SetValue(value) | ||
} | ||
} | ||
|
||
return md | ||
} |
Uh oh!
There was an error while loading. Please reload this page.