From da208e23f541e3b92bbf0a9acd1d7962666d3a10 Mon Sep 17 00:00:00 2001 From: KernelDeimos Date: Wed, 17 Apr 2024 12:22:53 -0400 Subject: [PATCH] Add a valve and internal pipe to commands --- .../src/ansi-shell/pipeline/Coupler.js | 15 ++++++- .../src/ansi-shell/pipeline/Pipeline.js | 5 +++ packages/phoenix/src/promise.js | 43 +++++++++++++++++++ 3 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 packages/phoenix/src/promise.js diff --git a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js index 43ffe423..2845be3a 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js @@ -16,6 +16,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +import { TeePromise } from "../../promise.js"; + export class Coupler { static description = ` Connects a read stream to a write stream. @@ -26,6 +28,7 @@ export class Coupler { this.source = source; this.target = target; this.on_ = true; + this.closed_ = new TeePromise(); this.isDone = new Promise(rslv => { this.resolveIsDone = rslv; }) @@ -35,10 +38,20 @@ export class Coupler { off () { this.on_ = false; } on () { this.on_ = true; } + close () { + this.closed_.resolve({ + value: undefined, + done: true, + }); + } + async listenLoop_ () { this.active = true; for (;;) { - const { value, done } = await this.source.read(); + const { value, done } = await Promise.race([ + this.closed_, + this.source.read(), + ]); if ( done ) { this.source = null; this.target = null; diff --git a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js index 2ad93864..27ae2c56 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js @@ -202,6 +202,10 @@ export class PreparedCommand { in_ = new MemReader(response); } + const internal_input_pipe = new Pipe(); + const valve = new Coupler(in_, internal_input_pipe.in); + in_ = internal_input_pipe.out; + // simple naive implementation for now const sig = { listeners_: [], @@ -297,6 +301,7 @@ export class PreparedCommand { let exit_code = 0; try { await execute(ctx); + valve.close(); } catch (e) { if ( e instanceof Exit ) { exit_code = e.code; diff --git a/packages/phoenix/src/promise.js b/packages/phoenix/src/promise.js new file mode 100644 index 00000000..20d080ae --- /dev/null +++ b/packages/phoenix/src/promise.js @@ -0,0 +1,43 @@ +export class TeePromise { + static STATUS_PENDING = Symbol('pending'); + static STATUS_RUNNING = {}; + static STATUS_DONE = Symbol('done'); + constructor () { + this.status_ = this.constructor.STATUS_PENDING; + this.donePromise = new Promise((resolve, reject) => { + this.doneResolve = resolve; + this.doneReject = reject; + }); + } + get status () { + return this.status_; + } + set status (status) { + this.status_ = status; + if ( status === this.constructor.STATUS_DONE ) { + this.doneResolve(); + } + } + resolve (value) { + this.status_ = this.constructor.STATUS_DONE; + this.doneResolve(value); + } + awaitDone () { + return this.donePromise; + } + then (fn, ...a) { + return this.donePromise.then(fn, ...a); + } + + reject (err) { + this.status_ = this.constructor.STATUS_DONE; + this.doneReject(err); + } + + /** + * @deprecated use then() instead + */ + onComplete(fn) { + return this.then(fn); + } +}