From 9535116189aee4a6cfcc706333a7d6f4627924bb Mon Sep 17 00:00:00 2001 From: Junyi Date: Sat, 18 May 2024 17:25:51 +0800 Subject: [PATCH] fix(plugin-workflow-request): fix request hanging when invalid header value (#4376) * fix(plugin-workflow-request): fix request hanging when invalid header value * fix(plugin-workflow-request): add trim for variable string field * feat(plugin-workflow): make error result more sensible * feat(plugin-workflow-request): unify response structure * fix(plugin-workflow-request): fix test cases --- .../src/server/RequestInstruction.ts | 77 ++++++-- .../src/server/__tests__/instruction.test.ts | 181 +++++++++++++++--- ...518105632-make-legacy-config-compatible.ts | 40 ++++ 3 files changed, 256 insertions(+), 42 deletions(-) create mode 100644 packages/plugins/@nocobase/plugin-workflow-request/src/server/migrations/20240518105632-make-legacy-config-compatible.ts diff --git a/packages/plugins/@nocobase/plugin-workflow-request/src/server/RequestInstruction.ts b/packages/plugins/@nocobase/plugin-workflow-request/src/server/RequestInstruction.ts index 200019dd75..701e02039e 100644 --- a/packages/plugins/@nocobase/plugin-workflow-request/src/server/RequestInstruction.ts +++ b/packages/plugins/@nocobase/plugin-workflow-request/src/server/RequestInstruction.ts @@ -20,6 +20,7 @@ export type RequestConfig = Pick; contentType: string; ignoreFail: boolean; + onlyData?: boolean; }; const ContentTypeTransformers = { @@ -37,16 +38,14 @@ async function request(config) { // default headers const { url, method = 'POST', contentType = 'application/json', data, timeout = 5000 } = config; const headers = (config.headers ?? []).reduce((result, header) => { - if (header.name.toLowerCase() === 'content-type') { - // header.value = ['application/json', 'application/x-www-form-urlencoded'].includes(header.value) - // ? header.value - // : 'application/json'; + const name = header.name?.trim(); + if (name.toLowerCase() === 'content-type') { return result; } - return Object.assign(result, { [header.name]: header.value }); + return Object.assign(result, { [name]: header.value?.trim() }); }, {}); const params = (config.params ?? []).reduce( - (result, param) => Object.assign(result, { [param.name]: param.value }), + (result, param) => Object.assign(result, { [param.name]: param.value?.trim() }), {}, ); @@ -54,7 +53,7 @@ async function request(config) { headers['Content-Type'] = contentType; return axios.request({ - url, + url: url?.trim(), method, headers, params, @@ -67,6 +66,39 @@ async function request(config) { }); } +function successResponse(response, onlyData = false) { + return onlyData + ? response.data + : { + status: response.status, + statusText: response.statusText, + headers: response.headers, + config: response.config, + data: response.data, + }; +} + +function failedResponse(error) { + let result = { + message: error.message, + stack: error.stack, + }; + if (error.isAxiosError) { + if (error.response) { + Object.assign(result, { + status: error.response.status, + statusText: error.response.statusText, + headers: error.response.headers, + config: error.response.config, + data: error.response.data, + }); + } else if (error.request) { + result = error.toJSON(); + } + } + return result; +} + export default class extends Instruction { async run(node: FlowNodeModel, prevJob, processor: Processor) { const config = processor.getParsedValue(node.config, node.id) as RequestConfig; @@ -79,7 +111,7 @@ export default class extends Instruction { const response = await request(config); return { status: JOB_STATUS.RESOLVED, - result: response.data, + result: successResponse(response, config.onlyData), }; } catch (error) { return { @@ -99,25 +131,36 @@ export default class extends Instruction { // eslint-disable-next-line promise/catch-or-return request(config) .then((response) => { + processor.logger.info(`request (#${node.id}) response success, status: ${response.status}`); + job.set({ status: JOB_STATUS.RESOLVED, - result: response.data, + result: successResponse(response, config.onlyData), }); - processor.logger.info(`request (#${node.id}) response success, status: ${response.status}`); }) .catch((error) => { + if (error.isAxiosError) { + if (error.response) { + processor.logger.info(`request (#${node.id}) failed with response, status: ${error.response.status}`); + } else if (error.request) { + processor.logger.error(`request (#${node.id}) failed without resposne: ${error.message}`); + } else { + processor.logger.error(`request (#${node.id}) initiation failed: ${error.message}`); + } + } else { + processor.logger.error(`request (#${node.id}) failed unexpectedly: ${error.message}`); + } + job.set({ status: JOB_STATUS.FAILED, - result: error.isAxiosError ? error.toJSON() : error.message, + result: failedResponse(error), }); - if (error.response) { - processor.logger.info(`request (#${node.id}) response failed, status: ${error.response.status}`); - } else { - processor.logger.error(`request (#${node.id}) response failed: ${error.message}`); - } }) .finally(() => { - this.workflow.resume(job); + processor.logger.debug(`request (#${node.id}) ended, resume workflow...`); + setImmediate(() => { + this.workflow.resume(job); + }); }); processor.logger.info(`request (#${node.id}) sent to "${config.url}", waiting for response...`); diff --git a/packages/plugins/@nocobase/plugin-workflow-request/src/server/__tests__/instruction.test.ts b/packages/plugins/@nocobase/plugin-workflow-request/src/server/__tests__/instruction.test.ts index 05ecc56fe2..c762ad09cf 100644 --- a/packages/plugins/@nocobase/plugin-workflow-request/src/server/__tests__/instruction.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow-request/src/server/__tests__/instruction.test.ts @@ -32,12 +32,21 @@ class MockAPI { get URL_400() { return `http://${HOST}:${this.port}/api/400`; } + get URL_400_MESSAGE() { + return `http://${HOST}:${this.port}/api/400_message`; + } + get URL_400_OBJECT() { + return `http://${HOST}:${this.port}/api/400_object`; + } get URL_404() { return `http://${HOST}:${this.port}/api/404`; } get URL_TIMEOUT() { return `http://${HOST}:${this.port}/api/timeout`; } + get URL_END() { + return `http://${HOST}:${this.port}/api/end`; + } constructor() { this.app = new Koa(); this.app.use(bodyParser()); @@ -46,6 +55,18 @@ class MockAPI { if (ctx.path === '/api/400') { return ctx.throw(400); } + if (ctx.path === '/api/400_message') { + return ctx.throw(400, 'bad request message'); + } + if (ctx.path === '/api/400_object') { + ctx.body = { a: 1 }; + ctx.status = 400; + return; + } + if (ctx.path === '/api/end') { + ctx.res.socket.end(); + return; + } if (ctx.path === '/api/timeout') { await sleep(2000); ctx.status = 204; @@ -122,6 +143,27 @@ describe('workflow > instructions > request', () => { }); describe('request static app routes', () => { + it('get data (legacy)', async () => { + await workflow.createNode({ + type: 'request', + config: { + url: api.URL_DATA, + method: 'GET', + onlyData: true, + } as RequestConfig, + }); + + await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [execution] = await workflow.getExecutions(); + expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED); + const [job] = await execution.getJobs(); + expect(job.status).toBe(JOB_STATUS.RESOLVED); + expect(job.result).toMatchObject({ meta: {}, data: {} }); + }); + it('get data', async () => { await workflow.createNode({ type: 'request', @@ -136,10 +178,12 @@ describe('workflow > instructions > request', () => { await sleep(500); const [execution] = await workflow.getExecutions(); - expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); + expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED); const [job] = await execution.getJobs(); - expect(job.status).toEqual(JOB_STATUS.RESOLVED); - expect(job.result).toEqual({ meta: {}, data: {} }); + expect(job.status).toBe(JOB_STATUS.RESOLVED); + expect(job.result).toMatchObject({ + data: { meta: {}, data: {} }, + }); }); it('timeout', async () => { @@ -158,7 +202,7 @@ describe('workflow > instructions > request', () => { const [execution] = await workflow.getExecutions(); const [job] = await execution.getJobs(); - expect(job.status).toEqual(JOB_STATUS.FAILED); + expect(job.status).toBe(JOB_STATUS.FAILED); expect(job.result).toMatchObject({ code: 'ECONNABORTED', @@ -188,7 +232,7 @@ describe('workflow > instructions > request', () => { const [execution] = await workflow.getExecutions(); const [job] = await execution.getJobs(); - expect(job.status).toEqual(JOB_STATUS.RESOLVED); + expect(job.status).toBe(JOB_STATUS.RESOLVED); expect(job.result).toMatchObject({ code: 'ECONNABORTED', name: 'Error', @@ -197,13 +241,12 @@ describe('workflow > instructions > request', () => { }); }); - it('response 400', async () => { + it('response 400 without body', async () => { await workflow.createNode({ type: 'request', config: { url: api.URL_400, method: 'GET', - ignoreFail: false, } as RequestConfig, }); @@ -213,10 +256,74 @@ describe('workflow > instructions > request', () => { const [execution] = await workflow.getExecutions(); const [job] = await execution.getJobs(); - expect(job.status).toEqual(JOB_STATUS.FAILED); + expect(job.status).toBe(JOB_STATUS.FAILED); expect(job.result.status).toBe(400); }); + it('response 400 with text message', async () => { + await workflow.createNode({ + type: 'request', + config: { + url: api.URL_400_MESSAGE, + method: 'GET', + } as RequestConfig, + }); + + await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [execution] = await workflow.getExecutions(); + const [job] = await execution.getJobs(); + expect(job.status).toBe(JOB_STATUS.FAILED); + expect(job.result.status).toBe(400); + expect(job.result.data).toBe('bad request message'); + }); + + it('response 400 with object', async () => { + await workflow.createNode({ + type: 'request', + config: { + url: api.URL_400_OBJECT, + method: 'GET', + } as RequestConfig, + }); + + await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [execution] = await workflow.getExecutions(); + const [job] = await execution.getJobs(); + expect(job.status).toBe(JOB_STATUS.FAILED); + expect(job.result.status).toBe(400); + expect(job.result.data).toEqual({ a: 1 }); + }); + + it('response just end', async () => { + await workflow.createNode({ + type: 'request', + config: { + url: api.URL_END, + method: 'GET', + } as RequestConfig, + }); + + await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [execution] = await workflow.getExecutions(); + const [job] = await execution.getJobs(); + expect(job.status).toBe(JOB_STATUS.FAILED); + expect(job.result).toMatchObject({ + code: 'ECONNRESET', + name: 'Error', + status: null, + message: 'socket hang up', + }); + }); + it('response 400 ignoreFail', async () => { await workflow.createNode({ type: 'request', @@ -234,7 +341,7 @@ describe('workflow > instructions > request', () => { const [execution] = await workflow.getExecutions(); const [job] = await execution.getJobs(); - expect(job.status).toEqual(JOB_STATUS.RESOLVED); + expect(job.status).toBe(JOB_STATUS.RESOLVED); expect(job.result.status).toBe(400); }); @@ -254,8 +361,8 @@ describe('workflow > instructions > request', () => { const [execution] = await workflow.getExecutions(); const [job] = await execution.getJobs(); - expect(job.status).toEqual(JOB_STATUS.RESOLVED); - expect(job.result.data).toEqual({ title: 't1' }); + expect(job.status).toBe(JOB_STATUS.RESOLVED); + expect(job.result.data.data).toEqual({ title: 't1' }); }); // TODO(bug): should not use ejs @@ -278,8 +385,8 @@ describe('workflow > instructions > request', () => { const [execution] = await workflow.getExecutions(); const [job] = await execution.getJobs(); - expect(job.status).toEqual(JOB_STATUS.RESOLVED); - expect(job.result.data).toEqual({ title }); + expect(job.status).toBe(JOB_STATUS.RESOLVED); + expect(job.result.data.data).toEqual({ title }); }); it.skip('request inside loop', async () => { @@ -305,7 +412,7 @@ describe('workflow > instructions > request', () => { await sleep(500); const [execution] = await workflow.getExecutions(); - expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); + expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED); const jobs = await execution.getJobs({ order: [['id', 'ASC']] }); expect(jobs.length).toBe(3); expect(jobs.map((item) => item.status)).toEqual(Array(3).fill(JOB_STATUS.RESOLVED)); @@ -329,10 +436,10 @@ describe('workflow > instructions > request', () => { await sleep(500); const [execution] = await workflow.getExecutions(); - expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); + expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED); const [job] = await execution.getJobs(); - expect(job.status).toEqual(JOB_STATUS.RESOLVED); - expect(job.result.data).toEqual({ a: 't1' }); + expect(job.status).toBe(JOB_STATUS.RESOLVED); + expect(job.result.data.data).toEqual({ a: 't1' }); }); it('contentType as "application/x-www-form-urlencoded"', async () => { @@ -354,10 +461,34 @@ describe('workflow > instructions > request', () => { await sleep(500); const [execution] = await workflow.getExecutions(); - expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); + expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED); const [job] = await execution.getJobs(); - expect(job.status).toEqual(JOB_STATUS.RESOLVED); - expect(job.result.data).toEqual({ a: ['t1', '&=1'] }); + expect(job.status).toBe(JOB_STATUS.RESOLVED); + expect(job.result.data.data).toEqual({ a: ['t1', '&=1'] }); + }); + }); + + describe('invalid characters', () => { + it('\\n in header value should be trimed, and should not cause error', async () => { + const n1 = await workflow.createNode({ + type: 'request', + config: { + url: api.URL_DATA, + method: 'POST', + data: { a: '{{$context.data.title}}' }, + headers: [{ name: 'Authorization', value: 'abc\n' }], + }, + }); + + await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [execution] = await workflow.getExecutions(); + expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED); + const [job] = await execution.getJobs(); + expect(job.status).toBe(JOB_STATUS.RESOLVED); + expect(job.result.data.data).toEqual({ a: 't1' }); }); }); @@ -398,7 +529,7 @@ describe('workflow > instructions > request', () => { expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED); const [job] = await execution.getJobs(); expect(job.status).toBe(JOB_STATUS.RESOLVED); - expect(job.result.data).toMatchObject({}); + expect(job.result.data.data).toMatchObject({}); server.close(); }); @@ -428,11 +559,11 @@ describe('workflow > instructions > request', () => { const [execution] = await syncFlow.getExecutions(); expect(processor.execution.id).toEqual(execution.id); - expect(processor.execution.status).toEqual(execution.status); - expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); + expect(processor.execution.status).toBe(execution.status); + expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED); const [job] = await execution.getJobs(); - expect(job.status).toEqual(JOB_STATUS.RESOLVED); - expect(job.result).toEqual({ meta: {}, data: {} }); + expect(job.status).toBe(JOB_STATUS.RESOLVED); + expect(job.result.data).toEqual({ meta: {}, data: {} }); }); it('ignoreFail', async () => { diff --git a/packages/plugins/@nocobase/plugin-workflow-request/src/server/migrations/20240518105632-make-legacy-config-compatible.ts b/packages/plugins/@nocobase/plugin-workflow-request/src/server/migrations/20240518105632-make-legacy-config-compatible.ts new file mode 100644 index 0000000000..cad51aa649 --- /dev/null +++ b/packages/plugins/@nocobase/plugin-workflow-request/src/server/migrations/20240518105632-make-legacy-config-compatible.ts @@ -0,0 +1,40 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + +import { Migration } from '@nocobase/server'; + +export default class extends Migration { + appVersion = '<1.0.0-alpha.15'; + async up() { + const { db } = this.context; + + const NodeRepo = db.getRepository('flow_nodes'); + await db.sequelize.transaction(async (transaction) => { + const nodes = await NodeRepo.find({ + filter: { + type: 'request', + }, + transaction, + }); + + await nodes.reduce( + (promise, node) => + promise.then(() => { + node.set('config', { ...node.config, onlyData: true }); + node.changed('config', true); + return node.save({ + silent: true, + transaction, + }); + }), + Promise.resolve(), + ); + }); + } +}