摘要:很多时候我们在构建一个系统的时候,有很多需要请求外部资源、外部接口的需求,大概分一分有下面几种形式:
很多时候我们在构建一个系统的时候,有很多需要请求外部资源、外部接口的需求,大概分一分有下面几种形式:
1)定时任务,比如定时数据同步。为什么不用crontab呢,因为Cron的时间精度是1分钟,如果你需要30秒就执行一次的话,就抓瞎了。
2)延迟任务,比如有时候需要定义半小时后发通知什么的,同理用crontab精度不够,写个死循环的话,那就需要自己写逻辑来实现了。
3)立即执行的任务,这个就是对外发送个http请求嘛,为啥还要搞个莫名堂的管理系统?
在执行对外发送http请求,也就是说其实系统是产生了对外的依赖和耦合,作业管理系统(Task Manager)就是用解耦对外依赖而产生的。
如果将作业管理耦合到系统中,那么就有下面的问题需要考虑:
1)错误逻辑的处理,因为外部系统并不是一直有效,那么如果出错需要怎么处理,如果是偶发性的错误怎么办,如果是整个服务都无法访问应该如何处理?是重试还是直接报错?重试多少次?实在无法访问了,出错的任务怎么保留?怎么处理私信箱?
2)异步任务的管理,因为不能堵塞正常的业务逻辑,那么就必须异步处理,那如果就简单 async-await放去后台跑的话,那么你在重启服务的时候就很危险了,你怎么知道现在有没有正在执行的任务,一旦重启这些任务中断了,能不能恢复?在这个基础上又延伸出了
2.1)任务的持久化
2.2)任务的暂停和恢复,包括重启服务后在持久化队列中任务的恢复
2.3)任务的并发能力,比如某些比较慢的服务容易堆积,需要多几个线程来跑,某些服务又有并发限制,只能一个线程去执行。
如果考虑到以上的问题,那么在自己的业务中加入这些逻辑就会变得很复杂了,在Python中有类似Celery的库来实现,但是还是有很多东西要自己来实现。于是一个简单轻量化的作业管理系统,独立进程部署,这样不论是用什么语言来写业务逻辑,都可以很容易集成进去,就很有必要了。
poulpe,法语中就是八爪鱼,用于把各个系统粘接起来,和谁都有一腿。该系统用于简化线上系统的任务调度开发,支持秒级粒度的定时任务触发,支持即时任务调度以及延迟任务调度。现阶段支持触发HTTP请求,服务端本地程序调用,邮件发送等任务类型。任务执行在多个预启动的工作线程中,支持对任务的并发能力进行控制。如此这般,开发人员可以将精力集中于业务,而不需要关心多线程,多进程,并发,线程隔离,进程调用,错误重试,任务持久化,重启任务的恢复等技术细节。采用Rust开发,发布的基于x86架构的Musl编译版本,只有一个可执行文件,开箱即用,Musl编译使得程序没有任何多余的依赖,可以在Apline等最小化的Docker镜像中执行,部署方便快捷。
Git地址在:GitHub - ipconfiger/poulpe: Light weight easy to use task manager written by Rust
支持的作业模式有:
1)定时任务(秒级粒度)
2)延迟触发任务
3)即时触发任务
作业执行的类型支持:
1)HTTP(GET|POST)
2)邮件
3)服务端本地脚本
只有一个可执行文件就可以完成部署,启动的方法如下:
./poulpe \
--port 8000 #运行的端口
--redis redis://127.0.0.1 #用于持久化作业的redis连接
--cron /etc/cron_task #定义定时任务的文件地址
--dead /tmp/deadpool #死信箱目录,用于存储死任务
--workers 4 #启动工作线程的数量
--retry_interval 10 #错误重试间隔时间
--max_retry 3 #最多重试的次数
--smtp_server xxx.xxx.com #SMTP服务器地址
--smtp_port 23 #SMTP服务器端口
--smtp_name alex #SMTP账号
--smtp_pwd ***** #SMTP密码
--starttls #是否启用starttls
--kafka_servers #Kafka的Broker列表,支持接入群集
--kafka_topic #接收任务的Topic
--kafka_resp_topic #返回任务执行结果的Topic
实时触发和延迟触发均支持 HTTP、 Kafka、WebSockets等 多通道接入,默认只开启HTTP和WebSockets,在设置好正确的kafka前缀的参数后,即可从Kafka接收任务
HTTP请求的JSON Body和Kafka的消息题的格式是一致的,都是JSON格式,定义如下:
{
"method": "作业执行的类型 GET|POST|EXEC|MAIL_TO",
"delay": 0, # 延迟执行的秒数,0为即时触发
"name": "命令名", # GET|POST的时候是请求地址,EXEC为命令名,MAIL_TO为邮件名
"params": "参数", # GET时为QueryString,POST时是JSON字符串,EXEC时为空格隔开的参数,MAIL_TO的时候为专门定义的JSON字符串
"cc": "1 3", # 空格隔开的指定线程编号,这里指定两个线程,表示最多可以并行执行2个任务,留空表示不限制
"wait": 0 # 可选,大于0表示需要在提交任务后等待执行的结果,数值为等待超时时间
}
系统提供long pulling获取执行结果的接口,以提供给需要更长执行时间的任务用于获取执行结果:比如压缩视频
获取系统当前状态(Get System Status)
通过一个GET请求获取当前运行状态,包括正在执行的任务id列表,等待执行的延迟任务id列表,执行报错任务id列表,以及各个工作线程的执行队列长度信息
来源:视未来