一种基于etcd实践节点自动故障转移的思路

摘要:etcd作为基于raft强一致性协议实现的分布式存储, CP模型,天生对外输出协调和共识能力, 能确保不同客户端在同一时间读到的内容相同。在我们这个故障转移的场景,能稳定的输出唯一的leader。

自动故障转移是服务高可用的一种实现方式。mongodb,redis哨兵集群、 etcd都具备某种程度的故障转移能力。

今天记录利用etcd选举sdk实践 服务自动故障转移

服务以leader、follower多节点启动,日常leader接受所有业务流量,follower作为备用实例,不接受业务流量;

监测到leader宕机,follower节点自动提升为leader并接管业务流量。

既然是故障转移, 故所有节点的状态会发生变化, 这是一个状态机模型。

1>. 各节点向etcd注册节点标记, 并各自维持稳定的心跳保活;

2>. 参与选举

2>. 算法选出leader

4>. 迅速感知选举结果和换届

etcd作为基于raft强一致性协议实现的分布式存储, CP模型,天生对外输出协调和共识能力, 能确保不同客户端在同一时间读到的内容相同。在我们这个故障转移的场景,能稳定的输出唯一的leader。

etcd的客户端concurrent包提供了依赖于etcd并发操作的上层行为, 比如:分布式锁、选举、屏障。

选举能力由concurrent 包中的Election中提供。

下面是一个故障转移客户端

typeClient struct {
addr string
Leader string // 每个服务节点都知道集群中谁是leader
cli *clientv3.Client
val string // 注册到etcd的值, 标记节点
election *concurrency.Election
electsessionDoneCh <-chan struct{} // 换届的信号
IsLeader bool //标示当前节点是否是leader
}
2.1 节点初始化,维持心跳保活

每个节点需要维持稳定的心跳保活, 以便参选和换届。

session, err := concurrency.NewSession(cli, concurrency.WithTTL(10)) // 使用etcd的租约机制来实现心跳保活
iferr != nil {
returnnil, err
}
ele := concurrency.NewElection(session, LeaderkeyPfx)

`NewSession`[1]实现保活会话。

对应到原始的etcdctl是利用租约:

etcd 有租约操作,租约可以绑定到键值对,实现键值对的存活周期控制;甚至租约可以不绑定到键值对,仅做心跳保活(有刷新租约的机制)

etcdctl lease grent 30

etcdctl lease keep-alive 41ce93a9f806a53b

2.2 参选

向etcd注册节点标识,这里会将以上保活会话绑定到键值对,

注意:没有选上的节点会阻塞等待,选上的节点快速返回执行业务逻辑。

func (c *Client) Election(ctx context.Context, id string) bool {
c.Leader = c.leader
err := c.election.Campaign(ctx, id)
iferr != nil {
log.WithError(err).WithField("id", id).Error("Campaign error")
returnfalse
}
c.IsLeader =true
returntrue
}
对应到原始的etcdctl操作:etcdctl put ‐‐lease=41ce93a9f806a53b /merc/leader/41ce93a9f806a53b 127.0.0.1:8686注意:key= /merc/leader/41ce93a9f806a53b, value= 127.0.0.1:8686, 租约是41ce93a9f806a53b(持续保活的租约)2.3 选举算法

根据当前存活的、最早创建的节点信息键值对来决定leader , 核心API是Leader接口

// Leader returns the leader value forthe current election.
func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) {
client := e.session.Client
resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate...)
iferr != nil {
returnnil, err
}elseiflen(resp.Kvs) == 0 {
// no leader currently elected
returnnil, ErrElectionNoLeader
}
returnresp, nil
}

func (c *Client) leader string {
ctx, cancel := context.WithTimeout(context.Background, time.Second*3)
defer cancel

resp, err := c.election.Leader(ctx)
iferr != nil {
return""
}
returnstring(resp.Kvs[0].Value)
}

对应到原始的etcdctl操作是:etcdctl --endpoints=127.0.0.1:2379 get /merc/leader --prefix --sort-by=CREATE --order=ASCEND --limit=1

--prefix 这里我们指定--prefix /merc/leader筛选key

--sort-by :以x标准(创建时间)检索数据

-- order :以升降序对已检出的数据排序

-- limit:从已检出的数据中取x条数据显示

2.4 监控选举结果和换届

通过watch机制通知节点业务代码leader变更,核心是`Observe` API[3]

func (c *Client) Watchloop(id string, notify chan<- bool) error {
ch := c.election.Observe(context.TODO) // 信道传递最新的leader节点, 但是如果底层watcher被一其他方式中段或者超时, 信道会被关闭
tick := time.NewTicker(time.Minute*5) // 5min去问下,防止假死
defer tick.Stop
for{
select {
case<-c.electSessionDoneCh: // Done returns a channel that closes when the lease is orphaned, expires, or is otherwise no longer being refreshed.
log.Warning("Recv session event")
returnfmt.Errorf("session Done") // 意味心跳保活失败
caselatestLeaderResp, ok := <-ch: // 注意, 从closed(chan) 会持续读到零值, 造成死循环。
if!ok {
log.WithField("topic""watch-loop").Warn("channel closed, something underlying cause error.")
ch = c.election.Observe(context.TODO)
}else{
log.WithField("topic""watch-loop").Info(latestLeaderResp)
}
case<-tick.C:
}
ctx, cancel := context.WithTimeout(context.Background, time.Second*3)
defer cancel
resp, err := c.election.Leader(ctx)
var isLeader bool
iferr != nil {
iferr == concurrency.ErrElectionNoLeader {

defer cancel
isLeader = c.Election(ctx, id)
}else{
log.WithError(err).Errorf("watchLoop get leader error")
isLeader =false
}
}elseifstring(resp.Kvs[0].Value) != id { //收到leader变化消息,判断发现自己不是leader,停止工作
c.Leader = string(resp.Kvs[0].Value)
isLeader =false
}else{

isLeader =true
}
ifisLeader != c.IsLeader {
log.WithField("after", isLeader).WithField("before", c.IsLeader).WithField("leader", c.Leader).Info("reElect")
notify <- isLeader
c.IsLeader = isLeader
}
}
}
etcdctl watch --preifx=/merc/leader --start-rev=12345 --prefix监听指定前缀key在全局12345修订版之后的键值对。2.5 自动故障转移是节点的基础服务

自动故障转移,不是业务代码, 故需要在后台持续运行, 我们开两个goroutine去执行选举和监控换届的逻辑。

讲道理,每个节点只需要知道两个信息就能各司其职

谁是leader ==> 当前节点是什么角色===> 当前节点该做什么事情

感知集群leader变更的能力 ===>当前节点现在要不要改变行为

go func {
err := eCli.Watchloop(Id, notify) //后台监测与etcd的连通性以及leader节点的变化
log.WithError(err).Error("watchLoop error")
notify false
}

gofunc {
ifeCli.Election(context.TODO, Id) {
notify true
}
}

业务逻辑的承载有赖于 notify信道的传递。

如果不考虑依赖的CP模型,我们甚至可以使用 mysql,redis做选举

3.1 etcd强烈推荐使用层次化的键空间

与redis类似,虽然可以插入hello:world到键值对存储, 但在编程实践都都推荐使用命名空间的做法来避免键值冲突, redis推荐使用 shopping:users:u1200;

etcd v3[4]从逻辑上也是一个扁平的二进制键空间, 推荐使用前缀字符串来做命名空间。

这里我提出一个疑问?

Q: 为什么相比redis,etcd将键前缀看的如此重要,单独提供了查询配置选项?

A:etcd用于是为分布式系统的配置管理、服务发现和协调而设计的,它提供了强一致性和高可用性。它允许以层次化的方式组织数据, 这种层次化的特性比redis缓存要求的命名空间更强烈。

election.Campaign(val) 的实质是将K:V(节点id)添加到etcd, 并给予持续保活.

内部实现上, 续约值成为了注册键的一部分, 参与竞选的节点都注册到/merc/leader 前缀下。 func (e *Election) Campaign(ctx context.Context, val string) error {
s := e.session
client := e.session.Client

k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease)
txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k),"=", 0))
txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease)))
txn = txn.Else(v3.OpGet(k))
resp, err := txn.Commit
iferr != nil {
returnerr
}
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
if!resp.Succeeded {
kv := resp.Responses[0].GetResponseRange.Kvs[0]
e.leaderRev = kv.CreateRevision
ifstring(kv.Value) != val {
iferr = e.Proclaim(ctx, val); err != nil {
e.Resign(ctx)
returnerr
}
}
}
3.2 etcd全局修订版本号在选举算法中的应用?

当选:当前存活的、最早创建的key是leader , 也就是说master/slave故障转移并不是随机的,下一个当上leader的是次早创建的节点。

client.Get(ctx, e.keyPrefix, v3.WithFirstCreate...)

当前存活的,最早创建的节点: 如何定义最早创建的节点?

应该是利用找到的存活节点中 revision最小的那一个key。

什么是revision修订版?

etcd 提供了对不常变更的数据的一致性查询和变更监听能力, 同时包含对于变更的快照查询和历史版本查询, etcd在全局键空间有一个revision修订版。键空间任意一个变更,该修订版都会单调递增。

etcd的客户端交互有赖于grpc请求, 我们看了通过发起的grpc请求来验证此次使用了修订版机制。

etcd的客户端交互API氛围三大类:KV、Watch、Lease, KV 操作都收敛到do函数内枚举发起grpc请求:

=> client.Get(ctx, e.keyPrefix, v3.WithFirstCreate...)

==> r, err := kv.Do(ctx, OpGet(key, opts...))

====> op.toRangeRequest 产生了https://etcd.io/docs/v3.5/learning/api/ 文档规定的grpc参数


func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
var err error
switch op.t {
casetRange:
ifop.IsSortOptionValid {
var resp *pb.RangeResponse
resp, err = kv.remote.Range(ctx, op.toRangeRequest, kv.callOpts...)
iferr == nil {
returnOpResponse{get: (*GetResponse)(resp)}, nil
}
}else{
err = rpctypes.ErrInvalidSortOption
}
casetPut:
其中:v3.WithFirstCreate构造了OpOption { return withTop(SortByCreateRevision, SortAscend) }grpc range请求参数, 此处可验证找到一组key之后, 按照revision升序排列,第一个key为leader。3.3 etcd的watch机制

在监控换届时,我们用到了etcd异步监听变更事件的watch API[5]

异步监听变更, 本身是一个长期运行的行为, 在etcd是使用grpc的双向流式通信来实现。这个留待读者自行去解码[6]

复盘

本文从一个自动故障转移的生产实践, 延伸到

状态机机制

etcd提供一致性共识的基础能力

选举需要实现的: 参选机制、当选算法、换届方式、

描述了使用编程来实现选举和换届这一民主生活实践。

落地到etcd式特色选举,提炼了etcd全局修订版机制在选举算法中的应用, grpc流式通信在etcd watch机制中的应用。

[1]

https://github.com/etcd-io/etcd/blob/55500416335e959e347d368c7f8a7a0229db3f6a/client/v3/concurrency/session.go#L38

[2]

https://github.com/etcd-io/etcd/blob/9fa35e53f429ca8f21b0d6b26f24e1848f2652a6/client/v3/concurrency/election.go#L69

[3]

https://github.com/etcd-io/etcd/blob/9fa35e53f429ca8f21b0d6b26f24e1848f2652a6/client/v3/concurrency/election.go#L173

[4]

etcd v3:

https://etcd.io/docs/v3.5/learning/data_model/#logical-view

[5]

watch API:

[6]

解码:

https://github.com/etcd-io/etcd/blob/9fa35e53f429ca8f21b0d6b26f24e1848f2652a6/client/v3/watch.go#L548

本篇文字和图片均为原创,读者可结合图片探索源码, 欢迎反馈 ~。。~。

来源:opendotnet

相关推荐