很多开发工作无非将从某个数据源的数据变换写入到另外一个数据源. 最简单的例子:
insert into dst.stat_table
select a, b, @func(c) as d, count(1) as cnt
from src1.log_table
join src2.dim_table on
group by a, b, d
如果在同一个数据库上, 就是直接执行一个SQL的事情. 但是由于业务上的拆分, 相关的表往往不在同一个数据库. 此外即便单个数据库, 主从部署角度考虑, 为了避免大的聚合查询影响主库写入, 也需要从库读主库写. 这就涉及到了很多开发工作.
SQL是一个非常成功的声明式语言, 把要做什么事情写得清清楚楚, 没有一句废话. 所以我们直接用SQL配置化的方式来实现我们的数据同步逻辑呢?
以上就是一开始做一个内部异构数据同步工具 (yugong) 的想法.
一下记录一些实践过程中总结的点.
数据管道
直接用Unix Pipe的做法:
mysql -h $src_host $src_db -B -N -e "select * from src_table" | clickhouse-client --host $dst_host --database $dst_db --query="insert into dst_table format TabSeparated"
缺点:
- 基于文本的序列化
- 没有考虑批量写入优化的问题
所以:
- 尽量使用各数据库原生的协议/数据类型, 避免内存占用过高 (Python实现版本里面用字典处理每一行就比元组处理内存占用高不少).
- 基于内存, 或者文件的一次性管道/或者缓存即可, 解耦上游/下游读写, 不要想太多.
SQL解析
其实不用畏难, 一开始不需要高大上的语法解释器, 跟据实际需求将SQL正则提取成数据源的查询语句和目标库的写入语句, 执行拿到数据执行写入即可.
为了支持一些简单的逻辑, 如UT模式拿游标等, 可以类似SQL存储过程的方式, 引入变量.
select last_timestamp as @last_timestamp from bookeeper where job_name = '...';
select a, b, count(1) from log where ut > @last_timestamp group by a, b;
对于数据量较大, 需要拆分成较多子任务执行的, 可以通过返回多行的值, 拆成子任务的方式来解决, 程序内部做这种分拆逻辑都比较麻烦.
select today()-i as dt from system.numbers limit 10;
--- 由于上述返回的是一个时间列表, 下面的命令一定是针对每个dt分别一次执行
insert into dst select ... from src where dt = @dt
以上变量以及子任务拆分基本满足绝大多数的业务同步逻辑需求.
UDF支持
业务中我们需要对一些内部算法能力快速调用. 数据库自定义UDF的方法一般都不太便利, 简单做通过注入自定义函数的方式简单实现. 结合SQL查询面板可以给到需求放做快速结果验收.
select @nlp_method(fields) from ...
缺点是没有办法再回到原来的SQL数据库做进一步的变换逻辑, 但是对于数据算法识别后写入任务来说已经基本够用了.
DSN化配置
数据库相关配置统一使用类似url的方式来定义数据配置, 从而方便解析.
mysql://user:pass@host:port/db?option_key=option_value...
elasticsearch://host:port/index
DSN也可以直接注解到SQL语句里面, 从而一个SQL语句包含任务的所有信息
select * from mysql://host:port/database
统一批处理/流处理逻辑
数据来源不一定是固定的数据库表, 而可能是增量的数据, 基于表的模式我们可以通过UT方式来做小批量同步. 缺点: 额外UT索引负担, 计算不够实时.
因此我们很多时候还是需要从实时的时间日志来源, 如kafka中实时统计汇总的. 这是我们可以通过SQL来抽象从数据库以及从kafka取数的差异.
` insert into stat (dt, aid, country, cnt) values (date(ts), aid, country, 1) on duplicate key update cnt = values(cnt)+1 select ts, aid from kafka://kafka:9092/#json `
开发中注意几个点:
- kafka数据源的数据解析问题, 简单的用JSON自然问题不大, 如果是proto等协议的化, 用动态语言来做更适合.
- 由于不能利用源数据的SQL能力, 在程序中的变换及汇总逻辑, 做起来比较复杂, 先简单支持如max/min/sum/count(distinct)等聚合逻辑及group by即可.