Zookeeper Watch机制

Zookeeper Watch机制

Watcher是一种简单的机制,使客户端得到关于ZooKeeper集合中的更改的通知。 客户端可以在读取特定znode时设置Watcher。Watcher会向注册的客户端发送任何znode(客户端注册表)更改的通知。

1. 概述

ZooKeeper Watch 机制是指,客户端在所有的读命令上告知服务端:这个节点或者子节点变化时通知我,具体来说,支持的写操作有:

  • getData
  • getChildren
  • exists

例如,我们在命令行可以输入 get -w /foo,其中 -w 参数就是用于告知 ZooKeeper 服务端,当前客户端想在 /foo 节点上设置一个监听器。

ZooKeeper Watch 机制的两个细节:

  • wactch 是一次性触发的(除了永久递归 watch),如果客户端如果在一个 watch 通知后继续收到相同节点的 watch 通知,那么必须再次注册 watch 一次;
  • 服务端发给客户端的 watch 通知并不包含具体的节点数据,其起到的作用非常存粹:告知客户端其关注的节点发生了 watch 事件;

本篇博客在客户端角度,从底层出发,看一下Zookeeper Watch机制。开始之前,先思考一下以下疑问,带着这些问题进行Zookeeper客户端的学习。

  • Zookeeper 客户端如何进行网络请求
  • Zookeeper 如何处理同步和异步请求
  • Zookeeper如何注册和触发Watcher
  • 我们常用的ZkClient又做了什么?

2. 客户端网络IO模型

Copy From ZooKeeper客户端源码解读(网络I/O)

2.1 整体结构图

ClientCnxnSocket 封装了底层Socket通信层, ClientCnxnSocket整体结构如图所示:

客户端IO模型
图 客户端网络IO模型

2.2 Packet

Packet是ClientCnxn内部定义的一个对协议层的封装,作为ZooKeeper中请求与响应的载体。

packet

从上图可以看出,Packet中包含了请求头、响应头、请求体、响应体、节点路径和注册的Watcher等信息。

2.3 SenderThread

2.3.1 基本概念

SendThread是客户端ClientCnxn内部一个核心的I/O调度线程,用于管理客户端和服务端之间的所有网络I/O操作。在ZooKeeper客户端的实际运行过程中

  • SendThead维护了客户端和服务端之间的会话生命周期,其通过在一定的周期频率内向服务器发送一个PING包来实现心跳检测,同时,在会话周期内,如果客户端和服务端之间出现TCP连接断开的情况,那么就会自动而且透明化完成重连操作。
  • 另一方面,SendThread管理了客户端所有的请求发送和响应接收操作,其将上层客户端API操作转换成相应的请求协议并发送到服务端,并完成对同步调用的返回和异步调用的回调。
  • 同时,SendThread还负责将来自服务端的事件传递给EventThread去处理。

Sender进程就一直尝试与Zookeeper服务器进行交互:

//org.apache.zookeeper.ClientCnxn.SendThread

@Override
public void run() {
   // ...
   while (state.isAlive()) {
       clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
   }
   //...
}
 // org.apache.zookeeper.ClientCnxnSocketNIO#doTransport
 void doTransport(...) {
         ...
        //监听Selector,对读和写进行操作
        for (SelectionKey k : selected) {
            ...
            if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
              	//doIO
                doIO(pendingQueue, cnxn);
            }
          ...
        }
        
        
    }

void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
     SocketChannel sock = (SocketChannel) sockKey.channel();
  	 if (sockKey.isReadable()) {
       // 读操作
     }
  
  	  if (sockKey.isWritable()) {
        //写操作
      } 
}
  

2.3.2 outgoingQueue和pendingQueue

    /**
     * These are the packets that have been sent and are waiting for a response.
     */
    private final Queue<Packet> pendingQueue = new ArrayDeque<>();

    /**
     * These are the packets that need to be sent.
     */
    private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
  • ClientCnxn中,有两个比较核心的队列outgoingQueue和pendingQueue,分别代表客户端的请求发送队列和服务端响应的等待队列。
    • outgoingQueue队列是一个请求发送队列,专门用于存储那些需要发送到服务端的Packet集合
    • pendingQueue队列是为了存储那些已经从客户端发送到服务端的,但是需要等待服务端响应的Packet集合。(实现同步异步请求的关键

2.3.3 发送数据

在正常情况下(即客户端与服务端之间的TCP连接正常且会话有效的情况下):

  • 用户通过各种接口发送请求,都会通过submitRequest方法,将请求封装为packet, 被保存到outgoingQueue队列方法
//org.apache.zookeeper.ClientCnxn#submitRequest
public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }
  • SenderThreadoutgoingQueue队列中提取一个可发送的Packet对象,同时生成一个客户端请求序号XID,并将其设置到Packet请求头中去,然后将其进行序列化后进行发送。这里提到的获取一个可发送的Packet对象指的哪些Packet呢?在outgoingQueue队列中的Packet整体上是按照先进先出的顺序被处理的,但是如果检测到客户端与服务端之间正在进行SASL权限的话,那么那些不含请求头(requestHeader)的Packet(例如会话创建请求)是可以被发送的,其余的都无法发送。
  • 请求发送完毕后,会立即将该Packet保存到pendingQueue队列中,以便等待服务端响应返回后进行相应的处理。
//// org.apache.zookeeper.ClientCnxnSocketNIO#doIO
if (sockKey.isWritable()) {
    // 会从outgoingQueue队列中提取一个可发送的Packet对象
    Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
    ...
    // 发送请求
    sock.write(p.bb);
    if (!p.bb.hasRemaining()) {
         sentCount.getAndIncrement();
          outgoingQueue.removeFirstOccurrence(p);
          if (p.requestHeader != null
              && p.requestHeader.getType() != OpCode.ping
               && p.requestHeader.getType() != OpCode.auth) {
                   synchronized (pendingQueue) {
                     
                     //写入pendingQueue
                      pendingQueue.add(p);
                   }
             }
       }
     ... 
}
  • 2.3.4 响应接收

    客户端获取到来自服务端的完整响应数据后,根据不同的客户端请求类型,会进行不同的处理

    • 如果检测到当前客户端还未进行初始化,那么说明当前客户端与服务端之间正在进行会话创建,那么就直接将接收到的ByteBuffer(incomingBuffer)序列化为ConnectResponse对象

    • 如果当前客户端已经处于正常的会话周期,那么接收到的服务端响应是一个事件,让eventThread触发相应的watcher。

    • 如果是一个常规的请求响应(指的是Create、GetData和Exist等操作请求),那么会从pendingQueue队列中取出一个Packet来进行相应的处理。通过在finishPacket方法中处理响应:

      • 如果存在Watcher,就注册
      • 如果是同步请求,可以让调用方从阻塞中恢复。
      • 如果是异步请求,放入EventQueue等待后续通知
// org.apache.zookeeper.ClientCnxnSocketNIO#doIO
if (sockKey.isReadable()) {
      int rc = sock.read(incomingBuffer);
   		sendThread.readResponse(incomingBuffer);
      lenBuffer.clear();
      incomingBuffer = lenBuffer;
      updateLastHeard();
   
 }
// org.apache.zookeeper.ClientCnxnSocketNIO#readResponse
 void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();
            replyHdr.deserialize(bbia, "header");
		 switch (replyHdr.getXid()) {
         ...
           
        // -1 means notification(WATCHER_EVENT)
        // 如果是事务通知
         case NOTIFICATION_XID:
                LOG.debug("Got notification session id: 0x{}",
                    Long.toHexString(sessionId));
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");
								...
                WatchedEvent we = new WatchedEvent(event);
                LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
         				//让eventThread触发相应的watcher
                eventThread.queueEvent(we);
                return;
          default:
                break;
    }
   
   //如果是常规应答
   Packet packet;
   synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
              throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
         }
         packet = pendingQueue.remove();
    }
   ...
   // 处理Watcher注册等逻辑
   finishPacket(packet);
   
 }  
            


2.4 EventThread

EventThread中有一个waitingEvents队列,用于临时存放那么需要被触发的Object,包括那些客户点注册的Watcher和异步接口中注册的回到器AsyncCallBack

  • SenderThread收到 event通知请求 时,会将Watcher 加入到 EventThread
  • SenderThread收到 应答请求 时,会将AsyncCallBack 加入到 EventThread

同时,EventThread会不断地从waitingEvents这个队列中取出Object,识别出其具体类型(Watcher或者AsynCallBack),并分别调用process和processResult接口方法来实现对事件的触发和回调

//org.apache.zookeeper.ClientCnxn.EventThread#run   
public void run() {
         ...
       while (true) {
          Object event = waitingEvents.take();
          if (event == eventOfDeath) {
               wasKilled = true;
           } else {
                processEvent(event);
            }
                    
        }

             ...
  }
//org.apache.zookeeper.ClientCnxn.EventThread#processEvent   
private void processEvent(Object event) {
          try {
              if (event instanceof WatcherSetEventPair) {
                 WatcherSetEventPair pair = (WatcherSetEventPair) event;
                        for (Watcher watcher : pair.watchers) {
                      try {
                          watcher.process(pair.event);
                      } catch (Throwable t) {
                          LOG.error("Error while calling watcher ", t);
                      }
                  } 
              } else {
                  Packet p = (Packet) event;
                  int rc = 0;
                  StatCallback cb = (StatCallback) p.cb;
                 cb.processResult(rc, clientPath, p.ctx,
                                      ((ExistsResponse) p.response)
                                              .getStat());
              }
          }
     
   }

3. Zookeeper 客户端Watcher机制原理

ZooKeeper 允许客户端向服务端注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。ZooKeeper的Watcher机制主要包括客户端线程客户端WatchManagerZooKeeper服务器三部分。

  1. 客户端向 ZooKeeper 服务器注册 Watcher
  2. ZooKeeper 注册成功后,会对客户端做出应答。
  3. 客户端将 Watcher 对象存储在客户端的 WatchManager 中
  4. ZooKeeper 服务端触发 Watcher 事件后,向客户端发送通知;
  5. 客户端线程从 WatchManager 中取出对应的 Watcher 对象来执行回调逻辑。
Watcher注册与触发

图 Watcher注册与触发

getData 接口为例,过一下客户端的注册逻辑:

注册

  1. 当发送一个带有 Watch 事件的请求时,客户端首先会把该会话标记为带有 Watch 监控的事件请求,发送给服务器。
// org.apache.zookeeper.ZooKeeper#getData
public byte[] getData(final String path, Watcher watcher, Stat stat){
		...
		WatchRegistration wcb = null;
		if (watcher != null) {
		wcb = new DataWatchRegistration(watcher, clientPath);
		}
		RequestHeader h = new RequestHeader();
		request.setWatch(watcher != null);
		...
		GetDataResponse response = new GetDataResponse();
		ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
}
//org.apache.zookeeper.ClientCnxn#submitRequest
public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }
  1. 上一步的“发送”其实就是写入outgoingQueue, 等待SenderThread发送
  2. 调用负责处理服务器响应的 SendThread 线程类中的readResponse 方法接收服务端的回调,并在最后执行 finishPacket()方法将 Watch 注册到 ZKWatchManager 中。
// org.apache.zookeeper.ClientCnxn#finishPacket
private void finishPacket(Packet p) {
		int err = p.replyHeader.getErr();
		if (p.watchRegistration != null) {
			p.watchRegistration.register(err);
		}
		...
}

客户端回调的处理过程

  1. 客户端使用 SendThread.readResponse() 方法来统一处理服务端的相应。通过请求头信息判断为事件通知类型,首先将己收到的字节流反序列化转换成 WatcherEvent 对象。然后调用 eventThread.queueEvent( )方法将接收到的事件交给 EventThread 线程进行处理。
  2. 按照通知的事件类型,从 ZKWatchManager 中查询注册过的客户端 Watch 信息。客户端在查询到对应的 Watch 信息后,会将其从 ZKWatchManager 的管理中删除。
  3. 将查询到的 Watcher 存储到 waitingEvents 队列中,调用 EventThread 类中的 run 方法会循环取出在 waitingEvents 队列中等待的 Watcher 事件进行处理。
 public void queueEvent(WatchedEvent event) {
            if (event.getType() == EventType.None
                    && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();

            // materialize the watchers based on the event
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                            event);
            // queue the pair (watch set & event) for later processing
            waitingEvents.add(pair);
        }

ZkClient”夺权“EventThread

在使用ZooKeeper的Java客户端时,经常需要处理几个问题:重复注册watcher、session失效重连、异常处理。要解决上述的几个问题,可以自己解决,也可以采用第三方的java客户端来完成。这里就介绍一种常用的客户端zkclient。

原理

我们常用的ZkClient其实就是一个Watcher:

public class ZkClient implements Watcher {
  
}

在创建Zookeeper客户端的时候,它将自己当作DefaultWatcher传入,并且之后再设置监听都 watch = false,对所有注册的事件都采用ZkClient来处理。

即ZkClient全面接手waitingEvents 的事件处理逻辑,调用自己内部实现的一个Event队列。

zkClient托管
图 zkClient“夺权”EventThread

重复注册

看一下如何重复注册watcher:

  • 不直接采用event返回的修改数据。由于ZK的watch一次性注册原因,事件触发和再次注册之间存在一个时间差,可能导致zookeeper客户端不能够接收到完所有的ZK事件。如果直接使用事件返回的修改数据,可能会导致数据滞后。
  • 查询最新数据并且注册watcher。依赖于hasListeners()的判断,来决定是否再次注册

虽然通过触发事件时 get+重复注册可以保证获取的事件为最新,但是会导致ABA 问题,即中间修改事件的丢失。如果是分布式客户端的情况下,就会出现问题。有的客户端读到中间状态B,有的感知不到B。

参考文档

ZooKeeper客户端源码解读(网络I/O)
Zookeeper Watcher机制原理
ZooKeeper 的网络通信协议详解
ZooKeeper Watch 机制源码解读