Skip to content

Commit 3098c8e

Browse files
authored
Bugfix/Prevent duplicated metrics on prometheus and opentel (#4269)
prevent duplicated metrics on prometheus and opentel
1 parent f963e5a commit 3098c8e

File tree

2 files changed

+289
-131
lines changed

2 files changed

+289
-131
lines changed

packages/server/src/metrics/OpenTelemetry.ts

Lines changed: 198 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ import { diag, DiagLogLevel, DiagConsoleLogger, Attributes, Counter } from '@ope
66
import { getVersion } from 'flowise-components'
77
import express from 'express'
88

9+
// Create a static map to track created metrics and prevent duplicates
10+
const createdMetrics = new Map<string, boolean>()
11+
912
export class OpenTelemetry implements IMetricsProvider {
1013
private app: express.Application
1114
private resource: Resource
@@ -30,128 +33,225 @@ export class OpenTelemetry implements IMetricsProvider {
3033
if (process.env.METRICS_OPEN_TELEMETRY_DEBUG === 'true') {
3134
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG)
3235
}
36+
37+
// Clear metrics tracking on new instance
38+
createdMetrics.clear()
3339
}
3440

3541
public getName(): string {
3642
return 'OpenTelemetry'
3743
}
3844

3945
async initializeCounters(): Promise<void> {
40-
// Define the resource with the service name for trace grouping
41-
const flowiseVersion = await getVersion()
42-
43-
this.resource = new Resource({
44-
[ATTR_SERVICE_NAME]: process.env.METRICS_SERVICE_NAME || 'FlowiseAI',
45-
[ATTR_SERVICE_VERSION]: flowiseVersion.version // Version as a label
46-
})
47-
48-
const metricProtocol = process.env.METRICS_OPEN_TELEMETRY_PROTOCOL || 'http' // Default to 'http'
49-
// Conditionally import the correct OTLP exporters based on protocol
50-
let OTLPMetricExporter
51-
if (metricProtocol === 'http') {
52-
OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-http').OTLPMetricExporter
53-
} else if (metricProtocol === 'grpc') {
54-
OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-grpc').OTLPMetricExporter
55-
} else if (metricProtocol === 'proto') {
56-
OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-proto').OTLPMetricExporter
57-
} else {
58-
console.error('Invalid METRICS_OPEN_TELEMETRY_PROTOCOL specified. Please set it to "http", "grpc", or "proto".')
59-
process.exit(1) // Exit if invalid protocol type is specified
60-
}
46+
try {
47+
// Define the resource with the service name for trace grouping
48+
const flowiseVersion = await getVersion()
6149

62-
this.otlpMetricExporter = new OTLPMetricExporter({
63-
url: process.env.METRICS_OPEN_TELEMETRY_METRIC_ENDPOINT // OTLP endpoint for metrics
64-
})
65-
66-
this.metricReader = new PeriodicExportingMetricReader({
67-
exporter: this.otlpMetricExporter,
68-
exportIntervalMillis: 5000 // Export metrics every 5 seconds
69-
})
70-
this.meterProvider = new MeterProvider({ resource: this.resource, readers: [this.metricReader] })
71-
72-
const meter = this.meterProvider.getMeter('flowise-metrics')
73-
// look at the FLOWISE_COUNTER enum in Interface.Metrics.ts and get all values
74-
// for each counter in the enum, create a new promClient.Counter and add it to the registry
75-
const enumEntries = Object.entries(FLOWISE_METRIC_COUNTERS)
76-
enumEntries.forEach(([name, value]) => {
77-
// derive proper counter name from the enum value (chatflow_created = Chatflow Created)
78-
const properCounterName: string = name.replace(/_/g, ' ').replace(/\b\w/g, (l) => l.toUpperCase())
79-
this.counters.set(
80-
value,
81-
meter.createCounter(value, {
82-
description: properCounterName
83-
})
84-
)
85-
})
86-
87-
// in addition to the enum counters, add a few more custom counters
88-
89-
const versionGuage = meter.createGauge('flowise_version', {
90-
description: 'Flowise version'
91-
})
92-
// remove the last dot from the version string, e.g. 2.1.3 -> 2.13 (guage needs a number - float)
93-
const formattedVersion = flowiseVersion.version.replace(/\.(\d+)$/, '$1')
94-
versionGuage.record(parseFloat(formattedVersion))
95-
96-
// Counter for HTTP requests with method, path, and status as labels
97-
this.httpRequestCounter = meter.createCounter('http_requests_total', {
98-
description: 'Counts the number of HTTP requests received'
99-
})
100-
101-
// Histogram to measure HTTP request duration in milliseconds
102-
this.httpRequestDuration = meter.createHistogram('http_request_duration_ms', {
103-
description: 'Records the duration of HTTP requests in ms'
104-
})
50+
this.resource = new Resource({
51+
[ATTR_SERVICE_NAME]: process.env.METRICS_SERVICE_NAME || 'FlowiseAI',
52+
[ATTR_SERVICE_VERSION]: flowiseVersion.version // Version as a label
53+
})
54+
55+
const metricProtocol = process.env.METRICS_OPEN_TELEMETRY_PROTOCOL || 'http' // Default to 'http'
56+
// Conditionally import the correct OTLP exporters based on protocol
57+
let OTLPMetricExporter
58+
if (metricProtocol === 'http') {
59+
OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-http').OTLPMetricExporter
60+
} else if (metricProtocol === 'grpc') {
61+
OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-grpc').OTLPMetricExporter
62+
} else if (metricProtocol === 'proto') {
63+
OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-proto').OTLPMetricExporter
64+
} else {
65+
console.error('Invalid METRICS_OPEN_TELEMETRY_PROTOCOL specified. Please set it to "http", "grpc", or "proto".')
66+
process.exit(1) // Exit if invalid protocol type is specified
67+
}
68+
69+
// Handle any existing metric exporter
70+
if (this.otlpMetricExporter) {
71+
try {
72+
await this.otlpMetricExporter.shutdown()
73+
} catch (error) {
74+
// Ignore shutdown errors
75+
}
76+
}
77+
78+
this.otlpMetricExporter = new OTLPMetricExporter({
79+
url: process.env.METRICS_OPEN_TELEMETRY_METRIC_ENDPOINT // OTLP endpoint for metrics
80+
})
81+
82+
// Clean up any existing metric reader
83+
if (this.metricReader) {
84+
try {
85+
await this.metricReader.shutdown()
86+
} catch (error) {
87+
// Ignore shutdown errors
88+
}
89+
}
90+
91+
this.metricReader = new PeriodicExportingMetricReader({
92+
exporter: this.otlpMetricExporter,
93+
exportIntervalMillis: 5000 // Export metrics every 5 seconds
94+
})
95+
96+
// Clean up any existing meter provider
97+
if (this.meterProvider) {
98+
try {
99+
await this.meterProvider.shutdown()
100+
} catch (error) {
101+
// Ignore shutdown errors
102+
}
103+
}
104+
105+
this.meterProvider = new MeterProvider({ resource: this.resource, readers: [this.metricReader] })
106+
107+
const meter = this.meterProvider.getMeter('flowise-metrics')
108+
// look at the FLOWISE_COUNTER enum in Interface.Metrics.ts and get all values
109+
// for each counter in the enum, create a new promClient.Counter and add it to the registry
110+
const enumEntries = Object.entries(FLOWISE_METRIC_COUNTERS)
111+
enumEntries.forEach(([name, value]) => {
112+
try {
113+
// Check if we've already created this metric
114+
if (!createdMetrics.has(value)) {
115+
// derive proper counter name from the enum value (chatflow_created = Chatflow Created)
116+
const properCounterName: string = name.replace(/_/g, ' ').replace(/\b\w/g, (l) => l.toUpperCase())
117+
this.counters.set(
118+
value,
119+
meter.createCounter(value, {
120+
description: properCounterName
121+
})
122+
)
123+
createdMetrics.set(value, true)
124+
}
125+
} catch (error) {
126+
// Log error but continue with other metrics
127+
console.error(`Error creating metric ${value}:`, error)
128+
}
129+
})
130+
131+
try {
132+
// Add version gauge if not already created
133+
if (!createdMetrics.has('flowise_version')) {
134+
const versionGuage = meter.createGauge('flowise_version', {
135+
description: 'Flowise version'
136+
})
137+
// remove the last dot from the version string, e.g. 2.1.3 -> 2.13 (gauge needs a number - float)
138+
const formattedVersion = flowiseVersion.version.replace(/\.(\d+)$/, '$1')
139+
versionGuage.record(parseFloat(formattedVersion))
140+
createdMetrics.set('flowise_version', true)
141+
}
142+
} catch (error) {
143+
console.error('Error creating version gauge:', error)
144+
}
145+
146+
try {
147+
// HTTP requests counter
148+
if (!createdMetrics.has('http_requests_total')) {
149+
this.httpRequestCounter = meter.createCounter('http_requests_total', {
150+
description: 'Counts the number of HTTP requests received'
151+
})
152+
createdMetrics.set('http_requests_total', true)
153+
}
154+
} catch (error) {
155+
console.error('Error creating HTTP request counter:', error)
156+
}
157+
158+
try {
159+
// HTTP request duration histogram
160+
if (!createdMetrics.has('http_request_duration_ms')) {
161+
this.httpRequestDuration = meter.createHistogram('http_request_duration_ms', {
162+
description: 'Records the duration of HTTP requests in ms'
163+
})
164+
createdMetrics.set('http_request_duration_ms', true)
165+
}
166+
} catch (error) {
167+
console.error('Error creating HTTP request duration histogram:', error)
168+
}
169+
170+
await this.setupMetricsEndpoint()
171+
} catch (error) {
172+
console.error('Error initializing OpenTelemetry metrics:', error)
173+
// Don't throw - allow app to continue without metrics
174+
}
105175
}
106176

107177
// Function to record HTTP request duration
108178
private recordHttpRequestDuration(durationMs: number, method: string, path: string, status: number) {
109-
this.httpRequestDuration.record(durationMs, {
110-
method,
111-
path,
112-
status: status.toString()
113-
})
179+
try {
180+
if (this.httpRequestDuration) {
181+
this.httpRequestDuration.record(durationMs, {
182+
method,
183+
path,
184+
status: status.toString()
185+
})
186+
}
187+
} catch (error) {
188+
// Log error but don't crash the application
189+
console.error('Error recording HTTP request duration:', error)
190+
}
114191
}
115192

116193
// Function to record HTTP requests with specific labels
117194
private recordHttpRequest(method: string, path: string, status: number) {
118-
this.httpRequestCounter.add(1, {
119-
method,
120-
path,
121-
status: status.toString()
122-
})
195+
try {
196+
if (this.httpRequestCounter) {
197+
this.httpRequestCounter.add(1, {
198+
method,
199+
path,
200+
status: status.toString()
201+
})
202+
}
203+
} catch (error) {
204+
// Log error but don't crash the application
205+
console.error('Error recording HTTP request:', error)
206+
}
123207
}
124208

125209
async setupMetricsEndpoint(): Promise<void> {
126-
// Graceful shutdown for telemetry data flushing
127-
process.on('SIGTERM', async () => {
128-
await this.metricReader.shutdown()
129-
await this.meterProvider.shutdown()
130-
})
131-
132-
// Runs before each requests
133-
this.app.use((req, res, next) => {
134-
res.locals.startEpoch = Date.now()
135-
next()
136-
})
137-
138-
// Runs after each requests
139-
this.app.use((req, res, next) => {
140-
res.on('finish', async () => {
141-
if (res.locals.startEpoch) {
142-
const responseTimeInMs = Date.now() - res.locals.startEpoch
143-
this.recordHttpRequest(req.method, req.path, res.statusCode)
144-
this.recordHttpRequestDuration(responseTimeInMs, req.method, req.path, res.statusCode)
210+
try {
211+
// Graceful shutdown for telemetry data flushing
212+
process.on('SIGTERM', async () => {
213+
try {
214+
if (this.metricReader) await this.metricReader.shutdown()
215+
if (this.meterProvider) await this.meterProvider.shutdown()
216+
} catch (error) {
217+
console.error('Error during metrics shutdown:', error)
145218
}
146219
})
147-
next()
148-
})
220+
221+
// Runs before each requests
222+
this.app.use((req, res, next) => {
223+
res.locals.startEpoch = Date.now()
224+
next()
225+
})
226+
227+
// Runs after each requests
228+
this.app.use((req, res, next) => {
229+
res.on('finish', async () => {
230+
try {
231+
if (res.locals.startEpoch) {
232+
const responseTimeInMs = Date.now() - res.locals.startEpoch
233+
this.recordHttpRequest(req.method, req.path, res.statusCode)
234+
this.recordHttpRequestDuration(responseTimeInMs, req.method, req.path, res.statusCode)
235+
}
236+
} catch (error) {
237+
console.error('Error in metrics middleware:', error)
238+
}
239+
})
240+
next()
241+
})
242+
} catch (error) {
243+
console.error('Error setting up metrics endpoint:', error)
244+
}
149245
}
150246

151247
async incrementCounter(counter: string, payload: any): Promise<void> {
152-
// Increment OpenTelemetry counter with the payload
153-
if (this.counters.has(counter)) {
154-
;(this.counters.get(counter) as Counter<Attributes>).add(1, payload)
248+
try {
249+
// Increment OpenTelemetry counter with the payload
250+
if (this.counters.has(counter)) {
251+
;(this.counters.get(counter) as Counter<Attributes>).add(1, payload)
252+
}
253+
} catch (error) {
254+
console.error(`Error incrementing counter ${counter}:`, error)
155255
}
156256
}
157257
}

0 commit comments

Comments
 (0)