diff --git a/packages/phoenix/packages/pty/exports.js b/packages/phoenix/packages/pty/exports.js index da317532..90301767 100644 --- a/packages/phoenix/packages/pty/exports.js +++ b/packages/phoenix/packages/pty/exports.js @@ -27,21 +27,56 @@ export class BetterReader { this.chunks_ = []; } - async read (opt_buffer) { + _create_cancel_response () { + return { + chunk: null, + n_read: 0, + debug_meta: { + source: 'delegate', + returning: 'cancelled', + this_value_should_not_be_used: true, + }, + }; + } + + async read_and_get_info (opt_buffer, cancel_state) { if ( ! opt_buffer && this.chunks_.length === 0 ) { - return await this.delegate.read(); + const chunk = await this.delegate.read(); + if ( cancel_state?.cancelled ) { + // push the chunk back onto the queue + this.chunks_.push(chunk); + return this._create_cancel_response(); + } + return { + chunk, + debug_meta: { source: 'delegate' }, + }; } const chunk = await this.getChunk_(); + if ( cancel_state?.cancelled ) { + // push the chunk back onto the queue + this.chunks_.push(chunk); + return this._create_cancel_response(); + } - if ( ! opt_buffer || ! chunk ) { - return chunk; + if ( ! opt_buffer ) { + return { chunk, debug_meta: { source: 'stored chunks', returning: 'chunk' } }; + } + + if ( ! chunk ) { + return { n_read: 0, debug_meta: { source: 'nothing', returning: 'byte count' } }; } this.chunks_.push(chunk); while ( this.getTotalBytesReady_() < opt_buffer.length ) { const read_chunk = await this.getChunk_(); + if ( cancel_state?.cancelled ) { + // push the chunk back onto the queue + this.chunks_.push(read_chunk); + return this._create_cancel_response(); + } if ( ! read_chunk ) { break; } @@ -63,14 +98,37 @@ export class BetterReader { offset += item.length; } - return offset; + return { + n_read: offset, + debug_meta: { source: 'stored chunks', returning: 'byte count' }, + }; + } + + read_with_cancel (opt_buffer) { + const cancel_state = { cancelled: false }; + const promise = (async () => { + const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state); + console.log('this is outputting...', chunk, n_read, opt_buffer); + return opt_buffer ? n_read : chunk; + })(); + return { + canceller: () => { + cancel_state.cancelled = true; + }, + promise, + }; + } + + async read (opt_buffer) { + const { chunk, n_read } = await this.read_and_get_info(opt_buffer); + console.log('this is outputting...', chunk, n_read, opt_buffer); + return opt_buffer ? n_read : chunk; } async getChunk_() { if ( this.chunks_.length === 0 ) { - const { value } = await this.delegate.read().catch( ( err ) => { - return {}; - }); + const { value } = await this.delegate.read(); + console.log('got value?', value); return value; } @@ -84,6 +142,8 @@ export class BetterReader { this.chunks_ = []; + console.log('returning merged', merged); + return merged; } diff --git a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js index b66af605..329fa0c4 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js @@ -16,7 +16,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -import { TeePromise } from "../../promise.js"; +import { TeePromise, raceCase } from "../../promise.js"; export class Coupler { static description = ` @@ -42,7 +42,6 @@ export class Coupler { close () { if (this.debug) console.log('closing coupler. source is', this.source); - this.source.releaseLock(); this.closed_.resolve({ done: true, }); @@ -51,11 +50,21 @@ export class Coupler { async listenLoop_ () { this.active = true; for (;;) { - const { value, done } = await Promise.race([ - this.closed_, - this.source.read(), - ]); + let canceller = () => {}; + let promise; + if ( this.source.read_with_cancel !== undefined ) { + ({ canceller, promise } = this.source.read_with_cancel()); + } else { + promise = this.source.read(); + } + const [which, { value, done }] = await raceCase({ + source: promise, + closed: this.closed_, + }); if ( done ) { + if ( which === 'closed' ) { + canceller(); + } this.source = null; this.target = null; this.active = false; diff --git a/packages/phoenix/src/ansi-shell/readline/readline.js b/packages/phoenix/src/ansi-shell/readline/readline.js index 9f8e69c7..600171f8 100644 --- a/packages/phoenix/src/ansi-shell/readline/readline.js +++ b/packages/phoenix/src/ansi-shell/readline/readline.js @@ -57,9 +57,11 @@ const ReadlineProcessorBuilder = builder => builder const { locals, externs } = ctx; const byteBuffer = new Uint8Array(1); - const bytesRead = await externs.in_.read(byteBuffer); + const { n_read: bytesRead, debug_meta } = await externs.in_.read_and_get_info(byteBuffer); if (bytesRead !== 1) { console.warn('Failed to read byte in get-byte state of readline'); + console.log('debug_meta', debug_meta); // so GC doesn't remove it + // debugger; } locals.byteBuffer = byteBuffer; locals.byte = byteBuffer[0]; diff --git a/packages/phoenix/src/promise.js b/packages/phoenix/src/promise.js index 20d080ae..5c47d2bd 100644 --- a/packages/phoenix/src/promise.js +++ b/packages/phoenix/src/promise.js @@ -41,3 +41,17 @@ export class TeePromise { return this.then(fn); } } + +/** + * raceCase is like Promise.race except it takes an object instead of + * an array, and returns the key of the promise that resolves first + * as well as the value that it resolved to. + * + * @param {Object.} promise_map + * + * @returns {Promise.<[string, any]>} + */ +export const raceCase = async (promise_map) => { + return Promise.race(Object.entries(promise_map).map( + ([key, promise]) => promise.then(value => [key, value]))); +};