Skip to content

Commit f98fb68

Browse files
authored
Merge pull request #59 from reactphp-parallel/2.x-replace-observables-with-iterables
[2.x] Replace observables with iterables
2 parents f5b368c + 5071fdb commit f98fb68

File tree

7 files changed

+97
-194
lines changed

7 files changed

+97
-194
lines changed

composer.json

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515
"react/async": "^4.2",
1616
"react/event-loop": "^1.5",
1717
"react/promise": "^2.11 || ^3.0",
18-
"reactivex/rxphp": "^2.0.12",
1918
"wyrihaximus/constants": "^1.6.0",
20-
"wyrihaximus/metrics": "^1.0 || ^2",
21-
"wyrihaximus/react-awaitable-observable": "^1",
22-
"wyrihaximus/react-event-loop-rx-scheduler-hook-up": "^0.1.1"
19+
"wyrihaximus/metrics": "^1.0 || ^2"
2320
},
2421
"require-dev": {
2522
"wyrihaximus/async-test-utilities": "^7.2 || ^5"

composer.lock

Lines changed: 3 additions & 180 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

infection.json.dist

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"timeout": 120,
2+
"timeout": 1800,
33
"source": {
44
"directories": [
55
"src"
@@ -9,8 +9,12 @@
99
"text": "./var/infection.log",
1010
"summary": "./var/infection-summary.log",
1111
"json": "./var/infection.json",
12-
"perMutator": "./var/infection-per-mutator.md"
12+
"perMutator": "./var/infection-per-mutator.md",
13+
"github": true
1314
},
15+
"minMsi": 100,
16+
"minCoveredMsi": 100,
17+
"ignoreMsiWithNoMutations": true,
1418
"mutators": {
1519
"@default": true
1620
},

src/Done.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace ReactParallel\EventLoop;
6+
7+
/**
8+
* @internal
9+
*
10+
* Marker class for when a stream is done
11+
*/
12+
final class Done
13+
{
14+
}

src/EventLoopBridge.php

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,12 @@
1010
use React\EventLoop\Loop;
1111
use React\EventLoop\TimerInterface;
1212
use React\Promise\Deferred;
13-
use Rx\Subject\Subject;
1413
use WyriHaximus\Metrics\Label;
1514

1615
use function count;
1716
use function React\Async\await;
1817
use function spl_object_hash;
1918
use function spl_object_id;
20-
use function WyriHaximus\React\awaitObservable;
2119

2220
use const WyriHaximus\Constants\Numeric\ZERO;
2321

@@ -40,7 +38,7 @@ final class EventLoopBridge
4038

4139
private TimerInterface|null $timer = null;
4240

43-
/** @var array<int, Subject> */
41+
/** @var array<int, Stream> */
4442
private array $channels = [];
4543

4644
/** @var array<int, Deferred> */
@@ -68,8 +66,7 @@ public function withMetrics(Metrics $metrics): self
6866
/** @return iterable<mixed> */
6967
public function observe(Channel $channel): iterable
7068
{
71-
$subject = new Subject();
72-
$this->channels[spl_object_id($channel)] = $subject;
69+
$this->channels[spl_object_id($channel)] = new Stream();
7370
$this->events->addChannel($channel);
7471

7572
if ($this->metrics instanceof Metrics) {
@@ -78,7 +75,7 @@ public function observe(Channel $channel): iterable
7875

7976
$this->startTimer();
8077

81-
return awaitObservable($subject);
78+
yield from $this->channels[spl_object_id($channel)]->iterable();
8279
}
8380

8481
public function await(Future $future): mixed
@@ -228,7 +225,7 @@ private function handleFutureReadEvent(Events\Event $event): void
228225

229226
private function handleChannelReadEvent(Events\Event $event): void
230227
{
231-
$this->channels[spl_object_id($event->object)]->onNext($event->value);
228+
$this->channels[spl_object_id($event->object)]->value($event->value);
232229
$this->events->addChannel($event->object); /** @phpstan-ignore-line */
233230

234231
if (! ($this->metrics instanceof Metrics)) {
@@ -240,7 +237,7 @@ private function handleChannelReadEvent(Events\Event $event): void
240237

241238
private function handleCloseEvent(Events\Event $event): void
242239
{
243-
$this->channels[spl_object_id($event->object)]->onCompleted();
240+
$this->channels[spl_object_id($event->object)]->done();
244241
unset($this->channels[spl_object_id($event->object)]);
245242

246243
if (! ($this->metrics instanceof Metrics)) {

src/Stream.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace ReactParallel\EventLoop;
6+
7+
use React\Promise\Deferred;
8+
use SplQueue;
9+
10+
use function React\Async\await;
11+
12+
final class Stream
13+
{
14+
private SplQueue $queue;
15+
private Deferred $wait;
16+
17+
public function __construct()
18+
{
19+
$this->queue = new SplQueue();
20+
$this->queue->setIteratorMode(SplQueue::IT_MODE_DELETE);
21+
$this->wait = new Deferred();
22+
}
23+
24+
public function value(mixed $value): void
25+
{
26+
$this->queue->enqueue($value);
27+
$this->wait->resolve(new Value());
28+
}
29+
30+
public function done(): void
31+
{
32+
$this->wait->resolve(new Done());
33+
}
34+
35+
/** @return iterable<mixed> */
36+
public function iterable(): iterable
37+
{
38+
do {
39+
$run = false;
40+
$type = await($this->wait->promise());
41+
42+
foreach ($this->queue as $value) {
43+
yield $value;
44+
}
45+
46+
if ($type instanceof Done) {
47+
continue;
48+
}
49+
50+
$this->wait = new Deferred();
51+
$run = true;
52+
} while ($run);
53+
}
54+
}

src/Value.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace ReactParallel\EventLoop;
6+
7+
/**
8+
* @internal
9+
*
10+
* Marker class for when a stream is received a value
11+
*/
12+
final class Value
13+
{
14+
}

0 commit comments

Comments
 (0)