"""SAM 2 engine wrapper with lazy loading and explicit runtime status.""" import logging import os from typing import Optional import numpy as np from config import settings logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Attempt to import PyTorch and SAM 2; fall back to stubs if unavailable. # --------------------------------------------------------------------------- try: import torch TORCH_AVAILABLE = True except Exception as exc: # noqa: BLE001 TORCH_AVAILABLE = False torch = None # type: ignore[assignment] logger.warning("PyTorch import failed (%s). SAM2 will be unavailable.", exc) try: from sam2.build_sam import build_sam2 from sam2.build_sam import build_sam2_video_predictor from sam2.sam2_image_predictor import SAM2ImagePredictor SAM2_AVAILABLE = True logger.info("SAM2 library imported successfully.") except Exception as exc: # noqa: BLE001 SAM2_AVAILABLE = False logger.warning("SAM2 import failed (%s). Using stub engine.", exc) class SAM2Engine: """Lazy-loaded SAM 2 inference engine.""" def __init__(self) -> None: self._predictor: Optional[SAM2ImagePredictor] = None self._video_predictor = None self._model_loaded = False self._video_model_loaded = False self._loaded_device: str | None = None self._last_error: str | None = None self._video_last_error: str | None = None # ----------------------------------------------------------------------- # Internal helpers # ----------------------------------------------------------------------- def _load_model(self) -> None: """Load the SAM 2 model and predictor on first use.""" if self._model_loaded: return if not TORCH_AVAILABLE: self._last_error = "PyTorch is not installed." logger.warning("PyTorch not available; skipping SAM2 model load.") self._model_loaded = True return if not SAM2_AVAILABLE: self._last_error = "sam2 package is not installed." logger.warning("SAM2 not available; skipping model load.") self._model_loaded = True return if not os.path.isfile(settings.sam_model_path): self._last_error = f"SAM2 checkpoint not found: {settings.sam_model_path}" logger.error("SAM checkpoint not found at %s", settings.sam_model_path) self._model_loaded = True return try: device = self._best_device() model = build_sam2( settings.sam_model_config, settings.sam_model_path, device=device, ) self._predictor = SAM2ImagePredictor(model) self._model_loaded = True self._loaded_device = device self._last_error = None logger.info("SAM 2 model loaded from %s on %s", settings.sam_model_path, device) except Exception as exc: # noqa: BLE001 self._last_error = str(exc) logger.error("Failed to load SAM 2 model: %s", exc) self._model_loaded = True # Prevent repeated load attempts def _load_video_model(self) -> None: """Load the SAM 2 video predictor on first propagation use.""" if self._video_model_loaded: return if not TORCH_AVAILABLE: self._video_last_error = "PyTorch is not installed." self._video_model_loaded = True return if not SAM2_AVAILABLE: self._video_last_error = "sam2 package is not installed." self._video_model_loaded = True return if not os.path.isfile(settings.sam_model_path): self._video_last_error = f"SAM2 checkpoint not found: {settings.sam_model_path}" self._video_model_loaded = True return try: device = self._best_device() self._video_predictor = build_sam2_video_predictor( settings.sam_model_config, settings.sam_model_path, device=device, ) self._video_model_loaded = True self._loaded_device = device self._video_last_error = None logger.info("SAM 2 video predictor loaded from %s on %s", settings.sam_model_path, device) except Exception as exc: # noqa: BLE001 self._video_last_error = str(exc) self._video_model_loaded = True logger.error("Failed to load SAM 2 video predictor: %s", exc) def _best_device(self) -> str: if TORCH_AVAILABLE and torch is not None and torch.cuda.is_available(): return "cuda" return "cpu" def _ensure_ready(self) -> bool: """Ensure the model is loaded; return whether it is usable.""" self._load_model() return SAM2_AVAILABLE and self._predictor is not None def _ensure_video_ready(self) -> bool: """Ensure the video predictor is loaded; return whether it is usable.""" self._load_video_model() return SAM2_AVAILABLE and self._video_predictor is not None def status(self) -> dict: """Return lightweight, real runtime status without forcing model load.""" checkpoint_exists = os.path.isfile(settings.sam_model_path) device = self._loaded_device or self._best_device() available = bool(TORCH_AVAILABLE and SAM2_AVAILABLE and checkpoint_exists) if self._predictor is not None: message = "SAM 2 model loaded and ready." elif available: message = "SAM 2 dependencies and checkpoint are present; model will load on first inference." else: missing = [] if not TORCH_AVAILABLE: missing.append("PyTorch") if not SAM2_AVAILABLE: missing.append("sam2 package") if not checkpoint_exists: missing.append("checkpoint") message = f"SAM 2 unavailable: missing {', '.join(missing)}." if self._last_error and not self._predictor: message = self._last_error return { "id": "sam2", "label": "SAM 2", "available": available, "loaded": self._predictor is not None, "device": device, "supports": ["point", "box", "interactive", "auto", "propagate"], "message": message, "package_available": SAM2_AVAILABLE, "checkpoint_exists": checkpoint_exists, "checkpoint_path": settings.sam_model_path, "python_ok": True, "torch_ok": TORCH_AVAILABLE, "cuda_required": False, } # ----------------------------------------------------------------------- # Public API # ----------------------------------------------------------------------- def predict_points( self, image: np.ndarray, points: list[list[float]], labels: list[int], ) -> tuple[list[list[list[float]]], list[float]]: """Run point-prompt segmentation. Args: image: HWC numpy array (uint8). points: List of [x, y] normalized coordinates (0-1). labels: 1 for foreground, 0 for background. Returns: Tuple of (polygons, scores). """ if not self._ensure_ready(): logger.warning("SAM2 not ready; returning dummy masks.") return self._dummy_polygons(image.shape[1], image.shape[0]), [0.5] try: h, w = image.shape[:2] pts = np.array([[p[0] * w, p[1] * h] for p in points], dtype=np.float32) lbls = np.array(labels, dtype=np.int32) with torch.inference_mode(): # type: ignore[name-defined] self._predictor.set_image(image) masks, scores, _ = self._predictor.predict( point_coords=pts, point_labels=lbls, multimask_output=True, ) polygons = [] for m in masks: poly = self._mask_to_polygon(m) if poly: polygons.append(poly) return polygons, scores.tolist() except Exception as exc: # noqa: BLE001 logger.error("SAM2 point prediction failed: %s", exc) return self._dummy_polygons(image.shape[1], image.shape[0]), [0.5] def predict_box( self, image: np.ndarray, box: list[float], ) -> tuple[list[list[list[float]]], list[float]]: """Run box-prompt segmentation. Args: image: HWC numpy array (uint8). box: [x1, y1, x2, y2] normalized coordinates. Returns: Tuple of (polygons, scores). """ if not self._ensure_ready(): logger.warning("SAM2 not ready; returning dummy masks.") return self._dummy_polygons(image.shape[1], image.shape[0]), [0.5] try: h, w = image.shape[:2] bbox = np.array( [box[0] * w, box[1] * h, box[2] * w, box[3] * h], dtype=np.float32, ) with torch.inference_mode(): # type: ignore[name-defined] self._predictor.set_image(image) masks, scores, _ = self._predictor.predict( box=bbox[None, :], multimask_output=False, ) polygons = [] for m in masks: poly = self._mask_to_polygon(m) if poly: polygons.append(poly) return polygons, scores.tolist() except Exception as exc: # noqa: BLE001 logger.error("SAM2 box prediction failed: %s", exc) return self._dummy_polygons(image.shape[1], image.shape[0]), [0.5] def predict_interactive( self, image: np.ndarray, box: list[float] | None, points: list[list[float]], labels: list[int], ) -> tuple[list[list[list[float]]], list[float]]: """Run combined box and point prompt segmentation for refinement.""" if not self._ensure_ready(): logger.warning("SAM2 not ready; returning dummy masks.") return self._dummy_polygons(image.shape[1], image.shape[0]), [0.5] try: h, w = image.shape[:2] bbox = None if box: bbox = np.array( [box[0] * w, box[1] * h, box[2] * w, box[3] * h], dtype=np.float32, ) pts = None lbls = None if points: pts = np.array([[p[0] * w, p[1] * h] for p in points], dtype=np.float32) lbls = np.array(labels, dtype=np.int32) with torch.inference_mode(): # type: ignore[name-defined] self._predictor.set_image(image) masks, scores, _ = self._predictor.predict( point_coords=pts, point_labels=lbls, box=bbox, multimask_output=False, ) polygons = [] for m in masks: poly = self._mask_to_polygon(m) if poly: polygons.append(poly) return polygons, scores.tolist() except Exception as exc: # noqa: BLE001 logger.error("SAM2 interactive prediction failed: %s", exc) return self._dummy_polygons(image.shape[1], image.shape[0]), [0.5] def predict_auto(self, image: np.ndarray) -> tuple[list[list[list[float]]], list[float]]: """Run automatic mask generation (grid of points). Args: image: HWC numpy array (uint8). Returns: Tuple of (polygons, scores). """ if not self._ensure_ready(): logger.warning("SAM2 not ready; returning dummy masks.") return self._dummy_polygons(image.shape[1], image.shape[0]), [0.5] try: with torch.inference_mode(): # type: ignore[name-defined] self._predictor.set_image(image) # Generate a uniform 16x16 grid of point prompts h, w = image.shape[:2] grid = np.mgrid[0:1:17j, 0:1:17j].reshape(2, -1).T pts = grid * np.array([w, h]) lbls = np.ones(pts.shape[0], dtype=np.int32) masks, scores, _ = self._predictor.predict( point_coords=pts, point_labels=lbls, multimask_output=True, ) polygons = [] for m in masks[:3]: # Limit to top 3 masks poly = self._mask_to_polygon(m) if poly: polygons.append(poly) return polygons, scores[:3].tolist() except Exception as exc: # noqa: BLE001 logger.error("SAM2 auto prediction failed: %s", exc) return self._dummy_polygons(image.shape[1], image.shape[0]), [0.5] def propagate_video( self, frame_paths: list[str], source_frame_index: int, seed: dict, direction: str = "forward", max_frames: int | None = None, ) -> list[dict]: """Propagate one seed mask across a prepared frame directory with SAM 2 video.""" if not self._ensure_video_ready(): raise RuntimeError(self._video_last_error or self.status()["message"]) if not frame_paths: return [] if source_frame_index < 0 or source_frame_index >= len(frame_paths): raise ValueError("source_frame_index is outside the frame sequence.") import cv2 source_image = cv2.imread(frame_paths[source_frame_index]) if source_image is None: raise RuntimeError("Failed to decode source frame for SAM 2 propagation.") height, width = source_image.shape[:2] seed_mask = self._polygons_to_mask(seed.get("polygons") or [], width, height) if not seed_mask.any(): bbox = seed.get("bbox") if isinstance(bbox, list) and len(bbox) == 4: seed_mask = self._bbox_to_mask(bbox, width, height) if not seed_mask.any(): raise ValueError("SAM 2 propagation requires a non-empty seed polygon or bbox.") inference_state = self._video_predictor.init_state( video_path=os.path.dirname(frame_paths[0]), offload_video_to_cpu=True, offload_state_to_cpu=True, ) self._video_predictor.add_new_mask( inference_state, frame_idx=source_frame_index, obj_id=1, mask=seed_mask, ) results: dict[int, dict] = {} def collect(reverse: bool) -> None: for out_frame_idx, out_obj_ids, out_mask_logits in self._video_predictor.propagate_in_video( inference_state, start_frame_idx=source_frame_index, max_frame_num_to_track=max_frames, reverse=reverse, ): masks = out_mask_logits if hasattr(masks, "detach"): masks = masks.detach().cpu().numpy() masks = np.asarray(masks) if masks.ndim == 4: masks = masks[:, 0] polygons = [] scores = [] for mask in masks: polygon = self._mask_to_polygon(mask > 0) if polygon: polygons.append(polygon) scores.append(1.0) results[int(out_frame_idx)] = { "frame_index": int(out_frame_idx), "polygons": polygons, "scores": scores, "object_ids": [int(obj_id) for obj_id in list(out_obj_ids)], } normalized_direction = direction.lower() if normalized_direction in {"forward", "both"}: collect(reverse=False) if normalized_direction in {"backward", "both"}: collect(reverse=True) try: self._video_predictor.reset_state(inference_state) except Exception: # noqa: BLE001 pass return [results[index] for index in sorted(results)] # ----------------------------------------------------------------------- # Helpers # ----------------------------------------------------------------------- @staticmethod def _mask_to_polygon(mask: np.ndarray) -> list[list[float]]: """Convert a binary mask to a normalized polygon.""" import cv2 if mask.dtype != np.uint8: mask = (mask > 0).astype(np.uint8) contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) h, w = mask.shape[:2] largest = [] for cnt in contours: if len(cnt) > len(largest): largest = cnt if len(largest) < 3: return [] return [[float(pt[0][0]) / w, float(pt[0][1]) / h] for pt in largest] @staticmethod def _dummy_polygons(w: int, h: int) -> list[list[list[float]]]: """Return a dummy rectangle polygon for fallback mode.""" return [ [ [0.25, 0.25], [0.75, 0.25], [0.75, 0.75], [0.25, 0.75], ] ] @staticmethod def _polygons_to_mask(polygons: list[list[list[float]]], width: int, height: int) -> np.ndarray: import cv2 mask = np.zeros((height, width), dtype=np.uint8) for polygon in polygons: if len(polygon) < 3: continue pts = np.array( [ [ int(round(min(max(float(x), 0.0), 1.0) * max(width - 1, 1))), int(round(min(max(float(y), 0.0), 1.0) * max(height - 1, 1))), ] for x, y in polygon ], dtype=np.int32, ) cv2.fillPoly(mask, [pts], 1) return mask.astype(bool) @staticmethod def _bbox_to_mask(bbox: list[float], width: int, height: int) -> np.ndarray: x, y, w, h = [min(max(float(value), 0.0), 1.0) for value in bbox] left = int(round(x * max(width - 1, 1))) top = int(round(y * max(height - 1, 1))) right = int(round(min(x + w, 1.0) * max(width - 1, 1))) bottom = int(round(min(y + h, 1.0) * max(height - 1, 1))) mask = np.zeros((height, width), dtype=bool) mask[top:max(bottom + 1, top + 1), left:max(right + 1, left + 1)] = True return mask # Singleton instance sam_engine = SAM2Engine()