数据仓库-同步机制

背景

江湖enterprice是一个多应用的项目, 其中data_repository为江湖多应用项目的数据仓库,每个应用之间的数据要进行数据共享和同步,包括用户数据和每个应用之间的业务数据共享。

为了保障数据仓库应用稳定运行,数据仓库应用以 单应用形式运行。
代码库: data_repository

实现

同步方案配置

  • 同步源库: 需要同步到数据仓库的数据库(DataBase)

  • 同步源表: 需要同步到数据仓库的数据表(Table)

  • 同步间隔: 需要同步的数据库、表的同步间隔时间(Interval)

  • 同步方式: 手动同步、自动同步

    1. DROP TABLE IF EXISTS `_table_sync_config`;
    2. CREATE TABLE `_table_sync_config` (
    3. `id` int(11) NOT NULL AUTO_INCREMENT,
    4. `sourceDatabase` varchar(255) DEFAULT NULL,
    5. `sourceTable` varchar(255) DEFAULT NULL,
    6. `syncTimeSlot` varchar(255) DEFAULT NULL,
    7. `syncDesc` varchar(255) DEFAULT NULL COMMENT '同步状态, 正常, 源表不存在; 目标表不存在;,源表不存在; 目标表存在; ',
    8. `lastSyncTime` varchar(255) DEFAULT NULL COMMENT '最后一次触发同步的时间',
    9. `operation` varchar(255) DEFAULT 'insert' COMMENT '操作; insert, update, jhInsert, jhUpdate, jhDelete jhRestore',
    10. `operationByUserId` varchar(255) DEFAULT NULL COMMENT '操作者userId',
    11. `operationByUser` varchar(255) DEFAULT NULL COMMENT '操作者用户名',
    12. `operationAt` varchar(255) DEFAULT NULL COMMENT '操作时间; E.g: 2021-05-28T10:24:54+08:00 ',
    13. PRIMARY KEY (`id`) USING BTREE
    14. ) ENGINE=InnoDB AUTO_INCREMENT=207 DEFAULT CHARSET=utf8mb4;

同步机制

  • 同步方式:

    • 自动同步: 修改data_repository/config/config.local.js
      1. dataSyncStatus: '启用' // 是否启用同步,启用/禁用
      1. schedule: {
      2. immediate: true,
      3. interval: '60s', // 1 分钟间隔; 2m 30s
      4. type: 'worker', // 只有一个worker执行
      5. disable: app.config.dataSyncStatus !== '启用',
      6. }
    • 手动同步:
      • 批量同步: 在data_repository数据同步管理页面点击全部-手动同步即可
      • 单个同步: 在data_repository数据同步管理页面点击同步即可
  • 同步库类型

    • 内部库表: 同一个mysql服务端的数据库、表
    • 外部库表: 不在同一个mysql服务端的数据库、表
  • 仓库表覆盖

    • 如果定时同步过程中发现数据结构不一致或者数据不一致(参考[03_数据对比工具之diff]),会出发目标表的数据覆盖

      • 表结构不一致

        1. await targetKnex.raw(`DROP TABLE IF EXISTS ${targetDatabase}.${targetTable};`);
        2. await targetKnex.raw(exceptTargetTableDDL);
        3. await targetKnex.raw(`REPLACE INTO ${targetDatabase}.${targetTable} select * from ${sourceDatabase}.${sourceTable};`);
        4. const syncDesc = '【表覆盖】结构不一致; 触发覆盖仓库表逻辑;';
        5. await createTableSyncLog({jianghuKnex, tableSyncConfig, syncDesc, syncAction: '仓库表覆盖'});
      • 表数据不一致

        1. const {added, removed, changed} = hyperDiffResult;
        2. if (added.length > 0) {
        3. await knex(`${targetDatabase}.${targetTable}`).insert(added);
        4. }
        5. if (removed.length > 0) {
        6. const idList = removed.map(item => item.id);
        7. await knex(`${targetDatabase}.${targetTable}`).whereIn('id', idList).delete();
        8. }
        9. if (changed.length > 0) {
        10. for (const item of changed) {
        11. const {id, ...updateParam} = item.new;
        12. await knex(`${targetDatabase}.${targetTable}`).where({id}).update(updateParam);
        13. }
        14. }
  • 触发器覆盖

    • 初次定时同步时,会创建mysql的Trigger(insert,update,delete)实现数据的实时同步
      1. await jianghuKnex.raw(`DROP TRIGGER IF EXISTS ${sourceDatabase}.${INSERTTriggerName};`);
      2. await jianghuKnex.raw(INSERTTriggerCreateSql);
  • 清理数据

    • 每次定时同步时,会处理掉已经废弃库、表关联的Trigger触发器,定时清理脏数据
      1. const {
      2. TRIGGER_SCHEMA: sourceDatabase,
      3. TRIGGER_NAME: triggerName, EVENT_MANIPULATION: triggerEvent,
      4. } = trigger;
      5. const tableSyncConfigExist = tableSyncConfigList.find(item => triggerName === `${item.sourceDatabase}__${item.sourceTable}_${triggerEvent}`);
      6. if (!tableSyncConfigExist) {
      7. await jianghuKnex.raw(`DROP TRIGGER IF EXISTS ${sourceDatabase}.${triggerName};`);
      8. logger.warn(`[${triggerName}]`, '无用的mysql trigger, 执行删除逻辑;');
      9. }
  • 同步日志

    • 每次定时同步的执行日志,会写入同步日志表

      1. DROP TABLE IF EXISTS `_table_sync_log`;
      2. CREATE TABLE `_table_sync_log` (
      3. `id` int(11) NOT NULL AUTO_INCREMENT,
      4. `sourceDatabase` varchar(255) DEFAULT NULL,
      5. `sourceTable` varchar(255) DEFAULT NULL,
      6. `syncAction` varchar(255) DEFAULT NULL COMMENT '同步动作',
      7. `syncDesc` varchar(255) DEFAULT NULL COMMENT '同步描述',
      8. `syncTime` varchar(255) DEFAULT NULL COMMENT '同步触发时间',
      9. `operation` varchar(255) DEFAULT 'insert' COMMENT '操作; insert, update, jhInsert, jhUpdate, jhDelete jhRestore',
      10. `operationByUserId` varchar(255) DEFAULT NULL COMMENT '操作者userId',
      11. `operationByUser` varchar(255) DEFAULT NULL COMMENT '操作者用户名',
      12. `operationAt` varchar(255) DEFAULT NULL COMMENT '操作时间; E.g: 2021-05-28T10:24:54+08:00 ',
      13. PRIMARY KEY (`id`) USING BTREE
      14. ) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4;