diff --git a/packages/core/server/src/lock-manager.ts b/packages/core/server/src/lock-manager.ts index af64533863..39f5a073bb 100644 --- a/packages/core/server/src/lock-manager.ts +++ b/packages/core/server/src/lock-manager.ts @@ -12,12 +12,17 @@ import { Mutex, tryAcquire, MutexInterface, E_CANCELED } from 'async-mutex'; export type Releaser = () => void | Promise; -export abstract class AbstractLockAdapter { - async connect() {} - async close() {} - abstract acquire(key: string, ttl: number): Releaser | Promise; - abstract runExclusive(key: string, fn: () => Promise, ttl: number): Promise; - // abstract tryAcquire(key: string, ttl: number): Releaser | Promise; +export interface ILock { + acquire(ttl: number): Releaser | Promise; + runExclusive(fn: () => Promise, ttl: number): Promise; +} + +export interface ILockAdapter { + connect(): Promise; + close(): Promise; + acquire(key: string, ttl: number): Releaser | Promise; + runExclusive(key: string, fn: () => Promise, ttl: number): Promise; + // tryAcquire(key: string, timeout?: number): Promise; } export class LockAbortError extends Error { @@ -26,9 +31,18 @@ export class LockAbortError extends Error { } } -class LocalLockAdapter extends AbstractLockAdapter { +export class LockAcquireError extends Error { + constructor(message, options) { + super(message, options); + } +} + +class LocalLockAdapter implements ILockAdapter { static locks = new Map(); + async connect() {} + async close() {} + private getLock(key: string): MutexInterface { let lock = (this.constructor).locks.get(key); if (!lock) { @@ -72,13 +86,26 @@ class LocalLockAdapter extends AbstractLockAdapter { clearTimeout(timer); } } - // async tryAcquire(key: string, ttl: number) { - // const lock = this.getLock(key); - // return lock.tryAcquire(ttl); + + // async tryAcquire(key: string) { + // try { + // const lock = this.getLock(key); + // await tryAcquire(lock); + // return { + // async acquire(ttl) { + // return this.acquire(key, ttl); + // }, + // async runExclusive(fn: () => Promise, ttl) { + // return this.runExclusive(key, fn, ttl); + // }, + // }; + // } catch (e) { + // throw new LockAcquireError('Lock acquire error', { cause: e }); + // } // } } -export interface LockAdapterConfig { +export interface LockAdapterConfig { Adapter: new (...args: any[]) => C; options?: Record; } @@ -89,7 +116,7 @@ export interface LockManagerOptions { export class LockManager { private registry = new Registry(); - private adapters = new Map(); + private adapters = new Map(); constructor(private options: LockManagerOptions = {}) { this.registry.register('local', { @@ -101,7 +128,7 @@ export class LockManager { this.registry.register(name, adapterConfig); } - private async getAdapter(): Promise { + private async getAdapter(): Promise { const type = this.options.defaultAdapter || 'local'; let client = this.adapters.get(type); if (!client) {