在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:go-bin2es开源软件地址:https://gitee.com/happyjoejoe/go-bin2es开源软件介绍:go-bin2esgo-bin2es is a service syncing binlog to es 架构图:采用了go-mysql可以过滤指定的db的table, 从而把binlog数据通过配置的方法过滤后, 刷新到 特点:
Examplecreate database test;create table Parent ( id int not null auto_increment primary key, name varchar(64) not null, sex char(1) not null)comment = '父';create table Child ( id int not null auto_increment primary key, name varchar(64) not null, sex char(1) not null, parent_id int not null)comment = '子';Parent:insert into Parent (name, sex) values ('Tom', "m"); -- id:1 Child:insert into Child (name, sex, parent_id) values ('Tom1', 'm', 1); -- Tom's child1 insert into Child (name, sex, parent_id) values ('Tom2', 'm', 1); -- Tom's child2 insert into Child (name, sex, parent_id) values ('Tom3', 'm', 1); -- Tom's child3 Parent:insert into Parent (name, sex) values ('Jerry', "m"); -- id:2 Child:insert into Child (name, sex, parent_id) values ('Jerry1', 'm', 2); -- Jerry's child1 insert into Child (name, sex, parent_id) values ('Jerry2', 'f', 2); -- Jerry's child2 editing your config.toml, and configure it like following:[bin2es]sync_ch_len = 64[es]nodes = [ "http://localhost:9200" #es集群]user = "elastic"passwd = "123456"enable_authentication = false #开启认证insecure_skip_verify = true #是否忽略证书校验bulk_size = 1024 #批量刷新个数flush_duration = 500 #批量刷新时间间隔, 单位:ms[mysql]addr = "localhost"port = 3306user = "bin2es"pwd = "bin2es"charset = "utf8mb4"server_id = 3 #与其他slave节点的server_id不同即可[master_info]addr = "localhost"port = 3306user = "bin2es"pwd = "bin2es"charset = "utf8mb4"schema = "bin2es"table = "master_info"flush_duration = 3 #binlog position 刷新时间间隔, 单位:s[zk]enable = falselock_path = "/go-bin2es-lock"session_timeout = 1hosts = [ "localhost:2181"][etcd]enable = trueenable_tls = truelock_path = "/go-bin2es-lock"dial_timeout = 3cert_path = "/etc/etcd/etcdSSL/etcd.pem"key_path = "/etc/etcd/etcdSSL/etcd-key.pem"ca_path = "/etc/etcd/etcdSSL/ca.pem"endpoints = [ "127.0.0.1:2379",][[source]]schema = "test"dump_table = "Parent"tables = [ "Parent", "Child"] editing bin2es.json[ { "schema":"test", "tables":[ "Parent", "Child" ], "actions":[ "insert", "update" ], "pipeline":[ { "DoSQL":{ "sql":"SELECT Parent.id, Parent.name, Parent.sex, group_concat(concat_ws('_', Child.name, Child.sex) separator ',') as Childs FROM Parent join Child on Parent.id = Child.parent_id WHERE (?) GROUP BY Parent.id", "placeholders":{ #若遇到Parent或Child, 自动将`?`替换为`$key.$value = {该表对应的$value对应的字段的值}` "Parent":"id", #eg: 若遇到Parent, 则`?`被替换为`Parent.id = 行数据对应的`id`字段的值` "Child":"parent_id" #eg: 若遇到Child, 则`?`被替换为`Child.parent_id = 行数据对应的`parent_id`字段的值` } } }, { "Object":{ "common":"profile", "fields":{ "name":"es_name", #将查询到的结果的`name`字段放进`profile`的`es_name`下 "sex":"es_sex" #同理 } } }, { "NestedArray":{ "sql_field":"Childs", #将查询到的结果的`Childs`字段解析放入到common指定的`childs`下 "common":"childs", "pos2fields":{ #其中解析结果的第一个位置放入到es的`es_name`下, 第二个位置放入到`es_sex`下 "es_name":1, "es_sex":2 }, "fields_seprator": "_", "group_seprator": "," } }, { "SetDocID": { "_id": "id" } } ], "dest":{ "index":"test_es" #es的索引名 } }] execute ./bin/go-bin2es
{ "took": 0, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 2, "relation": "eq" }, "max_score": 1.0, "hits": [ { "_index": "test_es", "_type": "_doc", "_id": "1", "_score": 1.0, "_source": { "childs": [ #nested array { "chl_name": "Tom1", "chl_sex": "f" }, { "chl_name": "Tom2", "chl_sex": "f" }, { "chl_name": "Tom3", "chl_sex": "m" } ], "id": "1", "profile": { # object "es_name": "Tom", "es_sex": "m" } } }, { "_index": "test_es", "_type": "_doc", "_id": "2", "_score": 1.0, "_source": { "childs": [ #nested array { "chl_name": "Jerry1", "chl_sex": "m" }, { "chl_name": "Jerry2", "chl_sex": "f" } ], "id": "2", "profile": { # object "es_name": "Jerry", "es_sex": "m" } } } ] }} 如果你觉得对你有用, 可以给我打赏一瓶Colo |
请发表评论