Quality of Service (QoS)

At most once

Core NATS is a fire-and-forget messaging system. It will only hold messages in memory.

If a subscriber is not listening on the subject, or is not active when the message is sent, the message is not received.

At least/ exactly once

JetStream support higher qualities of service (at least once and exactly once)

Subject-based Messaging

消息基于 Subject 匹配和传递

特殊符号

  • .作为 token 分隔符
  • *正则匹配 一个 token
  • > 正则匹配一个或者多个 token,且只能位于末尾

subject_matching

Core NATS

基于at most once 的服务质量,Core NATS 有三种消息传递模式

  • publish-subscribe
  • request-reply
  • queuing

Publish-Subscribe

Publish-Subscribe 是一对多的通讯模式,一个发送者在一个 Subject 上发送的消息,任何活跃的订阅这个 Subject 的接收者都会收到消息

publish_subscribe

Request-Reply

Request-Reply 建立在 Publish-Subscribe 基础之上,接受者在收到消息后,会像 Reply subject 发送响应消息

request_reply

Queue Groups

Queue Groups 建立在 Request-Reply 基础之上,当有多个接收者时,这些接收者就形成了一个 Queue Group, 每次消息会随机有一个接受者来消费消息

queue_group

JetStream

JetStream 是建立在 Core NATS 上的分布式持久系统,需要 1、3 或 5 个 server (分别容忍 0、1 和 2 个 server 挂掉)

Stream

存储类型

  • File
  • Memory

保留策略

  • LimitsPolicy: MaxMsgs, MaxBytes, MaxAge, and MaxMsgsPerSubject
  • InterestPolicy: 当没有消费者消费或者所有消费者消费了,消息会删除
  • WorkQueuePolicy: 每个消息只能被消费一次,一个 WorkQueue 只能有一个消费者

丢弃策略

  • DiscardOld
  • DiscardNew

Consumer

Ack 策略

  • AckExplicit: 每个消息都 Ack
  • AckNone: 无 Ack
  • AckAll: Ack 最后收到的一个消息

Deliver 策略

  • DeliverAll: 接收所有消息
  • DeliverLast: 接收最新的消息
  • DeliverNew: 接收 Consumer 创建之后的消息
  • DeliverByStartSequence: 接收指定 OptStartSeq 之后的消息
  • DeliverByStartTime: 接收指定 OptStartTime 之后的消息

MaxAckPending

最大 Pending 数量,超过则不再传递消息

KV 存储

JetStream 可以提供 KV 功能, 具体包括增删改查,甚至可以监听 key的 改动、 获取 value 的历史版本

协议

NATS 使用文本格式的协议,基于 TCP 进行通信。以换行符做消息分隔符, 以空格或制表符作为字段分隔符。

可以直接使用 telnet 或者 netcat 作为客户端直接和服务段交互

1
nc -c localhost 4222

具体操作符类型如下:

类型 发送方 语法 备注
INFO Server INFO {"option_name":option_value,...}␍␊ 连接建立之后发送
CONNECT Client CONNECT {"option_name":option_value,...}␍␊ 指定连接信息
PUB Client PUB <subject> [reply-to] <#bytes>␍␊[payload]␍␊ 发布消息到服务段
SUB Client SUB <subject> [queue group] <sid>␍␊ 订阅消息
UNSUB Client UNSUB <sid> [max_msgs]␍␊ 取消订阅消息
MSG Server MSG <subject> <sid> [reply-to] <#bytes>␍␊[payload]␍␊ 转发消息给客户端
PING Both PING␍␊ 心跳请求
PONG Both PONG␍␊ 心跳相应
+OK Server +OK␍␊ 确认消息,verbose 模式才有
-ERR Server -ERR <error message>␍␊ 错误

源码阅读

数据解析

基于一个有限状态机,接受到的数据会一个字符一个字节去录入和转移状态

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
for i = 0; i < len(buf); i++ {
    b = buf[i]

    switch c.state {
    case OP_START:
       ...
    case OP_P:
        switch b {
        case 'U', 'u':
            c.state = OP_PU
        case 'I', 'i':
            c.state = OP_PI
        case 'O', 'o':
            c.state = OP_PO
        default:
            goto parseErr
        }
    case OP_PU:
        switch b {
        case 'B', 'b':
            c.state = OP_PUB
        default:
            goto parseErr
        }
    case OP_PUB:
        switch b {
        case ' ', '\t':
            c.state = OP_PUB_SPC
        default:
            goto parseErr
        }
    ...
    default:
        goto parseErr
    }
}

订阅列表

订阅列表使用前缀树来维护

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type node struct {
	next  *level
	psubs []*subscription
	qsubs [][]*subscription
}

type level struct {
	nodes    map[string]*node
	pwc, fwc *node  // pwd、fwc 分别表示 '*' 和 '>' 订阅
}

插入操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
l := s.root
var n *node

for _, t := range tokens {
    if len(t) == 0 || sfwc {
        s.Unlock()
        return ErrInvalidSubject
    }

    switch t[0] {
    case pwc:
        n = l.pwc
    case fwc:
        n = l.fwc
        sfwc = true
    default:
        n = l.nodes[t]
    }
    if n == nil {
        n = newNode()
        switch t[0] {
        case pwc:
            l.pwc = n
        case fwc:
            l.fwc = n
        default:
            l.nodes[t] = n
        }
    }
    if n.next == nil {
        n.next = newLevel()
    }
    l = n.next
}
if sub.queue == nil {
    n.psubs = append(n.psubs, sub)
} else {
    // This is a queue subscription
    if i := findQSliceForSub(sub, n.qsubs); i >= 0 {
        n.qsubs[i] = append(n.qsubs[i], sub)
    } else {
        n.qsubs = append(n.qsubs, []*subscription{sub})
    }
}

删除操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
sfwc := false
l := s.root
var n *node

// Track levels for pruning
var lnts [32]lnt
levels := lnts[:0]


for _, t := range tokens {
    if len(t) == 0 || sfwc {
        return ErrInvalidSubject
    }
    if l == nil {
        return ErrNotFound
    }
    switch t[0] {
    case pwc:
        n = l.pwc
    case fwc:
        n = l.fwc
        sfwc = true
    default:
        n = l.nodes[t]
    }
    if n != nil {
        levels = append(levels, lnt{l, n, t})
        l = n.next
    } else {
        l = nil
    }
}


// 反向删除记录
for i := len(levels) - 1; i >= 0; i-- {
    l, n, t := levels[i].l, levels[i].n, levels[i].t
    if n.isEmpty() {
        l.pruneNode(n, t)
    }
}

查找操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// matchLevel is used to recursively descend into the trie.
func matchLevel(l *level, toks []string, results *SublistResult) {
	var pwc, n *node
	for i, t := range toks {
		if l == nil {
			return
		}
		if l.fwc != nil {
			addNodeToResults(l.fwc, results)
		}
		if pwc = l.pwc; pwc != nil {
			matchLevel(pwc.next, toks[i+1:], results)
		}
		n = l.nodes[t]
		if n != nil {
			l = n.next
		} else {
			l = nil
		}
	}
	if n != nil {
		addNodeToResults(n, results)
	}
	if pwc != nil {
		addNodeToResults(pwc, results)
	}
}

集群

集群节点两两之间会建立一个类型为 ROUTER 的 Client。

建立连接后,会通过 SUB 类型消息告知当前节点订阅列表

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (s *Server) sendLocalSubsToRoute(route *client) {
	b := bytes.Buffer{}
	s.mu.Lock()
	for _, client := range s.clients {
		client.mu.Lock()
		subs := make([]*subscription, 0, len(client.subs))
		for _, sub := range client.subs {
			subs = append(subs, sub)
		}
		client.mu.Unlock()
		for _, sub := range subs {
			rsid := routeSid(sub)
			proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid)
			b.WriteString(proto)
		}
	}
	s.mu.Unlock()

	route.mu.Lock()
	defer route.mu.Unlock()
	route.sendProto(b.Bytes(), true)
}

当有新的节点加入后,会转发新节点的信息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) {
	s.mu.Lock()
	defer s.mu.Unlock()

	b, _ := json.Marshal(info)
	infoJSON := []byte(fmt.Sprintf(InfoProto, b))

	for _, r := range s.routes {
		r.mu.Lock()
		if r.route.remoteID != info.ID {
			r.sendInfo(infoJSON)
		}
		r.mu.Unlock()
	}
}

当普通 Client 有新的订阅时,Route Client 会转发消息到所有其他节点

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// sid 会被重新, 添加 RSID 前缀和客户端信息等
func routeSid(sub *subscription) string {
	var qi string
	if len(sub.queue) > 0 {
		qi = "Q"
	}
	return fmt.Sprintf("%s%s:%d:%s", qi, RSID, sub.client.cid, sub.sid)
}


func (s *Server) broadcastInterestToRoutes(proto string) {
	var arg []byte
	if atomic.LoadInt32(&s.logging.trace) == 1 {
		arg = []byte(proto[:len(proto)-LEN_CR_LF])
	}
	protoAsBytes := []byte(proto)
	s.mu.Lock()
	for _, route := range s.routes {
		// FIXME(dlc) - Make same logic as deliverMsg
		route.mu.Lock()
		route.sendProto(protoAsBytes, true)
		route.mu.Unlock()
		route.traceOutOp("", arg)
	}
	s.mu.Unlock()
}

发布消息时,如果是 Route 类型的客户端,会对remoteID 做去重,以免给同一个 Route 客户端发送多次。 并且消息只会从 普通客户端发送到 Route 客户端,不会出现 Route 客户端 再发到 Route 客户端的情况

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// processMsg is called to process an inbound msg from a client.
func (c *client) processMsg(msg []byte) {
    var r *SublistResult
    isRoute := c.typ == ROUTER
    
    for _, sub := range r.psubs {
        // Check if this is a send to a ROUTER, make sure we only send it
		// once. The other side will handle the appropriate re-processing
		// and fan-out. Also enforce 1-Hop semantics, so no routing to another.
		if sub.client.typ == ROUTER {
		    // 只会转发一次
			if isRoute {
				continue
			}
			
			// route 去重
			if _, ok := rmap[sub.client.route.remoteID]; ok {
				c.Debugf("Ignoring route, already processed")
				sub.client.mu.Unlock()
				continue
			}
			rmap[sub.client.route.remoteID] = routeSeen
			sub.client.mu.Unlock()
		}
		// Normal delivery
		mh := c.msgHeader(msgh[:si], sub)
		c.deliverMsg(sub, mh, msg)
	}
}

Reference