feat(plugin-workflow): add delay node type (#532)

* feat(plugin-workflow): add delay node type

* fix(plugin-workflow): fix test and add cases

* fix(plugin-workflow): fix processor.saveJob()
This commit is contained in:
Junyi 2022-06-24 23:28:49 +08:00 committed by GitHub
parent 3dce31f6a1
commit ef939b4277
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 401 additions and 61 deletions

View File

@ -553,6 +553,13 @@ export default {
'Continue after all branches succeeded': '全部分支都成功后才能继续',
'Continue after any branch succeeded': '任意分支成功后就继续',
'Delay': '延时',
'Duration': '时长',
'End Status': '到时状态',
'Select status': '选择状态',
'Succeed and continue': '通过并继续',
'Fail and exit': '失败并退出',
'Create record': '新增数据',
'Update record': '更新数据',
'Query record': '查询数据',

View File

@ -270,14 +270,6 @@ export function Operand({
display: flex;
gap: .5em;
align-items: center;
.ant-select,
.ant-cascader-picker,
.ant-picker,
.ant-input-number,
.ant-input-affix-wrapper{
width: auto;
}
`}>
<Cascader
allowClear={false}

View File

@ -0,0 +1,36 @@
import React from "react";
import { InputNumber, Select } from "antd";
import { useTranslation } from "react-i18next";
import { css } from "@emotion/css";
const UnitOptions = [
{ value: 1_000, label: 'Seconds' },
{ value: 60_000, label: 'Minutes' },
{ value: 3600_000, label: 'Hours' },
{ value: 86400_000, label: 'Days' },
{ value: 604800_000, label: 'Weeks' },
];
function getNumberOption(v) {
return UnitOptions.slice().reverse().find(item => !(v % item.value));
}
export default function ({ value = 60000, onChange }) {
const { t } = useTranslation();
const option = getNumberOption(value);
const quantity = Math.round(value / option.value);
return (
<fieldset className={css`
display: flex;
gap: .5em;
`}>
<InputNumber min={1} value={quantity} onChange={(v) => onChange(Math.round(v * option.value))}/>
<Select value={option.value} onChange={unit => onChange(Math.round(quantity * unit))}>
{UnitOptions.map(item => (
<Select.Option key={item.value} value={item.value}>{t(item.label)}</Select.Option>
))}
</Select>
</fieldset>
);
}

View File

@ -0,0 +1,38 @@
import Duration from "../components/Duration";
export default {
title: '{{t("Delay")}}',
type: 'delay',
group: 'control',
fieldset: {
'config.duration': {
type: 'number',
name: 'config.duration',
title: '{{t("Duration")}}',
'x-decorator': 'FormItem',
'x-component': 'Duration',
},
'config.endStatus': {
type: 'number',
name: 'config.endStatus',
title: '{{t("End Status")}}',
'x-decorator': 'FormItem',
'x-component': 'Select',
'x-component-props': {
placeholder: '{{t("Select status")}}',
},
enum: [
{ label: '{{t("Succeed and continue")}}', value: 1 },
{ label: '{{t("Fail and exit")}}', value: -1 },
]
}
},
view: {
},
scope: {
},
components: {
Duration
}
};

View File

@ -11,11 +11,13 @@ import { AddButton, useFlowContext } from '../WorkflowCanvas';
import calculation from './calculation';
import condition from './condition';
import create from './create';
import destroy from './destroy';
import parallel from './parallel';
import delay from './delay';
import query from './query';
import create from './create';
import update from './update';
import destroy from './destroy';
export interface Instruction {
title: string;
@ -33,13 +35,15 @@ export interface Instruction {
export const instructions = new Registry<Instruction>();
instructions.register('condition', condition);
instructions.register('parallel', parallel);
instructions.register('calculation', calculation);
instructions.register('delay', delay);
instructions.register('query', query);
instructions.register('create', create);
instructions.register('update', update);
instructions.register('destroy', destroy);
instructions.register('condition', condition);
instructions.register('parallel', parallel);
instructions.register('calculation', calculation);
function useUpdateAction() {
const { t } = useTranslation();
@ -220,7 +224,11 @@ export function NodeDefaultView(props) {
'x-component-props': {
disabled: workflow.executed,
className: css`
.ant-select{
.ant-select,
.ant-cascader-picker,
.ant-picker,
.ant-input-number,
.ant-input-affix-wrapper{
width: auto;
}
`

View File

@ -94,8 +94,17 @@ export default class WorkflowPlugin extends Plugin {
this.toggle(workflow);
});
});
// [Life Cycle]: initialize all necessary seed data
// this.app.on('db.init', async () => {});
this.app.on('beforeStop', async () => {
const collection = db.getCollection('workflows');
const workflows = await collection.repository.find({
filter: { enabled: true },
});
workflows.forEach((workflow: WorkflowModel) => {
this.toggle(workflow, false);
});
});
}
toggle(workflow: WorkflowModel, enable?: boolean) {

View File

@ -148,7 +148,7 @@ export default class Processor {
let job;
try {
// call instruction to get result and status
job = await instruction.call(node, prevJob, this);
job = await instruction(node, prevJob, this);
if (!job) {
return null;
}
@ -233,13 +233,23 @@ export default class Processor {
async saveJob(payload) {
const { database } = <typeof ExecutionModel>this.execution.constructor;
const { model } = database.getCollection('jobs');
const [job] = (await model.upsert(
{
let job;
if (payload instanceof model) {
job = await payload.save({ transaction: this.transaction });
} else if (payload.id) {
[job] = await model.update(payload, {
where: { id: payload.id },
returning: true,
transaction: this.transaction
});
} else {
job = await model.create({
...payload,
executionId: this.execution.id,
},
{ transaction: this.transaction },
)) as unknown as [JobModel, boolean | null];
}, {
transaction: this.transaction
});
}
this.jobsMap.set(job.id, job);
this.jobsMapByNodeId[job.nodeId] = job.result;

View File

@ -4,6 +4,7 @@ import { MockServer, mockServer } from '@nocobase/test';
import Plugin from '..';
import { JOB_STATUS } from '../constants';
import calculators from '../calculators';
import { ApplicationOptions } from '@nocobase/server';
export function sleep(ms: number) {
return new Promise(resolve => {
@ -11,13 +12,17 @@ export function sleep(ms: number) {
});
}
export async function getApp(options = {}): Promise<MockServer> {
interface MockAppOptions extends ApplicationOptions {
manual?: boolean;
}
export async function getApp({ manual, ...options }: MockAppOptions = {}): Promise<MockServer> {
const app = mockServer(options);
app.plugin(Plugin, {
instructions: {
echo: {
run({ result }, execution) {
run(node, { result }, execution) {
return {
status: JOB_STATUS.RESOLVED,
result
@ -26,18 +31,18 @@ export async function getApp(options = {}): Promise<MockServer> {
},
error: {
run(input, execution) {
run(node, input, execution) {
throw new Error('definite error');
}
},
'prompt->error': {
run(this, input, execution) {
run(node, input, execution) {
return {
status: JOB_STATUS.PENDING
};
},
resume(this, input, execution) {
resume(node, input, execution) {
throw new Error('input failed');
}
}
@ -60,7 +65,9 @@ export async function getApp(options = {}): Promise<MockServer> {
console.error(error);
}
await app.start();
if (!manual) {
await app.start();
}
return app;
}

View File

@ -0,0 +1,135 @@
import { Application } from '@nocobase/server';
import Database from '@nocobase/database';
import { getApp, sleep } from '..';
import { EXECUTION_STATUS, JOB_STATUS } from '../../constants';
describe('workflow > instructions > delay', () => {
let app: Application;
let db: Database;
let PostRepo;
let WorkflowModel;
let workflow;
beforeEach(async () => {
app = await getApp();
db = app.db;
WorkflowModel = db.getCollection('workflows').model;
PostRepo = db.getCollection('posts').repository;
workflow = await WorkflowModel.create({
enabled: true,
type: 'collection',
config: {
mode: 1,
collection: 'posts'
}
});
});
afterEach(() => app.stop());
describe('runtime', () => {
it('delay to resolved', async () => {
const n1 = await workflow.createNode({
type: 'delay',
config: {
duration: 1000,
endStatus: JOB_STATUS.RESOLVED
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
const [e1] = await workflow.getExecutions();
expect(e1.status).toEqual(EXECUTION_STATUS.STARTED);
const [j1] = await e1.getJobs();
expect(j1.status).toBe(JOB_STATUS.PENDING);
await sleep(2000);
const [e2] = await workflow.getExecutions();
expect(e2.status).toEqual(EXECUTION_STATUS.RESOLVED);
const [j2] = await e2.getJobs();
expect(j2.status).toBe(JOB_STATUS.RESOLVED);
});
it('delay to reject', async () => {
const n1 = await workflow.createNode({
type: 'delay',
config: {
duration: 1000,
endStatus: JOB_STATUS.REJECTED
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
const [e1] = await workflow.getExecutions();
expect(e1.status).toEqual(EXECUTION_STATUS.STARTED);
const [j1] = await e1.getJobs();
expect(j1.status).toBe(JOB_STATUS.PENDING);
await sleep(2000);
const [e2] = await workflow.getExecutions();
expect(e2.status).toEqual(EXECUTION_STATUS.REJECTED);
const [j2] = await e2.getJobs();
expect(j2.status).toBe(JOB_STATUS.REJECTED);
});
});
describe('app lifecycle', () => {
beforeEach(async () => {
await workflow.createNode({
type: 'delay',
config: {
duration: 2000,
endStatus: JOB_STATUS.RESOLVED
}
});
});
it('restart app should trigger delayed job', async () => {
const post = await PostRepo.create({ values: { title: 't1' } });
const [e1] = await workflow.getExecutions();
expect(e1.status).toEqual(EXECUTION_STATUS.STARTED);
const [j1] = await e1.getJobs();
expect(j1.status).toBe(JOB_STATUS.PENDING);
await app.stop();
await sleep(500);
await app.start();
await sleep(2000);
const [e2] = await workflow.getExecutions();
expect(e2.status).toEqual(EXECUTION_STATUS.RESOLVED);
const [j2] = await e2.getJobs();
expect(j2.status).toBe(JOB_STATUS.RESOLVED);
});
it('restart app should trigger missed delayed job', async () => {
const post = await PostRepo.create({ values: { title: 't1' } });
const [e1] = await workflow.getExecutions();
expect(e1.status).toEqual(EXECUTION_STATUS.STARTED);
const [j1] = await e1.getJobs();
expect(j1.status).toBe(JOB_STATUS.PENDING);
await app.stop();
await sleep(2000);
await app.start();
await sleep(500);
const [e2] = await workflow.getExecutions();
expect(e2.status).toEqual(EXECUTION_STATUS.RESOLVED);
const [j2] = await e2.getJobs();
expect(j2.status).toBe(JOB_STATUS.RESOLVED);
});
});
});

View File

@ -30,8 +30,8 @@ import { calculate } from "../calculators";
// }
export default {
async run(this: FlowNodeModel, prevJob, processor) {
const { calculation } = this.config || {};
async run(node: FlowNodeModel, prevJob, processor) {
const { calculation } = node.config || {};
const result = calculation
? calculate({

View File

@ -73,10 +73,10 @@ function logicCalculate(calculation, input, processor) {
export default {
async run(this, prevJob, processor) {
async run(node, prevJob, processor) {
// TODO(optimize): loading of jobs could be reduced and turned into incrementally in processor
// const jobs = await processor.getJobs();
const { calculation, rejectOnFalse } = this.config || {};
const { calculation, rejectOnFalse } = node.config || {};
const result = logicCalculate(calculation, prevJob, processor);
if (!result && rejectOnFalse) {
@ -90,12 +90,12 @@ export default {
status: JOB_STATUS.RESOLVED,
result,
// TODO(optimize): try unify the building of job
nodeId: this.id,
nodeId: node.id,
upstreamId: prevJob && prevJob.id || null
};
const branchNode = processor.nodes
.find(item => item.upstream === this && Boolean(item.branchIndex) === result);
.find(item => item.upstream === node && Boolean(item.branchIndex) === result);
if (!branchNode) {
return job;
@ -106,13 +106,13 @@ export default {
return processor.run(branchNode, savedJob);
},
async resume(this, branchJob, processor) {
async resume(node, branchJob, processor) {
if (branchJob.status === JOB_STATUS.RESOLVED) {
// return to continue this.downstream
// return to continue node.downstream
return branchJob;
}
// pass control to upper scope by ending current scope
return processor.end(this, branchJob);
return processor.end(node, branchJob);
}
};

View File

@ -2,13 +2,13 @@ import { JOB_STATUS } from "../constants";
import FlowNodeModel from "../models/FlowNode";
export default {
async run(this: FlowNodeModel, input, processor) {
async run(node: FlowNodeModel, input, processor) {
const {
collection,
params = {}
} = this.config;
} = node.config;
const repo = (<typeof FlowNodeModel>this.constructor).database.getRepository(collection);
const repo = (<typeof FlowNodeModel>node.constructor).database.getRepository(collection);
const options = processor.getParsedValue(params);
const result = await repo.create({
...options,

View File

@ -0,0 +1,92 @@
import Plugin from '..';
import { JOB_STATUS } from "../constants";
import ExecutionModel from '../models/Execution';
import JobModel from '../models/Job';
import Processor from '../Processor';
type ValueOf<T> = T[keyof T];
interface DelayConfig {
endStatus: ValueOf<typeof JOB_STATUS>;
duration: number;
}
export default class {
timers: Map<number, NodeJS.Timeout> = new Map();
constructor(protected plugin: Plugin) {
plugin.app.on('beforeStart', () => this.load());
plugin.app.on('beforeStop', () => this.unload())
}
async load() {
const { model } = this.plugin.db.getCollection('jobs');
const jobs = await model.findAll({
where: {
status: JOB_STATUS.PENDING
},
include: [
{
association: 'execution'
},
{
association: 'node',
where: {
type: 'delay'
},
required: true
}
]
}) as JobModel[];
jobs.forEach(job => {
this.schedule(job, job.node.config.duration);
});
}
unload() {
for (const timer of this.timers.values()) {
clearTimeout(timer);
}
this.timers = new Map();
}
schedule(job, duration: number) {
const now = new Date();
const createdAt = Date.parse(job.createdAt);
const delay = createdAt + duration - now.getTime();
const trigger = this.trigger.bind(this, job);
this.timers.set(job.id, setTimeout(trigger, Math.max(0, delay)));
}
async trigger(job) {
const { execution = await job.getExecution() as ExecutionModel } = job;
const processor = this.plugin.createProcessor(execution);
await processor.resume(job);
if (this.timers.get(job.id)) {
this.timers.delete(job.id);
}
}
run = async (node, prevJob, processor: Processor) => {
const job = await processor.saveJob({
status: JOB_STATUS.PENDING,
result: null,
nodeId: node.id,
upstreamId: prevJob?.id ?? null
});
const { duration } = node.config as DelayConfig;
// add to schedule
this.schedule(job, duration);
return processor.end(node, job);
};
resume = async (node, prevJob, processor: Processor) => {
const { endStatus } = node.config as DelayConfig;
prevJob.set('status', endStatus);
return prevJob;
};
}

View File

@ -2,13 +2,13 @@ import { JOB_STATUS } from "../constants";
import FlowNodeModel from "../models/FlowNode";
export default {
async run(this: FlowNodeModel, input, processor) {
async run(node: FlowNodeModel, input, processor) {
const {
collection,
params = {}
} = this.config;
} = node.config;
const repo = (<typeof FlowNodeModel>this.constructor).database.getRepository(collection);
const repo = (<typeof FlowNodeModel>node.constructor).database.getRepository(collection);
const options = processor.getParsedValue(params);
const result = await repo.destroy({
...options,

View File

@ -1,3 +1,7 @@
import path from 'path';
import { requireModule } from '@nocobase/utils';
import FlowNodeModel from '../models/FlowNode';
import Plugin from '..';
@ -24,7 +28,7 @@ export type InstructionResult = Job | Promise<Job>;
// - base on input and context, do any calculations or system call (io), and produce a result or pending.
export interface Instruction {
run(
this: FlowNodeModel,
node: FlowNodeModel,
// what should input to be?
// - just use previously output result for convenience?
input: any,
@ -35,7 +39,7 @@ export interface Instruction {
// for start node in main flow (or branch) to resume when manual sub branch triggered
resume?(
this: FlowNodeModel,
node: FlowNodeModel,
input: any,
processor: Processor
): InstructionResult
@ -56,6 +60,8 @@ export default function<T extends Instruction>(
instructions.register('update', update);
instructions.register('destroy', destroy);
instructions.register('delay', new (requireModule(path.join(__dirname, 'delay')))(plugin));
for (const [name, instruction] of Object.entries(more)) {
instructions.register(name, typeof instruction === 'function' ? new instruction(plugin) : instruction);
}

View File

@ -40,15 +40,15 @@ const StatusGetters = {
};
export default {
async run(this: FlowNodeModel, prevJob: JobModel, processor: Processor) {
async run(node: FlowNodeModel, prevJob: JobModel, processor: Processor) {
const branches = processor.nodes
.filter(item => item.upstream === this && item.branchIndex !== null)
.filter(item => item.upstream === node && item.branchIndex !== null)
.sort((a, b) => a.branchIndex - b.branchIndex);
const job = await processor.saveJob({
status: JOB_STATUS.PENDING,
result: Array(branches.length).fill(null),
nodeId: this.id,
nodeId: node.id,
upstreamId: prevJob?.id ?? null
});
@ -59,11 +59,11 @@ export default {
// another benifit of this is, it could handle sequenced branches in future.
await branches.reduce((promise: Promise<any>, branch) => promise.then(() => processor.run(branch, job)), Promise.resolve());
return processor.end(this, job);
return processor.end(node, job);
},
async resume(this, branchJob, processor: Processor) {
const job = processor.findBranchParentJob(branchJob, this);
async resume(node: FlowNodeModel, branchJob, processor: Processor) {
const job = processor.findBranchParentJob(branchJob, node);
const { result, status } = job;
// if parallel has been done (resolved / rejected), do not care newly executed branch jobs.
@ -74,7 +74,7 @@ export default {
// find the index of the node which start the branch
const jobNode = processor.nodesMap.get(branchJob.nodeId);
const { branchIndex } = processor.findBranchStartNode(jobNode);
const { mode = PARALLEL_MODE.ALL } = this.config || {};
const { mode = PARALLEL_MODE.ALL } = node.config || {};
const newResult = [...result.slice(0, branchIndex), branchJob.get(), ...result.slice(branchIndex + 1)];
job.set({
@ -84,7 +84,7 @@ export default {
if (job.status === JOB_STATUS.PENDING) {
await job.save({ transaction: processor.transaction });
return processor.end(this, job);
return processor.end(node, job);
}
return job;

View File

@ -1,13 +1,13 @@
import { JOB_STATUS } from "../constants";
export default {
run(this, input, processor) {
run(node, input, processor) {
return {
status: JOB_STATUS.PENDING
};
},
resume(this, job, processor) {
resume(node, job, processor) {
job.set('status', JOB_STATUS.RESOLVED);
return job;
}

View File

@ -3,14 +3,14 @@ import Processor from "../Processor";
import { JOB_STATUS } from "../constants";
export default {
async run(this: FlowNodeModel, input, processor: Processor) {
async run(node: FlowNodeModel, input, processor: Processor) {
const {
collection,
multiple,
params = {}
} = this.config;
} = node.config;
const repo = (<typeof FlowNodeModel>this.constructor).database.getRepository(collection);
const repo = (<typeof FlowNodeModel>node.constructor).database.getRepository(collection);
const options = processor.getParsedValue(params);
const result = await (multiple ? repo.find : repo.findOne).call(repo, {
...options,

View File

@ -3,14 +3,14 @@ import Processor from "../Processor";
import { JOB_STATUS } from "../constants";
export default {
async run(this: FlowNodeModel, input, processor: Processor) {
async run(node: FlowNodeModel, input, processor: Processor) {
const {
collection,
multiple = false,
params = {}
} = this.config;
} = node.config;
const repo = (<typeof FlowNodeModel>this.constructor).database.getRepository(collection);
const repo = (<typeof FlowNodeModel>node.constructor).database.getRepository(collection);
const options = processor.getParsedValue(params);
const result = await repo.update({
...options,