<center id="qkqgy"><optgroup id="qkqgy"></optgroup></center>
  • <menu id="qkqgy"></menu>
    <nav id="qkqgy"></nav>
    <xmp id="qkqgy"><nav id="qkqgy"></nav>
  • <xmp id="qkqgy"><menu id="qkqgy"></menu>
    <menu id="qkqgy"><menu id="qkqgy"></menu></menu>
    <tt id="qkqgy"><tt id="qkqgy"></tt></tt>

  • 一 spingboot整合mqtt

    原理:

    ?二 操作案例

    2.1 工程結構

    ?2.2?配置pom文件
    <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId>
    <version>4.13</version> <scope>test</scope> </dependency> <!-- mqtt -->
    <dependency> <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId> </dependency>
    <dependency> <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId> </dependency> <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId> </dependency> <!-- lombok -->
    <dependency> <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId> <version>1.16.16</version> </dependency> <!--
    springBoot的啟動器 --> <dependency> <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.0.1.RELEASE</version> </dependency> <dependency>
    <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId>
    <version>5.1.5.RELEASE</version> </dependency>
    ?2.3?配置application配置文件
    server: port: 8081 spring: mqtt: username: admin # 賬號 password: public # 密碼
    host-url: tcp://172.16.71.150:1883 # mqtt連接tcp地址 client-id: mq-dky-0813 #
    客戶端Id,每個啟動的id要不同 default-topic: mq-dky-guolu # 默認主題 timeout: 100 # 超時時間
    keepalive: 100
    2.4 讀取配置文件,初始客戶端
    package com.ljf.mqtt.demo.config; import
    com.ljf.mqtt.demo.client.MqttPushClient; import lombok.Getter; import
    lombok.Setter; import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean; import
    org.springframework.stereotype.Component; /** * @ClassName: MqttConfig *
    @Description: TODO * @Author: liujianfu * @Date: 2021/08/16?14:43:39? *
    @Version: V1.0 **/ @Component @ConfigurationProperties("spring.mqtt") @Setter
    @Getter public class MqttConfig { @Autowired private MqttPushClient
    mqttPushClient; /** * 用戶名 */ private String username; /** * 密碼 */ private
    String password; /** * 連接地址 */ private String hostUrl; /** * 客戶Id */ private
    String clientId; /** * 默認連接話題 */ private String defaultTopic; /** * 超時時間 */
    private int timeout; /** * 保持連接數 */ private int keepalive; @Bean public
    MqttPushClient getMqttPushClient() { mqttPushClient.connect(hostUrl, clientId,
    username, password, timeout, keepalive); // 以/#結尾表示訂閱所有以test開頭的主題
    mqttPushClient.subscribe(defaultTopic, 0); return mqttPushClient; } }
    2.4 訂閱推送客戶端
    package com.ljf.mqtt.demo.client; import
    com.ljf.mqtt.demo.listener.PushCallback; import
    org.eclipse.paho.client.mqttv3.*; import
    org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import
    org.slf4j.Logger; import org.slf4j.LoggerFactory; import
    org.springframework.beans.factory.annotation.Autowired; import
    org.springframework.stereotype.Component; /** * @ClassName: MqttPushClient *
    @Description: TODO * @Author: liujianfu * @Date: 2021/08/16?14:48:38? *
    @Version: V1.0 **/ @Component public class MqttPushClient { private static
    final Logger logger = LoggerFactory.getLogger(MqttPushClient.class); @Autowired
    private PushCallback pushCallback; private static MqttClient client; private
    static MqttClient getClient() { return client; } private static void
    setClient(MqttClient client) { MqttPushClient.client = client; } /** * 客戶端連接 *
    * @param host ip+端口 * @param clientID 客戶端Id * @param username 用戶名 * @param
    password 密碼 * @param timeout 超時時間 * @param keepalive 保留數 */ public void
    connect(String host, String clientID, String username, String password, int
    timeout, int keepalive) { MqttClient client; try { client = new
    MqttClient(host, clientID, new MemoryPersistence()); MqttConnectOptions options
    = new MqttConnectOptions(); options.setCleanSession(true);
    options.setUserName(username); options.setPassword(password.toCharArray());
    options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive);
    MqttPushClient.setClient(client); try { client.setCallback(pushCallback);
    client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch
    (Exception e) { e.printStackTrace(); } } /** * 發布 * * @param qos 連接方式 * @param
    retained 是否保留 * @param topic 主題 * @param pushMessage 消息體 */ public void
    publish(int qos, boolean retained, String topic, String pushMessage) {
    MqttMessage message = new MqttMessage(); message.setQos(qos);
    message.setRetained(retained); message.setPayload(pushMessage.getBytes());
    MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic); if (null ==
    mTopic) { logger.error("topic not exist"); } MqttDeliveryToken token; try {
    token = mTopic.publish(message); token.waitForCompletion(); } catch
    (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) {
    e.printStackTrace(); } } /** * 訂閱某個主題 * * @param topic 主題 * @param qos 連接方式 */
    public void subscribe(String topic, int qos) {
    logger.info("==============開始訂閱主題=========" + topic); try {
    MqttPushClient.getClient().subscribe(topic, qos); } catch (MqttException e) {
    e.printStackTrace(); } } }
    2.5 定制監聽訂閱者
    package com.ljf.mqtt.demo.listener; import
    com.ljf.mqtt.demo.config.MqttConfig; import
    org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import
    org.eclipse.paho.client.mqttv3.MqttCallback; import
    org.eclipse.paho.client.mqttv3.MqttClient; import
    org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import
    org.slf4j.LoggerFactory; import
    org.springframework.beans.factory.annotation.Autowired; import
    org.springframework.stereotype.Component; /** * @ClassName: PushCallback *
    @Description: TODO * @Author: liujianfu * @Date: 2021/08/16?14:52:20? *
    @Version: V1.0 **/ @Component public class PushCallback implements MqttCallback
    { private static final Logger logger =
    LoggerFactory.getLogger(PushCallback.class); @Autowired private MqttConfig
    mqttConfig; private static MqttClient client; @Override public void
    connectionLost(Throwable throwable) { // 連接丟失后,一般在這里面進行重連
    logger.info("連接斷開,可以做重連"); if (client == null || !client.isConnected()) {
    mqttConfig.getMqttPushClient(); } } @Override public void messageArrived(String
    topic, MqttMessage mqttMessage) throws Exception { // subscribe后得到的消息會執行到這里面
    logger.info("接收消息主題 : " + topic); logger.info("接收消息Qos : " +
    mqttMessage.getQos()); logger.info("接收消息內容 : " + new
    String(mqttMessage.getPayload())); } @Override public void
    deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete()); } }
    2.6?發布數據
    package com.ljf.mqtt.demo.controller; import
    com.ljf.mqtt.demo.client.MqttPushClient; import com.ljf.mqtt.demo.utils.R;
    import org.springframework.beans.factory.annotation.Autowired; import
    org.springframework.web.bind.annotation.GetMapping; import
    org.springframework.web.bind.annotation.RequestMapping; import
    org.springframework.web.bind.annotation.RestController; /** * @ClassName:
    PullController * @Description: TODO * @Author: liujianfu * @Date:
    2021/08/16?14:56:18? * @Version: V1.0 **/ @RestController @RequestMapping("/")
    public class PullController { @Autowired private MqttPushClient mqttPushClient;
    /** * @author liujianfu * @description 測試發布主題 * @date 2021/8/16 15:04 * @param
    [] * @return RUtils */ @GetMapping(value = "/publishTopic") public R
    publishTopic(String sendMessage) { System.out.println("message:"+sendMessage);
    sendMessage=sendMessage+" : {\"name\":\"ljf\",\"age\":345}";
    mqttPushClient.publish(0,false,"mq-dky-guolu",sendMessage); return R.ok("OK");
    } }
    2.7?發布數據

    1.發布數據:

    ?2.訂閱消費數據

    ?3.emqx頁面

    4.在頁面進行模擬

    連接

    訂閱

    ?推送:

    java代碼客戶端:

    技術
    下載桌面版
    GitHub
    百度網盤(提取碼:draw)
    Gitee
    云服務器優惠
    阿里云優惠券
    騰訊云優惠券
    華為云優惠券
    站點信息
    問題反饋
    郵箱:ixiaoyang8@qq.com
    QQ群:766591547
    關注微信
    巨胸美乳无码人妻视频