From da1617729bc7e09f6a5878fd677cadd8542c2d99 Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Thu, 22 Oct 2020 08:23:53 +0200 Subject: [PATCH] jsl data refactor --- packages/api/src/controllers/jsldata.js | 174 +++++++++--------- packages/api/src/proc/index.js | 2 + packages/api/src/proc/jslDatastoreProcess.js | 57 ++++++ packages/api/src/utility/DatastoreProxy.js | 55 ++++++ .../api/src/utility/JsonLinesDatastore.js | 115 ++++++++++++ 5 files changed, 318 insertions(+), 85 deletions(-) create mode 100644 packages/api/src/proc/jslDatastoreProcess.js create mode 100644 packages/api/src/utility/DatastoreProxy.js create mode 100644 packages/api/src/utility/JsonLinesDatastore.js diff --git a/packages/api/src/controllers/jsldata.js b/packages/api/src/controllers/jsldata.js index 9c71bb63..b451c130 100644 --- a/packages/api/src/controllers/jsldata.js +++ b/packages/api/src/controllers/jsldata.js @@ -1,6 +1,9 @@ const fs = require('fs'); const lineReader = require('line-reader'); +const { off } = require('process'); +const DatastoreProxy = require('../utility/DatastoreProxy'); const getJslFileName = require('../utility/getJslFileName'); +const JsonLinesDatastore = require('../utility/JsonLinesDatastore'); const socket = require('../utility/socket'); function readFirstLine(file) { @@ -19,76 +22,85 @@ function readFirstLine(file) { }); } - module.exports = { - openedReaders: {}, + datastores: {}, - closeReader(jslid) { - // console.log('CLOSING READER'); - if (!this.openedReaders[jslid]) return Promise.resolve(); - return new Promise((resolve, reject) => { - this.openedReaders[jslid].reader.close((err) => { - if (err) reject(err); - delete this.openedReaders[jslid]; - resolve(); - }); - }); - }, + // closeReader(jslid) { + // // console.log('CLOSING READER'); + // if (!this.openedReaders[jslid]) return Promise.resolve(); + // return new Promise((resolve, reject) => { + // this.openedReaders[jslid].reader.close((err) => { + // if (err) reject(err); + // delete this.openedReaders[jslid]; + // resolve(); + // }); + // }); + // }, - readLine(readerInfo) { - return new Promise((resolve, reject) => { - const { reader } = readerInfo; - if (!reader.hasNextLine()) { - resolve(null); - return; - } - reader.nextLine((err, line) => { - if (readerInfo.readedSchemaRow) readerInfo.readedDataRowCount += 1; - else readerInfo.readedSchemaRow = true; - if (err) reject(err); - resolve(line); - }); - }); - }, + // readLine(readerInfo) { + // return new Promise((resolve, reject) => { + // const { reader } = readerInfo; + // if (!reader.hasNextLine()) { + // resolve(null); + // return; + // } + // reader.nextLine((err, line) => { + // if (readerInfo.readedSchemaRow) readerInfo.readedDataRowCount += 1; + // else readerInfo.readedSchemaRow = true; + // if (err) reject(err); + // resolve(line); + // }); + // }); + // }, - openReader(jslid) { - // console.log('OPENING READER'); - // console.log( - // 'OPENING READER, LINES=', - // fs.readFileSync(path.join(jsldir(), `${jslid}.jsonl`), 'utf-8').split('\n').length - // ); - const file = getJslFileName(jslid); - return new Promise((resolve, reject) => - lineReader.open(file, (err, reader) => { - if (err) reject(err); - const readerInfo = { - reader, - readedDataRowCount: 0, - readedSchemaRow: false, - isReading: true, - }; - this.openedReaders[jslid] = readerInfo; - resolve(readerInfo); - }) - ); - }, + // openReader(jslid) { + // // console.log('OPENING READER'); + // // console.log( + // // 'OPENING READER, LINES=', + // // fs.readFileSync(path.join(jsldir(), `${jslid}.jsonl`), 'utf-8').split('\n').length + // // ); + // const file = getJslFileName(jslid); + // return new Promise((resolve, reject) => + // lineReader.open(file, (err, reader) => { + // if (err) reject(err); + // const readerInfo = { + // reader, + // readedDataRowCount: 0, + // readedSchemaRow: false, + // isReading: true, + // }; + // this.openedReaders[jslid] = readerInfo; + // resolve(readerInfo); + // }) + // ); + // }, - async ensureReader(jslid, offset) { - if (this.openedReaders[jslid] && this.openedReaders[jslid].readedDataRowCount > offset) { - await this.closeReader(jslid); + // async ensureReader(jslid, offset) { + // if (this.openedReaders[jslid] && this.openedReaders[jslid].readedDataRowCount > offset) { + // await this.closeReader(jslid); + // } + // let readerInfo = this.openedReaders[jslid]; + // if (!this.openedReaders[jslid]) { + // readerInfo = await this.openReader(jslid); + // } + // readerInfo.isReading = true; + // if (!readerInfo.readedSchemaRow) { + // await this.readLine(readerInfo); // skip structure + // } + // while (readerInfo.readedDataRowCount < offset) { + // await this.readLine(readerInfo); + // } + // return readerInfo; + // }, + + async ensureDatastore(jslid) { + let datastore = this.datastores[jslid]; + if (!datastore) { + datastore = new JsonLinesDatastore(getJslFileName(jslid)); + // datastore = new DatastoreProxy(getJslFileName(jslid)); + this.datastores[jslid] = datastore; } - let readerInfo = this.openedReaders[jslid]; - if (!this.openedReaders[jslid]) { - readerInfo = await this.openReader(jslid); - } - readerInfo.isReading = true; - if (!readerInfo.readedSchemaRow) { - await this.readLine(readerInfo); // skip structure - } - while (readerInfo.readedDataRowCount < offset) { - await this.readLine(readerInfo); - } - return readerInfo; + return datastore; }, getInfo_meta: 'get', @@ -101,20 +113,8 @@ module.exports = { getRows_meta: 'get', async getRows({ jslid, offset, limit }) { - const readerInfo = await this.ensureReader(jslid, offset); - const res = []; - for (let i = 0; i < limit; i += 1) { - const line = await this.readLine(readerInfo); - if (line == null) break; - res.push(JSON.parse(line)); - } - readerInfo.isReading = false; - if (readerInfo.closeAfterReadAndSendStats) { - await this.closeReader(jslid); - socket.emit(`jsldata-stats-${jslid}`, readerInfo.closeAfterReadAndSendStats); - readerInfo.closeAfterReadAndSendStats = null; - } - return res; + const datastore = await this.ensureDatastore(jslid); + return datastore.getRows(offset, limit); }, getStats_meta: 'get', @@ -126,12 +126,16 @@ module.exports = { async notifyChangedStats(stats) { console.log('SENDING STATS', JSON.stringify(stats)); - const readerInfo = this.openedReaders[stats.jslid]; - if (readerInfo && readerInfo.isReading) { - readerInfo.closeAfterReadAndSendStats = stats; - } else { - await this.closeReader(stats.jslid); - socket.emit(`jsldata-stats-${stats.jslid}`, stats); - } + const datastore = this.datastores[stats.jslid]; + if (datastore) await datastore.notifyChanged(); + socket.emit(`jsldata-stats-${stats.jslid}`, stats); + + // const readerInfo = this.openedReaders[stats.jslid]; + // if (readerInfo && readerInfo.isReading) { + // readerInfo.closeAfterReadAndSendStats = stats; + // } else { + // await this.closeReader(stats.jslid); + // socket.emit(`jsldata-stats-${stats.jslid}`, stats); + // } }, }; diff --git a/packages/api/src/proc/index.js b/packages/api/src/proc/index.js index 8292b416..b6f0be5a 100644 --- a/packages/api/src/proc/index.js +++ b/packages/api/src/proc/index.js @@ -2,10 +2,12 @@ const connectProcess = require('./connectProcess'); const databaseConnectionProcess = require('./databaseConnectionProcess'); const serverConnectionProcess = require('./serverConnectionProcess'); const sessionProcess = require('./sessionProcess'); +const jslDatastoreProcess = require('./jslDatastoreProcess'); module.exports = { connectProcess, databaseConnectionProcess, serverConnectionProcess, sessionProcess, + jslDatastoreProcess, }; diff --git a/packages/api/src/proc/jslDatastoreProcess.js b/packages/api/src/proc/jslDatastoreProcess.js new file mode 100644 index 00000000..3b3b112e --- /dev/null +++ b/packages/api/src/proc/jslDatastoreProcess.js @@ -0,0 +1,57 @@ +const childProcessChecker = require('../utility/childProcessChecker'); +const JsonLinesDatastore = require('../utility/JsonLinesDatastore'); + +let lastPing = null; +let datastore = new JsonLinesDatastore(); + +function handlePing() { + lastPing = new Date().getTime(); +} + +function handleOpen({file }) { + handlePing(); + datastore = new JsonLinesDatastore(file); +} + +async function handleRead({ msgid, offset, limit }) { + handlePing(); + const rows = await datastore.getRows(offset, limit); + process.send({ msgtype: 'response', msgid, rows }); +} + +function handleNotifyChanged() { + datastore.notifyChanged(); +} + +const messageHandlers = { + open: handleOpen, + read: handleRead, + ping: handlePing, + notifyChanged: handleNotifyChanged, +}; + +async function handleMessage({ msgtype, ...other }) { + const handler = messageHandlers[msgtype]; + await handler(other); +} + +function start() { + childProcessChecker(); + + setInterval(() => { + const time = new Date().getTime(); + if (time - lastPing > 60 * 1000) { + process.exit(0); + } + }, 60 * 1000); + + process.on('message', async (message) => { + try { + await handleMessage(message); + } catch (e) { + process.send({ msgtype: 'error', error: e.message }); + } + }); +} + +module.exports = { start }; diff --git a/packages/api/src/utility/DatastoreProxy.js b/packages/api/src/utility/DatastoreProxy.js new file mode 100644 index 00000000..437b3153 --- /dev/null +++ b/packages/api/src/utility/DatastoreProxy.js @@ -0,0 +1,55 @@ +const { fork } = require('child_process'); +const uuidv1 = require('uuid/v1'); + +class DatastoreProxy { + constructor(file) { + this.subprocess = null; + this.disconnected = false; + this.file = file; + this.requests = {}; + this.handle_response = this.handle_response.bind(this); + this.handle_ping = this.handle_ping.bind(this); + } + + // handle_response({ msgid, rows }) { + handle_response({ msgid, rows }) { + const [resolve, reject] = this.requests[msgid]; + resolve(rows); + delete this.requests[msgid]; + } + + handle_ping() {} + + + async ensureSubprocess() { + if (!this.subprocess) { + this.subprocess = fork(process.argv[1], ['jslDatastoreProcess']); + + // @ts-ignore + this.subprocess.on('message', ({ msgtype, ...message }) => { + // if (this.disconnected) return; + this[`handle_${msgtype}`](message); + }); + this.subprocess.on('exit', () => { + // if (this.disconnected) return; + this.subprocess = null; + }); + this.subprocess.send({ msgtype: 'open', file: this.file }); + } + return this.subprocess; + } + + async getRows(offset, limit) { + await this.ensureSubprocess(); + const msgid = uuidv1(); + const promise = new Promise((resolve, reject) => { + this.requests[msgid] = [resolve, reject]; + this.subprocess.send({ msgtype: 'read', msgid, offset, limit }); + }); + return promise; + } + + async notifyChanged() {} +} + +module.exports = DatastoreProxy; diff --git a/packages/api/src/utility/JsonLinesDatastore.js b/packages/api/src/utility/JsonLinesDatastore.js new file mode 100644 index 00000000..eed74914 --- /dev/null +++ b/packages/api/src/utility/JsonLinesDatastore.js @@ -0,0 +1,115 @@ +const lineReader = require('line-reader'); + +class JsonLinesDatastore { + constructor(file) { + this.file = file; + this.readerInfo = { + reader: null, + readedDataRowCount: 0, + readedSchemaRow: false, + isReading: false, + closeAfterRead: null, + closeAfterReadPromise: null, + }; + } + + async closeReader() { + if (!this.readerInfo) return Promise.resolve(); + return new Promise((resolve, reject) => { + this.readerInfo.reader.close((err) => { + if (err) reject(err); + resolve(); + }); + }); + } + + async notifyChanged() { + if (this.readerInfo && this.readerInfo.isReading) { + if (this.readerInfo.closeAfterRead) { + return this.readerInfo.closeAfterReadPromise; + } + const promise = new Promise((resolve, reject) => { + this.readerInfo.closeAfterRead = resolve; + }); + this.readerInfo.closeAfterReadPromise = promise; + return promise; + } else { + await this.closeReader(); + } + } + + async openReader() { + return new Promise((resolve, reject) => + lineReader.open(this.file, (err, reader) => { + if (err) reject(err); + + const readerInfo = { + reader, + readedDataRowCount: 0, + readedSchemaRow: false, + isReading: true, + closeAfterRead: null, + closeAfterReadPromise: null, + }; + this.readerInfo = readerInfo; + resolve(readerInfo); + }) + ); + } + + readLine(readerInfo) { + return new Promise((resolve, reject) => { + const { reader } = readerInfo; + if (!reader.hasNextLine()) { + resolve(null); + return; + } + reader.nextLine((err, line) => { + if (readerInfo.readedSchemaRow) readerInfo.readedDataRowCount += 1; + else readerInfo.readedSchemaRow = true; + if (err) reject(err); + resolve(line); + }); + }); + } + + async ensureReader(offset) { + if (this.readerInfo && this.readerInfo.readedDataRowCount > offset) { + await this.closeReader(); + } + let readerInfo = this.readerInfo; + if (!readerInfo || !readerInfo.reader) { + readerInfo = await this.openReader(); + } + readerInfo.isReading = true; + if (!readerInfo.readedSchemaRow) { + await this.readLine(readerInfo); // skip structure + } + while (readerInfo.readedDataRowCount < offset) { + await this.readLine(readerInfo); + } + return readerInfo; + } + + async getRows(offset, limit) { + const readerInfo = await this.ensureReader(offset); + const res = []; + for (let i = 0; i < limit; i += 1) { + const line = await this.readLine(readerInfo); + if (line == null) break; + res.push(JSON.parse(line)); + } + readerInfo.isReading = false; + if (readerInfo.closeAfterRead) { + await this.closeReader(); + // socket.emit(`jsldata-stats-${jslid}`, readerInfo.closeAfterReadAndSendStats); + const resolve = readerInfo.closeAfterRead; + readerInfo.closeAfterRead = null; + readerInfo.closeAfterReadPromise = null; + resolve(); + } + return res; + } +} + +module.exports = JsonLinesDatastore;