WIP: Some incorrect things, and logging, will clean up later

What's done here:
- Call releaseLock() on the stdin reader, which immediately cancels any
  active reads and prevents new ones.
- Adjust BetterReader to pass releaseLock() on, and handle the input
  ending, and expose how much input was read into a buffer if it's
  partially filled.
- Add an error message when the one user of read(opt_buffer) gets too
  small a buffer.

That last one is readline trying to read 1 byte, and we currently
infinitely loop on that after exiting a process. So this is definitely
not correct but it feels like it's the right direction.
This commit is contained in:
Sam Atkins 2024-04-18 17:39:13 +01:00
parent 27fb88fd75
commit d1db4a11ef
4 changed files with 27 additions and 13 deletions

View File

@ -34,22 +34,25 @@ export class BetterReader {
const chunk = await this.getChunk_();
if ( ! opt_buffer ) {
if ( ! opt_buffer || ! chunk ) {
return chunk;
}
this.chunks_.push(chunk);
while ( this.getTotalBytesReady_() < opt_buffer.length ) {
this.chunks_.push(await this.getChunk_())
const read_chunk = await this.getChunk_();
if ( ! read_chunk ) {
break;
}
this.chunks_.push(read_chunk);
}
// TODO: need to handle EOT condition in this loop
let offset = 0;
for (;;) {
while ( this.chunks_.length > 0 && offset < opt_buffer.length ) {
let item = this.chunks_.shift();
if ( item === undefined ) {
throw new Error('calculation is wrong')
break;
}
if ( offset + item.length > opt_buffer.length ) {
const diff = opt_buffer.length - offset;
@ -58,16 +61,16 @@ export class BetterReader {
}
opt_buffer.set(item, offset);
offset += item.length;
if ( offset == opt_buffer.length ) break;
}
// return opt_buffer.length;
return offset;
}
async getChunk_() {
if ( this.chunks_.length === 0 ) {
const { value } = await this.delegate.read();
const { value } = await this.delegate.read().catch( ( err ) => {
return {};
});
return value;
}
@ -87,6 +90,11 @@ export class BetterReader {
getTotalBytesReady_ () {
return this.chunks_.reduce((sum, chunk) => sum + chunk.length, 0);
}
async releaseLock() {
console.log('cancelling betterreader, delegate is', this.delegate);
await this.delegate.releaseLock();
}
}
/**

View File

@ -24,7 +24,9 @@ export class Coupler {
Does not close the write stream when the read stream is closed.
`
constructor (source, target) {
constructor (source, target, debug) {
this.debug = debug || false;
console.log('creating coupler. source is', this.source);
this.source = source;
this.target = target;
this.on_ = true;
@ -39,8 +41,9 @@ export class Coupler {
on () { this.on_ = true; }
close () {
if (this.debug) console.log('closing coupler. source is', this.source);
this.source.releaseLock();
this.closed_.resolve({
value: undefined,
done: true,
});
}

View File

@ -203,7 +203,7 @@ export class PreparedCommand {
}
const internal_input_pipe = new Pipe();
const valve = new Coupler(in_, internal_input_pipe.in);
const valve = new Coupler(in_, internal_input_pipe.in, true);
in_ = internal_input_pipe.out;
// simple naive implementation for now

View File

@ -57,7 +57,10 @@ const ReadlineProcessorBuilder = builder => builder
const { locals, externs } = ctx;
const byteBuffer = new Uint8Array(1);
await externs.in_.read(byteBuffer);
const bytesRead = await externs.in_.read(byteBuffer);
if (bytesRead !== 1) {
console.warn('Failed to read byte in get-byte state of readline');
}
locals.byteBuffer = byteBuffer;
locals.byte = byteBuffer[0];
})