Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 0c39476

Browse files
committed
Rider clean up pub/sub server
1 parent dd12492 commit 0c39476

File tree

1 file changed

+86
-92
lines changed

1 file changed

+86
-92
lines changed

src/ServiceStack.Redis/RedisPubSubServer.cs

Lines changed: 86 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ private void Init()
164164
if (HeartbeatInterval != null)
165165
{
166166
heartbeatTimer = new Timer(SendHeartbeat, null,
167-
TimeSpan.FromMilliseconds(0), HeartbeatInterval.Value);
167+
TimeSpan.FromMilliseconds(0), HeartbeatInterval.GetValueOrDefault());
168168
}
169169

170170
Interlocked.CompareExchange(ref lastHeartbeatTicks, DateTime.UtcNow.Ticks, lastHeartbeatTicks);
@@ -178,7 +178,7 @@ void SendHeartbeat(object state)
178178
if (currentStatus != Status.Started)
179179
return;
180180

181-
if (DateTime.UtcNow - new DateTime(lastHeartbeatTicks) < HeartbeatInterval.Value)
181+
if (DateTime.UtcNow - new DateTime(lastHeartbeatTicks) < HeartbeatInterval.GetValueOrDefault())
182182
return;
183183

184184
OnHeartbeatSent?.Invoke();
@@ -229,97 +229,95 @@ private void RunLoop()
229229
//RESET
230230
while (Interlocked.CompareExchange(ref status, 0, 0) == Status.Started)
231231
{
232-
using (var redis = ClientsManager.GetReadOnlyClient())
233-
{
234-
masterClient = redis;
232+
using var redis = ClientsManager.GetReadOnlyClient();
233+
masterClient = redis;
235234

236-
//Record that we had a good run...
237-
Interlocked.CompareExchange(ref noOfContinuousErrors, 0, noOfContinuousErrors);
235+
//Record that we had a good run...
236+
Interlocked.CompareExchange(ref noOfContinuousErrors, 0, noOfContinuousErrors);
238237

239-
using (var subscription = redis.CreateSubscription())
240-
{
241-
subscription.OnUnSubscribe = HandleUnSubscribe;
238+
using var subscription = redis.CreateSubscription();
239+
subscription.OnUnSubscribe = HandleUnSubscribe;
242240

243-
if (OnMessageBytes != null)
244-
{
245-
bool IsCtrlMessage(byte[] msg)
246-
{
247-
if (msg.Length < 4)
248-
return false;
249-
return msg[0] == 'C' && msg[1] == 'T' && msg[0] == 'R' && msg[0] == 'L';
250-
}
241+
if (OnMessageBytes != null)
242+
{
243+
bool IsCtrlMessage(byte[] msg)
244+
{
245+
if (msg.Length < 4)
246+
return false;
247+
return msg[0] == 'C' && msg[1] == 'T' && msg[0] == 'R' && msg[0] == 'L';
248+
}
251249

252-
((RedisSubscription)subscription).OnMessageBytes = (channel, msg) => {
253-
if (IsCtrlMessage(msg))
254-
return;
250+
((RedisSubscription)subscription).OnMessageBytes = (channel, msg) => {
251+
if (IsCtrlMessage(msg))
252+
return;
255253

256-
OnMessageBytes(channel, msg);
257-
};
258-
}
254+
OnMessageBytes(channel, msg);
255+
};
256+
}
259257

260-
subscription.OnMessage = (channel, msg) =>
261-
{
262-
if (string.IsNullOrEmpty(msg))
263-
return;
258+
subscription.OnMessage = (channel, msg) =>
259+
{
260+
if (string.IsNullOrEmpty(msg))
261+
return;
264262

265-
var ctrlMsg = msg.LeftPart(':');
266-
if (ctrlMsg == ControlCommand.Control)
267-
{
268-
var op = Interlocked.CompareExchange(ref doOperation, Operation.NoOp, doOperation);
263+
var ctrlMsg = msg.LeftPart(':');
264+
if (ctrlMsg == ControlCommand.Control)
265+
{
266+
var op = Interlocked.CompareExchange(ref doOperation, Operation.NoOp, doOperation);
269267

270-
var msgType = msg.IndexOf(':') >= 0
271-
? msg.RightPart(':')
272-
: null;
268+
var msgType = msg.IndexOf(':') >= 0
269+
? msg.RightPart(':')
270+
: null;
273271

274-
OnControlCommand?.Invoke(msgType ?? Operation.GetName(op));
272+
OnControlCommand?.Invoke(msgType ?? Operation.GetName(op));
275273

276-
switch (op)
274+
switch (op)
275+
{
276+
case Operation.Stop:
277+
if (Log.IsDebugEnabled)
278+
Log.Debug("Stop Command Issued");
279+
280+
Interlocked.CompareExchange(ref status, Status.Stopping, Status.Started);
281+
try
277282
{
278-
case Operation.Stop:
279-
if (Log.IsDebugEnabled)
280-
Log.Debug("Stop Command Issued");
281-
282-
Interlocked.CompareExchange(ref status, Status.Stopping, Status.Started);
283-
try
284-
{
285-
if (Log.IsDebugEnabled)
286-
Log.Debug("UnSubscribe From All Channels...");
287-
288-
subscription.UnSubscribeFromAllChannels(); //Un block thread.
289-
}
290-
finally
291-
{
292-
Interlocked.CompareExchange(ref status, Status.Stopped, Status.Stopping);
293-
}
294-
return;
295-
296-
case Operation.Reset:
297-
subscription.UnSubscribeFromAllChannels(); //Un block thread.
298-
return;
299-
}
283+
if (Log.IsDebugEnabled)
284+
Log.Debug("UnSubscribe From All Channels...");
300285

301-
switch (msgType)
286+
// ReSharper disable once AccessToDisposedClosure
287+
subscription.UnSubscribeFromAllChannels(); //Un block thread.
288+
}
289+
finally
302290
{
303-
case ControlCommand.Pulse:
304-
Pulse();
305-
break;
291+
Interlocked.CompareExchange(ref status, Status.Stopped, Status.Stopping);
306292
}
307-
}
308-
else
309-
{
310-
OnMessage(channel, msg);
311-
}
312-
};
313-
314-
//blocks thread
315-
if (ChannelsMatching != null && ChannelsMatching.Length > 0)
316-
subscription.SubscribeToChannelsMatching(ChannelsMatching);
317-
else
318-
subscription.SubscribeToChannels(Channels);
319-
320-
masterClient = null;
293+
return;
294+
295+
case Operation.Reset:
296+
// ReSharper disable once AccessToDisposedClosure
297+
subscription.UnSubscribeFromAllChannels(); //Un block thread.
298+
return;
299+
}
300+
301+
switch (msgType)
302+
{
303+
case ControlCommand.Pulse:
304+
Pulse();
305+
break;
306+
}
321307
}
322-
}
308+
else
309+
{
310+
OnMessage(channel, msg);
311+
}
312+
};
313+
314+
//blocks thread
315+
if (ChannelsMatching != null && ChannelsMatching.Length > 0)
316+
subscription.SubscribeToChannelsMatching(ChannelsMatching);
317+
else
318+
subscription.SubscribeToChannels(Channels);
319+
320+
masterClient = null;
323321
}
324322

325323
OnStop?.Invoke();
@@ -363,7 +361,7 @@ private void Stop(bool shouldRestart)
363361
if (Log.IsDebugEnabled)
364362
Log.Debug("Stopping RedisPubSubServer...");
365363

366-
//Unblock current bgthread by issuing StopCommand
364+
//Unblock current bg thread by issuing StopCommand
367365
SendControlCommand(Operation.Stop);
368366
}
369367
}
@@ -382,12 +380,10 @@ private void NotifyAllSubscribers(string commandType=null)
382380

383381
try
384382
{
385-
using (var redis = ClientsManager.GetClient())
383+
using var redis = ClientsManager.GetClient();
384+
foreach (var channel in Channels)
386385
{
387-
foreach (var channel in Channels)
388-
{
389-
redis.PublishMessage(channel, msg);
390-
}
386+
redis.PublishMessage(channel, msg);
391387
}
392388
}
393389
catch (Exception ex)
@@ -406,13 +402,11 @@ private void HandleFailover(IRedisClientsManager clientsManager)
406402
if (masterClient != null)
407403
{
408404
//New thread-safe client with same connection info as connected master
409-
using (var currentlySubscribedClient = ((RedisClient)masterClient).CloneClient())
405+
using var currentlySubscribedClient = ((RedisClient)masterClient).CloneClient();
406+
Interlocked.CompareExchange(ref doOperation, Operation.Reset, doOperation);
407+
foreach (var channel in Channels)
410408
{
411-
Interlocked.CompareExchange(ref doOperation, Operation.Reset, doOperation);
412-
foreach (var channel in Channels)
413-
{
414-
currentlySubscribedClient.PublishMessage(channel, ControlCommand.Control);
415-
}
409+
currentlySubscribedClient.PublishMessage(channel, ControlCommand.Control);
416410
}
417411
}
418412
else
@@ -466,12 +460,12 @@ private void KillBgThreadIfExists()
466460
private void SleepBackOffMultiplier(int continuousErrorsCount)
467461
{
468462
if (continuousErrorsCount == 0) return;
469-
const int MaxSleepMs = 60 * 1000;
463+
const int maxSleepMs = 60 * 1000;
470464

471465
//exponential/random retry back-off.
472466
var nextTry = Math.Min(
473467
rand.Next((int)Math.Pow(continuousErrorsCount, 3), (int)Math.Pow(continuousErrorsCount + 1, 3) + 1),
474-
MaxSleepMs);
468+
maxSleepMs);
475469

476470
if (Log.IsDebugEnabled)
477471
Log.Debug("Sleeping for {0}ms after {1} continuous errors".Fmt(nextTry, continuousErrorsCount));

0 commit comments

Comments
 (0)