Skip to content

Commit 8da5b9d

Browse files
committed
Some rework based on PR #528
- issue #460 - rework based on [comment](#531 (comment))
1 parent 14e740f commit 8da5b9d

File tree

2 files changed

+56
-21
lines changed

2 files changed

+56
-21
lines changed

src/Queue/RabbitMQQueue.php

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,19 @@ public function laterRaw($delay, string $payload, $queue = null, int $attempts =
183183
*/
184184
public function bulk($jobs, $data = '', $queue = null): void
185185
{
186-
foreach ((array) $jobs as $job) {
186+
$this->publishBatch($jobs, $queue, $data);
187+
}
188+
189+
/**
190+
* @throws AMQPProtocolChannelException
191+
*/
192+
protected function publishBatch($jobs, $data = '', $queue = null): void
193+
{
194+
foreach ($jobs as $job) {
187195
$this->bulkRaw($this->createPayload($job, $queue, $data), $queue, ['job' => $job]);
188196
}
189197

190-
$this->publishBatch();
198+
$this->batchPublish();
191199
}
192200

193201
/**
@@ -274,21 +282,6 @@ public function setConnection(AbstractConnection $connection): RabbitMQQueue
274282
return $this;
275283
}
276284

277-
public function getChannel($forceNew = false): AMQPChannel
278-
{
279-
if (! $this->channel || $forceNew) {
280-
$this->channel = $this->createChannel();
281-
}
282-
283-
return $this->channel;
284-
}
285-
286-
protected function reconnect()
287-
{
288-
$this->getConnection()->reconnect();
289-
$this->getChannel(true);
290-
}
291-
292285
/**
293286
* Job class to use.
294287
*
@@ -746,13 +739,33 @@ protected function publishBasic($msg, $exchange = '', $destination = '', $mandat
746739
$this->getChannel()->basic_publish($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
747740
}
748741

749-
protected function publishBatch(): void
742+
protected function batchPublish(): void
750743
{
751744
$this->getChannel()->publish_batch();
752745
}
753746

747+
public function getChannel($forceNew = false): AMQPChannel
748+
{
749+
if (! $this->channel || $forceNew) {
750+
$this->channel = $this->createChannel();
751+
}
752+
753+
return $this->channel;
754+
}
755+
754756
protected function createChannel(): AMQPChannel
755757
{
756758
return $this->getConnection()->channel();
757759
}
760+
761+
/**
762+
* @throws Exception
763+
*/
764+
protected function reconnect(): void
765+
{
766+
// Reconnects using the original connection settings.
767+
$this->getConnection()->reconnect();
768+
// Create a new main channel because all old channels are removed.
769+
$this->getChannel(true);
770+
}
758771
}

src/Queue/ReconnectTrait.php

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,16 @@
22

33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue;
44

5+
use Exception;
6+
use PhpAmqpLib\Channel\AMQPChannel;
57
use PhpAmqpLib\Exception\AMQPChannelClosedException;
68
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
79

810
trait ReconnectTrait
911
{
12+
/**
13+
* @throws Exception
14+
*/
1015
protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void
1116
{
1217
try {
@@ -17,13 +22,30 @@ protected function publishBasic($msg, $exchange = '', $destination = '', $mandat
1722
}
1823
}
1924

20-
protected function publishBatch(): void
25+
/**
26+
* @throws Exception
27+
*/
28+
protected function publishBatch($jobs, $data = '', $queue = null): void
2129
{
2230
try {
23-
parent::publishBatch();
31+
parent::publishBatch($jobs, $data, $queue);
2432
} catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
2533
$this->reconnect();
26-
parent::publishBatch();
34+
parent::publishBatch($jobs, $data, $queue);
35+
}
36+
}
37+
38+
/**
39+
* @throws Exception
40+
*/
41+
protected function createChannel(): AMQPChannel
42+
{
43+
try {
44+
return parent::createChannel();
45+
} catch (AMQPConnectionClosedException) {
46+
$this->reconnect();
47+
48+
return parent::createChannel();
2749
}
2850
}
2951
}

0 commit comments

Comments
 (0)