Springboo下的MQTT多broker实现
参考项目:GitHub - mqtt-spring-boot-starter背景说明:
和原作者一样,也是身处IOT物联网公司,身不由己,哈哈
实现功能:
1、多broker连接,这是原作者造好的轮子
2、指定信息发布,源码上有些问题,后面做了修复处理,已经可以正常使用了。
并且由于业务场景不一样,将原作者的点对点发送,修改为订阅|发布模式。
具体位置位于 MqttClientConfiguration文件的initializeMqttClient方法中的对应【入站通道】
3、ssl连接,目前没用上,先删除了
业务代码:
package com.smart.common.mqtt;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
/**
* <p>
* MqttClient
* </p >
*
* @author TL
* @version 1.0.0
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface MqttClient {
/**
* MQTT客户端ID
*/
String value() default "";
/**
* 客户端名称,用于配置中引用
*/
String name();
}@MqttClientpackage com.smart.common.mqtt;
import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.*;
/**
* <p>
* MqttSubscribe
* </p >
*
* @author TL
* @version 1.0.0
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqttSubscribe {
/**
* 订阅的主题
*/
@AliasFor("topic")
String value() default "";
/**
* 订阅的主题
*/
@AliasFor("value")
String topic() default "";
/**
* QoS质量等级:0, 1, 2
*/
int qos() default 1;
/**
* 客户端名称,用于指定哪个MQTT客户端处理该订阅
*/
String client() default "default";
}@MqttSubscribepackage com.smart.common.mqtt;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import java.util.HashMap;
import java.util.Map;
/**
* <p>
* MqttProperties
* </p >
*
* @author TL
* @version 1.0.0
*/
@Data
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
/**
* 是否启用MQTT
*/
private boolean enabled = true;
/**
* 线程池配置
*/
@NestedConfigurationProperty
private MqttSchedulerConfig threadPool = new MqttSchedulerConfig();
/**
* 默认客户端配置
*/
@NestedConfigurationProperty
private ClientConfig defaultClient = new ClientConfig();
/**
* 多客户端配置,key为客户端名称
*/
private Map<String, ClientConfig> clients = new HashMap<>();
@Data
public static class ClientConfig {
/**
* MQTT服务器地址,例如:tcp://localhost:1883或ssl://localhost:8883
*/
private String serverUri = "tcp://localhost:1883";
/**
* 客户端ID
*/
private String clientId = "mqtt-client-" + System.currentTimeMillis();
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 清除会话
*/
private boolean cleanSession = true;
/**
* 连接超时时间(秒)
*/
private int connectionTimeout = 30;
/**
* 保持连接心跳时间(秒)
*/
private int keepAliveInterval = 60;
/**
* 是否自动重连
*/
private boolean automaticReconnect = true;
/**
* 默认的QoS级别
*/
private int defaultQos = 1;
/**
* 默认主题
*/
private String defaultTopic;
}
}MqttPropertiespackage com.smart.common.mqtt;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* <p>
* MqttAutoConfiguration
* </p >
*
* @author TL
* @version 1.0.0
*/
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
@ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true", matchIfMissing = true)
public class MqttAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public MqttClientFactory mqttClientFactory() {
return new MqttPahoClientFactoryImpl();
}
/**
* 配置MQTT消息处理器Bean
* 当Spring容器中不存在该Bean时才会创建
*
* @return MqttMessageHandler 返回一个默认的MQTT消息处理器实例
*/
@Bean // 条注解:当Spring容器中不存在相同类型的Bean时,才会创建这个Bean
@ConditionalOnMissingBean
public MqttMessageHandler mqttMessageHandler() { // 创建并返回一个默认的MQTT消息处理器实例
return new DefaultMqttMessageHandler();
}
@Bean
public MqttClientConfiguration mqttClientConfiguration(MqttProperties mqttProperties,
MqttClientFactory mqttClientFactory,
MqttMessageHandler mqttMessageHandler,
ThreadPoolTaskScheduler mqttTaskScheduler) { // 注入调度器
return new MqttClientConfiguration(mqttProperties, mqttClientFactory, mqttMessageHandler);
}
@Bean
public MqttTemplate mqttTemplate(MqttClientConfiguration mqttClientConfiguration) {
return new MqttTemplate(
mqttClientConfiguration.getOutboundHandlers(),
mqttClientConfiguration.getClientConfigs());
}
}MqttAutoConfigurationpackage com.smart.common.mqtt;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* <p>
* MqttClientConfiguration
* </p >
*
* @author TL
* @version 1.0.0
*/
@Slf4j
public class MqttClientConfiguration implements BeanPostProcessor,
ApplicationListener<ContextRefreshedEvent>,
DisposableBean,
BeanFactoryAware {
private final MqttProperties mqttProperties;
private final MqttClientFactory mqttClientFactory;
private final MqttMessageHandler defaultMqttMessageHandler;
@Autowired
private ThreadPoolTaskScheduler mqttTaskScheduler;
// 存储客户端工厂
private final Map<String, MqttPahoClientFactory> clientFactories = new ConcurrentHashMap<>();
// 存储MQTT出站处理器
@Getter
private final Map<String, MqttPahoMessageHandler> outboundHandlers = new ConcurrentHashMap<>();
// 存储MQTT入站适配器
private final Map<String, MqttPahoMessageDrivenChannelAdapter> inboundAdapters = new ConcurrentHashMap<>();
// 存储消息通道
private final Map<String, AbstractSubscribableChannel> channels = new ConcurrentHashMap<>();
// 存储订阅信息
private final Map<String, List<SubscriptionInfo>> subscriptions = new ConcurrentHashMap<>();
private boolean initialized = false;
@Autowired
public MqttClientConfiguration(MqttProperties mqttProperties,
MqttClientFactory mqttClientFactory,
MqttMessageHandler defaultMqttMessageHandler) {
this.mqttProperties = mqttProperties;
this.mqttClientFactory = mqttClientFactory;
this.defaultMqttMessageHandler = defaultMqttMessageHandler;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> targetClass = AopUtils.isAopProxy(bean) ?
AopProxyUtils.ultimateTargetClass(bean) : bean.getClass();
// 处理MqttClient注解
MqttClient mqttClientAnnotation = AnnotationUtils.findAnnotation(targetClass, MqttClient.class);
if (mqttClientAnnotation != null) {
log.info("Found MQTT client: {}", mqttClientAnnotation.name());
}
// 查找带有MqttSubscribe注解的方法
ReflectionUtils.doWithMethods(targetClass, method -> {
MqttSubscribe mqttSubscribe = AnnotationUtils.findAnnotation(method, MqttSubscribe.class);
if (mqttSubscribe != null) {
registerSubscription(bean, method, mqttSubscribe);
}
});
return bean;
}
private void registerSubscription(Object bean, Method method, MqttSubscribe mqttSubscribe) {
String topic = mqttSubscribe.topic().isEmpty() ? mqttSubscribe.value() : mqttSubscribe.topic();
Assert.hasText(topic, "Topic must be specified in @MqttSubscribe annotation");
String clientName = mqttSubscribe.client();
int qos = mqttSubscribe.qos();
log.info("Registering MQTT subscription: topic={}, qos={}, client={}, method={}.{}",
topic, qos, clientName, bean.getClass().getSimpleName(), method.getName());
// 将订阅信息存储起来,等待context刷新后统一处理
SubscriptionInfo subscriptionInfo = new SubscriptionInfo(bean, method, topic, qos);
subscriptions.computeIfAbsent(clientName, k -> new ArrayList<>()).add(subscriptionInfo);
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (initialized || !mqttProperties.isEnabled()) {
return;
}
try {
// 初始化所有MQTT客户端
initializeMqttClients();
// 处理所有订阅
processSubscriptions();
initialized = true;
log.info("MQTT clients initialized successfully");
} catch (Exception e) {
log.error("Failed to initialize MQTT clients", e);
throw new RuntimeException("Failed to initialize MQTT clients", e);
}
}
private void initializeMqttClients() throws Exception {
// 初始化默认客户端
initializeMqttClient("default", mqttProperties.getDefaultClient());
// 初始化其他客户端
for (Map.Entry<String, MqttProperties.ClientConfig> entry : mqttProperties.getClients().entrySet()) {
initializeMqttClient(entry.getKey(), entry.getValue());
}
}
private void initializeMqttClient(String clientName, MqttProperties.ClientConfig config) throws Exception {
// 创建MQTT客户端工厂
MqttPahoClientFactory clientFactory = mqttClientFactory.createClientFactory(config);
clientFactories.put(clientName, clientFactory);
// 创建入站通道
PublishSubscribeChannel inboundChannel = new PublishSubscribeChannel();
channels.put(clientName + "-inbound", inboundChannel);
// 创建出站通道
DirectChannel outboundChannel = new DirectChannel();
channels.put(clientName + "-outbound", outboundChannel);
// 创建出站处理器
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
config.getClientId() + "-outbound", clientFactory);
messageHandler.setAsync(true);
if (config.getDefaultTopic() != null) {
messageHandler.setDefaultTopic(config.getDefaultTopic());
}
messageHandler.setDefaultQos(config.getDefaultQos());
messageHandler.setConverter(new DefaultPahoMessageConverter());
outboundHandlers.put(clientName, messageHandler);
log.debug("Initialized MQTT client: {}", clientName);
}
private void processSubscriptions() {
// 为每个客户端创建订阅适配器
for (Map.Entry<String, List<SubscriptionInfo>> entry : subscriptions.entrySet()) {
String clientName = entry.getKey();
List<SubscriptionInfo> clientSubscriptions = entry.getValue();
if (clientSubscriptions.isEmpty()) {
continue;
}
// 获取客户端配置
MqttProperties.ClientConfig config = clientName.equals("default") ?
mqttProperties.getDefaultClient() : mqttProperties.getClients().get(clientName);
if (config == null) {
log.warn("No configuration found for MQTT client: {}, skipping subscriptions", clientName);
continue;
}
// 获取客户端工厂
MqttPahoClientFactory clientFactory = clientFactories.get(clientName);
if (clientFactory == null) {
log.warn("No factory found for MQTT client: {}, skipping subscriptions", clientName);
continue;
}
// 获取入站通道
AbstractSubscribableChannel inboundChannel = channels.get(clientName + "-inbound");
// 创建入站适配器
String[] topics = clientSubscriptions.stream()
.map(SubscriptionInfo::getTopic)
.distinct()
.toArray(String[]::new);
int[] qos = clientSubscriptions.stream()
.map(SubscriptionInfo::getQos)
.mapToInt(Integer::intValue)
.toArray();
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
config.getClientId() + "-inbound", clientFactory, topics);
adapter.setQos(qos);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setOutputChannel(inboundChannel);
adapter.setCompletionTimeout(5000);
adapter.setTaskScheduler(mqttTaskScheduler);
// 启动适配器
adapter.start();
inboundAdapters.put(clientName, adapter);
// 添加消息处理器
inboundChannel.subscribe(message -> {
String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
// 调用默认处理器
defaultMqttMessageHandler.handleMessage(message, topic, clientName);
// 调用特定的订阅方法
clientSubscriptions.stream()
.filter(subscription -> {
assert topic != null;
return topicMatches(subscription.getTopic(), topic);
})
.forEach(subscription -> {
try {
ReflectionUtils.makeAccessible(subscription.getMethod());
if (subscription.getMethod().getParameterCount() == 1) {
subscription.getMethod().invoke(subscription.getBean(), message.getPayload());
} else if (subscription.getMethod().getParameterCount() == 2) {
subscription.getMethod().invoke(subscription.getBean(),
message.getPayload(), topic);
} else if (subscription.getMethod().getParameterCount() == 3) {
subscription.getMethod().invoke(subscription.getBean(),
message.getPayload(), topic, clientName);
} else {
subscription.getMethod().invoke(subscription.getBean());
}
} catch (Exception e) {
log.error("Error invoking subscription method: {}",
subscription.getMethod().getName(), e);
}
});
});
log.info("Started MQTT subscription adapter for client: {} with topics: {}",
clientName, String.join(", ", topics));
}
}
private boolean topicMatches(String subscription, String actualTopic) {
// 将主题分割为段
String[] subParts = subscription.split("/");
String[] topicParts = actualTopic.split("/");
// 如果订阅主题以 # 结尾,并且前面的所有部分都匹配,则匹配
if (subParts.length > 0 && subParts.equals("#")) {
if (topicParts.length < subParts.length - 1) {
return false;
}
for (int i = 0; i < subParts.length - 1; i++) {
if (!subParts.equals("+") && !subParts.equals(topicParts)) {
return false;
}
}
return true;
}
// 如果段数不同且不是 # 结尾,则不匹配
if (subParts.length != topicParts.length) {
return false;
}
// 检查每个段是否匹配
for (int i = 0; i < subParts.length; i++) {
if (!subParts.equals("+") && !subParts.equals(topicParts)) {
return false;
}
}
return true;
}
@Override
public void destroy() throws Exception {
// 关闭所有入站适配器
for (MqttPahoMessageDrivenChannelAdapter adapter : inboundAdapters.values()) {
try {
adapter.stop();
} catch (Exception e) {
log.warn("Error stopping MQTT adapter", e);
}
}
log.info("MQTT clients destroyed");
}
public Map<String, MqttProperties.ClientConfig> getClientConfigs() {
Map<String, MqttProperties.ClientConfig> configs = new HashMap<>();
configs.put("default", mqttProperties.getDefaultClient());
configs.putAll(mqttProperties.getClients());
return configs;
}
// 订阅信息内部类
@Getter
private static class SubscriptionInfo {
private final Object bean;
private final Method method;
private final String topic;
private final int qos;
public SubscriptionInfo(Object bean, Method method, String topic, int qos) {
this.bean = bean;
this.method = method;
this.topic = topic;
this.qos = qos;
}
}
}MqttClientConfigurationpackage com.smart.common.mqtt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* <p>
* MqttSchedulerConfig
* </p >
*
* @author TL
* @version 1.0.0
*/
@Configuration
public class MqttSchedulerConfig {
@Value("${mqtt.thread-pool.size:10}")
private int size;
/**
* MQTT适配器需要一个TaskScheduler来管理连接和心跳
*/
@Bean
public ThreadPoolTaskScheduler mqttTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(size);
scheduler.setThreadNamePrefix("mqtt-scheduler-");
scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.setAwaitTerminationSeconds(60);
scheduler.setRemoveOnCancelPolicy(true);
return scheduler;
}
}MqttSchedulerConfigpackage com.smart.common.mqtt;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
/**
* <p>
* MqttClientFactory
* </p >
*
* @author TL
* @version 1.0.0
*/
public interface MqttClientFactory {
/**
* 创建MQTT客户端工厂
*
* @param clientConfig 客户端配置
* @return MQTT客户端工厂
*/
MqttPahoClientFactory createClientFactory(MqttProperties.ClientConfig clientConfig) throws Exception;
}MqttClientFactorypackage com.smart.common.mqtt;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
/**
* <p>
* MqttPahoClientFactoryImpl
* </p >
*
* @author TL
* @version 1.0.0
*/
public class MqttPahoClientFactoryImpl implements MqttClientFactory {
@Override
public MqttPahoClientFactory createClientFactory(MqttProperties.ClientConfig clientConfig) throws Exception {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
// 设置基本连接属性
options.setServerURIs(new String[]{clientConfig.getServerUri()});
if (clientConfig.getUsername() != null) {
options.setUserName(clientConfig.getUsername());
}
if (clientConfig.getPassword() != null) {
options.setPassword(clientConfig.getPassword().toCharArray());
}
options.setCleanSession(clientConfig.isCleanSession());
options.setConnectionTimeout(clientConfig.getConnectionTimeout());
options.setKeepAliveInterval(clientConfig.getKeepAliveInterval());
options.setAutomaticReconnect(clientConfig.isAutomaticReconnect());
factory.setConnectionOptions(options);
return factory;
}
}MqttPahoClientFactoryImplpackage com.smart.common.mqtt;
import org.springframework.messaging.Message;
/**
* <p>
* MqttMessageHandler
* </p >
*
* @author TL
* @version 1.0.0
*/
public interface MqttMessageHandler {
/**
* 处理MQTT消息
*
* @param message 消息
* @param topic 主题
* @param clientName 客户端名称
*/
void handleMessage(Message<?> message, String topic, String clientName);
}MqttMessageHandlerpackage com.smart.common.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
/**
* <p>
* DefaultMqttMessageHandler
* </p >
*
* @author TL
* @version 1.0.0
*/
@Slf4j
public class DefaultMqttMessageHandler implements MqttMessageHandler {
@Override
public void handleMessage(Message<?> message, String topic, String clientName) {
log.info("Received message from client [{}] on topic [{}]: {}",
clientName, topic, message.getPayload());
}
}DefaultMqttMessageHandlerpackage com.smart.common.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
import java.util.Map;
/**
* <p>
* MqttTemplate
* </p >
*
* @author TL
* @version 1.0.0
*/
@Slf4j
public class MqttTemplate {
private final Map<String, MqttPahoMessageHandler> messageHandlers;
private final Map<String, MqttProperties.ClientConfig> clientConfigs;
public MqttTemplate(Map<String, MqttPahoMessageHandler> messageHandlers,
Map<String, MqttProperties.ClientConfig> clientConfigs) {
this.messageHandlers = messageHandlers;
this.clientConfigs = clientConfigs;
}
/**
* 发送消息到默认主题
*
* @param payload 消息内容
* @param clientName 客户端名称
*/
public void sendToDefaultTopic(Object payload, String clientName) {
MqttProperties.ClientConfig config = getClientConfig(clientName);
Assert.hasText(config.getDefaultTopic(),
"Default topic not configured for client: " + clientName);
send(payload, config.getDefaultTopic(), config.getDefaultQos(), clientName);
}
/**
* 发送消息到指定主题
*
* @param payload 消息内容
* @param topic 主题
* @param clientName 客户端名称
*/
public void send(Object payload, String topic, String clientName) {
MqttProperties.ClientConfig config = getClientConfig(clientName);
send(payload, topic, config.getDefaultQos(), clientName);
}
/**
* 发送消息到指定主题,并指定QoS
*
* @param payload 消息内容
* @param topic 主题
* @param qos QoS等级
* @param clientName 客户端名称
*/
public void send(Object payload, String topic, int qos, String clientName) {
MqttPahoMessageHandler messageHandler = messageHandlers.get(clientName);
if (messageHandler == null) {
throw new IllegalStateException("No MQTT client found with name: " + clientName);
}
Message<?> message = MessageBuilder.withPayload(payload)
.setHeader(MqttHeaders.TOPIC, topic)
.setHeader(MqttHeaders.QOS, qos)
.setHeader(MqttHeaders.RETAINED, true)// 设置消息保留
.build();
try {
messageHandler.handleMessage(message);
log.debug("Sent message to topic [{}] with client [{}]", topic, clientName);
} catch (MessagingException e) {
log.error("Failed to send message to topic [{}] with client [{}]", topic, clientName, e);
throw e;
}
}
private MqttProperties.ClientConfig getClientConfig(String clientName) {
MqttProperties.ClientConfig config = clientConfigs.get(clientName);
if (config == null) {
throw new IllegalStateException("No MQTT client configuration found with name: " + clientName);
}
return config;
}
}MqttTemplateMAVEN依赖:
<dependency>
<groupId>org.springframework.integration</groupId>
spring-integration-mqtt</artifactId>
</dependency>配置文件:
mqtt:
enabled: true
thread-pool:
size: 10
default-client:
server-uri: tcp://secondary-broker:1883
username: another-user
password: another-password
client-id: event-${random.uuid}
default-topic: event
default-qos: 1
clients:
secondary:
server-uri: tcp://secondary-broker:1883
username: another-user
password: another-password
default-topic: secondary/topic
monitoring:
server-uri: tcp://secondary-broker:1883
username: another-user
password: another-password
default-topic: secondary/topic使用案例:
package com.smart.web.controller.biz;
import cn.hutool.json.JSONUtil;
import com.smart.common.core.domain.AjaxResult;
import com.smart.common.mqtt.MqttSubscribe;
import com.smart.common.mqtt.MqttTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/mqtt")
@RequiredArgsConstructor
@Slf4j
public class MultiMqttController {
private final MqttTemplate mqttService;
/**
* 向指定broker的topic主题发送信息
* @param broker
* @param topic
* @param message
* @return
*/
@PostMapping("/publish/{broker}/{topic}")
public AjaxResult publish(
@PathVariable("broker") String broker,
@PathVariable("topic") String topic,
@RequestBody Object message) {
mqttService.send(JSONUtil.toJsonStr(message),topic,broker);
return AjaxResult.success("Message published to "+topic + " :"+message);
}
/**
* 订阅测试
* @param message
*/
@MqttSubscribe(topic = "event", client = "default")
public void handle(String message) {
log.info("[{}]-[{}] Received message: {}","default","event", message);
}
}
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! 鼓励转贴优秀软件安全工具和文档! 谢谢楼主提供! 喜欢鼓捣这些软件,现在用得少,谢谢分享! 用心讨论,共获提升!
页:
[1]