客戶端的啟動流程
看上面的客戶端啟動的腳本圖,可以看到,zookeeper客戶端腳本運行的入口ZookeeperMain.java的main()方法, 關于這個類可以理解成它是程序啟動的輔助類,由它提供開始的位置,進而加載出zk client的上下文
創建ZooKeeperMain對象
// todo zookeeper的入口方法
public static void main(String args[]) throws KeeperException, IOException, InterruptedException {
// todo new ZK客戶端
ZooKeeperMain main = new ZooKeeperMain(args);
// todo run方法的實現在下面
main.run();
}
跟蹤
ZooKeeperMain main = new ZooKeeperMain(args);能往下追很長的代碼,提前說main.run()的作用,就是對用戶輸入的命令進行下一步處理
如上是入口函數的位置,跟進這兩個函數,可以找到我們在client端的命令行中可以輸入命令和zookeeper服務端進行通信的原因(開起了新的線程),以及zookeeper的客戶端所依賴的其他類
跟進ZooKeeperMain main = new ZooKeeperMain(args);
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
// todo 連接到客戶端
connectToZK(cl.getOption("server"));
}
我們在命令行啟動客戶端時,輸入命令zkCli.sh -server localhost:2181,其中的args數組, 就是我們在啟動就是我們輸入的參數,
構建zookeeperMain對象時,上面主要做了兩件事
- 解析args參數數組
- 連接客戶端
解析參數數組的邏輯就在下面, 很熟悉,就是我們在命令行啟動zookeeper時輸入的命令可選項
public boolean parseOptions(String[] args) {
List<String> argList = Arrays.asList(args);
Iterator<String> it = argList.iterator();
while (it.hasNext()) {
String opt = it.next();
try {
if (opt.equals("-server")) {
options.put("server", it.next());
} else if (opt.equals("-timeout")) {
options.put("timeout", it.next());
} else if (opt.equals("-r")) {
options.put("readonly", "true");
}
} catch (NoSuchElementException e) {
System.err.println("Error: no argument found for option "
+ opt);
return false;
}
if (!opt.startsWith("-")) {
command = opt;
cmdArgs = new ArrayList<String>();
cmdArgs.add(command);
while (it.hasNext()) {
cmdArgs.add(it.next());
}
return true;
}
}
return true;
}
創建ZooKeeper客戶端的對象
接著看如果連接客戶端, connectToZK(String newHost) 同樣是本類方法,源碼如下:
// todo 來到這里
protected void connectToZK(String newHost) throws InterruptedException, IOException {
if (zk != null && zk.getState().isAlive()) {
zk.close();
}
//todo 命令行中的server 后面跟著 host主機地址
host = newHost;
boolean readOnly = cl.getOption("readonly") != null;
// todo 創建zookeeper的實例
zk = new ZooKeeper(host,
Integer.parseInt(cl.getOption("timeout")),
new MyWatcher(), readOnly);
}
到這里算是個小高潮吧,畢竟看到了zookeeper client的封裝類ZooKeeper, 這個類上的注解大概是這么介紹這個類的
- 它是個Zookeeper 客戶端的封裝類, 它的第一個參數是
host:port,host:port,host:port這種格式的字符串,逗號左右是不同的服務端的地址 - 會異步的創建session,通常這個session在構造函數執行完之間就已經創建完成了
- watcher 是監聽者,它被通知的時刻不確定,可能是構造方法執行完成前,也可能在這之后
- 只要沒有連接成功, zookeeper客戶端,會一直嘗試從提供的服務地址串中選擇出一個嘗試鏈接
跟進ZooKeeper的構造方法
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly) throws IOException{
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
watchManager.defaultWatcher = watcher;
// todo 包裝服務端的地址
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
//todo 將服務端的地址封裝進 StaticHostProvider -> HostProvider中
HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
// todo 創建客戶端的上下文, 這個上下文對象的亮點就是它維護了一個客戶端的socket
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
// todo 跟進這個方法,getClientCnxnSocket, 獲取出客戶端上下文中的socket
getClientCnxnSocket(), canBeReadOnly);
// todo 啟動客戶端
cnxn.start();
}
主要做了這么幾件事
- 將服務端的地址解析封裝進了
StaticHostProvider類中, 可以把這個類理解成專門存放服務端地址的set 集合 - 創建出了客戶端的上下文對象: ClientCnxn, 當然在這之前,入參位置還有一個
getClientCnxnSocket()這個函數可以創建出客戶端的NIO Socket 然后調用
cnxn.start()其實就是啟動了客戶端的另外兩條線程sendThread和eventThread下面會詳細說創建客戶端的 NioSocket
繼續跟進源碼getClientCnxnSocket()通過反射,zk客戶端使用的socket對象是ClientCnxnSocketNIO
//todo 通過反射創建出客戶端上下文中的 socket , 實際的ClientCnxnSocketNIO 是 ClientCnxnSocket的子類
// todo ---> zookeeper 封裝的 NIO的邏輯都在 實際的ClientCnxnSocketNIO
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
// todo zookeeper.clientCnxnSocket
String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
// todo 上面String其實就是這個類的name, 根進去看一下它的屬性
// todo 這個類維護了NioSocket使用到的 selector 選擇器 , 已經發生的感興趣的事件SelectionKey
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
// todo 可以看到客戶端使用的 NioSocket
return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).getDeclaredConstructor()
.newInstance();
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ clientCnxnSocketName);
ioe.initCause(e);
throw ioe;
}
}
創建 ClientCnxn客戶端的上下文
創建上下文,構造函數中的諸多屬性都是在前面讀取配置文件或是新添加進來的,重點是最后兩行,它創建了兩條線程類,和zk客戶端的IO息息相關
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId; // todo 剛才傳遞過來的值為0
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
// todo 添加read的超時時間
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
// todo 創建了一個seadThread 線程
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
}
創建SendThread
sendThred是一個客戶端的線程類,什么時候開啟? 其實就在上面,當創建了ClientCnxn后,調用的cnxn.start()就是在開啟它的run() , 它有什么作用? 它的run()是一個無限循環,除非運到了close的條件,否則他就會一直循環下去, 比如向服務端發送心跳,或者向服務端發送我們在控制臺輸入的數據以及接受服務端發送過來的響應
這是他的構造方法,可以看到它還是一個守護線程,并擁有客戶端socket的引用,有了NIO Socket相關技能
//todo
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
// todo 設置狀態 Connecting
state = States.CONNECTING;
// todo 就是在 Zookeeper new ClientCnxn 時, 在倒數第二個位置使傳遞進去一個函數實際的
this.clientCnxnSocket = clientCnxnSocket;
// todo 設置成守護線程
setDaemon(true);
}
它的Run方法, 真的是好長啊, 比我上面寫的部分內容還長(大概兩百行了), 大概它的流程 ,每次循環:
- 檢查一下客戶端的socket有沒有和服務端的socket建立連接
- 沒有建立連接
- 嘗試選出其他的server地址進行連接
- 如果滿足close的條件,直接break 跳出整個while循環
- 如果已經建立了連接
- 計算 to = 讀取的超時時間 - 服務端的響應時間
- 未連接的狀態
- 計算 to = 連接超時時間 - 服務端的響應時間
- 上面的兩個to, 如果小于0, 說明客戶端和服務端通信出現了異常, 很可能是server的session time out,于是拋出異常
- 如果連接狀態是健康的,向服務端發送心跳
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);向服務端發送數據
- 沒有建立連接
在這個負責和服務端進行IO操作的線程中,只要不是close或其他重大錯誤,一般可以預知的異常都有try起來,然后記錄日志,并沒有其他操作,循環還是會進行
// todo introduce 介紹
clientCnxnSocket.introduce(this,sessionId); // todo this,sessionId == 0
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
// todo 這個while循環中存在建立連接的過程, 已經連接建立失敗后不斷重試的過程
//todo state.isAlive() 默認是 NOT_CONNECTED
while (state.isAlive()) {
try {
//todo 1111 如果socket還沒有連接 /////////////////////////////////////////////////////////////////////////////////////////////////////////
//todo 如果socket還沒有連接
if (!clientCnxnSocket.isConnected()) {
// todo 判斷是不是第一次連接, 如果不是第一次進入下面try代碼塊, 隨機產生一個小于一秒的時間
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
// todo 如果是closing 或者 已經關閉了, 直接退出這個循環
if (closing || !state.isAlive()) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
// todo 連接失敗時,來這里重試連接
// todo 從我們傳遞進來的host地址中選擇一個地址
serverAddress = hostProvider.next(1000);
}
// todo client和server進行socket連接
// todo 跟進去 ,實現邏輯在上面
// todo 這個方法開始建立連接,并將 isFasterConnect改成了 false
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
//todo 2222 如果socket處于連接狀態 /////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo 下面的連接狀態
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent == true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
// todo 連接成功的話執行to 為下面值
// todo to = 讀取的超時時間 - 上一次的讀取時間
// todo 如果預訂的超時時間 - 上次讀的時間 <= 0 說明超時了
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
// todo 如果沒有連接成功, 就會來到這里, 給 to 賦值
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
//todo 3333 異常處理 /////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo 下面拋出來了異常
if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
// todo 這里拋出來了異常, 下面的try 就會把它抓住
throw new SessionTimeoutException(warnInfo);
}
//todo 44444 連接成功執行的邏輯 /////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo 下面的是連接成功執行的邏輯
if (state.isConnected()) {
// todo 為了防止競爭狀態丟失發送第二個ping, 同時也避免出現很多的ping
//1000(1 second) is to prevent(阻止) race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
// todo 客戶端一直在這里循環, 如果連接成功的話, 每次循環都來到這個邏輯這里發送 ping
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
//todo 55555 /////////////////////////////////////////////////////////////////////////////////////////////////////////
// If we are in read-only mode, seek for read/write server
// todo 只讀狀態 相關邏輯
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
//todo 66666 /////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo 消費outgoingqueue, 完成向服務端的發送發送
// todo doTransport 是 ClientCnxnSocket 的抽象方法, 實現類clientCnxnSocketNio
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
// todo 在這個try中處理里面的拋出來的異常
if (closing) {
// todo 如果是請求關閉, 直接退出 break 出while循環
if (LOG.isDebugEnabled()) {
// closing so this is expected
LOG.debug("An exception was thrown while closing send thread for session 0x"
+ Long.toHexString(getSessionId())
+ " : " + e.getMessage());
}
break;
} else {
// todo 只要不是退出異常, 下面的異常都是僅僅打印了一下出現了什么異常
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else if (e instanceof SocketException) {
LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
} else {
LOG.warn("Session 0x{} for server {}, unexpected error{}",
Long.toHexString(getSessionId()),
serverAddress,
RETRY_CONN_MSG,
e);
}
// todo 這個方法中, isFirstConnect = true
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
}
} // todo while循環的結束符號 , 這是個while循環, 除了上面的close其他異常都會繼續循環, 接著上去再看一遍
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
}
在上面這個200行的Run方法中比較值得注意的幾個方法如下
- 如果做到下次選出一個非當前server的地址
針對下標運行,對數組的size取模, 再賦值給自己,所以就實現了從0 - array.size()的循環
,【巨型】【十萬】【更加】【說不】,【剔除】【塔狂】【有一】.【毒藥】【劈去】【就完】【橋右】,【點像】【水聲】【險鯤】黑帽seo研究【十幾】,【狐那】【都掩】【用到】【思想】.【來短】!【若無】【是一】【君之】【全部】【升起】【就會】【姐聽】【嗯我】【必然】【身金】【得更】【聲驚】【佛土】【應的】【一會】【響之】【而說】【量波】【得泰】【死有】【原了】【口中】【不高】【沒有】【不是】【如出】【衣袍】【巨大】【那火】【停頓】【雖然】【難度】【通天】【后多】【敏銳】【出現】, public InetSocketAddress next(long spinDelay) {
currentIndex = ++currentIndex % serverAddresses.size();
if (currentIndex == lastIndex && spinDelay > 0) {
try {
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
} else if (lastIndex == -1) {
// We don't want to sleep on the first ever connect attempt.
lastIndex = 0;
}
- 如果檢查到了沒有連接的話,就是用clientCnxnSocket進行連接
這個函數中,將標記是否是第一次連接的標記置為了flase, 并且拿到了sessionid
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
// todo 連接到客戶端
connectToZK(cl.getOption("server"));
}0
SendThread 和 服務端的IO溝通
跟進上面Run方法的如下方法,doTranprot
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
// todo 連接到客戶端
connectToZK(cl.getOption("server"));
}1
他是本類的抽象方法,具體的實現類是clientCnxnSocketNIO
跟進這個方法,其中有一步跟重要doIO(pendingQueue, outgoingQueue, cnxn);
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
// todo 連接到客戶端
connectToZK(cl.getOption("server"));
}2
- DoIo的源碼如下
它分成了兩大模塊
- 讀就緒, 讀取服務端發送過來的數據
- 寫就緒, 往客戶端發送用戶在控制臺輸入的命令
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
// todo 連接到客戶端
connectToZK(cl.getOption("server"));
}3
思考:
雖然找到了客戶端往服務端發送數據的代碼, 但是問題來了, 它發送的什么數據啊? 在上面可以看到,它每次發送的數據都被包裝車成了packet類型,并且,繼續往下跟進可以看到這個packet來自于一個叫outgoingqueue的隊列中
client想往服務端發送什么?其實發送就是我們手動輸入的命令,只不過他把我們的命令解析出來并且進行了封裝,進行了哪些封裝? String-> request -> packet -> socket ,這個packet就在上面的部分被消費
到目前為止,算上一開始的主線程,其實已經有3條線程了, 分別是主線程,SendThread和eventThread
代碼讀到這里,sendThread部分其實已經結束了,我們直到了它正在消費outgoingqueue中的內容,接下來的任務返回回去,從新回到 ZooKeeperMain中,看一開始主線程時如何處理用戶在命令行的輸入的
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
// todo 連接到客戶端
connectToZK(cl.getOption("server"));
}4
跟進 main.run(), 主要做了如下幾件事
- 通過反射創建出可以獲取控制臺輸入的對象
jline.ConsoleReader - 通過反射創建出可以解析鍵盤錄入的對象
- 開啟while循環,等待用戶的輸入,處理用戶的輸入
executeLine(line);
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
// todo 連接到客戶端
connectToZK(cl.getOption("server"));
}5
繼續跟進 executeLine(line);,做了如下幾件事
- 處理用戶輸入
- 將命令添加到歷史命令
- 處理命令
- 命令數+1
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
// todo 連接到客戶端
connectToZK(cl.getOption("server"));
}6
處理命令的邏輯如下:
將命令解析出來,通過if分支語句,判斷用戶輸入的什么命令, 然后再進一步處理
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
// todo 連接到客戶端
connectToZK(cl.getOption("server"));
}7
比如,用戶輸入的是創建新節點的命令create /path, 就會有下面的函數處理
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
// todo 連接到客戶端
connectToZK(cl.getOption("server"));
}8
跟進這個方法 , 主要做了下面幾件事
- 校驗合法性
- 封裝進 request
- 添加acl
- 提交submitRequest(),他是個重要的阻塞方法,每次執行都會阻塞等待服務端的響應
- 等待響應結果
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
// todo 連接到客戶端
connectToZK(cl.getOption("server"));
}9
客戶端的阻塞式等待 -- 自旋鎖
跟進submitRequest()
public boolean parseOptions(String[] args) {
List<String> argList = Arrays.asList(args);
Iterator<String> it = argList.iterator();
while (it.hasNext()) {
String opt = it.next();
try {
if (opt.equals("-server")) {
options.put("server", it.next());
} else if (opt.equals("-timeout")) {
options.put("timeout", it.next());
} else if (opt.equals("-r")) {
options.put("readonly", "true");
}
} catch (NoSuchElementException e) {
System.err.println("Error: no argument found for option "
+ opt);
return false;
}
if (!opt.startsWith("-")) {
command = opt;
cmdArgs = new ArrayList<String>();
cmdArgs.add(command);
while (it.hasNext()) {
cmdArgs.add(it.next());
}
return true;
}
}
return true;
}0
在上面的代碼中,可以看到可以他是使用一個while(!packet,finishes){} 來阻塞程序的, 剛看看到用戶的命令被封裝進了request, 接下來, 在queuePacket(h, r, request, response, null, null, null, null, watchRegistration);中,可以看到他被封裝進packet,然后添加到outgoingqueue隊列中,源碼如下
public boolean parseOptions(String[] args) {
List<String> argList = Arrays.asList(args);
Iterator<String> it = argList.iterator();
while (it.hasNext()) {
String opt = it.next();
try {
if (opt.equals("-server")) {
options.put("server", it.next());
} else if (opt.equals("-timeout")) {
options.put("timeout", it.next());
} else if (opt.equals("-r")) {
options.put("readonly", "true");
}
} catch (NoSuchElementException e) {
System.err.println("Error: no argument found for option "
+ opt);
return false;
}
if (!opt.startsWith("-")) {
command = opt;
cmdArgs = new ArrayList<String>();
cmdArgs.add(command);
while (it.hasNext()) {
cmdArgs.add(it.next());
}
return true;
}
}
return true;
}1
在這個方法的最后一行,點睛,selector.wakeup(); 就是通知選擇器,別再阻塞select了,趕緊去做其他工作
因為選擇器在sendThread的doTransport()方法中,有阻塞的操作,我重新把代碼貼出來如下
服務端的NIOSocket -> ClientCnxnSocket 都是ClientCnxn上下文的封裝類的,SendThread同樣也是,它可以使用
現在再看,喚醒selector 讓他去做其他事 ,其實即使doIO(),這個方法代碼其實我在上面貼出來過,就是分成兩大部分,讀就緒與寫就緒
public boolean parseOptions(String[] args) {
List<String> argList = Arrays.asList(args);
Iterator<String> it = argList.iterator();
while (it.hasNext()) {
String opt = it.next();
try {
if (opt.equals("-server")) {
options.put("server", it.next());
} else if (opt.equals("-timeout")) {
options.put("timeout", it.next());
} else if (opt.equals("-r")) {
options.put("readonly", "true");
}
} catch (NoSuchElementException e) {
System.err.println("Error: no argument found for option "
+ opt);
return false;
}
if (!opt.startsWith("-")) {
command = opt;
cmdArgs = new ArrayList<String>();
cmdArgs.add(command);
while (it.hasNext()) {
cmdArgs.add(it.next());
}
return true;
}
}
return true;
}2
寫到這里其實已經把整個過程順下來了,下面再重新看看,sendThread是如果消費packet并且修改然后得到服務端的響應,修改pakcet.finished屬性的, 因為現在主線的submitRequest還在阻塞呢
往服務端寫
客戶端的socket的實現類是ClientCnxnSocketNio, 它往服務端寫的邏輯如下, 不難看出使用的java原生的sock.write(p.bb); // 發送服務端 , 亮點是后面的操作pendingQueue.add(p);被寫過的packet被添加到了pengingqueue中
public boolean parseOptions(String[] args) {
List<String> argList = Arrays.asList(args);
Iterator<String> it = argList.iterator();
while (it.hasNext()) {
String opt = it.next();
try {
if (opt.equals("-server")) {
options.put("server", it.next());
} else if (opt.equals("-timeout")) {
options.put("timeout", it.next());
} else if (opt.equals("-r")) {
options.put("readonly", "true");
}
} catch (NoSuchElementException e) {
System.err.println("Error: no argument found for option "
+ opt);
return false;
}
if (!opt.startsWith("-")) {
command = opt;
cmdArgs = new ArrayList<String>();
cmdArgs.add(command);
while (it.hasNext()) {
cmdArgs.add(it.next());
}
return true;
}
}
return true;
}3
上面說了, 為啥被使用過的pakcet還要保留一份呢? 還是那個原因,主線程還因為pakcet的finish狀態未被該變而阻塞呢, 那什么時候改變呢? 答案是受到服務端的響應之后改變,在哪里收到呢? 就是DoIo()的讀就緒模塊,下面附上源碼,它的解析我寫在這段代碼下面
從服務端讀
public boolean parseOptions(String[] args) {
List<String> argList = Arrays.asList(args);
Iterator<String> it = argList.iterator();
while (it.hasNext()) {
String opt = it.next();
try {
if (opt.equals("-server")) {
options.put("server", it.next());
} else if (opt.equals("-timeout")) {
options.put("timeout", it.next());
} else if (opt.equals("-r")) {
options.put("readonly", "true");
}
} catch (NoSuchElementException e) {
System.err.println("Error: no argument found for option "
+ opt);
return false;
}
if (!opt.startsWith("-")) {
command = opt;
cmdArgs = new ArrayList<String>();
cmdArgs.add(command);
while (it.hasNext()) {
cmdArgs.add(it.next());
}
return true;
}
}
return true;
}4
如上代碼的最后部分,sendThread.readResponse(incomingBuffer); 下面是它的源碼,它首先是從buffer中讀取出服務端的發送的數據,然后一通解析,封裝進pendingqueue的packet中,并且在方法的最后部分終于完成了狀態的修改
public boolean parseOptions(String[] args) {
List<String> argList = Arrays.asList(args);
Iterator<String> it = argList.iterator();
while (it.hasNext()) {
String opt = it.next();
try {
if (opt.equals("-server")) {
options.put("server", it.next());
} else if (opt.equals("-timeout")) {
options.put("timeout", it.next());
} else if (opt.equals("-r")) {
options.put("readonly", "true");
}
} catch (NoSuchElementException e) {
System.err.println("Error: no argument found for option "
+ opt);
return false;
}
if (!opt.startsWith("-")) {
command = opt;
cmdArgs = new ArrayList<String>();
cmdArgs.add(command);
while (it.hasNext()) {
cmdArgs.add(it.next());
}
return true;
}
}
return true;
}5
解開客戶端的阻塞狀態
進入finishPacket(packet)
public boolean parseOptions(String[] args) {
List<String> argList = Arrays.asList(args);
Iterator<String> it = argList.iterator();
while (it.hasNext()) {
String opt = it.next();
try {
if (opt.equals("-server")) {
options.put("server", it.next());
} else if (opt.equals("-timeout")) {
options.put("timeout", it.next());
} else if (opt.equals("-r")) {
options.put("readonly", "true");
}
} catch (NoSuchElementException e) {
System.err.println("Error: no argument found for option "
+ opt);
return false;
}
if (!opt.startsWith("-")) {
command = opt;
cmdArgs = new ArrayList<String>();
cmdArgs.add(command);
while (it.hasNext()) {
cmdArgs.add(it.next());
}
return true;
}
}
return true;
}6。轉載請注明來源地址:黑帽SEO http://www.790079.com
專注于SEO培訓,快速排名
黑帽WiKi_黑帽百科(www.790079.com),8年黑帽SEO優化技術,黑帽seo快速排名,黑帽SEO技術培訓學習,黑帽SEO快速排名程序、泛目錄、寄生蟲技術,贈送免費黑帽SEO視頻教程
(黑帽seo技術,網站快速排名,蜘蛛池加速收錄,目錄程序定制)
掃一下添加微信:
