2024-11-10 11:33:36 +00:00
|
|
|
from argparse import ArgumentParser
|
2024-12-04 15:50:29 +00:00
|
|
|
from os import chmod, environ
|
2024-11-10 11:33:36 +00:00
|
|
|
from os.path import dirname
|
2024-12-04 15:50:29 +00:00
|
|
|
from random import choice
|
2024-11-10 11:33:36 +00:00
|
|
|
from re import M
|
|
|
|
import shlex
|
|
|
|
import subprocess
|
|
|
|
import sys
|
2024-12-04 15:50:29 +00:00
|
|
|
from protonvpn_cli import connection
|
|
|
|
from protonvpn_cli.constants import PASSFILE
|
|
|
|
from protonvpn_cli.utils import check_init, get_fastest_server, get_servers, set_config_value, pull_server_data
|
2024-11-10 11:33:36 +00:00
|
|
|
|
2024-11-29 00:13:58 +00:00
|
|
|
|
2024-11-10 11:33:36 +00:00
|
|
|
def run_proton(args):
    """Run the ``proton`` executable with *args* and exit with its status.

    The arguments are not placed on the child's command line. They are
    joined with single spaces and handed over through the ``PVPN_CMD_ARGS``
    environment variable, layered on top of a copy of the current
    environment. The child is started with ``/app`` as working directory.

    This function does not return: it always raises ``SystemExit`` carrying
    the child's return code.

    Args:
        args: Iterable of argument strings to forward to ``proton``.
    """
    # NOTE(review): the join applies no quoting, so an argument that itself
    # contains whitespace will be split apart by whoever re-tokenises
    # PVPN_CMD_ARGS.
    child_env = dict(environ, PVPN_CMD_ARGS=" ".join(args))
    completed = subprocess.run(["proton"], cwd="/app", env=child_env)

    # sys.exit rather than the bare exit() helper: the latter is injected by
    # the ``site`` module for interactive use and is not guaranteed to exist
    # (e.g. ``python -S``).
    sys.exit(completed.returncode)
|
|
|
|
|
2024-12-04 15:50:29 +00:00
|
|
|
# Append the optional tag suffix to the account name. PVPN_TAGS may be unset
# or empty; both cases leave the username unchanged. (Looking it up with
# environ[...] would raise KeyError when the variable is absent, which
# defeats the "or ''" fallback.)
environ["PVPN_USERNAME"] = environ["PVPN_USERNAME"] + (environ.get("PVPN_TAGS") or "")

# Write the credentials as "<username>\n<password>" and then restrict the
# file to owner read/write.
with open(PASSFILE, "w") as f:
    f.write("{0}\n{1}".format(environ["PVPN_USERNAME"], environ["PVPN_PASSWORD"]))
chmod(PASSFILE, 0o600)

# Seed the USER section of the client configuration from the environment.
# PVPN_USERNAME, PVPN_PASSWORD, PVPN_TIER and PVPN_PROTOCOL are required:
# a missing one raises KeyError here.
check_init()
set_config_value("USER", "username", environ["PVPN_USERNAME"])
set_config_value("USER", "tier", environ["PVPN_TIER"])
set_config_value("USER", "default_protocol", environ["PVPN_PROTOCOL"])
set_config_value("USER", "initialized", 1)
|
2024-11-10 11:33:36 +00:00
|
|
|
|
|
|
|
# Real command-line arguments take precedence. When there are none, fall back
# to the shell-style string in PVPN_CMD_ARGS and clear it afterwards.
args = sys.argv[1:]

if not args:
    args = shlex.split(environ.get("PVPN_CMD_ARGS") or "")
    environ["PVPN_CMD_ARGS"] = ""

# Only the sub-commands handled specially by this script are modelled here.
parser = ArgumentParser(exit_on_error=False)
subParsers = parser.add_subparsers(dest="command")
initParser = subParsers.add_parser("init", aliases=["i"])
connectParser = subParsers.add_parser("connect", aliases=["c"])

# Boolean selection switches accepted by "connect".
for aliases in [
    ["-f", "--fastest"],
    ["-r", "--random"],
    ["-s", "--streaming"],
    ["--sc"],
    ["--p2p"],
    ["--tor"],
]:
    connectParser.add_argument(*aliases, action="store_true")

# Country filter; takes a value.
connectParser.add_argument("--cc")

# parsedArgs stays None when the arguments are not understood by the parser
# above. argparse reports that either by raising (ArgumentError is an
# Exception) or by calling sys.exit(), so both are caught. KeyboardInterrupt
# is deliberately NOT swallowed, unlike with a bare "except:".
parsedArgs = None

try:
    parsedArgs = parser.parse_args(args)
except (Exception, SystemExit):
    pass
|
|
|
|
|
2024-12-04 15:50:29 +00:00
|
|
|
# Take the custom server-selection path only when the arguments parsed AND at
# least one option besides the sub-command name is set (the sub-command
# itself always accounts for one non-empty value). Everything else is
# forwarded to proton untouched.
if parsedArgs is not None and (
        sum(1 for value in vars(parsedArgs).values()
            if value not in [False, None]) > 1):

    # Bits that must be set in a server's "Features" value for each switch.
    # NOTE(review): values follow Proton's logical-server feature bitmask
    # (Secure-Core=1, Tor=2, P2P=4, Streaming=8) -- confirm against the
    # protonvpn_cli version in use.
    _required_features = 0
    if parsedArgs.sc:
        _required_features |= 1
    if parsedArgs.tor:
        _required_features |= 2
    if parsedArgs.p2p:
        _required_features |= 4
    if parsedArgs.streaming:
        _required_features |= 8

    def match(server):
        """Return True when *server* satisfies the --cc and feature filters.

        *server* is subscripted like a mapping, consistent with the
        ``server["Name"]`` lookup further down in this block.
        NOTE(review): the "ExitCountry" and "Features" keys are assumed
        from the Proton server list schema -- verify.

        The country comparison is case-insensitive and skipped when --cc was
        not given. A server matches the feature filter when every requested
        feature bit is present.
        """
        country_ok = (
            parsedArgs.cc is None
            or server["ExitCountry"].lower() == parsedArgs.cc.lower())
        features_ok = (
            int(server.get("Features", 0)) & _required_features
        ) == _required_features
        return country_ok and features_ok

    # Refresh the cached server list before filtering it.
    pull_server_data(force=True)
    servers = [candidate for candidate in get_servers() if match(candidate)]

    if servers:
        # --random only applies when --fastest was not requested as well;
        # with neither switch the fastest server is used.
        if parsedArgs.fastest or not parsedArgs.random:
            server = get_fastest_server(servers)
        else:
            server = choice(servers)

        run_proton(["connect", server["Name"]])
    else:
        raise Exception(
            f"Unable to find a server matching the specified criteria {args[1:]}!")
else:
    run_proton(args)
|