398 lines
16 KiB
Python
398 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any, Dict, List, Optional, cast
|
|
|
|
from .models import *
|
|
from .utils import strip_ansi
|
|
|
|
|
|
class TrafficApiMixin:
    """Typed client wrappers for the ``/api/v1/traffic/*`` HTTP endpoints.

    Every method goes through ``self._request(...)`` and ``self._json(...)``.
    Neither is defined in this class, so they are presumably supplied by the
    client class this mixin is combined with (NOTE(review): confirm against
    the host class).

    Response parsing is lenient: missing, mistyped or unparseable fields fall
    back to defaults instead of raising.
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _traffic_int(value: Any, default: int = 0) -> int:
        """Coerce *value* to ``int``; return *default* if falsy or unparseable."""
        try:
            return int(value or default)
        except (TypeError, ValueError, OverflowError):
            return default

    @staticmethod
    def _traffic_str_list(value: Any) -> List[str]:
        """Return the non-blank items of *value* as strings.

        Anything that is not a ``list`` yields ``[]``, so a stray string is
        never split into single characters.
        """
        if not isinstance(value, list):
            return []
        return [str(x) for x in value if str(x).strip()]

    @staticmethod
    def _traffic_dict_items(value: Any) -> List[Dict[str, Any]]:
        """Return the ``dict`` elements of *value*; ``[]`` if it is not a list."""
        if not isinstance(value, list):
            return []
        return [it for it in value if isinstance(it, dict)]

    def _traffic_call(self, method: str, path: str, **kwargs: Any) -> Dict[str, Any]:
        """Send a request and return the decoded JSON body as a dict.

        Extra keyword arguments are forwarded unchanged to ``self._request``.
        An empty, missing or non-dict body is returned as ``{}``.
        """
        data = self._json(self._request(method, path, **kwargs))
        if isinstance(data, dict):
            return cast(Dict[str, Any], data)
        return {}

    def _traffic_app_profile(self, item: Any) -> Optional[TrafficAppProfile]:
        """Build a ``TrafficAppProfile`` from one response entry.

        Returns ``None`` when *item* is not a dict or has no non-blank ``id``.
        """
        if not isinstance(item, dict):
            return None
        pid = str(item.get("id") or "").strip()
        if not pid:
            return None
        return TrafficAppProfile(
            id=pid,
            name=str(item.get("name") or "").strip(),
            app_key=str(item.get("app_key") or "").strip(),
            command=str(item.get("command") or "").strip(),
            target=str(item.get("target") or "").strip().lower(),
            ttl_sec=self._traffic_int(item.get("ttl_sec")),
            vpn_profile=str(item.get("vpn_profile") or "").strip(),
            created_at=str(item.get("created_at") or "").strip(),
            updated_at=str(item.get("updated_at") or "").strip(),
        )

    # ------------------------------------------------------------------
    # Traffic mode
    # ------------------------------------------------------------------

    def traffic_mode_get(self) -> TrafficModeStatus:
        """GET ``/api/v1/traffic/mode`` and parse it into a ``TrafficModeStatus``."""
        data = self._traffic_call("GET", "/api/v1/traffic/mode")
        return self._parse_traffic_mode_status(data, fallback_mode="selective")

    def traffic_mode_set(
        self,
        mode: str,
        preferred_iface: Optional[str] = None,
        auto_local_bypass: Optional[bool] = None,
        ingress_reply_bypass: Optional[bool] = None,
        force_vpn_subnets: Optional[List[str]] = None,
        force_vpn_uids: Optional[List[str]] = None,
        force_vpn_cgroups: Optional[List[str]] = None,
        force_direct_subnets: Optional[List[str]] = None,
        force_direct_uids: Optional[List[str]] = None,
        force_direct_cgroups: Optional[List[str]] = None,
    ) -> TrafficModeStatus:
        """POST a new traffic mode to ``/api/v1/traffic/mode``.

        Args:
            mode: ``"selective"``, ``"full_tunnel"`` or ``"direct"``; compared
                after stripping whitespace and lowercasing.
            preferred_iface: Sent stripped when not ``None``.
            auto_local_bypass: Sent as a bool when not ``None``.
            ingress_reply_bypass: Sent as a bool when not ``None``.
            force_vpn_subnets, force_vpn_uids, force_vpn_cgroups,
            force_direct_subnets, force_direct_uids, force_direct_cgroups:
                Each list is sent (items stringified) when not ``None``; an
                empty list is sent as an empty list.

        Returns:
            The parsed response; its ``mode`` falls back to the requested mode
            when the response does not carry one.

        Raises:
            ValueError: If *mode* is not one of the accepted values.
        """
        m = str(mode or "").strip().lower()
        if m not in ("selective", "full_tunnel", "direct"):
            raise ValueError(f"Invalid traffic mode: {mode}")

        payload: Dict[str, Any] = {"mode": m}
        if preferred_iface is not None:
            payload["preferred_iface"] = str(preferred_iface).strip()
        if auto_local_bypass is not None:
            payload["auto_local_bypass"] = bool(auto_local_bypass)
        if ingress_reply_bypass is not None:
            payload["ingress_reply_bypass"] = bool(ingress_reply_bypass)

        # Only keys the caller actually supplied are sent; ``None`` means
        # "omit the key" while ``[]`` is still transmitted.
        overrides: Dict[str, Optional[List[str]]] = {
            "force_vpn_subnets": force_vpn_subnets,
            "force_vpn_uids": force_vpn_uids,
            "force_vpn_cgroups": force_vpn_cgroups,
            "force_direct_subnets": force_direct_subnets,
            "force_direct_uids": force_direct_uids,
            "force_direct_cgroups": force_direct_cgroups,
        }
        for key, values in overrides.items():
            if values is not None:
                payload[key] = [str(x) for x in values]

        data = self._traffic_call("POST", "/api/v1/traffic/mode", json_body=payload)
        return self._parse_traffic_mode_status(data, fallback_mode=m)

    def traffic_mode_test(self) -> TrafficModeStatus:
        """GET ``/api/v1/traffic/mode/test`` and parse it into a ``TrafficModeStatus``."""
        data = self._traffic_call("GET", "/api/v1/traffic/mode/test")
        return self._parse_traffic_mode_status(data, fallback_mode="selective")

    def traffic_advanced_reset(self) -> TrafficModeStatus:
        """POST ``/api/v1/traffic/advanced/reset`` and parse the resulting status."""
        data = self._traffic_call("POST", "/api/v1/traffic/advanced/reset")
        return self._parse_traffic_mode_status(data, fallback_mode="selective")

    # ------------------------------------------------------------------
    # Interfaces and candidates
    # ------------------------------------------------------------------

    def traffic_interfaces_get(self) -> TrafficInterfaces:
        """GET ``/api/v1/traffic/interfaces`` and parse it into ``TrafficInterfaces``."""
        data = self._traffic_call("GET", "/api/v1/traffic/interfaces")
        return TrafficInterfaces(
            interfaces=self._traffic_str_list(data.get("interfaces")),
            preferred_iface=str(data.get("preferred_iface") or ""),
            active_iface=str(data.get("active_iface") or ""),
            iface_reason=str(data.get("iface_reason") or ""),
        )

    def traffic_candidates_get(self) -> TrafficCandidates:
        """GET ``/api/v1/traffic/candidates`` and parse subnets, units and uids.

        Entries that are not dicts are skipped, as are subnets without a
        ``cidr``, units without a ``unit`` and uids whose ``uid`` cannot be
        converted to ``int``.
        """
        data = self._traffic_call("GET", "/api/v1/traffic/candidates")

        subnets: List[TrafficCandidateSubnet] = []
        for it in self._traffic_dict_items(data.get("subnets")):
            cidr = str(it.get("cidr") or "").strip()
            if not cidr:
                continue
            subnets.append(
                TrafficCandidateSubnet(
                    cidr=cidr,
                    dev=str(it.get("dev") or "").strip(),
                    kind=str(it.get("kind") or "").strip(),
                    linkdown=bool(it.get("linkdown", False)),
                )
            )

        units: List[TrafficCandidateUnit] = []
        for it in self._traffic_dict_items(data.get("units")):
            unit = str(it.get("unit") or "").strip()
            if not unit:
                continue
            units.append(
                TrafficCandidateUnit(
                    unit=unit,
                    description=str(it.get("description") or "").strip(),
                    cgroup=str(it.get("cgroup") or "").strip(),
                )
            )

        uids: List[TrafficCandidateUID] = []
        for it in self._traffic_dict_items(data.get("uids")):
            try:
                uid = int(it.get("uid", 0) or 0)
            except (TypeError, ValueError, OverflowError):
                # An unparseable uid drops the entry rather than defaulting it.
                continue
            uids.append(
                TrafficCandidateUID(
                    uid=uid,
                    user=str(it.get("user") or "").strip(),
                    examples=self._traffic_str_list(it.get("examples")),
                )
            )

        return TrafficCandidates(
            generated_at=str(data.get("generated_at") or ""),
            subnets=subnets,
            units=units,
            uids=uids,
        )

    # ------------------------------------------------------------------
    # App marks
    # ------------------------------------------------------------------

    def traffic_appmarks_status(self) -> TrafficAppMarksStatus:
        """GET ``/api/v1/traffic/appmarks`` and parse the counters and message."""
        data = self._traffic_call("GET", "/api/v1/traffic/appmarks")
        return TrafficAppMarksStatus(
            vpn_count=self._traffic_int(data.get("vpn_count")),
            direct_count=self._traffic_int(data.get("direct_count")),
            message=str(data.get("message") or ""),
        )

    def traffic_appmarks_items(self) -> List[TrafficAppMarkItem]:
        """GET ``/api/v1/traffic/appmarks/items`` and parse the ``items`` list.

        Entries are skipped when they are not dicts, when ``id`` is not a
        positive integer, or when ``target`` is neither ``"vpn"`` nor
        ``"direct"``.
        """
        data = self._traffic_call("GET", "/api/v1/traffic/appmarks/items")

        out: List[TrafficAppMarkItem] = []
        for it in self._traffic_dict_items(data.get("items")):
            mid = self._traffic_int(it.get("id"))
            tgt = str(it.get("target") or "").strip().lower()
            if mid <= 0 or tgt not in ("vpn", "direct"):
                continue
            out.append(
                TrafficAppMarkItem(
                    id=mid,
                    target=tgt,
                    cgroup=str(it.get("cgroup") or "").strip(),
                    cgroup_rel=str(it.get("cgroup_rel") or "").strip(),
                    level=self._traffic_int(it.get("level")),
                    unit=str(it.get("unit") or "").strip(),
                    command=str(it.get("command") or "").strip(),
                    app_key=str(it.get("app_key") or "").strip(),
                    added_at=str(it.get("added_at") or "").strip(),
                    expires_at=str(it.get("expires_at") or "").strip(),
                    remaining_sec=self._traffic_int(it.get("remaining_sec")),
                )
            )
        return out

    def traffic_appmarks_apply(
        self,
        *,
        op: str,
        target: str,
        cgroup: str = "",
        unit: str = "",
        command: str = "",
        app_key: str = "",
        timeout_sec: int = 0,
    ) -> TrafficAppMarksResult:
        """POST an app-mark operation to ``/api/v1/traffic/appmarks``.

        ``op`` and ``target`` are always sent (stripped, lowercased). The
        optional string fields are sent only when truthy, and ``timeout_sec``
        only when greater than zero.

        Returns:
            The parsed result; ``op``, ``target`` and ``cgroup`` fall back to
            the values that were sent when the response omits them.
        """
        payload: Dict[str, Any] = {
            "op": str(op or "").strip().lower(),
            "target": str(target or "").strip().lower(),
        }
        if cgroup:
            payload["cgroup"] = str(cgroup).strip()
        if unit:
            payload["unit"] = str(unit).strip()
        if command:
            payload["command"] = str(command).strip()
        if app_key:
            payload["app_key"] = str(app_key).strip()
        if int(timeout_sec or 0) > 0:
            payload["timeout_sec"] = int(timeout_sec)

        data = self._traffic_call("POST", "/api/v1/traffic/appmarks", json_body=payload)
        return TrafficAppMarksResult(
            ok=bool(data.get("ok", False)),
            message=str(data.get("message") or ""),
            op=str(data.get("op") or payload["op"]),
            target=str(data.get("target") or payload["target"]),
            cgroup=str(data.get("cgroup") or payload.get("cgroup") or ""),
            cgroup_id=self._traffic_int(data.get("cgroup_id")),
            timeout_sec=self._traffic_int(data.get("timeout_sec")),
        )

    # ------------------------------------------------------------------
    # App profiles
    # ------------------------------------------------------------------

    def traffic_app_profiles_list(self) -> List[TrafficAppProfile]:
        """GET ``/api/v1/traffic/app-profiles`` and parse the ``profiles`` list.

        Entries that are not dicts or have no non-blank ``id`` are skipped.
        """
        data = self._traffic_call("GET", "/api/v1/traffic/app-profiles")
        raw = data.get("profiles")
        if not isinstance(raw, list):
            return []
        parsed = (self._traffic_app_profile(it) for it in raw)
        return [prof for prof in parsed if prof is not None]

    def traffic_app_profile_upsert(
        self,
        *,
        id: str = "",
        name: str = "",
        app_key: str = "",
        command: str,
        target: str,
        ttl_sec: int = 0,
        vpn_profile: str = "",
    ) -> TrafficAppProfileSaveResult:
        """POST a profile to ``/api/v1/traffic/app-profiles``.

        ``command`` and ``target`` are always sent. The other string fields
        are sent only when truthy, and ``ttl_sec`` only when greater than zero.

        Returns:
            A result whose ``profile`` is the first entry of the response's
            ``profiles`` list (``None`` if absent or invalid). ``ok`` is true
            when such a profile exists and the response message is empty,
            ``"saved"`` or ``"ok"`` (case-insensitive). An empty message on
            success is reported as ``"saved"``.
        """
        payload: Dict[str, Any] = {
            "command": str(command or "").strip(),
            "target": str(target or "").strip().lower(),
        }
        if id:
            payload["id"] = str(id).strip()
        if name:
            payload["name"] = str(name).strip()
        if app_key:
            payload["app_key"] = str(app_key).strip()
        if int(ttl_sec or 0) > 0:
            payload["ttl_sec"] = int(ttl_sec)
        if vpn_profile:
            payload["vpn_profile"] = str(vpn_profile).strip()

        data = self._traffic_call("POST", "/api/v1/traffic/app-profiles", json_body=payload)
        msg = str(data.get("message") or "")

        raw = data.get("profiles")
        first = raw[0] if isinstance(raw, list) and raw else None
        prof = self._traffic_app_profile(first)

        # An empty message counts as success when a profile came back.
        # Previously an empty message forced ok=False, which made the
        # "default the message to 'saved'" branch below unreachable.
        status = msg.strip().lower()
        ok = prof is not None and status in ("", "saved", "ok")
        if ok and not status:
            msg = "saved"
        return TrafficAppProfileSaveResult(ok=ok, message=msg, profile=prof)

    def traffic_app_profile_delete(self, id: str) -> CmdResult:
        """DELETE ``/api/v1/traffic/app-profiles`` for the given profile id.

        The id is sent as the ``id`` query parameter.

        Raises:
            ValueError: If *id* is empty or whitespace-only.
        """
        pid = str(id or "").strip()
        if not pid:
            raise ValueError("missing id")
        data = self._traffic_call(
            "DELETE", "/api/v1/traffic/app-profiles", params={"id": pid}
        )
        return CmdResult(
            ok=bool(data.get("ok", False)),
            message=str(data.get("message") or ""),
            exit_code=None,
            stdout="",
            stderr="",
        )

    # ------------------------------------------------------------------
    # Audit
    # ------------------------------------------------------------------

    def traffic_audit_get(self) -> TrafficAudit:
        """GET ``/api/v1/traffic/audit`` and parse it into a ``TrafficAudit``.

        ``message``, ``pretty`` and every issue are passed through
        ``strip_ansi``. Issues that are blank after cleaning are dropped.
        """
        data = self._traffic_call("GET", "/api/v1/traffic/audit")

        raw_issues = data.get("issues")
        if not isinstance(raw_issues, list):
            raw_issues = []
        # Clean first, then filter, so an entry that is empty after
        # strip_ansi does not survive as "".
        cleaned = (strip_ansi(str(x)).strip() for x in raw_issues)

        return TrafficAudit(
            ok=bool(data.get("ok", False)),
            message=strip_ansi(str(data.get("message") or "").strip()),
            now=str(data.get("now") or "").strip(),
            pretty=strip_ansi(str(data.get("pretty") or "").strip()),
            issues=[issue for issue in cleaned if issue],
        )

    # ------------------------------------------------------------------
    # Shared parsing
    # ------------------------------------------------------------------

    def _parse_traffic_mode_status(self, data: Dict[str, Any], *, fallback_mode: str) -> TrafficModeStatus:
        """Convert a traffic-mode response dict into a ``TrafficModeStatus``.

        Args:
            data: Decoded JSON body of a traffic-mode endpoint.
            fallback_mode: Used for ``mode`` when the response has none; if it
                is empty too, ``"selective"`` is used.

        Defaults for missing fields: ``applied_mode`` is ``"direct"``,
        ``auto_local_bypass`` is ``True``, other booleans are ``False``,
        integers are ``0``, strings are ``""`` and lists are empty.
        """
        mode = str(data.get("mode") or fallback_mode or "selective")
        as_int = self._traffic_int
        as_list = self._traffic_str_list
        return TrafficModeStatus(
            mode=mode,
            desired_mode=str(data.get("desired_mode") or data.get("mode") or mode),
            applied_mode=str(data.get("applied_mode") or "direct"),
            preferred_iface=str(data.get("preferred_iface") or ""),
            advanced_active=bool(data.get("advanced_active", False)),
            auto_local_bypass=bool(data.get("auto_local_bypass", True)),
            auto_local_active=bool(data.get("auto_local_active", False)),
            ingress_reply_bypass=bool(data.get("ingress_reply_bypass", False)),
            ingress_reply_active=bool(data.get("ingress_reply_active", False)),
            bypass_candidates=as_int(data.get("bypass_candidates")),
            force_vpn_subnets=as_list(data.get("force_vpn_subnets")),
            force_vpn_uids=as_list(data.get("force_vpn_uids")),
            force_vpn_cgroups=as_list(data.get("force_vpn_cgroups")),
            force_direct_subnets=as_list(data.get("force_direct_subnets")),
            force_direct_uids=as_list(data.get("force_direct_uids")),
            force_direct_cgroups=as_list(data.get("force_direct_cgroups")),
            overrides_applied=as_int(data.get("overrides_applied")),
            cgroup_resolved_uids=as_int(data.get("cgroup_resolved_uids")),
            cgroup_warning=str(data.get("cgroup_warning") or ""),
            active_iface=str(data.get("active_iface") or ""),
            iface_reason=str(data.get("iface_reason") or ""),
            rule_mark=bool(data.get("rule_mark", False)),
            rule_full=bool(data.get("rule_full", False)),
            ingress_rule_present=bool(data.get("ingress_rule_present", False)),
            ingress_nft_active=bool(data.get("ingress_nft_active", False)),
            table_default=bool(data.get("table_default", False)),
            probe_ok=bool(data.get("probe_ok", False)),
            probe_message=str(data.get("probe_message") or ""),
            healthy=bool(data.get("healthy", False)),
            message=str(data.get("message") or ""),
        )
|