Canal数据采集框架的使用与错误信息总结
文章首发于:clawhub.club
背景
存在跨数据库同步数据的需求,对数据有实时同步的要求,采用阿里的开源框架Canal实时采集Mysql的binlog日志,将过滤后的数据统一标准格式发送到kafka中,在服务的消费端进行数据消费入库。
版本信息
Canal 1.1.3
Mysql支持版本 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
工作原理
- Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- Canal 解析 binary log 对象(原始为 byte 流)
实现代码
1 | /** |
配置文件内配置Canal Server的连接地址,端口号,用户名和密码,将连接注册为Bean,加入Springboot管理。
1 | /** |
主线程持续采集binlog日志。
数据格式
数据传输格式:protobuff
Entry.Header.logfileName [binlog文件名]
Entry.Header.logfileOffset [binlog position]
Entry.Header.executeTime [发生的变更]
Entry.Header.schemaName
Entry.Header.tableName
Entry.Header.eventType [insert/update/delete类型]
Entry.entryType [事务头BEGIN/事务尾END/数据ROWDATA]
Entry.storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange.isDdl [是否是ddl变更操作,比如create table/drop table]
RowChange.sql [具体的ddl sql]
RowChange.rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
RowChange.rowDatas.beforeColumns [Column类型的数组]
RowChange.rowDatasafterColumns [Column类型的数组]
Column.index
Column.sqlType [jdbc type]
Column.name [column name]
Column.isKey [是否为主键]
Column.updated [是否发生过变更]
Column.isNull [值是否为null]
Column.value [具体的内容,注意为文本]
注意事项
1.canal的配置文件连接数据库时要连接主库地址。
2.查询show VARIABLES like “log_bin” Mysql的binlog日志是否开启,设置为开启状态。
3.在canal服务端启动后会生成meta.dat文件,在conf/example的目录下,将meta.dat删除,重新启动可重置canal服务的偏移量。
4.canal在配置时将auto.scan配置为false,在conf的canal.properties中修改配置,canal/conf/example目录下存在实例配置instance.properties,可根据实际情况进行配置。
参考资料
https://github.com/alibaba/canal
简单总结
由于也是第一次使用canal,踩过的坑也较多,所幸能够一一解决。再此发现最好的解决方式是在github的canal评论中,所遇到的问题大多数人也遇到过。