
2. Concepts

Salt Client

The LocalClient class runs on the Salt Master. It sends requests to the Master and receives the results.

The Salt Master has several important classes:

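Publisher: sends commands to the Minions.
EventPublisher: the event bus on the Master.
MWorkers: the Salt Master's worker processes.
ReqServer: implements the MWorker multi-process model. It receives requests from the Salt Client and results returned by Minions, and hands them to an MWorker for processing.

3. Task Processing Mechanism

Suppose you run salt '*' test.ping on the Master. The SaltClient submits the job to the ReqServer on TCP port 4506 and listens on the EventPublisher for the results.

The ReqServer implements a multi-process MWorker model. When it receives a request, it passes it to one of the MWorkers. The MWorker performs a permission check, encrypts the job and sends it to the Publisher, and also sends the event to the EventPublisher.

All Minions have already connected to the Publisher on TCP port 4505. When a Minion receives the job, it decrypts and processes it. When it finishes, it sends the result to the Master's ReqServer on TCP port 4506.

The ReqServer again hands the result to one of the MWorkers. That MWorker decrypts it and sends it to the EventPublisher. The SaltClient has been listening on the EventPublisher the whole time. The command line returns as soon as the result arrives or the timeout expires.

3.1. Task Flowchart

3.2. Submitting the Job

Entry point: salt/scripts/salt.py calls salt/salt/scripts.py: salt_main(), which calls salt/salt/cli/salt.py: SaltCMD:run().
run() checks whether the job is asynchronous. If it is, it calls jid = local.cmd_async(**kwargs). If not, it runs the job synchronously:
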
    for full_ret in local.cmd_cli(**kwargs):
        ret_, out, retcode = self._format_ret(full_ret)
        ret.update(ret_)

local.cmd_cli calls salt/salt/client/__init__.py: LocalClient:cmd_cli(). cmd_cli() calls pub_data = self.run_job(), which calls self.pub(). self.pub() submits the job request to the ReqServer's ret_port (TCP 4506).

    def pub():
        ...
        master_uri = 'tcp://' + salt.utils.ip_bracket(self.opts['interface']) + \
                     ':' + str(self.opts['ret_port'])
        channel = salt.transport.Channel.factory(self.opts,
                                                 crypt='clear',
                                                 master_uri=master_uri)
        try:
            payload = channel.send(payload_kwargs, timeout=timeout)
        except SaltReqTimeoutError:
            raise SaltReqTimeoutError(
                'Salt request timed out. The master is not responding. '
                'If this error persists after verifying the master is up, '
                'worker_threads may need to be increased.'
            )

Here salt.transport.Channel.factory ultimately instantiates an AsyncZeroMQReqChannel, and it does so with crypt='clear'. As a result, the messages LocalClient sends to the Master travel in the clear, without AES encryption.

The job content is assembled in the _prep_pub method in salt/client/__init__.py. The cmd: 'publish' field is used later in the flow.

    def _prep_pub():
        ...
        payload_kwargs = {'cmd': 'publish',
                          'tgt': tgt,
                          'fun': fun,
                          'arg': arg,
                          'key': self.key,
                          'tgt_type': expr_form,
                          'ret': ret,
                          'jid': jid}
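
A minimal sketch of the payload assembly is below. It uses a hypothetical, simplified prep_pub() helper whose signature is not Salt's _prep_pub. The jid can be empty when the job is submitted, as in the sample payload further down. The jids that appear in the debug logs, such as 20151124092621201574, are 20-digit timestamps, so the sketch reproduces that format with strftime. This is an assumption based on the log output, not on Salt's source.

```python
import datetime


def gen_jid(now=None):
    """Return a 20-digit job ID in the timestamp format seen in the logs."""
    if now is None:
        now = datetime.datetime.now()
    return now.strftime('%Y%m%d%H%M%S%f')


def prep_pub(tgt, fun, arg=(), key='', tgt_type='glob', ret='', jid=''):
    """Hypothetical, simplified stand-in for LocalClient._prep_pub."""
    return {
        'cmd': 'publish',  # later selects ClearFuncs.publish on the master
        'tgt': tgt,
        'fun': fun,
        'arg': list(arg),
        'key': key,
        'tgt_type': tgt_type,
        'ret': ret,
        'jid': jid,  # may be empty when the job is submitted
    }


payload_kwargs = prep_pub('client.jacky.com', 'test.ping')
```
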

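The 'enc': 'clear' wrapper around the payload is added in the send method of AsyncZeroMQReqChannel. Because crypt is 'clear', send calls _uncrypted_transfer (not _crypted_transfer), and that is where the load gets wrapped.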

    @tornado.gen.coroutine
    def send(self, load, tries=3, timeout=60):
        '''
        Send a request, return a future which will complete when we send the message
        '''
        if self.crypt == 'clear':
            ret = yield self._uncrypted_transfer(load, tries=tries, timeout=timeout)
        else:
            ret = yield self._crypted_transfer(load, tries=tries, timeout=timeout)
        raise tornado.gen.Return(ret)
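
The clear/encrypted branch in send() comes down to wrapping the load in an envelope whose enc field records how it was sent. Below is a simplified, synchronous model of that branch, not Salt's code, and the function names are illustrative. The encrypt argument is a hypothetical placeholder for the AES crypticle. fake_encrypt uses base64 only to make the branch visible; it is not encryption.

```python
import base64
import json


def package_load(load, crypt):
    """Wrap a load in the {'enc': ..., 'load': ...} envelope."""
    return {'enc': crypt, 'load': load}


def send(load, crypt='aes', encrypt=None):
    """Simplified model of the clear vs. encrypted branch in send()."""
    if crypt == 'clear':
        # LocalClient -> master: the load travels as-is
        return package_load(load, 'clear')
    if encrypt is None:
        raise ValueError('an encrypt function is required when crypt != "clear"')
    # e.g. minion -> master: the load is transformed before wrapping
    return package_load(encrypt(load), crypt)


def fake_encrypt(load):
    # NOT encryption: base64 of JSON, only to show where encryption would happen
    return base64.b64encode(json.dumps(load, sort_keys=True).encode()).decode()
```
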

The data finally submitted to port 4506 looks something like this:

    {'enc': 'clear',
     'load': {'arg': [],
              'cmd': 'publish',
              'fun': 'test.ping',
              'jid': '',
              'key': 'alsdkjfa.,maljf-==adflkjadflkjalkjadfadflkajdflkj',
              'kwargs': {'show_jid': False, 'show_timeout': False},
              'ret': '',
              'tgt': 'client.jacky.com',
              'tgt_type': 'glob',
              'user': 'sudo_vagrant'}}

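After submitting the job, cmd_cli() calls self.get_cli_event_returns(), which calls self.get_iter_returns().
get_iter_returns() checks whether this is a MoM (Master of Masters) setup. If it is, it uses ret_iter = self.get_returns_no_block('(salt/job|syndic/.*)/{0}'.format(jid), 'regex'). If not, it uses ret_iter = self.get_returns_no_block('salt/job/{0}'.format(jid)).
Inside self.get_returns_no_block:
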
    while True:
        raw = self.event.get_event(wait=0.01, tag=tag, match_type=match_type, full=True, no_block=True)
        yield raw

This calls salt/utils/event.py: SaltEvent.get_event(), which in turn calls _get_event(). There the client connects to the EventPublisher as a SUB socket and listens for job-result messages.
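
get_returns_no_block() filters events by tag. The sketch below models the two match modes. It assumes the default mode is a prefix match and that 'regex' behaves like re.match; both are assumptions about Salt's event matching. The function names and the event list are made up for illustration, although 'salt/job/<jid>/new' and 'salt/job/<jid>/ret/<minion id>' follow the job tag layout.

```python
import re


def tag_matches(event_tag, search_tag, match_type='startswith'):
    """Return True if event_tag matches search_tag under the given mode."""
    if match_type == 'startswith':
        return event_tag.startswith(search_tag)
    if match_type == 'regex':
        return re.match(search_tag, event_tag) is not None
    raise ValueError('unknown match_type: {0}'.format(match_type))


def return_tag(jid, mom=False):
    """Pick the tag pattern for a MoM setup vs. a plain master."""
    if mom:
        return '(salt/job|syndic/.*)/{0}'.format(jid), 'regex'
    return 'salt/job/{0}'.format(jid), 'startswith'


jid = '20151124092621201574'
events = [
    'salt/job/{0}/new'.format(jid),
    'salt/job/{0}/ret/client.jacky.com'.format(jid),
    'syndic/syndic01/{0}/ret/client.jacky.com'.format(jid),  # hypothetical tag
    'salt/auth',
]
```

In a plain setup only the first two events match. With the MoM pattern, the syndic-forwarded event matches as well.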

The event for the submitted job looks like this:

    [DEBUG   ] Sending event - data = {'tgt_type': 'glob', 'jid': '20151123220104386580', 'tgt': 'client.jacky.com', '_stamp': '2015-11-23T14:01:04.386948', 'user': 'sudo_vagrant', 'arg': [], 'fun': 'test.ping', 'minions': ['client.jacky.com']}

This is the debug output after the salt command submits the job:

    [root@master base]# salt 'client.jacky.com' test.ping -l debug
    [DEBUG   ] Reading configuration from /etc/salt/master
    [DEBUG   ] Missing configuration file: /root/.saltrc
    [DEBUG   ] Configuration file path: /etc/salt/master
    [WARNING ] Insecure logging configuration detected! Sensitive data may be logged.
    [DEBUG   ] Reading configuration from /etc/salt/master
    [DEBUG   ] Missing configuration file: /root/.saltrc
    [DEBUG   ] MasterEvent PUB socket URI: ipc:///var/run/salt/master/master_event_pub.ipc
    [DEBUG   ] MasterEvent PULL socket URI: ipc:///var/run/salt/master/master_event_pull.ipc
    [DEBUG   ] Initializing new AsyncZeroMQReqChannel for ('/etc/salt/pki/master', 'master.jacky.com_master', 'tcp://127.0.0.1:4506', 'clear')
    [DEBUG   ] LazyLoaded config.option
    [DEBUG   ] get_iter_returns for jid 20151124092621201574 sent to set(['client.jacky.com']) will timeout at 09:26:26.241828
    [DEBUG   ] jid 20151124092621201574 return from client.jacky.com
    client.jacky.com:
        True
    [DEBUG   ] jid 20151124092621201574 found all minions set(['client.jacky.com'])

3.3. The Multi-Process Model Implemented by ReqServer

The ReqServer startup entry point is in salt/master.py.

    req_channels = []
    for transport, opts in iter_transport_opts(self.opts):
        chan = salt.transport.server.ReqServerChannel.factory(opts)
        chan.pre_fork(self.process_manager)
        req_channels.append(chan)

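The ReqServerChannel factory function instantiates a communication channel for ZeroMQ, RAET, or another protocol. The ZeroMQ implementation is in salt/transport/zeromq.py. The call chain is chan.pre_fork(self.process_manager) -> process_manager.add_process(self.zmq_device).

zmq_device implements a Router + Dealer model. The client side (Router) binds to tcp://ip:4506. The worker side (Dealer) binds to either tcp://127.0.0.1:4515 or ipc://workers.ipc, depending on the ipc_mode setting.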

    while True:
        try:
            zmq.device(zmq.QUEUE, self.clients, self.workers)
        except zmq.ZMQError as exc:
            if exc.errno == errno.EINTR:
                continue
            raise exc

zmq.device acts as a proxy, relaying messages between the clients and the workers.
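
Below is a stdlib stand-in that makes the Router/Dealer arrangement concrete without a ZeroMQ dependency. It is an analogy, not Salt's implementation:

- One forwarding loop plays the role of zmq.device(zmq.QUEUE, clients, workers).
- A pool of worker threads, sized like worker_threads, pulls from a shared queue.
- Real MWorkers are separate processes, and ZeroMQ routes replies back using message envelopes. The sketch approximates that routing with a per-request reply queue.

```python
import queue
import threading

STOP = object()  # sentinel that shuts the device and the workers down


def device(clients, workers):
    """Forward every client request to the shared worker queue (QUEUE device)."""
    while True:
        item = clients.get()
        workers.put(item)
        if item is STOP:
            return


def mworker(workers, handle):
    """Take requests off the shared queue and answer on each request's reply queue."""
    while True:
        item = workers.get()
        if item is STOP:
            workers.put(STOP)  # pass the sentinel on to the other workers
            return
        request, reply_q = item
        reply_q.put((threading.current_thread().name, handle(request)))


def run_pool(requests, handle, worker_threads=3):
    clients, workers = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=device, args=(clients, workers))]
    threads += [threading.Thread(target=mworker, args=(workers, handle),
                                 name='MWorker-{0}'.format(i))
                for i in range(worker_threads)]
    for t in threads:
        t.start()
    reply_queues = []
    for req in requests:
        reply_q = queue.Queue()
        clients.put((req, reply_q))
        reply_queues.append(reply_q)
    results = [r.get(timeout=5) for r in reply_queues]  # one reply per request, in order
    clients.put(STOP)
    for t in threads:
        t.join()
    return results
```

Each result is a (worker name, reply) pair, so you can see which worker handled a request. Which worker that is varies from run to run.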

    for ind in range(int(self.opts['worker_threads'])):
        self.process_manager.add_process(MWorker,
                                         args=(self.opts,
                                               self.master_key,
                                               self.key,
                                               req_channels,
                                               ),
                                         )
    self.process_manager.run()

The Master starts one MWorker per unit of the worker_threads setting in the configuration file.

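ZeroMQReqServerChannel is implemented in salt/transport/zeromq.py. Clients submit requests to the Router on port 4506 through AsyncReqMessageClient.

MWorker inherits from multiprocessing.Process, and each MWorker receives req_channels when it is created. Its run() method calls self.__bind(). Through post_fork(), __bind() creates a REP-type connection to the Dealer and then waits for jobs.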

    def __bind(self):
        '''
        Bind to the local port
        '''
        # using ZMQIOLoop since we *might* need zmq in there
        zmq.eventloop.ioloop.install()
        self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
        for req_channel in self.req_channels:
            req_channel.post_fork(self._handle_payload, io_loop=self.io_loop)  # TODO: cleaner? Maybe lazily?
        self.io_loop.start()

At this point the multi-process model is fully set up. (The figure is from the ZeroMQ website.)

3.4. MWorker Processing Logic

The MWorker startup entry point is in salt/master.py.

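Execution starts in the run() method. It sets up self.clear_funcs and self.aes_funcs, then calls the __bind method. Through post_fork(), __bind passes in self._handle_payload as the job handler, creates a REP-type connection to the Dealer, and waits for jobs.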

    def __bind(self):
        '''
        Bind to the local port
        '''
        # using ZMQIOLoop since we *might* need zmq in there
        zmq.eventloop.ioloop.install()
        self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
        for req_channel in self.req_channels:
            req_channel.post_fork(self._handle_payload, io_loop=self.io_loop)  # TODO: cleaner? Maybe lazily?
        self.io_loop.start()

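post_fork() calls into ZeroMQReqServerChannel or another ReqServerChannel implementation. Take ZeroMQReqServerChannel's post_fork as the example: the job-handling entry point is in the lines below, which register payload_handler as the job handler. self.handle_message is the callback that runs when the MWorker receives a message, and it calls payload_handler to process the job.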

    salt.transport.mixins.auth.AESReqServerMixin.post_fork(self, payload_handler, io_loop)
    self.stream = zmq.eventloop.zmqstream.ZMQStream(self._socket, io_loop=self.io_loop)
    self.stream.on_recv_stream(self.handle_message)

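The _handle_payload method shows how the payload is dispatched. A clear payload goes to self._handle_clear. Because its cmd is 'publish', _handle_clear then calls self.clear_funcs.publish (ClearFuncs.publish), which sends the job load to the PULL interface of the PubServerChannel.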

    @tornado.gen.coroutine
    def _handle_payload(self, payload):
        '''
        The _handle_payload method is the key method used to figure out what
        needs to be done with communication to the server

        Example cleartext payload generated for 'salt myminion test.ping':

        {'enc': 'clear',
         'load': {'arg': [],
                  'cmd': 'publish',
                  'fun': 'test.ping',
                  'jid': '',
                  'key': 'alsdkjfa.,maljf-==adflkjadflkjalkjadfadflkajdflkj',
                  'kwargs': {'show_jid': False, 'show_timeout': False},
                  'ret': '',
                  'tgt': 'myminion',
                  'tgt_type': 'glob',
                  'user': 'root'}}

        :param dict payload: The payload route to the appropriate handler
        '''
        key = payload['enc']
        load = payload['load']
        ret = {'aes': self._handle_aes,
               'clear': self._handle_clear}[key](load)
        raise tornado.gen.Return(ret)

    def _handle_clear(self, load):
        '''
        Process a cleartext command

        :param dict load: Cleartext payload
        :return: The result of passing the load to a function in ClearFuncs corresponding to
                 the command specified in the load's 'cmd' key.
        '''
        log.trace('Clear payload received with command {cmd}'.format(**load))
        if load['cmd'].startswith('__'):
            return False
        return getattr(self.clear_funcs, load['cmd'])(load), {'fun': 'send_clear'}

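
The two-level dispatch in _handle_payload can be reproduced in a few lines. It first selects a handler by enc, then selects a method by cmd through getattr, and refuses any cmd that starts with '__'. ClearFuncsSketch and MWorkerSketch are hypothetical stand-ins. The real ClearFuncs.publish does permission checks and actually publishes the job.

```python
class ClearFuncsSketch(object):
    """Hypothetical stand-in for ClearFuncs: one method per allowed cmd."""

    def publish(self, load):
        return {'published': load['fun'], 'tgt': load['tgt']}


class MWorkerSketch(object):
    def __init__(self):
        self.clear_funcs = ClearFuncsSketch()

    def _handle_aes(self, load):
        raise NotImplementedError('AES path not modelled in this sketch')

    def _handle_clear(self, load):
        # Refuse dunder names so a client cannot reach e.g. __init__ or __class__
        if load['cmd'].startswith('__'):
            return False
        return getattr(self.clear_funcs, load['cmd'])(load), {'fun': 'send_clear'}

    def handle_payload(self, payload):
        # First level: pick the handler by encryption type
        handler = {'aes': self._handle_aes,
                   'clear': self._handle_clear}[payload['enc']]
        return handler(payload['load'])


worker = MWorkerSketch()
```

Without the '__' guard, a cleartext request could name any attribute of the ClearFuncs object. This is why the real code rejects such names before calling getattr.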
3.5. Pub-Sub Task Distribution in the Publisher

The Publisher entry point is in salt/master.py.

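During Master startup, the start() function creates a PubServerChannel. Depending on the parameters passed in, this is a ZeroMQPubServerChannel or an implementation for another protocol. The ZeroMQ implementation is in salt/transport/zeromq.py, and ZeroMQPubServerChannel is used as the example here.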

    for transport, opts in iter_transport_opts(self.opts):
        chan = salt.transport.server.PubServerChannel.factory(opts)
        chan.pre_fork(process_manager)
        pub_channels.append(chan)

    def pre_fork(self, process_manager):
        '''
        Do anything necessary pre-fork. Since this is on the master side this will
        primarily be used to create IPC channels and create our daemon process to
        do the actual publishing

        :param func process_manager: A ProcessManager, from salt.utils.process.ProcessManager
        '''
        process_manager.add_process(self._publish_daemon)

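After chan is instantiated, its pre_fork method is called. pre_fork uses _publish_daemon to start a new daemon process. That process listens on pull_uri and pub_uri, and in a while loop forwards every job received on pull_uri to pub_uri.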

    def _publish_daemon(self):
        ...
        while True:
            # Catch and handle EINTR from when this process is sent
            # SIGUSR1 gracefully so we don't choke and die horribly
            try:
                package = pull_sock.recv()
                unpacked_package = salt.payload.unpackage(package)
                payload = unpacked_package['payload']
                if self.opts['zmq_filtering']:
                    # if you have a specific topic list, use that
                    if 'topic_lst' in unpacked_package:
                        for topic in unpacked_package['topic_lst']:
                            # zmq filters are substring match, hash the topic
                            # to avoid collisions
                            htopic = hashlib.sha1(topic).hexdigest()
                            pub_sock.send(htopic, flags=zmq.SNDMORE)
                            pub_sock.send(payload)
                    # otherwise its a broadcast
                    else:
                        # TODO: constants file for "broadcast"
                        pub_sock.send('broadcast', flags=zmq.SNDMORE)
                        pub_sock.send(payload)
                else:
                    pub_sock.send(payload)
            except zmq.ZMQError as exc:
                if exc.errno == errno.EINTR:
                    continue
                raise exc
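
The framing decision in _publish_daemon can be isolated into a pure function that returns the messages to send. Each message is a list of frames: with filtering it is [topic, payload], where the first frame would go out with SNDMORE. This is a sketch, not Salt's code. Under Python 3, hashlib.sha1() requires bytes, so the sketch encodes the topic first; the excerpt above passes a str, which only works on Python 2.

```python
import hashlib


def build_frames(unpacked_package, zmq_filtering):
    """Return the messages to publish; each message is a list of frames."""
    payload = unpacked_package['payload']
    if not zmq_filtering:
        return [[payload]]  # single frame; every subscriber receives it
    if 'topic_lst' in unpacked_package:
        # ZeroMQ SUB filters match on prefixes, so a raw id like 'web1' would
        # also match 'web10'; fixed-length sha1 hex digests avoid that
        return [[hashlib.sha1(topic.encode('utf-8')).hexdigest().encode(), payload]
                for topic in unpacked_package['topic_lst']]
    return [[b'broadcast', payload]]
```
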

The MWorker submits the job to the Publisher in ClearFuncs.publish, which it reaches through self._handle_clear.

    def publish():
        ...
        # Send it!
        self._send_pub(payload)
        ...

    def _send_pub(self, load):
        '''
        Take a load and send it across the network to connected minions
        '''
        for transport, opts in iter_transport_opts(self.opts):
            chan = salt.transport.server.PubServerChannel.factory(opts)
            chan.publish(load)

Here chan is an instance of ZeroMQPubServerChannel. Its publish method encrypts the payload and sends it to the Publisher's pull_uri, the publish_pull.ipc socket.

    payload = {'enc': 'aes'}
    crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
    payload['load'] = crypticle.dumps(load)
    ...
    pub_sock.connect(pull_uri)
    ...
    pub_sock.send(self.serial.dumps(int_payload))
    ...

This is the debug output on the Master:

    [DEBUG   ] Sending event - data = {'_stamp': '2015-11-24T00:39:00.185467', 'minions': ['client.jacky.com']}
    [DEBUG   ] Sending event - data = {'tgt_type': 'glob', 'jid': '20151124083900185051', 'tgt': 'client.jacky.com', '_stamp': '2015-11-24T00:39:00.186134', 'user': 'sudo_vagrant', 'arg': [], 'fun': 'test.ping', 'minions': ['client.jacky.com']}
    [INFO    ] User sudo_vagrant Published command test.ping with jid 20151124083900185051
    [DEBUG   ] Published command details {'tgt_type': 'glob', 'jid': '20151124083900185051', 'tgt': 'client.jacky.com', 'ret': 'mysql', 'user': 'sudo_vagrant', 'arg': [], 'fun': 'test.ping'}
    [INFO    ] Got return from client.jacky.com for job 20151124083900185051
    [DEBUG   ] Sending event - data = {'fun_args': [], 'jid': '20151124083900185051', 'return': True, 'retcode': 0, 'success': True, 'cmd': '_return', '_stamp': '2015-11-24T00:39:00.303857', 'fun': 'test.ping', 'id': 'client.jacky.com'}

3.6. Minions Connecting to the Publisher to Receive and Process Jobs

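During startup, a Minion eventually enters the tune_in method and then loops indefinitely, processing jobs and returning results. The implementation is in salt/minion.py.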

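tune_in calls self.connect_master, and connect_master calls self.eval_master(self.opts, self.timeout, self.safe).
eval_master uses the AsyncPubChannel factory method to instantiate the appropriate pub_channel for the given parameters. AsyncZeroMQPubChannel is used as the example here; its implementation is in salt/transport/zeromq.py.
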
    try:
        pub_channel = salt.transport.client.AsyncPubChannel.factory(opts, **factory_kwargs)
        yield pub_channel.connect()
        conn = True
        break
    except SaltClientError:
        msg = ('Master {0} could not be reached, trying '
               'next master (if any)'.format(opts['master']))
        log.info(msg)
        continue

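The try/except/continue pattern above is a simple failover loop over the configured masters. Below is a minimal synchronous sketch. connect is a hypothetical callable, and SaltClientError is a local stand-in class rather than an import from salt.exceptions.

```python
import logging

log = logging.getLogger(__name__)


class SaltClientError(Exception):
    """Local stand-in for salt.exceptions.SaltClientError."""


def connect_first_available(masters, connect):
    """Try each master in order; return (master, connection) for the first that answers."""
    for master in masters:
        try:
            conn = connect(master)
        except SaltClientError:
            log.info('Master %s could not be reached, trying next master (if any)', master)
            continue
        return master, conn
    raise SaltClientError('No master could be reached')
```
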
Next, AsyncZeroMQPubChannel's connect method is called. It connects as a SUB socket to the Master's PUB port, 4505.

    def connect(self):
        if not self.auth.authenticated:
            yield self.auth.authenticate()
        self.publish_port = self.auth.creds['publish_port']
        self._socket.connect(self.master_pub)

Back in tune_in, once connected to the Master, the Minion registers the job-handling callback:

    # add handler to subscriber
    self.pub_channel.on_recv(self._handle_payload)

pub_channel.on_recv is in fact AsyncZeroMQPubChannel.on_recv(). It wraps the callback so that every received message is decrypted first.

    def on_recv(self, callback):
        ...
        @tornado.gen.coroutine
        def wrap_callback(messages):
            payload = yield self._decode_messages(messages)
            if payload is not None:
                callback(payload)

self._decode_messages() calls self._decode_payload() to do the decryption. See class AESPubClientMixin(object) in salt/transport/mixins/auth.py.

    @tornado.gen.coroutine
    def _decode_payload(self, payload):
        # we need to decrypt it
        log.trace('Decoding payload: {0}'.format(payload))
        if payload['enc'] == 'aes':
            self._verify_master_signature(payload)
            try:
                payload['load'] = self.auth.crypticle.loads(payload['load'])
            except salt.crypt.AuthenticationError:
                yield self.auth.authenticate()
                payload['load'] = self.auth.crypticle.loads(payload['load'])
        raise tornado.gen.Return(payload)

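_decode_payload retries a failed decryption exactly once, after re-authenticating to obtain a fresh AES key. A stale key might cause this, for example after the master has changed its key. Below is a synchronous sketch of that pattern. AuthSketch, its key attributes and its decrypt method are hypothetical, the "ciphertext" is just a (key, data) tuple, and signature verification is omitted.

```python
class AuthenticationError(Exception):
    """Local stand-in for salt.crypt.AuthenticationError."""


class AuthSketch(object):
    """Hypothetical auth object holding the current session key."""

    def __init__(self, key, master_key):
        self.key = key
        self.master_key = master_key
        self.reauth_count = 0

    def authenticate(self):
        # Re-authenticating fetches the master's current key
        self.reauth_count += 1
        self.key = self.master_key

    def decrypt(self, blob):
        key, data = blob  # toy "ciphertext": (key it was made with, data)
        if key != self.key:
            raise AuthenticationError('key mismatch')
        return data


def decode_payload(payload, auth):
    if payload['enc'] == 'aes':
        try:
            payload['load'] = auth.decrypt(payload['load'])
        except AuthenticationError:
            auth.authenticate()  # refresh the key, then try exactly once more
            payload['load'] = auth.decrypt(payload['load'])
    return payload
```
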
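self._handle_payload calls _handle_decoded_payload, which picks a target based on the type of data['fun']. It then creates a new process or a new thread to run that target, depending on the multiprocessing option in the minion configuration. The actual processing happens in _thread_return or _thread_multi_return; _thread_return is used as the example here.
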
    if isinstance(data['fun'], tuple) or isinstance(data['fun'], list):
        target = Minion._thread_multi_return
    else:
        target = Minion._thread_return

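Choosing the target and the execution unit can be sketched as follows. The use_multiprocessing flag mirrors the minion's multiprocessing option. The two target functions are hypothetical placeholders that only record how they were called.

```python
import multiprocessing
import threading


def thread_return(data, results):
    results.append(('single', data['fun']))


def thread_multi_return(data, results):
    results.append(('multi', tuple(data['fun'])))


def pick_target(data):
    """Compound jobs ('fun' is a list or tuple) take the multi-return path."""
    if isinstance(data['fun'], (list, tuple)):
        return thread_multi_return
    return thread_return


def make_job(data, results, use_multiprocessing):
    """Build (but do not start) the process or thread that will run the job."""
    # Note: in a separate process, appending to `results` is not visible to the
    # parent; the real minion reports back by sending the return to the master.
    worker_cls = multiprocessing.Process if use_multiprocessing else threading.Thread
    return worker_cls(target=pick_target(data), args=(data, results))
```

A process isolates a misbehaving job from the minion's main loop, at the cost of a fork per job. A thread is cheaper but shares the minion's interpreter state.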
_thread_return processes the job, then uses minion_instance._return_pub to send the result to port 4506 on the Master.

    try:
        func = minion_instance.functions[data['fun']]
        args, kwargs = load_args_and_kwargs(
            func,
            data['arg'],
            data)
        minion_instance.functions.pack['__context__']['retcode'] = 0
        if opts.get('sudo_user', ''):
            sudo_runas = opts.get('sudo_user')
            if 'sudo.salt_call' in minion_instance.functions:
                return_data = minion_instance.functions['sudo.salt_call'](
                        sudo_runas,
                        data['fun'],
                        *args,
                        **kwargs)
        else:
            return_data = func(*args, **kwargs)
        ...

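Leave out the sudo branch and argument parsing, and running the job comes down to three steps: look the function up in the loaded module registry, call it, and package the result. The sketch below uses a hypothetical two-function registry. The return keys follow the _return event in the Master debug output in section 3.5. The error message and retcode for a missing function are illustrative, not Salt's values.

```python
def ping():
    return True


def echo(text):
    return text


FUNCTIONS = {'test.ping': ping, 'test.echo': echo}  # hypothetical registry


def run_job(data, functions=FUNCTIONS, minion_id='client.jacky.com'):
    """Run one job and build a return load shaped like the '_return' event."""
    ret = {'id': minion_id, 'jid': data['jid'], 'fun': data['fun'],
           'fun_args': data['arg'], 'cmd': '_return'}
    func = functions.get(data['fun'])
    if func is None:
        # Illustrative error handling only
        ret.update({'return': 'function not available', 'retcode': 1, 'success': False})
        return ret
    ret.update({'return': func(*data['arg']), 'retcode': 0, 'success': True})
    return ret
```
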
_return_pub uses the salt.transport.Channel factory method to create the connection to the master. This channel is an AsyncZeroMQReqChannel, and channel.send sends the result.

    def _return_pub():
        ...
        channel = salt.transport.Channel.factory(self.opts)
        ...
        try:
            ret_val = channel.send(load, timeout=timeout)
        except SaltReqTimeoutError:
            msg = ('The minion failed to return the job information for job '
                   '{0}. This is often due to the master being shut down or '
                   'overloaded. If the master is running consider increasing '
                   'the worker_threads value.').format(jid)
            log.warn(msg)
            return ''
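
send() takes tries and timeout arguments, and _return_pub turns a final SaltReqTimeoutError into a logged warning plus an empty return. The synchronous sketch below combines the two. It assumes each timeout triggers a retry until tries is used up. transport is a hypothetical callable, and SaltReqTimeoutError is a local stand-in class rather than an import from salt.exceptions.

```python
import logging

log = logging.getLogger(__name__)


class SaltReqTimeoutError(Exception):
    """Local stand-in for salt.exceptions.SaltReqTimeoutError."""


def send_with_tries(transport, load, tries=3, timeout=60):
    """Call transport(load, timeout) up to `tries` times before giving up."""
    for attempt in range(1, tries + 1):
        try:
            return transport(load, timeout)
        except SaltReqTimeoutError:
            log.debug('Attempt %d/%d timed out', attempt, tries)
    raise SaltReqTimeoutError('Message timed out after {0} tries'.format(tries))


def return_pub(transport, load, jid):
    """Mirror _return_pub: a final timeout becomes a warning and an empty return."""
    try:
        return send_with_tries(transport, load)
    except SaltReqTimeoutError:
        log.warning('The minion failed to return the job information for job %s.', jid)
        return ''
```
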

Here AsyncZeroMQReqChannel.send sends the message AES-encrypted by default. The 'aes' in the Minion's debug output below reflects this.

    @tornado.gen.coroutine
    def send(self, load, tries=3, timeout=60):
        '''
        Send a request, return a future which will complete when we send the message
        '''
        if self.crypt == 'clear':
            ret = yield self._uncrypted_transfer(load, tries=tries, timeout=timeout)
        else:
            ret = yield self._crypted_transfer(load, tries=tries, timeout=timeout)
        raise tornado.gen.Return(ret)

This is the debug output on the Minion:

    [INFO    ] User sudo_vagrant Executing command test.ping with jid 20151124083900185051
    [DEBUG   ] Command details {'tgt_type': 'glob', 'jid': '20151124083900185051', 'tgt': 'client.jacky.com', 'ret': 'mysql', 'user': 'sudo_vagrant', 'arg': [], 'fun': 'test.ping'}
    [INFO    ] Starting a new job with PID 27556
    [DEBUG   ] LazyLoaded test.ping
    [DEBUG   ] Minion return retry timer set to 2 seconds (randomized)
    [INFO    ] Returning information for job: 20151124083900185051
    [DEBUG   ] Initializing new AsyncZeroMQReqChannel for ('/etc/salt/pki/minion', 'client.jacky.com', 'tcp://192.168.33.20:4506', 'aes')
    [DEBUG   ] Initializing new SAuth for ('/etc/salt/pki/minion', 'client.jacky.com', 'tcp://192.168.33.20:4506')

3.7. SaltClient Returns the Result

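Section 3.2 described how the Client connects to the Master's EventPublisher as a SUB socket and listens for results. That completes the full loop of executing one command.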
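
As a recap, the whole round trip can be modelled in one process, with plain queues standing in for the sockets:

- The ReqServer queue stands in for port 4506.
- A broadcast to per-minion queues stands in for the Publisher on port 4505.
- An event list stands in for the EventPublisher.

Everything here is a hypothetical toy. There is no encryption and no authentication, targeting only understands '*' or an exact id, and the "MWorker" is a plain method call.

```python
import queue


class ToyMaster(object):
    """Toy model of ReqServer + MWorker + Publisher + EventPublisher."""

    def __init__(self, minion_ids):
        self.req_server = queue.Queue()  # stands in for tcp/4506
        self.minion_inboxes = {m: queue.Queue() for m in minion_ids}  # tcp/4505 subscribers
        self.events = []  # stands in for the EventPublisher bus

    def mworker_step(self):
        """Handle one request from the ReqServer queue, like an MWorker."""
        msg = self.req_server.get()
        if msg['cmd'] == 'publish':
            self.events.append(('salt/job/{0}/new'.format(msg['jid']), msg))
            for inbox in self.minion_inboxes.values():  # PUB: every subscriber gets it
                inbox.put(msg)
        elif msg['cmd'] == '_return':
            self.events.append(('salt/job/{0}/ret/{1}'.format(msg['jid'], msg['id']), msg))


def minion_step(minion_id, master, functions):
    """A minion takes one job from its inbox, runs it if targeted, and returns over 4506."""
    job = master.minion_inboxes[minion_id].get()
    if job['tgt'] not in ('*', minion_id):  # every minion sees the job, few act on it
        return
    result = functions[job['fun']]()
    master.req_server.put({'cmd': '_return', 'jid': job['jid'], 'id': minion_id,
                           'fun': job['fun'], 'return': result})


def client_returns(master, jid):
    """The client's view: returns found on the event bus for this jid."""
    prefix = 'salt/job/{0}/ret/'.format(jid)
    return {data['id']: data['return']
            for tag, data in master.events if tag.startswith(prefix)}
```

Driving it by hand follows the numbered steps of section 3: put a publish request on req_server, call mworker_step(), call minion_step() for each minion, call mworker_step() again for the return, then read client_returns().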

4. Summary

Having walked through this flow, here are a few points worth learning from:
