18、Flink CDC监听MySQL-Binlog实现数据监听
一、CDC简介:CDC(Change Data Capture)是变更数据捕获的简称,其核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新、删除等),将这些变更按发生的顺序完整记录下来,并写入到消息中间件或数据仓库中以供其他服务进行订阅及消费。CDC技术广泛应用于数据同步、数据分发、数据采集等场景,是数据集成领域的重要工具。
1、CDC常用工具:
CDC工具
Debezium
Canal
Maxwell
Flink CDC
核心定位
多数据源 CDC 框架
轻量 MySQL 同步工具
MySQL 专属极简工具
实时处理一体化框架
支持数据源
MySQL、PostgreSQL、Oracle 等
MySQL(最佳)、PostgreSQL 等
仅 MySQL
MySQL、PostgreSQL 等(基于 Debezium)
典型输出目标
Kafka、Flink/Spark
Kafka、RocketMQ、数据库
Kafka、Redis、文件
Kafka、ES、Hive 等
突出优势
支持广泛,全量 + 增量同步
部署简单,国内生态适配好
配置极简,资源占用低
支持实时处理,Exactly-Once 语义
适用场景
多源同步、复杂数据管道
MySQL 为主的轻量同步
简单 MySQL 变更同步
实时数仓、捕获 + 处理一体化
2、相关参考:
Flink-CDC 开源地址
Flink-CDC 中文文档
Canal学习笔记
二、Flink CDC工作原理:
Flink CDC(Change Data Capture)的核心工作原理是通过捕获数据库的变更日志(如 binlog、WAL 等),将其转换为结构化事件流,接入 Flink 实时计算引擎进行处理,并最终同步到目标系统。其工作流程可拆解为以下关键步骤:
1、Debezium 捕获解析数据库日志:
Flink CDC 本身不直接解析数据库日志,而是集成 Debezium(开源 CDC 框架)作为底层捕获引擎,支持 MySQL、PostgreSQL、Oracle 等多种数据库,具体逻辑如下:
(1)、模拟从节点获取日志:
[*]对于支持主从复制的数据库(如 MySQL),Debezium 会伪装成数据库的从节点,向主库发送复制请求,获取变更日志(如 MySQL 的 binlog、PostgreSQL 的 WAL 日志)。
(2)、解析日志为结构化事件:
(3)数据库日志通常是二进制格式,Debezium 会将其解析为包含详细信息的结构化事件,包括:
[*]操作类型(INSERT/UPDATE/DELETE);
[*]变更数据(UPDATE 时包含旧值和新值,INSERT/DELETE 包含对应数据);
[*]表名、数据库名、操作时间戳等元数据。
2. 全量 + 增量同步(无锁机制):
Flink CDC 支持 “全量数据初始化 + 增量变更同步” 的无缝衔接,且通过无锁机制避免影响源库性能:
(1)、全量快照阶段:
[*]首次同步时,会对数据库表进行一次全量快照(读取当前所有数据),确保初始数据完整。
(2)、增量同步阶段:
[*]快照完成后,自动切换到增量同步模式,通过监控 binlog 等日志获取实时变更,且通过记录日志位置(如 binlog 的文件名和偏移量)保证全量与增量数据的连续性(无重复、无遗漏)。
3、 封装为 Flink Source 流入引擎:
解析后的结构化事件会被封装为 Flink Source 连接器,直接作为 Flink 的输入流:
(1)、变更事件以流的形式进入 Flink 计算引擎,每条事件对应一条数据记录,可通过 Flink 的 DataStream API 或 Table/SQL 进行处理。
4、Flink实时数据处理:
Flink CDC 不仅是 “捕获工具”,更能结合 Flink 的实时计算能力对变更数据进行处理:
(1)、数据清洗与转换:
[*]过滤无效数据、格式转换(如 JSON 转 Avro)、字段映射等。
(2)、关联与聚合:
[*]支持与维度表(如 MySQL 维表、HBase 维表)关联,或进行窗口聚合(如统计分钟级变更量)。
(3)、状态管理:
[*]利用 Flink 的状态后端(如 RocksDB)保存中间结果,支持复杂逻辑(如去重、累计计算)。
5、基于 Checkpoint 保证一致性:
Flink CDC 依赖 Flink 的 Checkpoint 机制 确保数据处理的一致性:
(1)、Checkpoint 触发:
[*]定期将当前处理进度(包括 CDC 捕获的日志位置、算子状态等)持久化到存储(如 HDFS、本地文件)。
(2)、故障恢复:
[*]若 Flink 任务失败,可从最近一次 Checkpoint 恢复状态和日志位置,保证数据不丢失、不重复,实现 Exactly-Once 语义(端到端一致性需下游 Sink 配合支持)。
6. 通过 Sink 同步到目标系统:
处理后的变更数据通过 Flink 的 Sink 连接器 写入目标存储,支持 Kafka、Elasticsearch、Hive、MySQL、TiDB 等多种系统。
三、MySQL 配置(开启 Binlog):
1、开启 Binlog(ROW 模式):
# MySQL 配置文件
# Linux:my.cnf配置文件(/etc/mysql/)
# Window:my.ini配置文件(C:\ProgramData\MySQL\MySQL Server 5.7\)
# 开启 Binlog
log_bin = mysql-bin
# 选择 ROW 模式(记录行级变更)
binlog-format = ROW
# 配置数据库唯一 ID(与 Canal 服务端的 slaveId 不同)
server-id = 1
2、重启 MySQL 并验证:
# 打开命令提示符(cmd/services.msc):
# 按 Win + R 键,输入 cmd,然后按 Enter 键打开命令提示符窗口。
# 停止MySQL服务:
net stop MySQL57
# 启动MySQL服务:
net start MySQL57
# 验证
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
四、SpringBoot整合Flink CDC实现MySQL数据监听:
1、POM配置:
<dependency>
<groupId>org.apache.flink</groupId>
flink-java</artifactId>
<version>1.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
flink-streaming-java</artifactId>
<version>1.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
flink-clients</artifactId>
<version>1.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
flink-table-api-java-bridge</artifactId>
<version>1.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
flink-table-planner_2.12</artifactId>
<version>1.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
flink-connector-base</artifactId>
<version>1.18.1</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
flink-connector-mysql-cdc</artifactId>
<version>3.0.1</version>
</dependency>2、YML配置:
flink:
cdc:
# 是否开启CDC监听
auto-start: true
# 自定义一个唯一的id
server-id: "123456"
# 数据库配置
mysql:
hostname: localhost
port: 3306
username: root
password: 1233、Entity类声明:
DataChangeType.class
/**
* Flink CDC数据变更类型枚举
* 1、"c"表示创建
* 2、"u"表示更新
* 3、"d"表示删除
* 4、"r"表示读取
*/
public enum DataChangeType {
INSERT("c"),
UPDATE("u"),
DELETE("d"),
READ("r");
private final String code;
DataChangeType(String code) {
this.code = code;
}
public static DataChangeType getByCode(String code) {
for (DataChangeType type : values()) {
if (type.code.equals(code)) {
return type;
}
}
return null;
}
}FlinkCdcProperties.class
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "flink.cdc")
public class FlinkCdcProperties {
/**
* 是否自动启动CDC监听
*/
private boolean autoStart;
private String serverId;
/**
* MySQL配置
*/
private Mysql mysql = new Mysql();
@Data
public static class Mysql {
private String hostname;
private int port;
private String username;
private String password;
}
}4、FlinkCdcRunner数据变更监听启动器:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.iven.flinkcdcdemoservice.entity.DataChangeType;
import com.iven.flinkcdcdemoservice.entity.FlinkCdcProperties;
import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandlerRegistry;
import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandler;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.*;
@Slf4j
@Component
@RequiredArgsConstructor
public class FlinkCdcRunner implements CommandLineRunner {
// 配置属性
private final FlinkCdcProperties properties;
// 处理器注册中心
private final FlinkCdcHandlerRegistry handlerRegistry;
@Override
public void run(String... args) throws Exception {
// 总开关关闭则不启动
if (!properties.isAutoStart()) {
log.info("Flink CDC 总开关关闭,不启动监听");
return;
}
// 没有需要监听的表则不启动
List<String> monitoredTables = handlerRegistry.getMonitoredTables();
if (monitoredTables.isEmpty()) {
log.warn("未发现需要监听的表(未实现FlinkCdcTableHandler),不启动监听");
return;
}
// 1. 创建Flink执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为 1: server-id 的数量必须 ≥ 并行度
env.setParallelism(1);
// 启用检查点(可选)
// env.enableCheckpointing(5000);
// 配置检查点存储路径(本地路径或分布式存储如HDFS)
// env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-cdc-checkpoints");
// 检查点超时时间(60秒未完成则取消)
// env.getCheckpointConfig().setCheckpointTimeout(60000);
// 允许检查点失败次数(默认0,即一次失败则任务失败)
// env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// 禁用检查点
env.getCheckpointConfig().disableCheckpointing();
// 重试次数/重试间隔
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,Time.seconds(10)));
// 2. 配置MySQL CDC源(动态设置需要监听的表)
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.serverId(properties.getServerId())
.hostname(properties.getMysql().getHostname())
.port(properties.getMysql().getPort())
.username(properties.getMysql().getUsername())
.password(properties.getMysql().getPassword())
// 从监听的表中提取数据库列表(去重)
.databaseList(extractDatabases(monitoredTables))
// 直接使用注册中心收集的表列表
.tableList(monitoredTables.toArray(new String))
// 反序列化为JSON
.deserializer(new JsonDebeziumDeserializationSchema())
/* initial: 初始化快照,即全量导入后增量导入(检测更新数据写入)
* latest: 只进行增量导入(不读取历史变化)
* timestamp: 指定时间戳进行数据导入(大于等于指定时间错读取数据)
*/
.startupOptions(StartupOptions.latest())
.build();
// 3. 读取CDC数据流并处理
DataStreamSource<String> dataStream = env.fromSource(
mySqlSource,
WatermarkStrategy.noWatermarks(),
"MySQL-CDC-Source"
);
// 4. 解析数据并路由到对应处理器,使用静态内部类代替匿名内部类
dataStream.process(new CdcDataProcessFunction(handlerRegistry));
// 5. 启动Flink作业
env.execute("Flink-CDC-动态监听作业");
}
/**
* 静态内部类实现ProcessFunction,确保可序列化
*/
private static class CdcDataProcessFunction extends ProcessFunction<String, Void> implements Serializable {
private final FlinkCdcHandlerRegistry handlerRegistry;
// 通过构造函数传入依赖
public CdcDataProcessFunction(FlinkCdcHandlerRegistry handlerRegistry) {
this.handlerRegistry = handlerRegistry;
}
@Override
public void processElement(String json, Context ctx, Collector<Void> out) {
try {
JSONObject cdcData = JSON.parseObject(json);
// 操作类型:c/u/d
String op = cdcData.getString("op");
JSONObject source = cdcData.getJSONObject("source");
String dbName = source.getString("db");
String tableName = source.getString("table");
// 库名.表名
String fullTableName = dbName + "." + tableName;
// 找到对应表的处理器
FlinkCdcHandler handler = handlerRegistry.getHandler(fullTableName);
if (handler == null) {
log.warn("表[{}]无处理器,跳过处理", fullTableName);
return;
}
// 按事件类型分发
DataChangeType changeType = DataChangeType.getByCode(op);
if (changeType == null) {
log.warn("未知操作类型:{}", op);
return;
}
switch (changeType) {
case INSERT:
List<Map<String, Object>> insertData = Collections.singletonList(
cdcData.getJSONObject("after").getInnerMap()
);
handler.handleInsert(insertData);
break;
case UPDATE:
List<Map<String, Object>> beforeData = Collections.singletonList(
cdcData.getJSONObject("before").getInnerMap()
);
List<Map<String, Object>> afterData = Collections.singletonList(
cdcData.getJSONObject("after").getInnerMap()
);
handler.handleUpdate(beforeData, afterData);
break;
case DELETE:
List<Map<String, Object>> deleteData = Collections.singletonList(
cdcData.getJSONObject("before").getInnerMap()
);
handler.handleDelete(deleteData);
break;
case READ:
// 可以忽略快照阶段的读取操作,或根据需要处理
log.debug("处理快照读取操作: {}", fullTableName);
break;
}
} catch (Exception e) {
log.error("Flink-CDC数据处理发生未预期异常", e);
}
}
}
/**
* 从表名(库名.表名)中提取数据库列表(去重)
*
* @param tables
* @return
*/
private String[] extractDatabases(List<String> tables) {
// 截取库名(如demo.tb_user → demo)
return tables.stream()
.map(table -> table.split("\\."))
.distinct()
.toArray(String[]::new);
}
}5、FlinkCdcHandlerRegistry策略路由:
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Flink CDC处理器注册中心
* 处理器注册中心(自动扫描监听表)
*/
@Slf4j
@Component
public class FlinkCdcHandlerRegistry implements ApplicationContextAware, Serializable {
// 缓存:表名(库名.表名)→ 处理器
private final Map<String, FlinkCdcHandler> handlerMap = new ConcurrentHashMap<>();
// 收集所有需要监听的表(供Flink CDC配置使用)
private List<String> monitoredTables;
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
// 扫描所有实现类
Map<String, FlinkCdcHandler> beans = applicationContext.getBeansOfType(FlinkCdcHandler.class);
beans.values().forEach(handler -> {
String tableName = handler.getTableName();
handlerMap.put(tableName, handler);
log.info("注册监听表:{} → 处理器:{}", tableName, handler.getClass().getSimpleName());
});
// 提取所有需要监听的表
monitoredTables = new ArrayList<>(handlerMap.keySet());
}
/**
* 获取指定表的处理器
*
* @param tableName
* @return
*/
public FlinkCdcHandler getHandler(String tableName) {
return handlerMap.get(tableName);
}
/**
* 获取所有需要监听的表(供Flink CDC配置)
*
* @return
*/
public List<String> getMonitoredTables() {
return monitoredTables;
}
}6、FlinkCdcHandler策略模式数据处理:
FlinkCdcHandler
import java.util.List;
import java.util.Map;
/**
* 表数据处理接口,每个监听的表需实现此接口
*/
public interface FlinkCdcHandler {
/**
* 获取监听的表名(格式:库名.表名,如demo.tb_user)
*/
String getTableName();
/**
* 处理新增数据
*/
default void handleInsert(List<Map<String, Object>> dataList) {
// 默认空实现,子类可重写
}
/**
* 处理更新数据(包含变更前和变更后的数据)
* @param beforeList 变更前数据
* @param afterList 变更后数据
*/
default void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) {
// 默认空实现,子类可重写
}
/**
* 处理删除数据
*/
default void handleDelete(List<Map<String, Object>> dataList) {
// 默认空实现,子类可重写
}
}TbUserFlinkCdcHandler
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class TbUserFlinkCdcHandler implements FlinkCdcHandler, Serializable {
@Override
public String getTableName() {
return "demo.tb_user";
}
@Override
public void handleInsert(List<Map<String, Object>> dataList) {
log.info("处理tb_user新增数据,共{}条", dataList.size());
dataList.forEach(data -> {
String id = (String)data.get("id");
String username = (String) data.get("name");
// 业务逻辑:如同步到ES、缓存等
log.info("新增用户:id={}, name={}", id, username);
});
}
@Override
public void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) {
log.info("处理tb_user更新数据,共{}条", afterList.size());
for (int i = 0; i < afterList.size(); i++) {
Map<String, Object> before = beforeList.get(i);
Map<String, Object> after = afterList.get(i);
log.info("更新用户:id={}, 旧用户名={}, 新用户名={}",
after.get("id"), before.get("name"), after.get("name"));
}
}
@Override
public void handleDelete(List<Map<String, Object>> dataList) {
log.info("处理tb_user删除数据,共{}条", dataList.size());
dataList.forEach(data -> {
log.info("删除用户:id={}", data.get("id"));
});
}
}
项目启动时,FlinkCdcHandlerRegistry 扫描并注册所有 FlinkCdcHandler 实现类,建立表与处理器的映射;FlinkCdcRunner 在 Spring 容器初始化后触发,检查启动条件,初始化 Flink 环境并构建 CDC 数据源,将数据流接入 CdcDataProcessFunction,该函数解析变更事件并路由到对应处理器执行业务逻辑,最后启动 Flink 作业持续监听处理。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]