17、Canal监听MySQL-Binlog实现数据监听
一、Canal简介:Canal 是阿里巴巴开源的一款基于数据库增量日志解析的中间件,主要用于实现数据库变更数据的实时同步。
Canal源码
二、工作原理:
1、MySQL主备复制原理:
(1)、MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
(2)、MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
(3)、MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
2、canal工作原理:
(1)、canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
(2)、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
(3)、canal 解析 binary log 对象(原始为 byte 流)
三、MySQL 配置(开启 Binlog):
1、开启 Binlog(ROW 模式):
# MySQL 配置文件
# Linux:my.cnf配置文件(/etc/mysql/)
# Window:my.ini配置文件(C:\ProgramData\MySQL\MySQL Server 5.7\)
# 开启 Binlog
log_bin = mysql-bin
# 选择 ROW 模式(记录行级变更)
binlog-format = ROW
# 配置数据库唯一 ID(与 Canal 服务端的 slaveId 不同)
server-id = 1
2、重启 MySQL 并验证:
# 打开命令提示符(cmd/services.msc):
# 按 Win + R 键,输入 cmd,然后按 Enter 键打开命令提示符窗口。
# 停止MySQL服务:
net stop MySQL57
# 启动MySQL服务:
net start MySQL57
# 验证
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
3、创建 Canal 专用账号(权限最小化):
-- 1. 创建支持远程连接的用户(% 表示任意 IP)
-- CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
-- 授予权限
-- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 2. 创建支持本地连接的用户(localhost)
CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';
-- 授予相同权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';
-- 刷新权限,使配置生效
FLUSH PRIVILEGES;
四、Canal 服务端配置:
1、下载并解压 Canal 服务端:
github-canal包
2、配置 Canal 实例:
(1)、instance.properties配置:
# MySQL 主库地址(Canal 连接的 MySQL 地址)
canal.instance.master.address=127.0.0.1:3306
# MySQL 账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
(2)、windows启动 Canal 服务端:
1)、双击启动bin/startup.bat:
2)、存在黑屏闪退,修改bin/startup.bat,重启:
3)、日志:
五、SpringBoot整合Canal实现MySQL数据监听:
1、POM配置:
<dependency>
<groupId>com.alibaba.otter</groupId>
canal.client</artifactId>
<version>1.1.8</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
canal.protocol</artifactId>
<version>1.1.8</version>
</dependency>2、YML配置:
canal:
# 自动启动同步标志位
auto-sync: true
instances:
# 第一个实例
instance1:
host: 127.0.0.1
port: 11111
# canal server 中配置的实例名(canal.destinations = example)
name: example
# 批量拉取条数
batch-size: 100
# 无数据时休眠时间(ms)
sleep-time: 10003、Entity类声明:
CanalProperties.class
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Canal配置属性类(映射YAML配置)
*/
@Data
@Component
@ConfigurationProperties(prefix = "canal")
public class CanalProperties {
// 是否自动启动同步
private boolean autoSync = true;
// 多实例配置
private Map<String, InstanceConfig> instances = new HashMap<>();
@Data
public static class InstanceConfig {
private String host;
private Integer port;
private String name;
private Integer batchSize = 100;
private Integer sleepTime = 1000;
}
}DataEventTypeEnum.enum
import org.springframework.util.StringUtils;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
public enum DataEventTypeEnum {
INSERT("INSERT"),
UPDATE("UPDATE"),
DELETE("DELETE");
private final String name;
DataEventTypeEnum(String name) {
this.name = name;
}
public String NAME() {
return name;
}
private static final Map<String, DataEventTypeEnum> NAME_MAP =
Arrays.stream(DataEventTypeEnum.values())
.collect(Collectors.toMap(DataEventTypeEnum::NAME, Function.identity()));
public static DataEventTypeEnum getEnum(String name) {
if (!StringUtils.hasText(name)) {
return null;
}
return NAME_MAP.get(name);
}
}JsonMessageType.class
import lombok.Data;
@Data
public class JsonMessageType {
/**
* 库名
*/
private String schemaName;
/**
* 表名
*/
private String tableName;
/**
* 事件类型
* (INSERT/UPDATE/DELETE)
*/
private String eventType;
/**
* 数据JSON字符串
*/
private String data;
}4、CanalRunnerAutoConfig启动Canal配置:
import com.iven.canal.entity.CanalProperties;
import com.iven.canal.handle.CanalWorkRegistry;
import com.iven.canal.utils.JsonMessageParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Canal自动配置
*/
@Slf4j
@Configuration
@RequiredArgsConstructor
public class CanalRunnerAutoConfig {
private final CanalProperties canalProperties;
private final JsonMessageParser jsonMessageParser;
private final CanalWorkRegistry workRegistry;
@Bean
public ApplicationRunner canalApplicationRunner() {
return args -> {
if (!canalProperties.isAutoSync()) {
log.info("Canal自动同步已关闭");
return;
}
// 如果没有任何Work,则不启动Canal
if (!workRegistry.hasWork()) {
log.info("无表同步处理器,不启动Canal");
return;
}
// 启动所有配置的Canal实例
canalProperties.getInstances().forEach((instanceKey, config) -> {
CanalRunner runner = new CanalRunner(
config.getHost(),
config.getPort(),
config.getName(),
config.getBatchSize(),
config.getSleepTime(),
jsonMessageParser,
workRegistry
);
runner.start();
});
};
}
}5、CanalRunner拉取数据:
import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.iven.canal.entity.JsonMessageType;
import com.iven.canal.handle.CanalWork;
import com.iven.canal.handle.CanalWorkRegistry;
import com.iven.canal.utils.JsonMessageParser;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Canal运行器
* 手动管理生命周期
*
* 1、启动Canal实例
* 2、处理解析后的数据
*/
@Slf4j
public class CanalRunner {
private Thread thread;
private final String canalIp;
private final Integer canalPort;
private final String canalInstance;
private final Integer batchSize;
private final Integer sleepTime;
private final JsonMessageParser jsonMessageParser;
private final CanalWorkRegistry workRegistry;
public CanalRunner(String canalIp, Integer canalPort, String canalInstance, Integer batchSize,
Integer sleepTime, JsonMessageParser jsonMessageParser, CanalWorkRegistry workRegistry) {
this.canalIp = canalIp;
this.canalPort = canalPort;
this.canalInstance = canalInstance;
this.batchSize = batchSize;
this.sleepTime = sleepTime;
this.jsonMessageParser = jsonMessageParser;
this.workRegistry = workRegistry;
}
/**
* 启动Canal实例
*/
public void start() {
if (thread == null || !thread.isAlive()) {
thread = new Thread(this::run, "canal-runner-" + canalInstance);
thread.start();
log.info("Canal实例[{}]启动成功", canalInstance);
}
}
/**
* 停止Canal实例
*/
public void stop() {
if (thread != null && !thread.isInterrupted()) {
thread.interrupt();
}
}
private void run() {
log.info("Canal实例[{}]启动中...", canalInstance);
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(canalIp, canalPort), canalInstance, "", "");
try {
connector.connect();
// 订阅所有表(后续通过Work过滤)
connector.subscribe();
connector.rollback();
while (!thread.isInterrupted()) {
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
List<CanalEntry.Entry> entries = message.getEntries();
if (batchId == -1 || entries.isEmpty()) {
Thread.sleep(sleepTime);
} else {
// 解析数据并处理
Map<String, List<JsonMessageType>> parsedData = jsonMessageParser.parse(entries);
processParsedData(parsedData);
// 确认处理成功
connector.ack(batchId);
}
}
} catch (InterruptedException e) {
log.info("Canal实例[{}]被中断", canalInstance);
} catch (Exception e) {
log.error("Canal实例[{}]运行异常", canalInstance, e);
// 处理失败回滚
connector.rollback();
} finally {
connector.disconnect();
log.info("Canal实例[{}]已停止", canalInstance);
}
}
/**
* 调用Work处理解析后的数据
*
* @param parsedData
*/
private void processParsedData(Map<String, List<JsonMessageType>> parsedData) {
parsedData.forEach((tableKey, dataList) -> {
// 获取该表的所有Work
List<CanalWork> works = workRegistry.getWorksByTable(tableKey);
if (!works.isEmpty() && !dataList.isEmpty()) {
// 转换数据格式(Json字符串 -> Map)
List<Map<String, Object>> dataMaps = dataList.stream()
.map(item -> JSON.<Map<String, Object>>parseObject(item.getData(), Map.class))
.collect(Collectors.toList());
String schemaName = dataList.get(0).getSchemaName();
// 调用每个Work的处理方法
works.forEach(work -> work.handle(dataMaps, dataList.get(0).getEventType(), schemaName));
}
});
}
}6、JsonMessageParser解析数据:
MessageParser
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.util.List;
/**
* 消息解析器接口
*
*/
public interface MessageParser<T> {
T parse(List<CanalEntry.Entry> canalEntryList);
} JsonMessageParserimport com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.iven.canal.entity.DataEventTypeEnum;
import com.iven.canal.entity.JsonMessageType;
import com.iven.canal.handle.CanalWorkRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.*;
/**
* Json消息解析器
*
* 1、遍历原始数据列表接收
* 2、解析行级变更数据
* 3、封装为 JsonParseType
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class JsonMessageParser implements MessageParser<Map<String, List<JsonMessageType>>> {
private final CanalWorkRegistry workRegistry;
@Override
public Map<String, List<JsonMessageType>> parse(List<CanalEntry.Entry> canalEntryList) {
Map<String, List<JsonMessageType>> dataMap = new HashMap<>();
for (CanalEntry.Entry entry : canalEntryList) {
if (!CanalEntry.EntryType.ROWDATA.equals(entry.getEntryType())) {
continue;
}
// 1. 获取库名、表名、带库名的表标识
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
String fullTableName = schemaName + "." + tableName;
// 2. 检查是否有对应的处理器(支持两种格式)
boolean hasFullTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(fullTableName));
boolean hasSimpleTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(tableName));
if (!hasFullTableWork && !hasSimpleTableWork) {
log.debug("表[{}]和[{}]均无同步处理器,跳过", fullTableName, tableName);
continue;
}
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
rowChange.getRowDatasList().forEach(rowData -> {
JsonMessageType jsonMessageType = parseRowData(entry.getHeader(), rowChange.getEventType(), rowData);
if (jsonMessageType != null) {
// 3. 按存在的处理器类型,分别添加到数据映射中
if (hasFullTableWork) {
dataMap.computeIfAbsent(fullTableName, k -> new ArrayList<>()).add(jsonMessageType);
}
if (hasSimpleTableWork) {
dataMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(jsonMessageType);
}
}
});
} catch (Exception e) {
log.error("解析数据失败", e);
}
}
return dataMap;
}
private JsonMessageType parseRowData(CanalEntry.Header header, CanalEntry.EventType eventType,
CanalEntry.RowData rowData) {
// 获取库名
String schemaName = header.getSchemaName();
// 获取表名
String tableName = header.getTableName();
if (eventType == CanalEntry.EventType.DELETE) {
return dataWrapper(schemaName, tableName, DataEventTypeEnum.DELETE.NAME(), rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
return dataWrapper(schemaName, tableName, DataEventTypeEnum.INSERT.NAME(), rowData.getAfterColumnsList());
} else if (eventType == CanalEntry.EventType.UPDATE) {
return dataWrapper(schemaName, tableName, DataEventTypeEnum.UPDATE.NAME(), rowData.getAfterColumnsList());
}
return null;
}
private JsonMessageType dataWrapper(String schemaName, String tableName, String eventType,
List<CanalEntry.Column> columns) {
Map<String, String> data = new HashMap<>();
columns.forEach(column -> data.put(column.getName(), column.getValue()));
JsonMessageType result = new JsonMessageType();
result.setSchemaName(schemaName);
result.setTableName(tableName);
result.setEventType(eventType);
result.setData(JSON.toJSONString(data));
return result;
}
}7、CanalWorkRegistry匹配处理器:
CanalWork
import java.util.List;
import java.util.Map;
/**
* Canal-Work处理器
*
*/
public interface CanalWork {
/**
* 返回需要处理的表名(如:tb_user)
*/
String getTableName();
/**
* 处理表数据的方法
* @param dataList 表数据列表(每条数据是字段名-值的Map)
* @param eventType 事件类型(INSERT/UPDATE/DELETE)
* @param schemaName 库名(用于区分不同库的表)
*/
void handle(List<Map<String, Object>> dataList, String eventType, String schemaName);
}CanalWorkRegistry
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 处理器注册器,
* 扫描并缓存所有CanalWork实现类,按表名分组管理,提供查询表对应处理器的方法
*/
@Slf4j
@Component
public class CanalWorkRegistry implements ApplicationContextAware {
/**
* 表名 -> Work列表(支持一个表多个Work)
*/
private final Map<String, List<CanalWork>> tableWorkMap = new HashMap<>();
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 扫描所有CanalWork实现类
Map<String, CanalWork> workMap = applicationContext.getBeansOfType(CanalWork.class);
// 按表名分组
tableWorkMap.putAll(workMap.values().stream()
.collect(Collectors.groupingBy(CanalWork::getTableName)));
log.info("已注册的表同步处理器: {}", tableWorkMap.keySet());
}
/**
* 获取指定表的Work列表
*
* @param tableName
* @return
*/
public List<CanalWork> getWorksByTable(String tableName) {
return tableWorkMap.getOrDefault(tableName, Collections.emptyList());
}
/**
* 判断是否有表需要处理
*
* @return
*/
public boolean hasWork() {
return !tableWorkMap.isEmpty();
}
}8、CanalWork实现类处理数据:
import com.iven.canal.entity.DataEventTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* tb_user表数据处理
*
* Canal服务 → 变更数据 → CanalRunner 拉取 → JsonMessageParser 解析 →
* 筛选出 tb_user 数据 → CanalWorkRegistry 获取 TbUserCanalWorkHandle →
* 调用 handle 方法 → 按事件类型(INSERT/UPDATE/DELETE)执行对应逻辑
*/
@Slf4j
@Component
public class TbUserCanalWorkHandle implements CanalWork {
@Override
public String getTableName() {
return "demo.tb_user";
}
@Override
public void handle(List<Map<String, Object>> dataList, String eventType, String schemaName) {
log.info("开始处理[{}库]的tb_user表数据,事件类型:{},数据量:{}", schemaName, eventType, dataList.size());
DataEventTypeEnum dataEventTypeEnum = DataEventTypeEnum.getEnum(eventType);
// 根据事件类型分别处理
switch (dataEventTypeEnum) {
case INSERT:
handleInsert(dataList, schemaName);
break;
case UPDATE:
handleUpdate(dataList, schemaName);
break;
case DELETE:
handleDelete(dataList, schemaName);
break;
default:
log.warn("未处理的事件类型:{}", eventType);
}
}
/**
* 处理新增数据
*/
private void handleInsert(List<Map<String, Object>> dataList, String schemaName) {
log.info("处理[{}库]的tb_user新增数据,共{}条", schemaName, dataList.size());
dataList.forEach(data -> {
Object userId = data.get("id");
Object username = data.get("name");
// 新增逻辑:如同步到ES、缓存初始化等
log.info("新增用户 - ID: {}, 用户名: {}", userId, username);
});
}
/**
* 处理更新数据
*/
private void handleUpdate(List<Map<String, Object>> dataList, String schemaName) {
log.info("处理[{}库]的tb_user更新数据,共{}条", schemaName, dataList.size());
dataList.forEach(data -> {
Object userId = data.get("id");
Object newPhone = data.get("phone"); // 假设更新了手机号
// 更新逻辑:如更新ES文档、刷新缓存等
log.info("更新用户 - ID: {}, 新手机号: {}", userId, newPhone);
});
}
/**
* 处理删除数据
*/
private void handleDelete(List<Map<String, Object>> dataList, String schemaName) {
log.info("处理[{}库]的tb_user删除数据,共{}条", schemaName, dataList.size());
dataList.forEach(data -> {
Object userId = data.get("id");
// 删除逻辑:如从ES删除、清除缓存等
log.info("删除用户 - ID: {}", userId);
});
}
}
调度流程:
整个流程通过注册器管理处理器、解析器转换数据格式、运行器控制 Canal 客户端生命周期,最终将数据库变更事件分发到对应表的处理器,实现了变更数据的监听与业务处理解耦。用户只需实现CanalWork接口,即可自定义任意表的变更处理逻辑。
(1)、初始化阶段
1)、Spring 容器启动时,CanalWorkRegistry 扫描所有 CanalWork 实现类(如 TbUserCanalWorkHandle),按表名分组缓存到 tableWorkMap 中。
2)、CanalRunnerAutoConfig 检查配置(CanalProperties),若开启自动同步且存在 CanalWork,则为每个 Canal 实例创建 CanalRunner 并启动。
(2)、运行阶段
1)、CanalRunner 建立与 Canal 服务的连接,订阅数据库变更事件。
2)、循环拉取变更数据(Message),通过 JsonMessageParser 解析为表名 - 数据列表的映射(Map)。
3)、调用 processParsedData 方法,根据表名从 CanalWorkRegistry 获取对应的 CanalWork 列表,执行 handle 方法处理数据。
(3)、销毁阶段
程序停止时,CanalRunner 中断线程,断开与 Canal 服务的连接。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! 热心回复! 谢谢楼主提供! 用心讨论,共获提升! 东西不错很实用谢谢分享 这个有用。
页:
[1]