Update cumulative to delta processor. #4444
````diff
@@ -9,18 +9,27 @@ The cumulative to delta processor (`cumulativetodeltaprocessor`) converts cumula
 
 ## Configuration
 
-Configuration is specified through a list of metrics. The processor uses metric names to identify a set of cumulative sum metrics and converts them to cumulative delta.
+The default configuration is to convert all monotonic sum metrics from aggregation temporality cumulative to aggregation temporality delta.
 
+The following settings can be optionally configured:
+
+- `metrics`: The processor uses metric names to identify a set of cumulative sum metrics and converts them to cumulative delta. Defaults to converting all metric names.
+- `max_stale`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0
+- `monotonic_only`: Specify whether only monotonic metrics are converted from cumulative to delta. Default: `true`. Set to `false` to convert metrics regardless of monotonic setting.
+
 #### Example
 
 ```yaml
 processors:
     # processor name: cumulativetodelta
     cumulativetodelta:
 
         # list the cumulative sum metrics to convert to delta
+        # (optional - defaults to converting all monotonic cumulative sum metrics)
         metrics:
             - <metric_1_name>
             - <metric_2_name>
             .
             .
             - <metric_n_name>
 ```
````

> **Review comment** (on `monotonic_only`): Is there any real use case where support for "gauges to deltas" is needed? Otherwise I would recommend removing this option and always skipping non-monotonic metrics.
>
> **Reply:** True, this is a good point. I think I'll go ahead and remove this option. After that update, this library will only attempt to convert monotonic metrics.
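To make the `max_stale` setting above more concrete, here is a hedged configuration sketch for a scrape-based pipeline. The interval arithmetic and the metric name are illustrative assumptions, not values taken from this PR:

```yaml
processors:
  cumulativetodelta:
    # Illustrative: keep per-series state slightly longer than one scrape
    # cycle (interval + timeout + fudge factor) so series do not go stale
    # between scrapes. 0 (the default) retains state indefinitely.
    max_stale: 100s
    # Illustrative metric name; omit `metrics` entirely to convert all
    # monotonic cumulative sums.
    metrics:
      - system.network.io
```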
```diff
@@ -15,7 +15,7 @@
 package cumulativetodeltaprocessor
 
 import (
-	"fmt"
+	"time"
 
 	"go.opentelemetry.io/collector/config"
 )
@@ -24,15 +24,16 @@ import (
 type Config struct {
 	config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
 
-	// List of cumulative sum metrics to convert to delta
+	// List of cumulative metrics to convert to delta. Default: converts all cumulative metrics to delta.
 	Metrics []string `mapstructure:"metrics"`
 
+	// MaxStale is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely.
+	MaxStale time.Duration `mapstructure:"max_stale"`
 }
 
-// Validate checks whether the input configuration has all of the required fields for the processor.
-// An error is returned if there are any invalid inputs.
-func (config *Config) Validate() error {
-	if len(config.Metrics) == 0 {
-		return fmt.Errorf("metric names are missing")
-	}
+var _ config.Processor = (*Config)(nil)
+
+// Validate checks if the processor configuration is valid
+func (cfg *Config) Validate() error {
 	return nil
 }
```

> **Review comment** (on `MaxStale`): I am not sure about the use case here. Wondering how important it is to expose as a config option? Less configurability gives us simplicity.
>
> **Reply:** The use case is the prometheus receiver. The scrape interval is configurable, so we wouldn't want metrics to go stale before the scraper completes. Making this value configurable means users can match the staleness to their scrape interval + scrape duration + fudge factor.
>
> **Reply:** Looks good. I suggest
```diff
@@ -17,32 +17,35 @@ package cumulativetodeltaprocessor
 import (
 	"context"
 	"math"
-	"time"
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/model/pdata"
 	"go.uber.org/zap"
 
-	awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/tracking"
 )
 
 type cumulativeToDeltaProcessor struct {
-	metrics         map[string]bool
+	metrics         map[string]struct{}
 	logger          *zap.Logger
-	deltaCalculator awsmetrics.MetricCalculator
+	deltaCalculator tracking.MetricTracker
+	cancelFunc      context.CancelFunc
 }
 
 func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor {
-	inputMetricSet := make(map[string]bool, len(config.Metrics))
-	for _, name := range config.Metrics {
-		inputMetricSet[name] = true
-	}
-
-	return &cumulativeToDeltaProcessor{
-		metrics:         inputMetricSet,
+	ctx, cancel := context.WithCancel(context.Background())
+	p := &cumulativeToDeltaProcessor{
 		logger:          logger,
-		deltaCalculator: newDeltaCalculator(),
+		deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStale),
+		cancelFunc:      cancel,
 	}
+	if len(config.Metrics) > 0 {
+		p.metrics = make(map[string]struct{}, len(config.Metrics))
+		for _, m := range config.Metrics {
+			p.metrics[m] = struct{}{}
+		}
+	}
+	return p
 }
 
 // Start is invoked during service startup.
@@ -53,64 +56,95 @@ func (ctdp *cumulativeToDeltaProcessor) Start(context.Context, component.Host) e
 // processMetrics implements the ProcessMetricsFunc type.
 func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
 	resourceMetricsSlice := md.ResourceMetrics()
-	for i := 0; i < resourceMetricsSlice.Len(); i++ {
-		rm := resourceMetricsSlice.At(i)
-		ilms := rm.InstrumentationLibraryMetrics()
-		for j := 0; j < ilms.Len(); j++ {
-			ilm := ilms.At(j)
-			metricSlice := ilm.Metrics()
-			for k := 0; k < metricSlice.Len(); k++ {
-				metric := metricSlice.At(k)
-				if ctdp.metrics[metric.Name()] {
-					if metric.DataType() == pdata.MetricDataTypeSum && metric.Sum().AggregationTemporality() == pdata.AggregationTemporalityCumulative {
-						dataPoints := metric.Sum().DataPoints()
-
-						for l := 0; l < dataPoints.Len(); l++ {
-							fromDataPoint := dataPoints.At(l)
-							labelMap := make(map[string]string)
-
-							fromDataPoint.Attributes().Range(func(k string, v pdata.AttributeValue) bool {
-								labelMap[k] = v.AsString()
-								return true
-							})
-							datapointValue := fromDataPoint.DoubleVal()
-							if math.IsNaN(datapointValue) {
-								continue
-							}
-							result, _ := ctdp.deltaCalculator.Calculate(metric.Name(), labelMap, datapointValue, fromDataPoint.Timestamp().AsTime())
-
-							fromDataPoint.SetDoubleVal(result.(delta).value)
-							fromDataPoint.SetStartTimestamp(pdata.NewTimestampFromTime(result.(delta).prevTimestamp))
-						}
-						metric.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta)
-					}
-				}
-			}
-		}
-	}
+	resourceMetricsSlice.RemoveIf(func(rm pdata.ResourceMetrics) bool {
+		ilms := rm.InstrumentationLibraryMetrics()
+		ilms.RemoveIf(func(ilm pdata.InstrumentationLibraryMetrics) bool {
+			ms := ilm.Metrics()
+			ms.RemoveIf(func(m pdata.Metric) bool {
+				if ctdp.metrics != nil {
+					if _, ok := ctdp.metrics[m.Name()]; !ok {
+						return false
+					}
+				}
+				baseIdentity := tracking.MetricIdentity{
+					Resource:               rm.Resource(),
+					InstrumentationLibrary: ilm.InstrumentationLibrary(),
+					MetricDataType:         m.DataType(),
+					MetricName:             m.Name(),
+					MetricUnit:             m.Unit(),
+				}
+				switch m.DataType() {
+				case pdata.MetricDataTypeSum:
+					ms := m.Sum()
+					if ms.AggregationTemporality() != pdata.AggregationTemporalityCumulative {
+						return false
+					}
+
+					// Ignore any metrics that aren't monotonic
+					if !ms.IsMonotonic() {
+						return false
+					}
+
+					baseIdentity.MetricIsMonotonic = ms.IsMonotonic()
+					ctdp.convertDataPoints(ms.DataPoints(), baseIdentity)
+					ms.SetAggregationTemporality(pdata.AggregationTemporalityDelta)
+					return ms.DataPoints().Len() == 0
+				default:
+					return false
+				}
+			})
+			return ilm.Metrics().Len() == 0
+		})
+		return rm.InstrumentationLibraryMetrics().Len() == 0
+	})
 	return md, nil
 }
 
 // Shutdown is invoked during service shutdown.
 func (ctdp *cumulativeToDeltaProcessor) Shutdown(context.Context) error {
+	ctdp.cancelFunc()
 	return nil
 }
 
-func newDeltaCalculator() awsmetrics.MetricCalculator {
-	return awsmetrics.NewMetricCalculator(func(prev *awsmetrics.MetricValue, val interface{}, timestamp time.Time) (interface{}, bool) {
-		result := delta{value: val.(float64), prevTimestamp: timestamp}
-
-		if prev != nil {
-			deltaValue := val.(float64) - prev.RawValue.(float64)
-			result.value = deltaValue
-			result.prevTimestamp = prev.Timestamp
-			return result, true
-		}
-		return result, false
-	})
-}
-
-type delta struct {
-	value         float64
-	prevTimestamp time.Time
-}
+func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseIdentity tracking.MetricIdentity) {
+	switch dps := in.(type) {
+	case pdata.NumberDataPointSlice:
+		dps.RemoveIf(func(dp pdata.NumberDataPoint) bool {
+			id := baseIdentity
+			id.StartTimestamp = dp.StartTimestamp()
+			id.Attributes = dp.Attributes()
+			id.MetricValueType = dp.Type()
+			point := tracking.ValuePoint{
+				ObservedTimestamp: dp.Timestamp(),
+			}
+			if id.IsFloatVal() {
+				// Do not attempt to transform NaN values
+				if math.IsNaN(dp.DoubleVal()) {
+					return false
+				}
+				point.FloatValue = dp.DoubleVal()
+			} else {
+				point.IntValue = dp.IntVal()
+			}
+			trackingPoint := tracking.MetricPoint{
+				Identity: id,
+				Value:    point,
+			}
+			delta, valid := ctdp.deltaCalculator.Convert(trackingPoint)
+
+			// When converting non-monotonic cumulative counters,
+			// the first data point is omitted since the initial
+			// reference is not assumed to be zero
+			if !valid {
+				return true
+			}
+			dp.SetStartTimestamp(delta.StartTimestamp)
+			if id.IsFloatVal() {
+				dp.SetDoubleVal(delta.FloatValue)
+			} else {
+				dp.SetIntVal(delta.IntValue)
+			}
+			return false
+		})
+	}
+}
```

> **Review comment** (on the nested `RemoveIf` rewrite): I prefer the new style here, thanks. 😀
>
> **Review comment** (on omitting the first data point): Sounds good. This behavior is described in https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#resets-and-gaps
> **Review comment** (on converting all metrics by default): Similar question here. Do you have a use case where all metrics need to be translated? I think it would be best if we add regex support in the future instead.
>
> **Reply:** New Relic requires all metrics to be translated from cumulative -> delta temporality 😄 The New Relic platform does not currently support cumulative metric temporalities.