diff --git a/CommonServer/Infrastructure/Semaphore.ts b/CommonServer/Infrastructure/Semaphore.ts index f00e5db19f..6a6ee272d1 100644 --- a/CommonServer/Infrastructure/Semaphore.ts +++ b/CommonServer/Infrastructure/Semaphore.ts @@ -1,16 +1,14 @@ import Redis, { ClientType } from "./Redis"; -import Dictionary from "Common/Types/Dictionary"; -import ObjectID from "Common/Types/ObjectID"; import { Mutex } from "redis-semaphore"; -export default class Semaphore { - private static mutexDictionary: Dictionary = {}; +export type SemaphoreMutex = Mutex; +export default class Semaphore { // returns the mutex id public static async lock(data: { key: string; lockTimeout?: number; - }): Promise { + }): Promise { if (!data.lockTimeout) { data.lockTimeout = 1000; } @@ -23,30 +21,16 @@ export default class Semaphore { throw new Error("Redis client is not connected"); } - const mutex: Mutex = new Mutex(client, key, { + const mutex: SemaphoreMutex = new Mutex(client, key, { lockTimeout: data.lockTimeout, }); await mutex.acquire(); - const mutexId: ObjectID = ObjectID.generate(); - - // add to the dictionary - this.mutexDictionary[mutexId.toString()] = mutex; - - return mutexId; + return mutex; } - public static async release(mutexId: ObjectID): Promise { - const mutex: Mutex | undefined = this.mutexDictionary[mutexId.toString()]; - - if (!mutex) { - return; // already released - } - + public static async release(mutex: SemaphoreMutex): Promise { await mutex.release(); - - // remove from the dictionary - delete this.mutexDictionary[mutexId.toString()]; } } diff --git a/CommonServer/Services/MonitorStatusTimelineService.ts b/CommonServer/Services/MonitorStatusTimelineService.ts index ea9c2ec2e7..690435ce6c 100644 --- a/CommonServer/Services/MonitorStatusTimelineService.ts +++ b/CommonServer/Services/MonitorStatusTimelineService.ts @@ -1,5 +1,5 @@ import PostgresDatabase from "../Infrastructure/PostgresDatabase"; -import Semaphore from "../Infrastructure/Semaphore"; +import Semaphore, { SemaphoreMutex } from "../Infrastructure/Semaphore"; import CreateBy from "../Types/Database/CreateBy"; import DeleteBy from "../Types/Database/DeleteBy"; import { OnCreate, OnDelete } from "../Types/Database/Hooks"; @@ -29,10 +29,10 @@ export class Service extends DatabaseService { throw new BadDataException("monitorId is null"); } - let mutexId: ObjectID | null = null; + let mutex: SemaphoreMutex | null = null; try { - mutexId = await Semaphore.lock({ + mutex = await Semaphore.lock({ key: createBy.data.monitorId.toString(), }); } catch (e) { @@ -98,7 +98,7 @@ export class Service extends DatabaseService { createBy, carryForward: { lastMonitorStatusTimelineId: lastMonitorStatusTimeline?.id || null, - mutexId: mutexId, + mutex: mutex, }, }; } @@ -139,9 +139,9 @@ export class Service extends DatabaseService { props: onCreate.createBy.props, }); - if (onCreate.carryForward.mutexId) { - const mutexId: ObjectID = onCreate.carryForward.mutexId; - await Semaphore.release(mutexId); + if (onCreate.carryForward.mutex) { + const mutex: SemaphoreMutex = onCreate.carryForward.mutex; + await Semaphore.release(mutex); } return createdItem; diff --git a/Ingestor/API/Monitor.ts b/Ingestor/API/Monitor.ts index 7b346bb98c..27a2c73f80 100644 --- a/Ingestor/API/Monitor.ts +++ b/Ingestor/API/Monitor.ts @@ -10,7 +10,9 @@ import BadDataException from "Common/Types/Exception/BadDataException"; import { JSONObject } from "Common/Types/JSON"; import ObjectID from "Common/Types/ObjectID"; import PositiveNumber from "Common/Types/PositiveNumber"; -import Semaphore from "CommonServer/Infrastructure/Semaphore"; +import Semaphore, { + SemaphoreMutex, +} from "CommonServer/Infrastructure/Semaphore"; import ClusterKeyAuthorization from "CommonServer/Middleware/ClusterKeyAuthorization"; import MonitorProbeService from "CommonServer/Services/MonitorProbeService"; import Query from "CommonServer/Types/Database/Query"; @@ -196,7 +198,7 @@ router.post( res: ExpressResponse, next: NextFunction, ): Promise => { - let mutexId: ObjectID | null = null; + let mutex: SemaphoreMutex | null = null; try { const data: JSONObject = req.body; @@ -224,7 +226,7 @@ router.post( } try { - mutexId = await Semaphore.lock({ + mutex = await Semaphore.lock({ key: probeId.toString(), }); } catch (err) { @@ -287,8 +289,12 @@ router.post( }); } - if (mutexId) { - await Semaphore.release(mutexId); + if (mutex) { + try { + await Semaphore.release(mutex); + } catch (err) { + logger.error(err); + } } const monitors: Array = monitorProbes @@ -321,8 +327,8 @@ router.post( ); } catch (err) { try { - if (mutexId) { - await Semaphore.release(mutexId); + if (mutex) { + await Semaphore.release(mutex); } } catch (err) { logger.error(err);