From c57a67da09f060f5678fda9bfc3bd7a914b4c735 Mon Sep 17 00:00:00 2001
From: Jan Prochazka
Date: Sun, 16 May 2021 13:59:49 +0200
Subject: [PATCH] postgre: using streamed query instead of cursor

---
 plugins/dbgate-plugin-postgres/package.json |  5 +-
 .../src/backend/drivers.js                  | 97 ++++++++-----------
 yarn.lock                                   | 12 ---
 3 files changed, 43 insertions(+), 71 deletions(-)

diff --git a/plugins/dbgate-plugin-postgres/package.json b/plugins/dbgate-plugin-postgres/package.json
index 6fc72f3c..fc0d2f01 100644
--- a/plugins/dbgate-plugin-postgres/package.json
+++ b/plugins/dbgate-plugin-postgres/package.json
@@ -34,9 +34,8 @@
     "dbgate-tools": "^4.1.1",
     "lodash": "^4.17.15",
     "pg": "^7.17.0",
-    "pg-query-stream": "^3.1.1",
+    "sql-query-identifier": "^2.1.0",
     "webpack": "^4.42.0",
-    "webpack-cli": "^3.3.11",
-    "sql-query-identifier": "^2.1.0"
+    "webpack-cli": "^3.3.11"
   }
 }
diff --git a/plugins/dbgate-plugin-postgres/src/backend/drivers.js b/plugins/dbgate-plugin-postgres/src/backend/drivers.js
index 510f29d5..052018f1 100644
--- a/plugins/dbgate-plugin-postgres/src/backend/drivers.js
+++ b/plugins/dbgate-plugin-postgres/src/backend/drivers.js
@@ -26,29 +26,14 @@ function zipDataRow(rowArray, columns) {
 
 async function runStreamItem(client, sql, options) {
   return new Promise((resolve, reject) => {
-    const query = new pgQueryStream(sql, undefined, { rowMode: 'array' });
-    const stream = client.query(query);
-
-    // const handleInfo = (info) => {
-    //   const { message, lineNumber, procName } = info;
-    //   options.info({
-    //     message,
-    //     line: lineNumber,
-    //     procedure: procName,
-    //     time: new Date(),
-    //     severity: 'info',
-    //   });
-    // };
+    const query = new pg.Query({
+      text: sql,
+      rowMode: 'array',
+    });
 
     let wasHeader = false;
 
-    const handleEnd = result => {
-      // console.log('RESULT', result);
-      resolve();
-    };
-
-    let columns = null;
-    const handleReadable = () => {
+    query.on('row', row => {
       if (!wasHeader) {
         columns = extractPostgresColumns(query._result);
         if (columns && columns.length > 0) {
@@ -57,21 +42,22 @@
         wasHeader = true;
       }
 
-      for (;;) {
-        const row = stream.read();
-        if (!row) break;
+      options.row(zipDataRow(row, columns));
+    });
 
-        options.row(zipDataRow(row, columns));
+    query.on('end', () => {
+      if (!wasHeader) {
+        columns = extractPostgresColumns(query._result);
+        if (columns && columns.length > 0) {
+          options.recordset(columns);
+        }
+        wasHeader = true;
       }
-    };
 
-    // const handleFields = (columns) => {
-    //   // console.log('FIELDS', columns[0].name);
-    //   options.recordset(columns);
-    //   // options.recordset(extractColumns(columns));
-    // };
+      resolve();
+    });
 
-    const handleError = error => {
+    query.on('error', error => {
       console.log('ERROR', error);
       const { message, lineNumber, procName } = error;
       options.info({
@@ -82,13 +68,9 @@
         severity: 'error',
       });
       resolve();
-    };
+    });
 
-    stream.on('error', handleError);
-    stream.on('readable', handleReadable);
-    // stream.on('result', handleRow)
-    // stream.on('data', handleRow)
-    stream.on('end', handleEnd);
+    client.query(query);
   });
 }
 
@@ -184,23 +166,20 @@ const drivers = driverBases.map(driverBase => ({
     };
   },
   async readQuery(client, sql, structure) {
-    const query = new pgQueryStream(sql, undefined, { rowMode: 'array' });
-
-    const queryStream = client.query(query);
+    const query = new pg.Query({
+      text: sql,
+      rowMode: 'array',
+    });
 
     let wasHeader = false;
+    let columns = null;
 
     const pass = new stream.PassThrough({
      objectMode: true,
       highWaterMark: 100,
     });
 
-    const handleEnd = result => {
-      pass.end();
-    };
-
-    let columns = null;
-    const handleReadable = () => {
+    query.on('row', row => {
       if (!wasHeader) {
         columns = extractPostgresColumns(query._result);
         pass.write({
@@ -210,22 +189,28 @@
         wasHeader = true;
       }
 
-      for (;;) {
-        const row = queryStream.read();
-        if (!row) break;
+      pass.write(zipDataRow(row, columns));
+    });
 
-        pass.write(zipDataRow(row, columns));
+    query.on('end', () => {
+      if (!wasHeader) {
+        columns = extractPostgresColumns(query._result);
+        pass.write({
+          __isStreamHeader: true,
+          ...(structure || { columns }),
+        });
+        wasHeader = true;
       }
-    };
 
-    const handleError = error => {
+      pass.end();
+    });
+
+    query.on('error', error => {
       console.error(error);
       pass.end();
-    };
+    });
 
-    queryStream.on('error', handleError);
-    queryStream.on('readable', handleReadable);
-    queryStream.on('end', handleEnd);
+    client.query(query);
 
     return pass;
   },
diff --git a/yarn.lock b/yarn.lock
index b355f2c3..d6180b8a 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -6809,11 +6809,6 @@ pg-connection-string@0.1.3:
   resolved "https://registry.yarnpkg.com/pg-connection-string/-/pg-connection-string-0.1.3.tgz#da1847b20940e42ee1492beaf65d49d91b245df7"
   integrity sha1-2hhHsglA5C7hSSvq9l1J2RskXfc=
 
-pg-cursor@^2.5.1:
-  version "2.5.2"
-  resolved "https://registry.yarnpkg.com/pg-cursor/-/pg-cursor-2.5.2.tgz#9217fc989fa64221a02d6ed4b37323267d90abde"
-  integrity sha512-yS0lxXA5WoIVK7BUgJr1uOJDJe5JxVezItTLvqnTXj6bF3di4UtQOrPx8RW3GpFmom2NTQfpEc2N6vvdpopQSw==
-
 pg-int8@1.0.1:
   version "1.0.1"
   resolved "https://registry.yarnpkg.com/pg-int8/-/pg-int8-1.0.1.tgz#943bd463bf5b71b4170115f80f8efc9a0c0eb78c"
@@ -6829,13 +6824,6 @@ pg-pool@^2.0.10:
   resolved "https://registry.yarnpkg.com/pg-pool/-/pg-pool-2.0.10.tgz#842ee23b04e86824ce9d786430f8365082d81c4a"
   integrity sha512-qdwzY92bHf3nwzIUcj+zJ0Qo5lpG/YxchahxIN8+ZVmXqkahKXsnl2aiJPHLYN9o5mB/leG+Xh6XKxtP7e0sjg==
 
-pg-query-stream@^3.1.1:
-  version "3.4.2"
-  resolved "https://registry.yarnpkg.com/pg-query-stream/-/pg-query-stream-3.4.2.tgz#6cdf8f3086bfe01b2e4d7cc461ad4c8c6a7d0914"
-  integrity sha512-kaTzsi5TQ3XG1KUznEV3MnstM1U4k5Z9cZ02PNmKLMFeYiPEn83FUc2pPVPiKKv93ITI8e5oCh+zEOunjy0ZwQ==
-  dependencies:
-    pg-cursor "^2.5.1"
-
 pg-types@^2.1.0:
   version "2.2.0"
   resolved "https://registry.yarnpkg.com/pg-types/-/pg-types-2.2.0.tgz#2d0250d636454f7cfa3b6ae0382fdfa8063254a3"
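
Reviewer note, not part of the patch: the change drops the pg-query-stream cursor, which pulled rows out of a readable stream inside a 'readable' handler, and instead submits a plain pg.Query, which pushes rows through 'row', 'end' and 'error' events. Below is a minimal standalone sketch of that submittable pattern, assuming a reachable Postgres server; the connection string and the SELECT text are placeholders, only the pg.Query construction, rowMode: 'array' and client.query(query) usage mirror the patch.

const pg = require('pg');

async function main() {
  // Placeholder connection; any reachable Postgres instance works.
  const client = new pg.Client({ connectionString: process.env.DATABASE_URL });
  await client.connect();

  // Same construction as in the patch: a query config with rowMode: 'array',
  // so each emitted row is an array of values rather than an object.
  const query = new pg.Query({
    text: 'SELECT n, n * n AS square FROM generate_series(1, 5) AS n',
    rowMode: 'array',
  });

  // Rows are pushed as they arrive from the connection; nothing calls read() in a loop.
  query.on('row', row => console.log('row', row));
  query.on('error', err => {
    console.error('query failed', err);
    client.end();
  });
  query.on('end', () => client.end()); // fired after the last row, no result buffering

  // Passing a Submittable executes it on this connection.
  client.query(query);
}

main();

One tradeoff worth noting: unlike the cursor, the event emitter has no flow control, so rows keep arriving even when the PassThrough in readQuery is above its highWaterMark; the buffer simply grows until the consumer catches up.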