
Update cumulative to delta processor. #4444


Closed
13 changes: 11 additions & 2 deletions processor/cumulativetodeltaprocessor/README.md
@@ -9,18 +9,27 @@ The cumulative to delta processor (`cumulativetodeltaprocessor`) converts cumula

## Configuration

Configuration is specified through a list of metrics. The processor uses metric names to identify a set of cumulative sum metrics and converts them to cumulative delta.
The default configuration is to convert all monotonic sum metrics from aggregation temporality cumulative to aggregation temporality delta.

The following settings can be optionally configured:

- `metrics`: The processor uses metric names to identify a set of cumulative sum metrics and converts them to cumulative delta. Defaults to converting all metric names.
Member: Similar question here. Do you have a use case where all metrics need to be translated? I think it would be best if we add regex support in the future instead.

Member Author: New Relic requires all metrics to be translated from cumulative -> delta temporality 😄
The New Relic platform does not currently support cumulative metric temporalities.

- `max_stale`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0
- `monotonic_only`: Specify whether only monotonic metrics are converted from cumulative to delta. Default: `true`. Set to `false` to convert metrics regardless of monotonic setting.
Member: Is there any real use case you want to cover where support for "gauges to deltas" is needed? Otherwise I would recommend removing this option and always skipping non-monotonic metrics.

Member Author: True, this is a good point. I think I'll go ahead and remove this option. After that update, this library will only attempt to convert monotonic metrics.


#### Example

```yaml
processors:
  # processor name: cumulativetodelta
  cumulativetodelta:

    # list the cumulative sum metrics to convert to delta
    # (optional - defaults to converting all monotonic cumulative sum metrics)
    metrics:
      - <metric_1_name>
      - <metric_2_name>
      # ...
      - <metric_n_name>
```
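The README example does not show the `max_stale` setting documented above; a minimal sketch of a config that sets it (the duration syntax assumes the collector's usual `time.Duration` parsing, and the metric name is a placeholder):

```yaml
processors:
  cumulativetodelta:
    # drop per-series state that has not been refreshed within 10 minutes
    max_stale: 10m
    metrics:
      - <metric_name>
```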
17 changes: 9 additions & 8 deletions processor/cumulativetodeltaprocessor/config.go
@@ -15,7 +15,7 @@
package cumulativetodeltaprocessor

import (
"fmt"
"time"

"go.opentelemetry.io/collector/config"
)
@@ -24,15 +24,16 @@ import (
type Config struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// List of cumulative sum metrics to convert to delta
// List of cumulative metrics to convert to delta. Default: converts all cumulative metrics to delta.
Metrics []string `mapstructure:"metrics"`

// MaxStale is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely.
MaxStale time.Duration `mapstructure:"max_stale"`
Contributor: I am not sure about the use case here. Wondering, how important is it to expose as a config option? Less configurability gives us simplicity.

Member Author: The use case is for the Prometheus receiver. The scrape interval is configurable, so we wouldn't want metrics to go stale before the scraper completes. Allowing this value to be configurable means users can tune the staleness to their scrape interval + scrape duration + fudge factor.
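The pairing described in that comment can be sketched as follows (values are illustrative; `scrape_interval` is the standard Prometheus receiver scrape setting, while `max_stale` is the option added in this PR):

```yaml
receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: example
          scrape_interval: 60s
          static_configs:
            - targets: ["localhost:8888"]

processors:
  cumulativetodelta:
    # scrape interval (60s) + scrape duration + fudge factor
    max_stale: 90s
```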

Contributor: Looks good. I suggest MaxStaleness.

}

// Validate checks whether the input configuration has all of the required fields for the processor.
// An error is returned if there are any invalid inputs.
func (config *Config) Validate() error {
if len(config.Metrics) == 0 {
return fmt.Errorf("metric names are missing")
}
var _ config.Processor = (*Config)(nil)

// Validate checks if the processor configuration is valid
func (cfg *Config) Validate() error {
return nil
}
73 changes: 22 additions & 51 deletions processor/cumulativetodeltaprocessor/config_test.go
@@ -15,9 +15,9 @@
package cumulativetodeltaprocessor

import (
"fmt"
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -26,72 +26,43 @@ import (
"go.opentelemetry.io/collector/config/configtest"
)

const configFile = "config.yaml"

func TestLoadingFullConfig(t *testing.T) {

factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Processors[typeStr] = factory
cfg, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", configFile), factories)
assert.NoError(t, err)
require.NotNil(t, cfg)

tests := []struct {
configFile string
expCfg *Config
expCfg *Config
}{
{
configFile: "config_full.yaml",
expCfg: &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
ProcessorSettings: config.NewProcessorSettings(config.NewIDWithName(typeStr, "alt")),
Metrics: []string{
"metric1",
"metric2",
},
MaxStale: 10 * time.Second,
},
},
}

for _, test := range tests {
t.Run(test.expCfg.ID().String(), func(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Processors[typeStr] = factory
config, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", test.configFile), factories)
assert.NoError(t, err)
require.NotNil(t, config)

cfg := config.Processors[test.expCfg.ID()]
assert.Equal(t, test.expCfg, cfg)
})
}
}

func TestValidateConfig(t *testing.T) {
tests := []struct {
configName string
succeed bool
errorMessage string
}{
{
configName: "config_full.yaml",
succeed: true,
},
{
configName: "config_missing_name.yaml",
succeed: false,
errorMessage: "metric names are missing",
expCfg: &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
},
},
}

for _, test := range tests {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Processors[typeStr] = factory
t.Run(test.configName, func(t *testing.T) {
config, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", test.configName), factories)
if test.succeed {
assert.NotNil(t, config)
assert.NoError(t, err)
} else {
assert.EqualError(t, err, fmt.Sprintf("processor %q has invalid configuration: %s", typeStr, test.errorMessage))
}
t.Run(test.expCfg.ID().String(), func(t *testing.T) {
cfg := cfg.Processors[test.expCfg.ID()]
assert.Equal(t, test.expCfg, cfg)
})

}
}
3 changes: 1 addition & 2 deletions processor/cumulativetodeltaprocessor/factory.go
@@ -46,7 +46,7 @@ func createDefaultConfig() config.Processor {
}

func createMetricsProcessor(
ctx context.Context,
_ context.Context,
params component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Metrics,
@@ -56,7 +56,6 @@ func createMetricsProcessor(
return nil, fmt.Errorf("configuration parsing error")
}

processorConfig.Validate()
metricsProcessor := newCumulativeToDeltaProcessor(processorConfig, params.Logger)

return processorhelper.NewMetricsProcessor(
2 changes: 1 addition & 1 deletion processor/cumulativetodeltaprocessor/factory_test.go
@@ -49,7 +49,7 @@ func TestCreateProcessors(t *testing.T) {
errorMessage string
}{
{
configName: "config_full.yaml",
configName: "config.yaml",
succeed: true,
},
}
3 changes: 0 additions & 3 deletions processor/cumulativetodeltaprocessor/go.mod
@@ -3,7 +3,6 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumul
go 1.17

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics v0.36.0
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.36.1-0.20210923171211-10f543a9a43f
go.opentelemetry.io/collector/model v0.36.1-0.20210923171211-10f543a9a43f
@@ -40,5 +39,3 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics => ./../../internal/aws/metrics
152 changes: 93 additions & 59 deletions processor/cumulativetodeltaprocessor/processor.go
@@ -17,32 +17,35 @@ package cumulativetodeltaprocessor
import (
"context"
"math"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"

awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/tracking"
)

type cumulativeToDeltaProcessor struct {
metrics map[string]bool
metrics map[string]struct{}
logger *zap.Logger
deltaCalculator awsmetrics.MetricCalculator
deltaCalculator tracking.MetricTracker
cancelFunc context.CancelFunc
}

func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor {
inputMetricSet := make(map[string]bool, len(config.Metrics))
for _, name := range config.Metrics {
inputMetricSet[name] = true
}

return &cumulativeToDeltaProcessor{
metrics: inputMetricSet,
ctx, cancel := context.WithCancel(context.Background())
p := &cumulativeToDeltaProcessor{
logger: logger,
deltaCalculator: newDeltaCalculator(),
deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStale),
cancelFunc: cancel,
}
if len(config.Metrics) > 0 {
p.metrics = make(map[string]struct{}, len(config.Metrics))
for _, m := range config.Metrics {
p.metrics[m] = struct{}{}
}
}
return p
}

// Start is invoked during service startup.
@@ -53,64 +56,95 @@ func (ctdp *cumulativeToDeltaProcessor) Start(context.Context, component.Host) e
// processMetrics implements the ProcessMetricsFunc type.
func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
resourceMetricsSlice := md.ResourceMetrics()
for i := 0; i < resourceMetricsSlice.Len(); i++ {
rm := resourceMetricsSlice.At(i)
resourceMetricsSlice.RemoveIf(func(rm pdata.ResourceMetrics) bool {
ilms := rm.InstrumentationLibraryMetrics()
for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
metricSlice := ilm.Metrics()
for k := 0; k < metricSlice.Len(); k++ {
metric := metricSlice.At(k)
if ctdp.metrics[metric.Name()] {
if metric.DataType() == pdata.MetricDataTypeSum && metric.Sum().AggregationTemporality() == pdata.AggregationTemporalityCumulative {
dataPoints := metric.Sum().DataPoints()

for l := 0; l < dataPoints.Len(); l++ {
fromDataPoint := dataPoints.At(l)
labelMap := make(map[string]string)

fromDataPoint.Attributes().Range(func(k string, v pdata.AttributeValue) bool {
labelMap[k] = v.AsString()
return true
})
datapointValue := fromDataPoint.DoubleVal()
if math.IsNaN(datapointValue) {
continue
}
result, _ := ctdp.deltaCalculator.Calculate(metric.Name(), labelMap, datapointValue, fromDataPoint.Timestamp().AsTime())
ilms.RemoveIf(func(ilm pdata.InstrumentationLibraryMetrics) bool {
Contributor: I prefer the new style here, thanks. 😀

ms := ilm.Metrics()
ms.RemoveIf(func(m pdata.Metric) bool {
if ctdp.metrics != nil {
if _, ok := ctdp.metrics[m.Name()]; !ok {
return false
}
}
baseIdentity := tracking.MetricIdentity{
Resource: rm.Resource(),
InstrumentationLibrary: ilm.InstrumentationLibrary(),
MetricDataType: m.DataType(),
MetricName: m.Name(),
MetricUnit: m.Unit(),
}
switch m.DataType() {
case pdata.MetricDataTypeSum:
ms := m.Sum()
if ms.AggregationTemporality() != pdata.AggregationTemporalityCumulative {
return false
}

fromDataPoint.SetDoubleVal(result.(delta).value)
fromDataPoint.SetStartTimestamp(pdata.NewTimestampFromTime(result.(delta).prevTimestamp))
}
metric.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta)
// Ignore any metrics that aren't monotonic
if !ms.IsMonotonic() {
return false
}

baseIdentity.MetricIsMonotonic = ms.IsMonotonic()
ctdp.convertDataPoints(ms.DataPoints(), baseIdentity)
ms.SetAggregationTemporality(pdata.AggregationTemporalityDelta)
return ms.DataPoints().Len() == 0
default:
return false
}
}
}
}
})
return ilm.Metrics().Len() == 0
})
return rm.InstrumentationLibraryMetrics().Len() == 0
})
return md, nil
}

// Shutdown is invoked during service shutdown.
func (ctdp *cumulativeToDeltaProcessor) Shutdown(context.Context) error {
ctdp.cancelFunc()
return nil
}

func newDeltaCalculator() awsmetrics.MetricCalculator {
return awsmetrics.NewMetricCalculator(func(prev *awsmetrics.MetricValue, val interface{}, timestamp time.Time) (interface{}, bool) {
result := delta{value: val.(float64), prevTimestamp: timestamp}

if prev != nil {
deltaValue := val.(float64) - prev.RawValue.(float64)
result.value = deltaValue
result.prevTimestamp = prev.Timestamp
return result, true
}
return result, false
})
}
func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseIdentity tracking.MetricIdentity) {
switch dps := in.(type) {
case pdata.NumberDataPointSlice:
dps.RemoveIf(func(dp pdata.NumberDataPoint) bool {
id := baseIdentity
id.StartTimestamp = dp.StartTimestamp()
id.Attributes = dp.Attributes()
id.MetricValueType = dp.Type()
point := tracking.ValuePoint{
ObservedTimestamp: dp.Timestamp(),
}
if id.IsFloatVal() {
// Do not attempt to transform NaN values
if math.IsNaN(dp.DoubleVal()) {
return false
}
point.FloatValue = dp.DoubleVal()
} else {
point.IntValue = dp.IntVal()
}
trackingPoint := tracking.MetricPoint{
Identity: id,
Value: point,
}
delta, valid := ctdp.deltaCalculator.Convert(trackingPoint)

type delta struct {
value float64
prevTimestamp time.Time
// When converting non-monotonic cumulative counters,
// the first data point is omitted since the initial
// reference is not assumed to be zero
if !valid {
return true
}
dp.SetStartTimestamp(delta.StartTimestamp)
if id.IsFloatVal() {
dp.SetDoubleVal(delta.FloatValue)
} else {
dp.SetIntVal(delta.IntValue)
}
return false
})
}
}
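Stripped of the pdata plumbing and the tracking package's identity bookkeeping, the conversion the rewritten processor performs reduces to the following sketch. The types and function names here are illustrative, not the processor's actual API; the first observation is dropped, mirroring the `valid == false` case above where no prior reference exists.

```go
package main

import "fmt"

// point is a simplified stand-in for a pdata data point: a cumulative
// counter value observed at a given timestamp.
type point struct {
	value float64
	ts    int64
}

// convertCumulativeToDelta reproduces the core idea of the processor for
// a single monotonic series: each cumulative point becomes the difference
// from the previously seen point. The first point has no reference and is
// omitted from the output.
func convertCumulativeToDelta(cumulative []point) []point {
	var deltas []point
	for i := 1; i < len(cumulative); i++ {
		deltas = append(deltas, point{
			value: cumulative[i].value - cumulative[i-1].value,
			ts:    cumulative[i].ts,
		})
	}
	return deltas
}

func main() {
	series := []point{{10, 1}, {25, 2}, {40, 3}}
	fmt.Println(convertCumulativeToDelta(series))
}
```

In the real processor this per-series state is keyed by the full metric identity (resource, instrumentation library, name, unit, attributes) and expired via `max_stale`, but the arithmetic per data point is the same.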