From c9fefd14fda190efec6013acf156451c7a357dfe Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Sat, 23 Jan 2021 17:08:17 +0100 Subject: [PATCH] fixed server connection status bug --- .../api/src/controllers/serverConnections.js | 69 ++++++++++--------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/packages/api/src/controllers/serverConnections.js b/packages/api/src/controllers/serverConnections.js index a5e92ebe..bbb058c5 100644 --- a/packages/api/src/controllers/serverConnections.js +++ b/packages/api/src/controllers/serverConnections.js @@ -2,6 +2,8 @@ const connections = require('./connections'); const socket = require('../utility/socket'); const { fork } = require('child_process'); const _ = require('lodash'); +const AsyncLock = require('async-lock'); +const lock = new AsyncLock(); module.exports = { opened: [], @@ -22,34 +24,37 @@ module.exports = { handle_ping() {}, async ensureOpened(conid) { - const existing = this.opened.find(x => x.conid == conid); - if (existing) return existing; - const connection = await connections.get({ conid }); - const subprocess = fork(process.argv[1], ['serverConnectionProcess', ...process.argv.slice(3)]); - const newOpened = { - conid, - subprocess, - databases: [], - connection, - status: { - name: 'pending', - }, - disconnected: false, - }; - this.opened.push(newOpened); - delete this.closed[conid]; - socket.emitChanged(`server-status-changed`); - // @ts-ignore - subprocess.on('message', ({ msgtype, ...message }) => { - if (newOpened.disconnected) return; - this[`handle_${msgtype}`](conid, message); + const res = await lock.acquire(conid, async () => { + const existing = this.opened.find(x => x.conid == conid); + if (existing) return existing; + const connection = await connections.get({ conid }); + const subprocess = fork(process.argv[1], ['serverConnectionProcess', ...process.argv.slice(3)]); + const newOpened = { + conid, + subprocess, + databases: [], + connection, + status: { + name: 'pending', + }, + disconnected: false, + }; + this.opened.push(newOpened); + delete this.closed[conid]; + socket.emitChanged(`server-status-changed`); + // @ts-ignore + subprocess.on('message', ({ msgtype, ...message }) => { + if (newOpened.disconnected) return; + this[`handle_${msgtype}`](conid, message); + }); + subprocess.on('exit', () => { + if (newOpened.disconnected) return; + this.close(conid, false); + }); + subprocess.send({ msgtype: 'connect', ...connection }); + return newOpened; }); - subprocess.on('exit', () => { - if (newOpened.disconnected) return; - this.close(conid, false); - }); - subprocess.send({ msgtype: 'connect', ...connection }); - return newOpened; + return res; }, close(conid, kill = true) { @@ -82,10 +87,12 @@ module.exports = { ping_meta: 'post', async ping({ connections }) { - for (const conid of connections) { - const opened = await this.ensureOpened(conid); - opened.subprocess.send({ msgtype: 'ping' }); - } + await Promise.all( + connections.map(async conid => { + const opened = await this.ensureOpened(conid); + opened.subprocess.send({ msgtype: 'ping' }); + }) + ); return { status: 'ok' }; },