"""Client-side wrappers for the ``/api/v1/traffic/*`` HTTP endpoints."""

from __future__ import annotations

from typing import Any, Dict, List, Optional, cast

from .models import *
from .utils import strip_ansi

# Traffic modes accepted by ``traffic_mode_set``.
_TRAFFIC_MODES = ("selective", "full_tunnel", "direct")
# App-mark targets accepted when parsing ``/appmarks/items`` entries.
_APPMARK_TARGETS = ("vpn", "direct")
# Response messages (lower-cased, stripped) that count as a successful save.
# The empty string is included so a reply that carries a profile but no
# message is treated as success.
_SAVE_OK_MESSAGES = ("", "saved", "ok")


def _call_json(api: Any, method: str, path: str, **kwargs: Any) -> Dict[str, Any]:
    """Send a request through *api* and return the decoded body (``{}`` if falsy)."""
    return cast(Dict[str, Any], api._json(api._request(method, path, **kwargs)) or {})


def _to_int(value: Any, default: int = 0) -> int:
    """Coerce *value* to ``int``; return *default* if it is empty or not numeric."""
    if not value:
        return default
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default


def _text(value: Any) -> str:
    """Return *value* as a stripped string, mapping falsy values to ``""``."""
    return str(value or "").strip()


def _as_list(value: Any) -> List[Any]:
    """Return *value* if it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _str_list(value: Any) -> List[str]:
    """Stringify the items of a list, dropping items that are blank as text."""
    return [str(x) for x in _as_list(value) if str(x).strip()]


def _parse_app_profile(item: Any) -> Optional[TrafficAppProfile]:
    """Build a ``TrafficAppProfile`` from a dict, or ``None`` if it has no id."""
    if not isinstance(item, dict):
        return None
    pid = _text(item.get("id"))
    if not pid:
        return None
    return TrafficAppProfile(
        id=pid,
        name=_text(item.get("name")),
        app_key=_text(item.get("app_key")),
        command=_text(item.get("command")),
        target=_text(item.get("target")).lower(),
        ttl_sec=_to_int(item.get("ttl_sec")),
        vpn_profile=_text(item.get("vpn_profile")),
        created_at=_text(item.get("created_at")),
        updated_at=_text(item.get("updated_at")),
    )


class TrafficApiMixin:
    """Traffic-related API calls.

    The methods call ``self._request`` and ``self._json``, which are not
    defined in this class and must be supplied by the class it is mixed into.
    """

    def traffic_mode_get(self) -> TrafficModeStatus:
        """GET ``/api/v1/traffic/mode`` and parse the reply as a mode status."""
        data = _call_json(self, "GET", "/api/v1/traffic/mode")
        return self._parse_traffic_mode_status(data, fallback_mode="selective")

    def traffic_mode_set(
        self,
        mode: str,
        preferred_iface: Optional[str] = None,
        auto_local_bypass: Optional[bool] = None,
        ingress_reply_bypass: Optional[bool] = None,
        force_vpn_subnets: Optional[List[str]] = None,
        force_vpn_uids: Optional[List[str]] = None,
        force_vpn_cgroups: Optional[List[str]] = None,
        force_direct_subnets: Optional[List[str]] = None,
        force_direct_uids: Optional[List[str]] = None,
        force_direct_cgroups: Optional[List[str]] = None,
    ) -> TrafficModeStatus:
        """POST a new traffic mode to ``/api/v1/traffic/mode``.

        ``mode`` is stripped and lower-cased before validation. Every other
        argument is sent only when it is not ``None``, so an empty list or
        ``False`` is still transmitted.

        Raises:
            ValueError: if the normalized mode is not one of
                ``selective``, ``full_tunnel`` or ``direct``.
        """
        m = str(mode or "").strip().lower()
        if m not in _TRAFFIC_MODES:
            raise ValueError(f"Invalid traffic mode: {mode}")

        payload: Dict[str, Any] = {"mode": m}
        if preferred_iface is not None:
            payload["preferred_iface"] = str(preferred_iface).strip()
        if auto_local_bypass is not None:
            payload["auto_local_bypass"] = bool(auto_local_bypass)
        if ingress_reply_bypass is not None:
            payload["ingress_reply_bypass"] = bool(ingress_reply_bypass)

        overrides = {
            "force_vpn_subnets": force_vpn_subnets,
            "force_vpn_uids": force_vpn_uids,
            "force_vpn_cgroups": force_vpn_cgroups,
            "force_direct_subnets": force_direct_subnets,
            "force_direct_uids": force_direct_uids,
            "force_direct_cgroups": force_direct_cgroups,
        }
        for key, values in overrides.items():
            if values is not None:
                payload[key] = [str(x) for x in values]

        data = _call_json(self, "POST", "/api/v1/traffic/mode", json_body=payload)
        # Fall back to the mode that was just requested if the reply omits it.
        return self._parse_traffic_mode_status(data, fallback_mode=m)

    def traffic_mode_test(self) -> TrafficModeStatus:
        """GET ``/api/v1/traffic/mode/test`` and parse the reply as a mode status."""
        data = _call_json(self, "GET", "/api/v1/traffic/mode/test")
        return self._parse_traffic_mode_status(data, fallback_mode="selective")

    def traffic_advanced_reset(self) -> TrafficModeStatus:
        """POST ``/api/v1/traffic/advanced/reset`` and parse the reply as a mode status."""
        data = _call_json(self, "POST", "/api/v1/traffic/advanced/reset")
        return self._parse_traffic_mode_status(data, fallback_mode="selective")

    def traffic_interfaces_get(self) -> TrafficInterfaces:
        """GET ``/api/v1/traffic/interfaces``; blank interface names are dropped."""
        data = _call_json(self, "GET", "/api/v1/traffic/interfaces")
        return TrafficInterfaces(
            interfaces=_str_list(data.get("interfaces")),
            preferred_iface=str(data.get("preferred_iface") or ""),
            active_iface=str(data.get("active_iface") or ""),
            iface_reason=str(data.get("iface_reason") or ""),
        )

    def traffic_candidates_get(self) -> TrafficCandidates:
        """GET ``/api/v1/traffic/candidates`` and parse subnets, units and uids.

        Entries that are not dicts are skipped, as are subnets without a
        ``cidr``, units without a ``unit`` and uids whose ``uid`` is not
        numeric. A section that is not a list is treated as empty.
        """
        data = _call_json(self, "GET", "/api/v1/traffic/candidates")

        subnets: List[TrafficCandidateSubnet] = []
        for it in _as_list(data.get("subnets")):
            if not isinstance(it, dict):
                continue
            cidr = _text(it.get("cidr"))
            if not cidr:
                continue
            subnets.append(
                TrafficCandidateSubnet(
                    cidr=cidr,
                    dev=_text(it.get("dev")),
                    kind=_text(it.get("kind")),
                    linkdown=bool(it.get("linkdown", False)),
                )
            )

        units: List[TrafficCandidateUnit] = []
        for it in _as_list(data.get("units")):
            if not isinstance(it, dict):
                continue
            unit = _text(it.get("unit"))
            if not unit:
                continue
            units.append(
                TrafficCandidateUnit(
                    unit=unit,
                    description=_text(it.get("description")),
                    cgroup=_text(it.get("cgroup")),
                )
            )

        uids: List[TrafficCandidateUID] = []
        for it in _as_list(data.get("uids")):
            if not isinstance(it, dict):
                continue
            # NOTE(review): a missing or falsy "uid" is reported as uid 0.
            try:
                uid = int(it.get("uid", 0) or 0)
            except (TypeError, ValueError, OverflowError):
                continue
            uids.append(
                TrafficCandidateUID(
                    uid=uid,
                    user=_text(it.get("user")),
                    examples=_str_list(it.get("examples")),
                )
            )

        return TrafficCandidates(
            generated_at=str(data.get("generated_at") or ""),
            subnets=subnets,
            units=units,
            uids=uids,
        )

    def traffic_appmarks_status(self) -> TrafficAppMarksStatus:
        """GET ``/api/v1/traffic/appmarks``; non-numeric counts become ``0``."""
        data = _call_json(self, "GET", "/api/v1/traffic/appmarks")
        return TrafficAppMarksStatus(
            vpn_count=_to_int(data.get("vpn_count")),
            direct_count=_to_int(data.get("direct_count")),
            message=str(data.get("message") or ""),
        )

    def traffic_appmarks_items(self) -> List[TrafficAppMarkItem]:
        """GET ``/api/v1/traffic/appmarks/items`` and parse the ``items`` list.

        Entries are skipped when they are not dicts, when ``id`` is not a
        positive integer, or when ``target`` is not ``vpn`` or ``direct``.
        """
        data = _call_json(self, "GET", "/api/v1/traffic/appmarks/items")

        out: List[TrafficAppMarkItem] = []
        for it in _as_list(data.get("items")):
            if not isinstance(it, dict):
                continue
            mid = _to_int(it.get("id"))
            tgt = _text(it.get("target")).lower()
            if mid <= 0 or tgt not in _APPMARK_TARGETS:
                continue
            out.append(
                TrafficAppMarkItem(
                    id=mid,
                    target=tgt,
                    cgroup=_text(it.get("cgroup")),
                    cgroup_rel=_text(it.get("cgroup_rel")),
                    level=_to_int(it.get("level")),
                    unit=_text(it.get("unit")),
                    command=_text(it.get("command")),
                    app_key=_text(it.get("app_key")),
                    added_at=_text(it.get("added_at")),
                    expires_at=_text(it.get("expires_at")),
                    remaining_sec=_to_int(it.get("remaining_sec")),
                )
            )
        return out

    def traffic_appmarks_apply(
        self,
        *,
        op: str,
        target: str,
        cgroup: str = "",
        unit: str = "",
        command: str = "",
        app_key: str = "",
        timeout_sec: int = 0,
    ) -> TrafficAppMarksResult:
        """POST an app-mark operation to ``/api/v1/traffic/appmarks``.

        ``op`` and ``target`` are always sent, stripped and lower-cased. The
        optional string fields are sent only when non-empty, and
        ``timeout_sec`` only when positive. Fields missing from the reply
        fall back to the values that were sent.
        """
        payload: Dict[str, Any] = {
            "op": _text(op).lower(),
            "target": _text(target).lower(),
        }
        if cgroup:
            payload["cgroup"] = str(cgroup).strip()
        if unit:
            payload["unit"] = str(unit).strip()
        if command:
            payload["command"] = str(command).strip()
        if app_key:
            payload["app_key"] = str(app_key).strip()
        if int(timeout_sec or 0) > 0:
            payload["timeout_sec"] = int(timeout_sec)

        data = _call_json(self, "POST", "/api/v1/traffic/appmarks", json_body=payload)
        return TrafficAppMarksResult(
            ok=bool(data.get("ok", False)),
            message=str(data.get("message") or ""),
            op=str(data.get("op") or payload["op"]),
            target=str(data.get("target") or payload["target"]),
            cgroup=str(data.get("cgroup") or payload.get("cgroup") or ""),
            cgroup_id=_to_int(data.get("cgroup_id")),
            timeout_sec=_to_int(data.get("timeout_sec")),
        )

    def traffic_app_profiles_list(self) -> List[TrafficAppProfile]:
        """GET ``/api/v1/traffic/app-profiles``; entries without an id are skipped."""
        data = _call_json(self, "GET", "/api/v1/traffic/app-profiles")
        parsed = (_parse_app_profile(it) for it in _as_list(data.get("profiles")))
        return [prof for prof in parsed if prof is not None]

    def traffic_app_profile_upsert(
        self,
        *,
        id: str = "",
        name: str = "",
        app_key: str = "",
        command: str,
        target: str,
        ttl_sec: int = 0,
        vpn_profile: str = "",
    ) -> TrafficAppProfileSaveResult:
        """POST a profile to ``/api/v1/traffic/app-profiles``.

        ``command`` and ``target`` are always sent. The other fields are sent
        only when non-empty, and ``ttl_sec`` only when positive.

        The result is ``ok`` when the first entry of the reply's ``profiles``
        list has an id and the reply message is empty, ``saved`` or ``ok``
        (case-insensitive). An empty message is replaced by ``"saved"`` on
        success.
        """
        payload: Dict[str, Any] = {
            "command": _text(command),
            "target": _text(target).lower(),
        }
        if id:
            payload["id"] = str(id).strip()
        if name:
            payload["name"] = str(name).strip()
        if app_key:
            payload["app_key"] = str(app_key).strip()
        if int(ttl_sec or 0) > 0:
            payload["ttl_sec"] = int(ttl_sec)
        if vpn_profile:
            payload["vpn_profile"] = str(vpn_profile).strip()

        data = _call_json(self, "POST", "/api/v1/traffic/app-profiles", json_body=payload)

        msg = str(data.get("message") or "")
        profiles = _as_list(data.get("profiles"))
        prof = _parse_app_profile(profiles[0]) if profiles else None

        # An empty message used to force ok=False, which made the default
        # "saved" message below unreachable; a returned profile with no
        # message now counts as success.
        status = msg.strip().lower()
        ok = prof is not None and status in _SAVE_OK_MESSAGES
        if ok and not status:
            msg = "saved"
        return TrafficAppProfileSaveResult(ok=ok, message=msg, profile=prof)

    def traffic_app_profile_delete(self, id: str) -> CmdResult:
        """DELETE ``/api/v1/traffic/app-profiles`` with ``id`` as a query parameter.

        Raises:
            ValueError: if ``id`` is empty after stripping.
        """
        pid = _text(id)
        if not pid:
            raise ValueError("missing id")
        data = _call_json(
            self, "DELETE", "/api/v1/traffic/app-profiles", params={"id": pid}
        )
        return CmdResult(
            ok=bool(data.get("ok", False)),
            message=str(data.get("message") or ""),
            exit_code=None,
            stdout="",
            stderr="",
        )

    def traffic_audit_get(self) -> TrafficAudit:
        """GET ``/api/v1/traffic/audit`` and pass the text fields through ``strip_ansi``."""
        data = _call_json(self, "GET", "/api/v1/traffic/audit")
        # Filter after cleaning so an entry made only of escape codes is dropped.
        cleaned = (strip_ansi(str(x)).strip() for x in _as_list(data.get("issues")))
        return TrafficAudit(
            ok=bool(data.get("ok", False)),
            message=strip_ansi(str(data.get("message") or "").strip()),
            now=_text(data.get("now")),
            pretty=strip_ansi(str(data.get("pretty") or "").strip()),
            issues=[issue for issue in cleaned if issue],
        )

    def _parse_traffic_mode_status(
        self, data: Dict[str, Any], *, fallback_mode: str
    ) -> TrafficModeStatus:
        """Convert a decoded mode reply into a ``TrafficModeStatus``.

        Missing fields get defaults: ``mode`` falls back to ``fallback_mode``
        and then ``"selective"``, ``desired_mode`` to ``mode``,
        ``applied_mode`` to ``"direct"`` and ``auto_local_bypass`` to
        ``True``. Other flags default to ``False``, counters to ``0``,
        strings to ``""`` and lists to ``[]``.
        """
        mode = str(data.get("mode") or fallback_mode or "selective")
        return TrafficModeStatus(
            mode=mode,
            desired_mode=str(data.get("desired_mode") or mode),
            applied_mode=str(data.get("applied_mode") or "direct"),
            preferred_iface=str(data.get("preferred_iface") or ""),
            advanced_active=bool(data.get("advanced_active", False)),
            auto_local_bypass=bool(data.get("auto_local_bypass", True)),
            auto_local_active=bool(data.get("auto_local_active", False)),
            ingress_reply_bypass=bool(data.get("ingress_reply_bypass", False)),
            ingress_reply_active=bool(data.get("ingress_reply_active", False)),
            bypass_candidates=_to_int(data.get("bypass_candidates")),
            force_vpn_subnets=_str_list(data.get("force_vpn_subnets")),
            force_vpn_uids=_str_list(data.get("force_vpn_uids")),
            force_vpn_cgroups=_str_list(data.get("force_vpn_cgroups")),
            force_direct_subnets=_str_list(data.get("force_direct_subnets")),
            force_direct_uids=_str_list(data.get("force_direct_uids")),
            force_direct_cgroups=_str_list(data.get("force_direct_cgroups")),
            overrides_applied=_to_int(data.get("overrides_applied")),
            cgroup_resolved_uids=_to_int(data.get("cgroup_resolved_uids")),
            cgroup_warning=str(data.get("cgroup_warning") or ""),
            active_iface=str(data.get("active_iface") or ""),
            iface_reason=str(data.get("iface_reason") or ""),
            rule_mark=bool(data.get("rule_mark", False)),
            rule_full=bool(data.get("rule_full", False)),
            ingress_rule_present=bool(data.get("ingress_rule_present", False)),
            ingress_nft_active=bool(data.get("ingress_nft_active", False)),
            table_default=bool(data.get("table_default", False)),
            probe_ok=bool(data.get("probe_ok", False)),
            probe_message=str(data.get("probe_message") or ""),
            healthy=bool(data.get("healthy", False)),
            message=str(data.get("message") or ""),
        )