refactor(plugin-workflow): refactor schedule trigger implementation (#3562)

* refactor(plugin-workflow): refactor schedule trigger implementation

* fix(plugin-workflow): fix test case

* fix(plugin-workflow): fix cleanup on app stop
This commit is contained in:
Junyi 2024-02-25 22:36:20 +08:00 committed by GitHub
parent e6719763f6
commit 88b281277c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 1204 additions and 946 deletions

View File

@ -20,6 +20,7 @@ export function EndsByField({ value, onChange }) {
onChange={(t) => {
onChange(t ? (t === 'field' ? {} : new Date()) : null);
}}
className="auto-width"
>
<Select.Option value={null}>{t('No end')}</Select.Option>
<Select.Option value={'field'}>{t('By field')}</Select.Option>

View File

@ -26,6 +26,7 @@ export function OnField({ value, onChange }) {
onChange={(field) => onChange({ ...value, field })}
filter={dateFieldFilter}
placeholder={t('Select field')}
className="auto-width"
/>
{value.field ? (
<Select
@ -39,6 +40,7 @@ export function OnField({ value, onChange }) {
{ value: -1, label: t('Before') },
{ value: 1, label: t('After') },
]}
className="auto-width"
/>
) : null}
{dir ? (
@ -56,6 +58,7 @@ export function OnField({ value, onChange }) {
{ value: 60000, label: lang('Minutes') },
{ value: 1000, label: lang('Seconds') },
]}
className="auto-width"
/>
</>
) : null}

View File

@ -65,7 +65,7 @@ const ModeFieldsets = {
},
},
},
[SCHEDULE_MODE.COLLECTION_FIELD]: {
[SCHEDULE_MODE.DATE_FIELD]: {
collection: {
...collection,
'x-reactions': [
@ -145,7 +145,7 @@ const ModeFieldsets = {
dependencies: ['mode', 'collection'],
fulfill: {
state: {
visible: `{{$deps[0] === ${SCHEDULE_MODE.COLLECTION_FIELD} && $deps[1]}}`,
visible: `{{$deps[0] === ${SCHEDULE_MODE.DATE_FIELD} && $deps[1]}}`,
},
},
},
@ -157,7 +157,7 @@ const ModeFieldsets = {
const scheduleModeOptions = [
{ value: SCHEDULE_MODE.STATIC, label: `{{t("Based on certain date", { ns: "${NAMESPACE}" })}}` },
{
value: SCHEDULE_MODE.COLLECTION_FIELD,
value: SCHEDULE_MODE.DATE_FIELD,
label: `{{t("Based on date field of collection", { ns: "${NAMESPACE}" })}}`,
},
];

View File

@ -1,4 +1,4 @@
export const SCHEDULE_MODE = {
STATIC: 0,
COLLECTION_FIELD: 1,
DATE_FIELD: 1,
};

View File

@ -38,7 +38,7 @@ export default class extends Trigger {
// ? config.appends.reduce((max, item) => Math.max(max, item.split('.').length), 1) + 1
// : 1;
if (config.mode === SCHEDULE_MODE.COLLECTION_FIELD) {
if (config.mode === SCHEDULE_MODE.DATE_FIELD) {
const [fieldOption] = getCollectionFieldOptions({
// depth,
appends: ['data', ...(config.appends?.map((item) => `data.${item}`) || [])],

View File

@ -1,369 +0,0 @@
import { Application } from '@nocobase/server';
import Database from '@nocobase/database';
import { getApp, sleep } from '@nocobase/plugin-workflow-test';
describe.skip('workflow > triggers > schedule', () => {
let app: Application;
let db: Database;
let PostRepo;
let CategoryRepo;
let WorkflowModel;
let WorkflowRepo;
beforeEach(async () => {
app = await getApp();
db = app.db;
const workflow = db.getCollection('workflows');
WorkflowModel = workflow.model;
WorkflowRepo = workflow.repository;
PostRepo = db.getCollection('posts').repository;
CategoryRepo = db.getCollection('categories').repository;
});
afterEach(() => app.destroy());
describe('constant mode', () => {
it('neither startsOn nor repeat configurated', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 0,
},
});
await sleep(3000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(0);
});
it('on every 2 seconds', async () => {
const now = new Date();
// NOTE: align to even(0, 2, ...) + 0.5 seconds to start
await sleep((2.5 - (now.getSeconds() % 2)) * 1000 - now.getMilliseconds());
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn: now.toISOString(),
repeat: '*/2 * * * * *',
},
});
await sleep(4000);
// sleep 1.5s at 2s trigger 1st time
// sleep 3.5s at 4s trigger 2nd time
const executions = await workflow.getExecutions();
expect(executions.length).toBe(2);
});
it('on every even seconds and limit 1', async () => {
const now = new Date();
// NOTE: align to even(0, 2, ...) + 0.5 seconds to start
await sleep((2.5 - (now.getSeconds() % 2)) * 1000 - now.getMilliseconds());
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn: now.toISOString(),
repeat: '*/2 * * * * *',
limit: 1,
},
});
await sleep(5000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(1);
});
it('on every 2 seconds after created and limit 1', async () => {
const now = new Date();
// NOTE: align to even(0, 2, ...) + 0.5 seconds to start
await sleep((2.5 - (now.getSeconds() % 2)) * 1000 - now.getMilliseconds());
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn: now.toISOString(),
repeat: 2000,
limit: 1,
},
});
await sleep(5000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(1);
});
it('on certain second', async () => {
const now = new Date();
const startsOn = now.toISOString();
now.setSeconds(now.getSeconds() + 3);
now.setMilliseconds(0);
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn,
repeat: `${now.getSeconds()} * * * * *`,
},
});
await sleep(5000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(1);
expect(executions[0].context.date).toBe(now.toISOString());
});
it('multiple workflows trigger at same time', async () => {
const now = new Date();
const startsOn = now.toISOString();
now.setSeconds(now.getSeconds() + 2);
now.setMilliseconds(0);
let w1, w2;
await db.sequelize.transaction(async (transaction) => {
w1 = await WorkflowRepo.create({
values: {
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn,
repeat: `${now.getSeconds()} * * * * *`,
},
},
transaction,
});
});
await db.sequelize.transaction(async (transaction) => {
w2 = await WorkflowRepo.create({
values: {
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn,
repeat: `${now.getSeconds()} * * * * *`,
},
},
transaction,
});
});
await sleep(3000);
await WorkflowModel.update({ enabled: false }, { where: { enabled: true } });
const [e1] = await w1.getExecutions();
expect(e1).toBeDefined();
expect(e1.context.date).toBe(now.toISOString());
const [e2] = await w2.getExecutions();
expect(e2).toBeDefined();
expect(e2.context.date).toBe(now.toISOString());
});
});
describe('collection field mode', () => {
it('starts on post.createdAt with offset', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
offset: 2,
},
},
});
const now = new Date();
await sleep((2.5 - (now.getSeconds() % 2)) * 1000 - now.getMilliseconds());
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(1000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(0);
await sleep(2000);
const [execution] = await workflow.getExecutions();
expect(execution).toBeDefined();
expect(execution.context.data.id).toBe(post.id);
const triggerTime = new Date(post.createdAt.getTime() + 2000);
triggerTime.setMilliseconds(0);
expect(execution.context.date).toBe(triggerTime.toISOString());
});
it('starts on post.createdAt and repeat by cron', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
},
repeat: '*/2 * * * * *',
},
});
const now = new Date();
await sleep((2.5 - (now.getSeconds() % 2)) * 1000 - now.getMilliseconds());
const startTime = new Date();
startTime.setMilliseconds(500);
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(5000);
// sleep 1.5s at 2s trigger 1st time
// sleep 3.5s at 4s trigger 2nd time
const executions = await workflow.getExecutions({ order: [['createdAt', 'ASC']] });
expect(executions.length).toBe(2);
const d1 = Date.parse(executions[0].context.date);
expect(d1 - 1500).toBe(startTime.getTime());
const d2 = Date.parse(executions[1].context.date);
expect(d2 - 3500).toBe(startTime.getTime());
});
it('starts on post.createdAt and repeat with endsOn at certain time', async () => {
const now = new Date();
await sleep((2.5 - (now.getSeconds() % 2)) * 1000 - now.getMilliseconds());
const startTime = new Date();
startTime.setMilliseconds(500);
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
},
repeat: '*/2 * * * * *',
endsOn: new Date(startTime.getTime() + 2500).toISOString(),
},
});
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(5000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(1);
const d1 = Date.parse(executions[0].context.date);
expect(d1 - 1500).toBe(startTime.getTime());
});
it('starts on post.createdAt and repeat with endsOn by offset', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
},
repeat: '*/2 * * * * *',
endsOn: {
field: 'createdAt',
offset: 3,
},
},
});
const now = new Date();
await sleep((2.5 - (now.getSeconds() % 2)) * 1000 - now.getMilliseconds());
const startTime = new Date();
startTime.setMilliseconds(500);
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(5000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(1);
const d1 = Date.parse(executions[0].context.date);
expect(d1 - 1500).toBe(startTime.getTime());
});
it('starts on post.createdAt and repeat by number', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
},
repeat: 2000,
endsOn: {
field: 'createdAt',
offset: 3,
},
},
});
const now = new Date();
await sleep((2.5 - (now.getSeconds() % 2)) * 1000 - now.getMilliseconds());
const startTime = new Date();
startTime.setMilliseconds(500);
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(5000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(1);
const d1 = Date.parse(executions[0].context.date);
expect(d1 - 1500).toBe(startTime.getTime());
});
it('appends', async () => {
const category = await CategoryRepo.create({ values: { name: 'c1' } });
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
offset: 2,
},
appends: ['category'],
},
});
const post = await PostRepo.create({ values: { title: 't1', categoryId: category.id } });
await sleep(5000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(1);
expect(executions[0].context.data.category.id).toBe(category.id);
});
});
});

View File

@ -0,0 +1,372 @@
import { MockServer } from '@nocobase/test';
import Database from '@nocobase/database';
import { getApp, sleep } from '@nocobase/plugin-workflow-test';
async function sleepToEvenSecond() {
const now = new Date();
// NOTE: align to even(0, 2, ...) + 0.5 seconds to start
await sleep((2.5 - (now.getSeconds() % 2)) * 1000 - now.getMilliseconds());
return now;
}
describe('workflow > triggers > schedule > date field mode', () => {
let app: MockServer;
let db: Database;
let PostRepo;
let CategoryRepo;
let WorkflowModel;
let WorkflowRepo;
beforeEach(async () => {
app = await getApp();
db = app.db;
const workflow = db.getCollection('workflows');
WorkflowModel = workflow.model;
WorkflowRepo = workflow.repository;
PostRepo = db.getCollection('posts').repository;
CategoryRepo = db.getCollection('categories').repository;
});
afterEach(() => app.destroy());
describe('configuration', () => {
it('starts on post.createdAt', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
},
},
});
const now = await sleepToEvenSecond();
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(2000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(1);
expect(executions[0].context.data.id).toBe(post.id);
const triggerTime = new Date(post.createdAt);
triggerTime.setMilliseconds(0);
expect(executions[0].context.date).toBe(triggerTime.toISOString());
});
it('starts on post.createdAt with +offset', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
offset: 2,
},
},
});
const now = await sleepToEvenSecond();
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(1000);
const e1s = await workflow.getExecutions();
expect(e1s.length).toBe(0);
await sleep(2000);
const e2s = await workflow.getExecutions();
expect(e2s.length).toBe(1);
expect(e2s[0].context.data.id).toBe(post.id);
const triggerTime = new Date(post.createdAt.getTime() + 2000);
triggerTime.setMilliseconds(0);
expect(e2s[0].context.date).toBe(triggerTime.toISOString());
});
it('starts on post.createdAt with -offset', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
offset: -2,
},
},
});
const now = await sleepToEvenSecond();
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(3000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(0);
});
it('starts on post.createdAt and repeat by cron', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
},
repeat: '*/2 * * * * *',
},
});
await sleepToEvenSecond();
const startTime = new Date();
startTime.setMilliseconds(0);
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(5000);
// immediately trigger 1st time
// sleep 1.5s at 2s trigger 2nd time
// sleep 3.5s at 4s trigger 3rd time
const executions = await workflow.getExecutions({ order: [['createdAt', 'ASC']] });
expect(executions.length).toBe(3);
const d0 = Date.parse(executions[0].context.date);
expect(d0).toBe(startTime.getTime());
const d1 = Date.parse(executions[1].context.date);
expect(d1 - 2000).toBe(startTime.getTime());
const d2 = Date.parse(executions[2].context.date);
expect(d2 - 4000).toBe(startTime.getTime());
});
it('starts on post.createdAt and repeat by cron with endsOn at certain time', async () => {
await sleepToEvenSecond();
const startTime = new Date();
startTime.setMilliseconds(0);
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
},
repeat: '*/2 * * * * *',
endsOn: new Date(startTime.getTime() + 3000).toISOString(),
},
});
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(5000);
const executions = await workflow.getExecutions({ order: [['createdAt', 'ASC']] });
expect(executions.length).toBe(2);
const d0 = Date.parse(executions[0].context.date);
expect(d0).toBe(startTime.getTime());
const d1 = Date.parse(executions[1].context.date);
expect(d1 - 2000).toBe(startTime.getTime());
});
it('starts on post.createdAt and repeat by cron with endsOn by offset', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
},
repeat: '*/2 * * * * *',
endsOn: {
field: 'createdAt',
offset: 3,
},
},
});
await sleepToEvenSecond();
const startTime = new Date();
startTime.setMilliseconds(0);
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(5000);
const executions = await workflow.getExecutions({ order: [['createdAt', 'ASC']] });
expect(executions.length).toBe(2);
const d0 = Date.parse(executions[0].context.date);
expect(d0).toBe(startTime.getTime());
const d1 = Date.parse(executions[1].context.date);
expect(d1 - 2000).toBe(startTime.getTime());
});
it('starts on post.createdAt and repeat by cron and limit 1', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
},
repeat: '*/2 * * * * *',
limit: 1,
},
});
await sleepToEvenSecond();
const startTime = new Date();
startTime.setMilliseconds(0);
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(5000);
const executions = await workflow.getExecutions({ order: [['createdAt', 'ASC']] });
expect(executions.length).toBe(1);
const d0 = Date.parse(executions[0].context.date);
expect(d0).toBe(startTime.getTime());
});
it('starts on post.createdAt and repeat by cron and limit 2', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
},
repeat: '*/2 * * * * *',
limit: 2,
},
});
await sleepToEvenSecond();
const startTime = new Date();
startTime.setMilliseconds(0);
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(5000);
const executions = await workflow.getExecutions({ order: [['createdAt', 'ASC']] });
expect(executions.length).toBe(2);
const d0 = Date.parse(executions[0].context.date);
expect(d0).toBe(startTime.getTime());
const d1 = Date.parse(executions[1].context.date);
expect(d1 - 2000).toBe(startTime.getTime());
});
it('starts on post.createdAt and repeat by number', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
},
repeat: 2000,
endsOn: {
field: 'createdAt',
offset: 3,
},
},
});
await sleepToEvenSecond();
const startTime = new Date();
startTime.setMilliseconds(0);
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(5000);
const executions = await workflow.getExecutions({ order: [['createdAt', 'ASC']] });
expect(executions.length).toBe(2);
const d0 = Date.parse(executions[0].context.date);
expect(d0).toBe(startTime.getTime());
const d1 = Date.parse(executions[1].context.date);
expect(d1 - 2000).toBe(startTime.getTime());
});
it('appends', async () => {
const category = await CategoryRepo.create({ values: { name: 'c1' } });
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
offset: 2,
},
appends: ['category'],
},
});
const post = await PostRepo.create({ values: { title: 't1', categoryId: category.id } });
await sleep(5000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(1);
expect(executions[0].context.data.category.id).toBe(category.id);
});
it('on field changed', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
},
repeat: 1000,
endsOn: {
field: 'createdAt',
offset: 3,
},
},
});
await sleepToEvenSecond();
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(1700);
console.log('check executions');
const e1c = await workflow.countExecutions();
expect(e1c).toBe(2);
await post.update({ createdAt: new Date(post.createdAt.getTime() - 1000) });
await sleep(3000);
const e2c = await workflow.countExecutions();
expect(e2c).toBe(2);
});
});
});

View File

@ -0,0 +1,279 @@
import { scryptSync } from 'crypto';
import { MockServer } from '@nocobase/test';
import Database from '@nocobase/database';
import { getApp, sleep } from '@nocobase/plugin-workflow-test';
async function sleepToEvenSecond() {
const now = new Date();
// NOTE: align to even(0, 2, ...) + 0.5 seconds to start
await sleep((2.5 - (now.getSeconds() % 2)) * 1000 - now.getMilliseconds());
return now;
}
function consumeTime(n: number) {
console.time('consumeTime');
for (let i = 0; i < n; i++) {
scryptSync(`${i}`, 'salt', 64);
}
console.timeEnd('consumeTime');
}
describe('workflow > triggers > schedule > static mode', () => {
let app: MockServer;
let db: Database;
let PostRepo;
let CategoryRepo;
let WorkflowModel;
let WorkflowRepo;
beforeEach(async () => {
app = await getApp();
db = app.db;
const workflow = db.getCollection('workflows');
WorkflowModel = workflow.model;
WorkflowRepo = workflow.repository;
PostRepo = db.getCollection('posts').repository;
CategoryRepo = db.getCollection('categories').repository;
});
afterEach(() => app.destroy());
describe('configuration', () => {
it('neither startsOn nor repeat configurated', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 0,
},
});
await sleep(3000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(0);
});
it('start on certain time and no repeat', async () => {
await sleepToEvenSecond();
const start = new Date();
start.setMilliseconds(0);
start.setSeconds(start.getSeconds() + 2);
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn: start.toISOString(),
},
});
await sleep(3000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(1);
});
it('on every 2 seconds', async () => {
const start = await sleepToEvenSecond();
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn: start.toISOString(),
repeat: '*/2 * * * * *',
},
});
await sleep(4000);
// sleep 1.5s at 2s trigger 1st time
// sleep 3.5s at 4s trigger 2nd time
const executions = await workflow.getExecutions();
expect(executions.length).toBe(2);
});
it('on every even seconds and limit 1', async () => {
const start = await sleepToEvenSecond();
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn: start.toISOString(),
repeat: '*/2 * * * * *',
limit: 1,
},
});
await sleep(5000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(1);
});
it('start before now and repeat every 2 seconds after created and limit 1', async () => {
const start = await sleepToEvenSecond();
start.setMilliseconds(0);
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn: start.toISOString(),
repeat: 2000,
limit: 1,
},
});
await sleep(5000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(1);
expect(new Date(executions[0].context.date).getTime()).toBe(start.getTime() + 2000);
});
it('repeat on cron certain second', async () => {
const now = new Date();
now.setMilliseconds(0);
const startsOn = now.toISOString();
now.setSeconds(now.getSeconds() + 3);
await sleep(1500);
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn,
repeat: `${now.getSeconds()} * * * * *`,
},
});
await sleep(4000);
const executions = await workflow.getExecutions();
expect(executions.length).toBe(1);
const date = new Date(executions[0].context.date);
expect(date.getTime()).toBe(now.getTime());
});
it('multiple workflows trigger at same time', async () => {
const now = new Date();
const startsOn = now.toISOString();
now.setSeconds(now.getSeconds() + 2);
now.setMilliseconds(0);
let w1, w2;
await db.sequelize.transaction(async (transaction) => {
w1 = await WorkflowRepo.create({
values: {
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn,
repeat: `${now.getSeconds()} * * * * *`,
},
},
transaction,
});
w2 = await WorkflowRepo.create({
values: {
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn,
repeat: `${now.getSeconds()} * * * * *`,
},
},
transaction,
});
});
await sleep(3000);
await WorkflowModel.update({ enabled: false }, { where: { enabled: true } });
const [e1] = await w1.getExecutions();
expect(e1).toBeDefined();
const d1 = new Date(e1.context.date);
d1.setMilliseconds(0);
expect(d1.getTime()).toBe(now.getTime());
const [e2] = await w2.getExecutions();
expect(e2).toBeDefined();
const d2 = new Date(e1.context.date);
d2.setMilliseconds(0);
expect(d2.getTime()).toBe(now.getTime());
});
});
describe('dispatch', () => {
it('missed non-repeated scheduled time should not be triggered', async () => {
await sleepToEvenSecond();
const start = new Date();
start.setMilliseconds(0);
start.setSeconds(start.getSeconds() + 2);
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn: start.toISOString(),
},
});
await app.stop();
await sleep(3000);
await app.start();
await sleep(1000);
const c1 = await workflow.countExecutions();
expect(c1).toBe(0);
});
it('scheduled time on CPU heavy load should be triggered', async () => {
await sleepToEvenSecond();
const start = new Date();
start.setMilliseconds(0);
start.setSeconds(start.getSeconds() + 2);
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn: start.toISOString(),
},
});
await sleep(1000);
const c1 = await workflow.countExecutions();
expect(c1).toBe(0);
consumeTime(100); // on AMD 5600G takes about 2.7s
await sleep(2000);
const c2 = await workflow.countExecutions();
expect(c2).toBe(1);
});
});
});

View File

@ -1,572 +0,0 @@
import { fn, literal, Op, where } from '@nocobase/database';
import parser from 'cron-parser';
import type Plugin from '../Plugin';
import Trigger from '.';
import type { WorkflowModel } from '../types';
export type ScheduleOnField =
| string
| {
field: string;
// in seconds
offset?: number;
unit?: 1000 | 60000 | 3600000 | 86400000;
};
export interface ScheduleTriggerConfig {
// trigger mode
mode: number;
// how to repeat
repeat?: string | number | null;
// limit of repeat times
limit?: number;
startsOn?: ScheduleOnField;
endsOn?: ScheduleOnField;
}
export const SCHEDULE_MODE = {
CONSTANT: 0,
COLLECTION_FIELD: 1,
} as const;
interface ScheduleMode {
on?(this: ScheduleTrigger, workflow: WorkflowModel): void;
off?(this: ScheduleTrigger, workflow: WorkflowModel): void;
shouldCache(this: ScheduleTrigger, workflow: WorkflowModel, now: Date): Promise<boolean> | boolean;
trigger(this: ScheduleTrigger, workflow: WorkflowModel, now: Date): Promise<number> | number;
}
const ScheduleModes = new Map<number, ScheduleMode>();
function parseDateWithoutMs(date: string) {
return Math.floor(Date.parse(date) / 1000) * 1000;
}
ScheduleModes.set(SCHEDULE_MODE.CONSTANT, {
shouldCache(workflow, now) {
const { startsOn, endsOn, repeat } = workflow.config;
const timestamp = now.getTime();
// NOTE: align to second start
const startTime = parseDateWithoutMs(startsOn);
if (!startTime || startTime > timestamp + this.cacheCycle) {
return false;
}
if (repeat) {
if (typeof repeat === 'number') {
const next = timestamp - ((timestamp - startTime) % repeat) + repeat;
if (next <= timestamp || next > timestamp + this.cacheCycle) {
return false;
}
}
if (endsOn) {
const endTime = parseDateWithoutMs(endsOn);
if (!endTime || endTime <= timestamp) {
return false;
}
}
} else {
if (startTime <= timestamp) {
return false;
}
}
return true;
},
trigger(workflow, now) {
const { startsOn, endsOn, repeat } = workflow.config;
const timestamp = now.getTime();
// NOTE: align to second start
const startTime = parseDateWithoutMs(startsOn);
if (!startTime || startTime > timestamp) {
return 0;
}
if (repeat) {
if (typeof repeat === 'number') {
if (Math.round(timestamp - startTime) % repeat) {
return 0;
}
}
if (endsOn) {
const endTime = parseDateWithoutMs(endsOn);
if (!endTime || endTime < timestamp) {
return 0;
}
}
} else {
if (startTime !== timestamp) {
return 0;
}
}
this.workflow.trigger(workflow, { date: now });
return 1;
},
});
function getOnTimestampWithOffset(on, now: Date) {
switch (typeof on) {
case 'string':
return parseDateWithoutMs(on);
case 'object': {
const { field, offset = 0, unit = 1000 } = on;
if (!field) {
return null;
}
const timestamp = now.getTime();
// onDate + offset > now
// onDate > now - offset
return timestamp - offset * unit;
}
default:
return null;
}
}
function getDataOptionTime(data, on, dir = 1) {
if (!on) {
return null;
}
switch (typeof on) {
case 'string': {
const time = parseDateWithoutMs(on);
return time ? time : null;
}
case 'object': {
const { field, offset = 0, unit = 1000 } = on;
return data.get(field) ? data.get(field).getTime() - offset * unit * dir : null;
}
default:
return null;
}
}
function getHookId(workflow, type: string) {
return `${type}#${workflow.id}`;
}
const DialectTimestampFnMap: { [key: string]: (col: string) => string } = {
postgres(col) {
return `CAST(FLOOR(extract(epoch from "${col}")) AS INTEGER)`;
},
mysql(col) {
return `CAST(FLOOR(UNIX_TIMESTAMP(\`${col}\`)) AS SIGNED INTEGER)`;
},
sqlite(col) {
return `CAST(FLOOR(unixepoch(${col})) AS INTEGER)`;
},
};
DialectTimestampFnMap.mariadb = DialectTimestampFnMap.mysql;
ScheduleModes.set(SCHEDULE_MODE.COLLECTION_FIELD, {
on(workflow) {
const { collection, startsOn, endsOn, repeat } = workflow.config;
const event = `${collection}.afterSave`;
const name = getHookId(workflow, event);
if (this.events.has(name)) {
return;
}
// NOTE: toggle cache depends on new date
const listener = async (data, options) => {
const now = new Date();
now.setMilliseconds(0);
const timestamp = now.getTime();
const startTime = getDataOptionTime(data, startsOn);
const endTime = getDataOptionTime(data, endsOn, -1);
if (!startTime) {
return;
}
if (startTime && startTime > timestamp + this.cacheCycle) {
return;
}
if (endTime && endTime <= timestamp) {
return;
}
if (!matchNext.call(this, workflow, now)) {
return;
}
if (
typeof repeat === 'number' &&
repeat > this.cacheCycle &&
(timestamp - startTime) % repeat > this.cacheCycle
) {
return;
}
this.setCache(workflow);
};
this.events.set(name, listener);
this.workflow.app.db.on(event, listener);
},
off(workflow) {
const { collection } = workflow.config;
const event = `${collection}.afterSave`;
const name = getHookId(workflow, event);
if (this.events.has(name)) {
const listener = this.events.get(name);
this.events.delete(name);
this.workflow.app.db.off(event, listener);
}
},
async shouldCache(workflow, now) {
const { db } = this.workflow.app;
const { startsOn, endsOn, repeat, collection } = workflow.config;
const timestamp = now.getTime();
const startTimestamp = getOnTimestampWithOffset(startsOn, now);
if (!startTimestamp) {
return false;
}
const conditions: any[] = [
{
[startsOn.field]: {
[Op.lt]: new Date(startTimestamp + this.cacheCycle),
},
},
];
// when repeat is number, means repeat after startsOn
// (now - startsOn) % repeat <= cacheCycle
if (repeat) {
const tsFn = DialectTimestampFnMap[db.options.dialect];
if (typeof repeat === 'number' && repeat > this.cacheCycle && tsFn) {
const modExp = fn(
'MOD',
literal(`${Math.round(timestamp / 1000)} - ${tsFn(startsOn.field)}`),
Math.round(repeat / 1000),
);
conditions.push(where(modExp, { [Op.lt]: Math.round(this.cacheCycle / 1000) }));
// conditions.push(literal(`mod(${timestamp} - ${tsFn(startsOn.field)} * 1000, ${repeat}) < ${this.cacheCycle}`));
}
if (endsOn) {
const endTimestamp = getOnTimestampWithOffset(endsOn, now);
if (!endTimestamp) {
return false;
}
if (typeof endsOn === 'string') {
if (endTimestamp <= timestamp) {
return false;
}
} else {
conditions.push({
[endsOn.field]: {
[Op.gte]: new Date(endTimestamp + this.interval),
},
});
}
}
} else {
conditions.push({
[startsOn.field]: {
[Op.gte]: new Date(startTimestamp),
},
});
}
const { model } = db.getCollection(collection);
const count = await model.count({
where: { [Op.and]: conditions },
});
return Boolean(count);
},
async trigger(workflow, now: Date) {
const { startsOn, repeat, endsOn, collection, appends } = workflow.config;
const timestamp = now.getTime();
const startTimestamp = getOnTimestampWithOffset(startsOn, now);
if (!startTimestamp) {
return 0;
}
const conditions: any[] = [
{
[startsOn.field]: {
[Op.lt]: new Date(startTimestamp + this.interval),
},
},
];
if (repeat) {
// startsOn not after now
conditions.push({
[startsOn.field]: {
[Op.lt]: new Date(startTimestamp),
},
});
const tsFn = DialectTimestampFnMap[this.workflow.app.db.options.dialect];
if (typeof repeat === 'number' && tsFn) {
const modExp = fn(
'MOD',
literal(`${Math.round(timestamp / 1000)} - ${tsFn(startsOn.field)}`),
Math.round(repeat / 1000),
);
conditions.push(where(modExp, { [Op.eq]: 0 }));
// conditions.push(literal(`MOD(CAST(${timestamp} AS BIGINT) - CAST((FLOOR(${tsFn(startsOn.field)}) AS BIGINT) * 1000), ${repeat}) = 0`));
}
if (endsOn) {
const endTimestamp = getOnTimestampWithOffset(endsOn, now);
if (!endTimestamp) {
return 0;
}
if (typeof endsOn === 'string') {
if (endTimestamp <= timestamp) {
return 0;
}
} else {
conditions.push({
[endsOn.field]: {
[Op.gte]: new Date(endTimestamp + this.interval),
},
});
}
}
} else {
// startsOn exactly equal to now in 1s
conditions.push({
[startsOn.field]: {
[Op.gte]: new Date(startTimestamp),
},
});
}
const repo = this.workflow.app.db.getRepository(collection);
const instances = await repo.find({
where: {
[Op.and]: conditions,
},
appends,
...(workflow.config.limit
? {
limit: Math.max(workflow.config.limit - workflow.allExecuted, 0),
}
: {}),
});
instances.forEach((item) => {
this.workflow.trigger(workflow, {
date: now,
data: item.toJSON(),
});
});
return instances.length;
},
});
function matchNext(this: ScheduleTrigger, workflow, now: Date, range: number = this.cacheCycle): boolean {
const { repeat } = workflow.config;
// no repeat means no need to rerun
// but if in current cycle, should be put in cache
// no repeat but in current cycle means startsOn has been configured
// so we need to more info to determine if necessary config items
if (typeof repeat !== 'string') {
return true;
}
const currentDate = new Date(now);
currentDate.setMilliseconds(-1);
const timestamp = now.getTime();
const interval = parser.parseExpression(repeat, { currentDate });
const next = interval.next();
// NOTE: cache all workflows will be matched in current cycle
if (next.getTime() - timestamp <= range) {
return true;
}
return false;
}
export default class ScheduleTrigger extends Trigger {
sync = false;
static CacheRules = [
({ config, allExecuted }) => (config.limit ? allExecuted < config.limit : true) && config.startsOn,
matchNext,
function (workflow, now) {
const { mode } = workflow.config;
const modeHandlers = ScheduleModes.get(mode);
if (!modeHandlers) {
return false;
}
return modeHandlers.shouldCache.call(this, workflow, now);
},
];
static TriggerRules = [
({ config, allExecuted }) => (config.limit ? allExecuted < config.limit : true) && config.startsOn,
function (workflow, now) {
return matchNext.call(this, workflow, now, 0);
},
];
events = new Map();
private timer: NodeJS.Timeout | null = null;
private cache: Map<number | string, any> = new Map();
// running interval, default to 1s
interval = 1_000;
// caching workflows in range, default to 1min
cacheCycle = 60_000;
constructor(workflow: Plugin) {
super(workflow);
workflow.app.on('beforeStop', () => {
if (this.timer) {
clearInterval(this.timer);
}
});
}
init() {
if (this.workflow.app.getPlugin('multi-app-share-collection')?.enabled && this.workflow.app.name !== 'main') {
return;
}
if (this.timer) {
return;
}
const now = new Date();
// NOTE: assign to this.timer to avoid duplicated initialization
this.timer = setTimeout(
this.run,
// NOTE:
// try to align to system time on each second starts,
// after at least 1 second initialized for everything to get ready.
// so jobs in 2 seconds will be missed at first start.
1_000 - now.getMilliseconds(),
);
}
run = () => {
const now = new Date();
// 1001 to avoid 999
const nextInterval = 1_001 - now.getMilliseconds();
now.setMilliseconds(0);
// NOTE: trigger `onTick` for high interval jobs which are cached in last 1 min
this.onTick(now);
// NOTE: reload when second match cache cycle
if (!(now.getTime() % this.cacheCycle)) {
this.reload();
}
this.timer = setTimeout(this.run, nextInterval);
};
async onTick(now) {
// NOTE: trigger workflows in sequence when sqlite due to only one transaction
const isSqlite = this.workflow.app.db.options.dialect === 'sqlite';
return Array.from(this.cache.values()).reduce(
(prev, workflow) => {
if (!this.shouldTrigger(workflow, now)) {
return prev;
}
if (isSqlite) {
return prev.then(() => this.trigger(workflow, now));
}
this.trigger(workflow, now);
return null;
},
isSqlite ? Promise.resolve() : null,
);
}
async reload() {
const WorkflowRepo = this.workflow.app.db.getRepository('workflows');
const workflows = await WorkflowRepo.find({
filter: { enabled: true, type: 'schedule' },
});
// NOTE: clear cached jobs in last cycle
this.cache = new Map();
this.inspect(workflows);
}
inspect(workflows) {
const now = new Date();
now.setMilliseconds(0);
workflows.forEach(async (workflow) => {
const should = await this.shouldCache(workflow, now);
if (should) {
this.workflow.getLogger(workflow.id).info('caching scheduled workflow will run in next minute');
}
this.setCache(workflow, !should);
});
}
setCache(workflow, out = false) {
out ? this.cache.delete(workflow.id) : this.cache.set(workflow.id, workflow);
}
async shouldCache(workflow, now) {
for await (const rule of (<typeof ScheduleTrigger>this.constructor).CacheRules) {
if (!(await rule.call(this, workflow, now))) {
return false;
}
}
return true;
}
shouldTrigger(workflow, now): boolean {
for (const rule of (<typeof ScheduleTrigger>this.constructor).TriggerRules) {
if (!rule.call(this, workflow, now)) {
return false;
}
}
return true;
}
async trigger(workflow, date: Date) {
const { mode } = workflow.config;
const modeHandlers = ScheduleModes.get(mode);
if (!modeHandlers) {
return;
}
return modeHandlers.trigger.call(this, workflow, date);
}
on(workflow) {
// NOTE: lazy initialization
this.init();
const { mode } = workflow.config;
const modeHandlers = ScheduleModes.get(mode);
if (modeHandlers && modeHandlers.on) {
modeHandlers.on.call(this, workflow);
}
this.inspect([workflow]);
}
off(workflow) {
const { mode } = workflow.config;
const modeHandlers = ScheduleModes.get(mode);
if (modeHandlers && modeHandlers.off) {
modeHandlers.off.call(this, workflow);
}
this.cache.delete(workflow.id);
}
}

View File

@ -0,0 +1,369 @@
import { fn, literal, Op, Transactionable, where } from '@nocobase/database';
import parser from 'cron-parser';
import type Plugin from '../../Plugin';
import type { WorkflowModel } from '../../types';
import { parseDateWithoutMs, SCHEDULE_MODE } from './utils';
export type ScheduleOnField = {
field: string;
// in seconds
offset?: number;
unit?: 1000 | 60000 | 3600000 | 86400000;
};
export interface ScheduleTriggerConfig {
// trigger mode
mode: number;
// how to repeat
repeat?: string | number | null;
// limit of repeat times
limit?: number;
startsOn?: ScheduleOnField;
endsOn?: string | ScheduleOnField;
}
function getOnTimestampWithOffset({ field, offset = 0, unit = 1000 }: ScheduleOnField, now: Date) {
if (!field) {
return null;
}
const timestamp = now.getTime();
// onDate + offset > now
// onDate > now - offset
return timestamp - offset * unit;
}
function getDataOptionTime(record, on, dir = 1) {
if (!on) {
return null;
}
switch (typeof on) {
case 'string': {
const time = parseDateWithoutMs(on);
return time ? time : null;
}
case 'object': {
const { field, offset = 0, unit = 1000 } = on;
if (!record.get(field)) {
return null;
}
const second = new Date(record.get(field).getTime());
second.setMilliseconds(0);
return second.getTime() + offset * unit * dir;
}
default:
return null;
}
}
const DialectTimestampFnMap: { [key: string]: (col: string) => string } = {
postgres(col) {
return `CAST(FLOOR(extract(epoch from "${col}")) AS INTEGER)`;
},
mysql(col) {
return `CAST(FLOOR(UNIX_TIMESTAMP(\`${col}\`)) AS SIGNED INTEGER)`;
},
sqlite(col) {
return `CAST(FLOOR(unixepoch(${col})) AS INTEGER)`;
},
};
DialectTimestampFnMap.mariadb = DialectTimestampFnMap.mysql;
function getCronNextTime(cron, currentDate: Date): number {
const interval = parser.parseExpression(cron, { currentDate });
const next = interval.next();
return next.getTime();
}
function matchCronNextTime(cron, currentDate: Date, range: number): boolean {
return getCronNextTime(cron, currentDate) - currentDate.getTime() <= range;
}
function getHookId(workflow, type: string) {
return `${type}#${workflow.id}`;
}
export default class ScheduleTrigger {
events = new Map();
private timer: NodeJS.Timeout | null = null;
private cache: Map<string, any> = new Map();
// caching workflows in range, default to 5min
cacheCycle = 300_000;
constructor(public workflow: Plugin) {
workflow.app.on('afterStart', async () => {
if (this.timer) {
return;
}
this.timer = setInterval(() => this.reload(), this.cacheCycle);
this.reload();
});
workflow.app.on('beforeStop', () => {
if (this.timer) {
clearInterval(this.timer);
}
for (const [key, timer] of this.cache.entries()) {
clearTimeout(timer);
this.cache.delete(key);
}
});
}
async reload() {
const WorkflowRepo = this.workflow.app.db.getRepository('workflows');
const workflows = await WorkflowRepo.find({
filter: { enabled: true, type: 'schedule', 'config.mode': SCHEDULE_MODE.DATE_FIELD },
});
// NOTE: clear cached jobs in last cycle
this.cache = new Map();
this.inspect(workflows);
}
inspect(workflows: WorkflowModel[]) {
const now = new Date();
workflows.forEach(async (workflow) => {
const records = await this.loadRecordsToSchedule(workflow, now);
records.forEach((record) => {
const nextTime = this.getRecordNextTime(workflow, record);
this.schedule(workflow, record, nextTime, Boolean(nextTime));
});
});
}
// 1. startsOn in range -> yes
// 2. startsOn before now, has no repeat -> no
// 3. startsOn before now, and has repeat:
// a. repeat out of range -> no
// b. repeat in range (number or cron):
// i. endsOn after now -> yes
// ii. endsOn before now -> no
async loadRecordsToSchedule(
{ config: { collection, limit, startsOn, repeat, endsOn }, allExecuted }: WorkflowModel,
currentDate: Date,
) {
const { db } = this.workflow.app;
if (limit && allExecuted >= limit) {
return [];
}
if (!startsOn) {
return [];
}
const timestamp = currentDate.getTime();
const startTimestamp = getOnTimestampWithOffset(startsOn, currentDate);
if (!startTimestamp) {
return [];
}
const range = this.cacheCycle * 2;
const conditions: any[] = [
{
[startsOn.field]: {
// cache next 2 cycles
[Op.lt]: new Date(startTimestamp + range),
},
},
];
if (repeat) {
// when repeat is number, means repeat after startsOn
// (now - startsOn) % repeat <= cacheCycle
if (typeof repeat === 'number') {
const tsFn = DialectTimestampFnMap[db.options.dialect];
if (repeat > range && tsFn) {
const modExp = fn(
'MOD',
literal(`${Math.round(timestamp / 1000)} - ${tsFn(startsOn.field)}`),
Math.round(repeat / 1000),
);
conditions.push(where(modExp, { [Op.lt]: Math.round(range / 1000) }));
}
} else if (typeof repeat === 'string') {
if (!matchCronNextTime(repeat, currentDate, range)) {
return [];
}
}
if (endsOn) {
const now = new Date();
const endTimestamp = getOnTimestampWithOffset(endsOn, now);
if (!endTimestamp) {
return [];
}
if (typeof endsOn === 'string') {
if (endTimestamp <= timestamp) {
return [];
}
} else {
conditions.push({
[endsOn.field]: {
[Op.gte]: new Date(endTimestamp),
},
});
}
}
} else {
conditions.push({
[startsOn.field]: {
[Op.gte]: new Date(startTimestamp),
},
});
}
const { model } = db.getCollection(collection);
return model.findAll({
where: {
[Op.and]: conditions,
},
});
}
getRecordNextTime(workflow: WorkflowModel, record, nextSecond = false) {
const {
config: { startsOn, endsOn, repeat, limit },
allExecuted,
} = workflow;
if (limit && allExecuted >= limit) {
return null;
}
const range = this.cacheCycle;
const now = new Date();
now.setMilliseconds(nextSecond ? 1000 : 0);
const timestamp = now.getTime();
const startTime = getDataOptionTime(record, startsOn);
const endTime = getDataOptionTime(record, endsOn);
let nextTime = null;
if (!startTime) {
return null;
}
if (startTime > timestamp + range) {
return null;
}
if (startTime >= timestamp) {
return !endTime || (endTime >= startTime && endTime < timestamp + range) ? startTime : null;
} else {
if (!repeat) {
return null;
}
}
if (typeof repeat === 'number') {
const nextRepeatTime = ((startTime - timestamp) % repeat) + repeat;
if (nextRepeatTime > range) {
return null;
}
if (endTime && endTime < timestamp + nextRepeatTime) {
return null;
}
nextTime = timestamp + nextRepeatTime;
} else if (typeof repeat === 'string') {
nextTime = getCronNextTime(repeat, now);
if (nextTime - timestamp > range) {
return null;
}
if (endTime && endTime < nextTime) {
return null;
}
}
if (endTime && endTime <= timestamp) {
return null;
}
return nextTime;
}
schedule(workflow: WorkflowModel, record, nextTime, toggle = true, options = {}) {
const { model } = this.workflow.app.db.getCollection(workflow.config.collection);
const recordPk = record.get(model.primaryKeyAttribute);
if (toggle) {
const nextInterval = Math.max(0, nextTime - Date.now());
const key = `${workflow.id}:${recordPk}@${nextTime}`;
if (!this.cache.has(key)) {
if (nextInterval) {
this.cache.set(key, setTimeout(this.trigger.bind(this, workflow, record, nextTime), nextInterval));
} else {
return this.trigger(workflow, record, nextTime, options);
}
}
} else {
for (const [key, timer] of this.cache.entries()) {
if (key.startsWith(`${workflow.id}:${recordPk}@`)) {
clearTimeout(timer);
this.cache.delete(key);
}
}
}
}
async trigger(workflow: WorkflowModel, record, nextTime, { transaction }: Transactionable = {}) {
const { repository, model } = this.workflow.app.db.getCollection(workflow.config.collection);
const recordPk = record.get(model.primaryKeyAttribute);
const data = await repository.findOne({
filterByTk: recordPk,
appends: workflow.config.appends,
transaction,
});
const key = `${workflow.id}:${recordPk}@${nextTime}`;
this.cache.delete(key);
this.workflow.trigger(workflow, {
data: data.toJSON(),
date: new Date(nextTime),
});
if (!workflow.config.repeat || (workflow.config.limit && workflow.allExecuted >= workflow.config.limit - 1)) {
return;
}
const n = this.getRecordNextTime(workflow, data, true);
if (n) {
this.schedule(workflow, data, n, true);
}
}
on(workflow: WorkflowModel) {
this.inspect([workflow]);
const { collection } = workflow.config;
const event = `${collection}.afterSaveWithAssociations`;
const name = getHookId(workflow, event);
if (this.events.has(name)) {
return;
}
const listener = async (data, { transaction }) => {
const nextTime = this.getRecordNextTime(workflow, data);
return this.schedule(workflow, data, nextTime, Boolean(nextTime), { transaction });
};
this.events.set(name, listener);
this.workflow.app.db.on(event, listener);
}
off(workflow: WorkflowModel) {
for (const [key, timer] of this.cache.entries()) {
if (key.startsWith(`${workflow.id}:`)) {
clearTimeout(timer);
this.cache.delete(key);
}
}
const { collection } = workflow.config;
const event = `${collection}.afterSave`;
const name = getHookId(workflow, event);
if (this.events.has(name)) {
const listener = this.events.get(name);
this.events.delete(name);
this.workflow.app.db.off(event, listener);
}
}
}

View File

@ -0,0 +1,114 @@
import parser from 'cron-parser';
import type Plugin from '../../Plugin';
import { SCHEDULE_MODE, parseDateWithoutMs } from './utils';
export default class StaticScheduleTrigger {
private timers: Map<string, NodeJS.Timeout | null> = new Map();
constructor(public workflow: Plugin) {
workflow.app.on('afterStart', async () => {
const WorkflowRepo = this.workflow.app.db.getRepository('workflows');
const workflows = await WorkflowRepo.find({
filter: { enabled: true, type: 'schedule', 'config.mode': SCHEDULE_MODE.STATIC },
});
this.inspect(workflows);
});
workflow.app.on('beforeStop', () => {
for (const timer of this.timers.values()) {
clearInterval(timer);
}
});
}
inspect(workflows) {
const now = new Date();
now.setMilliseconds(0);
workflows.forEach((workflow) => {
const nextTime = this.getNextTime(workflow, now);
if (nextTime) {
this.workflow
.getLogger(workflow.id)
.info(`caching scheduled workflow will run at: ${new Date(nextTime).toISOString()}`);
} else {
this.workflow.getLogger(workflow.id).info('workflow will not be scheduled');
}
this.schedule(workflow, nextTime, nextTime >= now.getTime());
});
}
getNextTime({ config, allExecuted }, currentDate, nextSecond = false) {
if (config.limit && allExecuted >= config.limit) {
return null;
}
if (!config.startsOn) {
return null;
}
currentDate.setMilliseconds(nextSecond ? 1000 : 0);
const timestamp = currentDate.getTime();
const startTime = parseDateWithoutMs(config.startsOn);
if (startTime > timestamp) {
return startTime;
}
if (config.repeat) {
const endTime = config.endsOn ? parseDateWithoutMs(config.endsOn) : null;
if (endTime && endTime < timestamp) {
return null;
}
if (typeof config.repeat === 'string') {
const interval = parser.parseExpression(config.repeat, { currentDate });
const next = interval.next();
return next.getTime();
} else if (typeof config.repeat === 'number') {
return timestamp + ((timestamp - startTime) % config.repeat);
} else {
return null;
}
} else {
if (startTime < timestamp) {
return null;
}
return timestamp;
}
}
schedule(workflow, nextTime, toggle = true) {
const key = `${workflow.id}@${nextTime}`;
if (toggle) {
if (!this.timers.has(key)) {
const interval = Math.max(nextTime - Date.now(), 0);
this.timers.set(key, setTimeout(this.trigger.bind(this, workflow, nextTime), interval));
}
} else {
const timer = this.timers.get(key);
clearTimeout(timer);
this.timers.delete(key);
}
}
async trigger(workflow, time) {
this.timers.delete(`${workflow.id}@${time}`);
this.workflow.trigger(workflow, { date: new Date(time) });
if (!workflow.config.repeat || (workflow.config.limit && workflow.allExecuted >= workflow.config.limit - 1)) {
return;
}
const nextTime = this.getNextTime(workflow, new Date(), true);
if (nextTime) {
this.schedule(workflow, nextTime);
}
}
on(workflow) {
this.inspect([workflow]);
}
off(workflow) {
this.schedule(workflow, null, false);
}
}

View File

@ -0,0 +1,53 @@
import { Transactionable } from 'sequelize';
import Trigger from '..';
import type Plugin from '../../Plugin';
import { WorkflowModel } from '../../types';
import DateFieldScheduleTrigger from './DateFieldScheduleTrigger';
import StaticScheduleTrigger from './StaticScheduleTrigger';
import { SCHEDULE_MODE } from './utils';
export default class ScheduleTrigger extends Trigger {
sync = false;
private modes = new Map();
constructor(workflow: Plugin) {
super(workflow);
this.modes.set(SCHEDULE_MODE.STATIC, new StaticScheduleTrigger(workflow));
this.modes.set(SCHEDULE_MODE.DATE_FIELD, new DateFieldScheduleTrigger(workflow));
}
private getTrigger(mode: number) {
return this.modes.get(mode);
}
on(workflow) {
const mode = workflow.config.mode;
const trigger = this.getTrigger(mode);
if (trigger) {
trigger.on(workflow);
}
}
off(workflow) {
const mode = workflow.config.mode;
const trigger = this.getTrigger(mode);
if (trigger) {
trigger.off(workflow);
}
}
async validateEvent(workflow: WorkflowModel, context: any, options: Transactionable): Promise<boolean> {
if (!context.date) {
return false;
}
const existed = await workflow.countExecutions({
where: {
'context.date': context.date,
},
transaction: options.transaction,
});
return !existed;
}
}

View File

@ -0,0 +1,8 @@
export const SCHEDULE_MODE = {
STATIC: 0,
DATE_FIELD: 1,
} as const;
export function parseDateWithoutMs(date: string) {
return Math.floor(Date.parse(date) / 1000) * 1000;
}