Files
elmprodvpn/selective-vpn-gui/main_window/singbox/links_helpers_mixin.py

338 lines
12 KiB
Python

from __future__ import annotations
import base64
import binascii
import json
import re
from urllib.parse import unquote
from typing import Any
from main_window.constants import SINGBOX_EDITOR_PROTOCOL_IDS, SINGBOX_PROTOCOL_SEED_SPEC
class SingBoxLinksHelpersMixin:
def _slugify_connection_id(self, text: str) -> str:
raw = str(text or "").strip().lower()
raw = re.sub(r"[^a-z0-9]+", "-", raw)
raw = re.sub(r"-{2,}", "-", raw).strip("-")
if not raw:
raw = "connection"
if not raw.startswith("sg-"):
raw = f"sg-{raw}"
return raw
def _next_free_transport_client_id(self, base_hint: str) -> str:
base = self._slugify_connection_id(base_hint)
existing = {str(getattr(c, "id", "") or "").strip() for c in (self._transport_clients or [])}
if base not in existing:
return base
i = 2
while True:
cid = f"{base}-{i}"
if cid not in existing:
return cid
i += 1
def _template_singbox_client(self):
selected = self._selected_transport_client()
if selected is not None and str(getattr(selected, "kind", "") or "").strip().lower() == "singbox":
return selected
for c in self._transport_clients or []:
if str(getattr(c, "kind", "") or "").strip().lower() == "singbox":
return c
return None
def _default_new_singbox_client_config(self, client_id: str, *, protocol: str = "vless") -> dict[str, Any]:
cfg: dict[str, Any] = {}
tpl = self._template_singbox_client()
if tpl is not None:
src_cfg = getattr(tpl, "config", {}) or {}
if isinstance(src_cfg, dict):
for key in (
"runner",
"runtime_mode",
"require_binary",
"exec_start",
"singbox_bin",
"packaging_profile",
"packaging_system_fallback",
"bin_root",
"hardening_enabled",
"hardening_profile",
"restart",
"restart_sec",
"watchdog_sec",
"start_limit_interval_sec",
"start_limit_burst",
"timeout_start_sec",
"timeout_stop_sec",
"bootstrap_bypass_strict",
"netns_enabled",
"netns_name",
"netns_auto_cleanup",
"netns_setup_strict",
"singbox_dns_migrate_legacy",
"singbox_dns_migrate_strict",
):
if key in src_cfg:
cfg[key] = json.loads(json.dumps(src_cfg.get(key)))
cid = str(client_id or "").strip()
if not cid:
return cfg
for key in ("profile", "profile_id", "singbox_profile_id"):
cfg.pop(key, None)
config_path = f"/etc/selective-vpn/transports/{cid}/singbox.json"
cfg["config_path"] = config_path
cfg["singbox_config_path"] = config_path
runner = str(cfg.get("runner") or "").strip().lower()
if not runner:
cfg["runner"] = "systemd"
runner = "systemd"
if runner == "systemd":
cfg["unit"] = "singbox@.service"
if "runtime_mode" not in cfg:
cfg["runtime_mode"] = "exec"
if "require_binary" not in cfg:
cfg["require_binary"] = True
cfg["profile_id"] = cid
cfg["protocol"] = self._normalized_seed_protocol(protocol)
return cfg
def _normalized_seed_protocol(self, protocol: str) -> str:
proto = str(protocol or "vless").strip().lower() or "vless"
if proto not in SINGBOX_EDITOR_PROTOCOL_IDS:
proto = "vless"
return proto
def _protocol_seed_spec(self, protocol: str) -> dict[str, Any]:
proto = self._normalized_seed_protocol(protocol)
spec = SINGBOX_PROTOCOL_SEED_SPEC.get(proto) or SINGBOX_PROTOCOL_SEED_SPEC.get("vless") or {}
if not isinstance(spec, dict):
spec = {}
return dict(spec)
def _seed_editor_values_for_protocol(self, protocol: str, *, profile_name: str = "") -> dict[str, Any]:
proto = self._normalized_seed_protocol(protocol)
spec = self._protocol_seed_spec(proto)
security = str(spec.get("security") or "none").strip().lower() or "none"
port = int(spec.get("port") or (51820 if proto == "wireguard" else 443))
return {
"profile_name": str(profile_name or "").strip(),
"enabled": True,
"protocol": proto,
"server": "",
"port": port,
"uuid": "",
"password": "",
"flow": "",
"packet_encoding": "",
"transport": "tcp",
"path": "",
"grpc_service": "",
"security": security,
"sni": "",
"utls_fp": "",
"reality_public_key": "",
"reality_short_id": "",
"tls_insecure": False,
"sniff": True,
"ss_method": "aes-128-gcm",
"ss_plugin": "",
"hy2_up_mbps": 0,
"hy2_down_mbps": 0,
"hy2_obfs": "",
"hy2_obfs_password": "",
"tuic_congestion": "",
"tuic_udp_mode": "",
"tuic_zero_rtt": False,
"wg_private_key": "",
"wg_peer_public_key": "",
"wg_psk": "",
"wg_local_address": "",
"wg_reserved": "",
"wg_mtu": 0,
}
def _seed_raw_config_for_protocol(self, protocol: str) -> dict[str, Any]:
proto = self._normalized_seed_protocol(protocol)
spec = self._protocol_seed_spec(proto)
port = int(spec.get("port") or (51820 if proto == "wireguard" else 443))
proxy: dict[str, Any] = {
"type": proto,
"tag": "proxy",
"server": "",
"server_port": port,
}
proxy_defaults = spec.get("proxy_defaults") or {}
if isinstance(proxy_defaults, dict):
for key, value in proxy_defaults.items():
proxy[key] = json.loads(json.dumps(value))
tls_security = str(spec.get("tls_security") or "").strip().lower()
if tls_security in ("tls", "reality"):
self._apply_proxy_tls(proxy, security=tls_security)
return self._build_singbox_raw_config_from_proxy(proxy, sniff=True)
def _parse_wg_reserved_values(self, raw_values: list[str], *, strict: bool) -> list[int]:
vals = [str(x).strip() for x in list(raw_values or []) if str(x).strip()]
if len(vals) > 3:
if strict:
raise RuntimeError("WG reserved accepts up to 3 values (0..255)")
vals = vals[:3]
out: list[int] = []
for token in vals:
try:
num = int(token)
except Exception:
if strict:
raise RuntimeError(f"WG reserved value '{token}' is not an integer")
continue
if num < 0 or num > 255:
if strict:
raise RuntimeError(f"WG reserved value '{token}' must be in range 0..255")
continue
out.append(num)
return out
def _query_value(self, query: dict[str, list[str]], *keys: str) -> str:
for k in keys:
vals = query.get(str(k or "").strip())
if not vals:
continue
v = str(vals[0] or "").strip()
if v:
return unquote(v)
return ""
def _query_bool(self, query: dict[str, list[str]], *keys: str) -> bool:
v = self._query_value(query, *keys).strip().lower()
return v in ("1", "true", "yes", "on")
def _query_csv(self, query: dict[str, list[str]], *keys: str) -> list[str]:
raw = self._query_value(query, *keys)
if not raw:
return []
out: list[str] = []
for p in raw.split(","):
val = str(p or "").strip()
if val:
out.append(val)
return out
def _normalize_link_transport(self, value: str) -> str:
v = str(value or "").strip().lower() or "tcp"
if v == "raw":
v = "tcp"
if v in ("h2", "http2"):
v = "http"
if v not in ("tcp", "ws", "grpc", "http", "httpupgrade", "quic"):
v = "tcp"
return v
def _b64_urlsafe_decode(self, value: str) -> str:
raw = str(value or "").strip()
if not raw:
return ""
pad = "=" * ((4 - (len(raw) % 4)) % 4)
try:
data = base64.urlsafe_b64decode((raw + pad).encode("utf-8"))
return data.decode("utf-8", errors="replace")
except (binascii.Error, ValueError):
return ""
def _apply_proxy_transport(
self,
proxy: dict[str, Any],
*,
transport: str,
path: str = "",
grpc_service: str = "",
) -> None:
t = self._normalize_link_transport(transport)
if t in ("", "tcp"):
proxy.pop("transport", None)
return
tx: dict[str, Any] = {"type": t}
if t in ("ws", "http", "httpupgrade"):
tx["path"] = str(path or "/").strip() or "/"
if t == "grpc":
tx["service_name"] = str(grpc_service or "").strip()
proxy["transport"] = tx
def _apply_proxy_tls(
self,
proxy: dict[str, Any],
*,
security: str,
sni: str = "",
utls_fp: str = "",
tls_insecure: bool = False,
reality_public_key: str = "",
reality_short_id: str = "",
alpn: list[str] | None = None,
) -> None:
sec = str(security or "").strip().lower()
if sec not in ("none", "tls", "reality"):
sec = "none"
if sec == "none":
proxy.pop("tls", None)
return
tls: dict[str, Any] = {
"enabled": True,
"insecure": bool(tls_insecure),
}
if str(sni or "").strip():
tls["server_name"] = str(sni).strip()
if str(utls_fp or "").strip():
tls["utls"] = {"enabled": True, "fingerprint": str(utls_fp).strip().lower()}
alpn_vals = [str(x).strip() for x in list(alpn or []) if str(x).strip()]
if alpn_vals:
tls["alpn"] = alpn_vals
if sec == "reality":
reality: dict[str, Any] = {
"enabled": True,
"public_key": str(reality_public_key or "").strip(),
}
sid = str(reality_short_id or "").strip()
if sid:
reality["short_id"] = sid
tls["reality"] = reality
proxy["tls"] = tls
def _build_singbox_raw_config_from_proxy(
self,
proxy: dict[str, Any],
*,
sniff: bool = True,
) -> dict[str, Any]:
return {
"inbounds": [
{
"type": "socks",
"tag": "socks-in",
"listen": "127.0.0.1",
"listen_port": 10808,
"sniff": bool(sniff),
"sniff_override_destination": bool(sniff),
}
],
"outbounds": [
proxy,
{"type": "direct", "tag": "direct"},
],
"route": {
"final": "direct",
"rules": [
{"inbound": ["socks-in"], "outbound": "proxy"},
],
},
}