Add realtime events support to

AnalyticsDatabaseService and BaseModel
This commit is contained in:
Simon Larsen 2023-11-15 18:49:43 +00:00
parent ee84a082dc
commit e12ea14d85
No known key found for this signature in database
GPG Key ID: AB45983AA9C81CDE
10 changed files with 182 additions and 8 deletions

View File

@ -9,6 +9,7 @@ import EnableWorkflowOn from '../Types/BaseDatabase/EnableWorkflowOn';
import ObjectID from '../Types/ObjectID';
import CommonModel from './CommonModel';
import Route from '../Types/API/Route';
import EnableRealtimeEventsOn from '../Utils/Realtime';
export default class AnalyticsBaseModel extends CommonModel {
public constructor(data: {
@ -23,6 +24,7 @@ export default class AnalyticsBaseModel extends CommonModel {
accessControl?: TableAccessControl | undefined;
primaryKeys: Array<string>; // this should be the subset of tableColumns
enableWorkflowOn?: EnableWorkflowOn | undefined;
enableRealtimeEventsOn?: EnableRealtimeEventsOn | undefined
}) {
super({
tableColumns: data.tableColumns,
@ -103,6 +105,7 @@ export default class AnalyticsBaseModel extends CommonModel {
this.accessControl = data.accessControl;
this.enableWorkflowOn = data.enableWorkflowOn;
this.crudApiPath = data.crudApiPath;
this.enableRealtimeEventsOn = data.enableRealtimeEventsOn;
// initialize Arrays.
for (const column of this.tableColumns) {
@ -144,6 +147,16 @@ export default class AnalyticsBaseModel extends CommonModel {
this._tableEngine = v;
}
private _enableRealtimeEventsOn : EnableRealtimeEventsOn | undefined;
public get enableRealtimeEventsOn() : EnableRealtimeEventsOn| undefined {
return this._enableRealtimeEventsOn;
}
public set enableRealtimeEventsOn(v : EnableRealtimeEventsOn| undefined) {
this._enableRealtimeEventsOn = v;
}
private _primaryKeys: Array<string> = [];
public get primaryKeys(): Array<string> {
return this._primaryKeys;

View File

@ -17,4 +17,12 @@ export interface ListenToModelEventJSON {
query: JSONObject;
eventType: ModelEventType;
tenantId: string;
}
select: JSONObject;
}
export default interface EnableRealtimeEventsOn {
create?: boolean | undefined;
update?: boolean | undefined;
delete?: boolean | undefined;
read?: boolean | undefined;
}

View File

@ -1,6 +1,9 @@
import SocketIO from 'socket.io';
import http from 'http';
import Express, { ExpressApplication } from '../Utils/Express';
import { createAdapter } from "@socket.io/redis-adapter";
import Redis from './Redis';
import DatabaseNotConnectedException from 'Common/Types/Exception/DatabaseNotConnectedException';
const app: ExpressApplication = Express.getExpressApp();
const server: http.Server = http.createServer(app);
@ -8,7 +11,7 @@ const server: http.Server = http.createServer(app);
export type Socket = SocketIO.Socket;
const io: SocketIO.Server = new SocketIO.Server(server, {
path: '/realtime/socket.io',
path: '/realtime/socket',
transports: ['websocket', 'polling'], // Using websocket does not require sticky session
perMessageDeflate: {
threshold: 1024, // Defaults to 1024
@ -22,4 +25,14 @@ const io: SocketIO.Server = new SocketIO.Server(server, {
},
});
if(!Redis.getClient()){
throw new DatabaseNotConnectedException('Redis is not connected. Please connect to Redis before connecting to SocketIO.');
}
const pubClient = Redis.getClient()!.duplicate();
const subClient = Redis.getClient()!.duplicate();
io.adapter(createAdapter(pubClient, subClient));
export default io;

View File

@ -47,6 +47,8 @@ import StatementGenerator from '../Utils/AnalyticsDatabase/StatementGenerator';
import CountBy from '../Types/AnalyticsDatabase/CountBy';
import DeleteOneBy from '../Types/AnalyticsDatabase/DeleteOneBy';
import UpdateOneBy from '../Types/AnalyticsDatabase/UpdateOneBy';
import Realtime from '../Utils/Realtime';
import { ModelEventType } from 'Common/Utils/Realtime';
export default class AnalyticsDatabaseService<
TBaseModel extends AnalyticsBaseModel
@ -684,6 +686,18 @@ export default class AnalyticsDatabaseService<
}
}
// emit realtime events to the client.
if(this.getModel().enableRealtimeEventsOn?.create && createBy.props.tenantId){
for(const item of items){
await Realtime.emitModelEvent({
model: item,
tenantId: createBy.props.tenantId,
eventType: ModelEventType.Create,
modelType: this.modelType,
})
}
}
return createBy.items;
} catch (error) {
await this.onCreateError(error as Exception);

View File

@ -1,9 +1,80 @@
import { JSONObject } from "Common/Types/JSON";
import io from "../Infrastructure/SocketIO";
import { EventName, ListenToModelEventJSON, ModelEventType } from 'Common/Utils/Realtime';
import { Socket } from "socket.io";
import DatabaseType from "Common/Types/BaseDatabase/DatabaseType";
import JSONFunctions from "Common/Types/JSONFunctions";
import BadDataException from "Common/Types/Exception/BadDataException";
import ObjectID from "Common/Types/ObjectID";
import BaseModel from "Common/Models/BaseModel";
import AnalyticsBaseModel from "Common/AnalyticsModels/BaseModel";
export default class Realtime {
public static joinRoom()
public static getRoomId(tenantId: string | ObjectID, modelName: string, eventType: ModelEventType): string {
return tenantId.toString() + '-' + modelName + '-' + eventType;
}
public static async listenToModelEvent(socket: Socket, data: ListenToModelEventJSON): Promise<void> {
// join the room.
socket.join(this.getRoomId(data.tenantId, data.modelName, data.eventType));
}
public static stopListeningToModelEvent(socket: Socket, data: ListenToModelEventJSON): void {
// leave this room.
socket.leave(this.getRoomId(data.tenantId, data.modelName, data.eventType));
}
public static emitModelEvent(data: { tenantId: string | ObjectID, eventType: ModelEventType, model: BaseModel | AnalyticsBaseModel, modelType: { new(): BaseModel | AnalyticsBaseModel } }): void {
let jsonObject: JSONObject = {};
if (data.model instanceof BaseModel) {
jsonObject = BaseModel.toJSON(data.model, data.modelType as { new(): BaseModel });
}
if (data.model instanceof AnalyticsBaseModel) {
jsonObject = AnalyticsBaseModel.toJSON(data.model, data.modelType as { new(): AnalyticsBaseModel });
}
io.to(this.getRoomId(data.tenantId, data.model.tableName!, data.eventType)).emit(this.getRoomId(data.tenantId, data.model.tableName!, data.eventType), jsonObject);
}
}
io.on('connection', (socket) => {
socket.on(EventName.ListenToModalEvent, async (data: JSONObject) => {
// TODO: validate if this soocket has access to this tenant
// TODO: validate if this socket has access to this model
// TODO: validate if this socket has access to this event type
// TODO: validate if this socket has access to this query
// TODO: validate if this socket has access to this select
// validate data
if (typeof data['eventType'] !== 'string') throw new BadDataException('eventType is not a string');
if (typeof data['modelType'] !== 'string') throw new BadDataException('modelType is not a string');
if (typeof data['modelName'] !== 'string') throw new BadDataException('modelName is not a string');
if (typeof data['query'] !== 'object') throw new BadDataException('query is not an object');
if (typeof data['tenantId'] !== 'string') throw new BadDataException('tenantId is not a string');
if (typeof data['select'] !== 'object') throw new BadDataException('select is not an object');
Realtime.listenToModelEvent(socket, {
eventType: data['eventType'] as ModelEventType,
modelType: data['modelType'] as DatabaseType,
modelName: data['modelName'] as string,
query: JSONFunctions.deserialize(
data['query'] as JSONObject,
),
tenantId: data['tenantId'] as string,
select: JSONFunctions.deserialize(
data['select'] as JSONObject,
)
});
})
});

View File

@ -14,6 +14,7 @@
"@opentelemetry/api": "^1.1.0",
"@opentelemetry/auto-instrumentations-node": "^0.31.0",
"@opentelemetry/sdk-node": "^0.30.0",
"@socket.io/redis-adapter": "^8.2.1",
"@types/ejs": "^3.1.1",
"@types/gridfs-stream": "^0.5.35",
"@types/json2csv": "^5.0.3",
@ -3536,6 +3537,22 @@
"resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz",
"integrity": "sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg=="
},
"node_modules/@socket.io/redis-adapter": {
"version": "8.2.1",
"resolved": "https://registry.npmjs.org/@socket.io/redis-adapter/-/redis-adapter-8.2.1.tgz",
"integrity": "sha512-6Dt7EZgGSBP0qvXeOKGx7NnSr2tPMbVDfDyL97zerZo+v69hMfL99skMCL3RKZlWVqLyRme2T0wcy3udHhtOsg==",
"dependencies": {
"debug": "~4.3.1",
"notepack.io": "~3.0.1",
"uid2": "1.0.0"
},
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"socket.io-adapter": "^2.4.0"
}
},
"node_modules/@sqltools/formatter": {
"version": "1.2.5",
"resolved": "https://registry.npmjs.org/@sqltools/formatter/-/formatter-1.2.5.tgz",
@ -7968,6 +7985,11 @@
"node": ">=0.10.0"
}
},
"node_modules/notepack.io": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/notepack.io/-/notepack.io-3.0.1.tgz",
"integrity": "sha512-TKC/8zH5pXIAMVQio2TvVDTtPRX+DJPHDqjRbxogtFiByHyzKmy96RA0JtCQJ+WouyyL4A10xomQzgbUT+1jCg=="
},
"node_modules/npm-run-path": {
"version": "4.0.1",
"resolved": "https://registry.npmjs.org/npm-run-path/-/npm-run-path-4.0.1.tgz",
@ -9920,6 +9942,14 @@
"node": ">=0.8.0"
}
},
"node_modules/uid2": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/uid2/-/uid2-1.0.0.tgz",
"integrity": "sha512-+I6aJUv63YAcY9n4mQreLUt0d4lvwkkopDNmpomkAUz0fAkEMV9pRWxN0EjhW1YfRhcuyHg2v3mwddCDW1+LFQ==",
"engines": {
"node": ">= 4.0.0"
}
},
"node_modules/undici": {
"version": "5.21.2",
"resolved": "https://registry.npmjs.org/undici/-/undici-5.21.2.tgz",

View File

@ -16,6 +16,7 @@
"@opentelemetry/api": "^1.1.0",
"@opentelemetry/auto-instrumentations-node": "^0.31.0",
"@opentelemetry/sdk-node": "^0.30.0",
"@socket.io/redis-adapter": "^8.2.1",
"@types/ejs": "^3.1.1",
"@types/gridfs-stream": "^0.5.35",
"@types/json2csv": "^5.0.3",

View File

@ -9,12 +9,15 @@ import { REALTIME_URL } from "../Config";
import URL from "Common/Types/API/URL";
import JSONFunctions from "Common/Types/JSONFunctions";
import DatabaseType from "Common/Types/BaseDatabase/DatabaseType";
import AnalyticsSelect from "./AnalyticsModelAPI/Select";
import Select from "./ModelAPI/Select";
export interface ListenToAnalyticsModelEvent<Model extends AnalyticsBaseModel> {
modelType: { new(): Model },
query: AnalyticsQuery<Model>,
eventType: ModelEventType,
tenantId: ObjectID
tenantId: ObjectID,
select: AnalyticsSelect<Model>
}
export interface ListenToModelEvent<Model extends BaseModel> {
@ -22,6 +25,7 @@ export interface ListenToModelEvent<Model extends BaseModel> {
query: Query<Model>,
tenantId: ObjectID
eventType: ModelEventType,
select: Select<Model>
}
export default class Reatime {
@ -29,7 +33,7 @@ export default class Reatime {
private socket!: Socket;
public constructor(){
const socket: Socket = SocketIO(URL.fromString(REALTIME_URL.toString()).addRoute("/socket.io").toString());
const socket: Socket = SocketIO(URL.fromString(REALTIME_URL.toString()).addRoute("/socket").toString());
this.socket = socket;
}
@ -41,7 +45,8 @@ export default class Reatime {
modelType: DatabaseType.Database,
modelName: listenToModelEvent.modelType.name,
query: JSONFunctions.serialize(listenToModelEvent.query),
tenantId: listenToModelEvent.tenantId.toString()
tenantId: listenToModelEvent.tenantId.toString(),
select: JSONFunctions.serialize(listenToModelEvent.select)
}
this.socket.emit(EventName.ListenToModalEvent, listenToModelEventJSON);
@ -54,7 +59,8 @@ export default class Reatime {
modelType: DatabaseType.AnalyticsDatabase,
modelName: listenToModelEvent.modelType.name,
query: JSONFunctions.serialize(listenToModelEvent.query),
tenantId: listenToModelEvent.tenantId.toString()
tenantId: listenToModelEvent.tenantId.toString(),
select: JSONFunctions.serialize(listenToModelEvent.select)
}
this.socket.emit(EventName.ListenToModalEvent, listenToModelEventJSON);

View File

@ -12,6 +12,9 @@ export default class Log extends AnalyticsBaseModel {
tableName: 'Log',
tableEngine: AnalyticsTableEngine.MergeTree,
singularName: 'Log',
enableRealtimeEventsOn: {
create: true,
},
pluralName: 'Logs',
crudApiPath: new Route('/logs'),
tableColumns: [

View File

@ -479,6 +479,21 @@ server {
client_max_body_size 50M;
}
location /realtime {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# enable WebSockets (for ws://sockjs not connected error in the accounts source: https://stackoverflow.com/questions/41381444/websocket-connection-failed-error-during-websocket-handshake-unexpected-respon)
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_pass http://dashboard-api;
client_max_body_size 50M;
}
location /analytics-api {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;