fix(plugin-workflow): fix collection cycling triggering (#3448)

* fix(plugin-workflow): fix collection cycling triggering

* fix(plugin-workflow-test): fix test trigger

* fix(plugin-workflow): fix sqlite transaction triggering
This commit is contained in:
Junyi 2024-01-27 22:46:19 +08:00 committed by GitHub
parent b7e60eb8ec
commit fae544d1b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 167 additions and 70 deletions

View File

@ -4,10 +4,16 @@ export default {
on() {}
off() {}
sync = true;
validateEvent() {
return true;
}
},
asyncTrigger: class {
constructor(public readonly workflow) {}
on() {}
off() {}
validateEvent() {
return true;
}
},
};

View File

@ -265,7 +265,7 @@ export default class PluginWorkflowServer extends Plugin {
this.getLogger(workflow.id).error(`trigger type ${workflow.type} of workflow ${workflow.id} is not implemented`);
return;
}
if (typeof enable !== 'undefined' ? enable : workflow.get('enabled')) {
if (enable ?? workflow.get('enabled')) {
// NOTE: remove previous listener if config updated
const prev = workflow.previous();
if (prev.config) {
@ -351,25 +351,15 @@ export default class PluginWorkflowServer extends Plugin {
}
private async createExecution(workflow: WorkflowModel, context, options): Promise<ExecutionModel | null> {
if (options.context?.executionId) {
// NOTE: no transaction here for read-uncommitted execution
const existed = await workflow.countExecutions({
where: {
id: options.context.executionId,
},
transaction: options.transaction,
});
if (existed) {
this.getLogger(workflow.id).warn(
`workflow ${workflow.id} has already been triggered in same execution (${options.context.executionId}), and newly triggering will be skipped.`,
);
return null;
}
}
const { transaction = await this.db.sequelize.transaction() } = options;
const trigger = this.triggers.get(workflow.type);
const valid = await trigger.validateEvent(workflow, context, { ...options, transaction });
if (!valid) {
if (!options.transaction) {
await transaction.commit();
}
return null;
}
const execution = await workflow.createExecution(
{
@ -410,6 +400,10 @@ export default class PluginWorkflowServer extends Plugin {
}
private prepare = async () => {
if (this.executing && this.db.options.dialect === 'sqlite') {
await this.executing;
}
const event = this.events.shift();
this.eventsCount = this.events.length;
if (!event) {
@ -423,7 +417,7 @@ export default class PluginWorkflowServer extends Plugin {
try {
const execution = await this.createExecution(...event);
// NOTE: cache first execution for most cases
if (!this.executing && !this.pending.length) {
if (execution && !this.executing && !this.pending.length) {
this.pending.push([execution]);
}
} catch (err) {
@ -449,6 +443,10 @@ export default class PluginWorkflowServer extends Plugin {
return;
}
if (this.events.length) {
return this.prepare();
}
this.executing = (async () => {
let next: Pending | null = null;
// resuming has high priority

View File

@ -233,52 +233,6 @@ describe('workflow > Plugin', () => {
});
});
describe('cycling trigger', () => {
it('trigger should not be triggered more than once in same execution', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'collection',
config: {
mode: 1,
collection: 'posts',
},
});
const n1 = await workflow.createNode({
type: 'create',
config: {
collection: 'posts',
params: {
values: {
title: 't2',
},
},
},
});
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const posts = await PostRepo.find();
expect(posts.length).toBe(2);
const [execution] = await workflow.getExecutions();
expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
// NOTE: second trigger to ensure no skipped event
const p3 = await PostRepo.create({ values: { title: 't2' } });
await sleep(500);
const posts2 = await PostRepo.find();
expect(posts2.length).toBe(4);
const [execution2] = await workflow.getExecutions({ order: [['createdAt', 'DESC']] });
expect(execution2.status).toBe(EXECUTION_STATUS.RESOLVED);
});
});
describe('dispatcher', () => {
it('multiple triggers in same event', async () => {
const w1 = await WorkflowModel.create({

View File

@ -371,6 +371,113 @@ describe('workflow > triggers > collection', () => {
});
});
describe('cycling trigger', () => {
it('trigger should not be triggered more than once in same execution', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'collection',
config: {
mode: 1,
collection: 'posts',
},
});
const n1 = await workflow.createNode({
type: 'create',
config: {
collection: 'posts',
params: {
values: {
title: 't2',
},
},
},
});
const p1 = await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const posts = await PostRepo.find();
expect(posts.length).toBe(2);
const e1s = await workflow.getExecutions();
expect(e1s.length).toBe(1);
expect(e1s[0].status).toBe(EXECUTION_STATUS.RESOLVED);
// NOTE: second trigger to ensure no skipped event
const p3 = await PostRepo.create({ values: { title: 't3' } });
await sleep(500);
const posts2 = await PostRepo.find();
expect(posts2.length).toBe(4);
const e2s = await workflow.getExecutions({ order: [['createdAt', 'DESC']] });
expect(e2s.length).toBe(2);
expect(e2s[1].status).toBe(EXECUTION_STATUS.RESOLVED);
});
it('multiple cycling trigger should not trigger more than once', async () => {
const w1 = await WorkflowModel.create({
enabled: true,
type: 'collection',
config: {
mode: 1,
collection: 'posts',
},
});
const n1 = await w1.createNode({
type: 'create',
config: {
collection: 'categories',
params: {
values: {
title: 'c1',
},
},
},
});
const w2 = await WorkflowModel.create({
enabled: true,
type: 'collection',
config: {
mode: 1,
collection: 'categories',
},
});
const n2 = await w2.createNode({
type: 'create',
config: {
collection: 'posts',
params: {
values: {
title: 't2',
},
},
},
});
const p1 = await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const posts = await PostRepo.find();
expect(posts.length).toBe(2);
const e1s = await w1.getExecutions();
expect(e1s.length).toBe(1);
expect(e1s[0].status).toBe(EXECUTION_STATUS.RESOLVED);
const e2s = await w2.getExecutions();
expect(e2s.length).toBe(1);
expect(e2s[0].status).toBe(EXECUTION_STATUS.RESOLVED);
});
});
describe('sync', () => {
it('sync collection trigger', async () => {
const workflow = await WorkflowModel.create({

View File

@ -13,7 +13,7 @@ export class CreateInstruction extends Instruction {
const created = await repository.create({
...options,
context: {
executionId: processor.execution.id,
stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))),
},
transaction: processor.transaction,
});

View File

@ -12,7 +12,7 @@ export class DestroyInstruction extends Instruction {
const result = await repo.destroy({
...options,
context: {
executionId: processor.execution.id,
stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))),
},
transaction: processor.transaction,
});

View File

@ -12,7 +12,7 @@ export class UpdateInstruction extends Instruction {
const result = await repo.update({
...options,
context: {
executionId: processor.execution.id,
stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))),
},
transaction: processor.transaction,
});

View File

@ -1,4 +1,4 @@
import { Collection, Model } from '@nocobase/database';
import { Collection, Model, Transactionable } from '@nocobase/database';
import Trigger from '.';
import { toJSON } from '../utils';
import type { WorkflowModel } from '../types';
@ -154,4 +154,33 @@ export default class CollectionTrigger extends Trigger {
}
}
}
async validateEvent(
workflow: WorkflowModel,
context: any,
options: { context?: { stack?: number[] } } & Transactionable,
): Promise<boolean> {
if (options.context?.stack) {
const existed = await workflow.countExecutions({
where: {
id: options.context.stack,
},
transaction: options.transaction,
});
if (existed) {
this.workflow
.getLogger(workflow.id)
.warn(
`workflow ${workflow.id} has already been triggered in stack executions (${options.context.stack}), and newly triggering will be skipped.`,
);
return false;
}
context.stack = options.context.stack;
}
return true;
}
}

View File

@ -6,6 +6,9 @@ export abstract class Trigger {
constructor(public readonly workflow: Plugin) {}
abstract on(workflow: WorkflowModel): void;
abstract off(workflow: WorkflowModel): void;
validateEvent(workflow: WorkflowModel, context: any, options: Transactionable): boolean | Promise<boolean> {
return true;
}
duplicateConfig?(workflow: WorkflowModel, options: Transactionable): object | Promise<object>;
sync?: boolean;
}