"""Video/DICOM frame parsing and MinIO upload utilities."""

import logging
import mimetypes
import os
import shutil
import subprocess
from pathlib import Path
from typing import List, Optional

import cv2
import numpy as np
from pydicom import dcmread

from minio_client import upload_file, BUCKET_NAME

logger = logging.getLogger(__name__)

# Content type used when the file extension gives no usable hint.
_DEFAULT_CONTENT_TYPE = "image/jpeg"


def _extract_with_ffmpeg(
    video_path: str,
    output_dir: str,
    fps: int,
    max_frames: Optional[int],
) -> Optional[List[str]]:
    """Extract frames with the ``ffmpeg`` CLI; return ``None`` on failure."""
    try:
        pattern = os.path.join(output_dir, "frame_%06d.jpg")
        cmd = [
            "ffmpeg", "-i", video_path,
            "-vf", f"fps={fps},scale=640:-1",
            "-q:v", "5",
        ]
        if max_frames:
            # Stop encoding at the cap instead of decoding the whole video
            # and discarding the surplus afterwards.
            cmd += ["-frames:v", str(max_frames)]
        cmd += ["-y", pattern]

        logger.info("Running FFmpeg: %s", " ".join(cmd))
        result = subprocess.run(cmd, capture_output=True, text=True, check=False)
        if result.returncode != 0:
            logger.warning("FFmpeg failed: %s", result.stderr)
            return None

        # Only pick up files matching the output pattern so unrelated JPEGs
        # living in output_dir are not reported as frames.
        # NOTE: frames left behind by an earlier run in the same directory
        # still match the pattern and will be listed.
        frame_paths = sorted(
            os.path.join(output_dir, name)
            for name in os.listdir(output_dir)
            if name.startswith("frame_") and name.endswith(".jpg")
        )
        if max_frames:
            frame_paths = frame_paths[:max_frames]
        logger.info("Extracted %d frames via FFmpeg", len(frame_paths))
        return frame_paths
    except Exception as exc:  # noqa: BLE001 - best effort, caller falls back
        logger.warning("FFmpeg exception: %s", exc)
        return None


def _extract_with_opencv(
    video_path: str,
    output_dir: str,
    fps: int,
    max_frames: Optional[int],
) -> List[str]:
    """Extract frames by decoding the video with OpenCV."""
    frame_paths: List[str] = []
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Cannot open video: {video_path}")

        # CAP_PROP_FPS may be reported as 0; assume 30 in that case.
        video_fps = cap.get(cv2.CAP_PROP_FPS) or 30
        # Keep every `interval`-th decoded frame to approximate the target rate.
        interval = max(1, int(round(video_fps / fps)))

        count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if count % interval == 0:
                path = os.path.join(
                    output_dir, f"frame_{len(frame_paths):06d}.jpg"
                )
                # imwrite reports failure through its return value; only
                # record frames that actually reached the disk.
                if cv2.imwrite(path, frame, [cv2.IMWRITE_JPEG_QUALITY, 80]):
                    frame_paths.append(path)
                    if max_frames and len(frame_paths) >= max_frames:
                        break
                else:
                    logger.warning("Failed to write frame: %s", path)
            count += 1
    finally:
        # Release the capture handle even when decoding or writing raises.
        cap.release()

    logger.info("Extracted %d frames via OpenCV", len(frame_paths))
    return frame_paths


def parse_video(
    video_path: str,
    output_dir: str,
    fps: int = 30,
    max_frames: Optional[int] = None,
) -> List[str]:
    """Extract frames from a video file using FFmpeg or OpenCV fallback.

    FFmpeg is tried first when the executable is on ``PATH``; if it is
    missing, exits with a non-zero status, or raises, the video is decoded
    with OpenCV instead.

    Args:
        video_path: Path to the input video file.
        output_dir: Directory to save extracted frames (created if missing).
        fps: Target frame extraction rate. Must be positive.
        max_frames: Optional maximum number of frames to extract. ``None``
            or ``0`` means no limit.

    Returns:
        List of paths to extracted frame images.

    Raises:
        ValueError: If ``fps`` is not positive.
        RuntimeError: If the OpenCV fallback cannot open the video.
    """
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps}")

    os.makedirs(output_dir, exist_ok=True)

    # Try FFmpeg first
    if shutil.which("ffmpeg"):
        frame_paths = _extract_with_ffmpeg(video_path, output_dir, fps, max_frames)
        if frame_paths is not None:
            return frame_paths

    # OpenCV fallback
    logger.info("Falling back to OpenCV frame extraction")
    return _extract_with_opencv(video_path, output_dir, fps, max_frames)


def _to_uint8(pixel_array: np.ndarray) -> np.ndarray:
    """Min-max scale an array to the 0-255 range; uint8 input is unchanged."""
    if pixel_array.dtype == np.uint8:
        return pixel_array
    scaled = pixel_array.astype(np.float32)
    low = scaled.min()
    # The epsilon keeps a constant image from dividing by zero.
    scaled = (scaled - low) / (scaled.max() - low + 1e-8) * 255
    return scaled.astype(np.uint8)


def parse_dicom(
    dicom_dir: str,
    output_dir: str,
    max_frames: Optional[int] = None,
) -> List[str]:
    """Extract frames from DICOM files in a directory.

    Files are processed in sorted name order. A file that cannot be read or
    decoded is logged and skipped. Frames already written before the limit
    or an error is hit are kept.

    Args:
        dicom_dir: Directory containing .dcm files.
        output_dir: Directory to save extracted frames (created if missing).
        max_frames: Optional maximum number of frames to extract, counted
            across all files. ``None`` or ``0`` means no limit.

    Returns:
        List of paths to extracted frame images (PNG).
    """
    os.makedirs(output_dir, exist_ok=True)
    dcm_files = sorted(
        name for name in os.listdir(dicom_dir) if name.lower().endswith(".dcm")
    )

    frame_paths: List[str] = []
    for idx, fname in enumerate(dcm_files):
        if max_frames and len(frame_paths) >= max_frames:
            break
        path = os.path.join(dicom_dir, fname)
        try:
            ds = dcmread(path)
            # Normalize to 8-bit
            pixel_array = _to_uint8(ds.pixel_array)

            # Decide multi-frame from the dataset, not from ndim: a
            # single-frame colour image is also 3-D (rows, cols, samples)
            # and must not be split along its first axis.
            n_frames = int(getattr(ds, "NumberOfFrames", 1) or 1)

            if n_frames > 1:
                # Multi-frame: the first axis indexes frames.
                for f, frame in enumerate(pixel_array):
                    if max_frames and len(frame_paths) >= max_frames:
                        break
                    out_path = os.path.join(
                        output_dir, f"frame_{idx:06d}_{f:03d}.png"
                    )
                    cv2.imwrite(out_path, frame)
                    frame_paths.append(out_path)
            else:
                # NOTE(review): colour data is written in its stored channel
                # order while cv2 assumes BGR - confirm whether a conversion
                # is wanted for RGB datasets.
                out_path = os.path.join(output_dir, f"frame_{idx:06d}.png")
                cv2.imwrite(out_path, pixel_array)
                frame_paths.append(out_path)
        except Exception as exc:  # noqa: BLE001 - skip unreadable files
            logger.error("Failed to read DICOM %s: %s", path, exc)

    logger.info("Extracted %d frames from DICOM", len(frame_paths))
    return frame_paths


def upload_frames_to_minio(
    frames: List[str],
    project_id: int,
    object_prefix: Optional[str] = None,
) -> List[str]:
    """Upload a list of local frame images to MinIO.

    Each file is uploaded under ``<prefix>/<basename>``. A frame that cannot
    be read or uploaded is logged and skipped.

    Args:
        frames: List of local file paths.
        project_id: Project ID used for the default object prefix
            ``projects/<project_id>/frames``.
        object_prefix: Optional prefix override. A trailing ``/`` is ignored.

    Returns:
        List of object names (paths) in MinIO for the frames that were
        uploaded successfully.
    """
    prefix = (object_prefix or f"projects/{project_id}/frames").rstrip("/")
    object_names: List[str] = []

    for frame_path in frames:
        fname = os.path.basename(frame_path)
        object_name = f"{prefix}/{fname}"
        # Frames may be JPEG (video) or PNG (DICOM); derive the type from
        # the extension rather than labelling everything as JPEG.
        content_type = mimetypes.guess_type(fname)[0] or _DEFAULT_CONTENT_TYPE
        try:
            with open(frame_path, "rb") as f:
                data = f.read()
            upload_file(
                object_name,
                data,
                content_type=content_type,
                length=len(data),
            )
            object_names.append(object_name)
        except Exception as exc:  # noqa: BLE001 - keep uploading the rest
            logger.error("Failed to upload %s: %s", frame_path, exc)

    logger.info("Uploaded %d/%d frames to MinIO", len(object_names), len(frames))
    return object_names