第三方数据延迟取不了现 如何做好分布式任务调度——Scheduler 的一些探索
作者 | 张宇轩,章逸,曾丹
PART 01
初识
1、找准定位:分布式任务调度平台
无论是互联网应用或者企业级应用,都充斥着大量的任务。我们常常需要一些任务调度系统帮助我们解决问题。随着微服务化架构的逐步演进,单体架构逐渐演变为分布式、微服务架构。在此的背景下,很多原先的单点式任务调度平台已经不能满足业务系统的需求。于是出现了一些基于分布式的任务调度平台。
2、摸清脉络:的结构和核心模块
名词解释:
通过架构图可以发现,主要有以下三个部分:
因此,我们可以用一句话解释清楚 所做的事情,即:在「指定时间」「通知执行器」以「指定方式」执行任务
这句话中包含了三个关键点,也分别代表着 的三个核心模块:
在一个Job的调度周期中,各个模块各司其职,整个流程如下:
拥有这三个核心模块后, 已具备了成熟的任务调度功能。另外,为了增加 的稳定性,有额外两个模块为其保驾护航:
本篇文章不对 所支持的定时任务能力作赘述,而是从三个方面(易用性、多功能性、稳定性)介绍 对于分布式任务调度的思考和探索:
PART 02
换位思考-快速接入
1、背景:效率至上,时间是金
以字节跳动内部为例,当前团队想要实现一个定时任务有多种方式:接入字节云的 平台、自己实现一套定时任务框架或者接入第三方定时任务框架。
对于第一种接入 平台,每一种定时任务都需要注册各自的 psm 和运行时环境(镜像),当任务需要访问依赖资源如 redis/db 等时,需要各自添加授权。任务代码逻辑有变化时也需要各自升级,导致开发、管理起来较为复杂。
对于第二种自己实现一套定时任务框架,不仅整体开发时间较长,且需要大量时间进行测试回归来保证框架的稳定性。如果项目内使用到的定时任务较多,那么自身研发一套框架用途也较广泛;若项目中使用到的定时任务较少,则 ROI 较低,很多时候也只是为了造轮子而造轮子。
因此,大多数项目面对增加定时任务的需求时,都会寻求直接接入第三方成熟的定时任务框架。对于他们来说,是否易于接入、与现有代码联系是否紧密、调试是否方便是很重要的选取指标。
基于这种背景, 在设计时就站在了接入方的角度,思考了如何让接入方能够在最短时间内以最低成本接入 ,实现自己的定时任务。
2、分析:站在用户角度想问题
站在接入方角度,对定时任务框架进行选型时最关注的几个点无非是定时任务执行准确性、最高支持 qps、定时设置多样性、接入成本这几个。对于前两个指标, 目前接入业务方 50+,日均调度任务 20w+ 次,与公司内其他第三方定时任务框架相比也较有竞争性,同时对于后两个关注点, 也有自己的风格。
3、丰富的调度设置
一般的定时任务框架只支持 表达式,例如0 1 * * *,代表每天凌晨一点执行一次。 功能强大,但是若配置复杂的定时策略,有一定学习成本,且可读性不高。因此,鉴于这种情况, 在 之上设计了更易读更强大的定时策略,做到所见即所得。
{
"startTime":1648029600000,
"timeZone":"Asia/Shanghai",
"repeatLevel":"month",
"repeatInterval":2,
"repeatDays":[3,5,23]
}
因此,该设置所代表的定时间隔为:每两月的3、5、23号触发一次
4、为什么要做工作日调度
可能有同学注意到, 对于重复级别的支持十分丰富,不仅可以按照普通的年、月、日等级别进行设置,还可以按照工作日进行重复调度(例如每两个工作日执行一次),这归因在 孵化于字节跳动内部企业服务系统,为诸如人事系统、权限系统等 ToB 服务提供定时任务能力。往往 ToB 客户的需求复杂多变,因此,需要提前具备更多能力,才能更好地服务好 ToB 客户。
在调研接入方需求时,得到了有些客户对于定时提醒这类任务的需求是尽量不要在「非工作日」打扰。于是, 决定增加工作日调度选项来适配客户潜在需求,也侧面说明了 为了让接入方更快更小成本接入做出的努力。
5、轻松的使用方式
相信「开箱即用」对于人们在采买诸如家电、数码产品时,是十分重要的一个考核指标。而对于对外提供的服务 or 框架,亦是如此。 的目标就是让接入方能够在短时间熟悉 、编写测试代码以及上线定时任务。
6、专注业务
如果想实现一个定时任务,接入方只需要三步:引入 sdk,绑定相应 ,在 接口中实现具体业务逻辑。同时,由于定时任务的实现位于原代码中,启动配置无需更改,本地测试也较为便捷。同时,在字节跳动环境下,无需新增 psm、授权配置等,尽可能做到了「开箱即用」。
import (
"context"
"code.byted.org/apaas/scheduler_sdk/executor" // 引入 sdk
)
func main() {
executorSvc, err := executor.NewExecutor(executor.NewDefaultExecutorConfig(),
&HelloWorld{}) // 绑定 processor
if err != nil {
panic(err)
}
if err = executorSvc.Run(); err != nil {
panic(err)
}
}
type HelloWorld struct {
ProcessorApiName string
}
func (h *HelloWorld) GetApiName() string {
return h.ProcessorApiName
}
// process 中实现具体业务逻辑
func (h *HelloWorld) SimpleProcess(ctx context.Context, tc *executor.TaskContext) (err error) {
tc.LogInfo(ctx, "hello world")
return
}
左右滑动查看完整代码
7、快速运维
没有程序员想主动写出 bug,但问题总是会突然出现。如何在出现问题时快速运维、快速止损,是所有工程师都追求的目标。 在这方面做了几种尝试:
(1)报警更直观
用户可以在创建 job 时,可以选择配置报警机器人,并把 机器人拉入对应报警群组。当检测到对应 job 出现问题时, 机器人会把相应报警推送到对应群组,做到实时响应。
(2)状态更清晰
目前 task 的相关状态如下,当一个 task 长期没有到终态时,根据状态码即可知 task 目前处于什么状态,从而推断是哪一步骤出了问题。
并且一些 常见的报错也做了封装,帮助快速定位问题,例如
PART 03
并肩作战-分片任务
1、背景:任务越多,挑战越大
一个成熟的项目中避免不了大型批量任务,比如通过 Excel、csv 或其他数据源批量创建或更新数据,批量任务一般数据量很大,如果按照单实例串行执行,那么不能充分利用计算机资源且一次运行会消费大量时间,用户体验不友好。
以 举例, 旧阶段的批量任务依赖于消息队列、Redis 实现,总体分为三大部分:
(1)解析并校验Excel,将数据解析成一条条数据,将每条消息封装成一条消息发送至消息队列;
(2)消费消息队列,进行创建、更新等操作,并在Redis记录总体进度并推送给用户,如果任务失败,会将行数和错误原因同时记录进redis;
(3)待所有数据处理结束后,如果redis中没有错误数据,则提示用户成功,否则根据错误信息生成Excel返回给用户。
使用消息队列、redis的定时任务可以提速和优化用户体验,但有以下不足:
基于这种背景, 丰富了原本的任务调度能力,补充了分片能力,以满足复杂繁琐的任务分片处理的需求。
2、分析:旧问题,新解法
若打算做出一套贴合业务需求的分片任务框架,需要先了解现阶段的分片任务的实现步骤。
现阶段的分片任务大致可以抽象成3个步骤:
要做的事情则是替换其中分片、消息队列、Redis 的功能,做出以下抽象:
type ShardingProcessor interface {
PreProcess(ctx context.Context, tc *TaskContext) error
ShardingProcess(ctx context.Context, tc *TaskContext) error
Notify(ctx context.Context, tc *TaskContext)error
PostProcess(ctx context.Context, tc *TaskContext) error
}
执行器需要实现接口以供调度器进行调度。调度过程如下:
支持分片任务重点在于丰富调度模型,提升调度器调度能力,完善执行器执行能力来达到支持分片任务的目的
3、调度侧能力
(1)分批调度的能力
调度侧需要根据任务进度依次生成 、、、 来调度执行器 中的 - - - 四个方法。单机调度 ,,,并行调度 ,总体调度呈现总-分-总的形式。调度过程如下:
(2)数据拆分的能力
数据拆分即任务分片,指的是将单一任务按照特定的逻辑切分为多个独立的子任务,将其分派到不同的节点执行,以提高任务的执行效率。
而 要处理的任务内部可能存在依赖关系(比如 业务中 批量创建的需求,由于存在 和 字段等,记录创建之间存在拓扑关系),所以在执行时需要优先级的概念,而不能被简单拆分为独立的子任务。
为了支持带优先级的任务分片, 接收的分片任务的数据特点如下:
是业务方自定义任务的二维数组;
第一个维度是任务执行的优先级,位于同一优先级下的任务并发执行,位于不同优先级的任务按优先级串行执行;
第二个维度是同一优先级下的自定义任务列表。关于自定义任务的结构, 不感知。业务方可以选择存储任务详情或是主键信息,并自定义处理逻辑,而 只做分片和调度工作。
二维数组可以是下面这样:
[
// 第一优先级的任务1,可以是主键
101,
// 第一优先级的任务2,可以是SQL语句
"Insert into tablename xxx",
// 第一优先级的任务3,可以是结构体等等...
{
"ID": 999,
"Name": "zhangsan"
}
],
[
// 第二优先级的任务1、2、...
102,
103,
...
]
]
了解了待分片任务的结构,我们来讨论如何对任务进行分片。比如,分片的数量由什么决定,单个分片上的信息是如何分配的,不同分片又是不同分派到不同的处理器上的...
分片数的确定
分片数的确定基于以下参数的值:数据量、任务创建时用户指定的单片最大数量、单片最小数量,以及实际可用的执行器数量。
分片算法
分片特征值( key)的选择要遵循的原则应该是基于最常用的访问方式。
由于 分片时并不关心业务数据的结构,所以选用数据数组的下标来作为分片特征值。
由于分片数量确定后,不涉及到由于分片的增加或减少对数据进行 的情况,所以无需考虑虚拟节点、一致性哈希等方式进行分片。
这里选用哈希分片的分片算法,原因是既可以均匀分布数据,实现起来也很简单。
分片的存储和派发
分片完成后,需要给每个分片创建一个 Task,并把分片的数据存储下来。
关于 Task 的派发,根据上面关于分片数的讨论,可以得到分片数和 数的关系:
为了让各个 Task / 各个分片 能够均匀派发给各个 ,也为了避免某个挂掉时,其他 不能均匀分摊挂掉的节点原先承担的分片,需要采用合理的分片策略。
在分片时,我们保证了各分片的数据是尽量均匀分布的,所以从分片到 的分派方式可以尽可能地简单,采用平均分配的策略即可。对于挂掉的节点所承担的分片,也采用同样的策略派发到存活的 上即可。
例如:
平均分配*:对于不能均分的情况,为了避免靠前的 总是承担更多的压力,可以根据待分配分片数量的奇偶来决定是升序分派还是降序分派。
4、进度通知的能力
总任务进度发生变化,则生成 发送至 。
5、执行侧分批执行的能力
执行侧需要实现并注册 SDK 提供的接口,来处理由调度侧发来的多种类型的Task。
(1)
预处理方法,可以进行但不限于以下的操作:
如果不需要预处理,可直接在方法内 ,分片时数据使用启动时的 Data
func (s *ShardingProcessor) PreProcess(ctx context.Context, tc taskContext) error{
oldData := tc.GetData()
// 用户业务, 数据处理
newData := Transform(oldData)
// 返回带拓扑排序的数据
tc.SetResult(newData)
return nil
}
左右滑动查看完整代码
(2)
分片处理函数,主要是进行数据更新、创建操作。 的入参是切分后的数组([]{})。 需要对参数进行两部分额外处理:
func (s *ShardingProcessor) ShardingProcess(ctx context.Context, tc taskContext) error{
taskData := tc.GetData()
for _, data := range taskData{
// 用户业务, 数据处理
}
tc.SetResult(...)
return
}
左右滑动查看完整代码
(3)
接受所有分片处理结果,进行后续处理,如生成错误文件。
func (s *ShardingProcessor) PostProcess(ctx context.Context, tc taskContext) error{
taskData := tc.GetData()
for _, result := range taskData{
// do something
}
// 用户业务
tc.SetResult(...)
}
左右滑动查看完整代码
(4)
提供给子任务上报的能力, 会根据所有子任务上报结果计算进度,通知 ,通知粒度为数据条数。如果接入方不主动上报子任务进度, 会根据子任务完成度进行通知,通知粒度为分片粒度。
6、分片任务流程
PART 04
削峰填谷-流量控制
1、背景:提供能力,而非施加压力
在 设计初期时,更多的是把注意力放在了如何能够快速、准确、低延迟的触发任务,为此还多次优化了触发器、分派器、派遣器三大模块的轮询逻辑,但是忽略了任务量过大时下游能否抗住流量的问题。
如果 在调度时无法准确感知下游压力,那么很容易将下游打挂,如:在定时任务首次上线时,因为 的装包机制导致数千个应用下配置了同样的定时任务,虽然一个包内的数十个定时任务触发时间分散,但是应用包之间的同一个任务触发时间相同,导致下游需要在同一时刻处理数千个任务,再加上任务的处理流程还会通过消息中间件进行扩散,导致数据库在任务执行阶段一直处理低IDLE 阶段。
2、分析:流量追踪,剥茧抽丝
目前大部分后端服务,通过分析任务的流量走向,可以大致确认每一条任务在执行过程中不论扩散还是非扩散流量都会走向DB,流量图大致如图。
任务的流量最终打到了 DB,所以流量控制的目标就更加清晰:对 DB 的流量控制。
需要对 DB 进行流量控制,那么就要设定合理的指标,理论上,只要指标采纳的足够合理,就能严格、准确的控制流量,指标则需要具备以下条件:
只需要实时监听着 DB 的指标,来判断任务是立刻执行,还是延迟执行就能有效的保护 DB。
3、指标选择
消费 的监控点,关于数据库的打点信息非常全面,能够非常轻易的获取到数据库宿主机的CPU、内存或数据库本身的连接数、查询数等指标,这些指标的权威性毋庸置疑,但是 通过将指标收集到本地代理,代理每 30s 做一次聚合发送至服务端,其时效性太差。
数据库不可用因素为:大量任务触发 -> DB 访问流量增高 -> CPU idle 降低 -> 数据库不可用。造成 CPU idle 降低的因素为 DB 流量增高,可以将 DB 的流量作为指标进行流量控制,缺点是需要自己采集指标。
4、指标收集
(1)指标范围
反映 DB 压力较为直接的指标是 cpu idle,但考虑到服务部署往往多实例以及 cpu idle 采集难度大的情况,以近似指标来代替。另一方面,通过历史数据分析,DB 流量与 cpu idle 有一定的关联,因此以 DB 流量作为 DB 压力指标。
5、数据存储
参考限流的实现方案,采用单独的 Redis 存储流量数据,以 1s 为时间窗口作为 Redis key,每个时间窗口的流量作为 Redis value,每次发生 DB 操作时更新流量数据。系统中存在多个 DB,每个 DB 单独统计,在 Redis key 中加入db信息。Redis key 设置10s过期时间,查询时根据过去3个窗口的加权平均(80%/15%/5%)作为当前流量,以处理窗口交界处的突发流量。
6、收集方式
目前 DB 流量已有 监控数据,但由于 会在本地聚合 30s 数据后上报,至少会有 30s的延迟。而造成 DB 压力大的定时任务多为短期集中触发,使用 数据会有感知不及时的问题,因此需要额外收集数据。参考 DB 数据采集的方式,通过 Gorm 的 机制插入具体的采集逻辑,减少对业务代码的侵入。
func SetMonitorCallBack(db *gorm.DB) {
db.Callback().Create().Before("gorm:before_create").Register("metric:before_create", beforeCreateCallback)
db.Callback().Delete().Before("gorm:before_delete").Register("metric:before_delete", beforeDeleteCallback)
...
}
func beforeCreateCallback(scope *gorm.Scope) {
beforeCallback(scope, "create")
}
func beforeCallback(scope *gorm.Scope, method string) {
...
}
左右滑动查看完整代码
在采集逻辑上,需要考虑以下几个问题:
性能: 中需要尽量减少延迟,优先使用异步的方式上报数据。使用 充当队列, 中将数据写入 ,当 容量满时丢弃数据,防止阻塞。另有异步协程从 中取数据上报。兼顾时效性和网络开销,在上报前预先在本地以100ms窗口做聚合。
type MetricType int8
const (
QueryCount MetricType = 1
)
type DBMetric struct {
DBName string
DBMethod string
Type MetricType
Timestamp int64
Value interface{}
}
// callback中将metric数据写入channel
func beforeCallback(scope *gorm.Scope, method string) {
dbName := getStringValueFromCtx(scope.DB().Ctx, CtxVariableDBName)
curMs := time.Now().UnixNano()/int64(time.Millisecond)
metric := DBMetric{dbName, method, QueryCount, curMs, 1}
select {
case ch <- metric:
default:
// channel is full, ignore this metric
}
}
// 异步上报,在resource_sdk的Init()中根据配置判断是否启动此协程
func metricAgent() {
windowSize := 100
// window time -> (metric key(dbName|dbMethod|type) -> metric)
aggrMetrics := map[int64]map[string]DBMetric
timer := time.NewTicker(windowSize)
defer func() {
timer.Stop()
}()
for {
select {
case msg := <-ch:
curWindow := curMs/windowSize
更新aggrMetrics中curWindow对应的metric(对queryCount来说是加1)
case <-timer.C:
将aggrMetrics中key+windowSize<=curTime的数据上报并清除
}
}
}
左右查看完整代码
运维成本:采集逻辑会运行在各个服务上,考虑到后续会收集更多的指标,直接上报 Redis 需要给各个服务开通读写权限,运维管理成本较高。基于此,使用额外的服务来管理指标数据,接收上报的指标数据存入 Redis,并通过接口的方式提供查询服务。指标存放在更加聚焦在DB资源的 服务中,在 服务中通过增加接口的方式实现指标数据的管理功能,同时,为了不影响 原有业务的稳定性,使用单独的集群提供服务。
1、流量阈值限制
调度速率与 DB 负载之间的关系较为复杂,本期采用简单的阈值反馈机制,设置 DB 流量阈值,当流量超出阈值时,停止 当前周期调度。根据历史数据,设置阈值为5K。
当流量未超出阈值时,不能预估任务对 DB 流量的影响,采用简单策略对任务数进行限制:
任务数 = max((DB流量阈值 - DB当前流量)* 100 / DB 流量阈值, 0)
2、DB路由
目前 的 DB 资源根据租户进行分配,不同租户的数据和流量会落在不同的 DB 上。会记录 Job 所处租户,所以在调度时,需要根据租户查找真实的 DB 资源,通过 DB 指标的健康状况来决定是否派遣任务:
3、调度控制流程