add clickhouse service

This commit is contained in:
Simon Larsen 2023-08-21 17:23:19 +01:00
parent c8d1af6daf
commit bdc4c5f4f2
No known key found for this signature in database
GPG Key ID: AB45983AA9C81CDE
10 changed files with 247 additions and 16 deletions

View File

@ -1,5 +1,7 @@
import TableColumnType from '../Types/BaseDatabase/TableColumnType';
import AnalyticsTableColumn from '../Types/AnalyticsDatabase/TableColumn';
import BadDataException from '../Types/Exception/BadDataException';
import AnalyticsTableEngine from '../Types/AnalyticsDatabase/AnalyticsTableEngine';
export default class AnalyticsDataModel {
private _tableColumns: Array<AnalyticsTableColumn> = [];
@ -18,13 +20,41 @@ export default class AnalyticsDataModel {
this._tableName = v;
}
private _tableEngine : AnalyticsTableEngine = AnalyticsTableEngine.MergeTree;
public get tableEngine() : AnalyticsTableEngine {
return this._tableEngine;
}
public set tableEngine(v : AnalyticsTableEngine) {
this._tableEngine = v;
}
private _primaryKeys : Array<string> = [];
public get primaryKeys() : Array<string> {
return this._primaryKeys;
}
public set primaryKeys(v : Array<string>) {
this._primaryKeys = v;
}
public constructor(data: {
tableName: string;
tableEngine?: AnalyticsTableEngine | undefined;
tableColumns: Array<AnalyticsTableColumn>;
primaryKeys: Array<string>; // this should be the subset of tableColumns
}) {
let columns: Array<AnalyticsTableColumn> = [...data.tableColumns];
this.tableName = data.tableName;
this.tableColumns.push(
if(data.tableEngine){
this.tableEngine = data.tableEngine;
}
columns.push(
new AnalyticsTableColumn({
key: '_id',
title: 'ID',
@ -34,7 +64,7 @@ export default class AnalyticsDataModel {
})
);
this.tableColumns.push(
columns.push(
new AnalyticsTableColumn({
key: 'createdAt',
title: 'Created',
@ -44,7 +74,7 @@ export default class AnalyticsDataModel {
})
);
this.tableColumns.push(
columns.push(
new AnalyticsTableColumn({
key: 'updatedAt',
title: 'Updated',
@ -54,18 +84,22 @@ export default class AnalyticsDataModel {
})
);
this.tableColumns = this.tableColumns.concat(data.tableColumns);
}
if(!data.primaryKeys || data.primaryKeys.length === 0){
throw new BadDataException("Primary keys are required");
}
public toTableCreateStatement(): string {
return `CREATE TABLE IF NOT EXISTS ${this.tableName}
(
user_id UInt32,
message String,
timestamp DateTime,
metric Float32
)
ENGINE = MergeTree()
PRIMARY KEY (user_id, timestamp)`;
// check if primary keys are subset of tableColumns
console.log(columns);
data.primaryKeys.forEach((primaryKey) => {
if(!columns.find((column) => column.key === primaryKey)){
throw new BadDataException("Primary key "+primaryKey+" is not part of tableColumns");
}
});
this.primaryKeys = data.primaryKeys;
this.tableColumns = columns;
}
}

View File

@ -0,0 +1,5 @@
enum AnalyticsTableEngine {
MergeTree = 'MergeTree'
}
export default AnalyticsTableEngine;

View File

@ -7,6 +7,7 @@ import {
import Sleep from 'Common/Types/Sleep';
import { ClickHouseClient, createClient } from '@clickhouse/client';
import Stream from 'stream';
import DatabaseNotConnectedException from 'Common/Types/Exception/DatabaseNotConnectedException';
export type ClickhouseClient = ClickHouseClient<Stream.Readable>;
@ -43,6 +44,12 @@ export default class ClickhouseDatabase {
createClient(dataSourceOptions);
this.dataSource = clickhouseClient;
const result = await clickhouseClient.ping();
if(result.success === false){
throw new DatabaseNotConnectedException("Clickhouse Database is not connected")
}
logger.info(`Clickhouse Database Connected: ${dataSourceOptions.host?.toString()}`);
return clickhouseClient;

View File

@ -1 +1,104 @@
export default class AnalyticsDatabaseService {}
import TableColumnType from "Common/Types/BaseDatabase/TableColumnType";
import ClickhouseDatabase, { ClickhouseAppInstance, ClickhouseClient } from "../Infrastructure/ClickhouseDatabase";
import BaseService from "./BaseService";
import AnalyticsBaseModel from "Common/Models/AnalyticsBaseModel";
import BadDataException from "Common/Types/Exception/BadDataException";
export default class AnalyticsDatabaseService<TBaseModel extends AnalyticsBaseModel> extends BaseService {
public modelType!: { new(): TBaseModel };
public database!: ClickhouseDatabase;
public model!: TBaseModel;
public databaseClient!: ClickhouseClient;
public constructor(data: {
modelType: { new(): TBaseModel },
database?: ClickhouseDatabase | undefined;
}) {
super();
this.modelType = data.modelType;
this.model = new this.modelType();
if (data.database) {
this.database = data.database;
this.databaseClient = this.database.getDataSource() as ClickhouseClient;
}
}
public toTableCreateStatement(): string {
return `CREATE TABLE ${this.model.tableName} IF NOT EXISTS
(
${this.toColumnsCreateStatement()}
)
ENGINE = ${this.model.tableEngine}
PRIMARY KEY (${this.model.primaryKeys.map((key) => key).join(', ')}
`
}
public useDefaultDatabase(): void {
this.database = ClickhouseAppInstance;
this.databaseClient = this.database.getDataSource() as ClickhouseClient;
}
public async execute(query: string): Promise<void> {
if(!this.databaseClient){
this.useDefaultDatabase();
}
await this.databaseClient.exec({
query: query,
})
}
public toColumnsCreateStatement(): string {
let columns: string = '';
this.model.tableColumns.forEach((column) => {
columns += `${column.key} ${this.toColumnType(column.type)} ${column.required ? 'NOT NULL' : ' NULL'},\n`;
});
return columns;
}
public toColumnType(type: TableColumnType): string {
if (type === TableColumnType.ShortText) {
return 'String';
}
if (type === TableColumnType.LongText) {
return 'String';
}
if (type === TableColumnType.VeryLongText) {
return 'String';
}
if (type === TableColumnType.ObjectID) {
return 'String';
}
if (type === TableColumnType.Boolean) {
return 'Bool';
}
if (type === TableColumnType.Number) {
return 'Int32';
}
if (type === TableColumnType.BigNumber) {
return 'Int64';
}
if (type === TableColumnType.Date) {
return 'DateTime';
}
if (type === TableColumnType.Array) {
return 'Array';
}
throw new BadDataException("Unknown column type: " + type);
}
}

View File

@ -110,6 +110,9 @@ import UserNotificationSettingService from './UserNotificationSettingService';
import UserOnCallLogService from './UserOnCallLogService';
import UserOnCallLogTimelineService from './UserOnCallLogTimelineService';
import BaseService from './BaseService';
import LogService from './LogService';
import AnalyticsDatabaseService from './AnalyticsDatabaseService';
import AnalyticsBaseModel from 'Common/Models/AnalyticsBaseModel';
const services: Array<BaseService> = [
// Import all services in current folder here.
@ -217,4 +220,8 @@ const services: Array<BaseService> = [
WorkflowVariablesService,
];
export const AnalyticsServices: Array<AnalyticsDatabaseService<AnalyticsBaseModel>> = [
LogService
]
export default services;

View File

@ -0,0 +1,12 @@
import Log from 'Model/Models/Log';
import AnalyticsDatabaseService from './AnalyticsDatabaseService';
import ClickhouseDatabase from '../Infrastructure/ClickhouseDatabase';
export class Service extends AnalyticsDatabaseService<Log> {
public constructor(clickhouseDatabase?: ClickhouseDatabase | undefined) {
super({ modelType: Log, database: clickhouseDatabase });
}
}
export default new Service();

View File

@ -117,6 +117,10 @@ import IncidentTemplate from './IncidentTemplate';
import IncidentTemplateOwnerTeam from './IncidentTemplateOwnerTeam';
import IncidentTemplateOwnerUser from './IncidentTemplateOwnerUser';
import AnalyticsBaseModel from 'Common/Models/AnalyticsBaseModel';
import Log from './Log';
export default [
User,
Probe,
@ -218,3 +222,8 @@ export default [
IncidentTemplateOwnerTeam,
IncidentTemplateOwnerUser,
];
export const AnalyticsModels: Array<typeof AnalyticsBaseModel> = [
Log
];

39
Model/Models/Log.ts Normal file
View File

@ -0,0 +1,39 @@
import AnalyticsBaseModel from "Common/Models/AnalyticsBaseModel";
import AnalyticsTableColumn from "Common/Types/AnalyticsDatabase/TableColumn";
import TableColumnType from "Common/Types/BaseDatabase/TableColumnType";
import AnalyticsTableEngine from "Common/Types/AnalyticsDatabase/AnalyticsTableEngine";
export default class Log extends AnalyticsBaseModel {
public constructor() {
super({
tableName: 'Logs',
tableEngine: AnalyticsTableEngine.MergeTree,
tableColumns: [
new AnalyticsTableColumn({
key: 'projectId',
title: 'Project ID',
description: 'ID of project',
required: true,
type: TableColumnType.ObjectID,
}),
new AnalyticsTableColumn({
key: 'logContainerId',
title: 'Log Container ID',
description: 'ID of the log container',
required: true,
type: TableColumnType.ObjectID,
}),
new AnalyticsTableColumn({
key: 'logData',
title: 'Log Data',
description: 'Data of the log container',
required: true,
type: TableColumnType.VeryLongText,
}),
],
primaryKeys: [
'projectId', 'logContainerId', 'createdAt'
]
})
}
}

View File

@ -71,6 +71,7 @@ import './Jobs/UserOnCallLog/TimeoutStuckExecutions';
import './Jobs/IncomingRequestMonitor/CheckHeartbeat';
import { ClickhouseAppInstance } from 'CommonServer/Infrastructure/ClickhouseDatabase';
import AnalyticsTableManagement from './Utils/AnalyticsDatabase/TableManegement';
const APP_NAME: string = 'workers';
@ -96,6 +97,9 @@ const init: () => Promise<void> = async (): Promise<void> => {
);
await RunDatabaseMigrations();
// create tables in analytics database
await AnalyticsTableManagement.createTables();
// Job process.
QueueWorker.getWorker(

View File

@ -0,0 +1,11 @@
import { AnalyticsServices } from "CommonServer/Services/Index";
export default class AnalyticsTableManagement {
public static async createTables(): Promise<void> {
for(const service of AnalyticsServices){
// create a table if it does not exist
await service.execute(service.toTableCreateStatement());
}
}
}