Skip to content

Support for Streams from query objects #88

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3ed735f
callbacks from C work
Buggaboo Dec 25, 2019
8a6bc25
clean up
Buggaboo Dec 25, 2019
502edad
Merge branch 'dev' into feature/future-stream
Buggaboo Mar 30, 2020
a5d177e
fixed tests, now using futures
Buggaboo May 17, 2020
372de28
Callbacks are pretty useless if we don't know which
Buggaboo May 22, 2020
f19c4e2
Merge branch 'dev' into feature/future-stream
Buggaboo Jul 16, 2020
bd6f3ee
Some Stream tests won't work without an 'await', ffi blows up
Buggaboo Jul 17, 2020
70a038c
improve sub and unsub behavior
Buggaboo Jul 17, 2020
b3e5fda
update readme for streams
Buggaboo Jul 17, 2020
113fb55
removed double quotes, also unnecessary "this"
Buggaboo Jul 30, 2020
3a6184d
removed superfluous extraneous redundant parameterized typedefs
Buggaboo Sep 24, 2020
0c79ef0
Merge branch 'main' into feature/future-stream
Buggaboo Sep 24, 2020
e00bdfd
double quotes to single
Buggaboo Sep 26, 2020
2a4c6f8
specified the types properly for the compiler on obj creation, instea…
Buggaboo Sep 26, 2020
80b1256
Code duplicated for the sake of shutting up a lint false positive:
Buggaboo Sep 26, 2020
6aacbf9
fix in case single observer for multiple stores
Buggaboo Sep 28, 2020
5d69ebf
applied stream to demo app and flattened the code a bit
Buggaboo Sep 28, 2020
3c9698f
the sync tests just required more Future.delayed(... 0))
Buggaboo Sep 28, 2020
b3483ed
forgot to close the query
Buggaboo Sep 28, 2020
63600bb
package private fields
Buggaboo Sep 29, 2020
e62ad2a
replaced listener with StreamBuilder, extracted database plumbing to …
Buggaboo Sep 29, 2020
cd4a391
oopsie
Buggaboo Sep 29, 2020
298169e
changed comments to just declaring something explodes
Buggaboo Oct 5, 2020
c32df43
Merge branch 'main' into feature/future-stream
greenrobot-team Oct 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,37 @@ scoreQuery.close();
query.close();
```

### Streams

Streams can be created from queries.
The streams can be extended with [rxdart](https://github.com/ReactiveX/rxdart);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why rxdart? Are there other alternatives you've considered?

Copy link
Contributor Author

@Buggaboo Buggaboo Jul 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be the most popular one, and it supports Stream which is more idiomatic, by way of extensions. I expose a stream property from an observable query for that reason.


```dart
import "package:objectbox/src/observable.dart";

// final store = ...
final query = box.query(condition).build();
final queryStream = query.stream;
final sub1 = queryStream.listen((query) {
print(query.count());
});

// box.put ...

sub1.cancel();

final stream = query.findStream(limit:5);
final sub2 = stream.listen((list) {
// ...
});

// clean up
sub2.cancel();
store.unsubscribe();

store.close();
```

Help wanted
-----------
ObjectBox for Dart is still in an early stage with limited feature set (compared to other languages).
Expand Down
2 changes: 1 addition & 1 deletion lib/objectbox.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ export "src/model.dart";
export "src/store.dart";
export "src/box.dart";
export "src/modelinfo/index.dart";
export "src/query/query.dart";
export "src/query/query.dart";
10 changes: 10 additions & 0 deletions lib/src/bindings/bindings.dart
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ class _ObjectBoxBindings {

obx_query_visit_dart_t obx_query_visit;

// Observers
obx_observe_t obx_observe;
obx_observe_single_type_t<int> obx_observe_single_type;
obx_observer_close_t<void> obx_observer_close;

// query property
obx_query_prop_t<int> obx_query_prop;
obx_query_prop_close_t<int> obx_query_prop_close;
Expand Down Expand Up @@ -320,6 +325,11 @@ class _ObjectBoxBindings {

obx_query_visit = _fn<obx_query_visit_native_t>("obx_query_visit").asFunction();

// observers
obx_observe = _fn<obx_observe_t>("obx_observe").asFunction();
obx_observe_single_type = _fn<obx_observe_single_type_t<Uint32>>("obx_observe_single_type").asFunction();
obx_observer_close = _fn<obx_observer_close_t<Void>>("obx_observer_close").asFunction();

// query property
obx_query_prop = _fn<obx_query_prop_t<Uint32>>('obx_query_prop').asFunction();
obx_query_prop_close = _fn<obx_query_prop_close_t<Int32>>('obx_query_prop_close').asFunction();
Expand Down
8 changes: 8 additions & 0 deletions lib/src/bindings/signatures.dart
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,14 @@ typedef obx_query_visit_native_t = Int32 Function(Pointer<Void> query,
typedef obx_query_visit_dart_t = int Function(Pointer<Void> query,
Pointer<NativeFunction<obx_data_visitor_native_t>> visitor, Pointer<Void> user_data, int offset, int limit);

// observers

typedef obx_observer_t<U extends NativeType, T extends NativeType> = U Function(Pointer<Void> user_data, Pointer<Uint32> entity_id, T type_ids_count);
typedef obx_observer_single_type_t<U extends NativeType> = U Function(Pointer<Void> user_data);
typedef obx_observe_t = Pointer<Void> Function(Pointer<Void> store, Pointer<NativeFunction<obx_observer_t<Void, Uint32>>> callback, Pointer<Void> user_data);
typedef obx_observe_single_type_t<T> = Pointer<Void> Function(Pointer<Void> store, T entity_id, Pointer<NativeFunction<obx_observer_single_type_t<Void>>> callback, Pointer<Void> user_data);
typedef obx_observer_close_t<U extends NativeType> = U Function(Pointer<Void> observer);

// query property

typedef obx_query_prop_t<T> = Pointer<Void> Function(Pointer<Void> query, T propertyId); // Uint32 -> int
Expand Down
76 changes: 76 additions & 0 deletions lib/src/observable.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import 'dart:async';
import "dart:ffi";
import "bindings/bindings.dart";
import "bindings/signatures.dart";

import "store.dart";
import "query/query.dart";

// ignore_for_file: non_constant_identifier_names

// dart callback signature
typedef Any = void Function(Pointer<Void>, Pointer<Uint32>, int);

class Observable {

static final anyObserver = <int, Pointer<Void>>{};
static final any = <int, Any>{}; // radix? > tree?

// sync:true -> ObjectBoxException: 10001 TX is not active anymore: #101
static final controller = StreamController<int>.broadcast();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that comment on an issue that needs looking at?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that requires some attention

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the reference of #101 mean here?

Copy link
Contributor Author

@Buggaboo Buggaboo Sep 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was generated here, see nativeCode.
GitHub issue number was triggered.


static void _anyCallback(Pointer<Void> user_data, Pointer<Uint32> mutated_ids, int mutated_count) {
for(var i=0; i<mutated_count; i++) {
// call schema's callback
if (any.containsKey(mutated_ids[i])) {
any[mutated_ids[i]](user_data, mutated_ids, mutated_count);
}
}
}

static subscribe(Store store) {
final callback = Pointer.fromFunction<obx_observer_t<Void, Uint32>>(_anyCallback);
anyObserver[store.ptr.address] = bindings.obx_observe(store.ptr, callback, store.ptr);
}

// #53 ffi:Pointer finalizer
static unsubscribe(Store store) {
if (!anyObserver.containsKey(store.ptr.address)) {
return;
}
bindings.obx_observer_close(anyObserver[store.ptr.address]);
anyObserver.remove(store.ptr.address);
}
}

extension ObservableStore on Store {
subscribe () { Observable.subscribe(this); }
unsubscribe () { Observable.unsubscribe(this); }
}

extension Streamable<T> on Query<T> {
_setup() {
if (!Observable.anyObserver.containsKey(this.store.ptr)) {
this.store.subscribe();
}

// Assume consensus on entityId over all available Stores
Observable.any[this.entityId] ??= (u, _, __) {
// dummy value to trigger an event
Observable.controller.add(u.address);
};
}

Stream<List<T>> findStream({int offset = 0, int limit = 0}) {
_setup();
return Observable.controller.stream
.map((_) => this.find(offset:offset, limit:limit));
}

/// Use this for Query Property
Stream<Query<T>> get stream {
_setup();
return Observable.controller.stream
.map((_) => this);
}
}
2 changes: 1 addition & 1 deletion lib/src/query/builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class QueryBuilder<T> {
}

try {
return Query<T>._(_store, _fbManager, _cBuilder);
return Query<T>._(_store, _fbManager, _cBuilder, _entityId);
} finally {
checkObx(bindings.obx_qb_close(_cBuilder));
}
Expand Down
7 changes: 4 additions & 3 deletions lib/src/query/query.dart
Original file line number Diff line number Diff line change
Expand Up @@ -533,11 +533,12 @@ class ConditionGroupAll extends ConditionGroup {

class Query<T> {
Pointer<Void> _cQuery;
Store _store;
Store store;
OBXFlatbuffersManager _fbManager;
int entityId;

// package private ctor
Query._(this._store, this._fbManager, Pointer<Void> cBuilder) {
Query._(this.store, this._fbManager, Pointer<Void> cBuilder, this.entityId) {
_cQuery = checkObxPtr(bindings.obx_query_create(cBuilder), "create query");
}

Expand Down Expand Up @@ -572,7 +573,7 @@ class Query<T> {
}

List<T> find({int offset = 0, int limit = 0}) {
return _store.runInTransaction(TxMode.Read, () {
return store.runInTransaction(TxMode.Read, () {
if (bindings.obx_supports_bytes_array() == 1) {
final bytesArray = checkObxPtr(bindings.obx_query_find(_cQuery, offset, limit), "find");
try {
Expand Down
156 changes: 156 additions & 0 deletions test/observer_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import "package:test/test.dart";
import "package:objectbox/src/bindings/bindings.dart";
import "package:objectbox/src/bindings/signatures.dart";
import "entity.dart";
import "entity2.dart";
import 'test_env.dart';
import 'objectbox.g.dart';
import "dart:ffi";

// ignore_for_file: non_constant_identifier_names

/// Pointer.fromAddress(0) does not fire at all
Pointer<Void> randomPtr = Pointer.fromAddress(1337);

var callbackSingleTypeCounter = 0;
void callbackSingleType(Pointer<Void> user_data) {
expect(user_data.address, randomPtr.address);
callbackSingleTypeCounter++;
}

var callbackAnyTypeCounter = 0;
void callbackAnyType(Pointer<Void> user_data, Pointer<Uint32> mutated_ids, int mutated_count) {
expect(user_data.address, randomPtr.address);
callbackAnyTypeCounter++;
}

// dart callback signatures
typedef Single = void Function(Pointer<Void>);
typedef Any = void Function(Pointer<Void>, Pointer<Uint32>, int);

class Observable {
static Pointer<Void> singleObserver, anyObserver;

static Single single;
static Any any;

Store store;

Observable.fromStore(this.store);

static void _anyCallback(Pointer<Void> user_data, Pointer<Uint32> mutated_ids, int mutated_count) {
any(user_data, mutated_ids, mutated_count);
}

static void _singleCallback(Pointer<Void> user_data) {
single(user_data);
}

void observeSingleType(int entityId, Single fn, Pointer<Void> identifier) {
single = fn;
final callback = Pointer.fromFunction<obx_observer_single_type_t<Void>>(_singleCallback);
singleObserver = bindings.obx_observe_single_type(store.ptr, entityId, callback, identifier);
}

void observe(Any fn, Pointer<Void> identifier) {
any = fn;
final callback = Pointer.fromFunction<obx_observer_t<Void, Uint32>>(_anyCallback);
anyObserver = bindings.obx_observe(store.ptr, callback, identifier);
}
}

void main() async {
TestEnv env;
Box box;
Store store;

final testEntityId = getObjectBoxModel().model.findEntityByName("TestEntity").id.id;

final List<TestEntity> simpleStringItems = <String>["One", "Two", "Three", "Four", "Five", "Six"].map((s) =>
TestEntity(tString: s)).toList();

final List<TestEntity> simpleNumberItems = [1,2,3,4,5,6].map((s) =>
TestEntity(tInt: s)).toList();

setUp(() {
env = TestEnv("observers");
box = env.box;
store = env.store;
});

/// Non static function can't be used for ffi, but you can call a dynamic function
/// aka closure inside a static function
// void callbackAnyTypeNonStatic(Pointer<Void> user_data, Pointer<Uint32> mutated_ids, int mutated_count) {
// expect(user_data.address, 0);
// expect(mutated_count, 1);
// }

test("Observe any entity with class member callback", () async {
final o = Observable.fromStore(store);
var putCount = 0;
o.observe((Pointer<Void> user_data, Pointer<Uint32> mutated_ids, int mutated_count) {
expect(user_data.address, randomPtr.address);
putCount++;
}, randomPtr);

box.putMany(simpleStringItems);
simpleStringItems.forEach((i) => box.put(i));
simpleNumberItems.forEach((i) => box.put(i));

bindings.obx_observer_close(Observable.anyObserver);
expect(putCount, 13);
});

test("Observe a single entity with class member callback", () async {
final o = Observable.fromStore(store);
var putCount = 0;
o.observeSingleType(testEntityId, (Pointer<Void> user_data) {
putCount++;
}, randomPtr);

box.putMany(simpleStringItems);
simpleStringItems.forEach((i) => box.put(i));
simpleNumberItems.forEach((i) => box.put(i));

bindings.obx_observer_close(Observable.singleObserver);
expect(putCount, 13);
});

test("Observe any entity with static callback", () async {
final callback = Pointer.fromFunction<obx_observer_t<Void, Uint32>>(callbackAnyType);
final observer = bindings.obx_observe(store.ptr, callback, Pointer.fromAddress(1337));

box.putMany(simpleStringItems);

box.remove(1);

// update value
final entity2 = box.get(2);
entity2.tString = "Dva";
box.put(entity2);

final box2 = Box<TestEntity2>(store);
box2.put(TestEntity2());
box2.remove(1);
box2.put(TestEntity2());

expect(callbackAnyTypeCounter, 6);
bindings.obx_observer_close(observer);
});

test("Observe single entity", () async {
final callback = Pointer.fromFunction<obx_observer_single_type_t<Void>>(callbackSingleType);
final observer = bindings.obx_observe_single_type(store.ptr, testEntityId, callback, randomPtr);

box.putMany(simpleStringItems);
simpleStringItems.forEach((i) => box.put(i));
simpleNumberItems.forEach((i) => box.put(i));

expect(callbackSingleTypeCounter, 13);
bindings.obx_observer_close(observer);
});

tearDown(() {
env.close();
});
}
Loading