首頁>技術>

前言

Apache Apollo是一個代理伺服器,其是在ActiveMQ基礎上發展而來的,可以支援STOMP, AMQP, MQTT, Openwire, SSL, and WebSockets 等多種協議。

正文

廢話不多言,直接擼程式碼,先去maven的官網引入mqtt的第三方支援包。

服務端的程式碼:

package com.example.demo.mqtt.server;  import com.example.demo.mqtt.PushCallback;import org.eclipse.paho.client.mqttv3.*;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.ArrayList;import java.util.Arrays;import java.util.List;  /** * Created by pengcheng.du on 2018/10/12.com.example.demo.mqtt.server.Server *//*伺服器端向客戶端推送不同的主題*/public class Server {    public static final String SERVER_URL = "tcp://0.0.0.0:61613";    public static List<String> TOPICList = new ArrayList<>(Arrays.asList("wether","car","topic"));    public static String clientid;     private MqttClient mqttClient;    public MqttTopic mqttTopic1;    private MqttTopic getMqttTopic2;    private String username = "admin";    private String password = "password";     private MqttMessage mqttMessage;     /*//建立新的topic連結的時候使用此構造方法    public Server() throws Exception{        mqttClient = new MqttClient(SERVER_URL, clientid, new MemoryPersistence());        Server server = new Server();        server.mqttMessage = new MqttMessage();        server.mqttMessage.setQos(2);        server.mqttMessage.setRetained(true);        server.mqttMessage.setPayload("裝配成功".getBytes());        server.publish(server.mqttTopic1,server.mqttMessage);        System.out.println(server.mqttMessage.isRetained()+":retained狀態");        connect();    }*/     public Server() throws Exception{        clientid = "wether";        mqttClient = new MqttClient(SERVER_URL, clientid, new MemoryPersistence());        String topic = "wether";        connect(topic);    }    public Server(String topic,String clientid) throws Exception{        TOPICList.add(topic);        mqttClient = new MqttClient(SERVER_URL, clientid, new MemoryPersistence());        this.createCon(topic);        //connect(topic);     }     public void connect(String topic){        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();        mqttConnectOptions.setCleanSession(false);        mqttConnectOptions.setUserName(username);        mqttConnectOptions.setPassword(password.toCharArray());         mqttConnectOptions.setConnectionTimeout(20);        mqttConnectOptions.setKeepAliveInterval(20);         try {            mqttClient.setCallback(new PushCallback());            mqttClient.connect(mqttConnectOptions);            mqttTopic1 = mqttClient.getTopic(topic);        } catch (MqttException e) {            e.printStackTrace();        }    }    public static void publish(MqttTopic topic,MqttMessage message) throws MqttException {        System.out.println("話題是"+topic.toString()+"要傳送的訊息是"+message.toString());        MqttDeliveryToken publish_token = topic.publish(message);        publish_token.waitForCompletion();        System.out.print("訊息已經推送到客戶端了");    }    public static void main(String[] args) throws Exception {        Server server = new Server();        server.mqttMessage = new MqttMessage();        server.mqttMessage.setQos(2);        server.mqttMessage.setRetained(true);        server.mqttMessage.setPayload("裝配成功".getBytes());        server.publish(server.mqttTopic1,server.mqttMessage);        System.out.println(server.mqttMessage.isRetained()+":retained狀態");    }    public String createCon(String topic) throws Exception{        Server server = new Server();        server.mqttMessage = new MqttMessage();        server.mqttMessage.setQos(2);        server.mqttMessage.setRetained(true);        server.mqttMessage.setPayload("裝配成功".getBytes());        server.publish(server.getMqttClient().getTopic(topic),server.mqttMessage);        System.out.println(server.mqttMessage.isRetained()+":retained狀態");        return "build success";    }     public static String getServerUrl() {        return SERVER_URL;    }     public List<String> getTOPIC() {        return TOPICList;    }     public static String getClientid() {        return clientid;    }     public MqttClient getMqttClient() {        return mqttClient;    }     public MqttTopic getMqttTopic1() {        return mqttTopic1;    }     public MqttTopic getGetMqttTopic2() {        return getMqttTopic2;    }     public String getUsername() {        return username;    }     public String getPassword() {        return password;    }     public MqttMessage getMqttMessage() {        return mqttMessage;    }}

Client端:

package com.example.demo.mqtt.client;import com.example.demo.mqtt.PushCallback;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttTopic;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.util.concurrent.ScheduledExecutorService;/*** Created by pengcheng.du on 2018/10/12.*/public class Client {public static final String SERVER_URL = "tcp://0.0.0.0:61613";public static final String TOPIC = "wether";public static final String clientid = "client4";private MqttClient client;private MqttConnectOptions options;private String userName = "admin";private String passWord = "password";private ScheduledExecutorService scheduler;private void start() throws Exception{client = new MqttClient(SERVER_URL, clientid, new MemoryPersistence());options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(userName);options.setPassword(passWord.toCharArray());options.setConnectionTimeout(20);options.setKeepAliveInterval(20);client.setCallback(new PushCallback());MqttTopic topic = client.getTopic(TOPIC);client.connect(options);int[] Qos = {1};String[] topic1 = {TOPIC};client.subscribe(topic1,Qos);}public static void main (String[] args) throws Exception{Client client = new Client();client.start();}}

傳送斷開回調 PushCallback.java

package com.example.demo.mqtt; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttMessage; /** * Created by pengcheng.du on 2018/10/12. */public class PushCallback implements MqttCallback {    @Override    //處理連結斷開    public void connectionLost(Throwable throwable) {        // 連線丟失後,一般在這裡面進行重連        System.out.println("連線斷開,可以做重連");    }     @Override    //處理訊息送達    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {        System.out.println("接收訊息主題 : " + s);        System.out.println("接收訊息Qos : " + mqttMessage.getQos());        System.out.println("接收訊息內容 : " + new String(mqttMessage.getPayload()));     }     @Override    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {    }}

開放開放兩個介面,作為觸發點,亦可以使用其他方式:

package com.example.demo.controller; import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.example.demo.mqtt.client.Client;import com.example.demo.mqtt.server.Server;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.MqttTopic;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*; import java.util.Date;import java.util.HashMap;import java.util.List;import java.util.Map; /** * Created by pengcheng.du on 2018/10/12. */@RestControllerpublic class PublishMessage {    private static String HOST = "tcp://0.0.0.0:61613";    public static MqttClient client;     public static final String SERVER_URL = "tcp://0.0.0.0:61613";    //推送訊息    @RequestMapping(value = "mqtt/publishMessageOnTopic/{topic}",method = RequestMethod.POST)    public Map<String,Object> publishMessageOnTopic(            @PathVariable(value = "topic") String topic, @RequestBody String message) throws Exception {        Map<String,Object> map = new HashMap<>();        JSONObject jsonObject = (JSONObject) JSONObject.parse(message);        String mess = (String) jsonObject.get("message");        MqttMessage mqttMessage = new MqttMessage();        mqttMessage.setPayload(mess.getBytes());        mqttMessage.setQos(2);        mqttMessage.setRetained(true);        if (this.findIndex(topic) == -1) {            map.put("data","no topic");            return map;        }        int index = this.findIndex(topic);        Server server = new Server();        server.connect(topic);        MqttTopic mqttTopic1 = server.mqttTopic1;        Server.publish(mqttTopic1,mqttMessage);        map.put("data","已傳送");        return map;    }    public int findIndex(String topic) throws Exception{        Server server = new Server();        final List<String> topicList = server.getTOPIC();        int Idnex = 0;        for (String topic1: topicList) {            if (topic.equals(topic1)) {                break;            }            Idnex++;        }        if (Idnex==topicList.size()) {            return -1;        } else {            return Idnex;        }    }    private  MqttClient getMqttClient(String clientId) throws MqttException {        if (client != null && clientId.equals(client.getClientId())) {            return client;        }        return new MqttClient(HOST, clientId);    }    //建立主題    @RequestMapping(value = "mqtt/createNewTopic/{topic}/{clientId}",method = RequestMethod.POST)    public Map<String,Object> createNewTopic(@PathVariable(value = "topic") String topic ,                                             @PathVariable(value = "clientId") String clientId){        System.out.println("topic:"+topic+",clientId"+clientId);        try{            Server server = new Server(topic,clientId);            //server.connect(topic);            MqttTopic topic1 = server.getMqttClient().getTopic("");            System.out.println("topic1:"+topic1);        } catch (Exception e){            System.out.println(e.toString());        }         return null;    }}

執行結果:自上而下是請求觸發,服務端的控制,兩個客戶端的控制檯,均透過tcp協議獲取到了訂閱的話題message。

總結:

MQTT只是一種訊息推送的協議目前(2016/1/13)為V3.1版本,而Apache Apollo是根據這種協議而開發的一款服務性的服務程式,被用來進行訊息推送。Apache Apollo說白了其實很簡單,就是在伺服器端建立一個唯一訂閱號,傳送者可以向這個訂閱號中發東西,然後接受者(即訂閱了這個訂閱號的人)都會收到這個訂閱號發出來的訊息。以此來完成訊息的推送。伺服器其實是一個訊息中轉站。

11
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 自學python第四天——python的輸入與輸出