Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save Melchy/86eb4d94a039da562e8529ab475fa8b3 to your computer and use it in GitHub Desktop.

Select an option

Save Melchy/86eb4d94a039da562e8529ab475fa8b3 to your computer and use it in GitHub Desktop.
SkipWhenPreviousJobIsRunningAttribute.cs
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;
namespace ContentAdmin.Worker.Infrastructure.Hangfire.JobScheduler.JobResolver;
// This is variation of https://gist.github.com/odinserj/a6ad7ba6686076c9b9b2e03fcf6bf74e
// which handles all edge cases (see comments on the gist)
public sealed class SkipWhenPreviousJobIsRunningAttribute : JobFilterAttribute, IClientFilter, IApplyStateFilter
{
private const string RunningKey = "Running";
private const string RunningJobIdKey = "RunningJobId";
private const string RecurringJobIdKey = "RecurringJobId";
private const string JobIsRunning = "yes";
private const string JobIsNotRunning = "no";
public void OnCreating(
CreatingContext context)
{
// We can't handle old storages
if (context.Connection is not JobStorageConnection connection)
{
throw new NotSupportedException("This version of storage doesn't support extended methods. Please try to update to the latest version.");
}
// We should run this filter only for background jobs based on
// recurring ones
if (!context.Parameters.TryGetValue(RecurringJobIdKey, out var parameter))
{
return;
}
var recurringJobId = parameter as string;
// RecurringJobId is malformed. This should not happen, but anyway.
if (string.IsNullOrWhiteSpace(recurringJobId))
{
return;
}
var running = connection.GetValueFromHash(CreateRecurringJobKey(recurringJobId), RunningKey);
if (JobIsRunning.Equals(running, StringComparison.OrdinalIgnoreCase))
{
var runningJobId = connection.GetValueFromHash(CreateRecurringJobKey(recurringJobId), RunningJobIdKey);
if (IsJobStale(connection, runningJobId))
{
ResetJobStatus(connection, recurringJobId);
}
else
{
context.Canceled = true;
}
}
}
public void OnCreated(
CreatedContext context)
{
}
public void OnStateApplied(
ApplyStateContext context,
IWriteOnlyTransaction transaction)
{
if (context.NewState is EnqueuedState)
{
TrySetRunningFlag(context, JobIsRunning);
}
else if ((context.NewState.IsFinal && !FailedState.StateName.Equals(context.OldStateName, StringComparison.OrdinalIgnoreCase)) ||
context.NewState is FailedState)
{
TrySetRunningFlag(context, JobIsNotRunning);
}
}
public void OnStateUnapplied(
ApplyStateContext context,
IWriteOnlyTransaction transaction)
{
}
private static void TrySetRunningFlag(
ApplyStateContext context,
string state)
{
// We can't handle old storages
if (context.Connection is not JobStorageConnection connection)
{
throw new NotSupportedException("This version of storage doesn't support extended methods. Please try to update to the latest version.");
}
var recurringJobId = context.GetJobParameter<string>(RecurringJobIdKey, allowStale: true);
if (string.IsNullOrWhiteSpace(recurringJobId))
{
return;
}
if (context.Storage.HasFeature(JobStorageFeatures.Transaction.AcquireDistributedLock))
{
// Acquire a lock in newer storages to avoid race conditions
((JobStorageTransaction)context.Transaction).AcquireDistributedLock(
$"lock:recurring-job:{recurringJobId}",
timeout: TimeSpan.FromSeconds(10));
}
// Checking whether a recurring job exists
var recurringJob = connection.GetValueFromHash(CreateRecurringJobKey(recurringJobId), "Job");
if (string.IsNullOrEmpty(recurringJob))
{
return;
}
// Changing the running state
var jobId = JobIsRunning.Equals(state, StringComparison.OrdinalIgnoreCase)
? context.BackgroundJob.Id
: string.Empty;
context.Transaction.SetRangeInHash(
CreateRecurringJobKey(recurringJobId),
[
new KeyValuePair<string, string>(RunningKey, state),
new KeyValuePair<string, string>(RunningJobIdKey, jobId),
]);
}
private static void ResetJobStatus(
JobStorageConnection connection,
string recurringJobId)
{
using var tx = connection.CreateWriteTransaction();
tx.SetRangeInHash(
CreateRecurringJobKey(recurringJobId),
[
new KeyValuePair<string, string>(RunningKey, JobIsNotRunning),
new KeyValuePair<string, string>(RunningJobIdKey, string.Empty),
]);
tx.Commit();
}
private static bool IsJobStale(
JobStorageConnection connection,
string? runningJobId)
{
if (string.IsNullOrWhiteSpace(runningJobId))
{
return true;
}
var jobData = connection.GetJobData(runningJobId);
if (jobData is null || string.IsNullOrEmpty(jobData.State))
{
return true;
}
return string.Equals(jobData.State, SucceededState.StateName, StringComparison.OrdinalIgnoreCase) ||
string.Equals(jobData.State, DeletedState.StateName, StringComparison.OrdinalIgnoreCase) ||
string.Equals(jobData.State, FailedState.StateName, StringComparison.OrdinalIgnoreCase);
}
private static string CreateRecurringJobKey(
string recurringJobId)
{
return $"recurring-job:{recurringJobId}";
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment