姥恫 发表于 2025-11-18 17:10:00

Springboo下的MQTT多broker实现

参考项目:GitHub - mqtt-spring-boot-starter
背景说明:
和原作者一样,也是身处IOT物联网公司,身不由己,哈哈
实现功能:
1、多broker连接,这是原作者造好的轮子
2、指定信息发布,源码上有些问题,后面做了修复处理,已经可以正常使用了。
并且由于业务场景不一样,将原作者的点对点发送,修改为订阅|发布模式。
具体位置位于 MqttClientConfiguration文件的initializeMqttClient方法中的对应【入站通道】
3、ssl连接,目前没用上,先删除了
业务代码:
package com.smart.common.mqtt;


import org.springframework.stereotype.Component;

import java.lang.annotation.*;

/**
* <p>
* MqttClient
* </p >
*
* @author TL
* @version 1.0.0
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface MqttClient {
    /**
   * MQTT客户端ID
   */
    String value() default "";

    /**
   * 客户端名称,用于配置中引用
   */
    String name();
}@MqttClientpackage com.smart.common.mqtt;


import org.springframework.core.annotation.AliasFor;

import java.lang.annotation.*;

/**
* <p>
* MqttSubscribe
* </p >
*
* @author TL
* @version 1.0.0
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqttSubscribe {

    /**
   * 订阅的主题
   */
    @AliasFor("topic")
    String value() default "";

    /**
   * 订阅的主题
   */
    @AliasFor("value")
    String topic() default "";

    /**
   * QoS质量等级:0, 1, 2
   */
    int qos() default 1;

    /**
   * 客户端名称,用于指定哪个MQTT客户端处理该订阅
   */
    String client() default "default";
}@MqttSubscribepackage com.smart.common.mqtt;


import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;

import java.util.HashMap;
import java.util.Map;

/**
* <p>
* MqttProperties
* </p >
*
* @author TL
* @version 1.0.0
*/
@Data
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
    /**
   * 是否启用MQTT
   */
    private boolean enabled = true;

    /**
   * 线程池配置
   */
    @NestedConfigurationProperty
    private MqttSchedulerConfig threadPool = new MqttSchedulerConfig();

    /**
   * 默认客户端配置
   */
    @NestedConfigurationProperty
    private ClientConfig defaultClient = new ClientConfig();

    /**
   * 多客户端配置,key为客户端名称
   */
    private Map<String, ClientConfig> clients = new HashMap<>();

    @Data
    public static class ClientConfig {
      /**
         * MQTT服务器地址,例如:tcp://localhost:1883或ssl://localhost:8883
         */
      private String serverUri = "tcp://localhost:1883";

      /**
         * 客户端ID
         */
      private String clientId = "mqtt-client-" + System.currentTimeMillis();

      /**
         * 用户名
         */
      private String username;

      /**
         * 密码
         */
      private String password;

      /**
         * 清除会话
         */
      private boolean cleanSession = true;

      /**
         * 连接超时时间(秒)
         */
      private int connectionTimeout = 30;

      /**
         * 保持连接心跳时间(秒)
         */
      private int keepAliveInterval = 60;

      /**
         * 是否自动重连
         */
      private boolean automaticReconnect = true;

      /**
         * 默认的QoS级别
         */
      private int defaultQos = 1;

      /**
         * 默认主题
         */
      private String defaultTopic;

    }
}MqttPropertiespackage com.smart.common.mqtt;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
* <p>
* MqttAutoConfiguration
* </p >
*
* @author TL
* @version 1.0.0
*/
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
@ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true", matchIfMissing = true)
public class MqttAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public MqttClientFactory mqttClientFactory() {
      return new MqttPahoClientFactoryImpl();
    }
/**
* 配置MQTT消息处理器Bean
* 当Spring容器中不存在该Bean时才会创建
*
* @return MqttMessageHandler 返回一个默认的MQTT消息处理器实例
*/

    @Bean    // 条注解:当Spring容器中不存在相同类型的Bean时,才会创建这个Bean
    @ConditionalOnMissingBean
    public MqttMessageHandler mqttMessageHandler() {    // 创建并返回一个默认的MQTT消息处理器实例
      return new DefaultMqttMessageHandler();
    }

    @Bean
    public MqttClientConfiguration mqttClientConfiguration(MqttProperties mqttProperties,
                                                         MqttClientFactory mqttClientFactory,
                                                         MqttMessageHandler mqttMessageHandler,
                                                         ThreadPoolTaskScheduler mqttTaskScheduler) { // 注入调度器
      return new MqttClientConfiguration(mqttProperties, mqttClientFactory, mqttMessageHandler);
    }
    @Bean
    public MqttTemplate mqttTemplate(MqttClientConfiguration mqttClientConfiguration) {
      return new MqttTemplate(
                mqttClientConfiguration.getOutboundHandlers(),
                mqttClientConfiguration.getClientConfigs());
    }
}MqttAutoConfigurationpackage com.smart.common.mqtt;


import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* <p>
* MqttClientConfiguration
* </p >
*
* @author TL
* @version 1.0.0
*/
@Slf4j
public class MqttClientConfiguration implements BeanPostProcessor,
      ApplicationListener<ContextRefreshedEvent>,
      DisposableBean,
      BeanFactoryAware {

    private final MqttProperties mqttProperties;
    private final MqttClientFactory mqttClientFactory;
    private final MqttMessageHandler defaultMqttMessageHandler;

    @Autowired
    private ThreadPoolTaskScheduler mqttTaskScheduler;

    // 存储客户端工厂
    private final Map<String, MqttPahoClientFactory> clientFactories = new ConcurrentHashMap<>();

    // 存储MQTT出站处理器
    @Getter
    private final Map<String, MqttPahoMessageHandler> outboundHandlers = new ConcurrentHashMap<>();

    // 存储MQTT入站适配器
    private final Map<String, MqttPahoMessageDrivenChannelAdapter> inboundAdapters = new ConcurrentHashMap<>();

    // 存储消息通道
    private final Map<String, AbstractSubscribableChannel> channels = new ConcurrentHashMap<>();

    // 存储订阅信息
    private final Map<String, List<SubscriptionInfo>> subscriptions = new ConcurrentHashMap<>();

    private boolean initialized = false;

    @Autowired
    public MqttClientConfiguration(MqttProperties mqttProperties,
                                 MqttClientFactory mqttClientFactory,
                                 MqttMessageHandler defaultMqttMessageHandler) {
      this.mqttProperties = mqttProperties;
      this.mqttClientFactory = mqttClientFactory;
      this.defaultMqttMessageHandler = defaultMqttMessageHandler;
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
      return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
      Class<?> targetClass = AopUtils.isAopProxy(bean) ?
                AopProxyUtils.ultimateTargetClass(bean) : bean.getClass();

      // 处理MqttClient注解
      MqttClient mqttClientAnnotation = AnnotationUtils.findAnnotation(targetClass, MqttClient.class);
      if (mqttClientAnnotation != null) {
            log.info("Found MQTT client: {}", mqttClientAnnotation.name());
      }

      // 查找带有MqttSubscribe注解的方法
      ReflectionUtils.doWithMethods(targetClass, method -> {
            MqttSubscribe mqttSubscribe = AnnotationUtils.findAnnotation(method, MqttSubscribe.class);
            if (mqttSubscribe != null) {
                registerSubscription(bean, method, mqttSubscribe);
            }
      });

      return bean;
    }

    private void registerSubscription(Object bean, Method method, MqttSubscribe mqttSubscribe) {
      String topic = mqttSubscribe.topic().isEmpty() ? mqttSubscribe.value() : mqttSubscribe.topic();
      Assert.hasText(topic, "Topic must be specified in @MqttSubscribe annotation");

      String clientName = mqttSubscribe.client();
      int qos = mqttSubscribe.qos();

      log.info("Registering MQTT subscription: topic={}, qos={}, client={}, method={}.{}",
                topic, qos, clientName, bean.getClass().getSimpleName(), method.getName());

      // 将订阅信息存储起来,等待context刷新后统一处理
      SubscriptionInfo subscriptionInfo = new SubscriptionInfo(bean, method, topic, qos);
      subscriptions.computeIfAbsent(clientName, k -> new ArrayList<>()).add(subscriptionInfo);
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
      if (initialized || !mqttProperties.isEnabled()) {
            return;
      }

      try {
            // 初始化所有MQTT客户端
            initializeMqttClients();

            // 处理所有订阅
            processSubscriptions();

            initialized = true;
            log.info("MQTT clients initialized successfully");
      } catch (Exception e) {
            log.error("Failed to initialize MQTT clients", e);
            throw new RuntimeException("Failed to initialize MQTT clients", e);
      }
    }

    private void initializeMqttClients() throws Exception {
      // 初始化默认客户端
      initializeMqttClient("default", mqttProperties.getDefaultClient());

      // 初始化其他客户端
      for (Map.Entry<String, MqttProperties.ClientConfig> entry : mqttProperties.getClients().entrySet()) {
            initializeMqttClient(entry.getKey(), entry.getValue());
      }
    }

    private void initializeMqttClient(String clientName, MqttProperties.ClientConfig config) throws Exception {
      // 创建MQTT客户端工厂
      MqttPahoClientFactory clientFactory = mqttClientFactory.createClientFactory(config);
      clientFactories.put(clientName, clientFactory);

      // 创建入站通道
      PublishSubscribeChannel inboundChannel = new PublishSubscribeChannel();
      channels.put(clientName + "-inbound", inboundChannel);

      // 创建出站通道
      DirectChannel outboundChannel = new DirectChannel();
      channels.put(clientName + "-outbound", outboundChannel);

      // 创建出站处理器
      MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                config.getClientId() + "-outbound", clientFactory);
      messageHandler.setAsync(true);
      if (config.getDefaultTopic() != null) {
            messageHandler.setDefaultTopic(config.getDefaultTopic());
      }
      messageHandler.setDefaultQos(config.getDefaultQos());
      messageHandler.setConverter(new DefaultPahoMessageConverter());
      outboundHandlers.put(clientName, messageHandler);

      log.debug("Initialized MQTT client: {}", clientName);
    }

    private void processSubscriptions() {
      // 为每个客户端创建订阅适配器
      for (Map.Entry<String, List<SubscriptionInfo>> entry : subscriptions.entrySet()) {
            String clientName = entry.getKey();
            List<SubscriptionInfo> clientSubscriptions = entry.getValue();

            if (clientSubscriptions.isEmpty()) {
                continue;
            }

            // 获取客户端配置
            MqttProperties.ClientConfig config = clientName.equals("default") ?
                  mqttProperties.getDefaultClient() : mqttProperties.getClients().get(clientName);

            if (config == null) {
                log.warn("No configuration found for MQTT client: {}, skipping subscriptions", clientName);
                continue;
            }

            // 获取客户端工厂
            MqttPahoClientFactory clientFactory = clientFactories.get(clientName);
            if (clientFactory == null) {
                log.warn("No factory found for MQTT client: {}, skipping subscriptions", clientName);
                continue;
            }

            // 获取入站通道
            AbstractSubscribableChannel inboundChannel = channels.get(clientName + "-inbound");

            // 创建入站适配器
            String[] topics = clientSubscriptions.stream()
                  .map(SubscriptionInfo::getTopic)
                  .distinct()
                  .toArray(String[]::new);

            int[] qos = clientSubscriptions.stream()
                  .map(SubscriptionInfo::getQos)
                  .mapToInt(Integer::intValue)
                  .toArray();

            MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                  config.getClientId() + "-inbound", clientFactory, topics);
            adapter.setQos(qos);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setOutputChannel(inboundChannel);
            adapter.setCompletionTimeout(5000);
            adapter.setTaskScheduler(mqttTaskScheduler);

            // 启动适配器
            adapter.start();
            inboundAdapters.put(clientName, adapter);

            // 添加消息处理器
            inboundChannel.subscribe(message -> {
                String topic = (String) message.getHeaders().get("mqtt_receivedTopic");

                // 调用默认处理器
                defaultMqttMessageHandler.handleMessage(message, topic, clientName);

                // 调用特定的订阅方法
                clientSubscriptions.stream()
                        .filter(subscription -> {
                            assert topic != null;
                            return topicMatches(subscription.getTopic(), topic);
                        })
                        .forEach(subscription -> {
                            try {
                              ReflectionUtils.makeAccessible(subscription.getMethod());
                              if (subscription.getMethod().getParameterCount() == 1) {
                                    subscription.getMethod().invoke(subscription.getBean(), message.getPayload());
                              } else if (subscription.getMethod().getParameterCount() == 2) {
                                    subscription.getMethod().invoke(subscription.getBean(),
                                          message.getPayload(), topic);
                              } else if (subscription.getMethod().getParameterCount() == 3) {
                                    subscription.getMethod().invoke(subscription.getBean(),
                                          message.getPayload(), topic, clientName);
                              } else {
                                    subscription.getMethod().invoke(subscription.getBean());
                              }
                            } catch (Exception e) {
                              log.error("Error invoking subscription method: {}",
                                        subscription.getMethod().getName(), e);
                            }
                        });
            });

            log.info("Started MQTT subscription adapter for client: {} with topics: {}",
                  clientName, String.join(", ", topics));
      }
    }

    private boolean topicMatches(String subscription, String actualTopic) {
      // 将主题分割为段
      String[] subParts = subscription.split("/");
      String[] topicParts = actualTopic.split("/");

      // 如果订阅主题以 # 结尾,并且前面的所有部分都匹配,则匹配
      if (subParts.length > 0 && subParts.equals("#")) {
            if (topicParts.length < subParts.length - 1) {
                return false;
            }

            for (int i = 0; i < subParts.length - 1; i++) {
                if (!subParts.equals("+") && !subParts.equals(topicParts)) {
                  return false;
                }
            }
            return true;
      }

      // 如果段数不同且不是 # 结尾,则不匹配
      if (subParts.length != topicParts.length) {
            return false;
      }

      // 检查每个段是否匹配
      for (int i = 0; i < subParts.length; i++) {
            if (!subParts.equals("+") && !subParts.equals(topicParts)) {
                return false;
            }
      }

      return true;
    }

    @Override
    public void destroy() throws Exception {
      // 关闭所有入站适配器
      for (MqttPahoMessageDrivenChannelAdapter adapter : inboundAdapters.values()) {
            try {
                adapter.stop();
            } catch (Exception e) {
                log.warn("Error stopping MQTT adapter", e);
            }
      }

      log.info("MQTT clients destroyed");
    }

    public Map<String, MqttProperties.ClientConfig> getClientConfigs() {
      Map<String, MqttProperties.ClientConfig> configs = new HashMap<>();
      configs.put("default", mqttProperties.getDefaultClient());
      configs.putAll(mqttProperties.getClients());
      return configs;
    }

    // 订阅信息内部类
    @Getter
    private static class SubscriptionInfo {
      private final Object bean;
      private final Method method;
      private final String topic;
      private final int qos;

      public SubscriptionInfo(Object bean, Method method, String topic, int qos) {
            this.bean = bean;
            this.method = method;
            this.topic = topic;
            this.qos = qos;
      }

    }
}MqttClientConfigurationpackage com.smart.common.mqtt;


import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
* <p>
* MqttSchedulerConfig
* </p >
*
* @author TL
* @version 1.0.0
*/
@Configuration
public class MqttSchedulerConfig {

    @Value("${mqtt.thread-pool.size:10}")
    private int size;

    /**
   * MQTT适配器需要一个TaskScheduler来管理连接和心跳
   */
    @Bean
    public ThreadPoolTaskScheduler mqttTaskScheduler() {
      ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
      scheduler.setPoolSize(size);
      scheduler.setThreadNamePrefix("mqtt-scheduler-");
      scheduler.setWaitForTasksToCompleteOnShutdown(true);
      scheduler.setAwaitTerminationSeconds(60);
      scheduler.setRemoveOnCancelPolicy(true);
      return scheduler;
    }
}MqttSchedulerConfigpackage com.smart.common.mqtt;


import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

/**
* <p>
* MqttClientFactory
* </p >
*
* @author TL
* @version 1.0.0
*/
public interface MqttClientFactory {

    /**
   * 创建MQTT客户端工厂
   *
   * @param clientConfig 客户端配置
   * @return MQTT客户端工厂
   */
    MqttPahoClientFactory createClientFactory(MqttProperties.ClientConfig clientConfig) throws Exception;
}MqttClientFactorypackage com.smart.common.mqtt;


import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;


/**
* <p>
* MqttPahoClientFactoryImpl
* </p >
*
* @author TL
* @version 1.0.0
*/
public class MqttPahoClientFactoryImpl implements MqttClientFactory {

    @Override
    public MqttPahoClientFactory createClientFactory(MqttProperties.ClientConfig clientConfig) throws Exception {
      DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
      MqttConnectOptions options = new MqttConnectOptions();

      // 设置基本连接属性
      options.setServerURIs(new String[]{clientConfig.getServerUri()});
      if (clientConfig.getUsername() != null) {
            options.setUserName(clientConfig.getUsername());
      }
      if (clientConfig.getPassword() != null) {
            options.setPassword(clientConfig.getPassword().toCharArray());
      }
      options.setCleanSession(clientConfig.isCleanSession());
      options.setConnectionTimeout(clientConfig.getConnectionTimeout());
      options.setKeepAliveInterval(clientConfig.getKeepAliveInterval());
      options.setAutomaticReconnect(clientConfig.isAutomaticReconnect());

      factory.setConnectionOptions(options);
      return factory;
    }
}MqttPahoClientFactoryImplpackage com.smart.common.mqtt;


import org.springframework.messaging.Message;

/**
* <p>
* MqttMessageHandler
* </p >
*
* @author TL
* @version 1.0.0
*/
public interface MqttMessageHandler {
    /**
   * 处理MQTT消息
   *
   * @param message 消息
   * @param topic 主题
   * @param clientName 客户端名称
   */
    void handleMessage(Message<?> message, String topic, String clientName);
}MqttMessageHandlerpackage com.smart.common.mqtt;


import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;

/**
* <p>
* DefaultMqttMessageHandler
* </p >
*
* @author TL
* @version 1.0.0
*/
@Slf4j
public class DefaultMqttMessageHandler implements MqttMessageHandler {

    @Override
    public void handleMessage(Message<?> message, String topic, String clientName) {
      log.info("Received message from client [{}] on topic [{}]: {}",
                clientName, topic, message.getPayload());
    }
}DefaultMqttMessageHandlerpackage com.smart.common.mqtt;


import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

import java.util.Map;

/**
* <p>
* MqttTemplate
* </p >
*
* @author TL
* @version 1.0.0
*/
@Slf4j
public class MqttTemplate {

    private final Map<String, MqttPahoMessageHandler> messageHandlers;
    private final Map<String, MqttProperties.ClientConfig> clientConfigs;

    public MqttTemplate(Map<String, MqttPahoMessageHandler> messageHandlers,
                        Map<String, MqttProperties.ClientConfig> clientConfigs) {
      this.messageHandlers = messageHandlers;
      this.clientConfigs = clientConfigs;
    }

    /**
   * 发送消息到默认主题
   *
   * @param payload 消息内容
   * @param clientName 客户端名称
   */
    public void sendToDefaultTopic(Object payload, String clientName) {
      MqttProperties.ClientConfig config = getClientConfig(clientName);
      Assert.hasText(config.getDefaultTopic(),
                "Default topic not configured for client: " + clientName);

      send(payload, config.getDefaultTopic(), config.getDefaultQos(), clientName);
    }

    /**
   * 发送消息到指定主题
   *
   * @param payload 消息内容
   * @param topic 主题
   * @param clientName 客户端名称
   */
    public void send(Object payload, String topic, String clientName) {
      MqttProperties.ClientConfig config = getClientConfig(clientName);
      send(payload, topic, config.getDefaultQos(), clientName);
    }

    /**
   * 发送消息到指定主题,并指定QoS
   *
   * @param payload 消息内容
   * @param topic 主题
   * @param qos QoS等级
   * @param clientName 客户端名称
   */
    public void send(Object payload, String topic, int qos, String clientName) {
      MqttPahoMessageHandler messageHandler = messageHandlers.get(clientName);
      if (messageHandler == null) {
            throw new IllegalStateException("No MQTT client found with name: " + clientName);
      }

      Message<?> message = MessageBuilder.withPayload(payload)
                .setHeader(MqttHeaders.TOPIC, topic)
                .setHeader(MqttHeaders.QOS, qos)
                .setHeader(MqttHeaders.RETAINED, true)// 设置消息保留
                .build();

      try {
            messageHandler.handleMessage(message);
            log.debug("Sent message to topic [{}] with client [{}]", topic, clientName);
      } catch (MessagingException e) {
            log.error("Failed to send message to topic [{}] with client [{}]", topic, clientName, e);
            throw e;
      }
    }

    private MqttProperties.ClientConfig getClientConfig(String clientName) {
      MqttProperties.ClientConfig config = clientConfigs.get(clientName);
      if (config == null) {
            throw new IllegalStateException("No MQTT client configuration found with name: " + clientName);
      }
      return config;
    }
}MqttTemplateMAVEN依赖:
<dependency>
       <groupId>org.springframework.integration</groupId>
       spring-integration-mqtt</artifactId>
</dependency>配置文件:
mqtt:
    enabled: true
    thread-pool:
      size: 10
    default-client:
      server-uri: tcp://secondary-broker:1883
      username: another-user
      password: another-password
      client-id: event-${random.uuid}
      default-topic: event
      default-qos: 1
    clients:
      secondary:
            server-uri: tcp://secondary-broker:1883
            username: another-user
            password: another-password
            default-topic: secondary/topic
      monitoring:
            server-uri: tcp://secondary-broker:1883
            username: another-user
            password: another-password
            default-topic: secondary/topic使用案例:
package com.smart.web.controller.biz;

import cn.hutool.json.JSONUtil;
import com.smart.common.core.domain.AjaxResult;
import com.smart.common.mqtt.MqttSubscribe;
import com.smart.common.mqtt.MqttTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/mqtt")
@RequiredArgsConstructor
@Slf4j
public class MultiMqttController {

    private final MqttTemplate mqttService;

    /**
   * 向指定broker的topic主题发送信息
   * @param broker
   * @param topic
   * @param message
   * @return
   */
    @PostMapping("/publish/{broker}/{topic}")
    public AjaxResult publish(
            @PathVariable("broker") String broker,
            @PathVariable("topic") String topic,
            @RequestBody Object message) {
      mqttService.send(JSONUtil.toJsonStr(message),topic,broker);
      return AjaxResult.success("Message published to "+topic + " :"+message);
    }

    /**
   * 订阅测试
   * @param message
   */
    @MqttSubscribe(topic = "event", client = "default")
    public void handle(String message) {
      log.info("[{}]-[{}] Received message: {}","default","event", message);
    }



来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

眸胝 发表于 2025-11-30 01:18:29

鼓励转贴优秀软件安全工具和文档!

碛物 发表于 2025-12-7 18:18:55

谢谢楼主提供!

班嘉淑 发表于 7 天前

喜欢鼓捣这些软件,现在用得少,谢谢分享!

崔竹 发表于 3 天前

用心讨论,共获提升!
页: [1]
查看完整版本: Springboo下的MQTT多broker实现