<center id="qkqgy"><optgroup id="qkqgy"></optgroup></center>
  • <menu id="qkqgy"></menu>
    <nav id="qkqgy"></nav>
    <xmp id="qkqgy"><nav id="qkqgy"></nav>
  • <xmp id="qkqgy"><menu id="qkqgy"></menu>
    <menu id="qkqgy"><menu id="qkqgy"></menu></menu>
    <tt id="qkqgy"><tt id="qkqgy"></tt></tt>

  • 通信協議

    基于TCP/IP協議,zk實現了自己的通信協議來完成客戶端與服務端,服務端與服務端之間的網絡通信,zk的通信協議整體上的設計非常簡單,

    客戶端發起連接,發送握手包進行timeout協商,協商成功后會返回一個session
    id和timeoout值.隨后就可以進行正常通信,通信過程中要在timeout范圍內發送ping包. zookeeper
    client和server之間的通信協議基本規則就是發送請求獲取響應.并根據響應做不同的動作.

    發送數據格式為:

    *
    消息長度+xid+request. xid每次請求必須是唯一的.消息長度和xid同為4字節,命令長度為4字節且必須為request的開始4字節.

    *
    命令是從1到11的數字表示,close的命令為-11.不同類型請求request有些差異

    *
    特殊請求具有固定的xid:watch_xid固定為-1,ping_xid為-2,auth_xid固定為-4.普通請求一般從0開始每次請求累加一次xid.

    響應數據為:

    *
    消息長度+header+response.消息長度為4字節,表明header+response的總長度.

    *
    header為xid,zxid,err對應長度為4,8,4.response根據請求類型不同具有差別

    *
    根據header里xid的區別分為watch,ping,auth,data這四種類型

    *
    根據這四種類型來區分返回消息是事件,還是認證,心跳和請求數據.client并以此作出不同響應.

    消息結構

    握手消息

    request消息體

    protocol_version+zxid+timeout+session_id+passwd_len+passwd+read_only.對應的字節長度為4,8,4,8,4,16,1
    取值除timeout外其他幾個皆可為0,password可以為任意16個字符.read_only為0或1(是布爾值). 注:握手包沒有xid和命令

    response消息體

    protocol_version+timeout+session_id+passwd_len+passwd+read_only.
    注:握手響應包沒有header.

    效果展示
    2020-04-21 19:26:53.990 localhost cms_watcher info INFO cms_watcher [pid:240]
    [Thread-4] [connection.py:646 _connect] Connecting to 30.3.3.60:9888, use_ssl:
    False 2020-04-21 19:26:53.990 localhost cms_watcher info INFO cms_watcher
    [pid:240] [Thread-4] [connection.py:650 _connect] Using session_id:
    144131667822575626 session_passwd: b'41f366ef7005bc5c859b7fc56fa40872'
    2020-04-21 19:26:53.990 localhost cms_watcher info INFO cms_watcher [pid:240]
    [Thread-4] [connection.py:299 _submit] Sending request(xid=None):
    Connect(protocol_version=0, last_zxid_seen=12884901993, time_out=30000,
    session_id=144131667822575626,
    passwd=b'A\xf3f\xefp\x05\xbc\\\x85\x9b\x7f\xc5o\xa4\x08r', read_only=None)
    2020-04-21 19:26:53.991 localhost cms_watcher info INFO cms_watcher [pid:240]
    [Thread-4] [connection.py:285 _invoke] Read response
    Connect(protocol_version=0, last_zxid_seen=0, time_out=30000,
    session_id=144131667822575626,
    passwd=b'A\xf3f\xefp\x05\xbc\\\x85\x9b\x7f\xc5o\xa4\x08r', read_only=False)
    2020-04-21 19:26:53.991 localhost cms_watcher info INFO cms_watcher [pid:240]
    [Thread-4] [connection.py:694 _connect] Session created, session_id:
    144131667822575626 session_passwd: b'41f366ef7005bc5c859b7fc56fa40872'
    negotiated session timeout: 30000 connect timeout: 10000.0 read timeout:
    20000.0 2020-04-21 19:26:53.991 localhost cms_watcher info INFO cms_watcher
    [pid:240] [Thread-4] [client.py:463 _session_callback] test: cur state
    CONNECTED, old state CONNECTING
    ping消息

    request消息體

    type (ping包只有一個字段就是命令值是11,它完整的發送包是4字節長度,4字節xid,4字節命令.)

    response消息體

    res_len+header+res (ping響應包一般只拆到header即可通過xid確認)

    效果展示
    2020-04-21 20:05:03.971 localhost cms_watcher info INFO cms_watcher [pid:240]
    [Thread-4] [connection.py:603 _connect_attempt] test: send ping 2020-04-21
    20:05:03.971 localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4]
    [connection.py:490 _send_ping] test: send ping 2020-04-21 20:05:03.971
    localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4]
    [connection.py:299 _submit] Sending request(xid=-2): Ping() 2020-04-21
    20:05:03.973 localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4]
    [connection.py:606 _connect_attempt] test: read socket 2020-04-21 20:05:03.973
    localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4]
    [connection.py:415 _read_socket] test: Received Ping
    getdata消息

    request消息體

    type+path_len+path+watcher type=4. path_len,是4字節,為path的長度 path為需要查詢的路徑,支持utf8
    watcher為布爾值.判斷是否有事件注冊.為1或0. 1字節

    response消息體

    data_len+data+stat data_len為data長度,4字節. stat由8,8,8,8,4,4,4,8,4,4,8字節順序組成.

    效果展示
    2020-04-21 20:25:13.460 localhost cms_watcher info INFO cms_watcher [pid:9078]
    [Thread-4] [connection.py:610 _connect_attempt] test: send request 2020-04-21
    20:25:13.460 localhost cms_watcher info INFO cms_watcher [pid:9078] [Thread-4]
    [connection.py:482 _send_request] test: send request xid 4 2020-04-21
    20:25:13.460 localhost cms_watcher info INFO cms_watcher [pid:9078] [Thread-4]
    [connection.py:299 _submit] Sending request(xid=4):
    GetData(path='/cms/config/items/item.cts_cfg', watcher=<bound method
    DataWatch._watcher of <kazoo.recipe.watchers.DataWatch object at
    0x7ff860ba3438>>) 2020-04-21 20:25:13.461 localhost cms_watcher info INFO
    cms_watcher [pid:9078] [Thread-4] [connection.py:606 _connect_attempt] test:
    read socket 2020-04-21 20:25:13.461 localhost cms_watcher info INFO cms_watcher
    [pid:9078] [Thread-4] [connection.py:448 _read_socket] test: Reading for header
    ReplyHeader(xid=4, zxid=17179869239, err=0)
    序列化和反序列化

    下面看下kazoo這個庫是怎樣根據zk的這個協議來組裝數據和解析數據的

    request字節流

    下面的代碼展示了將請求對象序列化成socket字節流的過程
    def _submit(self, request, timeout, xid=None): """Submit a request object with
    a timeout value and optional xid""" b = bytearray() if xid:
    b.extend(int_struct.pack(xid)) if request.type:
    b.extend(int_struct.pack(request.type)) b += request.serialize()
    self.logger.log( (BLATHER if isinstance(request, Ping) else logging.DEBUG),
    "Sending request(xid=%s): %s", xid, request)
    self._write(int_struct.pack(len(b)) + b, timeout)

    從上面的代碼可以看出,首先根據不同的請求,決定是否發送xid字段、type字段(也就是上面協議所說的),最后根據request對象序列化成字節流。這里的request就是kazoo/protocol/serialization.py定義的各個類實例
    比如連接類:
    class Connect(namedtuple('Connect', 'protocol_version last_zxid_seen' '
    time_out session_id passwd read_only')): type = None def serialize(self): b =
    bytearray() b.extend(int_long_int_long_struct.pack( self.protocol_version,
    self.last_zxid_seen, self.time_out, self.session_id))
    b.extend(write_buffer(self.passwd)) b.extend([1 if self.read_only else 0])
    return b @classmethod def deserialize(cls, bytes, offset): proto_version,
    timeout, session_id = int_int_long_struct.unpack_from( bytes, offset) offset +=
    int_int_long_struct.size password, offset = read_buffer(bytes, offset) try:
    read_only = bool_struct.unpack_from(bytes, offset)[0] is 1 offset +=
    bool_struct.size except struct.error: read_only = False return
    cls(proto_version, 0, timeout, session_id, password, read_only), offset
    response字節流

    下面的代碼展示了將socket字節流反序列化成對象的過程
    def _read_header(self, timeout): b = self._read(4, timeout) length =
    int_struct.unpack(b)[0] b = self._read(length, timeout) header, offset =
    ReplyHeader.deserialize(b, 0) return header, b, offset
    從上面的代碼可以看出,首先從socket中讀取4個字節,根據上面的協議,我們知道,這4個字節是data_len,即包的大小
    然后在根據len繼續讀取該大小的字節流,最后解析成具體的對象。
    class ReplyHeader(namedtuple('ReplyHeader', 'xid, zxid, err')): @classmethod
    def deserialize(cls, bytes, offset): """Given bytes and the current bytes
    offset, return a :class:`ReplyHeader` instance and the new offset""" new_offset
    = offset + reply_header_struct.size return cls._make(
    reply_header_struct.unpack_from(bytes, offset)), new_offset

    技術
    下載桌面版
    GitHub
    百度網盤(提取碼:draw)
    Gitee
    云服務器優惠
    阿里云優惠券
    騰訊云優惠券
    華為云優惠券
    站點信息
    問題反饋
    郵箱:ixiaoyang8@qq.com
    QQ群:766591547
    關注微信
    巨胸美乳无码人妻视频