Skip to content

Commit 43820a4

Browse files
committed
feat: updateConnectionMetricCache and buildConnectionMetric func
Signed-off-by: Yash Patel <[email protected]>
1 parent e1897ca commit 43820a4

File tree

2 files changed

+206
-28
lines changed

2 files changed

+206
-28
lines changed

pkg/controller/telemetry/metric.go

Lines changed: 203 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ const (
4747
metricFlushInterval = 5 * time.Second
4848

4949
DEFAULT_UNKNOWN = "-"
50+
51+
LONG_CONN_METRIC_THRESHOLD = uint64(30 * time.Second)
5052
)
5153

5254
var osStartTime time.Time
@@ -68,14 +70,16 @@ var TCP_STATES = map[uint32]string{
6870
}
6971

7072
type MetricController struct {
71-
EnableAccesslog atomic.Bool
72-
EnableMonitoring atomic.Bool
73-
EnableWorkloadMetric atomic.Bool
74-
workloadCache cache.WorkloadCache
75-
serviceCache cache.ServiceCache
76-
workloadMetricCache map[workloadMetricLabels]*workloadMetricInfo
77-
serviceMetricCache map[serviceMetricLabels]*serviceMetricInfo
78-
mutex sync.RWMutex
73+
EnableAccesslog atomic.Bool
74+
EnableMonitoring atomic.Bool
75+
EnableWorkloadMetric atomic.Bool
76+
EnableConnectionMetric atomic.Bool
77+
workloadCache cache.WorkloadCache
78+
serviceCache cache.ServiceCache
79+
workloadMetricCache map[workloadMetricLabels]*workloadMetricInfo
80+
serviceMetricCache map[serviceMetricLabels]*serviceMetricInfo
81+
connectionMetricCache map[connectionMetricLabels]*connectionMetricInfo
82+
mutex sync.RWMutex
7983
}
8084

8185
type workloadMetricInfo struct {
@@ -96,6 +100,13 @@ type serviceMetricInfo struct {
96100
ServiceConnFailed float64
97101
}
98102

103+
type connectionMetricInfo struct {
104+
ConnSentBytes float64
105+
ConnReceivedBytes float64
106+
ConnTotalRetrans float64
107+
ConnPacketLost float64
108+
}
109+
99110
type statistics struct {
100111
SentBytes uint32
101112
ReceivedBytes uint32
@@ -144,6 +155,7 @@ type connMetric struct {
144155
sentBytes uint32 // total bytes sent till now
145156
totalRetrans uint32 // total retransmits till now
146157
packetLost uint32 // total packets lost till now
158+
totalReports uint32 // number of times connection data is reported in ringbuffer
147159
}
148160

149161
type connectionSrcDst struct {
@@ -231,6 +243,39 @@ type serviceMetricLabels struct {
231243
connectionSecurityPolicy string
232244
}
233245

246+
type connectionMetricLabels struct {
247+
reporter string
248+
startTime string
249+
250+
sourceWorkload string
251+
sourceCanonicalService string
252+
sourceCanonicalRevision string
253+
sourceWorkloadNamespace string
254+
sourcePrincipal string
255+
sourceApp string
256+
sourceVersion string
257+
sourceCluster string
258+
sourceAddress string
259+
260+
destinationAddress string
261+
destinationPodAddress string
262+
destinationPodNamespace string
263+
destinationPodName string
264+
destinationWorkload string
265+
destinationCanonicalService string
266+
destinationCanonicalRevision string
267+
destinationWorkloadNamespace string
268+
destinationPrincipal string
269+
destinationApp string
270+
destinationVersion string
271+
destinationCluster string
272+
273+
requestProtocol string
274+
// TODO: responseFlags is not used for now
275+
responseFlags string
276+
connectionSecurityPolicy string
277+
}
278+
234279
func NewServiceMetricLabel() *serviceMetricLabels {
235280
return &serviceMetricLabels{}
236281
}
@@ -239,6 +284,10 @@ func NewWorkloadMetricLabel() *workloadMetricLabels {
239284
return &workloadMetricLabels{}
240285
}
241286

287+
func NewConnectionMetricLabel() *connectionMetricLabels {
288+
return &connectionMetricLabels{}
289+
}
290+
242291
func (w *workloadMetricLabels) withSource(workload *workloadapi.Workload) *workloadMetricLabels {
243292
if workload == nil {
244293
return w
@@ -311,12 +360,45 @@ func (s *serviceMetricLabels) withDestination(workload *workloadapi.Workload) *s
311360
return s
312361
}
313362

363+
func (w *connectionMetricLabels) withSource(workload *workloadapi.Workload) *connectionMetricLabels {
364+
if workload == nil {
365+
return w
366+
}
367+
w.sourceWorkload = workload.GetWorkloadName()
368+
w.sourceCanonicalService = workload.GetCanonicalName()
369+
w.sourceCanonicalRevision = workload.GetCanonicalRevision()
370+
w.sourceWorkloadNamespace = workload.GetNamespace()
371+
w.sourceApp = workload.GetCanonicalName()
372+
w.sourceVersion = workload.GetCanonicalRevision()
373+
w.sourceCluster = workload.GetClusterId()
374+
w.sourcePrincipal = buildPrincipal(workload)
375+
return w
376+
}
377+
378+
func (w *connectionMetricLabels) withDestination(workload *workloadapi.Workload) *connectionMetricLabels {
379+
if workload == nil {
380+
return w
381+
}
382+
w.destinationPodNamespace = workload.GetNamespace()
383+
w.destinationPodName = workload.GetName()
384+
w.destinationWorkload = workload.GetWorkloadName()
385+
w.destinationCanonicalService = workload.GetCanonicalName()
386+
w.destinationCanonicalRevision = workload.GetCanonicalRevision()
387+
w.destinationWorkloadNamespace = workload.GetNamespace()
388+
w.destinationApp = workload.GetCanonicalName()
389+
w.destinationVersion = workload.GetCanonicalRevision()
390+
w.destinationCluster = workload.GetClusterId()
391+
w.destinationPrincipal = buildPrincipal(workload)
392+
return w
393+
}
394+
314395
func NewMetric(workloadCache cache.WorkloadCache, serviceCache cache.ServiceCache, enableMonitoring bool) *MetricController {
315396
m := &MetricController{
316-
workloadCache: workloadCache,
317-
serviceCache: serviceCache,
318-
workloadMetricCache: map[workloadMetricLabels]*workloadMetricInfo{},
319-
serviceMetricCache: map[serviceMetricLabels]*serviceMetricInfo{},
397+
workloadCache: workloadCache,
398+
serviceCache: serviceCache,
399+
workloadMetricCache: map[workloadMetricLabels]*workloadMetricInfo{},
400+
serviceMetricCache: map[serviceMetricLabels]*serviceMetricInfo{},
401+
connectionMetricCache: map[connectionMetricLabels]*connectionMetricInfo{},
320402
}
321403
m.EnableMonitoring.Store(enableMonitoring)
322404
m.EnableAccesslog.Store(false)
@@ -411,6 +493,10 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {
411493
workloadLabels = m.buildWorkloadMetric(&data)
412494
}
413495

496+
connectionLabels := connectionMetricLabels{}
497+
if m.EnableConnectionMetric.Load() && data.duration > LONG_CONN_METRIC_THRESHOLD {
498+
connectionLabels = m.buildConnectionMetric(&data)
499+
}
414500
if m.EnableAccesslog.Load() {
415501
// accesslogs at start of connection, at interval of 5 sec during connection lifecycle and at close of connection
416502
OutputAccesslog(data, tcp_conns[data.conSrcDstInfo], accesslog)
@@ -422,9 +508,12 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {
422508

423509
m.mutex.Lock()
424510
if m.EnableWorkloadMetric.Load() {
425-
m.updateWorkloadMetricCache(data, workloadLabels)
511+
m.updateWorkloadMetricCache(data, workloadLabels, tcp_conns)
512+
}
513+
m.updateServiceMetricCache(data, serviceLabels, tcp_conns)
514+
if m.EnableConnectionMetric.Load() && data.duration > LONG_CONN_METRIC_THRESHOLD {
515+
m.updateConnectionMetricCache(data, connectionLabels)
426516
}
427-
m.updateServiceMetricCache(data, serviceLabels)
428517
m.mutex.Unlock()
429518
}
430519
}
@@ -466,11 +555,22 @@ func buildV4Metric(buf *bytes.Buffer, tcp_conns map[connectionSrcDst]connMetric)
466555
data.minRtt = connectData.statistics.RttMin
467556
data.totalRetrans = connectData.statistics.Retransmits - tcp_conns[data.conSrcDstInfo].totalRetrans
468557
data.packetLost = connectData.statistics.LostPackets - tcp_conns[data.conSrcDstInfo].packetLost
469-
tcp_conns[data.conSrcDstInfo] = connMetric{
470-
receivedBytes: connectData.ReceivedBytes,
471-
sentBytes: connectData.SentBytes,
472-
totalRetrans: connectData.statistics.Retransmits,
473-
packetLost: connectData.statistics.LostPackets,
558+
559+
cm, ok := tcp_conns[data.conSrcDstInfo]
560+
if ok {
561+
cm.receivedBytes = connectData.ReceivedBytes
562+
cm.sentBytes = connectData.SentBytes
563+
cm.totalRetrans = connectData.statistics.Retransmits
564+
cm.packetLost = connectData.statistics.LostPackets
565+
cm.totalReports++
566+
} else {
567+
tcp_conns[data.conSrcDstInfo] = connMetric{
568+
receivedBytes: connectData.ReceivedBytes,
569+
sentBytes: connectData.SentBytes,
570+
totalRetrans: connectData.statistics.Retransmits,
571+
packetLost: connectData.statistics.LostPackets,
572+
totalReports: 1,
573+
}
474574
}
475575

476576
return data, nil
@@ -510,11 +610,22 @@ func buildV6Metric(buf *bytes.Buffer, tcp_conns map[connectionSrcDst]connMetric)
510610
data.minRtt = connectData.statistics.RttMin
511611
data.totalRetrans = connectData.statistics.Retransmits - tcp_conns[data.conSrcDstInfo].totalRetrans
512612
data.packetLost = connectData.statistics.LostPackets - tcp_conns[data.conSrcDstInfo].packetLost
513-
tcp_conns[data.conSrcDstInfo] = connMetric{
514-
receivedBytes: connectData.ReceivedBytes,
515-
sentBytes: connectData.SentBytes,
516-
totalRetrans: connectData.statistics.Retransmits,
517-
packetLost: connectData.statistics.LostPackets,
613+
614+
cm, ok := tcp_conns[data.conSrcDstInfo]
615+
if ok {
616+
cm.receivedBytes = connectData.ReceivedBytes
617+
cm.sentBytes = connectData.SentBytes
618+
cm.totalRetrans = connectData.statistics.Retransmits
619+
cm.packetLost = connectData.statistics.LostPackets
620+
cm.totalReports++
621+
} else {
622+
tcp_conns[data.conSrcDstInfo] = connMetric{
623+
receivedBytes: connectData.ReceivedBytes,
624+
sentBytes: connectData.SentBytes,
625+
totalRetrans: connectData.statistics.Retransmits,
626+
packetLost: connectData.statistics.LostPackets,
627+
totalReports: 1,
628+
}
518629
}
519630

520631
return data, nil
@@ -652,6 +763,43 @@ func (m *MetricController) buildServiceMetric(data *requestMetric) (serviceMetri
652763
return *trafficLabels, *accesslog
653764
}
654765

766+
func (m *MetricController) buildConnectionMetric(data *requestMetric) connectionMetricLabels {
767+
var dstAddr, srcAddr []byte
768+
for i := range data.conSrcDstInfo.dst {
769+
dstAddr = binary.LittleEndian.AppendUint32(dstAddr, data.conSrcDstInfo.dst[i])
770+
srcAddr = binary.LittleEndian.AppendUint32(srcAddr, data.conSrcDstInfo.src[i])
771+
}
772+
773+
dstWorkload, dstIP := m.getWorkloadByAddress(restoreIPv4(dstAddr))
774+
srcWorkload, srcIP := m.getWorkloadByAddress(restoreIPv4(srcAddr))
775+
776+
if srcWorkload == nil {
777+
return connectionMetricLabels{}
778+
}
779+
780+
trafficLabels := NewConnectionMetricLabel()
781+
trafficLabels.withSource(srcWorkload).withDestination(dstWorkload)
782+
783+
trafficLabels.destinationAddress = dstIP + ":" + fmt.Sprintf("%d", data.conSrcDstInfo.dstPort)
784+
trafficLabels.sourceAddress = srcIP + ":" + fmt.Sprintf("%d", data.conSrcDstInfo.srcPort)
785+
trafficLabels.destinationPodAddress = dstIP
786+
trafficLabels.requestProtocol = "tcp"
787+
trafficLabels.connectionSecurityPolicy = "mutual_tls"
788+
789+
switch data.direction {
790+
case constants.INBOUND:
791+
trafficLabels.reporter = "destination"
792+
case constants.OUTBOUND:
793+
trafficLabels.reporter = "source"
794+
}
795+
796+
startTime := calculateUptime(osStartTime, data.startTime)
797+
startTimeInfo := fmt.Sprintf("%v", startTime)
798+
trafficLabels.startTime = startTimeInfo
799+
800+
return *trafficLabels
801+
}
802+
655803
func (m *MetricController) getWorkloadByAddress(address []byte) (*workloadapi.Workload, string) {
656804
networkAddr := cache.NetworkAddress{}
657805
networkAddr.Address, _ = netip.AddrFromSlice(address)
@@ -669,10 +817,10 @@ func buildPrincipal(workload *workloadapi.Workload) string {
669817
return DEFAULT_UNKNOWN
670818
}
671819

672-
func (m *MetricController) updateWorkloadMetricCache(data requestMetric, labels workloadMetricLabels) {
820+
func (m *MetricController) updateWorkloadMetricCache(data requestMetric, labels workloadMetricLabels, tcp_conns map[connectionSrcDst]connMetric) {
673821
v, ok := m.workloadMetricCache[labels]
674822
if ok {
675-
if data.state == TCP_ESTABLISHED {
823+
if data.state == TCP_ESTABLISHED && tcp_conns[data.conSrcDstInfo].totalReports == 1 {
676824
v.WorkloadConnOpened = v.WorkloadConnOpened + 1
677825
}
678826
if data.state == TCP_CLOSTED {
@@ -704,10 +852,10 @@ func (m *MetricController) updateWorkloadMetricCache(data requestMetric, labels
704852
}
705853
}
706854

707-
func (m *MetricController) updateServiceMetricCache(data requestMetric, labels serviceMetricLabels) {
855+
func (m *MetricController) updateServiceMetricCache(data requestMetric, labels serviceMetricLabels, tcp_conns map[connectionSrcDst]connMetric) {
708856
v, ok := m.serviceMetricCache[labels]
709857
if ok {
710-
if data.state == TCP_ESTABLISHED {
858+
if data.state == TCP_ESTABLISHED && tcp_conns[data.conSrcDstInfo].totalReports == 1 {
711859
v.ServiceConnOpened = v.ServiceConnOpened + 1
712860
}
713861
if data.state == TCP_CLOSTED {
@@ -735,12 +883,31 @@ func (m *MetricController) updateServiceMetricCache(data requestMetric, labels s
735883
}
736884
}
737885

886+
func (m *MetricController) updateConnectionMetricCache(data requestMetric, labels connectionMetricLabels) {
887+
v, ok := m.connectionMetricCache[labels]
888+
if ok {
889+
v.ConnSentBytes = v.ConnSentBytes + float64(data.sentBytes)
890+
v.ConnReceivedBytes = v.ConnReceivedBytes + float64(data.receivedBytes)
891+
v.ConnPacketLost = v.ConnPacketLost + float64(data.packetLost)
892+
v.ConnTotalRetrans = v.ConnTotalRetrans + float64(data.totalRetrans)
893+
} else {
894+
newConnectionMetricInfo := connectionMetricInfo{}
895+
newConnectionMetricInfo.ConnSentBytes = float64(data.sentBytes)
896+
newConnectionMetricInfo.ConnReceivedBytes = float64(data.receivedBytes)
897+
newConnectionMetricInfo.ConnPacketLost = float64(data.packetLost)
898+
newConnectionMetricInfo.ConnTotalRetrans = float64(data.totalRetrans)
899+
m.connectionMetricCache[labels] = &newConnectionMetricInfo
900+
}
901+
}
902+
738903
func (m *MetricController) updatePrometheusMetric() {
739904
m.mutex.Lock()
740905
workloadInfoCache := m.workloadMetricCache
741906
serviceInfoCache := m.serviceMetricCache
907+
connectionInfoCache := m.connectionMetricCache
742908
m.workloadMetricCache = map[workloadMetricLabels]*workloadMetricInfo{}
743909
m.serviceMetricCache = map[serviceMetricLabels]*serviceMetricInfo{}
910+
m.connectionMetricCache = map[connectionMetricLabels]*connectionMetricInfo{}
744911
m.mutex.Unlock()
745912

746913
for k, v := range workloadInfoCache {
@@ -763,6 +930,14 @@ func (m *MetricController) updatePrometheusMetric() {
763930
tcpSentBytesInService.With(serviceLabels).Add(v.ServiceConnSentBytes)
764931
}
765932

933+
for k, v := range connectionInfoCache {
934+
connectionLabels := struct2map(k)
935+
tcpConnectionTotalSendBytes.With(connectionLabels).Add(v.ConnSentBytes)
936+
tcpConnectionTotalReceivedBytes.With(connectionLabels).Add(v.ConnReceivedBytes)
937+
tcpConnectionTotalPacketLost.With(connectionLabels).Add(v.ConnPacketLost)
938+
tcpConnectionTotalRetrans.With(connectionLabels).Add(v.ConnTotalRetrans)
939+
}
940+
766941
// delete metrics
767942
deleteLock.Lock()
768943
// Creating a copy reduces the amount of time spent adding locks in the programme

pkg/controller/telemetry/utils.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ var (
9292

9393
connectionLabels = []string{
9494
"reporter",
95+
"start_time",
9596
"source_workload",
9697
"source_canonical_service",
9798
"source_canonical_revision",
@@ -120,6 +121,7 @@ var (
120121

121122
labelsMap = map[string]string{
122123
"reporter": "reporter",
124+
"startTime": "start_time",
123125
"sourceWorkload": "source_workload",
124126
"sourceCanonicalService": "source_canonical_service",
125127
"sourceCanonicalRevision": "source_canonical_revision",
@@ -312,6 +314,7 @@ func runPrometheusClient(registry *prometheus.Registry) {
312314
defer mu.Unlock()
313315
registry.MustRegister(tcpConnectionOpenedInWorkload, tcpConnectionClosedInWorkload, tcpReceivedBytesInWorkload, tcpSentBytesInWorkload, tcpConnectionTotalRetransInWorkload, tcpConnectionPacketLostInWorkload)
314316
registry.MustRegister(tcpConnectionOpenedInService, tcpConnectionClosedInService, tcpReceivedBytesInService, tcpSentBytesInService)
317+
registry.MustRegister(tcpConnectionTotalSendBytes, tcpConnectionTotalReceivedBytes, tcpConnectionTotalPacketLost, tcpConnectionTotalRetrans)
315318
registry.MustRegister(bpfProgOpDuration, bpfProgOpCount)
316319
registry.MustRegister(mapEntryCount, mapCountInNode)
317320

0 commit comments

Comments
 (0)