Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.

feat: add configuration to exclude keys from events when filtering subs #330

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions docs/interfaces/SubscriptionServer.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@

### complete

• **complete**: (`event`: { `payload?`: `Record`<`string`, `any`\> ; `topic`: `string` }) => `Promise`<`void`\>
• **complete**: (`event`: { `payload?`: `Record`<`string`, `any`\> ; `topic`: `string` }, `excludeKeys?`: `string`[]) => `Promise`<`void`\>

#### Type declaration

▸ (`event`): `Promise`<`void`\>
▸ (`event`, `excludeKeys`): `Promise`<`void`\>

Send a complete message and end all relevant subscriptions. This might take some time depending on how many subscriptions there are.

Expand All @@ -32,6 +32,7 @@ The payload if present will be used to match against any filters the subscriptio
| `event` | `Object` |
| `event.payload?` | `Record`<`string`, `any`\> |
| `event.topic` | `string` |
| `excludeKeys?` | `string`[] |

##### Returns

Expand All @@ -41,11 +42,11 @@ ___

### publish

• **publish**: (`event`: { `payload`: `Record`<`string`, `any`\> ; `topic`: `string` }) => `Promise`<`void`\>
• **publish**: (`event`: { `payload`: `Record`<`string`, `any`\> ; `topic`: `string` }, `excludeKeys?`: `string`[]) => `Promise`<`void`\>

#### Type declaration

▸ (`event`): `Promise`<`void`\>
▸ (`event`, `excludeKeys`): `Promise`<`void`\>

Publish an event to all relevant subscriptions. This might take some time depending on how many subscriptions there are.

Expand All @@ -58,6 +59,7 @@ The payload if present will be used to match against any filters the subscriptio
| `event` | `Object` |
| `event.payload` | `Record`<`string`, `any`\> |
| `event.topic` | `string` |
| `excludeKeys?` | `string`[] |

##### Returns

Expand Down
4 changes: 2 additions & 2 deletions lib/pubsub/complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import { getResolverAndArgs } from '../utils/getResolverAndArgs'
import { isArray } from '../utils/isArray'
import { getFilteredSubs } from './getFilteredSubs'

export const complete = (serverPromise: Promise<ServerClosure> | ServerClosure): SubscriptionServer['complete'] => async event => {
export const complete = (serverPromise: Promise<ServerClosure> | ServerClosure): SubscriptionServer['complete'] => async (event, excludeKeys) => {
const server = await serverPromise
const subscriptions = await getFilteredSubs({ server, event })
const subscriptions = await getFilteredSubs({ server, event, excludeKeys })
server.log('pubsub:complete', { event, subscriptions })

const iters = subscriptions.map(async (sub) => {
Expand Down
15 changes: 15 additions & 0 deletions lib/pubsub/getFilteredSubs-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@ describe('collapseKeys', () => {
'a.3.b': 4,
})
})

it('excludes excluded keys', () => {
assert.deepEqual(collapseKeys({ a: 4, b: { c: 5, d: 'hi', e: { f: false } } }, ['a', 'b.d']), {
'b.c': 5,
'b.e.f': false,
})
assert.deepEqual(collapseKeys({ a: [1, 2, 3, { b: 4, c: [], d: null, e: undefined }], f: { g: [{ h: 5 }, { i: 6 }], j: [7, 8, 9] } }, ['f.g', 'f.j.1']), {
'a.0': 1,
'a.1': 2,
'a.2': 3,
'a.3.b': 4,
'f.j.0': 7,
'f.j.2': 9,
})
})
})


Expand Down
29 changes: 15 additions & 14 deletions lib/pubsub/getFilteredSubs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import { collect } from 'streaming-iterables'
import { ServerClosure, Subscription } from '../types'

export const getFilteredSubs = async ({ server, event }: { server: Omit<ServerClosure, 'gateway'>, event: { topic: string, payload?: Record<string, any> } }): Promise<Subscription[]> => {
export const getFilteredSubs = async ({ server, event, excludeKeys = [] }: { server: Omit<ServerClosure, 'gateway'>, event: { topic: string, payload?: Record<string, any> }, excludeKeys?: string[] }): Promise<Subscription[]> => {
if (!event.payload || Object.keys(event.payload).length === 0) {
server.log('getFilteredSubs', { event })
server.log('getFilteredSubs', { event, excludeKeys })

const iterator = server.models.subscription.query({
IndexName: 'TopicIndex',
Expand All @@ -15,7 +15,7 @@ export const getFilteredSubs = async ({ server, event }: { server: Omit<ServerCl

return await collect(iterator)
}
const flattenPayload = collapseKeys(event.payload)
const flattenPayload = collapseKeys(event.payload, excludeKeys)

const filterExpressions: string[] = []
const expressionAttributeValues: { [key: string]: string | number | boolean } = {}
Expand All @@ -29,7 +29,7 @@ export const getFilteredSubs = async ({ server, event }: { server: Omit<ServerCl
filterExpressions.push(`(#filter.#${aliasNumber} = :${aliasNumber} OR attribute_not_exists(#filter.#${aliasNumber}))`)
}

server.log('getFilteredSubs', { event, expressionAttributeNames, expressionAttributeValues, filterExpressions })
server.log('getFilteredSubs', { event, excludeKeys, expressionAttributeNames, expressionAttributeValues, filterExpressions })

const iterator = server.models.subscription.query({
IndexName: 'TopicIndex',
Expand All @@ -51,24 +51,25 @@ export const getFilteredSubs = async ({ server, event }: { server: Omit<ServerCl

export const collapseKeys = (
obj: Record<string, any>,
excludeKeys: string[] = [],
parent: string[] = [],
): Record<string, number | string | boolean> => {
const record = {}
for (const [k1, v1] of Object.entries(obj)) {
const path = [...parent, k1]
const key = path.join('.')
if (excludeKeys.includes(key)) {
continue
}
if (typeof v1 === 'string' || typeof v1 === 'number' || typeof v1 === 'boolean') {
record[k1] = v1
record[key] = v1
continue
}

if (v1 && typeof v1 === 'object') {
const next = {}

for (const [k2, v2] of Object.entries(v1)) {
next[`${k1}.${k2}`] = v2
}

for (const [k1, v1] of Object.entries(collapseKeys(next))) {
record[k1] = v1
for (const [k2, v2] of Object.entries(collapseKeys(v1, excludeKeys, path))) {
record[k2] = v2
}
continue
}
}
return record
Expand Down
6 changes: 3 additions & 3 deletions lib/pubsub/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import { postToConnection } from '../utils/postToConnection'
import { buildContext } from '../utils/buildContext'
import { getFilteredSubs } from './getFilteredSubs'

export const publish = (serverPromise: Promise<ServerClosure> | ServerClosure): SubscriptionServer['publish'] => async event => {
export const publish = (serverPromise: Promise<ServerClosure> | ServerClosure): SubscriptionServer['publish'] => async (event, excludeKeys) => {
const server = await serverPromise
server.log('pubsub:publish', { event })
const subscriptions = await getFilteredSubs({ server, event })
server.log('pubsub:publish', { event, excludeKeys })
const subscriptions = await getFilteredSubs({ server, event, excludeKeys })
server.log('pubsub:publish', { subscriptions: subscriptions.map(({ connectionId, filter, subscription }) => ({ connectionId, filter, subscription }) ) })

const iters = subscriptions.map(async (sub) => {
Expand Down
4 changes: 2 additions & 2 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ export interface SubscriptionServer {
*
* The payload if present will be used to match against any filters the subscriptions might have.
*/
publish: (event: { topic: string, payload: Record<string, any>}) => Promise<void>
publish: (event: { topic: string, payload: Record<string, any>}, excludeKeys?: string[]) => Promise<void>
/**
* Send a complete message and end all relevant subscriptions. This might take some time depending on how many subscriptions there are.
*
* The payload if present will be used to match against any filters the subscriptions might have.
*/
complete: (event: { topic: string, payload?: Record<string, any> }) => Promise<void>
complete: (event: { topic: string, payload?: Record<string, any> }, excludeKeys?: string[]) => Promise<void>
}

/**
Expand Down