Skip to content

Commit a1b6aa3

Browse files
fix: handle pg_replication_slots on pg<13
Signed-off-by: Michael Todorovic <[email protected]>
1 parent a324fe3 commit a1b6aa3

File tree

2 files changed

+46
-19
lines changed

2 files changed

+46
-19
lines changed

collector/pg_replication_slot.go

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@ package collector
1616
import (
1717
"context"
1818
"database/sql"
19+
"fmt"
1920
"log/slog"
21+
"strings"
2022

23+
"github.com/blang/semver/v4"
2124
"github.com/prometheus/client_golang/prometheus"
2225
)
2326

@@ -81,26 +84,35 @@ var (
8184
"availability of WAL files claimed by this slot",
8285
[]string{"slot_name", "slot_type", "wal_status"}, nil,
8386
)
87+
)
88+
89+
func replicationSlotQuery(columns []string) string {
90+
return fmt.Sprintf("SELECT %s FROM pg_replication_slots;", strings.Join(columns, ","))
91+
}
8492

85-
pgReplicationSlotQuery = `SELECT
86-
slot_name,
87-
slot_type,
88-
CASE WHEN pg_is_in_recovery() THEN
93+
func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
94+
db := instance.getDB()
95+
96+
columns := []string{
97+
"slot_name",
98+
"slot_type",
99+
`CASE WHEN pg_is_in_recovery() THEN
89100
pg_last_wal_receive_lsn() - '0/0'
90101
ELSE
91102
pg_current_wal_lsn() - '0/0'
92-
END AS current_wal_lsn,
93-
COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn,
94-
active,
95-
safe_wal_size,
96-
wal_status
97-
FROM pg_replication_slots;`
98-
)
103+
END AS current_wal_lsn`,
104+
"COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn",
105+
"active",
106+
}
107+
108+
abovePG13 := instance.version.GTE(semver.MustParse("13.0.0"))
109+
if abovePG13 {
110+
columns = append(columns, "safe_wal_size")
111+
columns = append(columns, "wal_status")
112+
}
99113

100-
func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
101-
db := instance.getDB()
102114
rows, err := db.QueryContext(ctx,
103-
pgReplicationSlotQuery)
115+
replicationSlotQuery(columns))
104116
if err != nil {
105117
return err
106118
}
@@ -114,7 +126,22 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
114126
var isActive sql.NullBool
115127
var safeWalSize sql.NullInt64
116128
var walStatus sql.NullString
117-
if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize, &walStatus); err != nil {
129+
130+
r := []any{
131+
&slotName,
132+
&slotType,
133+
&walLSN,
134+
&flushLSN,
135+
&isActive,
136+
}
137+
138+
if abovePG13 {
139+
r = append(r, &safeWalSize)
140+
r = append(r, &walStatus)
141+
}
142+
143+
err := rows.Scan(r...)
144+
if err != nil {
118145
return err
119146
}
120147

collector/pg_replication_slot_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {
3434
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
3535
rows := sqlmock.NewRows(columns).
3636
AddRow("test_slot", "physical", 5, 3, true, 323906992, "reserved")
37-
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
37+
mock.ExpectQuery(sanitizeQuery(replicationSlotQuery(columns))).WillReturnRows(rows)
3838

3939
ch := make(chan prometheus.Metric)
4040
go func() {
@@ -77,7 +77,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
7777
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
7878
rows := sqlmock.NewRows(columns).
7979
AddRow("test_slot", "physical", 6, 12, false, -4000, "extended")
80-
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
80+
mock.ExpectQuery(sanitizeQuery(replicationSlotQuery(columns))).WillReturnRows(rows)
8181

8282
ch := make(chan prometheus.Metric)
8383
go func() {
@@ -120,7 +120,7 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {
120120
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
121121
rows := sqlmock.NewRows(columns).
122122
AddRow("test_slot", "physical", 6, 12, nil, nil, "lost")
123-
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
123+
mock.ExpectQuery(sanitizeQuery(replicationSlotQuery(columns))).WillReturnRows(rows)
124124

125125
ch := make(chan prometheus.Metric)
126126
go func() {
@@ -161,7 +161,7 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) {
161161
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
162162
rows := sqlmock.NewRows(columns).
163163
AddRow(nil, nil, nil, nil, true, nil, nil)
164-
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
164+
mock.ExpectQuery(sanitizeQuery(replicationSlotQuery(columns))).WillReturnRows(rows)
165165

166166
ch := make(chan prometheus.Metric)
167167
go func() {

0 commit comments

Comments
 (0)