Hangfire Redis 实现秒级定时任务、使用 CQRS 实现动态执行代码

360影视 动漫周边 2025-04-19 09:10 2

摘要:在微服务架构中中,定时任务是最常用的基础设施组件之一,社区中有很多定时任务类库或平台,例如 Quartz.NET、xxx-job,使用方法差异很大,比如 xxx-job 的核心是 http 请求,配置定时任务实现 http 请求具体的接口,不过用起来还是比较复

目录

定时任务需求

核心逻辑

使用 Redis 实现秒级定时任务

业务服务实现动态代码

最后

本文示例项目仓库:https://github.com/whuanle/HangfireDemo

主要有两个核心需求:

• 需要实现秒级定时任务;

• 开发者使用定时任务要简单,不要弄复杂了;

在微服务架构中中,定时任务是最常用的基础设施组件之一,社区中有很多定时任务类库或平台,例如 Quartz.NET、xxx-job,使用方法差异很大,比如 xxx-job 的核心是 http 请求,配置定时任务实现 http 请求具体的接口,不过用起来还是比较复杂的。

在微服务中,使用的组件太多了,如果每个组件的集成都搞得很麻烦,那么服务的代码很可能会大量膨胀,并且容易出现各种 bug。以 xxx-job 为例,如果项目中有 N 个定时任务,设计 N 个 http 接口被 xxx-job 回调触发,除了 http 接口数量庞大,在各个环节中还容易出现 bug。

在近期项目需求中,刚好要用到定时任务,结合 C# 语言的特性,笔者的方法是利用 Hangfire 框架和语言特性,封装一些方法,使得开发者可以无感使用定时任务,大大简化链路和使用难度。

使用示例,结合 MediatR 框架定义 CQRS ,该 Command 将会被定时任务触发执行:

public classMyTestRequest:HangfireRequest,IRequest
{
}

///
/// 要被定时任务执行的代码.
///
publicclassMyTestHandler:IRequestHandler
{
publicasyncTaskHandle(MyTestRequest request,CancellationToken cancellationToken)
{
// 逻辑

returnnewExecteTasResult
{
CancelTask=false
};
}
}

要启动一个定时任务,只需要:

private readonlySendHangfireService _hangfireService;

publicSendTaskController(SendHangfireService hangfireService)
{
_hangfireService = hangfireService;
}

[HttpGet("aaa")]
publicasyncTaskstring>SendAsync
{
await _hangfireService.Send(newMyTestRequest
{
CreateTime=DateTimeOffset.Now,
CronExpression="* * * * * *"
TaskId=Guid.NewGuid.ToString,
});

return"aaa";
}

通过这种方式使用定时任务,开发者只需要使用很简单的代码即可完成需求,不需要关注细节,也不需要定义各种 http 接口,并且犹豫不需要关注使用的外部定时任务框架,所以随时可以切换不同的定时任务实现。

image-20250418084329362HangfireServer 是定时任务服务实现,HangfireServer 服务只需要暴露两个接口addtaskcancel,分别用于添加定时任务和取消定时任务,无论什么业务的服务,都通过addtask服务添加。DemoApi 则是业务服务,业务服务只需要暴露一个·execute接口用于触发定时任务即可。

基础逻辑如下:

Hangfire

DemoApi

序列化参数Command

添加定时任务

请求

Hangfire:存储任务

addtask

Hangfire:执行任务

发起请求

发送定时任务

定义 Command

DemoApi:执行 Command

DemoApi:execute 接口

由于项目中使用的是 MediatR 框架实现 CQRS 模式,因此很容易实现定时任务动态调用代码,只需要按照平时的 CQRS 发送定时任务命令,指定定时任务要执行的 Command 即可。

例如,有以下 Command 需要被定时任务执行:

ACommand
BCommand
CCommand

首先这些命令会被序列化为 json ,发送到 HangfireServer 服务,HangfireServer 在恰当时机将参数原封不动推送到 DemoApi 服务,DemoApi 服务拿到这些参数序列化为对应的类型,然后通过 MediatR 发送命令,即可实现任意命令的定时任务动态调用。

下面来分别实现 HangfireServer 、DemoApi 服务。

在 Shred 项目中添加以下文件。

image-20250418093956062

其中 TaskRequest 内容如下,其它文件请参考示例项目。

public classTaskRequest
{
///
/// 任务 id.
///
publicstringTaskId{get;set;}="";

///
/// 定时任务要请求的服务地址或服务名称.
///
publicstringServiceName{get;set;}="";

///
/// 参数类型名称.
///
publicstringCommandType{get;set;}="";

///
/// 请求参数内容,json 序列化后的字符串.
///
publicstringCommandBody{get;set;}="";

///
/// Cron 表达式.
///
publicstringCronExpression{get;set;}="";

///
/// 创建时间.
///
publicstringCreateTime{get;set;}="";
}

Hangfire 本身配置比较复杂,其分布式实现对数据库性能要求比较高,因此使用 Mysql、Sqlserver 等数据库存储数据会带了很大的压力,而且要求实现秒级定时任务,NoSql 数据库可以更加好地实现这一需求,笔者这里使用 Redis 来存储任务数据。

HangfireServer 项目结构如下:

image-20250418094109409

对 HangfireServer 的设计主要分为几步:

• Hangfire 支持容器管理;

• 配置 Hangfire ;

• 定义 RecurringJobHandler 执行任务发起 http 请求到业务系统;

• 定义 http 接口,接收定时任务;

引入类库:

PackageReferenceInclude="Hangfire.AspNetCore"Version="1.8.18"/>
PackageReferenceInclude="Hangfire.Redis.StackExchange"Version="1.12.0"/>

首先是关于 Hangfire 本身的配置,现在几乎都是基于依赖注入的设计,不搞静态类型,所以我们需要实现定时任务执行器创建服务实例的,以便每次定时任务请求时,服务实例都是在一个新的容器,处以一个新的上下文中。

第一步

创建 HangfireJobActivatorScope、HangfireActivator 两个文件,实现 Hangfire 支持容器上下文。

///
/// 任务容器.
///
publicclassHangfireJobActivatorScope:JobActivatorScope
{
privatereadonlyIServiceScope _serviceScope;
privatereadonlystring_jobId;

///
/// Initializes a new instance of the class.
///
///


///


publicHangfireJobActivatorScope([Not]IServiceScope serviceScope,stringjobId)
{
_serviceScope = serviceScope ??thrownewArgumentException(nameof(serviceScope));
_jobId = jobId;
}

///
publicoverrideobjectResolve(Type type)
{
var res =ActivatorUtilities.GetServiceOrCreateInstance(_serviceScope.ServiceProvider, type);
return res;
}

///
publicoverridevoidDisposeScope
{
_serviceScope.Dispose;
}
}///
/// JobActivator.
///
publicclassHangfireActivator:JobActivator
{
privatereadonlyIServiceScopeFactory _serviceScopeFactory;

///
/// Initializes a new instance of the class.
///
///


publicHangfireActivator(IServiceScopeFactory serviceScopeFactory)
{
_serviceScopeFactory = serviceScopeFactory ??thrownewArgumentException(nameof(serviceScopeFactory));
}

///
publicoverrideJobActivatorScopeBeginScope(JobActivatorContext context)
{
returnnewHangfireJobActivatorScope(_serviceScopeFactory.CreateScope, context.BackgroundJob.Id);
}
}第二步

配置 Hangfire 服务,使其支持 Redis,并且配置一些参数。

private voidConfigureHangfire(IServiceCollection services)
{
var options =
newRedisStorageOptions
{
// 配置 redis 前缀,每个任务实例都会创建一个 key
Prefix="aaa:aaa:hangfire"
};

services.AddHangfire(
config =>
{
config.UseRedisStorage("{redis连接字符串}", options)
.SetDataCompatibilityLevel(CompatibilityLevel.Version_180)
.UseSimpleAssemblyNameTypeSerializer
.UseRecommendedSerializerSettings;
config.UseActivator(newHangfireActivator(services.BuildServiceProvider.GetRequiredService));
});

services.AddHangfireServer(options =>
{
// 注意,这里必须设置非常小的间隔
options.SchedulePollingInterval=TimeSpan.FromSeconds(1);

// 如果考虑到后续任务比较多,则需要调大此参数
options.WorkerCount=50;
});
}1744940846417第三步

实现 RecurringJobHandler 执行定时任务,发起 http 请求业务系统。

被调用方要返回 TaskInterfaceResponse 类型,主要考虑如果被调用方后续不需要在继续此定时任务,那么返回参数CancelTask = tre时,定时任务服务直接取消后续的任务即可,不需要被调用方手动调用接口取消。public classRecurringJobHandler
{
privatereadonlyIServiceProvider _serviceProvider;
publicRecurringJobHandler(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}

///
/// 执行任务.
///
///


/// Task.
publicasyncTaskHandler(TaskRequest taskRequest)
{
var ioc = _serviceProvider;

var recurringJobManager = ioc.GetRequiredService;
var httpClientFactory = ioc.GetRequiredService;
var logger = ioc.GetRequiredService>;
usingvar httpClient = httpClientFactory.CreateClient(taskRequest.ServiceName);

// 无论是否请求成功,都算完成了本次任务
try
{
// 请求子系统的接口
var response =await httpClient.PostAsJsonAsync(taskRequest.ServiceName, taskRequest);

var execteResult =await response.Content.ReadFromJsonAsync;

// 被调用方要求取消任务
if(execteResult !=&& execteResult.CancelTask)
{
recurringJobManager.RemoveIfExists(taskRequest.TaskId);
}
}
catch(Exception ex)
{
logger.LogError(ex,"Task error.");
}
}
}第四步

配置好 Hangfire 后,开始考虑如何接收任务和发起请求,首先定义一个 Http 接口或 grpc 接口。

[ApiController]
[Route("/execute")]
publicclassHangfireController:ControllerBase
{
privatereadonlyIRecurringJobManager _recurringJobManager;

publicHangfireController(IRecurringJobManager recurringJobManager)
{
_recurringJobManager = recurringJobManager;
}

[HttpPost("addtask")]
publicasyncTaskAddTask(TaskRequestvalue)
{
awaitTask.CompletedTask;
_recurringJobManager.AddOrUpdate(
value.TaskId,
task => task.Handler(value),
cronExpression:value.CronExpression,
options:newRecurringJobOptions
{
});
returnnewTaskResponse{};
}

[HttpPost("cancel")]
publicasyncTaskCancel(CancelTaskRequestvalue)
{

_recurringJobManager.RemoveIfExists(value.TaskId);

returnnewTaskResponse
{
};
}
}
业务服务只需要暴露一个exceute接口给 HangfireServer 即可,DemoApi 将 Command 序列化包装为请求参数给 HangfireServer ,然后 HangfireServer 原封不动地将参数请求到exceute接口。image-20250418095553964

对 DemoApi 主要设计过程如下:

• 定义 SendHangfireService 服务,包装 Command 数据和一些定时任务参数,通过 http 发送到 HangfireServer 中;

• 定义 ExecuteTaskHandler ,当接口被触发时,实现反序列化参数并使用 MediatR 发送 Command,实现动态执行;

• 定义 ExecuteController 接口,接收 HangfireServer 请求,并调用 ExecuteTaskHandler 处理请求;

DemoApi 引入类库如下-:

PackageReferenceInclude="Maomi.Core"Version="2.2.0"/>-
PackageReferenceInclude="MediatR"Version="12.5.0"/>

Maomi.Core 是一个模块化和自动服务注册框架。

第一步

定义 SendHangfireService 服务,包装 Command 数据和一些定时任务参数,通过 http 发送到 HangfireServer 中。

接收 HangfireServer 请求时,需要通过字符串查找出 Type,这就需要 DemoApi 启动时,自动扫描程序集并将对应的类型缓存起来。

为了将定时任务命令和其它 Command 区分处理,需要定义一个统一的抽象,当然也可以不这样做,也可以通过特性注解的方式做处理。

///
/// 定时任务抽象参数.
///
publicabstractclassHangfireRequest:IRequest
{
///
/// 定时任务 id.
///
publicstringTaskId{get;init;}=string.Empty;

///
/// 该任务创建时间.
///
publicDateTimeOffsetCreateTime{get;init;}
}

定义 HangireTypeFactory ,以便能够通过字符串快速查找 Type。

///
/// 记录 CQRS 中的命令类型,以便能够通过字符串快速查找 Type.
///
publicclassHangireTypeFactory
{
privatereadonlyConcurrentDictionarystring,Type> _typeDictionary;
publicHangireTypeFactory
{
_typeDictionary =newConcurrentDictionarystring,Type>;
}

publicvoidAdd(Type type)
{
if(!_typeDictionary.ContainsKey(type.Name))
{
_typeDictionary[type.Name]= type;
}
}

publicType?Get(stringtypeName)
{
if(_typeDictionary.TryGetValue(typeName,outvar type))
{
return type;
}

return _typeDictionary.FirstOrDefault(x => x.Value.FullName== typeName).Value;
}
}

最后实现 SendHangfireService 服务,能够包装参数发送到 HangfireServer 中。

当然,可以使用 CQRS 处理。

///
/// 定时任务服务,用于发送定时任务请求.
///
[InjectOnScoped]
publicclassSendHangfireService
{
privatestaticreadonlyJsonSerializerOptionsJsonOptions=newJsonSerializerOptions
{
AllowTrailingCommas=true
PropertyNameCaseInsensitive=true
PropertyNamingPolicy=JsonNamingPolicy.CamelCase,
ReadCommentHandling=JsonCommentHandling.Skip
};

privatereadonlyIHttpClientFactory _httpClientFactory;

publicSendHangfireService(IHttpClientFactory httpClientFactory)
{
_httpClientFactory = httpClientFactory;
}

///
/// 发送定时任务请求.
///
///
///


///


///
///
publicasyncTaskSend(TCommand request)
whereTCommand:HangfireRequest
{
usingvar httpClient = _httpClientFactory.CreateClient;

var taskRequest =newTaskRequest
{
TaskId= request.TaskId,
CommandBody=JsonSerializer.Serialize(request,JsonOptions),
ServiceName="http://127.0.0.1:5000/hangfire/execute"
CommandType=typeof(TCommand).Name??thrownewTypeLoadException(typeof(TCommand).Name),
CreateTime= request.CreateTime.ToUnixTimeMilliseconds.ToString,
CronExpression= request.CronExpression,
};

_ =await httpClient.PostAsJsonAsync("http://127.0.0.1:5001/execute/addtask", taskRequest);
}

///
/// 取消定时任务.
///
///



publicasyncTaskCancel(stringtaskId)
{

"http://127.0.0.1:5001/hangfire/cancel",newCancelTaskRequest
{
TaskId= taskId
});

}
}第二步

要实现通过 Type 动态执行某个 Command ,其实思路比较简单,也并不需要表达式树等麻烦的方式。

笔者的实现思路如下,定义 ExecuteTaskHandler 泛型类,直接以强类型的方式触发 Command,但是为了屏蔽泛型类型强类型在代码调用中的麻烦,需要再抽象一个接口 IHangfireTaskHandler 屏蔽泛型。

///
/// 定义执行任务的抽象,便于忽略泛型处理.
///
publicinterfaceIHangfireTaskHandler
{
///
/// 执行任务.
///
///


///
Task Handler(TaskRequest taskRequest);
}///
/// 用于反序列化参数并发送 Command.
///
/// 命令.
publicclassExecuteTaskHandler:IHangfireTaskHandler
whereTCommand:HangfireRequest,IRequest
{
privatereadonlyIMediator _mediator;

///
/// Initializes a new instance of the class.
///
///


publicExecuteTaskHandler(IMediator mediator)
{
_mediator = mediator;
}

privatestaticreadonlyJsonSerializerOptionsJsonSerializerOptions=newJsonSerializerOptions
{
AllowTrailingCommas=true
PropertyNameCaseInsensitive=true
PropertyNamingPolicy=JsonNamingPolicy.CamelCase,
ReadCommentHandling=JsonCommentHandling.Skip
};

///
publicasyncTaskHandler(TaskRequest taskRequest)
{
var command =JsonSerializer.Deserialize(taskRequest.CommandBody,JsonSerializerOptions)!;
if(command ==)
{
thrownewException("解析命令参数失败");
}

// 处理命令的逻辑
var response =await _mediator.Send(command);
return response;
}
}第三步实现定时任务execute触发接口,然后将参数转发到 ExecuteTaskHandler 中,这里通过依赖注入的方式屏蔽和解决强类型的问题。///
/// 定时任务触发入口.
///
[ApiController]
[Route("/hangfire")]
publicclassExecuteController:ControllerBase
{
privatereadonlyIServiceProvider _serviceProvider;
privatereadonlyHangireTypeFactory _hangireTypeFactory;

publicExecuteController(IServiceProvider serviceProvider,HangireTypeFactory hangireTypeFactory)
{
_serviceProvider = serviceProvider;
_hangireTypeFactory = hangireTypeFactory;
}

[HttpPost("execute")]
publicasyncTaskExecuteTask([FromBody]TaskRequest request)
{
var commandType = _hangireTypeFactory.Get(request.CommandType);

// 找不到该事件类型,取消后续事件执行
if(commandType ==)
{
returnnewExecteTasResult
{
CancelTask=true
};
}

var commandTypeHandler =typeof(ExecuteTaskHandler).MakeGenericType(commandType);

var handler = _serviceProvider.GetService(commandTypeHandler)asIHangfireTaskHandler;
if(handler ==)
{

{
CancelTask=true
};
}

returnawait handler.Handler(request);
}
}第四步封装好代码后,开始最后一个环境,配置和注册服务,由于笔者使用Maomi.Core框架,因此服务注册配置和扫描程序集变得非常简单,只需要通过Maomi.Core框架提供的接口即可最简单地实现功能。public classApiModule:Maomi.ModuleCore,IModule
{
privatereadonlyHangireTypeFactory _hangireTypeFactory;

publicApiModule
{
_hangireTypeFactory =newHangireTypeFactory;
}

publicoverridevoidConfigureServices(ServiceContext context)
{
context.Services.AddTransient(typeof(ExecuteTaskHandler));
context.Services.AddSingleton(_hangireTypeFactory);
context.Services.AddHttpClient;
context.Services.AddMediatR(o =>
{
o.RegisterServicesFromAssemblies(context.Modules.Select(x => x.Assembly).ToArray);
});
}

publicoverridevoidTypeFilter(Type type)
{
if(!type.IsClass|| type.IsAbstract)
{
return;
}

if(type.IsAssignableTo(typeof(HangfireRequest)))
{
_hangireTypeFactory.Add(type);
}
}
}1744942983410第五步

开发者可以这样写定时任务 Command 以及执行器,然后通过接口触发定时任务。

public classMyTestRequest:HangfireRequest,IRequest
{
}

///
/// 要被定时任务执行的代码.
///
publicclassMyTestHandler:IRequestHandler
{
privatestaticvolatileint_count;
privatestaticDateTimeOffset _lastTime;

publicasyncTaskHandle(MyTestRequest request,CancellationToken cancellationToken)
{
_count++;
if(_lastTime ==default)
{
_lastTime =DateTimeOffset.Now;
}

Console.WriteLine($"""
执行时间:{DateTimeOffset.Now.ToString("HH:mm:ss.ffff")}
执行频率(每 10s):{(_count / (DateTimeOffset.Now - _lastTime).TotalSeconds * 10)}
""");

returnnewExecteTasResult
{
CancelTask=false
};
}
}[ApiController]
[Route("/test")]
publicclassSendTaskController:ControllerBase
{
privatereadonlySendHangfireService _hangfireService;

publicSendTaskController(SendHangfireService hangfireService)
{
_hangfireService = hangfireService;
}

[HttpGet("aaa")]
publicasyncTaskstring>SendAsync
{
await _hangfireService.Send(newMyTestRequest
{
CreateTime=DateTimeOffset.Now,
CronExpression="* * * * * *"
TaskId=Guid.NewGuid.ToString,
});

return"aaa";
}
}

启动项目测试代码,记录执行频率和时间间隔。

image-20250418103509714动画本期封面模特:咕咕

来源:opendotnet

相关推荐