"""Command-line wrapper that resolves a Proton VPN server and launches proton.

The script reads its arguments from ``sys.argv`` (falling back to the
``PVPN_CMD_ARGS`` environment variable), handles the ``init`` and ``connect``
subcommands itself, and forwards everything else to ``pipenv run proton``.
"""
from argparse import ArgumentError, ArgumentParser
from os import environ
from os.path import dirname
from re import M
import shlex
import subprocess
import sys

from protonvpn_cli.cli import FeatureEnum, protonvpn


def run_proton(args):
    """Run ``pipenv run proton`` in ``/app`` and exit with its return code.

    Args:
        args: Sequence of command-line tokens. They are passed to the child
            process space-joined in the ``PVPN_CMD_ARGS`` environment
            variable, not as argv.

    Raises:
        SystemExit: Always; the exit status is the child's return code.
    """
    completed = subprocess.run(
        ["pipenv", "run", "proton"],
        cwd="/app",
        env=dict(
            environ,
            PIPENV_VENV_IN_PROJECT="1",
            # NOTE(review): tokens containing whitespace do not survive this
            # join; kept as-is because the consumer's parsing is not visible
            # from this file.
            PVPN_CMD_ARGS=" ".join(args),
        ),
    )
    sys.exit(completed.returncode)


protonvpn.ensure_connectivity()

# Arguments come from argv; when argv is empty they are read from the
# PVPN_CMD_ARGS environment variable instead.
args = sys.argv[1:]
if not args:
    args = shlex.split(environ.get("PVPN_CMD_ARGS") or "")
environ["PVPN_CMD_ARGS"] = ""

parser = ArgumentParser(exit_on_error=False)
subParsers = parser.add_subparsers(dest="command")
initParser = subParsers.add_parser("init", aliases=["i"])
connectParser = subParsers.add_parser("connect", aliases=["c"])
for aliases in (
    ("-f", "--fastest"),
    ("-r", "--random"),
    ("-s", "--streaming"),
    ("--sc",),
    ("--p2p",),
    ("--tor",),
):
    connectParser.add_argument(*aliases, action="store_true")
connectParser.add_argument("--cc")

# `dest="command"` stores whichever spelling the user typed, so the alias has
# to be accepted alongside the canonical name.
INIT_COMMANDS = ("init", "i")

# Anything this parser does not understand is forwarded verbatim to proton
# further down, so a parse failure is not an error here. Sub-parsers do not
# inherit `exit_on_error=False`, hence SystemExit is caught as well.
try:
    parsedArgs = parser.parse_args(args)
except (ArgumentError, SystemExit):
    parsedArgs = None

if parsedArgs is not None and parsedArgs.command in INIT_COMMANDS:
    userName = input("Enter your Proton VPN username or email: ")
    subprocess.run(["protonvpn-cli", "login", userName])
else:
    session = protonvpn.get_session()
    try:
        session.ensure_valid()
    except Exception as err:
        raise Exception(
            "Your current session is invalid. \n"
            "Please initialize the session using the `init` subcommand."
        ) from err

    # Expose the session credentials through the environment; PVPN_TAGS, when
    # set, is appended to the username unchanged.
    environ["PVPN_USERNAME"] = session.vpn_username + (environ.get("PVPN_TAGS") or "")
    environ["PVPN_PASSWORD"] = session.vpn_password
    environ["PVPN_TIER"] = f"{session.vpn_tier}"

    # Server selection only happens when `connect` was parsed successfully
    # AND at least one of its options was supplied; a bare `connect` (or any
    # unparsed command line) is forwarded to proton untouched.
    hasSelectionOptions = parsedArgs is not None and any(
        value not in (False, None)
        for key, value in vars(parsedArgs).items()
        if key != "command"
    )

    if hasSelectionOptions:
        # Return value is unused; the call is kept in case it has side
        # effects. NOTE(review): drop it if get_country() is pure.
        protonvpn.get_country()

        # Built once instead of on every `match` call.
        requiredFeatures = []
        if parsedArgs.streaming:
            requiredFeatures.append(FeatureEnum.STREAMING)
        if parsedArgs.sc:
            requiredFeatures.append(FeatureEnum.SECURE_CORE)
        if parsedArgs.p2p:
            requiredFeatures.append(FeatureEnum.P2P)
        if parsedArgs.tor:
            requiredFeatures.append(FeatureEnum.TOR)

        wantedCountry = None if parsedArgs.cc is None else parsedArgs.cc.lower()

        def match(server):
            """Return True when `server` satisfies the country and feature filters."""
            if wantedCountry is not None and server.exit_country.lower() != wantedCountry:
                return False
            return all(feature in server.features for feature in requiredFeatures)

        servers = session.servers.filter(match)
        if len(servers) > 0:
            # Fastest is the default; a random pick requires --random without
            # --fastest.
            if parsedArgs.fastest or not parsedArgs.random:
                server = servers.get_fastest_server()
            else:
                server = servers.get_random_server()
            run_proton(["connect", server.name])
        else:
            raise Exception(
                f"Unable to find a server matching the specified criteria {args[1:]}!"
            )
    else:
        run_proton(args)