Files
Head_CT_Morph/web_backend.py

1872 lines
66 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import json
import os
import shutil
import struct
import threading
import time
import traceback
import uuid
import zipfile
from email.parser import BytesParser
from email.policy import default
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from io import BytesIO
from pathlib import Path
from urllib.parse import parse_qs, quote, unquote, urlparse
os.environ.setdefault("MPLCONFIGDIR", "/tmp/head_ct_morph_matplotlib")
import pydicom
import numpy as np
from pydicom.multival import MultiValue
from PIL import Image, ImageDraw
from generate_head_extension_video import generate_video
from head_extension_app import (
APP_DIR,
crop_head_neck,
ct_window,
draw_cutoff_line,
fit_image,
load_dicom_volume,
preview_deform_with_cutoff_line,
preview_deform_2d,
run_deformation,
sagittal_mip,
safe_mkdir,
)
HOST = "0.0.0.0"
PORT = 8787
JOBS = {}
JOBS_LOCK = threading.Lock()
DICOM_FILE_CACHE = {}
DICOM_VOLUME_CACHE = {}
DICOM_VOLUME_CACHE_LOCK = threading.Lock()
DICOM_VOLUME_CACHE_LIMIT = 2
LIBRARY_DIR = APP_DIR / "web_library"
LIBRARY_META = LIBRARY_DIR / "library.json"
RESULT_DIR = APP_DIR / "web_results"
JOBS_META = RESULT_DIR / "jobs.json"
USER_TASKS_META = RESULT_DIR / "user_tasks.json"
PREVIEW_CACHE_DIR = LIBRARY_DIR / "_preview_cache"
MODEL_DIR = LIBRARY_DIR / "_stl_models"
STL_MODEL_CACHE = {}
STL_MODEL_CACHE_LOCK = threading.Lock()
SEGMENTATION_CACHE = {}
SEGMENTATION_CACHE_LOCK = threading.Lock()
SEGMENTATION_CACHE_LIMIT = 4
SEGMENTATION_COLORS = [
(255, 112, 32),
(34, 211, 238),
(168, 85, 247),
(74, 222, 128),
(250, 204, 21),
(244, 114, 182),
(96, 165, 250),
(251, 146, 60),
]
VIEWER_WINDOWS = {
"default": {"label": "默认", "low": -500, "high": 1200},
"bone": {"label": "骨窗", "low": -500, "high": 1800},
"soft_tissue": {"label": "软组织", "low": -160, "high": 240},
"brain": {"label": "脑窗", "low": 0, "high": 80},
"lung": {"label": "肺窗", "low": -1000, "high": 400},
}
def json_default(value):
if isinstance(value, Path):
return str(value.resolve())
return str(value)
def now_date():
return time.strftime("%Y-%m-%d")
def safe_filename(name):
return "".join(char if char.isalnum() or char in "._-" else "_" for char in Path(name).name)
def safe_model_filename(name):
name = safe_filename(name or "model.stl")
return name if name.lower().endswith(".stl") else f"{name}.stl"
def safe_segmentation_filename(name):
name = safe_filename(name or "segmentation.dcm")
path = Path(name)
suffix = path.suffix.lower()
if suffix in {".dcm", ".dicom"}:
return f"{path.stem}{suffix}"
return f"{name}.dcm"
def normalized_username(username):
username = str(username or "").strip()
return username or "anonymous"
def read_json_file(path, default):
path = Path(path)
if not path.exists():
return default
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return default
def write_json_file(path, value):
path = Path(path)
safe_mkdir(path.parent)
path.write_text(json.dumps(value, ensure_ascii=False, indent=2, default=json_default), encoding="utf-8")
def read_library_meta():
safe_mkdir(LIBRARY_DIR)
if LIBRARY_META.exists():
return json.loads(LIBRARY_META.read_text(encoding="utf-8"))
return []
def write_library_meta(items):
safe_mkdir(LIBRARY_DIR)
LIBRARY_META.write_text(json.dumps(items, ensure_ascii=False, indent=2), encoding="utf-8")
def folder_size_mb(path):
total = 0
for file_path in Path(path).glob("*.dcm"):
total += file_path.stat().st_size
return round(total / 1024 / 1024)
def build_library_record(item_id, patient_id, dicom_path, source="upload", version="DICOM-LIB"):
dicom_dir = Path(dicom_path).resolve()
file_count = len(list(dicom_dir.glob("*.dcm")))
return {
"id": item_id,
"patientId": patient_id,
"date": now_date(),
"version": version,
"status": "processed",
"size": folder_size_mb(dicom_dir),
"previewColor": "bg-slate-900" if source == "upload" else "bg-slate-800",
"dicomPath": str(dicom_dir),
"fileCount": file_count,
"source": source,
}
def list_library():
items = [
item for item in read_library_meta()
if item.get("source") != "seed" and item.get("id") != "seed_ori_head_ct"
]
live_items = []
changed = False
for item in items:
dicom_path = Path(item.get("dicomPath", ""))
if dicom_path.exists():
item["fileCount"] = len(list(dicom_path.glob("*.dcm")))
item["size"] = folder_size_mb(dicom_path)
live_items.append(item)
else:
changed = True
if changed:
write_library_meta(live_items)
return live_items
def sort_key_for_dicom(path):
path = Path(path)
try:
ds = pydicom.dcmread(str(path), stop_before_pixels=True, force=True)
return (0, int(getattr(ds, "InstanceNumber", 0)), path.name)
except Exception:
stem = path.stem
return (1, int(stem) if stem.isdigit() else 0, path.name)
def sorted_dicom_files(dicom_dir):
dicom_dir = Path(dicom_dir).resolve()
files = list(dicom_dir.glob("*.dcm"))
signature = (
str(dicom_dir),
len(files),
max((file_path.stat().st_mtime for file_path in files), default=0),
)
cached = DICOM_FILE_CACHE.get(str(dicom_dir))
if cached and cached["signature"] == signature:
return cached["files"]
sorted_files = sorted(files, key=sort_key_for_dicom)
DICOM_FILE_CACHE[str(dicom_dir)] = {
"signature": signature,
"files": sorted_files,
}
return sorted_files
def dicom_dir_signature(dicom_dir):
dicom_dir = Path(dicom_dir).resolve()
files = list(dicom_dir.glob("*.dcm"))
return (
str(dicom_dir),
len(files),
max((file_path.stat().st_mtime for file_path in files), default=0),
)
def load_cached_dicom_volume(dicom_dir):
dicom_dir = Path(dicom_dir).resolve()
signature = dicom_dir_signature(dicom_dir)
cache_key = str(dicom_dir)
with DICOM_VOLUME_CACHE_LOCK:
cached = DICOM_VOLUME_CACHE.get(cache_key)
if cached and cached["signature"] == signature:
cached["last_access"] = time.time()
return cached["volume"]
volume = load_dicom_volume(dicom_dir)
DICOM_VOLUME_CACHE[cache_key] = {
"signature": signature,
"volume": volume,
"last_access": time.time(),
}
while len(DICOM_VOLUME_CACHE) > DICOM_VOLUME_CACHE_LIMIT:
oldest_key = min(
DICOM_VOLUME_CACHE,
key=lambda key: DICOM_VOLUME_CACHE[key].get("last_access", 0),
)
if oldest_key == cache_key:
break
DICOM_VOLUME_CACHE.pop(oldest_key, None)
return volume
def clear_dicom_caches(dicom_dir=None):
if dicom_dir is None:
DICOM_FILE_CACHE.clear()
with DICOM_VOLUME_CACHE_LOCK:
DICOM_VOLUME_CACHE.clear()
return
cache_key = str(Path(dicom_dir).resolve())
DICOM_FILE_CACHE.pop(cache_key, None)
with DICOM_VOLUME_CACHE_LOCK:
DICOM_VOLUME_CACHE.pop(cache_key, None)
def parse_ascii_stl(text):
vertices = []
triangles = []
for line in text.splitlines():
parts = line.strip().split()
if len(parts) == 4 and parts[0].lower() == "vertex":
try:
vertices.append([float(parts[1]), float(parts[2]), float(parts[3])])
except ValueError:
vertices = []
break
if len(vertices) == 3:
triangles.append(vertices)
vertices = []
if not triangles:
raise RuntimeError("STL 文件中没有可解析的三角面。")
return np.asarray(triangles, dtype=np.float32)
def parse_binary_stl(data):
if len(data) < 84:
raise RuntimeError("STL 文件过小,无法解析。")
triangle_count = struct.unpack_from("<I", data, 80)[0]
expected_size = 84 + triangle_count * 50
if triangle_count <= 0 or expected_size > len(data):
raise RuntimeError("Binary STL 三角面数量异常。")
triangles = np.zeros((triangle_count, 3, 3), dtype=np.float32)
offset = 84
for index in range(triangle_count):
values = struct.unpack_from("<12fH", data, offset)
triangles[index] = np.asarray(values[3:12], dtype=np.float32).reshape(3, 3)
offset += 50
return triangles
def parse_stl_bytes(data):
try:
text = data.decode("utf-8", errors="ignore")
except Exception:
text = ""
if text.lstrip().lower().startswith("solid"):
try:
return parse_ascii_stl(text)
except Exception:
pass
return parse_binary_stl(data)
def save_uploaded_stl(headers, body):
if not body:
raise RuntimeError("上传的 STL 文件为空。")
safe_mkdir(MODEL_DIR)
source_name = safe_model_filename(unquote(headers.get("x-file-name", "model.stl")))
model_id = uuid.uuid4().hex[:12]
model_path = MODEL_DIR / f"{model_id}_{source_name}"
model_path.write_bytes(body)
triangles = parse_stl_bytes(body)
bounds_min = triangles.reshape(-1, 3).min(axis=0).tolist()
bounds_max = triangles.reshape(-1, 3).max(axis=0).tolist()
with STL_MODEL_CACHE_LOCK:
STL_MODEL_CACHE[model_id] = {
"path": model_path,
"triangles": triangles,
"name": source_name,
"bounds": [bounds_min, bounds_max],
}
return {
"modelId": model_id,
"name": source_name,
"triangleCount": int(triangles.shape[0]),
"bounds": [bounds_min, bounds_max],
}
def load_stl_model(model_id):
model_id = safe_filename(model_id)
if not model_id:
raise RuntimeError("模型 ID 为空。")
with STL_MODEL_CACHE_LOCK:
cached = STL_MODEL_CACHE.get(model_id)
if cached:
return cached
matches = list(MODEL_DIR.glob(f"{model_id}_*.stl"))
if not matches:
raise RuntimeError("没有找到已上传的 STL 模型。")
model_path = matches[0]
triangles = parse_stl_bytes(model_path.read_bytes())
bounds_min = triangles.reshape(-1, 3).min(axis=0).tolist()
bounds_max = triangles.reshape(-1, 3).max(axis=0).tolist()
model = {
"path": model_path,
"triangles": triangles,
"name": model_path.name.split("_", 1)[1] if "_" in model_path.name else model_path.name,
"bounds": [bounds_min, bounds_max],
}
with STL_MODEL_CACHE_LOCK:
STL_MODEL_CACHE[model_id] = model
return model
def segmentation_root_for_item(item):
return Path(item["dicomPath"]).resolve().parent / "segmentations"
def segmentation_file_for_id(item, segmentation_id):
segmentation_id = safe_filename(segmentation_id)
if not segmentation_id:
return None
root = segmentation_root_for_item(item)
matches = sorted(root.glob(f"{segmentation_id}_*.dcm")) + sorted(root.glob(f"{segmentation_id}_*.dicom"))
return matches[0] if matches else None
def segmentation_meta_path(segmentation_path):
return segmentation_path.with_suffix(segmentation_path.suffix + ".json")
def segmentation_signature(segmentation_path):
segmentation_path = Path(segmentation_path).resolve()
return (
str(segmentation_path),
segmentation_path.stat().st_size,
segmentation_path.stat().st_mtime,
)
def segment_labels_from_dataset(ds):
labels = {}
for segment in getattr(ds, "SegmentSequence", []) or []:
try:
number = int(getattr(segment, "SegmentNumber", 0))
except Exception:
number = 0
if number > 0:
labels[number] = str(getattr(segment, "SegmentLabel", f"Segment {number}"))
return labels
def frame_segment_number(ds, frame_index):
per_frame = getattr(ds, "PerFrameFunctionalGroupsSequence", None)
if per_frame and frame_index < len(per_frame):
segment_sequence = getattr(per_frame[frame_index], "SegmentIdentificationSequence", None)
if segment_sequence:
try:
return int(getattr(segment_sequence[0], "ReferencedSegmentNumber", 1))
except Exception:
return 1
return 1
def frame_position_patient(ds, frame_index):
per_frame = getattr(ds, "PerFrameFunctionalGroupsSequence", None)
if per_frame and frame_index < len(per_frame):
plane_position = getattr(per_frame[frame_index], "PlanePositionSequence", None)
if plane_position and hasattr(plane_position[0], "ImagePositionPatient"):
return np.asarray(plane_position[0].ImagePositionPatient, dtype=np.float64)
if hasattr(ds, "ImagePositionPatient"):
return np.asarray(ds.ImagePositionPatient, dtype=np.float64)
return None
def normalize_segmentation_frames(pixel_array):
array = np.asarray(pixel_array)
if array.ndim == 2:
return array[np.newaxis, :, :]
if array.ndim == 3:
return array
if array.ndim == 4:
if array.shape[-1] == 1:
return array[..., 0]
return np.max(array, axis=-1)
raise RuntimeError("Segmentation Mask 像素维度不受支持。")
def resize_label_frame(frame, rows, cols):
frame = np.asarray(frame)
if frame.shape == (rows, cols):
return frame
image = Image.fromarray(frame.astype(np.uint16))
image = image.resize((cols, rows), Image.Resampling.NEAREST)
return np.asarray(image)
def load_segmentation_mask(item_id, segmentation_id):
item = find_library_item(item_id)
if not item:
raise RuntimeError("影像库中没有找到该数据。")
segmentation_path = segmentation_file_for_id(item, segmentation_id)
if not segmentation_path:
raise RuntimeError("没有找到已上传的 DICOM Segmentation Mask。")
signature = segmentation_signature(segmentation_path)
cache_key = f"{item_id}:{safe_filename(segmentation_id)}"
with SEGMENTATION_CACHE_LOCK:
cached = SEGMENTATION_CACHE.get(cache_key)
if cached and cached["signature"] == signature:
cached["last_access"] = time.time()
return cached["data"]
dicom_files = sorted_dicom_files(item["dicomPath"])
if not dicom_files:
raise RuntimeError("该影像数据没有可配准的 CT DICOM。")
first_ct = pydicom.dcmread(str(dicom_files[0]), stop_before_pixels=True, force=True)
ct_rows = int(getattr(first_ct, "Rows", 0) or 0)
ct_cols = int(getattr(first_ct, "Columns", 0) or 0)
if ct_rows <= 0 or ct_cols <= 0:
raise RuntimeError("CT DICOM 缺少 Rows/Columns 信息,无法配准 Segmentation Mask。")
ds = pydicom.dcmread(str(segmentation_path), force=True)
frames = normalize_segmentation_frames(ds.pixel_array)
geometry = dicom_geometry(item["dicomPath"])
label_volume = np.zeros((len(dicom_files), ct_rows, ct_cols), dtype=np.uint16)
labels = segment_labels_from_dataset(ds)
for frame_index, frame in enumerate(frames):
frame = resize_label_frame(frame, ct_rows, ct_cols)
active = frame > 0
if not np.any(active):
continue
position = frame_position_patient(ds, frame_index)
if geometry and position is not None:
voxel = (position - geometry["origin"]) @ geometry["inverse"].T
slice_index = int(round(float(voxel[0])))
elif len(frames) == len(dicom_files):
slice_index = frame_index
else:
slice_index = min(len(dicom_files) - 1, frame_index)
slice_index = max(0, min(len(dicom_files) - 1, slice_index))
if int(frame.max()) > 1 and not labels:
label_volume[slice_index][active] = np.maximum(label_volume[slice_index][active], frame[active].astype(np.uint16))
else:
label = max(1, frame_segment_number(ds, frame_index))
label_volume[slice_index][active] = label
labels.setdefault(label, f"Segment {label}")
unique_labels = sorted(int(value) for value in np.unique(label_volume) if int(value) > 0)
if not unique_labels:
raise RuntimeError("DICOM Segmentation Mask 中没有可渲染的分割像素。")
if not labels:
labels = {value: f"Label {value}" for value in unique_labels}
data = {
"itemId": item_id,
"segId": safe_filename(segmentation_id),
"path": segmentation_path,
"name": segmentation_path.name.split("_", 1)[1] if "_" in segmentation_path.name else segmentation_path.name,
"volume": label_volume,
"frameCount": int(frames.shape[0]),
"segmentCount": len(unique_labels),
"labels": [
{"value": int(value), "label": labels.get(int(value), f"Segment {int(value)}")}
for value in unique_labels
],
}
with SEGMENTATION_CACHE_LOCK:
SEGMENTATION_CACHE[cache_key] = {
"signature": signature,
"data": data,
"last_access": time.time(),
}
while len(SEGMENTATION_CACHE) > SEGMENTATION_CACHE_LIMIT:
oldest_key = min(
SEGMENTATION_CACHE,
key=lambda key: SEGMENTATION_CACHE[key].get("last_access", 0),
)
if oldest_key == cache_key:
break
SEGMENTATION_CACHE.pop(oldest_key, None)
return data
def serialize_segmentation(segmentation):
return {
"segId": segmentation["segId"],
"name": segmentation["name"],
"frameCount": segmentation["frameCount"],
"segmentCount": segmentation["segmentCount"],
"labels": segmentation["labels"],
}
def list_segmentations(item_id):
item = find_library_item(item_id)
if not item:
raise RuntimeError("影像库中没有找到该数据。")
root = segmentation_root_for_item(item)
if not root.exists():
return []
segmentations = []
for path in sorted(list(root.glob("*.dcm")) + list(root.glob("*.dicom"))):
meta = read_json_file(segmentation_meta_path(path), None)
if meta:
segmentations.append(meta)
continue
seg_id = path.name.split("_", 1)[0]
try:
segmentations.append(serialize_segmentation(load_segmentation_mask(item_id, seg_id)))
except Exception:
segmentations.append({
"segId": seg_id,
"name": path.name.split("_", 1)[1] if "_" in path.name else path.name,
"frameCount": 0,
"segmentCount": 0,
"labels": [],
})
return segmentations
def save_uploaded_segmentation(headers, body):
if not body:
raise RuntimeError("上传的 Segmentation Mask 文件为空。")
item_id = safe_filename(unquote(headers.get("x-library-id", "")))
item = find_library_item(item_id)
if not item:
raise RuntimeError("影像库中没有找到要绑定的 DICOM 数据。")
source_name = safe_segmentation_filename(unquote(headers.get("x-file-name", "segmentation.dcm")))
seg_id = uuid.uuid4().hex[:12]
root = segmentation_root_for_item(item)
safe_mkdir(root)
segmentation_path = root / f"{seg_id}_{source_name}"
segmentation_path.write_bytes(body)
try:
segmentation = load_segmentation_mask(item_id, seg_id)
meta = serialize_segmentation(segmentation)
write_json_file(segmentation_meta_path(segmentation_path), meta)
return meta
except Exception:
try:
segmentation_path.unlink()
except Exception:
pass
raise
def dicom_geometry(dicom_dir):
dicom_files = sorted_dicom_files(dicom_dir)
if not dicom_files:
return None
try:
first = pydicom.dcmread(str(dicom_files[0]), stop_before_pixels=True, force=True)
last = pydicom.dcmread(str(dicom_files[-1]), stop_before_pixels=True, force=True)
orientation = np.asarray(first.ImageOrientationPatient, dtype=np.float64)
col_dir = orientation[:3]
row_dir = orientation[3:]
slice_dir = np.cross(col_dir, row_dir)
pixel_spacing = np.asarray(first.PixelSpacing, dtype=np.float64)
row_spacing = float(pixel_spacing[0])
col_spacing = float(pixel_spacing[1])
first_pos = np.asarray(first.ImagePositionPatient, dtype=np.float64)
last_pos = np.asarray(last.ImagePositionPatient, dtype=np.float64)
slice_spacing = float(np.linalg.norm(last_pos - first_pos) / max(1, len(dicom_files) - 1))
if slice_spacing <= 0:
slice_spacing = float(getattr(first, "SliceThickness", 1) or 1)
basis = np.column_stack([
slice_dir * slice_spacing,
row_dir * row_spacing,
col_dir * col_spacing,
])
inverse = np.linalg.inv(basis)
return {
"origin": first_pos,
"inverse": inverse,
}
except Exception:
return None
def stl_triangles_to_voxels(triangles, dicom_dir):
geometry = dicom_geometry(dicom_dir)
if not geometry:
return triangles.astype(np.float32)
points = triangles.reshape(-1, 3).astype(np.float64)
voxel_points = (points - geometry["origin"]) @ geometry["inverse"].T
return voxel_points.reshape(triangles.shape).astype(np.float32)
def triangle_plane_segment(triangle, axis, value):
intersections = []
for start, end in [(triangle[0], triangle[1]), (triangle[1], triangle[2]), (triangle[2], triangle[0])]:
start_delta = float(start[axis] - value)
end_delta = float(end[axis] - value)
if abs(start_delta) < 1e-4 and abs(end_delta) < 1e-4:
continue
if abs(start_delta) < 1e-4:
intersections.append(start)
if start_delta * end_delta < 0:
t = start_delta / (start_delta - end_delta)
intersections.append(start + t * (end - start))
elif abs(end_delta) < 1e-4:
intersections.append(end)
unique = []
for point in intersections:
if not any(np.linalg.norm(point - existing) < 1e-3 for existing in unique):
unique.append(point)
if len(unique) >= 2:
return unique[0], unique[1]
return None
def make_stl_slice_mask(triangles, plane, index, image_shape):
height, width = image_shape
axis = 1 if plane == "coronal" else 2
mask_image = Image.new("L", (width, height), 0)
draw = ImageDraw.Draw(mask_image)
segment_count = 0
for triangle in triangles:
segment = triangle_plane_segment(triangle, axis, index)
if not segment:
continue
points = []
for point in segment:
if plane == "coronal":
x_value = float(point[2])
else:
x_value = float(point[1])
y_value = float(point[0])
points.append((x_value, y_value))
draw.line(points, fill=255, width=2)
segment_count += 1
if segment_count == 0:
return None, 0
mask = np.asarray(mask_image) > 0
try:
from scipy.ndimage import binary_fill_holes
filled = binary_fill_holes(mask)
if int(filled.sum()) > int(mask.sum()):
mask = filled
except Exception:
pass
mask_pixels = int(mask.sum())
if mask_pixels == 0:
return None, 0
return Image.fromarray((mask.astype(np.uint8) * 255), mode="L"), mask_pixels
def overlay_mask_on_preview(preview, mask):
overlay = Image.new("RGBA", preview.size, (255, 120, 20, 0))
alpha = mask.resize(preview.size, Image.Resampling.NEAREST)
overlay.putalpha(alpha.point(lambda value: 118 if value else 0))
preview_rgba = preview.convert("RGBA")
preview_rgba.alpha_composite(overlay)
return preview_rgba.convert("RGB")
def find_library_item(item_id):
return next((item for item in list_library() if item["id"] == item_id), None)
def make_library_slice_preview(item_id, index):
item = find_library_item(item_id)
if not item:
raise RuntimeError("影像库中没有找到该数据。")
dicom_files = sorted_dicom_files(item["dicomPath"])
if not dicom_files:
raise RuntimeError("该影像数据没有可预览的 .dcm 文件。")
count = len(dicom_files)
index = max(0, min(int(index), count - 1))
cache_dir = PREVIEW_CACHE_DIR / item_id
safe_mkdir(cache_dir)
preview_path = cache_dir / f"slice_{index:04d}.png"
if not preview_path.exists():
ds = pydicom.dcmread(str(dicom_files[index]), force=True)
image = ds.pixel_array.astype("float32")
image = image * float(getattr(ds, "RescaleSlope", 1))
image = image + float(getattr(ds, "RescaleIntercept", 0))
preview = Image.fromarray(ct_window(image)).convert("RGB")
preview = fit_image(preview, 720, 520)
preview.save(preview_path, format="PNG")
neighbors = [value for value in [index - 1, index + 1] if 0 <= value < count]
return {
"imageUrl": f"/api/file?path={quote(str(preview_path.resolve()), safe='')}",
"index": index,
"count": count,
"file": dicom_files[index].name,
"patientId": item["patientId"],
"neighbors": [
f"/api/library/preview?id={item_id}&index={neighbor}"
for neighbor in neighbors
],
}
def render_mask_only_preview(mask, size):
preview = Image.new("RGB", size, (8, 13, 28))
if mask is None:
return preview
alpha = mask.resize(size, Image.Resampling.NEAREST)
overlay = Image.new("RGBA", size, (255, 112, 32, 0))
overlay.putalpha(alpha.point(lambda value: 210 if value else 0))
preview_rgba = preview.convert("RGBA")
preview_rgba.alpha_composite(overlay)
return preview_rgba.convert("RGB")
def render_segmentation_label_preview(label_slice):
labels = np.asarray(label_slice, dtype=np.uint16)
rgb = np.zeros((labels.shape[0], labels.shape[1], 3), dtype=np.uint8)
rgb[:, :] = np.asarray((8, 13, 28), dtype=np.uint8)
for value in sorted(int(item) for item in np.unique(labels) if int(item) > 0):
color = SEGMENTATION_COLORS[(value - 1) % len(SEGMENTATION_COLORS)]
rgb[labels == value] = color
return Image.fromarray(rgb, mode="RGB")
def normalize_reformat_index(raw_index, count):
if str(raw_index) == "middle":
return count // 2
try:
return int(raw_index)
except Exception:
return count // 2
def make_segmentation_reformat_preview(item_id, segmentation_id, plane, index):
segmentation = load_segmentation_mask(item_id, segmentation_id)
plane = plane if plane in {"coronal", "sagittal"} else "coronal"
volume = segmentation["volume"]
if plane == "coronal":
count = volume.shape[1]
index = normalize_reformat_index(index, count)
index = max(0, min(index, count - 1))
label_slice = volume[:, index, :]
else:
count = volume.shape[2]
index = normalize_reformat_index(index, count)
index = max(0, min(index, count - 1))
label_slice = volume[:, :, index]
mask_pixels = int(np.count_nonzero(label_slice))
cache_dir = PREVIEW_CACHE_DIR / item_id / "segmentation"
safe_mkdir(cache_dir)
preview_path = cache_dir / f"{plane}_{index:04d}_seg_{safe_filename(segmentation_id)}.png"
if not preview_path.exists():
preview = render_segmentation_label_preview(label_slice)
preview = fit_image(preview, 960, 720)
preview.save(preview_path, format="PNG")
return {
"imageUrl": f"/api/file?path={quote(str(preview_path.resolve()), safe='')}",
"index": index,
"count": count,
"plane": plane,
"window": "segmentation",
"windowLabel": "Segmentation Mask",
"patientId": segmentation["itemId"],
"segId": segmentation["segId"],
"maskPixels": mask_pixels,
"segmentCount": segmentation["segmentCount"],
"labels": segmentation["labels"],
}
def make_library_reformat_preview(item_id, plane, index, window, model_id="", mask_only=False):
item = find_library_item(item_id)
if not item:
raise RuntimeError("影像库中没有找到该数据。")
plane = plane if plane in {"coronal", "sagittal"} else "coronal"
window = window if window in VIEWER_WINDOWS else "default"
volume = load_cached_dicom_volume(item["dicomPath"])
mask = None
mask_pixels = 0
if plane == "coronal":
count = volume.shape[1]
index = normalize_reformat_index(index, count)
index = max(0, min(index, count - 1))
image = volume[:, index, :]
else:
count = volume.shape[2]
index = normalize_reformat_index(index, count)
index = max(0, min(index, count - 1))
image = volume[:, :, index]
if model_id:
model = load_stl_model(model_id)
triangles = stl_triangles_to_voxels(model["triangles"], item["dicomPath"])
mask, mask_pixels = make_stl_slice_mask(triangles, plane, index, image.shape)
cache_dir = PREVIEW_CACHE_DIR / item_id / "reformat"
safe_mkdir(cache_dir)
model_suffix = f"_model_{safe_filename(model_id)}" if model_id else ""
view_suffix = "_mask_only" if mask_only and model_id else ""
preview_path = cache_dir / f"{plane}_{window}_{index:04d}{model_suffix}{view_suffix}.png"
if not preview_path.exists():
preset = VIEWER_WINDOWS[window]
if mask_only and model_id:
base_size = fit_image(Image.fromarray(ct_window(image, preset["low"], preset["high"])).convert("RGB"), 960, 720).size
preview = render_mask_only_preview(mask, base_size)
else:
preview = Image.fromarray(ct_window(image, preset["low"], preset["high"])).convert("RGB")
if mask is not None:
preview = overlay_mask_on_preview(preview, mask)
preview = fit_image(preview, 960, 720)
preview.save(preview_path, format="PNG")
return {
"imageUrl": f"/api/file?path={quote(str(preview_path.resolve()), safe='')}",
"index": index,
"count": count,
"plane": plane,
"window": window,
"windowLabel": VIEWER_WINDOWS[window]["label"],
"patientId": item["patientId"],
"modelId": model_id,
"maskPixels": mask_pixels,
}
def dicom_value(ds, name, fallback="-"):
value = getattr(ds, name, fallback)
if value in [None, ""]:
return fallback
if isinstance(value, (list, tuple, MultiValue)):
return " / ".join(str(item) for item in value)
return str(value)
def dicom_date(value):
value = str(value or "")
if len(value) == 8 and value.isdigit():
return f"{value[0:4]}-{value[4:6]}-{value[6:8]}"
return value or "-"
def dicom_time(value):
value = str(value or "")
if len(value) >= 6 and value[:6].isdigit():
return f"{value[0:2]}:{value[2:4]}:{value[4:6]}"
return value or "-"
def make_library_info(item_id):
item = find_library_item(item_id)
if not item:
raise RuntimeError("影像库中没有找到该数据。")
dicom_files = sorted_dicom_files(item["dicomPath"])
if not dicom_files:
raise RuntimeError("该影像数据没有可读取的 .dcm 文件。")
first = pydicom.dcmread(str(dicom_files[0]), stop_before_pixels=True, force=True)
last = pydicom.dcmread(str(dicom_files[-1]), stop_before_pixels=True, force=True)
pixel_spacing = dicom_value(first, "PixelSpacing")
matrix = f"{dicom_value(first, 'Columns')} x {dicom_value(first, 'Rows')}"
instance_range = f"{dicom_value(first, 'InstanceNumber')} - {dicom_value(last, 'InstanceNumber')}"
return {
"id": item["id"],
"patientId": item["patientId"],
"fileCount": len(dicom_files),
"groups": [
{
"title": "患者信息",
"items": [
{"label": "患者姓名", "value": dicom_value(first, "PatientName")},
{"label": "患者 ID", "value": dicom_value(first, "PatientID")},
{"label": "性别", "value": dicom_value(first, "PatientSex")},
{"label": "年龄", "value": dicom_value(first, "PatientAge")},
],
},
{
"title": "检查信息",
"items": [
{"label": "检查日期", "value": dicom_date(getattr(first, "StudyDate", ""))},
{"label": "检查时间", "value": dicom_time(getattr(first, "StudyTime", ""))},
{"label": "检查描述", "value": dicom_value(first, "StudyDescription")},
{"label": "检查号", "value": dicom_value(first, "AccessionNumber")},
{"label": "机构", "value": dicom_value(first, "InstitutionName")},
],
},
{
"title": "序列信息",
"items": [
{"label": "模态", "value": dicom_value(first, "Modality")},
{"label": "部位", "value": dicom_value(first, "BodyPartExamined")},
{"label": "序列描述", "value": dicom_value(first, "SeriesDescription")},
{"label": "序列号", "value": dicom_value(first, "SeriesNumber")},
{"label": "制造商", "value": dicom_value(first, "Manufacturer")},
],
},
{
"title": "图像参数",
"items": [
{"label": "切片数", "value": str(len(dicom_files))},
{"label": "Instance 范围", "value": instance_range},
{"label": "矩阵", "value": matrix},
{"label": "像素间距", "value": pixel_spacing},
{"label": "层厚", "value": dicom_value(first, "SliceThickness")},
{"label": "层间距", "value": dicom_value(first, "SpacingBetweenSlices")},
{"label": "卷积核", "value": dicom_value(first, "ConvolutionKernel")},
],
},
{
"title": "扫描参数",
"items": [
{"label": "KVP", "value": dicom_value(first, "KVP")},
{"label": "管电流", "value": dicom_value(first, "XRayTubeCurrent")},
{"label": "曝光时间", "value": dicom_value(first, "ExposureTime")},
{"label": "重建直径", "value": dicom_value(first, "ReconstructionDiameter")},
],
},
],
}
def parse_multipart(headers, body):
content_type = headers.get("content-type", "")
message = BytesParser(policy=default).parsebytes(
f"Content-Type: {content_type}\r\nMIME-Version: 1.0\r\n\r\n".encode("utf-8") + body
)
fields = {}
files = []
for part in message.iter_parts():
name = part.get_param("name", header="content-disposition")
filename = part.get_filename()
payload = part.get_payload(decode=True) or b""
if filename:
files.append((name, filename, payload))
elif name:
fields[name] = payload.decode("utf-8", errors="replace")
return fields, files
def write_dicom_payload(target_dir, filename, payload, index):
output_name = safe_filename(filename) or f"{index}.dcm"
output_path = target_dir / output_name
if output_path.exists():
output_path = target_dir / f"{index}_{output_name}"
output_path.write_bytes(payload)
def add_dicom_from_zip(target_dir, zip_filename, payload, start_index):
count = 0
try:
with zipfile.ZipFile(BytesIO(payload)) as archive:
for member in archive.infolist():
if member.is_dir():
continue
member_name = member.filename.replace("\\", "/")
if Path(member_name).suffix.lower() != ".dcm":
continue
if Path(member_name).is_absolute() or ".." in Path(member_name).parts:
continue
count += 1
with archive.open(member) as member_file:
write_dicom_payload(
target_dir,
f"{Path(zip_filename).stem}_{Path(member_name).name}",
member_file.read(),
start_index + count,
)
except zipfile.BadZipFile as exc:
raise RuntimeError(f"{zip_filename} 不是有效的 zip 压缩包。") from exc
return count
def validate_single_dicom_series(dicom_dir):
series_counts = {}
for dicom_path in Path(dicom_dir).glob("*.dcm"):
try:
ds = pydicom.dcmread(str(dicom_path), stop_before_pixels=True, force=True)
except Exception as exc:
raise RuntimeError(f"{dicom_path.name} 不是可读取的 DICOM 文件。") from exc
series_uid = str(getattr(ds, "SeriesInstanceUID", "") or "NO_SERIES_UID")
series_counts[series_uid] = series_counts.get(series_uid, 0) + 1
if len(series_counts) > 1:
series_summary = ", ".join(str(count) for count in sorted(series_counts.values(), reverse=True))
raise RuntimeError(
"上传内容包含多个 DICOM 序列,不能作为一个影像库数据集导入。"
f"检测到 {len(series_counts)} 个序列,分别约 {series_summary} 张。"
"请上传单个序列文件夹/ZIP。"
)
def reset_demo_environment():
source_dir = APP_DIR / "Ori_Head_CT"
source_zip = APP_DIR / "Ori_Head_CT.zip"
demo_root = LIBRARY_DIR / "demo_ori_head_ct"
demo_dicom_dir = demo_root / "dicom"
if not source_dir.exists() and not source_zip.exists():
raise RuntimeError("未找到 Ori_Head_CT 或 Ori_Head_CT.zip无法恢复演示环境。")
safe_mkdir(LIBRARY_DIR)
for child in LIBRARY_DIR.iterdir():
if child.is_dir():
shutil.rmtree(child)
else:
child.unlink()
safe_mkdir(demo_dicom_dir)
copied = 0
if source_dir.exists():
for dicom_path in sorted(source_dir.glob("*.dcm")):
shutil.copy2(dicom_path, demo_dicom_dir / dicom_path.name)
copied += 1
else:
with zipfile.ZipFile(source_zip) as archive:
for member in archive.infolist():
if member.is_dir():
continue
member_name = member.filename.replace("\\", "/")
if Path(member_name).suffix.lower() != ".dcm":
continue
if Path(member_name).is_absolute() or ".." in Path(member_name).parts:
continue
copied += 1
with archive.open(member) as member_file:
write_dicom_payload(demo_dicom_dir, Path(member_name).name, member_file.read(), copied)
if copied == 0:
shutil.rmtree(demo_root)
raise RuntimeError("Ori_Head_CT 中没有找到 .dcm 文件。")
validate_single_dicom_series(demo_dicom_dir)
record = build_library_record(
"demo_ori_head_ct",
"Ori_Head_CT",
demo_dicom_dir,
source="upload",
version="DICOM-DEMO",
)
write_library_meta([record])
DICOM_FILE_CACHE.clear()
if RESULT_DIR.exists():
shutil.rmtree(RESULT_DIR)
safe_mkdir(RESULT_DIR)
with JOBS_LOCK:
JOBS.clear()
persist_jobs_locked()
write_json_file(USER_TASKS_META, {})
return {
"ok": True,
"message": "演示环境已恢复出厂设置。",
"items": [record],
}
def upload_library_item(headers, body):
fields, files = parse_multipart(headers, body)
dicom_files = [
(filename, payload)
for _, filename, payload in files
if Path(filename).suffix.lower() == ".dcm"
]
zip_files = [
(filename, payload)
for _, filename, payload in files
if Path(filename).suffix.lower() == ".zip"
]
if not dicom_files and not zip_files:
raise RuntimeError("上传内容里没有找到 .dcm 文件或 .zip 压缩包。")
item_id = time.strftime("%Y%m%d_%H%M%S_") + uuid.uuid4().hex[:8]
first_upload_name = (dicom_files[0][0] if dicom_files else zip_files[0][0])
default_patient_id = Path(first_upload_name).stem.upper() if zip_files else f"WEB_{item_id[-8:].upper()}"
patient_id = fields.get("patientId") or default_patient_id
target_dir = LIBRARY_DIR / item_id / "dicom"
reset_dir(target_dir)
for index, (filename, payload) in enumerate(dicom_files, start=1):
write_dicom_payload(target_dir, filename, payload, index)
zip_dicom_count = 0
for filename, payload in zip_files:
zip_dicom_count += add_dicom_from_zip(
target_dir,
filename,
payload,
len(dicom_files) + zip_dicom_count,
)
total_dicom_count = len(dicom_files) + zip_dicom_count
if total_dicom_count == 0:
shutil.rmtree(target_dir.parent)
raise RuntimeError("压缩包里没有找到 .dcm 文件。")
try:
validate_single_dicom_series(target_dir)
except Exception:
shutil.rmtree(target_dir.parent)
raise
record = build_library_record(
item_id,
patient_id,
target_dir,
source="upload",
version="DICOM-ZIP" if zip_files and not dicom_files else "DICOM-LIB",
)
items = [record, *list_library()]
write_library_meta(items)
return record
def reset_dir(path):
path = Path(path)
if path.exists():
shutil.rmtree(path)
path.mkdir(parents=True, exist_ok=True)
def zip_folder(source_dir, zip_path):
source_dir = Path(source_dir).resolve()
zip_path = Path(zip_path).resolve()
safe_mkdir(zip_path.parent)
if zip_path.exists():
zip_path.unlink()
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
for file_path in source_dir.rglob("*"):
if file_path.is_file():
archive.write(file_path, file_path.relative_to(source_dir))
return zip_path
def read_user_tasks():
tasks = read_json_file(USER_TASKS_META, {})
return tasks if isinstance(tasks, dict) else {}
def write_user_tasks(tasks):
write_json_file(USER_TASKS_META, tasks)
def set_user_task(username, kind, job_id):
username = normalized_username(username)
tasks = read_user_tasks()
tasks.setdefault(username, {})[kind] = job_id
write_user_tasks(tasks)
def get_user_task_job(username, kind):
username = normalized_username(username)
tasks = read_user_tasks()
job_id = tasks.get(username, {}).get(kind)
if not job_id:
return None
return get_job(job_id)
def persist_jobs_locked():
write_json_file(JOBS_META, JOBS)
def load_persisted_jobs():
saved_jobs = read_json_file(JOBS_META, {})
if not isinstance(saved_jobs, dict):
return
now_text = time.strftime("%Y-%m-%d %H:%M:%S")
with JOBS_LOCK:
for job_id, job in saved_jobs.items():
if not isinstance(job, dict):
continue
if job.get("status") == "running":
job = {
**job,
"status": "failed",
"message": "后端已重启,运行中的任务已中断。",
"error": "后端服务重启后无法继续运行中的任务,请重新提交。",
"updatedAt": now_text,
}
JOBS[job_id] = job
persist_jobs_locked()
def deformation_progress_for_message(message):
if "已复制" in message:
return 20
if "正在生成四种状态" in message:
return 35
if "正在应用形变" in message:
return 55
if "正在写出四种状态" in message:
return 72
if "正在生成四状态过程对比图" in message:
return 82
return None
def set_job(job_id, **updates):
with JOBS_LOCK:
job = JOBS[job_id]
job.update(updates)
job["updatedAt"] = time.strftime("%Y-%m-%d %H:%M:%S")
persist_jobs_locked()
def get_job(job_id):
with JOBS_LOCK:
job = JOBS.get(job_id)
return dict(job) if job else None
def start_job(kind, worker, owner=None, params=None, remember_user_task=True):
job_id = uuid.uuid4().hex[:12]
owner = normalized_username(owner)
with JOBS_LOCK:
JOBS[job_id] = {
"id": job_id,
"kind": kind,
"owner": owner,
"status": "running",
"message": "任务已启动。",
"progress": 1,
"params": params or {},
"result": None,
"error": None,
"createdAt": time.strftime("%Y-%m-%d %H:%M:%S"),
"updatedAt": time.strftime("%Y-%m-%d %H:%M:%S"),
}
persist_jobs_locked()
if remember_user_task:
set_user_task(owner, kind, job_id)
def run():
try:
result = worker(job_id)
set_job(job_id, status="completed", message="任务完成。", progress=100, result=result)
except Exception as exc:
set_job(
job_id,
status="failed",
message="任务失败。",
error=str(exc),
traceback=traceback.format_exc(),
)
threading.Thread(target=run, daemon=True).start()
return get_job(job_id)
def make_preview(
input_dir,
angle_degrees,
show_cutoff_line=True,
transition_width=90,
mode="soft_transition",
gaussian_sigma=3,
):
if mode not in {"hard_boundary", "gaussian_smooth", "soft_transition"}:
mode = "soft_transition"
volume = load_dicom_volume(input_dir)
before = crop_head_neck(sagittal_mip(volume))
before_display = draw_cutoff_line(before, volume.shape[0]) if show_cutoff_line else before
if mode in {"hard_boundary", "gaussian_smooth"}:
after = preview_deform_with_cutoff_line(
before,
volume.shape[0],
float(angle_degrees),
mode,
transition_width=float(transition_width),
gaussian_sigma=float(gaussian_sigma),
show_cutoff_line=show_cutoff_line,
)
else:
after = preview_deform_2d(
before_display,
float(angle_degrees),
mode,
transition_width=float(transition_width),
gaussian_sigma=float(gaussian_sigma),
)
canvas_image = Image.new("RGB", (1440, 520), (0, 0, 0))
canvas_image.paste(fit_image(before_display, 700, 520), (0, 0))
canvas_image.paste(fit_image(after, 700, 520), (740, 0))
panels = [
("Original", before_display),
(
"Hard boundary",
preview_deform_with_cutoff_line(
before,
volume.shape[0],
float(angle_degrees),
"hard_boundary",
show_cutoff_line=show_cutoff_line,
),
),
(
"Gaussian smooth",
preview_deform_with_cutoff_line(
before,
volume.shape[0],
float(angle_degrees),
"gaussian_smooth",
gaussian_sigma=float(gaussian_sigma),
show_cutoff_line=show_cutoff_line,
),
),
(
"Soft transition",
preview_deform_2d(
before_display,
float(angle_degrees),
"soft_transition",
transition_width=float(transition_width),
),
),
]
process_image = Image.new("RGB", (1440, 430), (0, 0, 0))
process_draw = ImageDraw.Draw(process_image)
panel_width = 330
panel_height = 300
margin = 35
gap = 20
for index, (label, panel) in enumerate(panels):
x = margin + index * (panel_width + gap)
process_draw.text((x, 28), label, fill=(230, 230, 230))
process_image.paste(fit_image(panel, panel_width, panel_height), (x, 70))
process_draw.text(
(margin, 390),
f"Angle {float(angle_degrees):.1f} deg",
fill=(170, 170, 170),
)
canvas = BytesIO()
canvas_image.save(canvas, format="PNG")
encoded = base64.b64encode(canvas.getvalue()).decode("ascii")
process_canvas = BytesIO()
process_image.save(process_canvas, format="PNG")
process_encoded = base64.b64encode(process_canvas.getvalue()).decode("ascii")
return {
"image": f"data:image/png;base64,{encoded}",
"processImage": f"data:image/png;base64,{process_encoded}",
"source": str(Path(input_dir).resolve()),
"angleDegrees": float(angle_degrees),
}
def serialize_outputs(output_paths, preview_paths):
return {
"outputs": {key: str(Path(value).resolve()) for key, value in output_paths.items()},
"previews": {
"comparison": str(Path(preview_paths["comparison"]).resolve()),
"screenshots": str(Path(preview_paths["screenshots"]).resolve()),
},
"zip": None,
"stateZips": {},
}
def file_metadata(file_path):
file_path = Path(file_path).resolve()
return {
"path": str(file_path),
"name": file_path.name,
"size": file_path.stat().st_size,
}
def zip_selected_deformation_outputs(job_root, outputs, previews, package_options):
dicom_keys = package_options.get("dicom") or []
image_keys = package_options.get("images") or []
if not dicom_keys and not image_keys:
raise RuntimeError("请至少选择一个要打包的内容。")
zip_path = Path(job_root) / f"head_ct_morph_selected_{Path(job_root).name}.zip"
if zip_path.exists():
zip_path.unlink()
dicom_names = {
"original": "dicom/ct_original",
"hard_boundary": "dicom/ct_hard_boundary",
"gaussian_smooth": "dicom/ct_gaussian_smooth",
"soft_transition": "dicom/ct_soft_transition",
}
image_files = {
"comparison": Path(previews.get("comparison", "")),
"original": Path(previews.get("screenshots", "")) / "original.png",
"hard_boundary": Path(previews.get("screenshots", "")) / "hard_boundary.png",
"gaussian_smooth": Path(previews.get("screenshots", "")) / "gaussian_smooth.png",
"soft_transition": Path(previews.get("screenshots", "")) / "soft_transition.png",
}
image_names = {
"comparison": "images/process_comparison_4states.png",
"original": "images/original.png",
"hard_boundary": "images/hard_boundary.png",
"gaussian_smooth": "images/gaussian_smooth.png",
"soft_transition": "images/soft_transition.png",
}
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
for key in dicom_keys:
if key not in dicom_names:
continue
source_dir = Path(outputs.get(key, ""))
if not source_dir.exists():
raise RuntimeError(f"{key} 的 DICOM 输出目录不存在。")
for file_path in source_dir.rglob("*"):
if file_path.is_file():
archive.write(file_path, Path(dicom_names[key]) / file_path.relative_to(source_dir))
for key in image_keys:
if key not in image_files:
continue
source_file = image_files[key]
if not source_file.exists():
raise RuntimeError(f"{key} 的图片输出不存在。")
archive.write(source_file, image_names[key])
return zip_path
def prepare_deformation_zip(job_id, target, package_options=None):
job = get_job(job_id)
if not job:
raise RuntimeError("任务不存在。")
if job.get("kind") != "deformation":
raise RuntimeError("该任务不是四状态生成任务。")
if job.get("status") != "completed":
raise RuntimeError("四状态任务尚未完成,无法下载。")
result = job.get("result") or {}
outputs = result.get("outputs") or {}
previews = result.get("previews") or {}
job_root = RESULT_DIR / job_id
if target == "all":
if package_options:
return zip_selected_deformation_outputs(job_root, outputs, previews, package_options)
output_dir = job_root / "four_state_output"
if not output_dir.exists():
raise RuntimeError("四状态输出目录不存在。")
return zip_folder(output_dir, job_root / f"head_ct_morph_{job_id}.zip")
state_labels = {
"original": "original",
"hard_boundary": "hard_boundary",
"gaussian_smooth": "gaussian_smooth",
"soft_transition": "soft_transition",
}
if target not in state_labels:
raise RuntimeError("未知的四状态下载类型。")
source_dir = Path(outputs.get(target, ""))
if not source_dir.exists():
raise RuntimeError("该状态的 DICOM 输出目录不存在。")
return zip_folder(source_dir, job_root / f"{target}_{job_id}.zip")
class Handler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
print("%s - %s" % (self.address_string(), format % args))
def do_OPTIONS(self):
self.send_response(204)
self.send_cors_headers()
self.end_headers()
def do_GET(self):
parsed = urlparse(self.path)
if parsed.path == "/api/health":
self.send_json({"ok": True, "service": "Head CT Morph backend"})
return
if parsed.path == "/api/defaults":
self.send_json(
{
"backend": f"http://127.0.0.1:{PORT}",
}
)
return
if parsed.path == "/api/library":
self.send_json({"items": list_library()})
return
if parsed.path == "/api/library/preview":
params = parse_qs(parsed.query)
item_id = params.get("id", [""])[0]
index = params.get("index", ["0"])[0]
self.send_json(make_library_slice_preview(item_id, index))
return
if parsed.path == "/api/library/reformat-preview":
params = parse_qs(parsed.query)
item_id = params.get("id", [""])[0]
plane = params.get("plane", ["coronal"])[0]
index = params.get("index", ["0"])[0]
window = params.get("window", ["default"])[0]
model_id = params.get("modelId", [""])[0]
mask_only = params.get("maskOnly", ["0"])[0] in {"1", "true", "yes"}
self.send_json(make_library_reformat_preview(item_id, plane, index, window, model_id, mask_only))
return
if parsed.path == "/api/segmentation/list":
params = parse_qs(parsed.query)
item_id = params.get("id", [""])[0]
self.send_json({"items": list_segmentations(item_id)})
return
if parsed.path == "/api/segmentation/preview":
params = parse_qs(parsed.query)
item_id = params.get("id", [""])[0]
segmentation_id = params.get("segId", [""])[0]
plane = params.get("plane", ["coronal"])[0]
index = params.get("index", ["0"])[0]
self.send_json(make_segmentation_reformat_preview(item_id, segmentation_id, plane, index))
return
if parsed.path == "/api/library/info":
params = parse_qs(parsed.query)
item_id = params.get("id", [""])[0]
self.send_json(make_library_info(item_id))
return
if parsed.path == "/api/job":
params = parse_qs(parsed.query)
job_id = params.get("id", [""])[0]
job = get_job(job_id)
if not job:
self.send_json({"error": "任务不存在。"}, status=404)
return
self.send_json(job)
return
if parsed.path == "/api/user/job":
params = parse_qs(parsed.query)
username = params.get("username", [""])[0]
kind = params.get("kind", ["deformation"])[0]
if kind not in ["deformation", "video"]:
self.send_json({"error": "未知任务类型。"}, status=400)
return
self.send_json({"job": get_user_task_job(username, kind)})
return
if parsed.path == "/api/deformation/download":
params = parse_qs(parsed.query)
job_id = params.get("job", [""])[0]
target = params.get("target", ["all"])[0]
try:
zip_path = prepare_deformation_zip(job_id, target)
self.send_file(zip_path, "application/zip", as_attachment=True)
except Exception as exc:
self.send_json({"error": str(exc)}, status=400)
return
if parsed.path == "/api/file":
params = parse_qs(parsed.query)
file_path = Path(unquote(params.get("path", [""])[0])).resolve()
if not file_path.exists() or not file_path.is_file():
self.send_json({"error": "文件不存在。"}, status=404)
return
content_type = "application/octet-stream"
if file_path.suffix.lower() in [".png", ".jpg", ".jpeg"]:
content_type = "image/png" if file_path.suffix.lower() == ".png" else "image/jpeg"
elif file_path.suffix.lower() == ".mp4":
content_type = "video/mp4"
elif file_path.suffix.lower() == ".zip":
content_type = "application/zip"
self.send_file(
file_path,
content_type,
as_attachment=file_path.suffix.lower() == ".zip",
)
return
self.send_json({"error": "接口不存在。"}, status=404)
def do_POST(self):
parsed = urlparse(self.path)
try:
if parsed.path == "/api/library/upload":
body = self.read_bytes()
self.send_json(upload_library_item(self.headers, body), status=201)
return
if parsed.path == "/api/model/upload":
body = self.read_bytes()
self.send_json(save_uploaded_stl(self.headers, body), status=201)
return
if parsed.path == "/api/segmentation/upload":
body = self.read_bytes()
self.send_json(save_uploaded_segmentation(self.headers, body), status=201)
return
body = self.read_json()
if parsed.path == "/api/demo/reset":
self.send_json(reset_demo_environment())
return
if parsed.path == "/api/preview":
self.send_json(
make_preview(
body["inputDir"],
body.get("angleDegrees", 12),
bool(body.get("showCutoffLine", True)),
body.get("transitionWidth", 90),
body.get("mode", "soft_transition"),
body.get("gaussianSigma", 3),
)
)
return
if parsed.path == "/api/deformation/package":
source_job_id = body["jobId"]
target = body.get("target", "all")
package_options = body.get("packageOptions")
username = normalized_username(body.get("username"))
def worker(job_id):
label = "四状态总 ZIP" if target == "all" else "本状态 DICOM ZIP"
set_job(job_id, message=f"正在打包{label}...", progress=10)
zip_path = prepare_deformation_zip(source_job_id, target, package_options)
set_job(job_id, message="打包完成,准备下载...", progress=95)
return {"file": file_metadata(zip_path)}
self.send_json(
start_job(
"zip",
worker,
owner=username,
params={
"sourceJobId": source_job_id,
"target": target,
"packageOptions": package_options,
},
remember_user_task=False,
),
status=202,
)
return
if parsed.path == "/api/deformation":
input_dir = body["inputDir"]
angle_degrees = float(body.get("angleDegrees", 12))
transition_width = int(body.get("transitionWidth", 90))
username = normalized_username(body.get("username"))
def worker(job_id):
job_root = RESULT_DIR / job_id
output_dir = job_root / "four_state_output"
reset_dir(job_root)
def progress(message):
progress_value = deformation_progress_for_message(message)
updates = {"message": message}
if progress_value is not None:
updates["progress"] = progress_value
set_job(job_id, **updates)
output_paths, preview_paths = run_deformation(
input_dir,
output_dir,
angle_degrees,
transition_width,
progress,
)
return serialize_outputs(output_paths, preview_paths)
self.send_json(
start_job(
"deformation",
worker,
owner=username,
params={
"inputDir": input_dir,
"angleDegrees": angle_degrees,
"transitionWidth": transition_width,
},
),
status=202,
)
return
if parsed.path == "/api/video":
input_dir = body["inputDir"]
max_angle = float(body.get("maxAngle", 20))
duration = float(body.get("durationSeconds", 6))
show_arrow = bool(body.get("showArrow", True))
mode = body.get("mode", "hard_boundary")
if mode not in {"hard_boundary", "gaussian_smooth", "soft_transition"}:
mode = "hard_boundary"
def worker(job_id):
job_root = RESULT_DIR / job_id
reset_dir(job_root)
output_file = job_root / f"head_extension_{mode}_{job_id}.mp4"
set_job(job_id, message=f"正在生成 {mode} 视频。")
output = generate_video(input_dir, output_file, max_angle, duration, show_arrow, mode)
output = Path(output).resolve()
return {
"video": {
"path": str(output),
"name": output.name,
"size": output.stat().st_size,
}
}
self.send_json(start_job("video", worker), status=202)
return
self.send_json({"error": "接口不存在。"}, status=404)
except Exception as exc:
self.send_json(
{
"error": str(exc),
"traceback": traceback.format_exc(),
},
status=500,
)
def do_DELETE(self):
parsed = urlparse(self.path)
if parsed.path != "/api/library":
self.send_json({"error": "接口不存在。"}, status=404)
return
params = parse_qs(parsed.query)
item_id = params.get("id", [""])[0]
items = list_library()
target = next((item for item in items if item["id"] == item_id), None)
if not target:
self.send_json({"error": "影像不存在。"}, status=404)
return
remaining = [item for item in items if item["id"] != item_id]
write_library_meta(remaining)
upload_root = Path(target["dicomPath"]).resolve().parent
if upload_root.exists() and upload_root.is_relative_to(LIBRARY_DIR.resolve()):
clear_dicom_caches(target["dicomPath"])
shutil.rmtree(upload_root)
preview_cache = PREVIEW_CACHE_DIR / item_id
if preview_cache.exists():
shutil.rmtree(preview_cache)
self.send_json({"ok": True, "items": remaining})
def read_bytes(self):
length = int(self.headers.get("content-length", "0"))
if length == 0:
return b""
return self.rfile.read(length)
def read_json(self):
body = self.read_bytes()
if not body:
return {}
return json.loads(body.decode("utf-8"))
def send_file(self, file_path, content_type="application/octet-stream", as_attachment=False):
file_path = Path(file_path).resolve()
if not file_path.exists() or not file_path.is_file():
self.send_json({"error": "文件不存在。"}, status=404)
return
self.send_response(200)
self.send_cors_headers()
self.send_header("Content-Type", content_type)
if as_attachment:
self.send_header("Content-Disposition", f'attachment; filename="{file_path.name}"')
self.send_header("Content-Length", str(file_path.stat().st_size))
self.end_headers()
try:
with file_path.open("rb") as file_handle:
shutil.copyfileobj(file_handle, self.wfile)
except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError):
return
def send_cors_headers(self):
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type, x-file-name, x-library-id")
def send_json(self, payload, status=200):
data = json.dumps(payload, ensure_ascii=False, default=json_default).encode("utf-8")
self.send_response(status)
self.send_cors_headers()
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def main():
safe_mkdir(APP_DIR / "app_output")
safe_mkdir(APP_DIR / "ppt_video")
safe_mkdir(LIBRARY_DIR)
safe_mkdir(RESULT_DIR)
safe_mkdir(MODEL_DIR)
load_persisted_jobs()
server = ThreadingHTTPServer((HOST, PORT), Handler)
print(f"Head CT Morph backend running at http://{HOST}:{PORT}")
server.serve_forever()
if __name__ == "__main__":
main()