From 36dffe0a0fe25dbd5a11f5840a88fc42d0722ca7 Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Mon, 28 Sep 2020 12:09:01 +0200 Subject: [PATCH] postgres - stream query reader --- packages/engines/postgres/index.js | 45 ++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/packages/engines/postgres/index.js b/packages/engines/postgres/index.js index 256ebb2d..1134198a 100644 --- a/packages/engines/postgres/index.js +++ b/packages/engines/postgres/index.js @@ -120,6 +120,51 @@ const driver = { const analyser = new PostgreAnalyser(pool, this); return analyser.incrementalAnalysis(structure); }, + async readQuery(client, sql, structure) { + const query = new client._nativeModules.pgQueryStream(sql); + const { stream } = client._nativeModules; + + const queryStream = client.query(query); + + let wasHeader = false; + + const pass = new stream.PassThrough({ + objectMode: true, + highWaterMark: 100, + }); + + const handleEnd = (result) => { + pass.end(); + }; + + const handleReadable = () => { + let row = queryStream.read(); + if (!wasHeader && row) { + pass.write( + structure || { + columns: _.keys(row).map((columnName) => ({ columnName })), + } + ); + wasHeader = true; + } + + while (row) { + pass.write(row); + row = queryStream.read(); + } + }; + + const handleError = (error) => { + console.error(error); + pass.end(); + }; + + queryStream.on('error', handleError); + queryStream.on('readable', handleReadable); + queryStream.on('end', handleEnd); + + return pass; + }, createDumper() { return new PostgreDumper(this); },