From 5071fdb4c8f3dc183737707484090125bc772714 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Thu, 26 Dec 2024 12:53:47 +0100 Subject: [PATCH] [2.x] Replace observables with iterables --- composer.json | 5 +- composer.lock | 185 +--------------------------------------- infection.json.dist | 8 +- src/Done.php | 14 +++ src/EventLoopBridge.php | 13 ++- src/Stream.php | 54 ++++++++++++ src/Value.php | 14 +++ 7 files changed, 98 insertions(+), 195 deletions(-) create mode 100644 src/Done.php create mode 100644 src/Stream.php create mode 100644 src/Value.php diff --git a/composer.json b/composer.json index 8a25255..47f3bf9 100644 --- a/composer.json +++ b/composer.json @@ -15,11 +15,8 @@ "react/async": "^4.2", "react/event-loop": "^1.5", "react/promise": "^2.11 || ^3.0", - "reactivex/rxphp": "^2.0.12", "wyrihaximus/constants": "^1.6.0", - "wyrihaximus/metrics": "^1.0 || ^2", - "wyrihaximus/react-awaitable-observable": "^1", - "wyrihaximus/react-event-loop-rx-scheduler-hook-up": "^0.1.1" + "wyrihaximus/metrics": "^1.0 || ^2" }, "require-dev": { "wyrihaximus/async-test-utilities": "^7.2 || ^5" diff --git a/composer.lock b/composer.lock index 43ae615..355b46f 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "726ced98242b2202f7b26f2356a9cd8b", + "content-hash": "e972e1ad02dbc63b039ee95fe8067c31", "packages": [ { "name": "lcobucci/clock", @@ -342,74 +342,6 @@ ], "time": "2023-11-16T16:16:50+00:00" }, - { - "name": "reactivex/rxphp", - "version": "2.0.12", - "source": { - "type": "git", - "url": "https://github.com/ReactiveX/RxPHP.git", - "reference": "eee8eb20ec310632d0356ff1bcaccf5c90094ba6" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/ReactiveX/RxPHP/zipball/eee8eb20ec310632d0356ff1bcaccf5c90094ba6", - "reference": "eee8eb20ec310632d0356ff1bcaccf5c90094ba6", - "shasum": "" - }, - "require": { - "php": ">=7.0.0", - "react/promise": "^3 || ~2.2" - }, - "require-dev": { - "phpunit/phpunit": "^8.5 || ^9", - "react/event-loop": "^1.0 || ^0.5 || ^0.4.2", - "satooshi/php-coveralls": "~1.0" - }, - "suggest": { - "react/event-loop": "Used for scheduling async operations" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "2.0-dev" - } - }, - "autoload": { - "psr-4": { - "Rx\\": "src" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "Alexander", - "email": "iam.asm89@gmail.com" - }, - { - "name": "David Dan", - "email": "davidwdan@gmail.com" - }, - { - "name": "Matt Bonneau", - "email": "matt@bonneau.net" - } - ], - "description": "Reactive extensions for php.", - "homepage": "https://github.com/ReactiveX/RxPHP", - "keywords": [ - "extensions", - "reactive", - "rx" - ], - "support": { - "issues": "https://github.com/ReactiveX/RxPHP/issues", - "source": "https://github.com/ReactiveX/RxPHP/tree/2.0.12" - }, - "time": "2023-11-27T16:37:30+00:00" - }, { "name": "thecodingmachine/safe", "version": "v1.3.3", @@ -654,115 +586,6 @@ } ], "time": "2021-11-10T16:03:58+00:00" - }, - { - "name": "wyrihaximus/react-awaitable-observable", - "version": "1.0.0", - "source": { - "type": "git", - "url": "https://github.com/WyriHaximus/reactphp-awaitable-observable.git", - "reference": "d8423f506342f15d4c30a4da404bd25f457caddb" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/WyriHaximus/reactphp-awaitable-observable/zipball/d8423f506342f15d4c30a4da404bd25f457caddb", - "reference": "d8423f506342f15d4c30a4da404bd25f457caddb", - "shasum": "" - }, - "require": { - "php": "^8.1", - "react/async": "^4", - "react/promise": "^2.9", - "reactivex/rxphp": "^2.0.10" - }, - "require-dev": { - "wyrihaximus/async-test-utilities": "^5.0.11" - }, - "type": "library", - "autoload": { - "files": [ - "src/functions_include.php" - ], - "psr-4": { - "WyriHaximus\\React\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "Cees-Jan Kiewiet", - "email": "ceesjank@gmail.com" - } - ], - "description": "🛠️ Make observables foreachable using async & await", - "support": { - "issues": "https://github.com/WyriHaximus/reactphp-awaitable-observable/issues", - "source": "https://github.com/WyriHaximus/reactphp-awaitable-observable/tree/1.0.0" - }, - "funding": [ - { - "url": "https://github.com/WyriHaximus", - "type": "github" - } - ], - "time": "2022-08-11T13:15:09+00:00" - }, - { - "name": "wyrihaximus/react-event-loop-rx-scheduler-hook-up", - "version": "0.1.1", - "source": { - "type": "git", - "url": "https://github.com/WyriHaximus/reactphp-event-loop-rx-scheduler-hook-up.git", - "reference": "462e794cba3c810b77d1e8cb33be43a902673272" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/WyriHaximus/reactphp-event-loop-rx-scheduler-hook-up/zipball/462e794cba3c810b77d1e8cb33be43a902673272", - "reference": "462e794cba3c810b77d1e8cb33be43a902673272", - "shasum": "" - }, - "require": { - "php": "^8.1", - "react/event-loop": "^1.3", - "reactivex/rxphp": "^2.0" - }, - "conflict": { - "azjezz/psl": "<2" - }, - "require-dev": { - "wyrihaximus/async-test-utilities": "^5.0.25" - }, - "type": "library", - "extra": { - "unused": [ - "wyrihaximus/react-mutex", - "wyrihaximus/react-mutex-contracts" - ] - }, - "autoload": { - "files": [ - "src/bootstrap.php" - ] - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "description": "🪝 Hook up ReactPHP Event Loop to the RxPHP Scheduler", - "support": { - "issues": "https://github.com/WyriHaximus/reactphp-event-loop-rx-scheduler-hook-up/issues", - "source": "https://github.com/WyriHaximus/reactphp-event-loop-rx-scheduler-hook-up/tree/0.1.1" - }, - "funding": [ - { - "url": "https://github.com/WyriHaximus", - "type": "github" - } - ], - "time": "2023-02-26T15:05:42+00:00" } ], "packages-dev": [ @@ -9617,14 +9440,14 @@ ], "aliases": [], "minimum-stability": "stable", - "stability-flags": [], + "stability-flags": {}, "prefer-stable": false, - "prefer-lowest": true, + "prefer-lowest": false, "platform": { "php": "^8.1", "ext-parallel": "*" }, - "platform-dev": [], + "platform-dev": {}, "platform-overrides": { "php": "8.1.13" }, diff --git a/infection.json.dist b/infection.json.dist index e31539a..11f0133 100644 --- a/infection.json.dist +++ b/infection.json.dist @@ -1,5 +1,5 @@ { - "timeout": 120, + "timeout": 1800, "source": { "directories": [ "src" @@ -9,8 +9,12 @@ "text": "./var/infection.log", "summary": "./var/infection-summary.log", "json": "./var/infection.json", - "perMutator": "./var/infection-per-mutator.md" + "perMutator": "./var/infection-per-mutator.md", + "github": true }, + "minMsi": 100, + "minCoveredMsi": 100, + "ignoreMsiWithNoMutations": true, "mutators": { "@default": true }, diff --git a/src/Done.php b/src/Done.php new file mode 100644 index 0000000..6fb4e80 --- /dev/null +++ b/src/Done.php @@ -0,0 +1,14 @@ + */ + /** @var array */ private array $channels = []; /** @var array */ @@ -68,8 +66,7 @@ public function withMetrics(Metrics $metrics): self /** @return iterable */ public function observe(Channel $channel): iterable { - $subject = new Subject(); - $this->channels[spl_object_id($channel)] = $subject; + $this->channels[spl_object_id($channel)] = new Stream(); $this->events->addChannel($channel); if ($this->metrics instanceof Metrics) { @@ -78,7 +75,7 @@ public function observe(Channel $channel): iterable $this->startTimer(); - return awaitObservable($subject); + yield from $this->channels[spl_object_id($channel)]->iterable(); } public function await(Future $future): mixed @@ -228,7 +225,7 @@ private function handleFutureReadEvent(Events\Event $event): void private function handleChannelReadEvent(Events\Event $event): void { - $this->channels[spl_object_id($event->object)]->onNext($event->value); + $this->channels[spl_object_id($event->object)]->value($event->value); $this->events->addChannel($event->object); /** @phpstan-ignore-line */ if (! ($this->metrics instanceof Metrics)) { @@ -240,7 +237,7 @@ private function handleChannelReadEvent(Events\Event $event): void private function handleCloseEvent(Events\Event $event): void { - $this->channels[spl_object_id($event->object)]->onCompleted(); + $this->channels[spl_object_id($event->object)]->done(); unset($this->channels[spl_object_id($event->object)]); if (! ($this->metrics instanceof Metrics)) { diff --git a/src/Stream.php b/src/Stream.php new file mode 100644 index 0000000..0e65ce8 --- /dev/null +++ b/src/Stream.php @@ -0,0 +1,54 @@ +queue = new SplQueue(); + $this->queue->setIteratorMode(SplQueue::IT_MODE_DELETE); + $this->wait = new Deferred(); + } + + public function value(mixed $value): void + { + $this->queue->enqueue($value); + $this->wait->resolve(new Value()); + } + + public function done(): void + { + $this->wait->resolve(new Done()); + } + + /** @return iterable */ + public function iterable(): iterable + { + do { + $run = false; + $type = await($this->wait->promise()); + + foreach ($this->queue as $value) { + yield $value; + } + + if ($type instanceof Done) { + continue; + } + + $this->wait = new Deferred(); + $run = true; + } while ($run); + } +} diff --git a/src/Value.php b/src/Value.php new file mode 100644 index 0000000..ff96e71 --- /dev/null +++ b/src/Value.php @@ -0,0 +1,14 @@ +