Skip to content

Commit 820387b

Browse files
hallihanckittel
andauthored
Update leader election pattern (#120)
* Update Leader Election Example - Simplify example worker to a console app that can be run multiple times locally or in a distributed manner - Update to .NET 8.0 - Minor improvements to BlobDistributedMutex to provide more feedback of status * Update UI and cancellation logic for sample worker * Update README * Add general exception handler so that we retry on cases like network down, etc. * Slight refactor for readability * remove unused variables and add missing step to README * Apply suggestions from code review Co-authored-by: Chad Kittel <[email protected]> * Refactor README to match template * add dotnet-trace and fix typos * Update leader-election/Readme.md --------- Co-authored-by: Chad Kittel <[email protected]>
1 parent 0d6b204 commit 820387b

27 files changed

+250
-1449
lines changed

leader-election/DistributedMutex/BlobDistributedMutex.cs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,17 @@ namespace DistributedMutex
99

1010
public class BlobDistributedMutex
1111
{
12-
private static readonly TimeSpan RenewInterval = TimeSpan.FromSeconds(45);
13-
private static readonly TimeSpan AcquireAttemptInterval = TimeSpan.FromSeconds(65);
12+
private static readonly TimeSpan RenewInterval = TimeSpan.FromSeconds(10);
13+
private static readonly TimeSpan AcquireAttemptInterval = TimeSpan.FromSeconds(20);
1414
private readonly BlobSettings blobSettings;
1515
private readonly Func<CancellationToken, Task> taskToRunWhenLeaseAcquired;
16+
private readonly Action? onLeaseTimeoutRetry;
1617

17-
public BlobDistributedMutex(BlobSettings blobSettings, Func<CancellationToken, Task> taskToRunWhenLeaseAquired)
18+
public BlobDistributedMutex(BlobSettings blobSettings, Func<CancellationToken, Task> taskToRunWhenLeaseAcquired, Action? onLeaseTimeoutRetry = null)
1819
{
1920
this.blobSettings = blobSettings;
20-
this.taskToRunWhenLeaseAcquired = taskToRunWhenLeaseAquired;
21+
this.taskToRunWhenLeaseAcquired = taskToRunWhenLeaseAcquired;
22+
this.onLeaseTimeoutRetry = onLeaseTimeoutRetry;
2123
}
2224

2325
public async Task RunTaskWhenMutexAcquired(CancellationToken token)
@@ -61,22 +63,22 @@ private async Task RunTaskWhenBlobLeaseAcquired(BlobLeaseManager leaseManager, C
6163
while (!token.IsCancellationRequested)
6264
{
6365
// Try to acquire the blob lease, otherwise wait for some time before we can try again.
64-
string leaseId = await this.TryAcquireLeaseOrWait(leaseManager, token);
66+
string? leaseId = await this.TryAcquireLeaseOrWait(leaseManager, token);
6567

6668
if (!string.IsNullOrEmpty(leaseId))
6769
{
68-
// Create a new linked cancellation token source, so if either the
70+
// Create a new linked cancellation token source, so if either the
6971
// original token is canceled or the lease cannot be renewed,
7072
// then the leader task can be canceled.
71-
using (var leaseCts =
73+
using (var leaseCts =
7274
CancellationTokenSource.CreateLinkedTokenSource(new[] { token }))
7375
{
7476
// Run the leader task.
7577
var leaderTask = this.taskToRunWhenLeaseAcquired.Invoke(leaseCts.Token);
7678

77-
// Keeps renewing the lease in regular intervals.
79+
// Keeps renewing the lease in regular intervals.
7880
// If the lease cannot be renewed, then the task completes.
79-
var renewLeaseTask =
81+
var renewLeaseTask =
8082
this.KeepRenewingLease(leaseManager, leaseId, leaseCts.Token);
8183

8284
// When any task completes (either the leader task or when it could
@@ -87,7 +89,7 @@ private async Task RunTaskWhenBlobLeaseAcquired(BlobLeaseManager leaseManager, C
8789
}
8890
}
8991

90-
private async Task<string> TryAcquireLeaseOrWait(BlobLeaseManager leaseManager, CancellationToken token)
92+
private async Task<string?> TryAcquireLeaseOrWait(BlobLeaseManager leaseManager, CancellationToken token)
9193
{
9294
try
9395
{
@@ -96,7 +98,10 @@ private async Task<string> TryAcquireLeaseOrWait(BlobLeaseManager leaseManager,
9698
{
9799
return leaseId;
98100
}
99-
101+
if(onLeaseTimeoutRetry != null)
102+
{
103+
onLeaseTimeoutRetry();
104+
}
100105
await Task.Delay(AcquireAttemptInterval, token);
101106
return null;
102107
}
@@ -127,7 +132,7 @@ private async Task KeepRenewingLease(BlobLeaseManager leaseManager, string lease
127132

128133
// We delay based on the time from the start of the last renew request to ensure
129134
var renewIntervalAdjusted = RenewInterval - renewOffset.Elapsed;
130-
135+
131136
// If the adjusted interval is greater than zero wait for that long
132137
if (renewIntervalAdjusted > TimeSpan.Zero)
133138
{

leader-election/DistributedMutex/BlobLeaseManager.cs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@ public struct BlobSettings
1717
public readonly string BlobName;
1818
public BlobServiceClient BlobServiceClient;
1919

20-
2120
public BlobSettings(String storageConnStr, string container, string blobName)
2221
{
2322
var blobClientOptions = new BlobClientOptions();
2423
blobClientOptions.Retry.Delay = TimeSpan.FromSeconds(5);
2524
blobClientOptions.Retry.MaxRetries = 3;
26-
25+
2726
this.BlobServiceClient = new BlobServiceClient(storageConnStr, blobClientOptions);
2827
this.Container = container;
2928
this.BlobName = blobName;
@@ -47,7 +46,6 @@ public BlobLeaseManager(BlobServiceClient blobServiceClient, string leaseContain
4746
{
4847
this.leaseContainerClient = blobServiceClient.GetBlobContainerClient(leaseContainerName);
4948
this.leaseBlobClient = this.leaseContainerClient.GetPageBlobClient(leaseBlobName);
50-
5149
}
5250

5351
public void ReleaseLease(string leaseId)
@@ -57,23 +55,23 @@ public void ReleaseLease(string leaseId)
5755
var leaseClient = this.leaseBlobClient.GetBlobLeaseClient(leaseId);
5856
leaseClient.Release();
5957
}
60-
catch (RequestFailedException e)
58+
catch (RequestFailedException e)
6159
{
6260
// Lease will eventually be released.
6361
Trace.TraceError(e.ErrorCode);
6462
}
6563
}
6664

67-
public async Task<string> AcquireLeaseAsync(CancellationToken token)
65+
public async Task<string?> AcquireLeaseAsync(CancellationToken token)
6866
{
6967
bool blobNotFound = false;
7068
try
7169
{
7270
var leaseClient = this.leaseBlobClient.GetBlobLeaseClient();
73-
var lease = await leaseClient.AcquireAsync(TimeSpan.FromSeconds(60), null, token);
71+
var lease = await leaseClient.AcquireAsync(TimeSpan.FromSeconds(15), null, token);
7472
return lease.Value.LeaseId;
7573
}
76-
catch (RequestFailedException storageException)
74+
catch (RequestFailedException storageException)
7775
{
7876
Trace.TraceError(storageException.ErrorCode);
7977

@@ -88,6 +86,12 @@ public async Task<string> AcquireLeaseAsync(CancellationToken token)
8886
return null;
8987
}
9088
}
89+
catch (Exception e)
90+
{
91+
// If the storage account is unavailable or we fail for any other reason we still want to keep retrying
92+
Trace.TraceError(e.Message);
93+
return null;
94+
}
9195

9296
if (blobNotFound)
9397
{
Lines changed: 9 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,90 +1,13 @@
1-
<?xml version="1.0" encoding="utf-8"?>
2-
<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3-
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
43
<PropertyGroup>
5-
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
6-
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
7-
<ProjectGuid>{9C695186-E886-422E-B2E1-AE6AA83E5B4B}</ProjectGuid>
8-
<OutputType>Library</OutputType>
9-
<AppDesignerFolder>Properties</AppDesignerFolder>
10-
<RootNamespace>DistributedMutex</RootNamespace>
11-
<AssemblyName>DistributedMutex</AssemblyName>
12-
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion>
13-
<FileAlignment>512</FileAlignment>
14-
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
15-
<RestorePackages>true</RestorePackages>
16-
<TargetFrameworkProfile />
4+
<TargetFramework>net8.0</TargetFramework>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
177
</PropertyGroup>
18-
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
19-
<DebugSymbols>true</DebugSymbols>
20-
<DebugType>full</DebugType>
21-
<Optimize>false</Optimize>
22-
<OutputPath>bin\Debug\</OutputPath>
23-
<DefineConstants>DEBUG;TRACE</DefineConstants>
24-
<ErrorReport>prompt</ErrorReport>
25-
<WarningLevel>4</WarningLevel>
26-
</PropertyGroup>
27-
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
28-
<DebugType>pdbonly</DebugType>
29-
<Optimize>true</Optimize>
30-
<OutputPath>bin\Release\</OutputPath>
31-
<DefineConstants>TRACE</DefineConstants>
32-
<ErrorReport>prompt</ErrorReport>
33-
<WarningLevel>4</WarningLevel>
34-
</PropertyGroup>
35-
<ItemGroup>
36-
<Reference Include="Azure.Core, Version=1.0.2.0, Culture=neutral, PublicKeyToken=92742159e12e44c8, processorArchitecture=MSIL">
37-
<HintPath>..\packages\Azure.Core.1.0.2\lib\netstandard2.0\Azure.Core.dll</HintPath>
38-
</Reference>
39-
<Reference Include="Azure.Storage.Blobs, Version=12.3.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8, processorArchitecture=MSIL">
40-
<HintPath>..\packages\Azure.Storage.Blobs.12.3.0\lib\netstandard2.0\Azure.Storage.Blobs.dll</HintPath>
41-
</Reference>
42-
<Reference Include="Azure.Storage.Common, Version=12.2.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8, processorArchitecture=MSIL">
43-
<HintPath>..\packages\Azure.Storage.Common.12.2.0\lib\netstandard2.0\Azure.Storage.Common.dll</HintPath>
44-
</Reference>
45-
<Reference Include="Microsoft.Bcl.AsyncInterfaces, Version=1.0.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
46-
<HintPath>..\packages\Microsoft.Bcl.AsyncInterfaces.1.0.0\lib\net461\Microsoft.Bcl.AsyncInterfaces.dll</HintPath>
47-
</Reference>
48-
<Reference Include="System" />
49-
<Reference Include="System.Buffers, Version=4.0.3.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
50-
<HintPath>..\packages\System.Buffers.4.5.0\lib\netstandard2.0\System.Buffers.dll</HintPath>
51-
</Reference>
52-
<Reference Include="System.Core" />
53-
<Reference Include="System.Diagnostics.DiagnosticSource, Version=4.0.4.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
54-
<HintPath>..\packages\System.Diagnostics.DiagnosticSource.4.6.0\lib\net46\System.Diagnostics.DiagnosticSource.dll</HintPath>
55-
</Reference>
56-
<Reference Include="System.Memory, Version=4.0.1.1, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
57-
<HintPath>..\packages\System.Memory.4.5.3\lib\netstandard2.0\System.Memory.dll</HintPath>
58-
</Reference>
59-
<Reference Include="System.Numerics" />
60-
<Reference Include="System.Numerics.Vectors, Version=4.1.4.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
61-
<HintPath>..\packages\System.Numerics.Vectors.4.5.0\lib\net46\System.Numerics.Vectors.dll</HintPath>
62-
</Reference>
63-
<Reference Include="System.Runtime.CompilerServices.Unsafe, Version=4.0.4.1, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
64-
<HintPath>..\packages\System.Runtime.CompilerServices.Unsafe.4.5.2\lib\netstandard2.0\System.Runtime.CompilerServices.Unsafe.dll</HintPath>
65-
</Reference>
66-
<Reference Include="System.Threading.Tasks.Extensions, Version=4.2.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
67-
<HintPath>..\packages\System.Threading.Tasks.Extensions.4.5.2\lib\netstandard2.0\System.Threading.Tasks.Extensions.dll</HintPath>
68-
</Reference>
69-
</ItemGroup>
70-
<ItemGroup>
71-
<Compile Include="BlobDistributedMutex.cs" />
72-
<Compile Include="BlobLeaseManager.cs" />
73-
</ItemGroup>
74-
<ItemGroup>
75-
<Folder Include="Properties\" />
76-
</ItemGroup>
8+
779
<ItemGroup>
78-
<None Include="app.config" />
79-
<None Include="packages.config" />
10+
<PackageReference Include="Azure.Storage.Blobs" Version="12.19.1" />
8011
</ItemGroup>
81-
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
82-
<Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
83-
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
84-
Other similar extension points exist, see Microsoft.Common.targets.
85-
<Target Name="BeforeBuild">
86-
</Target>
87-
<Target Name="AfterBuild">
88-
</Target>
89-
-->
90-
</Project>
12+
13+
</Project>

0 commit comments

Comments
 (0)