add few code

This commit is contained in:
zizifn 2023-05-18 01:19:52 +08:00 committed by zizifn
parent d56791c5d4
commit 82538f0dcd
3 changed files with 134 additions and 48 deletions

View File

@ -7,6 +7,12 @@ import { connect } from 'cloudflare:sockets';
import { Buffer } from 'node:buffer';
import { validate } from 'uuid';
function delay(ms) {
return new Promise((resolve, rej) => {
setTimeout(resolve, ms);
});
}
interface Env {
UUID: string;
}
@ -31,48 +37,88 @@ export default {
const webSocketPair = new WebSocketPair();
const [client, webSocket] = Object.values(webSocketPair);
console.log(WebSocket.READY_STATE_OPEN);
const earlyDataHeader = request.headers.get('sec-websocket-protocol') || '';
let remoteSocket: TransformStream = null;
webSocket.accept();
webSocket.addEventListener('message', async (event) => {
if (remoteSocket) {
const writer = remoteSocket.writable.getWriter();
await writer.write(event.data);
writer.releaseLock();
return;
}
const vlessBuffer: ArrayBuffer = event.data as ArrayBuffer;
const {
hasError,
message,
portRemote,
addressRemote,
rawDataIndex,
vlessVersion,
isUDP,
} = processVlessHeader(vlessBuffer, userID);
address = addressRemote || '';
portWithRandomLog = `${portRemote}--${Math.random()} ${
isUDP ? 'udp ' : 'tcp '
} `;
log(`connecting`);
if (hasError) {
webSocket.close(); // server close will not casuse worker throw error
}
const vlessResponseHeader = new Uint8Array([vlessVersion![0], 0]);
const rawClientData = vlessBuffer.slice(rawDataIndex!);
remoteSocket = connect({
hostname: addressRemote,
port: portRemote,
});
log(`connected`);
const writer = remoteSocket.writable.getWriter();
await writer.write(rawClientData); // first write, nomal is tls client hello
writer.releaseLock();
const readableWebSocketStream = makeReadableWebSocketStream(
webSocket,
earlyDataHeader,
log
);
let vlessResponseHeader = new Uint8Array([0, 0]);
let remoteConnectionReadyResolve: Function;
// ws-->remote
readableWebSocketStream.pipeTo(
new WritableStream({
async write(chunk, controller) {
if (remoteSocket) {
const writer = remoteSocket.writable.getWriter();
await writer.write(chunk);
writer.releaseLock();
return;
}
const {
hasError,
message,
portRemote,
addressRemote,
rawDataIndex,
vlessVersion,
isUDP,
} = processVlessHeader(chunk, userID);
address = addressRemote || '';
portWithRandomLog = `${portRemote}--${Math.random()} ${
isUDP ? 'udp ' : 'tcp '
} `;
// if UDP but port not DNS port, close it
if (isUDP && portRemote != 53) {
controller.error('UDP proxy only enable for DNS which is port 53');
webSocket.close(); // server close will not casuse worker throw error
return;
}
if (hasError) {
controller.error(message);
webSocket.close(); // server close will not casuse worker throw error
return;
}
vlessResponseHeader = new Uint8Array([vlessVersion![0], 0]);
const rawClientData = chunk.slice(rawDataIndex!);
remoteSocket = connect({
hostname: addressRemote,
port: portRemote,
});
log(`connected`);
const writer = remoteSocket.writable.getWriter();
await writer.write(rawClientData); // first write, nomal is tls client hello
writer.releaseLock();
// remoteSocket ready
remoteConnectionReadyResolve(remoteSocket);
},
close() {
console.log(
`[${address}:${portWithRandomLog}] readableWebSocketStream is close`
);
},
abort(reason) {
console.log(
`[${address}:${portWithRandomLog}] readableWebSocketStream is abort`,
JSON.stringify(reason)
);
},
})
);
(async () => {
await new Promise((resolve) => (remoteConnectionReadyResolve = resolve));
// remote--> ws
let count = 0;
remoteSocket.readable
.pipeTo(
new WritableStream({
@ -83,7 +129,12 @@ export default {
},
async write(chunk: Uint8Array, controller) {
if (webSocket.readyState === WebSocket.READY_STATE_OPEN) {
if (count++ > 20000) {
// cf one package is 4096 byte(4kb), 4096 * 20000 = 80M
await delay(1);
}
webSocket.send(chunk);
// console.log(chunk.byteLength);
} else {
controller.error(
'webSocket.readyState is not open, maybe close'
@ -110,17 +161,7 @@ export default {
);
safeCloseWebSocket(webSocket);
});
// end
});
webSocket.addEventListener('close', async (event) => {
console.log('-------------close-----------------', event);
});
webSocket.addEventListener('error', () => {
console.log('-------------error-----------------');
});
})();
return new Response(null, {
status: 101,

View File

@ -0,0 +1,44 @@
export default {
async fetch(request: Request) {
let address = '';
let portWithRandomLog = '';
const log = (info: string, event?: any) => {
console.log(`[${address}:${portWithRandomLog}] ${info}`, event || '');
};
const upgradeHeader = request.headers.get('Upgrade');
if (!upgradeHeader || upgradeHeader !== 'websocket') {
return new Response('Expected Upgrade: websocket', { status: 426 });
}
const webSocketPair = new WebSocketPair();
const [client, webSocket] = Object.values(webSocketPair);
const earlyDataHeader = request.headers.get('sec-websocket-protocol') || '';
webSocket.accept();
webSocket.addEventListener('message', (event) => {
console.log(event.data);
webSocket.send(`server reponse after client sent ${event.data}`);
if (event.data === 'close') {
webSocket.close();
}
});
webSocket.addEventListener('close', async (event) => {
console.log('-------------close-----------------', event);
webSocket.close();
});
webSocket.addEventListener('error', () => {
console.log('-------------error-----------------');
});
client.addEventListener('close', (event) => {
console.log('----------client---close-----------------', event);
});
return new Response(null, {
status: 101,
webSocket: client,
});
},
};

View File

@ -27,6 +27,7 @@ export function makeReadableWebSocketStream(
return new ReadableStream<ArrayBuffer>({
start(controller) {
ws.addEventListener('message', async (e: { data: ArrayBuffer }) => {
// console.log('-----', e.data);
// is stream is cancel, skip controller.enqueue
if (readableStreamCancel) {
return;