Recently, we developed a kind of Internet of things related equipment, contacted with mqtt protocol, and found a lot of information on the Internet, but it is basically "half covered with Pipa", and there are always some missing contents. It may be that the authors don't think it's necessary to say something that is too basic, but it's not good for beginners like me.
Fortunately, after a lot of ups and downs, I finally adjusted one, which is recorded here to provide some reference for Xiaobai like me.
First of all, for communication based on mqqt protocol, you need to have a server, which I don't know...... (Pit 1)
The basic principle of mqtt service is that after you connect with the server, you can subscribe to n "topics", such as "hello", "test", etc. Then when Zhang San needs to send a message to the server, he will tell the server what the message is about, such as "hello". After receiving Zhang San's "hello" message, the server will check who has subscribed to the "hello" message, and then forward the message to those subscribers to complete the communication.
I use emqx as the mqtt server software, of course, there are other open-source ones, which is easy to find, not to mention.
emqx I put it on the newly purchased instance of alicloud. After buying a new instance, they need to change the password and restart it before they can connect to the instance, which they did not prompt (pit 2).
Then, after installing and running emqx, I found that the remote port was not opened, and I learned the security settings. The port listening to 0.0.0.0 address is the public network port (pit 3).
After the server software is installed, use the communication cat or paho to test it. It can connect normally and send the topic remotely (it must be tested first).
Finally, spring boot integrates mqtt as the client part.
Start with the original code: Download at ease, do not need C coins
1. The package required for the introduction of pox.xml is mainly the last three dependencies
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.5.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <groupId>com.bam</groupId> <artifactId>mqtt</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>war</packaging> <name>MQTT-1</name> <description>Demo project for Shiro</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-freemarker</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2. Then application.yml
mqtt: username: admin password: public host-url: tcp://123.123.123.123:1883 # The address and port of your own server need to be changed clientID: test # This change is not optional, but different clients must not be the same default-topic: test # Default theme timeout: 100 keepalive: 100
3. Read configuration file
package com.bam; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import lombok.Getter; import lombok.Setter; /** * @Classname MqttConfig * @Description mqtt Related configuration information * @Date 2020/3/5 11:00 * @Created by bam */ @Component @ConfigurationProperties("mqtt") @Setter @Getter public class MqttConfig { @Autowired private MqttPushClient mqttPushClient; /** * User name */ // @Value("username") private String username; /** * Password */ private String password; /** * Connection address */ private String hostUrl; /** * Customer Id */ private String clientID; /** * Default connection topic */ private String defaultTopic; /** * Timeout time */ private int timeout; /** * Keep connected */ private int keepalive; @Bean public MqttPushClient getMqttPushClient() { System.out.println("hostUrl: "+ hostUrl); System.out.println("clientID: "+ clientID); System.out.println("username: "+ username); System.out.println("password: "+ password); System.out.println("timeout: "+timeout); System.out.println("keepalive: "+ keepalive); mqttPushClient.connect(hostUrl, clientID, username, password, timeout, keepalive); // End with / / to subscribe to all topics starting with test mqttPushClient.subscribe(defaultTopic, 0); return mqttPushClient; } }
4. MqttPushClient publishing message class
package com.bam; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * * @author bam * 2020 March 5th 2013 * MqttPushClient.java * */ @Component public class MqttPushClient { private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class); @Autowired private PushCallback pushCallback; private static MqttClient client; private static MqttClient getClient() { return client; } private static void setClient(MqttClient client) { MqttPushClient.client = client; } /** * Client connection * * @param host ip+port * @param clientID Client Id * @param username User name * @param password Password * @param timeout Timeout time * @param keepalive Retention number */ public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) { MqttClient client; try { client = new MqttClient(host, clientID, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName(username); options.setPassword(password.toCharArray()); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); MqttPushClient.setClient(client); try { client.setCallback(pushCallback); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } /** * Release * * @param qos Connection mode * @param retained Whether to retain * @param topic theme * @param pushMessage Message body */ public void publish(int qos, boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic); if (null == mTopic) { logger.error("topic not exist"); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } /** * Subscribe to a topic * * @param topic theme * @param qos Connection mode */ public void subscribe(String topic, int qos) { logger.info("Start subscribing to topics" + topic); try { MqttPushClient.getClient().subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } }
5. Handling of received messages
package com.bam; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @Classname PushCallback * @Description Consumer monitoring * @Date 2019/4/11 23:31 * @Created by Jack */ @Component public class PushCallback implements MqttCallback { private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class); @Autowired private MqttConfig mqttConfig; private static MqttClient client; @Override public void connectionLost(Throwable throwable) { // After the connection is lost, it is usually reconnected here logger.info("Disconnected, can be reconnected"); if (null != client) { mqttConfig.getMqttPushClient(); } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { // The message you get after you subscribe will be executed here logger.info("Receive message subject : " + topic); logger.info("receive messages Qos : " + mqttMessage.getQos()); logger.info("Receive message content : " + new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete()); } }
6. Transmission method test
package com.bam; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.*; import javax.validation.Valid; /** * * @author bam * 2020 March 5th 2013 * TestController.java * */ @RestController @RequestMapping("/") public class TestController { @Autowired private MqttPushClient mqttPushClient; @GetMapping(value = "/publishTopic") public String publishTopic() { String topicString = "test"; mqttPushClient.publish(0, false, topicString, "Test posting"); return "ok"; } // Send custom message content (using default theme) @RequestMapping("/publishTopic/{data}") public void test1(@PathVariable("data") String data) { String topicString = "test"; mqttPushClient.publish(0,false,topicString, data); return "ok"; } // Send custom message content and specify subject @RequestMapping("/publishTopic/{topic}/{data}") public void test2(@PathVariable("topic") String topic, @PathVariable("data") String data) { mqttPushClient.publish(0,false,topic, data); return "ok"; } }
- Upper figure