338 lines
12 KiB
Python
338 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import binascii
|
|
import json
|
|
import re
|
|
from urllib.parse import unquote
|
|
from typing import Any
|
|
|
|
from main_window.constants import SINGBOX_EDITOR_PROTOCOL_IDS, SINGBOX_PROTOCOL_SEED_SPEC
|
|
|
|
|
|
class SingBoxLinksHelpersMixin:
|
|
def _slugify_connection_id(self, text: str) -> str:
|
|
raw = str(text or "").strip().lower()
|
|
raw = re.sub(r"[^a-z0-9]+", "-", raw)
|
|
raw = re.sub(r"-{2,}", "-", raw).strip("-")
|
|
if not raw:
|
|
raw = "connection"
|
|
if not raw.startswith("sg-"):
|
|
raw = f"sg-{raw}"
|
|
return raw
|
|
|
|
def _next_free_transport_client_id(self, base_hint: str) -> str:
|
|
base = self._slugify_connection_id(base_hint)
|
|
existing = {str(getattr(c, "id", "") or "").strip() for c in (self._transport_clients or [])}
|
|
if base not in existing:
|
|
return base
|
|
i = 2
|
|
while True:
|
|
cid = f"{base}-{i}"
|
|
if cid not in existing:
|
|
return cid
|
|
i += 1
|
|
|
|
def _template_singbox_client(self):
|
|
selected = self._selected_transport_client()
|
|
if selected is not None and str(getattr(selected, "kind", "") or "").strip().lower() == "singbox":
|
|
return selected
|
|
for c in self._transport_clients or []:
|
|
if str(getattr(c, "kind", "") or "").strip().lower() == "singbox":
|
|
return c
|
|
return None
|
|
|
|
def _default_new_singbox_client_config(self, client_id: str, *, protocol: str = "vless") -> dict[str, Any]:
|
|
cfg: dict[str, Any] = {}
|
|
tpl = self._template_singbox_client()
|
|
if tpl is not None:
|
|
src_cfg = getattr(tpl, "config", {}) or {}
|
|
if isinstance(src_cfg, dict):
|
|
for key in (
|
|
"runner",
|
|
"runtime_mode",
|
|
"require_binary",
|
|
"exec_start",
|
|
"singbox_bin",
|
|
"packaging_profile",
|
|
"packaging_system_fallback",
|
|
"bin_root",
|
|
"hardening_enabled",
|
|
"hardening_profile",
|
|
"restart",
|
|
"restart_sec",
|
|
"watchdog_sec",
|
|
"start_limit_interval_sec",
|
|
"start_limit_burst",
|
|
"timeout_start_sec",
|
|
"timeout_stop_sec",
|
|
"bootstrap_bypass_strict",
|
|
"netns_enabled",
|
|
"netns_name",
|
|
"netns_auto_cleanup",
|
|
"netns_setup_strict",
|
|
"singbox_dns_migrate_legacy",
|
|
"singbox_dns_migrate_strict",
|
|
):
|
|
if key in src_cfg:
|
|
cfg[key] = json.loads(json.dumps(src_cfg.get(key)))
|
|
|
|
cid = str(client_id or "").strip()
|
|
if not cid:
|
|
return cfg
|
|
|
|
for key in ("profile", "profile_id", "singbox_profile_id"):
|
|
cfg.pop(key, None)
|
|
|
|
config_path = f"/etc/selective-vpn/transports/{cid}/singbox.json"
|
|
cfg["config_path"] = config_path
|
|
cfg["singbox_config_path"] = config_path
|
|
|
|
runner = str(cfg.get("runner") or "").strip().lower()
|
|
if not runner:
|
|
cfg["runner"] = "systemd"
|
|
runner = "systemd"
|
|
|
|
if runner == "systemd":
|
|
cfg["unit"] = "singbox@.service"
|
|
|
|
if "runtime_mode" not in cfg:
|
|
cfg["runtime_mode"] = "exec"
|
|
if "require_binary" not in cfg:
|
|
cfg["require_binary"] = True
|
|
|
|
cfg["profile_id"] = cid
|
|
cfg["protocol"] = self._normalized_seed_protocol(protocol)
|
|
return cfg
|
|
|
|
def _normalized_seed_protocol(self, protocol: str) -> str:
|
|
proto = str(protocol or "vless").strip().lower() or "vless"
|
|
if proto not in SINGBOX_EDITOR_PROTOCOL_IDS:
|
|
proto = "vless"
|
|
return proto
|
|
|
|
def _protocol_seed_spec(self, protocol: str) -> dict[str, Any]:
|
|
proto = self._normalized_seed_protocol(protocol)
|
|
spec = SINGBOX_PROTOCOL_SEED_SPEC.get(proto) or SINGBOX_PROTOCOL_SEED_SPEC.get("vless") or {}
|
|
if not isinstance(spec, dict):
|
|
spec = {}
|
|
return dict(spec)
|
|
|
|
def _seed_editor_values_for_protocol(self, protocol: str, *, profile_name: str = "") -> dict[str, Any]:
|
|
proto = self._normalized_seed_protocol(protocol)
|
|
spec = self._protocol_seed_spec(proto)
|
|
security = str(spec.get("security") or "none").strip().lower() or "none"
|
|
port = int(spec.get("port") or (51820 if proto == "wireguard" else 443))
|
|
return {
|
|
"profile_name": str(profile_name or "").strip(),
|
|
"enabled": True,
|
|
"protocol": proto,
|
|
"server": "",
|
|
"port": port,
|
|
"uuid": "",
|
|
"password": "",
|
|
"flow": "",
|
|
"packet_encoding": "",
|
|
"transport": "tcp",
|
|
"path": "",
|
|
"grpc_service": "",
|
|
"security": security,
|
|
"sni": "",
|
|
"utls_fp": "",
|
|
"reality_public_key": "",
|
|
"reality_short_id": "",
|
|
"tls_insecure": False,
|
|
"sniff": True,
|
|
"ss_method": "aes-128-gcm",
|
|
"ss_plugin": "",
|
|
"hy2_up_mbps": 0,
|
|
"hy2_down_mbps": 0,
|
|
"hy2_obfs": "",
|
|
"hy2_obfs_password": "",
|
|
"tuic_congestion": "",
|
|
"tuic_udp_mode": "",
|
|
"tuic_zero_rtt": False,
|
|
"wg_private_key": "",
|
|
"wg_peer_public_key": "",
|
|
"wg_psk": "",
|
|
"wg_local_address": "",
|
|
"wg_reserved": "",
|
|
"wg_mtu": 0,
|
|
}
|
|
|
|
def _seed_raw_config_for_protocol(self, protocol: str) -> dict[str, Any]:
|
|
proto = self._normalized_seed_protocol(protocol)
|
|
spec = self._protocol_seed_spec(proto)
|
|
port = int(spec.get("port") or (51820 if proto == "wireguard" else 443))
|
|
proxy: dict[str, Any] = {
|
|
"type": proto,
|
|
"tag": "proxy",
|
|
"server": "",
|
|
"server_port": port,
|
|
}
|
|
proxy_defaults = spec.get("proxy_defaults") or {}
|
|
if isinstance(proxy_defaults, dict):
|
|
for key, value in proxy_defaults.items():
|
|
proxy[key] = json.loads(json.dumps(value))
|
|
|
|
tls_security = str(spec.get("tls_security") or "").strip().lower()
|
|
if tls_security in ("tls", "reality"):
|
|
self._apply_proxy_tls(proxy, security=tls_security)
|
|
return self._build_singbox_raw_config_from_proxy(proxy, sniff=True)
|
|
|
|
def _parse_wg_reserved_values(self, raw_values: list[str], *, strict: bool) -> list[int]:
|
|
vals = [str(x).strip() for x in list(raw_values or []) if str(x).strip()]
|
|
if len(vals) > 3:
|
|
if strict:
|
|
raise RuntimeError("WG reserved accepts up to 3 values (0..255)")
|
|
vals = vals[:3]
|
|
|
|
out: list[int] = []
|
|
for token in vals:
|
|
try:
|
|
num = int(token)
|
|
except Exception:
|
|
if strict:
|
|
raise RuntimeError(f"WG reserved value '{token}' is not an integer")
|
|
continue
|
|
if num < 0 or num > 255:
|
|
if strict:
|
|
raise RuntimeError(f"WG reserved value '{token}' must be in range 0..255")
|
|
continue
|
|
out.append(num)
|
|
return out
|
|
|
|
def _query_value(self, query: dict[str, list[str]], *keys: str) -> str:
|
|
for k in keys:
|
|
vals = query.get(str(k or "").strip())
|
|
if not vals:
|
|
continue
|
|
v = str(vals[0] or "").strip()
|
|
if v:
|
|
return unquote(v)
|
|
return ""
|
|
|
|
def _query_bool(self, query: dict[str, list[str]], *keys: str) -> bool:
|
|
v = self._query_value(query, *keys).strip().lower()
|
|
return v in ("1", "true", "yes", "on")
|
|
|
|
def _query_csv(self, query: dict[str, list[str]], *keys: str) -> list[str]:
|
|
raw = self._query_value(query, *keys)
|
|
if not raw:
|
|
return []
|
|
out: list[str] = []
|
|
for p in raw.split(","):
|
|
val = str(p or "").strip()
|
|
if val:
|
|
out.append(val)
|
|
return out
|
|
|
|
def _normalize_link_transport(self, value: str) -> str:
|
|
v = str(value or "").strip().lower() or "tcp"
|
|
if v == "raw":
|
|
v = "tcp"
|
|
if v in ("h2", "http2"):
|
|
v = "http"
|
|
if v not in ("tcp", "ws", "grpc", "http", "httpupgrade", "quic"):
|
|
v = "tcp"
|
|
return v
|
|
|
|
def _b64_urlsafe_decode(self, value: str) -> str:
|
|
raw = str(value or "").strip()
|
|
if not raw:
|
|
return ""
|
|
pad = "=" * ((4 - (len(raw) % 4)) % 4)
|
|
try:
|
|
data = base64.urlsafe_b64decode((raw + pad).encode("utf-8"))
|
|
return data.decode("utf-8", errors="replace")
|
|
except (binascii.Error, ValueError):
|
|
return ""
|
|
|
|
def _apply_proxy_transport(
|
|
self,
|
|
proxy: dict[str, Any],
|
|
*,
|
|
transport: str,
|
|
path: str = "",
|
|
grpc_service: str = "",
|
|
) -> None:
|
|
t = self._normalize_link_transport(transport)
|
|
if t in ("", "tcp"):
|
|
proxy.pop("transport", None)
|
|
return
|
|
tx: dict[str, Any] = {"type": t}
|
|
if t in ("ws", "http", "httpupgrade"):
|
|
tx["path"] = str(path or "/").strip() or "/"
|
|
if t == "grpc":
|
|
tx["service_name"] = str(grpc_service or "").strip()
|
|
proxy["transport"] = tx
|
|
|
|
def _apply_proxy_tls(
|
|
self,
|
|
proxy: dict[str, Any],
|
|
*,
|
|
security: str,
|
|
sni: str = "",
|
|
utls_fp: str = "",
|
|
tls_insecure: bool = False,
|
|
reality_public_key: str = "",
|
|
reality_short_id: str = "",
|
|
alpn: list[str] | None = None,
|
|
) -> None:
|
|
sec = str(security or "").strip().lower()
|
|
if sec not in ("none", "tls", "reality"):
|
|
sec = "none"
|
|
if sec == "none":
|
|
proxy.pop("tls", None)
|
|
return
|
|
tls: dict[str, Any] = {
|
|
"enabled": True,
|
|
"insecure": bool(tls_insecure),
|
|
}
|
|
if str(sni or "").strip():
|
|
tls["server_name"] = str(sni).strip()
|
|
if str(utls_fp or "").strip():
|
|
tls["utls"] = {"enabled": True, "fingerprint": str(utls_fp).strip().lower()}
|
|
alpn_vals = [str(x).strip() for x in list(alpn or []) if str(x).strip()]
|
|
if alpn_vals:
|
|
tls["alpn"] = alpn_vals
|
|
if sec == "reality":
|
|
reality: dict[str, Any] = {
|
|
"enabled": True,
|
|
"public_key": str(reality_public_key or "").strip(),
|
|
}
|
|
sid = str(reality_short_id or "").strip()
|
|
if sid:
|
|
reality["short_id"] = sid
|
|
tls["reality"] = reality
|
|
proxy["tls"] = tls
|
|
|
|
def _build_singbox_raw_config_from_proxy(
|
|
self,
|
|
proxy: dict[str, Any],
|
|
*,
|
|
sniff: bool = True,
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"inbounds": [
|
|
{
|
|
"type": "socks",
|
|
"tag": "socks-in",
|
|
"listen": "127.0.0.1",
|
|
"listen_port": 10808,
|
|
"sniff": bool(sniff),
|
|
"sniff_override_destination": bool(sniff),
|
|
}
|
|
],
|
|
"outbounds": [
|
|
proxy,
|
|
{"type": "direct", "tag": "direct"},
|
|
],
|
|
"route": {
|
|
"final": "direct",
|
|
"rules": [
|
|
{"inbound": ["socks-in"], "outbound": "proxy"},
|
|
],
|
|
},
|
|
}
|