From 90ba426766e0fc355a94820bd3c746a9a6ba7e65 Mon Sep 17 00:00:00 2001 From: chenos Date: Wed, 31 Jul 2024 13:28:47 +0800 Subject: [PATCH] feat: improve code --- .../src/__tests__/pub-sub-manager.test.ts | 41 +--- packages/core/server/src/pub-sub-manager.ts | 213 ------------------ .../src/pub-sub-manager/handler-manager.ts | 118 ++++++++++ .../core/server/src/pub-sub-manager/index.ts | 13 ++ .../src/pub-sub-manager/pub-sub-manager.ts | 104 +++++++++ .../core/server/src/pub-sub-manager/types.ts | 32 +++ .../test/src/server/memory-pub-sub-adapter.ts | 4 + 7 files changed, 274 insertions(+), 251 deletions(-) delete mode 100644 packages/core/server/src/pub-sub-manager.ts create mode 100644 packages/core/server/src/pub-sub-manager/handler-manager.ts create mode 100644 packages/core/server/src/pub-sub-manager/index.ts create mode 100644 packages/core/server/src/pub-sub-manager/pub-sub-manager.ts create mode 100644 packages/core/server/src/pub-sub-manager/types.ts diff --git a/packages/core/server/src/__tests__/pub-sub-manager.test.ts b/packages/core/server/src/__tests__/pub-sub-manager.test.ts index 5304e83c1e..cce18ab041 100644 --- a/packages/core/server/src/__tests__/pub-sub-manager.test.ts +++ b/packages/core/server/src/__tests__/pub-sub-manager.test.ts @@ -104,30 +104,12 @@ describe('skipSelf, unsubscribe, debounce', () => { pubSubManager.publish('test1', 'message2'); pubSubManager.publish('test1', 'message2'); await sleep(500); - //@ts-ignore - expect(pubSubManager['messageHandlers'].size).toBe(2); + expect(pubSubManager['handlerManager']['uniqueMessageHandlers'].size).toBe(2); await sleep(2000); - //@ts-ignore - expect(pubSubManager['messageHandlers'].size).toBe(0); + expect(pubSubManager['handlerManager']['uniqueMessageHandlers'].size).toBe(0); expect(mockListener).toBeCalledTimes(2); }); - test('debounce', async () => { - const mockListener = vi.fn(); - await pubSubManager.subscribeAll(mockListener, { debounce: 1000 }); - pubSubManager.publish('test1', 'message1'); - pubSubManager.publish('test1', 'message1'); - pubSubManager.publish('test1', 'message2'); - pubSubManager.publish('test1', 'message2'); - pubSubManager.publish('test2', 'message2'); - pubSubManager.publish('test2', 'message2'); - await sleep(500); - expect(pubSubManager['messageHandlers'].size).toBe(3); - await sleep(2000); - expect(pubSubManager['messageHandlers'].size).toBe(0); - expect(mockListener).toBeCalledTimes(3); - }); - test('message format', async () => { const mockListener = vi.fn(); await pubSubManager.subscribe('test1', mockListener); @@ -164,22 +146,6 @@ describe('skipSelf, unsubscribe, debounce', () => { await pubSubManager.publish('test1', 'message1'); expect(mockListener).toBeCalledTimes(1); }); - - test('subscribeAll + skipSelf: true', async () => { - const mockListener = vi.fn(); - await pubSubManager.subscribeAll(mockListener); - await pubSubManager.publish('test1', 'message1'); - expect(mockListener).toHaveBeenCalled(); - expect(mockListener).toBeCalledTimes(1); - expect(mockListener).toHaveBeenCalledWith('test1', 'message1'); - }); - - test('publish + skipSelf: false', async () => { - const mockListener = vi.fn(); - await pubSubManager.subscribeAll(mockListener); - await pubSubManager.publish('test1', 'message1', { skipSelf: true }); - expect(mockListener).not.toHaveBeenCalled(); - }); }); describe('Pub/Sub', () => { @@ -249,8 +215,7 @@ describe('app.pubSubManager', () => { }); test('adapter', async () => { - expect(pubSubManager.connected).toBe(true); - expect(pubSubManager.adapter).toBeInstanceOf(MemoryPubSubAdapter); + expect(await pubSubManager.isConnected()).toBe(true); }); test('subscribe + publish', async () => { diff --git a/packages/core/server/src/pub-sub-manager.ts b/packages/core/server/src/pub-sub-manager.ts deleted file mode 100644 index ce4960bc42..0000000000 --- a/packages/core/server/src/pub-sub-manager.ts +++ /dev/null @@ -1,213 +0,0 @@ -/** - * This file is part of the NocoBase (R) project. - * Copyright (c) 2020-2024 NocoBase Co., Ltd. - * Authors: NocoBase Team. - * - * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. - * For more information, please refer to: https://www.nocobase.com/agreement. - */ - -import { uid } from '@nocobase/utils'; -import crypto from 'crypto'; -import _ from 'lodash'; -import Application from './application'; - -export interface PubSubManagerOptions { - channelPrefix?: string; -} - -export interface PubSubManagerPublishOptions { - skipSelf?: boolean; - onlySelf?: boolean; -} - -export interface PubSubManagerSubscribeOptions { - debounce?: number; -} - -export const createPubSubManager = (app: Application, options: PubSubManagerOptions) => { - const pubSubManager = new PubSubManager(options); - app.on('afterStart', async () => { - await pubSubManager.connect(); - }); - app.on('afterStop', async () => { - await pubSubManager.close(); - }); - return pubSubManager; -}; - -export class PubSubManager { - adapter: IPubSubAdapter; - messageHandlers = new Map(); - subscribes = new Map(); - publisherId: string; - - constructor(protected options: PubSubManagerOptions = {}) { - this.publisherId = uid(); - } - - get channelPrefix() { - return this.options?.channelPrefix ? `${this.options.channelPrefix}.` : ''; - } - - get connected() { - return this.adapter?.connected || false; - } - - setAdapter(adapter: IPubSubAdapter) { - this.adapter = adapter; - } - - async connect() { - if (!this.adapter) { - return; - } - await this.adapter.connect(); - for (const [channel, callbacks] of this.subscribes) { - for (const [, fn] of callbacks) { - await this.adapter.subscribe(`${this.channelPrefix}${channel}`, fn); - } - } - } - - async close() { - if (!this.adapter) { - return; - } - return await this.adapter.close(); - } - - async getMessageHash(message) { - const encoder = new TextEncoder(); - const data = encoder.encode(JSON.stringify(message)); - const hashBuffer = await crypto.subtle.digest('SHA-256', data); - const hashArray = Array.from(new Uint8Array(hashBuffer)); - const hashHex = hashArray.map((b) => b.toString(16).padStart(2, '0')).join(''); - return hashHex; - } - - async subscribe(channel: string, callback, options: PubSubManagerSubscribeOptions = {}) { - const { debounce = 0 } = options; - const wrappedCallback = async (wrappedMessage) => { - const json = JSON.parse(wrappedMessage); - if (!this.verifyMessage(json)) { - return; - } - await this.handleMessage({ channel, message: json.message, debounce, callback }); - }; - if (!this.subscribes.has(channel)) { - const map = new Map(); - this.subscribes.set(channel, map); - } - const map: Map = this.subscribes.get(channel); - const previous = map.get(callback); - if (previous) { - await this.adapter.unsubscribe(`${this.channelPrefix}${channel}`, previous); - } - map.set(callback, wrappedCallback); - if (this.connected) { - await this.adapter.subscribe(`${this.channelPrefix}${channel}`, wrappedCallback); - } - } - - async unsubscribe(channel, callback) { - const map: Map = this.subscribes.get(channel); - let fn = null; - if (map) { - fn = map.get(callback); - map.delete(callback); - } - if (!this.adapter || !fn) { - return; - } - return this.adapter.unsubscribe(`${this.channelPrefix}${channel}`, fn); - } - - async publish(channel, message, options?: PubSubManagerPublishOptions) { - if (!this.adapter) { - return; - } - - const wrappedMessage = JSON.stringify({ - publisherId: this.publisherId, - ...options, - message: message, - }); - - return this.adapter.publish(`${this.channelPrefix}${channel}`, wrappedMessage); - } - - async subscribeAll(callback, options: PubSubManagerSubscribeOptions = {}) { - if (!this.adapter) { - return; - } - const { debounce = 0 } = options; - return this.adapter.subscribeAll(async (channel: string, wrappedMessage) => { - if (!channel.startsWith(this.channelPrefix)) { - return; - } - const json = JSON.parse(wrappedMessage); - if (!this.verifyMessage(json)) { - return; - } - const realChannel = channel.substring(this.channelPrefix.length); - await this.handleMessage({ - callback, - debounce, - subscribeAll: true, - channel: realChannel, - message: json.message, - }); - }); - } - - protected async handleMessage({ channel, message, callback, debounce, subscribeAll = false }) { - const args = subscribeAll ? [channel, message] : [message]; - if (!debounce) { - await callback(...args); - return; - } - const prefix = subscribeAll ? '__subscribe_all__' : '__subscribe__'; - const messageHash = prefix + channel + (await this.getMessageHash(message)); - if (!this.messageHandlers.has(messageHash)) { - this.messageHandlers.set(messageHash, this.debounce(callback, debounce)); - } - const handleMessage = this.messageHandlers.get(messageHash); - try { - const args = subscribeAll ? [channel, message] : [message]; - await handleMessage(...args); - setTimeout(() => { - this.messageHandlers.delete(messageHash); - }, debounce); - } catch (error) { - this.messageHandlers.delete(messageHash); - throw error; - } - } - - protected verifyMessage({ onlySelf, skipSelf, publisherId }) { - if (onlySelf && publisherId !== this.publisherId) { - return; - } else if (!onlySelf && skipSelf && publisherId === this.publisherId) { - return; - } - return true; - } - - protected debounce(func, wait: number) { - if (wait) { - return _.debounce(func, wait); - } - return func; - } -} - -export interface IPubSubAdapter { - connected?: boolean; - connect(): Promise; - close(): Promise; - subscribe(channel: string, callback): Promise; - unsubscribe(channel: string, callback): Promise; - publish(channel: string, message): Promise; - subscribeAll(callback): Promise; -} diff --git a/packages/core/server/src/pub-sub-manager/handler-manager.ts b/packages/core/server/src/pub-sub-manager/handler-manager.ts new file mode 100644 index 0000000000..efeb6d54b8 --- /dev/null +++ b/packages/core/server/src/pub-sub-manager/handler-manager.ts @@ -0,0 +1,118 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + +import _ from 'lodash'; +import { type PubSubManagerSubscribeOptions } from './types'; + +export class HandlerManager { + headlers: Map; + uniqueMessageHandlers: Map; + + constructor(protected publisherId: string) { + this.reset(); + } + + protected async getMessageHash(message) { + const encoder = new TextEncoder(); + const data = encoder.encode(JSON.stringify(message)); + const hashBuffer = await crypto.subtle.digest('SHA-256', data); + const hashArray = Array.from(new Uint8Array(hashBuffer)); + const hashHex = hashArray.map((b) => b.toString(16).padStart(2, '0')).join(''); + return hashHex; + } + + protected verifyMessage({ onlySelf, skipSelf, publisherId }) { + if (onlySelf && publisherId !== this.publisherId) { + return; + } else if (!onlySelf && skipSelf && publisherId === this.publisherId) { + return; + } + return true; + } + + protected debounce(func, wait: number) { + if (wait) { + return _.debounce(func, wait); + } + return func; + } + + async handleMessage({ channel, message, callback, debounce }) { + if (!debounce) { + await callback(message); + return; + } + const messageHash = channel + (await this.getMessageHash(message)); + if (!this.uniqueMessageHandlers.has(messageHash)) { + this.uniqueMessageHandlers.set(messageHash, this.debounce(callback, debounce)); + } + const handler = this.uniqueMessageHandlers.get(messageHash); + try { + await handler(message); + setTimeout(() => { + this.uniqueMessageHandlers.delete(messageHash); + }, debounce); + } catch (error) { + this.uniqueMessageHandlers.delete(messageHash); + throw error; + } + } + + wrapper(channel, callback, options) { + const { debounce = 0 } = options; + return async (wrappedMessage) => { + const json = JSON.parse(wrappedMessage); + if (!this.verifyMessage(json)) { + return; + } + await this.handleMessage({ channel, message: json.message, debounce, callback }); + }; + } + + set(channel: string, callback, options: PubSubManagerSubscribeOptions) { + if (!this.headlers.has(channel)) { + this.headlers.set(channel, new Map()); + } + const headlerMap = this.headlers.get(channel); + const headler = this.wrapper(channel, callback, options); + headlerMap.set(callback, headler); + return headler; + } + + get(channel: string, callback) { + const headlerMap = this.headlers.get(channel); + if (!headlerMap) { + return; + } + return headlerMap.get(callback); + } + + delete(channel: string, callback) { + const headlerMap = this.headlers.get(channel); + if (!headlerMap) { + return; + } + const headler = headlerMap.get(callback); + headlerMap.delete(callback); + return headler; + } + + reset() { + this.headlers = new Map(); + this.uniqueMessageHandlers = new Map(); + } + + async each(callback) { + for (const [channel, headlerMap] of this.headlers) { + for (const headler of headlerMap.values()) { + await callback(channel, headler); + } + } + } +} diff --git a/packages/core/server/src/pub-sub-manager/index.ts b/packages/core/server/src/pub-sub-manager/index.ts new file mode 100644 index 0000000000..d19a1603bb --- /dev/null +++ b/packages/core/server/src/pub-sub-manager/index.ts @@ -0,0 +1,13 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + +export * from './handler-manager'; +export * from './pub-sub-manager'; + +export * from './types'; diff --git a/packages/core/server/src/pub-sub-manager/pub-sub-manager.ts b/packages/core/server/src/pub-sub-manager/pub-sub-manager.ts new file mode 100644 index 0000000000..5006aa4bf4 --- /dev/null +++ b/packages/core/server/src/pub-sub-manager/pub-sub-manager.ts @@ -0,0 +1,104 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + +import { uid } from '@nocobase/utils'; +import Application from '../application'; +import { HandlerManager } from './handler-manager'; +import { + type IPubSubAdapter, + type PubSubManagerOptions, + type PubSubManagerPublishOptions, + type PubSubManagerSubscribeOptions, +} from './types'; + +export const createPubSubManager = (app: Application, options: PubSubManagerOptions) => { + const pubSubManager = new PubSubManager(options); + app.on('afterStart', async () => { + await pubSubManager.connect(); + }); + app.on('afterStop', async () => { + await pubSubManager.close(); + }); + return pubSubManager; +}; + +export class PubSubManager { + protected publisherId: string; + protected adapter: IPubSubAdapter; + protected handlerManager: HandlerManager; + + constructor(protected options: PubSubManagerOptions = {}) { + this.publisherId = uid(); + this.handlerManager = new HandlerManager(this.publisherId); + } + + get channelPrefix() { + return this.options?.channelPrefix ? `${this.options.channelPrefix}.` : ''; + } + + setAdapter(adapter: IPubSubAdapter) { + this.adapter = adapter; + } + + async isConnected() { + return this.adapter?.isConnected(); + } + + async connect() { + if (!this.adapter) { + return; + } + await this.adapter.connect(); + // 如果没连接前添加的订阅,连接后需要把订阅添加上 + await this.handlerManager.each(async (channel, headler) => { + await this.adapter.subscribe(`${this.channelPrefix}${channel}`, headler); + }); + } + + async close() { + if (!this.adapter) { + return; + } + return await this.adapter.close(); + } + + async subscribe(channel: string, callback, options: PubSubManagerSubscribeOptions = {}) { + // 先退订,防止重复订阅 + await this.unsubscribe(channel, callback); + const handler = this.handlerManager.set(channel, callback, options); + // 连接之后才能订阅 + if (await this.adapter.isConnected()) { + await this.adapter.subscribe(`${this.channelPrefix}${channel}`, handler); + } + } + + async unsubscribe(channel, callback) { + const handler = this.handlerManager.delete(channel, callback); + + if (!this.adapter || !handler) { + return; + } + + return this.adapter.unsubscribe(`${this.channelPrefix}${channel}`, handler); + } + + async publish(channel, message, options?: PubSubManagerPublishOptions) { + if (!this.adapter) { + return; + } + + const wrappedMessage = JSON.stringify({ + publisherId: this.publisherId, + ...options, + message: message, + }); + + return this.adapter.publish(`${this.channelPrefix}${channel}`, wrappedMessage); + } +} diff --git a/packages/core/server/src/pub-sub-manager/types.ts b/packages/core/server/src/pub-sub-manager/types.ts new file mode 100644 index 0000000000..0fca8cfb66 --- /dev/null +++ b/packages/core/server/src/pub-sub-manager/types.ts @@ -0,0 +1,32 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + +export interface PubSubManagerOptions { + channelPrefix?: string; +} + +export interface PubSubManagerPublishOptions { + skipSelf?: boolean; + onlySelf?: boolean; +} + +export interface PubSubManagerSubscribeOptions { + debounce?: number; +} + +export type PubSubCallback = (message: any) => Promise; + +export interface IPubSubAdapter { + isConnected(): Promise; + connect(): Promise; + close(): Promise; + subscribe(channel: string, callback: PubSubCallback): Promise; + unsubscribe(channel: string, callback: PubSubCallback): Promise; + publish(channel: string, message: any): Promise; +} diff --git a/packages/core/test/src/server/memory-pub-sub-adapter.ts b/packages/core/test/src/server/memory-pub-sub-adapter.ts index f9cf0b4993..0320dc5630 100644 --- a/packages/core/test/src/server/memory-pub-sub-adapter.ts +++ b/packages/core/test/src/server/memory-pub-sub-adapter.ts @@ -47,6 +47,10 @@ export class MemoryPubSubAdapter implements IPubSubAdapter { this.connected = false; } + async isConnected() { + return this.connected; + } + async subscribe(channel, callback) { this.emitter.on(channel, callback); }