feat: improve code

This commit is contained in:
chenos 2024-07-31 13:28:47 +08:00 committed by mytharcher
parent 2f9f7c7392
commit 90ba426766
7 changed files with 274 additions and 251 deletions

View File

@ -104,30 +104,12 @@ describe('skipSelf, unsubscribe, debounce', () => {
pubSubManager.publish('test1', 'message2');
pubSubManager.publish('test1', 'message2');
await sleep(500);
//@ts-ignore
expect(pubSubManager['messageHandlers'].size).toBe(2);
expect(pubSubManager['handlerManager']['uniqueMessageHandlers'].size).toBe(2);
await sleep(2000);
//@ts-ignore
expect(pubSubManager['messageHandlers'].size).toBe(0);
expect(pubSubManager['handlerManager']['uniqueMessageHandlers'].size).toBe(0);
expect(mockListener).toBeCalledTimes(2);
});
test('debounce', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribeAll(mockListener, { debounce: 1000 });
pubSubManager.publish('test1', 'message1');
pubSubManager.publish('test1', 'message1');
pubSubManager.publish('test1', 'message2');
pubSubManager.publish('test1', 'message2');
pubSubManager.publish('test2', 'message2');
pubSubManager.publish('test2', 'message2');
await sleep(500);
expect(pubSubManager['messageHandlers'].size).toBe(3);
await sleep(2000);
expect(pubSubManager['messageHandlers'].size).toBe(0);
expect(mockListener).toBeCalledTimes(3);
});
test('message format', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener);
@ -164,22 +146,6 @@ describe('skipSelf, unsubscribe, debounce', () => {
await pubSubManager.publish('test1', 'message1');
expect(mockListener).toBeCalledTimes(1);
});
test('subscribeAll + skipSelf: true', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribeAll(mockListener);
await pubSubManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('test1', 'message1');
});
test('publish + skipSelf: false', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribeAll(mockListener);
await pubSubManager.publish('test1', 'message1', { skipSelf: true });
expect(mockListener).not.toHaveBeenCalled();
});
});
describe('Pub/Sub', () => {
@ -249,8 +215,7 @@ describe('app.pubSubManager', () => {
});
test('adapter', async () => {
expect(pubSubManager.connected).toBe(true);
expect(pubSubManager.adapter).toBeInstanceOf(MemoryPubSubAdapter);
expect(await pubSubManager.isConnected()).toBe(true);
});
test('subscribe + publish', async () => {

View File

@ -1,213 +0,0 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { uid } from '@nocobase/utils';
import crypto from 'crypto';
import _ from 'lodash';
import Application from './application';
export interface PubSubManagerOptions {
channelPrefix?: string;
}
export interface PubSubManagerPublishOptions {
skipSelf?: boolean;
onlySelf?: boolean;
}
export interface PubSubManagerSubscribeOptions {
debounce?: number;
}
export const createPubSubManager = (app: Application, options: PubSubManagerOptions) => {
const pubSubManager = new PubSubManager(options);
app.on('afterStart', async () => {
await pubSubManager.connect();
});
app.on('afterStop', async () => {
await pubSubManager.close();
});
return pubSubManager;
};
export class PubSubManager {
adapter: IPubSubAdapter;
messageHandlers = new Map();
subscribes = new Map();
publisherId: string;
constructor(protected options: PubSubManagerOptions = {}) {
this.publisherId = uid();
}
get channelPrefix() {
return this.options?.channelPrefix ? `${this.options.channelPrefix}.` : '';
}
get connected() {
return this.adapter?.connected || false;
}
setAdapter(adapter: IPubSubAdapter) {
this.adapter = adapter;
}
async connect() {
if (!this.adapter) {
return;
}
await this.adapter.connect();
for (const [channel, callbacks] of this.subscribes) {
for (const [, fn] of callbacks) {
await this.adapter.subscribe(`${this.channelPrefix}${channel}`, fn);
}
}
}
async close() {
if (!this.adapter) {
return;
}
return await this.adapter.close();
}
async getMessageHash(message) {
const encoder = new TextEncoder();
const data = encoder.encode(JSON.stringify(message));
const hashBuffer = await crypto.subtle.digest('SHA-256', data);
const hashArray = Array.from(new Uint8Array(hashBuffer));
const hashHex = hashArray.map((b) => b.toString(16).padStart(2, '0')).join('');
return hashHex;
}
async subscribe(channel: string, callback, options: PubSubManagerSubscribeOptions = {}) {
const { debounce = 0 } = options;
const wrappedCallback = async (wrappedMessage) => {
const json = JSON.parse(wrappedMessage);
if (!this.verifyMessage(json)) {
return;
}
await this.handleMessage({ channel, message: json.message, debounce, callback });
};
if (!this.subscribes.has(channel)) {
const map = new Map();
this.subscribes.set(channel, map);
}
const map: Map<any, any> = this.subscribes.get(channel);
const previous = map.get(callback);
if (previous) {
await this.adapter.unsubscribe(`${this.channelPrefix}${channel}`, previous);
}
map.set(callback, wrappedCallback);
if (this.connected) {
await this.adapter.subscribe(`${this.channelPrefix}${channel}`, wrappedCallback);
}
}
async unsubscribe(channel, callback) {
const map: Map<any, any> = this.subscribes.get(channel);
let fn = null;
if (map) {
fn = map.get(callback);
map.delete(callback);
}
if (!this.adapter || !fn) {
return;
}
return this.adapter.unsubscribe(`${this.channelPrefix}${channel}`, fn);
}
async publish(channel, message, options?: PubSubManagerPublishOptions) {
if (!this.adapter) {
return;
}
const wrappedMessage = JSON.stringify({
publisherId: this.publisherId,
...options,
message: message,
});
return this.adapter.publish(`${this.channelPrefix}${channel}`, wrappedMessage);
}
async subscribeAll(callback, options: PubSubManagerSubscribeOptions = {}) {
if (!this.adapter) {
return;
}
const { debounce = 0 } = options;
return this.adapter.subscribeAll(async (channel: string, wrappedMessage) => {
if (!channel.startsWith(this.channelPrefix)) {
return;
}
const json = JSON.parse(wrappedMessage);
if (!this.verifyMessage(json)) {
return;
}
const realChannel = channel.substring(this.channelPrefix.length);
await this.handleMessage({
callback,
debounce,
subscribeAll: true,
channel: realChannel,
message: json.message,
});
});
}
protected async handleMessage({ channel, message, callback, debounce, subscribeAll = false }) {
const args = subscribeAll ? [channel, message] : [message];
if (!debounce) {
await callback(...args);
return;
}
const prefix = subscribeAll ? '__subscribe_all__' : '__subscribe__';
const messageHash = prefix + channel + (await this.getMessageHash(message));
if (!this.messageHandlers.has(messageHash)) {
this.messageHandlers.set(messageHash, this.debounce(callback, debounce));
}
const handleMessage = this.messageHandlers.get(messageHash);
try {
const args = subscribeAll ? [channel, message] : [message];
await handleMessage(...args);
setTimeout(() => {
this.messageHandlers.delete(messageHash);
}, debounce);
} catch (error) {
this.messageHandlers.delete(messageHash);
throw error;
}
}
protected verifyMessage({ onlySelf, skipSelf, publisherId }) {
if (onlySelf && publisherId !== this.publisherId) {
return;
} else if (!onlySelf && skipSelf && publisherId === this.publisherId) {
return;
}
return true;
}
protected debounce(func, wait: number) {
if (wait) {
return _.debounce(func, wait);
}
return func;
}
}
export interface IPubSubAdapter {
connected?: boolean;
connect(): Promise<any>;
close(): Promise<any>;
subscribe(channel: string, callback): Promise<any>;
unsubscribe(channel: string, callback): Promise<any>;
publish(channel: string, message): Promise<any>;
subscribeAll(callback): Promise<any>;
}

View File

@ -0,0 +1,118 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import _ from 'lodash';
import { type PubSubManagerSubscribeOptions } from './types';
export class HandlerManager {
headlers: Map<any, any>;
uniqueMessageHandlers: Map<any, any>;
constructor(protected publisherId: string) {
this.reset();
}
protected async getMessageHash(message) {
const encoder = new TextEncoder();
const data = encoder.encode(JSON.stringify(message));
const hashBuffer = await crypto.subtle.digest('SHA-256', data);
const hashArray = Array.from(new Uint8Array(hashBuffer));
const hashHex = hashArray.map((b) => b.toString(16).padStart(2, '0')).join('');
return hashHex;
}
protected verifyMessage({ onlySelf, skipSelf, publisherId }) {
if (onlySelf && publisherId !== this.publisherId) {
return;
} else if (!onlySelf && skipSelf && publisherId === this.publisherId) {
return;
}
return true;
}
protected debounce(func, wait: number) {
if (wait) {
return _.debounce(func, wait);
}
return func;
}
async handleMessage({ channel, message, callback, debounce }) {
if (!debounce) {
await callback(message);
return;
}
const messageHash = channel + (await this.getMessageHash(message));
if (!this.uniqueMessageHandlers.has(messageHash)) {
this.uniqueMessageHandlers.set(messageHash, this.debounce(callback, debounce));
}
const handler = this.uniqueMessageHandlers.get(messageHash);
try {
await handler(message);
setTimeout(() => {
this.uniqueMessageHandlers.delete(messageHash);
}, debounce);
} catch (error) {
this.uniqueMessageHandlers.delete(messageHash);
throw error;
}
}
wrapper(channel, callback, options) {
const { debounce = 0 } = options;
return async (wrappedMessage) => {
const json = JSON.parse(wrappedMessage);
if (!this.verifyMessage(json)) {
return;
}
await this.handleMessage({ channel, message: json.message, debounce, callback });
};
}
set(channel: string, callback, options: PubSubManagerSubscribeOptions) {
if (!this.headlers.has(channel)) {
this.headlers.set(channel, new Map());
}
const headlerMap = this.headlers.get(channel);
const headler = this.wrapper(channel, callback, options);
headlerMap.set(callback, headler);
return headler;
}
get(channel: string, callback) {
const headlerMap = this.headlers.get(channel);
if (!headlerMap) {
return;
}
return headlerMap.get(callback);
}
delete(channel: string, callback) {
const headlerMap = this.headlers.get(channel);
if (!headlerMap) {
return;
}
const headler = headlerMap.get(callback);
headlerMap.delete(callback);
return headler;
}
reset() {
this.headlers = new Map();
this.uniqueMessageHandlers = new Map();
}
async each(callback) {
for (const [channel, headlerMap] of this.headlers) {
for (const headler of headlerMap.values()) {
await callback(channel, headler);
}
}
}
}

View File

@ -0,0 +1,13 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
export * from './handler-manager';
export * from './pub-sub-manager';
export * from './types';

View File

@ -0,0 +1,104 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { uid } from '@nocobase/utils';
import Application from '../application';
import { HandlerManager } from './handler-manager';
import {
type IPubSubAdapter,
type PubSubManagerOptions,
type PubSubManagerPublishOptions,
type PubSubManagerSubscribeOptions,
} from './types';
export const createPubSubManager = (app: Application, options: PubSubManagerOptions) => {
const pubSubManager = new PubSubManager(options);
app.on('afterStart', async () => {
await pubSubManager.connect();
});
app.on('afterStop', async () => {
await pubSubManager.close();
});
return pubSubManager;
};
export class PubSubManager {
protected publisherId: string;
protected adapter: IPubSubAdapter;
protected handlerManager: HandlerManager;
constructor(protected options: PubSubManagerOptions = {}) {
this.publisherId = uid();
this.handlerManager = new HandlerManager(this.publisherId);
}
get channelPrefix() {
return this.options?.channelPrefix ? `${this.options.channelPrefix}.` : '';
}
setAdapter(adapter: IPubSubAdapter) {
this.adapter = adapter;
}
async isConnected() {
return this.adapter?.isConnected();
}
async connect() {
if (!this.adapter) {
return;
}
await this.adapter.connect();
// 如果没连接前添加的订阅,连接后需要把订阅添加上
await this.handlerManager.each(async (channel, headler) => {
await this.adapter.subscribe(`${this.channelPrefix}${channel}`, headler);
});
}
async close() {
if (!this.adapter) {
return;
}
return await this.adapter.close();
}
async subscribe(channel: string, callback, options: PubSubManagerSubscribeOptions = {}) {
// 先退订,防止重复订阅
await this.unsubscribe(channel, callback);
const handler = this.handlerManager.set(channel, callback, options);
// 连接之后才能订阅
if (await this.adapter.isConnected()) {
await this.adapter.subscribe(`${this.channelPrefix}${channel}`, handler);
}
}
async unsubscribe(channel, callback) {
const handler = this.handlerManager.delete(channel, callback);
if (!this.adapter || !handler) {
return;
}
return this.adapter.unsubscribe(`${this.channelPrefix}${channel}`, handler);
}
async publish(channel, message, options?: PubSubManagerPublishOptions) {
if (!this.adapter) {
return;
}
const wrappedMessage = JSON.stringify({
publisherId: this.publisherId,
...options,
message: message,
});
return this.adapter.publish(`${this.channelPrefix}${channel}`, wrappedMessage);
}
}

View File

@ -0,0 +1,32 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
export interface PubSubManagerOptions {
channelPrefix?: string;
}
export interface PubSubManagerPublishOptions {
skipSelf?: boolean;
onlySelf?: boolean;
}
export interface PubSubManagerSubscribeOptions {
debounce?: number;
}
export type PubSubCallback = (message: any) => Promise<void>;
export interface IPubSubAdapter {
isConnected(): Promise<boolean>;
connect(): Promise<any>;
close(): Promise<any>;
subscribe(channel: string, callback: PubSubCallback): Promise<any>;
unsubscribe(channel: string, callback: PubSubCallback): Promise<any>;
publish(channel: string, message: any): Promise<any>;
}

View File

@ -47,6 +47,10 @@ export class MemoryPubSubAdapter implements IPubSubAdapter {
this.connected = false;
}
async isConnected() {
return this.connected;
}
async subscribe(channel, callback) {
this.emitter.on(channel, callback);
}