diff --git a/Common/AnalyticsModels/BaseModel.ts b/Common/AnalyticsModels/BaseModel.ts index c5008dc679..f4d61f06db 100644 --- a/Common/AnalyticsModels/BaseModel.ts +++ b/Common/AnalyticsModels/BaseModel.ts @@ -9,6 +9,7 @@ import EnableWorkflowOn from '../Types/BaseDatabase/EnableWorkflowOn'; import ObjectID from '../Types/ObjectID'; import CommonModel from './CommonModel'; import Route from '../Types/API/Route'; +import EnableRealtimeEventsOn from '../Utils/Realtime'; export default class AnalyticsBaseModel extends CommonModel { public constructor(data: { @@ -23,6 +24,7 @@ export default class AnalyticsBaseModel extends CommonModel { accessControl?: TableAccessControl | undefined; primaryKeys: Array; // this should be the subset of tableColumns enableWorkflowOn?: EnableWorkflowOn | undefined; + enableRealtimeEventsOn?: EnableRealtimeEventsOn | undefined }) { super({ tableColumns: data.tableColumns, @@ -103,6 +105,7 @@ export default class AnalyticsBaseModel extends CommonModel { this.accessControl = data.accessControl; this.enableWorkflowOn = data.enableWorkflowOn; this.crudApiPath = data.crudApiPath; + this.enableRealtimeEventsOn = data.enableRealtimeEventsOn; // initialize Arrays. for (const column of this.tableColumns) { @@ -144,6 +147,16 @@ export default class AnalyticsBaseModel extends CommonModel { this._tableEngine = v; } + + private _enableRealtimeEventsOn : EnableRealtimeEventsOn | undefined; + public get enableRealtimeEventsOn() : EnableRealtimeEventsOn| undefined { + return this._enableRealtimeEventsOn; + } + public set enableRealtimeEventsOn(v : EnableRealtimeEventsOn| undefined) { + this._enableRealtimeEventsOn = v; + } + + private _primaryKeys: Array = []; public get primaryKeys(): Array { return this._primaryKeys; diff --git a/Common/Utils/Realtime.ts b/Common/Utils/Realtime.ts index 63523eec9e..b331508624 100644 --- a/Common/Utils/Realtime.ts +++ b/Common/Utils/Realtime.ts @@ -17,4 +17,12 @@ export interface ListenToModelEventJSON { query: JSONObject; eventType: ModelEventType; tenantId: string; -} \ No newline at end of file + select: JSONObject; +} + +export default interface EnableRealtimeEventsOn { + create?: boolean | undefined; + update?: boolean | undefined; + delete?: boolean | undefined; + read?: boolean | undefined; +} diff --git a/CommonServer/Infrastructure/SocketIO.ts b/CommonServer/Infrastructure/SocketIO.ts index 285fd61448..9581c86161 100644 --- a/CommonServer/Infrastructure/SocketIO.ts +++ b/CommonServer/Infrastructure/SocketIO.ts @@ -1,6 +1,9 @@ import SocketIO from 'socket.io'; import http from 'http'; import Express, { ExpressApplication } from '../Utils/Express'; +import { createAdapter } from "@socket.io/redis-adapter"; +import Redis from './Redis'; +import DatabaseNotConnectedException from 'Common/Types/Exception/DatabaseNotConnectedException'; const app: ExpressApplication = Express.getExpressApp(); const server: http.Server = http.createServer(app); @@ -8,7 +11,7 @@ const server: http.Server = http.createServer(app); export type Socket = SocketIO.Socket; const io: SocketIO.Server = new SocketIO.Server(server, { - path: '/realtime/socket.io', + path: '/realtime/socket', transports: ['websocket', 'polling'], // Using websocket does not require sticky session perMessageDeflate: { threshold: 1024, // Defaults to 1024 @@ -22,4 +25,14 @@ const io: SocketIO.Server = new SocketIO.Server(server, { }, }); + +if(!Redis.getClient()){ + throw new DatabaseNotConnectedException('Redis is not connected. Please connect to Redis before connecting to SocketIO.'); +} + +const pubClient = Redis.getClient()!.duplicate(); +const subClient = Redis.getClient()!.duplicate(); + +io.adapter(createAdapter(pubClient, subClient)); + export default io; diff --git a/CommonServer/Services/AnalyticsDatabaseService.ts b/CommonServer/Services/AnalyticsDatabaseService.ts index df40f773d1..fae173d409 100644 --- a/CommonServer/Services/AnalyticsDatabaseService.ts +++ b/CommonServer/Services/AnalyticsDatabaseService.ts @@ -47,6 +47,8 @@ import StatementGenerator from '../Utils/AnalyticsDatabase/StatementGenerator'; import CountBy from '../Types/AnalyticsDatabase/CountBy'; import DeleteOneBy from '../Types/AnalyticsDatabase/DeleteOneBy'; import UpdateOneBy from '../Types/AnalyticsDatabase/UpdateOneBy'; +import Realtime from '../Utils/Realtime'; +import { ModelEventType } from 'Common/Utils/Realtime'; export default class AnalyticsDatabaseService< TBaseModel extends AnalyticsBaseModel @@ -684,6 +686,18 @@ export default class AnalyticsDatabaseService< } } + // emit realtime events to the client. + if(this.getModel().enableRealtimeEventsOn?.create && createBy.props.tenantId){ + for(const item of items){ + await Realtime.emitModelEvent({ + model: item, + tenantId: createBy.props.tenantId, + eventType: ModelEventType.Create, + modelType: this.modelType, + }) + } + } + return createBy.items; } catch (error) { await this.onCreateError(error as Exception); diff --git a/CommonServer/Utils/Realtime.ts b/CommonServer/Utils/Realtime.ts index bba2e7aa47..0c76bfa7e5 100644 --- a/CommonServer/Utils/Realtime.ts +++ b/CommonServer/Utils/Realtime.ts @@ -1,9 +1,80 @@ +import { JSONObject } from "Common/Types/JSON"; import io from "../Infrastructure/SocketIO"; +import { EventName, ListenToModelEventJSON, ModelEventType } from 'Common/Utils/Realtime'; +import { Socket } from "socket.io"; +import DatabaseType from "Common/Types/BaseDatabase/DatabaseType"; +import JSONFunctions from "Common/Types/JSONFunctions"; +import BadDataException from "Common/Types/Exception/BadDataException"; +import ObjectID from "Common/Types/ObjectID"; +import BaseModel from "Common/Models/BaseModel"; +import AnalyticsBaseModel from "Common/AnalyticsModels/BaseModel"; + export default class Realtime { - public static joinRoom() + + public static getRoomId(tenantId: string | ObjectID, modelName: string, eventType: ModelEventType): string { + return tenantId.toString() + '-' + modelName + '-' + eventType; + } + + public static async listenToModelEvent(socket: Socket, data: ListenToModelEventJSON): Promise { + // join the room. + socket.join(this.getRoomId(data.tenantId, data.modelName, data.eventType)); + } + + public static stopListeningToModelEvent(socket: Socket, data: ListenToModelEventJSON): void { + // leave this room. + socket.leave(this.getRoomId(data.tenantId, data.modelName, data.eventType)); + } + + public static emitModelEvent(data: { tenantId: string | ObjectID, eventType: ModelEventType, model: BaseModel | AnalyticsBaseModel, modelType: { new(): BaseModel | AnalyticsBaseModel } }): void { + + let jsonObject: JSONObject = {}; + + if (data.model instanceof BaseModel) { + jsonObject = BaseModel.toJSON(data.model, data.modelType as { new(): BaseModel }); + } + + if (data.model instanceof AnalyticsBaseModel) { + jsonObject = AnalyticsBaseModel.toJSON(data.model, data.modelType as { new(): AnalyticsBaseModel }); + } + + io.to(this.getRoomId(data.tenantId, data.model.tableName!, data.eventType)).emit(this.getRoomId(data.tenantId, data.model.tableName!, data.eventType), jsonObject); + } } io.on('connection', (socket) => { - + socket.on(EventName.ListenToModalEvent, async (data: JSONObject) => { + + // TODO: validate if this soocket has access to this tenant + + // TODO: validate if this socket has access to this model + + // TODO: validate if this socket has access to this event type + + // TODO: validate if this socket has access to this query + + // TODO: validate if this socket has access to this select + + // validate data + + if (typeof data['eventType'] !== 'string') throw new BadDataException('eventType is not a string'); + if (typeof data['modelType'] !== 'string') throw new BadDataException('modelType is not a string'); + if (typeof data['modelName'] !== 'string') throw new BadDataException('modelName is not a string'); + if (typeof data['query'] !== 'object') throw new BadDataException('query is not an object'); + if (typeof data['tenantId'] !== 'string') throw new BadDataException('tenantId is not a string'); + if (typeof data['select'] !== 'object') throw new BadDataException('select is not an object'); + + Realtime.listenToModelEvent(socket, { + eventType: data['eventType'] as ModelEventType, + modelType: data['modelType'] as DatabaseType, + modelName: data['modelName'] as string, + query: JSONFunctions.deserialize( + data['query'] as JSONObject, + ), + tenantId: data['tenantId'] as string, + select: JSONFunctions.deserialize( + data['select'] as JSONObject, + ) + }); + }) }); \ No newline at end of file diff --git a/CommonServer/package-lock.json b/CommonServer/package-lock.json index f053c6d764..1c8502cc78 100644 --- a/CommonServer/package-lock.json +++ b/CommonServer/package-lock.json @@ -14,6 +14,7 @@ "@opentelemetry/api": "^1.1.0", "@opentelemetry/auto-instrumentations-node": "^0.31.0", "@opentelemetry/sdk-node": "^0.30.0", + "@socket.io/redis-adapter": "^8.2.1", "@types/ejs": "^3.1.1", "@types/gridfs-stream": "^0.5.35", "@types/json2csv": "^5.0.3", @@ -3536,6 +3537,22 @@ "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz", "integrity": "sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg==" }, + "node_modules/@socket.io/redis-adapter": { + "version": "8.2.1", + "resolved": "https://registry.npmjs.org/@socket.io/redis-adapter/-/redis-adapter-8.2.1.tgz", + "integrity": "sha512-6Dt7EZgGSBP0qvXeOKGx7NnSr2tPMbVDfDyL97zerZo+v69hMfL99skMCL3RKZlWVqLyRme2T0wcy3udHhtOsg==", + "dependencies": { + "debug": "~4.3.1", + "notepack.io": "~3.0.1", + "uid2": "1.0.0" + }, + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "socket.io-adapter": "^2.4.0" + } + }, "node_modules/@sqltools/formatter": { "version": "1.2.5", "resolved": "https://registry.npmjs.org/@sqltools/formatter/-/formatter-1.2.5.tgz", @@ -7968,6 +7985,11 @@ "node": ">=0.10.0" } }, + "node_modules/notepack.io": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/notepack.io/-/notepack.io-3.0.1.tgz", + "integrity": "sha512-TKC/8zH5pXIAMVQio2TvVDTtPRX+DJPHDqjRbxogtFiByHyzKmy96RA0JtCQJ+WouyyL4A10xomQzgbUT+1jCg==" + }, "node_modules/npm-run-path": { "version": "4.0.1", "resolved": "https://registry.npmjs.org/npm-run-path/-/npm-run-path-4.0.1.tgz", @@ -9920,6 +9942,14 @@ "node": ">=0.8.0" } }, + "node_modules/uid2": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/uid2/-/uid2-1.0.0.tgz", + "integrity": "sha512-+I6aJUv63YAcY9n4mQreLUt0d4lvwkkopDNmpomkAUz0fAkEMV9pRWxN0EjhW1YfRhcuyHg2v3mwddCDW1+LFQ==", + "engines": { + "node": ">= 4.0.0" + } + }, "node_modules/undici": { "version": "5.21.2", "resolved": "https://registry.npmjs.org/undici/-/undici-5.21.2.tgz", diff --git a/CommonServer/package.json b/CommonServer/package.json index 42d8025005..8085e3625d 100644 --- a/CommonServer/package.json +++ b/CommonServer/package.json @@ -16,6 +16,7 @@ "@opentelemetry/api": "^1.1.0", "@opentelemetry/auto-instrumentations-node": "^0.31.0", "@opentelemetry/sdk-node": "^0.30.0", + "@socket.io/redis-adapter": "^8.2.1", "@types/ejs": "^3.1.1", "@types/gridfs-stream": "^0.5.35", "@types/json2csv": "^5.0.3", diff --git a/CommonUI/src/Utils/Realtime.ts b/CommonUI/src/Utils/Realtime.ts index cbfd0bdb7e..ad61e43bfb 100644 --- a/CommonUI/src/Utils/Realtime.ts +++ b/CommonUI/src/Utils/Realtime.ts @@ -9,12 +9,15 @@ import { REALTIME_URL } from "../Config"; import URL from "Common/Types/API/URL"; import JSONFunctions from "Common/Types/JSONFunctions"; import DatabaseType from "Common/Types/BaseDatabase/DatabaseType"; +import AnalyticsSelect from "./AnalyticsModelAPI/Select"; +import Select from "./ModelAPI/Select"; export interface ListenToAnalyticsModelEvent { modelType: { new(): Model }, query: AnalyticsQuery, eventType: ModelEventType, - tenantId: ObjectID + tenantId: ObjectID, + select: AnalyticsSelect } export interface ListenToModelEvent { @@ -22,6 +25,7 @@ export interface ListenToModelEvent { query: Query, tenantId: ObjectID eventType: ModelEventType, + select: Select } export default class Reatime { @@ -29,7 +33,7 @@ export default class Reatime { private socket!: Socket; public constructor(){ - const socket: Socket = SocketIO(URL.fromString(REALTIME_URL.toString()).addRoute("/socket.io").toString()); + const socket: Socket = SocketIO(URL.fromString(REALTIME_URL.toString()).addRoute("/socket").toString()); this.socket = socket; } @@ -41,7 +45,8 @@ export default class Reatime { modelType: DatabaseType.Database, modelName: listenToModelEvent.modelType.name, query: JSONFunctions.serialize(listenToModelEvent.query), - tenantId: listenToModelEvent.tenantId.toString() + tenantId: listenToModelEvent.tenantId.toString(), + select: JSONFunctions.serialize(listenToModelEvent.select) } this.socket.emit(EventName.ListenToModalEvent, listenToModelEventJSON); @@ -54,7 +59,8 @@ export default class Reatime { modelType: DatabaseType.AnalyticsDatabase, modelName: listenToModelEvent.modelType.name, query: JSONFunctions.serialize(listenToModelEvent.query), - tenantId: listenToModelEvent.tenantId.toString() + tenantId: listenToModelEvent.tenantId.toString(), + select: JSONFunctions.serialize(listenToModelEvent.select) } this.socket.emit(EventName.ListenToModalEvent, listenToModelEventJSON); diff --git a/Model/AnalyticsModels/Log.ts b/Model/AnalyticsModels/Log.ts index 1f44e0cf9e..540435b2f2 100644 --- a/Model/AnalyticsModels/Log.ts +++ b/Model/AnalyticsModels/Log.ts @@ -12,6 +12,9 @@ export default class Log extends AnalyticsBaseModel { tableName: 'Log', tableEngine: AnalyticsTableEngine.MergeTree, singularName: 'Log', + enableRealtimeEventsOn: { + create: true, + }, pluralName: 'Logs', crudApiPath: new Route('/logs'), tableColumns: [ diff --git a/Nginx/default.conf.template b/Nginx/default.conf.template index ab833847c4..0e3a6478db 100644 --- a/Nginx/default.conf.template +++ b/Nginx/default.conf.template @@ -479,6 +479,21 @@ server { client_max_body_size 50M; } + location /realtime { + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # enable WebSockets (for ws://sockjs not connected error in the accounts source: https://stackoverflow.com/questions/41381444/websocket-connection-failed-error-during-websocket-handshake-unexpected-respon) + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_pass http://dashboard-api; + + client_max_body_size 50M; + } + location /analytics-api { proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr;