Skip to content

Commit c06d0f6

Browse files
committed
Add job option to disable/enable jobs. Optimize job run start. Enable running distributed jobs manually.
1 parent fc9ec72 commit c06d0f6

File tree

9 files changed

+106
-132
lines changed

9 files changed

+106
-132
lines changed

samples/Foundatio.AppHost/Foundatio.AppHost.csproj

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22

3-
<Sdk Name="Aspire.AppHost.Sdk" Version="9.0.0-rc.1.24511.1" />
3+
<Sdk Name="Aspire.AppHost.Sdk" Version="9.3.0" />
44

55
<PropertyGroup>
66
<OutputType>Exe</OutputType>
77
<TargetFramework>net8.0</TargetFramework>
88
<ImplicitUsings>enable</ImplicitUsings>
99
<Nullable>enable</Nullable>
10-
<IsAspireHost>true</IsAspireHost>
1110
<UserSecretsId>96cfcbc9-36fb-452f-9b99-0165197e1978</UserSecretsId>
12-
<IsPackable>False</IsPackable>
11+
<IsPackable>False</IsPackable>
1312
</PropertyGroup>
1413

1514
<ItemGroup>
16-
<PackageReference Include="Aspire.Hosting.AppHost" Version="9.1.0" />
15+
<PackageReference Include="Aspire.Hosting.AppHost" Version="9.3.0" />
1716
</ItemGroup>
1817

1918
<ItemGroup>
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
<Project Sdk="Microsoft.NET.Sdk.Web">
2-
2+
33
<PropertyGroup>
44
<TargetFramework>net8.0</TargetFramework>
55
<IsPackable>False</IsPackable>
66
</PropertyGroup>
7-
7+
88
<ItemGroup>
99
<ProjectReference Include="..\..\src\Foundatio.Extensions.Hosting\Foundatio.Extensions.Hosting.csproj" />
1010
<ProjectReference Include="..\..\src\Foundatio\Foundatio.csproj" />
1111
</ItemGroup>
12-
12+
1313
<ItemGroup>
1414
<PackageReference Include="Serilog.AspNetCore" Version="9.0.0" />
1515
<PackageReference Include="Serilog.Settings.Configuration" Version="9.0.0" />
@@ -20,5 +20,5 @@
2020
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.12.0" />
2121
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.12.0" />
2222
</ItemGroup>
23-
23+
2424
</Project>

samples/Foundatio.HostingSample/Program.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Diagnostics;
33
using System.Linq;
4+
using System.Text.Json;
45
using System.Threading;
56
using System.Threading.Tasks;
67
using Foundatio.Caching;
@@ -31,6 +32,7 @@
3132
var builder = WebApplication.CreateBuilder(args);
3233

3334
builder.AddServiceDefaults();
35+
builder.Services.ConfigureHttpJsonOptions(o => { o.SerializerOptions.WriteIndented = true; });
3436

3537
builder.Services.AddSerilog();
3638

@@ -128,7 +130,7 @@
128130

129131
app.MapGet("/runjob", async httpContext =>
130132
{
131-
var jobManager = httpContext.RequestServices.GetRequiredService<JobManager>();
133+
var jobManager = httpContext.RequestServices.GetRequiredService<IJobManager>();
132134
await jobManager.RunJobAsync("EvenMinutes");
133135
await jobManager.RunJobAsync<EveryMinuteJob>();
134136
});

samples/Foundatio.HostingSample/Startup/MyStartupAction.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ public async Task RunAsync(CancellationToken cancellationToken = default)
3232
await Task.Delay(500);
3333
}
3434

35-
_jobManager.AddOrUpdate("MyJob", "* * * * *", async () =>
35+
_jobManager.AddOrUpdate("MyJob", j => j.CronSchedule("* * * * *").JobAction(async () =>
3636
{
3737
_logger.LogInformation("Running MyJob");
3838
await Task.Delay(1000);
3939
_logger.LogInformation("MyJob Complete");
40-
});
40+
}));
4141
}
4242
}

src/Foundatio.Extensions.Hosting/Jobs/JobManager.cs

Lines changed: 6 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,8 @@ namespace Foundatio.Extensions.Hosting.Jobs;
1313

1414
public interface IJobManager
1515
{
16-
void AddOrUpdate<TJob>(string cronSchedule, Action<ScheduledJobOptionsBuilder> configure = null) where TJob : class, IJob;
17-
void AddOrUpdate(string jobName, string cronSchedule, Action<ScheduledJobOptionsBuilder> configure = null);
18-
void AddOrUpdate(string jobName, string cronSchedule, Func<IServiceProvider, CancellationToken, Task> action, Action<ScheduledJobOptionsBuilder> configure = null);
19-
void AddOrUpdate(string jobName, string cronSchedule, Func<CancellationToken, Task> action, Action<ScheduledJobOptionsBuilder> configure = null);
20-
void AddOrUpdate(string jobName, string cronSchedule, Func<Task> action, Action<ScheduledJobOptionsBuilder> configure = null);
21-
void AddOrUpdate(string jobName, string cronSchedule, Action<IServiceProvider, CancellationToken> action, Action<ScheduledJobOptionsBuilder> configure = null);
22-
void AddOrUpdate(string jobName, string cronSchedule, Action<CancellationToken> action, Action<ScheduledJobOptionsBuilder> configure = null);
23-
void AddOrUpdate(string jobName, string cronSchedule, Action action, Action<ScheduledJobOptionsBuilder> configure = null);
16+
void AddOrUpdate<TJob>(Action<ScheduledJobOptionsBuilder> configure = null) where TJob : class, IJob;
17+
void AddOrUpdate(string jobName, Action<ScheduledJobOptionsBuilder> configure = null);
2418
void Remove<TJob>() where TJob : class, IJob;
2519
void Remove(string jobName);
2620
JobStatus[] GetJobStatus();
@@ -50,7 +44,7 @@ public JobManager(IServiceProvider serviceProvider, ILoggerFactory loggerFactory
5044
throw new ArgumentException("A distributed cache client is required to run distributed jobs.");
5145
}
5246

53-
public void AddOrUpdate<TJob>(string cronSchedule, Action<ScheduledJobOptionsBuilder> configure = null) where TJob : class, IJob
47+
public void AddOrUpdate<TJob>(Action<ScheduledJobOptionsBuilder> configure = null) where TJob : class, IJob
5448
{
5549
string jobName = JobOptions.GetDefaultJobName(typeof(TJob));
5650
lock (_lock)
@@ -60,8 +54,7 @@ public void AddOrUpdate<TJob>(string cronSchedule, Action<ScheduledJobOptionsBui
6054
{
6155
var options = new ScheduledJobOptions
6256
{
63-
CronSchedule = cronSchedule,
64-
Name = jobName,
57+
Name = jobName,
6558
JobFactory = sp => sp.GetRequiredService<TJob>()
6659
};
6760
var builder = new ScheduledJobOptionsBuilder(options);
@@ -72,14 +65,13 @@ public void AddOrUpdate<TJob>(string cronSchedule, Action<ScheduledJobOptionsBui
7265
else
7366
{
7467
var builder = new ScheduledJobOptionsBuilder(job.Options);
75-
builder.CronSchedule(cronSchedule);
7668
configure?.Invoke(builder);
7769
job.Schedule = job.Options.CronSchedule;
7870
}
7971
}
8072
}
8173

82-
public void AddOrUpdate(string jobName, string cronSchedule, Action<ScheduledJobOptionsBuilder> configure = null)
74+
public void AddOrUpdate(string jobName, Action<ScheduledJobOptionsBuilder> configure = null)
8375
{
8476
lock (_lock)
8577
{
@@ -88,67 +80,22 @@ public void AddOrUpdate(string jobName, string cronSchedule, Action<ScheduledJob
8880
{
8981
var options = new ScheduledJobOptions
9082
{
91-
CronSchedule = cronSchedule,
92-
Name = jobName
83+
Name = jobName,
9384
};
9485
var builder = new ScheduledJobOptionsBuilder(options);
9586
configure?.Invoke(builder);
96-
options.JobFactory = options.JobFactory;
9787
_jobs.Add(new ScheduledJobRunner(options, _serviceProvider, _cacheClient, _loggerFactory));
9888
_jobsArray = _jobs.ToArray();
9989
}
10090
else
10191
{
10292
var builder = new ScheduledJobOptionsBuilder(job.Options);
103-
builder.CronSchedule(cronSchedule);
10493
configure?.Invoke(builder);
10594
job.Schedule = job.Options.CronSchedule;
10695
}
10796
}
10897
}
10998

110-
public void AddOrUpdate(string jobName, string cronSchedule, Func<IServiceProvider, CancellationToken, Task> action, Action<ScheduledJobOptionsBuilder> configure = null)
111-
{
112-
AddOrUpdate(jobName, cronSchedule, b => b.JobFactory(sp => new DynamicJob(sp, action)));
113-
}
114-
115-
public void AddOrUpdate(string jobName, string cronSchedule, Func<CancellationToken, Task> action, Action<ScheduledJobOptionsBuilder> configure = null)
116-
{
117-
AddOrUpdate(jobName, cronSchedule, (_, ct) => action(ct), configure);
118-
}
119-
120-
public void AddOrUpdate(string jobName, string cronSchedule, Func<Task> action, Action<ScheduledJobOptionsBuilder> configure = null)
121-
{
122-
AddOrUpdate(jobName, cronSchedule, (_, _) => action(), configure);
123-
}
124-
125-
public void AddOrUpdate(string jobName, string cronSchedule, Action<IServiceProvider, CancellationToken> action, Action<ScheduledJobOptionsBuilder> configure = null)
126-
{
127-
AddOrUpdate(jobName, cronSchedule, (sp, ct) =>
128-
{
129-
action(sp, ct);
130-
return Task.CompletedTask;
131-
}, configure);
132-
}
133-
134-
public void AddOrUpdate(string jobName, string cronSchedule, Action<CancellationToken> action, Action<ScheduledJobOptionsBuilder> configure = null)
135-
{
136-
AddOrUpdate(jobName, cronSchedule, (_, ct) =>
137-
{
138-
action(ct);
139-
return Task.CompletedTask;
140-
}, configure);
141-
}
142-
143-
public void AddOrUpdate(string jobName, string cronSchedule, Action action, Action<ScheduledJobOptionsBuilder> configure = null)
144-
{
145-
AddOrUpdate(jobName, cronSchedule, (_, _) =>
146-
{
147-
action();
148-
return Task.CompletedTask;
149-
}, configure);
150-
}
151-
15299
public void Remove<TJob>() where TJob : class, IJob
153100
{
154101
string jobName = JobOptions.GetDefaultJobName(typeof(TJob));

src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobOptions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ public class ScheduledJobOptions
1212
public string CronSchedule { get; set; }
1313
public TimeZoneInfo CronTimeZone { get; set; }
1414
public bool IsDistributed { get; set; }
15+
public bool IsEnabled { get; set; } = true;
1516
}

src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobOptionsBuilder.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,4 +99,16 @@ public ScheduledJobOptionsBuilder Distributed(bool value = true)
9999
Target.IsDistributed = value;
100100
return this;
101101
}
102+
103+
public ScheduledJobOptionsBuilder Enabled(bool value = true)
104+
{
105+
Target.IsEnabled = value;
106+
return this;
107+
}
108+
109+
public ScheduledJobOptionsBuilder Disabled()
110+
{
111+
Target.IsEnabled = false;
112+
return this;
113+
}
102114
}

src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRunner.cs

Lines changed: 39 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Foundatio.Extensions.Hosting.Cronos;
66
using Foundatio.Jobs;
77
using Foundatio.Lock;
8+
using Foundatio.Messaging;
89
using Foundatio.Utility;
910
using Microsoft.Extensions.DependencyInjection;
1011
using Microsoft.Extensions.Logging;
@@ -22,14 +23,12 @@ internal class ScheduledJobRunner
2223
private readonly ILockProvider _lockProvider;
2324
private readonly ILogger _logger;
2425
private readonly DateTime _baseDate = new(2010, 1, 1);
25-
private DateTime _lastStatusUpdate = DateTime.MinValue;
26-
private string _cacheKey;
2726

2827
public ScheduledJobRunner(ScheduledJobOptions jobOptions, IServiceProvider serviceProvider, ICacheClient cacheClient, ILoggerFactory loggerFactory = null)
2928
{
3029
_jobOptions = jobOptions;
3130
_jobOptions.Name ??= Guid.NewGuid().ToString("N").Substring(0, 10);
32-
_cacheKey = _jobOptions.Name.ToLower().Replace(' ', '_');
31+
CacheKey = _jobOptions.Name.ToLower().Replace(' ', '_');
3332
_serviceProvider = serviceProvider;
3433
_timeProvider = serviceProvider.GetService<TimeProvider>() ?? TimeProvider.System;
3534
_cacheClient = new ScopedCacheClient(cacheClient, "jobs");
@@ -39,17 +38,8 @@ public ScheduledJobRunner(ScheduledJobOptions jobOptions, IServiceProvider servi
3938
if (_cronSchedule == null)
4039
throw new ArgumentException("Could not parse schedule", nameof(ScheduledJobOptions.CronSchedule));
4140

42-
var interval = TimeSpan.FromDays(1);
43-
44-
var nextOccurrence = _cronSchedule.GetNextOccurrence(_timeProvider.GetUtcNow().UtcDateTime, _jobOptions.CronTimeZone ?? TimeZoneInfo.Local);
45-
if (nextOccurrence.HasValue)
46-
{
47-
var nextNextOccurrence = _cronSchedule.GetNextOccurrence(nextOccurrence.Value);
48-
if (nextNextOccurrence.HasValue)
49-
interval = nextNextOccurrence.Value.Subtract(nextOccurrence.Value);
50-
}
51-
52-
_lockProvider = new ThrottlingLockProvider(_cacheClient, 1, interval.Add(interval));
41+
var messageBus = serviceProvider.GetService<IMessageBus>() ?? new InMemoryMessageBus();
42+
_lockProvider = new CacheLockProvider(cacheClient, messageBus, loggerFactory);
5343

5444
NextRun = _cronSchedule.GetNextOccurrence(_timeProvider.GetUtcNow().UtcDateTime, _jobOptions.CronTimeZone ?? TimeZoneInfo.Local);
5545
}
@@ -68,40 +58,29 @@ public string Schedule
6858
}
6959
}
7060

71-
public DateTime? LastRun { get; private set; }
72-
public DateTime? LastSuccess { get; private set; }
73-
public string LastErrorMessage { get; private set; }
74-
public DateTime? NextRun { get; private set; }
75-
public Task RunTask { get; private set; }
61+
internal string CacheKey { get; private set; }
62+
63+
private DateTime? _lastRun;
7664

77-
public async ValueTask<bool> ShouldRunAsync()
65+
public DateTime? LastRun
7866
{
79-
if (_timeProvider.GetUtcNow().UtcDateTime.Subtract(_lastStatusUpdate).TotalSeconds > 15)
67+
get => _lastRun;
68+
internal set
8069
{
81-
try
82-
{
83-
var lastRun = await _cacheClient.GetAsync<DateTime>("lastrun:" + Options.Name).AnyContext();
84-
if (lastRun.HasValue)
85-
{
86-
LastRun = lastRun.Value;
87-
NextRun = _cronSchedule.GetNextOccurrence(LastRun.Value, _jobOptions.CronTimeZone ?? TimeZoneInfo.Local);
88-
}
89-
90-
var lastSuccess = await _cacheClient.GetAsync<DateTime>("lastsuccess:" + Options.Name).AnyContext();
91-
if (lastSuccess.HasValue)
92-
LastSuccess = lastSuccess.Value;
70+
_lastRun = value;
71+
NextRun = _cronSchedule.GetNextOccurrence(LastRun.Value, _jobOptions.CronTimeZone ?? TimeZoneInfo.Local);
72+
}
73+
}
9374

94-
var lastError = await _cacheClient.GetAsync<string>("lasterror:" + Options.Name).AnyContext();
95-
if (lastError.HasValue)
96-
LastErrorMessage = lastError.Value;
75+
public DateTime? LastSuccess { get; internal set; }
76+
public string LastErrorMessage { get; internal set; }
77+
public DateTime? NextRun { get; private set; }
78+
public Task RunTask { get; private set; }
9779

98-
_lastStatusUpdate = _timeProvider.GetUtcNow().UtcDateTime;
99-
}
100-
catch (Exception ex)
101-
{
102-
_logger.LogError(ex, "Error getting job ({JobName}) status", Options.Name);
103-
}
104-
}
80+
internal bool ShouldRun()
81+
{
82+
if (!Options.IsEnabled)
83+
return false;
10584

10685
if (!NextRun.HasValue)
10786
return false;
@@ -134,6 +113,8 @@ public async Task StartAsync(CancellationToken cancellationToken = default)
134113

135114
if (l == null)
136115
{
116+
_logger.LogInformation("Job ({JobName}) is already running, skipping", Options.Name);
117+
137118
try
138119
{
139120
// if we didn't get the lock, update the last run time
@@ -158,10 +139,10 @@ public async Task StartAsync(CancellationToken cancellationToken = default)
158139
}
159140
}
160141

161-
await using (l)
142+
// start running the job in a thread
143+
RunTask = Task.Factory.StartNew(async () =>
162144
{
163-
// start running the job in a thread
164-
RunTask = Task.Factory.StartNew(async () =>
145+
await using (l)
165146
{
166147
try
167148
{
@@ -211,25 +192,25 @@ public async Task StartAsync(CancellationToken cancellationToken = default)
211192
// ignored
212193
}
213194
}
214-
}, cancellationToken).Unwrap();
215-
216-
LastRun = _timeProvider.GetUtcNow().UtcDateTime;
217-
try
218-
{
219-
await _cacheClient.SetAsync("lastrun:" + Options.Name, LastRun.Value).AnyContext();
220-
}
221-
catch
222-
{
223-
// ignored
224195
}
225-
NextRun = _cronSchedule.GetNextOccurrence(LastRun.Value, _jobOptions.CronTimeZone ?? TimeZoneInfo.Local);
196+
}, cancellationToken).Unwrap();
197+
198+
LastRun = _timeProvider.GetUtcNow().UtcDateTime;
199+
try
200+
{
201+
await _cacheClient.SetAsync("lastrun:" + Options.Name, LastRun.Value).AnyContext();
202+
}
203+
catch
204+
{
205+
// ignored
226206
}
207+
NextRun = _cronSchedule.GetNextOccurrence(LastRun.Value, _jobOptions.CronTimeZone ?? TimeZoneInfo.Local);
227208
}
228209

229210
private string GetLockKey(DateTime date)
230211
{
231212
long minute = (long)date.Subtract(_baseDate).TotalMinutes;
232213

233-
return _cacheKey + ":" + minute;
214+
return CacheKey + ":" + minute;
234215
}
235216
}

0 commit comments

Comments
 (0)