diff --git a/packages/plugins/@nocobase/plugin-workflow-test/src/server/triggers.ts b/packages/plugins/@nocobase/plugin-workflow-test/src/server/triggers.ts index 91c5ae4544..844e125b91 100644 --- a/packages/plugins/@nocobase/plugin-workflow-test/src/server/triggers.ts +++ b/packages/plugins/@nocobase/plugin-workflow-test/src/server/triggers.ts @@ -4,10 +4,16 @@ export default { on() {} off() {} sync = true; + validateEvent() { + return true; + } }, asyncTrigger: class { constructor(public readonly workflow) {} on() {} off() {} + validateEvent() { + return true; + } }, }; diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts index 1ef66b735c..dfdf3726d2 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts @@ -265,7 +265,7 @@ export default class PluginWorkflowServer extends Plugin { this.getLogger(workflow.id).error(`trigger type ${workflow.type} of workflow ${workflow.id} is not implemented`); return; } - if (typeof enable !== 'undefined' ? enable : workflow.get('enabled')) { + if (enable ?? workflow.get('enabled')) { // NOTE: remove previous listener if config updated const prev = workflow.previous(); if (prev.config) { @@ -351,25 +351,15 @@ export default class PluginWorkflowServer extends Plugin { } private async createExecution(workflow: WorkflowModel, context, options): Promise { - if (options.context?.executionId) { - // NOTE: no transaction here for read-uncommitted execution - const existed = await workflow.countExecutions({ - where: { - id: options.context.executionId, - }, - transaction: options.transaction, - }); - - if (existed) { - this.getLogger(workflow.id).warn( - `workflow ${workflow.id} has already been triggered in same execution (${options.context.executionId}), and newly triggering will be skipped.`, - ); - - return null; - } - } - const { transaction = await this.db.sequelize.transaction() } = options; + const trigger = this.triggers.get(workflow.type); + const valid = await trigger.validateEvent(workflow, context, { ...options, transaction }); + if (!valid) { + if (!options.transaction) { + await transaction.commit(); + } + return null; + } const execution = await workflow.createExecution( { @@ -410,6 +400,10 @@ export default class PluginWorkflowServer extends Plugin { } private prepare = async () => { + if (this.executing && this.db.options.dialect === 'sqlite') { + await this.executing; + } + const event = this.events.shift(); this.eventsCount = this.events.length; if (!event) { @@ -423,7 +417,7 @@ export default class PluginWorkflowServer extends Plugin { try { const execution = await this.createExecution(...event); // NOTE: cache first execution for most cases - if (!this.executing && !this.pending.length) { + if (execution && !this.executing && !this.pending.length) { this.pending.push([execution]); } } catch (err) { @@ -449,6 +443,10 @@ export default class PluginWorkflowServer extends Plugin { return; } + if (this.events.length) { + return this.prepare(); + } + this.executing = (async () => { let next: Pending | null = null; // resuming has high priority diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts index bf3f0545a9..6483429806 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts @@ -233,52 +233,6 @@ describe('workflow > Plugin', () => { }); }); - describe('cycling trigger', () => { - it('trigger should not be triggered more than once in same execution', async () => { - const workflow = await WorkflowModel.create({ - enabled: true, - type: 'collection', - config: { - mode: 1, - collection: 'posts', - }, - }); - - const n1 = await workflow.createNode({ - type: 'create', - config: { - collection: 'posts', - params: { - values: { - title: 't2', - }, - }, - }, - }); - - const post = await PostRepo.create({ values: { title: 't1' } }); - - await sleep(500); - - const posts = await PostRepo.find(); - expect(posts.length).toBe(2); - - const [execution] = await workflow.getExecutions(); - expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED); - - // NOTE: second trigger to ensure no skipped event - const p3 = await PostRepo.create({ values: { title: 't2' } }); - - await sleep(500); - - const posts2 = await PostRepo.find(); - expect(posts2.length).toBe(4); - - const [execution2] = await workflow.getExecutions({ order: [['createdAt', 'DESC']] }); - expect(execution2.status).toBe(EXECUTION_STATUS.RESOLVED); - }); - }); - describe('dispatcher', () => { it('multiple triggers in same event', async () => { const w1 = await WorkflowModel.create({ diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/triggers/collection.test.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/triggers/collection.test.ts index cc74e8bd33..3ce379a164 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/triggers/collection.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/triggers/collection.test.ts @@ -371,6 +371,113 @@ describe('workflow > triggers > collection', () => { }); }); + describe('cycling trigger', () => { + it('trigger should not be triggered more than once in same execution', async () => { + const workflow = await WorkflowModel.create({ + enabled: true, + type: 'collection', + config: { + mode: 1, + collection: 'posts', + }, + }); + + const n1 = await workflow.createNode({ + type: 'create', + config: { + collection: 'posts', + params: { + values: { + title: 't2', + }, + }, + }, + }); + + const p1 = await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const posts = await PostRepo.find(); + expect(posts.length).toBe(2); + + const e1s = await workflow.getExecutions(); + expect(e1s.length).toBe(1); + expect(e1s[0].status).toBe(EXECUTION_STATUS.RESOLVED); + + // NOTE: second trigger to ensure no skipped event + const p3 = await PostRepo.create({ values: { title: 't3' } }); + + await sleep(500); + + const posts2 = await PostRepo.find(); + expect(posts2.length).toBe(4); + + const e2s = await workflow.getExecutions({ order: [['createdAt', 'DESC']] }); + expect(e2s.length).toBe(2); + expect(e2s[1].status).toBe(EXECUTION_STATUS.RESOLVED); + }); + + it('multiple cycling trigger should not trigger more than once', async () => { + const w1 = await WorkflowModel.create({ + enabled: true, + type: 'collection', + config: { + mode: 1, + collection: 'posts', + }, + }); + + const n1 = await w1.createNode({ + type: 'create', + config: { + collection: 'categories', + params: { + values: { + title: 'c1', + }, + }, + }, + }); + + const w2 = await WorkflowModel.create({ + enabled: true, + type: 'collection', + config: { + mode: 1, + collection: 'categories', + }, + }); + + const n2 = await w2.createNode({ + type: 'create', + config: { + collection: 'posts', + params: { + values: { + title: 't2', + }, + }, + }, + }); + + const p1 = await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const posts = await PostRepo.find(); + expect(posts.length).toBe(2); + + const e1s = await w1.getExecutions(); + expect(e1s.length).toBe(1); + expect(e1s[0].status).toBe(EXECUTION_STATUS.RESOLVED); + + const e2s = await w2.getExecutions(); + expect(e2s.length).toBe(1); + expect(e2s[0].status).toBe(EXECUTION_STATUS.RESOLVED); + }); + }); + describe('sync', () => { it('sync collection trigger', async () => { const workflow = await WorkflowModel.create({ diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/instructions/CreateInstruction.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/instructions/CreateInstruction.ts index edc2ca9670..d8f1790aa3 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/instructions/CreateInstruction.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/instructions/CreateInstruction.ts @@ -13,7 +13,7 @@ export class CreateInstruction extends Instruction { const created = await repository.create({ ...options, context: { - executionId: processor.execution.id, + stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))), }, transaction: processor.transaction, }); diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/instructions/DestroyInstruction.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/instructions/DestroyInstruction.ts index f6e4be1600..9297742cf3 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/instructions/DestroyInstruction.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/instructions/DestroyInstruction.ts @@ -12,7 +12,7 @@ export class DestroyInstruction extends Instruction { const result = await repo.destroy({ ...options, context: { - executionId: processor.execution.id, + stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))), }, transaction: processor.transaction, }); diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/instructions/UpdateInstruction.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/instructions/UpdateInstruction.ts index e95ef41eed..2f4836f157 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/instructions/UpdateInstruction.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/instructions/UpdateInstruction.ts @@ -12,7 +12,7 @@ export class UpdateInstruction extends Instruction { const result = await repo.update({ ...options, context: { - executionId: processor.execution.id, + stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))), }, transaction: processor.transaction, }); diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/CollectionTrigger.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/CollectionTrigger.ts index 1faa2a0bcf..abe6de5255 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/CollectionTrigger.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/CollectionTrigger.ts @@ -1,4 +1,4 @@ -import { Collection, Model } from '@nocobase/database'; +import { Collection, Model, Transactionable } from '@nocobase/database'; import Trigger from '.'; import { toJSON } from '../utils'; import type { WorkflowModel } from '../types'; @@ -154,4 +154,33 @@ export default class CollectionTrigger extends Trigger { } } } + + async validateEvent( + workflow: WorkflowModel, + context: any, + options: { context?: { stack?: number[] } } & Transactionable, + ): Promise { + if (options.context?.stack) { + const existed = await workflow.countExecutions({ + where: { + id: options.context.stack, + }, + transaction: options.transaction, + }); + + if (existed) { + this.workflow + .getLogger(workflow.id) + .warn( + `workflow ${workflow.id} has already been triggered in stack executions (${options.context.stack}), and newly triggering will be skipped.`, + ); + + return false; + } + + context.stack = options.context.stack; + } + + return true; + } } diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/index.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/index.ts index 34ec0a5565..1737c3c90f 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/index.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/index.ts @@ -6,6 +6,9 @@ export abstract class Trigger { constructor(public readonly workflow: Plugin) {} abstract on(workflow: WorkflowModel): void; abstract off(workflow: WorkflowModel): void; + validateEvent(workflow: WorkflowModel, context: any, options: Transactionable): boolean | Promise { + return true; + } duplicateConfig?(workflow: WorkflowModel, options: Transactionable): object | Promise; sync?: boolean; }