fix(plugin-workflow): fix transaction chain in trigger (#1089)

This commit is contained in:
Junyi 2022-11-15 13:35:10 +08:00 committed by GitHub
parent 5cc83111db
commit 0288243a33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 33 additions and 35 deletions

View File

@ -135,32 +135,30 @@ export default class WorkflowPlugin extends Plugin {
return null; return null;
} }
if (options.context?.executionId) {
// NOTE: no transaction here for read-uncommitted execution
const existed = await workflow.countExecutions({
where: {
id: options.context.executionId
}
});
if (existed) {
console.warn(`workflow ${workflow.id} has already been triggered in same execution (${options.context.executionId}), and newly triggering will be skipped.`);
return;
}
}
// @ts-ignore // @ts-ignore
const transaction = options.transaction && !options.transaction.finished const transaction = options.transaction && !options.transaction.finished
? options.transaction ? options.transaction
: await (<typeof WorkflowModel>workflow.constructor).database.sequelize.transaction(); : await (<typeof WorkflowModel>workflow.constructor).database.sequelize.transaction();
if (options.context?.transaction) {
const existed = await workflow.countExecutions({
where: {
transaction: options.context.transaction
},
transaction
});
if (existed) {
console.warn(`workflow ${workflow.id} has already been triggered in same execution (${options.context.transaction}), and newly triggering will be skipped.`);
return;
}
}
const execution = await workflow.createExecution({ const execution = await workflow.createExecution({
context, context,
key: workflow.key, key: workflow.key,
status: EXECUTION_STATUS.CREATED, status: EXECUTION_STATUS.CREATED,
useTransaction: workflow.useTransaction, useTransaction: workflow.useTransaction,
// @ts-ignore
transaction: options.context?.transaction ?? transaction?.id
}, { transaction }); }, { transaction });
console.log('workflow triggered:', new Date(), workflow.id, execution.id); console.log('workflow triggered:', new Date(), workflow.id, execution.id);
@ -193,7 +191,7 @@ export default class WorkflowPlugin extends Plugin {
await transaction.commit(); await transaction.commit();
} }
setTimeout(() => this.dispatch(execution), 0); setTimeout(() => this.dispatch(execution));
} }
public async resume(job) { public async resume(job) {
@ -201,7 +199,7 @@ export default class WorkflowPlugin extends Plugin {
job.execution = await job.getExecution(); job.execution = await job.getExecution();
} }
setTimeout(() => this.dispatch(job.execution, job), 0); setTimeout(() => this.dispatch(job.execution, job));
} }
private async dispatch(execution?: ExecutionModel, job?: JobModel) { private async dispatch(execution?: ExecutionModel, job?: JobModel) {
@ -228,7 +226,7 @@ export default class WorkflowPlugin extends Plugin {
await execution.update({ status: EXECUTION_STATUS.STARTED }); await execution.update({ status: EXECUTION_STATUS.STARTED });
} }
const processor = this.createProcessor(execution, { _context: { transaction: execution.transaction} }); const processor = this.createProcessor(execution);
this.executing = job ? processor.resume(job) : processor.start(); this.executing = job ? processor.resume(job) : processor.start();

View File

@ -13,8 +13,6 @@ import { EXECUTION_STATUS, JOB_STATUS } from './constants';
export interface ProcessorOptions extends Transactionable { export interface ProcessorOptions extends Transactionable {
// TODO(temp): pass request context here for $isVar and other operators
_context?: any;
plugin: Plugin plugin: Plugin
} }

View File

@ -12,7 +12,9 @@ export default {
const options = processor.getParsedValue(params); const options = processor.getParsedValue(params);
const result = await repo.create({ const result = await repo.create({
...options, ...options,
context: processor.options._context, context: {
executionId: processor.execution.id
},
transaction: processor.transaction transaction: processor.transaction
}); });

View File

@ -12,7 +12,9 @@ export default {
const options = processor.getParsedValue(params); const options = processor.getParsedValue(params);
const result = await repo.destroy({ const result = await repo.destroy({
...options, ...options,
context: processor.options._context, context: {
executionId: processor.execution.id
},
transaction: processor.transaction transaction: processor.transaction
}); });

View File

@ -14,7 +14,9 @@ export default {
const options = processor.getParsedValue(params); const options = processor.getParsedValue(params);
const result = await (multiple ? repo.find : repo.findOne).call(repo, { const result = await (multiple ? repo.find : repo.findOne).call(repo, {
...options, ...options,
context: processor.options._context, context: {
executionId: processor.execution.id
},
transaction: processor.transaction transaction: processor.transaction
}); });

View File

@ -14,7 +14,9 @@ export default {
const options = processor.getParsedValue(params); const options = processor.getParsedValue(params);
const result = await repo.update({ const result = await repo.update({
...options, ...options,
context: processor.options._context, context: {
executionId: processor.execution.id
},
transaction: processor.transaction transaction: processor.transaction
}); });

View File

@ -70,16 +70,10 @@ async function handler(this: CollectionTrigger, workflow: WorkflowModel, data: M
} }
} }
// TODO(bug): use setTimeout here test case will not exit (only work in SQLite)? setTimeout(() => {
// setTimeout(() => { this.plugin.trigger(workflow, { data: data.get() }, {
// this.plugin.trigger(workflow, { data: data.get() }, { context
// context });
// });
// }, 0);
await this.plugin.trigger(workflow, { data: data.get() }, {
context,
transaction
}); });
} }