文章首发于:clawhub.club


背景

存在跨数据库同步数据的需求,对数据有实时同步的要求,采用阿里的开源框架Canal实时采集Mysql的binlog日志,将过滤后的数据统一标准格式发送到kafka中,在服务的消费端进行数据消费入库。

版本信息

Canal 1.1.3
Mysql支持版本 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

  • Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • Canal 解析 binary log 对象(原始为 byte 流)

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* canal连接
*
* @return
*/
@Bean
public CanalConnector getCanalConnector() {
String canalHost = canalCfg.getHost();
int canalPort = Integer.valueOf(canalCfg.getPort());
String canalDestination = canalCfg.getDestination();
String canalUsername = canalCfg.getUsername();
String canalPassword = canalCfg.getPassword();
canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(new InetSocketAddress(canalHost, canalPort)), canalDestination, canalUsername, canalPassword);
try {
canalConnector.connect();
canalConnector.subscribe();
log.info("connect canal server successed, canal client started!")
} catch (Throwable t) {
log.error("failed to connect to canal server", t);
canalConnector.disconnect();
}
return canalConnector;
}

配置文件内配置Canal Server的连接地址,端口号,用户名和密码,将连接注册为Bean,加入Springboot管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 消费获取canal实体,成功后提交该批次信息的batchId,删除该条信息,出现异常则通过batchId进行数据回滚
*/
@Override
public void run() {
try {
while (taskRunning) {
Message message = canalConnector.getWithoutAck(batchSize);
long batchId = message.getId();
List<Entry> entries = message.getEntries();
if (batchId != -1 && entries.size() > 0) {
for (Entry entry : entries) {
if (entry.getEntryType() == EntryType.ROWDATA) {
log.info(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName()));
}
}
} else {
Thread.sleep(500);
}
canalConnector.ack(batchId);
}
} catch (CanalClientException e) {
log.error("canal obtain data is fail: {}", e);
} catch (InterruptedException e) {
log.error("当前线程异常:{}", e);
}
}

主线程持续采集binlog日志。

数据格式

数据传输格式:protobuff

Entry.Header.logfileName [binlog文件名]
Entry.Header.logfileOffset [binlog position]
Entry.Header.executeTime [发生的变更]
Entry.Header.schemaName
Entry.Header.tableName
Entry.Header.eventType [insert/update/delete类型]
Entry.entryType [事务头BEGIN/事务尾END/数据ROWDATA]
Entry.storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange.isDdl [是否是ddl变更操作,比如create table/drop table]
RowChange.sql [具体的ddl sql]
RowChange.rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
RowChange.rowDatas.beforeColumns [Column类型的数组]
RowChange.rowDatasafterColumns [Column类型的数组]
Column.index
Column.sqlType [jdbc type]
Column.name [column name]
Column.isKey [是否为主键]
Column.updated [是否发生过变更]
Column.isNull [值是否为null]
Column.value [具体的内容,注意为文本]

注意事项

1.canal的配置文件连接数据库时要连接主库地址。
2.查询show VARIABLES like “log_bin” Mysql的binlog日志是否开启,设置为开启状态。
3.在canal服务端启动后会生成meta.dat文件,在conf/example的目录下,将meta.dat删除,重新启动可重置canal服务的偏移量。
4.canal在配置时将auto.scan配置为false,在conf的canal.properties中修改配置,canal/conf/example目录下存在实例配置instance.properties,可根据实际情况进行配置。

参考资料

https://github.com/alibaba/canal

简单总结

由于也是第一次使用canal,踩过的坑也较多,所幸能够一一解决。再此发现最好的解决方式是在github的canal评论中,所遇到的问题大多数人也遇到过。