唐嘉懿 发表于 2025-7-17 08:57:22

如何用WebSocket打造毫秒级实时协作系统?

扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
第六章:全栈项目实战示例:实时协作系统

一、需求分析:实时白板/协同编辑场景

实时协作系统需要实现多用户同时操作同一文档/白板,并实时同步所有变更。核心需求包括:

[*]毫秒级延迟:用户操作需在300ms内同步给所有参与者
[*]操作一致性:保证最终所有客户端呈现相同内容
[*]冲突处理:解决多用户同时修改同一区域的问题
[*]状态恢复:断线重连后自动同步最新状态
sequenceDiagram    participant 用户A    participant 用户B    participant 服务器      用户A ->> 服务器: 用户A登录    服务器 -->> 用户A: 登录成功响应      用户B ->> 服务器: 用户B登录    服务器 -->> 用户B: 登录成功响应      用户A ->> 服务器: 用户A创建新的白板会话    服务器 -->> 用户A: 白板会话ID      用户B ->> 服务器: 加入现有的白板会话 (会话ID)    服务器 -->> 用户B: 加入成功响应      用户A ->> 服务器: 用户A在白板上绘图/编辑    服务器 -->> 用户B: 更新操作      用户B ->> 服务器: 用户B在白板上绘图/编辑    服务器 -->> 用户A: 更新操作      用户A ->> 服务器: 保存白板内容    服务器 -->> 用户A: 保存成功确认      用户B ->> 服务器: 检视白板内容    服务器 -->> 用户B: 白板内容数据      用户A ->> 服务器: 退出白板会话    服务器 -->> 用户A: 退出成功确认      用户B ->> 服务器: 退出白板会话    服务器 -->> 用户B: 退出成功确认二、后端WebSocket服务搭建

依赖安装:
pip install fastapi==0.104.0 websockets==12.0 uvicorn==0.23.2 pydantic==2.5.2核心代码实现:
import asyncio
import logging
from typing import List, Dict

from fastapi import FastAPI, WebSocket
from pydantic import BaseModel

app = FastAPI()
logger = logging.getLogger("uvicorn.error")


class Operation(BaseModel):
    type: str# "insert" or "delete"
    position: int
    content: str = ""# 插入内容
    length: int = 1# 删除长度
    client_id: str = ""# 客户端标识
    version: int = 0# 操作版本号


# OT转换引擎(示例)
class OTEngine:
    @staticmethod
    def transform(op1: Operation, op2: Operation) -> Operation:
      """操作转换核心算法"""
      # 插入 vs 插入
      if op1.type == "insert" and op2.type == "insert":
            if op1.position < op2.position:
                return Operation(**{**op2.dict(), "position": op2.position + len(op1.content)})
            elif op1.position > op2.position:
                return op2
            else:# 相同位置按客户端ID排序
                return op2 if op1.client_id < op2.client_id else Operation(
                  **{**op2.dict(), "position": op2.position + len(op1.content)})

      # 插入 vs 删除
      elif op1.type == "insert" and op2.type == "delete":
            if op1.position <= op2.position:
                return Operation(**{**op2.dict(), "position": op2.position + len(op1.content)})
            else:
                return Operation(**{**op2.dict(), "position": op2.position})

      # 删除 vs 插入
      elif op1.type == "delete" and op2.type == "insert":
            if op1.position < op2.position:
                return Operation(**{**op2.dict(), "position": max(op2.position - op1.length, 0)})
            else:
                return op2

      # 删除 vs 删除
      else:
            if op1.position < op2.position:
                return Operation(**{**op2.dict(), "position": max(op2.position - op1.length, 0)})
            elif op1.position > op2.position:
                return Operation(**{**op2.dict(), "position": op2.position})
            else:# 相同位置取范围更大的删除
                return op2 if op1.length >= op2.length else Operation(**{**op2.dict(), "length": op1.length})


# 协同编辑房间管理器
class CollaborationRoom:
    def __init__(self, room_id: str):
      self.room_id = room_id
      self.connections = set()# 实际使用redis实现, 这里使用set模拟
      self.document = ""
      self.version = 0
      self.pending_ops: List = []
      self.lock = asyncio.Lock()
      self.client_states: Dict = {}# 客户端最后确认版本

    async def add_connection(self, websocket: WebSocket, client_id: str):
      async with self.lock:
            self.connections.add(websocket)
            self.client_states = self.version
            # 发送初始状态
            await websocket.send_json({
                "type": "snapshot",
                "document": self.document,
                "version": self.version
            })

    async def remove_connection(self, websocket: WebSocket, client_id: str):
      async with self.lock:
            self.connections.discard(websocket)
            if client_id in self.client_states:
                del self.client_states

    async def apply_operation(self, operation: Operation):
      """应用操作转换并更新文档"""
      async with self.lock:
            # 转换所有待处理操作
            transformed_op = operation
            for pending in self.pending_ops:
                transformed_op = OTEngine.transform(pending, transformed_op)

            # 应用转换后的操作
            if transformed_op.type == "insert":
                self.document = (self.document[:transformed_op.position] +
                                 transformed_op.content +
                                 self.document)
            elif transformed_op.type == "delete":
                start = transformed_op.position
                end = min(start + transformed_op.length, len(self.document))
                self.document = self.document[:start] + self.document

            # 更新状态
            self.version += 1
            self.pending_ops.append(transformed_op)

            # 广播转换后的操作
            broadcast_tasks = []
            for conn in self.connections:
                try:
                  broadcast_tasks.append(conn.send_json({
                        "type": "operation",
                        "operation": transformed_op.dict(),
                        "document": self.document,
                        "version": self.version
                  }))
                except:
                  pass
            await asyncio.gather(*broadcast_tasks, return_exceptions=True)

            # 清除已处理的操作
            min_client_version = min(self.client_states.values(), default=self.version)
            self.pending_ops =


# 全局房间管理
room_manager: Dict = {}# 实际使用redis实现, 这里使用字典模拟
global_lock = asyncio.Lock()


@app.websocket("/ws/{room_id}/{client_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str, client_id: str):
    await websocket.accept()

    # 获取或创建房间
    async with global_lock:
      if room_id not in room_manager:
            room_manager = CollaborationRoom(room_id)
      room = room_manager

    # 加入房间
    await room.add_connection(websocket, client_id)
    logger.info(f"Client {client_id} joined room {room_id}")

    try:
      while True:
            data = await websocket.receive_json()
            op = Operation(**data)
            op.client_id = client_id

            # 应用操作转换
            await room.apply_operation(op)

    except Exception as e:
      logger.error(f"Error in room {room_id}: {str(e)}")
    finally:
      # 离开房间
      await room.remove_connection(websocket, client_id)
      logger.info(f"Client {client_id} left room {room_id}")

      # 清理空房间
      async with global_lock:
            if not room.connections:
                del room_manager
                logger.info(f"Room {room_id} closed")关键机制:

[*]使用WebSocket协议替代HTTP长轮询
[*]维护活动连接池active_connections
[*]通过Pydantic模型验证操作格式
[*]广播模式实现实时同步
优化技巧:

[*]使用requestAnimationFrame合并高频操作
[*]添加操作版本号解决时序问题
[*]实现本地缓存防止数据丢失
[*]添加心跳机制检测连接状态
三、前端Vue.js连接实现

组件核心代码:
<template>

   
      房间: {{ roomId }} | 用户: {{ clientId }}
      {{ statusText }}
   

   
      
   

   
      在线用户 ({{ users.length }})
      
      
      {{ user }}
      
   

</template>sequenceDiagram    participant ClientA    participant Server    participant ClientB    ClientA->>ClientA: 用户输入操作    ClientA->>ClientA: 乐观更新UI    ClientA->>Server: 发送操作(位置+内容)    Server->>Server: 应用OT转换    Server->>ClientA: 广播确认操作    Server->>ClientB: 广播转换后操作    ClientB->>ClientB: 应用转换后操作四、消息同步策略与冲突解决


[*]状态同步机制:
sequenceDiagram    participant ClientA    participant Server    participant ClientB      ClientA->>Server: 发送操作OP_A(位置P)    Server->>Server: 转换操作:OP_A' = OT(OP_A, 待处理操作)    Server->>Server: 更新文档状态    Server->>ClientA: 广播转换后的OP_A'    Server->>ClientB: 广播转换后的OP_A'    ClientB->>ClientB: 应用OP_A'更新本地文档    ClientB->>Server: 发送新操作OP_B
[*]冲突解决流程:

[*]客户端发送操作时携带当前位置和版本
[*]服务端对并发操作进行转换排序
[*]转换后操作广播给所有客户端
[*]客户端收到操作后无条件应用

[*]消息协议设计:
// 快照消息(初始同步)
{
"type": "snapshot",
"document": "当前文本",
"version": 15
}

// 操作消息
{
"type": "operation",
"operation": {
   "type": "insert",
   "position": 5,
   "content": "hello",
   "client_id": "user1",
   "version": 16
},
"document": "更新后文本",
"version": 16
}
同步策略对比:
策略优点缺点最后写入优先实现简单可能丢失用户操作操作转换(OT)精确解决冲突算法复杂度高CRDT无需中心协调内存消耗较大五、压力测试与部署方案

压力测试命令:
pip install websocket-client
python -m websockets ws://localhost:8000/ws/room1 -c 1000 -m "测试消息"部署架构:
客户端 → Nginx → FastAPI (Uvicorn) → Redis ClusterNginx配置要点:
http {
    map $http_upgrade $connection_upgrade {
      default upgrade;
      '' close;
    }

    server {
      location /ws/ {
            proxy_pass http://backend;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection $connection_upgrade;
      }
    }
}课后 Quiz

Q1:当两个用户同时在第5个字符位置插入不同内容时,如何保证最终一致性?
答案:采用操作转换算法,根据操作逻辑时间戳调整插入位置。例如用户A插入"X",用户B插入"Y",最终在第5位显示"YX"或"XY"
,取决于操作到达服务器的顺序。
Q2:WebSocket连接频繁断开可能是什么原因?
解决方案:

[*]检查防火墙设置是否允许WS协议
[*]配置合理的心跳间隔(建议30秒)
[*]增加Nginx的超时设置:proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
常见报错处理

错误1:403 Forbidden

[*]原因:跨域请求被阻止
[*]解决:添加CORS中间件from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
错误2:1006 Connection Closed Abnormally

[*]原因:客户端未正确处理断开事件
[*]解决:添加重连机制function connect() {
const ws = new WebSocket(url)
ws.onclose = () => setTimeout(connect, 1000)
}
通过本文的实践,开发者可以掌握实时协作系统的核心实现技术。建议在开发过程中使用wireshark
工具监控WebSocket流量,并配合Chrome DevTools的Performance面板进行前端性能优化。
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何用WebSocket打造毫秒级实时协作系统?
往期文章归档:


[*]如何让你的WebSocket连接既安全又高效?
[*]如何让多客户端会话管理不再成为你的技术噩梦? - cmdragon's Blog
[*]如何在FastAPI中玩转WebSocket消息处理?
[*]如何在FastAPI中玩转WebSocket,让实时通信不再烦恼? - cmdragon's Blog
[*]WebSocket与HTTP协议究竟有何不同?FastAPI如何让长连接变得如此简单? - cmdragon's Blog
[*]FastAPI如何玩转安全防护,让黑客望而却步?
[*]如何用三层防护体系打造坚不可摧的 API 安全堡垒? - cmdragon's Blog
[*]FastAPI安全加固:密钥轮换、限流策略与安全头部如何实现三重防护? - cmdragon's Blog
[*]如何在FastAPI中巧妙玩转数据脱敏,让敏感信息安全无忧? - cmdragon's Blog
[*]RBAC权限模型如何让API访问控制既安全又灵活? - cmdragon's Blog
[*]FastAPI中的敏感数据如何在不泄露的情况下翩翩起舞?
[*]FastAPI安全认证的终极秘籍:OAuth2与JWT如何完美融合? - cmdragon's Blog
[*]如何在FastAPI中打造坚不可摧的Web安全防线? - cmdragon's Blog
[*]如何用 FastAPI 和 RBAC 打造坚不可摧的安全堡垒? - cmdragon's Blog
[*]FastAPI权限配置:你的系统真的安全吗? - cmdragon's Blog
[*]FastAPI权限缓存:你的性能瓶颈是否藏在这只“看不见的手”里? | cmdragon's Blog
[*]FastAPI日志审计:你的权限系统是否真的安全无虞? | cmdragon's Blog
[*]如何在FastAPI中打造坚不可摧的安全防线? | cmdragon's Blog
[*]如何在FastAPI中实现权限隔离并让用户乖乖听话? | cmdragon's Blog
[*]如何在FastAPI中玩转权限控制与测试,让代码安全又优雅? | cmdragon's Blog
[*]如何在FastAPI中打造一个既安全又灵活的权限管理系统? | cmdragon's Blog
[*]FastAPI访问令牌的权限声明与作用域管理:你的API安全真的无懈可击吗? | cmdragon's Blog
[*]如何在FastAPI中构建一个既安全又灵活的多层级权限系统? | cmdragon's Blog
[*]FastAPI如何用角色权限让Web应用安全又灵活? | cmdragon's Blog
[*]FastAPI权限验证依赖项究竟藏着什么秘密? | cmdragon's Blog
[*]如何用FastAPI和Tortoise-ORM打造一个既高效又灵活的角色管理系统? | cmdragon's Blog
[*]JWT令牌如何在FastAPI中实现安全又高效的生成与验证? | cmdragon's Blog
[*]你的密码存储方式是否在向黑客招手? | cmdragon's Blog
[*]如何在FastAPI中轻松实现OAuth2认证并保护你的API? | cmdragon's Blog
[*]FastAPI安全机制:从OAuth2到JWT的魔法通关秘籍 | cmdragon's Blog
[*]FastAPI认证系统:从零到令牌大师的奇幻之旅 | cmdragon's Blog
[*]FastAPI安全异常处理:从401到422的奇妙冒险 | cmdragon's Blog
[*]FastAPI权限迷宫:RBAC与多层级依赖的魔法通关秘籍 | cmdragon's Blog
[*]JWT令牌:从身份证到代码防伪的奇妙之旅 | cmdragon's Blog
[*]FastAPI安全认证:从密码到令牌的魔法之旅 | cmdragon's Blog
[*]密码哈希:Bcrypt的魔法与盐值的秘密 | cmdragon's Blog
免费好用的热门在线工具


[*]CMDragon 在线工具 - 高级AI工具箱与开发者套件 | 免费好用的在线工具
[*]应用商店 - 发现1000+提升效率与开发的AI工具和实用程序 | 免费好用的在线工具
[*]CMDragon 更新日志 - 最新更新、功能与改进 | 免费好用的在线工具
[*]支持我们 - 成为赞助者 | 免费好用的在线工具
[*]AI文本生成图像 - 应用商店 | 免费好用的在线工具
[*]临时邮箱 - 应用商店 | 免费好用的在线工具
[*]二维码解析器 - 应用商店 | 免费好用的在线工具
[*]文本转思维导图 - 应用商店 | 免费好用的在线工具
[*]正则表达式可视化工具 - 应用商店 | 免费好用的在线工具
[*]文件隐写工具 - 应用商店 | 免费好用的在线工具
[*]IPTV 频道探索器 - 应用商店 | 免费好用的在线工具
[*]快传 - 应用商店 | 免费好用的在线工具
[*]随机抽奖工具 - 应用商店 | 免费好用的在线工具
[*]动漫场景查找器 - 应用商店 | 免费好用的在线工具
[*]时间工具箱 - 应用商店 | 免费好用的在线工具
[*]网速测试 - 应用商店 | 免费好用的在线工具
[*]AI 智能抠图工具 - 应用商店 | 免费好用的在线工具
[*]背景替换工具 - 应用商店 | 免费好用的在线工具
[*]艺术二维码生成器 - 应用商店 | 免费好用的在线工具
[*]Open Graph 元标签生成器 - 应用商店 | 免费好用的在线工具
[*]图像对比工具 - 应用商店 | 免费好用的在线工具
[*]图片压缩专业版 - 应用商店 | 免费好用的在线工具
[*]密码生成器 - 应用商店 | 免费好用的在线工具
[*]SVG优化器 - 应用商店 | 免费好用的在线工具
[*]调色板生成器 - 应用商店 | 免费好用的在线工具
[*]在线节拍器 - 应用商店 | 免费好用的在线工具
[*]IP归属地查询 - 应用商店 | 免费好用的在线工具
[*]CSS网格布局生成器 - 应用商店 | 免费好用的在线工具
[*]邮箱验证工具 - 应用商店 | 免费好用的在线工具
[*]书法练习字帖 - 应用商店 | 免费好用的在线工具
[*]金融计算器套件 - 应用商店 | 免费好用的在线工具
[*]中国亲戚关系计算器 - 应用商店 | 免费好用的在线工具
[*]Protocol Buffer 工具箱 - 应用商店 | 免费好用的在线工具
[*]IP归属地查询 - 应用商店 | 免费好用的在线工具
[*]图片无损放大 - 应用商店 | 免费好用的在线工具
[*]文本比较工具 - 应用商店 | 免费好用的在线工具
[*]IP批量查询工具 - 应用商店 | 免费好用的在线工具
[*]域名查询工具 - 应用商店 | 免费好用的在线工具
[*]DNS工具箱 - 应用商店 | 免费好用的在线工具
[*]网站图标生成器 - 应用商店 | 免费好用的在线工具
[*]XML Sitemap

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 如何用WebSocket打造毫秒级实时协作系统?