refactor(server): simplify api for publishing sync message (#4912)
Some checks failed
Build Docker Image / build-and-push (push) Waiting to run
Build Pro Image / build-and-push (push) Waiting to run
E2E / Build (push) Waiting to run
E2E / Core and plugins (push) Blocked by required conditions
E2E / plugin-workflow (push) Blocked by required conditions
E2E / plugin-workflow-approval (push) Blocked by required conditions
E2E / plugin-data-source-main (push) Blocked by required conditions
E2E / Comment on PR (push) Blocked by required conditions
NocoBase FrontEnd Test / frontend-test (18) (push) Waiting to run
NocoBase Backend Test / sqlite-test (20, false) (push) Has been cancelled
NocoBase Backend Test / sqlite-test (20, true) (push) Has been cancelled
NocoBase Backend Test / postgres-test (public, 20, nocobase, false) (push) Has been cancelled
NocoBase Backend Test / postgres-test (public, 20, nocobase, true) (push) Has been cancelled
NocoBase Backend Test / postgres-test (public, 20, public, false) (push) Has been cancelled
NocoBase Backend Test / postgres-test (public, 20, public, true) (push) Has been cancelled
NocoBase Backend Test / postgres-test (user_schema, 20, nocobase, false) (push) Has been cancelled
NocoBase Backend Test / postgres-test (user_schema, 20, nocobase, true) (push) Has been cancelled
NocoBase Backend Test / postgres-test (user_schema, 20, public, false) (push) Has been cancelled
NocoBase Backend Test / postgres-test (user_schema, 20, public, true) (push) Has been cancelled
NocoBase Backend Test / mysql-test (20, false) (push) Has been cancelled
NocoBase Backend Test / mysql-test (20, true) (push) Has been cancelled
NocoBase Backend Test / mariadb-test (20, false) (push) Has been cancelled
NocoBase Backend Test / mariadb-test (20, true) (push) Has been cancelled
Test on Windows / build (push) Has been cancelled

* refactor(server): simplify method invoking for publishing sync message

* feat(test): add mock-cluster for testing cluster cases

* refactor(server): simplify sync api

* refactor(server): simplify type of sync-manager
This commit is contained in:
Junyi 2024-07-20 23:26:50 +08:00 committed by GitHub
parent 2886b60b82
commit d5ff3167c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 114 additions and 13 deletions

View File

@ -138,7 +138,15 @@ export abstract class Plugin<O = any> implements PluginInterface {
* Fired when a sync message is received. * Fired when a sync message is received.
* @experimental * @experimental
*/ */
onSync(message: SyncMessageData): Promise<void> | void {} onSync(message: SyncMessageData = {}): Promise<void> | void {}
/**
* Publish a sync message.
* @experimental
*/
sync(message?: SyncMessageData) {
this.app.syncManager.publish(this.name, message);
}
/** /**
* @deprecated * @deprecated

View File

@ -32,7 +32,6 @@ export type SyncMessage = {
*/ */
export class SyncManager { export class SyncManager {
private nodeId: string; private nodeId: string;
private app: Application;
private eventEmitter = new EventEmitter(); private eventEmitter = new EventEmitter();
private adapter: SyncAdapter = null; private adapter: SyncAdapter = null;
private incomingBuffer: SyncMessageData[] = []; private incomingBuffer: SyncMessageData[] = [];
@ -82,8 +81,7 @@ export class SyncManager {
} }
}; };
constructor(app: Application) { constructor(private app: Application) {
this.app = app;
this.nodeId = `${process.env.NODE_ID || randomUUID()}-${process.pid}`; this.nodeId = `${process.env.NODE_ID || randomUUID()}-${process.pid}`;
} }
@ -112,7 +110,7 @@ export class SyncManager {
/** /**
* Publish a message to the sync manager * Publish a message to the sync manager
*/ */
public publish(namespace: string, data: SyncMessageData) { public publish(namespace: string, data: SyncMessageData = {}) {
if (!this.adapter) { if (!this.adapter) {
return; return;
} }

View File

@ -12,7 +12,8 @@ import ws from 'ws';
export { mockDatabase, MockDatabase } from '@nocobase/database'; export { mockDatabase, MockDatabase } from '@nocobase/database';
export { default as supertest } from 'supertest'; export { default as supertest } from 'supertest';
export * from './mockServer'; export * from './mock-server';
export * from './mock-cluster';
export const pgOnly: () => any = () => (process.env.DB_DIALECT == 'postgres' ? describe : describe.skip); export const pgOnly: () => any = () => (process.env.DB_DIALECT == 'postgres' ? describe : describe.skip);
export const isPg = () => process.env.DB_DIALECT == 'postgres'; export const isPg = () => process.env.DB_DIALECT == 'postgres';
@ -22,9 +23,13 @@ export function randomStr() {
return Math.random().toString(36).substring(2); return Math.random().toString(36).substring(2);
} }
export const waitSecond = async (timeout = 1000) => { export function sleep(ms = 1000) {
await new Promise((resolve) => setTimeout(resolve, timeout)); return new Promise((resolve) => {
}; setTimeout(resolve, ms);
});
}
export const waitSecond = sleep;
export const startServerWithRandomPort = async (startServer) => { export const startServerWithRandomPort = async (startServer) => {
return await new Promise((resolve) => { return await new Promise((resolve) => {

View File

@ -0,0 +1,91 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import Path from 'node:path';
import { spawn, ChildProcess } from 'node:child_process';
import { getPortPromise } from 'portfinder';
import { uid } from '@nocobase/utils';
import { createMockServer } from './mock-server';
type ProcessConfig = {
env: Record<string, any>;
};
type ClusterOptions = {
script: string;
config: ProcessConfig;
instances: number;
plugins?: string[];
};
export class MockCluster {
private processes = [];
private mockApp;
constructor(private options: ClusterOptions) {}
async start(): Promise<number[]> {
// NOTE: use this for install app first
this.mockApp = await createMockServer({
plugins: this.options.plugins,
});
this.processes = [];
const ports = [];
for (let i = 0; i < this.options.instances; i++) {
const port = await getPortPromise();
const childProcess = spawn('node', ['./node_modules/tsx/dist/cli.mjs', this.options.script, 'start'], {
env: {
...process.env,
...this.options.config.env,
APP_PORT: `${port}`,
APPEND_PRESET_BUILT_IN_PLUGINS: (this.options.plugins ?? []).join(','),
SOCKET_PATH: `storage/tests/gateway-cluster-${uid()}.sock`,
PM2_HOME: Path.resolve(process.cwd(), `storage/tests/.pm2-${uid()}`),
},
});
await new Promise<ChildProcess>((resolve, reject) => {
const startTimer = setTimeout(() => reject(new Error('app not started in 10s')), 10000);
childProcess.stdout.on('data', (data) => {
console.log(data.toString());
if (data.toString().includes('app has been started')) {
clearTimeout(startTimer);
resolve(childProcess);
}
});
});
this.processes.push({
childProcess,
port,
});
ports.push(port);
}
return ports;
}
async stop() {
await this.mockApp.destroy();
return Promise.all(
this.processes.map(({ childProcess }) => {
const promise = new Promise((resolve) => {
childProcess.on('exit', resolve);
});
childProcess.kill();
return promise;
}),
);
}
}

View File

@ -106,14 +106,14 @@ export default class PluginFileManagerServer extends Plugin {
const Storage = this.db.getModel('storages'); const Storage = this.db.getModel('storages');
Storage.afterSave((m) => { Storage.afterSave((m) => {
this.storagesCache.set(m.id, m.toJSON()); this.storagesCache.set(m.id, m.toJSON());
this.app.syncManager.publish(this.name, { this.sync({
type: 'storageChange', type: 'storageChange',
storageId: `${m.id}`, storageId: `${m.id}`,
}); });
}); });
Storage.afterDestroy((m) => { Storage.afterDestroy((m) => {
this.storagesCache.delete(m.id); this.storagesCache.delete(m.id);
this.app.syncManager.publish(this.name, { this.sync({
type: 'storageRemove', type: 'storageRemove',
storageId: `${m.id}`, storageId: `${m.id}`,
}); });

View File

@ -15,7 +15,6 @@ import { MockServer, createMockServer, mockDatabase } from '@nocobase/test';
import functions from './functions'; import functions from './functions';
import triggers from './triggers'; import triggers from './triggers';
import instructions from './instructions'; import instructions from './instructions';
import { Resourcer } from '@nocobase/resourcer';
import { SequelizeDataSource } from '@nocobase/data-source-manager'; import { SequelizeDataSource } from '@nocobase/data-source-manager';
import { uid } from '@nocobase/utils'; import { uid } from '@nocobase/utils';

View File

@ -342,7 +342,7 @@ export default class PluginWorkflowServer extends Plugin {
this.enabledCache.delete(workflow.id); this.enabledCache.delete(workflow.id);
} }
if (!silent) { if (!silent) {
this.app.syncManager.publish(this.name, { this.sync({
type: 'statusChange', type: 'statusChange',
workflowId: `${workflow.id}`, workflowId: `${workflow.id}`,
enabled: `${Number(next)}`, enabled: `${Number(next)}`,