diff --git a/packages/api/src/shell/dataDuplicator.js b/packages/api/src/shell/dataDuplicator.js new file mode 100644 index 00000000..15f308c6 --- /dev/null +++ b/packages/api/src/shell/dataDuplicator.js @@ -0,0 +1,38 @@ +const stream = require('stream'); +const path = require('path'); +const { quoteFullName, fullNameToString, getLogger } = require('dbgate-tools'); +const requireEngineDriver = require('../utility/requireEngineDriver'); +const connectUtility = require('../utility/connectUtility'); +const logger = getLogger('dataDuplicator'); +const { DataDuplicator } = require('dbgate-datalib'); +const copyStream = require('./copyStream'); +const jsonLinesReader = require('./jsonLinesReader'); +const { resolveArchiveFolder } = require('../utility/directories'); + +async function dataDuplicator({ connection, archive, items, analysedStructure = null }) { + const driver = requireEngineDriver(connection); + const pool = await connectUtility(driver, connection, 'write'); + logger.info(`Connected.`); + + if (!analysedStructure) { + analysedStructure = await driver.analyseFull(pool); + } + + const dupl = new DataDuplicator( + pool, + driver, + analysedStructure, + items.map(item => ({ + name: item.name, + operation: item.operation, + matchColumns: item.matchColumns, + openStream: () => jsonLinesReader({ fileName: path.join(resolveArchiveFolder(archive), `${item.name}.jsonl`) }), + })), + stream, + copyStream + ); + + await dupl.run(); +} + +module.exports = dataDuplicator; diff --git a/packages/api/src/shell/index.js b/packages/api/src/shell/index.js index 31fd6ac1..2293a26e 100644 --- a/packages/api/src/shell/index.js +++ b/packages/api/src/shell/index.js @@ -26,6 +26,7 @@ const importDatabase = require('./importDatabase'); const loadDatabase = require('./loadDatabase'); const generateModelSql = require('./generateModelSql'); const modifyJsonLinesReader = require('./modifyJsonLinesReader'); +const dataDuplicator = require('./dataDuplicator'); const dbgateApi = { queryReader, @@ -55,6 +56,7 @@ const dbgateApi = { loadDatabase, generateModelSql, modifyJsonLinesReader, + dataDuplicator, }; requirePlugin.initializeDbgateApi(dbgateApi); diff --git a/packages/api/src/shell/jsonLinesReader.js b/packages/api/src/shell/jsonLinesReader.js index 399002ca..b226e487 100644 --- a/packages/api/src/shell/jsonLinesReader.js +++ b/packages/api/src/shell/jsonLinesReader.js @@ -35,7 +35,11 @@ class ParseStream extends stream.Transform { async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undefined }) { logger.info(`Reading file ${fileName}`); - const fileStream = fs.createReadStream(fileName, encoding); + const fileStream = fs.createReadStream( + fileName, + // @ts-ignore + encoding + ); const liner = byline(fileStream); const parser = new ParseStream({ limitRows }); liner.pipe(parser); diff --git a/packages/api/src/shell/modifyJsonLinesReader.js b/packages/api/src/shell/modifyJsonLinesReader.js index 5f1e7344..d76ed8c8 100644 --- a/packages/api/src/shell/modifyJsonLinesReader.js +++ b/packages/api/src/shell/modifyJsonLinesReader.js @@ -107,7 +107,11 @@ async function modifyJsonLinesReader({ }) { logger.info(`Reading file ${fileName} with change set`); - const fileStream = fs.createReadStream(fileName, encoding); + const fileStream = fs.createReadStream( + fileName, + // @ts-ignore + encoding + ); const liner = byline(fileStream); const parser = new ParseStream({ limitRows, changeSet, mergedRows, mergeKey, mergeMode }); liner.pipe(parser); diff --git a/packages/datalib/src/DataDuplicator.ts b/packages/datalib/src/DataDuplicator.ts new file mode 100644 index 00000000..b81ef5c9 --- /dev/null +++ b/packages/datalib/src/DataDuplicator.ts @@ -0,0 +1,202 @@ +import { createAsyncWriteStream, runCommandOnDriver, runQueryOnDriver } from 'dbgate-tools'; +import { DatabaseInfo, EngineDriver, ForeignKeyInfo, TableInfo } from 'dbgate-types'; +import _pick from 'lodash/pick'; +import _omit from 'lodash/omit'; + +export interface DataDuplicatorItem { + openStream: () => Promise; + name: string; + operation: 'copy' | 'lookup' | 'insertMissing'; + matchColumns: string[]; +} + +class DuplicatorReference { + constructor( + public base: DuplicatorItemHolder, + public ref: DuplicatorItemHolder, + public isMandatory: boolean, + public foreignKey: ForeignKeyInfo + ) {} + + get columnName() { + return this.foreignKey.columns[0].columnName; + } +} + +class DuplicatorItemHolder { + references: DuplicatorReference[] = []; + backReferences: DuplicatorReference[] = []; + table: TableInfo; + isPlanned = false; + idMap = {}; + autoColumn: string; + refByColumn: { [columnName: string]: DuplicatorReference } = {}; + isReferenced: boolean; + + get name() { + return this.item.name; + } + + constructor(public item: DataDuplicatorItem, public duplicator: DataDuplicator) { + this.table = duplicator.db.tables.find(x => x.pureName.toUpperCase() == item.name.toUpperCase()); + this.autoColumn = this.table.columns.find(x => x.autoIncrement)?.columnName; + if ( + this.table.primaryKey?.columns?.length != 1 || + this.table.primaryKey?.columns?.[0].columnName != this.autoColumn + ) { + this.autoColumn = null; + } + } + + initializeReferences() { + for (const fk of this.table.foreignKeys) { + if (fk.columns?.length != 1) continue; + const refHolder = this.duplicator.itemHolders.find(y => y.name.toUpperCase() == fk.refTableName.toUpperCase()); + if (refHolder == null) continue; + const isMandatory = this.table.columns.find(x => x.columnName == fk.columns[0]?.columnName)?.notNull; + const newref = new DuplicatorReference(this, refHolder, isMandatory, fk); + this.references.push(newref); + this.refByColumn[newref.columnName] = newref; + + refHolder.isReferenced = true; + } + } + + createInsertObject(chunk) { + const res = _omit( + _pick( + chunk, + this.table.columns.map(x => x.columnName) + ), + [this.autoColumn, ...this.backReferences.map(x => x.columnName)] + ); + + for (const key in res) { + const ref = this.refByColumn[key]; + if (ref) { + // remap id + res[key] = ref.ref.idMap[res[key]]; + } + } + + return res; + } + + async runImport() { + const readStream = await this.item.openStream(); + const driver = this.duplicator.driver; + const pool = this.duplicator.pool; + const writeStream = createAsyncWriteStream(this.duplicator.stream, { + processItem: async chunk => { + if (chunk.__isStreamHeader) { + return; + } + + const doCopy = async () => { + const insertedObj = this.createInsertObject(chunk); + await runCommandOnDriver(pool, driver, dmp => + dmp.putCmd( + '^insert ^into %f (%,i) ^values (%,v)', + this.table, + Object.keys(insertedObj), + Object.values(insertedObj) + ) + ); + if (this.autoColumn && this.isReferenced) { + const res = await runQueryOnDriver(pool, driver, dmp => dmp.selectScopeIdentity(this.table)); + const resId = Object.entries(res?.rows?.[0])?.[0]?.[1]; + if (resId != null) { + this.idMap[chunk[this.autoColumn]] = resId; + } + } + }; + + switch (this.item.operation) { + case 'copy': { + await doCopy(); + break; + } + case 'insertMissing': + case 'lookup': { + const res = await runQueryOnDriver(pool, driver, dmp => + dmp.put( + '^select %i ^from %f ^where %i = %v', + this.autoColumn, + this.table, + this.item.matchColumns[0], + chunk[this.item.matchColumns[0]] + ) + ); + const resId = Object.entries(res?.rows?.[0])?.[0]?.[1]; + if (resId != null) { + this.idMap[chunk[this.autoColumn]] = resId; + } else if (this.item.operation == 'insertMissing') { + await doCopy(); + } + break; + } + } + // this.idMap[oldId] = newId; + }, + }); + + await this.duplicator.copyStream(readStream, writeStream); + + // await this.duplicator.driver.writeQueryStream(this.duplicator.pool, { + // mapResultId: (oldId, newId) => { + // this.idMap[oldId] = newId; + // }, + // }); + } +} + +export class DataDuplicator { + itemHolders: DuplicatorItemHolder[]; + itemPlan: DuplicatorItemHolder[] = []; + + constructor( + public pool: any, + public driver: EngineDriver, + public db: DatabaseInfo, + public items: DataDuplicatorItem[], + public stream, + public copyStream: (input, output) => Promise + ) { + this.itemHolders = items.map(x => new DuplicatorItemHolder(x, this)); + this.itemHolders.forEach(x => x.initializeReferences()); + } + + findItemToPlan(): DuplicatorItemHolder { + for (const item of this.itemHolders) { + if (item.isPlanned) continue; + if (item.references.every(x => x.ref.isPlanned)) { + return item; + } + } + for (const item of this.itemHolders) { + if (item.isPlanned) continue; + if (item.references.every(x => x.ref.isPlanned || !x.isMandatory)) { + const backReferences = item.references.filter(x => !x.ref.isPlanned); + item.backReferences = backReferences; + return item; + } + } + throw new Error('Cycle in mandatory references'); + } + + createPlan() { + while (this.itemPlan.length < this.itemHolders.length) { + const item = this.findItemToPlan(); + item.isPlanned = true; + this.itemPlan.push(item); + } + } + + async run() { + this.createPlan(); + + for (const item of this.itemPlan) { + await item.runImport(); + } + } +} diff --git a/packages/datalib/src/index.ts b/packages/datalib/src/index.ts index db07707c..347769a5 100644 --- a/packages/datalib/src/index.ts +++ b/packages/datalib/src/index.ts @@ -22,3 +22,4 @@ export * from './processPerspectiveDefaultColunns'; export * from './PerspectiveDataPattern'; export * from './PerspectiveDataLoader'; export * from './perspectiveTools'; +export * from './DataDuplicator'; diff --git a/packages/tools/src/ScriptWriter.ts b/packages/tools/src/ScriptWriter.ts index cd84df05..84ab4bdb 100644 --- a/packages/tools/src/ScriptWriter.ts +++ b/packages/tools/src/ScriptWriter.ts @@ -57,6 +57,10 @@ export class ScriptWriter { this._put(`await dbgateApi.importDatabase(${JSON.stringify(options)});`); } + dataDuplicator(options) { + this._put(`await dbgateApi.dataDuplicator(${JSON.stringify(options)});`); + } + comment(s) { this._put(`// ${s}`); } @@ -143,6 +147,13 @@ export class ScriptWriterJson { }); } + dataDuplicator(options) { + this.commands.push({ + type: 'dataDuplicator', + options, + }); + } + getScript(schedule = null) { return { type: 'json', @@ -186,6 +197,9 @@ export function jsonScriptToJavascript(json) { case 'importDatabase': script.importDatabase(cmd.options); break; + case 'dataDuplicator': + script.dataDuplicator(cmd.options); + break; } } diff --git a/packages/tools/src/SqlDumper.ts b/packages/tools/src/SqlDumper.ts index 84a025cf..590b62c2 100644 --- a/packages/tools/src/SqlDumper.ts +++ b/packages/tools/src/SqlDumper.ts @@ -197,6 +197,8 @@ export class SqlDumper implements AlterProcessor { specialColumnOptions(column) {} + selectScopeIdentity(table: TableInfo) {} + columnDefinition(column: ColumnInfo, { includeDefault = true, includeNullable = true, includeCollate = true } = {}) { if (column.computedExpression) { this.put('^as %s', column.computedExpression); diff --git a/packages/tools/src/createAsyncWriteStream.ts b/packages/tools/src/createAsyncWriteStream.ts new file mode 100644 index 00000000..81dd1155 --- /dev/null +++ b/packages/tools/src/createAsyncWriteStream.ts @@ -0,0 +1,41 @@ +import _intersection from 'lodash/intersection'; +import _isArray from 'lodash/isArray'; +import { getLogger } from './getLogger'; + +const logger = getLogger('asyncWriteStream'); + +export interface AsyncWriteStreamOptions { + processItem: (chunk: any) => Promise; +} + +export function createAsyncWriteStream(stream, options: AsyncWriteStreamOptions): any { + const writable = new stream.Writable({ + objectMode: true, + }); + + writable._write = async (chunk, encoding, callback) => { + await options.processItem(chunk); + + // const { sql, id, newIdSql } = chunk; + // if (_isArray(sql)) { + // for (const item of sql) await driver.query(pool, item, { discardResult: true }); + // } else { + // await driver.query(pool, sql, { discardResult: true }); + // } + // if (newIdSql) { + // const res = await driver.query(pool, newIdSql); + // const resId = Object.entries(res?.rows?.[0])?.[0]?.[1]; + + // if (options?.mapResultId) { + // options?.mapResultId(id, resId as string); + // } + // } + callback(); + }; + + // writable._final = async callback => { + // callback(); + // }; + + return writable; +} diff --git a/packages/tools/src/createBulkInsertStreamBase.ts b/packages/tools/src/createBulkInsertStreamBase.ts index d6416528..689771bb 100644 --- a/packages/tools/src/createBulkInsertStreamBase.ts +++ b/packages/tools/src/createBulkInsertStreamBase.ts @@ -1,10 +1,11 @@ +import { EngineDriver, WriteTableOptions } from 'dbgate-types'; import _intersection from 'lodash/intersection'; import { getLogger } from './getLogger'; import { prepareTableForImport } from './tableTransforms'; const logger = getLogger('bulkStreamBase'); -export function createBulkInsertStreamBase(driver, stream, pool, name, options): any { +export function createBulkInsertStreamBase(driver: EngineDriver, stream, pool, name, options: WriteTableOptions): any { const fullNameQuoted = name.schemaName ? `${driver.dialect.quoteIdentifier(name.schemaName)}.${driver.dialect.quoteIdentifier(name.pureName)}` : driver.dialect.quoteIdentifier(name.pureName); @@ -58,21 +59,21 @@ export function createBulkInsertStreamBase(driver, stream, pool, name, options): const dmp = driver.createDumper(); dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`); - dmp.putCollection(',', writable.columnNames, col => dmp.putRaw(driver.dialect.quoteIdentifier(col))); + dmp.putCollection(',', writable.columnNames, col => dmp.putRaw(driver.dialect.quoteIdentifier(col as string))); dmp.putRaw(')\n VALUES\n'); let wasRow = false; for (const row of rows) { if (wasRow) dmp.putRaw(',\n'); dmp.putRaw('('); - dmp.putCollection(',', writable.columnNames, col => dmp.putValue(row[col])); + dmp.putCollection(',', writable.columnNames, col => dmp.putValue(row[col as string])); dmp.putRaw(')'); wasRow = true; } dmp.putRaw(';'); // require('fs').writeFileSync('/home/jena/test.sql', dmp.s); // console.log(dmp.s); - await driver.query(pool, dmp.s); + await driver.query(pool, dmp.s, { discardResult: true }); }; writable.sendIfFull = async () => { diff --git a/packages/tools/src/driverBase.ts b/packages/tools/src/driverBase.ts index 7887f4b6..ac7b312a 100644 --- a/packages/tools/src/driverBase.ts +++ b/packages/tools/src/driverBase.ts @@ -2,7 +2,7 @@ import _compact from 'lodash/compact'; import { SqlDumper } from './SqlDumper'; import { splitQuery } from 'dbgate-query-splitter'; import { dumpSqlSelect } from 'dbgate-sqltree'; -import { EngineDriver, RunScriptOptions } from 'dbgate-types'; +import { EngineDriver, QueryResult, RunScriptOptions } from 'dbgate-types'; const dialect = { limitSelect: true, @@ -20,12 +20,22 @@ const dialect = { defaultSchemaName: null, }; -export async function runCommandOnDriver(pool, driver: EngineDriver, cmd: (dmp: SqlDumper) => void) { +export async function runCommandOnDriver(pool, driver: EngineDriver, cmd: (dmp: SqlDumper) => void): Promise { const dmp = driver.createDumper(); cmd(dmp as any); await driver.query(pool, dmp.s, { discardResult: true }); } +export async function runQueryOnDriver( + pool, + driver: EngineDriver, + cmd: (dmp: SqlDumper) => void +): Promise { + const dmp = driver.createDumper(); + cmd(dmp as any); + return await driver.query(pool, dmp.s); +} + export const driverBase = { analyserClass: null, dumperClass: SqlDumper, diff --git a/packages/tools/src/index.ts b/packages/tools/src/index.ts index b98ea267..f5e614e2 100644 --- a/packages/tools/src/index.ts +++ b/packages/tools/src/index.ts @@ -3,6 +3,7 @@ export * from './nameTools'; export * from './tableTransforms'; export * from './packageTools'; export * from './createBulkInsertStreamBase'; +export * from './createAsyncWriteStream'; export * from './DatabaseAnalyser'; export * from './driverBase'; export * from './SqlDumper'; diff --git a/packages/web/src/tabs/DataDuplicatorTab.svelte b/packages/web/src/tabs/DataDuplicatorTab.svelte index b077868a..1ec7d20d 100644 --- a/packages/web/src/tabs/DataDuplicatorTab.svelte +++ b/packages/web/src/tabs/DataDuplicatorTab.svelte @@ -1,22 +1,54 @@ + + -
-
Imported files
+ +
+
Source archive
+ { + setEditorData(old => ({ + ...old, + archiveFolder: e.detail, + })); + }} + options={$archiveFolders?.map(x => ({ + label: x.name, + value: x.name, + })) || []} + /> - Table', fieldName: 'name' }, - { header: 'Operation', fieldName: 'operation', slot: 2 }, - { header: 'Match column', fieldName: 'matchColumn1', slot: 3 }, - ]} - > - - { - changeTable({ ...row, isChecked: e.target.checked }); - }} - /> - - - { - changeTable({ ...row, operation: e.detail }); - }} - disabled={!row.isChecked} - options={[ - { label: 'Copy row', value: 'copy' }, - { label: 'Lookup (find matching row)', value: 'lookup' }, - { label: 'Insert if not exists', value: 'insertMissing' }, - ]} - /> - - - {#if row.operation != 'copy'} +
Imported files
+ + Table', fieldName: 'name' }, + { header: 'Operation', fieldName: 'operation', slot: 2 }, + { header: 'Match column', fieldName: 'matchColumn1', slot: 3 }, + ]} + > + + { + changeTable({ ...row, isChecked: e.target.checked }); + }} + /> + + { - changeTable({ ...row, matchColumn1: e.detail }); + changeTable({ ...row, operation: e.detail }); }} disabled={!row.isChecked} - options={$dbinfo?.tables - ?.find(x => x.pureName?.toUpperCase() == row.name.toUpperCase()) - ?.columns?.map(col => ({ - label: col.columnName, - value: col.columnName, - })) || []} + options={[ + { label: 'Copy row', value: 'copy' }, + { label: 'Lookup (find matching row)', value: 'lookup' }, + { label: 'Insert if not exists', value: 'insertMissing' }, + ]} /> - {/if} - - -
+ + + {#if row.operation != 'copy'} + { + changeTable({ ...row, matchColumn1: e.detail }); + }} + disabled={!row.isChecked} + options={$dbinfo?.tables + ?.find(x => x.pureName?.toUpperCase() == row.name.toUpperCase()) + ?.columns?.map(col => ({ + label: col.columnName, + value: col.columnName, + })) || []} + /> + {/if} + + +
+ + + + +