Skip to content

Commit 1241aa1

Browse files
committed
put versions on all the things
1 parent 16e949d commit 1241aa1

File tree

15 files changed

+72
-63
lines changed

15 files changed

+72
-63
lines changed

dev-tools/omdb/src/bin/omdb/db/alert.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1012,6 +1012,7 @@ async fn cmd_db_alert_info(
10121012
class,
10131013
payload,
10141014
num_dispatched,
1015+
schema_version,
10151016
} = alert;
10161017

10171018
const CLASS: &str = "class";
@@ -1029,7 +1030,7 @@ async fn cmd_db_alert_info(
10291030

10301031
println!("\n{:=<80}", "== ALERT ");
10311032
println!(" {ID:>WIDTH$}: {id:?}");
1032-
println!(" {CLASS:>WIDTH$}: {class}");
1033+
println!(" {CLASS:>WIDTH$}: {class} (v{schema_version})");
10331034
println!(" {TIME_CREATED:>WIDTH$}: {time_created}");
10341035
println!(" {TIME_MODIFIED:>WIDTH$}: {time_modified}");
10351036
println!();

nexus/db-model/src/alert.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
44

55
use crate::AlertClass;
6+
use crate::SqlU32;
67
use chrono::{DateTime, Utc};
78
use db_macros::Asset;
89
use nexus_db_schema::schema::alert;
@@ -40,6 +41,9 @@ pub struct Alert {
4041
pub payload: serde_json::Value,
4142

4243
pub num_dispatched: i64,
44+
45+
/// The version of the JSON schema for `payload`.
46+
pub schema_version: SqlU32,
4347
}
4448

4549
impl Alert {

nexus/db-queries/src/db/datastore/alert.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ impl DataStore {
2626
opctx: &OpContext,
2727
id: AlertUuid,
2828
class: AlertClass,
29+
version: u32,
2930
payload: serde_json::Value,
3031
) -> CreateResult<Alert> {
3132
let conn = self.pool_connection_authorized(&opctx).await?;
@@ -36,6 +37,7 @@ impl DataStore {
3637
class,
3738
payload,
3839
num_dispatched: 0,
40+
schema_version: version.into(),
3941
})
4042
.returning(Alert::as_returning())
4143
.get_result_async(&*conn)

nexus/db-queries/src/db/datastore/alert_rx.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1208,7 +1208,7 @@ mod test {
12081208
) -> (authz::Alert, crate::db::model::Alert) {
12091209
let id = AlertUuid::new_v4();
12101210
datastore
1211-
.alert_create(opctx, id, alert_class, serde_json::json!({}))
1211+
.alert_create(opctx, id, alert_class, 1, serde_json::json!({}))
12121212
.await
12131213
.expect("cant create ye event");
12141214
LookupPath::new(opctx, datastore).alert_id(id).fetch().await.expect(

nexus/db-queries/src/db/datastore/webhook_delivery.rs

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,6 @@ pub struct DeliveryConfig {
5252
pub lease_timeout: TimeDelta,
5353
}
5454

55-
/// A record from the [`WebhookDelivery`] table along with the event class and
56-
/// data of the corresponding [`Alert`] record.
57-
#[derive(Debug, Clone)]
58-
pub struct DeliveryAndEvent {
59-
pub delivery: WebhookDelivery,
60-
pub alert_class: AlertClass,
61-
pub event: serde_json::Value,
62-
}
63-
6455
impl DataStore {
6556
pub async fn webhook_delivery_create_batch(
6657
&self,
@@ -191,12 +182,11 @@ impl DataStore {
191182
opctx: &OpContext,
192183
rx_id: &AlertReceiverUuid,
193184
cfg: &DeliveryConfig,
194-
) -> Result<impl ExactSizeIterator<Item = DeliveryAndEvent> + 'static, Error>
195-
{
185+
) -> ListResultVec<(WebhookDelivery, Alert)> {
196186
let conn = self.pool_connection_authorized(opctx).await?;
197187
let now =
198188
diesel::dsl::now.into_sql::<diesel::pg::sql_types::Timestamptz>();
199-
let rows = dsl::webhook_delivery
189+
dsl::webhook_delivery
200190
// Filter out deliveries triggered by probe requests, as those are
201191
// executed synchronously by the probe endpoint, rather than by the
202192
// webhook deliverator.
@@ -232,20 +222,14 @@ impl DataStore {
232222
)),
233223
)
234224
.order_by(dsl::time_created.asc())
235-
// Join with the `webhook_event` table to get the event class, which
236-
// is necessary to construct delivery requests.
225+
// Join with the `alert` table to get the alert, including its
226+
// class, payload version, and payload (which are necessary to
227+
// construct delivery requests).let rows =
237228
.inner_join(alert_dsl::alert.on(alert_dsl::id.eq(dsl::alert_id)))
238-
.select((
239-
WebhookDelivery::as_select(),
240-
alert_dsl::alert_class,
241-
alert_dsl::payload,
242-
))
229+
.select((WebhookDelivery::as_select(), Alert::as_select()))
243230
.load_async(&*conn)
244231
.await
245-
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
246-
Ok(rows.into_iter().map(|(delivery, alert_class, event)| {
247-
DeliveryAndEvent { delivery, alert_class, event }
248-
}))
232+
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
249233
}
250234

251235
pub async fn webhook_delivery_start_attempt(
@@ -488,6 +472,7 @@ mod test {
488472
&opctx,
489473
alert_id,
490474
AlertClass::TestFoo,
475+
1,
491476
serde_json::json!({
492477
"answer": 42,
493478
}),

nexus/db-schema/src/schema.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2362,6 +2362,7 @@ table! {
23622362
payload -> Jsonb,
23632363
time_dispatched -> Nullable<Timestamptz>,
23642364
num_dispatched -> Int8,
2365+
schema_version -> Int8,
23652366
}
23662367
}
23672368

nexus/src/app/alert.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ use nexus_db_lookup::LookupPath;
150150
use nexus_db_lookup::lookup;
151151
use nexus_db_queries::authz;
152152
use nexus_db_queries::context::OpContext;
153-
use nexus_db_queries::db::model::Alert;
153+
use nexus_db_queries::db::model::Alert as DbAlert;
154154
use nexus_db_queries::db::model::AlertClass;
155155
use nexus_db_queries::db::model::AlertDeliveryState;
156156
use nexus_db_queries::db::model::AlertDeliveryTrigger;
@@ -182,12 +182,12 @@ impl nexus_alerts::PublishAlert for Nexus {
182182
opctx: &OpContext,
183183
id: AlertUuid,
184184
alert: A,
185-
) -> Result<Alert, Error> {
185+
) -> Result<DbAlert, Error> {
186186
self.alert_publish(opctx, id, alert).await
187187
}
188188
}
189189

190-
pub(crate) fn alert_schemas() -> AlertSchemaRegistry {
190+
pub(crate) fn schemas() -> AlertSchemaRegistry {
191191
let mut registry = AlertSchemaRegistry::new();
192192

193193
#[cfg(debug_assertions)]
@@ -217,7 +217,7 @@ impl Nexus {
217217
opctx: &OpContext,
218218
id: AlertUuid,
219219
alert: A,
220-
) -> Result<Alert, Error> {
220+
) -> Result<DbAlert, Error> {
221221
#[cfg(debug_assertions)]
222222
{
223223
// In test builds, assert that this is a schema that we know about.
@@ -229,8 +229,8 @@ impl Nexus {
229229
None => panic!(
230230
"You have attempted to publish an alert type whose class \
231231
was not added to the alert schema registry in \
232-
`nexus::app::alert::alert_schemas()`! This means that \
233-
the alert type's schema will not be included in the \
232+
`nexus::app::alert::schemas()`! This means that the \
233+
alert type's schema will not be included in the \
234234
alert JSON schema. This is probably a mistake. Since I \
235235
am a test build, I will now panic!\n \
236236
alert class: {}\n \
@@ -244,7 +244,7 @@ impl Nexus {
244244
panic!(
245245
"You have attempted to publish an alert type whose schema \
246246
version is not present in the alert schema registry in \
247-
`nexus::app::alert::alert_schemas()`! This is probably a \
247+
`nexus::app::alert::schemas()`! This is probably a \
248248
mistake. Since I am a test build, I will now panic!\n \
249249
alert class: {}\n \
250250
alert version: {}",
@@ -255,7 +255,7 @@ impl Nexus {
255255
}
256256

257257
let payload =
258-
serde_json::to_value(event).map_err(|e| Error::InternalError {
258+
serde_json::to_value(&alert).map_err(|e| Error::InternalError {
259259
internal_message: format!(
260260
"failed to convert {} (class: {} v{}) to JSON: {e}",
261261
std::any::type_name::<A>(),
@@ -264,8 +264,10 @@ impl Nexus {
264264
),
265265
})?;
266266

267-
let alert =
268-
self.datastore().alert_create(opctx, id, class, payload).await?;
267+
let alert = self
268+
.datastore()
269+
.alert_create(opctx, id, A::CLASS, A::VERSION, payload)
270+
.await?;
269271

270272
slog::debug!(
271273
&opctx.log,

nexus/src/app/background/tasks/alert_dispatcher.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,8 @@ mod test {
454454
&opctx,
455455
alert_id,
456456
db::model::AlertClass::TestQuuxBar,
457-
serde_json::json!({"msg": "help im trapped in a webhook event factory"}),
457+
1,
458+
serde_json::json!({"msg": "help im trapped in an alert factory"}),
458459
)
459460
.await
460461
.expect("creating the event should work");

nexus/src/app/background/tasks/webhook_deliverator.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ use crate::app::webhook::ReceiverClient;
3535
use futures::future::BoxFuture;
3636
use nexus_db_queries::context::OpContext;
3737
use nexus_db_queries::db::DataStore;
38-
use nexus_db_queries::db::datastore::webhook_delivery::DeliveryAndEvent;
3938
use nexus_db_queries::db::datastore::webhook_delivery::DeliveryAttemptState;
4039
pub use nexus_db_queries::db::datastore::webhook_delivery::DeliveryConfig;
40+
use nexus_db_queries::db::model::Alert;
4141
use nexus_db_queries::db::model::WebhookDeliveryAttemptResult;
4242
use nexus_db_queries::db::model::WebhookReceiverConfig;
4343
use nexus_db_queries::db::pagination::Paginator;
@@ -231,7 +231,9 @@ impl WebhookDeliverator {
231231
..Default::default()
232232
};
233233

234-
for DeliveryAndEvent { delivery, alert_class, event } in deliveries {
234+
for (delivery, alert) in deliveries {
235+
let Alert { class, payload, schema_version, .. } = alert;
236+
let version = schema_version.into();
235237
let attempt = (*delivery.attempts) + 1;
236238
let delivery_id = WebhookDeliveryUuid::from(delivery.id);
237239
match self
@@ -248,7 +250,8 @@ impl WebhookDeliverator {
248250
slog::trace!(&opctx.log,
249251
"webhook event delivery attempt started";
250252
"alert_id" => %delivery.alert_id,
251-
"alert_class" => %alert_class,
253+
"alert_class" => %class,
254+
"alert_version" => %version,
252255
"delivery_id" => %delivery_id,
253256
"attempt" => ?attempt,
254257
);
@@ -259,7 +262,8 @@ impl WebhookDeliverator {
259262
"delivery of this webhook event was already completed \
260263
at {time:?}";
261264
"alert_id" => %delivery.alert_id,
262-
"alert_class" => %alert_class,
265+
"alert_class" => %class,
266+
"alert_version" => %version,
263267
"delivery_id" => %delivery_id,
264268
"time_completed" => ?time,
265269
);
@@ -272,7 +276,8 @@ impl WebhookDeliverator {
272276
"delivery of this webhook event is in progress by \
273277
another Nexus";
274278
"alert_id" => %delivery.alert_id,
275-
"alert_class" => %alert_class,
279+
"alert_class" => %class,
280+
"alert_version" => %version,
276281
"delivery_id" => %delivery_id,
277282
"nexus_id" => %nexus_id,
278283
"time_started" => ?started,
@@ -286,7 +291,8 @@ impl WebhookDeliverator {
286291
"unexpected database error error starting webhook \
287292
delivery attempt";
288293
"alert_id" => %delivery.alert_id,
289-
"alert_class" => %alert_class,
294+
"alert_class" => %class,
295+
"alert_version" => %version,
290296
"delivery_id" => %delivery_id,
291297
"error" => %error,
292298
);
@@ -299,7 +305,9 @@ impl WebhookDeliverator {
299305

300306
// okay, actually do the thing...
301307
let delivery_attempt = match client
302-
.send_delivery_request(opctx, &delivery, alert_class, &event)
308+
.send_delivery_request(
309+
opctx, &delivery, class, version, &payload,
310+
)
303311
.await
304312
{
305313
Ok(delivery) => delivery,
@@ -326,7 +334,8 @@ impl WebhookDeliverator {
326334
&opctx.log,
327335
"{MSG}";
328336
"alert_id" => %delivery.alert_id,
329-
"alert_class" => %alert_class,
337+
"alert_class" => %class,
338+
"alert_version" => %version,
330339
"delivery_id" => %delivery_id,
331340
"error" => %e,
332341
);

nexus/src/app/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,9 @@ pub struct Nexus {
269269

270270
/// reports status of pending MGS-managed updates
271271
mgs_update_status_rx: watch::Receiver<MgsUpdateDriverStatus>,
272+
273+
/// Collection of JSON schemas for alert classes and versions.
274+
alert_schemas: alert::AlertSchemaRegistry,
272275
}
273276

274277
impl Nexus {
@@ -496,6 +499,7 @@ impl Nexus {
496499
)),
497500
tuf_artifact_replication_tx,
498501
mgs_update_status_rx,
502+
alert_schemas: alert::schemas(),
499503
};
500504

501505
// TODO-cleanup all the extra Arcs here seems wrong

nexus/src/app/webhook.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ impl Nexus {
196196
LazyLock::new(|| serde_json::json!({}));
197197

198198
let attempt = match client
199-
.send_delivery_request(opctx, &delivery, CLASS, &DATA)
199+
.send_delivery_request(opctx, &delivery, CLASS, 1, &DATA)
200200
.await
201201
{
202202
Ok(attempt) => attempt,
@@ -376,6 +376,7 @@ impl<'a> ReceiverClient<'a> {
376376
opctx: &OpContext,
377377
delivery: &WebhookDelivery,
378378
alert_class: AlertClass,
379+
version: u32,
379380
data: &serde_json::Value,
380381
) -> Result<WebhookDeliveryAttempt, anyhow::Error> {
381382
const HDR_DELIVERY_ID: HeaderName =
@@ -437,6 +438,7 @@ impl<'a> ReceiverClient<'a> {
437438
"webhook {MSG}";
438439
"alert_id" => %delivery.alert_id,
439440
"alert_class" => %alert_class,
441+
"alert_version" => version,
440442
"delivery_id" => %delivery.id,
441443
"delivery_trigger" => %delivery.triggered_by,
442444
"error" => %e,

nexus/tests/integration_tests/webhooks.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1711,11 +1711,7 @@ async fn subscription_remove_test(
17111711
// Publish an event. This should be received, as it matches the subscription
17121712
// we are about to delete.
17131713
let event = nexus
1714-
.alert_publish(
1715-
&opctx,
1716-
id1,
1717-
test_alerts::FooBar { hello: "joe" }),
1718-
)
1714+
.alert_publish(&opctx, id1, test_alerts::FooBar { hello: "joe" })
17191715
.await
17201716
.expect("event should be published successfully");
17211717
dbg!(event);
@@ -1742,11 +1738,7 @@ async fn subscription_remove_test(
17421738
// Publish an event. This one should not be received, as we are no longer
17431739
// subscribed to its event class.
17441740
let event = nexus
1745-
.alert_publish(
1746-
&opctx,
1747-
id2,
1748-
test_alerts::FooBar { hello: "robert" }
1749-
)
1741+
.alert_publish(&opctx, id2, test_alerts::FooBar { hello: "robert" })
17501742
.await
17511743
.expect("event should be published successfully");
17521744
dbg!(event);
@@ -1788,11 +1780,7 @@ async fn subscription_remove_test(
17881780
};
17891781

17901782
let event = nexus
1791-
.alert_publish(
1792-
&opctx,
1793-
id3,
1794-
test_alerts::Foo { hello_world: true }
1795-
)
1783+
.alert_publish(&opctx, id3, test_alerts::Foo { hello_world: true })
17961784
.await
17971785
.expect("event should be published successfully");
17981786
dbg!(event);
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE omicron.public.alert
2+
ADD COLUMN IF NOT EXISTS schema_version INT8 NOT NULL DEFAULT 1;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ALTER TABLE omicron.public.alert
2+
ALTER COLUMN schema_version
3+
DROP DEFAULT;

0 commit comments

Comments
 (0)