diff --git a/packages/plugins/@nocobase/plugin-workflow-sql/src/server/SQLInstruction.ts b/packages/plugins/@nocobase/plugin-workflow-sql/src/server/SQLInstruction.ts index 85dd936d3c..124f08536c 100644 --- a/packages/plugins/@nocobase/plugin-workflow-sql/src/server/SQLInstruction.ts +++ b/packages/plugins/@nocobase/plugin-workflow-sql/src/server/SQLInstruction.ts @@ -24,12 +24,12 @@ export default class extends Instruction { }; } - // @ts-ignore - const [result, meta] = await db.sequelize.query(sql, { - transaction: this.workflow.useDataSourceTransaction(dataSourceName, processor.transaction), - // plain: true, - // model: db.getCollection(node.config.collection).model - }); + const [result = null, meta = null] = + (await db.sequelize.query(sql, { + transaction: this.workflow.useDataSourceTransaction(dataSourceName, processor.transaction), + // plain: true, + // model: db.getCollection(node.config.collection).model + })) ?? []; return { result: node.config.withMeta ? [result, meta] : result, diff --git a/packages/plugins/@nocobase/plugin-workflow-sql/src/server/__tests__/instruction.test.ts b/packages/plugins/@nocobase/plugin-workflow-sql/src/server/__tests__/instruction.test.ts index c069c49ce3..981543330e 100644 --- a/packages/plugins/@nocobase/plugin-workflow-sql/src/server/__tests__/instruction.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow-sql/src/server/__tests__/instruction.test.ts @@ -14,6 +14,8 @@ import { getApp, sleep } from '@nocobase/plugin-workflow-test'; import Plugin from '..'; +const mysql = process.env.DB_DIALECT === 'mysql' ? describe : describe.skip; + describe('workflow > instructions > sql', () => { let app: Application; let db: Database; @@ -288,4 +290,48 @@ describe('workflow > instructions > sql', () => { expect(job.result[0].id).toBe(post.id); }); }); + + describe('dialects', () => { + mysql('mysql', () => { + it('stored procedure with result', async () => { + await db.sequelize.query(`DROP PROCEDURE IF EXISTS hello`); + await db.sequelize.query(`CREATE PROCEDURE hello(IN id INT) BEGIN select id + 1 as a; END;`); + const n1 = await workflow.createNode({ + type: 'sql', + config: { + sql: 'call hello(1)', + }, + }); + + await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [execution] = await workflow.getExecutions(); + const [sqlJob] = await execution.getJobs({ order: [['id', 'ASC']] }); + expect(sqlJob.status).toBe(JOB_STATUS.RESOLVED); + expect(sqlJob.result).toEqual({ a: 2 }); + }); + + it('stored procedure without result', async () => { + await db.sequelize.query(`DROP PROCEDURE IF EXISTS hello`); + await db.sequelize.query(`CREATE PROCEDURE hello(IN id INT) BEGIN declare i int default 0; END;`); + const n1 = await workflow.createNode({ + type: 'sql', + config: { + sql: 'call hello(1)', + }, + }); + + await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [execution] = await workflow.getExecutions(); + const [sqlJob] = await execution.getJobs({ order: [['id', 'ASC']] }); + expect(sqlJob.status).toBe(JOB_STATUS.RESOLVED); + expect(sqlJob.result).toBe(null); + }); + }); + }); }); diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/ScheduleTrigger/DateFieldScheduleTrigger.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/ScheduleTrigger/DateFieldScheduleTrigger.ts index f87c99b3d1..20599abb08 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/ScheduleTrigger/DateFieldScheduleTrigger.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/ScheduleTrigger/DateFieldScheduleTrigger.ts @@ -12,7 +12,7 @@ import parser from 'cron-parser'; import type Plugin from '../../Plugin'; import type { WorkflowModel } from '../../types'; import { parseDateWithoutMs, SCHEDULE_MODE } from './utils'; -import { parseCollectionName } from '@nocobase/data-source-manager'; +import { parseCollectionName, SequelizeCollectionManager } from '@nocobase/data-source-manager'; export type ScheduleOnField = { field: string; @@ -68,10 +68,10 @@ function getDataOptionTime(record, on, dir = 1) { const DialectTimestampFnMap: { [key: string]: (col: string) => string } = { postgres(col) { - return `CAST(FLOOR(extract(epoch from "${col}")) AS INTEGER)`; + return `CAST(FLOOR(extract(epoch from ${col})) AS INTEGER)`; }, mysql(col) { - return `CAST(FLOOR(UNIX_TIMESTAMP(\`${col}\`)) AS SIGNED INTEGER)`; + return `CAST(FLOOR(UNIX_TIMESTAMP(${col})) AS SIGNED INTEGER)`; }, sqlite(col) { return `CAST(FLOOR(unixepoch(${col})) AS INTEGER)`; @@ -160,7 +160,7 @@ export default class ScheduleTrigger { { config: { collection, limit, startsOn, repeat, endsOn }, allExecuted }: WorkflowModel, currentDate: Date, ) { - const { db } = this.workflow.app; + const { dataSourceManager } = this.workflow.app; if (limit && allExecuted >= limit) { return []; } @@ -174,6 +174,14 @@ export default class ScheduleTrigger { return []; } + const [dataSourceName, collectionName] = parseCollectionName(collection); + const { collectionManager } = dataSourceManager.get(dataSourceName); + if (!(collectionManager instanceof SequelizeCollectionManager)) { + return []; + } + const { db } = collectionManager; + const { model } = collectionManager.getCollection(collectionName); + const range = this.cacheCycle * 2; const conditions: any[] = [ @@ -191,9 +199,12 @@ export default class ScheduleTrigger { if (typeof repeat === 'number') { const tsFn = DialectTimestampFnMap[db.options.dialect]; if (repeat > range && tsFn) { + const { field } = model.getAttributes()[startsOn.field]; const modExp = fn( 'MOD', - literal(`${Math.round(timestamp / 1000)} - ${tsFn(startsOn.field)}`), + literal( + `${Math.round(timestamp / 1000)} - ${db.sequelize.getQueryInterface().quoteIdentifiers(tsFn(field))}`, + ), Math.round(repeat / 1000), ); conditions.push(where(modExp, { [Op.lt]: Math.round(range / 1000) })); @@ -230,7 +241,6 @@ export default class ScheduleTrigger { }); } - const { model } = db.getCollection(collection); return model.findAll({ where: { [Op.and]: conditions,