From 03b656a25b164f6dbec1d9bcf9eeb445027539e4 Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Sun, 17 May 2020 07:47:42 +0200 Subject: [PATCH] pg stream query support --- packages/api/package.json | 3 +- packages/api/src/utility/driverConnect.js | 2 + packages/engines/postgres/index.js | 72 ++++++++++++++++++++++- yarn.lock | 12 ++++ 4 files changed, 86 insertions(+), 3 deletions(-) diff --git a/packages/api/package.json b/packages/api/package.json index 081ab229..ed111681 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -19,7 +19,8 @@ "mssql": "^6.0.1", "mysql": "^2.17.1", "nedb-promises": "^4.0.1", - "pg": "^7.17.0" + "pg": "^7.17.0", + "pg-query-stream": "^3.1.1" }, "scripts": { "start": "nodemon src/index.js", diff --git a/packages/api/src/utility/driverConnect.js b/packages/api/src/utility/driverConnect.js index 31d2a612..83221e7f 100644 --- a/packages/api/src/utility/driverConnect.js +++ b/packages/api/src/utility/driverConnect.js @@ -1,11 +1,13 @@ const mssql = require('mssql'); const mysql = require('mysql'); const pg = require('pg'); +const pgQueryStream = require('pg-query-stream'); const nativeModules = { mssql, mysql, pg, + pgQueryStream, }; function driverConnect(driver, connection) { diff --git a/packages/engines/postgres/index.js b/packages/engines/postgres/index.js index df55521c..f06b6dab 100644 --- a/packages/engines/postgres/index.js +++ b/packages/engines/postgres/index.js @@ -1,10 +1,11 @@ +const _ = require('lodash'); const PostgreAnalyser = require('./PostgreAnalyser'); const PostgreDumper = require('./PostgreDumper'); /** @type {import('@dbgate/types').SqlDialect} */ const dialect = { rangeSelect: true, - stringEscapeChar: "\\", + stringEscapeChar: '\\', quoteIdentifier(s) { return '"' + s + '"'; }, @@ -13,7 +14,13 @@ const dialect = { /** @type {import('@dbgate/types').EngineDriver} */ const driver = { async connect(nativeModules, { server, port, user, password, database }) { - const client = new nativeModules.pg.Client({ host: server, port, user, password, database: database || 'postgres' }); + const client = new nativeModules.pg.Client({ + host: server, + port, + user, + password, + database: database || 'postgres', + }); await client.connect(); client._nativeModules = nativeModules; return client; @@ -22,6 +29,67 @@ const driver = { const res = await client.query(sql); return { rows: res.rows, columns: res.fields }; }, + async stream(client, sql, options) { + const query = new client._nativeModules.pgQueryStream(sql); + const stream = client.query(query); + + // const handleInfo = (info) => { + // const { message, lineNumber, procName } = info; + // options.info({ + // message, + // line: lineNumber, + // procedure: procName, + // time: new Date(), + // severity: 'info', + // }); + // }; + + let wasHeader = false; + + const handleEnd = (result) => { + // console.log('RESULT', result); + options.done(result); + }; + + const handleReadable = () => { + let row = stream.read(); + if (!wasHeader && row) { + options.recordset(_.keys(row).map((columnName) => ({ columnName }))); + wasHeader = true; + } + + while (row) { + options.row(row); + row = stream.read(); + } + }; + + // const handleFields = (columns) => { + // // console.log('FIELDS', columns[0].name); + // options.recordset(columns); + // // options.recordset(extractColumns(columns)); + // }; + + const handleError = (error) => { + console.log('ERROR', error); + const { message, lineNumber, procName } = error; + options.info({ + message, + line: lineNumber, + procedure: procName, + time: new Date(), + severity: 'error', + }); + }; + + stream.on('error', handleError); + stream.on('readable', handleReadable); + // stream.on('result', handleRow) + // stream.on('data', handleRow) + stream.on('end', handleEnd); + + return stream; + }, async getVersion(client) { const { rows } = await this.query(client, 'SELECT version()'); const { version } = rows[0]; diff --git a/yarn.lock b/yarn.lock index 2807db94..a46cca70 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8315,6 +8315,11 @@ pg-connection-string@0.1.3: resolved "https://registry.yarnpkg.com/pg-connection-string/-/pg-connection-string-0.1.3.tgz#da1847b20940e42ee1492beaf65d49d91b245df7" integrity sha1-2hhHsglA5C7hSSvq9l1J2RskXfc= +pg-cursor@^2.2.1: + version "2.2.1" + resolved "https://registry.yarnpkg.com/pg-cursor/-/pg-cursor-2.2.1.tgz#f0f35f9e729889d795c8191141a6b15c5f8b18a7" + integrity sha512-C0DKcb8do7Mv9tTQvrB+hxPYgJ6FCKnu1CjPMb0txYHW+zULpOH0B01MNtjQA4nrhHJ4Qs1Nf58BGEc158wXIA== + pg-int8@1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/pg-int8/-/pg-int8-1.0.1.tgz#943bd463bf5b71b4170115f80f8efc9a0c0eb78c" @@ -8330,6 +8335,13 @@ pg-pool@^2.0.10: resolved "https://registry.yarnpkg.com/pg-pool/-/pg-pool-2.0.10.tgz#842ee23b04e86824ce9d786430f8365082d81c4a" integrity sha512-qdwzY92bHf3nwzIUcj+zJ0Qo5lpG/YxchahxIN8+ZVmXqkahKXsnl2aiJPHLYN9o5mB/leG+Xh6XKxtP7e0sjg== +pg-query-stream@^3.1.1: + version "3.1.1" + resolved "https://registry.yarnpkg.com/pg-query-stream/-/pg-query-stream-3.1.1.tgz#f73c5d5252dd0728bbf7f67e53c1ac09713ea245" + integrity sha512-jkIgIzBPWEHqePfA5dKbjsN9dCFIlGnLQ3pEIhU10OhgyOmi0CuP8cGLNgCbCnbbtZEaSuyCAYpe/rtwYMoL9w== + dependencies: + pg-cursor "^2.2.1" + pg-types@^2.1.0: version "2.2.0" resolved "https://registry.yarnpkg.com/pg-types/-/pg-types-2.2.0.tgz#2d0250d636454f7cfa3b6ae0382fdfa8063254a3"