Files
elmprodvpn/selective-vpn-gui/main_window/singbox/links_parsers_mixin.py

392 lines
15 KiB
Python

from __future__ import annotations
import re
from urllib.parse import parse_qs, unquote, urlsplit
from typing import Any
class SingBoxLinksParsersMixin:
def _parse_vless_link_payload(self, link: str) -> dict[str, Any]:
u = urlsplit(link)
query = parse_qs(u.query or "", keep_blank_values=True)
uuid = unquote(str(u.username or "").strip())
host = str(u.hostname or "").strip()
if not uuid:
raise RuntimeError("VLESS link has no UUID")
if not host:
raise RuntimeError("VLESS link has no host")
try:
port = int(u.port or 443)
except Exception:
port = 443
transport = self._normalize_link_transport(self._query_value(query, "type", "transport"))
security = self._query_value(query, "security").strip().lower() or "none"
if security == "xtls":
security = "tls"
if security not in ("none", "tls", "reality"):
security = "none"
path = self._query_value(query, "path", "spx")
if not path and str(u.path or "").strip() not in ("", "/"):
path = unquote(str(u.path or "").strip())
grpc_service = self._query_value(query, "serviceName", "service_name")
if transport == "grpc" and not grpc_service:
grpc_service = self._query_value(query, "path")
flow = self._query_value(query, "flow")
packet_encoding = self._query_value(query, "packetEncoding", "packet_encoding").strip().lower()
if packet_encoding in ("none", "off", "false"):
packet_encoding = ""
sni = self._query_value(query, "sni", "host")
utls_fp = self._query_value(query, "fp", "fingerprint")
reality_pk = self._query_value(query, "pbk", "public_key")
reality_sid = self._query_value(query, "sid", "short_id")
tls_insecure = self._query_bool(query, "allowInsecure", "insecure")
profile_name = unquote(str(u.fragment or "").strip()) or host
proxy: dict[str, Any] = {
"type": "vless",
"tag": "proxy",
"server": host,
"server_port": port,
"uuid": uuid,
}
if packet_encoding:
proxy["packet_encoding"] = packet_encoding
if flow:
proxy["flow"] = flow
self._apply_proxy_transport(proxy, transport=transport, path=path, grpc_service=grpc_service)
self._apply_proxy_tls(
proxy,
security=security,
sni=sni,
utls_fp=utls_fp,
tls_insecure=tls_insecure,
reality_public_key=reality_pk,
reality_short_id=reality_sid,
)
editor_values = {
"profile_name": profile_name,
"enabled": True,
"server": host,
"port": port,
"uuid": uuid,
"flow": flow,
"packet_encoding": packet_encoding,
"transport": transport,
"path": path,
"grpc_service": grpc_service,
"security": security,
"sni": sni,
"utls_fp": utls_fp,
"reality_public_key": reality_pk,
"reality_short_id": reality_sid,
"tls_insecure": tls_insecure,
"sniff": True,
}
return {
"protocol": "vless",
"profile_name": profile_name,
"raw_config": self._build_singbox_raw_config_from_proxy(proxy, sniff=True),
"editor_values": editor_values,
}
def _parse_trojan_link_payload(self, link: str) -> dict[str, Any]:
u = urlsplit(link)
query = parse_qs(u.query or "", keep_blank_values=True)
password = unquote(str(u.username or "").strip()) or self._query_value(query, "password")
host = str(u.hostname or "").strip()
if not password:
raise RuntimeError("Trojan link has no password")
if not host:
raise RuntimeError("Trojan link has no host")
try:
port = int(u.port or 443)
except Exception:
port = 443
transport = self._normalize_link_transport(self._query_value(query, "type", "transport"))
path = self._query_value(query, "path")
grpc_service = self._query_value(query, "serviceName", "service_name")
security = self._query_value(query, "security").strip().lower() or "tls"
if security not in ("none", "tls"):
security = "tls"
sni = self._query_value(query, "sni", "host")
utls_fp = self._query_value(query, "fp", "fingerprint")
tls_insecure = self._query_bool(query, "allowInsecure", "insecure")
alpn = self._query_csv(query, "alpn")
profile_name = unquote(str(u.fragment or "").strip()) or host
proxy: dict[str, Any] = {
"type": "trojan",
"tag": "proxy",
"server": host,
"server_port": port,
"password": password,
}
self._apply_proxy_transport(proxy, transport=transport, path=path, grpc_service=grpc_service)
self._apply_proxy_tls(
proxy,
security=security,
sni=sni,
utls_fp=utls_fp,
tls_insecure=tls_insecure,
alpn=alpn,
)
return {
"protocol": "trojan",
"profile_name": profile_name,
"raw_config": self._build_singbox_raw_config_from_proxy(proxy, sniff=True),
}
def _parse_ss_link_payload(self, link: str) -> dict[str, Any]:
raw = str(link or "").strip()
u = urlsplit(raw)
query = parse_qs(u.query or "", keep_blank_values=True)
profile_name = unquote(str(u.fragment or "").strip()) or "Shadowsocks"
body = raw[len("ss://"):]
body = body.split("#", 1)[0]
body = body.split("?", 1)[0]
method = ""
password = ""
host_port = ""
if "@" in body:
left, host_port = body.rsplit("@", 1)
creds = left
if ":" not in creds:
creds = self._b64_urlsafe_decode(creds)
if ":" not in creds:
raise RuntimeError("Shadowsocks link has invalid credentials")
method, password = creds.split(":", 1)
else:
decoded = self._b64_urlsafe_decode(body)
if "@" not in decoded:
raise RuntimeError("Shadowsocks link has invalid payload")
creds, host_port = decoded.rsplit("@", 1)
if ":" not in creds:
raise RuntimeError("Shadowsocks link has invalid credentials")
method, password = creds.split(":", 1)
hp = urlsplit("//" + host_port)
host = str(hp.hostname or "").strip()
if not host:
raise RuntimeError("Shadowsocks link has no host")
try:
port = int(hp.port or 8388)
except Exception:
port = 8388
proxy: dict[str, Any] = {
"type": "shadowsocks",
"tag": "proxy",
"server": host,
"server_port": port,
"method": str(method or "").strip(),
"password": str(password or "").strip(),
}
plugin = self._query_value(query, "plugin")
if plugin:
proxy["plugin"] = plugin
return {
"protocol": "shadowsocks",
"profile_name": profile_name,
"raw_config": self._build_singbox_raw_config_from_proxy(proxy, sniff=True),
}
def _parse_hysteria2_link_payload(self, link: str) -> dict[str, Any]:
u = urlsplit(link)
query = parse_qs(u.query or "", keep_blank_values=True)
password = unquote(str(u.username or "").strip()) or self._query_value(query, "password")
host = str(u.hostname or "").strip()
if not password:
raise RuntimeError("Hysteria2 link has no password")
if not host:
raise RuntimeError("Hysteria2 link has no host")
try:
port = int(u.port or 443)
except Exception:
port = 443
profile_name = unquote(str(u.fragment or "").strip()) or host
proxy: dict[str, Any] = {
"type": "hysteria2",
"tag": "proxy",
"server": host,
"server_port": port,
"password": password,
}
up_mbps = self._query_value(query, "up_mbps", "upmbps", "up")
down_mbps = self._query_value(query, "down_mbps", "downmbps", "down")
try:
if up_mbps:
proxy["up_mbps"] = int(float(up_mbps))
except Exception:
pass
try:
if down_mbps:
proxy["down_mbps"] = int(float(down_mbps))
except Exception:
pass
obfs_type = self._query_value(query, "obfs")
if obfs_type:
obfs: dict[str, Any] = {"type": obfs_type}
obfs_pw = self._query_value(query, "obfs-password", "obfs_password")
if obfs_pw:
obfs["password"] = obfs_pw
proxy["obfs"] = obfs
self._apply_proxy_tls(
proxy,
security="tls",
sni=self._query_value(query, "sni"),
tls_insecure=self._query_bool(query, "allowInsecure", "insecure"),
alpn=self._query_csv(query, "alpn"),
)
return {
"protocol": "hysteria2",
"profile_name": profile_name,
"raw_config": self._build_singbox_raw_config_from_proxy(proxy, sniff=True),
}
def _parse_tuic_link_payload(self, link: str) -> dict[str, Any]:
u = urlsplit(link)
query = parse_qs(u.query or "", keep_blank_values=True)
uuid = unquote(str(u.username or "").strip())
password = unquote(str(u.password or "").strip())
host = str(u.hostname or "").strip()
if not uuid:
raise RuntimeError("TUIC link has no UUID")
if not password:
raise RuntimeError("TUIC link has no password")
if not host:
raise RuntimeError("TUIC link has no host")
try:
port = int(u.port or 443)
except Exception:
port = 443
profile_name = unquote(str(u.fragment or "").strip()) or host
proxy: dict[str, Any] = {
"type": "tuic",
"tag": "proxy",
"server": host,
"server_port": port,
"uuid": uuid,
"password": password,
}
cc = self._query_value(query, "congestion_control", "congestion")
if cc:
proxy["congestion_control"] = cc
udp_mode = self._query_value(query, "udp_relay_mode")
if udp_mode:
proxy["udp_relay_mode"] = udp_mode
if self._query_bool(query, "zero_rtt_handshake", "zero_rtt"):
proxy["zero_rtt_handshake"] = True
self._apply_proxy_tls(
proxy,
security="tls",
sni=self._query_value(query, "sni", "host"),
utls_fp=self._query_value(query, "fp", "fingerprint"),
tls_insecure=self._query_bool(query, "allowInsecure", "insecure"),
alpn=self._query_csv(query, "alpn"),
)
return {
"protocol": "tuic",
"profile_name": profile_name,
"raw_config": self._build_singbox_raw_config_from_proxy(proxy, sniff=True),
}
def _parse_wireguard_link_payload(self, link: str) -> dict[str, Any]:
u = urlsplit(link)
query = parse_qs(u.query or "", keep_blank_values=True)
private_key = unquote(str(u.username or "").strip()) or self._query_value(query, "private_key", "privateKey")
host = str(u.hostname or "").strip()
if not host:
raise RuntimeError("WireGuard link has no host")
if not private_key:
raise RuntimeError("WireGuard link has no private key")
try:
port = int(u.port or 443)
except Exception:
port = 443
peer_public_key = self._query_value(query, "peer_public_key", "public_key", "peerPublicKey")
if not peer_public_key:
raise RuntimeError("WireGuard link has no peer public key")
local_address = self._query_csv(query, "local_address", "address", "localAddress")
if not local_address:
raise RuntimeError("WireGuard link has no local address")
profile_name = unquote(str(u.fragment or "").strip()) or host
proxy: dict[str, Any] = {
"type": "wireguard",
"tag": "proxy",
"server": host,
"server_port": port,
"private_key": private_key,
"peer_public_key": peer_public_key,
"local_address": local_address,
}
psk = self._query_value(query, "pre_shared_key", "psk", "preSharedKey")
if psk:
proxy["pre_shared_key"] = psk
reserved_vals = self._parse_wg_reserved_values(self._query_csv(query, "reserved"), strict=True)
if reserved_vals:
proxy["reserved"] = reserved_vals
mtu_val = self._query_value(query, "mtu")
try:
mtu = int(mtu_val) if mtu_val else 0
except Exception:
mtu = 0
if mtu > 0:
proxy["mtu"] = mtu
return {
"protocol": "wireguard",
"profile_name": profile_name,
"raw_config": self._build_singbox_raw_config_from_proxy(proxy, sniff=True),
}
def _extract_first_connection_link(self, text: str) -> str:
raw = str(text or "").strip()
if not raw:
return ""
m = re.search(r"(?i)(vless|trojan|ss|hysteria2|hy2|tuic|wireguard|wg)://\S+", raw)
if m:
return str(m.group(0) or "").strip()
if "://" in raw:
return raw.splitlines()[0].strip()
return ""
def _parse_connection_link_payload(self, text: str) -> dict[str, Any]:
raw = self._extract_first_connection_link(text)
if not raw:
raise RuntimeError(
"No supported link found. Supported schemes: "
"vless:// trojan:// ss:// hysteria2:// hy2:// tuic:// wireguard:// wg://"
)
u = urlsplit(raw)
scheme = str(u.scheme or "").strip().lower()
if scheme == "vless":
return self._parse_vless_link_payload(raw)
if scheme == "trojan":
return self._parse_trojan_link_payload(raw)
if scheme == "ss":
return self._parse_ss_link_payload(raw)
if scheme in ("hysteria2", "hy2"):
return self._parse_hysteria2_link_payload(raw)
if scheme == "tuic":
return self._parse_tuic_link_payload(raw)
if scheme in ("wireguard", "wg"):
return self._parse_wireguard_link_payload(raw)
raise RuntimeError(f"Unsupported link scheme: {scheme}")