mssql bulk table writer

This commit is contained in:
Jan Prochazka 2020-06-11 13:58:34 +02:00
parent a86f7e96ca
commit 38b6350ef8
12 changed files with 187 additions and 4 deletions

View File

@ -2,6 +2,7 @@ const queryReader = require('./queryReader');
const csvWriter = require('./csvWriter'); const csvWriter = require('./csvWriter');
const csvReader = require('./csvReader'); const csvReader = require('./csvReader');
const runScript = require('./runScript'); const runScript = require('./runScript');
const tableWriter = require('./tableWriter');
const copyStream = require('./copyStream'); const copyStream = require('./copyStream');
const fakeObjectReader = require('./fakeObjectReader'); const fakeObjectReader = require('./fakeObjectReader');
const consoleObjectWriter = require('./consoleObjectWriter'); const consoleObjectWriter = require('./consoleObjectWriter');
@ -11,6 +12,7 @@ module.exports = {
csvWriter, csvWriter,
csvReader, csvReader,
runScript, runScript,
tableWriter,
copyStream, copyStream,
fakeObjectReader, fakeObjectReader,
consoleObjectWriter, consoleObjectWriter,

View File

@ -0,0 +1,14 @@
const driverConnect = require('../utility/driverConnect');
const engines = require('@dbgate/engines');
/**
 * Opens a connection described by `connection`, resolves the matching engine
 * driver and returns the driver's writable table stream for the given table.
 *
 * @param {object} args
 * @param {object} args.connection - connection descriptor (server, user, password, engine, ...)
 * @param {string} args.schemaName - target schema
 * @param {string} args.pureName - target table name
 * @param {...*} args.options - remaining keys are forwarded as write options
 *   (e.g. createIfNotExists, truncate, dropIfExists)
 * @returns {Promise<import('stream').Writable>} object-mode writable stream
 */
async function tableWriter({ connection, schemaName, pureName, ...options }) {
  console.log(`write table ${schemaName}.${pureName}`);
  // Pick the driver implementation that matches the connection's engine.
  const driver = engines(connection);
  const pool = await driverConnect(driver, connection);
  console.log(`Connected.`);
  const target = { schemaName, pureName };
  return driver.writeTable(pool, target, options);
}

module.exports = tableWriter;

View File

@ -14,6 +14,7 @@ class DatabaseAnalyser {
this.structure = null; this.structure = null;
/** import('@dbgate/types').DatabaseModification[]) */ /** import('@dbgate/types').DatabaseModification[]) */
this.modifications = null; this.modifications = null;
this.singleObjectFilter = null;
} }
async _runAnalysis() { async _runAnalysis() {

View File

@ -155,7 +155,7 @@ class SqlDumper {
if (column.isPersisted) this.put(' ^persisted'); if (column.isPersisted) this.put(' ^persisted');
return; return;
} }
this.put('%k', column.dataType); this.put('%k', column.dataType || this.dialect.fallbackDataType);
if (column.autoIncrement) { if (column.autoIncrement) {
this.autoIncrement(); this.autoIncrement();
} }

View File

@ -3,6 +3,7 @@ const _ = require('lodash');
const sql = require('./sql'); const sql = require('./sql');
const DatabaseAnalyser = require('../default/DatabaseAnalyser'); const DatabaseAnalyser = require('../default/DatabaseAnalyser');
const { filter } = require('lodash');
function objectTypeToField(type) { function objectTypeToField(type) {
switch (type.trim()) { switch (type.trim()) {
@ -159,10 +160,17 @@ function detectType(col) {
class MsSqlAnalyser extends DatabaseAnalyser { class MsSqlAnalyser extends DatabaseAnalyser {
constructor(pool, driver) { constructor(pool, driver) {
super(pool, driver); super(pool, driver);
this.singleObjectId = null;
} }
createQuery(resFileName, filterIdObjects) { createQuery(resFileName, filterIdObjects) {
let res = sql[resFileName]; let res = sql[resFileName];
if (this.singleObjectFilter) {
const { typeField } = this.singleObjectFilter;
if (!this.singleObjectId) return null;
if (!filterIdObjects.includes(typeField)) return null;
return res.replace('=[OBJECT_ID_CONDITION]', ` = ${this.singleObjectId}`);
}
if (!this.modifications || !filterIdObjects || this.modifications.length == 0) { if (!this.modifications || !filterIdObjects || this.modifications.length == 0) {
res = res.replace('=[OBJECT_ID_CONDITION]', ' is not null'); res = res.replace('=[OBJECT_ID_CONDITION]', ' is not null');
} else { } else {
@ -177,7 +185,19 @@ class MsSqlAnalyser extends DatabaseAnalyser {
} }
return res; return res;
} }
// Resolves the object named in this.singleObjectFilter to its numeric
// SQL Server OBJECT_ID and caches it in this.singleObjectId.
// No-op when no single-object filter is set (full analysis mode).
async getSingleObjectId() {
if (this.singleObjectFilter) {
// typeField is part of the filter but is only consumed later by createQuery.
const { name, typeField } = this.singleObjectFilter;
const { schemaName, pureName } = name;
// Bracket-quote the schema-qualified name; OBJECT_ID accepts this form.
const fullName = schemaName ? `[${schemaName}].[${pureName}]` : pureName;
// NOTE(review): fullName is interpolated into the SQL string literal —
// a pureName containing a quote would break the query; verify inputs are trusted.
const resId = await this.driver.query(this.pool, `SELECT OBJECT_ID('${fullName}') AS id`);
// OBJECT_ID returns NULL for unknown objects; createQuery treats a falsy id as "no query".
this.singleObjectId = resId.rows[0].id;
}
}
async _runAnalysis() { async _runAnalysis() {
await this.getSingleObjectId();
const tablesRows = await this.driver.query(this.pool, this.createQuery('tables', ['tables'])); const tablesRows = await this.driver.query(this.pool, this.createQuery('tables', ['tables']));
const columnsRows = await this.driver.query(this.pool, this.createQuery('columns', ['tables'])); const columnsRows = await this.driver.query(this.pool, this.createQuery('columns', ['tables']));
const pkColumnsRows = await this.driver.query(this.pool, this.createQuery('primaryKeys', ['tables'])); const pkColumnsRows = await this.driver.query(this.pool, this.createQuery('primaryKeys', ['tables']));

View File

@ -0,0 +1,94 @@
const _ = require('lodash');
/**
*
* @param {import('@dbgate/types').EngineDriver} driver
*/
function createBulkInsertStream(driver, mssql, stream, pool, name, options) {
const fullName = name.schemaName ? `[${name.schemaName}].[${name.pureName}]` : name.pureName;
const fullNameQuoted = name.schemaName ? `[${name.schemaName}].[${name.pureName}]` : `[${name.pureName}]`;
const writable = new stream.Writable({
objectMode: true,
});
writable.buffer = [];
writable.structure = null;
writable.columnNames = null;
writable.addRow = async (row) => {
if (writable.structure) {
writable.buffer.push(row);
} else {
writable.structure = row;
await writable.checkStructure();
}
};
writable.checkStructure = async () => {
let structure = await driver.analyseSingleTable(pool, name);
if (structure && options.dropIfExists) {
console.log(`Dropping table ${fullName}`);
await driver.query(pool, `DROP TABLE ${fullNameQuoted}`);
}
if (options.createIfNotExists && (!structure || options.dropIfExists)) {
console.log(`Creating table ${fullName}`);
const dmp = driver.createDumper();
dmp.createTable({ ...writable.structure, ...name });
console.log(dmp.s);
await driver.query(pool, dmp.s);
structure = await driver.analyseSingleTable(pool, name);
}
if (options.truncate) {
await driver.query(pool, `TRUNCATE TABLE ${fullNameQuoted}`);
}
const respTemplate = await pool.request().query(`SELECT * FROM ${fullNameQuoted} WHERE 1=0`);
writable.templateColumns = respTemplate.recordset.toTable().columns;
this.columnNames = _.intersection(
structure.columns.map((x) => x.columnName),
writable.structure.columns.map((x) => x.columnName)
);
};
writable.send = async () => {
const rows = writable.buffer;
writable.buffer = [];
const table = new mssql.Table(fullName);
// table.create = options.createIfNotExists;
for (const column of this.columnNames) {
const tcol = writable.templateColumns.find((x) => x.name == column);
// console.log('TCOL', tcol);
// console.log('TYPE', tcol.type, mssql.Int);
// table.columns.add(column, tcol ? tcol.type : mssql.NVarChar(mssql.MAX));
table.columns.add(column, tcol ? tcol.type : mssql.NVarChar(mssql.MAX), { nullable: tcol.nullable });
}
for (const row of rows) {
table.rows.add(...this.columnNames.map((col) => row[col]));
}
const request = pool.request();
await request.bulk(table);
};
writable.sendIfFull = async () => {
if (writable.buffer.length > 100) {
await writable.send();
}
};
writable._write = async (chunk, encoding, callback) => {
await writable.addRow(chunk);
await writable.sendIfFull();
callback();
};
writable._final = async (callback) => {
await writable.send();
callback();
};
return writable;
}
module.exports = createBulkInsertStream;

View File

@ -1,6 +1,8 @@
const _ = require('lodash'); const _ = require('lodash');
const MsSqlAnalyser = require('./MsSqlAnalyser'); const MsSqlAnalyser = require('./MsSqlAnalyser');
const MsSqlDumper = require('./MsSqlDumper'); const MsSqlDumper = require('./MsSqlDumper');
const createBulkInsertStream = require('./createBulkInsertStream');
const { analyseSingleObject } = require('../mysql');
/** @type {import('@dbgate/types').SqlDialect} */ /** @type {import('@dbgate/types').SqlDialect} */
const dialect = { const dialect = {
@ -8,6 +10,7 @@ const dialect = {
rangeSelect: true, rangeSelect: true,
offsetFetchRangeSyntax: true, offsetFetchRangeSyntax: true,
stringEscapeChar: "'", stringEscapeChar: "'",
fallbackDataType: 'nvarchar(max)',
quoteIdentifier(s) { quoteIdentifier(s) {
return `[${s}]`; return `[${s}]`;
}, },
@ -68,6 +71,12 @@ const driver = {
}, },
// @ts-ignore // @ts-ignore
async query(pool, sql) { async query(pool, sql) {
if (sql == null) {
return {
rows: [],
columns: [],
};
}
const resp = await pool.request().query(sql); const resp = await pool.request().query(sql);
// console.log(Object.keys(resp.recordset)); // console.log(Object.keys(resp.recordset));
// console.log(resp); // console.log(resp);
@ -179,6 +188,10 @@ const driver = {
return pass; return pass;
}, },
// Returns a writable object stream that bulk-inserts into the named table.
// `name` is { schemaName, pureName }; `options` is WriteTableOptions
// (dropIfExists / createIfNotExists / truncate).
async writeTable(pool, name, options) {
// presumably set up by driverConnect — the native 'stream' and 'mssql' modules
// are stashed on the pool; TODO confirm against the connect implementation.
const { stream, mssql } = pool._nativeModules;
return createBulkInsertStream(this, mssql, stream, pool, name, options);
},
async getVersion(pool) { async getVersion(pool) {
const { version } = (await this.query(pool, 'SELECT @@VERSION AS version')).rows[0]; const { version } = (await this.query(pool, 'SELECT @@VERSION AS version')).rows[0];
return { version }; return { version };
@ -191,6 +204,16 @@ const driver = {
const analyser = new MsSqlAnalyser(pool, this); const analyser = new MsSqlAnalyser(pool, this);
return analyser.fullAnalysis(); return analyser.fullAnalysis();
}, },
// Analyses a single database object by name instead of the whole schema:
// runs a full analysis constrained by singleObjectFilter, which makes the
// analyser's createQuery filter on the resolved OBJECT_ID.
async analyseSingleObject(pool, name, typeField = 'tables') {
const analyser = new MsSqlAnalyser(pool, this);
analyser.singleObjectFilter = { name, typeField };
const res = await analyser.fullAnalysis();
// NOTE(review): always reads res.tables[0] regardless of typeField — looks
// wrong for non-table objects (views etc.); confirm once other types are
// supported by the single-object path.
return res.tables[0];
},
// @ts-ignore
// Convenience wrapper: single-object analysis fixed to the 'tables' field.
analyseSingleTable(pool, name) {
return this.analyseSingleObject(pool, name, 'tables');
},
async analyseIncremental(pool, structure) { async analyseIncremental(pool, structure) {
const analyser = new MsSqlAnalyser(pool, this); const analyser = new MsSqlAnalyser(pool, this);
return analyser.incrementalAnalysis(structure); return analyser.incrementalAnalysis(structure);

View File

@ -5,6 +5,7 @@ const MySqlDumper = require('./MySqlDumper');
const dialect = { const dialect = {
rangeSelect: true, rangeSelect: true,
stringEscapeChar: '\\', stringEscapeChar: '\\',
fallbackDataType: 'nvarchar(max)',
quoteIdentifier(s) { quoteIdentifier(s) {
return '`' + s + '`'; return '`' + s + '`';
}, },

View File

@ -6,6 +6,7 @@ const PostgreDumper = require('./PostgreDumper');
const dialect = { const dialect = {
rangeSelect: true, rangeSelect: true,
stringEscapeChar: '\\', stringEscapeChar: '\\',
fallbackDataType: 'nvarchar(max)',
quoteIdentifier(s) { quoteIdentifier(s) {
return '"' + s + '"'; return '"' + s + '"';
}, },

View File

@ -4,4 +4,5 @@ export interface SqlDialect {
stringEscapeChar: string; stringEscapeChar: string;
offsetFetchRangeSyntax?: boolean; offsetFetchRangeSyntax?: boolean;
quoteIdentifier(s: string): string; quoteIdentifier(s: string): string;
fallbackDataType?: string;
} }

View File

@ -2,7 +2,7 @@ import stream from 'stream';
import { QueryResult } from './query'; import { QueryResult } from './query';
import { SqlDialect } from './dialect'; import { SqlDialect } from './dialect';
import { SqlDumper } from './dumper'; import { SqlDumper } from './dumper';
import { DatabaseInfo, NamedObjectInfo } from './dbinfo'; import { DatabaseInfo, NamedObjectInfo, TableInfo, ViewInfo, ProcedureInfo, FunctionInfo, TriggerInfo } from './dbinfo';
export interface StreamOptions { export interface StreamOptions {
recordset: (columns) => void; recordset: (columns) => void;
@ -12,13 +12,25 @@ export interface StreamOptions {
info: (info) => void; info: (info) => void;
} }
/**
 * Options controlling how a target table is prepared before bulk writing
 * (consumed by EngineDriver.writeTable implementations).
 */
export interface WriteTableOptions {
/** Drop the target table first when it already exists. */
dropIfExists?: boolean;
/** Issue TRUNCATE TABLE on the target before inserting. */
truncate?: boolean;
/** Create the target table (from the incoming structure) when missing. */
createIfNotExists?: boolean;
}
export interface EngineDriver { export interface EngineDriver {
engine: string; engine: string;
connect(nativeModules, { server, port, user, password, database }): any; connect(nativeModules, { server, port, user, password, database }): any;
query(pool: any, sql: string): Promise<QueryResult>; query(pool: any, sql: string): Promise<QueryResult>;
stream(pool: any, sql: string, options: StreamOptions); stream(pool: any, sql: string, options: StreamOptions);
readQuery(pool: any, sql: string): Promise<stream.Readable>; readQuery(pool: any, sql: string): Promise<stream.Readable>;
writeTable(pool: any, { schemaName, pureName }): Promise<stream.Writeable>; writeTable(pool: any, name: NamedObjectInfo, options: WriteTableOptions): Promise<stream.Writeable>;
analyseSingleObject(
pool: any,
name: NamedObjectInfo,
objectTypeField: keyof DatabaseInfo
): Promise<TableInfo | ViewInfo | ProcedureInfo | FunctionInfo | TriggerInfo>;
analyseSingleTable(pool: any, name: NamedObjectInfo): Promise<TableInfo>;
getVersion(pool: any): Promise<{ version: string }>; getVersion(pool: any): Promise<{ version: string }>;
listDatabases( listDatabases(
pool: any pool: any

View File

@ -6,6 +6,19 @@ async function run() {
// header: false, // header: false,
}); });
const tableWriter = await dbgateApi.tableWriter({
connection: {
server: 'localhost',
engine: 'mssql',
user: 'sa',
password: 'Pwd2020Db',
database: 'Chinook',
},
schemaName: 'dbo',
pureName: 'Genre2',
createIfNotExists: true,
truncate: true,
});
// const tableWriter = await dbgateApi.tableWriter({ // const tableWriter = await dbgateApi.tableWriter({
// connection: { // connection: {
@ -21,7 +34,8 @@ async function run() {
const consoleWriter = await dbgateApi.consoleObjectWriter(); const consoleWriter = await dbgateApi.consoleObjectWriter();
await dbgateApi.copyStream(csvReader, consoleWriter); // await dbgateApi.copyStream(csvReader, consoleWriter);
await dbgateApi.copyStream(csvReader, tableWriter);
} }
dbgateApi.runScript(run); dbgateApi.runScript(run);