From 0288243a330239a1b15eba237b871d9df4c9dd3c Mon Sep 17 00:00:00 2001 From: Junyi Date: Tue, 15 Nov 2022 13:35:10 +0800 Subject: [PATCH] fix(plugin-workflow): fix transaction chain in trigger (#1089) --- .../plugins/workflow/src/server/Plugin.ts | 36 +++++++++---------- .../plugins/workflow/src/server/Processor.ts | 2 -- .../src/server/instructions/create.ts | 4 ++- .../src/server/instructions/destroy.ts | 4 ++- .../workflow/src/server/instructions/query.ts | 4 ++- .../src/server/instructions/update.ts | 4 ++- .../src/server/triggers/collection.ts | 14 +++----- 7 files changed, 33 insertions(+), 35 deletions(-) diff --git a/packages/plugins/workflow/src/server/Plugin.ts b/packages/plugins/workflow/src/server/Plugin.ts index 5b98e64568..a315e1b591 100644 --- a/packages/plugins/workflow/src/server/Plugin.ts +++ b/packages/plugins/workflow/src/server/Plugin.ts @@ -135,32 +135,30 @@ export default class WorkflowPlugin extends Plugin { return null; } + if (options.context?.executionId) { + // NOTE: no transaction here for read-uncommitted execution + const existed = await workflow.countExecutions({ + where: { + id: options.context.executionId + } + }); + + if (existed) { + console.warn(`workflow ${workflow.id} has already been triggered in same execution (${options.context.executionId}), and newly triggering will be skipped.`); + return; + } + } + // @ts-ignore const transaction = options.transaction && !options.transaction.finished ? options.transaction : await (workflow.constructor).database.sequelize.transaction(); - if (options.context?.transaction) { - const existed = await workflow.countExecutions({ - where: { - transaction: options.context.transaction - }, - transaction - }); - - if (existed) { - console.warn(`workflow ${workflow.id} has already been triggered in same execution (${options.context.transaction}), and newly triggering will be skipped.`); - return; - } - } - const execution = await workflow.createExecution({ context, key: workflow.key, status: EXECUTION_STATUS.CREATED, useTransaction: workflow.useTransaction, - // @ts-ignore - transaction: options.context?.transaction ?? transaction?.id }, { transaction }); console.log('workflow triggered:', new Date(), workflow.id, execution.id); @@ -193,7 +191,7 @@ export default class WorkflowPlugin extends Plugin { await transaction.commit(); } - setTimeout(() => this.dispatch(execution), 0); + setTimeout(() => this.dispatch(execution)); } public async resume(job) { @@ -201,7 +199,7 @@ export default class WorkflowPlugin extends Plugin { job.execution = await job.getExecution(); } - setTimeout(() => this.dispatch(job.execution, job), 0); + setTimeout(() => this.dispatch(job.execution, job)); } private async dispatch(execution?: ExecutionModel, job?: JobModel) { @@ -228,7 +226,7 @@ export default class WorkflowPlugin extends Plugin { await execution.update({ status: EXECUTION_STATUS.STARTED }); } - const processor = this.createProcessor(execution, { _context: { transaction: execution.transaction} }); + const processor = this.createProcessor(execution); this.executing = job ? processor.resume(job) : processor.start(); diff --git a/packages/plugins/workflow/src/server/Processor.ts b/packages/plugins/workflow/src/server/Processor.ts index b514ed6376..627ddefe0b 100644 --- a/packages/plugins/workflow/src/server/Processor.ts +++ b/packages/plugins/workflow/src/server/Processor.ts @@ -13,8 +13,6 @@ import { EXECUTION_STATUS, JOB_STATUS } from './constants'; export interface ProcessorOptions extends Transactionable { - // TODO(temp): pass request context here for $isVar and other operators - _context?: any; plugin: Plugin } diff --git a/packages/plugins/workflow/src/server/instructions/create.ts b/packages/plugins/workflow/src/server/instructions/create.ts index 4704ec614b..0eec5550fb 100644 --- a/packages/plugins/workflow/src/server/instructions/create.ts +++ b/packages/plugins/workflow/src/server/instructions/create.ts @@ -12,7 +12,9 @@ export default { const options = processor.getParsedValue(params); const result = await repo.create({ ...options, - context: processor.options._context, + context: { + executionId: processor.execution.id + }, transaction: processor.transaction }); diff --git a/packages/plugins/workflow/src/server/instructions/destroy.ts b/packages/plugins/workflow/src/server/instructions/destroy.ts index 310bd8ba8a..a0af6e0a88 100644 --- a/packages/plugins/workflow/src/server/instructions/destroy.ts +++ b/packages/plugins/workflow/src/server/instructions/destroy.ts @@ -12,7 +12,9 @@ export default { const options = processor.getParsedValue(params); const result = await repo.destroy({ ...options, - context: processor.options._context, + context: { + executionId: processor.execution.id + }, transaction: processor.transaction }); diff --git a/packages/plugins/workflow/src/server/instructions/query.ts b/packages/plugins/workflow/src/server/instructions/query.ts index 280cbcab4e..f79a3bd18d 100644 --- a/packages/plugins/workflow/src/server/instructions/query.ts +++ b/packages/plugins/workflow/src/server/instructions/query.ts @@ -14,7 +14,9 @@ export default { const options = processor.getParsedValue(params); const result = await (multiple ? repo.find : repo.findOne).call(repo, { ...options, - context: processor.options._context, + context: { + executionId: processor.execution.id + }, transaction: processor.transaction }); diff --git a/packages/plugins/workflow/src/server/instructions/update.ts b/packages/plugins/workflow/src/server/instructions/update.ts index cac6de677f..127cd784c2 100644 --- a/packages/plugins/workflow/src/server/instructions/update.ts +++ b/packages/plugins/workflow/src/server/instructions/update.ts @@ -14,7 +14,9 @@ export default { const options = processor.getParsedValue(params); const result = await repo.update({ ...options, - context: processor.options._context, + context: { + executionId: processor.execution.id + }, transaction: processor.transaction }); diff --git a/packages/plugins/workflow/src/server/triggers/collection.ts b/packages/plugins/workflow/src/server/triggers/collection.ts index c1ce705227..aeb6605dfd 100644 --- a/packages/plugins/workflow/src/server/triggers/collection.ts +++ b/packages/plugins/workflow/src/server/triggers/collection.ts @@ -70,16 +70,10 @@ async function handler(this: CollectionTrigger, workflow: WorkflowModel, data: M } } - // TODO(bug): use setTimeout here test case will not exit (only work in SQLite)? - // setTimeout(() => { - // this.plugin.trigger(workflow, { data: data.get() }, { - // context - // }); - // }, 0); - - await this.plugin.trigger(workflow, { data: data.get() }, { - context, - transaction + setTimeout(() => { + this.plugin.trigger(workflow, { data: data.get() }, { + context + }); }); }