admin 管理员组文章数量: 887021
【实战
什么是flink cdc?
cdc github源码地址
cdc官方文档
对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。
数据的同步目前对mysql来说比较常见是方式是使用:datax 和 canal配合, 为什么需要这两个框架配合呢?
因为datax不支持实时的同步, datax只能定义一个范围去同步,而且同步结束后程序就结束了。但是我想要的是数据仓库中的数据近乎实时的和mysql中的数据保持一致又该怎么办? 答案是再加上canal, canal和datax相反,它只支持指定一个binlog同步,然后会一直同步到现在,并且程序不会结束,会一直同步。 这样datax+canal就可以达到实时同步的功能。
这是业界比较常用的同步方式,datax同步历史数据,canal+kafka同步最新的数据,而且还要有一个程序去读取kafka中的binlog json数据(可以用flink或者spark又或者是flume)。可以看到这个链路比较长,不是很好。
下面是目前常见的cdc同步方案以及对比:
- DataX 不支持增量同步,Canal 不支持全量同步。虽然两者都是非常流行的数据同步工具,但
在场景支持上仍不完善。 - 在全量+增量一体化同步方面,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。
- 在架构方面,Apache Flink 是一个非常优秀的分布式流处理框架,因此 Flink CDC 作为
Apache Flink 的一个组件具有非常灵活的水平扩展能力。而 DataX 和 Canal 是个单机架构,
在大数据场景下容易面临性能瓶颈的问题。 - 在数据加工的能力上,CDC 工具是否能够方便地对数据做一些清洗、过滤、聚合,甚至关联打
宽? Flink CDC 依托强大的 Flink SQL 流式计算能力,可以非常方便地对数据进行加工。而
Debezium 等则需要通过复杂的 Java 代码才能完成,使用门槛比较高。 - 另外,在生态方面,这里指的是上下游存储的支持。Flink CDC 上下游非常丰富,支持对接
MySQL、PostgreSQL 等数据源,还支持写入到 TiDB、HBase、Kafka、Hudi 等各种存储系统
中,也支持灵活的自定义 connector。 - 我们看到flink cdc 是比较友好的方案, 其内部实现上用的是Debezium去采集binlong, 而且可通过参数scan.startup.mode 来控制同步行为:
- initial (默认):在第一次启动时对受监视的数据库表执行全量同步,并继续读取最新的 binlog。
- earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
- latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
- specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
- timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
一个demo
对flink_01 和flink_02 进行两个分表进行同步合并到:flink_merge
CREATE TABLE `flink_01` (`indicator_name` varchar(255) DEFAULT NULL COMMENT '指标名称',`indicator_value` varchar(255) DEFAULT NULL COMMENT '指标值',`indicator_code` int NOT NULL COMMENT '指标编码',`table_name` varchar(255) NOT NULL COMMENT '指标计算上游表名',`window_start` datetime NOT NULL COMMENT '窗口开始时间',`window_end` datetime DEFAULT NULL COMMENT '窗口截止时间',`create_time` datetime DEFAULT NULL COMMENT '创建更新时间',`indicator_description` varchar(255) DEFAULT NULL COMMENT '指标描述',PRIMARY KEY (`indicator_code`,`table_name`,`window_start`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '52', 0, 'app_login_log', '2022-12-14 00:00:00', '2022-12-15 00:00:00', '2022-12-19 18:09:24', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '49', 0, 'app_login_log', '2022-12-15 00:00:00', '2022-12-16 00:00:00', '2022-12-19 18:09:24', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '62', 0, 'app_login_log', '2022-12-16 00:00:00', '2022-12-17 00:00:00', '2022-12-19 18:09:25', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '3', 0, 'app_login_log', '2022-12-17 00:00:00', '2022-12-18 00:00:00', '2022-12-19 18:09:25', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '37', 0, 'app_login_log', '2022-12-19 00:00:00', '2022-12-20 00:00:00', '2022-12-20 11:22:02', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '20', 0, 'app_login_log', '2022-12-20 00:00:00', '2022-12-21 00:00:00', '2022-12-21 10:41:24', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '19', 0, 'app_login_log', '2022-12-21 00:00:00', '2022-12-21 15:19:00', '2022-12-21 15:46:27', '登录用户数');CREATE TABLE `flink_02` (`indicator_name` varchar(255) DEFAULT NULL COMMENT '指标名称',`indicator_value` varchar(255) DEFAULT NULL COMMENT '指标值',`indicator_code` int NOT NULL COMMENT '指标编码',`table_name` varchar(255) NOT NULL COMMENT '指标计算上游表名',`window_start` datetime NOT NULL COMMENT '窗口开始时间',`window_end` datetime DEFAULT NULL COMMENT '窗口截止时间',`create_time` datetime DEFAULT NULL COMMENT '创建更新时间',`indicator_description` varchar(255) DEFAULT NULL COMMENT '指标描述',PRIMARY KEY (`indicator_code`,`table_name`,`window_start`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '52', 0, 'app_login_log', '2022-12-14 00:00:00', '2022-12-15 00:00:00', '2022-12-19 18:09:24', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '49', 0, 'app_login_log', '2022-12-15 00:00:00', '2022-12-16 00:00:00', '2022-12-19 18:09:24', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '62', 0, 'app_login_log', '2022-12-16 00:00:00', '2022-12-17 00:00:00', '2022-12-19 18:09:25', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '3', 0, 'app_login_log', '2022-12-17 00:00:00', '2022-12-18 00:00:00', '2022-12-19 18:09:25', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '37', 0, 'app_login_log', '2022-12-19 00:00:00', '2022-12-20 00:00:00', '2022-12-20 11:22:02', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '20', 0, 'app_login_log', '2022-12-20 00:00:00', '2022-12-21 00:00:00', '2022-12-21 10:41:24', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '19', 1, 'app_login_log', '2022-12-21 00:00:00', '2022-12-21 15:19:00', '2022-12-21 15:46:27', '登录用户数');CREATE TABLE `flink_merge` (`indicator_name` varchar(255) DEFAULT NULL COMMENT '指标名称',`indicator_value` varchar(255) DEFAULT NULL COMMENT '指标值',`indicator_code` int NOT NULL COMMENT '指标编码',`table_name` varchar(255) NOT NULL COMMENT '指标计算上游表名',`window_start` datetime NOT NULL COMMENT '窗口开始时间',`window_end` datetime DEFAULT NULL COMMENT '窗口截止时间',`create_time` datetime DEFAULT NULL COMMENT '创建更新时间',`indicator_description` varchar(255) DEFAULT NULL COMMENT '指标描述',PRIMARY KEY (`indicator_code`,`table_name`,`window_start`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
所需要的依赖jar:
- mysql 的驱动请自行下载
- flink-sql 的连接器
.13/docs/connectors/table/jdbc/ 在这里下载flinksql 连接器
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.13.6</version>
</dependency>
- flink-cdc 依赖
.3/content/connectors/mysql-cdc%28ZH%29.html#a-name-id-002-a 在这里下载
下载后的jar统一放在flink安装目录下的lib目录下即可。
运行程序
package com.test.demo.table.sql;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class flinkcdc {public static void main(String[] args) {EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode()
// .inBatchMode().build();TableEnvironment tableEnv = TableEnvironment.create(settings);// 'table-name' = 'flink.*' 意思是读取tablename以flink开头的所有的表tableEnv.executeSql("CREATE TABLE `source_table`\n" +"(\n" +" `indicator_name` STRING,\n" +" `indicator_value` STRING,\n" +" `indicator_code` INT,\n" +" `table_name` STRING,\n" +" `window_start` TIMESTAMP(0),\n" +" `window_end` TIMESTAMP(0),\n" +" `create_time` TIMESTAMP,\n" +" `indicator_description` STRING,\n" +" PRIMARY KEY (`indicator_code`, `table_name`, `window_start`) NOT ENFORCED\n" +") WITH (\n" +" 'connector' = 'mysql-cdc',\n" +" 'hostname' = 'ip',\n" +" 'scan.startup.mode' = 'initial',\n" +" 'port' = '3306',\n" +" 'username' = 'root',\n" +" 'password' = '123456',\n" +" 'database-name' = 'test',\n" +" 'table-name' = 'flink.*'\n" +")");// tableEnv.sqlQuery("select * from MyTable").execute().print();
//查询的时候定义event_time窗口tableEnv.executeSql("CREATE TABLE `flink_merge`\n" +"(\n" +" `indicator_name` STRING,\n" +" `indicator_value` STRING,\n" +" `indicator_code` INT,\n" +" `table_name` STRING,\n" +" `window_start` TIMESTAMP(0),\n" +" `window_end` TIMESTAMP(0),\n" +" `create_time` TIMESTAMP,\n" +" `indicator_description` STRING,\n" +" PRIMARY KEY (`indicator_code`, `table_name`, `window_start`) NOT ENFORCED\n" +") WITH (\n" +" 'connector' = 'jdbc',\n" +" 'url' = 'jdbc:mysql://ip:3306/test',\n" +" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +" 'username' = 'root',\n" +" 'password' = '123456',\n" +" 'table-name' = 'flink_merge'\n" +")");//直接sql查询tableEnv.executeSql("insert into flink_merge select * from source_table");}
}
总结
按照上面的步骤就可以进行实时同步了, 如果你要在生产环境用建议配置上savepoint 和checkpoint, 这样可以达到断点续传的功能。 文件比较简短适合有一定flink基础的人快速开发,如果你对flink还不是很了解建议先去学下flink相关的知识,再来进行cdc的实验。 flink cdc可以说是以后数据同步的主流,和其他方式相比架构比较简单,而且通过参数控制是否是全量同步,十分友好。
多说一句,目前对flinksql我们公司已不用写代码进行开发了,而是用的streamx框架,streamx框架可以很方便配置savepoint/chekpoints, 以及启动参数,而且可以在web页面启动flinksql 不需要在控制台写一堆参数提交到yarn上,很方便。
本文标签: 实战
版权声明:本文标题:【实战 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/free/1699609675h370443.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论