#187 split query stream

This commit is contained in:
Jan Prochazka 2021-10-28 11:00:05 +02:00
parent 81b1cda8c5
commit 55fb7ba8bb
4 changed files with 87 additions and 3 deletions

View File

@ -1,2 +1,2 @@
export * from './splitQuery';
export { splitQuery } from './splitQuery';
export * from './options';

View File

@ -166,7 +166,7 @@ function pushQuery(context: SplitLineContext) {
if (trimmed) context.pushOutput(trimmed);
}
function splitQueryLine(context: SplitLineContext) {
export function splitQueryLine(context: SplitLineContext) {
while (context.position < context.end) {
const token = scanToken(context);
if (!token) {
@ -221,6 +221,9 @@ function splitQueryLine(context: SplitLineContext) {
}
}
export function getInitialDelimiter(options: SplitterOptions) {
return options?.allowSemicolon === false ? null : SEMICOLON
}
export function splitQuery(sql: string, options: SplitterOptions = null): string[] {
const usedOptions = {
...defaultSplitterOptions,
@ -235,7 +238,7 @@ export function splitQuery(sql: string, options: SplitterOptions = null): string
const context: SplitLineContext = {
source: sql,
end: sql.length,
currentDelimiter: options?.allowSemicolon === false ? null : SEMICOLON,
currentDelimiter: getInitialDelimiter(options),
position: 0,
currentCommandStart: 0,
pushOutput: cmd => output.push(cmd),

View File

@ -0,0 +1,41 @@
import stream from 'stream';
import { SplitStreamContext, getInitialDelimiter, SplitLineContext, splitQueryLine } from './splitQuery';
import { SplitterOptions } from './options';
export class SplitQueryStream extends stream.Transform {
context: SplitStreamContext;
constructor(options: SplitterOptions) {
super({ objectMode: true });
this.context = {
commandPart: '',
options,
currentDelimiter: getInitialDelimiter(options),
pushOutput: cmd => this.push(cmd),
};
}
_transform(chunk, encoding, done) {
const lineContext: SplitLineContext = {
...this.context,
position: 0,
currentCommandStart: 0,
wasDataOnLine: false,
source: chunk,
end: chunk.length,
};
splitQueryLine(lineContext);
this.context.commandPart = lineContext.commandPart;
done();
}
_flush(done) {
const trimmed = this.context.commandPart;
if (trimmed) this.push(trimmed);
done();
}
}
export function splitQueryStream(sourceStream, options: SplitterOptions) {
const splitter = new SplitQueryStream(options);
sourceStream.pipe(splitter);
return splitter;
}

View File

@ -0,0 +1,40 @@
import { mysqlSplitterOptions, mssqlSplitterOptions, postgreSplitterOptions, noSplitSplitterOptions } from './options';
import stream from 'stream';
import { splitQueryStream } from './splitQueryStream';
function createInputStream(...lines) {
const pass = new stream.PassThrough({
objectMode: true,
});
lines.forEach(line => pass.write(line));
pass.end();
return pass;
}
function streamToArray(streamSource) {
return new Promise((resolve, reject) => {
const res = [];
streamSource.on('data', x => res.push(x));
streamSource.on('end', () => resolve(res));
});
}
test('stream: simple query', async () => {
const output = await streamToArray(splitQueryStream(createInputStream('select * from A'), mysqlSplitterOptions));
expect(output).toEqual(['select * from A']);
});
test('stream: query on 2 lines', async () => {
const output = await streamToArray(splitQueryStream(createInputStream('select * ', 'from A'), mysqlSplitterOptions));
expect(output).toEqual(['select * from A']);
});
test('stream: query on 2 lines', async () => {
const output = await streamToArray(
splitQueryStream(
createInputStream('SELECT * ', 'FROM `table1`;', 'SELECT *', ' FROM `table2`'),
mysqlSplitterOptions
)
);
expect(output).toEqual(['SELECT * FROM `table1`', 'SELECT * FROM `table2`']);
});