From 72375ec6356af5c2465d607548ae297a2b43ff28 Mon Sep 17 00:00:00 2001 From: Jan Prochazka Date: Sun, 5 Apr 2020 20:48:04 +0200 Subject: [PATCH] query - basic print workflow - messages on client --- .../src/controllers/databaseConnections.js | 2 +- packages/api/src/controllers/sessions.js | 68 ++++++++++++++++++ packages/api/src/main.js | 2 + packages/api/src/proc/index.js | 2 + packages/api/src/proc/sessionProcess.js | 69 +++++++++++++++++++ packages/engines/mssql/index.js | 29 ++++++++ packages/types/engines.d.ts | 17 +++-- packages/types/index.d.ts | 7 ++ packages/web/src/query/MessagesView.js | 24 +++++++ packages/web/src/query/QueryToolbar.js | 14 ++-- packages/web/src/query/SessionMessagesView.js | 21 ++++++ packages/web/src/tabs/QueryTab.js | 62 ++++++++++++++--- packages/web/src/utility/useFetch.js | 22 ++++-- 13 files changed, 315 insertions(+), 24 deletions(-) create mode 100644 packages/api/src/controllers/sessions.js create mode 100644 packages/api/src/proc/sessionProcess.js create mode 100644 packages/web/src/query/MessagesView.js create mode 100644 packages/web/src/query/SessionMessagesView.js diff --git a/packages/api/src/controllers/databaseConnections.js b/packages/api/src/controllers/databaseConnections.js index f56ea79b..a3b69da6 100644 --- a/packages/api/src/controllers/databaseConnections.js +++ b/packages/api/src/controllers/databaseConnections.js @@ -49,7 +49,7 @@ module.exports = { }, /** @param {import('@dbgate/types').OpenedDatabaseConnection} conn */ - async sendRequest(conn, message) { + sendRequest(conn, message) { const msgid = uuidv1(); const promise = new Promise((resolve, reject) => { this.requests[msgid] = [resolve, reject]; diff --git a/packages/api/src/controllers/sessions.js b/packages/api/src/controllers/sessions.js new file mode 100644 index 00000000..8216b17c --- /dev/null +++ b/packages/api/src/controllers/sessions.js @@ -0,0 +1,68 @@ +const _ = require('lodash'); +const uuidv1 = require('uuid/v1'); +const connections = require('./connections'); +const socket = require('../utility/socket'); +const { fork } = require('child_process'); +const DatabaseAnalyser = require('@dbgate/engines/default/DatabaseAnalyser'); + +module.exports = { + /** @type {import('@dbgate/types').OpenedSession[]} */ + opened: [], + + handle_error(sesid, props) { + const { error } = props; + console.log(`Error in database session ${sesid}: ${error}`); + }, + + // handle_row(sesid, props) { + // const { row } = props; + // socket.emit('sessionRow', row); + // }, + + handle_info(sesid, props) { + const { info } = props; + socket.emit(`session-info-${sesid}`, info); + }, + + create_meta: 'post', + async create({ conid, database }) { + const sesid = uuidv1(); + const connection = await connections.get({ conid }); + const subprocess = fork(process.argv[1], ['sessionProcess']); + const newOpened = { + conid, + database, + subprocess, + connection, + sesid, + }; + this.opened.push(newOpened); + // @ts-ignore + subprocess.on('message', ({ msgtype, ...message }) => { + this[`handle_${msgtype}`](sesid, message); + }); + subprocess.send({ msgtype: 'connect', ...connection, database }); + return newOpened; + }, + + executeQuery_meta: 'post', + async executeQuery({ sesid, sql }) { + const session = this.opened.find((x) => x.sesid == sesid); + if (!session) { + throw new Error('Invalid session'); + } + + console.log(`Processing query, sesid=${sesid}, sql=${sql}`); + session.subprocess.send({ msgtype: 'executeQuery', sql }); + + return { state: 'ok' }; + }, + + // runCommand_meta: 'post', + // async runCommand({ conid, database, sql }) { + // console.log(`Running SQL command , conid=${conid}, database=${database}, sql=${sql}`); + // const opened = await this.ensureOpened(conid, database); + // const res = await this.sendRequest(opened, { msgtype: 'queryData', sql }); + // return res; + // }, +}; diff --git a/packages/api/src/main.js b/packages/api/src/main.js index 806b1e73..f4c39d26 100644 --- a/packages/api/src/main.js +++ b/packages/api/src/main.js @@ -10,6 +10,7 @@ const connections = require('./controllers/connections'); const serverConnections = require('./controllers/serverConnections'); const databaseConnections = require('./controllers/databaseConnections'); const tables = require('./controllers/tables'); +const sessions = require('./controllers/sessions'); const socket = require('./utility/socket'); function start() { @@ -27,6 +28,7 @@ function start() { useController(app, '/server-connections', serverConnections); useController(app, '/database-connections', databaseConnections); useController(app, '/tables', tables); + useController(app, '/sessions', sessions); if (fs.existsSync('/home/dbgate-docker/build')) { // server static files inside docker container diff --git a/packages/api/src/proc/index.js b/packages/api/src/proc/index.js index a4f87bce..8292b416 100644 --- a/packages/api/src/proc/index.js +++ b/packages/api/src/proc/index.js @@ -1,9 +1,11 @@ const connectProcess = require('./connectProcess'); const databaseConnectionProcess = require('./databaseConnectionProcess'); const serverConnectionProcess = require('./serverConnectionProcess'); +const sessionProcess = require('./sessionProcess'); module.exports = { connectProcess, databaseConnectionProcess, serverConnectionProcess, + sessionProcess, }; diff --git a/packages/api/src/proc/sessionProcess.js b/packages/api/src/proc/sessionProcess.js new file mode 100644 index 00000000..6ad0f49e --- /dev/null +++ b/packages/api/src/proc/sessionProcess.js @@ -0,0 +1,69 @@ +const engines = require('@dbgate/engines'); +const driverConnect = require('../utility/driverConnect'); + +let systemConnection; +let storedConnection; +let afterConnectCallbacks = []; + +async function handleConnect(connection) { + storedConnection = connection; + + const driver = engines(storedConnection); + systemConnection = await driverConnect(driver, storedConnection); + for (const [resolve, reject] of afterConnectCallbacks) { + resolve(); + } + afterConnectCallbacks = []; +} + +function waitConnected() { + if (systemConnection) return Promise.resolve(); + return new Promise((resolve, reject) => { + afterConnectCallbacks.push([resolve, reject]); + }); +} + +async function handleExecuteQuery({ sql }) { + await waitConnected(); + const driver = engines(storedConnection); + + await driver.stream(systemConnection, sql, { + recordset: (columns) => { + process.send({ msgtype: 'recordset', columns }); + }, + row: (row) => { + process.send({ msgtype: 'row', row }); + }, + error: (error) => { + process.send({ msgtype: 'error', error }); + }, + done: (result) => { + process.send({ msgtype: 'done', result }); + }, + info: (info) => { + process.send({ msgtype: 'info', info }); + }, + }); +} + +const messageHandlers = { + connect: handleConnect, + executeQuery: handleExecuteQuery, +}; + +async function handleMessage({ msgtype, ...other }) { + const handler = messageHandlers[msgtype]; + await handler(other); +} + +function start() { + process.on('message', async (message) => { + try { + await handleMessage(message); + } catch (e) { + process.send({ msgtype: 'error', error: e.message }); + } + }); +} + +module.exports = { start }; diff --git a/packages/engines/mssql/index.js b/packages/engines/mssql/index.js index 1f2f5a3e..f026ba13 100644 --- a/packages/engines/mssql/index.js +++ b/packages/engines/mssql/index.js @@ -44,6 +44,35 @@ const driver = { } return res; }, + async stream(pool, sql, options) { + const request = await pool.request(); + + const handleInfo = (info) => { + const { message, lineNumber, procName } = info; + options.info({ + message, + line: lineNumber, + procedure: procName, + time: new Date(), + }); + }; + + const handleDone = (result) => { + console.log('RESULT', result); + }; + + const handleRow = (row) => { + console.log('ROW', row); + }; + + request.stream = true; + request.on('recordset', options.recordset); + request.on('row', handleRow); + request.on('error', options.error); + request.on('done', handleDone); + request.on('info', handleInfo); + request.query(sql); + }, async getVersion(pool) { const { version } = (await this.query(pool, 'SELECT @@VERSION AS version')).rows[0]; return { version }; diff --git a/packages/types/engines.d.ts b/packages/types/engines.d.ts index dc634388..854ea7cf 100644 --- a/packages/types/engines.d.ts +++ b/packages/types/engines.d.ts @@ -1,12 +1,21 @@ -import { QueryResult } from "./query"; -import { SqlDialect } from "./dialect"; -import { SqlDumper } from "./dumper"; -import { DatabaseInfo } from "./dbinfo"; +import { QueryResult } from './query'; +import { SqlDialect } from './dialect'; +import { SqlDumper } from './dumper'; +import { DatabaseInfo } from './dbinfo'; + +export interface StreamOptions { + recordset: (columns) => void; + row: (row) => void; + error: (error) => void; + done: (result) => void; + info: (info) => void; +} export interface EngineDriver { engine: string; connect(nativeModules, { server, port, user, password, database }): any; query(pool: any, sql: string): Promise; + stream(pool: any, sql: string, options: StreamOptions); getVersion(pool: any): Promise<{ version: string }>; listDatabases( pool: any diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index fbe26c0b..1313c615 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -7,6 +7,13 @@ export interface OpenedDatabaseConnection { subprocess: ChildProcess; } +export interface OpenedSession { + sesid: string; + conid: string; + database: string; + subprocess: ChildProcess; +} + export interface StoredConnection { engine: string; server: string; diff --git a/packages/web/src/query/MessagesView.js b/packages/web/src/query/MessagesView.js new file mode 100644 index 00000000..25462116 --- /dev/null +++ b/packages/web/src/query/MessagesView.js @@ -0,0 +1,24 @@ +import React from 'react'; + +export default function MessagesView({ items }) { + return ( + + + + + + + + + {items.map((row, index) => ( + + + + + + + + ))} +
NumberMessageTimeProcedureLine
{index + 1}{row.message}{row.time}{row.procedure}{row.line}
+ ); +} diff --git a/packages/web/src/query/QueryToolbar.js b/packages/web/src/query/QueryToolbar.js index 13a23538..8e5a7a00 100644 --- a/packages/web/src/query/QueryToolbar.js +++ b/packages/web/src/query/QueryToolbar.js @@ -1,8 +1,10 @@ -import React from 'react' -import ToolbarButton from '../widgets/ToolbarButton' +import React from 'react'; +import ToolbarButton from '../widgets/ToolbarButton'; -export default function QueryToolbar() { - return <> - {}}>Execute +export default function QueryToolbar({ execute,isDatabaseDefined }) { + return ( + <> + Execute -} \ No newline at end of file + ); +} diff --git a/packages/web/src/query/SessionMessagesView.js b/packages/web/src/query/SessionMessagesView.js new file mode 100644 index 00000000..2a17a334 --- /dev/null +++ b/packages/web/src/query/SessionMessagesView.js @@ -0,0 +1,21 @@ +import React from 'react'; +import MessagesView from './MessagesView'; +import useSocket from '../utility/SocketProvider'; + +export default function SessionMessagesView({ sessionId }) { + const [messages, setMessages] = React.useState([]); + const socket = useSocket(); + + const handleInfo = React.useCallback((info) => setMessages((items) => [...items, info]), []); + + React.useEffect(() => { + if (sessionId && socket) { + socket.on(`session-info-${sessionId}`, handleInfo); + return () => { + socket.off(`session-info-${sessionId}`, handleInfo); + }; + } + }, [sessionId, socket]); + + return ; +} diff --git a/packages/web/src/tabs/QueryTab.js b/packages/web/src/tabs/QueryTab.js index 8bf3a1e9..42e31961 100644 --- a/packages/web/src/tabs/QueryTab.js +++ b/packages/web/src/tabs/QueryTab.js @@ -1,17 +1,30 @@ import React from 'react'; import ReactDOM from 'react-dom'; import _ from 'lodash'; +import axios from '../utility/axios'; import engines from '@dbgate/engines'; import useTableInfo from '../utility/useTableInfo'; import useConnectionInfo from '../utility/useConnectionInfo'; import SqlEditor from '../sqleditor/SqlEditor'; import { useUpdateDatabaseForTab } from '../utility/globalState'; import QueryToolbar from '../query/QueryToolbar'; +import styled from 'styled-components'; +import SessionMessagesView from '../query/SessionMessagesView'; + +const MainContainer = styled.div``; + +const EditorContainer = styled.div` + height: 600px; + position: relative; +`; + +const MessagesContainer = styled.div``; export default function QueryTab({ tabid, conid, database, tabVisible, toolbarPortalRef }) { const localStorageKey = `sql_${tabid}`; const [queryText, setQueryText] = React.useState(() => localStorage.getItem(localStorageKey) || ''); const queryTextRef = React.useRef(queryText); + const [sessionId, setSessionId] = React.useState(null); const saveToStorage = React.useCallback(() => localStorage.setItem(localStorageKey, queryTextRef.current), [ localStorageKey, @@ -22,26 +35,57 @@ export default function QueryTab({ tabid, conid, database, tabVisible, toolbarPo React.useEffect(() => { window.addEventListener('beforeunload', saveToStorage); return () => { + saveToStorage(); window.removeEventListener('beforeunload', saveToStorage); }; }, []); useUpdateDatabaseForTab(tabVisible, conid, database); + const connection = useConnectionInfo(conid); - const handleChange = text => { + const handleChange = (text) => { if (text != null) queryTextRef.current = text; setQueryText(text); saveToStorageDebounced(); }; - return ( - <> - + const handleExecute = async () => { + let sesid = sessionId; + if (!sesid) { + const resp = await axios.post('sessions/create', { + conid, + database, + }); + sesid = resp.data.sesid; + setSessionId(sesid); + } + const resp2 = await axios.post('sessions/execute-query', { + sesid, + sql: queryText, + }); + }; - {toolbarPortalRef && - toolbarPortalRef.current && - tabVisible && - ReactDOM.createPortal(, toolbarPortalRef.current)} - + return ( + + + + + {toolbarPortalRef && + toolbarPortalRef.current && + tabVisible && + ReactDOM.createPortal( + , + toolbarPortalRef.current + )} + + + + + ); } diff --git a/packages/web/src/utility/useFetch.js b/packages/web/src/utility/useFetch.js index 490aa299..4cd18127 100644 --- a/packages/web/src/utility/useFetch.js +++ b/packages/web/src/utility/useFetch.js @@ -16,9 +16,9 @@ export default function useFetch({ const [loadCounter, setLoadCounter] = React.useState(0); const socket = useSocket(); - const handleReload = () => { - setLoadCounter(loadCounter + 1); - }; + const handleReload = React.useCallback(() => { + setLoadCounter((counter) => counter + 1); + }, []); const indicators = [url, stableStringify(data), stableStringify(params), loadCounter]; @@ -32,15 +32,29 @@ export default function useFetch({ }); setValue([resp.data, loadedIndicators]); } + + // React.useEffect(() => { + // loadValue(indicators); + // if (reloadTrigger && socket) { + // socket.on(reloadTrigger, handleReload); + // return () => { + // socket.off(reloadTrigger, handleReload); + // }; + // } + // }, [...indicators, socket]); + React.useEffect(() => { loadValue(indicators); + }, [...indicators]); + + React.useEffect(() => { if (reloadTrigger && socket) { socket.on(reloadTrigger, handleReload); return () => { socket.off(reloadTrigger, handleReload); }; } - }, [...indicators, socket]); + }, [socket, reloadTrigger]); const [returnValue, loadedIndicators] = value; if (_.isEqual(indicators, loadedIndicators)) return returnValue;