diff --git a/packages/plugins/workflow/src/server/Plugin.ts b/packages/plugins/workflow/src/server/Plugin.ts index e426025a0b..e20429b369 100644 --- a/packages/plugins/workflow/src/server/Plugin.ts +++ b/packages/plugins/workflow/src/server/Plugin.ts @@ -161,7 +161,9 @@ export default class WorkflowPlugin extends Plugin { workflows.forEach((workflow: WorkflowModel) => { this.toggle(workflow); }); + }); + this.app.on('afterStart', () => { // check for not started executions this.dispatch(); }); diff --git a/packages/plugins/workflow/src/server/Processor.ts b/packages/plugins/workflow/src/server/Processor.ts index 95ba65d4a8..2a6618b6a9 100644 --- a/packages/plugins/workflow/src/server/Processor.ts +++ b/packages/plugins/workflow/src/server/Processor.ts @@ -109,7 +109,7 @@ export default class Processor { const head = this.nodes.find((item) => !item.upstream); await this.run(head, { result: execution.context }); } else { - await this.exit(null); + await this.exit(JOB_STATUS.RESOLVED); } } @@ -192,7 +192,7 @@ export default class Processor { } // parent node should take over the control - public async end(node, job) { + public async end(node, job: JobModel) { this.logger.debug(`branch ended at node (${node.id})`); const parentNode = this.findBranchParentNode(node); // no parent, means on main flow @@ -204,7 +204,7 @@ export default class Processor { // really done for all nodes // * should mark execution as done with last job status - return this.exit(job); + return this.exit(job.status); } async recall(node, job) { @@ -217,12 +217,12 @@ export default class Processor { return this.exec(instruction.resume.bind(instruction), node, job); } - async exit(job: JobModel | null) { - const status = job - ? (this.constructor).StatusMap[job.status] ?? Math.sign(job.status) - : EXECUTION_STATUS.RESOLVED; - this.logger.info(`execution (${this.execution.id}) all nodes finished, finishing execution...`); - await this.execution.update({ status }, { transaction: this.transaction }); + async exit(s?: number) { + if (typeof s === 'number') { + const status = (this.constructor).StatusMap[s] ?? Math.sign(s); + await this.execution.update({ status }, { transaction: this.transaction }); + } + this.logger.info(`execution (${this.execution.id}) exiting with status ${this.execution.status}`); await this.commit(); return null; } @@ -235,9 +235,8 @@ export default class Processor { if (payload instanceof model) { job = await payload.save({ transaction: this.transaction }); } else if (payload.id) { - [job] = await model.update(payload, { - where: { id: payload.id }, - returning: true, + job = await model.findByPk(payload.id); + await job.update(payload, { transaction: this.transaction, }); } else { diff --git a/packages/plugins/workflow/src/server/instructions/delay.ts b/packages/plugins/workflow/src/server/instructions/delay.ts index 6dafe578f6..93bae78a62 100644 --- a/packages/plugins/workflow/src/server/instructions/delay.ts +++ b/packages/plugins/workflow/src/server/instructions/delay.ts @@ -90,7 +90,7 @@ export default class implements Instruction { // add to schedule this.schedule(job, duration); - return processor.end(node, job); + return processor.exit(); }; resume = async (node, prevJob, processor: Processor) => { diff --git a/packages/plugins/workflow/src/server/instructions/loop.ts b/packages/plugins/workflow/src/server/instructions/loop.ts index 9c703b0792..596ed631d2 100644 --- a/packages/plugins/workflow/src/server/instructions/loop.ts +++ b/packages/plugins/workflow/src/server/instructions/loop.ts @@ -57,7 +57,7 @@ export default { const { result, status } = job; // if loop has been done (resolved / rejected), do not care newly executed branch jobs. if (status !== JOB_STATUS.PENDING) { - return null; + return processor.exit(); } const nextIndex = result + 1; diff --git a/packages/plugins/workflow/src/server/instructions/manual/actions.ts b/packages/plugins/workflow/src/server/instructions/manual/actions.ts index 16804fe5e8..f2a7a32990 100644 --- a/packages/plugins/workflow/src/server/instructions/manual/actions.ts +++ b/packages/plugins/workflow/src/server/instructions/manual/actions.ts @@ -65,7 +65,7 @@ export async function submit(context: Context, next) { await userJob.save({ transaction: processor.transaction }); - await processor.exit(userJob.job); + await processor.exit(); context.body = userJob; context.status = 202; diff --git a/packages/plugins/workflow/src/server/instructions/parallel.ts b/packages/plugins/workflow/src/server/instructions/parallel.ts index 4018c44442..d0cda6a8bf 100644 --- a/packages/plugins/workflow/src/server/instructions/parallel.ts +++ b/packages/plugins/workflow/src/server/instructions/parallel.ts @@ -96,7 +96,7 @@ export default { const { result, status } = job; // if parallel has been done (resolved / rejected), do not care newly executed branch jobs. if (status !== JOB_STATUS.PENDING) { - return null; + return processor.exit(); } // find the index of the node which start the branch @@ -114,7 +114,7 @@ export default { if (job.status === JOB_STATUS.PENDING) { await job.save({ transaction: processor.transaction }); - return processor.end(node, job); + return processor.exit(); } return job; diff --git a/packages/plugins/workflow/src/server/instructions/request.ts b/packages/plugins/workflow/src/server/instructions/request.ts index 5b1db106a4..41b950f084 100644 --- a/packages/plugins/workflow/src/server/instructions/request.ts +++ b/packages/plugins/workflow/src/server/instructions/request.ts @@ -67,13 +67,13 @@ export default class implements Instruction { }); }) .finally(() => { - this.plugin.app.logger.info(`[Workflow] request (#${node.id}) response received, status: ${job.get('status')}`); + processor.logger.info(`request (#${node.id}) response received, status: ${job.get('status')}`); this.plugin.resume(job); }); - this.plugin.app.logger.info(`[Workflow] request (#${node.id}) sent to "${config.url}", waiting for response...`); + processor.logger.info(`request (#${node.id}) sent to "${config.url}", waiting for response...`); - return null; + return processor.exit(); } async resume(node: FlowNodeModel, job, processor: Processor) {