clickhouse export

This commit is contained in:
Jan Prochazka 2024-09-12 14:40:29 +02:00
parent e21c6d4872
commit 086bc0d9f3

View File

@ -77,9 +77,9 @@ const driver = {
let columnNames = null;
let dataTypes = null;
const stream = resultSet.stream();
const strm = resultSet.stream();
stream.on('data', (rows) => {
strm.on('data', (rows) => {
rows.forEach((row) => {
const json = row.json();
if (!columnNames) {
@ -102,11 +102,11 @@ const driver = {
});
});
stream.on('end', () => {
strm.on('end', () => {
options.done();
});
stream.on('error', (err) => {
strm.on('error', (err) => {
options.info({
message: err.toString(),
time: new Date(),
@ -131,16 +131,55 @@ const driver = {
}
},
// called when exporting table or view
async readQuery(connection, sql, structure) {
async readQuery(client, query, structure) {
const pass = new stream.PassThrough({
objectMode: true,
highWaterMark: 100,
});
// pass.write(structure)
// pass.write(row1)
// pass.write(row2)
// pass.end()
const resultSet = await client.query({
query,
format: 'JSONCompactEachRowWithNamesAndTypes',
});
let columnNames = null;
let dataTypes = null;
const strm = resultSet.stream();
strm.on('data', (rows) => {
rows.forEach((row) => {
const json = row.json();
if (!columnNames) {
columnNames = json;
return;
}
if (!dataTypes) {
dataTypes = json;
const columns = columnNames.map((columnName, i) => ({
columnName,
dataType: dataTypes[i],
}));
pass.write({
__isStreamHeader: true,
...(structure || { columns }),
});
return;
}
const data = _.zipObject(columnNames, json);
pass.write(data);
});
});
strm.on('end', () => {
pass.end();
});
strm.on('error', (err) => {
pass.end();
});
return pass;
},