fix(plugin-workflow): fix branch and exit logic (#2103)

This commit is contained in:
Junyi 2023-06-23 22:21:19 +07:00 committed by GitHub
parent 20f673a6b3
commit 19341952b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 21 additions and 20 deletions

View File

@ -161,7 +161,9 @@ export default class WorkflowPlugin extends Plugin {
workflows.forEach((workflow: WorkflowModel) => {
this.toggle(workflow);
});
});
this.app.on('afterStart', () => {
// check for not started executions
this.dispatch();
});

View File

@ -109,7 +109,7 @@ export default class Processor {
const head = this.nodes.find((item) => !item.upstream);
await this.run(head, { result: execution.context });
} else {
await this.exit(null);
await this.exit(JOB_STATUS.RESOLVED);
}
}
@ -192,7 +192,7 @@ export default class Processor {
}
// parent node should take over the control
public async end(node, job) {
public async end(node, job: JobModel) {
this.logger.debug(`branch ended at node (${node.id})`);
const parentNode = this.findBranchParentNode(node);
// no parent, means on main flow
@ -204,7 +204,7 @@ export default class Processor {
// really done for all nodes
// * should mark execution as done with last job status
return this.exit(job);
return this.exit(job.status);
}
async recall(node, job) {
@ -217,12 +217,12 @@ export default class Processor {
return this.exec(instruction.resume.bind(instruction), node, job);
}
async exit(job: JobModel | null) {
const status = job
? (<typeof Processor>this.constructor).StatusMap[job.status] ?? Math.sign(job.status)
: EXECUTION_STATUS.RESOLVED;
this.logger.info(`execution (${this.execution.id}) all nodes finished, finishing execution...`);
await this.execution.update({ status }, { transaction: this.transaction });
async exit(s?: number) {
if (typeof s === 'number') {
const status = (<typeof Processor>this.constructor).StatusMap[s] ?? Math.sign(s);
await this.execution.update({ status }, { transaction: this.transaction });
}
this.logger.info(`execution (${this.execution.id}) exiting with status ${this.execution.status}`);
await this.commit();
return null;
}
@ -235,9 +235,8 @@ export default class Processor {
if (payload instanceof model) {
job = await payload.save({ transaction: this.transaction });
} else if (payload.id) {
[job] = await model.update(payload, {
where: { id: payload.id },
returning: true,
job = await model.findByPk(payload.id);
await job.update(payload, {
transaction: this.transaction,
});
} else {

View File

@ -90,7 +90,7 @@ export default class implements Instruction {
// add to schedule
this.schedule(job, duration);
return processor.end(node, job);
return processor.exit();
};
resume = async (node, prevJob, processor: Processor) => {

View File

@ -57,7 +57,7 @@ export default {
const { result, status } = job;
// if loop has been done (resolved / rejected), do not care newly executed branch jobs.
if (status !== JOB_STATUS.PENDING) {
return null;
return processor.exit();
}
const nextIndex = result + 1;

View File

@ -65,7 +65,7 @@ export async function submit(context: Context, next) {
await userJob.save({ transaction: processor.transaction });
await processor.exit(userJob.job);
await processor.exit();
context.body = userJob;
context.status = 202;

View File

@ -96,7 +96,7 @@ export default {
const { result, status } = job;
// if parallel has been done (resolved / rejected), do not care newly executed branch jobs.
if (status !== JOB_STATUS.PENDING) {
return null;
return processor.exit();
}
// find the index of the node which start the branch
@ -114,7 +114,7 @@ export default {
if (job.status === JOB_STATUS.PENDING) {
await job.save({ transaction: processor.transaction });
return processor.end(node, job);
return processor.exit();
}
return job;

View File

@ -67,13 +67,13 @@ export default class implements Instruction {
});
})
.finally(() => {
this.plugin.app.logger.info(`[Workflow] request (#${node.id}) response received, status: ${job.get('status')}`);
processor.logger.info(`request (#${node.id}) response received, status: ${job.get('status')}`);
this.plugin.resume(job);
});
this.plugin.app.logger.info(`[Workflow] request (#${node.id}) sent to "${config.url}", waiting for response...`);
processor.logger.info(`request (#${node.id}) sent to "${config.url}", waiting for response...`);
return null;
return processor.exit();
}
async resume(node: FlowNodeModel, job, processor: Processor) {