Skip to content

Commit 439740f

Browse files
committed
Allow overriding parsed change attributes with a custom function
1 parent 8da374d commit 439740f

File tree

7 files changed

+42
-11
lines changed

7 files changed

+42
-11
lines changed

core/.eslintrc.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,7 @@ module.exports = {
88
node: true,
99
},
1010
ignorePatterns: ['dist/**', 'node_modules/**'],
11+
rules: {
12+
'@typescript-eslint/no-explicit-any': 'off',
13+
},
1114
}

core/src/fetched-record.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ export const MESSAGE_PREFIX_CONTEXT = '_bemi'
88
export const MESSAGE_PREFIX_HEARTBEAT = '_bemi_heartbeat'
99
const UNAVAILABLE_VALUE_PLACEHOLDER = '__bemi_unavailable_value'
1010

11-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
12-
const parseDebeziumData = (debeziumChange: any, now: Date) => {
11+
const parseDebeziumData = (debeziumChange: any, now: Date): RequiredEntityData<Change> => {
1312
const {
1413
op,
1514
before: beforeRaw,
@@ -86,8 +85,13 @@ export class FetchedRecord {
8685
this.messagePrefix = messagePrefix
8786
}
8887

89-
static fromNatsMessage(natsMessage: JsMsg, now = new Date()) {
90-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
88+
static fromNatsMessage(
89+
natsMessage: JsMsg,
90+
{
91+
now = new Date(),
92+
changeAttributesOverride = (changeAttributes: RequiredEntityData<Change>) => changeAttributes,
93+
} = {},
94+
) {
9195
const debeziumData = decodeData(natsMessage.data) as any
9296

9397
const messagePrefix = debeziumData.message?.prefix
@@ -96,8 +100,10 @@ export class FetchedRecord {
96100
return null
97101
}
98102

103+
const changeAttributes = changeAttributesOverride(parseDebeziumData(debeziumData, now))
104+
99105
return new FetchedRecord({
100-
changeAttributes: parseDebeziumData(debeziumData, now),
106+
changeAttributes,
101107
subject: natsMessage.subject,
102108
streamSequence: natsMessage.info.streamSequence,
103109
messagePrefix,

core/src/ingestion.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { MikroORM } from '@mikro-orm/postgresql'
1+
import { MikroORM, RequiredEntityData } from '@mikro-orm/postgresql'
22
import { Consumer, JsMsg } from 'nats'
33

44
import { logger } from './logger'
@@ -69,12 +69,14 @@ export const runIngestionLoop = async ({
6969
fetchBatchSize = 100,
7070
insertBatchSize = 100,
7171
useBuffer = false,
72+
changeAttributesOverride = (changeAttributes: RequiredEntityData<Change>) => changeAttributes,
7273
}: {
7374
orm: MikroORM
7475
consumer: Consumer
7576
fetchBatchSize?: number
7677
insertBatchSize?: number
7778
useBuffer?: boolean
79+
changeAttributesOverride?: (changeAttributes: RequiredEntityData<Change>) => RequiredEntityData<Change>
7880
}) => {
7981
let lastStreamSequence: number | null = null
8082
let fetchedRecordBuffer = new FetchedRecordBuffer()
@@ -102,7 +104,7 @@ export const runIngestionLoop = async ({
102104
const now = new Date()
103105
const natsMessages = Object.values(natsMessageBySequence)
104106
const fetchedRecords = natsMessages
105-
.map((m: JsMsg) => FetchedRecord.fromNatsMessage(m, now))
107+
.map((m: JsMsg) => FetchedRecord.fromNatsMessage(m, { now, changeAttributesOverride }))
106108
.filter((r) => r) as FetchedRecord[]
107109
const { stitchedFetchedRecords, newFetchedRecordBuffer, ackStreamSequence } = stitchFetchedRecords({
108110
fetchedRecordBuffer: fetchedRecordBuffer.addFetchedRecords(fetchedRecords),

core/src/logger.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import util from 'util'
22

33
const LOG_LEVEL = process.env.LOG_LEVEL
44

5-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
65
const log = (message: any) => {
76
if (typeof message === 'string') {
87
return console.log(message)
@@ -12,11 +11,9 @@ const log = (message: any) => {
1211
}
1312

1413
export const logger = {
15-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
1614
debug: (message: any) => {
1715
if (LOG_LEVEL === 'DEBUG') log(message)
1816
},
19-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
2017
info: (message: any) => {
2118
log(message)
2219
},

core/src/specs/fetched-record.spec.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,24 @@ describe('fromNatsMessage', () => {
120120

121121
expect(result).toStrictEqual(null)
122122
})
123+
124+
test('customizes changeAttributes with changeAttributesOverride', () => {
125+
const subject = 'bemi-subject'
126+
const natsMessage = buildNatsMessage({ subject, streamSequence: 1, data: MESSAGE_DATA.CREATE })
127+
128+
const result = FetchedRecord.fromNatsMessage(natsMessage, {
129+
changeAttributesOverride: (changeAttributes) => ({
130+
...changeAttributes,
131+
primaryKey: 'custom-primary-key',
132+
}),
133+
})
134+
135+
expect(result).toStrictEqual(
136+
new FetchedRecord({
137+
subject,
138+
streamSequence: 1,
139+
changeAttributes: { ...CHANGE_ATTRIBUTES.CREATE, primaryKey: 'custom-primary-key' },
140+
}),
141+
)
142+
})
123143
})

core/src/specs/fixtures/nats-messages.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,6 @@ export const buildNatsMessage = ({
220220
streamSequence,
221221
}: {
222222
subject: string
223-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
224223
data: any
225224
streamSequence: number
226225
}): JsMsg => ({

docs/docs/changelog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ keywords: ['Bemi Changelog', 'Bemi New Features', 'Postgres Audit Trails', 'Chan
1515
* Allow setting and customizing [Ignore Column Rules](https://docs.bemi.io/postgresql/source-database#ignoring-by-columns)
1616
* Platform
1717
* Create PG publications limited to specific tables with selective tracking
18+
* [Bemi Core](https://github.com/BemiHQ/bemi)
19+
* Allow customizing parsed change attributes with an override function
20+
* [Bemi Prisma](https://github.com/BemiHQ/bemi-prisma)
21+
* Fix compatibility with Prisma v5.20+ driver adapter
1822

1923
## 2024-09
2024

0 commit comments

Comments
 (0)