from __future__ import annotations

import re
from typing import Any
from urllib.parse import parse_qs, unquote, urlsplit

# URL schemes may contain letters, digits, "+", "-" and ".".  Refusing a match
# that is immediately preceded by one of those characters keeps longer schemes
# such as "vmess://" or "wss://" from being mistaken for "ss://".
_CONNECTION_LINK_RE = re.compile(
    r"(?<![A-Za-z0-9+.\-])(?:vless|trojan|ss|hysteria2|hy2|tuic|wireguard|wg)://\S+",
    re.IGNORECASE,
)


def _port_or_default(parts: Any, default: int) -> int:
    """Return the port of a ``urlsplit`` result, or *default*.

    *default* is used when the link has no port, the port is ``0``, or the
    port is malformed.
    """
    try:
        # ``SplitResult.port`` raises ValueError for a non-numeric or
        # out-of-range port, so the attribute access must stay inside the try.
        return int(parts.port or default)
    except (TypeError, ValueError):
        return default


class SingBoxLinksParsersMixin:
    """Parsers that turn proxy share links into sing-box profile payloads.

    The methods rely on helpers that are not defined in this module and must
    be supplied by the host class: ``_query_value``, ``_query_bool``,
    ``_query_csv``, ``_normalize_link_transport``, ``_apply_proxy_transport``,
    ``_apply_proxy_tls``, ``_build_singbox_raw_config_from_proxy``,
    ``_b64_urlsafe_decode`` and ``_parse_wg_reserved_values``.
    NOTE(review): ``_query_value(query, *keys)`` presumably returns the first
    non-empty value among ``keys`` as a string -- confirm against the helper.
    """

    def _parse_vless_link_payload(self, link: str) -> dict[str, Any]:
        """Parse a ``vless://`` link.

        Returns a dict with ``protocol``, ``profile_name``, ``raw_config`` and
        ``editor_values``.

        Raises:
            RuntimeError: if the link has no UUID or no host.
        """
        u = urlsplit(link)
        query = parse_qs(u.query or "", keep_blank_values=True)

        uuid = unquote(str(u.username or "").strip())
        host = str(u.hostname or "").strip()
        if not uuid:
            raise RuntimeError("VLESS link has no UUID")
        if not host:
            raise RuntimeError("VLESS link has no host")
        port = _port_or_default(u, 443)

        transport = self._normalize_link_transport(
            self._query_value(query, "type", "transport")
        )

        security = self._query_value(query, "security").strip().lower() or "none"
        if security == "xtls":
            security = "tls"
        if security not in ("none", "tls", "reality"):
            # Unknown security modes are downgraded rather than rejected.
            security = "none"

        path = self._query_value(query, "path", "spx")
        url_path = str(u.path or "").strip()
        if not path and url_path not in ("", "/"):
            # No path in the query string: fall back to the URL path itself.
            path = unquote(url_path)

        grpc_service = self._query_value(query, "serviceName", "service_name")
        if transport == "grpc" and not grpc_service:
            grpc_service = self._query_value(query, "path")

        flow = self._query_value(query, "flow")

        packet_encoding = (
            self._query_value(query, "packetEncoding", "packet_encoding").strip().lower()
        )
        if packet_encoding in ("none", "off", "false"):
            packet_encoding = ""

        sni = self._query_value(query, "sni", "host")
        utls_fp = self._query_value(query, "fp", "fingerprint")
        reality_pk = self._query_value(query, "pbk", "public_key")
        reality_sid = self._query_value(query, "sid", "short_id")
        tls_insecure = self._query_bool(query, "allowInsecure", "insecure")
        profile_name = unquote(str(u.fragment or "").strip()) or host

        proxy: dict[str, Any] = {
            "type": "vless",
            "tag": "proxy",
            "server": host,
            "server_port": port,
            "uuid": uuid,
        }
        if packet_encoding:
            proxy["packet_encoding"] = packet_encoding
        if flow:
            proxy["flow"] = flow

        self._apply_proxy_transport(
            proxy, transport=transport, path=path, grpc_service=grpc_service
        )
        self._apply_proxy_tls(
            proxy,
            security=security,
            sni=sni,
            utls_fp=utls_fp,
            tls_insecure=tls_insecure,
            reality_public_key=reality_pk,
            reality_short_id=reality_sid,
        )

        editor_values = {
            "profile_name": profile_name,
            "enabled": True,
            "server": host,
            "port": port,
            "uuid": uuid,
            "flow": flow,
            "packet_encoding": packet_encoding,
            "transport": transport,
            "path": path,
            "grpc_service": grpc_service,
            "security": security,
            "sni": sni,
            "utls_fp": utls_fp,
            "reality_public_key": reality_pk,
            "reality_short_id": reality_sid,
            "tls_insecure": tls_insecure,
            "sniff": True,
        }
        return {
            "protocol": "vless",
            "profile_name": profile_name,
            "raw_config": self._build_singbox_raw_config_from_proxy(proxy, sniff=True),
            "editor_values": editor_values,
        }

    def _parse_trojan_link_payload(self, link: str) -> dict[str, Any]:
        """Parse a ``trojan://`` link.

        The password is taken from the URL user-info, or from the ``password``
        query parameter when the user-info is empty.  Returns a dict with
        ``protocol``, ``profile_name`` and ``raw_config``.

        Raises:
            RuntimeError: if the link has no password or no host.
        """
        u = urlsplit(link)
        query = parse_qs(u.query or "", keep_blank_values=True)

        password = unquote(str(u.username or "").strip()) or self._query_value(
            query, "password"
        )
        host = str(u.hostname or "").strip()
        if not password:
            raise RuntimeError("Trojan link has no password")
        if not host:
            raise RuntimeError("Trojan link has no host")
        port = _port_or_default(u, 443)

        transport = self._normalize_link_transport(
            self._query_value(query, "type", "transport")
        )
        path = self._query_value(query, "path")
        grpc_service = self._query_value(query, "serviceName", "service_name")

        security = self._query_value(query, "security").strip().lower() or "tls"
        if security not in ("none", "tls"):
            # Anything unrecognised is treated as TLS, the default for Trojan here.
            security = "tls"

        sni = self._query_value(query, "sni", "host")
        utls_fp = self._query_value(query, "fp", "fingerprint")
        tls_insecure = self._query_bool(query, "allowInsecure", "insecure")
        alpn = self._query_csv(query, "alpn")
        profile_name = unquote(str(u.fragment or "").strip()) or host

        proxy: dict[str, Any] = {
            "type": "trojan",
            "tag": "proxy",
            "server": host,
            "server_port": port,
            "password": password,
        }
        self._apply_proxy_transport(
            proxy, transport=transport, path=path, grpc_service=grpc_service
        )
        self._apply_proxy_tls(
            proxy,
            security=security,
            sni=sni,
            utls_fp=utls_fp,
            tls_insecure=tls_insecure,
            alpn=alpn,
        )
        return {
            "protocol": "trojan",
            "profile_name": profile_name,
            "raw_config": self._build_singbox_raw_config_from_proxy(proxy, sniff=True),
        }

    def _parse_ss_link_payload(self, link: str) -> dict[str, Any]:
        """Parse an ``ss://`` (Shadowsocks) link.

        Three layouts are accepted:

        * ``ss://method:password@host:port`` -- plain, percent-encoded user-info;
        * ``ss://<base64(method:password)>@host:port``;
        * ``ss://<base64(method:password@host:port)>`` -- fully encoded body.

        A ``plugin`` query parameter of the form ``name;opt=value;...`` is split
        into the ``plugin`` and ``plugin_opts`` outbound fields.

        Raises:
            RuntimeError: if credentials, payload or host cannot be extracted.
        """
        raw = str(link or "").strip()
        u = urlsplit(raw)
        query = parse_qs(u.query or "", keep_blank_values=True)
        profile_name = unquote(str(u.fragment or "").strip()) or "Shadowsocks"

        # Work on the part between the scheme and the query/fragment; the body
        # may be base64 and is therefore cut out by hand instead of via urlsplit.
        body = raw[len("ss://"):]
        body = body.split("#", 1)[0]
        body = body.split("?", 1)[0]

        if "@" in body:
            left, host_port = body.rsplit("@", 1)
            if ":" in left:
                # Plain user-info is percent-encoded in the link, so decode it
                # the same way the other parsers decode their user-info.
                method, password = left.split(":", 1)
                method = unquote(method)
                password = unquote(password)
            else:
                # Base64 user-info; padding may arrive percent-encoded ("%3D").
                creds = self._b64_urlsafe_decode(unquote(left))
                if ":" not in creds:
                    raise RuntimeError("Shadowsocks link has invalid credentials")
                method, password = creds.split(":", 1)
        else:
            decoded = self._b64_urlsafe_decode(body)
            if "@" not in decoded:
                raise RuntimeError("Shadowsocks link has invalid payload")
            creds, host_port = decoded.rsplit("@", 1)
            if ":" not in creds:
                raise RuntimeError("Shadowsocks link has invalid credentials")
            method, password = creds.split(":", 1)

        # Prefixing "//" lets urlsplit treat the text as a network location.
        hp = urlsplit("//" + host_port)
        host = str(hp.hostname or "").strip()
        if not host:
            raise RuntimeError("Shadowsocks link has no host")
        port = _port_or_default(hp, 8388)

        proxy: dict[str, Any] = {
            "type": "shadowsocks",
            "tag": "proxy",
            "server": host,
            "server_port": port,
            "method": str(method or "").strip(),
            "password": str(password or "").strip(),
        }

        plugin = self._query_value(query, "plugin")
        if plugin:
            # The link carries "name;options"; the outbound wants them apart.
            plugin_name, _, plugin_opts = plugin.partition(";")
            plugin_name = plugin_name.strip()
            if plugin_name:
                proxy["plugin"] = plugin_name
                if plugin_opts:
                    proxy["plugin_opts"] = plugin_opts

        return {
            "protocol": "shadowsocks",
            "profile_name": profile_name,
            "raw_config": self._build_singbox_raw_config_from_proxy(proxy, sniff=True),
        }

    def _parse_hysteria2_link_payload(self, link: str) -> dict[str, Any]:
        """Parse a ``hysteria2://`` / ``hy2://`` link.

        Bandwidth hints that cannot be converted to a number are ignored.  TLS
        is always applied.

        Raises:
            RuntimeError: if the link has no password or no host.
        """
        u = urlsplit(link)
        query = parse_qs(u.query or "", keep_blank_values=True)

        password = unquote(str(u.username or "").strip()) or self._query_value(
            query, "password"
        )
        host = str(u.hostname or "").strip()
        if not password:
            raise RuntimeError("Hysteria2 link has no password")
        if not host:
            raise RuntimeError("Hysteria2 link has no host")
        port = _port_or_default(u, 443)
        profile_name = unquote(str(u.fragment or "").strip()) or host

        proxy: dict[str, Any] = {
            "type": "hysteria2",
            "tag": "proxy",
            "server": host,
            "server_port": port,
            "password": password,
        }

        up_mbps = self._query_value(query, "up_mbps", "upmbps", "up")
        down_mbps = self._query_value(query, "down_mbps", "downmbps", "down")
        for key, text in (("up_mbps", up_mbps), ("down_mbps", down_mbps)):
            if not text:
                continue
            try:
                # Accept "100" as well as "100.0"; the stored value is an int.
                proxy[key] = int(float(text))
            except (TypeError, ValueError, OverflowError):
                # Best effort: an unparsable hint is simply left out.
                continue

        obfs_type = self._query_value(query, "obfs")
        if obfs_type:
            obfs: dict[str, Any] = {"type": obfs_type}
            obfs_pw = self._query_value(query, "obfs-password", "obfs_password")
            if obfs_pw:
                obfs["password"] = obfs_pw
            proxy["obfs"] = obfs

        self._apply_proxy_tls(
            proxy,
            security="tls",
            sni=self._query_value(query, "sni"),
            tls_insecure=self._query_bool(query, "allowInsecure", "insecure"),
            alpn=self._query_csv(query, "alpn"),
        )
        return {
            "protocol": "hysteria2",
            "profile_name": profile_name,
            "raw_config": self._build_singbox_raw_config_from_proxy(proxy, sniff=True),
        }

    def _parse_tuic_link_payload(self, link: str) -> dict[str, Any]:
        """Parse a ``tuic://uuid:password@host:port`` link.

        TLS is always applied.

        Raises:
            RuntimeError: if the link has no UUID, no password or no host.
        """
        u = urlsplit(link)
        query = parse_qs(u.query or "", keep_blank_values=True)

        uuid = unquote(str(u.username or "").strip())
        password = unquote(str(u.password or "").strip())
        host = str(u.hostname or "").strip()
        if not uuid:
            raise RuntimeError("TUIC link has no UUID")
        if not password:
            raise RuntimeError("TUIC link has no password")
        if not host:
            raise RuntimeError("TUIC link has no host")
        port = _port_or_default(u, 443)
        profile_name = unquote(str(u.fragment or "").strip()) or host

        proxy: dict[str, Any] = {
            "type": "tuic",
            "tag": "proxy",
            "server": host,
            "server_port": port,
            "uuid": uuid,
            "password": password,
        }

        cc = self._query_value(query, "congestion_control", "congestion")
        if cc:
            proxy["congestion_control"] = cc
        udp_mode = self._query_value(query, "udp_relay_mode")
        if udp_mode:
            proxy["udp_relay_mode"] = udp_mode
        if self._query_bool(query, "zero_rtt_handshake", "zero_rtt"):
            proxy["zero_rtt_handshake"] = True

        self._apply_proxy_tls(
            proxy,
            security="tls",
            sni=self._query_value(query, "sni", "host"),
            utls_fp=self._query_value(query, "fp", "fingerprint"),
            tls_insecure=self._query_bool(query, "allowInsecure", "insecure"),
            alpn=self._query_csv(query, "alpn"),
        )
        return {
            "protocol": "tuic",
            "profile_name": profile_name,
            "raw_config": self._build_singbox_raw_config_from_proxy(proxy, sniff=True),
        }

    def _parse_wireguard_link_payload(self, link: str) -> dict[str, Any]:
        """Parse a ``wireguard://`` / ``wg://`` link.

        The private key is taken from the URL user-info or, failing that, from
        the ``private_key`` / ``privateKey`` query parameter.  An MTU that is
        missing, non-numeric or not positive is omitted.

        Raises:
            RuntimeError: if host, private key, peer public key or local
                address is missing.  ``_parse_wg_reserved_values`` is called
                with ``strict=True`` and may raise as well (not visible here).
        """
        u = urlsplit(link)
        query = parse_qs(u.query or "", keep_blank_values=True)

        private_key = unquote(str(u.username or "").strip()) or self._query_value(
            query, "private_key", "privateKey"
        )
        host = str(u.hostname or "").strip()
        if not host:
            raise RuntimeError("WireGuard link has no host")
        if not private_key:
            raise RuntimeError("WireGuard link has no private key")
        port = _port_or_default(u, 443)

        peer_public_key = self._query_value(
            query, "peer_public_key", "public_key", "peerPublicKey"
        )
        if not peer_public_key:
            raise RuntimeError("WireGuard link has no peer public key")

        local_address = self._query_csv(query, "local_address", "address", "localAddress")
        if not local_address:
            raise RuntimeError("WireGuard link has no local address")

        profile_name = unquote(str(u.fragment or "").strip()) or host

        proxy: dict[str, Any] = {
            "type": "wireguard",
            "tag": "proxy",
            "server": host,
            "server_port": port,
            "private_key": private_key,
            "peer_public_key": peer_public_key,
            "local_address": local_address,
        }

        psk = self._query_value(query, "pre_shared_key", "psk", "preSharedKey")
        if psk:
            proxy["pre_shared_key"] = psk

        reserved_vals = self._parse_wg_reserved_values(
            self._query_csv(query, "reserved"), strict=True
        )
        if reserved_vals:
            proxy["reserved"] = reserved_vals

        mtu_val = self._query_value(query, "mtu")
        try:
            mtu = int(mtu_val) if mtu_val else 0
        except (TypeError, ValueError):
            mtu = 0
        if mtu > 0:
            proxy["mtu"] = mtu

        return {
            "protocol": "wireguard",
            "profile_name": profile_name,
            "raw_config": self._build_singbox_raw_config_from_proxy(proxy, sniff=True),
        }

    def _extract_first_connection_link(self, text: str) -> str:
        """Return the first share link found in *text*, or ``""``.

        A link with a supported scheme is returned up to the next whitespace.
        If none is present but the text contains ``"://"``, the first line is
        returned so the caller can report the unsupported scheme.  The scheme
        must not be the tail of a longer one: ``vmess://`` is not ``ss://``.
        """
        raw = str(text or "").strip()
        if not raw:
            return ""
        match = _CONNECTION_LINK_RE.search(raw)
        if match:
            return match.group(0).strip()
        if "://" in raw:
            return raw.splitlines()[0].strip()
        return ""

    def _parse_connection_link_payload(self, text: str) -> dict[str, Any]:
        """Find the first share link in *text* and parse it by its scheme.

        Raises:
            RuntimeError: if no link is found, the scheme is unsupported, or
                the scheme-specific parser rejects the link.
        """
        raw = self._extract_first_connection_link(text)
        if not raw:
            raise RuntimeError(
                "No supported link found. Supported schemes: "
                "vless:// trojan:// ss:// hysteria2:// hy2:// tuic:// wireguard:// wg://"
            )

        scheme = str(urlsplit(raw).scheme or "").strip().lower()
        parsers = {
            "vless": self._parse_vless_link_payload,
            "trojan": self._parse_trojan_link_payload,
            "ss": self._parse_ss_link_payload,
            "hysteria2": self._parse_hysteria2_link_payload,
            "hy2": self._parse_hysteria2_link_payload,
            "tuic": self._parse_tuic_link_payload,
            "wireguard": self._parse_wireguard_link_payload,
            "wg": self._parse_wireguard_link_payload,
        }
        parser = parsers.get(scheme)
        if parser is None:
            raise RuntimeError(f"Unsupported link scheme: {scheme}")
        return parser(raw)