mssql - incremental analysis

This commit is contained in:
Jan Prochazka 2020-04-11 20:24:30 +02:00
parent eb0c2f04bd
commit fae97a8b4a
12 changed files with 288 additions and 87 deletions

20
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,20 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Launch API",
"skipFiles": [
"<node_internals>/**"
],
"program": "${workspaceFolder}/packages/api/src/index.js",
"outFiles": [
"${workspaceFolder}/**/*.js"
]
}
]
}

View File

@ -4,11 +4,21 @@ const driverConnect = require('../utility/driverConnect');
let systemConnection;
let storedConnection;
let afterConnectCallbacks = [];
let analysedStructure = null;
async function handleFullRefresh() {
const driver = engines(storedConnection);
const structure = await driver.analyseFull(systemConnection);
process.send({ msgtype: 'structure', structure });
analysedStructure = await driver.analyseFull(systemConnection);
process.send({ msgtype: 'structure', structure: analysedStructure });
}
async function handleIncrementalRefresh() {
const driver = engines(storedConnection);
const newStructure = await driver.analyseIncremental(systemConnection, analysedStructure);
if (newStructure != null) {
analysedStructure = newStructure;
process.send({ msgtype: 'structure', structure: analysedStructure });
}
}
async function handleConnect(connection) {
@ -17,7 +27,7 @@ async function handleConnect(connection) {
const driver = engines(storedConnection);
systemConnection = await driverConnect(driver, storedConnection);
handleFullRefresh();
setInterval(handleFullRefresh, 30 * 1000);
setInterval(handleIncrementalRefresh, 10 * 1000);
for (const [resolve] of afterConnectCallbacks) {
resolve();
}
@ -57,7 +67,7 @@ async function handleMessage({ msgtype, ...other }) {
}
function start() {
process.on('message', async message => {
process.on('message', async (message) => {
try {
await handleMessage(message);
} catch (e) {

View File

@ -1,3 +1,5 @@
const _ = require('lodash');
class DatabaseAnalyser {
/**
*
@ -6,14 +8,80 @@ class DatabaseAnalyser {
constructor(pool, driver) {
this.pool = pool;
this.driver = driver;
this.result = DatabaseAnalyser.createEmptyStructure();
// this.result = DatabaseAnalyser.createEmptyStructure();
/** @type {import('@dbgate/types').DatabaseInfo} */
this.structure = null;
/** import('@dbgate/types').DatabaseModification[]) */
this.modifications = null;
}
async runAnalysis() {}
async _runAnalysis() {
return DatabaseAnalyser.createEmptyStructure();
}
/** @returns {Promise<import('@dbgate/types').DatabaseModification[]>} */
async getModifications() {
if (this.structure != null) throw new Error('DatabaseAnalyse.getModifications - structure must not be filled');
return [];
}
async fullAnalysis() {
return this._runAnalysis();
}
async incrementalAnalysis(structure) {
this.structure = structure;
this.modifications = await this.getModifications();
if (this.modifications.length == 0) return null;
console.log('DB modifications detected:', this.modifications);
return this._runAnalysis();
}
mergeAnalyseResult(newlyAnalysed) {
if (this.structure == null) {
return {
...DatabaseAnalyser.createEmptyStructure(),
...newlyAnalysed,
};
}
const res = {};
for (const field of ['tables', 'views', 'functions', 'procedures', 'triggers']) {
const removedIds = this.modifications
.filter((x) => x.action == 'remove' && x.objectTypeField == field)
.map((x) => x.objectId);
const newArray = newlyAnalysed[field] || [];
const addedChangedIds = newArray.map((x) => x.objectId);
const removeAllIds = [...removedIds, ...addedChangedIds];
res[field] = _.sortBy(
[...this.structure[field].filter((x) => !removeAllIds.includes(x.objectId)), ...newArray],
(x) => x.pureName
);
}
return res;
// const {tables,views, functions, procedures, triggers} = this.structure;
// return {
// tables:
// }
}
// findObjectById(id) {
// return this.structure.tables.find((x) => x.objectId == id);
// }
}
/** @returns {import('@dbgate/types').DatabaseInfo} */
DatabaseAnalyser.createEmptyStructure = () => ({
tables: [],
views: [],
functions: [],
procedures: [],
triggers: [],
});
module.exports = DatabaseAnalyser;

View File

@ -1,10 +1,10 @@
const fp = require('lodash/fp');
const _ = require('lodash');
const sql = require('./sql')
const sql = require('./sql');
const DatabaseAnalayser = require('../default/DatabaseAnalyser');
const byTableFilter = table => x => x.pureName == table.pureName && x.schemaName == x.schemaName;
const byTableFilter = (table) => (x) => x.pureName == table.pureName && x.schemaName == x.schemaName;
function extractPrimaryKeys(table, pkColumns) {
const filtered = pkColumns.filter(byTableFilter(table));
@ -18,7 +18,7 @@ function extractPrimaryKeys(table, pkColumns) {
function extractForeignKeys(table, fkColumns) {
const grouped = _.groupBy(fkColumns.filter(byTableFilter(table)), 'constraintName');
return _.keys(grouped).map(constraintName => ({
return _.keys(grouped).map((constraintName) => ({
constraintName,
constraintType: 'foreignKey',
..._.pick(grouped[constraintName][0], [
@ -34,6 +34,25 @@ function extractForeignKeys(table, fkColumns) {
}));
}
function objectTypeToField(type) {
switch (type.trim()) {
case 'U':
return 'tables';
case 'V':
return 'views';
case 'P':
return 'procedures';
case 'IF':
case 'FN':
case 'TF':
return 'functions';
case 'TR':
return 'triggers';
default:
return null;
}
}
/** @returns {import('@dbgate/types').DbType} */
function detectType(col) {
switch (col.dataType) {
@ -172,37 +191,102 @@ class MsSqlAnalyser extends DatabaseAnalayser {
super(pool, driver);
}
async createQuery(
resFileName,
tables = false,
views = false,
procedures = false,
functions = false,
triggers = false
) {
createQuery(resFileName, filterIdObjects) {
let res = sql[resFileName];
res = res.replace('=[OBJECT_ID_CONDITION]', ' is not null');
if (!this.modifications || !filterIdObjects || this.modifications.length == 0) {
res = res.replace('=[OBJECT_ID_CONDITION]', ' is not null');
} else {
const filterIds = this.modifications
.filter((x) => filterIdObjects.includes(x.objectTypeField) && (x.action == 'add' || x.action == 'change'))
.map((x) => x.objectId);
if (filterIds.length == 0) {
res = res.replace('=[OBJECT_ID_CONDITION]', ' = 0');
} else {
res = res.replace('=[OBJECT_ID_CONDITION]', ` in (${filterIds.join(',')})`);
}
}
return res;
}
async runAnalysis() {
const tables = await this.driver.query(this.pool, await this.createQuery('tables'));
const columns = await this.driver.query(this.pool, await this.createQuery('columns'));
const pkColumns = await this.driver.query(this.pool, await this.createQuery('primaryKeys'));
const fkColumns = await this.driver.query(this.pool, await this.createQuery('foreignKeys'));
async _runAnalysis() {
const tables = await this.driver.query(this.pool, this.createQuery('tables', ['tables']));
const columns = await this.driver.query(this.pool, this.createQuery('columns', ['tables']));
const pkColumns = await this.driver.query(this.pool, this.createQuery('primaryKeys', ['tables']));
const fkColumns = await this.driver.query(this.pool, this.createQuery('foreignKeys', ['tables']));
this.result.tables = tables.rows.map(table => ({
...table,
columns: columns.rows
.filter(col => col.objectId == table.objectId)
.map(({ isNullable, isIdentity, ...col }) => ({
...col,
notNull: !isNullable,
autoIncrement: !!isIdentity,
commonType: detectType(col),
})),
primaryKey: extractPrimaryKeys(table, pkColumns.rows),
foreignKeys: extractForeignKeys(table, fkColumns.rows),
}));
return this.mergeAnalyseResult({
tables: tables.rows.map((table) => ({
...table,
columns: columns.rows
.filter((col) => col.objectId == table.objectId)
.map(({ isNullable, isIdentity, ...col }) => ({
...col,
notNull: !isNullable,
autoIncrement: !!isIdentity,
commonType: detectType(col),
})),
primaryKey: extractPrimaryKeys(table, pkColumns.rows),
foreignKeys: extractForeignKeys(table, fkColumns.rows),
})),
});
}
getDeletedObjectsForField(idArray, objectTypeField) {
return this.structure[objectTypeField]
.filter((x) => !idArray.includes(x.objectId))
.map((x) => ({
oldName: _.pick(x, ['schemaName', 'pureName']),
objectId: x.objectId,
action: 'remove',
objectTypeField,
}));
}
getDeletedObjects(idArray) {
return [
...this.getDeletedObjectsForField(idArray, 'tables'),
...this.getDeletedObjectsForField(idArray, 'views'),
...this.getDeletedObjectsForField(idArray, 'procedures'),
...this.getDeletedObjectsForField(idArray, 'functions'),
...this.getDeletedObjectsForField(idArray, 'triggers'),
];
}
async getModifications() {
const modificationsQueryData = await this.driver.query(this.pool, this.createQuery('modifications'));
// console.log('MOD - SRC', modifications);
// console.log(
// 'MODs',
// this.structure.tables.map((x) => x.modifyDate)
// );
const modifications = modificationsQueryData.rows.map((x) => {
const { type, objectId, modifyDate, schemaName, pureName } = x;
const field = objectTypeToField(type);
if (!this.structure[field]) return null;
// @ts-ignore
const obj = this.structure[field].find((x) => x.objectId == objectId);
// object not modified
if (obj && Math.abs(new Date(modifyDate).getTime() - new Date(obj.modifyDate).getTime()) < 1000) return null;
/** @type {import('@dbgate/types').DatabaseModification} */
const action = obj
? {
newName: { schemaName, pureName },
oldName: _.pick(obj, ['schemaName', 'pureName']),
action: 'change',
objectTypeField: field,
objectId,
}
: {
newName: { schemaName, pureName },
action: 'add',
objectTypeField: field,
objectId,
};
return action;
});
return [..._.compact(modifications), ...this.getDeletedObjects(modificationsQueryData.rows.map((x) => x.objectId))];
}
}

View File

@ -109,10 +109,12 @@ const driver = {
},
async analyseFull(pool) {
const analyser = new MsSqlAnalyser(pool, this);
await analyser.runAnalysis();
return analyser.result;
return await analyser.fullAnalysis();
},
async analyseIncremental(pool, structure) {
const analyser = new MsSqlAnalyser(pool, this);
return await analyser.incrementalAnalysis(structure);
},
// async analyseIncremental(pool) {},
createDumper() {
return new MsSqlDumper(this);
},

View File

@ -2,10 +2,12 @@ const columns = require('./columns');
const foreignKeys = require('./foreignKeys');
const primaryKeys = require('./primaryKeys');
const tables = require('./tables');
const modifications = require('./modifications');
module.exports = {
columns,
tables,
foreignKeys,
primaryKeys,
modifications,
};

View File

@ -0,0 +1,6 @@
module.exports = `
select o.object_id as objectId, o.modify_date as modifyDate, o.type, o.name as pureName, s.name as schemaName
from sys.objects o
inner join sys.schemas s on o.schema_id = s.schema_id
where o.type in ('U', 'V', 'P', 'IF', 'FN', 'TR', 'TF')
`;

View File

@ -1,7 +1,7 @@
module.exports = `
select
o.name as pureName, s.name as schemaName, o.object_id as objectId,
o.create_date, o.modify_date
o.create_date as createDate, o.modify_date as modifyDate
from sys.tables o
inner join sys.schemas s on o.schema_id = s.schema_id
where o.object_id =[OBJECT_ID_CONDITION]

View File

@ -9,29 +9,22 @@ class MySqlAnalyser extends DatabaseAnalayser {
super(pool, driver);
}
async createQuery(
resFileName,
tables = false,
views = false,
procedures = false,
functions = false,
triggers = false
) {
createQuery(resFileName, tables = false, views = false, procedures = false, functions = false, triggers = false) {
let res = sql[resFileName];
res = res.replace('=[OBJECT_NAME_CONDITION]', ' is not null');
res = res.replace('#DATABASE#', this.pool._database_name);
return res;
}
async runAnalysis() {
const tables = await this.driver.query(this.pool, await this.createQuery('tables'));
const columns = await this.driver.query(this.pool, await this.createQuery('columns'));
// const pkColumns = await this.driver.query(this.pool, await this.createQuery('primary_keys.sql'));
// const fkColumns = await this.driver.query(this.pool, await this.createQuery('foreign_keys.sql'));
const tables = await this.driver.query(this.pool, this.createQuery('tables'));
const columns = await this.driver.query(this.pool, this.createQuery('columns'));
// const pkColumns = await this.driver.query(this.pool, this.createQuery('primary_keys.sql'));
// const fkColumns = await this.driver.query(this.pool, this.createQuery('foreign_keys.sql'));
this.result.tables = tables.rows.map(table => ({
this.result.tables = tables.rows.map((table) => ({
...table,
columns: columns.rows
.filter(col => col.pureName == table.pureName)
.filter((col) => col.pureName == table.pureName)
.map(({ isNullable, extra, ...col }) => ({
...col,
notNull: !isNullable,

View File

@ -1,50 +1,34 @@
const fp = require("lodash/fp");
const _ = require("lodash");
const sql = require('./sql')
const fp = require('lodash/fp');
const _ = require('lodash');
const sql = require('./sql');
const DatabaseAnalayser = require("../default/DatabaseAnalyser");
const DatabaseAnalayser = require('../default/DatabaseAnalyser');
class MySqlAnalyser extends DatabaseAnalayser {
constructor(pool, driver) {
super(pool, driver);
}
async createQuery(
resFileName,
tables = false,
views = false,
procedures = false,
functions = false,
triggers = false
) {
createQuery(resFileName, tables = false, views = false, procedures = false, functions = false, triggers = false) {
let res = sql[resFileName];
res = res.replace("=[OBJECT_ID_CONDITION]", " is not null");
res = res.replace('=[OBJECT_ID_CONDITION]', ' is not null');
return res;
}
async runAnalysis() {
const tables = await this.driver.query(
this.pool,
await this.createQuery("tableModifications")
);
const columns = await this.driver.query(
this.pool,
await this.createQuery("columns")
);
// const pkColumns = await this.driver.query(this.pool, await this.createQuery('primary_keys.sql'));
// const fkColumns = await this.driver.query(this.pool, await this.createQuery('foreign_keys.sql'));
const tables = await this.driver.query(this.pool, this.createQuery('tableModifications'));
const columns = await this.driver.query(this.pool, this.createQuery('columns'));
// const pkColumns = await this.driver.query(this.pool, this.createQuery('primary_keys.sql'));
// const fkColumns = await this.driver.query(this.pool, this.createQuery('foreign_keys.sql'));
this.result.tables = tables.rows.map(table => ({
this.result.tables = tables.rows.map((table) => ({
...table,
columns: columns.rows
.filter(
col =>
col.pureName == table.pureName && col.schemaName == table.schemaName
)
.filter((col) => col.pureName == table.pureName && col.schemaName == table.schemaName)
.map(({ isNullable, ...col }) => ({
...col,
notNull: !isNullable
notNull: !isNullable,
})),
foreignKeys: []
foreignKeys: [],
// primaryKey: extractPrimaryKeys(table, pkColumns.rows),
// foreignKeys: extractForeignKeys(table, fkColumns.rows),
}));

View File

@ -43,12 +43,36 @@ export interface ColumnInfo {
defaultConstraint: string;
commonType?: DbType;
}
export interface TableInfo extends NamedObjectInfo {
export interface DatabaseObjectInfo extends NamedObjectInfo {
objectId?: string;
createDate?: string;
modifyDate?: string;
}
export interface SqlObjectInfo extends DatabaseObjectInfo {
createSql?: string;
}
export interface TableInfo extends DatabaseObjectInfo {
columns: ColumnInfo[];
primaryKey?: PrimaryKeyInfo;
foreignKeys: ForeignKeyInfo[];
dependencies?: ForeignKeyInfo[];
}
export interface ViewInfo extends SqlObjectInfo {}
export interface ProcedureInfo extends SqlObjectInfo {}
export interface FunctionInfo extends SqlObjectInfo {}
export interface TriggerInfo extends SqlObjectInfo {}
export interface DatabaseInfo {
tables: TableInfo[];
views: ViewInfo[];
procedures: ProcedureInfo[];
functions: FunctionInfo[];
triggers: TriggerInfo[];
}

View File

@ -1,7 +1,7 @@
import { QueryResult } from './query';
import { SqlDialect } from './dialect';
import { SqlDumper } from './dumper';
import { DatabaseInfo } from './dbinfo';
import { DatabaseInfo, NamedObjectInfo } from './dbinfo';
export interface StreamOptions {
recordset: (columns) => void;
@ -25,7 +25,15 @@ export interface EngineDriver {
}[]
>;
analyseFull(pool: any): Promise<DatabaseInfo>;
// analyseIncremental(pool: any): Promise<void>;
analyseIncremental(pool: any, structure: DatabaseInfo): Promise<DatabaseInfo>;
dialect: SqlDialect;
createDumper(): SqlDumper;
}
export interface DatabaseModification {
oldName?: NamedObjectInfo;
newName?: NamedObjectInfo;
objectId: string;
action: 'add' | 'remove' | 'change';
objectTypeField: keyof DatabaseInfo;
}