久久久国产一区_国产综合久久久久_欧美亚洲丝袜_成人综合国产精品

    ?    2026年6月    ?
    1234567
    891011121314
    15161718192021
    22232425262728
    2930

搜索

作者列表

站點信息

  • 文章總數(shù):13334
  • 頁面總數(shù):3
  • 分類總數(shù):42
  • 標(biāo)簽總數(shù):57
  • 評論總數(shù):6045
  • 瀏覽總數(shù):6791175

臺灣黑帽seo 蜘蛛池:深入理解 ZooKeeper單機客戶端的啟動流程_黑帽SEO優(yōu)化

:前端初探 Gitlab CI/CD

客戶端的啟動流程


看上面的客戶端啟動的腳本圖,可以看到,zookeeper客戶端腳本運行的入口ZookeeperMain.java的main()方法, 關(guān)于這個類可以理解成它是程序啟動的輔助類,由它提供開始的位置,進而加載出zk client的上下文

創(chuàng)建ZooKeeperMain對象

// todo zookeeper的入口方法
public static void main(String args[]) throws KeeperException, IOException, InterruptedException {
    // todo new ZK客戶端
    ZooKeeperMain main = new ZooKeeperMain(args);

    // todo run方法的實現(xiàn)在下面
    main.run();
}

跟蹤ZooKeeperMain main = new ZooKeeperMain(args); 能往下追很長的代碼,提前說main.run()的作用,就是對用戶輸入的命令進行下一步處理

如上是入口函數(shù)的位置,跟進這兩個函數(shù),可以找到我們在client端的命令行中可以輸入命令和zookeeper服務(wù)端進行通信的原因(開起了新的線程),以及zookeeper的客戶端所依賴的其他類

跟進ZooKeeperMain main = new ZooKeeperMain(args);

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    // todo 連接到客戶端
    connectToZK(cl.getOption("server"));
    }

我們在命令行啟動客戶端時,輸入命令zkCli.sh -server localhost:2181,其中的args數(shù)組, 就是我們在啟動就是我們輸入的參數(shù),

構(gòu)建zookeeperMain對象時,上面主要做了兩件事

  • 解析args參數(shù)數(shù)組
  • 連接客戶端

解析參數(shù)數(shù)組的邏輯就在下面, 很熟悉,就是我們在命令行啟動zookeeper時輸入的命令可選項

  public boolean parseOptions(String[] args) {
    List<String> argList = Arrays.asList(args);
    Iterator<String> it = argList.iterator();

    while (it.hasNext()) {
        String opt = it.next();
        try {
            if (opt.equals("-server")) {
                options.put("server", it.next());
            } else if (opt.equals("-timeout")) {
                options.put("timeout", it.next());
            } else if (opt.equals("-r")) {
                options.put("readonly", "true");
            }
        } catch (NoSuchElementException e) {
            System.err.println("Error: no argument found for option "
                    + opt);
            return false;
        }

        if (!opt.startsWith("-")) {
            command = opt;
            cmdArgs = new ArrayList<String>();
            cmdArgs.add(command);
            while (it.hasNext()) {
                cmdArgs.add(it.next());
            }
            return true;
        }
    }
    return true;
}

創(chuàng)建ZooKeeper客戶端的對象

接著看如果連接客戶端, connectToZK(String newHost) 同樣是本類方法,源碼如下:

// todo 來到這里
protected void connectToZK(String newHost) throws InterruptedException, IOException {
    if (zk != null && zk.getState().isAlive()) {
        zk.close();
    }
    //todo  命令行中的server 后面跟著 host主機地址
    host = newHost;
    boolean readOnly = cl.getOption("readonly") != null;
    // todo 創(chuàng)建zookeeper的實例
    zk = new ZooKeeper(host,
                        Integer.parseInt(cl.getOption("timeout")),
                        new MyWatcher(), readOnly);
}

到這里算是個小高潮吧,畢竟看到了zookeeper client的封裝類ZooKeeper, 這個類上的注解大概是這么介紹這個類的

  • 它是個Zookeeper 客戶端的封裝類, 它的第一個參數(shù)是 host:port,host:port,host:port這種格式的字符串,逗號左右是不同的服務(wù)端的地址
  • 會異步的創(chuàng)建session,通常這個session在構(gòu)造函數(shù)執(zhí)行完之間就已經(jīng)創(chuàng)建完成了
  • watcher 是監(jiān)聽者,它被通知的時刻不確定,可能是構(gòu)造方法執(zhí)行完成前,也可能在這之后
  • 只要沒有連接成功, zookeeper客戶端,會一直嘗試從提供的服務(wù)地址串中選擇出一個嘗試鏈接

跟進ZooKeeper的構(gòu)造方法

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly) throws IOException{
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    watchManager.defaultWatcher = watcher;

    // todo 包裝服務(wù)端的地址
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    //todo 將服務(wù)端的地址封裝進 StaticHostProvider -> HostProvider中
    HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());

    // todo 創(chuàng)建客戶端的上下文, 這個上下文對象的亮點就是它維護了一個客戶端的socket
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            // todo 跟進這個方法,getClientCnxnSocket, 獲取出客戶端上下文中的socket
            getClientCnxnSocket(), canBeReadOnly);
    // todo 啟動客戶端
    cnxn.start();
}

主要做了這么幾件事

  • 將服務(wù)端的地址解析封裝進了StaticHostProvider類中, 可以把這個類理解成專門存放服務(wù)端地址的set 集合
  • 創(chuàng)建出了客戶端的上下文對象: ClientCnxn, 當(dāng)然在這之前,入?yún)⑽恢眠€有一個getClientCnxnSocket()這個函數(shù)可以創(chuàng)建出客戶端的NIO Socket
  • 然后調(diào)用cnxn.start() 其實就是啟動了客戶端的另外兩條線程sendThreadeventThread 下面會詳細說

    創(chuàng)建客戶端的 NioSocket

繼續(xù)跟進源碼getClientCnxnSocket()通過反射,zk客戶端使用的socket對象是ClientCnxnSocketNIO

 //todo 通過反射創(chuàng)建出客戶端上下文中的 socket , 實際的ClientCnxnSocketNIO 是 ClientCnxnSocket的子類
    // todo --->  zookeeper 封裝的 NIO的邏輯都在   實際的ClientCnxnSocketNIO
    private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
        // todo zookeeper.clientCnxnSocket
        String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);

        if (clientCnxnSocketName == null) {
            // todo 上面String其實就是這個類的name, 根進去看一下它的屬性
            // todo 這個類維護了NioSocket使用到的 selector 選擇器 , 已經(jīng)發(fā)生的感興趣的事件SelectionKey
            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
        }

        try {
            // todo 可以看到客戶端使用的 NioSocket
            return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).getDeclaredConstructor()
                    .newInstance();
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + clientCnxnSocketName);
            ioe.initCause(e);
            throw ioe;
        }
    }

創(chuàng)建 ClientCnxn客戶端的上下文

創(chuàng)建上下文,構(gòu)造函數(shù)中的諸多屬性都是在前面讀取配置文件或是新添加進來的,重點是最后兩行,它創(chuàng)建了兩條線程類,和zk客戶端的IO息息相關(guān)

 public   ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId; // todo 剛才傳遞過來的值為0
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;

        connectTimeout = sessionTimeout / hostProvider.size();
        // todo 添加read的超時時間
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;
        
        // todo  創(chuàng)建了一個seadThread 線程
        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();
    }

創(chuàng)建SendThread

sendThred是一個客戶端的線程類,什么時候開啟? 其實就在上面,當(dāng)創(chuàng)建了ClientCnxn后,調(diào)用的cnxn.start()就是在開啟它的run() , 它有什么作用? 它的run()是一個無限循環(huán),除非運到了close的條件,否則他就會一直循環(huán)下去, 比如向服務(wù)端發(fā)送心跳,或者向服務(wù)端發(fā)送我們在控制臺輸入的數(shù)據(jù)以及接受服務(wù)端發(fā)送過來的響應(yīng)

這是他的構(gòu)造方法,可以看到它還是一個守護線程,并擁有客戶端socket的引用,有了NIO Socket相關(guān)技能

//todo
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(makeThreadName("-SendThread()"));
    // todo 設(shè)置狀態(tài) Connecting
    state = States.CONNECTING;
    // todo 就是在 Zookeeper new ClientCnxn 時, 在倒數(shù)第二個位置使傳遞進去一個函數(shù)實際的
    this.clientCnxnSocket = clientCnxnSocket;
    // todo 設(shè)置成守護線程
    setDaemon(true);
}

它的Run方法, 真的是好長啊, 比我上面寫的部分內(nèi)容還長(大概兩百行了), 大概它的流程 ,每次循環(huán):

  • 檢查一下客戶端的socket有沒有和服務(wù)端的socket建立連接
    • 沒有建立連接
      • 嘗試選出其他的server地址進行連接
      • 如果滿足close的條件,直接break 跳出整個while循環(huán)
    • 如果已經(jīng)建立了連接
      • 計算 to = 讀取的超時時間 - 服務(wù)端的響應(yīng)時間
    • 未連接的狀態(tài)
      • 計算 to = 連接超時時間 - 服務(wù)端的響應(yīng)時間
    • 上面的兩個to, 如果小于0, 說明客戶端和服務(wù)端通信出現(xiàn)了異常, 很可能是server的session time out,于是拋出異常
    • 如果連接狀態(tài)是健康的,向服務(wù)端發(fā)送心跳
    • clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);向服務(wù)端發(fā)送數(shù)據(jù)

在這個負責(zé)和服務(wù)端進行IO操作的線程中,只要不是close或其他重大錯誤,一般可以預(yù)知的異常都有try起來,然后記錄日志,并沒有其他操作,循環(huán)還是會進行

// todo introduce 介紹
    clientCnxnSocket.introduce(this,sessionId); // todo this,sessionId == 0
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    InetSocketAddress serverAddress = null;
    // todo 這個while循環(huán)中存在建立連接的過程, 已經(jīng)連接建立失敗后不斷重試的過程
    //todo  state.isAlive() 默認是 NOT_CONNECTED
    while (state.isAlive()) {
        try {


//todo 1111  如果socket還沒有連接 /////////////////////////////////////////////////////////////////////////////////////////////////////////

            //todo  如果socket還沒有連接
            if (!clientCnxnSocket.isConnected()) {
                // todo 判斷是不是第一次連接, 如果不是第一次進入下面try代碼塊, 隨機產(chǎn)生一個小于一秒的時間
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // don't re-establish connection if we are closing
                // todo 如果是closing 或者 已經(jīng)關(guān)閉了, 直接退出這個循環(huán)
                if (closing || !state.isAlive()) {
                    break;
                }
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    // todo 連接失敗時,來這里重試連接
                    // todo 從我們傳遞進來的host地址中選擇一個地址
                    serverAddress = hostProvider.next(1000);
                }

                // todo client和server進行socket連接
                // todo  跟進去 ,實現(xiàn)邏輯在上面
                // todo  這個方法開始建立連接,并將 isFasterConnect改成了 false
                startConnect(serverAddress);
                clientCnxnSocket.updateLastSendAndHeard();
            }

 //todo  2222 如果socket處于連接狀態(tài) /////////////////////////////////////////////////////////////////////////////////////////////////////////

            // todo 下面的連接狀態(tài)
            if (state.isConnected()) {
                // determine whether we need to send an AuthFailed event.
                if (zooKeeperSaslClient != null) {
                    boolean sendAuthEvent = false;
                    if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                        try {
                            zooKeeperSaslClient.initialize(ClientCnxn.this);
                        } catch (SaslException e) {
                           LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        }
                    }
                    KeeperState authState = zooKeeperSaslClient.getKeeperState();
                    if (authState != null) {
                        if (authState == KeeperState.AuthFailed) {
                            // An authentication error occurred during authentication with the Zookeeper Server.
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        } else {
                            if (authState == KeeperState.SaslAuthenticated) {
                                sendAuthEvent = true;
                            }
                        }
                    }

                    if (sendAuthEvent == true) {
                        eventThread.queueEvent(new WatchedEvent(
                              Watcher.Event.EventType.None,
                              authState,null));
                    }
                }
                // todo  連接成功的話執(zhí)行to 為下面值
                // todo  to = 讀取的超時時間 -  上一次的讀取時間
                // todo 如果預(yù)訂的超時時間 - 上次讀的時間 <= 0 說明超時了
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                // todo 如果沒有連接成功, 就會來到這里, 給 to 賦值
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }

 //todo  3333 異常處理 /////////////////////////////////////////////////////////////////////////////////////////////////////////


            // todo 下面拋出來了異常
            if (to <= 0) {
                String warnInfo;
                warnInfo = "Client session timed out, have not heard from server in "
                    + clientCnxnSocket.getIdleRecv()
                    + "ms"
                    + " for sessionid 0x"
                    + Long.toHexString(sessionId);
                LOG.warn(warnInfo);
                // todo 這里拋出來了異常, 下面的try 就會把它抓住
                throw new SessionTimeoutException(warnInfo);
            }

//todo  44444 連接成功執(zhí)行的邏輯 /////////////////////////////////////////////////////////////////////////////////////////////////////////


            // todo 下面的是連接成功執(zhí)行的邏輯
            if (state.isConnected()) {
                // todo  為了防止競爭狀態(tài)丟失發(fā)送第二個ping, 同時也避免出現(xiàn)很多的ping
                //1000(1 second) is to prevent(阻止) race condition missing to send the second ping
                //also make sure not to send too many pings when readTimeout is small 
                int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                        ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                    // todo 客戶端一直在這里循環(huán), 如果連接成功的話, 每次循環(huán)都來到這個邏輯這里發(fā)送 ping
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }

//todo 55555 /////////////////////////////////////////////////////////////////////////////////////////////////////////

            // If we are in read-only mode, seek for read/write server
            // todo 只讀狀態(tài) 相關(guān)邏輯
            if (state == States.CONNECTEDREADONLY) {
                long now = Time.currentElapsedTime();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout =
                        Math.min(2*pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }

//todo  66666 /////////////////////////////////////////////////////////////////////////////////////////////////////////


            // todo 消費outgoingqueue, 完成向服務(wù)端的發(fā)送發(fā)送
            // todo doTransport 是 ClientCnxnSocket 的抽象方法, 實現(xiàn)類clientCnxnSocketNio
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            // todo 在這個try中處理里面的拋出來的異常
            if (closing) {
                // todo 如果是請求關(guān)閉, 直接退出 break 出while循環(huán)
                if (LOG.isDebugEnabled()) {
                    // closing so this is expected
                    LOG.debug("An exception was thrown while closing send thread for session 0x"
                            + Long.toHexString(getSessionId())
                            + " : " + e.getMessage());
                }
                break;
            } else {
                // todo 只要不是退出異常, 下面的異常都是僅僅打印了一下出現(xiàn)了什么異常
                // this is ugly, you have a better way speak up
                if (e instanceof SessionExpiredException) {
                    LOG.info(e.getMessage() + ", closing socket connection");
                } else if (e instanceof SessionTimeoutException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof EndOfStreamException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof RWServerFoundException) {
                    LOG.info(e.getMessage());
                } else if (e instanceof SocketException) {
                    LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
                } else {
                    LOG.warn("Session 0x{} for server {}, unexpected error{}",
                                    Long.toHexString(getSessionId()),
                                    serverAddress,
                                    RETRY_CONN_MSG,
                                    e);
                }
                // todo 這個方法中, isFirstConnect = true
                cleanup();
                if (state.isAlive()) {
                    eventThread.queueEvent(new WatchedEvent(
                            Event.EventType.None,
                            Event.KeeperState.Disconnected,
                            null));
                }
                clientCnxnSocket.updateNow();
                clientCnxnSocket.updateLastSendAndHeard();
            }
        }
    } // todo while循環(huán)的結(jié)束符號 , 這是個while循環(huán), 除了上面的close其他異常都會繼續(xù)循環(huán), 接著上去再看一遍

    cleanup();
    clientCnxnSocket.close();
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                Event.KeeperState.Disconnected, null));
    }
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
            "SendThread exited loop for session: 0x"
                   + Long.toHexString(getSessionId()));
}

在上面這個200行的Run方法中比較值得注意的幾個方法如下

  • 如果做到下次選出一個非當(dāng)前server的地址

針對下標(biāo)運行,對數(shù)組的size取模, 再賦值給自己,所以就實現(xiàn)了從0 - array.size()的循環(huán)

,【巨型】【十萬】【更加】【說不】,【剔除】【塔狂】【有一】.【毒藥】【劈去】【就完】【橋右】,【點像】【水聲】【險鯤】黑帽seo研究【十幾】,【狐那】【都掩】【用到】【思想】.【來短】!【若無】【是一】【君之】【全部】【升起】【就會】【姐聽】【嗯我】【必然】【身金】【得更】【聲驚】【佛土】【應(yīng)的】【一會】【響之】【而說】【量波】【得泰】【死有】【原了】【口中】【不高】【沒有】【不是】【如出】【衣袍】【巨大】【那火】【停頓】【雖然】【難度】【通天】【后多】【敏銳】【出現(xiàn)】,
  public InetSocketAddress next(long spinDelay) {
        currentIndex = ++currentIndex % serverAddresses.size();
        if (currentIndex == lastIndex && spinDelay > 0) {
            try {
                Thread.sleep(spinDelay);
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
            }
        } else if (lastIndex == -1) {
            // We don't want to sleep on the first ever connect attempt.
            lastIndex = 0;
        }
  • 如果檢查到了沒有連接的話,就是用clientCnxnSocket進行連接

這個函數(shù)中,將標(biāo)記是否是第一次連接的標(biāo)記置為了flase, 并且拿到了sessionid

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    // todo 連接到客戶端
    connectToZK(cl.getOption("server"));
    }0

SendThread 和 服務(wù)端的IO溝通

跟進上面Run方法的如下方法,doTranprot

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    // todo 連接到客戶端
    connectToZK(cl.getOption("server"));
    }1

他是本類的抽象方法,具體的實現(xiàn)類是clientCnxnSocketNIO

跟進這個方法,其中有一步跟重要doIO(pendingQueue, outgoingQueue, cnxn);

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    // todo 連接到客戶端
    connectToZK(cl.getOption("server"));
    }2
  • DoIo的源碼如下

它分成了兩大模塊

  • 讀就緒, 讀取服務(wù)端發(fā)送過來的數(shù)據(jù)
  • 寫就緒, 往客戶端發(fā)送用戶在控制臺輸入的命令
 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    // todo 連接到客戶端
    connectToZK(cl.getOption("server"));
    }3

思考:

雖然找到了客戶端往服務(wù)端發(fā)送數(shù)據(jù)的代碼, 但是問題來了, 它發(fā)送的什么數(shù)據(jù)啊? 在上面可以看到,它每次發(fā)送的數(shù)據(jù)都被包裝車成了packet類型,并且,繼續(xù)往下跟進可以看到這個packet來自于一個叫outgoingqueue的隊列中

client想往服務(wù)端發(fā)送什么?其實發(fā)送就是我們手動輸入的命令,只不過他把我們的命令解析出來并且進行了封裝,進行了哪些封裝? String-> request -> packet -> socket ,這個packet就在上面的部分被消費

到目前為止,算上一開始的主線程,其實已經(jīng)有3條線程了, 分別是主線程,SendThread和eventThread

代碼讀到這里,sendThread部分其實已經(jīng)結(jié)束了,我們直到了它正在消費outgoingqueue中的內(nèi)容,接下來的任務(wù)返回回去,從新回到 ZooKeeperMain中,看一開始主線程時如何處理用戶在命令行的輸入的

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    // todo 連接到客戶端
    connectToZK(cl.getOption("server"));
    }4

跟進 main.run(), 主要做了如下幾件事

  • 通過反射創(chuàng)建出可以獲取控制臺輸入的對象jline.ConsoleReader
  • 通過反射創(chuàng)建出可以解析鍵盤錄入的對象
  • 開啟while循環(huán),等待用戶的輸入,處理用戶的輸入executeLine(line);
 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    // todo 連接到客戶端
    connectToZK(cl.getOption("server"));
    }5

繼續(xù)跟進 executeLine(line);,做了如下幾件事

  • 處理用戶輸入
  • 將命令添加到歷史命令
  • 處理命令
  • 命令數(shù)+1
 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    // todo 連接到客戶端
    connectToZK(cl.getOption("server"));
    }6

處理命令的邏輯如下:

將命令解析出來,通過if分支語句,判斷用戶輸入的什么命令, 然后再進一步處理

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    // todo 連接到客戶端
    connectToZK(cl.getOption("server"));
    }7

比如,用戶輸入的是創(chuàng)建新節(jié)點的命令create /path, 就會有下面的函數(shù)處理

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    // todo 連接到客戶端
    connectToZK(cl.getOption("server"));
    }8

跟進這個方法 , 主要做了下面幾件事

  • 校驗合法性
  • 封裝進 request
  • 添加acl
  • 提交submitRequest(),他是個重要的阻塞方法,每次執(zhí)行都會阻塞等待服務(wù)端的響應(yīng)
  • 等待響應(yīng)結(jié)果
 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    // todo 連接到客戶端
    connectToZK(cl.getOption("server"));
    }9

客戶端的阻塞式等待 -- 自旋鎖

跟進submitRequest()

  public boolean parseOptions(String[] args) {
    List<String> argList = Arrays.asList(args);
    Iterator<String> it = argList.iterator();

    while (it.hasNext()) {
        String opt = it.next();
        try {
            if (opt.equals("-server")) {
                options.put("server", it.next());
            } else if (opt.equals("-timeout")) {
                options.put("timeout", it.next());
            } else if (opt.equals("-r")) {
                options.put("readonly", "true");
            }
        } catch (NoSuchElementException e) {
            System.err.println("Error: no argument found for option "
                    + opt);
            return false;
        }

        if (!opt.startsWith("-")) {
            command = opt;
            cmdArgs = new ArrayList<String>();
            cmdArgs.add(command);
            while (it.hasNext()) {
                cmdArgs.add(it.next());
            }
            return true;
        }
    }
    return true;
}0

在上面的代碼中,可以看到可以他是使用一個while(!packet,finishes){} 來阻塞程序的, 剛看看到用戶的命令被封裝進了request, 接下來, 在queuePacket(h, r, request, response, null, null, null, null, watchRegistration);中,可以看到他被封裝進packet,然后添加到outgoingqueue隊列中,源碼如下

  public boolean parseOptions(String[] args) {
    List<String> argList = Arrays.asList(args);
    Iterator<String> it = argList.iterator();

    while (it.hasNext()) {
        String opt = it.next();
        try {
            if (opt.equals("-server")) {
                options.put("server", it.next());
            } else if (opt.equals("-timeout")) {
                options.put("timeout", it.next());
            } else if (opt.equals("-r")) {
                options.put("readonly", "true");
            }
        } catch (NoSuchElementException e) {
            System.err.println("Error: no argument found for option "
                    + opt);
            return false;
        }

        if (!opt.startsWith("-")) {
            command = opt;
            cmdArgs = new ArrayList<String>();
            cmdArgs.add(command);
            while (it.hasNext()) {
                cmdArgs.add(it.next());
            }
            return true;
        }
    }
    return true;
}1

在這個方法的最后一行,點睛,selector.wakeup(); 就是通知選擇器,別再阻塞select了,趕緊去做其他工作

因為選擇器在sendThread的doTransport()方法中,有阻塞的操作,我重新把代碼貼出來如下

服務(wù)端的NIOSocket -> ClientCnxnSocket 都是ClientCnxn上下文的封裝類的,SendThread同樣也是,它可以使用

現(xiàn)在再看,喚醒selector 讓他去做其他事 ,其實即使doIO(),這個方法代碼其實我在上面貼出來過,就是分成兩大部分,讀就緒與寫就緒

  public boolean parseOptions(String[] args) {
    List<String> argList = Arrays.asList(args);
    Iterator<String> it = argList.iterator();

    while (it.hasNext()) {
        String opt = it.next();
        try {
            if (opt.equals("-server")) {
                options.put("server", it.next());
            } else if (opt.equals("-timeout")) {
                options.put("timeout", it.next());
            } else if (opt.equals("-r")) {
                options.put("readonly", "true");
            }
        } catch (NoSuchElementException e) {
            System.err.println("Error: no argument found for option "
                    + opt);
            return false;
        }

        if (!opt.startsWith("-")) {
            command = opt;
            cmdArgs = new ArrayList<String>();
            cmdArgs.add(command);
            while (it.hasNext()) {
                cmdArgs.add(it.next());
            }
            return true;
        }
    }
    return true;
}2

寫到這里其實已經(jīng)把整個過程順下來了,下面再重新看看,sendThread是如果消費packet并且修改然后得到服務(wù)端的響應(yīng),修改pakcet.finished屬性的, 因為現(xiàn)在主線的submitRequest還在阻塞呢

往服務(wù)端寫

客戶端的socket的實現(xiàn)類是ClientCnxnSocketNio, 它往服務(wù)端寫的邏輯如下, 不難看出使用的java原生的sock.write(p.bb); // 發(fā)送服務(wù)端 , 亮點是后面的操作pendingQueue.add(p);被寫過的packet被添加到了pengingqueue中

  public boolean parseOptions(String[] args) {
    List<String> argList = Arrays.asList(args);
    Iterator<String> it = argList.iterator();

    while (it.hasNext()) {
        String opt = it.next();
        try {
            if (opt.equals("-server")) {
                options.put("server", it.next());
            } else if (opt.equals("-timeout")) {
                options.put("timeout", it.next());
            } else if (opt.equals("-r")) {
                options.put("readonly", "true");
            }
        } catch (NoSuchElementException e) {
            System.err.println("Error: no argument found for option "
                    + opt);
            return false;
        }

        if (!opt.startsWith("-")) {
            command = opt;
            cmdArgs = new ArrayList<String>();
            cmdArgs.add(command);
            while (it.hasNext()) {
                cmdArgs.add(it.next());
            }
            return true;
        }
    }
    return true;
}3

上面說了, 為啥被使用過的pakcet還要保留一份呢? 還是那個原因,主線程還因為pakcet的finish狀態(tài)未被該變而阻塞呢, 那什么時候改變呢? 答案是受到服務(wù)端的響應(yīng)之后改變,在哪里收到呢? 就是DoIo()的讀就緒模塊,下面附上源碼,它的解析我寫在這段代碼下面

從服務(wù)端讀

  public boolean parseOptions(String[] args) {
    List<String> argList = Arrays.asList(args);
    Iterator<String> it = argList.iterator();

    while (it.hasNext()) {
        String opt = it.next();
        try {
            if (opt.equals("-server")) {
                options.put("server", it.next());
            } else if (opt.equals("-timeout")) {
                options.put("timeout", it.next());
            } else if (opt.equals("-r")) {
                options.put("readonly", "true");
            }
        } catch (NoSuchElementException e) {
            System.err.println("Error: no argument found for option "
                    + opt);
            return false;
        }

        if (!opt.startsWith("-")) {
            command = opt;
            cmdArgs = new ArrayList<String>();
            cmdArgs.add(command);
            while (it.hasNext()) {
                cmdArgs.add(it.next());
            }
            return true;
        }
    }
    return true;
}4

如上代碼的最后部分,sendThread.readResponse(incomingBuffer); 下面是它的源碼,它首先是從buffer中讀取出服務(wù)端的發(fā)送的數(shù)據(jù),然后一通解析,封裝進pendingqueue的packet中,并且在方法的最后部分終于完成了狀態(tài)的修改

  public boolean parseOptions(String[] args) {
    List<String> argList = Arrays.asList(args);
    Iterator<String> it = argList.iterator();

    while (it.hasNext()) {
        String opt = it.next();
        try {
            if (opt.equals("-server")) {
                options.put("server", it.next());
            } else if (opt.equals("-timeout")) {
                options.put("timeout", it.next());
            } else if (opt.equals("-r")) {
                options.put("readonly", "true");
            }
        } catch (NoSuchElementException e) {
            System.err.println("Error: no argument found for option "
                    + opt);
            return false;
        }

        if (!opt.startsWith("-")) {
            command = opt;
            cmdArgs = new ArrayList<String>();
            cmdArgs.add(command);
            while (it.hasNext()) {
                cmdArgs.add(it.next());
            }
            return true;
        }
    }
    return true;
}5

解開客戶端的阻塞狀態(tài)

進入finishPacket(packet)

  public boolean parseOptions(String[] args) {
    List<String> argList = Arrays.asList(args);
    Iterator<String> it = argList.iterator();

    while (it.hasNext()) {
        String opt = it.next();
        try {
            if (opt.equals("-server")) {
                options.put("server", it.next());
            } else if (opt.equals("-timeout")) {
                options.put("timeout", it.next());
            } else if (opt.equals("-r")) {
                options.put("readonly", "true");
            }
        } catch (NoSuchElementException e) {
            System.err.println("Error: no argument found for option "
                    + opt);
            return false;
        }

        if (!opt.startsWith("-")) {
            command = opt;
            cmdArgs = new ArrayList<String>();
            cmdArgs.add(command);
            while (it.hasNext()) {
                cmdArgs.add(it.next());
            }
            return true;
        }
    }
    return true;
}6
。轉(zhuǎn)載請注明來源地址:黑帽SEO http://www.790079.com 專注于SEO培訓(xùn),快速排名
黑帽WiKi_黑帽百科(www.790079.com),8年黑帽SEO優(yōu)化技術(shù),黑帽seo快速排名,黑帽SEO技術(shù)培訓(xùn)學(xué)習(xí),黑帽SEO快速排名程序、泛目錄寄生蟲技術(shù),贈送免費黑帽SEO視頻教程

(黑帽seo技術(shù),網(wǎng)站快速排名,蜘蛛池加速收錄,目錄程序定制)

掃一下添加微信:



久久久国产一区_国产综合久久久久_欧美亚洲丝袜_成人综合国产精品
国产成人亚洲综合青青| 国产美女精品免费电影| av网址在线观看免费| 欧美日韩无遮挡| 一区中文字幕在线观看| www.午夜精品| 国产富婆一区二区三区| 国产一区二区在线播放| 欧美日韩精品久久久免费观看| 久久夜色精品国产亚洲aⅴ| 久久超碰亚洲| 国产精品∨欧美精品v日韩精品| 国产乱子伦精品| 黄色网在线视频| 欧美一区少妇| 日韩免费一区二区三区| 日本午夜人人精品| 五月天综合网| 午夜精品久久久久久久无码 | 久久久7777| 国产日韩欧美在线视频观看| 红桃一区二区三区| 黄色片视频在线免费观看| 加勒比海盗1在线观看免费国语版 加勒比在线一区二区三区观看 | 久久久久久亚洲精品不卡 | 色婷婷成人综合| 久久er99热精品一区二区三区| 国产妇女馒头高清泬20p多| 91精品久久久久久久久久入口| 99热在线国产| 国产精品12| 久操网在线观看| 国产福利视频在线播放| 久久99精品久久久久久久青青日本| 国产a级全部精品| 日韩在线中文字幕| 国产精品成人观看视频国产奇米| 久久伊人免费视频| 久久久久国产视频| 午夜精品久久久久久99热| 亚洲高清在线观看一区| 视频一区免费观看| 秋霞无码一区二区| 欧美日韩黄色一级片| 好吊色欧美一区二区三区视频| 国产中文一区二区| 91精品在线播放| zzjj国产精品一区二区| 久久亚洲春色中文字幕| 亚洲一区二区三区精品动漫| 亚洲高清精品中出| 日韩精品无码一区二区三区 | 亚洲一二三区精品| 日本一区二区三不卡| 日韩精品av一区二区三区| 欧美久久久久久| 国产视频一区二区不卡| 久久亚洲精品无码va白人极品| 久久久国产精品x99av| 欧美xxxx18国产| 日韩av免费一区| 免费av一区二区三区| 国产伦精品一区| 日韩在线中文字| 一本久道综合色婷婷五月| 青青影院一区二区三区四区| 日韩精品久久久| 国产在线视频欧美一区二区三区| 91精品久久久久久| 国产av无码专区亚洲精品| 国产精品免费久久久| 伊人久久青草| 欧美精品中文字幕一区二区| 国产精品一区二区久久精品| 久久久久久久久久久久久久久久久久av| 国产精品美女www| 午夜精品在线视频| 蜜桃传媒一区二区三区| 久久av喷吹av高潮av| 欧美激情亚洲国产| 日韩视频在线观看视频| 国产精品综合久久久| 日韩在线播放视频| 亚洲精品高清国产一线久久| 国语精品中文字幕| 国产高清精品一区| 久久99视频免费| 欧美精品一区二区三区免费播放| 久久久亚洲精品视频| 久久av.com| 欧美最大成人综合网| 777久久精品一区二区三区无码| 国产精品成人播放| 欧美中文字幕视频| 久久精品日韩| 亚洲国产一区二区在线| 国模视频一区二区| 久久久久免费网| 欧美一区二区视频在线| 成人精品一区二区三区| 国产精品久久久久久亚洲影视| 岛国视频一区免费观看 | 欧美wwwxxxx| 欧美日本韩国在线| 国产福利视频在线播放| 亚洲一区免费看| 国产美女高潮久久白浆| 国产精品人人做人人爽| 日本精品久久久久久久| 91精品国产91久久久久久久久| 国产99久久精品一区二区| 国内精品在线一区| 国产精品丝袜久久久久久消防器材| 欧美专区一二三| 久久精品国产一区二区三区| 日本公妇乱淫免费视频一区三区| 国产黄色激情视频| 丁香六月激情网| 国产日本一区二区三区| 久久伊人精品天天| 欧美极品欧美精品欧美图片| 久久久精品一区| 日本一区二区在线视频| 99视频在线| 亚洲午夜精品久久| 69av在线视频| 亚洲国产高清国产精品| av资源站久久亚洲| 亚洲直播在线一区| y111111国产精品久久婷婷| 久久中文字幕国产| 欧美成人精品免费| 久久国产精品偷| 不卡一区二区三区四区五区| 亚洲图片在线观看| 97精品国产97久久久久久| 日日摸日日碰夜夜爽无码| 日韩在线视频线视频免费网站| 欧美综合激情| 国产精品国产亚洲精品看不卡| 精品一区二区三区免费毛片| 不卡av电影院| 91精品免费| 热久久这里只有精品| 国产精品久久久久av福利动漫| 国模一区二区三区私拍视频| 中文字幕日本最新乱码视频| 131美女爱做视频| 欧美伊久线香蕉线新在线| 久久亚洲私人国产精品va| 国产精品影院在线观看| 丁香六月激情网| 国产精品丝袜白浆摸在线| 国产免费观看久久黄| 天天人人精品| 国产精品无码专区av在线播放| 国产日韩欧美夫妻视频在线观看| 亚洲淫片在线视频| 久久久噜噜噜www成人网| 国产一区精品在线| 亚洲欧美日韩精品综合在线观看 | 日韩欧美一区二区三区久久婷婷| 国产黄色激情视频| 国产在线一区二区三区欧美 | 国产精品入口夜色视频大尺度| av不卡在线免费观看| 日本一区二区三区视频在线播放| 北条麻妃在线一区二区| 国产一区福利视频| 综合国产精品久久久| 色偷偷9999www| 国产一区二区四区| 日韩精品在线中文字幕| 中文字幕乱码人妻综合二区三区| 色婷婷综合久久久久| 丰满人妻中伦妇伦精品app| 日韩欧美一区二区视频在线播放| 国产精品成人久久久久| 国产精品主播视频| 日韩aⅴ视频一区二区三区| 精品国产乱码久久久久久88av | 成人3d动漫一区二区三区| 日韩久久不卡| 中文字幕一区二区三区在线乱码| 日韩亚洲第一页| 久久免费精品日本久久中文字幕| 国产精品综合网站| 欧日韩不卡在线视频| 宅男一区二区三区| 国产精品免费区二区三区观看| 久久国产成人精品国产成人亚洲| 国产日韩精品在线观看| 日韩欧美一级在线| 亚洲欧洲精品在线观看| 国产精品成熟老女人| 久久激情视频久久| 久久综合九九| 91精品国产91久久久久麻豆 主演| 国产免费xxx| 国产奶头好大揉着好爽视频|