fix(plugin-workflow): use dual pipes to process triggers (#1187)

* fix(plugin-workflow): use dual pipes to process triggers

* refactor(plugin-workflow): refactor dual pipe structure
This commit is contained in:
Junyi 2022-12-02 22:34:47 -08:00 committed by GitHub
parent e5a9721674
commit c19ca1f30b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 173 additions and 140 deletions

View File

@ -48,8 +48,8 @@ export interface Trigger {
view?: ISchema;
scope?: { [key: string]: any };
components?: { [key: string]: any };
render?(props): React.ReactElement;
getter?(node: any): React.ReactElement;
render?(props): React.ReactNode;
getter?(node: any): React.ReactNode;
};
export const triggers = new Registry<Trigger>();
@ -70,56 +70,52 @@ function TriggerExecution() {
<SchemaComponent
schema={{
type: 'void',
name: execution.id,
'x-component': 'Action',
'x-component-props': {
title: <InfoOutlined />,
shape: 'circle',
className: 'workflow-node-job-button',
type: 'primary'
},
properties: {
trigger: {
[execution.id]: {
type: 'void',
'x-component': 'Action',
'x-component-props': {
title: <InfoOutlined />,
shape: 'circle',
className: 'workflow-node-job-button',
type: 'primary'
'x-decorator': 'Form',
'x-decorator-props': {
initialValue: execution
},
'x-component': 'Action.Modal',
title: (
<div className={cx(nodeTitleClass)}>
<Tag>{compile(trigger.title)}</Tag>
<strong>{workflow.title}</strong>
<span className="workflow-node-id">#{execution.id}</span>
</div>
),
properties: {
[execution.id]: {
type: 'void',
'x-decorator': 'Form',
'x-decorator-props': {
initialValue: execution
createdAt: {
type: 'string',
title: `{{t("Triggered at", { ns: "${NAMESPACE}" })}}`,
'x-decorator': 'FormItem',
'x-component': 'DatePicker',
'x-component-props': {
showTime: true
},
'x-component': 'Action.Modal',
title: (
<div className={cx(nodeTitleClass)}>
<Tag>{compile(trigger.title)}</Tag>
<strong>{workflow.title}</strong>
<span className="workflow-node-id">#{execution.id}</span>
</div>
),
properties: {
createdAt: {
type: 'string',
title: `{{t("Triggered at", { ns: "${NAMESPACE}" })}}`,
'x-decorator': 'FormItem',
'x-component': 'DatePicker',
'x-component-props': {
showTime: true
},
'x-read-pretty': true,
},
context: {
type: 'object',
title: `{{t("Trigger variables", { ns: "${NAMESPACE}" })}}`,
'x-decorator': 'FormItem',
'x-component': 'Input.JSON',
'x-component-props': {
className: css`
padding: 1em;
background-color: #eee;
`
},
'x-read-pretty': true,
}
}
'x-read-pretty': true,
},
context: {
type: 'object',
title: `{{t("Trigger variables", { ns: "${NAMESPACE}" })}}`,
'x-decorator': 'FormItem',
'x-component': 'Input.JSON',
'x-component-props': {
className: css`
padding: 1em;
background-color: #eee;
`
},
'x-read-pretty': true,
}
}
}

View File

@ -16,14 +16,15 @@ import initTriggers, { Trigger } from './triggers';
import JobModel from './models/Job';
type Pending = [ExecutionModel, JobModel?];
export default class WorkflowPlugin extends Plugin {
instructions: Registry<Instruction> = new Registry();
triggers: Registry<Trigger> = new Registry();
calculators = calculators;
extensions = extensions;
executing: Promise<any> = null;
pending: [ExecutionModel, JobModel][] = [];
executing: ExecutionModel = null;
pending: Pending[] = [];
events: [WorkflowModel, any, { context?: any }][] = [];
onBeforeSave = async (instance: WorkflowModel, options) => {
const Model = <typeof WorkflowModel>instance.constructor;
@ -102,7 +103,7 @@ export default class WorkflowPlugin extends Plugin {
});
// check for not started executions
await this.dispatch();
this.dispatch();
});
this.app.on('beforeStop', async () => {
@ -132,12 +133,25 @@ export default class WorkflowPlugin extends Plugin {
}
}
public async trigger(workflow: WorkflowModel, context: Object, options: Transactionable & { context?: any } = {}): Promise<void> {
public trigger(workflow: WorkflowModel, context: Object, options: { context?: any } = {}): Promise<void> {
// `null` means not to trigger
if (context == null) {
return null;
return;
}
this.events.push([workflow, context, options]);
if (this.events.length > 1) {
return;
}
// NOTE: no await for quick return
setTimeout(this.prepare);
}
private prepare = async () => {
const [workflow, context, options] = this.events[0];
if (options.context?.executionId) {
// NOTE: no transaction here for read-uncommitted execution
const existed = await workflow.countExecutions({
@ -152,43 +166,47 @@ export default class WorkflowPlugin extends Plugin {
}
}
const transaction = await (<typeof WorkflowModel>workflow.constructor).database.sequelize.transaction();
await this.db.sequelize.transaction(async transaction => {
const execution = await workflow.createExecution({
context,
key: workflow.key,
status: EXECUTION_STATUS.CREATED,
useTransaction: workflow.useTransaction,
}, { transaction });
const execution = await workflow.createExecution({
context,
key: workflow.key,
status: EXECUTION_STATUS.CREATED,
useTransaction: workflow.useTransaction,
}, { transaction });
const executed = await workflow.countExecutions({ transaction });
console.log('workflow triggered:', new Date(), workflow.id, execution.id);
// NOTE: not to trigger afterUpdate hook here
await workflow.update({ executed }, { transaction, hooks: false });
const executed = await workflow.countExecutions({ transaction });
const allExecuted = await (<typeof ExecutionModel>execution.constructor).count({
where: {
key: workflow.key
},
transaction
});
await (<typeof WorkflowModel>workflow.constructor).update({
allExecuted
}, {
where: {
key: workflow.key
},
individualHooks: true,
transaction
});
// NOTE: not to trigger afterUpdate hook here
await workflow.update({ executed }, { transaction, hooks: false });
execution.workflow = workflow;
const allExecuted = await (<typeof ExecutionModel>execution.constructor).count({
where: {
key: workflow.key
},
transaction
});
await (<typeof WorkflowModel>workflow.constructor).update({
allExecuted
}, {
where: {
key: workflow.key
},
individualHooks: true,
transaction
return execution;
});
execution.workflow = workflow;
this.events.shift();
await transaction.commit();
setTimeout(() => this.dispatch(execution));
if (this.events.length) {
await this.prepare();
} else {
this.dispatch();
}
}
public async resume(job) {
@ -196,28 +214,37 @@ export default class WorkflowPlugin extends Plugin {
job.execution = await job.getExecution();
}
setTimeout(() => this.dispatch(job.execution, job));
this.pending.push([job.execution, job]);
this.dispatch();
}
private async dispatch(execution?: ExecutionModel, job?: JobModel) {
private async dispatch() {
if (this.executing) {
if (job) {
this.pending.push([execution, job]);
}
return;
}
if (!execution) {
execution = await this.db.getRepository('executions').findOne({
let next: Pending;
// resuming has high priority
if (this.pending.length) {
next = this.pending.shift();
} else {
const execution = await this.db.getRepository('executions').findOne({
filter: {
status: EXECUTION_STATUS.CREATED
},
sort: 'createdAt'
}) as ExecutionModel;
if (!execution) {
return;
if (execution) {
next = [execution];
}
};
if (next) {
this.process(...next);
}
}
private async process(execution: ExecutionModel, job?: JobModel) {
this.executing = execution;
if (execution.status === EXECUTION_STATUS.CREATED) {
await execution.update({ status: EXECUTION_STATUS.STARTED });
@ -225,16 +252,13 @@ export default class WorkflowPlugin extends Plugin {
const processor = this.createProcessor(execution);
this.executing = job ? processor.resume(job) : processor.start();
console.log('workflow processing:', new Date(), execution.workflowId, execution.id);
await this.executing;
await (job ? processor.resume(job) : processor.start());
this.executing = null;
setTimeout(() => {
const args = this.pending.length ? this.pending.shift() : [];
this.dispatch(...args);
});
this.dispatch();
}
private createProcessor(execution: ExecutionModel, options = {}): Processor {

View File

@ -70,15 +70,13 @@ export default class Processor {
const { options } = this;
const { sequelize } = (<typeof ExecutionModel>this.execution.constructor).database;
// @ts-ignore
return options.transaction && !options.transaction.finished
? options.transaction
: await sequelize.transaction();
: await options.plugin.db.sequelize.transaction();
}
async prepare(commit?: boolean) {
private async prepare() {
const transaction = await this.getTransaction();
this.transaction = transaction;
@ -97,10 +95,6 @@ export default class Processor {
});
this.makeJobs(jobs);
if (commit) {
await this.commit();
}
}
public async start() {

View File

@ -183,6 +183,41 @@ describe('workflow > Plugin', () => {
});
});
describe('cycling trigger', () => {
it('trigger should not be triggered more than once in same execution', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'collection',
config: {
mode: 1,
collection: 'posts'
}
});
const n1 = await workflow.createNode({
type: 'create',
config: {
collection: 'posts',
params: {
values: {
title: 't2'
}
}
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const posts = await PostRepo.find();
expect(posts.length).toBe(2);
const [execution] = await workflow.getExecutions();
expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
});
});
describe('dispatcher', () => {
it.skip('multiple triggers in same event', async () => {
const w1 = await WorkflowModel.create({
@ -203,15 +238,27 @@ describe('workflow > Plugin', () => {
}
});
const w3 = await WorkflowModel.create({
enabled: true,
type: 'collection',
config: {
mode: 1,
collection: 'posts'
}
});
const p1 = await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
await sleep(1000);
const [e1] = await w1.getExecutions();
expect(e1.status).toBe(EXECUTION_STATUS.RESOLVED);
const [e2] = await w2.getExecutions();
expect(e2.status).toBe(EXECUTION_STATUS.RESOLVED);
const [e3] = await w3.getExecutions();
expect(e3.status).toBe(EXECUTION_STATUS.RESOLVED);
});
it('when server starts, process all created executions', async () => {

View File

@ -425,30 +425,4 @@ describe('workflow > Processor', () => {
expect(jobs.length).toEqual(5);
});
});
describe('cycling trigger', () => {
it('trigger should not be triggered more than once in same execution', async () => {
const n1 = await workflow.createNode({
type: 'create',
config: {
collection: 'posts',
params: {
values: {
title: 't2'
}
}
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const posts = await PostRepo.find();
expect(posts.length).toBe(2);
const [execution] = await workflow.getExecutions();
expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
});
});
});

View File

@ -42,10 +42,10 @@ async function handler(this: CollectionTrigger, workflow: WorkflowModel, data: M
// NOTE: if no configured fields changed, do not trigger
if (changed
&& changed.length
&& changed
.filter(name => !['linkTo', 'hasOne', 'hasMany', 'belongsToMany'].includes(collection.getField(name).type))
.every(name => !data.changedWithAssociations(getFieldRawName(collection, name)))
&& changed.length
&& changed
.filter(name => !['linkTo', 'hasOne', 'hasMany', 'belongsToMany'].includes(collection.getField(name).type))
.every(name => !data.changedWithAssociations(getFieldRawName(collection, name)))
) {
return;
}
@ -70,10 +70,8 @@ async function handler(this: CollectionTrigger, workflow: WorkflowModel, data: M
}
}
setTimeout(() => {
this.plugin.trigger(workflow, { data: data.get() }, {
context
});
this.plugin.trigger(workflow, { data: data.get() }, {
context
});
}