From b23226370831f05b192b36347926cf0ca3b8fad0 Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Thu, 12 Sep 2024 15:39:48 +0200 Subject: [PATCH] clickhouse import --- packages/tools/src/tableTransforms.ts | 1 + .../src/backend/Analyser.js | 9 +++++++ .../src/backend/createBulkInsertStream.js | 27 +++++++++++++++++++ .../src/backend/driver.js | 6 ++--- .../src/frontend/Dumper.js | 2 ++ 5 files changed, 42 insertions(+), 3 deletions(-) create mode 100644 plugins/dbgate-plugin-clickhouse/src/backend/createBulkInsertStream.js diff --git a/packages/tools/src/tableTransforms.ts b/packages/tools/src/tableTransforms.ts index d53c3857..6babccc3 100644 --- a/packages/tools/src/tableTransforms.ts +++ b/packages/tools/src/tableTransforms.ts @@ -10,6 +10,7 @@ export function prepareTableForImport(table: TableInfo): TableInfo { res.uniques = []; res.checks = []; if (res.primaryKey) res.primaryKey.constraintName = null; + res.tableEngine = null; return res; } diff --git a/plugins/dbgate-plugin-clickhouse/src/backend/Analyser.js b/plugins/dbgate-plugin-clickhouse/src/backend/Analyser.js index 3a734fc0..f0cc7adc 100644 --- a/plugins/dbgate-plugin-clickhouse/src/backend/Analyser.js +++ b/plugins/dbgate-plugin-clickhouse/src/backend/Analyser.js @@ -78,6 +78,15 @@ class Analyser extends DatabaseAnalyser { views: tableModificationsQueryData.rows.filter((x) => x.tableEngine == 'View'), }; } + + async _computeSingleObjectId() { + const { pureName } = this.singleObjectFilter; + const resId = await this.driver.query( + this.pool, + `SELECT uuid as id FROM system.tables WHERE database = '${this.pool._database_name}' AND name='${pureName}'` + ); + this.singleObjectId = resId.rows[0].id; + } } module.exports = Analyser; diff --git a/plugins/dbgate-plugin-clickhouse/src/backend/createBulkInsertStream.js b/plugins/dbgate-plugin-clickhouse/src/backend/createBulkInsertStream.js new file mode 100644 index 00000000..4cf6f3ff --- /dev/null +++ b/plugins/dbgate-plugin-clickhouse/src/backend/createBulkInsertStream.js @@ -0,0 +1,27 @@ +const { createBulkInsertStreamBase } = global.DBGATE_PACKAGES['dbgate-tools']; +const _ = require('lodash'); + +/** + * + * @param {import('dbgate-types').EngineDriver} driver + */ +function createOracleBulkInsertStream(driver, stream, pool, name, options) { + const writable = createBulkInsertStreamBase(driver, stream, pool, name, { + ...options, + // this is really not used, send method below is used instead + commitAfterInsert: true, + }); + + writable.send = async () => { + await pool.insert({ + table: name.pureName, + values: writable.buffer, + format: 'JSONEachRow', + }); + writable.buffer = []; + }; + + return writable; +} + +module.exports = createOracleBulkInsertStream; diff --git a/plugins/dbgate-plugin-clickhouse/src/backend/driver.js b/plugins/dbgate-plugin-clickhouse/src/backend/driver.js index 9ccd1e89..dcc83f97 100644 --- a/plugins/dbgate-plugin-clickhouse/src/backend/driver.js +++ b/plugins/dbgate-plugin-clickhouse/src/backend/driver.js @@ -3,6 +3,7 @@ const stream = require('stream'); const driverBase = require('../frontend/driver'); const Analyser = require('./Analyser'); const { createClient } = require('@clickhouse/client'); +const createBulkInsertStream = require('./createBulkInsertStream'); /** @type {import('dbgate-types').EngineDriver} */ const driver = { @@ -183,9 +184,8 @@ const driver = { return pass; }, - // called when importing into table or view - async writeTable(connection, name, options) { - return createBulkInsertStreamBase(this, stream, pool, name, options); + async writeTable(pool, name, options) { + return createBulkInsertStream(this, stream, pool, name, options); }, // detect server version async getVersion(client) { diff --git a/plugins/dbgate-plugin-clickhouse/src/frontend/Dumper.js b/plugins/dbgate-plugin-clickhouse/src/frontend/Dumper.js index f9a79215..77b3a05b 100644 --- a/plugins/dbgate-plugin-clickhouse/src/frontend/Dumper.js +++ b/plugins/dbgate-plugin-clickhouse/src/frontend/Dumper.js @@ -37,6 +37,8 @@ class Dumper extends SqlDumper { ); } } + + autoIncrement() {} } module.exports = Dumper;