From bca579529bb9810b852b2cffeebd40a71d4d54c1 Mon Sep 17 00:00:00 2001 From: KernelDeimos Date: Tue, 30 Jul 2024 17:30:28 -0400 Subject: [PATCH] dev: fix incomplete support for multipart driver requests --- .../src/modules/test-drivers/doc/requests.md | 62 +++++++++ src/backend/src/routers/drivers/call.js | 118 +++++++++++++----- src/backend/src/services/drivers/types.js | 4 + 3 files changed, 151 insertions(+), 33 deletions(-) diff --git a/src/backend/src/modules/test-drivers/doc/requests.md b/src/backend/src/modules/test-drivers/doc/requests.md index 30b2e433..39275fb9 100644 --- a/src/backend/src/modules/test-drivers/doc/requests.md +++ b/src/backend/src/modules/test-drivers/doc/requests.md @@ -34,3 +34,65 @@ URL.createObjectURL(await (await fetch("http://api.puter.localhost:4100/drivers/ "method": "POST", })).blob()); ``` + +```javascript +await(async () => { + + blob = await (await fetch("http://api.puter.localhost:4100/drivers/call", { + "headers": { + "Content-Type": "application/json", + "Authorization": `Bearer ${puter.authToken}`, + }, + "body": JSON.stringify({ + interface: 'test-image', + method: 'get_image', + args: { + source_type: 'string:url:web' + } + }), + "method": "POST", + })).blob(); + + const endpoint = 'http://api.puter.localhost:4100/drivers/call'; + + const body = { + object: { + interface: 'test-image', + method: 'echo_image', + ['args.source']: { + $: 'file', + size: blob.size, + type: blob.type, + }, + }, + file: [ + blob, + ] + }; + + const formData = new FormData(); + for ( const k in body ) { + console.log('k', k); + const append = v => { + if ( v instanceof Blob ) { + formData.append(k, v, 'filename'); + } else { + formData.append(k, JSON.stringify(v)); + } + }; + if ( Array.isArray(body[k]) ) { + for ( const v of body[k] ) append(v); + } else { + append(body[k]); + } + } + const response = await fetch(endpoint, { + method: 'POST', + headers: { 'Authorization': `Bearer ${puter.authToken}` }, + body: formData + }); + const echo_blob = await response.blob(); + const echo_url = URL.createObjectURL(echo_blob); + return echo_url; +})(); +``` \ No newline at end of file diff --git a/src/backend/src/routers/drivers/call.js b/src/backend/src/routers/drivers/call.js index 8d0ff7aa..737231af 100644 --- a/src/backend/src/routers/drivers/call.js +++ b/src/backend/src/routers/drivers/call.js @@ -16,12 +16,15 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +const APIError = require("../../api/APIError"); const eggspress = require("../../api/eggspress"); const { FileFacade } = require("../../services/drivers/FileFacade"); const { TypeSpec } = require("../../services/drivers/meta/Construct"); const { TypedValue } = require("../../services/drivers/meta/Runtime"); const { Context } = require("../../util/context"); +const { whatis } = require("../../util/langutil"); const { TeePromise } = require("../../util/promise"); +const { valid_file_size } = require("../../util/validutil"); let _handle_multipart; @@ -55,12 +58,14 @@ module.exports = eggspress('/drivers/call', { const x = Context.get(); const svc_driver = x.get('services').get('driver'); - const interface_name = req.body.interface; - const test_mode = req.body.test_mode; + let p_request = null; + let body; + if ( req.headers['content-type'].includes('multipart/form-data') ) { + ({ params: body, p_data_end: p_request } = await _handle_multipart(req)); + } else body = req.body; - const args = req.headers['content-type'].includes('multipart/form-data') - ? await _handle_multipart(req) - : req.body.args; + const interface_name = body.interface; + const test_mode = body.test_mode; let context = Context.get(); if ( test_mode ) context = context.sub({ test_mode: true }); @@ -68,12 +73,22 @@ module.exports = eggspress('/drivers/call', { const result = await context.arun(async () => { return await svc_driver.call({ iface: interface_name, - method: req.body.method, - args + method: body.method, + format: body.format, + args: body.args, }); }); + // We can't wait for the request to finish before responding; + // consider the case where a driver method implements a + // stream transformation, thus the stream from the request isn't + // consumed until the response is being sent. + _respond(res, result); + + // What we _can_ do is await the request promise while responding + // to ensure errors are caught here. + await p_request; }); const _respond = (res, result) => { @@ -96,49 +111,86 @@ const _respond = (res, result) => { }; _handle_multipart = async (req) => { - const busboy = require('busboy'); - const { Readable } = require('stream'); + const Busboy = require('busboy'); + const { PassThrough } = require('stream'); const params = {}; + const files = []; + let file_index = 0; - const bb = new busboy({ + const bb = Busboy({ headers: req.headers, }); - const p_ready = new TeePromise(); + const p_data_end = new TeePromise(); + const p_nonfile_data_end = new TeePromise(); bb.on('file', (fieldname, stream, details) => { - const file_facade = new FileFacade(); - file_facade.values.set('stream', stream); - file_facade.values.set('busboy:details', details); - if ( params.hasOwnProperty(fieldname) ) { - if ( ! Array.isArray(params[fieldname]) ) { - params[fieldname] = [params[fieldname]]; - } - params[fieldname].push(file_facade); - } else { - params[fieldname] = file_facade; - } + p_nonfile_data_end.resolve(); + const fileinfo = files[file_index++]; + stream.pipe(fileinfo.stream); }); - bb.on('field', (fieldname, value, details) => { - if ( params.hasOwnProperty(fieldname) ) { - if ( ! Array.isArray(params[fieldname]) ) { - params[fieldname] = [params[fieldname]]; + + const on_field = (fieldname, value) => { + const key_parts = fieldname.split('.'); + const last_key = key_parts.pop(); + let dst = params; + for ( let i = 0; i < key_parts.length; i++ ) { + if ( ! dst.hasOwnProperty(key_parts[i]) ) { + dst[key_parts[i]] = {}; } - params[fieldname].push(value); + if ( whatis(dst[key_parts[i]]) !== 'object' ) { + throw new Error( + `Tried to set member of non-object: ${key_parts[i]} in ${fieldname}` + ); + } + dst = dst[key_parts[i]]; + } + if ( whatis(value) === 'object' && value.$ === 'file' ) { + const fileinfo = value; + const { v: size, ok: size_ok } = + valid_file_size(fileinfo.size); + if ( ! size_ok ) { + throw APIError.create('invalid_file_metadata'); + } + fileinfo.size = size; + fileinfo.stream = new PassThrough(); + const file_facade = new FileFacade(); + file_facade.values.set('stream', fileinfo.stream); + fileinfo.facade = file_facade, + files.push(fileinfo); + value = file_facade; + } + if ( dst.hasOwnProperty(last_key) ) { + if ( ! Array.isArray(dst[last_key]) ) { + dst[last_key] = [dst[last_key]]; + } + dst[last_key].push(value); } else { - params[fieldname] = value; + dst[last_key] = value; + } + }; + + bb.on('field', (fieldname, value, details) => { + const o = JSON.parse(value); + for ( const k in o ) { + on_field(k, o[k]); } }); bb.on('error', (err) => { - p_ready.reject(err); + p_data_end.reject(err); }); bb.on('close', () => { - p_ready.resolve(); + p_data_end.resolve(); }); req.pipe(bb); - await p_ready; + (async () => { + await p_data_end; + p_nonfile_data_end.resolve(); + })(); - return params; -} \ No newline at end of file + await p_nonfile_data_end; + + return { params, p_data_end }; +} diff --git a/src/backend/src/services/drivers/types.js b/src/backend/src/services/drivers/types.js index cf381385..1c227bd9 100644 --- a/src/backend/src/services/drivers/types.js +++ b/src/backend/src/services/drivers/types.js @@ -98,6 +98,10 @@ class File extends BaseType { } async consolidate (ctx, input, { arg_name }) { + if ( input instanceof FileFacade ) { + return input; + } + const result = new FileFacade(); // DRY: Part of this is duplicating FSNodeParam, but FSNodeParam is // subject to change in PR #647, so this should be updated later.