首頁>技術>

這篇文章給大家分享的內容是關於Swoft 原始碼剖析之Swoole和Swoft的一些介紹(Task投遞/定時任務篇),有一定的參考價值,有需要的朋友可以參考一下。

喜歡我的文章就關注我吧,持續更新中!前言

Swoft的任務功能基於Swoole的Task機制,或者說Swoft的Task機制本質就是對Swoole的Task機制的封裝和加強。

任務投遞Task::deliver()將呼叫引數打包後根據$type引數通過Swoole的$server->taskCo()或$server->task()介面投遞到Task程序。Task本身始終是同步執行的,$type僅僅影響投遞這一操作的行為,Task::TYPE_ASYNC對應的$server->task()是非同步投遞,Task::deliver()呼叫後馬上返回;Task::TYPE_CO對應的$server->taskCo()是協程投遞,投遞後讓出協程控制,任務完成或執行超時後Task::deliver()才從協程返回。

任務執行
//Swoft\\Task\\Bootstrap\\Listeners\\TaskEventListener /** * The listener of swoole task * @SwooleListener({ * SwooleEvent::ON_TASK, * SwooleEvent::ON_FINISH, * }) */class TaskEventListener implements TaskInterface, FinishInterface{ /** * @param \\Swoole\\Server $server * @param int $taskId * @param int $workerId * @param mixed $data * @return mixed * @throws \\InvalidArgumentException */ public function onTask(Server $server, int $taskId, int $workerId, $data) { try { /* @var TaskExecutor $taskExecutor*/ $taskExecutor = App::getBean(TaskExecutor::class); $result = $taskExecutor->run($data); } catch (\\Throwable $throwable) { App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine())); $result = false;  // Release system resources App::trigger(AppEvent::RESOURCE_RELEASE);  App::trigger(TaskEvent::AFTER_TASK); } return $result; }}

此處是swoole.onTask的事件回撥,其職責僅僅是將將Worker程序投遞來的打包後的資料轉發給TaskExecutor。

Swoole的Task機制的本質是Worker程序將耗時任務投遞給同步的Task程序(又名TaskWorker)處理,所以swoole.onTask的事件回撥是在Task程序中執行的。上文說過,Worker程序是你大部分HTTP服務程式碼執行的環境,但是從TaskEventListener.onTask()方法開始,程式碼的執行環境都是Task程序,也就是說,TaskExecutor和具體的TaskBean都是執行在Task程序中的。

//Swoft\\Task\\TaskExecutor/** * The task executor * * @Bean() */class TaskExecutor{ /** * @param string $data * @return mixed */ public function run(string $data) { $data = TaskHelper::unpack($data);  $name = $data['name']; $type = $data['type']; $method = $data['method']; $params = $data['params']; $logid = $data['logid'] ?? uniqid('', true); $spanid = $data['spanid'] ?? 0;   $collector = TaskCollector::getCollector(); if (!isset($collector['task'][$name])) { return false; }  list(, $coroutine) = $collector['task'][$name]; $task = App::getBean($name); if ($coroutine) { $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type); } else { $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type); }  return $result; }}

任務執行思路很簡單,將Worker程序發過來的資料解包還原成原來的呼叫引數,根據$name引數找到對應的TaskBean並呼叫其對應的task()方法。其中TaskBean使用類級別註解@Task(name="TaskName")或者@Task("TaskName")宣告。

值得一提的一點是,@Task註解除了name屬性,還有一個coroutine屬性,上述程式碼會根據該引數選擇使用協程的runCoTask()或者同步的runSyncTask()執行Task。但是由於而且由於Swoole的Task程序的執行是完全同步的,不支援協程,所以目前版本請該引數不要配置為true。同樣的在TaskBean中編寫的任務程式碼必須的同步阻塞的或者是要能根據環境自動將非同步非阻塞和協程降級為同步阻塞的

從Process中投遞任務

前面我們提到:

Swoole的Task機制的本質是Worker程序將耗時任務投遞給同步的Task程序(又名TaskWorker)處理。

換句話說,Swoole的$server->taskCo()或$server->task()都只能在Worker程序中使用。

這個限制大大的限制了使用場景。 如何能夠為了能夠在Process中投遞任務呢?Swoft為了繞過這個限制提供了Task::deliverByProcess()方法。其實現原理也很簡單,通過Swoole的$server->sendMessage()方法將呼叫資訊從Process中投遞到Worker程序中,然後由Worker程序替其投遞到Task程序當中,相關程式碼如下:

//Swoft\\Task\\Task.php/** * Deliver task by process * * @param string $taskName * @param string $methodName * @param array $params * @param string $type * @param int $timeout * @param int $workId * * @return bool */public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool{ /* @var PipeMessageInterface $pipeMessage */ $server = App::$server->getServer(); $pipeMessage = App::getBean(PipeMessage::class); $data = [ 'name' => $taskName, 'method' => $methodName, 'params' => $params, 'timeout' => $timeout, 'type' => $type, ];  $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data); return $server->sendMessage($message, $workId);}

資料打包後使用$server->sendMessage()投遞給Worker:

//Swoft\\Bootstrap\\Server\\ServerTrait.php/** * onPipeMessage event callback * * @param \\Swoole\\Server $server * @param int $srcWorkerId * @param string $message * @return void * @throws \\InvalidArgumentException */public function onPipeMessage(Server $server, int $srcWorkerId, string $message){ /* @var PipeMessageInterface $pipeMessage */ $pipeMessage = App::getBean(PipeMessage::class); list($type, $data) = $pipeMessage->unpack($message);  App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);}

$server->sendMessage後,Worker程序收到資料時會觸發一個swoole.pipeMessage事件的回撥,Swoft會將其轉換成自己的swoft.pipeMessage事件並觸發.

//Swoft\\Task\\Event\\Listeners\\PipeMessageListener.php/** * The pipe message listener * * @Listener(event=AppEvent::PIPE_MESSAGE) */class PipeMessageListener implements EventHandlerInterface{ /** * @param \\Swoft\\Event\\EventInterface $event */ public function handle(EventInterface $event) { $params = $event->getParams(); if (count($params) < 3) { return; }  list($type, $data, $srcWorkerId) = $params;  if ($type != PipeMessage::MESSAGE_TYPE_TASK) { return; }  $type = $data['type']; $taskName = $data['name']; $params = $data['params']; $timeout = $data['timeout']; $methodName = $data['method'];  // delever task Task::deliver($taskName, $methodName, $params, $type, $timeout); }}

swoft.pipeMessage事件最終由PipeMessageListener處理。在相關的監聽其中,如果發現swoft.pipeMessage事件由Task::deliverByProcess()產生的,Worker程序會替其執行一次Task::deliver(),最終將任務資料投遞到TaskWorker程序中。

一道簡單的回顧練習:從Task::deliverByProcess()到某TaskBean 最終執行任務,經歷了哪些程序,而呼叫鏈的哪些部分又分別是在哪些程序中執行?

從Command程序或其子程序中投遞任務
//Swoft\\Task\\QueueTask.php/** * @param string $data * @param int $taskWorkerId * @param int $srcWorkerId * * @return bool */public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null){ if ($taskWorkerId === null) { $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum); }  if ($srcWorkerId === null) { $srcWorkerId = mt_rand(0, $this->workerNum - 1); }  $this->check(); $data = $this->pack($data, $srcWorkerId); $result = \\msg_send($this->queueId, $taskWorkerId, $data, false); if (!$result) { return false; }  return true;}

對於Command程序的任務投遞,情況會更復雜一點。上文提到的Process,其往往衍生於Http/Rpc服務,作為同一個Manager的子孫程序,他們能夠拿到Swoole\\Server的控制代碼變數,從而通過$server->sendMessage(),$server->task()等方法進行任務投遞。

但在Swoft的體系中,還有一個十分路人的角色: Command。Command的程序從shell或cronb獨立啟動,和Http/Rpc服務相關的程序沒有親緣關係。因此Command程序以及從Command中啟動的Process程序是沒有辦法拿到Swoole\\Server的呼叫控制代碼直接通過UnixSocket進行任務投遞的。為了為這種程序提供任務投遞支援,Swoft利用了Swoole的Task程序的一個特殊功能----訊息佇列

同一個專案中Command和Http\\RpcServer 通過約定一個message_queue_key獲取到系統核心中的同一條訊息佇列,然後Comand程序就可以通過該訊息佇列向Task程序投遞任務了。該機制沒有提供對外的公開方法,僅僅被包含在Task::deliver()方法中,Swoft會根據當前環境隱式切換投遞方式。但該訊息佇列的實現依賴Semaphore拓展,如果你想使用,需要在編譯PHP時加上--enable-sysvmsg引數。

定時任務

除了手動執行的普通任務,Swoft還提供了精度為秒的定時任務功能用來在專案中替代Linux的Crontab功能.

Swoft用兩個前置Process---任務計劃程序:CronTimerProcess和任務執行程序CronExecProcess,和兩張記憶體資料表-----RunTimeTable(任務(配置)表)OriginTable((任務)執行表)用於定時任務的管理排程。兩張表的每行記錄的結構如下:

\\\\Swoft\\Task\\Crontab\\TableCrontab.php/** * 任務表,記錄使用者配置的任務資訊 * 表每行記錄包含的欄位如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一確定一條記錄 * @var array $originStruct  */private $originStruct = [ 'rule' => [\\Swoole\\Table::TYPE_STRING, 100],//定時任務執行規則,對應@Scheduled註解的cron屬性 'taskClass' => [\\Swoole\\Table::TYPE_STRING, 255],//任務名 對應@Task的name屬性(預設為類名) 'taskMethod' => [\\Swoole\\Table::TYPE_STRING, 255],//Task方法,對應@Scheduled註解所在方法 'add_time' => [\\Swoole\\Table::TYPE_STRING, 11],//初始化該表內容時的10位時間戳]; /** * 執行表,記錄短時間內要執行的任務列表及其執行狀態 * 表每行記錄包含的欄位如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一確定一條記錄 * @var array $runTimeStruct  */private $runTimeStruct = [ 'taskClass' => [\\Swoole\\Table::TYPE_STRING, 255],//同上 'taskMethod' => [\\Swoole\\Table::TYPE_STRING, 255],//同上 'minute' => [\\Swoole\\Table::TYPE_STRING, 20],//需要執行任務的時間,精確到分鐘 格式date('YmdHi') 'sec' => [\\Swoole\\Table::TYPE_STRING, 20],//需要執行任務的時間,精確到分鐘 10位時間戳 'runStatus' => [\\Swoole\\TABLE::TYPE_INT, 4],//任務狀態,有 0(未執行) 1(已執行) 2(執行中) 三種。  //注意:這裡的執行是一個容易誤解的地方,此處的執行並不是指任務本身的執行,而是值`任務投遞`這一操作的執行,從巨集觀上看換成 _未投遞_,_已投遞_,_投遞中_描述會更準確。];
此處為何要使用Swoole的記憶體Table?

Swoft的的定時任務管理是分別由 任務計劃程序 和 任務執行程序 程序負責的。兩個程序的執行共同管理定時任務,如果使用程序間獨立的array()等結構,兩個程序必然需要頻繁的程序間通訊。而使用跨程序的Table(本文的Table,除非特別說明,都指Swoole的Swoole\\Table結構)直接進行程序間資料共享,不僅效能高,操作簡單 還解耦了兩個程序。

為了Table能夠在兩個程序間共同使用,Table必須在Swoole Server啟動前建立並分配記憶體。具體程式碼在Swoft\\Task\\Bootstrap\\Listeners->onBeforeStart()中,比較簡單,有興趣的可以自行閱讀。

背景介紹完了,我們來看看這兩個定時任務程序的行為

//Swoft\\Task\\Bootstrap\\Process\\CronTimerProcess.php/** * Crontab timer process * * @Process(name="cronTimer", boot=true) */class CronTimerProcess implements ProcessInterface{ /** * @param \\Swoft\\Process\\Process $process */ public function run(SwoftProcess $process) { //code.... /* @var \\Swoft\\Task\\Crontab\\Crontab $cron*/ $cron = App::getBean('crontab');  // Swoole/HttpServer $server = App::$server->getServer();  $time = (60 - date('s')) * 1000; $server->after($time, function () use ($server, $cron) { // Every minute check all tasks, and prepare the tasks that next execution point needs $cron->checkTask(); $server->tick(60 * 1000, function () use ($cron) { $cron->checkTask(); }); }); }}
//Swoft\\Task\\Crontab\\Crontab.php/** * 初始化runTimeTable資料 * * @param array $task 任務 * @param array $parseResult 解析crontab命令規則結果,即Task需要在當前分鐘內的哪些秒執行 * @return bool */private function initRunTimeTableData(array $task, array $parseResult): bool{ $runTimeTableTasks = $this->getRunTimeTable()->table;  $min = date('YmdHi'); $sec = strtotime(date('Y-m-d H:i')); foreach ($parseResult as $time) { $this->checkTaskQueue(false); $key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec); $runTimeTableTasks->set($key, [ 'taskClass' => $task['taskClass'], 'taskMethod' => $task['taskMethod'], 'minute' => $min, 'sec' => $time + $sec, 'runStatus' => self::NORMAL ]); }  return true;}

CronTimerProcess是Swoft的定時任務排程程序,其核心方法是Crontab->initRunTimeTableData()。

該程序使用了Swoole的定時器功能,通過Swoole\\Timer在每分鐘首秒時執行的回撥,CronTimerProcess每次被喚醒後都會遍歷任務表計算出當前這一分鐘內的60秒分別需要執行的任務清單,寫入執行表並標記為 未執行。

//Swoft\\Task\\Bootstrap\\Process/** * Crontab process * * @Process(name="cronExec", boot=true) */class CronExecProcess implements ProcessInterface{ /** * @param \\Swoft\\Process\\Process $process */ public function run(SwoftProcess $process) { $pname = App::$server->getPname(); $process->name(sprintf('%s cronexec process', $pname));  /** @var \\Swoft\\Task\\Crontab\\Crontab $cron */ $cron = App::getBean('crontab');  // Swoole/HttpServer $server = App::$server->getServer();  $server->tick(0.5 * 1000, function () use ($cron) { $tasks = $cron->getExecTasks(); if (!empty($tasks)) { foreach ($tasks as $task) {  // Diliver task  Task::deliverByProcess($task['taskClass'], $task['taskMethod']);  $cron->finishTask($task['key']); } } }); }}

CronExecProcess作為定時任務的執行者,通過Swoole\\Timer每0.5s喚醒自身一次,然後把 執行表 遍歷一次,挑選當下需要執行的任務,通過sendMessage()投遞出去並更新該 任務執行表中的狀態。該執行程序只負責任務的投遞,任務的實際實際執行仍然在Task程序中由TaskExecutor處理。

52

PHP

最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 為你的uniapp設定一個全域性元件