235 lines
8.1 KiB
Python
235 lines
8.1 KiB
Python
"""SAM 2 engine wrapper with lazy loading and fallback stubs."""
|
|
|
|
import logging
|
|
import os
|
|
from typing import Optional
|
|
|
|
import numpy as np
|
|
|
|
from config import settings
|
|
|
|
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Attempt to import SAM 2; fall back to stubs if unavailable.
#
# SAM2_AVAILABLE records whether the optional dependencies imported cleanly.
# When they did not, torch / build_sam2 / SAM2ImagePredictor are bound to None
# so that the names always exist at module level; code must still check
# SAM2_AVAILABLE before using them.
# ---------------------------------------------------------------------------
try:
    import torch
    from sam2.build_sam import build_sam2
    from sam2.sam2_image_predictor import SAM2ImagePredictor
except Exception as exc:  # noqa: BLE001
    # Deliberately broad (see the noqa): presumably a broken native install
    # can fail with errors other than ImportError -- the module must still
    # import so the stub engine can be used.
    SAM2_AVAILABLE = False
    torch = None  # type: ignore[assignment]
    build_sam2 = None  # type: ignore[assignment]
    SAM2ImagePredictor = None  # type: ignore[assignment,misc]
    logger.warning("SAM2 import failed (%s). Using stub engine.", exc)
else:
    SAM2_AVAILABLE = True
    logger.info("SAM2 library imported successfully.")
|
|
|
|
|
|
class SAM2Engine:
    """Lazy-loaded SAM 2 inference engine.

    The model is loaded on the first prediction call rather than at
    construction time.  Whenever SAM 2 cannot be used (library not importable,
    checkpoint file missing, model load failure, or an error raised during
    inference) the ``predict_*`` methods return a placeholder rectangle with a
    score of 0.5 instead of raising.

    Polygons are lists of ``[x, y]`` vertices normalized to the 0-1 range by
    the image width and height.  The returned score list has one entry per
    returned polygon, in the same order.
    """

    # Score reported alongside the placeholder polygon in fallback mode.
    _FALLBACK_SCORE = 0.5

    def __init__(self) -> None:
        """Create an engine without loading any model weights."""
        # Quoted annotation: SAM2ImagePredictor may not be importable.
        self._predictor: Optional["SAM2ImagePredictor"] = None
        # True once a load has been attempted, whether or not it succeeded.
        self._model_loaded = False

    # -----------------------------------------------------------------------
    # Internal helpers
    # -----------------------------------------------------------------------
    def _load_model(self) -> None:
        """Load the SAM 2 model and predictor on first use.

        Every outcome (success, library missing, checkpoint missing, load
        error) marks the model as "loaded" so the attempt is made only once;
        on failure ``self._predictor`` simply stays ``None``.
        """
        if self._model_loaded:
            return

        if not SAM2_AVAILABLE:
            logger.warning("SAM2 not available; skipping model load.")
            self._model_loaded = True
            return

        if not os.path.isfile(settings.sam_model_path):
            logger.error("SAM checkpoint not found at %s", settings.sam_model_path)
            self._model_loaded = True
            return

        try:
            # Use the GPU when one is present, otherwise run on CPU instead of
            # failing the load (and silently degrading to the stub) on
            # CPU-only hosts.
            device = "cuda" if torch.cuda.is_available() else "cpu"  # type: ignore[union-attr]
            model = build_sam2(
                settings.sam_model_config,
                settings.sam_model_path,
                device=device,
            )
            self._predictor = SAM2ImagePredictor(model)
            self._model_loaded = True
            logger.info("SAM 2 model loaded from %s", settings.sam_model_path)
        except Exception as exc:  # noqa: BLE001
            logger.error("Failed to load SAM 2 model: %s", exc)
            self._model_loaded = True  # Prevent repeated load attempts

    def _ensure_ready(self) -> bool:
        """Ensure the model is loaded; return whether it is usable."""
        self._load_model()
        return self._predictor is not None and SAM2_AVAILABLE

    def _fallback_result(
        self, image: np.ndarray
    ) -> tuple[list[list[list[float]]], list[float]]:
        """Return the placeholder (polygons, scores) pair used in fallback mode."""
        polygons = self._dummy_polygons(image.shape[1], image.shape[0])
        return polygons, [self._FALLBACK_SCORE]

    def _masks_to_polygons(
        self, masks, scores
    ) -> tuple[list[list[list[float]]], list[float]]:
        """Convert predictor masks to polygons, keeping scores aligned.

        Masks that yield no polygon are dropped together with their score.
        """
        polygons = [self._mask_to_polygon(mask) for mask in masks]
        return self._drop_empty_polygons(polygons, np.asarray(scores).tolist())

    @staticmethod
    def _drop_empty_polygons(polygons, scores):
        """Filter out empty polygons and the scores at the same positions.

        Args:
            polygons: Sequence of polygons; an empty list marks "no polygon".
            scores: Sequence of scores, parallel to ``polygons``.

        Returns:
            Tuple ``(kept_polygons, kept_scores)`` of equal length.
        """
        kept_polygons = []
        kept_scores = []
        for polygon, score in zip(polygons, scores):
            if polygon:
                kept_polygons.append(polygon)
                kept_scores.append(score)
        return kept_polygons, kept_scores

    # -----------------------------------------------------------------------
    # Public API
    # -----------------------------------------------------------------------
    def predict_points(
        self,
        image: np.ndarray,
        points: list[list[float]],
        labels: list[int],
    ) -> tuple[list[list[list[float]]], list[float]]:
        """Run point-prompt segmentation.

        Args:
            image: HWC numpy array (uint8).
            points: List of [x, y] normalized coordinates (0-1).
            labels: 1 for foreground, 0 for background.

        Returns:
            Tuple of (polygons, scores).  If the engine is not ready or
            inference raises, a placeholder rectangle with score 0.5.
        """
        if not self._ensure_ready():
            logger.warning("SAM2 not ready; returning dummy masks.")
            return self._fallback_result(image)

        try:
            h, w = image.shape[:2]
            # The predictor is given pixel coordinates; scale the 0-1 inputs.
            pts = np.array([[p[0] * w, p[1] * h] for p in points], dtype=np.float32)
            lbls = np.array(labels, dtype=np.int32)

            with torch.inference_mode():  # type: ignore[name-defined]
                self._predictor.set_image(image)
                masks, scores, _ = self._predictor.predict(
                    point_coords=pts,
                    point_labels=lbls,
                    multimask_output=True,
                )

            return self._masks_to_polygons(masks, scores)
        except Exception as exc:  # noqa: BLE001
            # Best-effort API: degrade to the placeholder rather than raise.
            logger.error("SAM2 point prediction failed: %s", exc)
            return self._fallback_result(image)

    def predict_box(
        self,
        image: np.ndarray,
        box: list[float],
    ) -> tuple[list[list[list[float]]], list[float]]:
        """Run box-prompt segmentation.

        Args:
            image: HWC numpy array (uint8).
            box: [x1, y1, x2, y2] normalized coordinates.

        Returns:
            Tuple of (polygons, scores).  If the engine is not ready or
            inference raises, a placeholder rectangle with score 0.5.
        """
        if not self._ensure_ready():
            logger.warning("SAM2 not ready; returning dummy masks.")
            return self._fallback_result(image)

        try:
            h, w = image.shape[:2]
            # Scale the normalized box corners to pixel coordinates.
            bbox = np.array(
                [box[0] * w, box[1] * h, box[2] * w, box[3] * h],
                dtype=np.float32,
            )

            with torch.inference_mode():  # type: ignore[name-defined]
                self._predictor.set_image(image)
                masks, scores, _ = self._predictor.predict(
                    box=bbox[None, :],  # add a leading axis: shape (1, 4)
                    multimask_output=False,
                )

            return self._masks_to_polygons(masks, scores)
        except Exception as exc:  # noqa: BLE001
            # Best-effort API: degrade to the placeholder rather than raise.
            logger.error("SAM2 box prediction failed: %s", exc)
            return self._fallback_result(image)

    def predict_auto(self, image: np.ndarray) -> tuple[list[list[list[float]]], list[float]]:
        """Run automatic mask generation (grid of points).

        A uniform grid of foreground points covering the whole image is sent
        to the predictor in a single ``predict`` call, and at most the first
        three returned masks are converted to polygons.

        NOTE(review): because every grid point goes into one prompt, this
        presumably yields candidate masks for that combined prompt rather
        than one mask per object -- confirm whether SAM 2's dedicated
        automatic mask generator is what callers actually want.

        Args:
            image: HWC numpy array (uint8).

        Returns:
            Tuple of (polygons, scores).  If the engine is not ready or
            inference raises, a placeholder rectangle with score 0.5.
        """
        if not self._ensure_ready():
            logger.warning("SAM2 not ready; returning dummy masks.")
            return self._fallback_result(image)

        try:
            with torch.inference_mode():  # type: ignore[name-defined]
                self._predictor.set_image(image)
                # Uniform 17x17 grid of point prompts: the ``17j`` step makes
                # mgrid emit 17 samples per axis, both endpoints included.
                h, w = image.shape[:2]
                grid = np.mgrid[0:1:17j, 0:1:17j].reshape(2, -1).T
                pts = grid * np.array([w, h])
                lbls = np.ones(pts.shape[0], dtype=np.int32)

                masks, scores, _ = self._predictor.predict(
                    point_coords=pts,
                    point_labels=lbls,
                    multimask_output=True,
                )

            # Keep at most the first three masks, in the order returned.
            return self._masks_to_polygons(masks[:3], scores[:3])
        except Exception as exc:  # noqa: BLE001
            # Best-effort API: degrade to the placeholder rather than raise.
            logger.error("SAM2 auto prediction failed: %s", exc)
            return self._fallback_result(image)

    # -----------------------------------------------------------------------
    # Helpers
    # -----------------------------------------------------------------------
    @staticmethod
    def _mask_to_polygon(mask: np.ndarray) -> list[list[float]]:
        """Convert a binary mask to a normalized polygon.

        Only the external contour enclosing the largest area is used.

        Args:
            mask: 2-D mask; any non-uint8 dtype is binarized with ``> 0``.

        Returns:
            List of ``[x, y]`` vertices normalized by mask width and height,
            or an empty list when no contour has at least three points.
        """
        import cv2  # Imported lazily so the module loads without OpenCV.

        if mask.dtype != np.uint8:
            mask = (mask > 0).astype(np.uint8)

        # [-2] selects the contour list under both the 2-tuple (OpenCV 4)
        # and 3-tuple (OpenCV 3) return conventions.
        contours = cv2.findContours(
            mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )[-2]

        # A polygon needs at least three vertices.
        candidates = [cnt for cnt in contours if len(cnt) >= 3]
        if not candidates:
            return []

        # Rank by enclosed area, not vertex count: with CHAIN_APPROX_SIMPLE a
        # big rectangle has 4 points while a tiny jagged blob can have more.
        largest = max(candidates, key=cv2.contourArea)

        h, w = mask.shape[:2]
        return [[float(pt[0][0]) / w, float(pt[0][1]) / h] for pt in largest]

    @staticmethod
    def _dummy_polygons(w: int, h: int) -> list[list[list[float]]]:
        """Return a dummy rectangle polygon for fallback mode.

        The rectangle is already in normalized coordinates (the central half
        of the image), so ``w`` and ``h`` are accepted but not used.
        """
        return [
            [
                [0.25, 0.25],
                [0.75, 0.25],
                [0.75, 0.75],
                [0.25, 0.75],
            ]
        ]
|
|
|
|
|
|
# Singleton instance: one module-level SAM2Engine for importers to share.
# Constructing it only initializes two attributes; the model itself is loaded
# by the first predict_* call.
|
|
sam_engine = SAM2Engine()
|