首頁>技術>

使用kafka結合easyswoole非同步定時任務已經多程序來實現一個高效能的訊息佇列服務,主要用來實現飛飛物聯的裝置邏輯(規則引擎),比如根據感測器的資料發簡訊等等。

首先連線kafka,這裡的kaka我使用的百度雲提供的kafka服務,自己部署太麻煩而且難以維護,連線的參考例子在這裡/file/2020/01/04/20200104082602_7044.jpg ,其實最想使用的是微博的那個不使用擴充套件來連線kafka的庫,但是一直沒有解決使用ssl檔案連線的問題,因此就是用了rdkafka擴充套件,首先按照樣例中的說明安裝librdkafka

sh setup-librdkafka.shpecl install rdkafkaecho "extension=rdkafka.so" >> /etc/php.ini //根據實際位置

這樣就安裝好了librdkafka和php擴充套件,要注意的是版本號必須要新一些的,否則使用ssl的會報沒有該設定項的異常,排查這個異常花了一晚上的時間。

接下來在easyswoole建立一個連線kafka的基類,在飛飛物聯的專案中只會使用到consumer,因為producer的資料是來自天工的裝置資料

kafka.php – 連線kafka的基類

namespace App\\Lib\\Kafka;use \\RdKafka\\Conf;use \\RdKafka\\KafkaConsumer;use Swoole\\Exception;class Kafka{    private $topic = '';    private $config = [        'broker' => 'xxxxxxxxx:9092',        'security_protocol' => 'ssl',        'client_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.pem',        'client_key' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.key',        'ca_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/ca.pem',        'group_id' => 'kafka-feifei-swoole-consumer'    ];    public function __construct($topic)    {        if(!extension_loaded(rdkafka)){            throw new Exception('rdkafka.so擴充套件必須開啟');        }        if(!isset($topic) || empty($topic)){            throw new Exception('kafak例項化必須設定topic');        }        $this->topic = $topic;    }    public function subscribe(){        $conf = new \\RdKafka\\Conf();        $conf->set('metadata.broker.list', $this->config['broker']);        $conf->set('group.id', $this->config['group_id'].rand(0,10));        $conf->set('security.protocol', $this->config['security_protocol']);        $conf->set('ssl.certificate.location', $this->config['client_pem']);        $conf->set('ssl.key.location', $this->config['client_key']);        $conf->set('ssl.ca.location', $this->config['ca_pem']);        $consumer = new \\RdKafka\\KafkaConsumer($conf);        $consumer->subscribe([$this->topic]);        return $consumer;    }}

這裡需要特別注意的是PHPstorm的程式碼檢查器好像找不到rdkafka這個擴充套件,但是沒有關係,我沒可以在初始化這個類的時候判斷一下擴充套件是否存在。這裡只實現了消費者,要使用消費者需要例項化的時候傳入消費者的topic,然後呼叫subscribe方法,接下來實際在easyswoole的mainServiceCreate中建立三個程序來處理kafka的訂閱事件

public static function mainServerCreate(EventRegister $register){    // TODO: Implement mainServerCreate() method.//    註冊Kafka消費事件, 開三個程序來處理    $allNum = 3;    for($i = 0; $i < $allNum; $i++){        ServerManager::getInstance()->getSwooleServer()->addProcess((new Consumer("consumer_{$i}"))->getProcess());    }}

這裡new的Consumer就是處理消費的程序

Consumer.php

<?php/** * Created by bingxiong. * Date: 4/8/19 * Time: 10:22 PM * Description: */namespace App\\Lib\\Kafka;use EasySwoole\\Component\\Process\\AbstractProcess;class Consumer extends AbstractProcess{    private $isRun = false;    public function run($arg)    {        // 在這裡處理kafka連線        // TODO: Implement run() method.        $this->addTick(500,function (){            if(!$this->isRun){                $this->isRun = true;                // 連線kafka並訂閱TOPIC                $kafka = new Kafka('xxxxxxxxxxxx');//topic                $consumer = $kafka->subscribe();                while(true){                    try{                        $message = $consumer->consume(120*1000);                        if($message){                            switch ($message->err) {                                case RD_KAFKA_RESP_ERR_NO_ERROR:                                    echo 'process name is'.$this->getProcessName().'\\n';                                    echo "partition:", $message->partition,", offset:", $message->offset,", ", $message->payload, "\\n";                                    break;                                case RD_KAFKA_RESP_ERR__PARTITION_EOF:                                    echo "No more messages; will wait for more\\n";                                    break;                                case RD_KAFKA_RESP_ERR__TIMED_OUT:                                    echo "Timed out\\n";                                    break;                                default:                                    throw new \\Exception($message->errstr(), $message->err);                                    break;                            }                        }else{                            break;                        }                    }catch (\\Throwable $throwable){                        break;                    }                }                $this->isRun = false;            }            var_dump($this->getProcessName().' task run check');        });    }    public function onShutDown()    {        // TODO: Implement onShutDown() method.    }    public function onReceive(string $str, ...$args)    {        // TODO: Implement onReceive() method.    }}

這裡使用了一個非同步任務addTick,如果長期沒有訊息的話也會每500秒去檢查一下有沒有新的訊息。這裡還是用了一個死迴圈,在這裡死迴圈中持續處理訊息過來之後的邏輯

1.png

現在已經使用swoole+kafka拿到裝置的資料了,接下來就是使用非同步任務或者非同步redis之類的去執行相應的業務邏輯了。

最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 複雜分散式架構下的計算治理之路