Zookeeper Watch机制
Zookeeper Watch机制
Watcher是一种简单的机制,使客户端得到关于ZooKeeper集合中的更改的通知。 客户端可以在读取特定znode时设置Watcher。Watcher会向注册的客户端发送任何znode(客户端注册表)更改的通知。
1. 概述
ZooKeeper Watch 机制是指,客户端在所有的读命令上告知服务端:这个节点或者子节点变化时通知我,具体来说,支持的写操作有:
- getData
- getChildren
- exists
例如,我们在命令行可以输入 get -w /foo,其中 -w 参数就是用于告知 ZooKeeper 服务端,当前客户端想在 /foo 节点上设置一个监听器。
ZooKeeper Watch 机制的两个细节:
- wactch 是一次性触发的(除了永久递归 watch),如果客户端如果在一个 watch 通知后继续收到相同节点的 watch 通知,那么必须再次注册 watch 一次;
- 服务端发给客户端的 watch 通知并不包含具体的节点数据,其起到的作用非常存粹:告知客户端其关注的节点发生了 watch 事件;
本篇博客在客户端角度,从底层出发,看一下Zookeeper Watch机制。开始之前,先思考一下以下疑问,带着这些问题进行Zookeeper客户端的学习。
- Zookeeper 客户端如何进行网络请求
- Zookeeper 如何处理同步和异步请求
- Zookeeper如何注册和触发Watcher
- 我们常用的ZkClient又做了什么?
2. 客户端网络IO模型
Copy From ZooKeeper客户端源码解读(网络I/O)
2.1 整体结构图
ClientCnxnSocket 封装了底层Socket通信层, ClientCnxnSocket整体结构如图所示:
2.2 Packet
Packet是ClientCnxn内部定义的一个对协议层的封装,作为ZooKeeper中请求与响应的载体。
从上图可以看出,Packet中包含了请求头、响应头、请求体、响应体、节点路径和注册的Watcher等信息。
2.3 SenderThread
2.3.1 基本概念
SendThread是客户端ClientCnxn内部一个核心的I/O调度线程,用于管理客户端和服务端之间的所有网络I/O操作。在ZooKeeper客户端的实际运行过程中
- SendThead维护了客户端和服务端之间的会话生命周期,其通过在一定的周期频率内向服务器发送一个PING包来实现心跳检测,同时,在会话周期内,如果客户端和服务端之间出现TCP连接断开的情况,那么就会自动而且透明化完成重连操作。
- 另一方面,SendThread管理了客户端所有的请求发送和响应接收操作,其将上层客户端API操作转换成相应的请求协议并发送到服务端,并完成对同步调用的返回和异步调用的回调。
- 同时,SendThread还负责将来自服务端的事件传递给EventThread去处理。
Sender进程就一直尝试与Zookeeper服务器进行交互:
//org.apache.zookeeper.ClientCnxn.SendThread
@Override
public void run() {
// ...
while (state.isAlive()) {
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
}
//...
}
// org.apache.zookeeper.ClientCnxnSocketNIO#doTransport
void doTransport(...) {
...
//监听Selector,对读和写进行操作
for (SelectionKey k : selected) {
...
if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
//doIO
doIO(pendingQueue, cnxn);
}
...
}
}
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sockKey.isReadable()) {
// 读操作
}
if (sockKey.isWritable()) {
//写操作
}
}
2.3.2 outgoingQueue和pendingQueue
/**
* These are the packets that have been sent and are waiting for a response.
*/
private final Queue<Packet> pendingQueue = new ArrayDeque<>();
/**
* These are the packets that need to be sent.
*/
private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
- ClientCnxn中,有两个比较核心的队列outgoingQueue和pendingQueue,分别代表客户端的请求发送队列和服务端响应的等待队列。
- outgoingQueue队列是一个请求发送队列,专门用于存储那些需要发送到服务端的Packet集合。
- pendingQueue队列是为了存储那些已经从客户端发送到服务端的,但是需要等待服务端响应的Packet集合。(实现同步异步请求的关键)
2.3.3 发送数据
在正常情况下(即客户端与服务端之间的TCP连接正常且会话有效的情况下):
- 用户通过各种接口发送请求,都会通过submitRequest方法,将请求封装为packet, 被保存到outgoingQueue队列方法
//org.apache.zookeeper.ClientCnxn#submitRequest
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
- SenderThread 从outgoingQueue队列中提取一个可发送的Packet对象,同时生成一个客户端请求序号XID,并将其设置到Packet请求头中去,然后将其进行序列化后进行发送。这里提到的获取一个可发送的Packet对象指的哪些Packet呢?在outgoingQueue队列中的Packet整体上是按照先进先出的顺序被处理的,但是如果检测到客户端与服务端之间正在进行SASL权限的话,那么那些不含请求头(requestHeader)的Packet(例如会话创建请求)是可以被发送的,其余的都无法发送。
- 请求发送完毕后,会立即将该Packet保存到pendingQueue队列中,以便等待服务端响应返回后进行相应的处理。
//// org.apache.zookeeper.ClientCnxnSocketNIO#doIO
if (sockKey.isWritable()) {
// 会从outgoingQueue队列中提取一个可发送的Packet对象
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
...
// 发送请求
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
//写入pendingQueue
pendingQueue.add(p);
}
}
}
...
}
-
2.3.4 响应接收
客户端获取到来自服务端的完整响应数据后,根据不同的客户端请求类型,会进行不同的处理
-
如果检测到当前客户端还未进行初始化,那么说明当前客户端与服务端之间正在进行会话创建,那么就直接将接收到的ByteBuffer(incomingBuffer)序列化为ConnectResponse对象
-
如果当前客户端已经处于正常的会话周期,那么接收到的服务端响应是一个事件,让eventThread触发相应的watcher。
-
如果是一个常规的请求响应(指的是Create、GetData和Exist等操作请求),那么会从pendingQueue队列中取出一个Packet来进行相应的处理。通过在finishPacket方法中处理响应:
-
- 如果存在Watcher,就注册
- 如果是同步请求,可以让调用方从阻塞中恢复。
- 如果是异步请求,放入EventQueue等待后续通知
-
// org.apache.zookeeper.ClientCnxnSocketNIO#doIO
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
// org.apache.zookeeper.ClientCnxnSocketNIO#readResponse
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
switch (replyHdr.getXid()) {
...
// -1 means notification(WATCHER_EVENT)
// 如果是事务通知
case NOTIFICATION_XID:
LOG.debug("Got notification session id: 0x{}",
Long.toHexString(sessionId));
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
...
WatchedEvent we = new WatchedEvent(event);
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
//让eventThread触发相应的watcher
eventThread.queueEvent(we);
return;
default:
break;
}
//如果是常规应答
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
}
packet = pendingQueue.remove();
}
...
// 处理Watcher注册等逻辑
finishPacket(packet);
}
2.4 EventThread
EventThread中有一个waitingEvents队列,用于临时存放那么需要被触发的Object,包括那些客户点注册的Watcher和异步接口中注册的回到器AsyncCallBack。
- SenderThread收到 event通知请求 时,会将Watcher 加入到 EventThread
- SenderThread收到 应答请求 时,会将AsyncCallBack 加入到 EventThread
同时,EventThread会不断地从waitingEvents这个队列中取出Object,识别出其具体类型(Watcher或者AsynCallBack),并分别调用process和processResult接口方法来实现对事件的触发和回调
//org.apache.zookeeper.ClientCnxn.EventThread#run
public void run() {
...
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
}
...
}
//org.apache.zookeeper.ClientCnxn.EventThread#processEvent
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} else {
Packet p = (Packet) event;
int rc = 0;
StatCallback cb = (StatCallback) p.cb;
cb.processResult(rc, clientPath, p.ctx,
((ExistsResponse) p.response)
.getStat());
}
}
}
3. Zookeeper 客户端Watcher机制原理
ZooKeeper 允许客户端向服务端注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。ZooKeeper的Watcher机制主要包括客户端线程、客户端WatchManager和ZooKeeper服务器三部分。
- 客户端向 ZooKeeper 服务器注册 Watcher
- ZooKeeper 注册成功后,会对客户端做出应答。
- 客户端将 Watcher 对象存储在客户端的 WatchManager 中;
- ZooKeeper 服务端触发 Watcher 事件后,向客户端发送通知;
- 客户端线程从 WatchManager 中取出对应的 Watcher 对象来执行回调逻辑。
以 getData
接口为例,过一下客户端的注册逻辑:
注册
- 当发送一个带有 Watch 事件的请求时,客户端首先会把该会话标记为带有 Watch 监控的事件请求,发送给服务器。
// org.apache.zookeeper.ZooKeeper#getData
public byte[] getData(final String path, Watcher watcher, Stat stat){
...
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
RequestHeader h = new RequestHeader();
request.setWatch(watcher != null);
...
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
}
//org.apache.zookeeper.ClientCnxn#submitRequest
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
- 上一步的“发送”其实就是写入
outgoingQueue
, 等待SenderThread发送 - 调用负责处理服务器响应的
SendThread
线程类中的readResponse
方法接收服务端的回调,并在最后执行finishPacket()
方法将 Watch 注册到ZKWatchManager
中。
// org.apache.zookeeper.ClientCnxn#finishPacket
private void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
}
...
}
客户端回调的处理过程
- 客户端使用
SendThread.readResponse()
方法来统一处理服务端的相应。通过请求头信息判断为事件通知类型,首先将己收到的字节流反序列化转换成WatcherEvent
对象。然后调用eventThread.queueEvent( )
方法将接收到的事件交给 EventThread 线程进行处理。 - 按照通知的事件类型,从 ZKWatchManager 中查询注册过的客户端 Watch 信息。客户端在查询到对应的 Watch 信息后,会将其从 ZKWatchManager 的管理中删除。
- 将查询到的 Watcher 存储到
waitingEvents
队列中,调用 EventThread 类中的 run 方法会循环取出在waitingEvents
队列中等待的 Watcher 事件进行处理。
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}
ZkClient”夺权“EventThread
在使用ZooKeeper的Java客户端时,经常需要处理几个问题:重复注册watcher、session失效重连、异常处理。要解决上述的几个问题,可以自己解决,也可以采用第三方的java客户端来完成。这里就介绍一种常用的客户端zkclient。
原理
我们常用的ZkClient其实就是一个Watcher:
public class ZkClient implements Watcher {
}
在创建Zookeeper客户端的时候,它将自己当作DefaultWatcher传入,并且之后再设置监听都 watch = false
,对所有注册的事件都采用ZkClient来处理。
即ZkClient全面接手waitingEvents
的事件处理逻辑,调用自己内部实现的一个Event队列。
重复注册
看一下如何重复注册watcher:
- 不直接采用event返回的修改数据。由于ZK的watch一次性注册原因,事件触发和再次注册之间存在一个时间差,可能导致zookeeper客户端不能够接收到完所有的ZK事件。如果直接使用事件返回的修改数据,可能会导致数据滞后。
- 查询最新数据并且注册watcher。依赖于hasListeners()的判断,来决定是否再次注册
虽然通过触发事件时 get+重复注册可以保证获取的事件为最新,但是会导致ABA 问题,即中间修改事件的丢失。如果是分布式客户端的情况下,就会出现问题。有的客户端读到中间状态B,有的感知不到B。
参考文档
ZooKeeper客户端源码解读(网络I/O)
Zookeeper Watcher机制原理
ZooKeeper 的网络通信协议详解
ZooKeeper Watch 机制源码解读