From 38b6350ef8bb0b332f72556466cd4e1468caf4e9 Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Thu, 11 Jun 2020 13:58:34 +0200 Subject: [PATCH] mssql bulk table writer --- packages/api/src/shell/index.js | 2 + packages/api/src/shell/tableWriter.js | 14 +++ packages/engines/default/DatabaseAnalyser.js | 1 + packages/engines/default/SqlDumper.js | 2 +- packages/engines/mssql/MsSqlAnalyser.js | 20 ++++ .../engines/mssql/createBulkInsertStream.js | 94 +++++++++++++++++++ packages/engines/mssql/index.js | 23 +++++ packages/engines/mysql/index.js | 1 + packages/engines/postgres/index.js | 1 + packages/types/dialect.d.ts | 1 + packages/types/engines.d.ts | 16 +++- test/importTable.js | 16 +++- 12 files changed, 187 insertions(+), 4 deletions(-) create mode 100644 packages/api/src/shell/tableWriter.js create mode 100644 packages/engines/mssql/createBulkInsertStream.js diff --git a/packages/api/src/shell/index.js b/packages/api/src/shell/index.js index 31f6c3f5..d1e2f21e 100644 --- a/packages/api/src/shell/index.js +++ b/packages/api/src/shell/index.js @@ -2,6 +2,7 @@ const queryReader = require('./queryReader'); const csvWriter = require('./csvWriter'); const csvReader = require('./csvReader'); const runScript = require('./runScript'); +const tableWriter = require('./tableWriter'); const copyStream = require('./copyStream'); const fakeObjectReader = require('./fakeObjectReader'); const consoleObjectWriter = require('./consoleObjectWriter'); @@ -11,6 +12,7 @@ module.exports = { csvWriter, csvReader, runScript, + tableWriter, copyStream, fakeObjectReader, consoleObjectWriter, diff --git a/packages/api/src/shell/tableWriter.js b/packages/api/src/shell/tableWriter.js new file mode 100644 index 00000000..89ca5a8f --- /dev/null +++ b/packages/api/src/shell/tableWriter.js @@ -0,0 +1,14 @@ +const driverConnect = require('../utility/driverConnect'); + +const engines = require('@dbgate/engines'); + +async function tableWriter({ connection, schemaName, pureName, ...options }) { + console.log(`write table ${schemaName}.${pureName}`); + + const driver = engines(connection); + const pool = await driverConnect(driver, connection); + console.log(`Connected.`); + return await driver.writeTable(pool, { schemaName, pureName }, options); +} + +module.exports = tableWriter; diff --git a/packages/engines/default/DatabaseAnalyser.js b/packages/engines/default/DatabaseAnalyser.js index 6961e481..9fee5ea7 100644 --- a/packages/engines/default/DatabaseAnalyser.js +++ b/packages/engines/default/DatabaseAnalyser.js @@ -14,6 +14,7 @@ class DatabaseAnalyser { this.structure = null; /** import('@dbgate/types').DatabaseModification[]) */ this.modifications = null; + this.singleObjectFilter = null; } async _runAnalysis() { diff --git a/packages/engines/default/SqlDumper.js b/packages/engines/default/SqlDumper.js index 0705ece4..fbf2dd8b 100644 --- a/packages/engines/default/SqlDumper.js +++ b/packages/engines/default/SqlDumper.js @@ -155,7 +155,7 @@ class SqlDumper { if (column.isPersisted) this.put(' ^persisted'); return; } - this.put('%k', column.dataType); + this.put('%k', column.dataType || this.dialect.fallbackDataType); if (column.autoIncrement) { this.autoIncrement(); } diff --git a/packages/engines/mssql/MsSqlAnalyser.js b/packages/engines/mssql/MsSqlAnalyser.js index 44dd6c20..17cb98f2 100644 --- a/packages/engines/mssql/MsSqlAnalyser.js +++ b/packages/engines/mssql/MsSqlAnalyser.js @@ -3,6 +3,7 @@ const _ = require('lodash'); const sql = require('./sql'); const DatabaseAnalyser = require('../default/DatabaseAnalyser'); +const { filter } = require('lodash'); function objectTypeToField(type) { switch (type.trim()) { @@ -159,10 +160,17 @@ function detectType(col) { class MsSqlAnalyser extends DatabaseAnalyser { constructor(pool, driver) { super(pool, driver); + this.singleObjectId = null; } createQuery(resFileName, filterIdObjects) { let res = sql[resFileName]; + if (this.singleObjectFilter) { + const { typeField } = this.singleObjectFilter; + if (!this.singleObjectId) return null; + if (!filterIdObjects.includes(typeField)) return null; + return res.replace('=[OBJECT_ID_CONDITION]', ` = ${this.singleObjectId}`); + } if (!this.modifications || !filterIdObjects || this.modifications.length == 0) { res = res.replace('=[OBJECT_ID_CONDITION]', ' is not null'); } else { @@ -177,7 +185,19 @@ class MsSqlAnalyser extends DatabaseAnalyser { } return res; } + + async getSingleObjectId() { + if (this.singleObjectFilter) { + const { name, typeField } = this.singleObjectFilter; + const { schemaName, pureName } = name; + const fullName = schemaName ? `[${schemaName}].[${pureName}]` : pureName; + const resId = await this.driver.query(this.pool, `SELECT OBJECT_ID('${fullName}') AS id`); + this.singleObjectId = resId.rows[0].id; + } + } + async _runAnalysis() { + await this.getSingleObjectId(); const tablesRows = await this.driver.query(this.pool, this.createQuery('tables', ['tables'])); const columnsRows = await this.driver.query(this.pool, this.createQuery('columns', ['tables'])); const pkColumnsRows = await this.driver.query(this.pool, this.createQuery('primaryKeys', ['tables'])); diff --git a/packages/engines/mssql/createBulkInsertStream.js b/packages/engines/mssql/createBulkInsertStream.js new file mode 100644 index 00000000..93679466 --- /dev/null +++ b/packages/engines/mssql/createBulkInsertStream.js @@ -0,0 +1,94 @@ +const _ = require('lodash'); + +/** + * + * @param {import('@dbgate/types').EngineDriver} driver + */ +function createBulkInsertStream(driver, mssql, stream, pool, name, options) { + const fullName = name.schemaName ? `[${name.schemaName}].[${name.pureName}]` : name.pureName; + const fullNameQuoted = name.schemaName ? `[${name.schemaName}].[${name.pureName}]` : `[${name.pureName}]`; + + const writable = new stream.Writable({ + objectMode: true, + }); + + writable.buffer = []; + writable.structure = null; + writable.columnNames = null; + + writable.addRow = async (row) => { + if (writable.structure) { + writable.buffer.push(row); + } else { + writable.structure = row; + await writable.checkStructure(); + } + }; + + writable.checkStructure = async () => { + let structure = await driver.analyseSingleTable(pool, name); + if (structure && options.dropIfExists) { + console.log(`Dropping table ${fullName}`); + await driver.query(pool, `DROP TABLE ${fullNameQuoted}`); + } + if (options.createIfNotExists && (!structure || options.dropIfExists)) { + console.log(`Creating table ${fullName}`); + const dmp = driver.createDumper(); + dmp.createTable({ ...writable.structure, ...name }); + console.log(dmp.s); + await driver.query(pool, dmp.s); + structure = await driver.analyseSingleTable(pool, name); + } + if (options.truncate) { + await driver.query(pool, `TRUNCATE TABLE ${fullNameQuoted}`); + } + + const respTemplate = await pool.request().query(`SELECT * FROM ${fullNameQuoted} WHERE 1=0`); + writable.templateColumns = respTemplate.recordset.toTable().columns; + + this.columnNames = _.intersection( + structure.columns.map((x) => x.columnName), + writable.structure.columns.map((x) => x.columnName) + ); + }; + + writable.send = async () => { + const rows = writable.buffer; + writable.buffer = []; + const table = new mssql.Table(fullName); + // table.create = options.createIfNotExists; + for (const column of this.columnNames) { + const tcol = writable.templateColumns.find((x) => x.name == column); + // console.log('TCOL', tcol); + // console.log('TYPE', tcol.type, mssql.Int); + // table.columns.add(column, tcol ? tcol.type : mssql.NVarChar(mssql.MAX)); + table.columns.add(column, tcol ? tcol.type : mssql.NVarChar(mssql.MAX), { nullable: tcol.nullable }); + } + for (const row of rows) { + table.rows.add(...this.columnNames.map((col) => row[col])); + } + const request = pool.request(); + await request.bulk(table); + }; + + writable.sendIfFull = async () => { + if (writable.buffer.length > 100) { + await writable.send(); + } + }; + + writable._write = async (chunk, encoding, callback) => { + await writable.addRow(chunk); + await writable.sendIfFull(); + callback(); + }; + + writable._final = async (callback) => { + await writable.send(); + callback(); + }; + + return writable; +} + +module.exports = createBulkInsertStream; diff --git a/packages/engines/mssql/index.js b/packages/engines/mssql/index.js index 4f9bd7f4..2ef681c5 100644 --- a/packages/engines/mssql/index.js +++ b/packages/engines/mssql/index.js @@ -1,6 +1,8 @@ const _ = require('lodash'); const MsSqlAnalyser = require('./MsSqlAnalyser'); const MsSqlDumper = require('./MsSqlDumper'); +const createBulkInsertStream = require('./createBulkInsertStream'); +const { analyseSingleObject } = require('../mysql'); /** @type {import('@dbgate/types').SqlDialect} */ const dialect = { @@ -8,6 +10,7 @@ const dialect = { rangeSelect: true, offsetFetchRangeSyntax: true, stringEscapeChar: "'", + fallbackDataType: 'nvarchar(max)', quoteIdentifier(s) { return `[${s}]`; }, @@ -68,6 +71,12 @@ const driver = { }, // @ts-ignore async query(pool, sql) { + if (sql == null) { + return { + rows: [], + columns: [], + }; + } const resp = await pool.request().query(sql); // console.log(Object.keys(resp.recordset)); // console.log(resp); @@ -179,6 +188,10 @@ const driver = { return pass; }, + async writeTable(pool, name, options) { + const { stream, mssql } = pool._nativeModules; + return createBulkInsertStream(this, mssql, stream, pool, name, options); + }, async getVersion(pool) { const { version } = (await this.query(pool, 'SELECT @@VERSION AS version')).rows[0]; return { version }; @@ -191,6 +204,16 @@ const driver = { const analyser = new MsSqlAnalyser(pool, this); return analyser.fullAnalysis(); }, + async analyseSingleObject(pool, name, typeField = 'tables') { + const analyser = new MsSqlAnalyser(pool, this); + analyser.singleObjectFilter = { name, typeField }; + const res = await analyser.fullAnalysis(); + return res.tables[0]; + }, + // @ts-ignore + analyseSingleTable(pool, name) { + return this.analyseSingleObject(pool, name, 'tables'); + }, async analyseIncremental(pool, structure) { const analyser = new MsSqlAnalyser(pool, this); return analyser.incrementalAnalysis(structure); diff --git a/packages/engines/mysql/index.js b/packages/engines/mysql/index.js index b943be03..601a38cc 100644 --- a/packages/engines/mysql/index.js +++ b/packages/engines/mysql/index.js @@ -5,6 +5,7 @@ const MySqlDumper = require('./MySqlDumper'); const dialect = { rangeSelect: true, stringEscapeChar: '\\', + fallbackDataType: 'nvarchar(max)', quoteIdentifier(s) { return '`' + s + '`'; }, diff --git a/packages/engines/postgres/index.js b/packages/engines/postgres/index.js index f06b6dab..5c56c678 100644 --- a/packages/engines/postgres/index.js +++ b/packages/engines/postgres/index.js @@ -6,6 +6,7 @@ const PostgreDumper = require('./PostgreDumper'); const dialect = { rangeSelect: true, stringEscapeChar: '\\', + fallbackDataType: 'nvarchar(max)', quoteIdentifier(s) { return '"' + s + '"'; }, diff --git a/packages/types/dialect.d.ts b/packages/types/dialect.d.ts index 5e96a364..4cb44fed 100644 --- a/packages/types/dialect.d.ts +++ b/packages/types/dialect.d.ts @@ -4,4 +4,5 @@ export interface SqlDialect { stringEscapeChar: string; offsetFetchRangeSyntax?: boolean; quoteIdentifier(s: string): string; + fallbackDataType?: string; } diff --git a/packages/types/engines.d.ts b/packages/types/engines.d.ts index a6fb0374..56e2b5c5 100644 --- a/packages/types/engines.d.ts +++ b/packages/types/engines.d.ts @@ -2,7 +2,7 @@ import stream from 'stream'; import { QueryResult } from './query'; import { SqlDialect } from './dialect'; import { SqlDumper } from './dumper'; -import { DatabaseInfo, NamedObjectInfo } from './dbinfo'; +import { DatabaseInfo, NamedObjectInfo, TableInfo, ViewInfo, ProcedureInfo, FunctionInfo, TriggerInfo } from './dbinfo'; export interface StreamOptions { recordset: (columns) => void; @@ -12,13 +12,25 @@ export interface StreamOptions { info: (info) => void; } +export interface WriteTableOptions { + dropIfExists?: boolean; + truncate?: boolean; + createIfNotExists?: boolean; +} + export interface EngineDriver { engine: string; connect(nativeModules, { server, port, user, password, database }): any; query(pool: any, sql: string): Promise; stream(pool: any, sql: string, options: StreamOptions); readQuery(pool: any, sql: string): Promise; - writeTable(pool: any, { schemaName, pureName }): Promise; + writeTable(pool: any, name: NamedObjectInfo, options: WriteTableOptions): Promise; + analyseSingleObject( + pool: any, + name: NamedObjectInfo, + objectTypeField: keyof DatabaseInfo + ): Promise; + analyseSingleTable(pool: any, name: NamedObjectInfo): Promise; getVersion(pool: any): Promise<{ version: string }>; listDatabases( pool: any diff --git a/test/importTable.js b/test/importTable.js index 010ffa8b..e36b36cd 100644 --- a/test/importTable.js +++ b/test/importTable.js @@ -6,6 +6,19 @@ async function run() { // header: false, }); + const tableWriter = await dbgateApi.tableWriter({ + connection: { + server: 'localhost', + engine: 'mssql', + user: 'sa', + password: 'Pwd2020Db', + database: 'Chinook', + }, + schemaName: 'dbo', + pureName: 'Genre2', + createIfNotExists: true, + truncate: true, + }); // const tableWriter = await dbgateApi.tableWriter({ // connection: { @@ -21,7 +34,8 @@ async function run() { const consoleWriter = await dbgateApi.consoleObjectWriter(); - await dbgateApi.copyStream(csvReader, consoleWriter); + // await dbgateApi.copyStream(csvReader, consoleWriter); + await dbgateApi.copyStream(csvReader, tableWriter); } dbgateApi.runScript(run);