From 23370b4c27185cd24e7d8b7f137efa3a588d4380 Mon Sep 17 00:00:00 2001 From: rany2 Date: Wed, 4 Jan 2023 23:45:22 +0200 Subject: [PATCH 01/14] Simplify edge_tts library usage --- src/edge_playback/__init__.py | 71 ++++---- src/edge_tts/communicate.py | 299 ++++++++++++++++++++-------------- src/edge_tts/exceptions.py | 13 ++ src/edge_tts/submaker.py | 8 +- src/edge_tts/util.py | 93 ++++------- 5 files changed, 263 insertions(+), 221 deletions(-) create mode 100644 src/edge_tts/exceptions.py diff --git a/src/edge_playback/__init__.py b/src/edge_playback/__init__.py index 86df7c4..863e4d0 100644 --- a/src/edge_playback/__init__.py +++ b/src/edge_playback/__init__.py @@ -12,44 +12,51 @@ from shutil import which def main(): - """ - Main function. - """ - if which("mpv") and which("edge-tts"): + depcheck_failed = False + if not which("mpv"): + print("mpv is not installed.", file=sys.stderr) + depcheck_failed = True + if not which("edge-tts"): + print("edge-tts is not installed.", file=sys.stderr) + depcheck_failed = True + if depcheck_failed: + print("Please install the missing dependencies.", file=sys.stderr) + sys.exit(1) + + media = None + subtitle = None + try: media = tempfile.NamedTemporaryFile(delete=False) + media.close() + subtitle = tempfile.NamedTemporaryFile(delete=False) - try: - media.close() - subtitle.close() + subtitle.close() - print() - print(f"Media file: {media.name}") - print(f"Subtitle file: {subtitle.name}\n") - with subprocess.Popen( - [ - "edge-tts", - "--boundary-type=1", - f"--write-media={media.name}", - f"--write-subtitles={subtitle.name}", - ] - + sys.argv[1:] - ) as process: - process.communicate() + print(f"Media file: {media.name}") + print(f"Subtitle file: {subtitle.name}\n") + with subprocess.Popen( + [ + "edge-tts", + f"--write-media={media.name}", + f"--write-subtitles={subtitle.name}", + ] + + sys.argv[1:] + ) as process: + process.communicate() - with subprocess.Popen( - [ - "mpv", - "--keep-open=yes", - f"--sub-file={subtitle.name}", - media.name, - ] - ) as process: - process.communicate() - finally: + with subprocess.Popen( + [ + "mpv", + f"--sub-file={subtitle.name}", + media.name, + ] + ) as process: + process.communicate() + finally: + if media is not None: os.unlink(media.name) + if subtitle is not None: os.unlink(subtitle.name) - else: - print("This script requires mpv and edge-tts.") if __name__ == "__main__": diff --git a/src/edge_tts/communicate.py b/src/edge_tts/communicate.py index f81d063..2c14322 100644 --- a/src/edge_tts/communicate.py +++ b/src/edge_tts/communicate.py @@ -4,16 +4,20 @@ Communicate package. import json +import re import time import uuid +from typing import Dict, Generator, List, Optional from xml.sax.saxutils import escape import aiohttp +from edge_tts.exceptions import * + from .constants import WSS_URL -def get_headers_and_data(data): +def get_headers_and_data(data: str | bytes) -> tuple[Dict[str, str], bytes]: """ Returns the headers and data from the given data. @@ -25,6 +29,8 @@ def get_headers_and_data(data): """ if isinstance(data, str): data = data.encode("utf-8") + if not isinstance(data, bytes): + raise TypeError("data must be str or bytes") headers = {} for line in data.split(b"\r\n\r\n")[0].split(b"\r\n"): @@ -37,7 +43,7 @@ def get_headers_and_data(data): return headers, b"\r\n\r\n".join(data.split(b"\r\n\r\n")[1:]) -def remove_incompatible_characters(string): +def remove_incompatible_characters(string: str | bytes) -> str: """ The service does not support a couple character ranges. Most important being the vertical tab character which is @@ -52,31 +58,30 @@ def remove_incompatible_characters(string): """ if isinstance(string, bytes): string = string.decode("utf-8") + if not isinstance(string, str): + raise TypeError("string must be str or bytes") - string = list(string) + chars: List[str] = list(string) - for idx, char in enumerate(string): - code = ord(char) + for idx, char in enumerate(chars): + code: int = ord(char) if (0 <= code <= 8) or (11 <= code <= 12) or (14 <= code <= 31): - string[idx] = " " + chars[idx] = " " - return "".join(string) + return "".join(chars) -def connect_id(): +def connect_id() -> str: """ Returns a UUID without dashes. - Args: - None - Returns: str: A UUID without dashes. """ return str(uuid.uuid4()).replace("-", "") -def iter_bytes(my_bytes): +def iter_bytes(my_bytes: bytes) -> Generator[bytes, None, None]: """ Iterates over bytes object @@ -90,20 +95,22 @@ def iter_bytes(my_bytes): yield my_bytes[i : i + 1] -def split_text_by_byte_length(text, byte_length): +def split_text_by_byte_length(text: bytes, byte_length: int) -> List[bytes]: """ Splits a string into a list of strings of a given byte length while attempting to keep words together. Args: - text (byte): The string to be split. - byte_length (int): The byte length of each string in the list. + text (str or bytes): The string to be split. + byte_length (int): The maximum byte length of each string in the list. Returns: - list: A list of strings of the given byte length. + list: A list of bytes of the given byte length. """ if isinstance(text, str): text = text.encode("utf-8") + if not isinstance(text, bytes): + raise TypeError("text must be str or bytes") words = [] while len(text) > byte_length: @@ -125,17 +132,10 @@ def split_text_by_byte_length(text, byte_length): return words -def mkssml(text, voice, pitch, rate, volume): +def mkssml(text: str | bytes, voice: str, pitch: str, rate: str, volume: str) -> str: """ Creates a SSML string from the given parameters. - Args: - text (str): The text to be spoken. - voice (str): The voice to be used. - pitch (str): The pitch to be used. - rate (str): The rate to be used. - volume (str): The volume to be used. - Returns: str: The SSML string. """ @@ -154,9 +154,6 @@ def date_to_string(): """ Return Javascript-style date string. - Args: - None - Returns: str: Javascript-style date string. """ @@ -171,15 +168,10 @@ def date_to_string(): ) -def ssml_headers_plus_data(request_id, timestamp, ssml): +def ssml_headers_plus_data(request_id: str, timestamp: str, ssml: str) -> str: """ Returns the headers and data to be used in the request. - Args: - request_id (str): The request ID. - timestamp (str): The timestamp. - ssml (str): The SSML string. - Returns: str: The headers and data to be used in the request. """ @@ -198,73 +190,86 @@ class Communicate: Class for communicating with the service. """ - def __init__(self): - """ - Initializes the Communicate class. - """ - self.date = date_to_string() - - async def run( + def __init__( self, - messages, - boundary_type=0, - codec="audio-24khz-48kbitrate-mono-mp3", - voice="Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)", - pitch="+0Hz", - rate="+0%", - volume="+0%", - proxy=None, + text: str | List[str], + voice: str = "Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)", + *, + pitch: str = "+0Hz", + rate: str = "+0%", + volume: str = "+0%", + proxy: Optional[str] = None, ): """ - Runs the Communicate class. + Initializes the Communicate class. - Args: - messages (str or list): A list of SSML strings or a single text. - boundery_type (int): The type of boundary to use. 0 for none, 1 for word_boundary, 2 for sentence_boundary. - codec (str): The codec to use. - voice (str): The voice to use. - pitch (str): The pitch to use. - rate (str): The rate to use. - volume (str): The volume to use. - - Yields: - tuple: The subtitle offset, subtitle, and audio data. + Raises: + ValueError: If the voice is not valid. """ - - word_boundary = False - - if boundary_type > 0: - word_boundary = True - if boundary_type > 1: - raise ValueError( - "Invalid boundary type. SentenceBoundary is no longer supported." + self.text = text + self.boundary_type = 1 + self.codec = "audio-24khz-48kbitrate-mono-mp3" + self.voice = voice + # Possible values for voice are: + # - Microsoft Server Speech Text to Speech Voice (cy-GB, NiaNeural) + # - cy-GB-NiaNeural + # Always send the first variant as that is what Microsoft Edge does. + match = re.match(r"^([a-z]{2})-([A-Z]{2})-(.+Neural)$", voice) + if match is not None: + self.voice = ( + "Microsoft Server Speech Text to Speech Voice" + + f" ({match.group(1)}-{match.group(2)}, {match.group(3)})" ) - word_boundary = str(word_boundary).lower() + if ( + re.match( + r"^Microsoft Server Speech Text to Speech Voice \(.+,.+\)$", + self.voice, + ) + is None + ): + raise ValueError(f"Invalid voice '{voice}'.") - websocket_max_size = 2 ** 16 + if re.match(r"^[+-]\d+Hz$", pitch) is None: + raise ValueError(f"Invalid pitch '{pitch}'.") + self.pitch = pitch + + if re.match(r"^[+-]0*([0-9]|([1-9][0-9])|100)%$", rate) is None: + raise ValueError(f"Invalid rate '{rate}'.") + self.rate = rate + + if re.match(r"^[+-]0*([0-9]|([1-9][0-9])|100)%$", volume) is None: + raise ValueError(f"Invalid volume '{volume}'.") + self.volume = volume + + self.proxy = proxy + + async def stream(self): + """Streams audio and metadata from the service.""" + + websocket_max_size = 2**16 overhead_per_message = ( len( ssml_headers_plus_data( - connect_id(), self.date, mkssml("", voice, pitch, rate, volume) + connect_id(), + date_to_string(), + mkssml("", self.voice, self.pitch, self.rate, self.volume), ) ) - + 50 - ) # margin of error - messages = split_text_by_byte_length( - escape(remove_incompatible_characters(messages)), + + 50 # margin of error + ) + texts = split_text_by_byte_length( + escape(remove_incompatible_characters(self.text)), websocket_max_size - overhead_per_message, ) - # Variables for the loop - download = False async with aiohttp.ClientSession(trust_env=True) as session: async with session.ws_connect( f"{WSS_URL}&ConnectionId={connect_id()}", compress=15, autoclose=True, autoping=True, - proxy=proxy, + proxy=self.proxy, headers={ "Pragma": "no-cache", "Cache-Control": "no-cache", @@ -275,9 +280,19 @@ class Communicate: " (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36 Edg/91.0.864.41", }, ) as websocket: - for message in messages: + for text in texts: + # download indicates whether we should be expecting audio data, + # this is so what we avoid getting binary data from the websocket + # and falsely thinking it's audio data. + download = False + + # audio_was_received indicates whether we have received audio data + # from the websocket. This is so we can raise an exception if we + # don't receive any audio data. + audio_was_received = False + # Each message needs to have the proper date - self.date = date_to_string() + date = date_to_string() # Prepare the request to be sent to the service. # @@ -290,26 +305,26 @@ class Communicate: # # Also pay close attention to double { } in request (escape for f-string). request = ( - f"X-Timestamp:{self.date}\r\n" + f"X-Timestamp:{date}\r\n" "Content-Type:application/json; charset=utf-8\r\n" "Path:speech.config\r\n\r\n" '{"context":{"synthesis":{"audio":{"metadataoptions":{' - f'"sentenceBoundaryEnabled":false,' - f'"wordBoundaryEnabled":{word_boundary}}},"outputFormat":"{codec}"' + '"sentenceBoundaryEnabled":false,"wordBoundaryEnabled":true},' + f'"outputFormat":"{self.codec}"' "}}}}\r\n" ) - # Send the request to the service. await websocket.send_str(request) - # Send the message itself. + await websocket.send_str( ssml_headers_plus_data( connect_id(), - self.date, - mkssml(message, voice, pitch, rate, volume), + date, + mkssml( + text, self.voice, self.pitch, self.rate, self.volume + ), ) ) - # Begin listening for the response. async for received in websocket: if received.type == aiohttp.WSMsgType.TEXT: parameters, data = get_headers_and_data(received.data) @@ -329,35 +344,34 @@ class Communicate: and parameters["Path"] == "audio.metadata" ): metadata = json.loads(data) - metadata_type = metadata["Metadata"][0]["Type"] - metadata_offset = metadata["Metadata"][0]["Data"][ - "Offset" - ] - if metadata_type == "WordBoundary": - metadata_duration = metadata["Metadata"][0]["Data"][ - "Duration" + for i in range(len(metadata["Metadata"])): + metadata_type = metadata["Metadata"][i]["Type"] + metadata_offset = metadata["Metadata"][i]["Data"][ + "Offset" ] - metadata_text = metadata["Metadata"][0]["Data"][ - "text" - ]["Text"] - yield ( - [ - metadata_offset, - metadata_duration, - ], - metadata_text, - None, - ) - elif metadata_type == "SentenceBoundary": - raise NotImplementedError( - "SentenceBoundary is not supported due to being broken." - ) - elif metadata_type == "SessionEnd": - continue - else: - raise NotImplementedError( - f"Unknown metadata type: {metadata_type}" - ) + if metadata_type == "WordBoundary": + metadata_duration = metadata["Metadata"][i][ + "Data" + ]["Duration"] + metadata_text = metadata["Metadata"][i]["Data"][ + "text" + ]["Text"] + yield { + "type": metadata_type, + "offset": metadata_offset, + "duration": metadata_duration, + "text": metadata_text, + } + elif metadata_type == "SentenceBoundary": + raise UnknownResponse( + "SentenceBoundary is not supported due to being broken." + ) + elif metadata_type == "SessionEnd": + continue + else: + raise UnknownResponse( + f"Unknown metadata type: {metadata_type}" + ) elif ( "Path" in parameters and parameters["Path"] == "response" @@ -368,25 +382,60 @@ class Communicate: Content-Type:application/json; charset=utf-8 Path:response - {"context":{"serviceTag":"yyyyyyyyyyyyyyyyyyy"},"audio":{"type":"inline","streamId":"zzzzzzzzzzzzzzzzz"}} + {"context":{"serviceTag":"yyyyyyyyyyyyyyyyyyy"},"audio": + {"type":"inline","streamId":"zzzzzzzzzzzzzzzzz"}} """ pass else: - raise ValueError( + raise UnknownResponse( "The response from the service is not recognized.\n" + received.data ) elif received.type == aiohttp.WSMsgType.BINARY: if download: - yield ( - None, - None, - b"Path:audio\r\n".join( + yield { + "type": "audio", + "data": b"Path:audio\r\n".join( received.data.split(b"Path:audio\r\n")[1:] ), - ) + } + audio_was_received = True else: - raise ValueError( + raise UnexpectedResponse( "The service sent a binary message, but we are not expecting one." ) - await websocket.close() + + if not audio_was_received: + raise NoAudioReceived( + "No audio was received from the service. Please verify that your parameters are correct." + ) + + async def save( + self, audio_fname: str | bytes, metadata_fname: Optional[str | bytes] = None + ): + """ + Save the audio and metadata to the specified files. + """ + written_audio = False + try: + audio = open(audio_fname, "wb") + metadata = None + if metadata_fname is not None: + metadata = open(metadata_fname, "w") + + async for message in self.stream(): + if message["type"] == "audio": + audio.write(message["data"]) + written_audio = True + elif metadata is not None and message["type"] == "WordBoundary": + json.dump(message, metadata) + metadata.write("\n") + finally: + audio.close() + if metadata is not None: + metadata.close() + + if not written_audio: + raise NoAudioReceived( + "No audio was received from the service, so the file is empty." + ) diff --git a/src/edge_tts/exceptions.py b/src/edge_tts/exceptions.py new file mode 100644 index 0000000..c37c55a --- /dev/null +++ b/src/edge_tts/exceptions.py @@ -0,0 +1,13 @@ +class UnknownResponse(Exception): + """Raised when an unknown response is received from the server.""" + + +class UnexpectedResponse(Exception): + """Raised when an unexpected response is received from the server. + + This hasn't happened yet, but it's possible that the server will + change its response format in the future.""" + + +class NoAudioReceived(Exception): + """Raised when no audio is received from the server.""" diff --git a/src/edge_tts/submaker.py b/src/edge_tts/submaker.py index 6988518..5a432c3 100644 --- a/src/edge_tts/submaker.py +++ b/src/edge_tts/submaker.py @@ -28,9 +28,9 @@ def mktimestamp(time_unit): Returns: str: The timecode of the subtitle. """ - hour = math.floor(time_unit / 10 ** 7 / 3600) - minute = math.floor((time_unit / 10 ** 7 / 60) % 60) - seconds = (time_unit / 10 ** 7) % 60 + hour = math.floor(time_unit / 10**7 / 3600) + minute = math.floor((time_unit / 10**7 / 60) % 60) + seconds = (time_unit / 10**7) % 60 return f"{hour:02d}:{minute:02d}:{seconds:06.3f}" @@ -48,7 +48,7 @@ class SubMaker: subtitles should overlap. """ self.subs_and_offset = [] - self.overlapping = overlapping * (10 ** 7) + self.overlapping = overlapping * (10**7) def create_sub(self, timestamp, text): """ diff --git a/src/edge_tts/util.py b/src/edge_tts/util.py index 6a4a29f..7f55ed5 100644 --- a/src/edge_tts/util.py +++ b/src/edge_tts/util.py @@ -11,9 +11,6 @@ from edge_tts import Communicate, SubMaker, list_voices async def _list_voices(proxy): - """ - List available voices. - """ for idx, voice in enumerate(await list_voices(proxy=proxy)): if idx != 0: print() @@ -26,34 +23,36 @@ async def _list_voices(proxy): async def _tts(args): - tts = Communicate() - subs = SubMaker(args.overlapping) - if args.write_media: - media_file = open(args.write_media, "wb") # pylint: disable=consider-using-with - async for i in tts.run( + tts = await Communicate( args.text, - args.boundary_type, - args.codec, args.voice, - args.pitch, - args.rate, - args.volume, proxy=args.proxy, - ): - if i[2] is not None: - if not args.write_media: - sys.stdout.buffer.write(i[2]) - else: - media_file.write(i[2]) - elif i[0] is not None and i[1] is not None: - subs.create_sub(i[0], i[1]) - if args.write_media: - media_file.close() - if not args.write_subtitles: - sys.stderr.write(subs.generate_subs()) - else: - with open(args.write_subtitles, "w", encoding="utf-8") as file: - file.write(subs.generate_subs()) + rate=args.rate, + volume=args.volume, + ) + try: + media_file = None + if args.write_media: + media_file = open(args.write_media, "wb") + + subs = SubMaker(args.overlapping) + async for data in tts.stream(): + if data["type"] == "audio": + if not args.write_media: + sys.stdout.buffer.write(data["data"]) + else: + media_file.write(data["data"]) + elif data["type"] == "WordBoundary": + subs.create_sub([data["offset"], data["duration"]], data["text"]) + + if not args.write_subtitles: + sys.stderr.write(subs.generate_subs()) + else: + with open(args.write_subtitles, "w", encoding="utf-8") as file: + file.write(subs.generate_subs()) + finally: + if media_file is not None: + media_file.close() async def _main(): @@ -64,23 +63,13 @@ async def _main(): parser.add_argument( "-v", "--voice", - help="voice for TTS. " - "Default: Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)", - default="Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)", - ) - parser.add_argument( - "-c", - "--codec", - help="codec format. Default: audio-24khz-48kbitrate-mono-mp3. " - "Another choice is webm-24khz-16bit-mono-opus. " - "For more info check https://bit.ly/2T33h6S", - default="audio-24khz-48kbitrate-mono-mp3", + help="voice for TTS. " "Default: en-US-AriaNeural", + default="en-US-AriaNeural", ) group.add_argument( "-l", "--list-voices", - help="lists available voices. " - "Edge's list is incomplete so check https://bit.ly/2SFq1d3", + help="lists available voices", action="store_true", ) parser.add_argument( @@ -109,32 +98,19 @@ async def _main(): type=float, ) parser.add_argument( - "-b", - "--boundary-type", - help="set boundary type for subtitles. Default 0 for none. Set 1 for word_boundary.", - default=0, - type=int, - ) - parser.add_argument( - "--write-media", help="instead of stdout, send media output to provided file" + "--write-media", help="send media output to file instead of stdout" ) parser.add_argument( "--write-subtitles", - help="instead of stderr, send subtitle output to provided file (implies boundary-type is 1)", - ) - parser.add_argument( - "--proxy", - help="proxy", + help="send subtitle output to provided file instead of stderr", ) + parser.add_argument("--proxy", help="use a proxy for TTS and voice list.") args = parser.parse_args() if args.list_voices: await _list_voices(args.proxy) sys.exit(0) - if args.write_subtitles and args.boundary_type == 0: - args.boundary_type = 1 - if args.text is not None or args.file is not None: if args.file is not None: # we need to use sys.stdin.read() because some devices @@ -151,9 +127,6 @@ async def _main(): def main(): - """ - Main function. - """ asyncio.get_event_loop().run_until_complete(_main()) From 7a9e4a62e3c5ba0535701d98ea3ea88892594f9d Mon Sep 17 00:00:00 2001 From: rany2 Date: Wed, 4 Jan 2023 23:52:21 +0200 Subject: [PATCH 02/14] Specify encoding on metadata fname open in save() --- src/edge_tts/communicate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/edge_tts/communicate.py b/src/edge_tts/communicate.py index 2c14322..5c04c06 100644 --- a/src/edge_tts/communicate.py +++ b/src/edge_tts/communicate.py @@ -421,7 +421,7 @@ class Communicate: audio = open(audio_fname, "wb") metadata = None if metadata_fname is not None: - metadata = open(metadata_fname, "w") + metadata = open(metadata_fname, "w", encoding="utf-8") async for message in self.stream(): if message["type"] == "audio": From e54e09dfcba1ec09f047efc0c60b7cb7c5810c0c Mon Sep 17 00:00:00 2001 From: rany2 Date: Wed, 4 Jan 2023 23:53:06 +0200 Subject: [PATCH 03/14] Add newline for edge_tts __init__.py --- src/edge_tts/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/edge_tts/__init__.py b/src/edge_tts/__init__.py index 24b4e77..37e0709 100644 --- a/src/edge_tts/__init__.py +++ b/src/edge_tts/__init__.py @@ -4,4 +4,4 @@ __init__ for edge_tts from .communicate import Communicate from .list_voices import list_voices, VoicesManager -from .submaker import SubMaker \ No newline at end of file +from .submaker import SubMaker From f1709e7e93513a87ff50fdd4b0ecedc59bbccf99 Mon Sep 17 00:00:00 2001 From: rany2 Date: Thu, 5 Jan 2023 00:04:22 +0200 Subject: [PATCH 04/14] update dynamic_voice_selection.py for new API --- examples/dynamic_voice_selection.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/examples/dynamic_voice_selection.py b/examples/dynamic_voice_selection.py index fb85a1c..ce4cb9a 100644 --- a/examples/dynamic_voice_selection.py +++ b/examples/dynamic_voice_selection.py @@ -1,26 +1,28 @@ +#!/usr/bin/env python3 + +""" +Example of dynamic voice selection using VoicesManager. +""" + import asyncio -import edge_tts -from edge_tts import VoicesManager import random +import edge_tts +from edge_tts import VoicesManager + + async def main(): - """ - Main function - """ voices = await VoicesManager.create() - voice = voices.find(Gender="Male", Language="es") + voice = voices.find(Gender="Male", Language="es") # Also supports Locales # voice = voices.find(Gender="Female", Locale="es-AR") VOICE = random.choice(voice)["ShortName"] TEXT = "Hoy es un buen día." OUTPUT_FILE = "spanish.mp3" - communicate = edge_tts.Communicate() + communicate = edge_tts.Communicate(TEXT, VOICE) + communicate.save(OUTPUT_FILE) - with open(OUTPUT_FILE, "wb") as f: - async for i in communicate.run(TEXT, voice=VOICE): - if i[2] is not None: - f.write(i[2]) if __name__ == "__main__": asyncio.get_event_loop().run_until_complete(main()) From 4a091e485944d9a28182c94ae8d7e403f6f4abf4 Mon Sep 17 00:00:00 2001 From: rany2 Date: Thu, 5 Jan 2023 00:04:48 +0200 Subject: [PATCH 05/14] update examples/basic_generation.py for new API * also rename it --- examples/basic_generation.py | 22 ++++++++++++++++++++++ examples/example.py | 24 ------------------------ 2 files changed, 22 insertions(+), 24 deletions(-) create mode 100644 examples/basic_generation.py delete mode 100644 examples/example.py diff --git a/examples/basic_generation.py b/examples/basic_generation.py new file mode 100644 index 0000000..844513d --- /dev/null +++ b/examples/basic_generation.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 + +""" +Basic example of edge_tts usage. +""" + +import asyncio + +import edge_tts + + +async def main(): + TEXT = "Hello World!" + VOICE = "en-GB-SoniaNeural" + OUTPUT_FILE = "test.mp3" + + communicate = edge_tts.Communicate(TEXT, VOICE) + await communicate.save(OUTPUT_FILE) + + +if __name__ == "__main__": + asyncio.get_event_loop().run_until_complete(main()) diff --git a/examples/example.py b/examples/example.py deleted file mode 100644 index 14ce848..0000000 --- a/examples/example.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python3 -""" -Example Python script that shows how to use edge-tts as a module -""" - -import asyncio -import edge_tts - -async def main(): - """ - Main function - """ - TEXT = "Hello World!" - VOICE = "en-GB-SoniaNeural" - OUTPUT_FILE = "test.mp3" - - communicate = edge_tts.Communicate() - with open(OUTPUT_FILE, "wb") as f: - async for i in communicate.run(TEXT, voice=VOICE): - if i[2] is not None: - f.write(i[2]) - -if __name__ == "__main__": - asyncio.get_event_loop().run_until_complete(main()) \ No newline at end of file From cd84fa972a5899428ac813bd4e655a86b9faf47c Mon Sep 17 00:00:00 2001 From: rany2 Date: Thu, 5 Jan 2023 00:07:01 +0200 Subject: [PATCH 06/14] fixup! update dynamic_voice_selection.py for new API --- examples/dynamic_voice_selection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/dynamic_voice_selection.py b/examples/dynamic_voice_selection.py index ce4cb9a..844e51d 100644 --- a/examples/dynamic_voice_selection.py +++ b/examples/dynamic_voice_selection.py @@ -21,7 +21,7 @@ async def main(): OUTPUT_FILE = "spanish.mp3" communicate = edge_tts.Communicate(TEXT, VOICE) - communicate.save(OUTPUT_FILE) + await communicate.save(OUTPUT_FILE) if __name__ == "__main__": From 8c356a000cf16f17bc8c21890ccecc4c8a2dd2c6 Mon Sep 17 00:00:00 2001 From: rany2 Date: Thu, 5 Jan 2023 00:07:08 +0200 Subject: [PATCH 07/14] Slightly cleanup some more --- src/edge_tts/communicate.py | 25 ++++++++----------------- src/edge_tts/exceptions.py | 2 ++ src/edge_tts/list_voices.py | 7 +++---- src/edge_tts/util.py | 16 ++++++++++------ 4 files changed, 23 insertions(+), 27 deletions(-) diff --git a/src/edge_tts/communicate.py b/src/edge_tts/communicate.py index 5c04c06..0f416e0 100644 --- a/src/edge_tts/communicate.py +++ b/src/edge_tts/communicate.py @@ -12,7 +12,8 @@ from xml.sax.saxutils import escape import aiohttp -from edge_tts.exceptions import * +from edge_tts.exceptions import (NoAudioReceived, UnexpectedResponse, + UnknownResponse) from .constants import WSS_URL @@ -207,7 +208,6 @@ class Communicate: ValueError: If the voice is not valid. """ self.text = text - self.boundary_type = 1 self.codec = "audio-24khz-48kbitrate-mono-mp3" self.voice = voice # Possible values for voice are: @@ -284,7 +284,7 @@ class Communicate: # download indicates whether we should be expecting audio data, # this is so what we avoid getting binary data from the websocket # and falsely thinking it's audio data. - download = False + download_audio = False # audio_was_received indicates whether we have received audio data # from the websocket. This is so we can raise an exception if we @@ -332,12 +332,12 @@ class Communicate: "Path" in parameters and parameters["Path"] == "turn.start" ): - download = True + download_audio = True elif ( "Path" in parameters and parameters["Path"] == "turn.end" ): - download = False + download_audio = False break elif ( "Path" in parameters @@ -376,15 +376,6 @@ class Communicate: "Path" in parameters and parameters["Path"] == "response" ): - # TODO: implement this: - """ - X-RequestId:xxxxxxxxxxxxxxxxxxxxxxxxx - Content-Type:application/json; charset=utf-8 - Path:response - - {"context":{"serviceTag":"yyyyyyyyyyyyyyyyyyy"},"audio": - {"type":"inline","streamId":"zzzzzzzzzzzzzzzzz"}} - """ pass else: raise UnknownResponse( @@ -392,7 +383,7 @@ class Communicate: + received.data ) elif received.type == aiohttp.WSMsgType.BINARY: - if download: + if download_audio: yield { "type": "audio", "data": b"Path:audio\r\n".join( @@ -402,12 +393,12 @@ class Communicate: audio_was_received = True else: raise UnexpectedResponse( - "The service sent a binary message, but we are not expecting one." + "We received a binary message, but we are not expecting one." ) if not audio_was_received: raise NoAudioReceived( - "No audio was received from the service. Please verify that your parameters are correct." + "No audio was received. Please verify that your parameters are correct." ) async def save( diff --git a/src/edge_tts/exceptions.py b/src/edge_tts/exceptions.py index c37c55a..dd2668c 100644 --- a/src/edge_tts/exceptions.py +++ b/src/edge_tts/exceptions.py @@ -1,3 +1,5 @@ +"""Exceptions for the Edge TTS project.""" + class UnknownResponse(Exception): """Raised when an unknown response is received from the server.""" diff --git a/src/edge_tts/list_voices.py b/src/edge_tts/list_voices.py index f1d50a3..1fba22b 100644 --- a/src/edge_tts/list_voices.py +++ b/src/edge_tts/list_voices.py @@ -1,5 +1,5 @@ """ -list_voices package. +list_voices package for edge_tts. """ import json @@ -9,13 +9,12 @@ import aiohttp from .constants import VOICE_LIST -async def list_voices(proxy=None): +async def list_voices(*, proxy=None): """ List all available voices and their attributes. This pulls data from the URL used by Microsoft Edge to return a list of - all available voices. However many more experimental voices are available - than are listed here. (See https://aka.ms/csspeech/voicenames) + all available voices. Returns: dict: A dictionary of voice attributes. diff --git a/src/edge_tts/util.py b/src/edge_tts/util.py index 7f55ed5..8c403a3 100644 --- a/src/edge_tts/util.py +++ b/src/edge_tts/util.py @@ -10,7 +10,8 @@ import sys from edge_tts import Communicate, SubMaker, list_voices -async def _list_voices(proxy): +async def _print_voices(proxy): + """Print all available voices.""" for idx, voice in enumerate(await list_voices(proxy=proxy)): if idx != 0: print() @@ -22,7 +23,8 @@ async def _list_voices(proxy): print(f"{key}: {voice[key]}") -async def _tts(args): +async def _run_tts(args): + """Run TTS after parsing arguments from command line.""" tts = await Communicate( args.text, args.voice, @@ -33,6 +35,7 @@ async def _tts(args): try: media_file = None if args.write_media: + # pylint: disable=consider-using-with media_file = open(args.write_media, "wb") subs = SubMaker(args.overlapping) @@ -55,7 +58,7 @@ async def _tts(args): media_file.close() -async def _main(): +async def _async_main(): parser = argparse.ArgumentParser(description="Microsoft Edge TTS") group = parser.add_mutually_exclusive_group(required=True) group.add_argument("-t", "--text", help="what TTS will say") @@ -108,7 +111,7 @@ async def _main(): args = parser.parse_args() if args.list_voices: - await _list_voices(args.proxy) + await _print_voices(args.proxy) sys.exit(0) if args.text is not None or args.file is not None: @@ -123,11 +126,12 @@ async def _main(): with open(args.file, "r", encoding="utf-8") as file: args.text = file.read() - await _tts(args) + await _run_tts(args) def main(): - asyncio.get_event_loop().run_until_complete(_main()) + """Run the main function using asyncio.""" + asyncio.get_event_loop().run_until_complete(_async_main()) if __name__ == "__main__": From fe8b86c7f5c357d28775f0e549033bbb588366cc Mon Sep 17 00:00:00 2001 From: rany2 Date: Thu, 5 Jan 2023 00:09:56 +0200 Subject: [PATCH 08/14] add docstring to VoicesManager's create() --- src/edge_tts/list_voices.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/edge_tts/list_voices.py b/src/edge_tts/list_voices.py index 1fba22b..6e41d96 100644 --- a/src/edge_tts/list_voices.py +++ b/src/edge_tts/list_voices.py @@ -48,6 +48,9 @@ class VoicesManager: @classmethod async def create(cls): + """ + Creates a VoicesManager object and populates it with all available voices. + """ self = VoicesManager() self.voices = await list_voices() self.voices = [ From efe0cbeddedbda30116373fd8d0e744208fea9ff Mon Sep 17 00:00:00 2001 From: rany2 Date: Thu, 5 Jan 2023 00:15:48 +0200 Subject: [PATCH 09/14] lint --- src/edge_tts/__init__.py | 2 +- src/edge_tts/exceptions.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/edge_tts/__init__.py b/src/edge_tts/__init__.py index 37e0709..342ae5e 100644 --- a/src/edge_tts/__init__.py +++ b/src/edge_tts/__init__.py @@ -3,5 +3,5 @@ __init__ for edge_tts """ from .communicate import Communicate -from .list_voices import list_voices, VoicesManager +from .list_voices import VoicesManager, list_voices from .submaker import SubMaker diff --git a/src/edge_tts/exceptions.py b/src/edge_tts/exceptions.py index dd2668c..16dcc57 100644 --- a/src/edge_tts/exceptions.py +++ b/src/edge_tts/exceptions.py @@ -1,5 +1,6 @@ """Exceptions for the Edge TTS project.""" + class UnknownResponse(Exception): """Raised when an unknown response is received from the server.""" From c4c3dc5a13e0e9f34df5a4954a0715eecdfd3443 Mon Sep 17 00:00:00 2001 From: rany2 Date: Thu, 5 Jan 2023 00:56:01 +0200 Subject: [PATCH 10/14] add more typing --- examples/basic_generation.py | 2 +- examples/dynamic_voice_selection.py | 2 +- lint.sh | 1 + mypy.ini | 13 ++++++ setup.cfg | 9 ++++- src/edge_playback/__init__.py | 63 ----------------------------- src/edge_playback/__main__.py | 57 +++++++++++++++++++++++++- src/edge_playback/py.typed | 0 src/edge_tts/communicate.py | 26 ++++++------ src/edge_tts/list_voices.py | 9 +++-- src/edge_tts/submaker.py | 41 +++++++++---------- src/edge_tts/util.py | 23 ++++++----- 12 files changed, 129 insertions(+), 117 deletions(-) create mode 100644 mypy.ini delete mode 100644 src/edge_playback/__init__.py create mode 100644 src/edge_playback/py.typed diff --git a/examples/basic_generation.py b/examples/basic_generation.py index 844513d..6c973b9 100644 --- a/examples/basic_generation.py +++ b/examples/basic_generation.py @@ -9,7 +9,7 @@ import asyncio import edge_tts -async def main(): +async def main() -> None: TEXT = "Hello World!" VOICE = "en-GB-SoniaNeural" OUTPUT_FILE = "test.mp3" diff --git a/examples/dynamic_voice_selection.py b/examples/dynamic_voice_selection.py index 844e51d..e7e67fb 100644 --- a/examples/dynamic_voice_selection.py +++ b/examples/dynamic_voice_selection.py @@ -11,7 +11,7 @@ import edge_tts from edge_tts import VoicesManager -async def main(): +async def main() -> None: voices = await VoicesManager.create() voice = voices.find(Gender="Male", Language="es") # Also supports Locales diff --git a/lint.sh b/lint.sh index b80309d..6532c9b 100755 --- a/lint.sh +++ b/lint.sh @@ -1,3 +1,4 @@ find src examples -name '*.py' | xargs black find src examples -name '*.py' | xargs isort find src examples -name '*.py' | xargs pylint +find src examples -name '*.py' | xargs mypy diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..c06d521 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,13 @@ +[mypy] +warn_return_any = True +warn_unused_configs = True + +#disallow_any_unimported = True +#disallow_any_expr = True +#disallow_any_decorated = True +#disallow_any_explicit = True +#disallow_any_generics = True +#disallow_subclassing_any = True +#disallow_untyped_calls = True +disallow_untyped_defs = True +disallow_incomplete_defs = True diff --git a/setup.cfg b/setup.cfg index 8e8cd4f..1bf386e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,4 +27,11 @@ where=src [options.entry_points] console_scripts = edge-tts = edge_tts.__main__:main - edge-playback = edge_playback.__init__:main + edge-playback = edge_playback.__main__:main + +[options.extras_require] +dev = + black + isort + mypy + pylint diff --git a/src/edge_playback/__init__.py b/src/edge_playback/__init__.py deleted file mode 100644 index 863e4d0..0000000 --- a/src/edge_playback/__init__.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python3 - -""" -Playback TTS with subtitles using edge-tts and mpv. -""" - -import os -import subprocess -import sys -import tempfile -from shutil import which - - -def main(): - depcheck_failed = False - if not which("mpv"): - print("mpv is not installed.", file=sys.stderr) - depcheck_failed = True - if not which("edge-tts"): - print("edge-tts is not installed.", file=sys.stderr) - depcheck_failed = True - if depcheck_failed: - print("Please install the missing dependencies.", file=sys.stderr) - sys.exit(1) - - media = None - subtitle = None - try: - media = tempfile.NamedTemporaryFile(delete=False) - media.close() - - subtitle = tempfile.NamedTemporaryFile(delete=False) - subtitle.close() - - print(f"Media file: {media.name}") - print(f"Subtitle file: {subtitle.name}\n") - with subprocess.Popen( - [ - "edge-tts", - f"--write-media={media.name}", - f"--write-subtitles={subtitle.name}", - ] - + sys.argv[1:] - ) as process: - process.communicate() - - with subprocess.Popen( - [ - "mpv", - f"--sub-file={subtitle.name}", - media.name, - ] - ) as process: - process.communicate() - finally: - if media is not None: - os.unlink(media.name) - if subtitle is not None: - os.unlink(subtitle.name) - - -if __name__ == "__main__": - main() diff --git a/src/edge_playback/__main__.py b/src/edge_playback/__main__.py index 2ac8c12..027e892 100644 --- a/src/edge_playback/__main__.py +++ b/src/edge_playback/__main__.py @@ -1,10 +1,63 @@ #!/usr/bin/env python3 """ -This is the main file for the edge_playback package. +Playback TTS with subtitles using edge-tts and mpv. """ -from edge_playback.__init__ import main +import os +import subprocess +import sys +import tempfile +from shutil import which + + +def main() -> None: + depcheck_failed = False + if not which("mpv"): + print("mpv is not installed.", file=sys.stderr) + depcheck_failed = True + if not which("edge-tts"): + print("edge-tts is not installed.", file=sys.stderr) + depcheck_failed = True + if depcheck_failed: + print("Please install the missing dependencies.", file=sys.stderr) + sys.exit(1) + + media = None + subtitle = None + try: + media = tempfile.NamedTemporaryFile(delete=False) + media.close() + + subtitle = tempfile.NamedTemporaryFile(delete=False) + subtitle.close() + + print(f"Media file: {media.name}") + print(f"Subtitle file: {subtitle.name}\n") + with subprocess.Popen( + [ + "edge-tts", + f"--write-media={media.name}", + f"--write-subtitles={subtitle.name}", + ] + + sys.argv[1:] + ) as process: + process.communicate() + + with subprocess.Popen( + [ + "mpv", + f"--sub-file={subtitle.name}", + media.name, + ] + ) as process: + process.communicate() + finally: + if media is not None: + os.unlink(media.name) + if subtitle is not None: + os.unlink(subtitle.name) + if __name__ == "__main__": main() diff --git a/src/edge_playback/py.typed b/src/edge_playback/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/edge_tts/communicate.py b/src/edge_tts/communicate.py index 0f416e0..29bbb9a 100644 --- a/src/edge_tts/communicate.py +++ b/src/edge_tts/communicate.py @@ -7,7 +7,7 @@ import json import re import time import uuid -from typing import Dict, Generator, List, Optional +from typing import Any, AsyncGenerator, Dict, Generator, List, Optional from xml.sax.saxutils import escape import aiohttp @@ -96,7 +96,7 @@ def iter_bytes(my_bytes: bytes) -> Generator[bytes, None, None]: yield my_bytes[i : i + 1] -def split_text_by_byte_length(text: bytes, byte_length: int) -> List[bytes]: +def split_text_by_byte_length(text: str | bytes, byte_length: int) -> List[bytes]: """ Splits a string into a list of strings of a given byte length while attempting to keep words together. @@ -151,7 +151,7 @@ def mkssml(text: str | bytes, voice: str, pitch: str, rate: str, volume: str) -> return ssml -def date_to_string(): +def date_to_string() -> str: """ Return Javascript-style date string. @@ -193,7 +193,7 @@ class Communicate: def __init__( self, - text: str | List[str], + text: str, voice: str = "Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)", *, pitch: str = "+0Hz", @@ -207,9 +207,9 @@ class Communicate: Raises: ValueError: If the voice is not valid. """ - self.text = text - self.codec = "audio-24khz-48kbitrate-mono-mp3" - self.voice = voice + self.text: str = text + self.codec: str = "audio-24khz-48kbitrate-mono-mp3" + self.voice: str = voice # Possible values for voice are: # - Microsoft Server Speech Text to Speech Voice (cy-GB, NiaNeural) # - cy-GB-NiaNeural @@ -232,19 +232,19 @@ class Communicate: if re.match(r"^[+-]\d+Hz$", pitch) is None: raise ValueError(f"Invalid pitch '{pitch}'.") - self.pitch = pitch + self.pitch: str = pitch if re.match(r"^[+-]0*([0-9]|([1-9][0-9])|100)%$", rate) is None: raise ValueError(f"Invalid rate '{rate}'.") - self.rate = rate + self.rate: str = rate if re.match(r"^[+-]0*([0-9]|([1-9][0-9])|100)%$", volume) is None: raise ValueError(f"Invalid volume '{volume}'.") - self.volume = volume + self.volume: str = volume - self.proxy = proxy + self.proxy: Optional[str] = proxy - async def stream(self): + async def stream(self) -> AsyncGenerator[Dict[str, Any], None]: """Streams audio and metadata from the service.""" websocket_max_size = 2**16 @@ -403,7 +403,7 @@ class Communicate: async def save( self, audio_fname: str | bytes, metadata_fname: Optional[str | bytes] = None - ): + ) -> None: """ Save the audio and metadata to the specified files. """ diff --git a/src/edge_tts/list_voices.py b/src/edge_tts/list_voices.py index 6e41d96..2f18cc4 100644 --- a/src/edge_tts/list_voices.py +++ b/src/edge_tts/list_voices.py @@ -3,13 +3,14 @@ list_voices package for edge_tts. """ import json +from typing import Any, Optional import aiohttp from .constants import VOICE_LIST -async def list_voices(*, proxy=None): +async def list_voices(*, proxy: Optional[str] = None) -> Any: """ List all available voices and their attributes. @@ -47,7 +48,7 @@ class VoicesManager: """ @classmethod - async def create(cls): + async def create(cls): # type: ignore """ Creates a VoicesManager object and populates it with all available voices. """ @@ -59,12 +60,12 @@ class VoicesManager: ] return self - def find(self, **kwargs): + def find(self, **kwargs: Any) -> list[dict[str, Any]]: """ Finds all matching voices based on the provided attributes. """ matching_voices = [ - voice for voice in self.voices if kwargs.items() <= voice.items() + voice for voice in self.voices if kwargs.items() <= voice.items() # type: ignore ] return matching_voices diff --git a/src/edge_tts/submaker.py b/src/edge_tts/submaker.py index 5a432c3..373ca96 100644 --- a/src/edge_tts/submaker.py +++ b/src/edge_tts/submaker.py @@ -6,10 +6,11 @@ information provided by the service easier. """ import math +from typing import List, Tuple from xml.sax.saxutils import escape, unescape -def formatter(offset1, offset2, subdata): +def formatter(offset1: float, offset2: float, subdata: str) -> str: """ formatter returns the timecode and the text of the subtitle. """ @@ -19,7 +20,7 @@ def formatter(offset1, offset2, subdata): ) -def mktimestamp(time_unit): +def mktimestamp(time_unit: float) -> str: """ mktimestamp returns the timecode of the subtitle. @@ -39,7 +40,7 @@ class SubMaker: SubMaker class """ - def __init__(self, overlapping=1): + def __init__(self, overlapping: int = 1) -> None: """ SubMaker constructor. @@ -47,10 +48,11 @@ class SubMaker: overlapping (int): The amount of time in seconds that the subtitles should overlap. """ - self.subs_and_offset = [] - self.overlapping = overlapping * (10**7) + self.offset: List[Tuple[float, float]] = [] + self.subs: List[str] = [] + self.overlapping: int = overlapping * (10**7) - def create_sub(self, timestamp, text): + def create_sub(self, timestamp: Tuple[float, float], text: str) -> None: """ create_sub creates a subtitle with the given timestamp and text and adds it to the list of subtitles @@ -62,40 +64,37 @@ class SubMaker: Returns: None """ - timestamp[1] += timestamp[0] - self.subs_and_offset.append(timestamp) - self.subs_and_offset.append(text) + self.offset.append((timestamp[0], timestamp[0] + timestamp[1])) + self.subs.append(text) - def generate_subs(self): + def generate_subs(self) -> str: """ generate_subs generates the complete subtitle file. Returns: str: The complete subtitle file. """ - if len(self.subs_and_offset) >= 2: + if len(self.subs) == len(self.offset): data = "WEBVTT\r\n\r\n" - for offset, subs in zip( - self.subs_and_offset[::2], self.subs_and_offset[1::2] - ): + for offset, subs in zip(self.offset, self.subs): subs = unescape(subs) - subs = [subs[i : i + 79] for i in range(0, len(subs), 79)] + split_subs: List[str] = [subs[i : i + 79] for i in range(0, len(subs), 79)] - for i in range(len(subs) - 1): - sub = subs[i] + for i in range(len(split_subs) - 1): + sub = split_subs[i] split_at_word = True if sub[-1] == " ": - subs[i] = sub[:-1] + split_subs[i] = sub[:-1] split_at_word = False if sub[0] == " ": - subs[i] = sub[1:] + split_subs[i] = sub[1:] split_at_word = False if split_at_word: - subs[i] += "-" + split_subs[i] += "-" - subs = "\r\n".join(subs) + subs = "\r\n".join(split_subs) data += formatter(offset[0], offset[1] + self.overlapping, subs) return data diff --git a/src/edge_tts/util.py b/src/edge_tts/util.py index 8c403a3..132896c 100644 --- a/src/edge_tts/util.py +++ b/src/edge_tts/util.py @@ -5,12 +5,14 @@ Main package. import argparse import asyncio +from io import BufferedWriter import sys +from typing import Any from edge_tts import Communicate, SubMaker, list_voices -async def _print_voices(proxy): +async def _print_voices(*, proxy: str) -> None: """Print all available voices.""" for idx, voice in enumerate(await list_voices(proxy=proxy)): if idx != 0: @@ -23,9 +25,9 @@ async def _print_voices(proxy): print(f"{key}: {voice[key]}") -async def _run_tts(args): +async def _run_tts(args: Any) -> None: """Run TTS after parsing arguments from command line.""" - tts = await Communicate( + tts = Communicate( args.text, args.voice, proxy=args.proxy, @@ -35,18 +37,17 @@ async def _run_tts(args): try: media_file = None if args.write_media: - # pylint: disable=consider-using-with media_file = open(args.write_media, "wb") subs = SubMaker(args.overlapping) async for data in tts.stream(): if data["type"] == "audio": - if not args.write_media: - sys.stdout.buffer.write(data["data"]) - else: + if isinstance(media_file, BufferedWriter): media_file.write(data["data"]) + else: + sys.stdout.buffer.write(data["data"]) elif data["type"] == "WordBoundary": - subs.create_sub([data["offset"], data["duration"]], data["text"]) + subs.create_sub((data["offset"], data["duration"]), data["text"]) if not args.write_subtitles: sys.stderr.write(subs.generate_subs()) @@ -58,7 +59,7 @@ async def _run_tts(args): media_file.close() -async def _async_main(): +async def _async_main() -> None: parser = argparse.ArgumentParser(description="Microsoft Edge TTS") group = parser.add_mutually_exclusive_group(required=True) group.add_argument("-t", "--text", help="what TTS will say") @@ -111,7 +112,7 @@ async def _async_main(): args = parser.parse_args() if args.list_voices: - await _print_voices(args.proxy) + await _print_voices(proxy=args.proxy) sys.exit(0) if args.text is not None or args.file is not None: @@ -129,7 +130,7 @@ async def _async_main(): await _run_tts(args) -def main(): +def main() -> None: """Run the main function using asyncio.""" asyncio.get_event_loop().run_until_complete(_async_main()) From 3e3828c04ab32c165c08d322aa6b19cbe6ee6a5b Mon Sep 17 00:00:00 2001 From: rany2 Date: Thu, 5 Jan 2023 00:58:36 +0200 Subject: [PATCH 11/14] more typing --- mypy.ini | 2 +- src/edge_tts/list_voices.py | 17 +++++++++++++---- src/edge_tts/submaker.py | 4 +++- src/edge_tts/util.py | 4 ++-- 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/mypy.ini b/mypy.ini index c06d521..f7b2f0b 100644 --- a/mypy.ini +++ b/mypy.ini @@ -8,6 +8,6 @@ warn_unused_configs = True #disallow_any_explicit = True #disallow_any_generics = True #disallow_subclassing_any = True -#disallow_untyped_calls = True +disallow_untyped_calls = True disallow_untyped_defs = True disallow_incomplete_defs = True diff --git a/src/edge_tts/list_voices.py b/src/edge_tts/list_voices.py index 2f18cc4..9793b5d 100644 --- a/src/edge_tts/list_voices.py +++ b/src/edge_tts/list_voices.py @@ -3,7 +3,7 @@ list_voices package for edge_tts. """ import json -from typing import Any, Optional +from typing import Any, Dict, List, Optional import aiohttp @@ -47,8 +47,12 @@ class VoicesManager: A class to find the correct voice based on their attributes. """ + def __init__(self) -> None: + self.voices: List[Dict[str, Any]] = [] + self.called_create: bool = False + @classmethod - async def create(cls): # type: ignore + async def create(cls: Any) -> "VoicesManager": """ Creates a VoicesManager object and populates it with all available voices. """ @@ -58,14 +62,19 @@ class VoicesManager: {**voice, **{"Language": voice["Locale"].split("-")[0]}} for voice in self.voices ] + self.called_create = True return self - def find(self, **kwargs: Any) -> list[dict[str, Any]]: + def find(self, **kwargs: Any) -> List[Dict[str, Any]]: """ Finds all matching voices based on the provided attributes. """ + if not self.called_create: + raise RuntimeError( + "VoicesManager.find() called before VoicesManager.create()" + ) matching_voices = [ - voice for voice in self.voices if kwargs.items() <= voice.items() # type: ignore + voice for voice in self.voices if kwargs.items() <= voice.items() ] return matching_voices diff --git a/src/edge_tts/submaker.py b/src/edge_tts/submaker.py index 373ca96..03a04db 100644 --- a/src/edge_tts/submaker.py +++ b/src/edge_tts/submaker.py @@ -78,7 +78,9 @@ class SubMaker: data = "WEBVTT\r\n\r\n" for offset, subs in zip(self.offset, self.subs): subs = unescape(subs) - split_subs: List[str] = [subs[i : i + 79] for i in range(0, len(subs), 79)] + split_subs: List[str] = [ + subs[i : i + 79] for i in range(0, len(subs), 79) + ] for i in range(len(split_subs) - 1): sub = split_subs[i] diff --git a/src/edge_tts/util.py b/src/edge_tts/util.py index 132896c..638a5dc 100644 --- a/src/edge_tts/util.py +++ b/src/edge_tts/util.py @@ -5,8 +5,8 @@ Main package. import argparse import asyncio -from io import BufferedWriter import sys +from io import BufferedWriter from typing import Any from edge_tts import Communicate, SubMaker, list_voices @@ -45,7 +45,7 @@ async def _run_tts(args: Any) -> None: if isinstance(media_file, BufferedWriter): media_file.write(data["data"]) else: - sys.stdout.buffer.write(data["data"]) + sys.stdout.buffer.write(data["data"]) elif data["type"] == "WordBoundary": subs.create_sub((data["offset"], data["duration"]), data["text"]) From d7f60bc49f3a7b5390aaa0c8a1f6bdec8619a99f Mon Sep 17 00:00:00 2001 From: rany2 Date: Thu, 5 Jan 2023 01:03:15 +0200 Subject: [PATCH 12/14] more typing --- mypy.ini | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/mypy.ini b/mypy.ini index f7b2f0b..9e84878 100644 --- a/mypy.ini +++ b/mypy.ini @@ -2,12 +2,15 @@ warn_return_any = True warn_unused_configs = True -#disallow_any_unimported = True +disallow_any_unimported = True #disallow_any_expr = True -#disallow_any_decorated = True +disallow_any_decorated = True #disallow_any_explicit = True -#disallow_any_generics = True -#disallow_subclassing_any = True +disallow_any_generics = True +disallow_subclassing_any = True disallow_untyped_calls = True disallow_untyped_defs = True disallow_incomplete_defs = True + +[mypy-edge_tts.list_voices] +disallow_any_decorated = False From 57c5143aac5b7887e8bc091c549abab490e2d7cb Mon Sep 17 00:00:00 2001 From: rany2 Date: Thu, 5 Jan 2023 01:13:33 +0200 Subject: [PATCH 13/14] complete typing for now --- mypy.ini | 23 ++++++++++++++++++----- src/edge_tts/__init__.py | 2 ++ 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/mypy.ini b/mypy.ini index 9e84878..01e108d 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,16 +1,29 @@ [mypy] -warn_return_any = True -warn_unused_configs = True - disallow_any_unimported = True -#disallow_any_expr = True +disallow_any_expr = False disallow_any_decorated = True -#disallow_any_explicit = True +disallow_any_explicit = False disallow_any_generics = True disallow_subclassing_any = True + disallow_untyped_calls = True disallow_untyped_defs = True disallow_incomplete_defs = True +check_untyped_defs = True +disallow_untyped_decorators = True + +implicit_optional = False +strict_optional = True + +warn_redundant_casts = True +warn_unused_ignores = True +warn_no_return = True +warn_return_any = True +warn_unreachable = True + +strict_concatenate = True +strict_equality = True +strict = True [mypy-edge_tts.list_voices] disallow_any_decorated = False diff --git a/src/edge_tts/__init__.py b/src/edge_tts/__init__.py index 342ae5e..8ea0ee7 100644 --- a/src/edge_tts/__init__.py +++ b/src/edge_tts/__init__.py @@ -5,3 +5,5 @@ __init__ for edge_tts from .communicate import Communicate from .list_voices import VoicesManager, list_voices from .submaker import SubMaker + +__all__ = ["Communicate", "VoicesManager", "list_voices", "SubMaker"] From e55b18d3f49658a88f5206ce75faae1cbadf2976 Mon Sep 17 00:00:00 2001 From: rany2 Date: Thu, 5 Jan 2023 01:14:56 +0200 Subject: [PATCH 14/14] add mypy workflow --- .github/workflows/lint.yml | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 .github/workflows/lint.yml diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..ea05295 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,26 @@ +name: Lint + +on: + push: + paths: + - '*.py' + +jobs: + mypy: + runs-on: ubuntu-latest + steps: + - name: Setup Python + uses: actions/setup-python@v1 + with: + python-version: 3.7.4 + architecture: x64 + - name: Checkout + uses: actions/checkout@v1 + - name: Install mypy + run: pip install mypy + - name: Run mypy + uses: sasanquaneuf/mypy-github-action@releases/v1 + with: + checkName: 'mypy' # NOTE: this needs to be the same as the job name + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}