239 lines
9.7 KiB
Python
239 lines
9.7 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
from typing import List, Optional
|
|
|
|
from api_client import (
|
|
CmdResult,
|
|
TrafficAppMarkItem,
|
|
TrafficAppMarksResult,
|
|
TrafficAppMarksStatus,
|
|
TrafficAppProfile,
|
|
TrafficAppProfileSaveResult,
|
|
TrafficAudit,
|
|
TrafficCandidates,
|
|
TrafficInterfaces,
|
|
TrafficModeStatus,
|
|
)
|
|
|
|
from .views import TrafficModeView
|
|
|
|
|
|
class TrafficControllerMixin:
|
|
def traffic_mode_view(self) -> TrafficModeView:
|
|
st: TrafficModeStatus = self.client.traffic_mode_get()
|
|
return TrafficModeView(
|
|
desired_mode=(st.desired_mode or st.mode or "selective"),
|
|
applied_mode=(st.applied_mode or "direct"),
|
|
preferred_iface=st.preferred_iface or "",
|
|
advanced_active=bool(st.advanced_active),
|
|
auto_local_bypass=bool(st.auto_local_bypass),
|
|
auto_local_active=bool(st.auto_local_active),
|
|
ingress_reply_bypass=bool(st.ingress_reply_bypass),
|
|
ingress_reply_active=bool(st.ingress_reply_active),
|
|
bypass_candidates=int(st.bypass_candidates),
|
|
force_vpn_subnets=list(st.force_vpn_subnets or []),
|
|
force_vpn_uids=list(st.force_vpn_uids or []),
|
|
force_vpn_cgroups=list(st.force_vpn_cgroups or []),
|
|
force_direct_subnets=list(st.force_direct_subnets or []),
|
|
force_direct_uids=list(st.force_direct_uids or []),
|
|
force_direct_cgroups=list(st.force_direct_cgroups or []),
|
|
overrides_applied=int(st.overrides_applied),
|
|
cgroup_resolved_uids=int(st.cgroup_resolved_uids),
|
|
cgroup_warning=st.cgroup_warning or "",
|
|
active_iface=st.active_iface or "",
|
|
iface_reason=st.iface_reason or "",
|
|
ingress_rule_present=bool(st.ingress_rule_present),
|
|
ingress_nft_active=bool(st.ingress_nft_active),
|
|
probe_ok=bool(st.probe_ok),
|
|
probe_message=st.probe_message or "",
|
|
healthy=bool(st.healthy),
|
|
message=st.message or "",
|
|
)
|
|
|
|
def traffic_mode_set(
|
|
self,
|
|
mode: str,
|
|
preferred_iface: Optional[str] = None,
|
|
auto_local_bypass: Optional[bool] = None,
|
|
ingress_reply_bypass: Optional[bool] = None,
|
|
force_vpn_subnets: Optional[List[str]] = None,
|
|
force_vpn_uids: Optional[List[str]] = None,
|
|
force_vpn_cgroups: Optional[List[str]] = None,
|
|
force_direct_subnets: Optional[List[str]] = None,
|
|
force_direct_uids: Optional[List[str]] = None,
|
|
force_direct_cgroups: Optional[List[str]] = None,
|
|
) -> TrafficModeView:
|
|
st: TrafficModeStatus = self.client.traffic_mode_set(
|
|
mode,
|
|
preferred_iface,
|
|
auto_local_bypass,
|
|
ingress_reply_bypass,
|
|
force_vpn_subnets,
|
|
force_vpn_uids,
|
|
force_vpn_cgroups,
|
|
force_direct_subnets,
|
|
force_direct_uids,
|
|
force_direct_cgroups,
|
|
)
|
|
return TrafficModeView(
|
|
desired_mode=(st.desired_mode or st.mode or mode),
|
|
applied_mode=(st.applied_mode or "direct"),
|
|
preferred_iface=st.preferred_iface or "",
|
|
advanced_active=bool(st.advanced_active),
|
|
auto_local_bypass=bool(st.auto_local_bypass),
|
|
auto_local_active=bool(st.auto_local_active),
|
|
ingress_reply_bypass=bool(st.ingress_reply_bypass),
|
|
ingress_reply_active=bool(st.ingress_reply_active),
|
|
bypass_candidates=int(st.bypass_candidates),
|
|
force_vpn_subnets=list(st.force_vpn_subnets or []),
|
|
force_vpn_uids=list(st.force_vpn_uids or []),
|
|
force_vpn_cgroups=list(st.force_vpn_cgroups or []),
|
|
force_direct_subnets=list(st.force_direct_subnets or []),
|
|
force_direct_uids=list(st.force_direct_uids or []),
|
|
force_direct_cgroups=list(st.force_direct_cgroups or []),
|
|
overrides_applied=int(st.overrides_applied),
|
|
cgroup_resolved_uids=int(st.cgroup_resolved_uids),
|
|
cgroup_warning=st.cgroup_warning or "",
|
|
active_iface=st.active_iface or "",
|
|
iface_reason=st.iface_reason or "",
|
|
ingress_rule_present=bool(st.ingress_rule_present),
|
|
ingress_nft_active=bool(st.ingress_nft_active),
|
|
probe_ok=bool(st.probe_ok),
|
|
probe_message=st.probe_message or "",
|
|
healthy=bool(st.healthy),
|
|
message=st.message or "",
|
|
)
|
|
|
|
def traffic_mode_test(self) -> TrafficModeView:
|
|
st: TrafficModeStatus = self.client.traffic_mode_test()
|
|
return TrafficModeView(
|
|
desired_mode=(st.desired_mode or st.mode or "selective"),
|
|
applied_mode=(st.applied_mode or "direct"),
|
|
preferred_iface=st.preferred_iface or "",
|
|
advanced_active=bool(st.advanced_active),
|
|
auto_local_bypass=bool(st.auto_local_bypass),
|
|
auto_local_active=bool(st.auto_local_active),
|
|
ingress_reply_bypass=bool(st.ingress_reply_bypass),
|
|
ingress_reply_active=bool(st.ingress_reply_active),
|
|
bypass_candidates=int(st.bypass_candidates),
|
|
force_vpn_subnets=list(st.force_vpn_subnets or []),
|
|
force_vpn_uids=list(st.force_vpn_uids or []),
|
|
force_vpn_cgroups=list(st.force_vpn_cgroups or []),
|
|
force_direct_subnets=list(st.force_direct_subnets or []),
|
|
force_direct_uids=list(st.force_direct_uids or []),
|
|
force_direct_cgroups=list(st.force_direct_cgroups or []),
|
|
overrides_applied=int(st.overrides_applied),
|
|
cgroup_resolved_uids=int(st.cgroup_resolved_uids),
|
|
cgroup_warning=st.cgroup_warning or "",
|
|
active_iface=st.active_iface or "",
|
|
iface_reason=st.iface_reason or "",
|
|
ingress_rule_present=bool(st.ingress_rule_present),
|
|
ingress_nft_active=bool(st.ingress_nft_active),
|
|
probe_ok=bool(st.probe_ok),
|
|
probe_message=st.probe_message or "",
|
|
healthy=bool(st.healthy),
|
|
message=st.message or "",
|
|
)
|
|
|
|
def traffic_advanced_reset(self) -> TrafficModeView:
|
|
st: TrafficModeStatus = self.client.traffic_advanced_reset()
|
|
return TrafficModeView(
|
|
desired_mode=(st.desired_mode or st.mode or "selective"),
|
|
applied_mode=(st.applied_mode or "direct"),
|
|
preferred_iface=st.preferred_iface or "",
|
|
advanced_active=bool(st.advanced_active),
|
|
auto_local_bypass=bool(st.auto_local_bypass),
|
|
auto_local_active=bool(st.auto_local_active),
|
|
ingress_reply_bypass=bool(st.ingress_reply_bypass),
|
|
ingress_reply_active=bool(st.ingress_reply_active),
|
|
bypass_candidates=int(st.bypass_candidates),
|
|
force_vpn_subnets=list(st.force_vpn_subnets or []),
|
|
force_vpn_uids=list(st.force_vpn_uids or []),
|
|
force_vpn_cgroups=list(st.force_vpn_cgroups or []),
|
|
force_direct_subnets=list(st.force_direct_subnets or []),
|
|
force_direct_uids=list(st.force_direct_uids or []),
|
|
force_direct_cgroups=list(st.force_direct_cgroups or []),
|
|
overrides_applied=int(st.overrides_applied),
|
|
cgroup_resolved_uids=int(st.cgroup_resolved_uids),
|
|
cgroup_warning=st.cgroup_warning or "",
|
|
active_iface=st.active_iface or "",
|
|
iface_reason=st.iface_reason or "",
|
|
ingress_rule_present=bool(st.ingress_rule_present),
|
|
ingress_nft_active=bool(st.ingress_nft_active),
|
|
probe_ok=bool(st.probe_ok),
|
|
probe_message=st.probe_message or "",
|
|
healthy=bool(st.healthy),
|
|
message=st.message or "",
|
|
)
|
|
|
|
def traffic_interfaces(self) -> List[str]:
|
|
st: TrafficInterfaces = self.client.traffic_interfaces_get()
|
|
vals = [x for x in st.interfaces if x]
|
|
if st.preferred_iface and st.preferred_iface not in vals:
|
|
vals.insert(0, st.preferred_iface)
|
|
return vals
|
|
|
|
def traffic_candidates(self) -> TrafficCandidates:
|
|
return self.client.traffic_candidates_get()
|
|
|
|
def traffic_appmarks_status(self) -> TrafficAppMarksStatus:
|
|
return self.client.traffic_appmarks_status()
|
|
|
|
def traffic_appmarks_items(self) -> List[TrafficAppMarkItem]:
|
|
return self.client.traffic_appmarks_items()
|
|
|
|
def traffic_appmarks_apply(
|
|
self,
|
|
*,
|
|
op: str,
|
|
target: str,
|
|
cgroup: str = "",
|
|
unit: str = "",
|
|
command: str = "",
|
|
app_key: str = "",
|
|
timeout_sec: int = 0,
|
|
) -> TrafficAppMarksResult:
|
|
return self.client.traffic_appmarks_apply(
|
|
op=op,
|
|
target=target,
|
|
cgroup=cgroup,
|
|
unit=unit,
|
|
command=command,
|
|
app_key=app_key,
|
|
timeout_sec=timeout_sec,
|
|
)
|
|
|
|
def traffic_app_profiles_list(self) -> List[TrafficAppProfile]:
|
|
return self.client.traffic_app_profiles_list()
|
|
|
|
def traffic_app_profile_upsert(
|
|
self,
|
|
*,
|
|
id: str = "",
|
|
name: str = "",
|
|
app_key: str = "",
|
|
command: str,
|
|
target: str,
|
|
ttl_sec: int = 0,
|
|
vpn_profile: str = "",
|
|
) -> TrafficAppProfileSaveResult:
|
|
return self.client.traffic_app_profile_upsert(
|
|
id=id,
|
|
name=name,
|
|
app_key=app_key,
|
|
command=command,
|
|
target=target,
|
|
ttl_sec=ttl_sec,
|
|
vpn_profile=vpn_profile,
|
|
)
|
|
|
|
def traffic_app_profile_delete(self, id: str) -> CmdResult:
|
|
return self.client.traffic_app_profile_delete(id)
|
|
|
|
def traffic_audit(self) -> TrafficAudit:
|
|
return self.client.traffic_audit_get()
|
|
|
|
# -------- Transport flow (E4.2 foundation) --------
|
|
|