feat(plugin-workflow): add sql node (#2276)

* feat(plugin-workflow): add sql node

* fix(plugin-workflow): fix test cases
This commit is contained in:
Junyi 2023-07-20 10:04:41 +07:00 committed by GitHub
parent d2a9e4acee
commit d587599c8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 240 additions and 8 deletions

View File

@ -23,11 +23,9 @@
"classnames": "^2.3.1",
"cron-parser": "4.4.0",
"lru-cache": "8.0.5",
"react-js-cron": "^3.1.0",
"sequelize": "^6.26.0",
"winston": "^3.8.2"
"react-js-cron": "^3.1.0"
},
"devDependencies": {
"peerDependencies": {
"@ant-design/icons": "^5.1.4",
"@formily/antd-v5": "1.1.0-beta.4",
"@formily/core": "2.2.26",
@ -37,18 +35,21 @@
"@nocobase/database": "0.11.0-alpha.1",
"@nocobase/evaluators": "0.11.0-alpha.1",
"@nocobase/logger": "0.11.0-alpha.1",
"@nocobase/plugin-formula-field": "0.11.0-alpha.1",
"@nocobase/plugin-users": "0.11.0-alpha.1",
"@nocobase/resourcer": "0.11.0-alpha.1",
"@nocobase/server": "0.11.0-alpha.1",
"@nocobase/test": "0.11.0-alpha.1",
"@nocobase/utils": "0.11.0-alpha.1",
"@types/ejs": "^3.1.1",
"antd": "^5.6.4",
"dayjs": "^1.11.8",
"react": "18.x",
"react-i18next": "^11.15.1",
"react-router-dom": "^6.11.2"
"react-router-dom": "^6.11.2",
"sequelize": "^6.26.0",
"winston": "^3.8.2"
},
"devDependencies": {
"@nocobase/test": "0.11.0-alpha.1",
"@types/ejs": "^3.1.1"
},
"gitHead": "ce588eefb0bfc50f7d5bbee575e0b5e843bf6644"
}

View File

@ -241,4 +241,8 @@ export default {
'Dynamic expression': '动态表达式',
'An expression for calculation in each rows': '每行数据计算规则不同时使用',
Unconfigured: '未配置',
'SQL action': 'SQL 操作',
'Execute a SQL statement in database': '在数据库中执行一个 SQL 语句',
'Usage of SQL query result is not supported yet.': 'SQL 执行的结果暂不支持使用。'
};

View File

@ -35,6 +35,7 @@ import parallel from './parallel';
import query from './query';
import request from './request';
import update from './update';
import sql from './sql';
export interface Instruction {
title: string;
@ -71,6 +72,7 @@ instructions.register('destroy', destroy);
instructions.register('aggregate', aggregate);
instructions.register('request', request);
instructions.register('sql', sql);
function useUpdateAction() {
const form = useForm();

View File

@ -0,0 +1,37 @@
import React from 'react';
import { Variable, css } from '@nocobase/client';
import { NAMESPACE } from '../locale';
import { useWorkflowVariableOptions } from '../variable';
export default {
title: `{{t("SQL action", { ns: "${NAMESPACE}" })}}`,
type: 'sql',
group: 'extended',
description: `{{t("Execute a SQL statement in database.", { ns: "${NAMESPACE}" })}}`,
fieldset: {
sql: {
type: 'string',
required: true,
title: 'SQL',
description: `{{t("Usage of SQL query result is not supported yet.", { ns: "${NAMESPACE}" })}}`,
'x-decorator': 'FormItem',
'x-component': 'SQLInput',
'x-component-props': {
rows: 20,
className: css`
font-size: 80%;
font-family: Consolas, Monaco, 'Andale Mono', 'Ubuntu Mono', monospace;
`,
},
},
},
scope: {},
components: {
SQLInput(props) {
const scope = useWorkflowVariableOptions();
return <Variable.RawTextArea scope={scope} {...props} />;
},
},
};

View File

@ -0,0 +1,162 @@
import { Application } from '@nocobase/server';
import Database from '@nocobase/database';
import { getApp, sleep } from '..';
import { EXECUTION_STATUS, JOB_STATUS } from '../../constants';
describe('workflow > instructions > sql', () => {
let app: Application;
let db: Database;
let PostRepo;
let PostCollection;
let ReplyRepo;
let WorkflowModel;
let workflow;
beforeEach(async () => {
app = await getApp();
db = app.db;
WorkflowModel = db.getCollection('workflows').model;
PostCollection = db.getCollection('posts');
PostRepo = PostCollection.repository;
ReplyRepo = db.getCollection('replies').repository;
workflow = await WorkflowModel.create({
title: 'test workflow',
enabled: true,
type: 'collection',
config: {
mode: 1,
collection: 'posts',
},
});
});
afterEach(async () => await app.destroy());
describe('invalid', () => {
it('no sql', async () => {
const n1 = await workflow.createNode({
type: 'sql',
config: {},
});
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const [execution] = await workflow.getExecutions();
const [sqlJob] = await execution.getJobs({ order: [['id', 'ASC']] });
expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
expect(sqlJob.status).toBe(JOB_STATUS.RESOLVED);
});
it('empty sql', async () => {
const n1 = await workflow.createNode({
type: 'sql',
config: {
sql: '',
},
});
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const [execution] = await workflow.getExecutions();
const [sqlJob] = await execution.getJobs({ order: [['id', 'ASC']] });
expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
expect(sqlJob.status).toBe(JOB_STATUS.RESOLVED);
});
it('invalid sql', async () => {
const n1 = await workflow.createNode({
type: 'sql',
config: {
sql: '1',
},
});
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const [execution] = await workflow.getExecutions();
const [sqlJob] = await execution.getJobs({ order: [['id', 'ASC']] });
expect(execution.status).toBe(EXECUTION_STATUS.ERROR);
expect(sqlJob.status).toBe(JOB_STATUS.ERROR);
});
});
describe('sql with variables', () => {
it('update', async () => {
const queryInterface = db.sequelize.getQueryInterface();
const n1 = await workflow.createNode({
type: 'sql',
config: {
sql: `update ${PostCollection.quotedTableName()} set ${queryInterface.quoteIdentifier('read')}={{$context.data.id}} where ${queryInterface.quoteIdentifier('id')}={{$context.data.id}}`,
},
});
const n2 = await workflow.createNode({
type: 'query',
config: {
collection: 'posts',
params: {
filter: {
id: '{{ $context.data.id }}',
}
}
},
upstreamId: n1.id,
});
await n1.setDownstream(n2);
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const [execution] = await workflow.getExecutions();
const [sqlJob, queryJob] = await execution.getJobs({ order: [['id', 'ASC']] });
expect(sqlJob.status).toBe(JOB_STATUS.RESOLVED);
expect(queryJob.status).toBe(JOB_STATUS.RESOLVED);
expect(queryJob.result.read).toBe(post.id);
});
it('delete', async () => {
const queryInterface = db.sequelize.getQueryInterface();
const n1 = await workflow.createNode({
type: 'sql',
config: {
sql: `delete from ${PostCollection.quotedTableName()} where ${queryInterface.quoteIdentifier('id')}={{$context.data.id}};`,
},
});
const n2 = await workflow.createNode({
type: 'query',
config: {
collection: 'posts',
params: {
filter: {
id: '{{ $context.data.id }}',
}
}
},
upstreamId: n1.id,
});
await n1.setDownstream(n2);
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const [execution] = await workflow.getExecutions();
const [sqlJob, queryJob] = await execution.getJobs({ order: [['id', 'ASC']] });
expect(sqlJob.status).toBe(JOB_STATUS.RESOLVED);
expect(queryJob.status).toBe(JOB_STATUS.RESOLVED);
expect(queryJob.result).toBeNull();
});
});
});

View File

@ -46,6 +46,7 @@ export default function <T extends Instruction>(plugin, more: { [key: string]: T
'destroy',
'aggregate',
'request',
'sql',
].reduce(
(result, key) =>
Object.assign(result, {

View File

@ -0,0 +1,25 @@
import { Processor, JOB_STATUS } from '..';
import type { FlowNodeModel } from '../types';
export default {
async run(node: FlowNodeModel, input, processor: Processor) {
const { sequelize } = (<typeof FlowNodeModel>node.constructor).database;
const sql = processor.getParsedValue(node.config.sql ?? '', node).trim();
if (!sql) {
return {
status: JOB_STATUS.RESOLVED,
}
}
const result = await sequelize.query(sql, {
transaction: processor.transaction,
// plain: true,
// model: db.getCollection(node.config.collection).model
});
return {
result,
status: JOB_STATUS.RESOLVED,
};
},
};