test(plugin-workflow): check for duplicated triggering (#4762)

* test(plugin-workflow): check for duplicated triggering

* fix(plugin-workflow): fix test case
This commit is contained in:
Junyi 2024-06-26 22:20:12 +08:00 committed by GitHub
parent 4999998a38
commit ca9b5b1f1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 55 additions and 17 deletions

View File

@ -250,7 +250,10 @@ export default class PluginWorkflowServer extends Plugin {
// * load all workflows in db
// * add all hooks for enabled workflows
// * add hooks for create/update[enabled]/delete workflow to add/remove specific hooks
this.app.on('beforeStart', async () => {
this.app.on('afterStart', async () => {
this.app.setMaintainingMessage('check for not started executions');
this.ready = true;
const collection = db.getCollection('workflows');
const workflows = await collection.repository.find({
filter: { enabled: true },
@ -263,11 +266,7 @@ export default class PluginWorkflowServer extends Plugin {
this.checker = setInterval(() => {
this.dispatch();
}, 300_000);
});
this.app.on('afterStart', () => {
this.app.setMaintainingMessage('check for not started executions');
this.ready = true;
// check for not started executions
this.dispatch();
});

View File

@ -102,9 +102,13 @@ export default class Processor {
}
public async prepare() {
const { execution, transaction } = this;
const {
execution,
transaction,
options: { plugin },
} = this;
if (!execution.workflow) {
execution.workflow = await execution.getWorkflow({ transaction });
execution.workflow = plugin.enabledCache.get(execution.workflowId);
}
const nodes = await execution.workflow.getNodes({ transaction });

View File

@ -389,6 +389,10 @@ describe('workflow > triggers > schedule > static mode', () => {
},
});
const n1 = workflow.createNode({
type: 'echo',
});
(app.pm.get('workflow') as Plugin).trigger(
workflow,
{ date: start },
@ -399,6 +403,41 @@ describe('workflow > triggers > schedule > static mode', () => {
const e1s = await workflow.getExecutions();
expect(e1s.length).toBe(1);
const j1s = await e1s[0].getJobs();
expect(j1s.length).toBe(1);
});
it('toggle same workflow on should not be triggered in same time more than once', async () => {
await sleepToEvenSecond();
const start = new Date();
start.setMilliseconds(0);
start.setSeconds(start.getSeconds() + 2);
const workflow = await WorkflowRepo.create({
values: {
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn: start.toISOString(),
},
},
});
const n1 = workflow.createNode({
type: 'echo',
});
const trigger = (app.pm.get('workflow') as Plugin).triggers.get(workflow.type);
trigger.on(workflow);
await sleep(3000);
const e1s = await workflow.getExecutions();
expect(e1s.length).toBe(1);
const j1s = await e1s[0].getJobs();
expect(j1s.length).toBe(1);
});
});
});

View File

@ -127,10 +127,9 @@ export default class ScheduleTrigger {
}
async reload() {
const WorkflowRepo = this.workflow.app.db.getRepository('workflows');
const workflows = await WorkflowRepo.find({
filter: { enabled: true, type: 'schedule', 'config.mode': SCHEDULE_MODE.DATE_FIELD },
});
const workflows = Array.from(this.workflow.enabledCache.values()).filter(
(item) => item.type === 'schedule' && item.config.mode === SCHEDULE_MODE.DATE_FIELD,
);
// NOTE: clear cached jobs in last cycle
this.cache = new Map();

View File

@ -20,10 +20,9 @@ export default class StaticScheduleTrigger {
constructor(public workflow: Plugin) {
workflow.app.on('afterStart', async () => {
const WorkflowRepo = this.workflow.app.db.getRepository('workflows');
const workflows = await WorkflowRepo.find({
filter: { enabled: true, type: 'schedule', 'config.mode': SCHEDULE_MODE.STATIC },
});
const workflows = Array.from(this.workflow.enabledCache.values()).filter(
(item) => item.type === 'schedule' && item.config.mode === SCHEDULE_MODE.STATIC,
);
this.inspect(workflows);
});

View File

@ -7,10 +7,8 @@
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { Transactionable } from 'sequelize';
import Trigger from '..';
import type Plugin from '../../Plugin';
import { WorkflowModel } from '../../types';
import DateFieldScheduleTrigger from './DateFieldScheduleTrigger';
import StaticScheduleTrigger from './StaticScheduleTrigger';
import { SCHEDULE_MODE } from './utils';