From 20fccf51d9b524d29cb8fd0ecb4a0f71e7b45217 Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Thu, 8 Apr 2021 17:49:57 +0200 Subject: [PATCH] stream header flag + export from mongo --- packages/api/src/proc/sessionProcess.js | 8 ++++- packages/api/src/shell/copyStream.js | 6 +++- packages/api/src/shell/jsonLinesReader.js | 16 ++++++--- packages/api/src/shell/jsonLinesWriter.js | 10 ++++-- packages/api/src/shell/tableReader.js | 8 ++++- .../src/utility/EnsureStreamHeaderStream.js | 35 +++++++++++++++++++ .../src/appobj/DatabaseObjectAppObject.svelte | 4 +++ .../web/src/impexp/FormTablesSelect.svelte | 6 +++- .../web/src/impexp/SourceTargetConfig.svelte | 2 +- 9 files changed, 83 insertions(+), 12 deletions(-) create mode 100644 packages/api/src/utility/EnsureStreamHeaderStream.js diff --git a/packages/api/src/proc/sessionProcess.js b/packages/api/src/proc/sessionProcess.js index 4dae1f2b..46b4c7fe 100644 --- a/packages/api/src/proc/sessionProcess.js +++ b/packages/api/src/proc/sessionProcess.js @@ -22,7 +22,13 @@ class TableWriter { this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`); this.currentRowCount = 0; this.currentChangeIndex = 1; - fs.writeFileSync(this.currentFile, JSON.stringify(structure) + '\n'); + fs.writeFileSync( + this.currentFile, + JSON.stringify({ + ...structure, + __isStreamHeader: true, + }) + '\n' + ); this.currentStream = fs.createWriteStream(this.currentFile, { flags: 'a' }); this.writeCurrentStats(false, false); this.resultIndex = resultIndex; diff --git a/packages/api/src/shell/copyStream.js b/packages/api/src/shell/copyStream.js index dab470ee..353c7369 100644 --- a/packages/api/src/shell/copyStream.js +++ b/packages/api/src/shell/copyStream.js @@ -1,9 +1,13 @@ +const EnsureStreamHeaderStream = require('../utility/EnsureStreamHeaderStream'); + function copyStream(input, output) { return new Promise((resolve, reject) => { + const ensureHeader = new EnsureStreamHeaderStream(); const finisher = output['finisher'] || output; finisher.on('finish', resolve); finisher.on('error', reject); - input.pipe(output); + input.pipe(ensureHeader); + ensureHeader.pipe(output); }); } diff --git a/packages/api/src/shell/jsonLinesReader.js b/packages/api/src/shell/jsonLinesReader.js index f92f70dd..55c9ca11 100644 --- a/packages/api/src/shell/jsonLinesReader.js +++ b/packages/api/src/shell/jsonLinesReader.js @@ -3,9 +3,8 @@ const stream = require('stream'); const byline = require('byline'); class ParseStream extends stream.Transform { - constructor({ header, limitRows }) { + constructor({ limitRows }) { super({ objectMode: true }); - this.header = header; this.wasHeader = false; this.limitRows = limitRows; this.rowsWritten = 0; @@ -13,7 +12,14 @@ class ParseStream extends stream.Transform { _transform(chunk, encoding, done) { const obj = JSON.parse(chunk); if (!this.wasHeader) { - if (!this.header) this.push({ columns: Object.keys(obj).map(columnName => ({ columnName })) }); + if ( + !obj.__isStreamHeader && + // TODO remove isArray test + !Array.isArray(obj.columns) + ) { + this.push({ columns: Object.keys(obj).map(columnName => ({ columnName })) }); + } + this.wasHeader = true; } if (!this.limitRows || this.rowsWritten < this.limitRows) { @@ -24,12 +30,12 @@ class ParseStream extends stream.Transform { } } -async function jsonLinesReader({ fileName, encoding = 'utf-8', header = true, limitRows = undefined }) { +async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undefined }) { console.log(`Reading file ${fileName}`); const fileStream = fs.createReadStream(fileName, encoding); const liner = byline(fileStream); - const parser = new ParseStream({ header, limitRows }); + const parser = new ParseStream({ limitRows }); liner.pipe(parser); return parser; } diff --git a/packages/api/src/shell/jsonLinesWriter.js b/packages/api/src/shell/jsonLinesWriter.js index df31b5fe..4df935fa 100644 --- a/packages/api/src/shell/jsonLinesWriter.js +++ b/packages/api/src/shell/jsonLinesWriter.js @@ -8,10 +8,16 @@ class StringifyStream extends stream.Transform { this.wasHeader = false; } _transform(chunk, encoding, done) { + let skip = false; if (!this.wasHeader) { - if (this.header) this.push(JSON.stringify(chunk) + '\n'); + skip = + (chunk.__isStreamHeader || + // TODO remove isArray test + Array.isArray(chunk.columns)) && + !this.header; this.wasHeader = true; - } else { + } + if (!skip) { this.push(JSON.stringify(chunk) + '\n'); } done(); diff --git a/packages/api/src/shell/tableReader.js b/packages/api/src/shell/tableReader.js index a468185e..46e7fe16 100644 --- a/packages/api/src/shell/tableReader.js +++ b/packages/api/src/shell/tableReader.js @@ -1,6 +1,5 @@ const { quoteFullName, fullNameToString } = require('dbgate-tools'); const requireEngineDriver = require('../utility/requireEngineDriver'); -const { decryptConnection } = require('../utility/crypting'); const connectUtility = require('../utility/connectUtility'); async function tableReader({ connection, pureName, schemaName }) { @@ -10,6 +9,13 @@ async function tableReader({ connection, pureName, schemaName }) { const fullName = { pureName, schemaName }; + if (driver.dialect.nosql) { + // @ts-ignore + console.log(`Reading collection ${fullNameToString(fullName)}`); + // @ts-ignore + return await driver.readQuery(pool, JSON.stringify(fullName)); + } + const table = await driver.analyseSingleObject(pool, fullName, 'tables'); const query = `select * from ${quoteFullName(driver.dialect, fullName)}`; if (table) { diff --git a/packages/api/src/utility/EnsureStreamHeaderStream.js b/packages/api/src/utility/EnsureStreamHeaderStream.js new file mode 100644 index 00000000..8a0e2665 --- /dev/null +++ b/packages/api/src/utility/EnsureStreamHeaderStream.js @@ -0,0 +1,35 @@ +const stream = require('stream'); + +class EnsureStreamHeaderStream extends stream.Transform { + constructor() { + super({ objectMode: true }); + this.wasHeader = false; + } + _transform(chunk, encoding, done) { + if (!this.wasHeader) { + if (chunk.__isDynamicStructure) { + // ignore dynamic structure header + done(); + return; + } + + if ( + !chunk.__isStreamHeader && + // TODO remove isArray test + !Array.isArray(chunk.columns) + ) { + this.push({ + __isStreamHeader: true, + __isComputedStructure: true, + columns: Object.keys(chunk).map(columnName => ({ columnName })), + }); + } + + this.wasHeader = true; + } + this.push(chunk); + done(); + } +} + +module.exports = EnsureStreamHeaderStream; diff --git a/packages/web/src/appobj/DatabaseObjectAppObject.svelte b/packages/web/src/appobj/DatabaseObjectAppObject.svelte index 1597041a..521789cd 100644 --- a/packages/web/src/appobj/DatabaseObjectAppObject.svelte +++ b/packages/web/src/appobj/DatabaseObjectAppObject.svelte @@ -202,6 +202,10 @@ }, }, }, + { + label: 'Export', + isExport: true, + }, ], }; diff --git a/packages/web/src/impexp/FormTablesSelect.svelte b/packages/web/src/impexp/FormTablesSelect.svelte index 25e35e51..2757ab6f 100644 --- a/packages/web/src/impexp/FormTablesSelect.svelte +++ b/packages/web/src/impexp/FormTablesSelect.svelte @@ -15,7 +15,11 @@ const { values, setFieldValue } = getFormContext(); $: dbinfo = useDatabaseInfo({ conid: $values[conidName], database: $values[databaseName] }); - $: tablesOptions = [...(($dbinfo && $dbinfo.tables) || []), ...(($dbinfo && $dbinfo.views) || [])] + $: tablesOptions = [ + ...(($dbinfo && $dbinfo.tables) || []), + ...(($dbinfo && $dbinfo.views) || []), + ...(($dbinfo && $dbinfo.collections) || []), + ] .filter(x => !$values[schemaName] || x.schemaName == $values[schemaName]) .map(x => ({ value: x.pureName, diff --git a/packages/web/src/impexp/SourceTargetConfig.svelte b/packages/web/src/impexp/SourceTargetConfig.svelte index e0fc6284..547864b2 100644 --- a/packages/web/src/impexp/SourceTargetConfig.svelte +++ b/packages/web/src/impexp/SourceTargetConfig.svelte @@ -87,7 +87,7 @@ schemaName={schemaNameField} databaseName={databaseNameField} name={tablesField} - label="Tables / views" + label="Tables / views / collections" /> {/if} {/if}