找回密码
 立即注册
首页 业界区 安全 若依集成WebSocket实现消息通知功能

若依集成WebSocket实现消息通知功能

章绮云 2025-5-31 23:55:14
实现效果


  • A用户和B用户同时在线时,A用户提交申请后,B用户可以收到消息通知
  • A用户在线,B用户不在线时,A用户可以提交申请,B用户上线后可以收到消息通知
  • WebSocket刷新不会断开连接
先看效果,使用admin用户向test用户发送通知进行测试:
1.gif

官方文档

我使用的是前后端分离版本的若依,前端使用的是vue3,官方文档中提供了集成WebSocket的方法:https://doc.ruoyi.vip/ruoyi-vue/document/cjjc.html#集成websocket实现实时通信 ,详细内容如下:
1、ruoyi-framework/pom.xml文件添加websocket依赖。
  1. <dependency>  
  2.    <groupId>org.springframework.boot</groupId>  
  3.    spring-boot-starter-websocket</artifactId>  
  4. </dependency>
复制代码
2、配置匿名访问(可选)
  1. // 如果需要不登录也可以访问,需要在`SecurityConfig.java`中设置匿名访问
  2. ("/websocket/**").permitAll()
复制代码
3、下载插件相关包和代码实现覆盖到工程中
  1. 提示
  2. 插件相关包和代码实现ruoyi-vue/集成websocket实现实时通信.zip
  3. 链接: https://pan.baidu.com/s/1y1g8NkelRT_pS0fIbmyP8g 提取码: mjs7
复制代码
4、测试验证
如果要测试验证可以把websocket.vue内容复制到login.vue,点击连接发送消息测试返回结果。
代码实现

官方文档中提供了SemaphoreUtils,WebSocketConfig,WebSocketServer和WebSocketUsers四个后端java文件用于集成WebSocket,我的代码仅在WebSocketServer中做修改,其余文件没有修改,修改后的代码如下。
此处为WebSocket实现消息通知的简易版思路,仅为玩具,不考虑性能,应该会存在bug,整体思路如下:
1、用户登陆成功后,向后端发送当前用户名,并在本地localstorage中存储值(用户刷新时使用)
2、后端接收到前端发送的用户名后,判断该用户是否在后端维护的用户列表中,如果不在,则新建用户,如果在,则说明该用户在前端进行了刷新,那么就不新建用户,而是更新现有用户列表中的信息
3、A用户向B用户发送消息,如果B用户在线,则通过后端直接发送消息给B用户,如果B用户不在线,则在后端维护一个缓存消息列表,保存B用户的用户名和需要发送的消息
4、当后端接收到前端新建WebSocket连接的请求后,判断该用户是否在后端维护的用户列表中,如果在,接着查找是否有发送给该用户的消息,如果有则发送消息通知,如果没有则更新之前用户列表中维护的信息,如果该用户不在已有用户列表中,则新建用户
修改后的WebSocketServer内容如下:
  1. package com.ruoyi.framework.websocket;
  2. import java.util.concurrent.ConcurrentHashMap;
  3. import java.util.concurrent.CopyOnWriteArrayList;
  4. import java.util.concurrent.Semaphore;
  5. import javax.websocket.OnClose;
  6. import javax.websocket.OnError;
  7. import javax.websocket.OnMessage;
  8. import javax.websocket.OnOpen;
  9. import javax.websocket.Session;
  10. import javax.websocket.server.ServerEndpoint;
  11. import com.fasterxml.jackson.databind.JsonNode;
  12. import com.fasterxml.jackson.databind.ObjectMapper;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. import org.springframework.stereotype.Component;
  16. /**
  17. * WebSocketServer 消息处理类,用于实现 WebSocket 服务端功能
  18. *
  19. * @author ruoyi
  20. */
  21. @Component
  22. @ServerEndpoint("/websocket/message")
  23. public class WebSocketServer {
  24.     /**
  25.      * WebSocketServer 日志控制器,用于记录日志信息
  26.      */
  27.     private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);
  28.     /**
  29.      * 默认最多允许同时在线人数 100
  30.      */
  31.     public static int socketMaxOnlineCount = 100;
  32.     /**
  33.      * 信号量,用于控制同时在线人数
  34.      */
  35.     private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);
  36.     // 内部静态类 User,用于存储用户信息
  37.     private static class User {
  38.         private String id; // 用户 ID
  39.         private String name; // 用户名
  40.         private Session session; // 用户会话
  41.         public User(String id, String name, Session session) {
  42.             this.id = id;
  43.             this.name = name;
  44.             this.session = session;
  45.         }
  46.         // Getter 和 Setter 方法
  47.         public String getId() {
  48.             return id;
  49.         }
  50.         public void setId(String id) {
  51.             this.id = id;
  52.         }
  53.         public String getName() {
  54.             return name;
  55.         }
  56.         public void setName(String name) {
  57.             this.name = name;
  58.         }
  59.         public Session getSession() {
  60.             return session;
  61.         }
  62.         public void setSession(Session session) {
  63.             this.session = session;
  64.         }
  65.     }
  66.     /**
  67.      * 存储用户信息的 ConcurrentHashMap,键为用户 ID,值为 User 对象
  68.      */
  69.     private static ConcurrentHashMap<String, User> users = new ConcurrentHashMap<>();
  70.     /**
  71.      * 消息缓存的 ConcurrentHashMap,键为用户名,值为缓存消息列表
  72.      */
  73.     private static ConcurrentHashMap<String, CopyOnWriteArrayList<String>> messageCache = new ConcurrentHashMap<>();
  74.     /**
  75.      * 连接建立成功时调用的方法
  76.      *
  77.      * @param session 用户会话
  78.      * @throws Exception 可能抛出的异常
  79.      */
  80.     @OnOpen
  81.     public void onOpen(Session session) throws Exception {
  82.         boolean semaphoreFlag = false;
  83.         // 尝试获取信号量
  84.         semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
  85.         if (!semaphoreFlag) {
  86.             // 未获取到信号量,说明在线人数已达上限
  87.             LOGGER.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);
  88.             WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数:" + socketMaxOnlineCount);
  89.             session.close();
  90.         } else {
  91.             // 将用户会话存储到 WebSocketUsers 中
  92.             WebSocketUsers.put(session.getId(), session);
  93.             LOGGER.info("\n 建立连接 - {}", session);
  94.             LOGGER.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size());
  95.             WebSocketUsers.sendMessageToUserByText(session, "连接成功");
  96.         }
  97.     }
  98.     /**
  99.      * 连接关闭时调用的方法
  100.      *
  101.      * @param session 用户会话
  102.      */
  103.     @OnClose
  104.     public void onClose(Session session) {
  105.         LOGGER.info("\n 关闭连接 - {}", session);
  106.         // 移除用户信息,移除用户时,将该用户的session值置为null
  107.         // 用于判断该用户是否离线
  108.         for (User user : users.values()) {
  109.             if (user.getId().equals(session.getId())) {
  110.                 user.setSession(null);
  111.                 break;
  112.             }
  113.         }
  114.         boolean removeFlag = WebSocketUsers.remove(session.getId());
  115.         if (!removeFlag) {
  116.             // 如果移除失败,则释放信号量
  117.             SemaphoreUtils.release(socketSemaphore);
  118.         }
  119.     }
  120.     /**
  121.      * 抛出异常时调用的方法
  122.      *
  123.      * @param session     用户会话
  124.      * @param exception   异常信息
  125.      * @throws Exception  可能抛出的异常
  126.      */
  127.     @OnError
  128.     public void onError(Session session, Throwable exception) throws Exception {
  129.         if (session.isOpen()) {
  130.             // 如果会话仍然开启,则关闭会话
  131.             session.close();
  132.         }
  133.         String sessionId = session.getId();
  134.         LOGGER.info("\n 连接异常 - {}", sessionId);
  135.         LOGGER.info("\n 异常信息 - {}", exception);
  136.         // 移除用户信息并释放信号量
  137.         WebSocketUsers.remove(sessionId);
  138.         SemaphoreUtils.release(socketSemaphore);
  139.     }
  140.     /**
  141.      * 服务器接收到客户端消息时调用的方法
  142.      *
  143.      * @param message 消息内容
  144.      * @param session 用户会话
  145.      */
  146.     @OnMessage
  147.     public void onMessage(String message, Session session) {
  148.         try {
  149.             // 使用 Jackson 解析 JSON 消息
  150.             ObjectMapper objectMapper = new ObjectMapper();
  151.             JsonNode jsonNode = objectMapper.readTree(message);
  152.             if (jsonNode.has("username") && !jsonNode.get("username").asText().isEmpty()) {
  153.                 String username = jsonNode.get("username").asText();
  154.                 // 检查用户是否已存在
  155.                 boolean userExists = false;
  156.                 for (User user : users.values()) {
  157.                     if (user.getName().equals(username)) {
  158.                         userExists = true;
  159.                         // 更新用户的 ID 和会话
  160.                         user.setId(session.getId());
  161.                         user.setSession(session);
  162.                         // 检查该用户是否有缓存消息
  163.                         searchCache(username, user);
  164.                         LOGGER.info("\n用户 {} 已更新", username);
  165.                         LOGGER.info("\n当前用户列表 {}", users);
  166.                         break;
  167.                     }
  168.                 }
  169.                 if (!userExists) {
  170.                     // 新建用户并存储到 users 中
  171.                     User user = new User(session.getId(), username, session);
  172.                     users.put(session.getId(), user);
  173.                     LOGGER.info("\n用户 {} 已连接", username);
  174.                     LOGGER.info("\n当前用户列表 {}", users);
  175.                     // 检查是否有缓存消息
  176.                     searchCache(username, user);
  177.                 }
  178.             } else if (jsonNode.has("targetusername") && jsonNode.has("sendmessage")) {
  179.                 String targetusername = jsonNode.get("targetusername").asText();
  180.                 String sendmessage = jsonNode.get("sendmessage").asText();
  181.                 boolean userExists = false;
  182.                 // 查找目标用户,如果用户的session为空,则表示用户离线
  183.                 // 将通知存放到缓存消息列表中,如果不为空则表示用户在线,直接发送通知
  184.                 for (User user : users.values()) {
  185.                     if (user.getName().equals(targetusername)) {
  186.                         if (user.getSession() != null) {
  187.                             userExists = true;
  188.                             WebSocketUsers.sendMessageToUserByText(user.getSession(), sendmessage);
  189.                         }
  190.                         break;
  191.                     }
  192.                 }
  193.                 LOGGER.info("\n目标用户 {}", targetusername);
  194.                 LOGGER.info("\n发送消息 {}", sendmessage);
  195.                 LOGGER.info("\n当前用户列表 {}", users);
  196.                 if (!userExists) {
  197.                     // 如果目标用户不在线,则缓存通知
  198.                     messageCache.computeIfAbsent(targetusername, k -> new CopyOnWriteArrayList<String>()).add(sendmessage);
  199.                     LOGGER.info("\n消息已缓存,等待用户 {} 上线", targetusername);
  200.                 }
  201.             }
  202.         } catch (Exception e) {
  203.             // 如果解析失败,则按普通消息处理
  204.             String msg = message.replace("你", "我").replace("吗", "");
  205.             WebSocketUsers.sendMessageToUserByText(session, msg);
  206.         }
  207.     }
  208.     /**
  209.      * 检查一个用户的缓存通知并发送
  210.      *
  211.      * @param username 用户名
  212.      * @param user     用户对象
  213.      */
  214.     public void searchCache(String username, User user) {
  215.         if (messageCache.containsKey(username)) {
  216.             CopyOnWriteArrayList<String> cachedMessages = messageCache.get(username);
  217.             for (String cachedMessage : cachedMessages) {
  218.                 WebSocketUsers.sendMessageToUserByText(user.getSession(), cachedMessage);
  219.                 LOGGER.info("\n向用户 {} 发送缓存消息 - {}", username, cachedMessage);
  220.             }
  221.             // 清理缓存通知
  222.             messageCache.remove(username);
  223.             LOGGER.info("\n已清理用户 {} 的缓存消息", username);
  224.         }
  225.     }
  226. }
复制代码
为了维护WebSocket的状态,在pinia store中新建了websocket.js文件(路径为RuoYi-Vue3-master\src\store\modules\websocket.js)。
websocket.js内容如下:
  1. import { ref } from 'vue';
  2. import { ElNotification } from 'element-plus'; // 引入 Element UI 的通知功能
  3. import 'element-plus/dist/index.css';          // 引入 Element UI 的通知功能样式
  4. export const useWebSocketStore = defineStore('websocket', () => {
  5.   const ws = ref(null);
  6.   const connected = ref(false);
  7.   function initWebSocket(username) {
  8.     const wsuri = "ws://127.0.0.1:8080/websocket/message"; // WebSocket 服务器地址
  9.     ws.value = new WebSocket(wsuri);
  10.     ws.value.onopen = () => {
  11.       console.log('WebSocket 已连接');
  12.       connected.value = true;
  13.       ws.value.send(JSON.stringify({ username: username }));
  14.     };
  15.     ws.value.onmessage = (event) => {
  16.       console.log('收到消息:', event.data);
  17.       // 在这里处理收到的消息
  18.       if (event.data != "连接成功") {    // 用于测试,所以消息字符都是写死的
  19.         ElNotification({
  20.             title: '消息通知',
  21.             message: '您收到一条消息',
  22.             type: 'success',
  23.             position: 'bottom-right',
  24.         });
  25.       }
  26.     };
  27.     ws.value.onclose = () => {
  28.       console.log('WebSocket 已断开');
  29.       connected.value = false;
  30.     };
  31.     ws.value.onerror = (event) => {
  32.       console.error('WebSocket 错误:', event);
  33.       connected.value = false;
  34.     };
  35.   }
  36.   // 发送消息
  37.   function sendmessage(username, message) {
  38.     console.log(ws.value);
  39.     console.log(message);
  40.     if (message == "请求审批") {    // 用于测试,所以消息字符都是写死的
  41.         ws.value.send(JSON.stringify({ targetusername: username, sendmessage: message }));
  42.     } else {
  43.         ws.value.send(JSON.stringify({ logoutname: username, logoutmessage: message }));
  44.     }
  45.   }
  46.   
  47.   function closeWebSocket() {    // 此处仅为测试,写了close,但是没有使用,
  48.     if (ws.value) {
  49.       ws.value.close();
  50.       ws.value = null;
  51.       connected.value = false;
  52.     }
  53.   }
  54.   return { ws, initWebSocket, closeWebSocket, sendmessage };
  55. });
复制代码
我判断用户是否登陆的思路是,如果加载了(RuoYi-Vue3-master\src\layout)页面,那么用户肯定是登陆状态,所以就在localstorage中写入websocket:1,然后新建一个WebSocket连接,实现代码如下,如果用户刷新了,那么WebSocket连接会断开,但是用户刷新时,该组件会被重新加载,因此在onMounted中判断localstorage中是否存在标志,如果存在说明该用户在后端的用户列表中,那么直接将username发送给后端,后端在判断时会更新用户列表中的内容,从而实现用户刷新但是WebSocket没有断开。
修改后的layout代码如下:
  1. ...省略...
  2. ...省略...
复制代码
我在测试文件中添加了以一个申请按钮,并绑定了一个事件,实现代码如下:
  1. <template>
  2. ...省略...
  3. // 按钮代码
  4. <el-col :span="1.5">
  5.   <el-button
  6.     type="info"
  7.     plain
  8.     icon="DocumentAdd"
  9.     @click="handleApply"
  10.   >申请</el-button>
  11. </el-col>
  12. ...省略...
  13. </template>
复制代码
至此,就实现了一个简易版本的用户消息通知功能,如果要实现不同用户之间的审批功能(玩具),只需要在前端根据用户角色的不同展示不同的页面就可以了,大致效果如下。
  1. 1.前端通过import { checkRole } from "@/utils/permission";并在标签页中使用v-if来判断用户的角色以显示不同的页面。
  2. 2.通过自定义监听事件实现收到消息后表格刷新。
  3. (1)前端页面<script>中添加:
  4. // 定义一个方法来刷新数据
  5. function refreshData() {
  6.   getList();
  7. }
  8. // 在组件挂载时监听自定义事件
  9. onMounted(() => {
  10.   window.addEventListener('refreshData', refreshData);
  11. });
  12. // 在组件卸载时移除事件监听器
  13. onUnmounted(() => {
  14.   window.removeEventListener('refreshData', refreshData);
  15. });
  16. (2)websocket.js中在ElNotification后添加:
  17. ws.value.onmessage = (event) => {
  18.   console.log('收到消息:', event.data);
  19.   // 在这里处理收到的消息
  20.   if (event.data != "连接成功") {
  21.     ElNotification({
  22.         title: '消息通知',
  23.         message: '您收到一条消息',
  24.         type: 'success',
  25.         position: 'bottom-right',
  26.     });
  27.     // 收到消息后0.5秒刷新页面
  28.     setTimeout(() => {
  29.       // 收到消息后触发自定义事件来刷新数据
  30.       const event = new Event('refreshData');
  31.       window.dispatchEvent(event);
  32.     }, 500);
  33.   }
  34. };
复制代码
2.gif


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册