fix(plugin-workflow): fix schedule trigger bug (#949)

This commit is contained in:
Junyi 2022-10-23 14:50:18 +08:00 committed by chenos
parent a47ca1dd6c
commit 9b8a4d1063
3 changed files with 215 additions and 187 deletions

View File

@ -40,7 +40,7 @@ export default observer(({ value, onChange }: any) => {
// && (!['linkTo', 'hasMany', 'hasOne', 'belongsToMany'].includes(field.type))
));
const fieldsSet = new Set(fields.map(field => field.name));
const unassignedFields = fields.filter(field => !(field.name in value));
return (
<fieldset className={css`
@ -131,18 +131,16 @@ export default observer(({ value, onChange }: any) => {
);
})
}
{Object.keys(value).filter(key => fieldsSet.has(key)).length < fields.length
{unassignedFields.length
? (
<Dropdown overlay={
<Menu onClick={({ key }) => onChange({ ...value, [key]: null })} className={css`
max-height: 300px;
overflow-y: auto;
`}>
{fields
.filter(field => !(field.name in value))
.map(field => (
<Menu.Item key={field.name}>{compile(field.uiSchema?.title ?? field.name)}</Menu.Item>
))}
{unassignedFields.map(field => (
<Menu.Item key={field.name}>{compile(field.uiSchema?.title ?? field.name)}</Menu.Item>
))}
</Menu>
}>
<Button icon={<PlusOutlined />}>{t('Add field')}</Button>

View File

@ -4,7 +4,7 @@ import { getApp, sleep } from '..';
describe.skip('workflow > triggers > schedule', () => {
describe('workflow > triggers > schedule', () => {
let app: Application;
let db: Database;
let PostRepo;
@ -24,7 +24,7 @@ describe.skip('workflow > triggers > schedule', () => {
afterEach(() => app.destroy());
describe('constant mode', () => {
it('no repeat configurated', async () => {
it('neither startsOn nor repeat configurated', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
@ -49,6 +49,7 @@ describe.skip('workflow > triggers > schedule', () => {
type: 'schedule',
config: {
mode: 0,
startsOn: now.toISOString(),
repeat: '*/2 * * * * *',
}
});
@ -71,6 +72,7 @@ describe.skip('workflow > triggers > schedule', () => {
type: 'schedule',
config: {
mode: 0,
startsOn: now.toISOString(),
repeat: '*/2 * * * * *',
limit: 1
}
@ -92,6 +94,7 @@ describe.skip('workflow > triggers > schedule', () => {
type: 'schedule',
config: {
mode: 0,
startsOn: now.toISOString(),
repeat: 2000,
limit: 1
}
@ -105,6 +108,7 @@ describe.skip('workflow > triggers > schedule', () => {
it('on certain second', async () => {
const now = new Date();
const startsOn = now.toISOString();
now.setSeconds(now.getSeconds() + 3);
now.setMilliseconds(0);
@ -113,6 +117,7 @@ describe.skip('workflow > triggers > schedule', () => {
type: 'schedule',
config: {
mode: 0,
startsOn,
repeat: `${now.getSeconds()} * * * * *`,
}
});
@ -126,6 +131,7 @@ describe.skip('workflow > triggers > schedule', () => {
it('multiple workflows trigger at same time', async () => {
const now = new Date();
const startsOn = now.toISOString();
now.setSeconds(now.getSeconds() + 2);
now.setMilliseconds(0);
@ -137,6 +143,7 @@ describe.skip('workflow > triggers > schedule', () => {
type: 'schedule',
config: {
mode: 0,
startsOn,
repeat: `${now.getSeconds()} * * * * *`,
}
},
@ -151,6 +158,7 @@ describe.skip('workflow > triggers > schedule', () => {
type: 'schedule',
config: {
mode: 0,
startsOn,
repeat: `${now.getSeconds()} * * * * *`,
}
},
@ -205,7 +213,7 @@ describe.skip('workflow > triggers > schedule', () => {
expect(execution.context.date).toBe(triggerTime.toISOString());
});
it('starts on post.createdAt and repeat', async () => {
it('starts on post.createdAt and repeat by cron', async () => {
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
@ -223,6 +231,7 @@ describe.skip('workflow > triggers > schedule', () => {
await sleep((2.5 - now.getSeconds() % 2) * 1000 - now.getMilliseconds());
const startTime = new Date();
startTime.setMilliseconds(500);
console.log(startTime);
const post = await PostRepo.create({ values: { title: 't1' }});
@ -230,7 +239,7 @@ describe.skip('workflow > triggers > schedule', () => {
// sleep 1.5s at 2s trigger 1st time
// sleep 3.5s at 4s trigger 2nd time
const executions = await workflow.getExecutions();
const executions = await workflow.getExecutions({ order: [['createdAt', 'ASC']] });
expect(executions.length).toBe(2);
const d1 = Date.parse(executions[0].context.date);
expect(d1 - 1500).toBe(startTime.getTime());

View File

@ -1,6 +1,7 @@
import parser from 'cron-parser';
import { literal, Op, where, fn } from 'sequelize';
import Plugin, { Trigger } from '..';
import WorkflowModel from '../models/Workflow';
export type ScheduleOnField = string | {
field: string;
@ -26,10 +27,10 @@ export const SCHEDULE_MODE = {
} as const;
interface ScheduleMode {
on?(this: ScheduleTrigger, workflow): void;
off?(this: ScheduleTrigger, workflow): void;
shouldCache(this: ScheduleTrigger, workflow, now: Date): Promise<boolean> | boolean;
trigger(this: ScheduleTrigger, workflow, now: Date): any;
on?(this: ScheduleTrigger, workflow: WorkflowModel): void;
off?(this: ScheduleTrigger, workflow: WorkflowModel): void;
shouldCache(this: ScheduleTrigger, workflow: WorkflowModel, now: Date): Promise<boolean> | boolean;
trigger(this: ScheduleTrigger, workflow: WorkflowModel, now: Date): any;
}
const ScheduleModes = new Map<number, ScheduleMode>();
@ -38,62 +39,77 @@ ScheduleModes.set(SCHEDULE_MODE.CONSTANT, {
shouldCache(workflow, now) {
const { startsOn, endsOn, repeat } = workflow.config;
const timestamp = now.getTime();
if (startsOn) {
const startTime = Date.parse(startsOn);
if (!startTime || (startTime > timestamp + this.cacheCycle)) {
return false;
}
const startTime = Date.parse(startsOn);
if (!startTime || (startTime > timestamp + this.cacheCycle)) {
return false;
}
if (repeat) {
if (typeof repeat === 'number'
&& repeat > this.cacheCycle
&& (timestamp - startTime) % repeat > this.cacheCycle
) {
return false;
}
}
if (endsOn) {
const endTime = Date.parse(endsOn);
if (!endTime || endTime <= timestamp) {
if (endsOn) {
const endTime = Date.parse(endsOn);
if (!endTime || endTime <= timestamp) {
return false;
}
}
} else {
if (startTime < timestamp) {
return false;
}
}
return true;
},
trigger(workflow, date) {
trigger(workflow, now) {
const { startsOn, endsOn, repeat } = workflow.config;
if (startsOn) {
const startTime = Math.floor(Date.parse(startsOn) / 1000) * 1000;
const timestamp = now.getTime();
const startTime = Math.floor(Date.parse(startsOn) / 1000) * 1000;
if (repeat) {
if (typeof repeat === 'number') {
if (Math.round(date.getTime() - startTime) % repeat) {
if (Math.round(timestamp - startTime) % repeat) {
return;
}
}
if (endsOn) {
const endTime = Date.parse(endsOn);
if (!endTime || endTime < timestamp) {
return false;
}
}
} else {
if (startTime !== timestamp) {
return;
}
}
return this.plugin.trigger(workflow, { date });
return this.plugin.trigger(workflow, { date: now });
}
});
function getDateRangeFilter(on: ScheduleOnField, now: Date, dir: number) {
const timestamp = now.getTime();
const op = dir < 0 ? Op.lt : Op.gte;
function getOnTimestampWithOffset(on, now: Date) {
switch (typeof on) {
case 'string':
const time = Date.parse(on);
if (!time || (dir < 0 ? (timestamp < time) : (time <= timestamp))) {
return null;
}
break;
return Date.parse(on);
case 'object':
const { field, offset = 0, unit = 1000 } = on;
if (!field) {
return {};
return null;
}
return { [field]: { [op]: new Date(timestamp + offset * unit * dir) } };
const timestamp = now.getTime();
// onDate + offset > now
// onDate > now - offset
return timestamp - offset * unit;
default:
break;
return null;
}
return {};
}
function getDataOptionTime(data, on, dir = 1) {
@ -103,13 +119,13 @@ function getDataOptionTime(data, on, dir = 1) {
return time ? time : null;
case 'object':
const { field, offset = 0, unit = 1000 } = on;
return data[field] ? data[field].getTime() - offset * unit * dir : null;
return data.get(field) ? data.get(field).getTime() - offset * unit * dir : null;
default:
return null;
}
}
function getHookId(workflow, type) {
function getHookId(workflow, type: string) {
return `${type}#${workflow.id}`;
}
@ -131,39 +147,39 @@ ScheduleModes.set(SCHEDULE_MODE.COLLECTION_FIELD, {
const { collection, startsOn, endsOn, repeat } = workflow.config;
const event = `${collection}.afterSave`;
const name = getHookId(workflow, event);
if (!this.events.has(name)) {
// NOTE: toggle cache depends on new date
const listener = async (data, options) => {
const now = new Date();
now.setMilliseconds(0);
const timestamp = now.getTime();
const startTime = getDataOptionTime(data, startsOn);
const endTime = getDataOptionTime(data, endsOn, -1);
if (!startTime && !repeat) {
return;
}
if (startTime && startTime > timestamp + this.cacheCycle) {
return;
}
if (endTime && endTime <= timestamp) {
return;
}
if (!nextInCycle.call(this, workflow, now)) {
return;
}
if (typeof repeat === 'number'
&& repeat > this.cacheCycle
&& (timestamp - startTime) % repeat > this.cacheCycle
) {
return;
}
console.log('set cache', now);
this.setCache(workflow);
};
this.events.set(name, listener);
this.plugin.app.db.on(`${collection}.afterSave`, listener);
if (this.events.has(name)) {
return;
}
// NOTE: toggle cache depends on new date
const listener = async (data, options) => {
const now = new Date();
now.setMilliseconds(0);
const timestamp = now.getTime();
const startTime = getDataOptionTime(data, startsOn);
const endTime = getDataOptionTime(data, endsOn, -1);
if (!startTime && !repeat) {
return;
}
if (startTime && startTime > timestamp + this.cacheCycle) {
return;
}
if (endTime && endTime <= timestamp) {
return;
}
if (!matchNext.call(this, workflow, now)) {
return;
}
if (typeof repeat === 'number'
&& repeat > this.cacheCycle
&& (timestamp - startTime) % repeat > this.cacheCycle
) {
return;
}
this.setCache(workflow);
};
this.events.set(name, listener);
this.plugin.app.db.on(event, listener);
},
off(workflow) {
@ -173,37 +189,66 @@ ScheduleModes.set(SCHEDULE_MODE.COLLECTION_FIELD, {
if (this.events.has(name)) {
const listener = this.events.get(name);
this.events.delete(name);
this.plugin.app.db.off(`${collection}.afterSave`, listener);
this.plugin.app.db.off(event, listener);
}
},
async shouldCache(workflow, now) {
const { db } = this.plugin.app;
const { startsOn, endsOn, repeat, collection } = workflow.config;
const starts = getDateRangeFilter(startsOn, now, -1);
if (!starts || !Object.keys(starts).length) {
return false;
}
const ends = getDateRangeFilter(endsOn, now, 1);
if (!ends) {
const timestamp = now.getTime();
const startTimestamp = getOnTimestampWithOffset(startsOn, now);
if (!startTimestamp) {
return false;
}
const conditions: any[] = [starts, ends].filter(item => Boolean(Object.keys(item).length));
const conditions: any[] = [
{
[startsOn.field]: {
[Op.lt]: new Date(startTimestamp + this.cacheCycle)
}
}
];
// when repeat is number, means repeat after startsOn
// (now - startsOn) % repeat <= cacheCycle
const { db } = this.plugin.app;
const tsFn = DialectTimestampFnMap[db.options.dialect];
if (repeat
&& typeof repeat === 'number'
&& repeat > this.cacheCycle
&& tsFn
) {
const uts = now.getTime();
conditions.push(where(
fn('MOD', literal(`${Math.round(uts / 1000)} - ${tsFn(startsOn.field)}`), Math.round(repeat / 1000)),
{ [Op.lt]: Math.round(this.cacheCycle / 1000) }
));
// conditions.push(literal(`mod(${uts} - ${tsFn(startsOn.field)} * 1000, ${repeat}) < ${this.cacheCycle}`));
if (repeat) {
const tsFn = DialectTimestampFnMap[db.options.dialect];
if (typeof repeat === 'number'
&& repeat > this.cacheCycle
&& tsFn
) {
conditions.push(where(
fn('MOD', literal(`${Math.round(timestamp / 1000)} - ${tsFn(startsOn.field)}`), Math.round(repeat / 1000)),
{ [Op.lt]: Math.round(this.cacheCycle / 1000) }
));
// conditions.push(literal(`mod(${timestamp} - ${tsFn(startsOn.field)} * 1000, ${repeat}) < ${this.cacheCycle}`));
}
if (endsOn) {
const endTimestamp = getOnTimestampWithOffset(endsOn, now);
if (!endTimestamp) {
return false;
}
if (typeof endsOn === 'string') {
if (endTimestamp <= timestamp) {
return false;
}
} else {
conditions.push({
[endsOn.field]: {
[Op.gte]: new Date(endTimestamp + this.interval)
}
});
}
}
} else {
conditions.push({
[startsOn.field]: {
[Op.gte]: new Date(startTimestamp)
}
});
}
const { model } = db.getCollection(collection);
@ -214,31 +259,24 @@ ScheduleModes.set(SCHEDULE_MODE.COLLECTION_FIELD, {
return Boolean(count);
},
async trigger(workflow, date) {
const {
collection,
startsOn,
endsOn,
repeat
} = workflow.config;
async trigger(workflow, now: Date) {
const { startsOn, repeat, endsOn, collection } = workflow.config;
const timestamp = now.getTime();
if (typeof startsOn !== 'object') {
return;
const startTimestamp = getOnTimestampWithOffset(startsOn, now);
if (!startTimestamp) {
return false;
}
const timestamp = date.getTime();
const startTimestamp = timestamp - (startsOn.offset ?? 0) * (startsOn.unit ?? 1000);
const conditions = [];
if (!repeat) {
// startsOn exactly equal to now in 1s
conditions.push({
const conditions: any[] = [
{
[startsOn.field]: {
[Op.gte]: new Date(startTimestamp),
[Op.lt]: new Date(startTimestamp + 1000)
[Op.lt]: new Date(startTimestamp + this.interval)
}
});
} else {
}
];
if (repeat) {
// startsOn not after now
conditions.push({
[startsOn.field]: {
@ -255,25 +293,31 @@ ScheduleModes.set(SCHEDULE_MODE.COLLECTION_FIELD, {
// conditions.push(literal(`MOD(CAST(${timestamp} AS BIGINT) - CAST((FLOOR(${tsFn(startsOn.field)}) AS BIGINT) * 1000), ${repeat}) = 0`));
}
switch (typeof endsOn) {
case 'string':
const endTime = Date.parse(endsOn);
if (!endTime || endTime <= timestamp) {
return;
if (endsOn) {
const endTimestamp = getOnTimestampWithOffset(endsOn, now);
if (!endTimestamp) {
return false;
}
if (typeof endsOn === 'string') {
if (endTimestamp <= timestamp) {
return false;
}
break;
case 'object':
if (endsOn.field) {
conditions.push({
[endsOn.field]: {
[Op.gte]: new Date(timestamp - (endsOn.offset ?? 0) * (endsOn.unit ?? 1000) + 1000)
}
});
}
break;
default:
break;
} else {
conditions.push({
[endsOn.field]: {
[Op.gte]: new Date(endTimestamp + this.interval)
}
});
}
}
} else {
// startsOn exactly equal to now in 1s
conditions.push({
[startsOn.field]: {
[Op.gte]: new Date(startTimestamp)
}
});
}
const { model } = this.plugin.app.db.getCollection(collection);
@ -284,12 +328,12 @@ ScheduleModes.set(SCHEDULE_MODE.COLLECTION_FIELD, {
});
if (instances.length) {
console.log(instances.length, 'rows trigger at', date);
console.log(instances.length, 'rows trigger at', now);
}
instances.forEach(item => {
this.plugin.trigger(workflow, {
date,
date: now,
data: item.get()
});
});
@ -297,23 +341,16 @@ ScheduleModes.set(SCHEDULE_MODE.COLLECTION_FIELD, {
});
function nextInCycle(this: ScheduleTrigger, workflow, now: Date): boolean {
function matchNext(this: ScheduleTrigger, workflow, now: Date, range: number = this.cacheCycle): boolean {
const { repeat } = workflow.config;
// no repeat means no need to rerun
// but if in current cycle, should be put in cache
// no repeat but in current cycle means startsOn has been configured
// so we need to more info to determine if necessary config items
if (!repeat) {
if (typeof repeat !== 'string') {
return true;
}
switch (typeof repeat) {
case 'string':
break;
default:
return true;
}
const currentDate = new Date(now);
currentDate.setMilliseconds(-1);
const timestamp = now.getTime();
@ -321,18 +358,17 @@ function nextInCycle(this: ScheduleTrigger, workflow, now: Date): boolean {
let next = interval.next();
// NOTE: cache all workflows will be matched in current cycle
if (next.getTime() - timestamp <= this.cacheCycle) {
if (next.getTime() - timestamp <= range) {
return true;
}
return false;
}
export default class ScheduleTrigger extends Trigger {
static CacheRules = [
({ config, allExecuted }) => config.limit ? allExecuted < config.limit : true,
({ config, executed }) => !executed || config.repeat != null,
({ config }) => ['repeat', 'startsOn'].some(key => config[key]),
nextInCycle,
({ config, allExecuted }) => (config.limit ? allExecuted < config.limit : true) && config.startsOn,
matchNext,
function(workflow, now) {
const { mode } = workflow.config;
const modeHandlers = ScheduleModes.get(mode);
@ -341,26 +377,9 @@ export default class ScheduleTrigger extends Trigger {
];
static TriggerRules = [
({ config, allExecuted }) => config.limit ? allExecuted < config.limit : true,
({ config, executed }) => !executed || config.repeat != null,
({ config }) => ['repeat', 'startsOn'].some(key => config[key]),
({ config, allExecuted }) => (config.limit ? allExecuted < config.limit : true) && config.startsOn,
function (workflow, now) {
const { repeat } = workflow.config;
if (typeof repeat !== 'string') {
return true;
}
const currentDate = new Date(now);
currentDate.setMilliseconds(-1);
const timestamp = now.getTime();
const interval = parser.parseExpression(repeat, { currentDate });
let next = interval.next();
if (next.getTime() === timestamp) {
return true;
}
return false;
return matchNext.call(this, workflow, now, 0);
}
];
@ -386,25 +405,27 @@ export default class ScheduleTrigger extends Trigger {
}
init() {
if (!this.timer) {
const now = new Date();
// NOTE: assign to this.timer to avoid duplicated initialization
this.timer = setTimeout(
() => {
this.timer = setInterval(this.run, this.interval);
// initially trigger
// this.onTick(now);
},
// NOTE:
// try to align to system time on each second starts,
// after at least 1 second initialized for anything to get ready.
// so jobs in 2 seconds will be missed at first start.
1_000 - now.getMilliseconds()
);
if (this.timer) {
return;
}
const now = new Date();
// NOTE: assign to this.timer to avoid duplicated initialization
this.timer = setTimeout(
() => {
this.timer = setInterval(this.run, this.interval);
// initially trigger
// this.onTick(now);
},
// NOTE:
// try to align to system time on each second starts,
// after at least 1 second initialized for anything to get ready.
// so jobs in 2 seconds will be missed at first start.
1_000 - now.getMilliseconds()
);
}
run = () => {