diff --git a/plugins/dbgate-plugin-clickhouse/src/backend/driver.js b/plugins/dbgate-plugin-clickhouse/src/backend/driver.js index 911bc3b6..9ccd1e89 100644 --- a/plugins/dbgate-plugin-clickhouse/src/backend/driver.js +++ b/plugins/dbgate-plugin-clickhouse/src/backend/driver.js @@ -77,9 +77,9 @@ const driver = { let columnNames = null; let dataTypes = null; - const stream = resultSet.stream(); + const strm = resultSet.stream(); - stream.on('data', (rows) => { + strm.on('data', (rows) => { rows.forEach((row) => { const json = row.json(); if (!columnNames) { @@ -102,11 +102,11 @@ const driver = { }); }); - stream.on('end', () => { + strm.on('end', () => { options.done(); }); - stream.on('error', (err) => { + strm.on('error', (err) => { options.info({ message: err.toString(), time: new Date(), @@ -131,16 +131,55 @@ const driver = { } }, // called when exporting table or view - async readQuery(connection, sql, structure) { + async readQuery(client, query, structure) { const pass = new stream.PassThrough({ objectMode: true, highWaterMark: 100, }); - // pass.write(structure) - // pass.write(row1) - // pass.write(row2) - // pass.end() + const resultSet = await client.query({ + query, + format: 'JSONCompactEachRowWithNamesAndTypes', + }); + + let columnNames = null; + let dataTypes = null; + + const strm = resultSet.stream(); + + strm.on('data', (rows) => { + rows.forEach((row) => { + const json = row.json(); + if (!columnNames) { + columnNames = json; + return; + } + if (!dataTypes) { + dataTypes = json; + + const columns = columnNames.map((columnName, i) => ({ + columnName, + dataType: dataTypes[i], + })); + + pass.write({ + __isStreamHeader: true, + ...(structure || { columns }), + }); + return; + } + const data = _.zipObject(columnNames, json); + pass.write(data); + }); + }); + + strm.on('end', () => { + pass.end(); + }); + + strm.on('error', (err) => { + pass.end(); + }); return pass; },