fix(plugin-workflow): try to avoid occasionally duplicated executions (#2196)

* fix(plugin-workflow): try to avoid occasionally duplicated executions

* test(plugin-workflow): avoid appends error and add more test cases

* test(plugin-workflow): fix type and adjust waiting time for test cases

* fix(plugin-workflow): refactor delay logic and fix failed cases
This commit is contained in:
Junyi 2023-07-07 13:07:05 +07:00 committed by GitHub
parent 09d5477983
commit 50786621bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 76 additions and 28 deletions

View File

@ -26,7 +26,7 @@ export default class WorkflowPlugin extends Plugin {
instructions: Registry<Instruction> = new Registry();
triggers: Registry<Trigger> = new Registry();
functions: Registry<CustomFunction> = new Registry();
private executing: ExecutionModel | null = null;
private executing = false;
private pending: Pending[] = [];
private events: [WorkflowModel, any, { context?: any }][] = [];
@ -253,9 +253,6 @@ export default class WorkflowPlugin extends Plugin {
{ transaction },
);
const executed = await workflow.countExecutions({ transaction });
// NOTE: not to trigger afterUpdate hook here
await workflow.increment('executed', { transaction });
await (<typeof WorkflowModel>workflow.constructor).increment('allExecuted', {
@ -301,6 +298,8 @@ export default class WorkflowPlugin extends Plugin {
return;
}
this.executing = true;
let next: Pending | null = null;
// resuming has high priority
if (this.pending.length) {
@ -310,20 +309,21 @@ export default class WorkflowPlugin extends Plugin {
filter: {
status: EXECUTION_STATUS.QUEUEING,
},
appends: ['workflow'],
sort: 'createdAt',
})) as ExecutionModel;
if (execution) {
if (execution && execution.workflow.enabled) {
next = [execution];
}
}
if (next) {
this.process(...next);
} else {
this.executing = false;
}
}
private async process(execution: ExecutionModel, job?: JobModel) {
this.executing = execution;
if (execution.status === EXECUTION_STATUS.QUEUEING) {
await execution.update({ status: EXECUTION_STATUS.STARTED });
}
@ -341,7 +341,7 @@ export default class WorkflowPlugin extends Plugin {
this.getLogger(execution.workflowId).error(`execution (${execution.id}) error: ${err.message}`, err);
}
this.executing = null;
this.executing = false;
this.dispatch();
}

View File

@ -318,6 +318,27 @@ describe('workflow > Plugin', () => {
expect(e3.status).toBe(EXECUTION_STATUS.RESOLVED);
});
it('multiple events on same workflow', async () => {
const w1 = await WorkflowModel.create({
enabled: true,
type: 'collection',
config: {
mode: 1,
collection: 'posts',
},
});
const p1 = await PostRepo.create({ values: { title: 't1' } });
const p2 = await PostRepo.create({ values: { title: 't2' } });
const p3 = await PostRepo.create({ values: { title: 't3' } });
await sleep(1000);
const executions = await w1.getExecutions();
expect(executions.length).toBe(3);
expect(executions.map((item) => item.status)).toEqual(Array(3).fill(EXECUTION_STATUS.RESOLVED));
});
it('when server starts, process all created executions', async () => {
const w1 = await WorkflowModel.create({
enabled: true,
@ -348,8 +369,30 @@ describe('workflow > Plugin', () => {
await sleep(500);
const [e2] = await w1.getExecutions();
expect(e2.status).toBe(EXECUTION_STATUS.RESOLVED);
await e1.reload();
expect(e1.status).toBe(EXECUTION_STATUS.RESOLVED);
await w1.update({ enabled: false });
await app.stop();
await db.reconnect();
const e2 = await ExecutionModel.create({
workflowId: w1.id,
key: w1.key,
useTransaction: w1.useTransaction,
context: {
data: p1.get(),
},
});
await app.start();
await sleep(500);
await e2.reload();
expect(e2.status).toBe(EXECUTION_STATUS.QUEUEING);
});
});
});

View File

@ -32,7 +32,7 @@ describe('workflow > instructions > request', () => {
ctx.withoutDataWrapping = true;
ctx.body = {
meta: { title: ctx.query.title },
data: { title: ctx.request.body.title },
data: { title: ctx.request.body['title'] },
};
}
next();

View File

@ -16,11 +16,11 @@ export default class implements Instruction {
timers: Map<number, NodeJS.Timeout> = new Map();
constructor(protected plugin: Plugin) {
plugin.app.on('beforeStart', () => this.load());
plugin.app.on('beforeStop', () => this.unload());
plugin.app.on('beforeStart', this.load);
plugin.app.on('beforeStop', this.unload);
}
async load() {
load = async () => {
const { model } = this.plugin.db.getCollection('jobs');
const jobs = (await model.findAll({
where: {
@ -47,31 +47,36 @@ export default class implements Instruction {
})) as JobModel[];
jobs.forEach((job) => {
this.schedule(job, job.node!.config.duration);
this.schedule(job);
});
}
};
unload() {
unload = () => {
for (const timer of this.timers.values()) {
clearTimeout(timer);
}
this.timers = new Map();
}
};
schedule(job, duration: number) {
schedule(job) {
const now = new Date();
const createdAt = Date.parse(job.createdAt);
const delay = createdAt + duration - now.getTime();
const trigger = this.trigger.bind(this, job);
this.timers.set(job.id, setTimeout(trigger, Math.max(0, delay)));
const delay = createdAt + job.node.config.duration - now.getTime();
if (delay > 0) {
const trigger = this.trigger.bind(this, job);
this.timers.set(job.id, setTimeout(trigger, delay));
} else {
this.trigger(job);
}
}
async trigger(job) {
const execution = (await job.getExecution()) as ExecutionModel;
if (execution.status === EXECUTION_STATUS.STARTED) {
job.execution = execution;
await this.plugin.resume(job);
if (!job.execution) {
job.execution = await job.getExecution();
}
if (job.execution.status === EXECUTION_STATUS.STARTED) {
this.plugin.resume(job);
}
if (this.timers.get(job.id)) {
this.timers.delete(job.id);
@ -85,10 +90,10 @@ export default class implements Instruction {
nodeId: node.id,
upstreamId: prevJob?.id ?? null,
});
job.node = node;
const { duration } = node.config as DelayConfig;
// add to schedule
this.schedule(job, duration);
this.schedule(job);
return processor.exit();
};