通讯协议

  • 基于TCP协议

  • 数据以 \r\n 分隔,共有五种数据类型,通过第一个字节来区分

  • simple string: "+", 状态回复

1
    +OK\r\n
  • error: "-", 错误回复
1
    -ERR unknown command 'foobar'\r\n
  • integer: ":", 整数回复
1
    :10\r\n
  • bulk string: "$", 用于命令请求和批量回复
1
    $6\r\nfoobar\r\n
  • multi bulk string: "*", 多条批量回复(长度为 -1 表示不存在)
1
    *4\r\n$3\r\nbar\r\n$3\r\nfoo\r\n$5\r\nhello\r\n$-1\r\n

redis-py

建立TCP连接
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def _connect(self):
    """Open a TCP socket to ``self.host``/``self.port`` and return it.

    Every address produced by ``socket.getaddrinfo`` is tried in order and
    the first socket that connects successfully is returned. A candidate
    that fails with ``socket.error`` is closed and the next one is tried.

    Raises:
        socket.error: the error from the last failed candidate, or a
            generic error when ``getaddrinfo`` yields no addresses.
    """
    last_error = None
    candidates = socket.getaddrinfo(self.host, self.port, self.socket_type,
                                    socket.SOCK_STREAM)
    for family, socktype, proto, canonname, socket_address in candidates:
        conn = None
        try:
            conn = socket.socket(family, socktype, proto)

            # always enable TCP_NODELAY on the new socket
            conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            # keepalive is opt-in; extra TCP-level options come from
            # self.socket_keepalive_options
            if self.socket_keepalive:
                conn.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                for option, value in iteritems(self.socket_keepalive_options):
                    conn.setsockopt(socket.IPPROTO_TCP, option, value)

            # the connect timeout applies only while connecting ...
            conn.settimeout(self.socket_connect_timeout)
            conn.connect(socket_address)

            # ... after which the regular socket timeout takes over
            conn.settimeout(self.socket_timeout)
            return conn

        except socket.error as exc:
            # remember the failure and release the half-open socket before
            # moving on to the next candidate address
            last_error = exc
            if conn is not None:
                conn.close()

    if last_error is None:
        raise socket.error("socket.getaddrinfo returned an empty list")
    raise last_error
response解析
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def read_response(self):
    """Read one reply from ``self._buffer`` and return it as a Python value.

    The first byte of the line selects the reply type:

    * ``-`` error: parsed with ``self.parse_error``; a ``ConnectionError``
      is raised, any other error instance is *returned* to the caller.
    * ``+`` status: the rest of the line is returned.
    * ``:`` integer: converted with ``long``.
    * ``$`` bulk: ``None`` for length -1, otherwise ``length`` is passed
      to ``self._buffer.read``.
    * ``*`` multi-bulk: ``None`` for length -1, otherwise a list built by
      calling ``read_response`` recursively once per element.

    ``bytes`` results are passed through ``self.encoder.decode``.

    Raises:
        ConnectionError: when the buffer yields an empty line, or when the
            server error parses to a ``ConnectionError``.
        InvalidResponse: when the type byte is not one of the five above.
    """
    line = self._buffer.readline()
    if not line:
        raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

    marker = byte_to_chr(line[0])
    payload = line[1:]

    if marker not in ('-', '+', ':', '$', '*'):
        raise InvalidResponse("Protocol Error: %s, %s" %
                              (str(marker), str(payload)))

    if marker == '-':
        # server-side error reply
        error = self.parse_error(nativestr(payload))
        # connection-level problems must surface immediately
        if isinstance(error, ConnectionError):
            raise error
        # anything else may belong to a pipeline response; hand the
        # exception instance back and let the caller decide when to raise
        return error

    # a '+' status reply needs no conversion and falls straight through
    if marker == ':':
        payload = long(payload)
    elif marker == '$':
        size = int(payload)
        if size == -1:
            return None
        payload = self._buffer.read(size)
    elif marker == '*':
        count = int(payload)
        if count == -1:
            return None
        payload = [self.read_response() for _ in xrange(count)]

    if isinstance(payload, bytes):
        payload = self.encoder.decode(payload)
    return payload
客户端打包
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def pack_command(self, *args):
    """Serialize ``args`` into Redis protocol byte chunks.

    Returns a list of byte strings that together form one multi-bulk
    request: a ``*<count>`` header followed by a ``$<len>`` bulk entry
    for each argument encoded with ``self.encoder.encode``.
    """
    # A command name such as 'CONFIG GET' carries literal arguments that
    # the server expects as separate items, so split the first argument
    # on whitespace. The pieces end up as bytestrings so that they are
    # not encoded a second time.
    command = args[0]
    if isinstance(command, unicode):
        args = tuple(command.encode().split()) + args[1:]
    elif b' ' in command:
        args = tuple(command.split()) + args[1:]

    chunks = []
    pending = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))
    cutoff = self._buffer_cutoff

    for encoded in imap(self.encoder.encode, args):
        size = str(len(encoded)).encode()
        # bulk header for this argument, prefixed by whatever is pending
        header = SYM_EMPTY.join((pending, SYM_DOLLAR, size, SYM_CRLF))
        if len(pending) > cutoff or len(encoded) > cutoff:
            # large values are emitted as their own chunk instead of being
            # copied into one big string
            chunks.append(header)
            chunks.append(encoded)
            pending = SYM_CRLF
        else:
            pending = SYM_EMPTY.join((header, encoded, SYM_CRLF))

    chunks.append(pending)
    return chunks

参考