博客
关于我
Flink CDC的使用
阅读量:479 次
发布时间:2019-03-06

本文共 4602 字,大约阅读时间需要 15 分钟。

MySQL数据准备与Flink CDC实时数据捕获

MySQL数据准备

在开始使用Flink CDC捕获MySQL变更数据之前,需要先准备好MySQL数据库。以下是具体的操作步骤:

  • 创建并使用数据库
  • create database if not exists test;
    use test;
    1. 创建学生表
    2. drop table if exists stu;
      create table stu (
      id int primary key auto_increment,
      name varchar(100),
      age int
      );
      1. 插入初始数据
      2. insert into stu(name, age) values("张三", 18);
        insert into stu(name, age) values("李四", 20);
        insert into stu(name, age) values("王五", 21);

        注意事项:确保表中有主键,否则Flink CDC可能无法正常工作。

        开启MySQL binlog

        为了实现Flink CDC对MySQL数据库的变更数据实时捕获,需要先开启MySQL的二进制日志。

      3. 修改MySQL配置文件
      4. sudo vim /etc/my.cnf
        1. 在配置文件中添加以下内容:
        2. server-id = 1
          log-bin=mysql-bin
          binlog_format=row
          binlog-do-db=test

          注意事项:启用binlog的数据库需要根据实际情况调整设置,确保二进制日志文件路径和权限正确。

          1. 重启MySQL服务
          2. sudo systemctl restart mysqld

            Flink代码开发

            本节将介绍如何使用Flink CDC从MySQL数据库实时捕获增删改数据。

            依赖管理

          3. 添加Flink CDC依赖
          4. com.ververica
            flink-connector-mysql-cdc
            2.4.0
            1. 其他Flink依赖(如 flink-table-api-java-bridge 等)
            2. org.apache.flink
              flink-table-api-java-bridge
              ${flink.version}
              import com.ververica.cdc.connectors.mysql.source.MySqlSource;
              import com.ververica.cdc.connectors.mysql.table.StartupOptions;
              import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
              import org.apache.flink.api.common.eventtime.WatermarkStrategy;
              import org.apache.flink.streaming.api.datastream.DataStreamSource;
              import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
              public class FlinkCDCDemo {
              public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(4);
              MySqlSource
              mySqlSource = MySqlSource.builder()
              .hostname("node4")
              .port(3306)
              .username("root")
              .password("000000")
              .databaseList("test")
              .tableList("test.stu")
              .deserializer(new JsonDebeziumDeserializationSchema())
              .startupOptions(StartupOptions.initial())
              .build();
              DataStreamSource
              dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source")
              .setParallelism(1);
              dataStreamSource.print();
              env.execute();
              }
              }

              注意事项:确保MySQL的binlog已经启用,并且Flink运行环境的版本与依赖版本匹配。

              测试与验证

              添加新数据

              执行以下SQL语句:

              mysql> insert into stu(name, age) values("赵六", 23);

              输出示例

              {
              "before": null,
              "after": {
              "id": 4,
              "name": "赵六",
              "age": 23
              },
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719831654000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2300,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "c",
              "ts_ms": 1719831654692,
              "transaction": null
              }

              修改数据

              执行以下SQL语句:

              mysql> update stu set name="zl", age=19 where name="赵六";

              输出示例

              {
              "before": {
              "id": 4,
              "name": "赵六",
              "age": 23
              },
              "after": {
              "id": 4,
              "name": "zl",
              "age": 19
              },
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719831987000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2604,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "u",
              "ts_ms": 1719831987238,
              "transaction": null
              }

              删除数据

              执行以下SQL语句:

              mysql> delete from stu where id=4;

              输出示例

              {
              "before": {
              "id": 4,
              "name": "zl",
              "age": 19
              },
              "after": null,
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719832151000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2913,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "d",
              "ts_ms": 1719832151198,
              "transaction": null
              }

              注意事项:通过IDEA控制台可以实时查看Flink程序的输出日志,确保数据捕获和处理过程中没有错误发生。

    转载地址:http://iaqbz.baihongyu.com/

    你可能感兴趣的文章
    mysql大批量删除(修改)The total number of locks exceeds the lock table size 错误的解决办法
    查看>>
    mysql如何做到存在就更新不存就插入_MySQL 索引及优化实战(二)
    查看>>
    MySQL如何实现ACID ?
    查看>>
    mysql如何记录数据库响应时间
    查看>>
    Mysql字段、索引操作
    查看>>
    MySQL字符集与排序规则
    查看>>
    mysql存储中文 但是读取乱码_mysql存储中文乱码
    查看>>
    mysql存储登录_php调用mysql存储过程会员登录验证实例分析
    查看>>
    MySql存储过程中limit传参
    查看>>
    MySQL存储过程入门
    查看>>
    mysql存储过程批量建表
    查看>>
    mysql存储过程详解
    查看>>
    MySQL学习-group by和having
    查看>>
    MySQL学习-MySQL条件查询
    查看>>
    MySQL学习-SQL语句的分类与MySQL简单查询
    查看>>
    MySQL学习-子查询及limit分页
    查看>>
    MySQL学习-排序与分组函数
    查看>>
    MySQL学习-连接查询
    查看>>
    Mysql学习总结(15)——Mysql错误码大全
    查看>>
    Mysql学习总结(19)——Mysql无法创建外键的原因
    查看>>