前言
Apache Apollo是一個訊息代理伺服器(message broker),是在ActiveMQ的基礎上發展而來的,可以支援STOMP、AMQP、MQTT、OpenWire、SSL和WebSockets等多種協議。
正文:廢話不多說,直接上程式碼。先到maven的官網找到mqtt的第三方支援包(本文程式碼使用的是Eclipse Paho的org.eclipse.paho.client.mqttv3),並將其引入專案。
服務端的程式碼:
package com.example.demo.mqtt.server;

import com.example.demo.mqtt.PushCallback;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Created by pengcheng.du on 2018/10/12.
 * com.example.demo.mqtt.server.Server
 *
 * <p>Publishing side of the demo: wraps one Paho {@link MqttClient} that connects to the
 * broker at {@link #SERVER_URL} and publishes messages on topics.
 */
public class Server {

    /** Broker address used by every instance. */
    public static final String SERVER_URL = "tcp://0.0.0.0:61613";

    /** Topics known to the application; extended by {@link #Server(String, String)}. */
    public static List<String> TOPICList = new ArrayList<>(Arrays.asList("wether", "car", "topic"));

    /** Client id used by the no-arg constructor (set to "wether" there). */
    public static String clientid;

    private MqttClient mqttClient;

    /** Topic handle selected by the most recent successful {@link #connect(String)}. */
    public MqttTopic mqttTopic1;

    // Never assigned in this class; kept because getGetMqttTopic2() is public.
    private MqttTopic getMqttTopic2;
    private String username = "admin";
    private String password = "password";
    private MqttMessage mqttMessage;

    /**
     * Creates a publisher with the client id "wether" and connects it, selecting the
     * "wether" topic.
     *
     * @throws Exception if the client cannot be created or connected
     */
    public Server() throws Exception {
        clientid = "wether";
        mqttClient = new MqttClient(SERVER_URL, clientid, new MemoryPersistence());
        String topic = "wether";
        connect(topic);
    }

    /**
     * Registers {@code topic}, creates a client with the given id and announces the topic
     * through {@link #createCon(String)}.
     *
     * @param topic    topic to register and announce
     * @param clientid MQTT client id for this instance
     * @throws Exception if the client cannot be created, connected or the publish fails
     */
    public Server(String topic, String clientid) throws Exception {
        // Repeated requests for the same topic must not grow the list with duplicates.
        if (!TOPICList.contains(topic)) {
            TOPICList.add(topic);
        }
        mqttClient = new MqttClient(SERVER_URL, clientid, new MemoryPersistence());
        this.createCon(topic);
    }

    /**
     * Ensures this instance's client is connected and points {@link #mqttTopic1} at
     * {@code topic}.
     *
     * <p>The client is only connected when it is not connected yet. Paho is expected to
     * reject connect() on a live client; previously that exception was swallowed and the
     * topic handle was silently left on the old topic.
     *
     * @param topic topic to select for subsequent publishes
     * @throws IllegalStateException if the broker connection cannot be established
     */
    public void connect(String topic) {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setConnectionTimeout(20);
        mqttConnectOptions.setKeepAliveInterval(20);
        try {
            if (!mqttClient.isConnected()) {
                mqttClient.setCallback(new PushCallback());
                mqttClient.connect(mqttConnectOptions);
            }
            mqttTopic1 = mqttClient.getTopic(topic);
        } catch (MqttException e) {
            // Fail here with the cause attached instead of leaving mqttTopic1 null and
            // hitting a NullPointerException later in publish().
            throw new IllegalStateException("Failed to connect to MQTT broker " + SERVER_URL, e);
        }
    }

    /**
     * Publishes {@code message} on {@code topic} and blocks until the delivery token
     * reports completion.
     *
     * @param topic   topic handle to publish on
     * @param message message to publish
     * @throws MqttException if publishing or waiting for completion fails
     */
    public static void publish(MqttTopic topic, MqttMessage message) throws MqttException {
        System.out.println("話題是" + topic.toString() + "要傳送的訊息是" + message.toString());
        MqttDeliveryToken publish_token = topic.publish(message);
        publish_token.waitForCompletion();
        System.out.print("訊息已經推送到客戶端了");
    }

    /** Demo entry point: connects as "wether" and publishes one retained QoS 2 message. */
    public static void main(String[] args) throws Exception {
        Server server = new Server();
        server.mqttMessage = newAssembledMessage();
        publish(server.mqttTopic1, server.mqttMessage);
        System.out.println(server.mqttMessage.isRetained() + ":retained狀態");
    }

    /**
     * Connects this instance (with its own client id) and publishes the retained
     * announcement message on {@code topic}.
     *
     * @param topic topic to announce
     * @return the literal "build success"
     * @throws Exception if connecting or publishing fails
     */
    public String createCon(String topic) throws Exception {
        // Use this instance rather than a throw-away "wether" client, so the client id
        // passed to the constructor is the one that actually talks to the broker.
        connect(topic);
        mqttMessage = newAssembledMessage();
        publish(mqttTopic1, mqttMessage);
        System.out.println(mqttMessage.isRetained() + ":retained狀態");
        return "build success";
    }

    /** Builds the retained QoS 2 announcement message with an explicit UTF-8 payload. */
    private static MqttMessage newAssembledMessage() {
        MqttMessage message = new MqttMessage();
        message.setQos(2);
        message.setRetained(true);
        message.setPayload("裝配成功".getBytes(StandardCharsets.UTF_8));
        return message;
    }

    public static String getServerUrl() {
        return SERVER_URL;
    }

    /** Returns the shared (static) topic list itself, not a copy. */
    public List<String> getTOPIC() {
        return TOPICList;
    }

    public static String getClientid() {
        return clientid;
    }

    public MqttClient getMqttClient() {
        return mqttClient;
    }

    public MqttTopic getMqttTopic1() {
        return mqttTopic1;
    }

    public MqttTopic getGetMqttTopic2() {
        return getMqttTopic2;
    }

    public String getUsername() {
        return username;
    }

    public String getPassword() {
        return password;
    }

    public MqttMessage getMqttMessage() {
        return mqttMessage;
    }
}
Client端:
package com.example.demo.mqtt.client;

import com.example.demo.mqtt.PushCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.concurrent.ScheduledExecutorService;

/**
 * Created by pengcheng.du on 2018/10/12.
 *
 * <p>Subscribing side of the demo: connects to the broker at {@link #SERVER_URL} with the
 * client id {@link #clientid} and subscribes to {@link #TOPIC}; incoming messages are
 * handed to {@link PushCallback}.
 */
public class Client {

    public static final String SERVER_URL = "tcp://0.0.0.0:61613";
    public static final String TOPIC = "wether";
    public static final String clientid = "client4";

    private MqttClient client;
    private MqttConnectOptions options;
    private String userName = "admin";
    private String passWord = "password";
    // Not used anywhere in this class.
    private ScheduledExecutorService scheduler;

    /**
     * Creates the MQTT client, connects with the configured credentials and subscribes
     * to {@link #TOPIC} at QoS 1.
     *
     * @throws Exception if creating the client, connecting or subscribing fails
     */
    private void start() throws Exception {
        client = new MqttClient(SERVER_URL, clientid, new MemoryPersistence());

        options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        options.setConnectionTimeout(20);
        options.setKeepAliveInterval(20);

        // The callback is installed before connecting so it is in place when messages arrive.
        client.setCallback(new PushCallback());
        client.connect(options);

        int[] qos = {1};
        String[] topics = {TOPIC};
        client.subscribe(topics, qos);
    }

    /** Demo entry point: starts one subscriber. */
    public static void main(String[] args) throws Exception {
        Client client = new Client();
        client.start();
    }
}
處理訊息送達與連線斷開的回調類別 PushCallback.java:
package com.example.demo.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.nio.charset.StandardCharsets;

/**
 * Created by pengcheng.du on 2018/10/12.
 *
 * <p>MQTT callback used by the demo: it only logs events to standard output.
 */
public class PushCallback implements MqttCallback {

    /**
     * Handles a lost connection. Only logs; a reconnect would normally be started here.
     *
     * @param throwable the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("連線斷開,可以做重連");
    }

    /**
     * Handles an arrived message by printing its topic, QoS and payload.
     *
     * @param s           name of the topic the message arrived on
     * @param mqttMessage the received message
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.println("接收訊息主題 : " + s);
        System.out.println("接收訊息Qos : " + mqttMessage.getQos());
        // Decode explicitly as UTF-8 so the text does not depend on the JVM's default
        // charset; the publisher has to encode its payload as UTF-8 as well.
        System.out.println("接收訊息內容 : " + new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
    }

    /** Called when delivery of an outgoing message completes; intentionally a no-op. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}
開放兩個介面,作為觸發點,亦可以使用其他方式:
package com.example.demo.controller;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.example.demo.mqtt.client.Client;
import com.example.demo.mqtt.server.Server;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by pengcheng.du on 2018/10/12.
 *
 * <p>REST endpoints that trigger the MQTT demo: one publishes a message on a known
 * topic, the other registers a new topic.
 */
@RestController
public class PublishMessage {

    private static String HOST = "tcp://0.0.0.0:61613";
    public static MqttClient client;
    public static final String SERVER_URL = "tcp://0.0.0.0:61613";

    /**
     * Publishes the "message" field of the JSON request body on {@code topic} as a
     * retained QoS 2 message.
     *
     * @param topic   topic from the URL; must already be in {@code Server.TOPICList}
     * @param message JSON request body, expected to contain a string field "message"
     * @return a map whose "data" entry is "已傳送" on success, "no topic" for an unknown
     *         topic, or "no message" when the body carries no "message" field
     * @throws Exception if parsing, connecting or publishing fails
     */
    @RequestMapping(value = "mqtt/publishMessageOnTopic/{topic}", method = RequestMethod.POST)
    public Map<String, Object> publishMessageOnTopic(
            @PathVariable(value = "topic") String topic,
            @RequestBody String message) throws Exception {
        Map<String, Object> map = new HashMap<>();

        if (this.findIndex(topic) == -1) {
            map.put("data", "no topic");
            return map;
        }

        JSONObject jsonObject = (JSONObject) JSONObject.parse(message);
        String mess = jsonObject == null ? null : (String) jsonObject.get("message");
        if (mess == null) {
            // Answer explicitly instead of failing with a NullPointerException below.
            map.put("data", "no message");
            return map;
        }

        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(mess.getBytes(StandardCharsets.UTF_8));
        mqttMessage.setQos(2);
        mqttMessage.setRetained(true);

        Server server = new Server();
        try {
            // Resolve the topic on the connected client directly: this does not depend on
            // a second connect() call having updated server.mqttTopic1.
            MqttTopic mqttTopic = server.getMqttClient().getTopic(topic);
            Server.publish(mqttTopic, mqttMessage);
        } finally {
            release(server);
        }

        map.put("data", "已傳送");
        return map;
    }

    /**
     * Looks up {@code topic} in the shared topic list.
     *
     * <p>Reads the static list directly, so no broker connection is opened for a lookup.
     *
     * @param topic topic name to look for
     * @return the index of the topic in {@code Server.TOPICList}, or -1 if absent
     */
    public int findIndex(String topic) throws Exception {
        return Server.TOPICList.indexOf(topic);
    }

    /**
     * Returns the shared static client when its id matches, otherwise a new client for
     * {@link #HOST}. Not called from within this class.
     */
    private MqttClient getMqttClient(String clientId) throws MqttException {
        if (client != null && clientId.equals(client.getClientId())) {
            return client;
        }
        return new MqttClient(HOST, clientId);
    }

    /**
     * Registers {@code topic} by creating a {@link Server} for it with the given client id.
     *
     * @param topic    topic to create
     * @param clientId MQTT client id to use
     * @return a map whose "data" entry is "build success", or a failure description
     */
    @RequestMapping(value = "mqtt/createNewTopic/{topic}/{clientId}", method = RequestMethod.POST)
    public Map<String, Object> createNewTopic(@PathVariable(value = "topic") String topic,
                                              @PathVariable(value = "clientId") String clientId) {
        System.out.println("topic:" + topic + ",clientId" + clientId);
        Map<String, Object> map = new HashMap<>();
        Server server = null;
        try {
            server = new Server(topic, clientId);
            // Look up the topic that was just created (the original asked for "").
            MqttTopic topic1 = server.getMqttClient().getTopic(topic);
            System.out.println("topic1:" + topic1);
            map.put("data", "build success");
        } catch (Exception e) {
            System.out.println(e.toString());
            // Report the failure to the caller; a null response looked like success.
            map.put("data", "create topic failed: " + e.getMessage());
        } finally {
            if (server != null) {
                release(server);
            }
        }
        return map;
    }

    /** Disconnects (if connected) and closes the server's MQTT client, logging any failure. */
    private static void release(Server server) {
        MqttClient mqttClient = server.getMqttClient();
        if (mqttClient == null) {
            return;
        }
        try {
            if (mqttClient.isConnected()) {
                mqttClient.disconnect();
            }
            mqttClient.close();
        } catch (MqttException e) {
            // Cleanup is best effort; the request result has already been determined.
            System.out.println("failed to release MQTT client: " + e);
        }
    }
}
執行結果:自上而下依序是請求觸發、服務端的控制檯,以及兩個客戶端的控制檯;兩個客戶端均透過tcp協議獲取到了訂閱的話題message。
總結:MQTT只是一種訊息推送的協議,目前(2016/1/13)為V3.1版本;而Apache Apollo是根據這種協議而開發的一款服務程式,被用來進行訊息推送。Apache Apollo說白了其實很簡單,就是在伺服器端建立一個唯一訂閱號,傳送者可以向這個訂閱號中發東西,然後接收者(即訂閱了這個訂閱號的人)都會收到這個訂閱號發出來的訊息,以此來完成訊息的推送。伺服器其實是一個訊息中轉站。
最新評論