Quality of Service (QoS)
At most once
Core NATS is a fire-and-forget messaging system. It will only hold messages in memory.
If a subscriber is not listening on the subject, or is not active when the message is sent, the message is not received.
At least/ exactly once
JetStream support higher qualities of service (at least once and exactly once)
Subject-based Messaging
消息基于 Subject 匹配和传递
特殊符号
.
作为 token 分隔符
*
正则匹配 一个 token
>
正则匹配一个或者多个 token,且只能位于末尾
Core NATS
基于at most once
的服务质量,Core NATS 有三种消息传递模式
- publish-subscribe
- request-reply
- queuing
Publish-Subscribe
Publish-Subscribe 是一对多的通讯模式,一个发送者在一个 Subject 上发送的消息,任何活跃的订阅这个 Subject 的接收者都会收到消息
Request-Reply
Request-Reply 建立在 Publish-Subscribe
基础之上,接受者在收到消息后,会像 Reply subject 发送响应消息
Queue Groups
Queue Groups 建立在 Request-Reply
基础之上,当有多个接收者时,这些接收者就形成了一个 Queue Group
,
每次消息会随机有一个接受者来消费消息
JetStream
JetStream 是建立在 Core NATS 上的分布式持久系统,需要 1、3 或 5 个 server (分别容忍 0、1 和 2 个 server 挂掉)
Stream
存储类型
保留策略
- LimitsPolicy: MaxMsgs, MaxBytes, MaxAge, and MaxMsgsPerSubject
- InterestPolicy: 当没有消费者消费或者所有消费者消费了,消息会删除
- WorkQueuePolicy: 每个消息只能被消费一次,一个 WorkQueue 只能有一个消费者
丢弃策略
Consumer
Ack 策略
- AckExplicit: 每个消息都 Ack
- AckNone: 无 Ack
- AckAll: Ack 最后收到的一个消息
Deliver 策略
- DeliverAll: 接收所有消息
- DeliverLast: 接收最新的消息
- DeliverNew: 接收 Consumer 创建之后的消息
- DeliverByStartSequence: 接收指定 OptStartSeq 之后的消息
- DeliverByStartTime: 接收指定 OptStartTime 之后的消息
MaxAckPending
最大 Pending 数量,超过则不再传递消息
KV 存储
JetStream 可以提供 KV 功能, 具体包括增删改查,甚至可以监听 key的 改动、 获取 value 的历史版本
协议
NATS 使用文本格式的协议,基于 TCP 进行通信。以换行符做消息分隔符, 以空格或制表符作为字段分隔符。
可以直接使用 telnet 或者 netcat 作为客户端直接和服务段交互
具体操作符类型如下:
类型 |
发送方 |
语法 |
备注 |
INFO |
Server |
INFO {"option_name":option_value,...}␍␊ |
连接建立之后发送 |
CONNECT |
Client |
CONNECT {"option_name":option_value,...}␍␊ |
指定连接信息 |
PUB |
Client |
PUB <subject> [reply-to] <#bytes>␍␊[payload]␍␊ |
发布消息到服务段 |
SUB |
Client |
SUB <subject> [queue group] <sid>␍␊ |
订阅消息 |
UNSUB |
Client |
UNSUB <sid> [max_msgs]␍␊ |
取消订阅消息 |
MSG |
Server |
MSG <subject> <sid> [reply-to] <#bytes>␍␊[payload]␍␊ |
转发消息给客户端 |
PING |
Both |
PING␍␊ |
心跳请求 |
PONG |
Both |
PONG␍␊ |
心跳相应 |
+OK |
Server |
+OK␍␊ |
确认消息,verbose 模式才有 |
-ERR |
Server |
-ERR <error message>␍␊ |
错误 |
源码阅读
数据解析
基于一个有限状态机,接受到的数据会一个字符一个字节去录入和转移状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
for i = 0; i < len(buf); i++ {
b = buf[i]
switch c.state {
case OP_START:
...
case OP_P:
switch b {
case 'U', 'u':
c.state = OP_PU
case 'I', 'i':
c.state = OP_PI
case 'O', 'o':
c.state = OP_PO
default:
goto parseErr
}
case OP_PU:
switch b {
case 'B', 'b':
c.state = OP_PUB
default:
goto parseErr
}
case OP_PUB:
switch b {
case ' ', '\t':
c.state = OP_PUB_SPC
default:
goto parseErr
}
...
default:
goto parseErr
}
}
|
订阅列表
订阅列表使用前缀树来维护
1
2
3
4
5
6
7
8
9
10
|
type node struct {
next *level
psubs []*subscription
qsubs [][]*subscription
}
type level struct {
nodes map[string]*node
pwc, fwc *node // pwd、fwc 分别表示 '*' 和 '>' 订阅
}
|
插入操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
l := s.root
var n *node
for _, t := range tokens {
if len(t) == 0 || sfwc {
s.Unlock()
return ErrInvalidSubject
}
switch t[0] {
case pwc:
n = l.pwc
case fwc:
n = l.fwc
sfwc = true
default:
n = l.nodes[t]
}
if n == nil {
n = newNode()
switch t[0] {
case pwc:
l.pwc = n
case fwc:
l.fwc = n
default:
l.nodes[t] = n
}
}
if n.next == nil {
n.next = newLevel()
}
l = n.next
}
if sub.queue == nil {
n.psubs = append(n.psubs, sub)
} else {
// This is a queue subscription
if i := findQSliceForSub(sub, n.qsubs); i >= 0 {
n.qsubs[i] = append(n.qsubs[i], sub)
} else {
n.qsubs = append(n.qsubs, []*subscription{sub})
}
}
|
删除操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
sfwc := false
l := s.root
var n *node
// Track levels for pruning
var lnts [32]lnt
levels := lnts[:0]
for _, t := range tokens {
if len(t) == 0 || sfwc {
return ErrInvalidSubject
}
if l == nil {
return ErrNotFound
}
switch t[0] {
case pwc:
n = l.pwc
case fwc:
n = l.fwc
sfwc = true
default:
n = l.nodes[t]
}
if n != nil {
levels = append(levels, lnt{l, n, t})
l = n.next
} else {
l = nil
}
}
// 反向删除记录
for i := len(levels) - 1; i >= 0; i-- {
l, n, t := levels[i].l, levels[i].n, levels[i].t
if n.isEmpty() {
l.pruneNode(n, t)
}
}
|
查找操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
// matchLevel is used to recursively descend into the trie.
func matchLevel(l *level, toks []string, results *SublistResult) {
var pwc, n *node
for i, t := range toks {
if l == nil {
return
}
if l.fwc != nil {
addNodeToResults(l.fwc, results)
}
if pwc = l.pwc; pwc != nil {
matchLevel(pwc.next, toks[i+1:], results)
}
n = l.nodes[t]
if n != nil {
l = n.next
} else {
l = nil
}
}
if n != nil {
addNodeToResults(n, results)
}
if pwc != nil {
addNodeToResults(pwc, results)
}
}
|
集群
集群节点两两之间会建立一个类型为 ROUTER 的 Client。
建立连接后,会通过 SUB
类型消息告知当前节点订阅列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
func (s *Server) sendLocalSubsToRoute(route *client) {
b := bytes.Buffer{}
s.mu.Lock()
for _, client := range s.clients {
client.mu.Lock()
subs := make([]*subscription, 0, len(client.subs))
for _, sub := range client.subs {
subs = append(subs, sub)
}
client.mu.Unlock()
for _, sub := range subs {
rsid := routeSid(sub)
proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid)
b.WriteString(proto)
}
}
s.mu.Unlock()
route.mu.Lock()
defer route.mu.Unlock()
route.sendProto(b.Bytes(), true)
}
|
当有新的节点加入后,会转发新节点的信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) {
s.mu.Lock()
defer s.mu.Unlock()
b, _ := json.Marshal(info)
infoJSON := []byte(fmt.Sprintf(InfoProto, b))
for _, r := range s.routes {
r.mu.Lock()
if r.route.remoteID != info.ID {
r.sendInfo(infoJSON)
}
r.mu.Unlock()
}
}
|
当普通 Client 有新的订阅时,Route Client 会转发消息到所有其他节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
// sid 会被重新, 添加 RSID 前缀和客户端信息等
func routeSid(sub *subscription) string {
var qi string
if len(sub.queue) > 0 {
qi = "Q"
}
return fmt.Sprintf("%s%s:%d:%s", qi, RSID, sub.client.cid, sub.sid)
}
func (s *Server) broadcastInterestToRoutes(proto string) {
var arg []byte
if atomic.LoadInt32(&s.logging.trace) == 1 {
arg = []byte(proto[:len(proto)-LEN_CR_LF])
}
protoAsBytes := []byte(proto)
s.mu.Lock()
for _, route := range s.routes {
// FIXME(dlc) - Make same logic as deliverMsg
route.mu.Lock()
route.sendProto(protoAsBytes, true)
route.mu.Unlock()
route.traceOutOp("", arg)
}
s.mu.Unlock()
}
|
发布消息时,如果是 Route 类型的客户端,会对remoteID 做去重,以免给同一个 Route 客户端发送多次。
并且消息只会从 普通客户端发送到 Route 客户端,不会出现 Route 客户端 再发到 Route 客户端的情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// processMsg is called to process an inbound msg from a client.
func (c *client) processMsg(msg []byte) {
var r *SublistResult
isRoute := c.typ == ROUTER
for _, sub := range r.psubs {
// Check if this is a send to a ROUTER, make sure we only send it
// once. The other side will handle the appropriate re-processing
// and fan-out. Also enforce 1-Hop semantics, so no routing to another.
if sub.client.typ == ROUTER {
// 只会转发一次
if isRoute {
continue
}
// route 去重
if _, ok := rmap[sub.client.route.remoteID]; ok {
c.Debugf("Ignoring route, already processed")
sub.client.mu.Unlock()
continue
}
rmap[sub.client.route.remoteID] = routeSeen
sub.client.mu.Unlock()
}
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}
}
|
Reference