"""HTTP backend for the Head CT Morph web application.

Exposes a small JSON API (see ``Handler``) for:

* managing an on-disk library of uploaded DICOM series,
* rendering slice previews and DICOM header summaries,
* running background jobs (four-state deformation, video, zip packaging)
  whose state is kept in ``JOBS`` and persisted to ``JOBS_META``.
"""
import base64
import json
import os
import shutil
import threading
import time
import traceback
import uuid
import zipfile
from email.parser import BytesParser
from email.policy import default
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from io import BytesIO
from pathlib import Path
from urllib.parse import parse_qs, quote, unquote, urlparse

# Must be set before the imports below run.
# NOTE(review): presumably one of them imports matplotlib, which reads
# MPLCONFIGDIR at import time - confirm against head_extension_app.
os.environ.setdefault("MPLCONFIGDIR", "/tmp/head_ct_morph_matplotlib")

import pydicom
from pydicom.multival import MultiValue
from PIL import Image

from generate_head_extension_video import generate_video
from head_extension_app import (
    APP_DIR,
    crop_head_neck,
    ct_window,
    draw_cutoff_line,
    fit_image,
    load_dicom_volume,
    preview_deform_2d,
    run_deformation,
    sagittal_mip,
    safe_mkdir,
)

HOST = "0.0.0.0"
PORT = 8787

# In-memory job table (job id -> job dict); guarded by JOBS_LOCK.
JOBS = {}
JOBS_LOCK = threading.Lock()
# Cache of sorted DICOM file lists, keyed by resolved directory path.
DICOM_FILE_CACHE = {}

LIBRARY_DIR = APP_DIR / "web_library"
LIBRARY_META = LIBRARY_DIR / "library.json"
RESULT_DIR = APP_DIR / "web_results"
JOBS_META = RESULT_DIR / "jobs.json"
USER_TASKS_META = RESULT_DIR / "user_tasks.json"
PREVIEW_CACHE_DIR = LIBRARY_DIR / "_preview_cache"

# Keys of the four deformation outputs that can be downloaded or packaged.
DEFORMATION_STATES = ("original", "hard_boundary", "gaussian_smooth", "soft_transition")


def _optional_path(raw):
    """Return ``Path(raw)``, or ``None`` when ``raw`` is empty or missing.

    ``Path("")`` is the current directory and always exists, so empty values
    must be filtered out before any ``exists()`` style check.
    """
    return Path(raw) if raw else None


def json_default(value):
    """``json.dumps`` fallback: resolved path string for ``Path``, else ``str(value)``."""
    if isinstance(value, Path):
        return str(value.resolve())
    return str(value)


def now_date():
    """Return today's local date formatted as ``YYYY-MM-DD``."""
    return time.strftime("%Y-%m-%d")


def safe_filename(name):
    """Return the basename of ``name`` with every character outside
    alphanumerics and ``._-`` replaced by ``_``."""
    return "".join(
        char if char.isalnum() or char in "._-" else "_"
        for char in Path(name).name
    )


def normalized_username(username):
    """Return the stripped username, or ``"anonymous"`` when it is empty or ``None``."""
    username = str(username or "").strip()
    return username or "anonymous"


def read_json_file(path, default):
    """Load JSON from ``path``; return ``default`` if the file is missing or unreadable."""
    path = Path(path)
    if not path.exists():
        return default
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        # Best effort by design: a corrupt or unreadable file behaves like a missing one.
        return default


def write_json_file(path, value):
    """Write ``value`` to ``path`` as indented UTF-8 JSON, creating the parent directory."""
    path = Path(path)
    safe_mkdir(path.parent)
    path.write_text(
        json.dumps(value, ensure_ascii=False, indent=2, default=json_default),
        encoding="utf-8",
    )


def read_library_meta():
    """Return the parsed content of ``LIBRARY_META``, or ``[]`` if the file does not exist.

    Raises whatever ``json.loads`` raises when the file is not valid JSON.
    """
    safe_mkdir(LIBRARY_DIR)
    if LIBRARY_META.exists():
        return json.loads(LIBRARY_META.read_text(encoding="utf-8"))
    return []


def write_library_meta(items):
    """Overwrite ``LIBRARY_META`` with ``items`` serialized as indented UTF-8 JSON."""
    safe_mkdir(LIBRARY_DIR)
    LIBRARY_META.write_text(
        json.dumps(items, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )


def folder_size_mb(path):
    """Return the total size of the ``*.dcm`` files directly in ``path``, in whole MiB (rounded)."""
    total = sum(file_path.stat().st_size for file_path in Path(path).glob("*.dcm"))
    return round(total / 1024 / 1024)


def build_library_record(item_id, patient_id, dicom_path, source="upload", version="DICOM-LIB"):
    """Build the metadata dict stored in the library index for one DICOM folder.

    ``fileCount`` and ``size`` are computed from the ``*.dcm`` files directly
    inside ``dicom_path``; ``date`` is today's date.
    """
    dicom_dir = Path(dicom_path).resolve()
    file_count = len(list(dicom_dir.glob("*.dcm")))
    return {
        "id": item_id,
        "patientId": patient_id,
        "date": now_date(),
        "version": version,
        "status": "processed",
        "size": folder_size_mb(dicom_dir),
        "previewColor": "bg-slate-900" if source == "upload" else "bg-slate-800",
        "dicomPath": str(dicom_dir),
        "fileCount": file_count,
        "source": source,
    }


def list_library():
    """Return the library records whose DICOM folder still exists.

    Seed entries are hidden. ``fileCount`` and ``size`` are refreshed from
    disk for every returned record. When a record's folder has disappeared
    the record is dropped and the index file is rewritten.
    """
    items = [
        item
        for item in read_library_meta()
        if item.get("source") != "seed" and item.get("id") != "seed_ori_head_ct"
    ]
    live_items = []
    changed = False
    for item in items:
        # A record without a path must be treated as dead: Path("") would
        # resolve to the working directory and always "exist".
        dicom_path = _optional_path(item.get("dicomPath"))
        if dicom_path is not None and dicom_path.exists():
            item["fileCount"] = len(list(dicom_path.glob("*.dcm")))
            item["size"] = folder_size_mb(dicom_path)
            live_items.append(item)
        else:
            changed = True
    if changed:
        write_library_meta(live_items)
    return live_items


def sort_key_for_dicom(path):
    """Return a sort key for a DICOM file.

    Files whose header can be read sort first, by ``InstanceNumber`` (0 when
    absent) and then by name. On any read/convert error the key falls back
    to the numeric file stem (0 when the stem is not all digits).
    """
    path = Path(path)
    try:
        ds = pydicom.dcmread(str(path), stop_before_pixels=True, force=True)
        return (0, int(getattr(ds, "InstanceNumber", 0)), path.name)
    except Exception:
        stem = path.stem
        return (1, int(stem) if stem.isdigit() else 0, path.name)


def sorted_dicom_files(dicom_dir):
    """Return the ``*.dcm`` files directly in ``dicom_dir`` ordered by ``sort_key_for_dicom``.

    The result is cached per directory; the cache entry is reused while the
    file count and the newest modification time are unchanged.
    """
    dicom_dir = Path(dicom_dir).resolve()
    files = list(dicom_dir.glob("*.dcm"))
    cache_key = str(dicom_dir)
    signature = (
        cache_key,
        len(files),
        max((file_path.stat().st_mtime for file_path in files), default=0),
    )
    cached = DICOM_FILE_CACHE.get(cache_key)
    if cached and cached["signature"] == signature:
        return cached["files"]
    sorted_files = sorted(files, key=sort_key_for_dicom)
    DICOM_FILE_CACHE[cache_key] = {
        "signature": signature,
        "files": sorted_files,
    }
    return sorted_files


def find_library_item(item_id):
    """Return the library record with id ``item_id``, or ``None``."""
    return next((item for item in list_library() if item["id"] == item_id), None)


def make_library_slice_preview(item_id, index):
    """Render (or reuse) a PNG preview of one slice and describe it.

    ``index`` is converted with ``int()`` and clamped to the valid range.
    The PNG is cached under ``PREVIEW_CACHE_DIR / item_id``.

    Raises:
        RuntimeError: the item is unknown or has no ``.dcm`` files.
        ValueError: ``index`` cannot be converted to an integer.
    """
    item = find_library_item(item_id)
    if not item:
        raise RuntimeError("影像库中没有找到该数据。")
    dicom_files = sorted_dicom_files(item["dicomPath"])
    if not dicom_files:
        raise RuntimeError("该影像数据没有可预览的 .dcm 文件。")

    count = len(dicom_files)
    index = max(0, min(int(index), count - 1))
    cache_dir = PREVIEW_CACHE_DIR / item_id
    safe_mkdir(cache_dir)
    preview_path = cache_dir / f"slice_{index:04d}.png"
    if not preview_path.exists():
        ds = pydicom.dcmread(str(dicom_files[index]), force=True)
        # Apply the DICOM rescale slope/intercept before windowing.
        image = ds.pixel_array.astype("float32")
        image = image * float(getattr(ds, "RescaleSlope", 1))
        image = image + float(getattr(ds, "RescaleIntercept", 0))
        preview = Image.fromarray(ct_window(image)).convert("RGB")
        preview = fit_image(preview, 720, 520)
        preview.save(preview_path, format="PNG")

    neighbors = [value for value in [index - 1, index + 1] if 0 <= value < count]
    return {
        "imageUrl": f"/api/file?path={quote(str(preview_path.resolve()), safe='')}",
        "index": index,
        "count": count,
        "file": dicom_files[index].name,
        "patientId": item["patientId"],
        "neighbors": [
            f"/api/library/preview?id={item_id}&index={neighbor}"
            for neighbor in neighbors
        ],
    }


def dicom_value(ds, name, fallback="-"):
    """Return attribute ``name`` of dataset ``ds`` as a display string.

    Missing, ``None`` or empty values yield ``fallback``; multi-valued
    attributes are joined with ``" / "``.
    """
    value = getattr(ds, name, fallback)
    if value in [None, ""]:
        return fallback
    if isinstance(value, (list, tuple, MultiValue)):
        return " / ".join(str(item) for item in value)
    return str(value)


def dicom_date(value):
    """Format an 8-digit ``YYYYMMDD`` value as ``YYYY-MM-DD``; otherwise return it as-is (``"-"`` if empty)."""
    value = str(value or "")
    if len(value) == 8 and value.isdigit():
        return f"{value[0:4]}-{value[4:6]}-{value[6:8]}"
    return value or "-"


def dicom_time(value):
    """Format a value starting with six digits (``HHMMSS``) as ``HH:MM:SS``; otherwise return it as-is (``"-"`` if empty)."""
    value = str(value or "")
    if len(value) >= 6 and value[:6].isdigit():
        return f"{value[0:2]}:{value[2:4]}:{value[4:6]}"
    return value or "-"


def make_library_info(item_id):
    """Return grouped DICOM header information for a library item.

    Header values are read from the first file of the sorted series; the
    last file is only used for the instance number range.

    Raises:
        RuntimeError: the item is unknown or has no ``.dcm`` files.
    """
    item = find_library_item(item_id)
    if not item:
        raise RuntimeError("影像库中没有找到该数据。")
    dicom_files = sorted_dicom_files(item["dicomPath"])
    if not dicom_files:
        raise RuntimeError("该影像数据没有可读取的 .dcm 文件。")

    first = pydicom.dcmread(str(dicom_files[0]), stop_before_pixels=True, force=True)
    last = pydicom.dcmread(str(dicom_files[-1]), stop_before_pixels=True, force=True)
    pixel_spacing = dicom_value(first, "PixelSpacing")
    matrix = f"{dicom_value(first, 'Columns')} x {dicom_value(first, 'Rows')}"
    instance_range = f"{dicom_value(first, 'InstanceNumber')} - {dicom_value(last, 'InstanceNumber')}"

    return {
        "id": item["id"],
        "patientId": item["patientId"],
        "fileCount": len(dicom_files),
        "groups": [
            {
                "title": "患者信息",
                "items": [
                    {"label": "患者姓名", "value": dicom_value(first, "PatientName")},
                    {"label": "患者 ID", "value": dicom_value(first, "PatientID")},
                    {"label": "性别", "value": dicom_value(first, "PatientSex")},
                    {"label": "年龄", "value": dicom_value(first, "PatientAge")},
                ],
            },
            {
                "title": "检查信息",
                "items": [
                    {"label": "检查日期", "value": dicom_date(getattr(first, "StudyDate", ""))},
                    {"label": "检查时间", "value": dicom_time(getattr(first, "StudyTime", ""))},
                    {"label": "检查描述", "value": dicom_value(first, "StudyDescription")},
                    {"label": "检查号", "value": dicom_value(first, "AccessionNumber")},
                    {"label": "机构", "value": dicom_value(first, "InstitutionName")},
                ],
            },
            {
                "title": "序列信息",
                "items": [
                    {"label": "模态", "value": dicom_value(first, "Modality")},
                    {"label": "部位", "value": dicom_value(first, "BodyPartExamined")},
                    {"label": "序列描述", "value": dicom_value(first, "SeriesDescription")},
                    {"label": "序列号", "value": dicom_value(first, "SeriesNumber")},
                    {"label": "制造商", "value": dicom_value(first, "Manufacturer")},
                ],
            },
            {
                "title": "图像参数",
                "items": [
                    {"label": "切片数", "value": str(len(dicom_files))},
                    {"label": "Instance 范围", "value": instance_range},
                    {"label": "矩阵", "value": matrix},
                    {"label": "像素间距", "value": pixel_spacing},
                    {"label": "层厚", "value": dicom_value(first, "SliceThickness")},
                    {"label": "层间距", "value": dicom_value(first, "SpacingBetweenSlices")},
                    {"label": "卷积核", "value": dicom_value(first, "ConvolutionKernel")},
                ],
            },
            {
                "title": "扫描参数",
                "items": [
                    {"label": "KVP", "value": dicom_value(first, "KVP")},
                    {"label": "管电流", "value": dicom_value(first, "XRayTubeCurrent")},
                    {"label": "曝光时间", "value": dicom_value(first, "ExposureTime")},
                    {"label": "重建直径", "value": dicom_value(first, "ReconstructionDiameter")},
                ],
            },
        ],
    }


def parse_multipart(headers, body):
    """Parse a ``multipart/form-data`` request body.

    Returns:
        ``(fields, files)`` where ``fields`` maps field name to decoded text
        and ``files`` is a list of ``(field_name, filename, payload_bytes)``.
    """
    content_type = headers.get("content-type", "")
    # The email parser needs a header block, so synthesize one around the body.
    message = BytesParser(policy=default).parsebytes(
        f"Content-Type: {content_type}\r\nMIME-Version: 1.0\r\n\r\n".encode("utf-8") + body
    )
    fields = {}
    files = []
    for part in message.iter_parts():
        name = part.get_param("name", header="content-disposition")
        filename = part.get_filename()
        payload = part.get_payload(decode=True) or b""
        if filename:
            files.append((name, filename, payload))
        elif name:
            fields[name] = payload.decode("utf-8", errors="replace")
    return fields, files


def write_dicom_payload(target_dir, filename, payload, index):
    """Write ``payload`` into ``target_dir`` under a sanitized file name.

    Falls back to ``"{index}.dcm"`` when sanitizing yields an empty name and
    prefixes the name with ``index`` when the file already exists.
    """
    output_name = safe_filename(filename) or f"{index}.dcm"
    output_path = target_dir / output_name
    if output_path.exists():
        output_path = target_dir / f"{index}_{output_name}"
    output_path.write_bytes(payload)


def add_dicom_from_zip(target_dir, zip_filename, payload, start_index):
    """Extract every ``.dcm`` member of a zip payload into ``target_dir``.

    Members are written flat (directory structure is discarded) and named
    ``"{zip stem}_{member name}"``. Absolute member paths and paths
    containing ``..`` are skipped.

    Returns:
        Number of DICOM files written.

    Raises:
        RuntimeError: ``payload`` is not a valid zip archive.
    """
    count = 0
    try:
        with zipfile.ZipFile(BytesIO(payload)) as archive:
            for member in archive.infolist():
                if member.is_dir():
                    continue
                member_name = member.filename.replace("\\", "/")
                if Path(member_name).suffix.lower() != ".dcm":
                    continue
                if Path(member_name).is_absolute() or ".." in Path(member_name).parts:
                    continue
                count += 1
                with archive.open(member) as member_file:
                    write_dicom_payload(
                        target_dir,
                        f"{Path(zip_filename).stem}_{Path(member_name).name}",
                        member_file.read(),
                        start_index + count,
                    )
    except zipfile.BadZipFile as exc:
        raise RuntimeError(f"{zip_filename} 不是有效的 zip 压缩包。") from exc
    return count


def upload_library_item(headers, body):
    """Store an uploaded set of ``.dcm`` files and/or ``.zip`` archives as a new library item.

    Returns:
        The new library record (also prepended to the library index).

    Raises:
        RuntimeError: the upload contains no ``.dcm``/``.zip`` file, an
            archive is invalid, or no DICOM file could be extracted.
    """
    fields, files = parse_multipart(headers, body)
    dicom_files = [
        (filename, payload)
        for _, filename, payload in files
        if Path(filename).suffix.lower() == ".dcm"
    ]
    zip_files = [
        (filename, payload)
        for _, filename, payload in files
        if Path(filename).suffix.lower() == ".zip"
    ]
    if not dicom_files and not zip_files:
        raise RuntimeError("上传内容里没有找到 .dcm 文件或 .zip 压缩包。")

    item_id = time.strftime("%Y%m%d_%H%M%S_") + uuid.uuid4().hex[:8]
    first_upload_name = dicom_files[0][0] if dicom_files else zip_files[0][0]
    default_patient_id = (
        Path(first_upload_name).stem.upper()
        if zip_files
        else f"WEB_{item_id[-8:].upper()}"
    )
    patient_id = fields.get("patientId") or default_patient_id
    target_dir = LIBRARY_DIR / item_id / "dicom"
    reset_dir(target_dir)

    try:
        for index, (filename, payload) in enumerate(dicom_files, start=1):
            write_dicom_payload(target_dir, filename, payload, index)

        zip_dicom_count = 0
        for filename, payload in zip_files:
            zip_dicom_count += add_dicom_from_zip(
                target_dir,
                filename,
                payload,
                len(dicom_files) + zip_dicom_count,
            )
    except Exception:
        # Do not leave a half-written upload directory that no record refers to.
        shutil.rmtree(target_dir.parent, ignore_errors=True)
        raise

    total_dicom_count = len(dicom_files) + zip_dicom_count
    if total_dicom_count == 0:
        shutil.rmtree(target_dir.parent)
        raise RuntimeError("压缩包里没有找到 .dcm 文件。")

    record = build_library_record(
        item_id,
        patient_id,
        target_dir,
        source="upload",
        version="DICOM-ZIP" if zip_files and not dicom_files else "DICOM-LIB",
    )
    items = [record, *list_library()]
    write_library_meta(items)
    return record


def reset_dir(path):
    """Delete ``path`` if it exists and recreate it as an empty directory."""
    path = Path(path)
    if path.exists():
        shutil.rmtree(path)
    path.mkdir(parents=True, exist_ok=True)


def zip_folder(source_dir, zip_path):
    """Zip every file under ``source_dir`` (recursively) into ``zip_path``.

    An existing archive at ``zip_path`` is replaced. Returns the resolved
    archive path.
    """
    source_dir = Path(source_dir).resolve()
    zip_path = Path(zip_path).resolve()
    safe_mkdir(zip_path.parent)
    if zip_path.exists():
        zip_path.unlink()
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for file_path in source_dir.rglob("*"):
            if file_path.is_file():
                archive.write(file_path, file_path.relative_to(source_dir))
    return zip_path


def read_user_tasks():
    """Return the persisted ``{username: {kind: job_id}}`` mapping (``{}`` if missing or malformed)."""
    tasks = read_json_file(USER_TASKS_META, {})
    return tasks if isinstance(tasks, dict) else {}


def write_user_tasks(tasks):
    """Persist the user task mapping to ``USER_TASKS_META``."""
    write_json_file(USER_TASKS_META, tasks)


def set_user_task(username, kind, job_id):
    """Record ``job_id`` as the latest job of type ``kind`` for ``username``."""
    username = normalized_username(username)
    tasks = read_user_tasks()
    tasks.setdefault(username, {})[kind] = job_id
    write_user_tasks(tasks)


def get_user_task_job(username, kind):
    """Return the job last recorded for ``username`` and ``kind``, or ``None``."""
    username = normalized_username(username)
    tasks = read_user_tasks()
    job_id = tasks.get(username, {}).get(kind)
    if not job_id:
        return None
    return get_job(job_id)


def persist_jobs_locked():
    """Write ``JOBS`` to ``JOBS_META``. Callers in this file hold ``JOBS_LOCK``."""
    write_json_file(JOBS_META, JOBS)


def load_persisted_jobs():
    """Load jobs saved by a previous run into ``JOBS``.

    Jobs that were still ``running`` are marked ``failed`` because their
    worker threads no longer exist.
    """
    saved_jobs = read_json_file(JOBS_META, {})
    if not isinstance(saved_jobs, dict):
        return
    now_text = time.strftime("%Y-%m-%d %H:%M:%S")
    with JOBS_LOCK:
        for job_id, job in saved_jobs.items():
            if not isinstance(job, dict):
                continue
            if job.get("status") == "running":
                job = {
                    **job,
                    "status": "failed",
                    "message": "后端已重启,运行中的任务已中断。",
                    "error": "后端服务重启后无法继续运行中的任务,请重新提交。",
                    "updatedAt": now_text,
                }
            JOBS[job_id] = job
        persist_jobs_locked()


def deformation_progress_for_message(message):
    """Map a deformation progress message to a percentage, or ``None`` if unrecognized.

    The markers are tested in order and the first one contained in
    ``message`` wins.
    """
    if "已复制" in message:
        return 20
    if "正在生成四种状态" in message:
        return 35
    if "正在应用形变" in message:
        return 55
    if "正在写出四种状态" in message:
        return 72
    if "正在生成四状态过程对比图" in message:
        return 82
    return None


def set_job(job_id, **updates):
    """Apply ``updates`` to job ``job_id``, refresh ``updatedAt`` and persist all jobs.

    Raises:
        KeyError: the job does not exist.
    """
    with JOBS_LOCK:
        job = JOBS[job_id]
        job.update(updates)
        job["updatedAt"] = time.strftime("%Y-%m-%d %H:%M:%S")
        persist_jobs_locked()


def get_job(job_id):
    """Return a shallow copy of job ``job_id``, or ``None`` if it does not exist."""
    with JOBS_LOCK:
        job = JOBS.get(job_id)
        return dict(job) if job else None


def start_job(kind, worker, owner=None, params=None, remember_user_task=True):
    """Register a job and run ``worker(job_id)`` in a daemon thread.

    The worker's return value becomes the job ``result``; an exception marks
    the job ``failed`` and stores its message and traceback.

    Args:
        kind: Job type label stored on the job.
        worker: Callable taking the new job id.
        owner: Username; normalized with ``normalized_username``.
        params: Request parameters stored on the job (``{}`` when omitted).
        remember_user_task: When true, record this job as the owner's latest
            job of this ``kind``.

    Returns:
        A copy of the newly created job dict.
    """
    job_id = uuid.uuid4().hex[:12]
    owner = normalized_username(owner)
    with JOBS_LOCK:
        JOBS[job_id] = {
            "id": job_id,
            "kind": kind,
            "owner": owner,
            "status": "running",
            "message": "任务已启动。",
            "progress": 1,
            "params": params or {},
            "result": None,
            "error": None,
            "createdAt": time.strftime("%Y-%m-%d %H:%M:%S"),
            "updatedAt": time.strftime("%Y-%m-%d %H:%M:%S"),
        }
        persist_jobs_locked()
    if remember_user_task:
        set_user_task(owner, kind, job_id)

    def run():
        try:
            result = worker(job_id)
            set_job(job_id, status="completed", message="任务完成。", progress=100, result=result)
        except Exception as exc:
            # Thread boundary: report the failure on the job instead of losing it.
            set_job(
                job_id,
                status="failed",
                message="任务失败。",
                error=str(exc),
                traceback=traceback.format_exc(),
            )

    threading.Thread(target=run, daemon=True).start()
    return get_job(job_id)


def make_preview(input_dir, angle_degrees, show_cutoff_line=True):
    """Build a side-by-side before/after preview image for a deformation angle.

    Returns:
        Dict with the PNG as a base64 ``data:`` URL (``image``), the resolved
        input directory (``source``) and ``angleDegrees``.
    """
    volume = load_dicom_volume(input_dir)
    before = crop_head_neck(sagittal_mip(volume))
    before_display = draw_cutoff_line(before, volume.shape[0]) if show_cutoff_line else before
    after = preview_deform_2d(before_display, float(angle_degrees))

    # Two 700x520 panels on a black 1440x520 canvas, separated by a 40 px gap.
    canvas_image = Image.new("RGB", (1440, 520), (0, 0, 0))
    canvas_image.paste(fit_image(before_display, 700, 520), (0, 0))
    canvas_image.paste(fit_image(after, 700, 520), (740, 0))
    canvas = BytesIO()
    canvas_image.save(canvas, format="PNG")
    encoded = base64.b64encode(canvas.getvalue()).decode("ascii")
    return {
        "image": f"data:image/png;base64,{encoded}",
        "source": str(Path(input_dir).resolve()),
        "angleDegrees": float(angle_degrees),
    }


def serialize_outputs(output_paths, preview_paths):
    """Convert deformation output and preview paths into a JSON-friendly result dict.

    ``preview_paths`` must contain the keys ``comparison`` and ``screenshots``.
    """
    return {
        "outputs": {key: str(Path(value).resolve()) for key, value in output_paths.items()},
        "previews": {
            "comparison": str(Path(preview_paths["comparison"]).resolve()),
            "screenshots": str(Path(preview_paths["screenshots"]).resolve()),
        },
        "zip": None,
        "stateZips": {},
    }


def file_metadata(file_path):
    """Return ``path``, ``name`` and ``size`` (bytes) of an existing file."""
    file_path = Path(file_path).resolve()
    return {
        "path": str(file_path),
        "name": file_path.name,
        "size": file_path.stat().st_size,
    }


def zip_selected_deformation_outputs(job_root, outputs, previews, package_options):
    """Zip a user-selected subset of deformation DICOM folders and images.

    Args:
        job_root: Result directory of the deformation job; the archive is
            created inside it.
        outputs: Mapping of state key to DICOM output directory.
        previews: Mapping with ``comparison`` (image file) and
            ``screenshots`` (directory of per-state PNG files).
        package_options: Mapping with optional ``dicom`` and ``images`` lists
            of keys to include. Unknown keys are ignored.

    Returns:
        Path of the created archive.

    Raises:
        RuntimeError: nothing was selected, or a selected source is missing.
    """
    dicom_keys = package_options.get("dicom") or []
    image_keys = package_options.get("images") or []
    if not dicom_keys and not image_keys:
        raise RuntimeError("请至少选择一个要打包的内容。")

    zip_path = Path(job_root) / f"head_ct_morph_selected_{Path(job_root).name}.zip"
    dicom_names = {
        "original": "dicom/ct_original",
        "hard_boundary": "dicom/ct_hard_boundary",
        "gaussian_smooth": "dicom/ct_gaussian_smooth",
        "soft_transition": "dicom/ct_soft_transition",
    }
    image_names = {
        "comparison": "images/process_comparison_4states.png",
        "original": "images/original.png",
        "hard_boundary": "images/hard_boundary.png",
        "gaussian_smooth": "images/gaussian_smooth.png",
        "soft_transition": "images/soft_transition.png",
    }
    # Missing preview entries stay None instead of becoming Path("") (= cwd).
    screenshots_dir = _optional_path(previews.get("screenshots"))
    image_files = {"comparison": _optional_path(previews.get("comparison"))}
    for state in DEFORMATION_STATES:
        image_files[state] = screenshots_dir / f"{state}.png" if screenshots_dir else None

    # Validate and collect everything first so a missing source never leaves
    # a partially written archive behind.
    entries = []
    for key in dicom_keys:
        if key not in dicom_names:
            continue
        source_dir = _optional_path(outputs.get(key))
        if source_dir is None or not source_dir.is_dir():
            raise RuntimeError(f"{key} 的 DICOM 输出目录不存在。")
        for file_path in source_dir.rglob("*"):
            if file_path.is_file():
                entries.append(
                    (file_path, Path(dicom_names[key]) / file_path.relative_to(source_dir))
                )
    for key in image_keys:
        if key not in image_files:
            continue
        source_file = image_files[key]
        if source_file is None or not source_file.is_file():
            raise RuntimeError(f"{key} 的图片输出不存在。")
        entries.append((source_file, image_names[key]))

    if zip_path.exists():
        zip_path.unlink()
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for source, archive_name in entries:
            archive.write(source, archive_name)
    return zip_path


def prepare_deformation_zip(job_id, target, package_options=None):
    """Create the download archive for a completed deformation job.

    Args:
        job_id: Id of the deformation job.
        target: ``"all"`` or one of ``DEFORMATION_STATES``.
        package_options: Optional selection used only with ``target="all"``
            (see ``zip_selected_deformation_outputs``).

    Returns:
        Path of the created zip archive.

    Raises:
        RuntimeError: the job is unknown, of the wrong kind, unfinished, the
            target is unknown, or the required output directory is missing.
    """
    job = get_job(job_id)
    if not job:
        raise RuntimeError("任务不存在。")
    if job.get("kind") != "deformation":
        raise RuntimeError("该任务不是四状态生成任务。")
    if job.get("status") != "completed":
        raise RuntimeError("四状态任务尚未完成,无法下载。")

    result = job.get("result") or {}
    outputs = result.get("outputs") or {}
    previews = result.get("previews") or {}
    job_root = RESULT_DIR / job_id

    if target == "all":
        if package_options:
            return zip_selected_deformation_outputs(job_root, outputs, previews, package_options)
        output_dir = job_root / "four_state_output"
        if not output_dir.exists():
            raise RuntimeError("四状态输出目录不存在。")
        return zip_folder(output_dir, job_root / f"head_ct_morph_{job_id}.zip")

    if target not in DEFORMATION_STATES:
        raise RuntimeError("未知的四状态下载类型。")
    # A missing output entry must fail here; Path("") would point at the
    # working directory and zip it.
    source_dir = _optional_path(outputs.get(target))
    if source_dir is None or not source_dir.is_dir():
        raise RuntimeError("该状态的 DICOM 输出目录不存在。")
    return zip_folder(source_dir, job_root / f"{target}_{job_id}.zip")


class Handler(BaseHTTPRequestHandler):
    """Request handler implementing the JSON API.

    GET:    /api/health, /api/defaults, /api/library, /api/library/preview,
            /api/library/info, /api/job, /api/user/job,
            /api/deformation/download, /api/file
    POST:   /api/library/upload, /api/preview, /api/deformation/package,
            /api/deformation, /api/video
    DELETE: /api/library
    """

    def log_message(self, format, *args):
        """Print the access log line to stdout."""
        print("%s - %s" % (self.address_string(), format % args))

    def do_OPTIONS(self):
        """Answer CORS preflight requests with 204 and the CORS headers."""
        self.send_response(204)
        self.send_cors_headers()
        self.end_headers()

    def do_GET(self):
        """Dispatch GET requests; unexpected errors are returned as a JSON 500."""
        parsed = urlparse(self.path)
        try:
            if parsed.path == "/api/health":
                self.send_json({"ok": True, "service": "Head CT Morph backend"})
                return

            if parsed.path == "/api/defaults":
                self.send_json(
                    {
                        "backend": f"http://127.0.0.1:{PORT}",
                    }
                )
                return

            if parsed.path == "/api/library":
                self.send_json({"items": list_library()})
                return

            if parsed.path == "/api/library/preview":
                params = parse_qs(parsed.query)
                item_id = params.get("id", [""])[0]
                index = params.get("index", ["0"])[0]
                self.send_json(make_library_slice_preview(item_id, index))
                return

            if parsed.path == "/api/library/info":
                params = parse_qs(parsed.query)
                item_id = params.get("id", [""])[0]
                self.send_json(make_library_info(item_id))
                return

            if parsed.path == "/api/job":
                params = parse_qs(parsed.query)
                job_id = params.get("id", [""])[0]
                job = get_job(job_id)
                if not job:
                    self.send_json({"error": "任务不存在。"}, status=404)
                    return
                self.send_json(job)
                return

            if parsed.path == "/api/user/job":
                params = parse_qs(parsed.query)
                username = params.get("username", [""])[0]
                kind = params.get("kind", ["deformation"])[0]
                if kind not in ["deformation", "video"]:
                    self.send_json({"error": "未知任务类型。"}, status=400)
                    return
                self.send_json({"job": get_user_task_job(username, kind)})
                return

            if parsed.path == "/api/deformation/download":
                params = parse_qs(parsed.query)
                job_id = params.get("job", [""])[0]
                target = params.get("target", ["all"])[0]
                try:
                    zip_path = prepare_deformation_zip(job_id, target)
                    self.send_file(zip_path, "application/zip", as_attachment=True)
                except Exception as exc:
                    self.send_json({"error": str(exc)}, status=400)
                return

            if parsed.path == "/api/file":
                params = parse_qs(parsed.query)
                file_path = Path(unquote(params.get("path", [""])[0])).resolve()
                # Security: the path comes straight from the client, so only
                # serve files that live under the application directory.
                # NOTE(review): assumes every file the frontend requests
                # (previews, results, videos) is written below APP_DIR - confirm.
                if not file_path.is_relative_to(Path(APP_DIR).resolve()):
                    self.send_json({"error": "无权访问该文件。"}, status=403)
                    return
                if not file_path.exists() or not file_path.is_file():
                    self.send_json({"error": "文件不存在。"}, status=404)
                    return
                suffix = file_path.suffix.lower()
                content_type = "application/octet-stream"
                if suffix in [".png", ".jpg", ".jpeg"]:
                    content_type = "image/png" if suffix == ".png" else "image/jpeg"
                elif suffix == ".mp4":
                    content_type = "video/mp4"
                elif suffix == ".zip":
                    content_type = "application/zip"
                self.send_file(
                    file_path,
                    content_type,
                    as_attachment=suffix == ".zip",
                )
                return

            self.send_json({"error": "接口不存在。"}, status=404)
        except (BrokenPipeError, ConnectionResetError):
            # The client went away mid-response; there is nobody left to answer.
            return
        except Exception as exc:
            self.send_exception(exc)

    def do_POST(self):
        """Dispatch POST requests; unexpected errors are returned as a JSON 500."""
        parsed = urlparse(self.path)
        try:
            if parsed.path == "/api/library/upload":
                body = self.read_bytes()
                self.send_json(upload_library_item(self.headers, body), status=201)
                return

            body = self.read_json()
            if parsed.path == "/api/preview":
                self.send_json(
                    make_preview(
                        body["inputDir"],
                        body.get("angleDegrees", 12),
                        bool(body.get("showCutoffLine", True)),
                    )
                )
                return

            if parsed.path == "/api/deformation/package":
                source_job_id = body["jobId"]
                target = body.get("target", "all")
                package_options = body.get("packageOptions")
                username = normalized_username(body.get("username"))

                def worker(job_id):
                    label = "四状态总 ZIP" if target == "all" else "本状态 DICOM ZIP"
                    set_job(job_id, message=f"正在打包{label}...", progress=10)
                    zip_path = prepare_deformation_zip(source_job_id, target, package_options)
                    set_job(job_id, message="打包完成,准备下载...", progress=95)
                    return {"file": file_metadata(zip_path)}

                self.send_json(
                    start_job(
                        "zip",
                        worker,
                        owner=username,
                        params={
                            "sourceJobId": source_job_id,
                            "target": target,
                            "packageOptions": package_options,
                        },
                        remember_user_task=False,
                    ),
                    status=202,
                )
                return

            if parsed.path == "/api/deformation":
                input_dir = body["inputDir"]
                angle_degrees = float(body.get("angleDegrees", 12))
                transition_width = int(body.get("transitionWidth", 90))
                username = normalized_username(body.get("username"))

                def worker(job_id):
                    job_root = RESULT_DIR / job_id
                    output_dir = job_root / "four_state_output"
                    reset_dir(job_root)

                    def progress(message):
                        progress_value = deformation_progress_for_message(message)
                        updates = {"message": message}
                        if progress_value is not None:
                            updates["progress"] = progress_value
                        set_job(job_id, **updates)

                    output_paths, preview_paths = run_deformation(
                        input_dir,
                        output_dir,
                        angle_degrees,
                        transition_width,
                        progress,
                    )
                    return serialize_outputs(output_paths, preview_paths)

                self.send_json(
                    start_job(
                        "deformation",
                        worker,
                        owner=username,
                        params={
                            "inputDir": input_dir,
                            "angleDegrees": angle_degrees,
                            "transitionWidth": transition_width,
                        },
                    ),
                    status=202,
                )
                return

            if parsed.path == "/api/video":
                input_dir = body["inputDir"]
                max_angle = float(body.get("maxAngle", 20))
                duration = float(body.get("durationSeconds", 6))
                # Record the owner like /api/deformation does; otherwise
                # /api/user/job?kind=video can never find a named user's job.
                username = normalized_username(body.get("username"))

                def worker(job_id):
                    job_root = RESULT_DIR / job_id
                    reset_dir(job_root)
                    output_file = job_root / f"head_extension_{job_id}.mp4"
                    set_job(job_id, message="正在生成 0° 到目标角度的视频。")
                    output = generate_video(input_dir, output_file, max_angle, duration)
                    output = Path(output).resolve()
                    return {
                        "video": {
                            "path": str(output),
                            "name": output.name,
                            "size": output.stat().st_size,
                        }
                    }

                self.send_json(
                    start_job(
                        "video",
                        worker,
                        owner=username,
                        params={
                            "inputDir": input_dir,
                            "maxAngle": max_angle,
                            "durationSeconds": duration,
                        },
                    ),
                    status=202,
                )
                return

            self.send_json({"error": "接口不存在。"}, status=404)
        except Exception as exc:
            self.send_exception(exc)

    def do_DELETE(self):
        """Delete a library item (``/api/library?id=...``) and its files on disk."""
        parsed = urlparse(self.path)
        if parsed.path != "/api/library":
            self.send_json({"error": "接口不存在。"}, status=404)
            return
        try:
            params = parse_qs(parsed.query)
            item_id = params.get("id", [""])[0]
            items = list_library()
            target = next((item for item in items if item["id"] == item_id), None)
            if not target:
                self.send_json({"error": "影像不存在。"}, status=404)
                return

            remaining = [item for item in items if item["id"] != item_id]
            write_library_meta(remaining)

            dicom_dir = Path(target["dicomPath"]).resolve()
            upload_root = dicom_dir.parent
            library_root = LIBRARY_DIR.resolve()
            # Only remove a directory strictly inside the library. Without
            # the inequality check a record stored directly under
            # LIBRARY_DIR would wipe the whole library.
            if (
                upload_root != library_root
                and upload_root.is_relative_to(library_root)
                and upload_root.exists()
            ):
                DICOM_FILE_CACHE.pop(str(dicom_dir), None)
                shutil.rmtree(upload_root)
            preview_cache = PREVIEW_CACHE_DIR / item_id
            if preview_cache.exists():
                shutil.rmtree(preview_cache)

            self.send_json({"ok": True, "items": remaining})
        except Exception as exc:
            self.send_exception(exc)

    def read_bytes(self):
        """Read and return the request body as announced by ``Content-Length``."""
        length = int(self.headers.get("content-length", "0"))
        if length == 0:
            return b""
        return self.rfile.read(length)

    def read_json(self):
        """Return the request body parsed as JSON (``{}`` for an empty body)."""
        body = self.read_bytes()
        if not body:
            return {}
        return json.loads(body.decode("utf-8"))

    def send_file(self, file_path, content_type="application/octet-stream", as_attachment=False):
        """Stream a file to the client, or answer 404 if it does not exist."""
        file_path = Path(file_path).resolve()
        if not file_path.exists() or not file_path.is_file():
            self.send_json({"error": "文件不存在。"}, status=404)
            return
        # Read the size before starting the response so a failing stat()
        # cannot leave a half-written header block.
        size = file_path.stat().st_size
        self.send_response(200)
        self.send_cors_headers()
        self.send_header("Content-Type", content_type)
        if as_attachment:
            self.send_header("Content-Disposition", f'attachment; filename="{file_path.name}"')
        self.send_header("Content-Length", str(size))
        self.end_headers()
        with file_path.open("rb") as file_handle:
            shutil.copyfileobj(file_handle, self.wfile)

    def send_cors_headers(self):
        """Emit permissive CORS headers for every method this handler implements."""
        self.send_header("Access-Control-Allow-Origin", "*")
        # DELETE must be listed or browsers reject the preflight for do_DELETE.
        self.send_header("Access-Control-Allow-Methods", "GET,POST,DELETE,OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")

    def send_json(self, payload, status=200):
        """Send ``payload`` as a UTF-8 JSON response with the given status."""
        data = json.dumps(payload, ensure_ascii=False, default=json_default).encode("utf-8")
        self.send_response(status)
        self.send_cors_headers()
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def send_exception(self, exc):
        """Report an unexpected error as a JSON 500 response.

        Must be called from inside an ``except`` block so that
        ``traceback.format_exc()`` describes ``exc``.
        """
        self.send_json(
            {
                "error": str(exc),
                "traceback": traceback.format_exc(),
            },
            status=500,
        )


def main():
    """Create the working directories, restore persisted jobs and serve forever."""
    safe_mkdir(APP_DIR / "app_output")
    safe_mkdir(APP_DIR / "ppt_video")
    safe_mkdir(LIBRARY_DIR)
    safe_mkdir(RESULT_DIR)
    load_persisted_jobs()
    server = ThreadingHTTPServer((HOST, PORT), Handler)
    print(f"Head CT Morph backend running at http://{HOST}:{PORT}")
    server.serve_forever()


if __name__ == "__main__":
    main()