数据仓库-同步机制
背景
江湖
enterprice
是一个多应用的项目, 其中data_repository
为江湖多应用项目的数据仓库,每个应用之间的数据要进行数据共享和同步,包括用户数据和每个应用之间的业务数据共享。
为了保障数据仓库应用稳定运行,数据仓库应用以 单应用形式运行。
代码库: data_repository
实现
同步方案配置
同步源库: 需要同步到数据仓库的数据库(DataBase)
同步源表: 需要同步到数据仓库的数据表(Table)
同步间隔: 需要同步的数据库、表的同步间隔时间(Interval)
同步方式: 手动同步、自动同步
DROP TABLE IF EXISTS `_table_sync_config`;
CREATE TABLE `_table_sync_config` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`sourceDatabase` varchar(255) DEFAULT NULL,
`sourceTable` varchar(255) DEFAULT NULL,
`syncTimeSlot` varchar(255) DEFAULT NULL,
`syncDesc` varchar(255) DEFAULT NULL COMMENT '同步状态, 正常, 源表不存在; 目标表不存在;,源表不存在; 目标表存在; ',
`lastSyncTime` varchar(255) DEFAULT NULL COMMENT '最后一次触发同步的时间',
`operation` varchar(255) DEFAULT 'insert' COMMENT '操作; insert, update, jhInsert, jhUpdate, jhDelete jhRestore',
`operationByUserId` varchar(255) DEFAULT NULL COMMENT '操作者userId',
`operationByUser` varchar(255) DEFAULT NULL COMMENT '操作者用户名',
`operationAt` varchar(255) DEFAULT NULL COMMENT '操作时间; E.g: 2021-05-28T10:24:54+08:00 ',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=207 DEFAULT CHARSET=utf8mb4;
同步机制
同步方式:
自动同步
: 修改data_repository/config/config.local.jsdataSyncStatus: '启用' // 是否启用同步,启用/禁用
schedule: {
immediate: true,
interval: '60s', // 1 分钟间隔; 2m 30s
type: 'worker', // 只有一个worker执行
disable: app.config.dataSyncStatus !== '启用',
}
手动同步
:- 批量同步: 在data_repository
数据同步管理
页面点击全部-手动同步
即可 - 单个同步: 在data_repository
数据同步管理
页面点击同步
即可
- 批量同步: 在data_repository
同步库类型
- 内部库表: 同一个mysql服务端的数据库、表
- 外部库表: 不在同一个mysql服务端的数据库、表
仓库表覆盖
如果定时同步过程中发现数据结构不一致或者数据不一致(参考[03_数据对比工具之diff]),会出发目标表的数据覆盖
表结构不一致
await targetKnex.raw(`DROP TABLE IF EXISTS ${targetDatabase}.${targetTable};`);
await targetKnex.raw(exceptTargetTableDDL);
await targetKnex.raw(`REPLACE INTO ${targetDatabase}.${targetTable} select * from ${sourceDatabase}.${sourceTable};`);
const syncDesc = '【表覆盖】结构不一致; 触发覆盖仓库表逻辑;';
await createTableSyncLog({jianghuKnex, tableSyncConfig, syncDesc, syncAction: '仓库表覆盖'});
表数据不一致
const {added, removed, changed} = hyperDiffResult;
if (added.length > 0) {
await knex(`${targetDatabase}.${targetTable}`).insert(added);
}
if (removed.length > 0) {
const idList = removed.map(item => item.id);
await knex(`${targetDatabase}.${targetTable}`).whereIn('id', idList).delete();
}
if (changed.length > 0) {
for (const item of changed) {
const {id, ...updateParam} = item.new;
await knex(`${targetDatabase}.${targetTable}`).where({id}).update(updateParam);
}
}
触发器覆盖
- 初次定时同步时,会创建mysql的Trigger(
insert
,update
,delete
)实现数据的实时同步await jianghuKnex.raw(`DROP TRIGGER IF EXISTS ${sourceDatabase}.${INSERTTriggerName};`);
await jianghuKnex.raw(INSERTTriggerCreateSql);
- 初次定时同步时,会创建mysql的Trigger(
清理数据
- 每次定时同步时,会处理掉已经废弃库、表关联的Trigger触发器,定时清理脏数据
const {
TRIGGER_SCHEMA: sourceDatabase,
TRIGGER_NAME: triggerName, EVENT_MANIPULATION: triggerEvent,
} = trigger;
const tableSyncConfigExist = tableSyncConfigList.find(item => triggerName === `${item.sourceDatabase}__${item.sourceTable}_${triggerEvent}`);
if (!tableSyncConfigExist) {
await jianghuKnex.raw(`DROP TRIGGER IF EXISTS ${sourceDatabase}.${triggerName};`);
logger.warn(`[${triggerName}]`, '无用的mysql trigger, 执行删除逻辑;');
}
- 每次定时同步时,会处理掉已经废弃库、表关联的Trigger触发器,定时清理脏数据
同步日志
每次定时同步的执行日志,会写入同步日志表
DROP TABLE IF EXISTS `_table_sync_log`;
CREATE TABLE `_table_sync_log` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`sourceDatabase` varchar(255) DEFAULT NULL,
`sourceTable` varchar(255) DEFAULT NULL,
`syncAction` varchar(255) DEFAULT NULL COMMENT '同步动作',
`syncDesc` varchar(255) DEFAULT NULL COMMENT '同步描述',
`syncTime` varchar(255) DEFAULT NULL COMMENT '同步触发时间',
`operation` varchar(255) DEFAULT 'insert' COMMENT '操作; insert, update, jhInsert, jhUpdate, jhDelete jhRestore',
`operationByUserId` varchar(255) DEFAULT NULL COMMENT '操作者userId',
`operationByUser` varchar(255) DEFAULT NULL COMMENT '操作者用户名',
`operationAt` varchar(255) DEFAULT NULL COMMENT '操作时间; E.g: 2021-05-28T10:24:54+08:00 ',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4;