简介

Supervisor is a client/server system that allows its users to control a number of processes on UNIX-like operating systems.

当前最新版本为 4.2.4,发布于 2021-12-30

第一个发布版本为 1.0.3,发布于 2004-05-26

first_release

主要模块

supervisorctl

supervisorctl 是个命令行客户端,用户可以通过命令行输入指令来获取和变更进程状态等

命令列表

1
2
3
4
5
6
supervisor> help
default commands (type help <topic>):
=====================================
add    exit      open  reload  restart   start   tail
avail  fg        pid   remove  shutdown  status  update
clear  maintail  quit  reread  signal    stop    version

主流程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# file: supervisor/supervisorctl.py
class Controller(cmd.Cmd):
    """Interactive supervisorctl shell (simplified).

    Reads command lines from ``self.stdin`` and dispatches each one to the
    matching ``do_<command>`` method, in the style of :class:`cmd.Cmd`.
    """

    def cmdloop(self, intro=None):
        """Read and execute commands until a handler asks to stop or input ends.

        ``intro`` is accepted for compatibility with ``cmd.Cmd.cmdloop`` but is
        not used by this simplified loop.
        """
        while True:
            line = self.stdin.readline()
            if not line:
                # readline() returns '' only at EOF (Ctrl-D, exhausted pipe);
                # without this check the loop would spin forever.
                break
            stop = self.onecmd(line)
            if stop:
                # A truthy return from a do_* handler ends the loop, as in cmd.Cmd.
                break

    def onecmd(self, line):
        """Parse one input line and run the matching handler.

        Returns whatever the handler returns; a truthy value stops cmdloop().
        """
        cmd, arg, line = self.parseline(line)
        if not line:
            # Blank input: do nothing (instead of failing on 'do_' + None).
            return None
        if not cmd:
            # parseline() could not extract a command name.
            return self.default(line)
        do_func = self._get_do_func(cmd)
        if do_func == self.default:
            # Unknown command: cmd.Cmd.default() expects the whole input line,
            # not only the argument part.
            return self.default(line)
        return do_func(arg)

    def _get_do_func(self, cmd):
        """Return the ``do_<cmd>`` method, or ``self.default`` if there is none."""
        func_name = 'do_' + cmd
        func = getattr(self, func_name, self.default)
        return func

    def do_start(self, arg):
        """Start each whitespace-separated process name in ``arg`` via XML-RPC."""
        names = arg.split()
        if not names:
            self.ctl.output('Error: start requires a process name')
            return
        supervisor = self.ctl.get_supervisor()
        for name in names:
            supervisor.startProcess(name)
            self.ctl.output('%s: started' % name)

supervisord

主流程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# file: supervisor/supervisord.py

class Supervisor:
    """Core of supervisord: owns the process groups and runs the main loop."""

    # Timeout handed to poller.poll() on every iteration so the periodic work
    # at the end of the loop still runs when no fd is ready.
    # Presumably seconds -- confirm against the poller implementation.
    poll_timeout = 1

    def __init__(self, options):
        self.options = options
        # Maps group name -> process group, whose members are Subprocess
        # instances keyed by program name, e.g.:
        # {
        #    "groupA": {
        #        "groupA": <Subprocess instance>,
        #    },
        #    "groupB": {
        #        "programB1": <Subprocess instance>,
        #        "programB2": <Subprocess instance>,
        #    },
        # }
        # runforever() calls group.transition() on every value, so each group
        # must be an object providing that method.
        self.process_groups = {}

    def runforever(self):
        """Run supervisord's event loop; never returns normally.

        Each iteration registers every socket/pipe dispatcher with the poller,
        polls, dispatches read/write events, and then does the periodic work:
        group state transitions, reaping children, signal handling and ticks.
        """
        # Create the Subprocess objects managing each configured program
        # (the child processes themselves are not started yet).
        for config in self.options.process_group_configs:
            self.add_process_group(config)

        # Start the HTTP servers.
        self.options.openhttpservers(self)

        # Install signal handlers.
        self.options.setsignals()

        # Global fd -> dispatcher map holding every socket object.
        socket_map = self.options.get_socket_map()

        while True:
            combined_map = dict(socket_map)
            # Add the pipes attached to the children's stdin/stdout/stderr.
            combined_map.update(self.get_process_map())

            # Register each fd for the events its dispatcher currently wants.
            for fd, dispatcher in combined_map.items():
                if dispatcher.readable():
                    self.options.poller.register_readable(fd)
                if dispatcher.writable():
                    self.options.poller.register_writable(fd)

            # Wait for fds that are ready for reading / writing.
            r, w = self.options.poller.poll(self.poll_timeout)

            # Dispatch the ready events.
            for fd in r:
                if fd in combined_map:
                    combined_map[fd].handle_read_event()
            for fd in w:
                if fd in combined_map:
                    combined_map[fd].handle_write_event()

            pgroups = list(self.process_groups.values())
            for group in pgroups:
                # Check process states; start or stop processes as required.
                group.transition()

            # Non-blocking waitpid(0, 1): collect children that have exited.
            self.reap()

            # Handle pending signals: stop, restart, reopen log files, ...
            self.handle_signal()

            # Emit tick (heartbeat) events.
            self.tick()

子进程实例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

def make_pipes(self, stderr=True):
    """Create the stdio pipes between supervisord and one child process.

    The returned dict maps:
      * ``'child_stdin'`` / ``'stdin'``   -- read / write end of the child's stdin pipe
      * ``'stdout'`` / ``'child_stdout'`` -- read / write end of the child's stdout pipe
      * ``'stderr'`` / ``'child_stderr'`` -- read / write end of the child's stderr
        pipe, or both ``None`` when ``stderr`` is false
    The parent-side ends (``stdin``, ``stdout``, ``stderr``) are put in
    non-blocking mode so the main loop can service them without blocking.

    Raises OSError if a pipe cannot be created or configured; in that case
    every descriptor opened so far is closed before the error propagates.
    """
    pipes = dict.fromkeys(
        ('child_stdin', 'stdin', 'stdout', 'child_stdout', 'stderr', 'child_stderr'))
    try:
        # os.pipe() returns (read_end, write_end).
        pipes['child_stdin'], pipes['stdin'] = os.pipe()
        pipes['stdout'], pipes['child_stdout'] = os.pipe()
        if stderr:
            pipes['stderr'], pipes['child_stderr'] = os.pipe()
        for fd in (pipes['stdout'], pipes['stderr'], pipes['stdin']):
            if fd is not None:
                # O_NONBLOCK (O_NDELAY is an alias) on the parent-side ends only.
                flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
                fcntl.fcntl(fd, fcntl.F_SETFL, flags)
    except OSError:
        # Don't leak half-created pipes (e.g. on EMFILE).
        for fd in pipes.values():
            if fd is not None:
                os.close(fd)
        raise
    return pipes
    
class Subprocess(object):
    """Manage one child process: fork/exec it and deliver signals to it."""

    # 0 means "no child is running"; kill() relies on this.
    pid = 0

    def __init__(self, config):
        self.config = config
        self.dispatchers = {}
        self.pipes = {}
        self.state = ProcessStates.STOPPED

    def spawn(self):
        """Fork a child and exec the configured program in it.

        In the parent this records the child's pid; the child never returns
        from this call (it either execs the program or exits with 127).
        """
        options = self.config.options
        filename, argv = self.get_execv_args()
        self.dispatchers, self.pipes = self.config.make_dispatchers(self)

        pid = options.os.fork()

        if pid != 0:
            self._spawn_as_parent(pid)
        else:
            self._spawn_as_child(filename, argv)

    def _spawn_as_parent(self, pid):
        """Parent side of fork(): remember the pid and release the child-side pipe ends."""
        self.pid = pid
        options = self.config.options
        options.close_child_pipes(self.pipes)

    def _spawn_as_child(self, filename, argv):
        """Child side of fork(): redirect stdio to the pipes and exec the program.

        Never returns. If anything before or during exec fails, the child
        exits immediately with status 127 so it cannot fall back into
        supervisord's own code (and its main loop) in the forked process.
        """
        options = self.config.options
        try:
            options.setpgrp()

            # Redirect stdin/stdout/stderr to the pipes.
            options.dup2(self.pipes['child_stdin'], 0)
            options.dup2(self.pipes['child_stdout'], 1)
            if self.config.redirect_stderr:
                options.dup2(self.pipes['child_stdout'], 2)
            else:
                options.dup2(self.pipes['child_stderr'], 2)
            for i in range(3, options.minfds):
                options.close_fd(i)

            env = os.environ.copy()
            os.execve(filename, argv, env)
        except OSError as why:
            # fd 2 normally points into the child's output pipe by now, so
            # this message ends up in the program's log.
            os.write(2, ("supervisor: couldn't exec %s: %s\n" % (filename, why)).encode())
        finally:
            # Only reached if execve() did not replace the process image.
            os._exit(127)

    def stop(self):
        """Send the configured stop signal; returns kill()'s result."""
        return self.kill(self.config.stopsignal)

    def kill(self, sig):
        """Send ``sig`` to the child and mark the process STOPPING.

        Returns None on success, or an error message when no child is
        running (the caller in rpcinterface turns a non-None result into a
        FAILED fault).
        """
        if not self.pid:
            # os.kill(0, sig) would signal every process in supervisord's own
            # process group, so refuse instead.
            return "attempted to kill with sig %s but the process wasn't running" % (sig,)
        self.change_state(ProcessStates.STOPPING)
        os.kill(self.pid, sig)

http_server

支持基于 TCP socket 或 unix domain socket 的 HTTP server,下面以 TCP 版本(supervisor_af_inet_http_server)为例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class supervisor_af_inet_http_server(supervisor_http_server):
    """TCP (AF_INET) variant of supervisord's HTTP server.

    Listens on ``(ip, port)`` with a non-blocking socket and registers itself
    in the global ``socket_map`` so the main loop polls it.
    """

    def __init__(self, ip, port, logger_object):
        self.ip = ip
        self.port = port

        # Request handlers consulted by each http_channel.
        self.handlers = []

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket = sock
        try:
            # Let a restarted supervisord rebind a port still in TIME_WAIT.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(0)
            sock.bind((ip, port))
        except OSError:
            # Don't leak the socket when the address can't be bound.
            sock.close()
            raise
        self.listen(1024)

        # Register in the global fd -> dispatcher map. The key must be the
        # integer fd (fileno() *called*): the main loop looks dispatchers up
        # by the fds returned from poll().
        socket_map[sock.fileno()] = self

    def handle_read_event(self):
        """A readable listening socket means a connection is waiting."""
        self.handle_accept()

    def handle_accept(self):
        """Accept one client and wrap it in an http_channel."""
        try:
            conn, addr = self.socket.accept()
        except (BlockingIOError, ConnectionAbortedError):
            # Non-blocking socket: the client may have gone away between
            # poll() reporting readiness and this accept().
            return
        http_channel(self, conn, addr)
        
class http_channel(asynchat.async_chat):
    """One accepted HTTP client connection, driven by supervisord's main loop."""

    def __init__(self, server, conn, addr):
        # The server owns the handler list consulted in handle_read().
        self.server = server
        self.addr = addr
        self.socket = conn
        self.socket.setblocking(0)

        # Keyed by the integer fd, matching the fds returned by poll().
        socket_map[self.socket.fileno()] = self

        self.ac_in_buffer = b''
        self.ac_out_buffer = b''
        self.ac_out_buffer_size = 512
        # Blank line that ends the HTTP header block. It must be bytes
        # because it is searched for in the bytes ac_in_buffer.
        self.terminator = b'\r\n\r\n'

    def handle_read_event(self):
        self.handle_read()

    def handle_read(self):
        """Buffer incoming bytes and dispatch once a full header block arrived.

        Simplified: receives data and assembles the request; the real logic
        is more involved.
        """
        # ac_in_buffer_size is inherited from asynchat.async_chat.
        data = self.socket.recv(self.ac_in_buffer_size)

        self.ac_in_buffer += data
        if self.terminator in self.ac_in_buffer:
            # NOTE(review): ac_in_buffer is not consumed here; presumably
            # new_request() does that -- confirm.
            request = self.new_request()
            # Only the first matching handler serves the request; letting
            # several handlers respond would emit several responses.
            for handler in self.server.handlers:
                if handler.match(request):
                    handler.handle_request(request)
                    return

    def handle_write_event(self):
        self.handle_write()

    def handle_write(self):
        """Send up to ac_out_buffer_size bytes of pending output."""
        obs = self.ac_out_buffer_size
        if len(self.ac_out_buffer) < obs:
            self.refill_buffer()
        if self.ac_out_buffer:
            num_sent = self.send(self.ac_out_buffer[:obs])
            if num_sent:
                self.ac_out_buffer = self.ac_out_buffer[num_sent:]

xmlrpc_interface

xmlrpc协议简介

xmlrpc 是一种基于 http 通信、使用 XML 格式序列化的远程过程调用方式

请求示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
POST /RPC2 HTTP/1.1  # 请求使用 POST 方法, path 为 /RPC2
Host: localhost:9011
Accept-Encoding: gzip
Content-Type: text/xml
User-Agent: Python-xmlrpc/3.8
Content-Length: 176

<?xml version='1.0'?>
<methodCall> 
    <methodName>supervisor.stopProcess</methodName>
    <params>
        <param>
            <value><string>exits_123s</string></value>
        </param>
    </params>
</methodCall>

正常响应示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
HTTP/1.1 200 OK
Server: Medusa/1.12
Date: Thu, 24 Mar 2022 09:02:51 GMT
Content-Type: text/xml
Content-Length: 129

<?xml version='1.0'?>
<methodResponse>
    <params>
        <param>
            <value><boolean>1</boolean></value>
        </param>
    </params>
</methodResponse>

错误响应示例(HTTP 状态码同样为 200,错误信息通过 <fault> 中的 faultCode / faultString 返回)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
HTTP/1.1 200 OK
Server: Medusa/1.12
Date: Thu, 24 Mar 2022 08:53:09 GMT
Content-Type: text/xml
Content-Length: 279

<?xml version='1.0'?>
<methodResponse>
    <fault>
        <value>
            <struct>
                <member>
                    <name>faultCode</name>
                    <value><int>70</int></value>
                </member>
                <member>
                    <name>faultString</name>
                    <value><string>NOT_RUNNING: exits_123s</string></value>
                </member>
            </struct>
        </value>
    </fault>
</methodResponse>

supervisor中的实现

服务端 handler 实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# file: supervisor/xmlrpc.py
class supervisor_xmlrpc_handler(xmlrpc_handler):
    """Serve XML-RPC calls POSTed to ``/RPC2`` by dispatching to the RPC interfaces."""

    path = '/RPC2'

    def __init__(self, supervisord, subinterfaces):
        self.rpcinterface = RootRPCInterface(subinterfaces)
        self.supervisord = supervisord

    def match(self, request):
        """Claim every request whose URI starts with ``/RPC2``."""
        return request.uri.startswith(self.path)

    def continue_request(self, data, request):
        """Decode the XML-RPC call in ``data``, run it and push the encoded reply.

        Successful results are sent as a ``<methodResponse><params>`` document;
        an RPCError is sent in-band as a ``<fault>`` (the HTTP status stays 200).
        """
        params, method = xmlrpclib.loads(data)
        try:
            value = self.call(method, params)
        except RPCError as err:
            # NOTE(review): assumes RPCError exposes .code and .text -- confirm.
            value = xmlrpclib.Fault(err.code, err.text)
        if isinstance(value, xmlrpclib.Fault):
            body = xmlrpclib.dumps(value)
        else:
            # XML-RPC responses carry exactly one value, wrapped in a 1-tuple.
            body = xmlrpclib.dumps((value,), methodresponse=True)
        # NOTE(review): a full implementation also sets Content-Type /
        # Content-Length and finishes the request; omitted in this excerpt.
        request.push(body)

    def call(self, method, params):
        """Resolve the dotted ``method`` name on the root interface and invoke it."""
        return traverse(self.rpcinterface, method, params)
       

接口实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# file: supervisor/rpcinterface.py

class SupervisorNamespaceRPCInterface:
    """The ``supervisor.*`` XML-RPC namespace (e.g. ``supervisor.stopProcess``)."""

    def __init__(self, supervisord):
        self.supervisord = supervisord

    def _get_process(self, name):
        """Resolve ``'group:process'`` or plain ``'name'`` to a process object.

        A plain name means the group and the process share that name. Raises
        RPCError(BAD_NAME) if the group or the process is unknown.
        """
        group_name, sep, process_name = name.partition(':')
        if not sep:
            process_name = group_name
        group = self.supervisord.process_groups.get(group_name)
        if group is None:
            raise RPCError(Faults.BAD_NAME, name)
        # NOTE(review): groups are documented as plain {program: Subprocess}
        # dicts in Supervisor.__init__ but are also used as objects; accept a
        # group exposing a .processes mapping as well as a bare mapping.
        processes = getattr(group, 'processes', group)
        process = processes.get(process_name)
        if process is None:
            raise RPCError(Faults.BAD_NAME, name)
        return process

    def stopProcess(self, name, wait=True):
        """Stop the process ``name`` (``'group:process'`` or ``'name'``).

        ``wait`` is accepted for API compatibility but unused here. Returns
        True; raises RPCError with BAD_NAME, NOT_RUNNING or FAILED.
        """
        process = self._get_process(name)
        if process.get_state() not in RUNNING_STATES:
            raise RPCError(Faults.NOT_RUNNING, name)

        # stop() returns None on success or an error message.
        msg = process.stop()
        if msg is not None:
            raise RPCError(Faults.FAILED, msg)
        return True

event

event 类型

  • SUPERVISOR_STATE_CHANGE: supervisord 状态相关
  • PROCESS_STATE: 子进程进程状态相关
  • PROCESS_GROUP: 进程组相关
  • TICK: 心跳相关

订阅实现逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# Registered (event class, callback) pairs, in subscription order.
callbacks = []


def subscribe(type, callback):
    """Call ``callback(event)`` for every notified event that is an instance of ``type``."""
    callbacks.append((type, callback))


def unsubscribe(type, callback):
    """Remove a subscription; raises ValueError if it was never registered."""
    callbacks.remove((type, callback))


def notify(event):
    """Deliver ``event`` to every subscriber whose type matches it.

    Iterates over a snapshot so a callback may (un)subscribe while being
    notified without causing other subscribers to be skipped.
    """
    for event_type, callback in list(callbacks):
        if isinstance(event, event_type):
            callback(event)

eventlistener

eventlistener 是一种特殊的子进程,通过标准输入/输出与 supervisord 交互来接收 event,流程如下:

  1. 向 stdout 写入 READY\n 向 supervisord 请求 event
  2. 从 stdin 读取 supervisord 写入的 event:先读取一行 header(空格分隔的 key:value),再按 header 中 len 指定的长度读取 payload
  3. 向 stdout 写入 RESULT 2\nOK,向 supervisord 确认 event 已处理(2 是其后结果内容 OK 的长度)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import sys

def write_stdout(s):
    # Only eventlistener protocol messages may be sent to stdout:
    # supervisord parses everything written there.
    sys.stdout.write(s)
    sys.stdout.flush()


def write_stderr(s):
    sys.stderr.write(s)
    sys.stderr.flush()


def main():
    """Run the eventlistener protocol loop until supervisord closes stdin."""
    while True:
        # Transition from ACKNOWLEDGED to READY: ask for the next event.
        write_stdout('READY\n')

        # Header line: space-separated "key:value" tokens, including "len".
        line = sys.stdin.readline()
        if not line:
            # EOF: stdin was closed, no more events will arrive.
            break
        # maxsplit=1 so a value that itself contains ':' stays intact.
        headers = dict(token.split(':', 1) for token in line.split())

        # The payload follows the header; its length is given by "len".
        # NOTE(review): supervisord's len presumably counts bytes while
        # sys.stdin.read() counts characters -- non-ASCII payloads may be misread.
        payload = sys.stdin.read(int(headers['len']))

        # Transition from READY to ACKNOWLEDGED: "2" is the length of "OK".
        write_stdout('RESULT 2\nOK')


if __name__ == '__main__':
    main()

总结

  1. 客户端和服务端通过 xmlrpc 通信
  2. 主进程将 socket 和 pipe 设为非阻塞模式,在单线程事件循环中通过 poll/select 等 I/O 多路复用处理读写事件
  3. 主进程和子进程之间通过信号和 pipe 通信

参考链接