from argparse import ArgumentParser from os import chmod, environ from os.path import dirname from random import choice from re import M import shlex import subprocess import sys from protonvpn_cli import connection from protonvpn_cli.constants import PASSFILE from protonvpn_cli.utils import check_init, get_fastest_server, get_servers, set_config_value, pull_server_data def run_proton(args): exit( subprocess.run( ["proton"], cwd="/app", env=dict( environ, PVPN_CMD_ARGS=" ".join(args))).returncode) environ["PVPN_USERNAME"] = environ["PVPN_USERNAME"] + (environ["PVPN_TAGS"] or "") with open(PASSFILE, "w") as f: f.write("{0}\n{1}".format(environ["PVPN_USERNAME"], environ["PVPN_PASSWORD"])) chmod(PASSFILE, 0o600) check_init() set_config_value("USER", "username", environ["PVPN_USERNAME"]) set_config_value("USER", "tier", environ["PVPN_TIER"]) set_config_value("USER", "default_protocol", environ["PVPN_PROTOCOL"]) set_config_value("USER", "initialized", 1) args = sys.argv[1:] if not args: args = shlex.split(environ.get("PVPN_CMD_ARGS") or "") environ["PVPN_CMD_ARGS"] = "" parser = ArgumentParser(exit_on_error=False) subParsers = parser.add_subparsers(dest="command") initParser = subParsers.add_parser("init", aliases=["i"]) connectParser = subParsers.add_parser("connect", aliases=["c"]) for aliases in [ ["-f", "--fastest"], ["-r", "--random"], ["-s", "--streaming"], ["--sc"], ["--p2p"], ["--tor"] ]: connectParser.add_argument(*aliases, action="store_true") connectParser.add_argument("--cc") parsedArgs = None try: parsedArgs = parser.parse_args(args) except: pass if parsedArgs is not None and ( len( list( filter( lambda item: item[1] not in [False, None], vars(parsedArgs).items()))) > 1): def match(server): features = list() if parsedArgs.streaming: pass if parsedArgs.sc: pass if parsedArgs.p2p: pass if parsedArgs.tor: pass return (parsedArgs.cc is None or server.exit_country.lower() == parsedArgs.cc.lower()) and ( all(feature in server.features for feature in features)) pull_server_data(force=True) servers = list(filter(lambda server: match(server), get_servers())) if len(servers) > 0: if parsedArgs.fastest or not parsedArgs.random: server = get_fastest_server(servers) else: server = choice(servers) run_proton(["connect", server["Name"]]) else: raise Exception( f"Unable to find a server matching the specified criteria {args[1:]}!") else: run_proton(args)