数据仓库-同步机制
背景
江湖
enterprice是一个多应用的项目, 其中data_repository为江湖多应用项目的数据仓库,每个应用之间的数据要进行数据共享和同步,包括用户数据和每个应用之间的业务数据共享。
为了保障数据仓库应用稳定运行,数据仓库应用以 单应用形式运行。
代码库: data_repository
实现
同步方案配置
同步源库: 需要同步到数据仓库的数据库(DataBase)
同步源表: 需要同步到数据仓库的数据表(Table)
同步间隔: 需要同步的数据库、表的同步间隔时间(Interval)
同步方式: 手动同步、自动同步
DROP TABLE IF EXISTS `_table_sync_config`;CREATE TABLE `_table_sync_config` (`id` int(11) NOT NULL AUTO_INCREMENT,`sourceDatabase` varchar(255) DEFAULT NULL,`sourceTable` varchar(255) DEFAULT NULL,`syncTimeSlot` varchar(255) DEFAULT NULL,`syncDesc` varchar(255) DEFAULT NULL COMMENT '同步状态, 正常, 源表不存在; 目标表不存在;,源表不存在; 目标表存在; ',`lastSyncTime` varchar(255) DEFAULT NULL COMMENT '最后一次触发同步的时间',`operation` varchar(255) DEFAULT 'insert' COMMENT '操作; insert, update, jhInsert, jhUpdate, jhDelete jhRestore',`operationByUserId` varchar(255) DEFAULT NULL COMMENT '操作者userId',`operationByUser` varchar(255) DEFAULT NULL COMMENT '操作者用户名',`operationAt` varchar(255) DEFAULT NULL COMMENT '操作时间; E.g: 2021-05-28T10:24:54+08:00 ',PRIMARY KEY (`id`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=207 DEFAULT CHARSET=utf8mb4;
同步机制
同步方式:
自动同步: 修改data_repository/config/config.local.jsdataSyncStatus: '启用' // 是否启用同步,启用/禁用
schedule: {immediate: true,interval: '60s', // 1 分钟间隔; 2m 30stype: 'worker', // 只有一个worker执行disable: app.config.dataSyncStatus !== '启用',}
手动同步:- 批量同步: 在data_repository
数据同步管理页面点击全部-手动同步即可 - 单个同步: 在data_repository
数据同步管理页面点击同步即可
- 批量同步: 在data_repository
同步库类型
- 内部库表: 同一个mysql服务端的数据库、表
- 外部库表: 不在同一个mysql服务端的数据库、表
仓库表覆盖
如果定时同步过程中发现数据结构不一致或者数据不一致(参考[03_数据对比工具之diff]),会出发目标表的数据覆盖
表结构不一致
await targetKnex.raw(`DROP TABLE IF EXISTS ${targetDatabase}.${targetTable};`);await targetKnex.raw(exceptTargetTableDDL);await targetKnex.raw(`REPLACE INTO ${targetDatabase}.${targetTable} select * from ${sourceDatabase}.${sourceTable};`);const syncDesc = '【表覆盖】结构不一致; 触发覆盖仓库表逻辑;';await createTableSyncLog({jianghuKnex, tableSyncConfig, syncDesc, syncAction: '仓库表覆盖'});
表数据不一致
const {added, removed, changed} = hyperDiffResult;if (added.length > 0) {await knex(`${targetDatabase}.${targetTable}`).insert(added);}if (removed.length > 0) {const idList = removed.map(item => item.id);await knex(`${targetDatabase}.${targetTable}`).whereIn('id', idList).delete();}if (changed.length > 0) {for (const item of changed) {const {id, ...updateParam} = item.new;await knex(`${targetDatabase}.${targetTable}`).where({id}).update(updateParam);}}
触发器覆盖
- 初次定时同步时,会创建mysql的Trigger(
insert,update,delete)实现数据的实时同步await jianghuKnex.raw(`DROP TRIGGER IF EXISTS ${sourceDatabase}.${INSERTTriggerName};`);await jianghuKnex.raw(INSERTTriggerCreateSql);
- 初次定时同步时,会创建mysql的Trigger(
清理数据
- 每次定时同步时,会处理掉已经废弃库、表关联的Trigger触发器,定时清理脏数据
const {TRIGGER_SCHEMA: sourceDatabase,TRIGGER_NAME: triggerName, EVENT_MANIPULATION: triggerEvent,} = trigger;const tableSyncConfigExist = tableSyncConfigList.find(item => triggerName === `${item.sourceDatabase}__${item.sourceTable}_${triggerEvent}`);if (!tableSyncConfigExist) {await jianghuKnex.raw(`DROP TRIGGER IF EXISTS ${sourceDatabase}.${triggerName};`);logger.warn(`[${triggerName}]`, '无用的mysql trigger, 执行删除逻辑;');}
- 每次定时同步时,会处理掉已经废弃库、表关联的Trigger触发器,定时清理脏数据
同步日志
每次定时同步的执行日志,会写入同步日志表
DROP TABLE IF EXISTS `_table_sync_log`;CREATE TABLE `_table_sync_log` (`id` int(11) NOT NULL AUTO_INCREMENT,`sourceDatabase` varchar(255) DEFAULT NULL,`sourceTable` varchar(255) DEFAULT NULL,`syncAction` varchar(255) DEFAULT NULL COMMENT '同步动作',`syncDesc` varchar(255) DEFAULT NULL COMMENT '同步描述',`syncTime` varchar(255) DEFAULT NULL COMMENT '同步触发时间',`operation` varchar(255) DEFAULT 'insert' COMMENT '操作; insert, update, jhInsert, jhUpdate, jhDelete jhRestore',`operationByUserId` varchar(255) DEFAULT NULL COMMENT '操作者userId',`operationByUser` varchar(255) DEFAULT NULL COMMENT '操作者用户名',`operationAt` varchar(255) DEFAULT NULL COMMENT '操作时间; E.g: 2021-05-28T10:24:54+08:00 ',PRIMARY KEY (`id`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4;