Skip to content

Commit 280bd32

Browse files
committed
feat(event cache): introduce an OrderTracker for each room tracked by the event cache
The one hardship is that lazy-loading updates must NOT affect the order tracker, otherwise its internal state will be incorrect (disynchronized from the store) and thus return incorrect values upon shrink/lazy-load. In this specific case, some updates must be ignored, the same way we do it for the store using `let _ = store_updates().take()` in a few places. The author considered that a right place where to flush the pending updates was at the same time we flushed the updates-as-vector-diffs, since they would be observable at the same time.
1 parent 25e50a3 commit 280bd32

File tree

2 files changed

+328
-42
lines changed

2 files changed

+328
-42
lines changed

crates/matrix-sdk/src/event_cache/room/events.rs

Lines changed: 98 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use matrix_sdk_base::{
1919
event_cache::store::DEFAULT_CHUNK_CAPACITY,
2020
linked_chunk::{
2121
lazy_loader::{self, LazyLoaderError},
22-
ChunkContent, ChunkIdentifierGenerator, RawChunk,
22+
ChunkContent, ChunkIdentifierGenerator, OrderTracker, RawChunk,
2323
},
2424
};
2525
use matrix_sdk_common::linked_chunk::{
@@ -37,6 +37,9 @@ pub struct RoomEvents {
3737
///
3838
/// [`Update`]: matrix_sdk_base::linked_chunk::Update
3939
chunks_updates_as_vectordiffs: AsVector<Event, Gap>,
40+
41+
/// Tracker of the events ordering in this room.
42+
pub order_tracker: OrderTracker<Event, Gap>,
4043
}
4144

4245
impl Default for RoomEvents {
@@ -48,24 +51,27 @@ impl Default for RoomEvents {
4851
impl RoomEvents {
4952
/// Build a new [`RoomEvents`] struct with zero events.
5053
pub fn new() -> Self {
51-
Self::with_initial_linked_chunk(None)
54+
Self::with_initial_linked_chunk(None, None)
5255
}
5356

5457
/// Build a new [`RoomEvents`] struct with prior chunks knowledge.
5558
///
5659
/// The provided [`LinkedChunk`] must have been built with update history.
5760
pub fn with_initial_linked_chunk(
5861
linked_chunk: Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>,
62+
fully_loaded_linked_chunk: Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>,
5963
) -> Self {
6064
let mut linked_chunk = linked_chunk.unwrap_or_else(LinkedChunk::new_with_update_history);
6165

6266
let chunks_updates_as_vectordiffs = linked_chunk
6367
.as_vector()
64-
// SAFETY: The `LinkedChunk` has been built with `new_with_update_history`, so
65-
// `as_vector` must return `Some(…)`.
6668
.expect("`LinkedChunk` must have been built with `new_with_update_history`");
6769

68-
Self { chunks: linked_chunk, chunks_updates_as_vectordiffs }
70+
let order_tracker = linked_chunk
71+
.order_tracker(fully_loaded_linked_chunk.as_ref().map(|lc| lc.chunks()))
72+
.expect("`LinkedChunk` must have been built with `new_with_update_history`");
73+
74+
Self { chunks: linked_chunk, chunks_updates_as_vectordiffs, order_tracker }
6975
}
7076

7177
/// Clear all events.
@@ -76,6 +82,20 @@ impl RoomEvents {
7682
self.chunks.clear();
7783
}
7884

85+
fn inhibit_updates_to_ordering_tracker<F: FnOnce(&mut Self) -> R, R>(&mut self, f: F) -> R {
86+
// Start by flushing previous pending updates to the chunk ordering, if any.
87+
self.order_tracker.flush_updates(false);
88+
89+
// Call the function.
90+
let r = f(self);
91+
92+
// Now, flush other pending updates which have been caused by the function, and
93+
// ignore them.
94+
self.order_tracker.flush_updates(true);
95+
96+
r
97+
}
98+
7999
/// Replace the events with the given last chunk of events and generator.
80100
///
81101
/// This clears all the chunks in memory before resetting to the new chunk,
@@ -85,7 +105,11 @@ impl RoomEvents {
85105
last_chunk: Option<RawChunk<Event, Gap>>,
86106
chunk_identifier_generator: ChunkIdentifierGenerator,
87107
) -> Result<(), LazyLoaderError> {
88-
lazy_loader::replace_with(&mut self.chunks, last_chunk, chunk_identifier_generator)
108+
// Since `replace_with` is used only to unload some chunks, we don't want it to
109+
// affect the chunk ordering.
110+
self.inhibit_updates_to_ordering_tracker(move |this| {
111+
lazy_loader::replace_with(&mut this.chunks, last_chunk, chunk_identifier_generator)
112+
})
89113
}
90114

91115
/// Push events after all events or gaps.
@@ -231,6 +255,35 @@ impl RoomEvents {
231255
self.chunks.items()
232256
}
233257

258+
/// Return the order of an event in the room (main) linked chunk.
259+
///
260+
/// Can return `None` if the event can't be found in the linked chunk.
261+
pub fn event_order(&self, event_pos: Position) -> Option<usize> {
262+
self.order_tracker.ordering(event_pos)
263+
}
264+
265+
#[cfg(any(test, debug_assertions))]
266+
fn assert_event_ordering(&self) {
267+
let mut iter = self.chunks.items().enumerate();
268+
let Some((i, (first_event_pos, _))) = iter.next() else {
269+
return;
270+
};
271+
272+
// Sanity check.
273+
assert_eq!(i, 0);
274+
275+
// That's the offset in the full linked chunk. Will be 0 if the linked chunk is
276+
// entirely loaded, may be non-zero otherwise.
277+
let offset =
278+
self.event_order(first_event_pos).expect("first event's ordering must be known");
279+
280+
for (i, (next_pos, _)) in iter {
281+
let next_index =
282+
self.event_order(next_pos).expect("next event's ordering must be known");
283+
assert_eq!(offset + i, next_index, "event ordering must be continuous");
284+
}
285+
}
286+
234287
/// Get all updates from the room events as [`VectorDiff`].
235288
///
236289
/// Be careful that each `VectorDiff` is returned only once!
@@ -239,7 +292,17 @@ impl RoomEvents {
239292
///
240293
/// [`Update`]: matrix_sdk_base::linked_chunk::Update
241294
pub fn updates_as_vector_diffs(&mut self) -> Vec<VectorDiff<Event>> {
242-
self.chunks_updates_as_vectordiffs.take()
295+
let updates = self.chunks_updates_as_vectordiffs.take();
296+
297+
self.order_tracker.flush_updates(false);
298+
299+
if cfg!(any(test, debug_assertions)) {
300+
// Assert that the orderings are fully correct for all the events present in the
301+
// in-memory linked chunk.
302+
self.assert_event_ordering();
303+
}
304+
305+
updates
243306
}
244307

245308
/// Get a mutable reference to the [`LinkedChunk`] updates, aka
@@ -258,8 +321,9 @@ impl RoomEvents {
258321
pub fn debug_string(&self) -> Vec<String> {
259322
let mut result = Vec::new();
260323

261-
for chunk in self.chunks() {
262-
let content = chunk_debug_string(chunk.content());
324+
for chunk in self.chunks.chunks() {
325+
let content =
326+
chunk_debug_string(chunk.identifier(), chunk.content(), &self.order_tracker);
263327
let lazy_previous = if let Some(cid) = chunk.lazy_previous() {
264328
format!(" (lazy previous = {})", cid.index())
265329
} else {
@@ -289,24 +353,40 @@ impl RoomEvents {
289353
&mut self,
290354
raw_new_first_chunk: RawChunk<Event, Gap>,
291355
) -> Result<(), LazyLoaderError> {
292-
lazy_loader::insert_new_first_chunk(&mut self.chunks, raw_new_first_chunk)
356+
// This is only used when reinserting a chunk that was in persisted storage, so
357+
// we don't need to touch the chunk ordering for this.
358+
self.inhibit_updates_to_ordering_tracker(move |this| {
359+
lazy_loader::insert_new_first_chunk(&mut this.chunks, raw_new_first_chunk)
360+
})
293361
}
294362
}
295363

296364
/// Create a debug string for a [`ChunkContent`] for an event/gap pair.
297-
fn chunk_debug_string(content: &ChunkContent<Event, Gap>) -> String {
365+
fn chunk_debug_string(
366+
chunk_id: ChunkIdentifier,
367+
content: &ChunkContent<Event, Gap>,
368+
order_tracker: &OrderTracker<Event, Gap>,
369+
) -> String {
298370
match content {
299371
ChunkContent::Gap(Gap { prev_token }) => {
300372
format!("gap['{prev_token}']")
301373
}
302374
ChunkContent::Items(vec) => {
303375
let items = vec
304376
.iter()
305-
.map(|event| {
306-
// Limit event ids to 8 chars *after* the $.
377+
.enumerate()
378+
.map(|(i, event)| {
307379
event.event_id().map_or_else(
308380
|| "<no event id>".to_owned(),
309-
|id| id.as_str().chars().take(1 + 8).collect(),
381+
|id| {
382+
let pos = Position::new(chunk_id, i);
383+
let order = format!("#{}: ", order_tracker.ordering(pos).unwrap());
384+
385+
// Limit event ids to 8 chars *after* the $.
386+
let event_id = id.as_str().chars().take(1 + 8).collect::<String>();
387+
388+
format!("{order}{event_id}")
389+
},
310390
)
311391
})
312392
.collect::<Vec<_>>()
@@ -719,10 +799,13 @@ mod tests {
719799
]);
720800
room_events.push_gap(Gap { prev_token: "raclette".to_owned() });
721801

802+
// Flush updates to the order tracker.
803+
let _ = room_events.updates_as_vector_diffs();
804+
722805
let output = room_events.debug_string();
723806

724807
assert_eq!(output.len(), 2);
725-
assert_eq!(&output[0], "chunk #0: events[$12345678, $2]");
808+
assert_eq!(&output[0], "chunk #0: events[#0: $12345678, #1: $2]");
726809
assert_eq!(&output[1], "chunk #1: gap['raclette']");
727810
}
728811

0 commit comments

Comments
 (0)