Skip to content

feat(graphql): add asyncDeepEquals to the graphqlClient #1496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/graphql/lib/src/core/observable_query.dart
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class ObservableQuery<TParsed> {
/// same as [_onDataCallbacks], but not removed after invocation
Set<OnData<TParsed>> _notRemovableOnDataCallbacks = Set();

/// call [queryManager.maybeRebroadcastQueries] after all other [_onDataCallbacks]
/// call [queryManager.maybeRebroadcastQueriesAsync] after all other [_onDataCallbacks]
///
/// Automatically appended as an [OnData]
FutureOr<void> _maybeRebroadcast(QueryResult? result) {
Expand All @@ -101,10 +101,10 @@ class ObservableQuery<TParsed> {
// data. It's valid GQL to have data _and_ exception. If options.carryForwardDataOnException
// are true, this condition may never get hit.
// If there are onDataCallbacks, it's possible they modify cache and are
// depending on maybeRebroadcastQueries being called.
// depending on maybeRebroadcastQueriesAsync being called.
return false;
}
return queryManager.maybeRebroadcastQueries(exclude: this);
return queryManager.maybeRebroadcastQueriesAsync(exclude: this);
}

/// The most recently seen result from this operation's stream
Expand Down
67 changes: 45 additions & 22 deletions packages/graphql/lib/src/core/query_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,29 @@ import 'package:graphql/src/scheduler/scheduler.dart';
import 'package:graphql/src/core/_query_write_handling.dart';

typedef DeepEqualsFn = bool Function(dynamic a, dynamic b);
typedef AsyncDeepEqualsFn = Future<bool> Function(dynamic a, dynamic b);

/// The equality function used for comparing cached and new data.
/// The equality function used for comparing cached and new data
/// in synchronous contexts like operator overrides or custom logic.
///
/// You can alternatively provide [optimizedDeepEquals] for a faster
/// equality check. Or provide your own via [GqlClient] constructor.
/// equality check, or provide your own via the [GqlClient] constructor.
DeepEqualsFn gqlDeepEquals = const DeepCollectionEquality().equals;

/// The async equality function used for comparing cached and new data
/// during asynchronous operations like rebroadcast checks.
///
/// Can be provided via the constructor for custom or isolate-based comparison logic.
AsyncDeepEqualsFn gqlAsyncDeepEquals =
(dynamic a, dynamic b) async => gqlDeepEquals(a, b);

class QueryManager {
QueryManager({
required this.link,
required this.cache,
this.alwaysRebroadcast = false,
DeepEqualsFn? deepEquals,
AsyncDeepEqualsFn? asyncDeepEquals,
bool deduplicatePollers = false,
this.requestTimeout = const Duration(seconds: 5),
}) {
Expand All @@ -44,12 +54,15 @@ class QueryManager {
if (deepEquals != null) {
gqlDeepEquals = deepEquals;
}
if (asyncDeepEquals != null) {
gqlAsyncDeepEquals = asyncDeepEquals;
}
}

final Link link;
final GraphQLCache cache;

/// Whether to skip deep equality checks in [maybeRebroadcastQueries]
/// Whether to skip deep equality checks in [maybeRebroadcastQueriesAsync]
final bool alwaysRebroadcast;

/// The timeout for resolving a query
Expand Down Expand Up @@ -154,7 +167,7 @@ class QueryManager {
)),
))
.map((QueryResult<TParsed> queryResult) {
maybeRebroadcastQueries();
maybeRebroadcastQueriesAsync();
return queryResult;
});
} catch (ex, trace) {
Expand All @@ -170,19 +183,19 @@ class QueryManager {

Future<QueryResult<TParsed>> query<TParsed>(
QueryOptions<TParsed> options) async {
final results = fetchQueryAsMultiSourceResult(_oneOffOpId, options);
final results = await fetchQueryAsMultiSourceResult(_oneOffOpId, options);
final eagerResult = results.eagerResult;
final networkResult = results.networkResult;
if (options.fetchPolicy != FetchPolicy.cacheAndNetwork ||
eagerResult.isLoading) {
final result = networkResult ?? eagerResult;
await result;
maybeRebroadcastQueries();
maybeRebroadcastQueriesAsync();
return result;
}
maybeRebroadcastQueries();
maybeRebroadcastQueriesAsync();
if (networkResult is Future<QueryResult<TParsed>>) {
networkResult.then((value) => maybeRebroadcastQueries());
networkResult.then((value) => maybeRebroadcastQueriesAsync());
}
return eagerResult;
}
Expand All @@ -205,7 +218,7 @@ class QueryManager {
}

/// wait until callbacks complete to rebroadcast
maybeRebroadcastQueries();
maybeRebroadcastQueriesAsync();

return result;
}
Expand Down Expand Up @@ -423,11 +436,12 @@ class QueryManager {
@experimental
Future<List<QueryResult<Object?>?>> refetchSafeQueries() async {
rebroadcastLocked = true;
final queriesSnapshot = List.of(queries.values);
final results = await Future.wait(
queries.values.where((q) => q.isRefetchSafe).map((q) => q.refetch()),
queriesSnapshot.where((q) => q.isRefetchSafe).map((q) => q.refetch()),
);
rebroadcastLocked = false;
maybeRebroadcastQueries();
maybeRebroadcastQueriesAsync();
return results;
}

Expand All @@ -444,7 +458,7 @@ class QueryManager {

/// Add a result to the [ObservableQuery] specified by `queryId`, if it exists.
///
/// Will [maybeRebroadcastQueries] from [ObservableQuery.addResult] if the [cache] has flagged the need to.
/// Will [maybeRebroadcastQueriesAsync] from [ObservableQuery.addResult] if the [cache] has flagged the need to.
///
/// Queries are registered via [setQuery] and [watchQuery]
void addQueryResult<TParsed>(
Expand Down Expand Up @@ -504,10 +518,10 @@ class QueryManager {
/// **Note on internal implementation details**:
/// There is sometimes confusion on when this is called, but rebroadcasts are requested
/// from every [addQueryResult] where `result.isNotLoading` as an [OnData] callback from [ObservableQuery].
bool maybeRebroadcastQueries({
Future<bool> maybeRebroadcastQueriesAsync({
ObservableQuery<Object?>? exclude,
bool force = false,
}) {
}) async {
if (rebroadcastLocked && !force) {
return false;
}
Expand All @@ -522,7 +536,11 @@ class QueryManager {
// to [readQuery] for it once.
final Map<Request, QueryResult<Object?>> diffQueryResultCache = {};
final Map<Request, bool> ignoreQueryResults = {};
for (final query in queries.values) {

final List<ObservableQuery<Object?>> queriesSnapshot =
List.of(queries.values);

for (final query in queriesSnapshot) {
final Request request = query.options.asRequest;
final cachedQueryResult = diffQueryResultCache[request];
if (query == exclude || !query.isRebroadcastSafe) {
Expand All @@ -545,7 +563,7 @@ class QueryManager {
query.options.asRequest,
optimistic: query.options.policies.mergeOptimisticData,
);
if (_cachedDataHasChangedFor(query, cachedData)) {
if (await _cachedDataHasChangedForAsync(query, cachedData)) {
// The data has changed
final queryResult = QueryResult(
data: cachedData,
Expand All @@ -567,14 +585,19 @@ class QueryManager {
return true;
}

bool _cachedDataHasChangedFor(
Future<bool> _cachedDataHasChangedForAsync(
ObservableQuery<Object?> query,
Map<String, dynamic>? cachedData,
) =>
cachedData != null &&
query.latestResult != null &&
(alwaysRebroadcast ||
!gqlDeepEquals(query.latestResult!.data, cachedData));
) async {
if (cachedData == null || query.latestResult == null) return false;
if (alwaysRebroadcast) return true;

final isEqual = await gqlAsyncDeepEquals(
query.latestResult!.data,
cachedData,
);
return !isEqual;
}

void setQuery(ObservableQuery<Object?> observableQuery) {
queries[observableQuery.queryId] = observableQuery;
Expand Down
8 changes: 6 additions & 2 deletions packages/graphql/lib/src/graphql_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class GraphQLClient implements GraphQLDataProxy {
DefaultPolicies? defaultPolicies,
bool alwaysRebroadcast = false,
DeepEqualsFn? deepEquals,
AsyncDeepEqualsFn? asyncDeepEquals,
bool deduplicatePollers = false,
Duration? queryRequestTimeout = const Duration(seconds: 5),
}) : defaultPolicies = defaultPolicies ?? DefaultPolicies(),
Expand All @@ -35,6 +36,7 @@ class GraphQLClient implements GraphQLDataProxy {
cache: cache,
alwaysRebroadcast: alwaysRebroadcast,
deepEquals: deepEquals,
asyncDeepEquals: asyncDeepEquals,
deduplicatePollers: deduplicatePollers,
requestTimeout: queryRequestTimeout,
);
Expand All @@ -57,6 +59,7 @@ class GraphQLClient implements GraphQLDataProxy {
DefaultPolicies? defaultPolicies,
bool? alwaysRebroadcast,
DeepEqualsFn? deepEquals,
AsyncDeepEqualsFn? asyncDeepEquals,
bool deduplicatePollers = false,
Duration? queryRequestTimeout,
}) {
Expand All @@ -66,6 +69,7 @@ class GraphQLClient implements GraphQLDataProxy {
defaultPolicies: defaultPolicies ?? this.defaultPolicies,
alwaysRebroadcast: alwaysRebroadcast ?? queryManager.alwaysRebroadcast,
deepEquals: deepEquals,
asyncDeepEquals: asyncDeepEquals,
deduplicatePollers: deduplicatePollers,
queryRequestTimeout: queryRequestTimeout ?? queryManager.requestTimeout,
);
Expand Down Expand Up @@ -269,7 +273,7 @@ class GraphQLClient implements GraphQLDataProxy {
/// pass through to [cache.writeQuery] and then rebroadcast any changes.
void writeQuery(request, {required data, broadcast = true}) {
cache.writeQuery(request, data: data, broadcast: broadcast);
queryManager.maybeRebroadcastQueries();
queryManager.maybeRebroadcastQueriesAsync().ignore();
}

/// pass through to [cache.writeFragment] and then rebroadcast any changes.
Expand All @@ -283,7 +287,7 @@ class GraphQLClient implements GraphQLDataProxy {
broadcast: broadcast,
data: data,
);
queryManager.maybeRebroadcastQueries();
queryManager.maybeRebroadcastQueriesAsync();
}

/// Resets the contents of the store with [cache.store.reset()]
Expand Down