dev: fix incomplete support for multipart driver requests

This commit is contained in:
KernelDeimos 2024-07-30 17:30:28 -04:00
parent 1708b37164
commit bca579529b
3 changed files with 151 additions and 33 deletions

View File

@ -34,3 +34,65 @@ URL.createObjectURL(await (await fetch("http://api.puter.localhost:4100/drivers/
"method": "POST",
})).blob());
```
```javascript
await(async () => {
blob = await (await fetch("http://api.puter.localhost:4100/drivers/call", {
"headers": {
"Content-Type": "application/json",
"Authorization": `Bearer ${puter.authToken}`,
},
"body": JSON.stringify({
interface: 'test-image',
method: 'get_image',
args: {
source_type: 'string:url:web'
}
}),
"method": "POST",
})).blob();
const endpoint = 'http://api.puter.localhost:4100/drivers/call';
const body = {
object: {
interface: 'test-image',
method: 'echo_image',
['args.source']: {
$: 'file',
size: blob.size,
type: blob.type,
},
},
file: [
blob,
]
};
const formData = new FormData();
for ( const k in body ) {
console.log('k', k);
const append = v => {
if ( v instanceof Blob ) {
formData.append(k, v, 'filename');
} else {
formData.append(k, JSON.stringify(v));
}
};
if ( Array.isArray(body[k]) ) {
for ( const v of body[k] ) append(v);
} else {
append(body[k]);
}
}
const response = await fetch(endpoint, {
method: 'POST',
headers: { 'Authorization': `Bearer ${puter.authToken}` },
body: formData
});
const echo_blob = await response.blob();
const echo_url = URL.createObjectURL(echo_blob);
return echo_url;
})();
```

View File

@ -16,12 +16,15 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
const APIError = require("../../api/APIError");
const eggspress = require("../../api/eggspress");
const { FileFacade } = require("../../services/drivers/FileFacade");
const { TypeSpec } = require("../../services/drivers/meta/Construct");
const { TypedValue } = require("../../services/drivers/meta/Runtime");
const { Context } = require("../../util/context");
const { whatis } = require("../../util/langutil");
const { TeePromise } = require("../../util/promise");
const { valid_file_size } = require("../../util/validutil");
let _handle_multipart;
@ -55,12 +58,14 @@ module.exports = eggspress('/drivers/call', {
const x = Context.get();
const svc_driver = x.get('services').get('driver');
const interface_name = req.body.interface;
const test_mode = req.body.test_mode;
let p_request = null;
let body;
if ( req.headers['content-type'].includes('multipart/form-data') ) {
({ params: body, p_data_end: p_request } = await _handle_multipart(req));
} else body = req.body;
const args = req.headers['content-type'].includes('multipart/form-data')
? await _handle_multipart(req)
: req.body.args;
const interface_name = body.interface;
const test_mode = body.test_mode;
let context = Context.get();
if ( test_mode ) context = context.sub({ test_mode: true });
@ -68,12 +73,22 @@ module.exports = eggspress('/drivers/call', {
const result = await context.arun(async () => {
return await svc_driver.call({
iface: interface_name,
method: req.body.method,
args
method: body.method,
format: body.format,
args: body.args,
});
});
// We can't wait for the request to finish before responding;
// consider the case where a driver method implements a
// stream transformation, thus the stream from the request isn't
// consumed until the response is being sent.
_respond(res, result);
// What we _can_ do is await the request promise while responding
// to ensure errors are caught here.
await p_request;
});
const _respond = (res, result) => {
@ -96,49 +111,86 @@ const _respond = (res, result) => {
};
_handle_multipart = async (req) => {
const busboy = require('busboy');
const { Readable } = require('stream');
const Busboy = require('busboy');
const { PassThrough } = require('stream');
const params = {};
const files = [];
let file_index = 0;
const bb = new busboy({
const bb = Busboy({
headers: req.headers,
});
const p_ready = new TeePromise();
const p_data_end = new TeePromise();
const p_nonfile_data_end = new TeePromise();
bb.on('file', (fieldname, stream, details) => {
const file_facade = new FileFacade();
file_facade.values.set('stream', stream);
file_facade.values.set('busboy:details', details);
if ( params.hasOwnProperty(fieldname) ) {
if ( ! Array.isArray(params[fieldname]) ) {
params[fieldname] = [params[fieldname]];
}
params[fieldname].push(file_facade);
} else {
params[fieldname] = file_facade;
}
p_nonfile_data_end.resolve();
const fileinfo = files[file_index++];
stream.pipe(fileinfo.stream);
});
bb.on('field', (fieldname, value, details) => {
if ( params.hasOwnProperty(fieldname) ) {
if ( ! Array.isArray(params[fieldname]) ) {
params[fieldname] = [params[fieldname]];
const on_field = (fieldname, value) => {
const key_parts = fieldname.split('.');
const last_key = key_parts.pop();
let dst = params;
for ( let i = 0; i < key_parts.length; i++ ) {
if ( ! dst.hasOwnProperty(key_parts[i]) ) {
dst[key_parts[i]] = {};
}
params[fieldname].push(value);
if ( whatis(dst[key_parts[i]]) !== 'object' ) {
throw new Error(
`Tried to set member of non-object: ${key_parts[i]} in ${fieldname}`
);
}
dst = dst[key_parts[i]];
}
if ( whatis(value) === 'object' && value.$ === 'file' ) {
const fileinfo = value;
const { v: size, ok: size_ok } =
valid_file_size(fileinfo.size);
if ( ! size_ok ) {
throw APIError.create('invalid_file_metadata');
}
fileinfo.size = size;
fileinfo.stream = new PassThrough();
const file_facade = new FileFacade();
file_facade.values.set('stream', fileinfo.stream);
fileinfo.facade = file_facade,
files.push(fileinfo);
value = file_facade;
}
if ( dst.hasOwnProperty(last_key) ) {
if ( ! Array.isArray(dst[last_key]) ) {
dst[last_key] = [dst[last_key]];
}
dst[last_key].push(value);
} else {
params[fieldname] = value;
dst[last_key] = value;
}
};
bb.on('field', (fieldname, value, details) => {
const o = JSON.parse(value);
for ( const k in o ) {
on_field(k, o[k]);
}
});
bb.on('error', (err) => {
p_ready.reject(err);
p_data_end.reject(err);
});
bb.on('close', () => {
p_ready.resolve();
p_data_end.resolve();
});
req.pipe(bb);
await p_ready;
(async () => {
await p_data_end;
p_nonfile_data_end.resolve();
})();
return params;
}
await p_nonfile_data_end;
return { params, p_data_end };
}

View File

@ -98,6 +98,10 @@ class File extends BaseType {
}
async consolidate (ctx, input, { arg_name }) {
if ( input instanceof FileFacade ) {
return input;
}
const result = new FileFacade();
// DRY: Part of this is duplicating FSNodeParam, but FSNodeParam is
// subject to change in PR #647, so this should be updated later.