通讯协议


  • 组成

    • head: 消息头
      • payload length(int32): 消息长度
      • request_id(int32): 请求ID
      • response_to(int32): 服务端响应对应请求的reques_id(请求类型为OP_QUERY、OP_GET_MORE)
      • operation_code(int32): 消息类型
    • payload: 消息体
  • 类型

名称 代码 作用
OP_REPLY 1 回复客户端请求
OP_UPDATE 2001 更新数据
OP_INSERT 2002 插入数据
OP_QRERY 2004 请求
OP_GET_MORE 2005 获取更多的数据
OP_DELETE 2006 删除数据
OP_KILL_CURSORS 2007 游标已使用完
OP_MSG 2013 扩展消息
客户端消息
  • 可以发送除OP_REPLY之外的类型
  • 只有OP_QUERY和OP_GET_MORE会有响应
  • OP_UPDATE消息体
    • ZRRO(int32): 保留字
    • fullCollectionName(string): 表全称,如db1.collection1
    • flags(int32): 标志,第一位代表是否upsert, 第二位代表是否更新多个,其他位为保留位
    • selector(bson): 更新条件
    • update(bson): 更新操作

pymongo


客户端选项:
  • retryWrites/retryWrites: operations will be retried once after a network error on MongoDB 3.6+. Defaults to True
  • write concern选项
    • w: int/string 写入确认 0:无需任何确认 1(默认值):数据写入到Primary就向客户端发送确认 n: n个节点都写入了才确认 majority: 大多数节点写入之后才确认
    • journal:写入持久化到journal才确认
  • read preference
    • primary(默认): 只从primary读数据
    • primaryPreferred: 优先读primary
    • secondary:只从secondary节点读数据
    • secondaryPreferred: 优先读secondary
    • nearest: 按网络距离就近读取
  • readConcernLevel
    • local: 能读取任意数据
    • majority: 能读取到写入大多数节点的数据
获取对应数据库
1
2
3
4
def __getitem__(self, name):
    """Get a database by name.
    """
    return database.Database(self, name)
TCP连接建立(NODELAY, KEEPALIVE)
1
2
3
4
sock.setsockopt(socket.IPPROTO_TCP,socket.TCP_NODELAY, 1)          
sock.settimeout(options.connect_timeout)
sock.setsockopt(socket.SOL_SOCKET,socket.SO_KEEPALIVE, options.socket_keepalive)
_set_keepalive_times(sock)
新建sock
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _get_socket_no_auth(self):
    """Get or create a SocketInfo. Can raise ConnectionFailure."""
    # We use the pid here to avoid issues with fork / multiprocessing.
    # See test.test_client:TestClient.test_fork for an example of
    # what could go wrong otherwise
    if self.pid != os.getpid():
        self.reset()

    # Get a free socket or create one.
    if not self._socket_semaphore.acquire(
            True, self.opts.wait_queue_timeout):
        self._raise_wait_queue_timeout()
    with self.lock:
        self.active_sockets += 1

    # We've now acquired the semaphore and must release it on error.
    sock_info = None
    try: 
        while sock_info is None:
            try:
                with self.lock:
                    sock_info = self.sockets.popleft()
            except IndexError:
                sock_info = self.connect()
    except Exception:
        self._socket_semaphore.release()
    return sock_info
放入pool
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
 def return_socket(self, sock_info, publish_checkin=True):
    """Return the socket to the pool, or if it's closed discard it.
    """
    if self.pid != os.getpid():
        self.reset()
    else:
        if self.closed:
            sock_info.close_socket(ConnectionClosedReason.POOL_CLOSED)
        elif sock_info.pool_id != self.pool_id:
            sock_info.close_socket(ConnectionClosedReason.STALE)
        elif not sock_info.closed:
            sock_info.update_last_checkin_time()
            sock_info.update_is_writable(self.is_writable)
            with self.lock:
                self.sockets.appendleft(sock_info)

    self._socket_semaphore.release()
    with self.lock:
        self.active_sockets -= 1
更新过期的sock
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def remove_stale_sockets(self):
    """Removes stale sockets then adds new ones if pool is too small."""
    if self.opts.max_idle_time_seconds is not None:
        with self.lock:
            while (self.sockets and
                   self.sockets[-1].idle_time_seconds() > self.opts.max_idle_time_seconds):
                sock_info = self.sockets.pop()
                sock_info.close_socket(ConnectionClosedReason.IDLE)
    while True:
        with self.lock:
            if (len(self.sockets) + self.active_sockets >=
                    self.opts.min_pool_size):
                # There are enough sockets in the pool.
                break

        # We must acquire the semaphore to respect max_pool_size.
        if not self._socket_semaphore.acquire(False):
            break
        try:
            sock_info = self.connect()
            with self.lock:
                self.sockets.appendleft(sock_info)
        finally:
            self._socket_semaphore.release()
检测sock是否可读:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
  def socket_closed(self, sock):
    """Return True if we know socket has been closed, False otherwise.
    """
    if self._poller:
        with self._lock:
            self._poller.register(sock, _EVENT_MASK)
            try:
                rd = self._poller.poll(0)
            finally:
                self._poller.unregister(sock)
    else:
        rd, _, _ = select.select([sock], [], [], 0)
    return len(rd) > 0
打包
1
2
3
4
5
6
7
8
def __pack_message(operation, data):
"""Takes message data and adds a message header based on the operation.

Returns the resultant message string.
"""
rid = _randint()
message = _pack_header(16 + len(data), rid, 0, operation)
return rid, message + data

参考