Skip to content

Pass cycle heads as out parameter for maybe_changed_after #852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::panic::UnwindSafe;

use accumulated::{Accumulated, AnyAccumulated};

use crate::cycle::CycleHeads;
use crate::function::VerifyResult;
use crate::ingredient::{Ingredient, Jar};
use crate::loom::sync::Arc;
Expand Down Expand Up @@ -105,7 +106,7 @@ impl<A: Accumulator> Ingredient for IngredientImpl<A> {
_db: &dyn Database,
_input: Id,
_revision: Revision,
_in_cycle: bool,
_cycle_heads: &mut CycleHeads,
) -> VerifyResult {
panic!("nothing should ever depend on an accumulator directly")
}
Expand Down
16 changes: 16 additions & 0 deletions src/cycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,22 @@ impl CycleHeads {
}
}

#[inline]
pub(crate) fn push_initial(&mut self, database_key_index: DatabaseKeyIndex) {
if let Some(existing) = self
.0
.iter()
.find(|candidate| candidate.database_key_index == database_key_index)
{
assert_eq!(existing.iteration_count, 0);
} else {
self.0.push(CycleHead {
database_key_index,
iteration_count: 0,
});
}
}

#[inline]
pub(crate) fn extend(&mut self, other: &Self) {
self.0.reserve(other.0.len());
Expand Down
6 changes: 3 additions & 3 deletions src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::ptr::NonNull;
pub(crate) use maybe_changed_after::VerifyResult;

use crate::accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues};
use crate::cycle::{CycleHeadKind, CycleRecoveryAction, CycleRecoveryStrategy};
use crate::cycle::{CycleHeadKind, CycleHeads, CycleRecoveryAction, CycleRecoveryStrategy};
use crate::function::delete::DeletedEntries;
use crate::function::sync::{ClaimResult, SyncTable};
use crate::ingredient::Ingredient;
Expand Down Expand Up @@ -233,11 +233,11 @@ where
db: &dyn Database,
input: Id,
revision: Revision,
in_cycle: bool,
cycle_heads: &mut CycleHeads,
) -> VerifyResult {
// SAFETY: The `db` belongs to the ingredient as per caller invariant
let db = unsafe { self.view_caster.downcast_unchecked(db) };
self.maybe_changed_after(db, input, revision, in_cycle)
self.maybe_changed_after(db, input, revision, cycle_heads)
}

/// True if the input `input` contains a memo that cites itself as a cycle head.
Expand Down
11 changes: 8 additions & 3 deletions src/function/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,14 @@ where
let opt_old_memo = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
if let Some(old_memo) = opt_old_memo {
if old_memo.value.is_some() {
if let VerifyResult::Unchanged(_, cycle_heads) =
self.deep_verify_memo(db, zalsa, old_memo, self.database_key_index(id))
{
let mut cycle_heads = CycleHeads::default();
if let VerifyResult::Unchanged(_) = self.deep_verify_memo(
db,
zalsa,
old_memo,
self.database_key_index(id),
&mut cycle_heads,
) {
if cycle_heads.is_empty() {
// SAFETY: memo is present in memo_map and we have verified that it is
// still valid for the current revision.
Expand Down
134 changes: 48 additions & 86 deletions src/function/maybe_changed_after.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,33 @@ use crate::{AsDynDatabase as _, Id, Revision};
/// Result of memo validation.
pub enum VerifyResult {
/// Memo has changed and needs to be recomputed.
///
/// The cycle heads encountered when validating the memo.
Changed(CycleHeads),
Changed,

/// Memo remains valid.
///
/// The first inner value tracks whether the memo or any of its dependencies have an
/// The inner value tracks whether the memo or any of its dependencies have an
/// accumulated value.
///
/// The second is the cycle heads encountered in validation; don't mark
/// memos verified until we've iterated the full cycle to ensure no inputs changed.
Unchanged(InputAccumulatedValues, CycleHeads),
/// Don't mark memos verified until we've iterated the full cycle to ensure no inputs changed
/// when encountering this variant.
Unchanged(InputAccumulatedValues),
}

impl VerifyResult {
pub(crate) fn changed_if(changed: bool) -> Self {
if changed {
Self::changed()
Self::Changed
} else {
Self::unchanged()
}
}

pub(crate) fn changed() -> Self {
Self::Changed(CycleHeads::default())
}

pub(crate) fn unchanged() -> Self {
Self::Unchanged(InputAccumulatedValues::Empty, CycleHeads::default())
}

pub(crate) fn cycle_heads(&self) -> &CycleHeads {
match self {
Self::Changed(cycle_heads) => cycle_heads,
Self::Unchanged(_, cycle_heads) => cycle_heads,
}
}

pub(crate) fn into_cycle_heads(self) -> CycleHeads {
match self {
Self::Changed(cycle_heads) => cycle_heads,
Self::Unchanged(_, cycle_heads) => cycle_heads,
}
Self::Unchanged(InputAccumulatedValues::Empty)
}

pub(crate) const fn is_unchanged(&self) -> bool {
matches!(self, Self::Unchanged(_, _))
matches!(self, Self::Unchanged(_))
}
}

Expand All @@ -72,7 +52,7 @@ where
db: &'db C::DbView,
id: Id,
revision: Revision,
in_cycle: bool,
cycle_heads: &mut CycleHeads,
) -> VerifyResult {
let (zalsa, zalsa_local) = db.zalsas();
let memo_ingredient_index = self.memo_ingredient_index(zalsa, id);
Expand All @@ -87,20 +67,17 @@ where
let memo_guard = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
let Some(memo) = memo_guard else {
// No memo? Assume has changed.
return VerifyResult::changed();
return VerifyResult::Changed;
};

let can_shallow_update = self.shallow_verify_memo(zalsa, database_key_index, memo);
if can_shallow_update.yes() && !memo.may_be_provisional() {
self.update_shallow(zalsa, database_key_index, memo, can_shallow_update);

return if memo.revisions.changed_at > revision {
VerifyResult::changed()
VerifyResult::Changed
} else {
VerifyResult::Unchanged(
memo.revisions.accumulated_inputs.load(),
CycleHeads::default(),
)
VerifyResult::Unchanged(memo.revisions.accumulated_inputs.load())
};
}

Expand All @@ -110,7 +87,7 @@ where
id,
revision,
memo_ingredient_index,
in_cycle,
cycle_heads,
) {
return mcs;
} else {
Expand All @@ -127,7 +104,7 @@ where
key_index: Id,
revision: Revision,
memo_ingredient_index: MemoIngredientIndex,
in_cycle: bool,
cycle_heads: &mut CycleHeads,
) -> Option<VerifyResult> {
let database_key_index = self.database_key_index(key_index);

Expand All @@ -148,18 +125,16 @@ where
tracing::debug!(
"hit cycle at {database_key_index:?} in `maybe_changed_after`, returning fixpoint initial value",
);
return Some(VerifyResult::Unchanged(
InputAccumulatedValues::Empty,
CycleHeads::initial(database_key_index),
));
cycle_heads.push_initial(database_key_index);
return Some(VerifyResult::unchanged());
}
},
ClaimResult::Claimed(guard) => guard,
};
// Load the current memo, if any.
let Some(old_memo) = self.get_memo_from_table_for(zalsa, key_index, memo_ingredient_index)
else {
return Some(VerifyResult::changed());
return Some(VerifyResult::Changed);
};

tracing::debug!(
Expand All @@ -169,15 +144,13 @@ where
);

// Check if the inputs are still valid. We can just compare `changed_at`.
let deep_verify = self.deep_verify_memo(db, zalsa, old_memo, database_key_index);
let deep_verify =
self.deep_verify_memo(db, zalsa, old_memo, database_key_index, cycle_heads);
if deep_verify.is_unchanged() {
return Some(if old_memo.revisions.changed_at > revision {
VerifyResult::Changed(deep_verify.into_cycle_heads())
VerifyResult::Changed
} else {
VerifyResult::Unchanged(
old_memo.revisions.accumulated_inputs.load(),
deep_verify.into_cycle_heads(),
)
VerifyResult::Unchanged(old_memo.revisions.accumulated_inputs.load())
});
}

Expand All @@ -191,26 +164,23 @@ where
// the cycle head returned *fixpoint initial* without validating its dependencies.
// `in_cycle` tracks if the enclosing query is in a cycle. `deep_verify.cycle_heads` tracks
// if **this query** encountered a cycle (which means there's some provisional value somewhere floating around).
if old_memo.value.is_some() && !in_cycle && deep_verify.cycle_heads().is_empty() {
if old_memo.value.is_some() && cycle_heads.is_empty() {
let active_query = db.zalsa_local().push_query(database_key_index, 0);
let memo = self.execute(db, active_query, Some(old_memo));
let changed_at = memo.revisions.changed_at;

return Some(if changed_at > revision {
VerifyResult::changed()
VerifyResult::Changed
} else {
VerifyResult::Unchanged(
match &memo.revisions.accumulated {
Some(_) => InputAccumulatedValues::Any,
None => memo.revisions.accumulated_inputs.load(),
},
CycleHeads::default(),
)
VerifyResult::Unchanged(match &memo.revisions.accumulated {
Some(_) => InputAccumulatedValues::Any,
None => memo.revisions.accumulated_inputs.load(),
})
});
}

// Otherwise, nothing for it: have to consider the value to have changed.
Some(VerifyResult::Changed(deep_verify.into_cycle_heads()))
Some(VerifyResult::Changed)
}

/// `Some` if the memo's value and `changed_at` time is still valid in this revision.
Expand Down Expand Up @@ -249,7 +219,7 @@ where
);
if last_changed <= verified_at {
// No input of the suitable durability has changed since last verified.
ShallowUpdate::HigherDurability(revision_now)
ShallowUpdate::HigherDurability
} else {
ShallowUpdate::No
}
Expand All @@ -263,8 +233,8 @@ where
memo: &Memo<C::Output<'_>>,
update: ShallowUpdate,
) {
if let ShallowUpdate::HigherDurability(revision_now) = update {
memo.mark_as_verified(zalsa, revision_now, database_key_index);
if let ShallowUpdate::HigherDurability = update {
memo.mark_as_verified(zalsa, database_key_index);
memo.mark_outputs_as_verified(zalsa, database_key_index);
}
}
Expand Down Expand Up @@ -375,6 +345,7 @@ where
zalsa: &Zalsa,
old_memo: &Memo<C::Output<'_>>,
database_key_index: DatabaseKeyIndex,
cycle_heads: &mut CycleHeads,
) -> VerifyResult {
tracing::debug!(
"{database_key_index:?}: deep_verify_memo(old_memo = {old_memo:#?})",
Expand Down Expand Up @@ -408,30 +379,29 @@ where
// Conditionally specified queries
// where the value is specified
// in rev 1 but not in rev 2.
VerifyResult::changed()
VerifyResult::Changed
}
// Return `Unchanged` similar to the initial value that we insert
// when we hit the cycle. Any dependencies accessed when creating the fixpoint initial
// are tracked by the outer query. Nothing should have changed assuming that the
// fixpoint initial function is deterministic.
QueryOrigin::FixpointInitial => VerifyResult::Unchanged(
InputAccumulatedValues::Empty,
CycleHeads::initial(database_key_index),
),
QueryOrigin::FixpointInitial => {
cycle_heads.push_initial(database_key_index);
VerifyResult::unchanged()
}
QueryOrigin::DerivedUntracked(_) => {
// Untracked inputs? Have to assume that it changed.
VerifyResult::changed()
VerifyResult::Changed
}
QueryOrigin::Derived(edges) => {
let is_provisional = old_memo.may_be_provisional();

// If the value is from the same revision but is still provisional, consider it changed
// because we're now in a new iteration.
if can_shallow_update.yes() && is_provisional {
return VerifyResult::changed();
if can_shallow_update == ShallowUpdate::Verified && is_provisional {
return VerifyResult::Changed;
}

let mut cycle_heads = CycleHeads::default();
'cycle: loop {
// Fully tracked inputs? Iterate over the inputs and check them, one by one.
//
Expand All @@ -449,16 +419,12 @@ where
dyn_db,
zalsa,
last_verified_at,
!cycle_heads.is_empty(),
cycle_heads,
) {
VerifyResult::Changed(heads) => {
// Carry over the heads from the inner query to avoid
// backdating the outer query.
cycle_heads.extend(&heads);
break 'cycle VerifyResult::Changed(cycle_heads);
VerifyResult::Changed => {
break 'cycle VerifyResult::Changed;
}
VerifyResult::Unchanged(input_accumulated, cycles) => {
cycle_heads.extend(&cycles);
VerifyResult::Unchanged(input_accumulated) => {
inputs |= input_accumulated;
}
}
Expand Down Expand Up @@ -514,11 +480,7 @@ where
let in_heads = cycle_heads.remove(&database_key_index);

if cycle_heads.is_empty() {
old_memo.mark_as_verified(
zalsa,
zalsa.current_revision(),
database_key_index,
);
old_memo.mark_as_verified(zalsa, database_key_index);
old_memo.revisions.accumulated_inputs.store(inputs);

if is_provisional {
Expand All @@ -532,7 +494,7 @@ where
continue 'cycle;
}
}
break 'cycle VerifyResult::Unchanged(inputs, cycle_heads);
break 'cycle VerifyResult::Unchanged(inputs);
}
}
}
Expand All @@ -546,7 +508,7 @@ pub(super) enum ShallowUpdate {

/// The revision for the memo's durability hasn't changed. It can be marked as verified
/// in this revision.
HigherDurability(Revision),
HigherDurability,

/// The memo requires a deep verification.
No,
Expand All @@ -556,7 +518,7 @@ impl ShallowUpdate {
pub(super) fn yes(&self) -> bool {
matches!(
self,
ShallowUpdate::Verified | ShallowUpdate::HigherDurability(_)
ShallowUpdate::Verified | ShallowUpdate::HigherDurability
)
}
}
Loading