使用kafka結合easyswoole非同步定時任務已經多程序來實現一個高效能的訊息佇列服務,主要用來實現飛飛物聯的裝置邏輯(規則引擎),比如根據感測器的資料發簡訊等等。
首先連線kafka,這裡的kaka我使用的百度雲提供的kafka服務,自己部署太麻煩而且難以維護,連線的參考例子在這裡/file/2020/01/04/20200104082602_7044.jpg ,其實最想使用的是微博的那個不使用擴充套件來連線kafka的庫,但是一直沒有解決使用ssl檔案連線的問題,因此就是用了rdkafka擴充套件,首先按照樣例中的說明安裝librdkafka
sh setup-librdkafka.shpecl install rdkafkaecho "extension=rdkafka.so" >> /etc/php.ini //根據實際位置
這樣就安裝好了librdkafka和php擴充套件,要注意的是版本號必須要新一些的,否則使用ssl的會報沒有該設定項的異常,排查這個異常花了一晚上的時間。
接下來在easyswoole建立一個連線kafka的基類,在飛飛物聯的專案中只會使用到consumer,因為producer的資料是來自天工的裝置資料
kafka.php – 連線kafka的基類
namespace App\\Lib\\Kafka;use \\RdKafka\\Conf;use \\RdKafka\\KafkaConsumer;use Swoole\\Exception;class Kafka{ private $topic = ''; private $config = [ 'broker' => 'xxxxxxxxx:9092', 'security_protocol' => 'ssl', 'client_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.pem', 'client_key' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.key', 'ca_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/ca.pem', 'group_id' => 'kafka-feifei-swoole-consumer' ]; public function __construct($topic) { if(!extension_loaded(rdkafka)){ throw new Exception('rdkafka.so擴充套件必須開啟'); } if(!isset($topic) || empty($topic)){ throw new Exception('kafak例項化必須設定topic'); } $this->topic = $topic; } public function subscribe(){ $conf = new \\RdKafka\\Conf(); $conf->set('metadata.broker.list', $this->config['broker']); $conf->set('group.id', $this->config['group_id'].rand(0,10)); $conf->set('security.protocol', $this->config['security_protocol']); $conf->set('ssl.certificate.location', $this->config['client_pem']); $conf->set('ssl.key.location', $this->config['client_key']); $conf->set('ssl.ca.location', $this->config['ca_pem']); $consumer = new \\RdKafka\\KafkaConsumer($conf); $consumer->subscribe([$this->topic]); return $consumer; }}
這裡需要特別注意的是PHPstorm的程式碼檢查器好像找不到rdkafka這個擴充套件,但是沒有關係,我沒可以在初始化這個類的時候判斷一下擴充套件是否存在。這裡只實現了消費者,要使用消費者需要例項化的時候傳入消費者的topic,然後呼叫subscribe方法,接下來實際在easyswoole的mainServiceCreate中建立三個程序來處理kafka的訂閱事件
public static function mainServerCreate(EventRegister $register){ // TODO: Implement mainServerCreate() method.// 註冊Kafka消費事件, 開三個程序來處理 $allNum = 3; for($i = 0; $i < $allNum; $i++){ ServerManager::getInstance()->getSwooleServer()->addProcess((new Consumer("consumer_{$i}"))->getProcess()); }}
這裡new的Consumer就是處理消費的程序
Consumer.php
<?php/** * Created by bingxiong. * Date: 4/8/19 * Time: 10:22 PM * Description: */namespace App\\Lib\\Kafka;use EasySwoole\\Component\\Process\\AbstractProcess;class Consumer extends AbstractProcess{ private $isRun = false; public function run($arg) { // 在這裡處理kafka連線 // TODO: Implement run() method. $this->addTick(500,function (){ if(!$this->isRun){ $this->isRun = true; // 連線kafka並訂閱TOPIC $kafka = new Kafka('xxxxxxxxxxxx');//topic $consumer = $kafka->subscribe(); while(true){ try{ $message = $consumer->consume(120*1000); if($message){ switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo 'process name is'.$this->getProcessName().'\\n'; echo "partition:", $message->partition,", offset:", $message->offset,", ", $message->payload, "\\n"; break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\\n"; break; default: throw new \\Exception($message->errstr(), $message->err); break; } }else{ break; } }catch (\\Throwable $throwable){ break; } } $this->isRun = false; } var_dump($this->getProcessName().' task run check'); }); } public function onShutDown() { // TODO: Implement onShutDown() method. } public function onReceive(string $str, ...$args) { // TODO: Implement onReceive() method. }}
這裡使用了一個非同步任務addTick,如果長期沒有訊息的話也會每500秒去檢查一下有沒有新的訊息。這裡還是用了一個死迴圈,在這裡死迴圈中持續處理訊息過來之後的邏輯
1.png
現在已經使用swoole+kafka拿到裝置的資料了,接下來就是使用非同步任務或者非同步redis之類的去執行相應的業務邏輯了。