Skip to content

Commit 873cdab

Browse files
committed
[2.x] Replace observables with iterables
1 parent d34a8cf commit 873cdab

File tree

6 files changed

+78
-196
lines changed

6 files changed

+78
-196
lines changed

composer.json

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515
"react/async": "^4.2",
1616
"react/event-loop": "^1.5",
1717
"react/promise": "^2.11 || ^3.0",
18-
"reactivex/rxphp": "^2.0.12",
1918
"wyrihaximus/constants": "^1.6.0",
20-
"wyrihaximus/metrics": "^1.0 || ^2",
21-
"wyrihaximus/react-awaitable-observable": "^1",
22-
"wyrihaximus/react-event-loop-rx-scheduler-hook-up": "^0.1.1"
19+
"wyrihaximus/metrics": "^1.0 || ^2"
2320
},
2421
"require-dev": {
2522
"wyrihaximus/async-test-utilities": "^7.2 || ^5"

composer.lock

Lines changed: 4 additions & 181 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

infection.json.dist

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"timeout": 120,
2+
"timeout": 1800,
33
"source": {
44
"directories": [
55
"src"
@@ -9,8 +9,12 @@
99
"text": "./var/infection.log",
1010
"summary": "./var/infection-summary.log",
1111
"json": "./var/infection.json",
12-
"perMutator": "./var/infection-per-mutator.md"
12+
"perMutator": "./var/infection-per-mutator.md",
13+
"github": true
1314
},
15+
"minMsi": 100,
16+
"minCoveredMsi": 100,
17+
"ignoreMsiWithNoMutations": true,
1418
"mutators": {
1519
"@default": true
1620
},

src/Done.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace ReactParallel\EventLoop;
6+
7+
/**
8+
* @internal
9+
*
10+
* Marker class for when a stream is done
11+
*/
12+
final class Done
13+
{
14+
}

src/EventLoopBridge.php

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,12 @@
1010
use React\EventLoop\Loop;
1111
use React\EventLoop\TimerInterface;
1212
use React\Promise\Deferred;
13-
use Rx\Subject\Subject;
1413
use WyriHaximus\Metrics\Label;
1514

1615
use function count;
1716
use function React\Async\await;
1817
use function spl_object_hash;
1918
use function spl_object_id;
20-
use function WyriHaximus\React\awaitObservable;
2119

2220
use const WyriHaximus\Constants\Numeric\ZERO;
2321

@@ -40,7 +38,7 @@ final class EventLoopBridge
4038

4139
private TimerInterface|null $timer = null;
4240

43-
/** @var array<int, Subject> */
41+
/** @var array<int, Stream> */
4442
private array $channels = [];
4543

4644
/** @var array<int, Deferred> */
@@ -65,11 +63,12 @@ public function withMetrics(Metrics $metrics): self
6563
return $self;
6664
}
6765

68-
/** @return iterable<mixed> */
66+
/**
67+
* @return iterable<mixed>
68+
*/
6969
public function observe(Channel $channel): iterable
7070
{
71-
$subject = new Subject();
72-
$this->channels[spl_object_id($channel)] = $subject;
71+
$this->channels[spl_object_id($channel)] = new Stream();
7372
$this->events->addChannel($channel);
7473

7574
if ($this->metrics instanceof Metrics) {
@@ -78,7 +77,7 @@ public function observe(Channel $channel): iterable
7877

7978
$this->startTimer();
8079

81-
return awaitObservable($subject);
80+
yield from $this->channels[spl_object_id($channel)]->iterable();
8281
}
8382

8483
public function await(Future $future): mixed
@@ -228,7 +227,7 @@ private function handleFutureReadEvent(Events\Event $event): void
228227

229228
private function handleChannelReadEvent(Events\Event $event): void
230229
{
231-
$this->channels[spl_object_id($event->object)]->onNext($event->value);
230+
$this->channels[spl_object_id($event->object)]->value($event->value);
232231
$this->events->addChannel($event->object); /** @phpstan-ignore-line */
233232

234233
if (! ($this->metrics instanceof Metrics)) {
@@ -240,7 +239,7 @@ private function handleChannelReadEvent(Events\Event $event): void
240239

241240
private function handleCloseEvent(Events\Event $event): void
242241
{
243-
$this->channels[spl_object_id($event->object)]->onCompleted();
242+
$this->channels[spl_object_id($event->object)]->done();
244243
unset($this->channels[spl_object_id($event->object)]);
245244

246245
if (! ($this->metrics instanceof Metrics)) {

src/Stream.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace ReactParallel\EventLoop;
6+
7+
use React\Promise\Deferred;
8+
9+
use function React\Async\await;
10+
11+
final class Stream
12+
{
13+
private Deferred $wait;
14+
15+
public function __construct()
16+
{
17+
$this->wait = new Deferred();
18+
}
19+
20+
public function value(mixed $value): void
21+
{
22+
$this->wait->resolve($value);
23+
}
24+
25+
public function done(): void
26+
{
27+
$this->wait->resolve(new Done());
28+
}
29+
30+
public function iterable(): iterable
31+
{
32+
do {
33+
$run = false;
34+
$value = await($this->wait->promise());
35+
if ($value instanceof Done) {
36+
continue;
37+
}
38+
39+
$this->wait = new Deferred();
40+
$run = true;
41+
42+
yield $value;
43+
} while ($run);
44+
}
45+
}

0 commit comments

Comments
 (0)