文章转载自腾讯云数据库 作者史鹏宙,CSIG云与智慧产业事业群研发工程师 ClickHouse作为OLAP分析引擎已经被广泛使用,数据的导入导出是用户面临的第一个问题。由于ClickHouse本身无法很好地支持单条大批量的写入,因此在实时同步数据方面需要借助其他服务协助。本文给出一种结合CanalKafka的方案,并且给出在多个MySQL实例分库分表的场景下,如何将多张MySQL数据表写入同一张ClickHouse表的方法,欢迎大家批评指正。 首先来看看我们的需求背景: 1。实时同步多个MySQL实例数据到ClickHouse,每天规模500G,记录数目亿级别,可以接受分钟级别的同步延迟; 2。某些数据库表存在分库分表的操作,用户需要跨MySQL实例跨数据库的表同步到ClickHouse的一张表中; 3。现有的MySQLbinlog开源组件(Canal),无法做到多张源数据表到一张目的表的映射关系。 基本原理 一、使用JDBC方式同步 1。使用Canal组件完成binlog的解析和数据同步; 2。CanalServer进程会伪装成MySQL的slave,使用MySQL的binlog同步协议完成数据同步; 3。CanalAdapter进程负责从canalserver获取解析后的binlog,并且通过jdbc接口写入到ClickH 优点: 1。Canal组件原生支持; 缺点: 1。CanalAdpater写入时源表和目的表一一对应,灵活性不足; 2。需要维护两个Canal组件进程; 二、KafkaClickHouse物化视图方式同步 1。CanalServer完成binlog的解析,并且将解析后的json写入K 2。CanalServer可以根据正则表达式过滤数据库和表名,并且根据规则写入Kafka的 3。ClickHouse使用KafkaEngine和MaterializedView完成消息消费,并写入本地表; 优点: 1。Kafka支持水平扩展,可以根据数据规模调整partition数目; 2。Kafka引入后将写入请求合并,防止ClickHouse生成大量的小文件,从而影响查询性能; 3。CanalServer支持规则过滤,可以灵活配置上游的MySQL实例的数据库名和表名,并且指明写入的Kafkatopic名称; 缺点: 1。需要维护Kafka和配置规则; 2。ClickHouse需要新建相关的视图、KafkaEngine的外表等; 具体步骤 一、准备工作 1。如果使用TencentDB,则在控制台确认binlogformat为ROW,无需多余操作。 如果是自建MySQL,则在客户端中查询变量: 2。创建账号canal,用于同步binlog 二、Canal组件部署 前置条件: Canal组件部署的机器需要跟ClickHouse服务和MySQL网络互通; 需要在机器上部署java8,配置JAVAHOME、PATH等环境变量; 基本概念: 1。CanalServer组件部署 CanalServer的主要作用是订阅binlog信息并解析和定义instance相关信息,建议每个CanalServer进程对应一个MySQL实例; 1)下载canal。deployer1。1。4。tar。gz,解压 2)修改配置文件confcanal。properties,需要关注的配置如下: 3)配置instance 可以参照example新增新的instance,主要修改配置文件conf{instancename}instance。properties文件。 订阅172。21。48。35的MySQL的testdb数据库中的以tb开头的表的数据变更(例如tb20200801、tb20200802等),主要的步骤如下: 步骤1:创建example2实例:cddeployerconfcprexampleexample2 步骤2:修改deployerconfexample2instance。properties文件 步骤3:在confcanal。properties中修改canal。destinations,新增example2 订阅172。21。48。35的MySQL的empdb0数据库的employees20200801表,empdb1数据库的employees20200802表,并且数据写入K 步骤1:创建example2实例:cddeployerconfcprexampleexample3 步骤2:修改deployerconfexample3instance。properties文件 步骤3:在Kafka中新建topicemployeestopic,指定分区数目为3 步骤4:在confcanal。properties中修改canal。destinations,新增example3;修改服务模式为kafka,配置kafka相关信息; 2。CanalAdapter组件部署(只针对方案一) CanalAdapter的主要作用是通过JDBC接口写入ClickHouse数据,可以配置多个表的写入; 1)下载canal。adapter1。1。4。tar。gz,解压; 2)在lib目录下新增clickhouse驱动jar包及httpclient的jar包httpcore4。4。13。jar、httpclient4。3。3。jar、clickhousejdbc0。2。4。 3)修改配置文件confapplication。yml文件,修改canalServerHost、srcDataSources、canalAdapters的配置; 4)修改配置文件confrdbmytestuser。yml文件 上述的配置文件中,由于开启了mirrorDb:true,目的端的ClickHouse必须有相同的数据库名和表名。 修改adapter的confrdbmytestuser。yml配置文件,指定源数据库和目标数据库 在confrdb目录配置多个yml文件,分别指明不同的table名称。 Kafka服务配置 一、调整合理的producer参数 确认CanalServer里的canal。properties文件,重要参数见下表; 二、新建相关的topic名称 根据CanalServer里instance里配置文件instance。properties,注意分区数目与canal。mq。partitionsNum保持一致; partition数目需要考虑以下因素: 1。上游的MySQL的数据量。原则上数据写入量越大,应该分配更多的partition数目; 2。考虑下游ClickHouse的实例数目。topic的partition分区总数最好不大于下游ClickHouse的总实例数目,保证每个ClickHouse实例都能至少分配到一个 ClickHouse服务配置 根据上游MySQL实例的表的schema新建数据表; 引入Kafka时需要额外新建EngineKafka的外表以及相关的物化视图表; 建议: 1。为每个外表新增不同的kafkagroupname,防止相互影响; 2。设置kafkaskipbrokenmessages参数为合理值,遇到无法解析数据会跳过; 3。设置合理的kafkanumconsumers值,最好保证所有ClickHouse实例该值的总和大于topic的partition数目; 新建相关的分布式查询表; 服务启动 启动相关的Canal组件进程; 1。canalserver:shbinstartup。sh 2。canaladapter:shbinstartup。sh 在MySQL中插入数据,观察日志是否可以正常运行; 如果使用Kafka,可以通过kafkaconsoleconsumer。sh脚本观察binlog数据解析; 观察ClickHouse数据表中是否正常写入数据; 实际案例 需求:实时同步MySQL实例的empdb0。employees20200801表和empdb1。employees20200802数据表 方案:使用方案二 环境及参数: 1。在MySQL新建相关表 2。CanalServer配置 步骤1。修改confcanal。properties文件 步骤2。新增employees实例,修改employeesinstances。properties配置 3。Kafka配置 4。新增topicemployeestopic,分区数为3 5。ClickHouse建表 6。启动CanalServer服务 MySQL实例上游插入数据,观察数据是否在CanalServer解析正常,是否在ClickHouse中完成同步。