clickhouse import

This commit is contained in:
Jan Prochazka 2024-09-12 15:39:48 +02:00
parent 086bc0d9f3
commit b232263708
5 changed files with 42 additions and 3 deletions

View File

@ -10,6 +10,7 @@ export function prepareTableForImport(table: TableInfo): TableInfo {
res.uniques = [];
res.checks = [];
if (res.primaryKey) res.primaryKey.constraintName = null;
res.tableEngine = null;
return res;
}

View File

@ -78,6 +78,15 @@ class Analyser extends DatabaseAnalyser {
views: tableModificationsQueryData.rows.filter((x) => x.tableEngine == 'View'),
};
}
async _computeSingleObjectId() {
const { pureName } = this.singleObjectFilter;
const resId = await this.driver.query(
this.pool,
`SELECT uuid as id FROM system.tables WHERE database = '${this.pool._database_name}' AND name='${pureName}'`
);
this.singleObjectId = resId.rows[0].id;
}
}
module.exports = Analyser;

View File

@ -0,0 +1,27 @@
const { createBulkInsertStreamBase } = global.DBGATE_PACKAGES['dbgate-tools'];
const _ = require('lodash');
/**
*
* @param {import('dbgate-types').EngineDriver} driver
*/
function createOracleBulkInsertStream(driver, stream, pool, name, options) {
const writable = createBulkInsertStreamBase(driver, stream, pool, name, {
...options,
// this is really not used, send method below is used instead
commitAfterInsert: true,
});
writable.send = async () => {
await pool.insert({
table: name.pureName,
values: writable.buffer,
format: 'JSONEachRow',
});
writable.buffer = [];
};
return writable;
}
module.exports = createOracleBulkInsertStream;

View File

@ -3,6 +3,7 @@ const stream = require('stream');
const driverBase = require('../frontend/driver');
const Analyser = require('./Analyser');
const { createClient } = require('@clickhouse/client');
const createBulkInsertStream = require('./createBulkInsertStream');
/** @type {import('dbgate-types').EngineDriver} */
const driver = {
@ -183,9 +184,8 @@ const driver = {
return pass;
},
// called when importing into table or view
async writeTable(connection, name, options) {
return createBulkInsertStreamBase(this, stream, pool, name, options);
async writeTable(pool, name, options) {
return createBulkInsertStream(this, stream, pool, name, options);
},
// detect server version
async getVersion(client) {

View File

@ -37,6 +37,8 @@ class Dumper extends SqlDumper {
);
}
}
autoIncrement() {}
}
module.exports = Dumper;