Merge branch 'main' into next

This commit is contained in:
GitHub Actions Bot 2024-10-10 06:17:52 +00:00
commit 2672789a64
3 changed files with 68 additions and 12 deletions

View File

@ -24,12 +24,12 @@ export default class extends Instruction {
};
}
// @ts-ignore
const [result, meta] = await db.sequelize.query(sql, {
transaction: this.workflow.useDataSourceTransaction(dataSourceName, processor.transaction),
// plain: true,
// model: db.getCollection(node.config.collection).model
});
const [result = null, meta = null] =
(await db.sequelize.query(sql, {
transaction: this.workflow.useDataSourceTransaction(dataSourceName, processor.transaction),
// plain: true,
// model: db.getCollection(node.config.collection).model
})) ?? [];
return {
result: node.config.withMeta ? [result, meta] : result,

View File

@ -14,6 +14,8 @@ import { getApp, sleep } from '@nocobase/plugin-workflow-test';
import Plugin from '..';
const mysql = process.env.DB_DIALECT === 'mysql' ? describe : describe.skip;
describe('workflow > instructions > sql', () => {
let app: Application;
let db: Database;
@ -288,4 +290,48 @@ describe('workflow > instructions > sql', () => {
expect(job.result[0].id).toBe(post.id);
});
});
describe('dialects', () => {
mysql('mysql', () => {
it('stored procedure with result', async () => {
await db.sequelize.query(`DROP PROCEDURE IF EXISTS hello`);
await db.sequelize.query(`CREATE PROCEDURE hello(IN id INT) BEGIN select id + 1 as a; END;`);
const n1 = await workflow.createNode({
type: 'sql',
config: {
sql: 'call hello(1)',
},
});
await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const [execution] = await workflow.getExecutions();
const [sqlJob] = await execution.getJobs({ order: [['id', 'ASC']] });
expect(sqlJob.status).toBe(JOB_STATUS.RESOLVED);
expect(sqlJob.result).toEqual({ a: 2 });
});
it('stored procedure without result', async () => {
await db.sequelize.query(`DROP PROCEDURE IF EXISTS hello`);
await db.sequelize.query(`CREATE PROCEDURE hello(IN id INT) BEGIN declare i int default 0; END;`);
const n1 = await workflow.createNode({
type: 'sql',
config: {
sql: 'call hello(1)',
},
});
await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const [execution] = await workflow.getExecutions();
const [sqlJob] = await execution.getJobs({ order: [['id', 'ASC']] });
expect(sqlJob.status).toBe(JOB_STATUS.RESOLVED);
expect(sqlJob.result).toBe(null);
});
});
});
});

View File

@ -12,7 +12,7 @@ import parser from 'cron-parser';
import type Plugin from '../../Plugin';
import type { WorkflowModel } from '../../types';
import { parseDateWithoutMs, SCHEDULE_MODE } from './utils';
import { parseCollectionName } from '@nocobase/data-source-manager';
import { parseCollectionName, SequelizeCollectionManager } from '@nocobase/data-source-manager';
export type ScheduleOnField = {
field: string;
@ -68,10 +68,10 @@ function getDataOptionTime(record, on, dir = 1) {
const DialectTimestampFnMap: { [key: string]: (col: string) => string } = {
postgres(col) {
return `CAST(FLOOR(extract(epoch from "${col}")) AS INTEGER)`;
return `CAST(FLOOR(extract(epoch from ${col})) AS INTEGER)`;
},
mysql(col) {
return `CAST(FLOOR(UNIX_TIMESTAMP(\`${col}\`)) AS SIGNED INTEGER)`;
return `CAST(FLOOR(UNIX_TIMESTAMP(${col})) AS SIGNED INTEGER)`;
},
sqlite(col) {
return `CAST(FLOOR(unixepoch(${col})) AS INTEGER)`;
@ -160,7 +160,7 @@ export default class ScheduleTrigger {
{ config: { collection, limit, startsOn, repeat, endsOn }, allExecuted }: WorkflowModel,
currentDate: Date,
) {
const { db } = this.workflow.app;
const { dataSourceManager } = this.workflow.app;
if (limit && allExecuted >= limit) {
return [];
}
@ -174,6 +174,14 @@ export default class ScheduleTrigger {
return [];
}
const [dataSourceName, collectionName] = parseCollectionName(collection);
const { collectionManager } = dataSourceManager.get(dataSourceName);
if (!(collectionManager instanceof SequelizeCollectionManager)) {
return [];
}
const { db } = collectionManager;
const { model } = collectionManager.getCollection(collectionName);
const range = this.cacheCycle * 2;
const conditions: any[] = [
@ -191,9 +199,12 @@ export default class ScheduleTrigger {
if (typeof repeat === 'number') {
const tsFn = DialectTimestampFnMap[db.options.dialect];
if (repeat > range && tsFn) {
const { field } = model.getAttributes()[startsOn.field];
const modExp = fn(
'MOD',
literal(`${Math.round(timestamp / 1000)} - ${tsFn(startsOn.field)}`),
literal(
`${Math.round(timestamp / 1000)} - ${db.sequelize.getQueryInterface().quoteIdentifiers(tsFn(field))}`,
),
Math.round(repeat / 1000),
);
conditions.push(where(modExp, { [Op.lt]: Math.round(range / 1000) }));
@ -230,7 +241,6 @@ export default class ScheduleTrigger {
});
}
const { model } = db.getCollection(collection);
return model.findAll({
where: {
[Op.and]: conditions,