179 lines
5.2 KiB
Python
179 lines
5.2 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
|
|
@dataclass(frozen=True)
class TransportProtocolInfo:
    """Immutable protocol / transport / security triple.

    Every field defaults to an empty string, which ``summary`` renders as
    ``"n/a"``.
    """

    protocol: str = ""
    transport: str = ""
    security: str = ""

    def summary(self) -> str:
        """Return the fields as ``"protocol / transport / security"``.

        Any falsy field is shown as ``"n/a"``.
        """
        shown = [
            part or "n/a"
            for part in (self.protocol, self.transport, self.security)
        ]
        return "{} / {} / {}".format(*shown)
|
|
|
|
|
|
def transport_protocol_info(client: Any) -> TransportProtocolInfo:
    """Work out protocol, transport and security for ``client``.

    Values are taken from ``client.config`` first (several alternative keys
    are tried for each field). If any field is still empty, the raw config
    file referenced by the client config is loaded and used to fill the
    gaps. Finally, when a protocol is known, a missing transport gets the
    protocol's default and a missing security becomes ``"none"``.

    Args:
        client: Any object; its ``config`` attribute is used when it is a
            dict, otherwise an empty config is assumed.

    Returns:
        A ``TransportProtocolInfo``; fields that could not be determined are
        empty strings.
    """
    settings = _as_dict(getattr(client, "config", {}) or {})

    def pick(*keys: str) -> str:
        # First non-empty value among the given config keys, in order.
        return _first_non_empty(*(settings.get(key) for key in keys))

    protocol = pick("protocol", "profile_protocol", "outbound", "type").lower()
    transport = pick("transport", "network", "stream").lower()
    security = _normalize_security(pick("security", "tls_security", "security_mode"))

    if not (protocol and transport and security):
        # Something is still unknown: consult the raw config file, but never
        # override a value the client config already supplied.
        raw_cfg = _load_raw_config_from_client_config(settings)
        if raw_cfg:
            raw_protocol, raw_transport, raw_security = _infer_from_raw_config(raw_cfg)
            protocol = protocol or raw_protocol
            transport = transport or raw_transport
            security = security or raw_security

    if protocol:
        if not transport:
            transport = _default_transport_for_protocol(protocol)
        if not security:
            security = "none"

    return TransportProtocolInfo(protocol=protocol, transport=transport, security=security)
|
|
|
|
|
|
def transport_protocol_summary(client: Any) -> str:
    """Return the ``"protocol / transport / security"`` line for ``client``."""
    info = transport_protocol_info(client)
    return info.summary()
|
|
|
|
|
|
def _infer_from_raw_config(raw_cfg: dict[str, Any]) -> tuple[str, str, str]:
|
|
outbounds = raw_cfg.get("outbounds") or []
|
|
if isinstance(outbounds, list):
|
|
for row in outbounds:
|
|
if not isinstance(row, dict):
|
|
continue
|
|
out_type = str(row.get("type") or "").strip().lower()
|
|
if not out_type or out_type in ("direct", "block", "dns"):
|
|
continue
|
|
tx = ""
|
|
transport_obj = row.get("transport")
|
|
if isinstance(transport_obj, dict):
|
|
tx = str(transport_obj.get("type") or "").strip().lower()
|
|
if not tx:
|
|
tx = str(row.get("network") or "").strip().lower()
|
|
sec = _extract_security(row)
|
|
return out_type, tx, sec
|
|
|
|
inbounds = raw_cfg.get("inbounds") or []
|
|
if isinstance(inbounds, list):
|
|
for row in inbounds:
|
|
if not isinstance(row, dict):
|
|
continue
|
|
in_type = str(row.get("type") or "").strip().lower()
|
|
if not in_type:
|
|
continue
|
|
network = str(row.get("network") or "").strip().lower()
|
|
sec = _extract_security(row)
|
|
return in_type, network, sec
|
|
|
|
return "", "", ""
|
|
|
|
|
|
def _load_raw_config_from_client_config(cfg: dict[str, Any]) -> dict[str, Any]:
    """Load the raw JSON config file referenced by a client config.

    The path is the first non-empty value among ``config_path``,
    ``singbox_config_path`` and ``raw_config_path``.

    Args:
        cfg: Client config mapping.

    Returns:
        The parsed JSON document when it is a dict. An empty dict is returned
        when no path is configured, the file cannot be read or parsed, or
        the top-level JSON value is not an object.
    """
    config_file = _first_non_empty(
        cfg.get("config_path"),
        cfg.get("singbox_config_path"),
        cfg.get("raw_config_path"),
    )
    if config_file:
        try:
            with open(config_file, "r", encoding="utf-8") as handle:
                document = json.load(handle)
        except Exception:
            # Best effort: any read/parse failure means "no raw config".
            return {}
        if isinstance(document, dict):
            return document
    return {}
|
|
|
|
|
|
def _as_dict(raw: Any) -> dict[str, Any]:
|
|
return raw if isinstance(raw, dict) else {}
|
|
|
|
|
|
def _normalize_security(value: str) -> str:
|
|
sec = str(value or "").strip().lower()
|
|
if not sec:
|
|
return ""
|
|
aliases = {
|
|
"off": "none",
|
|
"disabled": "none",
|
|
"plain": "none",
|
|
"reality-tls": "reality",
|
|
"xtls": "tls",
|
|
}
|
|
return aliases.get(sec, sec)
|
|
|
|
|
|
def _extract_security(node: dict[str, Any]) -> str:
    """Derive the security label of a single inbound/outbound entry.

    Resolution order:
      1. An explicit ``security`` / ``tls_security`` value, normalized.
      2. Otherwise the ``tls`` object: ``"none"`` when it is explicitly
         disabled, ``"reality"`` when its ``reality`` section is enabled or
         carries a ``public_key`` / ``short_id`` / ``short_ids`` value, and
         ``"tls"`` in every other case.

    Args:
        node: One entry mapping.

    Returns:
        The security label, or ``""`` when the entry has neither an explicit
        value nor a non-empty ``tls`` dict.
    """
    sec = _normalize_security(_first_non_empty(node.get("security"), node.get("tls_security")))
    if sec:
        return sec

    tls = _as_dict(node.get("tls"))
    if not tls:
        return ""

    # Recognise the falsy counterparts of the flag spellings _truthy()
    # accepts, so "enabled": 0 / "false" / "off" count as disabled just like
    # a literal False. A missing or unrecognised value still means enabled.
    enabled_raw = tls.get("enabled")
    if isinstance(enabled_raw, str):
        disabled = enabled_raw.strip().lower() in ("0", "false", "no", "off")
    elif isinstance(enabled_raw, int):  # bool is a subclass of int
        disabled = enabled_raw == 0
    else:
        disabled = False
    if disabled:
        return "none"

    reality = _as_dict(tls.get("reality"))
    if reality:
        if _truthy(reality.get("enabled")):
            return "reality"
        # Key material present without an explicit flag still counts.
        if _first_non_empty(reality.get("public_key"), reality.get("short_id"), reality.get("short_ids")):
            return "reality"
    return "tls"
|
|
|
|
|
|
def _truthy(raw: Any) -> bool:
|
|
if isinstance(raw, bool):
|
|
return raw
|
|
if isinstance(raw, int):
|
|
return raw != 0
|
|
if isinstance(raw, str):
|
|
return raw.strip().lower() in ("1", "true", "yes", "on")
|
|
return False
|
|
|
|
|
|
def _default_transport_for_protocol(protocol: str) -> str:
|
|
p = str(protocol or "").strip().lower()
|
|
if p in ("vless", "trojan", "shadowsocks", "socks", "http"):
|
|
return "tcp"
|
|
if p in ("wireguard", "hysteria2", "tuic"):
|
|
return "udp"
|
|
return ""
|
|
|
|
|
|
def _first_non_empty(*values: Any) -> str:
|
|
for value in values:
|
|
s = str(value or "").strip()
|
|
if s:
|
|
return s
|
|
return ""
|