From d587599c8af517946a2ac3d540b5d12c7299fcd6 Mon Sep 17 00:00:00 2001 From: Junyi Date: Thu, 20 Jul 2023 10:04:41 +0700 Subject: [PATCH] feat(plugin-workflow): add sql node (#2276) * feat(plugin-workflow): add sql node * fix(plugin-workflow): fix test cases --- packages/plugins/workflow/package.json | 17 +- .../workflow/src/client/locale/zh-CN.ts | 4 + .../workflow/src/client/nodes/index.tsx | 2 + .../plugins/workflow/src/client/nodes/sql.tsx | 37 ++++ .../server/__tests__/instructions/sql.test.ts | 162 ++++++++++++++++++ .../workflow/src/server/instructions/index.ts | 1 + .../workflow/src/server/instructions/sql.ts | 25 +++ 7 files changed, 240 insertions(+), 8 deletions(-) create mode 100644 packages/plugins/workflow/src/client/nodes/sql.tsx create mode 100644 packages/plugins/workflow/src/server/__tests__/instructions/sql.test.ts create mode 100644 packages/plugins/workflow/src/server/instructions/sql.ts diff --git a/packages/plugins/workflow/package.json b/packages/plugins/workflow/package.json index 190d55a355..cb6c477b9e 100644 --- a/packages/plugins/workflow/package.json +++ b/packages/plugins/workflow/package.json @@ -23,11 +23,9 @@ "classnames": "^2.3.1", "cron-parser": "4.4.0", "lru-cache": "8.0.5", - "react-js-cron": "^3.1.0", - "sequelize": "^6.26.0", - "winston": "^3.8.2" + "react-js-cron": "^3.1.0" }, - "devDependencies": { + "peerDependencies": { "@ant-design/icons": "^5.1.4", "@formily/antd-v5": "1.1.0-beta.4", "@formily/core": "2.2.26", @@ -37,18 +35,21 @@ "@nocobase/database": "0.11.0-alpha.1", "@nocobase/evaluators": "0.11.0-alpha.1", "@nocobase/logger": "0.11.0-alpha.1", - "@nocobase/plugin-formula-field": "0.11.0-alpha.1", "@nocobase/plugin-users": "0.11.0-alpha.1", "@nocobase/resourcer": "0.11.0-alpha.1", "@nocobase/server": "0.11.0-alpha.1", - "@nocobase/test": "0.11.0-alpha.1", "@nocobase/utils": "0.11.0-alpha.1", - "@types/ejs": "^3.1.1", "antd": "^5.6.4", "dayjs": "^1.11.8", "react": "18.x", "react-i18next": "^11.15.1", - "react-router-dom": "^6.11.2" + "react-router-dom": "^6.11.2", + "sequelize": "^6.26.0", + "winston": "^3.8.2" + }, + "devDependencies": { + "@nocobase/test": "0.11.0-alpha.1", + "@types/ejs": "^3.1.1" }, "gitHead": "ce588eefb0bfc50f7d5bbee575e0b5e843bf6644" } diff --git a/packages/plugins/workflow/src/client/locale/zh-CN.ts b/packages/plugins/workflow/src/client/locale/zh-CN.ts index 018643c9e0..a63cdfed2d 100644 --- a/packages/plugins/workflow/src/client/locale/zh-CN.ts +++ b/packages/plugins/workflow/src/client/locale/zh-CN.ts @@ -241,4 +241,8 @@ export default { 'Dynamic expression': '动态表达式', 'An expression for calculation in each rows': '每行数据计算规则不同时使用', Unconfigured: '未配置', + + 'SQL action': 'SQL 操作', + 'Execute a SQL statement in database': '在数据库中执行一个 SQL 语句', + 'Usage of SQL query result is not supported yet.': 'SQL 执行的结果暂不支持使用。' }; diff --git a/packages/plugins/workflow/src/client/nodes/index.tsx b/packages/plugins/workflow/src/client/nodes/index.tsx index 3758b98616..5613f5e233 100644 --- a/packages/plugins/workflow/src/client/nodes/index.tsx +++ b/packages/plugins/workflow/src/client/nodes/index.tsx @@ -35,6 +35,7 @@ import parallel from './parallel'; import query from './query'; import request from './request'; import update from './update'; +import sql from './sql'; export interface Instruction { title: string; @@ -71,6 +72,7 @@ instructions.register('destroy', destroy); instructions.register('aggregate', aggregate); instructions.register('request', request); +instructions.register('sql', sql); function useUpdateAction() { const form = useForm(); diff --git a/packages/plugins/workflow/src/client/nodes/sql.tsx b/packages/plugins/workflow/src/client/nodes/sql.tsx new file mode 100644 index 0000000000..729a74c47f --- /dev/null +++ b/packages/plugins/workflow/src/client/nodes/sql.tsx @@ -0,0 +1,37 @@ +import React from 'react'; + +import { Variable, css } from '@nocobase/client'; + +import { NAMESPACE } from '../locale'; +import { useWorkflowVariableOptions } from '../variable'; + +export default { + title: `{{t("SQL action", { ns: "${NAMESPACE}" })}}`, + type: 'sql', + group: 'extended', + description: `{{t("Execute a SQL statement in database.", { ns: "${NAMESPACE}" })}}`, + fieldset: { + sql: { + type: 'string', + required: true, + title: 'SQL', + description: `{{t("Usage of SQL query result is not supported yet.", { ns: "${NAMESPACE}" })}}`, + 'x-decorator': 'FormItem', + 'x-component': 'SQLInput', + 'x-component-props': { + rows: 20, + className: css` + font-size: 80%; + font-family: Consolas, Monaco, 'Andale Mono', 'Ubuntu Mono', monospace; + `, + }, + }, + }, + scope: {}, + components: { + SQLInput(props) { + const scope = useWorkflowVariableOptions(); + return ; + }, + }, +}; diff --git a/packages/plugins/workflow/src/server/__tests__/instructions/sql.test.ts b/packages/plugins/workflow/src/server/__tests__/instructions/sql.test.ts new file mode 100644 index 0000000000..fa7cbd0bc8 --- /dev/null +++ b/packages/plugins/workflow/src/server/__tests__/instructions/sql.test.ts @@ -0,0 +1,162 @@ +import { Application } from '@nocobase/server'; +import Database from '@nocobase/database'; +import { getApp, sleep } from '..'; +import { EXECUTION_STATUS, JOB_STATUS } from '../../constants'; + +describe('workflow > instructions > sql', () => { + let app: Application; + let db: Database; + let PostRepo; + let PostCollection; + let ReplyRepo; + let WorkflowModel; + let workflow; + + beforeEach(async () => { + app = await getApp(); + + db = app.db; + WorkflowModel = db.getCollection('workflows').model; + PostCollection = db.getCollection('posts'); + PostRepo = PostCollection.repository; + ReplyRepo = db.getCollection('replies').repository; + + workflow = await WorkflowModel.create({ + title: 'test workflow', + enabled: true, + type: 'collection', + config: { + mode: 1, + collection: 'posts', + }, + }); + }); + + afterEach(async () => await app.destroy()); + + describe('invalid', () => { + it('no sql', async () => { + const n1 = await workflow.createNode({ + type: 'sql', + config: {}, + }); + + const post = await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [execution] = await workflow.getExecutions(); + const [sqlJob] = await execution.getJobs({ order: [['id', 'ASC']] }); + expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED); + expect(sqlJob.status).toBe(JOB_STATUS.RESOLVED); + }); + + it('empty sql', async () => { + const n1 = await workflow.createNode({ + type: 'sql', + config: { + sql: '', + }, + }); + + const post = await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [execution] = await workflow.getExecutions(); + const [sqlJob] = await execution.getJobs({ order: [['id', 'ASC']] }); + expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED); + expect(sqlJob.status).toBe(JOB_STATUS.RESOLVED); + }); + + it('invalid sql', async () => { + const n1 = await workflow.createNode({ + type: 'sql', + config: { + sql: '1', + }, + }); + + const post = await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [execution] = await workflow.getExecutions(); + const [sqlJob] = await execution.getJobs({ order: [['id', 'ASC']] }); + expect(execution.status).toBe(EXECUTION_STATUS.ERROR); + expect(sqlJob.status).toBe(JOB_STATUS.ERROR); + }); + }); + + describe('sql with variables', () => { + it('update', async () => { + const queryInterface = db.sequelize.getQueryInterface(); + const n1 = await workflow.createNode({ + type: 'sql', + config: { + sql: `update ${PostCollection.quotedTableName()} set ${queryInterface.quoteIdentifier('read')}={{$context.data.id}} where ${queryInterface.quoteIdentifier('id')}={{$context.data.id}}`, + }, + }); + + const n2 = await workflow.createNode({ + type: 'query', + config: { + collection: 'posts', + params: { + filter: { + id: '{{ $context.data.id }}', + } + } + }, + upstreamId: n1.id, + }); + + await n1.setDownstream(n2); + + const post = await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [execution] = await workflow.getExecutions(); + const [sqlJob, queryJob] = await execution.getJobs({ order: [['id', 'ASC']] }); + expect(sqlJob.status).toBe(JOB_STATUS.RESOLVED); + expect(queryJob.status).toBe(JOB_STATUS.RESOLVED); + expect(queryJob.result.read).toBe(post.id); + }); + + it('delete', async () => { + const queryInterface = db.sequelize.getQueryInterface(); + const n1 = await workflow.createNode({ + type: 'sql', + config: { + sql: `delete from ${PostCollection.quotedTableName()} where ${queryInterface.quoteIdentifier('id')}={{$context.data.id}};`, + }, + }); + + const n2 = await workflow.createNode({ + type: 'query', + config: { + collection: 'posts', + params: { + filter: { + id: '{{ $context.data.id }}', + } + } + }, + upstreamId: n1.id, + }); + + await n1.setDownstream(n2); + + const post = await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [execution] = await workflow.getExecutions(); + const [sqlJob, queryJob] = await execution.getJobs({ order: [['id', 'ASC']] }); + expect(sqlJob.status).toBe(JOB_STATUS.RESOLVED); + expect(queryJob.status).toBe(JOB_STATUS.RESOLVED); + expect(queryJob.result).toBeNull(); + }); + }); +}); diff --git a/packages/plugins/workflow/src/server/instructions/index.ts b/packages/plugins/workflow/src/server/instructions/index.ts index ec9c24e836..c1add7a64e 100644 --- a/packages/plugins/workflow/src/server/instructions/index.ts +++ b/packages/plugins/workflow/src/server/instructions/index.ts @@ -46,6 +46,7 @@ export default function (plugin, more: { [key: string]: T 'destroy', 'aggregate', 'request', + 'sql', ].reduce( (result, key) => Object.assign(result, { diff --git a/packages/plugins/workflow/src/server/instructions/sql.ts b/packages/plugins/workflow/src/server/instructions/sql.ts new file mode 100644 index 0000000000..55b67702b8 --- /dev/null +++ b/packages/plugins/workflow/src/server/instructions/sql.ts @@ -0,0 +1,25 @@ +import { Processor, JOB_STATUS } from '..'; +import type { FlowNodeModel } from '../types'; + +export default { + async run(node: FlowNodeModel, input, processor: Processor) { + const { sequelize } = (node.constructor).database; + const sql = processor.getParsedValue(node.config.sql ?? '', node).trim(); + if (!sql) { + return { + status: JOB_STATUS.RESOLVED, + } + } + + const result = await sequelize.query(sql, { + transaction: processor.transaction, + // plain: true, + // model: db.getCollection(node.config.collection).model + }); + + return { + result, + status: JOB_STATUS.RESOLVED, + }; + }, +};