190 lines
6.2 KiB
Python
190 lines
6.2 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import shutil
|
|
import subprocess
|
|
import textwrap
|
|
import time
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
DEMO_DIR = ROOT / "storage" / "demos"
|
|
SAMPLE_VIDEO = ROOT / "storage" / "samples" / "synthetic_guidewire.mp4"
|
|
HOME_SCREENSHOT = DEMO_DIR / "01_home.png"
|
|
RESPONSE_JSON = DEMO_DIR / "latest_demo_response.json"
|
|
OUTPUT_VIDEO = DEMO_DIR / "isiseg_usage_demo.mp4"
|
|
BASE_URL = "http://127.0.0.1:8001"
|
|
|
|
|
|
def run(command: list[str], timeout: int = 120) -> subprocess.CompletedProcess[str]:
|
|
return subprocess.run(
|
|
command,
|
|
cwd=ROOT,
|
|
check=True,
|
|
text=True,
|
|
capture_output=True,
|
|
timeout=timeout,
|
|
)
|
|
|
|
|
|
def wait_for_service() -> None:
|
|
deadline = time.time() + 20
|
|
while time.time() < deadline:
|
|
try:
|
|
with urllib.request.urlopen(f"{BASE_URL}/api/health", timeout=2) as response:
|
|
if response.status == 200:
|
|
return
|
|
except OSError:
|
|
time.sleep(0.5)
|
|
raise RuntimeError(f"ISISeg service is not reachable at {BASE_URL}")
|
|
|
|
|
|
def ensure_sample() -> None:
|
|
if not SAMPLE_VIDEO.exists():
|
|
run(["bash", "scripts/generate_sample.sh"])
|
|
|
|
|
|
def capture_homepage() -> None:
|
|
chrome = shutil.which("google-chrome") or shutil.which("chromium") or shutil.which("chromium-browser")
|
|
if not chrome:
|
|
raise RuntimeError("Chrome/Chromium is required to capture the UI screenshot")
|
|
run(
|
|
[
|
|
chrome,
|
|
"--headless=new",
|
|
"--no-sandbox",
|
|
"--disable-gpu",
|
|
"--hide-scrollbars",
|
|
"--window-size=1440,1000",
|
|
f"--screenshot={HOME_SCREENSHOT}",
|
|
BASE_URL,
|
|
],
|
|
timeout=60,
|
|
)
|
|
|
|
|
|
def call_segmentation() -> dict:
|
|
command = [
|
|
"curl",
|
|
"-s",
|
|
"-X",
|
|
"POST",
|
|
"-F",
|
|
f"file=@{SAMPLE_VIDEO}",
|
|
"-F",
|
|
"method=fusion",
|
|
"-F",
|
|
"sensitivity=0.68",
|
|
"-F",
|
|
"frame_stride=12",
|
|
"-F",
|
|
"max_frames=4",
|
|
f"{BASE_URL}/api/segment",
|
|
]
|
|
completed = run(command, timeout=180)
|
|
payload = json.loads(completed.stdout)
|
|
RESPONSE_JSON.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
return payload
|
|
|
|
|
|
def fit_image(image: np.ndarray, box_w: int, box_h: int) -> np.ndarray:
|
|
h, w = image.shape[:2]
|
|
scale = min(box_w / w, box_h / h)
|
|
resized = cv2.resize(image, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_AREA)
|
|
canvas = np.full((box_h, box_w, 3), 18, dtype=np.uint8)
|
|
y = (box_h - resized.shape[0]) // 2
|
|
x = (box_w - resized.shape[1]) // 2
|
|
canvas[y : y + resized.shape[0], x : x + resized.shape[1]] = resized
|
|
return canvas
|
|
|
|
|
|
def put_lines(frame: np.ndarray, lines: list[str], x: int, y: int, size: float = 0.82, color=(238, 245, 239)) -> None:
|
|
for line in lines:
|
|
cv2.putText(frame, line, (x, y), cv2.FONT_HERSHEY_SIMPLEX, size, color, 2, cv2.LINE_AA)
|
|
y += int(34 * size / 0.82)
|
|
|
|
|
|
def wrap(text: str, width: int = 44) -> list[str]:
|
|
return textwrap.wrap(text, width=width, break_long_words=False)
|
|
|
|
|
|
def slate(title: str, subtitle: str, width: int = 1280, height: int = 720) -> np.ndarray:
|
|
frame = np.zeros((height, width, 3), dtype=np.uint8)
|
|
frame[:, :] = (18, 20, 18)
|
|
cv2.rectangle(frame, (0, 0), (width, height), (48, 64, 56), 18)
|
|
cv2.line(frame, (70, 90), (1210, 90), (102, 209, 185), 3)
|
|
cv2.putText(frame, title, (70, 185), cv2.FONT_HERSHEY_SIMPLEX, 1.35, (255, 209, 102), 3, cv2.LINE_AA)
|
|
put_lines(frame, wrap(subtitle, 58), 74, 260, size=0.82, color=(226, 234, 226))
|
|
cv2.putText(frame, BASE_URL, (74, 640), cv2.FONT_HERSHEY_SIMPLEX, 0.78, (102, 209, 185), 2, cv2.LINE_AA)
|
|
return frame
|
|
|
|
|
|
def screenshot_slide() -> np.ndarray:
|
|
image = cv2.imread(str(HOME_SCREENSHOT), cv2.IMREAD_COLOR)
|
|
frame = slate("1. Open Web Console", "Browser opens the ISISeg guidewire segmentation workspace.")
|
|
fitted = fit_image(image, 1120, 500)
|
|
frame[160:660, 80:1200] = fitted
|
|
return frame
|
|
|
|
|
|
def result_slide(payload: dict, frame_payload: dict, index: int) -> np.ndarray:
|
|
frame = slate(
|
|
f"2.{index} Segmentation Result",
|
|
"The demo uploads the synthetic guidewire video and renders real fusion masks returned by the API.",
|
|
)
|
|
overlay = cv2.imread(str(ROOT / frame_payload["overlay_url"].lstrip("/")), cv2.IMREAD_COLOR)
|
|
mask = cv2.imread(str(ROOT / frame_payload["mask_url"].lstrip("/")), cv2.IMREAD_GRAYSCALE)
|
|
mask_color = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
|
|
frame[178:598, 75:635] = fit_image(overlay, 560, 420)
|
|
frame[178:598, 645:1205] = fit_image(mask_color, 560, 420)
|
|
metrics = frame_payload["metrics"]
|
|
lines = [
|
|
f"job: {payload['job_id']} frame: {frame_payload['frame_index']}",
|
|
f"coverage: {metrics['coverage'] * 100:.3f}% skeleton: {metrics['skeleton_length']} components: {metrics['components']}",
|
|
]
|
|
put_lines(frame, lines, 75, 650, size=0.62, color=(238, 245, 239))
|
|
return frame
|
|
|
|
|
|
def write_video(slides: list[np.ndarray]) -> None:
|
|
writer = cv2.VideoWriter(str(OUTPUT_VIDEO), cv2.VideoWriter_fourcc(*"mp4v"), 24.0, (1280, 720))
|
|
for slide in slides:
|
|
for _ in range(72):
|
|
writer.write(slide)
|
|
writer.release()
|
|
|
|
|
|
def main() -> None:
|
|
DEMO_DIR.mkdir(parents=True, exist_ok=True)
|
|
wait_for_service()
|
|
ensure_sample()
|
|
capture_homepage()
|
|
payload = call_segmentation()
|
|
slides = [
|
|
slate(
|
|
"ISISeg Usage Demo",
|
|
"A runnable Web system for guidewire segmentation: open the console, upload a fluoroscopy-like video, choose fusion mode, and inspect overlay/mask outputs.",
|
|
),
|
|
screenshot_slide(),
|
|
]
|
|
for index, frame_payload in enumerate(payload["frames"][:4], start=1):
|
|
slides.append(result_slide(payload, frame_payload, index))
|
|
slides.append(
|
|
slate(
|
|
"Ready to Use",
|
|
f"Demo output video and API artifacts are stored under storage/demos and storage/jobs. Overlay video URL: {payload.get('video_url')}",
|
|
)
|
|
)
|
|
write_video(slides)
|
|
print(OUTPUT_VIDEO)
|
|
print(RESPONSE_JSON)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|