1. Basic architecture
2. ZAB protocol
   ZooKeeper does not fully adopt the Paxos algorithm. Instead, it uses a protocol called ZooKeeper Atomic Broadcast (ZAB) as the core algorithm for data consistency.
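    2.1 A Leader is elected only when it wins more than half of the votes. In addition, the cluster can start serving requests only after more than half of the machines have completed state synchronization (data synchronization) with that Leader.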
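    2.2 All transaction requests must be coordinated by a single, globally unique server, called the Leader server; the remaining servers are Follower servers. The Leader converts each client transaction request into a transaction Proposal and distributes that Proposal to all Follower servers in the cluster. The Leader then waits for feedback from the Followers. Once more than half of the Follower servers have acknowledged correctly, the Leader sends a Commit message to all Followers, asking them to commit the preceding Proposal.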
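
The majority rule described in 2.1 and 2.2 can be illustrated with a small sketch. This is not ZooKeeper's implementation: the class name `ProposalAckTracker` and its methods are hypothetical, and the sketch assumes the quorum is counted over all voting servers, with the leader's own acknowledgement included like any other.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper that tracks acknowledgements for one proposal. */
class ProposalAckTracker {
    private final int ensembleSize;                     // number of voting servers (assumption)
    private final Set<Long> ackedServerIds = new HashSet<>();

    ProposalAckTracker(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    /** Records an ACK from a server; returns true once a majority has acknowledged. */
    boolean ack(long serverId) {
        ackedServerIds.add(serverId);                   // duplicate ACKs are counted once
        return hasQuorum();
    }

    /** Strictly more than half of the ensemble must have acknowledged. */
    boolean hasQuorum() {
        return ackedServerIds.size() > ensembleSize / 2;
    }

    public static void main(String[] args) {
        ProposalAckTracker tracker = new ProposalAckTracker(5);
        System.out.println(tracker.ack(1L));
        System.out.println(tracker.ack(2L));
        System.out.println(tracker.ack(3L));            // third distinct ACK out of five
    }
}
```

With five servers, the proposal becomes committable on the third distinct acknowledgement, which is the point at which a leader would send Commit.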
3. Leader and Follower startup process
4. Request processing
   4.1 Request processing chains
      4.1.1 Leader request processing chain
      4.1.2 Follower request processing chain
   4.2 Processing flow
    Taking a create request handled by a server acting as leader as an example, the flow is as follows:
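    The processing flow of FollowerZooKeeperServer differs from that of LeaderZooKeeperServer in two places: FollowerRequestProcessor forwards transaction requests to the leader, and SendAckRequestProcessor returns to the leader the response confirming that the transaction proposal was handled correctly. The rest of the chain is the same. SendAckRequestProcessor differs from AckRequestProcessor in that AckRequestProcessor is a local call on the leader. The code in FollowerRequestProcessor that handles transaction requests is as follows: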
Java code:

    public void run() {
        try {
            while (!finished) {
                Request request = queuedRequests.take();
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                            'F', request, "");
                }
                if (request == Request.requestOfDeath) {
                    break;
                }
                // We want to queue the request to be processed before we submit
                // the request to the leader so that we are ready to receive
                // the response
                nextProcessor.processRequest(request);

                // We now ship the request to the leader. As with all
                // other quorum operations, sync also follows this code
                // path, but different from others, we need to keep track
                // of the sync operations this follower has pending, so we
                // add it to pendingSyncs.
                switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request);
                    zks.getFollower().request(request);
                    break;
                case OpCode.create:
                case OpCode.delete:
                case OpCode.setData:
                case OpCode.setACL:
                case OpCode.createSession:
                case OpCode.closeSession:
                case OpCode.multi:
                    zks.getFollower().request(request);
                    break;
                }
            }
        } catch (Exception e) {
            LOG.error("Unexpected exception causing exit", e);
        }
        LOG.info("FollowerRequestProcessor exited loop!");
    }
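
The ordering in the listing above (hand the request to the next processor first, then ship it to the leader if it is one of the listed operation types) can be shown in isolation. The following is a minimal sketch, not ZooKeeper code: `Processor`, `ForwardingProcessor`, and the use of plain strings for operation names are all hypothetical simplifications, and the "leader" is just a list that records what would have been forwarded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class FollowerChainSketch {
    /** Hypothetical stand-in for a request processor; not ZooKeeper's interface. */
    interface Processor {
        void process(String opName);
    }

    /** Operation names mirroring the switch statement in the listing above. */
    static final Set<String> FORWARDED_OPS = new HashSet<>(Arrays.asList(
            "sync", "create", "delete", "setData", "setACL",
            "createSession", "closeSession", "multi"));

    /** Passes every request down the local chain, then forwards selected ones. */
    static class ForwardingProcessor implements Processor {
        private final Processor next;
        private final List<String> sentToLeader;   // records forwarded requests (assumption)

        ForwardingProcessor(Processor next, List<String> sentToLeader) {
            this.next = next;
            this.sentToLeader = sentToLeader;
        }

        @Override
        public void process(String opName) {
            next.process(opName);                  // queue locally before contacting the leader
            if (FORWARDED_OPS.contains(opName)) {
                sentToLeader.add(opName);          // stands in for a network send to the leader
            }
        }
    }

    public static void main(String[] args) {
        List<String> local = new ArrayList<>();
        List<String> leader = new ArrayList<>();
        Processor chain = new ForwardingProcessor(local::add, leader);
        chain.process("getData");
        chain.process("create");
        System.out.println("local chain saw: " + local);
        System.out.println("forwarded to leader: " + leader);
    }
}
```

Read requests such as `getData` stay on the local chain only, while `create` reaches both the local chain and the leader.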
5. Data synchronization
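    Data synchronization in a ZooKeeper cluster falls into four types: direct differential synchronization (DIFF), rollback followed by differential synchronization (TRUNC+DIFF), rollback-only synchronization (TRUNC), and full synchronization (SNAP). Before synchronizing, the leader initializes three values: peerLastZxid (the last ZXID processed by the learner), minCommittedLog (the smallest ZXID in the leader's proposal cache queue committedLog), and maxCommittedLog (the largest ZXID in committedLog). It then uses these three ZXID values to decide the synchronization type and performs the synchronization. The relevant code, from the run method of LearnerHandler, is excerpted below.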
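
As a rough illustration of how the three ZXID values can separate the four types, here is a simplified sketch. It is not the logic of LearnerHandler: `SyncTypeSketch` and `choose` are hypothetical names, the leader's committedLog is modelled as a sorted set of ZXIDs, and treating an empty log as SNAP is an assumption made only to keep the example short.

```java
import java.util.Arrays;
import java.util.TreeSet;

class SyncTypeSketch {
    enum SyncType { DIFF, TRUNC_DIFF, TRUNC, SNAP }

    /**
     * Simplified decision rule (assumption, not ZooKeeper's code).
     * committedLog stands in for the leader's proposal cache queue.
     */
    static SyncType choose(long peerLastZxid, TreeSet<Long> committedLog) {
        if (committedLog.isEmpty()) {
            return SyncType.SNAP;                    // nothing cached to diff against (assumption)
        }
        long minCommittedLog = committedLog.first();
        long maxCommittedLog = committedLog.last();
        if (peerLastZxid > maxCommittedLog) {
            return SyncType.TRUNC;                   // learner is ahead: roll it back
        }
        if (peerLastZxid < minCommittedLog) {
            return SyncType.SNAP;                    // learner is too far behind the cache
        }
        // Within the cached range: a ZXID the leader never logged must be rolled back first.
        return committedLog.contains(peerLastZxid) ? SyncType.DIFF : SyncType.TRUNC_DIFF;
    }

    public static void main(String[] args) {
        // High 32 bits of a ZXID are the epoch, low 32 bits the counter.
        TreeSet<Long> log = new TreeSet<>(Arrays.asList(
                0x400000001L, 0x400000002L, 0x500000001L, 0x500000002L));
        System.out.println(choose(0x400000002L, log));
        System.out.println(choose(0x400000003L, log));
        System.out.println(choose(0x500000003L, log));
        System.out.println(choose(0x300000009L, log));
    }
}
```

The second call models a learner that logged a proposal from epoch 4 which the current leader never saw, so the learner has to drop it before receiving the missing transactions.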
Java code:

    .....
    long peerLastZxid;
    StateSummary ss = null;
    long zxid = qp.getZxid();
    long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);

    if (this.getVersion() < 0x10000) {
        // we are going to have to extrapolate the epoch information
        long epoch = ZxidUtils.getEpochFromZxid(zxid);
        ss = new StateSummary(epoch, zxid);
        // fake the message
        leader.waitForEpochAck(this.getSid(), ss);
    } else {
        byte ver[] = new byte[4];
        ByteBuffer.wrap(ver).putInt(0x10000);
        QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
        oa.writeRecord(newEpochPacket, "packet");
        bufferedOutput.flush();
        QuorumPacket ackEpochPacket = new QuorumPacket();
        ia.readRecord(ackEpochPacket, "packet");
        if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
            LOG.error(ackEpochPacket.toString()
                    + " is not ACKEPOCH");
            return;
        }
        ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
        ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
        leader.waitForEpochAck(this.getSid(), ss);
    }
    peerLastZxid = ss.getLastZxid();

    /* the default to send to the follower */
    int packetToSend = Leader.SNAP;
    long zxidToSend = 0;
    long leaderLastZxid = 0;
    /** the packets that the follower needs to get updates from **/
    long updates = peerLastZxid;

    /* we are sending the diff check if we have proposals in memory to be able to
     * send a diff to the
     */
    ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
    ReadLock rl = lock.readLock();
    try {
        rl.lock();
        final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
        final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
        LOG.info("Synchronizing with Follower sid: " + sid
                + " maxCommittedLog=0x" + Long.toHexString(maxCommittedLog)
                + " minCommittedLog=0x" + Long.toHexString(minCommittedLog)
                + " peerLastZxid=0x" + Long.toHexString(peerLastZxid));

        LinkedList