#!/usr/bin/env python3
"""SSE smoke test with active event trigger via /api/v1/trace/append.

The script opens ``/api/v1/events/stream``, posts to
``/api/v1/trace/append`` from a background thread, and passes only when the
stream yields at least one ``id:`` field and a ``trace_append`` event.

Environment variables:
    API_URL: base URL of the API (default ``http://127.0.0.1:8080``).
    EVENTS_TIMEOUT_SEC: seconds to wait for the event (default ``12``).
"""

import http.client
import json
import os
import socket
import sys
import threading
import time
from urllib.parse import urlparse

API_URL = os.environ.get("API_URL", "http://127.0.0.1:8080")
TIMEOUT = float(os.environ.get("EVENTS_TIMEOUT_SEC", "12"))
EVENT_REQUIRED = "trace_append"


def parse_base(api_url: str):
    """Split *api_url* into ``(host, port, base_path)``.

    Args:
        api_url: base URL of the API; only the ``http`` scheme is accepted.

    Returns:
        A ``(host, port, base_path)`` tuple. The host defaults to
        ``127.0.0.1`` and the port to ``80`` when absent; ``base_path`` has
        any trailing slashes removed.

    Raises:
        ValueError: if the URL scheme is not ``http``.
    """
    parsed = urlparse(api_url)
    if parsed.scheme != "http":
        raise ValueError("only http API_URL is supported for this smoke test")
    host = parsed.hostname or "127.0.0.1"
    port = parsed.port or 80
    base_path = parsed.path.rstrip("/")
    return host, port, base_path


def post_trace_append(host: str, port: int, base_path: str) -> None:
    """POST a probe line to ``/api/v1/trace/append`` after a short delay.

    Args:
        host: API host name or address.
        port: API TCP port.
        base_path: URL path prefix (without trailing slash) of the API.

    Raises:
        RuntimeError: if the endpoint answers with a status other than 200.
    """
    # Small delay to ensure SSE subscription is active before trigger.
    time.sleep(1.0)
    conn = http.client.HTTPConnection(host, port, timeout=8)
    try:
        body = json.dumps({"kind": "gui", "line": f"sse-probe-{int(time.time())}"})
        path = f"{base_path}/api/v1/trace/append"
        conn.request(
            "POST",
            path,
            body=body,
            headers={"Content-Type": "application/json"},
        )
        resp = conn.getresponse()
        payload = resp.read().decode("utf-8", errors="ignore")
        if resp.status != 200:
            raise RuntimeError(
                f"trace/append failed: HTTP {resp.status}, body={payload}"
            )
    finally:
        # Close on every path, including request errors and non-200 replies.
        conn.close()


def scan_stream(readline, timeout, required=EVENT_REQUIRED):
    """Read SSE lines until the required event has been fully received.

    Args:
        readline: zero-argument callable returning the next raw line as
            bytes, or ``b""`` once the stream has ended.
        timeout: maximum number of seconds to keep reading.
        required: event name that must appear in an ``event:`` field.

    Returns:
        A ``(got_id, got_required)`` tuple of booleans: whether any ``id:``
        field was seen, and whether the required event name was seen.
    """
    got_id = False
    got_required = False
    # Monotonic clock: the deadline must not move if the wall clock changes.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            raw = readline()
        except socket.timeout:
            # A silent stream trips the socket timeout; report that as a
            # missing event rather than crashing with a traceback.
            break
        if not raw:
            break
        line = raw.decode("utf-8", errors="ignore").strip()
        if not line:
            # A blank line terminates the current event. Stopping only here
            # means an ``id:`` field sent after ``event:`` is still counted.
            if got_required:
                break
            continue
        if line.startswith("id:"):
            got_id = True
        elif line.startswith("event:"):
            if line.split(":", 1)[1].strip() == required:
                got_required = True
    return got_id, got_required


def main() -> None:
    """Run the smoke test; exit with status 1 on any failed check."""
    host, port, base_path = parse_base(API_URL)
    stream_path = f"{base_path}/api/v1/events/stream"
    trigger_err = []
    worker = None

    def trigger():
        # Runs in a background thread; errors are collected for the main
        # thread to report after the stream has been scanned.
        try:
            post_trace_append(host, port, base_path)
        except Exception as exc:
            trigger_err.append(str(exc))

    conn = http.client.HTTPConnection(host, port, timeout=TIMEOUT)
    try:
        try:
            conn.putrequest("GET", stream_path)
            conn.putheader("Accept", "text/event-stream")
            conn.putheader("Cache-Control", "no-cache")
            conn.putheader("Connection", "keep-alive")
            conn.endheaders()
            resp = conn.getresponse()
        except (OSError, http.client.HTTPException) as exc:
            print(f"[events] connection failed: {exc}", file=sys.stderr)
            sys.exit(1)

        try:
            if resp.status != 200:
                print(f"[events] unexpected HTTP {resp.status}", file=sys.stderr)
                sys.exit(1)

            content_type = resp.getheader("Content-Type", "")
            if "text/event-stream" not in content_type:
                print(f"[events] bad Content-Type: {content_type}", file=sys.stderr)
                sys.exit(1)

            worker = threading.Thread(target=trigger, daemon=True)
            worker.start()

            got_id, got_required = scan_stream(resp.readline, TIMEOUT)
            if got_required:
                print(f"[events] got required event: {EVENT_REQUIRED}")
        finally:
            resp.close()
    finally:
        conn.close()

    worker.join(timeout=0.5)

    if trigger_err:
        print(f"[events] trigger failed: {trigger_err[0]}", file=sys.stderr)
        sys.exit(1)
    if not got_id:
        print("[events] no SSE event id observed", file=sys.stderr)
        sys.exit(1)
    if not got_required:
        print(f"[events] missing required event: {EVENT_REQUIRED}", file=sys.stderr)
        sys.exit(1)

    print("[events] stream smoke passed")


if __name__ == "__main__":
    main()