fix(plugin-workflow-request): fix request hanging when invalid header value (#4376)

* fix(plugin-workflow-request): fix request hanging when invalid header value

* fix(plugin-workflow-request): add trim for variable string field

* feat(plugin-workflow): make error result more sensible

* feat(plugin-workflow-request): unify response structure

* fix(plugin-workflow-request): fix test cases
This commit is contained in:
Junyi 2024-05-18 17:25:51 +08:00 committed by GitHub
parent 08a0b026b6
commit 9535116189
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 256 additions and 42 deletions

View File

@ -20,6 +20,7 @@ export type RequestConfig = Pick<AxiosRequestConfig, 'url' | 'method' | 'params'
headers: Array<Header>; headers: Array<Header>;
contentType: string; contentType: string;
ignoreFail: boolean; ignoreFail: boolean;
onlyData?: boolean;
}; };
const ContentTypeTransformers = { const ContentTypeTransformers = {
@ -37,16 +38,14 @@ async function request(config) {
// default headers // default headers
const { url, method = 'POST', contentType = 'application/json', data, timeout = 5000 } = config; const { url, method = 'POST', contentType = 'application/json', data, timeout = 5000 } = config;
const headers = (config.headers ?? []).reduce((result, header) => { const headers = (config.headers ?? []).reduce((result, header) => {
if (header.name.toLowerCase() === 'content-type') { const name = header.name?.trim();
// header.value = ['application/json', 'application/x-www-form-urlencoded'].includes(header.value) if (name.toLowerCase() === 'content-type') {
// ? header.value
// : 'application/json';
return result; return result;
} }
return Object.assign(result, { [header.name]: header.value }); return Object.assign(result, { [name]: header.value?.trim() });
}, {}); }, {});
const params = (config.params ?? []).reduce( const params = (config.params ?? []).reduce(
(result, param) => Object.assign(result, { [param.name]: param.value }), (result, param) => Object.assign(result, { [param.name]: param.value?.trim() }),
{}, {},
); );
@ -54,7 +53,7 @@ async function request(config) {
headers['Content-Type'] = contentType; headers['Content-Type'] = contentType;
return axios.request({ return axios.request({
url, url: url?.trim(),
method, method,
headers, headers,
params, params,
@ -67,6 +66,39 @@ async function request(config) {
}); });
} }
function successResponse(response, onlyData = false) {
return onlyData
? response.data
: {
status: response.status,
statusText: response.statusText,
headers: response.headers,
config: response.config,
data: response.data,
};
}
function failedResponse(error) {
let result = {
message: error.message,
stack: error.stack,
};
if (error.isAxiosError) {
if (error.response) {
Object.assign(result, {
status: error.response.status,
statusText: error.response.statusText,
headers: error.response.headers,
config: error.response.config,
data: error.response.data,
});
} else if (error.request) {
result = error.toJSON();
}
}
return result;
}
export default class extends Instruction { export default class extends Instruction {
async run(node: FlowNodeModel, prevJob, processor: Processor) { async run(node: FlowNodeModel, prevJob, processor: Processor) {
const config = processor.getParsedValue(node.config, node.id) as RequestConfig; const config = processor.getParsedValue(node.config, node.id) as RequestConfig;
@ -79,7 +111,7 @@ export default class extends Instruction {
const response = await request(config); const response = await request(config);
return { return {
status: JOB_STATUS.RESOLVED, status: JOB_STATUS.RESOLVED,
result: response.data, result: successResponse(response, config.onlyData),
}; };
} catch (error) { } catch (error) {
return { return {
@ -99,26 +131,37 @@ export default class extends Instruction {
// eslint-disable-next-line promise/catch-or-return // eslint-disable-next-line promise/catch-or-return
request(config) request(config)
.then((response) => { .then((response) => {
processor.logger.info(`request (#${node.id}) response success, status: ${response.status}`);
job.set({ job.set({
status: JOB_STATUS.RESOLVED, status: JOB_STATUS.RESOLVED,
result: response.data, result: successResponse(response, config.onlyData),
}); });
processor.logger.info(`request (#${node.id}) response success, status: ${response.status}`);
}) })
.catch((error) => { .catch((error) => {
if (error.isAxiosError) {
if (error.response) {
processor.logger.info(`request (#${node.id}) failed with response, status: ${error.response.status}`);
} else if (error.request) {
processor.logger.error(`request (#${node.id}) failed without resposne: ${error.message}`);
} else {
processor.logger.error(`request (#${node.id}) initiation failed: ${error.message}`);
}
} else {
processor.logger.error(`request (#${node.id}) failed unexpectedly: ${error.message}`);
}
job.set({ job.set({
status: JOB_STATUS.FAILED, status: JOB_STATUS.FAILED,
result: error.isAxiosError ? error.toJSON() : error.message, result: failedResponse(error),
}); });
if (error.response) {
processor.logger.info(`request (#${node.id}) response failed, status: ${error.response.status}`);
} else {
processor.logger.error(`request (#${node.id}) response failed: ${error.message}`);
}
}) })
.finally(() => { .finally(() => {
processor.logger.debug(`request (#${node.id}) ended, resume workflow...`);
setImmediate(() => {
this.workflow.resume(job); this.workflow.resume(job);
}); });
});
processor.logger.info(`request (#${node.id}) sent to "${config.url}", waiting for response...`); processor.logger.info(`request (#${node.id}) sent to "${config.url}", waiting for response...`);

View File

@ -32,12 +32,21 @@ class MockAPI {
get URL_400() { get URL_400() {
return `http://${HOST}:${this.port}/api/400`; return `http://${HOST}:${this.port}/api/400`;
} }
get URL_400_MESSAGE() {
return `http://${HOST}:${this.port}/api/400_message`;
}
get URL_400_OBJECT() {
return `http://${HOST}:${this.port}/api/400_object`;
}
get URL_404() { get URL_404() {
return `http://${HOST}:${this.port}/api/404`; return `http://${HOST}:${this.port}/api/404`;
} }
get URL_TIMEOUT() { get URL_TIMEOUT() {
return `http://${HOST}:${this.port}/api/timeout`; return `http://${HOST}:${this.port}/api/timeout`;
} }
get URL_END() {
return `http://${HOST}:${this.port}/api/end`;
}
constructor() { constructor() {
this.app = new Koa(); this.app = new Koa();
this.app.use(bodyParser()); this.app.use(bodyParser());
@ -46,6 +55,18 @@ class MockAPI {
if (ctx.path === '/api/400') { if (ctx.path === '/api/400') {
return ctx.throw(400); return ctx.throw(400);
} }
if (ctx.path === '/api/400_message') {
return ctx.throw(400, 'bad request message');
}
if (ctx.path === '/api/400_object') {
ctx.body = { a: 1 };
ctx.status = 400;
return;
}
if (ctx.path === '/api/end') {
ctx.res.socket.end();
return;
}
if (ctx.path === '/api/timeout') { if (ctx.path === '/api/timeout') {
await sleep(2000); await sleep(2000);
ctx.status = 204; ctx.status = 204;
@ -122,6 +143,27 @@ describe('workflow > instructions > request', () => {
}); });
describe('request static app routes', () => { describe('request static app routes', () => {
it('get data (legacy)', async () => {
await workflow.createNode({
type: 'request',
config: {
url: api.URL_DATA,
method: 'GET',
onlyData: true,
} as RequestConfig,
});
await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const [execution] = await workflow.getExecutions();
expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
const [job] = await execution.getJobs();
expect(job.status).toBe(JOB_STATUS.RESOLVED);
expect(job.result).toMatchObject({ meta: {}, data: {} });
});
it('get data', async () => { it('get data', async () => {
await workflow.createNode({ await workflow.createNode({
type: 'request', type: 'request',
@ -136,10 +178,12 @@ describe('workflow > instructions > request', () => {
await sleep(500); await sleep(500);
const [execution] = await workflow.getExecutions(); const [execution] = await workflow.getExecutions();
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
const [job] = await execution.getJobs(); const [job] = await execution.getJobs();
expect(job.status).toEqual(JOB_STATUS.RESOLVED); expect(job.status).toBe(JOB_STATUS.RESOLVED);
expect(job.result).toEqual({ meta: {}, data: {} }); expect(job.result).toMatchObject({
data: { meta: {}, data: {} },
});
}); });
it('timeout', async () => { it('timeout', async () => {
@ -158,7 +202,7 @@ describe('workflow > instructions > request', () => {
const [execution] = await workflow.getExecutions(); const [execution] = await workflow.getExecutions();
const [job] = await execution.getJobs(); const [job] = await execution.getJobs();
expect(job.status).toEqual(JOB_STATUS.FAILED); expect(job.status).toBe(JOB_STATUS.FAILED);
expect(job.result).toMatchObject({ expect(job.result).toMatchObject({
code: 'ECONNABORTED', code: 'ECONNABORTED',
@ -188,7 +232,7 @@ describe('workflow > instructions > request', () => {
const [execution] = await workflow.getExecutions(); const [execution] = await workflow.getExecutions();
const [job] = await execution.getJobs(); const [job] = await execution.getJobs();
expect(job.status).toEqual(JOB_STATUS.RESOLVED); expect(job.status).toBe(JOB_STATUS.RESOLVED);
expect(job.result).toMatchObject({ expect(job.result).toMatchObject({
code: 'ECONNABORTED', code: 'ECONNABORTED',
name: 'Error', name: 'Error',
@ -197,13 +241,12 @@ describe('workflow > instructions > request', () => {
}); });
}); });
it('response 400', async () => { it('response 400 without body', async () => {
await workflow.createNode({ await workflow.createNode({
type: 'request', type: 'request',
config: { config: {
url: api.URL_400, url: api.URL_400,
method: 'GET', method: 'GET',
ignoreFail: false,
} as RequestConfig, } as RequestConfig,
}); });
@ -213,10 +256,74 @@ describe('workflow > instructions > request', () => {
const [execution] = await workflow.getExecutions(); const [execution] = await workflow.getExecutions();
const [job] = await execution.getJobs(); const [job] = await execution.getJobs();
expect(job.status).toEqual(JOB_STATUS.FAILED); expect(job.status).toBe(JOB_STATUS.FAILED);
expect(job.result.status).toBe(400); expect(job.result.status).toBe(400);
}); });
it('response 400 with text message', async () => {
await workflow.createNode({
type: 'request',
config: {
url: api.URL_400_MESSAGE,
method: 'GET',
} as RequestConfig,
});
await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const [execution] = await workflow.getExecutions();
const [job] = await execution.getJobs();
expect(job.status).toBe(JOB_STATUS.FAILED);
expect(job.result.status).toBe(400);
expect(job.result.data).toBe('bad request message');
});
it('response 400 with object', async () => {
await workflow.createNode({
type: 'request',
config: {
url: api.URL_400_OBJECT,
method: 'GET',
} as RequestConfig,
});
await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const [execution] = await workflow.getExecutions();
const [job] = await execution.getJobs();
expect(job.status).toBe(JOB_STATUS.FAILED);
expect(job.result.status).toBe(400);
expect(job.result.data).toEqual({ a: 1 });
});
it('response just end', async () => {
await workflow.createNode({
type: 'request',
config: {
url: api.URL_END,
method: 'GET',
} as RequestConfig,
});
await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const [execution] = await workflow.getExecutions();
const [job] = await execution.getJobs();
expect(job.status).toBe(JOB_STATUS.FAILED);
expect(job.result).toMatchObject({
code: 'ECONNRESET',
name: 'Error',
status: null,
message: 'socket hang up',
});
});
it('response 400 ignoreFail', async () => { it('response 400 ignoreFail', async () => {
await workflow.createNode({ await workflow.createNode({
type: 'request', type: 'request',
@ -234,7 +341,7 @@ describe('workflow > instructions > request', () => {
const [execution] = await workflow.getExecutions(); const [execution] = await workflow.getExecutions();
const [job] = await execution.getJobs(); const [job] = await execution.getJobs();
expect(job.status).toEqual(JOB_STATUS.RESOLVED); expect(job.status).toBe(JOB_STATUS.RESOLVED);
expect(job.result.status).toBe(400); expect(job.result.status).toBe(400);
}); });
@ -254,8 +361,8 @@ describe('workflow > instructions > request', () => {
const [execution] = await workflow.getExecutions(); const [execution] = await workflow.getExecutions();
const [job] = await execution.getJobs(); const [job] = await execution.getJobs();
expect(job.status).toEqual(JOB_STATUS.RESOLVED); expect(job.status).toBe(JOB_STATUS.RESOLVED);
expect(job.result.data).toEqual({ title: 't1' }); expect(job.result.data.data).toEqual({ title: 't1' });
}); });
// TODO(bug): should not use ejs // TODO(bug): should not use ejs
@ -278,8 +385,8 @@ describe('workflow > instructions > request', () => {
const [execution] = await workflow.getExecutions(); const [execution] = await workflow.getExecutions();
const [job] = await execution.getJobs(); const [job] = await execution.getJobs();
expect(job.status).toEqual(JOB_STATUS.RESOLVED); expect(job.status).toBe(JOB_STATUS.RESOLVED);
expect(job.result.data).toEqual({ title }); expect(job.result.data.data).toEqual({ title });
}); });
it.skip('request inside loop', async () => { it.skip('request inside loop', async () => {
@ -305,7 +412,7 @@ describe('workflow > instructions > request', () => {
await sleep(500); await sleep(500);
const [execution] = await workflow.getExecutions(); const [execution] = await workflow.getExecutions();
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
const jobs = await execution.getJobs({ order: [['id', 'ASC']] }); const jobs = await execution.getJobs({ order: [['id', 'ASC']] });
expect(jobs.length).toBe(3); expect(jobs.length).toBe(3);
expect(jobs.map((item) => item.status)).toEqual(Array(3).fill(JOB_STATUS.RESOLVED)); expect(jobs.map((item) => item.status)).toEqual(Array(3).fill(JOB_STATUS.RESOLVED));
@ -329,10 +436,10 @@ describe('workflow > instructions > request', () => {
await sleep(500); await sleep(500);
const [execution] = await workflow.getExecutions(); const [execution] = await workflow.getExecutions();
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
const [job] = await execution.getJobs(); const [job] = await execution.getJobs();
expect(job.status).toEqual(JOB_STATUS.RESOLVED); expect(job.status).toBe(JOB_STATUS.RESOLVED);
expect(job.result.data).toEqual({ a: 't1' }); expect(job.result.data.data).toEqual({ a: 't1' });
}); });
it('contentType as "application/x-www-form-urlencoded"', async () => { it('contentType as "application/x-www-form-urlencoded"', async () => {
@ -354,10 +461,34 @@ describe('workflow > instructions > request', () => {
await sleep(500); await sleep(500);
const [execution] = await workflow.getExecutions(); const [execution] = await workflow.getExecutions();
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
const [job] = await execution.getJobs(); const [job] = await execution.getJobs();
expect(job.status).toEqual(JOB_STATUS.RESOLVED); expect(job.status).toBe(JOB_STATUS.RESOLVED);
expect(job.result.data).toEqual({ a: ['t1', '&=1'] }); expect(job.result.data.data).toEqual({ a: ['t1', '&=1'] });
});
});
describe('invalid characters', () => {
it('\\n in header value should be trimed, and should not cause error', async () => {
const n1 = await workflow.createNode({
type: 'request',
config: {
url: api.URL_DATA,
method: 'POST',
data: { a: '{{$context.data.title}}' },
headers: [{ name: 'Authorization', value: 'abc\n' }],
},
});
await PostRepo.create({ values: { title: 't1' } });
await sleep(500);
const [execution] = await workflow.getExecutions();
expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
const [job] = await execution.getJobs();
expect(job.status).toBe(JOB_STATUS.RESOLVED);
expect(job.result.data.data).toEqual({ a: 't1' });
}); });
}); });
@ -398,7 +529,7 @@ describe('workflow > instructions > request', () => {
expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED); expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
const [job] = await execution.getJobs(); const [job] = await execution.getJobs();
expect(job.status).toBe(JOB_STATUS.RESOLVED); expect(job.status).toBe(JOB_STATUS.RESOLVED);
expect(job.result.data).toMatchObject({}); expect(job.result.data.data).toMatchObject({});
server.close(); server.close();
}); });
@ -428,11 +559,11 @@ describe('workflow > instructions > request', () => {
const [execution] = await syncFlow.getExecutions(); const [execution] = await syncFlow.getExecutions();
expect(processor.execution.id).toEqual(execution.id); expect(processor.execution.id).toEqual(execution.id);
expect(processor.execution.status).toEqual(execution.status); expect(processor.execution.status).toBe(execution.status);
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
const [job] = await execution.getJobs(); const [job] = await execution.getJobs();
expect(job.status).toEqual(JOB_STATUS.RESOLVED); expect(job.status).toBe(JOB_STATUS.RESOLVED);
expect(job.result).toEqual({ meta: {}, data: {} }); expect(job.result.data).toEqual({ meta: {}, data: {} });
}); });
it('ignoreFail', async () => { it('ignoreFail', async () => {

View File

@ -0,0 +1,40 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { Migration } from '@nocobase/server';
export default class extends Migration {
appVersion = '<1.0.0-alpha.15';
async up() {
const { db } = this.context;
const NodeRepo = db.getRepository('flow_nodes');
await db.sequelize.transaction(async (transaction) => {
const nodes = await NodeRepo.find({
filter: {
type: 'request',
},
transaction,
});
await nodes.reduce(
(promise, node) =>
promise.then(() => {
node.set('config', { ...node.config, onlyData: true });
node.changed('config', true);
return node.save({
silent: true,
transaction,
});
}),
Promise.resolve(),
);
});
}
}