2022-02-01 04:04:08 +00:00
|
|
|
import FlowNodeModel from "../models/FlowNode";
|
|
|
|
import JobModel from "../models/Job";
|
2022-06-20 15:29:21 +00:00
|
|
|
import Processor from "../Processor";
|
|
|
|
import { JOB_STATUS } from "../constants";
|
2022-02-01 04:04:08 +00:00
|
|
|
|
|
|
|
/**
 * Identifiers of the strategies a parallel node can use to combine the
 * statuses of its branches. The values are the keys of the `Modes` strategy
 * table in this module and the accepted values of a node's `config.mode`.
 */
export const PARALLEL_MODE = {
  ALL: "all",
  ANY: "any",
  RACE: "race",
} as const;
|
|
|
|
|
2022-06-28 09:00:19 +00:00
|
|
|
// Predicates over a single slot of a parallel job's `result` array.
// A slot stays `null` until the status of the corresponding branch is recorded.

/** The slot holds a status and that status is RESOLVED. */
const isResolvedStatus = (status) => status != null && status === JOB_STATUS.RESOLVED;

/** The slot holds a status that is below PENDING (treated as a failure here). */
const isFailedStatus = (status) => status != null && status < JOB_STATUS.PENDING;

/** The slot is still empty, or holds exactly PENDING. */
const isUnsettledStatus = (status) => (status == null ? true : status === JOB_STATUS.PENDING);

/**
 * Strategy table keyed by parallel mode. Every strategy provides:
 * - `next(previous)`: given the job produced by the previously started
 *   branch, tells whether the following branch should be started too;
 * - `getStatus(result)`: derives the status of the parallel job from the
 *   per-branch status slots.
 */
const Modes = {
  [PARALLEL_MODE.ALL]: {
    // Keep starting branches as long as the previous one is not below PENDING.
    next: (previous) => previous.status >= JOB_STATUS.PENDING,

    // First status below PENDING wins; RESOLVED only once every slot is
    // RESOLVED; PENDING otherwise.
    getStatus: (result) => {
      const firstFailed = result.find(isFailedStatus);
      if (firstFailed !== undefined) {
        return firstFailed;
      }
      return result.every(isResolvedStatus) ? JOB_STATUS.RESOLVED : JOB_STATUS.PENDING;
    },
  },

  [PARALLEL_MODE.ANY]: {
    // Keep starting branches as long as the previous one is not above PENDING.
    next: (previous) => previous.status <= JOB_STATUS.PENDING,

    // A single RESOLVED slot is enough; otherwise stay PENDING while any slot
    // is empty or PENDING; FAILED once nothing can resolve any more.
    getStatus: (result) => {
      if (result.some(isResolvedStatus)) {
        return JOB_STATUS.RESOLVED;
      }
      return result.some(isUnsettledStatus) ? JOB_STATUS.PENDING : JOB_STATUS.FAILED;
    },
  },

  [PARALLEL_MODE.RACE]: {
    // Keep starting branches only while the previous one is exactly PENDING.
    next: (previous) => previous.status === JOB_STATUS.PENDING,

    // A RESOLVED slot is checked first, then the first status below PENDING;
    // PENDING when neither exists yet.
    getStatus: (result) => {
      if (result.some(isResolvedStatus)) {
        return JOB_STATUS.RESOLVED;
      }
      const firstFailed = result.find(isFailedStatus);
      return firstFailed === undefined ? JOB_STATUS.PENDING : firstFailed;
    },
  },
};
|
|
|
|
|
|
|
|
/**
 * Instruction implementation for a parallel node: starts the node's branches
 * and aggregates their statuses according to the node's `config.mode`
 * (one of `PARALLEL_MODE`, defaulting to `PARALLEL_MODE.ALL`).
 */
export default {
  /**
   * Creates the PENDING job of the parallel node, with one `null` result slot
   * per branch, then starts the branches one after another.
   *
   * @param node - the parallel node being executed
   * @param prevJob - upstream job, if any; its id is stored as `upstreamId`
   * @param processor - processor providing branches, job storage and execution
   * @returns whatever `processor.end(node, job)` returns
   */
  async run(node: FlowNodeModel, prevJob: JobModel, processor: Processor) {
    const branches = processor.getBranches(node);

    const job = await processor.saveJob({
      status: JOB_STATUS.PENDING,
      result: Array(branches.length).fill(null),
      nodeId: node.id,
      upstreamId: prevJob?.id ?? null
    });

    // Tolerate a node without config, exactly like `resume` does; otherwise
    // destructuring `null`/`undefined` would throw a TypeError.
    const { mode = PARALLEL_MODE.ALL } = node.config ?? {};

    // NOTE:
    // Branches are started sequentially rather than with `Promise.all`, to
    // avoid racing database manipulation. For users this is almost equivalent
    // to `Promise.all`, because the delay is not significant. Another benefit
    // is that it could handle sequenced branches in the future.
    let previous;
    for (let i = 0; i < branches.length; i += 1) {
      // The first branch always starts. A later branch starts only if the
      // mode allows it, given the job of the last branch that was started;
      // a skipped branch leaves `previous` untouched.
      if (i && !Modes[mode].next(previous)) {
        continue;
      }
      previous = await processor.run(branches[i], job);
    }

    return processor.end(node, job);
  },

  /**
   * Called with a job from one of the branches: records that job's status in
   * the matching slot of the parallel job's `result` and recomputes the
   * parallel job's status with the configured mode.
   *
   * @param node - the parallel node owning the branch
   * @param branchJob - job from inside one of the branches (`nodeId` and `status` are read)
   * @param processor - processor providing node/job lookup and the transaction
   * @returns `null` when the parallel job is no longer PENDING (late branch
   *   results are ignored); the result of `processor.end(node, job)` after
   *   saving when the parallel job is still PENDING; otherwise the updated
   *   job, which is not saved here.
   * @throws Error when the branch of `branchJob` is not one of `node`'s branches
   */
  async resume(node: FlowNodeModel, branchJob, processor: Processor) {
    const job = processor.findBranchParentJob(branchJob, node) as JobModel;

    const { result, status } = job;
    // if parallel has been done (resolved / rejected), do not care newly executed branch jobs.
    if (status !== JOB_STATUS.PENDING) {
      return null;
    }

    // find the index of the node which starts the branch
    const jobNode = processor.nodesMap.get(branchJob.nodeId) as FlowNodeModel;
    const branchStartNode = processor.findBranchStartNode(jobNode, node) as FlowNodeModel;
    const branches = processor.getBranches(node);
    const branchIndex = branches.indexOf(branchStartNode);
    if (branchIndex < 0) {
      // With -1 the slices below would silently build a result array of the
      // wrong length, so fail loudly instead of storing corrupted data.
      throw new Error(
        `parallel node ${node.id}: job of node ${branchJob.nodeId} does not belong to any of its branches`
      );
    }
    const { mode = PARALLEL_MODE.ALL } = node.config ?? {};

    // Copy of `result` with only the slot of this branch replaced.
    const newResult = [...result.slice(0, branchIndex), branchJob.status, ...result.slice(branchIndex + 1)];
    job.set({
      result: newResult,
      status: Modes[mode].getStatus(newResult)
    });

    if (job.status === JOB_STATUS.PENDING) {
      await job.save({ transaction: processor.transaction });
      return processor.end(node, job);
    }

    return job;
  }
};
|