-
Notifications
You must be signed in to change notification settings - Fork 135
Support for Streams from query objects #88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
3ed735f
8a6bc25
502edad
a5d177e
372de28
f19c4e2
bd6f3ee
70a038c
b3e5fda
113fb55
3a6184d
0c79ef0
e00bdfd
2a4c6f8
80b1256
6aacbf9
5d69ebf
3c9698f
b3483ed
63600bb
e62ad2a
cd4a391
298169e
c32df43
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
import 'dart:async'; | ||
import "dart:ffi"; | ||
import "bindings/bindings.dart"; | ||
import "bindings/signatures.dart"; | ||
|
||
import "store.dart"; | ||
import "query/query.dart"; | ||
|
||
// ignore_for_file: non_constant_identifier_names | ||
|
||
// dart callback signature | ||
typedef Any = void Function(Pointer<Void>, Pointer<Uint32>, int); | ||
|
||
class Observable { | ||
|
||
static final anyObserver = <int, Pointer<Void>>{}; | ||
static final any = <int, Any>{}; // radix? > tree? | ||
|
||
// sync:true -> ObjectBoxException: 10001 TX is not active anymore: #101 | ||
static final controller = StreamController<int>.broadcast(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is that comment on an issue that needs looking at? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, that requires some attention There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does the reference of #101 mean here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was generated here, see |
||
|
||
static void _anyCallback(Pointer<Void> user_data, Pointer<Uint32> mutated_ids, int mutated_count) { | ||
for(var i=0; i<mutated_count; i++) { | ||
// call schema's callback | ||
if (any.containsKey(mutated_ids[i])) { | ||
any[mutated_ids[i]](user_data, mutated_ids, mutated_count); | ||
} | ||
} | ||
} | ||
|
||
static subscribe(Store store) { | ||
final callback = Pointer.fromFunction<obx_observer_t<Void, Uint32>>(_anyCallback); | ||
Buggaboo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
anyObserver[store.ptr.address] = bindings.obx_observe(store.ptr, callback, store.ptr); | ||
} | ||
|
||
// #53 ffi:Pointer finalizer | ||
static unsubscribe(Store store) { | ||
if (!anyObserver.containsKey(store.ptr.address)) { | ||
return; | ||
} | ||
bindings.obx_observer_close(anyObserver[store.ptr.address]); | ||
anyObserver.remove(store.ptr.address); | ||
} | ||
} | ||
|
||
extension ObservableStore on Store { | ||
subscribe () { Observable.subscribe(this); } | ||
unsubscribe () { Observable.unsubscribe(this); } | ||
} | ||
|
||
extension Streamable<T> on Query<T> { | ||
_setup() { | ||
if (!Observable.anyObserver.containsKey(this.store.ptr)) { | ||
this.store.subscribe(); | ||
} | ||
|
||
// Assume consensus on entityId over all available Stores | ||
Buggaboo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Observable.any[this.entityId] ??= (u, _, __) { | ||
// dummy value to trigger an event | ||
Observable.controller.add(u.address); | ||
}; | ||
} | ||
|
||
Stream<List<T>> findStream({int offset = 0, int limit = 0}) { | ||
_setup(); | ||
return Observable.controller.stream | ||
.map((_) => this.find(offset:offset, limit:limit)); | ||
} | ||
|
||
/// Use this for Query Property | ||
Stream<Query<T>> get stream { | ||
_setup(); | ||
return Observable.controller.stream | ||
.map((_) => this); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
import "package:test/test.dart"; | ||
import "package:objectbox/src/bindings/bindings.dart"; | ||
import "package:objectbox/src/bindings/signatures.dart"; | ||
import "entity.dart"; | ||
import "entity2.dart"; | ||
import 'test_env.dart'; | ||
import 'objectbox.g.dart'; | ||
import "dart:ffi"; | ||
|
||
// ignore_for_file: non_constant_identifier_names | ||
|
||
/// Pointer.fromAddress(0) does not fire at all | ||
Pointer<Void> randomPtr = Pointer.fromAddress(1337); | ||
|
||
var callbackSingleTypeCounter = 0; | ||
void callbackSingleType(Pointer<Void> user_data) { | ||
expect(user_data.address, randomPtr.address); | ||
callbackSingleTypeCounter++; | ||
} | ||
|
||
var callbackAnyTypeCounter = 0; | ||
void callbackAnyType(Pointer<Void> user_data, Pointer<Uint32> mutated_ids, int mutated_count) { | ||
expect(user_data.address, randomPtr.address); | ||
callbackAnyTypeCounter++; | ||
} | ||
|
||
// dart callback signatures | ||
typedef Single = void Function(Pointer<Void>); | ||
typedef Any = void Function(Pointer<Void>, Pointer<Uint32>, int); | ||
|
||
class Observable { | ||
static Pointer<Void> singleObserver, anyObserver; | ||
|
||
static Single single; | ||
static Any any; | ||
|
||
Store store; | ||
|
||
Observable.fromStore(this.store); | ||
|
||
static void _anyCallback(Pointer<Void> user_data, Pointer<Uint32> mutated_ids, int mutated_count) { | ||
any(user_data, mutated_ids, mutated_count); | ||
} | ||
|
||
static void _singleCallback(Pointer<Void> user_data) { | ||
single(user_data); | ||
} | ||
|
||
void observeSingleType(int entityId, Single fn, Pointer<Void> identifier) { | ||
single = fn; | ||
final callback = Pointer.fromFunction<obx_observer_single_type_t<Void>>(_singleCallback); | ||
singleObserver = bindings.obx_observe_single_type(store.ptr, entityId, callback, identifier); | ||
} | ||
|
||
void observe(Any fn, Pointer<Void> identifier) { | ||
any = fn; | ||
final callback = Pointer.fromFunction<obx_observer_t<Void, Uint32>>(_anyCallback); | ||
anyObserver = bindings.obx_observe(store.ptr, callback, identifier); | ||
} | ||
} | ||
|
||
void main() async { | ||
TestEnv env; | ||
Box box; | ||
Store store; | ||
|
||
final testEntityId = getObjectBoxModel().model.findEntityByName("TestEntity").id.id; | ||
|
||
final List<TestEntity> simpleStringItems = <String>["One", "Two", "Three", "Four", "Five", "Six"].map((s) => | ||
TestEntity(tString: s)).toList(); | ||
|
||
final List<TestEntity> simpleNumberItems = [1,2,3,4,5,6].map((s) => | ||
TestEntity(tInt: s)).toList(); | ||
|
||
setUp(() { | ||
env = TestEnv("observers"); | ||
box = env.box; | ||
store = env.store; | ||
}); | ||
|
||
/// Non static function can't be used for ffi, but you can call a dynamic function | ||
/// aka closure inside a static function | ||
// void callbackAnyTypeNonStatic(Pointer<Void> user_data, Pointer<Uint32> mutated_ids, int mutated_count) { | ||
// expect(user_data.address, 0); | ||
// expect(mutated_count, 1); | ||
// } | ||
|
||
test("Observe any entity with class member callback", () async { | ||
final o = Observable.fromStore(store); | ||
var putCount = 0; | ||
o.observe((Pointer<Void> user_data, Pointer<Uint32> mutated_ids, int mutated_count) { | ||
expect(user_data.address, randomPtr.address); | ||
putCount++; | ||
}, randomPtr); | ||
|
||
box.putMany(simpleStringItems); | ||
simpleStringItems.forEach((i) => box.put(i)); | ||
simpleNumberItems.forEach((i) => box.put(i)); | ||
|
||
bindings.obx_observer_close(Observable.anyObserver); | ||
expect(putCount, 13); | ||
}); | ||
|
||
test("Observe a single entity with class member callback", () async { | ||
final o = Observable.fromStore(store); | ||
var putCount = 0; | ||
o.observeSingleType(testEntityId, (Pointer<Void> user_data) { | ||
putCount++; | ||
}, randomPtr); | ||
|
||
box.putMany(simpleStringItems); | ||
simpleStringItems.forEach((i) => box.put(i)); | ||
simpleNumberItems.forEach((i) => box.put(i)); | ||
|
||
bindings.obx_observer_close(Observable.singleObserver); | ||
expect(putCount, 13); | ||
}); | ||
|
||
test("Observe any entity with static callback", () async { | ||
final callback = Pointer.fromFunction<obx_observer_t<Void, Uint32>>(callbackAnyType); | ||
final observer = bindings.obx_observe(store.ptr, callback, Pointer.fromAddress(1337)); | ||
|
||
box.putMany(simpleStringItems); | ||
|
||
box.remove(1); | ||
|
||
// update value | ||
final entity2 = box.get(2); | ||
entity2.tString = "Dva"; | ||
box.put(entity2); | ||
|
||
final box2 = Box<TestEntity2>(store); | ||
box2.put(TestEntity2()); | ||
box2.remove(1); | ||
box2.put(TestEntity2()); | ||
|
||
expect(callbackAnyTypeCounter, 6); | ||
bindings.obx_observer_close(observer); | ||
}); | ||
|
||
test("Observe single entity", () async { | ||
final callback = Pointer.fromFunction<obx_observer_single_type_t<Void>>(callbackSingleType); | ||
final observer = bindings.obx_observe_single_type(store.ptr, testEntityId, callback, randomPtr); | ||
|
||
box.putMany(simpleStringItems); | ||
simpleStringItems.forEach((i) => box.put(i)); | ||
simpleNumberItems.forEach((i) => box.put(i)); | ||
|
||
expect(callbackSingleTypeCounter, 13); | ||
bindings.obx_observer_close(observer); | ||
}); | ||
|
||
tearDown(() { | ||
env.close(); | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason why rxdart? Are there other alternatives you've considered?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to be the most popular one, and it supports Stream which is more idiomatic, by way of extensions. I expose a stream property from an observable query for that reason.