data duplicator works in simple case

This commit is contained in:
Jan Prochazka 2023-02-11 10:17:10 +01:00
parent f3dd187df7
commit b5e37053b8
17 changed files with 517 additions and 61 deletions

View File

@ -0,0 +1,38 @@
const stream = require('stream');
const path = require('path');
const { quoteFullName, fullNameToString, getLogger } = require('dbgate-tools');
const requireEngineDriver = require('../utility/requireEngineDriver');
const connectUtility = require('../utility/connectUtility');
const logger = getLogger('dataDuplicator');
const { DataDuplicator } = require('dbgate-datalib');
const copyStream = require('./copyStream');
const jsonLinesReader = require('./jsonLinesReader');
const { resolveArchiveFolder } = require('../utility/directories');
async function dataDuplicator({ connection, archive, items, analysedStructure = null }) {
const driver = requireEngineDriver(connection);
const pool = await connectUtility(driver, connection, 'write');
logger.info(`Connected.`);
if (!analysedStructure) {
analysedStructure = await driver.analyseFull(pool);
}
const dupl = new DataDuplicator(
pool,
driver,
analysedStructure,
items.map(item => ({
name: item.name,
operation: item.operation,
matchColumns: item.matchColumns,
openStream: () => jsonLinesReader({ fileName: path.join(resolveArchiveFolder(archive), `${item.name}.jsonl`) }),
})),
stream,
copyStream
);
await dupl.run();
}
module.exports = dataDuplicator;

View File

@ -26,6 +26,7 @@ const importDatabase = require('./importDatabase');
const loadDatabase = require('./loadDatabase');
const generateModelSql = require('./generateModelSql');
const modifyJsonLinesReader = require('./modifyJsonLinesReader');
const dataDuplicator = require('./dataDuplicator');
const dbgateApi = {
queryReader,
@ -55,6 +56,7 @@ const dbgateApi = {
loadDatabase,
generateModelSql,
modifyJsonLinesReader,
dataDuplicator,
};
requirePlugin.initializeDbgateApi(dbgateApi);

View File

@ -35,7 +35,11 @@ class ParseStream extends stream.Transform {
async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undefined }) {
logger.info(`Reading file ${fileName}`);
const fileStream = fs.createReadStream(fileName, encoding);
const fileStream = fs.createReadStream(
fileName,
// @ts-ignore
encoding
);
const liner = byline(fileStream);
const parser = new ParseStream({ limitRows });
liner.pipe(parser);

View File

@ -107,7 +107,11 @@ async function modifyJsonLinesReader({
}) {
logger.info(`Reading file ${fileName} with change set`);
const fileStream = fs.createReadStream(fileName, encoding);
const fileStream = fs.createReadStream(
fileName,
// @ts-ignore
encoding
);
const liner = byline(fileStream);
const parser = new ParseStream({ limitRows, changeSet, mergedRows, mergeKey, mergeMode });
liner.pipe(parser);

View File

@ -0,0 +1,202 @@
import { createAsyncWriteStream, runCommandOnDriver, runQueryOnDriver } from 'dbgate-tools';
import { DatabaseInfo, EngineDriver, ForeignKeyInfo, TableInfo } from 'dbgate-types';
import _pick from 'lodash/pick';
import _omit from 'lodash/omit';
export interface DataDuplicatorItem {
openStream: () => Promise<ReadableStream>;
name: string;
operation: 'copy' | 'lookup' | 'insertMissing';
matchColumns: string[];
}
class DuplicatorReference {
constructor(
public base: DuplicatorItemHolder,
public ref: DuplicatorItemHolder,
public isMandatory: boolean,
public foreignKey: ForeignKeyInfo
) {}
get columnName() {
return this.foreignKey.columns[0].columnName;
}
}
class DuplicatorItemHolder {
references: DuplicatorReference[] = [];
backReferences: DuplicatorReference[] = [];
table: TableInfo;
isPlanned = false;
idMap = {};
autoColumn: string;
refByColumn: { [columnName: string]: DuplicatorReference } = {};
isReferenced: boolean;
get name() {
return this.item.name;
}
constructor(public item: DataDuplicatorItem, public duplicator: DataDuplicator) {
this.table = duplicator.db.tables.find(x => x.pureName.toUpperCase() == item.name.toUpperCase());
this.autoColumn = this.table.columns.find(x => x.autoIncrement)?.columnName;
if (
this.table.primaryKey?.columns?.length != 1 ||
this.table.primaryKey?.columns?.[0].columnName != this.autoColumn
) {
this.autoColumn = null;
}
}
initializeReferences() {
for (const fk of this.table.foreignKeys) {
if (fk.columns?.length != 1) continue;
const refHolder = this.duplicator.itemHolders.find(y => y.name.toUpperCase() == fk.refTableName.toUpperCase());
if (refHolder == null) continue;
const isMandatory = this.table.columns.find(x => x.columnName == fk.columns[0]?.columnName)?.notNull;
const newref = new DuplicatorReference(this, refHolder, isMandatory, fk);
this.references.push(newref);
this.refByColumn[newref.columnName] = newref;
refHolder.isReferenced = true;
}
}
createInsertObject(chunk) {
const res = _omit(
_pick(
chunk,
this.table.columns.map(x => x.columnName)
),
[this.autoColumn, ...this.backReferences.map(x => x.columnName)]
);
for (const key in res) {
const ref = this.refByColumn[key];
if (ref) {
// remap id
res[key] = ref.ref.idMap[res[key]];
}
}
return res;
}
async runImport() {
const readStream = await this.item.openStream();
const driver = this.duplicator.driver;
const pool = this.duplicator.pool;
const writeStream = createAsyncWriteStream(this.duplicator.stream, {
processItem: async chunk => {
if (chunk.__isStreamHeader) {
return;
}
const doCopy = async () => {
const insertedObj = this.createInsertObject(chunk);
await runCommandOnDriver(pool, driver, dmp =>
dmp.putCmd(
'^insert ^into %f (%,i) ^values (%,v)',
this.table,
Object.keys(insertedObj),
Object.values(insertedObj)
)
);
if (this.autoColumn && this.isReferenced) {
const res = await runQueryOnDriver(pool, driver, dmp => dmp.selectScopeIdentity(this.table));
const resId = Object.entries(res?.rows?.[0])?.[0]?.[1];
if (resId != null) {
this.idMap[chunk[this.autoColumn]] = resId;
}
}
};
switch (this.item.operation) {
case 'copy': {
await doCopy();
break;
}
case 'insertMissing':
case 'lookup': {
const res = await runQueryOnDriver(pool, driver, dmp =>
dmp.put(
'^select %i ^from %f ^where %i = %v',
this.autoColumn,
this.table,
this.item.matchColumns[0],
chunk[this.item.matchColumns[0]]
)
);
const resId = Object.entries(res?.rows?.[0])?.[0]?.[1];
if (resId != null) {
this.idMap[chunk[this.autoColumn]] = resId;
} else if (this.item.operation == 'insertMissing') {
await doCopy();
}
break;
}
}
// this.idMap[oldId] = newId;
},
});
await this.duplicator.copyStream(readStream, writeStream);
// await this.duplicator.driver.writeQueryStream(this.duplicator.pool, {
// mapResultId: (oldId, newId) => {
// this.idMap[oldId] = newId;
// },
// });
}
}
export class DataDuplicator {
itemHolders: DuplicatorItemHolder[];
itemPlan: DuplicatorItemHolder[] = [];
constructor(
public pool: any,
public driver: EngineDriver,
public db: DatabaseInfo,
public items: DataDuplicatorItem[],
public stream,
public copyStream: (input, output) => Promise<void>
) {
this.itemHolders = items.map(x => new DuplicatorItemHolder(x, this));
this.itemHolders.forEach(x => x.initializeReferences());
}
findItemToPlan(): DuplicatorItemHolder {
for (const item of this.itemHolders) {
if (item.isPlanned) continue;
if (item.references.every(x => x.ref.isPlanned)) {
return item;
}
}
for (const item of this.itemHolders) {
if (item.isPlanned) continue;
if (item.references.every(x => x.ref.isPlanned || !x.isMandatory)) {
const backReferences = item.references.filter(x => !x.ref.isPlanned);
item.backReferences = backReferences;
return item;
}
}
throw new Error('Cycle in mandatory references');
}
createPlan() {
while (this.itemPlan.length < this.itemHolders.length) {
const item = this.findItemToPlan();
item.isPlanned = true;
this.itemPlan.push(item);
}
}
async run() {
this.createPlan();
for (const item of this.itemPlan) {
await item.runImport();
}
}
}

View File

@ -22,3 +22,4 @@ export * from './processPerspectiveDefaultColunns';
export * from './PerspectiveDataPattern';
export * from './PerspectiveDataLoader';
export * from './perspectiveTools';
export * from './DataDuplicator';

View File

@ -57,6 +57,10 @@ export class ScriptWriter {
this._put(`await dbgateApi.importDatabase(${JSON.stringify(options)});`);
}
dataDuplicator(options) {
this._put(`await dbgateApi.dataDuplicator(${JSON.stringify(options)});`);
}
comment(s) {
this._put(`// ${s}`);
}
@ -143,6 +147,13 @@ export class ScriptWriterJson {
});
}
dataDuplicator(options) {
this.commands.push({
type: 'dataDuplicator',
options,
});
}
getScript(schedule = null) {
return {
type: 'json',
@ -186,6 +197,9 @@ export function jsonScriptToJavascript(json) {
case 'importDatabase':
script.importDatabase(cmd.options);
break;
case 'dataDuplicator':
script.dataDuplicator(cmd.options);
break;
}
}

View File

@ -197,6 +197,8 @@ export class SqlDumper implements AlterProcessor {
specialColumnOptions(column) {}
selectScopeIdentity(table: TableInfo) {}
columnDefinition(column: ColumnInfo, { includeDefault = true, includeNullable = true, includeCollate = true } = {}) {
if (column.computedExpression) {
this.put('^as %s', column.computedExpression);

View File

@ -0,0 +1,41 @@
import _intersection from 'lodash/intersection';
import _isArray from 'lodash/isArray';
import { getLogger } from './getLogger';
const logger = getLogger('asyncWriteStream');
export interface AsyncWriteStreamOptions {
processItem: (chunk: any) => Promise<void>;
}
export function createAsyncWriteStream(stream, options: AsyncWriteStreamOptions): any {
const writable = new stream.Writable({
objectMode: true,
});
writable._write = async (chunk, encoding, callback) => {
await options.processItem(chunk);
// const { sql, id, newIdSql } = chunk;
// if (_isArray(sql)) {
// for (const item of sql) await driver.query(pool, item, { discardResult: true });
// } else {
// await driver.query(pool, sql, { discardResult: true });
// }
// if (newIdSql) {
// const res = await driver.query(pool, newIdSql);
// const resId = Object.entries(res?.rows?.[0])?.[0]?.[1];
// if (options?.mapResultId) {
// options?.mapResultId(id, resId as string);
// }
// }
callback();
};
// writable._final = async callback => {
// callback();
// };
return writable;
}

View File

@ -1,10 +1,11 @@
import { EngineDriver, WriteTableOptions } from 'dbgate-types';
import _intersection from 'lodash/intersection';
import { getLogger } from './getLogger';
import { prepareTableForImport } from './tableTransforms';
const logger = getLogger('bulkStreamBase');
export function createBulkInsertStreamBase(driver, stream, pool, name, options): any {
export function createBulkInsertStreamBase(driver: EngineDriver, stream, pool, name, options: WriteTableOptions): any {
const fullNameQuoted = name.schemaName
? `${driver.dialect.quoteIdentifier(name.schemaName)}.${driver.dialect.quoteIdentifier(name.pureName)}`
: driver.dialect.quoteIdentifier(name.pureName);
@ -58,21 +59,21 @@ export function createBulkInsertStreamBase(driver, stream, pool, name, options):
const dmp = driver.createDumper();
dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`);
dmp.putCollection(',', writable.columnNames, col => dmp.putRaw(driver.dialect.quoteIdentifier(col)));
dmp.putCollection(',', writable.columnNames, col => dmp.putRaw(driver.dialect.quoteIdentifier(col as string)));
dmp.putRaw(')\n VALUES\n');
let wasRow = false;
for (const row of rows) {
if (wasRow) dmp.putRaw(',\n');
dmp.putRaw('(');
dmp.putCollection(',', writable.columnNames, col => dmp.putValue(row[col]));
dmp.putCollection(',', writable.columnNames, col => dmp.putValue(row[col as string]));
dmp.putRaw(')');
wasRow = true;
}
dmp.putRaw(';');
// require('fs').writeFileSync('/home/jena/test.sql', dmp.s);
// console.log(dmp.s);
await driver.query(pool, dmp.s);
await driver.query(pool, dmp.s, { discardResult: true });
};
writable.sendIfFull = async () => {

View File

@ -2,7 +2,7 @@ import _compact from 'lodash/compact';
import { SqlDumper } from './SqlDumper';
import { splitQuery } from 'dbgate-query-splitter';
import { dumpSqlSelect } from 'dbgate-sqltree';
import { EngineDriver, RunScriptOptions } from 'dbgate-types';
import { EngineDriver, QueryResult, RunScriptOptions } from 'dbgate-types';
const dialect = {
limitSelect: true,
@ -20,12 +20,22 @@ const dialect = {
defaultSchemaName: null,
};
export async function runCommandOnDriver(pool, driver: EngineDriver, cmd: (dmp: SqlDumper) => void) {
export async function runCommandOnDriver(pool, driver: EngineDriver, cmd: (dmp: SqlDumper) => void): Promise<void> {
const dmp = driver.createDumper();
cmd(dmp as any);
await driver.query(pool, dmp.s, { discardResult: true });
}
export async function runQueryOnDriver(
pool,
driver: EngineDriver,
cmd: (dmp: SqlDumper) => void
): Promise<QueryResult> {
const dmp = driver.createDumper();
cmd(dmp as any);
return await driver.query(pool, dmp.s);
}
export const driverBase = {
analyserClass: null,
dumperClass: SqlDumper,

View File

@ -3,6 +3,7 @@ export * from './nameTools';
export * from './tableTransforms';
export * from './packageTools';
export * from './createBulkInsertStreamBase';
export * from './createAsyncWriteStream';
export * from './DatabaseAnalyser';
export * from './driverBase';
export * from './SqlDumper';

View File

@ -1,22 +1,54 @@
<script lang="ts" context="module">
const getCurrentEditor = () => getActiveComponent('DataDuplicatorTab');
registerCommand({
id: 'dataDuplicator.run',
category: 'Data duplicator',
name: 'Run',
toolbar: true,
isRelatedToTab: true,
icon: 'icon run',
testEnabled: () => getCurrentEditor()?.canRun(),
onClick: () => getCurrentEditor().run(),
});
</script>
<script lang="ts">
import ro from 'date-fns/locale/ro';
import { ScriptWriter, ScriptWriterJson } from 'dbgate-tools';
import _ from 'lodash';
import ToolStripCommandButton from '../buttons/ToolStripCommandButton.svelte';
import ToolStripContainer from '../buttons/ToolStripContainer.svelte';
import invalidateCommands from '../commands/invalidateCommands';
import registerCommand from '../commands/registerCommand';
import TableControl from '../elements/TableControl.svelte';
import CheckboxField from '../forms/CheckboxField.svelte';
import SelectField from '../forms/SelectField.svelte';
import { extractShellConnection } from '../impexp/createImpExpScript';
import useEditorData from '../query/useEditorData';
import { currentArchive } from '../stores';
import { useArchiveFiles, useConnectionInfo, useDatabaseInfo } from '../utility/metadataLoaders';
import TableStructureTab from './TableStructureTab.svelte';
import { currentArchive, getCurrentConfig } from '../stores';
import { apiCall, apiOff, apiOn } from '../utility/api';
import { changeTab } from '../utility/common';
import createActivator, { getActiveComponent } from '../utility/createActivator';
import { useArchiveFiles, useArchiveFolders, useConnectionInfo, useDatabaseInfo } from '../utility/metadataLoaders';
import useEffect from '../utility/useEffect';
export let conid;
export let database;
export let tabid;
let busy = false;
let runnerId = null;
export const activator = createActivator('DataDuplicatorTab', true);
$: connection = useConnectionInfo({ conid });
$: dbinfo = useDatabaseInfo({ conid, database });
$: archiveFiles = useArchiveFiles({ folder: $currentArchive });
$: archiveFolders = useArchiveFolders();
$: archiveFiles = useArchiveFiles({ folder: $editorState?.value?.archiveFolder });
$: pairedNames = _.intersectionBy(
$dbinfo?.tables?.map(x => x.pureName),
@ -24,6 +56,16 @@
(x: string) => _.toUpper(x)
);
$: {
changeTab(tabid, tab => ({ ...tab, busy }));
}
$: {
busy;
runnerId;
invalidateCommands();
}
const { editorState, editorValue, setEditorData } = useEditorData({
tabid,
onInitialData: value => {
@ -41,8 +83,58 @@
}));
}
function createScript(forceScript = false) {
const config = getCurrentConfig();
const script = config.allowShellScripting || forceScript ? new ScriptWriter() : new ScriptWriterJson();
script.dataDuplicator({
connection: extractShellConnection($connection, database),
archive: $editorState.value.archiveFolder,
items: tableRows
.filter(x => x.isChecked)
.map(row => ({
name: row.name,
operation: row.operation,
matchColumns: _.compact([row.matchColumn1]),
})),
});
return script.getScript();
}
export function canRun() {
return !!tableRows.find(x => x.isChecked) && !busy;
}
export async function run() {
if (busy) return;
busy = true;
const script = await createScript();
let runid = runnerId;
const resp = await apiCall('runners/start', { script });
runid = resp.runid;
runnerId = runid;
}
$: effect = useEffect(() => registerRunnerDone(runnerId));
function registerRunnerDone(rid) {
if (rid) {
apiOn(`runner-done-${rid}`, handleRunnerDone);
return () => {
apiOff(`runner-done-${rid}`, handleRunnerDone);
};
} else {
return () => {};
}
}
$: $effect;
const handleRunnerDone = () => {
busy = false;
};
// $: console.log('$archiveFiles', $archiveFiles);
// $: console.log('$editorState', $editorState);
$: console.log('$editorState', $editorState.value);
$: tableRows = pairedNames.map(name => {
const item = $editorState?.value?.tables?.[name];
@ -59,63 +151,87 @@
matchColumn1,
};
});
$: console.log('$archiveFolders', $archiveFolders);
</script>
<div>
<div class="bold m-2">Imported files</div>
<ToolStripContainer>
<div>
<div class="bold m-2">Source archive</div>
<SelectField
isNative
value={$editorState.value?.archiveFolder}
on:change={e => {
setEditorData(old => ({
...old,
archiveFolder: e.detail,
}));
}}
options={$archiveFolders?.map(x => ({
label: x.name,
value: x.name,
})) || []}
/>
<TableControl
rows={tableRows}
columns={[
{ header: '', fieldName: 'isChecked', slot: 1 },
{ header: 'File=>Table', fieldName: 'name' },
{ header: 'Operation', fieldName: 'operation', slot: 2 },
{ header: 'Match column', fieldName: 'matchColumn1', slot: 3 },
]}
>
<svelte:fragment slot="1" let:row>
<CheckboxField
checked={row.isChecked}
on:change={e => {
changeTable({ ...row, isChecked: e.target.checked });
}}
/>
</svelte:fragment>
<svelte:fragment slot="2" let:row>
<SelectField
isNative
value={row.operation}
on:change={e => {
changeTable({ ...row, operation: e.detail });
}}
disabled={!row.isChecked}
options={[
{ label: 'Copy row', value: 'copy' },
{ label: 'Lookup (find matching row)', value: 'lookup' },
{ label: 'Insert if not exists', value: 'insertMissing' },
]}
/>
</svelte:fragment>
<svelte:fragment slot="3" let:row>
{#if row.operation != 'copy'}
<div class="bold m-2">Imported files</div>
<TableControl
rows={tableRows}
columns={[
{ header: '', fieldName: 'isChecked', slot: 1 },
{ header: 'File=>Table', fieldName: 'name' },
{ header: 'Operation', fieldName: 'operation', slot: 2 },
{ header: 'Match column', fieldName: 'matchColumn1', slot: 3 },
]}
>
<svelte:fragment slot="1" let:row>
<CheckboxField
checked={row.isChecked}
on:change={e => {
changeTable({ ...row, isChecked: e.target.checked });
}}
/>
</svelte:fragment>
<svelte:fragment slot="2" let:row>
<SelectField
isNative
value={row.matchColumn1}
value={row.operation}
on:change={e => {
changeTable({ ...row, matchColumn1: e.detail });
changeTable({ ...row, operation: e.detail });
}}
disabled={!row.isChecked}
options={$dbinfo?.tables
?.find(x => x.pureName?.toUpperCase() == row.name.toUpperCase())
?.columns?.map(col => ({
label: col.columnName,
value: col.columnName,
})) || []}
options={[
{ label: 'Copy row', value: 'copy' },
{ label: 'Lookup (find matching row)', value: 'lookup' },
{ label: 'Insert if not exists', value: 'insertMissing' },
]}
/>
{/if}
</svelte:fragment>
</TableControl>
</div>
</svelte:fragment>
<svelte:fragment slot="3" let:row>
{#if row.operation != 'copy'}
<SelectField
isNative
value={row.matchColumn1}
on:change={e => {
changeTable({ ...row, matchColumn1: e.detail });
}}
disabled={!row.isChecked}
options={$dbinfo?.tables
?.find(x => x.pureName?.toUpperCase() == row.name.toUpperCase())
?.columns?.map(col => ({
label: col.columnName,
value: col.columnName,
})) || []}
/>
{/if}
</svelte:fragment>
</TableControl>
</div>
<svelte:fragment slot="toolstrip">
<ToolStripCommandButton command="dataDuplicator.run" />
</svelte:fragment>
</ToolStripContainer>
<!-- <div>
{#each pairedNames as name}
<div>{name}</div>

View File

@ -155,6 +155,10 @@ class MsSqlDumper extends SqlDumper {
newname
);
}
selectScopeIdentity() {
this.put('^select ^scope_identity()');
}
}
MsSqlDumper.prototype.renameView = MsSqlDumper.prototype.renameObject;

View File

@ -89,6 +89,10 @@ class Dumper extends SqlDumper {
putByteArrayValue(value) {
this.putRaw(`unhex('${arrayToHexString(value)}')`);
}
selectScopeIdentity() {
this.put('^select ^last_insert_id()')
}
}
module.exports = Dumper;

View File

@ -99,6 +99,14 @@ class Dumper extends SqlDumper {
putByteArrayValue(value) {
this.putRaw(`e'\\\\x${arrayToHexString(value)}'`);
}
selectScopeIdentity(table) {
this.put(
"^SELECT currval(pg_get_serial_sequence('%f','%s'))",
table,
table.columns?.find(x => x.autoIncrement)?.[0]?.columnName
);
}
}
module.exports = Dumper;

View File

@ -16,6 +16,10 @@ class Dumper extends SqlDumper {
truncateTable(name) {
this.putCmd('^delete ^from %f', name);
}
selectScopeIdentity() {
this.put('^select last_insert_rowid()')
}
}
module.exports = Dumper;