Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.

Allow nested filtering #273

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 79 additions & 16 deletions lib/pubsub/getFilteredSubs-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,27 @@ import { collapseKeys, getFilteredSubs } from './getFilteredSubs'
describe('collapseKeys', () => {
it('makes the deep objects into dots', () => {
assert.deepEqual(collapseKeys({}), {})
assert.deepEqual(collapseKeys({ a: 4, b: { c: 5, d: 'hi', e: { f: false } } }), {
a: 4,
'b.c': 5,
'b.d': 'hi',
'b.e.f': false,
})
assert.deepEqual(collapseKeys({ a: [1, 2, 3, { b: 4, c: [], d: null, e: undefined }] }), {
'a.0': 1,
'a.1': 2,
'a.2': 3,
'a.3.b': 4,
})
assert.deepEqual(
collapseKeys({ a: 4, b: { c: 5, d: 'hi', e: { f: false } } }),
{
a: 4,
'b.c': 5,
'b.d': 'hi',
'b.e.f': false,
},
)
assert.deepEqual(
collapseKeys({ a: [1, 2, 3, { b: 4, c: [], d: null, e: undefined }] }),
{
'a.0': 1,
'a.1': 2,
'a.2': 3,
'a.3.b': 4,
},
)
})
})


// since we're not resetting the db every time we need to change this
let count = 1
const makeTopic = () => `topic-${count++}`
Expand Down Expand Up @@ -53,7 +58,13 @@ describe('getFilteredSubs', () => {
}

await server.models.subscription.put(subscription)
assert.containSubset(await getFilteredSubs({ server, event: { topic, payload: { language: 'en' } } }), [{ topic, id: '1' }])
assert.containSubset(
await getFilteredSubs({
server,
event: { topic, payload: { language: 'en' } },
}),
[{ topic, id: '1' }],
)
})

it('can match on payload', async () => {
Expand All @@ -74,8 +85,60 @@ describe('getFilteredSubs', () => {

await server.models.subscription.put(subscription)

assert.containSubset(await getFilteredSubs({ server, event: { topic, payload: { language: 'en' } } }), [{ topic, id: '2' }])
assert.deepEqual(await getFilteredSubs({ server, event: { topic, payload: { language: 'en-gb' } } }), [])
assert.containSubset(
await getFilteredSubs({
server,
event: { topic, payload: { language: 'en' } },
}),
[{ topic, id: '2' }],
)
assert.deepEqual(
await getFilteredSubs({
server,
event: { topic, payload: { language: 'en-gb' } },
}),
[],
)
})

it('can match on nested payload', async () => {
const topic = makeTopic()
const server = await mockServerContext()
const subscription = {
id: '2',
topic,
filter: { meta: { user: 'foo' }, message: { content: 'hi' } },
subscriptionId: '2',
subscription: {} as any,
connectionId: 'abcd',
connectionInitPayload: {},
requestContext: {} as any,
ttl: Math.floor(Date.now() / 1000) + 100000,
createdAt: Date.now(),
}

await server.models.subscription.put(subscription)

assert.containSubset(
await getFilteredSubs({
server,
event: {
topic,
payload: { meta: { user: 'foo' }, message: { content: 'hi' } },
},
}),
[{ topic, id: '2' }],
)
assert.deepEqual(
await getFilteredSubs({
server,
event: {
topic,
payload: { meta: { user: 'lol' }, message: { content: 'bye' } },
},
}),
[],
)
})

it('can match on no payload', async () => {
Expand Down
39 changes: 33 additions & 6 deletions lib/pubsub/getFilteredSubs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
import { collect } from 'streaming-iterables'
import { ServerClosure, Subscription } from '../types'

export const getFilteredSubs = async ({ server, event }: { server: Omit<ServerClosure, 'gateway'>, event: { topic: string, payload?: Record<string, any> } }): Promise<Subscription[]> => {
export const getFilteredSubs = async ({
server,
event,
}: {
server: Omit<ServerClosure, 'gateway'>
event: { topic: string, payload?: Record<string, any> }
}): Promise<Subscription[]> => {
if (!event.payload || Object.keys(event.payload).length === 0) {
server.log('getFilteredSubs', { event })

Expand All @@ -18,18 +24,35 @@ export const getFilteredSubs = async ({ server, event }: { server: Omit<ServerCl
const flattenPayload = collapseKeys(event.payload)

const filterExpressions: string[] = []
const expressionAttributeValues: { [key: string]: string | number | boolean } = {}
const expressionAttributeValues: {
[key: string]: string | number | boolean
} = {}
const expressionAttributeNames: { [key: string]: string } = {}

let attributeCounter = 0
for (const [key, value] of Object.entries(flattenPayload)) {
const aliasNumber = attributeCounter++
expressionAttributeNames[`#${aliasNumber}`] = key
const keyParts = key.split('.')
const keyPartsAttributeName = keyParts
.map((part, index) => `#${aliasNumber + index}`)
.join('.')
key.split('.').forEach((keyPart, index) => {
expressionAttributeNames[`#${aliasNumber + index}`] = keyPart
attributeCounter += index
})
expressionAttributeValues[`:${aliasNumber}`] = value
filterExpressions.push(`(#filter.#${aliasNumber} = :${aliasNumber} OR attribute_not_exists(#filter.#${aliasNumber}))`)

filterExpressions.push(
`(#filter.${keyPartsAttributeName} = :${aliasNumber} OR attribute_not_exists(#filter.${keyPartsAttributeName}))`,
)
}

server.log('getFilteredSubs', { event, expressionAttributeNames, expressionAttributeValues, filterExpressions })
server.log('getFilteredSubs', {
event,
expressionAttributeNames,
expressionAttributeValues,
filterExpressions,
})

const iterator = server.models.subscription.query({
IndexName: 'TopicIndex',
Expand All @@ -54,7 +77,11 @@ export const collapseKeys = (
): Record<string, number | string | boolean> => {
const record = {}
for (const [k1, v1] of Object.entries(obj)) {
if (typeof v1 === 'string' || typeof v1 === 'number' || typeof v1 === 'boolean') {
if (
typeof v1 === 'string' ||
typeof v1 === 'number' ||
typeof v1 === 'boolean'
) {
record[k1] = v1
continue
}
Expand Down