100 lines
3 KiB
Python
100 lines
3 KiB
Python
from argparse import ArgumentParser
|
|
from os import environ
|
|
from os.path import dirname
|
|
from re import M
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
from protonvpn_cli.cli import FeatureEnum, protonvpn
|
|
|
|
def run_proton(args):
|
|
exit(
|
|
subprocess.run(
|
|
["pipenv", "run", "proton"],
|
|
cwd="/app",
|
|
env=dict(
|
|
environ,
|
|
PIPENV_VENV_IN_PROJECT=f"{1}",
|
|
PVPN_CMD_ARGS=" ".join(args))).returncode)
|
|
|
|
protonvpn.ensure_connectivity()
|
|
|
|
args = sys.argv[1:]
|
|
|
|
if not args:
|
|
args = shlex.split(environ.get("PVPN_CMD_ARGS") or "")
|
|
environ["PVPN_CMD_ARGS"] = ""
|
|
|
|
parser = ArgumentParser(exit_on_error=False)
|
|
subParsers = parser.add_subparsers(dest="command")
|
|
initParser = subParsers.add_parser("init", alias=["i"])
|
|
connectParser = subParsers.add_parser("connect", aliases=["c"])
|
|
|
|
for aliases in [
|
|
["-f", "--fastest"],
|
|
["-r", "--random"],
|
|
["-s", "--streaming"],
|
|
["--sc"],
|
|
["--p2p"],
|
|
["--tor"]
|
|
]:
|
|
connectParser.add_argument(*aliases, action="store_true")
|
|
|
|
connectParser.add_argument("--cc")
|
|
parsedArgs = None
|
|
|
|
try:
|
|
parsedArgs = parser.parse_args(args)
|
|
except:
|
|
pass
|
|
|
|
if parsedArgs is not None and parsedArgs.command == "init":
|
|
userName = input("Enter your Proton VPN username or email: ")
|
|
subprocess.run(["protonvpn-cli", "login", userName])
|
|
else:
|
|
session = protonvpn.get_session()
|
|
try:
|
|
session.ensure_valid()
|
|
except:
|
|
raise Exception("Your current session is invalid. Please initialize the session using the `init` subcommand.")
|
|
|
|
environ["PVPN_USERNAME"] = session.vpn_username + (environ.get("PVPN_TAGS") or "")
|
|
environ["PVPN_PASSWORD"] = session.vpn_password
|
|
environ["PVPN_TIER"] = f"{session.vpn_tier}"
|
|
|
|
if parsedArgs is not None and (
|
|
len(
|
|
list(
|
|
filter(
|
|
lambda item: item[1] not in [False, None],
|
|
vars(parsedArgs).items()))) > 1):
|
|
country = protonvpn.get_country()
|
|
|
|
def match(server):
|
|
features = list()
|
|
|
|
if parsedArgs.streaming:
|
|
features.append(FeatureEnum.STREAMING)
|
|
if parsedArgs.sc:
|
|
features.append(FeatureEnum.SECURE_CORE)
|
|
if parsedArgs.p2p:
|
|
features.append(FeatureEnum.P2P)
|
|
if parsedArgs.tor:
|
|
features.append(FeatureEnum.TOR)
|
|
|
|
return (parsedArgs.cc is None or server.exit_country.lower() == parsedArgs.cc.lower()) and (
|
|
all(feature in server.features for feature in features))
|
|
|
|
servers = session.servers.filter(match)
|
|
|
|
if len(servers) > 0:
|
|
if parsedArgs.fastest or not parsedArgs.random:
|
|
server = servers.get_fastest_server()
|
|
else:
|
|
server = servers.get_random_server()
|
|
|
|
run_proton(["connect", server.name])
|
|
else:
|
|
raise Exception(f"Unable to find a server matching the specified criteria {args[1:]}!")
|
|
else:
|
|
run_proton(args)
|