篁瞑普 发表于 2025-11-5 00:25:02

18、Flink CDC监听MySQL-Binlog实现数据监听

一、CDC简介:

CDC(Change Data Capture)是变更数据捕获的简称,其核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新、删除等),将这些变更按发生的顺序完整记录下来,并写入到消息中间件或数据仓库中以供其他服务进行订阅及消费。CDC技术广泛应用于数据同步、数据分发、数据采集等场景,是数据集成领域的重要工具。
1、CDC常用工具:

CDC工具
Debezium
Canal
Maxwell
Flink CDC
核心定位
多数据源 CDC 框架
轻量 MySQL 同步工具
MySQL 专属极简工具
实时处理一体化框架
支持数据源
MySQL、PostgreSQL、Oracle 等
MySQL(最佳)、PostgreSQL 等
仅 MySQL
MySQL、PostgreSQL 等(基于 Debezium)
典型输出目标
Kafka、Flink/Spark
Kafka、RocketMQ、数据库
Kafka、Redis、文件
Kafka、ES、Hive 等
突出优势
支持广泛,全量 + 增量同步
部署简单,国内生态适配好
配置极简,资源占用低
支持实时处理,Exactly-Once 语义
适用场景
多源同步、复杂数据管道
MySQL 为主的轻量同步
简单 MySQL 变更同步
实时数仓、捕获 + 处理一体化
2、相关参考:

Flink-CDC 开源地址
Flink-CDC 中文文档
Canal学习笔记
 
二、Flink CDC工作原理:

Flink CDC(Change Data Capture)的核心工作原理是通过捕获数据库的变更日志(如 binlog、WAL 等),将其转换为结构化事件流,接入 Flink 实时计算引擎进行处理,并最终同步到目标系统。其工作流程可拆解为以下关键步骤:
1、Debezium 捕获解析数据库日志:

Flink CDC 本身不直接解析数据库日志,而是集成 Debezium(开源 CDC 框架)作为底层捕获引擎,支持 MySQL、PostgreSQL、Oracle 等多种数据库,具体逻辑如下:
(1)、模拟从节点获取日志:

[*]对于支持主从复制的数据库(如 MySQL),Debezium 会伪装成数据库的从节点,向主库发送复制请求,获取变更日志(如 MySQL 的 binlog、PostgreSQL 的 WAL 日志)。
(2)、解析日志为结构化事件:
(3)数据库日志通常是二进制格式,Debezium 会将其解析为包含详细信息的结构化事件,包括:

[*]操作类型(INSERT/UPDATE/DELETE);
[*]变更数据(UPDATE 时包含旧值和新值,INSERT/DELETE 包含对应数据);
[*]表名、数据库名、操作时间戳等元数据。
2. 全量 + 增量同步(无锁机制):

Flink CDC 支持 “全量数据初始化 + 增量变更同步” 的无缝衔接,且通过无锁机制避免影响源库性能:
(1)、全量快照阶段:

[*]首次同步时,会对数据库表进行一次全量快照(读取当前所有数据),确保初始数据完整。
(2)、增量同步阶段:

[*]快照完成后,自动切换到增量同步模式,通过监控 binlog 等日志获取实时变更,且通过记录日志位置(如 binlog 的文件名和偏移量)保证全量与增量数据的连续性(无重复、无遗漏)。
3、 封装为 Flink Source 流入引擎:

解析后的结构化事件会被封装为 Flink Source 连接器,直接作为 Flink 的输入流:
(1)、变更事件以流的形式进入 Flink 计算引擎,每条事件对应一条数据记录,可通过 Flink 的 DataStream API 或 Table/SQL 进行处理。
4、Flink实时数据处理:

Flink CDC 不仅是 “捕获工具”,更能结合 Flink 的实时计算能力对变更数据进行处理:
(1)、数据清洗与转换:

[*]过滤无效数据、格式转换(如 JSON 转 Avro)、字段映射等。
(2)、关联与聚合:

[*]支持与维度表(如 MySQL 维表、HBase 维表)关联,或进行窗口聚合(如统计分钟级变更量)。
(3)、状态管理:

[*]利用 Flink 的状态后端(如 RocksDB)保存中间结果,支持复杂逻辑(如去重、累计计算)。
5、基于 Checkpoint 保证一致性:

Flink CDC 依赖 Flink 的 Checkpoint 机制 确保数据处理的一致性:
(1)、Checkpoint 触发:

[*]定期将当前处理进度(包括 CDC 捕获的日志位置、算子状态等)持久化到存储(如 HDFS、本地文件)。
(2)、故障恢复:

[*]若 Flink 任务失败,可从最近一次 Checkpoint 恢复状态和日志位置,保证数据不丢失、不重复,实现 Exactly-Once 语义(端到端一致性需下游 Sink 配合支持)。
6. 通过 Sink 同步到目标系统:

处理后的变更数据通过 Flink 的 Sink 连接器 写入目标存储,支持 Kafka、Elasticsearch、Hive、MySQL、TiDB 等多种系统。
 
三、MySQL 配置(开启 Binlog):

1、开启 Binlog(ROW 模式):

# MySQL 配置文件
# Linux:my.cnf配置文件(/etc/mysql/)
# Window:my.ini配置文件(C:\ProgramData\MySQL\MySQL Server 5.7\)
# 开启 Binlog
log_bin = mysql-bin

# 选择 ROW 模式(记录行级变更)
binlog-format = ROW

# 配置数据库唯一 ID(与 Canal 服务端的 slaveId 不同)
server-id = 1

2、重启 MySQL 并验证:

# 打开命令提示符(cmd/services.msc):
# 按 Win + R 键,输入 cmd,然后按 Enter 键打开命令提示符窗口。
# 停止MySQL服务:
net stop MySQL57

# 启动MySQL服务:
net start MySQL57

# 验证
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
 
四、SpringBoot整合Flink CDC实现MySQL数据监听:

1、POM配置:

      
      <dependency>
            <groupId>org.apache.flink</groupId>
            flink-java</artifactId>
            <version>1.18.1</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            flink-streaming-java</artifactId>
            <version>1.18.1</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            flink-clients</artifactId>
            <version>1.18.1</version>
      </dependency>
      
      <dependency>
            <groupId>org.apache.flink</groupId>
            flink-table-api-java-bridge</artifactId>
            <version>1.18.1</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            flink-table-planner_2.12</artifactId>
            <version>1.18.1</version>
      </dependency>
      
      <dependency>
            <groupId>org.apache.flink</groupId>
            flink-connector-base</artifactId>
            <version>1.18.1</version>
      </dependency>
      
      <dependency>
            <groupId>com.ververica</groupId>
            flink-connector-mysql-cdc</artifactId>
            <version>3.0.1</version>
      </dependency>2、YML配置:

flink:
cdc:
    # 是否开启CDC监听
    auto-start: true
    # 自定义一个唯一的id
    server-id: "123456"
    # 数据库配置
    mysql:
      hostname: localhost
      port: 3306
      username: root
      password: 1233、Entity类声明:

DataChangeType.class
/**
* Flink CDC数据变更类型枚举
* 1、"c"表示创建
* 2、"u"表示更新
* 3、"d"表示删除
* 4、"r"表示读取
*/
public enum DataChangeType {

    INSERT("c"),
    UPDATE("u"),
    DELETE("d"),
    READ("r");

    private final String code;

    DataChangeType(String code) {
      this.code = code;
    }

    public static DataChangeType getByCode(String code) {
      for (DataChangeType type : values()) {
            if (type.code.equals(code)) {
                return type;
            }
      }
      return null;
    }

}FlinkCdcProperties.class
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;



@Data
@Component
@ConfigurationProperties(prefix = "flink.cdc")
public class FlinkCdcProperties {

    /**
   * 是否自动启动CDC监听
   */
    private boolean autoStart;

    private String serverId;

    /**
   * MySQL配置
   */
    private Mysql mysql = new Mysql();

    @Data
    public static class Mysql {
      private String hostname;
      private int port;
      private String username;
      private String password;
    }

}4、FlinkCdcRunner数据变更监听启动器:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.iven.flinkcdcdemoservice.entity.DataChangeType;
import com.iven.flinkcdcdemoservice.entity.FlinkCdcProperties;
import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandlerRegistry;
import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandler;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.*;

@Slf4j
@Component
@RequiredArgsConstructor
public class FlinkCdcRunner implements CommandLineRunner {

    // 配置属性
    private final FlinkCdcProperties properties;

    // 处理器注册中心
    private final FlinkCdcHandlerRegistry handlerRegistry;

    @Override
    public void run(String... args) throws Exception {
      // 总开关关闭则不启动
      if (!properties.isAutoStart()) {
            log.info("Flink CDC 总开关关闭,不启动监听");
            return;
      }

      // 没有需要监听的表则不启动
      List<String> monitoredTables = handlerRegistry.getMonitoredTables();
      if (monitoredTables.isEmpty()) {
            log.warn("未发现需要监听的表(未实现FlinkCdcTableHandler),不启动监听");
            return;
      }

      // 1. 创建Flink执行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // 设置并行度为 1: server-id 的数量必须 ≥ 并行度
      env.setParallelism(1);
      // 启用检查点(可选)
      // env.enableCheckpointing(5000);
      // 配置检查点存储路径(本地路径或分布式存储如HDFS)
      // env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-cdc-checkpoints");
      // 检查点超时时间(60秒未完成则取消)
      // env.getCheckpointConfig().setCheckpointTimeout(60000);
      // 允许检查点失败次数(默认0,即一次失败则任务失败)
      // env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
      // 禁用检查点
      env.getCheckpointConfig().disableCheckpointing();
      // 重试次数/重试间隔
      env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,Time.seconds(10)));

      // 2. 配置MySQL CDC源(动态设置需要监听的表)
      MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .serverId(properties.getServerId())
                .hostname(properties.getMysql().getHostname())
                .port(properties.getMysql().getPort())
                .username(properties.getMysql().getUsername())
                .password(properties.getMysql().getPassword())
                // 从监听的表中提取数据库列表(去重)
                .databaseList(extractDatabases(monitoredTables))
                // 直接使用注册中心收集的表列表
                .tableList(monitoredTables.toArray(new String))
                // 反序列化为JSON
                .deserializer(new JsonDebeziumDeserializationSchema())
                /* initial: 初始化快照,即全量导入后增量导入(检测更新数据写入)
               * latest: 只进行增量导入(不读取历史变化)
               * timestamp: 指定时间戳进行数据导入(大于等于指定时间错读取数据)
               */
                .startupOptions(StartupOptions.latest())
                .build();

      // 3. 读取CDC数据流并处理
      DataStreamSource<String> dataStream = env.fromSource(
                mySqlSource,
                WatermarkStrategy.noWatermarks(),
                "MySQL-CDC-Source"
      );

      // 4. 解析数据并路由到对应处理器,使用静态内部类代替匿名内部类
      dataStream.process(new CdcDataProcessFunction(handlerRegistry));

      // 5. 启动Flink作业
      env.execute("Flink-CDC-动态监听作业");
    }

    /**
   * 静态内部类实现ProcessFunction,确保可序列化
   */
    private static class CdcDataProcessFunction extends ProcessFunction<String, Void> implements Serializable {
      private final FlinkCdcHandlerRegistry handlerRegistry;

      // 通过构造函数传入依赖
      public CdcDataProcessFunction(FlinkCdcHandlerRegistry handlerRegistry) {
            this.handlerRegistry = handlerRegistry;
      }

      @Override
      public void processElement(String json, Context ctx, Collector<Void> out) {
            try {
                JSONObject cdcData = JSON.parseObject(json);
                // 操作类型:c/u/d
                String op = cdcData.getString("op");
                JSONObject source = cdcData.getJSONObject("source");
                String dbName = source.getString("db");
                String tableName = source.getString("table");
                // 库名.表名
                String fullTableName = dbName + "." + tableName;

                // 找到对应表的处理器
                FlinkCdcHandler handler = handlerRegistry.getHandler(fullTableName);
                if (handler == null) {
                  log.warn("表[{}]无处理器,跳过处理", fullTableName);
                  return;
                }

                // 按事件类型分发
                DataChangeType changeType = DataChangeType.getByCode(op);
                if (changeType == null) {
                  log.warn("未知操作类型:{}", op);
                  return;
                }

                switch (changeType) {
                  case INSERT:
                        List<Map<String, Object>> insertData = Collections.singletonList(
                              cdcData.getJSONObject("after").getInnerMap()
                        );
                        handler.handleInsert(insertData);
                        break;
                  case UPDATE:
                        List<Map<String, Object>> beforeData = Collections.singletonList(
                              cdcData.getJSONObject("before").getInnerMap()
                        );
                        List<Map<String, Object>> afterData = Collections.singletonList(
                              cdcData.getJSONObject("after").getInnerMap()
                        );
                        handler.handleUpdate(beforeData, afterData);
                        break;
                  case DELETE:
                        List<Map<String, Object>> deleteData = Collections.singletonList(
                              cdcData.getJSONObject("before").getInnerMap()
                        );
                        handler.handleDelete(deleteData);
                        break;
                  case READ:
                        // 可以忽略快照阶段的读取操作,或根据需要处理
                        log.debug("处理快照读取操作: {}", fullTableName);
                        break;
                }
            } catch (Exception e) {
                log.error("Flink-CDC数据处理发生未预期异常", e);
            }
      }
    }

    /**
   * 从表名(库名.表名)中提取数据库列表(去重)
   *
   * @param tables
   * @return
   */
    private String[] extractDatabases(List<String> tables) {
      // 截取库名(如demo.tb_user → demo)
      return tables.stream()
                .map(table -> table.split("\\."))
                .distinct()
                .toArray(String[]::new);
    }

}5、FlinkCdcHandlerRegistry策略路由:

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Flink CDC处理器注册中心
* 处理器注册中心(自动扫描监听表)
*/
@Slf4j
@Component
public class FlinkCdcHandlerRegistry implements ApplicationContextAware, Serializable {

    // 缓存:表名(库名.表名)→ 处理器
    private final Map<String, FlinkCdcHandler> handlerMap = new ConcurrentHashMap<>();

    // 收集所有需要监听的表(供Flink CDC配置使用)
    private List<String> monitoredTables;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
      // 扫描所有实现类
      Map<String, FlinkCdcHandler> beans = applicationContext.getBeansOfType(FlinkCdcHandler.class);
      beans.values().forEach(handler -> {
            String tableName = handler.getTableName();
            handlerMap.put(tableName, handler);
            log.info("注册监听表:{} → 处理器:{}", tableName, handler.getClass().getSimpleName());
      });
      // 提取所有需要监听的表
      monitoredTables = new ArrayList<>(handlerMap.keySet());
    }

    /**
   * 获取指定表的处理器
   *
   * @param tableName
   * @return
   */
    public FlinkCdcHandler getHandler(String tableName) {
      return handlerMap.get(tableName);
    }

    /**
   * 获取所有需要监听的表(供Flink CDC配置)
   *
   * @return
   */
    public List<String> getMonitoredTables() {
      return monitoredTables;
    }

}6、FlinkCdcHandler策略模式数据处理:

 FlinkCdcHandler
import java.util.List;
import java.util.Map;

/**
* 表数据处理接口,每个监听的表需实现此接口
*/
public interface FlinkCdcHandler {

    /**
   * 获取监听的表名(格式:库名.表名,如demo.tb_user)
   */
    String getTableName();

    /**
   * 处理新增数据
   */
    default void handleInsert(List<Map<String, Object>> dataList) {
      // 默认空实现,子类可重写
    }

    /**
   * 处理更新数据(包含变更前和变更后的数据)
   * @param beforeList 变更前数据
   * @param afterList 变更后数据
   */
    default void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) {
      // 默认空实现,子类可重写
    }

    /**
   * 处理删除数据
   */
    default void handleDelete(List<Map<String, Object>> dataList) {
      // 默认空实现,子类可重写
    }

}TbUserFlinkCdcHandler
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
public class TbUserFlinkCdcHandler implements FlinkCdcHandler, Serializable {
    @Override
    public String getTableName() {
      return "demo.tb_user";
    }

    @Override
    public void handleInsert(List<Map<String, Object>> dataList) {
      log.info("处理tb_user新增数据,共{}条", dataList.size());
      dataList.forEach(data -> {
            String id = (String)data.get("id");
            String username = (String) data.get("name");
            // 业务逻辑:如同步到ES、缓存等
            log.info("新增用户:id={}, name={}", id, username);
      });
    }

    @Override
    public void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) {
      log.info("处理tb_user更新数据,共{}条", afterList.size());
      for (int i = 0; i < afterList.size(); i++) {
            Map<String, Object> before = beforeList.get(i);
            Map<String, Object> after = afterList.get(i);
            log.info("更新用户:id={}, 旧用户名={}, 新用户名={}",
                  after.get("id"), before.get("name"), after.get("name"));
      }
    }

    @Override
    public void handleDelete(List<Map<String, Object>> dataList) {
      log.info("处理tb_user删除数据,共{}条", dataList.size());
      dataList.forEach(data -> {
            log.info("删除用户:id={}", data.get("id"));
      });
    }

}
项目启动时,FlinkCdcHandlerRegistry 扫描并注册所有 FlinkCdcHandler 实现类,建立表与处理器的映射;FlinkCdcRunner 在 Spring 容器初始化后触发,检查启动条件,初始化 Flink 环境并构建 CDC 数据源,将数据流接入 CdcDataProcessFunction,该函数解析变更事件并路由到对应处理器执行业务逻辑,最后启动 Flink 作业持续监听处理。
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 18、Flink CDC监听MySQL-Binlog实现数据监听