Created
June 30, 2024 17:26
-
-
Save tiimk/56e88a6e5d47157dedf40e2761683cf1 to your computer and use it in GitHub Desktop.
Create configs and auto vpn swapping
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import http.client | |
import http.cookies | |
import json | |
import base64 | |
import hashlib | |
import os | |
import random | |
import argparse | |
import time | |
from cryptography.hazmat.primitives import serialization | |
from cryptography.hazmat.primitives.asymmetric import x25519 | |
from cryptography.hazmat.primitives.asymmetric import ec | |
from cryptography.hazmat.primitives import hashes | |
""" | |
Copyright - FuseTim 2024 | |
This code is dual-licensed under both the MIT License and the Apache License 2.0. | |
You may choose either license to govern your use of this code. | |
MIT License: | |
https://opensource.org/licenses/MIT | |
Apache License 2.0: | |
https://www.apache.org/licenses/LICENSE-2.0 | |
By contributing to this project, you agree that your contributions will be licensed under | |
both the MIT License and the Apache License 2.0. | |
""" | |
###################################################################################### | |
# Credentials (found in Headers and Cookies) | |
auth_server = "" # See `x-pm-uid` header | |
auth_token = "" # See `AUTH-<x-pm-uid>` cookie | |
session_id = "" # See `Session-Id` cookie | |
web_app_version = "[email protected]" # See `x-pm-appversion` header | |
# Settings | |
prefix = "Wire_AuTo" # Prefix is used for config file and name in ProtonVPN Dashboard | |
output_dir = "./" | |
selected_countries = ["US"] | |
selected_cities = ["Chicago", "New York", "Ashburn"] | |
selected_tier = 2 # 0 = Free, 2 = Plus | |
selected_features = ["P2P"] # Features that a server should have ("P2P", "TOR", "SecureCore", "XOR", etc) or not ("-P2P", etc) | |
max_servers = 30 # Maximum of generated config | |
max_servers_per_city = 10 # Maximum number of servers per city | |
listing_only = False # Do not generate config, just list available servers with previous selectors | |
config_features = { | |
"SafeMode": False, | |
"SplitTCP": True, | |
"PortForwarding": True, | |
"RandomNAT": False, | |
"NetShieldLevel": 0, # 0, 1 or 2 | |
} | |
###################################################################################### | |
# Constants | |
connection = http.client.HTTPSConnection("account.protonvpn.com") | |
C = http.cookies.SimpleCookie() | |
C["AUTH-" + auth_server] = auth_token | |
C["Session-Id"] = session_id | |
headers = { | |
"x-pm-appversion": web_app_version, | |
"x-pm-uid": auth_server, | |
"Accept": "application/vnd.protonmail.v1+json", | |
"Cookie": C.output(attrs=[], header="", sep="; ") | |
} | |
def generateKeys(): | |
"""Generate a client key-pair using the API. Could be generated offline but need more work...""" | |
print("Generating key-pair...") | |
connection.request("GET", "/api/vpn/v1/certificate/key/EC", headers=headers) | |
response = connection.getresponse() | |
print("Status: {} and reason: {}".format(response.status, response.reason)) | |
resp = json.loads(response.read().decode()) | |
priv = resp["PrivateKey"].split("\n")[1] | |
pub = resp["PublicKey"].split("\n")[1] | |
print("Key generated:") | |
print("priv:", priv) | |
print("pub:", pub) | |
return [resp["PrivateKey"], pub, priv] | |
def getPubPEM(priv): | |
"""Return the Public key as string without headers""" | |
return priv[1] | |
def getPrivPEM(priv): | |
"""Return the Private key as PKCS#8 without headers""" | |
return priv[2] | |
def getPrivx25519(priv): | |
"""Return the x25519 base64-encoded private key, to be used in Wireguard config.""" | |
hash__ = hashlib.sha512(base64.b64decode(priv[2])[-32:]).digest() | |
hash_ = list(hash__)[:32] | |
hash_[0] &= 0xf8 | |
hash_[31] &= 0x7f | |
hash_[31] |= 0x40 | |
new_priv = base64.b64encode(bytes(hash_)).decode() | |
return new_priv | |
def registerConfig(server, priv): | |
"""Register a Wireguard configuration and return its raw response.""" | |
h = headers.copy() | |
h["Content-Type"] = "application/json" | |
print("Registering Config for server", server["Name"], "...") | |
body = { | |
"ClientPublicKey": getPubPEM(priv), | |
"Mode": "persistent", | |
"DeviceName": prefix + "-" + server["Name"].replace("#", "-"), | |
"Features": { | |
"peerName": server["Name"], | |
"peerIp": server["Servers"][0]["EntryIP"], | |
"peerPublicKey": server["Servers"][0]["X25519PublicKey"], | |
"platform": "Windows", | |
"SafeMode": config_features["SafeMode"], | |
"SplitTCP": config_features["SplitTCP"], | |
"PortForwarding": config_features["PortForwarding"] if server["Features"] & 4 == 4 else False, | |
"RandomNAT": config_features["RandomNAT"], | |
"NetShieldLevel": config_features["NetShieldLevel"], # 0, 1 or 2 | |
} | |
} | |
print("Request body for new config:", json.dumps(body, indent=4)) # Log the request body | |
connection.request("POST", "/api/vpn/v1/certificate", body=json.dumps(body), headers=h) | |
response = connection.getresponse() | |
print("Status: {} and reason: {}".format(response.status, response.reason)) | |
resp = json.loads(response.read().decode()) | |
print(resp) | |
return resp | |
def generateConfig(priv, register): | |
"""Generate a Wireguard config using the ProtonVPN API answer.""" | |
conf = """[Interface] | |
# Key for {prefix} | |
PrivateKey = {priv} | |
Address = 10.2.0.2/32 | |
DNS = 192.168.1.254 | |
[Peer] | |
# {server_name} | |
PublicKey = {server_pub} | |
AllowedIPs = 0.0.0.0/1, 128.0.0.0/1 | |
Endpoint = {server_endpoint}:51820 | |
""".format(prefix=prefix, priv=getPrivx25519(priv), server_name=register["Features"]["peerName"], | |
server_pub=register["Features"]["peerPublicKey"], server_endpoint=register["Features"]["peerIp"]) | |
return conf | |
def write_config_to_disk(name, conf): | |
with open(output_dir + "/" + name + ".conf", "w") as f: | |
f.write(conf) | |
def read_conf_file(file_path): | |
"""Read the Wireguard configuration file and extract details.""" | |
with open(file_path, 'r') as f: | |
lines = f.readlines() | |
config_data = {} | |
for line in lines: | |
if line.startswith("PrivateKey = "): | |
config_data["PrivateKey"] = line.split(" = ")[1].strip() | |
elif line.startswith("PublicKey = "): | |
config_data["PublicKey"] = line.split(" = ")[1].strip() | |
elif line.startswith("Endpoint = "): | |
config_data["Endpoint"] = line.split(" = ")[1].strip().split(":")[0] | |
return config_data | |
def renew_certificate_from_conf(device_name, cert): | |
"""Renew a Wireguard configuration by generating a new key pair.""" | |
h = headers.copy() | |
h["Content-Type"] = "application/json" | |
print("Renewing Config for server", device_name, "...") | |
new_keys = generateKeys() # Generate a new key pair | |
body = { | |
"ClientPublicKey": new_keys[1], | |
"Mode": "persistent", | |
"DeviceName": device_name, | |
"Features": { | |
"peerName": cert["Features"]["peerName"], | |
"peerIp": cert["Features"]["peerIp"], | |
"peerPublicKey": cert["Features"]["peerPublicKey"], | |
"platform": cert["Features"]["platform"], | |
"SafeMode": config_features["SafeMode"], | |
"SplitTCP": config_features["SplitTCP"], | |
"PortForwarding": config_features["PortForwarding"], | |
"RandomNAT": config_features["RandomNAT"], | |
"NetShieldLevel": config_features["NetShieldLevel"], | |
}, | |
"Renew": True | |
} | |
print("Request body for renewing config:", json.dumps(body, indent=4)) # Log the request body | |
connection.request("POST", "/api/vpn/v1/certificate", body=json.dumps(body), headers=h) | |
response = connection.getresponse() | |
print("Status: {} and reason: {}".format(response.status, response.reason)) | |
resp = json.loads(response.read().decode()) | |
print(resp) | |
return resp, new_keys | |
def parse_arguments(): | |
parser = argparse.ArgumentParser(description='ProtonVPN configuration script') | |
parser.add_argument('-extend', action='store_true', help='Renew existing certificates') | |
return parser.parse_args() | |
def fetch_existing_certificates(): | |
"""Fetch existing certificates from ProtonVPN account""" | |
h = headers.copy() | |
h["Content-Type"] = "application/json" | |
connection.request("GET", "/api/vpn/v1/certificate/all?Mode=persistent&Offset=0&Limit=51", headers=h) | |
response = connection.getresponse() | |
print("Status: {} and reason: {}".format(response.status, response.reason)) | |
resp = json.loads(response.read().decode()) | |
print("Fetched existing certificates:", resp) | |
return resp["Certificates"] | |
def get_existing_config_names(): | |
"""Get the list of existing configuration names without the file extension.""" | |
return [f.split(".")[0] for f in os.listdir(output_dir) if f.endswith(".conf")] | |
# VPN Listings | |
connection.request("GET", "/api/vpn/logicals", headers=headers) | |
response = connection.getresponse() | |
print("Status: {} and reason: {}".format(response.status, response.reason)) | |
servers = json.loads(response.read().decode())["LogicalServers"] | |
# Create a dictionary to track the number of servers per city | |
servers_per_city = {city: 0 for city in selected_cities} | |
# Collect eligible servers first | |
eligible_servers = [] | |
for s in servers: | |
feat = [ | |
"SecureCore" if s["Features"] & 1 == 1 else "-SecureCore", | |
"TOR" if s["Features"] & 2 == 2 else "-TOR", | |
"P2P" if s["Features"] & 4 == 4 else "-P2P", | |
"XOR" if s["Features"] & 8 == 8 else "-XOR", | |
"IPv6" if s["Features"] & 16 == 16 else "-IPv6" | |
] | |
if (not s["EntryCountry"] in selected_countries and not s["ExitCountry"] in selected_countries) or s["Tier"] != selected_tier: | |
continue | |
if s["City"] not in selected_cities: | |
continue | |
if len(list(filter(lambda sf: not (sf in feat), selected_features))) > 0: | |
continue | |
eligible_servers.append(s) | |
# Remove servers for which configurations already exist | |
existing_configs = get_existing_config_names() | |
eligible_servers = [s for s in eligible_servers if prefix + "-" + s["Name"].replace("#", "-") not in existing_configs] | |
# Shuffle the eligible servers to ensure randomness | |
random.shuffle(eligible_servers) | |
if __name__ == "__main__": | |
args = parse_arguments() | |
if args.extend: | |
print("Renewing existing certificates...") | |
existing_certificates = fetch_existing_certificates() | |
for cert in existing_certificates: | |
device_name = cert["DeviceName"].replace("#", "-") | |
if device_name in existing_configs: | |
renewed_config, new_keys = renew_certificate_from_conf(device_name, cert) | |
config = generateConfig(new_keys, renewed_config) | |
write_config_to_disk(device_name, config) | |
time.sleep(60) | |
else: | |
print("Generating new configurations...") | |
total_configs = sum(servers_per_city.values()) | |
for s in eligible_servers: | |
if total_configs >= max_servers: | |
break | |
device_name = prefix + "-" + s["Name"].replace("#", "-") | |
if servers_per_city[s["City"]] >= max_servers_per_city: | |
continue | |
servers_per_city[s["City"]] += 1 | |
total_configs += 1 | |
keys = generateKeys() | |
reg = registerConfig(s, keys) | |
config = generateConfig(keys, reg) | |
write_config_to_disk(device_name, config) | |
time.sleep(60) | |
connection.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import time | |
import http.client | |
import subprocess | |
import argparse | |
# Configuration | |
config_dir = "D:\prot" # Directory where your config files are stored | |
wireguard_cmd = "wireguard /installtunnelservice" # Command to connect to a WireGuard tunnel | |
wireguard_uninstall_cmd = "wireguard /uninstalltunnelservice" # Command to disconnect a WireGuard tunnel | |
def fetch_logical_servers(): | |
"""Fetch logical server information from the ProtonVPN public API.""" | |
connection = http.client.HTTPSConnection("account.proton.me") | |
connection.request("GET", "/api/vpn/logicals") | |
response = connection.getresponse() | |
if response.status != 200: | |
raise Exception(f"Failed to fetch logical servers: {response.status} {response.reason}") | |
servers = json.loads(response.read().decode())["LogicalServers"] | |
return servers | |
def get_existing_configs(): | |
"""Get the list of existing configuration names without the file extension.""" | |
return [f.split(".")[0] for f in os.listdir(config_dir) if f.endswith(".conf")] | |
def extract_server_name(config_name): | |
"""Extract the relevant part of the configuration name to match it with the server names.""" | |
parts = config_name.split('-') | |
server_number = parts[-1] # Get the last part after the last hyphen | |
return f"{parts[-3]}-{parts[-2]}#{server_number}" # Construct the server name in the format US-IL#55 | |
def find_best_servers(servers, existing_configs, location=None): | |
"""Find the best and second best servers based on load and existing configurations.""" | |
best_server = None | |
second_best_server = None | |
lowest_load = float('inf') | |
second_lowest_load = float('inf') | |
for server in servers: | |
if location and not server['Name'].startswith(location): | |
continue | |
for config in existing_configs: | |
config_server_name = extract_server_name(config) | |
if server['Name'] == config_server_name: | |
if server["Load"] < lowest_load: | |
second_best_server = best_server | |
second_lowest_load = lowest_load | |
best_server = server | |
lowest_load = server["Load"] | |
elif server["Load"] < second_lowest_load: | |
second_best_server = server | |
second_lowest_load = server["Load"] | |
return best_server, second_best_server | |
def check_current_connection(): | |
"""Check if there is a current WireGuard connection.""" | |
command = "powershell -Command \"Get-NetAdapter | Where-Object {$_.InterfaceDescription -like '*WireGuard*'} | Select-Object -ExpandProperty Name\"" | |
result = subprocess.run(command, shell=True, capture_output=True, text=True) | |
adapters = result.stdout.splitlines() | |
if adapters: | |
tunnel_name = adapters[0] | |
return tunnel_name, adapters | |
return None, adapters | |
def disconnect_current_connection(tunnel_name): | |
"""Disconnect the current WireGuard connection.""" | |
command = f"{wireguard_uninstall_cmd} \"{tunnel_name}\"" | |
print(f"Disconnecting from {tunnel_name} with command: {command}") | |
try: | |
subprocess.run(command, shell=True, check=True) | |
print(f"Disconnected from {tunnel_name} successfully.") | |
# Wait for the adapter to be removed | |
time.sleep(1) | |
except subprocess.CalledProcessError as e: | |
print(f"Failed to disconnect: {e}") | |
def connect_to_server(server): | |
"""Connect to the server using WireGuard.""" | |
config_name = server['Name'].replace('#', '-') | |
config_path = os.path.join(config_dir, f"Wire_AuTo-{config_name}.conf") | |
command = f"{wireguard_cmd} \"{config_path}\"" | |
print(f"Connecting to {server['Name']} with load {server['Load']} using config {config_path}...") | |
print(f"Running command: {command}") | |
try: | |
subprocess.run(command, shell=True, check=True) | |
print("Connected successfully.") | |
except subprocess.CalledProcessError as e: | |
print(f"Failed to connect: {e}") | |
def main(location): | |
servers = fetch_logical_servers() | |
existing_configs = get_existing_configs() | |
# Create a dictionary to map server names to their loads | |
server_loads = {server['Name']: server['Load'] for server in servers} | |
print("\nExisting configurations:") | |
for config in existing_configs: | |
config_server_name = extract_server_name(config) | |
load = server_loads.get(config_server_name, "Unknown") | |
print(f"{config}: Load {load}") | |
# Check current connection and print adapters | |
current_tunnel, adapters = check_current_connection() | |
print("\nNetwork adapters:") | |
for adapter in adapters: | |
print(adapter) | |
print("\nChecking for matches:") | |
best_server, second_best_server = find_best_servers(servers, existing_configs, location) | |
if current_tunnel: | |
current_server_name = extract_server_name(current_tunnel) | |
print(f"Currently connected to: {current_server_name}") | |
if current_server_name == best_server['Name']: | |
print(f"Already connected to the best server: {best_server['Name']} with load {best_server['Load']}") | |
if second_best_server: | |
print(f"Connecting to the second best server: {second_best_server['Name']} with load {second_best_server['Load']}") | |
disconnect_current_connection(current_tunnel) | |
connect_to_server(second_best_server) | |
else: | |
print(f"Disconnecting from current server {current_server_name} and connecting to the best server {best_server['Name']} with load {best_server['Load']}") | |
disconnect_current_connection(current_tunnel) | |
connect_to_server(best_server) | |
else: | |
if best_server: | |
connect_to_server(best_server) | |
else: | |
print("No suitable server found.") | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description="Connect to the best ProtonVPN server based on load.") | |
parser.add_argument('-location', type=str, help="Preferred server location (e.g., US-NY)") | |
args = parser.parse_args() | |
main(args.location) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment