From 82538f0dcd6a818965fcfa4f363cffaaa780b7c8 Mon Sep 17 00:00:00 2001 From: zizifn <1803942+zizifn@users.noreply.github.com> Date: Thu, 18 May 2023 01:19:52 +0800 Subject: [PATCH] add few code --- libs/cf-worker-vless/src/index.ts | 137 +++++++++++++++++++----------- libs/cf-worker-vless/src/ws.ts | 44 ++++++++++ libs/vless-js/src/lib/vless-js.ts | 1 + 3 files changed, 134 insertions(+), 48 deletions(-) create mode 100644 libs/cf-worker-vless/src/ws.ts diff --git a/libs/cf-worker-vless/src/index.ts b/libs/cf-worker-vless/src/index.ts index 1c68327..29c0125 100644 --- a/libs/cf-worker-vless/src/index.ts +++ b/libs/cf-worker-vless/src/index.ts @@ -7,6 +7,12 @@ import { connect } from 'cloudflare:sockets'; import { Buffer } from 'node:buffer'; import { validate } from 'uuid'; +function delay(ms) { + return new Promise((resolve, rej) => { + setTimeout(resolve, ms); + }); +} + interface Env { UUID: string; } @@ -31,48 +37,88 @@ export default { const webSocketPair = new WebSocketPair(); const [client, webSocket] = Object.values(webSocketPair); - console.log(WebSocket.READY_STATE_OPEN); - const earlyDataHeader = request.headers.get('sec-websocket-protocol') || ''; let remoteSocket: TransformStream = null; webSocket.accept(); - webSocket.addEventListener('message', async (event) => { - if (remoteSocket) { - const writer = remoteSocket.writable.getWriter(); - await writer.write(event.data); - writer.releaseLock(); - return; - } - const vlessBuffer: ArrayBuffer = event.data as ArrayBuffer; - const { - hasError, - message, - portRemote, - addressRemote, - rawDataIndex, - vlessVersion, - isUDP, - } = processVlessHeader(vlessBuffer, userID); - address = addressRemote || ''; - portWithRandomLog = `${portRemote}--${Math.random()} ${ - isUDP ? 'udp ' : 'tcp ' - } `; - log(`connecting`); - if (hasError) { - webSocket.close(); // server close will not casuse worker throw error - } - const vlessResponseHeader = new Uint8Array([vlessVersion![0], 0]); - const rawClientData = vlessBuffer.slice(rawDataIndex!); - remoteSocket = connect({ - hostname: addressRemote, - port: portRemote, - }); - log(`connected`); - const writer = remoteSocket.writable.getWriter(); - await writer.write(rawClientData); // first write, nomal is tls client hello - writer.releaseLock(); + const readableWebSocketStream = makeReadableWebSocketStream( + webSocket, + earlyDataHeader, + log + ); + let vlessResponseHeader = new Uint8Array([0, 0]); + let remoteConnectionReadyResolve: Function; + // ws-->remote + + readableWebSocketStream.pipeTo( + new WritableStream({ + async write(chunk, controller) { + if (remoteSocket) { + const writer = remoteSocket.writable.getWriter(); + await writer.write(chunk); + writer.releaseLock(); + return; + } + + const { + hasError, + message, + portRemote, + addressRemote, + rawDataIndex, + vlessVersion, + isUDP, + } = processVlessHeader(chunk, userID); + address = addressRemote || ''; + portWithRandomLog = `${portRemote}--${Math.random()} ${ + isUDP ? 'udp ' : 'tcp ' + } `; + // if UDP but port not DNS port, close it + if (isUDP && portRemote != 53) { + controller.error('UDP proxy only enable for DNS which is port 53'); + webSocket.close(); // server close will not casuse worker throw error + return; + } + if (hasError) { + controller.error(message); + webSocket.close(); // server close will not casuse worker throw error + return; + } + vlessResponseHeader = new Uint8Array([vlessVersion![0], 0]); + const rawClientData = chunk.slice(rawDataIndex!); + remoteSocket = connect({ + hostname: addressRemote, + port: portRemote, + }); + log(`connected`); + + const writer = remoteSocket.writable.getWriter(); + await writer.write(rawClientData); // first write, nomal is tls client hello + writer.releaseLock(); + + // remoteSocket ready + remoteConnectionReadyResolve(remoteSocket); + }, + close() { + console.log( + `[${address}:${portWithRandomLog}] readableWebSocketStream is close` + ); + }, + abort(reason) { + console.log( + `[${address}:${portWithRandomLog}] readableWebSocketStream is abort`, + JSON.stringify(reason) + ); + }, + }) + ); + + (async () => { + await new Promise((resolve) => (remoteConnectionReadyResolve = resolve)); + + // remote--> ws + let count = 0; remoteSocket.readable .pipeTo( new WritableStream({ @@ -83,7 +129,12 @@ export default { }, async write(chunk: Uint8Array, controller) { if (webSocket.readyState === WebSocket.READY_STATE_OPEN) { + if (count++ > 20000) { + // cf one package is 4096 byte(4kb), 4096 * 20000 = 80M + await delay(1); + } webSocket.send(chunk); + // console.log(chunk.byteLength); } else { controller.error( 'webSocket.readyState is not open, maybe close' @@ -110,17 +161,7 @@ export default { ); safeCloseWebSocket(webSocket); }); - - // end - }); - - webSocket.addEventListener('close', async (event) => { - console.log('-------------close-----------------', event); - }); - - webSocket.addEventListener('error', () => { - console.log('-------------error-----------------'); - }); + })(); return new Response(null, { status: 101, diff --git a/libs/cf-worker-vless/src/ws.ts b/libs/cf-worker-vless/src/ws.ts new file mode 100644 index 0000000..4507597 --- /dev/null +++ b/libs/cf-worker-vless/src/ws.ts @@ -0,0 +1,44 @@ +export default { + async fetch(request: Request) { + let address = ''; + let portWithRandomLog = ''; + + const log = (info: string, event?: any) => { + console.log(`[${address}:${portWithRandomLog}] ${info}`, event || ''); + }; + + const upgradeHeader = request.headers.get('Upgrade'); + if (!upgradeHeader || upgradeHeader !== 'websocket') { + return new Response('Expected Upgrade: websocket', { status: 426 }); + } + + const webSocketPair = new WebSocketPair(); + const [client, webSocket] = Object.values(webSocketPair); + const earlyDataHeader = request.headers.get('sec-websocket-protocol') || ''; + webSocket.accept(); + webSocket.addEventListener('message', (event) => { + console.log(event.data); + webSocket.send(`server reponse after client sent ${event.data}`); + if (event.data === 'close') { + webSocket.close(); + } + }); + webSocket.addEventListener('close', async (event) => { + console.log('-------------close-----------------', event); + webSocket.close(); + }); + + webSocket.addEventListener('error', () => { + console.log('-------------error-----------------'); + }); + + client.addEventListener('close', (event) => { + console.log('----------client---close-----------------', event); + }); + + return new Response(null, { + status: 101, + webSocket: client, + }); + }, +}; diff --git a/libs/vless-js/src/lib/vless-js.ts b/libs/vless-js/src/lib/vless-js.ts index a8bc8d0..670ffff 100644 --- a/libs/vless-js/src/lib/vless-js.ts +++ b/libs/vless-js/src/lib/vless-js.ts @@ -27,6 +27,7 @@ export function makeReadableWebSocketStream( return new ReadableStream({ start(controller) { ws.addEventListener('message', async (e: { data: ArrayBuffer }) => { + // console.log('-----', e.data); // is stream is cancel, skip controller.enqueue if (readableStreamCancel) { return;