
Commit a7720aa

feat(control-plane): add support for handling multiple events in a single invocation
Currently we restrict the `scale-up` Lambda to handling a single event at a time. In very busy environments this can become a bottleneck: each invocation makes calls to the GitHub and AWS APIs, and those calls can take long enough that we cannot process job-queued events as fast as they arrive.

In our environment we also use a pool, and we have typically responded to the resulting alerts (SQS queue length growing) by expanding the pool. This helps because we more often find that no scale-up is needed, which lets the Lambdas exit earlier and work through the queue faster, but it makes the environment much less responsive to changes in usage patterns.

At its core, this Lambda's task is to work out how many instances are needed and construct an EC2 `CreateFleet` call to create them. That job can be batched: we can take any number of events, calculate the difference between our current state and the number of jobs we have, cap it at the maximum, and issue a single call.

The thing to be careful about is partial failure, when EC2 creates some of the instances we wanted but not all of them. Lambda has a configurable function response type which can be set to `ReportBatchItemFailures`. In this mode, the handler returns a list of failed messages and only those are retried. We use this to hand back as many events as we failed to process.

Now that we are potentially processing multiple events in a single invocation, one thing to optimise for is not recreating GitHub API clients. We need one client for the app itself, which we use to look up installation IDs, and one client for each installation relevant to the batch of events being processed. A client is created the first time we see an event for a given installation and reused for the rest of the batch.

We also remove the same `batch_size = 1` constraint from the `job-retry` Lambda, which retries events that previously failed. Instead of reporting failures to be retried, it keeps the pre-existing fault-tolerant behaviour: errors are logged but explicitly do not cause message retries, avoiding infinite loops from persistent GitHub API issues or malformed events.

Tests are added for all of this.
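For reference, the `ReportBatchItemFailures` contract means the handler returns the IDs of the messages it could not process, and SQS redelivers only those while the rest are removed from the queue. Below is a minimal sketch of that pattern using only the `aws-lambda` types; `processBatch` is a hypothetical stand-in, not part of this change or this repository's `scaleUp` signature.

import type { SQSBatchItemFailure, SQSBatchResponse, SQSEvent } from 'aws-lambda';

// Hypothetical batch processor: returns the IDs of the messages it failed to handle.
async function processBatch(messageIds: string[]): Promise<string[]> {
  return []; // assume everything succeeded in this sketch
}

export async function handler(event: SQSEvent): Promise<SQSBatchResponse> {
  const messageIds = event.Records.map((record) => record.messageId);
  const failedIds = await processBatch(messageIds);

  // Only the listed messages are retried; the rest are deleted from the queue.
  const batchItemFailures: SQSBatchItemFailure[] = failedIds.map((id) => ({ itemIdentifier: id }));
  return { batchItemFailures };
}

This only takes effect when the SQS event source mapping is configured with `ReportBatchItemFailures` as a function response type; returning an empty `batchItemFailures` list marks the whole batch as successfully processed.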
1 parent 63ed8b6 commit a7720aa

File tree

10 files changed: +1288 -287 lines changed


lambdas/functions/control-plane/src/lambda.test.ts

Lines changed: 147 additions & 24 deletions
@@ -70,19 +70,33 @@ vi.mock('@aws-github-runner/aws-powertools-util');
 vi.mock('@aws-github-runner/aws-ssm-util');
 
 describe('Test scale up lambda wrapper.', () => {
-  it('Do not handle multiple record sets.', async () => {
-    await testInvalidRecords([sqsRecord, sqsRecord]);
+  it('Do not handle empty record sets.', async () => {
+    const sqsEventMultipleRecords: SQSEvent = {
+      Records: [],
+    };
+
+    await expect(scaleUpHandler(sqsEventMultipleRecords, context)).resolves.not.toThrow();
   });
 
-  it('Do not handle empty record sets.', async () => {
-    await testInvalidRecords([]);
+  it('Ignores non-sqs event sources.', async () => {
+    const record = {
+      ...sqsRecord,
+      eventSource: 'aws:non-sqs',
+    };
+
+    const sqsEventMultipleRecordsNonSQS: SQSEvent = {
+      Records: [record],
+    };
+
+    await expect(scaleUpHandler(sqsEventMultipleRecordsNonSQS, context)).resolves.not.toThrow();
+    expect(scaleUp).toHaveBeenCalledWith([]);
   });
 
   it('Scale without error should resolve.', async () => {
     const mock = vi.fn(scaleUp);
     mock.mockImplementation(() => {
       return new Promise((resolve) => {
-        resolve();
+        resolve([]);
       });
     });
     await expect(scaleUpHandler(sqsEvent, context)).resolves.not.toThrow();
@@ -104,28 +118,137 @@ describe('Test scale up lambda wrapper.', () => {
     vi.mocked(scaleUp).mockImplementation(mock);
     await expect(scaleUpHandler(sqsEvent, context)).rejects.toThrow(error);
   });
-});
 
-async function testInvalidRecords(sqsRecords: SQSRecord[]) {
-  const mock = vi.fn(scaleUp);
-  const logWarnSpy = vi.spyOn(logger, 'warn');
-  mock.mockImplementation(() => {
-    return new Promise((resolve) => {
-      resolve();
+  describe('Batch processing', () => {
+    beforeEach(() => {
+      vi.clearAllMocks();
+    });
+
+    const createMultipleRecords = (count: number, eventSource = 'aws:sqs'): SQSRecord[] => {
+      return Array.from({ length: count }, (_, i) => ({
+        ...sqsRecord,
+        eventSource,
+        messageId: `message-${i}`,
+        body: JSON.stringify({
+          ...body,
+          id: i + 1,
+        }),
+      }));
+    };
+
+    it('Should handle multiple SQS records in a single invocation', async () => {
+      const records = createMultipleRecords(3);
+      const multiRecordEvent: SQSEvent = { Records: records };
+
+      const mock = vi.fn(scaleUp);
+      mock.mockImplementation(() => Promise.resolve([]));
+      vi.mocked(scaleUp).mockImplementation(mock);
+
+      await expect(scaleUpHandler(multiRecordEvent, context)).resolves.not.toThrow();
+      expect(scaleUp).toHaveBeenCalledWith(
+        expect.arrayContaining([
+          expect.objectContaining({ messageId: 'message-0' }),
+          expect.objectContaining({ messageId: 'message-1' }),
+          expect.objectContaining({ messageId: 'message-2' }),
+        ]),
+      );
+    });
+
+    it('Should return batch item failures for rejected messages', async () => {
+      const records = createMultipleRecords(3);
+      const multiRecordEvent: SQSEvent = { Records: records };
+
+      const mock = vi.fn(scaleUp);
+      mock.mockImplementation(() => Promise.resolve(['message-1', 'message-2']));
+      vi.mocked(scaleUp).mockImplementation(mock);
+
+      const result = await scaleUpHandler(multiRecordEvent, context);
+      expect(result).toEqual({
+        batchItemFailures: [{ itemIdentifier: 'message-1' }, { itemIdentifier: 'message-2' }],
+      });
+    });
+
+    it('Should filter out non-SQS event sources', async () => {
+      const sqsRecords = createMultipleRecords(2, 'aws:sqs');
+      const nonSqsRecords = createMultipleRecords(1, 'aws:sns');
+      const mixedEvent: SQSEvent = {
+        Records: [...sqsRecords, ...nonSqsRecords],
+      };
+
+      const mock = vi.fn(scaleUp);
+      mock.mockImplementation(() => Promise.resolve([]));
+      vi.mocked(scaleUp).mockImplementation(mock);
+
+      await scaleUpHandler(mixedEvent, context);
+      expect(scaleUp).toHaveBeenCalledWith(
+        expect.arrayContaining([
+          expect.objectContaining({ messageId: 'message-0' }),
+          expect.objectContaining({ messageId: 'message-1' }),
+        ]),
+      );
+      expect(scaleUp).not.toHaveBeenCalledWith(
+        expect.arrayContaining([expect.objectContaining({ messageId: 'message-2' })]),
+      );
+    });
+
+    it('Should sort messages by retry count', async () => {
+      const records = [
+        {
+          ...sqsRecord,
+          messageId: 'high-retry',
+          body: JSON.stringify({ ...body, retryCounter: 5 }),
+        },
+        {
+          ...sqsRecord,
+          messageId: 'low-retry',
+          body: JSON.stringify({ ...body, retryCounter: 1 }),
+        },
+        {
+          ...sqsRecord,
+          messageId: 'no-retry',
+          body: JSON.stringify({ ...body }),
+        },
+      ];
+      const multiRecordEvent: SQSEvent = { Records: records };
+
+      const mock = vi.fn(scaleUp);
+      mock.mockImplementation((messages) => {
+        // Verify messages are sorted by retry count (ascending)
+        expect(messages[0].messageId).toBe('no-retry');
+        expect(messages[1].messageId).toBe('low-retry');
+        expect(messages[2].messageId).toBe('high-retry');
+        return Promise.resolve([]);
+      });
+      vi.mocked(scaleUp).mockImplementation(mock);
+
+      await scaleUpHandler(multiRecordEvent, context);
+    });
+
+    it('Should return all failed messages when scaleUp throws non-ScaleError', async () => {
+      const records = createMultipleRecords(2);
+      const multiRecordEvent: SQSEvent = { Records: records };
+
+      const mock = vi.fn(scaleUp);
+      mock.mockImplementation(() => Promise.reject(new Error('Generic error')));
+      vi.mocked(scaleUp).mockImplementation(mock);
+
+      const result = await scaleUpHandler(multiRecordEvent, context);
+      expect(result).toEqual({ batchItemFailures: [] });
+    });
+
+    it('Should throw when scaleUp throws ScaleError', async () => {
+      const records = createMultipleRecords(2);
+      const multiRecordEvent: SQSEvent = { Records: records };
+
+      const error = new ScaleError('Critical scaling error');
+      const mock = vi.fn(scaleUp);
+      mock.mockImplementation(() => Promise.reject(error));
+      vi.mocked(scaleUp).mockImplementation(mock);
+
+      await expect(scaleUpHandler(multiRecordEvent, context)).rejects.toThrow(error);
     });
   });
-  const sqsEventMultipleRecords: SQSEvent = {
-    Records: sqsRecords,
-  };
-
-  await expect(scaleUpHandler(sqsEventMultipleRecords, context)).resolves.not.toThrow();
-
-  expect(logWarnSpy).toHaveBeenCalledWith(
-    expect.stringContaining(
-      'Event ignored, only one record at the time can be handled, ensure the lambda batch size is set to 1.',
-    ),
-  );
-}
+});
 
 describe('Test scale down lambda wrapper.', () => {
   it('Scaling down no error.', async () => {

lambdas/functions/control-plane/src/lambda.ts

Lines changed: 50 additions & 12 deletions
@@ -1,34 +1,72 @@
 import middy from '@middy/core';
 import { logger, setContext } from '@aws-github-runner/aws-powertools-util';
 import { captureLambdaHandler, tracer } from '@aws-github-runner/aws-powertools-util';
-import { Context, SQSEvent } from 'aws-lambda';
+import { Context, type SQSBatchItemFailure, type SQSBatchResponse, SQSEvent } from 'aws-lambda';
 
 import { PoolEvent, adjust } from './pool/pool';
 import ScaleError from './scale-runners/ScaleError';
 import { scaleDown } from './scale-runners/scale-down';
-import { scaleUp } from './scale-runners/scale-up';
+import { type ActionRequestMessage, type ActionRequestMessageSQS, scaleUp } from './scale-runners/scale-up';
 import { SSMCleanupOptions, cleanSSMTokens } from './scale-runners/ssm-housekeeper';
 import { checkAndRetryJob } from './scale-runners/job-retry';
 
-export async function scaleUpHandler(event: SQSEvent, context: Context): Promise<void> {
+export async function scaleUpHandler(event: SQSEvent, context: Context): Promise<SQSBatchResponse> {
   setContext(context, 'lambda.ts');
   logger.logEventIfEnabled(event);
 
-  if (event.Records.length !== 1) {
-    logger.warn('Event ignored, only one record at the time can be handled, ensure the lambda batch size is set to 1.');
-    return Promise.resolve();
+  // Group the messages by their event source. We're only interested in
+  // `aws:sqs`-originated messages.
+  const groupedEvents = new Map<string, ActionRequestMessageSQS[]>();
+  for (const { body, eventSource, messageId } of event.Records) {
+    const group = groupedEvents.get(eventSource) || [];
+    const payload = JSON.parse(body) as ActionRequestMessage;
+
+    if (group.length === 0) {
+      groupedEvents.set(eventSource, group);
+    }
+
+    groupedEvents.get(eventSource)?.push({
+      ...payload,
+      messageId,
+    });
+  }
+
+  for (const [eventSource, messages] of groupedEvents.entries()) {
+    if (eventSource === 'aws:sqs') {
+      continue;
+    }
+
+    logger.warn('Ignoring non-sqs event source', { eventSource, messages });
   }
 
+  const sqsMessages = groupedEvents.get('aws:sqs') ?? [];
+
+  // Sort messages by their retry count, so that we retry the same messages if
+  // there's a persistent failure. This should cause messages to be dropped
+  // quicker than if we retried in an arbitrary order.
+  sqsMessages.sort((l, r) => {
+    return (l.retryCounter ?? 0) - (r.retryCounter ?? 0);
+  });
+
+  const batchItemFailures: SQSBatchItemFailure[] = [];
+
   try {
-    await scaleUp(event.Records[0].eventSource, JSON.parse(event.Records[0].body));
-    return Promise.resolve();
+    const rejectedMessageIds = await scaleUp(sqsMessages);
+
+    for (const messageId of rejectedMessageIds) {
+      batchItemFailures.push({
+        itemIdentifier: messageId,
+      });
+    }
+
+    return { batchItemFailures };
   } catch (e) {
     if (e instanceof ScaleError) {
-      return Promise.reject(e);
-    } else {
-      logger.warn(`Ignoring error: ${e}`);
-      return Promise.resolve();
+      throw e;
     }
+
+    logger.warn(`Ignoring error: ${e}`);
+    return { batchItemFailures };
   }
 }
 

lambdas/functions/control-plane/src/local.ts

Lines changed: 2 additions & 3 deletions
@@ -1,13 +1,12 @@
 import { logger } from '@aws-github-runner/aws-powertools-util';
 
-import { ActionRequestMessage, scaleUp } from './scale-runners/scale-up';
+import { type ActionRequestMessageSQS, scaleUp } from './scale-runners/scale-up';
 
 const sqsEvent = {
   Records: [
     {
       messageId: 'e8d74d08-644e-42ca-bf82-a67daa6c4dad',
       receiptHandle:
-        // eslint-disable-next-line max-len
         'AQEBCpLYzDEKq4aKSJyFQCkJduSKZef8SJVOperbYyNhXqqnpFG5k74WygVAJ4O0+9nybRyeOFThvITOaS21/jeHiI5fgaM9YKuI0oGYeWCIzPQsluW5CMDmtvqv1aA8sXQ5n2x0L9MJkzgdIHTC3YWBFLQ2AxSveOyIHwW+cHLIFCAcZlOaaf0YtaLfGHGkAC4IfycmaijV8NSlzYgDuxrC9sIsWJ0bSvk5iT4ru/R4+0cjm7qZtGlc04k9xk5Fu6A+wRxMaIyiFRY+Ya19ykcevQldidmEjEWvN6CRToLgclk=',
       body: {
         repositoryName: 'self-hosted',
@@ -35,7 +34,7 @@ const sqsEvent = {
 };
 
 export function run(): void {
-  scaleUp(sqsEvent.Records[0].eventSource, sqsEvent.Records[0].body as ActionRequestMessage)
+  scaleUp(sqsEvent.Records as ActionRequestMessageSQS[])
     .then()
     .catch((e) => {
       logger.error(e);

lambdas/functions/control-plane/src/pool/pool.test.ts

Lines changed: 4 additions & 20 deletions
@@ -188,11 +188,7 @@ describe('Test simple pool.', () => {
   it('Top up pool with pool size 2 registered.', async () => {
     await adjust({ poolSize: 3 });
     expect(createRunners).toHaveBeenCalledTimes(1);
-    expect(createRunners).toHaveBeenCalledWith(
-      expect.anything(),
-      expect.objectContaining({ numberOfRunners: 1 }),
-      expect.anything(),
-    );
+    expect(createRunners).toHaveBeenCalledWith(expect.anything(), expect.anything(), 1, expect.anything());
   });
 
   it('Should not top up if pool size is reached.', async () => {
@@ -268,11 +264,7 @@ describe('Test simple pool.', () => {
   it('Top up if the pool size is set to 5', async () => {
     await adjust({ poolSize: 5 });
     // 2 idle, top up with 3 to match a pool of 5
-    expect(createRunners).toHaveBeenCalledWith(
-      expect.anything(),
-      expect.objectContaining({ numberOfRunners: 3 }),
-      expect.anything(),
-    );
+    expect(createRunners).toHaveBeenCalledWith(expect.anything(), expect.anything(), 3, expect.anything());
   });
 });
 
@@ -287,11 +279,7 @@ describe('Test simple pool.', () => {
   it('Top up if the pool size is set to 5', async () => {
     await adjust({ poolSize: 5 });
     // 2 idle, top up with 3 to match a pool of 5
-    expect(createRunners).toHaveBeenCalledWith(
-      expect.anything(),
-      expect.objectContaining({ numberOfRunners: 3 }),
-      expect.anything(),
-    );
+    expect(createRunners).toHaveBeenCalledWith(expect.anything(), expect.anything(), 3, expect.anything());
  });
});
 
@@ -341,11 +329,7 @@ describe('Test simple pool.', () => {
 
     await adjust({ poolSize: 5 });
     // 2 idle, 2 prefixed idle top up with 1 to match a pool of 5
-    expect(createRunners).toHaveBeenCalledWith(
-      expect.anything(),
-      expect.objectContaining({ numberOfRunners: 1 }),
-      expect.anything(),
-    );
+    expect(createRunners).toHaveBeenCalledWith(expect.anything(), expect.anything(), 1, expect.anything());
   });
 });

lambdas/functions/control-plane/src/pool/pool.ts

Lines changed: 1 addition & 1 deletion
@@ -92,11 +92,11 @@ export async function adjust(event: PoolEvent): Promise<void> {
         environment,
         launchTemplateName,
         subnets,
-        numberOfRunners: topUp,
         amiIdSsmParameterName,
         tracingEnabled,
         onDemandFailoverOnError,
       },
+      topUp,
       githubInstallationClient,
     );
   } else {
