diff --git a/packages/api/package.json b/packages/api/package.json index c07a2c21..fd66f5cc 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -6,6 +6,7 @@ "dependencies": { "@dbgate/engines": "^0.1.0", "@dbgate/sqltree": "^0.1.0", + "async-lock": "^1.2.4", "axios": "^0.19.0", "body-parser": "^1.19.0", "bufferutil": "^4.0.1", diff --git a/packages/api/src/utility/JsonLinesDatastore.js b/packages/api/src/utility/JsonLinesDatastore.js index 7ed08076..95e6845d 100644 --- a/packages/api/src/utility/JsonLinesDatastore.js +++ b/packages/api/src/utility/JsonLinesDatastore.js @@ -1,72 +1,39 @@ const lineReader = require('line-reader'); +const AsyncLock = require('async-lock'); +const lock = new AsyncLock(); class JsonLinesDatastore { constructor(file) { this.file = file; this.reader = null; this.readedDataRowCount = 0; - this.readedSchemaRow = false; - this.isReading = false; - this.closeAfterRead = null; - this.closeAfterReadCallback = null; - this.closeAfterReadPromise = null; - - this.waitForReadyPromise = null; - this.waitForReadyResolve = null; - - // this.readerInfo = { - // reader: null, - // readedDataRowCount: 0, - // readedSchemaRow: false, - // isReading: false, - // closeAfterRead: null, - // closeAfterReadPromise: null, - // }; + this.notifyChangedCallback = null; } closeReader() { if (!this.reader) return; const reader = this.reader; this.reader = null; - reader.close(); - } - - waitForReady() { - if (this.isReading) { - if (this.waitForReadyResolve) { - return this.waitForReadyPromise; - } - const promise = new Promise((resolve, reject) => { - this.waitForReadyResolve = resolve; - }); - this.waitForReadyPromise = promise; - return promise; - } - return Promise.resolve(); + this.readedDataRowCount = 0; + this.readedSchemaRow = false; + reader.close(() => {}); } async notifyChanged(callback) { - if (this.isReading) { - this.closeAfterReadCallback = callback; - if (this.closeAfterRead) { - return this.closeAfterReadPromise; - } - const promise = new Promise((resolve, reject) => { - this.closeAfterRead = resolve; - }); - this.closeAfterReadPromise = promise; - return promise; - } else { + this.notifyChangedCallback = callback; + await lock.acquire('reader', async () => { this.closeReader(); - } + }); + const call = this.notifyChangedCallback; + this.notifyChangedCallback = null; + if (call) call(); } async openReader() { return new Promise((resolve, reject) => lineReader.open(this.file, (err, reader) => { if (err) reject(err); - resolve(reader); }) ); @@ -89,24 +56,12 @@ class JsonLinesDatastore { } async ensureReader(offset) { - console.log('ENSURE', offset); - for (;;) { - await this.waitForReady(); - if (this.readedDataRowCount > offset) { - this.closeReader(); - } - if (!this.reader) { - const reader = await this.openReader(); - if (this.isReading) { - reader.close(); // throw away this reader - continue; // reader is already used by other getRows, wait for free reader - } - this.reader = reader; - this.isReading = true; - break; - } else { - break; - } + if (this.readedDataRowCount > offset) { + this.closeReader(); + } + if (!this.reader) { + const reader = await this.openReader(); + this.reader = reader; } if (!this.readedSchemaRow) { await this.readLine(); // skip structure @@ -117,30 +72,16 @@ class JsonLinesDatastore { } async getRows(offset, limit) { - await this.ensureReader(offset); const res = []; - for (let i = 0; i < limit; i += 1) { - const line = await this.readLine(); - if (line == null) break; - res.push(JSON.parse(line)); - } - this.isReading = false; - if (this.closeAfterRead) { - if (this.closeAfterReadCallback) this.closeAfterReadCallback(); - this.closeReader(); - const resolve = this.closeAfterRead; - this.closeAfterRead = null; - this.closeAfterReadPromise = null; - this.closeAfterReadCallback = null; - resolve(); - } - if (this.waitForReadyResolve) { - const resolve = this.waitForReadyResolve; - this.waitForReadyResolve = null; - this.waitForReadyPromise = null; - resolve(); - } - console.log('RETURN', res.length); + await lock.acquire('reader', async () => { + await this.ensureReader(offset); + for (let i = 0; i < limit; i += 1) { + const line = await this.readLine(); + if (line == null) break; + res.push(JSON.parse(line)); + } + }); + // console.log('RETURN', res.length); return res; } } diff --git a/yarn.lock b/yarn.lock index aa5eee79..2f58d432 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2265,6 +2265,11 @@ async-limiter@~1.0.0: resolved "https://registry.yarnpkg.com/async-limiter/-/async-limiter-1.0.1.tgz#dd379e94f0db8310b08291f9d64c3209766617fd" integrity sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ== +async-lock@^1.2.4: + version "1.2.4" + resolved "https://registry.yarnpkg.com/async-lock/-/async-lock-1.2.4.tgz#80d0d612383045dd0c30eb5aad08510c1397cb91" + integrity sha512-UBQJC2pbeyGutIfYmErGc9RaJYnpZ1FHaxuKwb0ahvGiiCkPUf3p67Io+YLPmmv3RHY+mF6JEtNW8FlHsraAaA== + async@0.2.10: version "0.2.10" resolved "https://registry.yarnpkg.com/async/-/async-0.2.10.tgz#b6bbe0b0674b9d719708ca38de8c237cb526c3d1"