• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

canal-glue: 简化ETL工作,编写一个Canal胶水层

原作者: [db:作者] 来自: 网络 收藏 邀请

开源软件名称:

canal-glue

开源软件地址:

https://gitee.com/throwableDoge/canal-glue

开源软件介绍:

简化ETL工作,编写一个Canal胶水层

前提

这是一篇憋了很久的文章,一直想写,却又一直忘记了写。整篇文章可能会有点流水账,相对详细地介绍怎么写一个小型的"框架"。这个精悍的胶水层已经在生产环境服役超过半年,这里尝试把耦合业务的代码去掉,提炼出一个相对简洁的版本。

之前写的几篇文章里面其中一篇曾经提到过Canal解析MySQLbinlog事件后的对象如下(来源于Canal源码com.alibaba.otter.canal.protocol.FlatMessage):

如果直接对此原始对象进行解析,那么会出现很多解析模板代码,一旦有改动就会牵一发动全身,这是我们不希望发生的一件事。于是花了一点点时间写了一个Canal胶水层,让接收到的FlatMessage根据表名称直接转换为对应的DTO实例,这样能在一定程度上提升开发效率并且减少模板化代码,这个胶水层的数据流示意图如下:

要编写这样的胶水层主要用到:

  • 反射。
  • 注解。
  • 策略模式。
  • IOC容器(可选)。

项目的模块如下:

  • canal-glue-core:核心功能。
  • spring-boot-starter-canal-glue:适配SpringIOC容器,添加自动配置。
  • canal-glue-example:使用例子和基准测试。

下文会详细分析此胶水层如何实现。

引入依赖

为了不污染引用此模块的外部服务依赖,除了JSON转换的依赖之外,其他依赖的scope定义为provide或者test类型,依赖版本和BOM如下:

<properties>        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>        <maven.compiler.source>1.8</maven.compiler.source>        <maven.compiler.target>1.8</maven.compiler.target>        <spring.boot.version>2.3.0.RELEASE</spring.boot.version>        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>        <lombok.version>1.18.12</lombok.version>        <fastjson.version>1.2.73</fastjson.version></properties><dependencyManagement>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-dependencies</artifactId>            <version>${spring.boot.version}</version>            <scope>import</scope>            <type>pom</type>        </dependency>    </dependencies></dependencyManagement><dependencies>    <dependency>        <groupId>org.projectlombok</groupId>        <artifactId>lombok</artifactId>        <version>${lombok.version}</version>        <scope>provided</scope>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-test</artifactId>        <scope>test</scope>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter</artifactId>        <scope>provided</scope>    </dependency>    <dependency>        <groupId>com.alibaba</groupId>        <artifactId>fastjson</artifactId>        <version>${fastjson.version}</version>    </dependency></dependencies>

其中,canal-glue-core模块本质上只依赖于fastjson,可以完全脱离spring体系使用。

基本架构

这里提供一个"后知后觉"的架构图,因为之前为了快速怼到线上,初版没有考虑这么多,甚至还耦合了业务代码,组件是后来抽离出来的:

设计配置模块(已经移除)

设计配置模块在设计的时候考虑使用了外置配置文件和纯注解两种方式,前期使用了JSON外置配置文件的方式,纯注解是后来增加的,二选一。这一节简单介绍一下JSON外置配置文件的配置加载,纯注解留到后面处理器模块时候分析。

当初是想快速进行胶水层的开发,所以配置文件使用了可读性比较高的JSON格式:

{  "version": 1,  "module": "canal-glue",  "databases": [    {      "database": "db_payment_service",      "processors": [        {          "table": "payment_order",          "processor": "x.y.z.PaymentOrderProcessor",          "exceptionHandler": "x.y.z.PaymentOrderExceptionHandler"        }      ]    },    {      ......    }  ]}

JSON配置在设计的时候尽可能不要使用JSON Array作为顶层配置,因为这样做设计的对象会比较怪

因为使用该模块的应用有可能需要处理Canal解析多个上游数据库的binlog事件,所以配置模块设计的时候需要以databaseKEY,挂载多个table以及对应的表binlog事件处理器以及异常处理器。然后对着JSON文件的格式撸一遍对应的实体类出来:

@Datapublic class CanalGlueProcessorConf {    private String table;    private String processor;    private String exceptionHandler;}@Datapublic class CanalGlueDatabaseConf {    private String database;    private List<CanalGlueProcessorConf> processors;}@Datapublic class CanalGlueConf {    private Long version;    private String module;    private List<CanalGlueDatabaseConf> database;}

实体编写完,接着可以编写一个配置加载器,简单起见,配置文件直接放ClassPath之下,加载器如下:

public interface CanalGlueConfLoader {    CanalGlueConf load(String location);}// 实现public class ClassPathCanalGlueConfLoader implements CanalGlueConfLoader {    @Override    public CanalGlueConf load(String location) {        ClassPathResource resource = new ClassPathResource(location);        Assert.isTrue(resource.exists(), String.format("类路径下不存在文件%s", location));        try {            String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);            return JSON.parseObject(content, CanalGlueConf.class);        } catch (IOException e) {            // should not reach            throw new IllegalStateException(e);        }    }}

读取ClassPath下的某个location为绝对路径的文件内容字符串,然后使用Fasfjson转成CanalGlueConf对象。这个是默认的实现,使用canal-glue模块可以覆盖此实现,通过自定义的实现加载配置。

JSON配置模块在后来从业务系统抽离此胶水层的时候已经完全废弃,使用纯注解驱动和核心抽象组件继承的方式实现。

核心模块开发

主要包括几个模块:

  • 基本模型定义。
  • 适配器层开发。
  • 转换器和解析器层开发。
  • 处理器层开发。
  • 全局组件自动配置模块开发(仅限于Spring体系,已经抽取到spring-boot-starter-canal-glue模块)。
  • CanalGlue开发。

基本模型定义

定义顶层的KEY,也就是对于某个数据库的某一个确定的表,需要一个唯一标识:

// 模型表对象public interface ModelTable {    String database();    String table();    static ModelTable of(String database, String table) {        return DefaultModelTable.of(database, table);    }}@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")public class DefaultModelTable implements ModelTable {    private final String database;    private final String table;    @Override    public String database() {        return database;    }    @Override    public String table() {        return table;    }    @Override    public boolean equals(Object o) {        if (this == o) {            return true;        }        if (o == null || getClass() != o.getClass()) {            return false;        }        DefaultModelTable that = (DefaultModelTable) o;        return Objects.equals(database, that.database) &&                Objects.equals(table, that.table);    }    @Override    public int hashCode() {        return Objects.hash(database, table);    }}

这里实现类DefaultModelTable重写了equals()hashCode()方法便于把ModelTable实例应用为HashMap容器的KEY,这样后面就可以设计ModelTable -> Processor的缓存结构。

由于Canal投放到Kafka的事件内容是一个原始字符串,所以要定义一个和前文提到的FlatMessage基本一致的事件类CanalBinLogEvent

@Datapublic class CanalBinLogEvent {    /**     * 事件ID,没有实际意义     */    private Long id;    /**     * 当前更变后节点数据     */    private List<Map<String, String>> data;    /**     * 主键列名称列表     */    private List<String> pkNames;    /**     * 当前更变前节点数据     */    private List<Map<String, String>> old;    /**     * 类型 UPDATE\INSERT\DELETE\QUERY     */    private String type;    /**     * binlog execute time     */    private Long es;    /**     * dml build timestamp     */    private Long ts;    /**     * 执行的sql,不一定存在     */    private String sql;    /**     * 数据库名称     */    private String database;    /**     * 表名称     */    private String table;    /**     * SQL类型映射     */    private Map<String, Integer> sqlType;    /**     * MySQL字段类型映射     */    private Map<String, String> mysqlType;    /**     * 是否DDL     */    private Boolean isDdl;}

根据此事件对象,再定义解析完毕后的结果对象CanalBinLogResult

// 常量@RequiredArgsConstructor@Getterpublic enum BinLogEventType {        QUERY("QUERY", "查询"),    INSERT("INSERT", "新增"),    UPDATE("UPDATE", "更新"),    DELETE("DELETE", "删除"),    ALTER("ALTER", "列修改操作"),    UNKNOWN("UNKNOWN", "未知"),    ;    private final String type;    private final String description;    public static BinLogEventType fromType(String type) {        for (BinLogEventType binLogType : BinLogEventType.values()) {            if (binLogType.getType().equals(type)) {                return binLogType;            }        }        return BinLogEventType.UNKNOWN;    }}// 常量@RequiredArgsConstructor@Getterpublic enum OperationType {    /**     * DML     */    DML("dml", "DML语句"),    /**     * DDL     */    DDL("ddl", "DDL语句"),    ;    private final String type;    private final String description;}@Datapublic class CanalBinLogResult<T> {    /**     * 提取的长整型主键     */    private Long primaryKey;    /**     * binlog事件类型     */    private BinLogEventType binLogEventType;    /**     * 更变前的数据     */    private T beforeData;    /**     * 更变后的数据     */    private T afterData;    /**     * 数据库名称     */    private String databaseName;    /**     * 表名称     */    private String tableName;    /**     * sql语句 - 一般是DDL的时候有用     */    private String sql;    /**     * MySQL操作类型     */    private OperationType operationType;}

开发适配器层

定义顶层的适配器SPI接口:

public interface SourceAdapter<SOURCE, SINK> {    SINK adapt(SOURCE source);}

接着开发适配器实现类:

// 原始字符串直接返回@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")class RawStringSourceAdapter implements SourceAdapter<String, String> {    @Override    public String adapt(String source) {        return source;    }}// Fastjson转换@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")class FastJsonSourceAdapter<T> implements SourceAdapter<String, T> {    private final Class<T> klass;    @Override    public T adapt(String source) {        if (StringUtils.isEmpty(source)) {            return null;        }        return JSON.parseObject(source, klass);    }}// Facadepublic enum SourceAdapterFacade {    /**     * 单例     */    X;    private static final SourceAdapter<String, String> I_S_A = RawStringSourceAdapter.of();    @SuppressWarnings("unchecked")    public <T> T adapt(Class<T> klass, String source) {        if (klass.isAssignableFrom(String.class)) {            return (T) I_S_A.adapt(source);        }        return FastJsonSourceAdapter.of(klass).adapt(source);    }}

最终直接使用SourceAdapterFacade#adapt()方法即可,因为实际上绝大多数情况下只会使用原始字符串和String -> Class实例,适配器层设计可以简单点。

开发转换器和解析器层

对于Canal解析完成的binlog事件,dataold属性是K-V结构,并且KEY都是String类型,需要遍历解析从能推导出完整的目标实例。

转换后的实例的属性类型目前只支持包装类,int等原始类型不支持

为了更好地通过目标实体和实际的数据库、表和列名称、列类型进行映射,引入了两个自定义注解CanalModel@CanalField,它们的定义如下:

// @CanalModel@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.TYPE)public @interface CanalModel {    /**     * 目标数据库     */    String database();    /**     * 目标表     */    String table();    /**     * 属性名 -> 列名命名转换策略,可选值有:DEFAULT(原始)、UPPER_UNDERSCORE(驼峰转下划线大写)和LOWER_UNDERSCORE(驼峰转下划线小写)     */    FieldNamingPolicy fieldNamingPolicy() default FieldNamingPolicy.DEFAULT;}// @CanalField@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.FIELD)public @interface CanalField {    /**     * 行名称     *     * @return columnName     */    String columnName() default "";    /**     * sql字段类型     *     * @return JDBCType     */    JDBCType sqlType() default JDBCType.NULL;    /**     * 转换器类型     *     * @return klass     */    Class<? extends BaseCanalFieldConverter<?>> converterKlass() default NullCanalFieldConverter.class;}

定义顶层转换器接口BinLogFieldConverter

public interface BinLogFieldConverter<SOURCE, TARGET> {    TARGET convert(SOURCE source);}

目前暂定可以通过目标属性的Class和通过注解指定的SQLType类型进行匹配,所以再定义一个抽象转换器BaseCanalFieldConverter

public abstract class BaseCanalFieldConverter<T> implements BinLogFieldConverter<String, T> {    private final SQLType sqlType;    private final Class<?> klass;    protected BaseCanalFieldConverter(SQLType sqlType, Class<?> klass) {        this.sqlType = sqlType;        this.klass = klass;    }    @Override    public T convert(String source) {        if (StringUtils.isEmpty(source)) {            return null;        }        return convertInternal(source);    }    /**     * 内部转换方法     *     * @param source 源字符串     * @return T     */    protected abstract T convertInternal(String source);    /**     * 返回SQL类型     *     * @return SQLType     */    public SQLType sqlType() {        return sqlType;    }    /**     * 返回类型     *     * @return Class<?>     */    public Class<?> typeKlass() {        return klass;    }}

BaseCanalFieldConverter是面向目标实例中的单个属性的,例如对于实例中的Long类型的属性,可以实现一个BigIntCanalFieldConverter

public class BigIntCanalFieldConverter extends BaseCanalFieldConverter<Long> {    /**     * 单例     */    
                      

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap