#!/usr/bin/env python3 from __future__ import annotations from dataclasses import replace import json from typing import Any, Dict, List, Optional, cast from api_client import ( ApiError, CmdResult, SingBoxProfile, SingBoxProfileApplyResult, SingBoxProfileHistoryResult, SingBoxProfileIssue, SingBoxProfileRenderResult, SingBoxProfileRollbackResult, SingBoxProfilesState, SingBoxProfileValidateResult, TransportCapabilities, TransportClient, TransportClientActionResult, TransportClientHealthSnapshot, TransportInterfacesSnapshot, TransportConflict, TransportConflicts, TransportHealthRefreshResult, TransportNetnsToggleResult, TransportOwnerLocksClearResult, TransportOwnerLocksSnapshot, TransportOwnershipSnapshot, TransportPolicy, TransportPolicyApplyResult, TransportPolicyIntent, TransportPolicyValidateResult, ) from .views import ActionView, TransportClientAction, TransportFlowPhase, TransportPolicyFlowView class TransportControllerMixin: def transport_clients( self, enabled_only: bool = False, kind: str = "", include_virtual: bool = False, ) -> List[TransportClient]: return self.client.transport_clients_get( enabled_only=enabled_only, kind=kind, include_virtual=include_virtual, ) def transport_interfaces(self) -> TransportInterfacesSnapshot: return self.client.transport_interfaces_get() def transport_health_refresh( self, *, client_ids: Optional[List[str]] = None, force: bool = False, ) -> TransportHealthRefreshResult: return self.client.transport_health_refresh(client_ids=client_ids, force=force) def transport_client_health(self, client_id: str) -> TransportClientHealthSnapshot: return self.client.transport_client_health_get(client_id) def transport_client_create_action( self, *, client_id: str, kind: str, name: str = "", enabled: bool = True, config: Optional[Dict[str, Any]] = None, ) -> ActionView: cid = str(client_id or "").strip() if not cid: raise ValueError("missing transport client id") res: CmdResult = self.client.transport_client_create( client_id=cid, kind=str(kind or "").strip().lower(), name=name, enabled=enabled, config=config, ) msg = res.message or "client create completed" return ActionView(ok=bool(res.ok), pretty_text=f"create {cid}: {msg}") def transport_client_action(self, client_id: str, action: TransportClientAction) -> ActionView: res: TransportClientActionResult = self.client.transport_client_action(client_id, action) status_bits = [] before = (res.status_before or "").strip() after = (res.status_after or "").strip() if before or after: status_bits.append(f"status {before or '-'} -> {after or '-'}") if res.code: status_bits.append(f"code={res.code}") if res.last_error: status_bits.append(f"last_error={res.last_error}") extra = f" ({'; '.join(status_bits)})" if status_bits else "" msg = res.message or f"{res.action} completed" return ActionView(ok=bool(res.ok), pretty_text=f"{res.action} {res.client_id}: {msg}{extra}") def transport_client_patch_action( self, client_id: str, *, name: Optional[str] = None, enabled: Optional[bool] = None, config: Optional[Dict[str, Any]] = None, ) -> ActionView: cid = str(client_id or "").strip() if not cid: raise ValueError("missing transport client id") res: CmdResult = self.client.transport_client_patch( cid, name=name, enabled=enabled, config=config, ) msg = res.message or "client patch completed" return ActionView(ok=bool(res.ok), pretty_text=f"patch {cid}: {msg}") def transport_client_delete_action( self, client_id: str, *, force: bool = False, cleanup: bool = True, ) -> ActionView: cid = str(client_id or "").strip() if not cid: raise ValueError("missing transport client id") res: CmdResult = self.client.transport_client_delete(cid, force=force, cleanup=cleanup) msg = res.message or "delete completed" return ActionView(ok=bool(res.ok), pretty_text=f"delete {cid}: {msg}") def transport_netns_toggle( self, *, enabled: Optional[bool] = None, client_ids: Optional[List[str]] = None, provision: bool = True, restart_running: bool = True, ) -> TransportNetnsToggleResult: ids = [ str(x).strip() for x in (client_ids or []) if str(x).strip() ] if client_ids is not None else None return self.client.transport_netns_toggle( enabled=enabled, client_ids=ids, provision=provision, restart_running=restart_running, ) def transport_policy_rollback_action(self, base_revision: int = 0) -> ActionView: base = int(base_revision or 0) if base <= 0: base = int(self.client.transport_policy_get().revision or 0) res: TransportPolicyApplyResult = self.client.transport_policy_rollback(base_revision=base) if res.ok: msg = res.message or "policy rollback applied" bits = [f"revision={int(res.policy_revision or 0)}"] if res.apply_id: bits.append(f"apply_id={res.apply_id}") return ActionView(ok=True, pretty_text=f"{msg} ({', '.join(bits)})") msg = res.message or "policy rollback failed" if res.code: msg = f"{msg} (code={res.code})" return ActionView(ok=False, pretty_text=msg) def transport_policy(self) -> TransportPolicy: return self.client.transport_policy_get() def transport_ownership(self) -> TransportOwnershipSnapshot: return self.client.transport_ownership_get() def transport_owner_locks(self) -> TransportOwnerLocksSnapshot: return self.client.transport_owner_locks_get() def transport_owner_locks_clear( self, *, base_revision: int = 0, client_id: str = "", destination_ip: str = "", destination_ips: Optional[List[str]] = None, confirm_token: str = "", ) -> TransportOwnerLocksClearResult: return self.client.transport_owner_locks_clear( base_revision=int(base_revision or 0), client_id=str(client_id or "").strip(), destination_ip=str(destination_ip or "").strip(), destination_ips=[ str(x).strip() for x in list(destination_ips or []) if str(x).strip() ], confirm_token=str(confirm_token or "").strip(), ) def transport_owner_locks_clear_action( self, *, base_revision: int = 0, client_id: str = "", destination_ip: str = "", destination_ips: Optional[List[str]] = None, confirm_token: str = "", ) -> ActionView: res = self.transport_owner_locks_clear( base_revision=base_revision, client_id=client_id, destination_ip=destination_ip, destination_ips=destination_ips, confirm_token=confirm_token, ) bits: List[str] = [] if res.code: bits.append(f"code={res.code}") bits.append(f"match={int(res.match_count)}") bits.append(f"cleared={int(res.cleared_count)}") bits.append(f"remaining={int(res.remaining_count)}") msg = (res.message or "owner-lock clear").strip() return ActionView(ok=bool(res.ok), pretty_text=f"{msg} ({', '.join(bits)})") def transport_conflicts(self) -> TransportConflicts: return self.client.transport_conflicts_get() def transport_capabilities(self) -> TransportCapabilities: return self.client.transport_capabilities_get() def transport_flow_draft( self, intents: Optional[List[TransportPolicyIntent]] = None, *, base_revision: int = 0, ) -> TransportPolicyFlowView: pol = self.client.transport_policy_get() rev = int(base_revision) if int(base_revision or 0) > 0 else int(pol.revision) return TransportPolicyFlowView( phase="draft", intents=list(intents) if intents is not None else list(pol.intents), base_revision=rev, current_revision=int(pol.revision), applied_revision=0, confirm_token="", valid=False, block_count=0, warn_count=0, diff_added=0, diff_changed=0, diff_removed=0, conflicts=[], apply_id="", rollback_available=False, message="draft ready", code="", ) def transport_flow_update_draft( self, flow: TransportPolicyFlowView, intents: List[TransportPolicyIntent], *, base_revision: int = 0, ) -> TransportPolicyFlowView: rev = int(base_revision) if int(base_revision or 0) > 0 else int(flow.current_revision or flow.base_revision) return replace( flow, phase="draft", intents=list(intents), base_revision=rev, applied_revision=0, confirm_token="", valid=False, block_count=0, warn_count=0, diff_added=0, diff_changed=0, diff_removed=0, conflicts=[], apply_id="", rollback_available=False, message="draft updated", code="", ) def transport_flow_validate( self, flow: TransportPolicyFlowView, *, allow_warnings: bool = True, ) -> TransportPolicyFlowView: res: TransportPolicyValidateResult = self.client.transport_policy_validate( base_revision=int(flow.base_revision or 0), intents=list(flow.intents), allow_warnings=allow_warnings, force_override=False, ) phase: TransportFlowPhase = "validated" if not res.valid or int(res.summary.block_count) > 0: phase = "risky" return replace( flow, phase=phase, base_revision=int(res.base_revision or flow.base_revision), current_revision=int(res.base_revision or flow.current_revision), confirm_token=res.confirm_token, valid=bool(res.valid), block_count=int(res.summary.block_count), warn_count=int(res.summary.warn_count), diff_added=int(res.diff.added), diff_changed=int(res.diff.changed), diff_removed=int(res.diff.removed), conflicts=list(res.conflicts or []), apply_id="", rollback_available=False, message=res.message or ("validated" if phase == "validated" else "blocking conflicts found"), code=res.code or "", ) def transport_flow_confirm(self, flow: TransportPolicyFlowView) -> TransportPolicyFlowView: if flow.phase != "risky": raise ValueError("confirm step is allowed only after risky validate") if not flow.confirm_token: raise ValueError("missing confirm token; run validate again") return replace( flow, phase="confirm", message="force apply requires explicit confirmation", code="FORCE_CONFIRM_REQUIRED", ) def transport_flow_apply( self, flow: TransportPolicyFlowView, *, force_override: bool = False, ) -> TransportPolicyFlowView: if flow.phase == "draft": return replace( flow, message="policy must be validated before apply", code="VALIDATE_REQUIRED", ) if flow.phase == "risky" and not force_override: return replace( flow, message="policy has blocking conflicts; open confirm step", code="POLICY_CONFLICT_BLOCK", ) if force_override and flow.phase != "confirm": return replace( flow, phase="risky", message="force apply requires confirm state", code="FORCE_CONFIRM_REQUIRED", ) if force_override and not flow.confirm_token: return replace( flow, phase="risky", message="confirm token is missing or expired; run validate again", code="FORCE_OVERRIDE_CONFIRM_REQUIRED", ) res: TransportPolicyApplyResult = self.client.transport_policy_apply( base_revision=int(flow.base_revision), intents=list(flow.intents), force_override=bool(force_override), confirm_token=flow.confirm_token if force_override else "", ) return self._transport_flow_from_apply_result(flow, res) def transport_flow_rollback(self, flow: TransportPolicyFlowView) -> TransportPolicyFlowView: base = int(flow.current_revision or flow.base_revision) res: TransportPolicyApplyResult = self.client.transport_policy_rollback(base_revision=base) return self._transport_flow_from_apply_result(flow, res) def _transport_flow_from_apply_result( self, flow: TransportPolicyFlowView, res: TransportPolicyApplyResult, ) -> TransportPolicyFlowView: if res.ok: pol = self.client.transport_policy_get() applied_rev = int(res.policy_revision or pol.revision) return TransportPolicyFlowView( phase="applied", intents=list(pol.intents), base_revision=applied_rev, current_revision=applied_rev, applied_revision=applied_rev, confirm_token="", valid=True, block_count=0, warn_count=0, diff_added=0, diff_changed=0, diff_removed=0, conflicts=[], apply_id=res.apply_id or "", rollback_available=bool(res.rollback_available), message=res.message or "policy applied", code=res.code or "", ) if res.code == "POLICY_REVISION_MISMATCH": current_rev = int(res.current_revision or 0) if current_rev <= 0: current_rev = int(self.client.transport_policy_get().revision) return replace( flow, phase="draft", base_revision=current_rev, current_revision=current_rev, confirm_token="", valid=False, message="policy revision changed; validate again", code=res.code, ) if res.code in ("POLICY_CONFLICT_BLOCK", "FORCE_OVERRIDE_CONFIRM_REQUIRED"): conflicts = list(res.conflicts or flow.conflicts) block_count = len([x for x in conflicts if (x.severity or "").strip().lower() == "block"]) return replace( flow, phase="risky", valid=False, block_count=block_count, conflicts=conflicts, message=res.message or "blocking conflicts", code=res.code, ) return replace( flow, phase="error", valid=False, message=res.message or "transport apply failed", code=res.code or "TRANSPORT_APPLY_ERROR", ) def singbox_profile_id_for_client(self, client: Optional[TransportClient]) -> str: if client is None: return "" cfg = getattr(client, "config", {}) or {} if isinstance(cfg, dict): for key in ("profile_id", "singbox_profile_id", "profile"): v = str(cfg.get(key) or "").strip() if v: return v return str(getattr(client, "id", "") or "").strip() def singbox_profile_ensure_linked( self, client: TransportClient, *, preferred_profile_id: str = "", ) -> ActionView: pid, state = self._ensure_singbox_profile_for_client( client, preferred_profile_id=str(preferred_profile_id or "").strip(), ) cid = str(getattr(client, "id", "") or "").strip() if state == "created": return ActionView(ok=True, pretty_text=f"profile {pid} created and linked to {cid}") if state == "linked": return ActionView(ok=True, pretty_text=f"profile {pid} linked to {cid}") return ActionView(ok=True, pretty_text=f"profile {pid} already linked to {cid}") def singbox_profile_validate_action( self, profile_id: str, *, check_binary: Optional[bool] = None, client: Optional[TransportClient] = None, ) -> ActionView: pid = self._resolve_singbox_profile_id(profile_id, client) res: SingBoxProfileValidateResult = self.client.transport_singbox_profile_validate( pid, check_binary=check_binary, ) ok = bool(res.ok and res.valid) if ok: msg = res.message or "profile is valid" return ActionView(ok=True, pretty_text=self._format_singbox_profile_action("validate", pid, msg, res)) msg = res.message or "profile validation failed" return ActionView(ok=False, pretty_text=self._format_singbox_profile_action("validate", pid, msg, res)) def singbox_profile_render_preview_action( self, profile_id: str, *, check_binary: Optional[bool] = None, persist: bool = False, client: Optional[TransportClient] = None, ) -> ActionView: pid = self._resolve_singbox_profile_id(profile_id, client) res: SingBoxProfileRenderResult = self.client.transport_singbox_profile_render( pid, check_binary=check_binary, persist=bool(persist), ) ok = bool(res.ok and res.valid) if ok: msg = res.message or ("rendered" if persist else "render preview ready") return ActionView(ok=True, pretty_text=self._format_singbox_profile_action("render", pid, msg, res)) msg = res.message or "render failed" return ActionView(ok=False, pretty_text=self._format_singbox_profile_action("render", pid, msg, res)) def singbox_profile_apply_action( self, profile_id: str, *, client_id: str = "", restart: Optional[bool] = True, skip_runtime: bool = False, check_binary: Optional[bool] = None, client: Optional[TransportClient] = None, ) -> ActionView: pid = self._resolve_singbox_profile_id(profile_id, client) cid = str(client_id or "").strip() if not cid and client is not None: cid = str(getattr(client, "id", "") or "").strip() res: SingBoxProfileApplyResult = self.client.transport_singbox_profile_apply( pid, client_id=cid, restart=restart, skip_runtime=skip_runtime, check_binary=check_binary, ) if res.ok: msg = res.message or "profile applied" return ActionView(ok=True, pretty_text=self._format_singbox_profile_action("apply", pid, msg, res)) msg = res.message or "profile apply failed" return ActionView(ok=False, pretty_text=self._format_singbox_profile_action("apply", pid, msg, res)) def singbox_profile_rollback_action( self, profile_id: str, *, client_id: str = "", restart: Optional[bool] = True, skip_runtime: bool = False, history_id: str = "", client: Optional[TransportClient] = None, ) -> ActionView: pid = self._resolve_singbox_profile_id(profile_id, client) cid = str(client_id or "").strip() if not cid and client is not None: cid = str(getattr(client, "id", "") or "").strip() res: SingBoxProfileRollbackResult = self.client.transport_singbox_profile_rollback( pid, client_id=cid, history_id=history_id, restart=restart, skip_runtime=skip_runtime, ) if res.ok: msg = res.message or "profile rollback applied" return ActionView(ok=True, pretty_text=self._format_singbox_profile_action("rollback", pid, msg, res)) msg = res.message or "profile rollback failed" return ActionView(ok=False, pretty_text=self._format_singbox_profile_action("rollback", pid, msg, res)) def singbox_profile_history_lines( self, profile_id: str, *, limit: int = 20, client: Optional[TransportClient] = None, ) -> List[str]: pid = self._resolve_singbox_profile_id(profile_id, client) res: SingBoxProfileHistoryResult = self.client.transport_singbox_profile_history(pid, limit=limit) lines: List[str] = [] for it in list(res.items or []): lines.append(self._format_singbox_history_line(it)) return lines def singbox_profile_get_for_client( self, client: TransportClient, *, profile_id: str = "", ) -> SingBoxProfile: pid = self._resolve_singbox_profile_id(profile_id, client) return self.client.transport_singbox_profile_get(pid) def singbox_profile_save_raw_for_client( self, client: TransportClient, *, profile_id: str = "", name: str = "", enabled: bool = True, protocol: str = "vless", raw_config: Optional[Dict[str, Any]] = None, ) -> ActionView: pid = self._resolve_singbox_profile_id(profile_id, client) current = self.client.transport_singbox_profile_get(pid) snap = self.client.transport_singbox_profile_patch( pid, base_revision=int(current.profile_revision or 0), name=(str(name or "").strip() or current.name or pid), enabled=bool(enabled), protocol=(str(protocol or "").strip().lower() or "vless"), mode="raw", raw_config=cast(Dict[str, Any], raw_config or {}), ) item = snap.item if item is None: return ActionView(ok=False, pretty_text=f"save profile {pid}: backend returned empty item") return ActionView( ok=True, pretty_text=( f"save profile {pid}: revision={int(item.profile_revision or 0)} " f"render_revision={int(item.render_revision or 0)}" ), ) def _format_singbox_profile_action( self, action: str, profile_id: str, message: str, res: SingBoxProfileValidateResult | SingBoxProfileRenderResult | SingBoxProfileApplyResult | SingBoxProfileRollbackResult, ) -> str: bits: List[str] = [] if getattr(res, "code", ""): bits.append(f"code={str(getattr(res, 'code', '')).strip()}") rev = int(getattr(res, "profile_revision", 0) or 0) if rev > 0: bits.append(f"rev={rev}") diff = getattr(res, "diff", None) if diff is not None: added = int(getattr(diff, "added", 0) or 0) changed = int(getattr(diff, "changed", 0) or 0) removed = int(getattr(diff, "removed", 0) or 0) bits.append(f"diff=+{added}/~{changed}/-{removed}") render_digest = str(getattr(res, "render_digest", "") or "").strip() if render_digest: bits.append(f"digest={render_digest[:12]}") client_id = str(getattr(res, "client_id", "") or "").strip() if client_id: bits.append(f"client={client_id}") config_path = str(getattr(res, "config_path", "") or "").strip() if config_path: bits.append(f"config={config_path}") history_id = str(getattr(res, "history_id", "") or "").strip() if history_id: bits.append(f"history={history_id}") render_path = str(getattr(res, "render_path", "") or "").strip() if render_path: bits.append(f"render={render_path}") render_revision = int(getattr(res, "render_revision", 0) or 0) if render_revision > 0: bits.append(f"render_rev={render_revision}") rollback_available = bool(getattr(res, "rollback_available", False)) if rollback_available: bits.append("rollback=available") errors = cast(List[SingBoxProfileIssue], list(getattr(res, "errors", []) or [])) warnings = cast(List[SingBoxProfileIssue], list(getattr(res, "warnings", []) or [])) if warnings: bits.append(f"warnings={len(warnings)}") if errors: bits.append(f"errors={len(errors)}") first = self._format_singbox_issue_brief(errors[0]) if first: bits.append(f"first_error={first}") tail = f" ({'; '.join(bits)})" if bits else "" return f"{action} profile {profile_id}: {message}{tail}" def _format_singbox_history_line(self, it) -> str: at = str(getattr(it, "at", "") or "").strip() or "-" action = str(getattr(it, "action", "") or "").strip() or "event" status = str(getattr(it, "status", "") or "").strip() or "unknown" msg = str(getattr(it, "message", "") or "").strip() code = str(getattr(it, "code", "") or "").strip() digest = str(getattr(it, "render_digest", "") or "").strip() client_id = str(getattr(it, "client_id", "") or "").strip() bits: List[str] = [] if code: bits.append(f"code={code}") if client_id: bits.append(f"client={client_id}") if digest: bits.append(f"digest={digest[:12]}") tail = f" ({'; '.join(bits)})" if bits else "" body = msg or "-" return f"{at} | {action} | {status} | {body}{tail}" def _resolve_singbox_profile_id(self, profile_id: str, client: Optional[TransportClient]) -> str: pid = str(profile_id or "").strip() if client is not None: ensured_pid, _ = self._ensure_singbox_profile_for_client(client, preferred_profile_id=pid) pid = ensured_pid if not pid: raise ValueError("missing singbox profile id") return pid def _ensure_singbox_profile_for_client( self, client: TransportClient, *, preferred_profile_id: str = "", ) -> tuple[str, str]: cid = str(getattr(client, "id", "") or "").strip() if not cid: raise ValueError("missing transport client id") pid = str(preferred_profile_id or "").strip() if not pid: pid = self.singbox_profile_id_for_client(client) if not pid: raise ValueError("cannot resolve singbox profile id for selected client") try: cur = self.client.transport_singbox_profile_get(pid) except ApiError as e: if int(getattr(e, "status_code", 0) or 0) != 404: raise raw_cfg = self._load_singbox_raw_config_from_client(client) protocol = self._infer_singbox_protocol(client, raw_cfg) snap: SingBoxProfilesState = self.client.transport_singbox_profile_create( profile_id=pid, name=str(getattr(client, "name", "") or "").strip() or pid, mode="raw", protocol=protocol, raw_config=raw_cfg, meta={"client_id": cid}, enabled=True, ) created = snap.item if created is None: raise RuntimeError("profile create returned empty item") return str(created.id or pid).strip(), "created" meta = dict(cur.meta or {}) if str(meta.get("client_id") or "").strip() == cid: return pid, "ok" meta["client_id"] = cid snap = self.client.transport_singbox_profile_patch( pid, base_revision=int(cur.profile_revision or 0), meta=meta, ) if snap.item is not None: pid = str(snap.item.id or pid).strip() return pid, "linked" def _load_singbox_raw_config_from_client(self, client: TransportClient) -> dict: cfg = getattr(client, "config", {}) or {} if not isinstance(cfg, dict): return {} path = str(cfg.get("config_path") or "").strip() if not path: return {} try: with open(path, "r", encoding="utf-8") as f: parsed = json.load(f) if isinstance(parsed, dict): return cast(dict, parsed) except Exception: return {} return {} def _infer_singbox_protocol(self, client: TransportClient, raw_cfg: dict) -> str: cfg = getattr(client, "config", {}) or {} if isinstance(cfg, dict): p = str(cfg.get("protocol") or "").strip().lower() if p: return p if isinstance(raw_cfg, dict): outbounds = raw_cfg.get("outbounds") or [] if isinstance(outbounds, list): for row in outbounds: if not isinstance(row, dict): continue t = str(row.get("type") or "").strip().lower() if not t: continue if t in ("direct", "block", "dns"): continue return t return "vless" def _format_singbox_issue_brief(self, issue: SingBoxProfileIssue) -> str: code = str(getattr(issue, "code", "") or "").strip() field = str(getattr(issue, "field", "") or "").strip() message = str(getattr(issue, "message", "") or "").strip() parts = [x for x in (code, field, message) if x] if not parts: return "" out = ": ".join(parts[:2]) if len(parts) > 1 else parts[0] if len(parts) > 2: out = f"{out}: {parts[2]}" return out if len(out) <= 140 else out[:137] + "..."