找回密码
 立即注册
首页 业界区 业界 行为型模式-协作与交互机制

行为型模式-协作与交互机制

固拆棚 3 天前
行为型模式聚焦于对象间的行为交互,通过规范对象协作方式提升系统的灵活性与可扩展性。在分布式系统中,由于多节点异步通信、网络不可靠性及状态一致性挑战,行为型模式需针对分布式特性进行适应性设计。本文从观察者、策略、命令、责任链、状态五大核心行为型模式出发,系统解析其在分布式场景下的演化与实践。
一、观察者模式:分布式事件的发布 - 订阅机制

1.1 模式核心与分布式适配

观察者模式通过发布 - 订阅机制实现对象间的松耦合通信,在分布式系统中演变为跨节点的事件驱动架构(EDA),解决节点间异步通知问题。
1. 基于消息队列的分布式观察者
  1. // 事件定义(跨节点传输)
  2. public class OrderEvent implements Serializable {
  3.    private String eventId;
  4.    private Long orderId;
  5.    private OrderStatus status;
  6.    private LocalDateTime timestamp;
  7.    // getters/setters
  8. }
  9. // 抽象主题(事件发布者)
  10. public interface EventPublisher {
  11.    void publish(OrderEvent event);
  12. }
  13. // 具体发布者(订单服务)
  14. public class OrderService implements EventPublisher {
  15.    @Autowired
  16.    private KafkaTemplate<String, OrderEvent> kafkaTemplate;
  17.    @Override
  18.    public void publish(OrderEvent event) {
  19.        // 发布事件到Kafka主题
  20.        kafkaTemplate.send("order-events", event.getOrderId().toString(), event);
  21.    }
  22.    // 订单状态变更时发布事件
  23.    public void updateStatus(Long orderId, OrderStatus status) {
  24.        OrderEvent event = new OrderEvent(UUID.randomUUID().toString(), orderId, status, LocalDateTime.now());
  25.        publish(event);
  26.    }
  27. }
  28. // 抽象观察者(事件订阅者)
  29. public interface EventSubscriber {
  30.    void onEvent(OrderEvent event);
  31. }
  32. // 具体观察者(库存服务)
  33. public class InventorySubscriber implements EventSubscriber {
  34.    @KafkaListener(topics = "order-events")
  35.    @Override
  36.    public void onEvent(OrderEvent event) {
  37.        if (event.getStatus() == OrderStatus.PAID) {
  38.            // 订单支付后扣减库存
  39.            inventoryService.deduct(event.getOrderId());
  40.        }
  41.    }
  42. }
  43. // 具体观察者(积分服务)
  44. public class PointSubscriber implements EventSubscriber {
  45.    @KafkaListener(topics = "order-events")
  46.    @Override
  47.    public void onEvent(OrderEvent event) {
  48.        if (event.getStatus() == OrderStatus.PAID) {
  49.            // 订单支付后增加积分
  50.            pointService.add(event.getOrderId());
  51.        }
  52.    }
  53. }
复制代码
2. 分布式场景关键特性


  • 异步解耦:发布者无需知道订阅者存在(如订单服务不依赖库存 / 积分服务),降低节点耦合。
  • 可靠性保障:通过消息队列的持久化(如 Kafka 的日志存储)确保事件不丢失,应对节点宕机。
  • 广播能力:同一事件可被多个订阅者消费(如订单支付事件同时通知库存、积分、物流服务)。
1.2 与传统观察者的核心区别

维度单体观察者模式分布式观察者模式(事件驱动)通信方式内存直接调用消息队列异步通信(跨节点)可靠性依赖本地调用,失败直接抛出异常消息重试机制,支持死信队列处理失败事件订阅管理代码中直接注册观察者通过消息队列主题动态订阅(无需代码变更)二、策略模式:分布式场景的动态算法切换

2.1 模式核心与负载均衡策略

策略模式通过封装不同算法实现动态切换,在分布式系统中广泛应用于负载均衡、路由策略、序列化方式选择等场景。
1. 分布式负载均衡的策略设计
  1. // 策略接口(负载均衡算法)
  2. public interface LoadBalanceStrategy {
  3.    String selectInstance(List<String> instances);
  4. }
  5. // 具体策略1:轮询
  6. public class RoundRobinStrategy implements LoadBalanceStrategy {
  7.    private AtomicInteger index = new AtomicInteger(0);
  8.    
  9.    @Override
  10.    public String selectInstance(List<String> instances) {
  11.        if (instances.isEmpty()) return null;
  12.        int i = index.getAndIncrement() % instances.size();
  13.        return instances.get(i);
  14.    }
  15. }
  16. // 具体策略2:权重
  17. public class WeightedStrategy implements LoadBalanceStrategy {
  18.    @Override
  19.    public String selectInstance(List<String> instances) {
  20.        // 假设实例格式为"ip:port:weight",解析权重并选择
  21.        int totalWeight = instances.stream()
  22.            .mapToInt(inst -> Integer.parseInt(inst.split(":")[2]))
  23.            .sum();
  24.        int random = new Random().nextInt(totalWeight);
  25.        int current = 0;
  26.        for (String inst : instances) {
  27.            int weight = Integer.parseInt(inst.split(":")[2]);
  28.            current += weight;
  29.            if (current > random) {
  30.                return inst;
  31.            }
  32.        }
  33.        return instances.get(0);
  34.    }
  35. }
  36. // 策略上下文(负载均衡器)
  37. public class LoadBalancer {
  38.    private LoadBalanceStrategy strategy;
  39.    // 动态设置策略(如从配置中心获取)
  40.    public void setStrategy(LoadBalanceStrategy strategy) {
  41.        this.strategy = strategy;
  42.    }
  43.    public String chooseInstance(String serviceName) {
  44.        // 从注册中心获取服务实例列表
  45.        List<String> instances = serviceDiscovery.getInstances(serviceName);
  46.        return strategy.selectInstance(instances);
  47.    }
  48. }
  49. // 使用示例
  50. public class ServiceConsumer {
  51.    public void invokeService() {
  52.        LoadBalancer balancer = new LoadBalancer();
  53.        // 从配置动态选择策略(如"weighted")
  54.        String strategyType = Config.get("loadbalance.strategy");
  55.        balancer.setStrategy(StrategyFactory.create(strategyType));
  56.        String instance = balancer.chooseInstance("order-service");
  57.        // 调用选中的实例
  58.    }
  59. }
复制代码
2. 分布式场景价值


  • 动态适配:根据集群状态切换策略(如低负载时用轮询,高负载时用权重策略)。
  • 灰度发布:通过策略路由部分流量到新版本实例(如GrayStrategy只将 10% 流量路由到新实例)。
三、命令模式:分布式任务的异步执行与重试

3.1 模式核心与分布式命令

命令模式通过封装请求为对象,实现请求的异步执行、日志记录与重试,在分布式系统中演变为跨节点任务调度机制。
1. 分布式任务的命令设计
  1. // 命令接口
  2. public interface DistributedCommand extends Serializable {
  3.    String getCommandId(); // 唯一命令ID(用于幂等性)
  4.    CommandResult execute(); // 执行命令
  5.    CommandResult compensate(); // 补偿命令(失败时执行)
  6. }
  7. // 具体命令:订单支付命令
  8. public class PaymentCommand implements DistributedCommand {
  9.    private String commandId;
  10.    private Long orderId;
  11.    private BigDecimal amount;
  12.    @Override
  13.    public CommandResult execute() {
  14.        try {
  15.            // 调用支付服务
  16.            paymentService.pay(orderId, amount);
  17.            return CommandResult.success();
  18.        } catch (Exception e) {
  19.            return CommandResult.failure(e.getMessage());
  20.        }
  21.    }
  22.    @Override
  23.    public CommandResult compensate() {
  24.        // 支付失败时执行退款
  25.        return paymentService.refund(orderId) ? CommandResult.success() : CommandResult.failure("退款失败");
  26.    }
  27.    // getters/setters
  28. }
  29. // 命令调用者(任务调度器)
  30. public class CommandScheduler {
  31.    @Autowired
  32.    private KafkaTemplate<String, DistributedCommand> kafkaTemplate;
  33.    @Autowired
  34.    private CommandRepository repository; // 命令日志存储
  35.    // 发送命令到执行节点
  36.    public void schedule(DistributedCommand command) {
  37.        // 保存命令日志(用于重试和恢复)
  38.        repository.save(new CommandLog(command.getCommandId(), command, CommandStatus.PENDING));
  39.        // 发送到命令主题
  40.        kafkaTemplate.send("command-topic", command.getCommandId(), command);
  41.    }
  42.    // 处理命令结果
  43.    public void handleResult(String commandId, CommandResult result) {
  44.        CommandLog log = repository.findById(commandId);
  45.        if (result.isSuccess()) {
  46.            log.setStatus(CommandStatus.SUCCESS);
  47.        } else {
  48.            log.setStatus(CommandStatus.FAILED);
  49.            // 失败重试(最多3次)
  50.            if (log.getRetryCount() < 3) {
  51.                log.incrementRetryCount();
  52.                kafkaTemplate.send("command-topic", commandId, log.getCommand());
  53.            }
  54.        }
  55.        repository.update(log);
  56.    }
  57. }
  58. // 命令执行者(工作节点)
  59. public class CommandWorker {
  60.    @KafkaListener(topics = "command-topic")
  61.    public void executeCommand(ConsumerRecord<String, DistributedCommand> record) {
  62.        DistributedCommand command = record.value();
  63.        CommandResult result = command.execute();
  64.        // 发送执行结果
  65.        commandScheduler.handleResult(command.getCommandId(), result);
  66.    }
  67. }
复制代码
2. 分布式场景优势


  • 异步执行:命令通过消息队列异步发送到工作节点,调用者无需等待执行完成。
  • 故障恢复:命令日志记录执行状态,节点宕机后可从日志恢复未完成命令。
  • 幂等设计:通过commandId确保重复执行(如消息重试)不会产生副作用。
四、责任链模式:分布式请求的链式处理

4.1 模式核心与 API 网关过滤链

责任链模式通过多个处理器依次处理请求,在分布式系统中广泛应用于 API 网关的请求过滤、分布式事务的阶段处理等场景。
1. API 网关的责任链设计
  1. // 处理器接口
  2. public interface GatewayFilter {
  3.    void doFilter(GatewayRequest request, GatewayResponse response, GatewayFilterChain chain);
  4. }
  5. // 具体过滤器1:认证过滤
  6. public class AuthFilter implements GatewayFilter {
  7.    @Override
  8.    public void doFilter(GatewayRequest request, GatewayResponse response, GatewayFilterChain chain) {
  9.        String token = request.getHeader("Authorization");
  10.        if (token == null || !authService.validate(token)) {
  11.            response.setStatusCode(401);
  12.            response.setBody("Unauthorized");
  13.            return; // 终止链
  14.        }
  15.        chain.doFilter(request, response); // 继续下一个过滤器
  16.    }
  17. }
  18. // 具体过滤器2:限流过滤
  19. public class RateLimitFilter implements GatewayFilter {
  20.    private RedisTemplate<String, Integer> redisTemplate;
  21.    
  22.    @Override
  23.    public void doFilter(GatewayRequest request, GatewayResponse response, GatewayFilterChain chain) {
  24.        String clientIp = request.getClientIp();
  25.        String key = "rate_limit:" + clientIp;
  26.        // 使用Redis实现限流
  27.        Long count = redisTemplate.opsForValue().increment(key, 1);
  28.        if (count == 1) {
  29.            redisTemplate.expire(key, 1, TimeUnit.MINUTES);
  30.        }
  31.        if (count > 100) { // 每分钟最多100次请求
  32.            response.setStatusCode(429);
  33.            response.setBody("Too Many Requests");
  34.            return;
  35.        }
  36.        chain.doFilter(request, response);
  37.    }
  38. }
  39. // 过滤器链
  40. public class DefaultGatewayFilterChain implements GatewayFilterChain {
  41.    private List<GatewayFilter> filters;
  42.    private int index = 0;
  43.    public DefaultGatewayFilterChain(List<GatewayFilter> filters) {
  44.        this.filters = filters;
  45.    }
  46.    @Override
  47.    public void doFilter(GatewayRequest request, GatewayResponse response) {
  48.        if (index < filters.size()) {
  49.            GatewayFilter filter = filters.get(index++);
  50.            filter.doFilter(request, response, this);
  51.        }
  52.    }
  53. }
  54. // API网关
  55. public class ApiGateway {
  56.    public GatewayResponse handleRequest(GatewayRequest request) {
  57.        GatewayResponse response = new GatewayResponse();
  58.        // 构建过滤器链(认证→限流→路由)
  59.        List<GatewayFilter> filters = Arrays.asList(
  60.            new AuthFilter(),
  61.            new RateLimitFilter(),
  62.            new RouteFilter()
  63.        );
  64.        new DefaultGatewayFilterChain(filters).doFilter(request, response);
  65.        return response;
  66.    }
  67. }
复制代码
2. 分布式场景价值


  • 横切逻辑复用:认证、限流等逻辑集中在网关,无需每个服务重复实现。
  • 动态扩展:通过配置中心动态增删过滤器(如临时添加维护模式过滤器)。
五、状态模式:分布式系统的状态机管理

5.1 模式核心与订单状态流转

状态模式通过封装对象状态及其转换逻辑,在分布式系统中用于管理跨节点的状态一致性(如订单状态、工作流状态)。
1. 分布式订单状态机
  1. // 状态接口
  2. public interface OrderState {
  3.    void pay(OrderContext context); // 支付操作
  4.    void ship(OrderContext context); // 发货操作
  5.    void complete(OrderContext context); // 完成操作
  6.    OrderStatus getStatus(); // 获取当前状态
  7. }
  8. // 具体状态:待支付
  9. public class PendingState implements OrderState {
  10.    @Override
  11.    public void pay(OrderContext context) {
  12.        // 状态转换:待支付→已支付
  13.        context.setState(new PaidState());
  14.        // 发布状态变更事件(通知其他节点)
  15.        context.publishEvent(OrderStatus.PAID);
  16.    }
  17.    @Override
  18.    public void ship(OrderContext context) {
  19.        throw new IllegalStateException("未支付订单不能发货");
  20.    }
  21.    @Override
  22.    public void complete(OrderContext context) {
  23.        throw new IllegalStateException("未支付订单不能完成");
  24.    }
  25.    @Override
  26.    public OrderStatus getStatus() { return OrderStatus.PENDING; }
  27. }
  28. // 具体状态:已支付(其他状态略)
  29. public class PaidState implements OrderState { /* 实现发货等操作 */ }
  30. // 上下文:订单状态管理器
  31. public class OrderContext {
  32.    private OrderState currentState;
  33.    private Long orderId;
  34.    private EventPublisher eventPublisher;
  35.    public OrderContext(Long orderId) {
  36.        this.orderId = orderId;
  37.        this.currentState = new PendingState(); // 初始状态
  38.    }
  39.    // 委托状态操作
  40.    public void pay() { currentState.pay(this); }
  41.    public void ship() { currentState.ship(this); }
  42.    public void complete() { currentState.complete(this); }
  43.    // 发布状态事件(跨节点同步)
  44.    public void publishEvent(OrderStatus status) {
  45.        eventPublisher.publish(new OrderEvent(orderId, status));
  46.    }
  47.    // 设置状态(仅允许状态内部调用)
  48.    void setState(OrderState state) { this.currentState = state; }
  49. }
  50. // 使用示例
  51. public class OrderController {
  52.    @PostMapping("/orders/{id}/pay")
  53.    public void payOrder(@PathVariable Long id) {
  54.        OrderContext context = orderContextManager.get(id);
  55.        context.pay(); // 状态转换由状态机管理
  56.    }
  57. }
复制代码
2. 分布式场景挑战与解决


  • 状态一致性:通过事件发布同步状态变更(如订单服务状态变更后,通知库存服务)。
  • 并发安全:状态转换加分布式锁(如 Redis 锁),防止并发操作导致状态错乱。
六、面试高频问题深度解析

6.1 基础概念类问题

Q:分布式环境下的观察者模式与传统观察者模式有何本质区别?如何保证事件不丢失?
A:

  • 本质区别
    传统观察者是进程内同步调用(如内存中的发布 - 订阅),分布式观察者基于消息队列异步通信,跨节点、跨进程。
  • 不丢失保证

  • 消息持久化:Kafka 将消息写入磁盘,确保 broker 宕机后数据不丢失。
  • 确认机制:消费者处理完成后发送 ACK(如 Kafka 的 offset 提交),未确认则重试。
  • 死信队列:多次重试失败的事件进入死信队列,人工干预处理。
Q:策略模式在分布式负载均衡中的应用?如何动态切换负载均衡策略?
A:

  • 应用场景
    策略模式封装轮询、权重、IP 哈希等负载均衡算法,LoadBalancer通过setStrategy()动态选择算法。
  • 动态切换

  • 配置中心(如 Nacos)存储策略类型(如weighted)。
  • 客户端监听配置变更,调用setStrategy()更新策略。
  • 示例:流量高峰时切换到权重策略(优先调度到高配节点),低谷时用轮询策略。
6.2 实战设计类问题

Q:如何用责任链模式设计一个支持多租户的 API 网关权限系统?
A:

  • 过滤器链设计


  • TenantFilter:解析请求中的租户 ID,验证租户合法性。
  • AuthFilter:验证租户内用户的令牌有效性。
  • PermissionFilter:检查用户是否有权限访问当前接口(结合租户 + 用户角色)。
  • RateLimitFilter:按租户限制 API 调用频率。

  • 执行逻辑
    网关接收请求后,依次执行过滤器链,任何过滤器失败则返回对应错误(如 401/403)。
  • 租户隔离
    每个过滤器通过request.getTenantId()获取租户上下文,确保权限校验在租户维度隔离。
Q:分布式状态机如何保证跨节点的状态一致性?
A:

  • 状态事件同步:状态变更时发布全局事件(如OrderStatusChangedEvent),所有节点消费事件更新本地状态。
  • 分布式锁:状态转换操作加锁(如 Redis 锁),防止并发修改导致状态冲突。
  • 状态校验:节点启动时从事件日志重建状态(如从 Kafka 消费历史事件恢复最新状态)。
总结:行为型模式的分布式设计原则

核心选型策略

分布式场景推荐模式核心解决问题跨节点异步通知观察者模式(消息队列)节点解耦,事件驱动协作动态算法切换(负载均衡等)策略模式算法与使用分离,支持动态适配分布式任务调度与重试命令模式任务异步执行,支持补偿与幂等性网关过滤、请求处理链责任链模式横切逻辑复用,动态扩展过滤器跨节点状态流转(订单等)状态模式状态转换逻辑封装,保证状态一致性分布式适配要点


  • 异步优先:尽量采用异步通信(消息队列)减少节点阻塞,应对网络延迟。
  • 幂等设计:所有跨节点操作需保证幂等性(如命令 ID、事件 ID),防止重试导致副作用。
  • 容错机制:结合重试、超时控制、熔断降级,应对网络分区与节点故障。
通过掌握行为型模式在分布式系统中的适配逻辑,不仅能在面试中清晰解析跨节点协作问题,更能在实际架构中设计松耦合、高可用的分布式系统,体现高级程序员对复杂系统的设计能力。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册