开源软件名称:canal-glue
开源软件地址:https://gitee.com/throwableDoge/canal-glue
开源软件介绍:
简化ETL工作,编写一个Canal胶水层前提这是一篇憋了很久的文章,一直想写,却又一直忘记了写。整篇文章可能会有点流水账,相对详细地介绍怎么写一个小型的"框架"。这个精悍的胶水层已经在生产环境服役超过半年,这里尝试把耦合业务的代码去掉,提炼出一个相对简洁的版本。
之前写的几篇文章里面其中一篇曾经提到过Canal 解析MySQL 的binlog 事件后的对象如下(来源于Canal 源码com.alibaba.otter.canal.protocol.FlatMessage ): 如果直接对此原始对象进行解析,那么会出现很多解析模板代码,一旦有改动就会牵一发动全身,这是我们不希望发生的一件事。于是花了一点点时间写了一个Canal 胶水层,让接收到的FlatMessage 根据表名称直接转换为对应的DTO 实例,这样能在一定程度上提升开发效率并且减少模板化代码,这个胶水层的数据流示意图如下: 要编写这样的胶水层主要用到: 项目的模块如下: canal-glue-core :核心功能。spring-boot-starter-canal-glue :适配Spring 的IOC 容器,添加自动配置。canal-glue-example :使用例子和基准测试。
下文会详细分析此胶水层如何实现。 引入依赖为了不污染引用此模块的外部服务依赖,除了JSON 转换的依赖之外,其他依赖的scope 定义为provide 或者test 类型,依赖版本和BOM 如下: <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <spring.boot.version>2.3.0.RELEASE</spring.boot.version> <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version> <lombok.version>1.18.12</lombok.version> <fastjson.version>1.2.73</fastjson.version></properties><dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring.boot.version}</version> <scope>import</scope> <type>pom</type> </dependency> </dependencies></dependencyManagement><dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency></dependencies> 其中,canal-glue-core 模块本质上只依赖于fastjson ,可以完全脱离spring 体系使用。 基本架构这里提供一个"后知后觉"的架构图,因为之前为了快速怼到线上,初版没有考虑这么多,甚至还耦合了业务代码,组件是后来抽离出来的: 设计配置模块(已经移除)设计配置模块在设计的时候考虑使用了外置配置文件和纯注解两种方式,前期使用了JSON外置配置文件的方式,纯注解是后来增加的,二选一。这一节简单介绍一下JSON外置配置文件的配置加载,纯注解留到后面处理器模块时候分析。
当初是想快速进行胶水层的开发,所以配置文件使用了可读性比较高的JSON 格式: { "version": 1, "module": "canal-glue", "databases": [ { "database": "db_payment_service", "processors": [ { "table": "payment_order", "processor": "x.y.z.PaymentOrderProcessor", "exceptionHandler": "x.y.z.PaymentOrderExceptionHandler" } ] }, { ...... } ]} JSON配置在设计的时候尽可能不要使用JSON Array作为顶层配置,因为这样做设计的对象会比较怪
因为使用该模块的应用有可能需要处理Canal 解析多个上游数据库的binlog 事件,所以配置模块设计的时候需要以database 为KEY ,挂载多个table 以及对应的表binlog 事件处理器以及异常处理器。然后对着JSON 文件的格式撸一遍对应的实体类出来: @Datapublic class CanalGlueProcessorConf { private String table; private String processor; private String exceptionHandler;}@Datapublic class CanalGlueDatabaseConf { private String database; private List<CanalGlueProcessorConf> processors;}@Datapublic class CanalGlueConf { private Long version; private String module; private List<CanalGlueDatabaseConf> database;} 实体编写完,接着可以编写一个配置加载器,简单起见,配置文件直接放ClassPath 之下,加载器如下: public interface CanalGlueConfLoader { CanalGlueConf load(String location);}// 实现public class ClassPathCanalGlueConfLoader implements CanalGlueConfLoader { @Override public CanalGlueConf load(String location) { ClassPathResource resource = new ClassPathResource(location); Assert.isTrue(resource.exists(), String.format("类路径下不存在文件%s", location)); try { String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8); return JSON.parseObject(content, CanalGlueConf.class); } catch (IOException e) { // should not reach throw new IllegalStateException(e); } }} 读取ClassPath 下的某个location 为绝对路径的文件内容字符串,然后使用Fasfjson 转成CanalGlueConf 对象。这个是默认的实现,使用canal-glue 模块可以覆盖此实现,通过自定义的实现加载配置。 JSON配置模块在后来从业务系统抽离此胶水层的时候已经完全废弃,使用纯注解驱动和核心抽象组件继承的方式实现。
核心模块开发主要包括几个模块: - 基本模型定义。
- 适配器层开发。
- 转换器和解析器层开发。
- 处理器层开发。
- 全局组件自动配置模块开发(仅限于
Spring 体系,已经抽取到spring-boot-starter-canal-glue 模块)。 CanalGlue 开发。
基本模型定义定义顶层的KEY ,也就是对于某个数据库的某一个确定的表,需要一个唯一标识: // 模型表对象public interface ModelTable { String database(); String table(); static ModelTable of(String database, String table) { return DefaultModelTable.of(database, table); }}@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")public class DefaultModelTable implements ModelTable { private final String database; private final String table; @Override public String database() { return database; } @Override public String table() { return table; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } DefaultModelTable that = (DefaultModelTable) o; return Objects.equals(database, that.database) && Objects.equals(table, that.table); } @Override public int hashCode() { return Objects.hash(database, table); }} 这里实现类DefaultModelTable 重写了equals() 和hashCode() 方法便于把ModelTable 实例应用为HashMap 容器的KEY ,这样后面就可以设计ModelTable -> Processor 的缓存结构。 由于Canal 投放到Kafka 的事件内容是一个原始字符串,所以要定义一个和前文提到的FlatMessage 基本一致的事件类CanalBinLogEvent : @Datapublic class CanalBinLogEvent { /** * 事件ID,没有实际意义 */ private Long id; /** * 当前更变后节点数据 */ private List<Map<String, String>> data; /** * 主键列名称列表 */ private List<String> pkNames; /** * 当前更变前节点数据 */ private List<Map<String, String>> old; /** * 类型 UPDATE\INSERT\DELETE\QUERY */ private String type; /** * binlog execute time */ private Long es; /** * dml build timestamp */ private Long ts; /** * 执行的sql,不一定存在 */ private String sql; /** * 数据库名称 */ private String database; /** * 表名称 */ private String table; /** * SQL类型映射 */ private Map<String, Integer> sqlType; /** * MySQL字段类型映射 */ private Map<String, String> mysqlType; /** * 是否DDL */ private Boolean isDdl;} 根据此事件对象,再定义解析完毕后的结果对象CanalBinLogResult : // 常量@RequiredArgsConstructor@Getterpublic enum BinLogEventType { QUERY("QUERY", "查询"), INSERT("INSERT", "新增"), UPDATE("UPDATE", "更新"), DELETE("DELETE", "删除"), ALTER("ALTER", "列修改操作"), UNKNOWN("UNKNOWN", "未知"), ; private final String type; private final String description; public static BinLogEventType fromType(String type) { for (BinLogEventType binLogType : BinLogEventType.values()) { if (binLogType.getType().equals(type)) { return binLogType; } } return BinLogEventType.UNKNOWN; }}// 常量@RequiredArgsConstructor@Getterpublic enum OperationType { /** * DML */ DML("dml", "DML语句"), /** * DDL */ DDL("ddl", "DDL语句"), ; private final String type; private final String description;}@Datapublic class CanalBinLogResult<T> { /** * 提取的长整型主键 */ private Long primaryKey; /** * binlog事件类型 */ private BinLogEventType binLogEventType; /** * 更变前的数据 */ private T beforeData; /** * 更变后的数据 */ private T afterData; /** * 数据库名称 */ private String databaseName; /** * 表名称 */ private String tableName; /** * sql语句 - 一般是DDL的时候有用 */ private String sql; /** * MySQL操作类型 */ private OperationType operationType;} 开发适配器层定义顶层的适配器SPI 接口: public interface SourceAdapter<SOURCE, SINK> { SINK adapt(SOURCE source);} 接着开发适配器实现类: // 原始字符串直接返回@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")class RawStringSourceAdapter implements SourceAdapter<String, String> { @Override public String adapt(String source) { return source; }}// Fastjson转换@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")class FastJsonSourceAdapter<T> implements SourceAdapter<String, T> { private final Class<T> klass; @Override public T adapt(String source) { if (StringUtils.isEmpty(source)) { return null; } return JSON.parseObject(source, klass); }}// Facadepublic enum SourceAdapterFacade { /** * 单例 */ X; private static final SourceAdapter<String, String> I_S_A = RawStringSourceAdapter.of(); @SuppressWarnings("unchecked") public <T> T adapt(Class<T> klass, String source) { if (klass.isAssignableFrom(String.class)) { return (T) I_S_A.adapt(source); } return FastJsonSourceAdapter.of(klass).adapt(source); }} 最终直接使用SourceAdapterFacade#adapt() 方法即可,因为实际上绝大多数情况下只会使用原始字符串和String -> Class实例 ,适配器层设计可以简单点。 开发转换器和解析器层对于Canal 解析完成的binlog 事件,data 和old 属性是K-V 结构,并且KEY 都是String 类型,需要遍历解析从能推导出完整的目标实例。 转换后的实例的属性类型目前只支持包装类,int等原始类型不支持
为了更好地通过目标实体和实际的数据库、表和列名称、列类型进行映射,引入了两个自定义注解CanalModel 和@CanalField ,它们的定义如下: // @CanalModel@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.TYPE)public @interface CanalModel { /** * 目标数据库 */ String database(); /** * 目标表 */ String table(); /** * 属性名 -> 列名命名转换策略,可选值有:DEFAULT(原始)、UPPER_UNDERSCORE(驼峰转下划线大写)和LOWER_UNDERSCORE(驼峰转下划线小写) */ FieldNamingPolicy fieldNamingPolicy() default FieldNamingPolicy.DEFAULT;}// @CanalField@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.FIELD)public @interface CanalField { /** * 行名称 * * @return columnName */ String columnName() default ""; /** * sql字段类型 * * @return JDBCType */ JDBCType sqlType() default JDBCType.NULL; /** * 转换器类型 * * @return klass */ Class<? extends BaseCanalFieldConverter<?>> converterKlass() default NullCanalFieldConverter.class;} 定义顶层转换器接口BinLogFieldConverter : public interface BinLogFieldConverter<SOURCE, TARGET> { TARGET convert(SOURCE source);} 目前暂定可以通过目标属性的Class 和通过注解指定的SQLType 类型进行匹配,所以再定义一个抽象转换器BaseCanalFieldConverter : public abstract class BaseCanalFieldConverter<T> implements BinLogFieldConverter<String, T> { private final SQLType sqlType; private final Class<?> klass; protected BaseCanalFieldConverter(SQLType sqlType, Class<?> klass) { this.sqlType = sqlType; this.klass = klass; } @Override public T convert(String source) { if (StringUtils.isEmpty(source)) { return null; } return convertInternal(source); } /** * 内部转换方法 * * @param source 源字符串 * @return T */ protected abstract T convertInternal(String source); /** * 返回SQL类型 * * @return SQLType */ public SQLType sqlType() { return sqlType; } /** * 返回类型 * * @return Class<?> */ public Class<?> typeKlass() { return klass; }} BaseCanalFieldConverter 是面向目标实例中的单个属性的,例如对于实例中的Long 类型的属性,可以实现一个BigIntCanalFieldConverter :
public class BigIntCanalFieldConverter extends BaseCanalFieldConverter<Long> { /** * 单例 */
|
请发表评论