# Head_CT_Morph/web_backend.py — Python source (file listing: 722 lines, 25 KiB).

import base64
import json
import os
import shutil
import threading
import time
import traceback
import uuid
import zipfile
from email.parser import BytesParser
from email.policy import default
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from io import BytesIO
from pathlib import Path
from urllib.parse import parse_qs, quote, unquote, urlparse
os.environ.setdefault("MPLCONFIGDIR", "/tmp/head_ct_morph_matplotlib")
import pydicom
from pydicom.multival import MultiValue
from PIL import Image
from generate_head_extension_video import generate_video
from head_extension_app import (
APP_DIR,
crop_head_neck,
ct_window,
fit_image,
load_dicom_volume,
preview_deform_2d,
run_deformation,
sagittal_mip,
safe_mkdir,
)
# Address and port the HTTP server binds to in main(); "0.0.0.0" listens on
# every IPv4 interface, not just loopback.
HOST = "0.0.0.0"
PORT = 8787
# In-memory registry of background jobs keyed by job id. Every access in this
# module goes through set_job/get_job/start_job, which hold JOBS_LOCK.
JOBS = {}
JOBS_LOCK = threading.Lock()
# Cache of sorted DICOM file lists keyed by resolved directory path; filled
# and validated by sorted_dicom_files().
DICOM_FILE_CACHE = {}
# On-disk layout, all rooted at APP_DIR (imported from head_extension_app).
LIBRARY_DIR = APP_DIR / "web_library"
# JSON file holding the list of library records.
LIBRARY_META = LIBRARY_DIR / "library.json"
# One sub-directory per background job id is created under RESULT_DIR.
RESULT_DIR = APP_DIR / "web_results"
# Rendered slice previews, one sub-directory per library item id.
PREVIEW_CACHE_DIR = LIBRARY_DIR / "_preview_cache"
def json_default(value):
    """Fallback serializer handed to ``json.dumps(default=...)``.

    ``Path`` objects are emitted as their resolved absolute string form;
    every other unsupported value falls back to ``str(value)``.
    """
    rendered = value.resolve() if isinstance(value, Path) else value
    return str(rendered)
def now_date():
    """Return today's local date formatted as ``YYYY-MM-DD``."""
    today = time.localtime()
    return time.strftime("%Y-%m-%d", today)
def safe_filename(name):
    """Reduce an untrusted upload name to a filesystem-safe basename.

    Only the final path component of ``name`` is kept. Every character that
    is not alphanumeric or one of ``._-`` is replaced by ``_``.

    Returns:
        The sanitized basename, or ``""`` when nothing usable remains. Names
        made only of dots (``".."``, ``"..."``) are rejected as well, because
        joining them onto a directory would address a directory instead of a
        new file. Callers are expected to substitute their own fallback name
        when the result is empty.
    """
    basename = Path(name).name
    cleaned = "".join(
        char if char.isalnum() or char in "._-" else "_"
        for char in basename
    )
    # "." / ".." / "..." are not usable as file names.
    if not cleaned.strip("."):
        return ""
    return cleaned
def read_library_meta():
    """Load the library record list from ``LIBRARY_META``.

    The library directory is created first if needed. Returns the parsed
    JSON content, or an empty list when the metadata file does not exist.
    """
    safe_mkdir(LIBRARY_DIR)
    if not LIBRARY_META.exists():
        return []
    raw_text = LIBRARY_META.read_text(encoding="utf-8")
    return json.loads(raw_text)
def write_library_meta(items):
    """Persist the library record list to ``LIBRARY_META`` atomically.

    The JSON is written to a uniquely named sibling temp file and then moved
    over the real file with ``os.replace``. Request threads that read the
    metadata concurrently therefore see either the old or the new content,
    never a half-written file.

    Args:
        items: JSON-serializable list of library records.
    """
    safe_mkdir(LIBRARY_DIR)
    serialized = json.dumps(items, ensure_ascii=False, indent=2)
    scratch = LIBRARY_META.with_name(
        f".{LIBRARY_META.name}.{uuid.uuid4().hex}.tmp"
    )
    try:
        scratch.write_text(serialized, encoding="utf-8")
        os.replace(scratch, LIBRARY_META)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up when writing or replacing failed.
        scratch.unlink(missing_ok=True)
def folder_size_mb(path):
    """Return the combined size of the ``*.dcm`` files directly inside
    ``path``, in mebibytes rounded to the nearest integer."""
    byte_total = sum(entry.stat().st_size for entry in Path(path).glob("*.dcm"))
    return round(byte_total / 1024 / 1024)
def build_library_record(item_id, patient_id, dicom_path, source="upload", version="DICOM-LIB"):
    """Build the metadata dict stored in the library for one DICOM folder.

    Args:
        item_id: Library identifier of the item.
        patient_id: Display identifier for the patient.
        dicom_path: Directory holding the ``*.dcm`` files.
        source: Origin tag; ``"upload"`` selects a different preview color
            than any other value.
        version: Free-form version label stored with the record.

    Returns:
        A dict with the keys id, patientId, date, version, status, size,
        previewColor, dicomPath, fileCount and source.
    """
    folder = Path(dicom_path).resolve()
    slice_total = sum(1 for _ in folder.glob("*.dcm"))
    tile_color = "bg-slate-800" if source != "upload" else "bg-slate-900"
    created_on = now_date()
    size_mb = folder_size_mb(folder)
    return {
        "id": item_id,
        "patientId": patient_id,
        "date": created_on,
        "version": version,
        "status": "processed",
        "size": size_mb,
        "previewColor": tile_color,
        "dicomPath": str(folder),
        "fileCount": slice_total,
        "source": source,
    }
def list_library():
    """Return the library records whose DICOM folder still exists.

    Seed entries (``source == "seed"`` or the id ``seed_ori_head_ct``) are
    never returned. For every surviving record ``fileCount`` and ``size`` are
    refreshed from disk. Records whose ``dicomPath`` is missing, empty, or no
    longer present on disk are dropped, and the pruned list is written back
    to the metadata file.

    A missing or empty ``dicomPath`` must be checked explicitly: ``Path("")``
    is the current directory, which always exists, so such a record would
    otherwise be kept and report the file count of the working directory.
    """
    records = [
        item for item in read_library_meta()
        if item.get("source") != "seed" and item.get("id") != "seed_ori_head_ct"
    ]
    live_items = []
    for item in records:
        raw_path = item.get("dicomPath")
        if not raw_path:
            continue
        dicom_path = Path(raw_path)
        if not dicom_path.exists():
            continue
        item["fileCount"] = len(list(dicom_path.glob("*.dcm")))
        item["size"] = folder_size_mb(dicom_path)
        live_items.append(item)
    # Only rewrite the metadata file when something was actually pruned.
    if len(live_items) != len(records):
        write_library_meta(live_items)
    return live_items
def sort_key_for_dicom(path):
    """Return a sort key that orders DICOM files by ``InstanceNumber``.

    The key is ``(0, instance_number, filename)`` when the header can be read
    (a missing ``InstanceNumber`` counts as 0). If reading or converting
    fails for any reason, the key is ``(1, n, filename)`` where ``n`` is the
    numeric file stem, or 0 when the stem is not all digits — so unreadable
    files sort after readable ones.
    """
    dicom_file = Path(path)
    filename = dicom_file.name
    try:
        header = pydicom.dcmread(str(dicom_file), stop_before_pixels=True, force=True)
        instance_number = int(getattr(header, "InstanceNumber", 0))
    except Exception:
        # Best-effort fallback: derive the ordering from the file name.
        stem = dicom_file.stem
        numeric_stem = int(stem) if stem.isdigit() else 0
        return (1, numeric_stem, filename)
    return (0, instance_number, filename)
def sorted_dicom_files(dicom_dir):
    """Return the ``*.dcm`` files of ``dicom_dir`` ordered by
    ``sort_key_for_dicom``, using ``DICOM_FILE_CACHE`` to skip re-sorting.

    A cached list is reused only while the directory's signature — resolved
    path, file count and newest modification time — is unchanged. The
    returned list is the cached object itself, not a copy.
    """
    folder = Path(dicom_dir).resolve()
    cache_key = str(folder)
    candidates = list(folder.glob("*.dcm"))
    newest_mtime = max((entry.stat().st_mtime for entry in candidates), default=0)
    signature = (cache_key, len(candidates), newest_mtime)

    hit = DICOM_FILE_CACHE.get(cache_key)
    if hit and hit["signature"] == signature:
        return hit["files"]

    ordered = sorted(candidates, key=sort_key_for_dicom)
    DICOM_FILE_CACHE[cache_key] = {"signature": signature, "files": ordered}
    return ordered
def find_library_item(item_id):
    """Return the first live library record whose ``id`` equals ``item_id``,
    or ``None`` when there is no such record."""
    for record in list_library():
        if record["id"] == item_id:
            return record
    return None
def make_library_slice_preview(item_id, index):
    """Render (or reuse) a PNG preview of one slice of a library item.

    Args:
        item_id: Library identifier of the item.
        index: Requested slice position; converted with ``int()`` and clamped
            to the valid range.

    Returns:
        A dict with the preview's ``/api/file`` URL, the clamped index, the
        slice count, the DICOM file name, the patient id and preview URLs for
        the adjacent slices.

    Raises:
        RuntimeError: if the item is unknown or has no ``.dcm`` files.
    """
    record = find_library_item(item_id)
    if not record:
        raise RuntimeError("影像库中没有找到该数据。")
    slice_paths = sorted_dicom_files(record["dicomPath"])
    if not slice_paths:
        raise RuntimeError("该影像数据没有可预览的 .dcm 文件。")

    total = len(slice_paths)
    position = min(max(int(index), 0), total - 1)

    item_cache = PREVIEW_CACHE_DIR / item_id
    safe_mkdir(item_cache)
    png_path = item_cache / f"slice_{position:04d}.png"
    if not png_path.exists():
        dataset = pydicom.dcmread(str(slice_paths[position]), force=True)
        slope = float(getattr(dataset, "RescaleSlope", 1))
        intercept = float(getattr(dataset, "RescaleIntercept", 0))
        # Apply the DICOM rescale (slope, then intercept) before windowing.
        rescaled = dataset.pixel_array.astype("float32") * slope
        rescaled = rescaled + intercept
        rendered = Image.fromarray(ct_window(rescaled)).convert("RGB")
        fit_image(rendered, 720, 520).save(png_path, format="PNG")

    adjacent = [
        f"/api/library/preview?id={item_id}&index={candidate}"
        for candidate in (position - 1, position + 1)
        if 0 <= candidate < total
    ]
    encoded_path = quote(str(png_path.resolve()), safe='')
    return {
        "imageUrl": f"/api/file?path={encoded_path}",
        "index": position,
        "count": total,
        "file": slice_paths[position].name,
        "patientId": record["patientId"],
        "neighbors": adjacent,
    }
def dicom_value(ds, name, fallback="-"):
    """Read attribute ``name`` from dataset ``ds`` as a display string.

    Returns ``fallback`` when the attribute is absent, ``None`` or an empty
    string. Multi-valued attributes (list, tuple or pydicom ``MultiValue``)
    are joined with ``" / "``; anything else is converted with ``str``.
    """
    raw = getattr(ds, name, fallback)
    if raw in (None, ""):
        return fallback
    if not isinstance(raw, (list, tuple, MultiValue)):
        return str(raw)
    return " / ".join(map(str, raw))
def dicom_date(value):
    """Format an 8-digit ``YYYYMMDD`` value as ``YYYY-MM-DD``.

    Any other non-empty value is returned unchanged (as a string); an empty
    or falsy value yields ``"-"``.
    """
    text = str(value or "")
    if len(text) != 8 or not text.isdigit():
        return text or "-"
    year, month, day = text[:4], text[4:6], text[6:]
    return f"{year}-{month}-{day}"
def dicom_time(value):
    """Format a value starting with six digits (``HHMMSS...``) as
    ``HH:MM:SS``.

    Any other non-empty value is returned unchanged (as a string); an empty
    or falsy value yields ``"-"``.
    """
    text = str(value or "")
    leading = text[:6]
    if len(text) >= 6 and leading.isdigit():
        return ":".join((leading[:2], leading[2:4], leading[4:6]))
    return text or "-"
def make_library_info(item_id):
    """Collect header information of a library item for display.

    The first and last file (in ``sorted_dicom_files`` order) are read
    without pixel data. All values except the instance range come from the
    first file.

    Returns:
        A dict with ``id``, ``patientId``, ``fileCount`` and ``groups``, where
        each group has a ``title`` and a list of ``{"label", "value"}`` items.

    Raises:
        RuntimeError: if the item is unknown or has no ``.dcm`` files.
    """
    record = find_library_item(item_id)
    if not record:
        raise RuntimeError("影像库中没有找到该数据。")
    slice_paths = sorted_dicom_files(record["dicomPath"])
    if not slice_paths:
        raise RuntimeError("该影像数据没有可读取的 .dcm 文件。")

    def read_header(dicom_file):
        return pydicom.dcmread(str(dicom_file), stop_before_pixels=True, force=True)

    first = read_header(slice_paths[0])
    last = read_header(slice_paths[-1])
    slice_total = len(slice_paths)

    def row(label, value):
        return {"label": label, "value": value}

    def tag(label, keyword):
        # Row whose value is read straight from the first slice's header.
        return row(label, dicom_value(first, keyword))

    def group(title, rows):
        return {"title": title, "items": rows}

    pixel_spacing = dicom_value(first, "PixelSpacing")
    matrix = f"{dicom_value(first, 'Columns')} x {dicom_value(first, 'Rows')}"
    instance_range = f"{dicom_value(first, 'InstanceNumber')} - {dicom_value(last, 'InstanceNumber')}"

    groups = [
        group("患者信息", [
            tag("患者姓名", "PatientName"),
            tag("患者 ID", "PatientID"),
            tag("性别", "PatientSex"),
            tag("年龄", "PatientAge"),
        ]),
        group("检查信息", [
            row("检查日期", dicom_date(getattr(first, "StudyDate", ""))),
            row("检查时间", dicom_time(getattr(first, "StudyTime", ""))),
            tag("检查描述", "StudyDescription"),
            tag("检查号", "AccessionNumber"),
            tag("机构", "InstitutionName"),
        ]),
        group("序列信息", [
            tag("模态", "Modality"),
            tag("部位", "BodyPartExamined"),
            tag("序列描述", "SeriesDescription"),
            tag("序列号", "SeriesNumber"),
            tag("制造商", "Manufacturer"),
        ]),
        group("图像参数", [
            row("切片数", str(slice_total)),
            row("Instance 范围", instance_range),
            row("矩阵", matrix),
            row("像素间距", pixel_spacing),
            tag("层厚", "SliceThickness"),
            tag("层间距", "SpacingBetweenSlices"),
            tag("卷积核", "ConvolutionKernel"),
        ]),
        group("扫描参数", [
            tag("KVP", "KVP"),
            tag("管电流", "XRayTubeCurrent"),
            tag("曝光时间", "ExposureTime"),
            tag("重建直径", "ReconstructionDiameter"),
        ]),
    ]
    return {
        "id": record["id"],
        "patientId": record["patientId"],
        "fileCount": slice_total,
        "groups": groups,
    }
def parse_multipart(headers, body):
    """Split a ``multipart/form-data`` request body into fields and files.

    The body is parsed with the stdlib email parser by prefixing it with a
    minimal MIME header built from the request's ``content-type``.

    Args:
        headers: Mapping with a ``get`` method that yields the request's
            ``content-type`` header.
        body: Raw request body bytes.

    Returns:
        ``(fields, files)`` where ``fields`` maps form-field names to decoded
        text (UTF-8, undecodable bytes replaced) and ``files`` is a list of
        ``(field_name, filename, payload_bytes)`` tuples for parts carrying a
        filename.
    """
    declared_type = headers.get("content-type", "")
    preamble = f"Content-Type: {declared_type}\r\nMIME-Version: 1.0\r\n\r\n".encode("utf-8")
    envelope = BytesParser(policy=default).parsebytes(preamble + body)

    text_fields = {}
    uploads = []
    for section in envelope.iter_parts():
        field_name = section.get_param("name", header="content-disposition")
        upload_name = section.get_filename()
        content = section.get_payload(decode=True) or b""
        if upload_name:
            uploads.append((field_name, upload_name, content))
            continue
        if field_name:
            text_fields[field_name] = content.decode("utf-8", errors="replace")
    return text_fields, uploads
def write_dicom_payload(target_dir, filename, payload, index):
    """Write one uploaded DICOM payload into ``target_dir``.

    The file name is sanitized with ``safe_filename``; when nothing usable
    remains, ``"<index>.dcm"`` is used. If that name is already taken, the
    file is written as ``"<index>_<name>"`` instead.
    """
    cleaned = safe_filename(filename)
    if not cleaned:
        cleaned = f"{index}.dcm"
    destination = target_dir / cleaned
    if destination.exists():
        # Name clash: prefix with the upload index.
        destination = target_dir / f"{index}_{cleaned}"
    destination.write_bytes(payload)
def add_dicom_from_zip(target_dir, zip_filename, payload, start_index):
    """Extract every ``.dcm`` member of an uploaded zip into ``target_dir``.

    Directories, members without a ``.dcm`` suffix, and members whose path is
    absolute or contains ``..`` are skipped. Each extracted file is stored
    flat (no sub-directories) as ``"<zip stem>_<member basename>"`` through
    ``write_dicom_payload``, with indices continuing after ``start_index``.

    Returns:
        The number of members extracted.

    Raises:
        RuntimeError: if ``payload`` is not a valid zip archive.
    """
    extracted = 0
    try:
        with zipfile.ZipFile(BytesIO(payload)) as archive:
            prefix = Path(zip_filename).stem
            for entry in archive.infolist():
                if entry.is_dir():
                    continue
                # Normalize Windows-style separators before inspecting the path.
                inner = Path(entry.filename.replace("\\", "/"))
                if inner.suffix.lower() != ".dcm":
                    continue
                if inner.is_absolute() or ".." in inner.parts:
                    continue
                extracted += 1
                with archive.open(entry) as stream:
                    data = stream.read()
                write_dicom_payload(
                    target_dir,
                    f"{prefix}_{inner.name}",
                    data,
                    start_index + extracted,
                )
    except zipfile.BadZipFile as exc:
        raise RuntimeError(f"{zip_filename} 不是有效的 zip 压缩包。") from exc
    return extracted
def upload_library_item(headers, body):
    """Store an uploaded set of DICOM files as a new library item.

    Accepts ``.dcm`` files and ``.zip`` archives from a multipart form body.
    The optional ``patientId`` form field overrides the generated default.

    Args:
        headers: Request headers (must provide ``content-type``).
        body: Raw multipart request body.

    Returns:
        The newly created library record.

    Raises:
        RuntimeError: if the upload contains no ``.dcm``/``.zip`` file, an
            archive is invalid, or no DICOM file could be extracted.

    Whenever storing fails after the upload directory has been created, that
    directory is removed again so no orphan folders pile up in the library.
    """
    fields, files = parse_multipart(headers, body)
    dicom_files = [
        (filename, payload)
        for _, filename, payload in files
        if Path(filename).suffix.lower() == ".dcm"
    ]
    zip_files = [
        (filename, payload)
        for _, filename, payload in files
        if Path(filename).suffix.lower() == ".zip"
    ]
    if not dicom_files and not zip_files:
        raise RuntimeError("上传内容里没有找到 .dcm 文件或 .zip 压缩包。")

    item_id = time.strftime("%Y%m%d_%H%M%S_") + uuid.uuid4().hex[:8]
    first_upload_name = dicom_files[0][0] if dicom_files else zip_files[0][0]
    default_patient_id = (
        Path(first_upload_name).stem.upper()
        if zip_files
        else f"WEB_{item_id[-8:].upper()}"
    )
    patient_id = fields.get("patientId") or default_patient_id

    target_dir = LIBRARY_DIR / item_id / "dicom"
    reset_dir(target_dir)
    try:
        for index, (filename, payload) in enumerate(dicom_files, start=1):
            write_dicom_payload(target_dir, filename, payload, index)

        zip_dicom_count = 0
        for filename, payload in zip_files:
            zip_dicom_count += add_dicom_from_zip(
                target_dir,
                filename,
                payload,
                len(dicom_files) + zip_dicom_count,
            )

        if len(dicom_files) + zip_dicom_count == 0:
            raise RuntimeError("压缩包里没有找到 .dcm 文件。")

        record = build_library_record(
            item_id,
            patient_id,
            target_dir,
            source="upload",
            version="DICOM-ZIP" if zip_files and not dicom_files else "DICOM-LIB",
        )
        write_library_meta([record, *list_library()])
    except Exception:
        # Roll back the partially written upload (bad zip, disk error, ...).
        shutil.rmtree(target_dir.parent, ignore_errors=True)
        raise
    return record
def reset_dir(path):
    """Make ``path`` an empty directory.

    An existing directory tree at ``path`` is deleted first; the directory
    (and any missing parents) is then created.
    """
    folder = Path(path)
    if folder.exists():
        shutil.rmtree(folder)
    os.makedirs(folder, exist_ok=True)
def zip_folder(source_dir, zip_path):
    """Pack every file below ``source_dir`` into a deflate-compressed zip.

    Member names are relative to ``source_dir``. An existing archive at
    ``zip_path`` is replaced. Files are added in sorted path order so the
    archive layout is deterministic.

    The archive itself is skipped if it happens to live inside
    ``source_dir``; otherwise the half-written zip would be added to itself.

    Returns:
        The resolved path of the written archive.
    """
    source_dir = Path(source_dir).resolve()
    zip_path = Path(zip_path).resolve()
    safe_mkdir(zip_path.parent)
    if zip_path.exists():
        zip_path.unlink()
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for file_path in sorted(source_dir.rglob("*")):
            if not file_path.is_file():
                continue
            if file_path == zip_path:
                # Never pack the output archive into itself.
                continue
            archive.write(file_path, file_path.relative_to(source_dir))
    return zip_path
def set_job(job_id, **updates):
    """Merge ``updates`` into the job record and refresh its ``updatedAt``
    timestamp, all while holding ``JOBS_LOCK``.

    Raises ``KeyError`` when ``job_id`` is not registered.
    """
    with JOBS_LOCK:
        record = JOBS[job_id]
        for field, value in updates.items():
            record[field] = value
        record["updatedAt"] = time.strftime("%Y-%m-%d %H:%M:%S")
def get_job(job_id):
    """Return a shallow copy of the job record, or ``None`` if the job is
    unknown. The copy is taken while holding ``JOBS_LOCK``."""
    with JOBS_LOCK:
        record = JOBS.get(job_id)
        if not record:
            return None
        return dict(record)
def start_job(kind, worker):
    """Register a new background job and run ``worker`` in a daemon thread.

    Args:
        kind: Label stored in the job record.
        worker: Callable invoked as ``worker(job_id)``; its return value
            becomes the job's ``result``.

    Returns:
        A snapshot of the job record taken right after the thread started.

    If the worker raises, the job is marked ``failed`` and the exception text
    and formatted traceback are stored on the record.
    """
    job_id = uuid.uuid4().hex[:12]

    def timestamp():
        return time.strftime("%Y-%m-%d %H:%M:%S")

    with JOBS_LOCK:
        JOBS[job_id] = {
            "id": job_id,
            "kind": kind,
            "status": "running",
            "message": "任务已启动。",
            "result": None,
            "error": None,
            "createdAt": timestamp(),
            "updatedAt": timestamp(),
        }

    def execute():
        try:
            outcome = worker(job_id)
            set_job(job_id, status="completed", message="任务完成。", result=outcome)
        except Exception as exc:
            failure = {
                "status": "failed",
                "message": "任务失败。",
                "error": str(exc),
                "traceback": traceback.format_exc(),
            }
            set_job(job_id, **failure)

    background = threading.Thread(target=execute, daemon=True)
    background.start()
    return get_job(job_id)
def make_preview(input_dir, angle_degrees):
    """Build an inline PNG preview of the deformation for ``input_dir``.

    The volume is loaded, reduced with ``sagittal_mip`` and
    ``crop_head_neck``, passed through ``preview_deform_2d`` with the given
    angle, fitted to 720x520 and returned as a base64 data URL.

    Returns:
        A dict with ``image`` (data URL), ``source`` (resolved input path) and
        ``angleDegrees`` (the angle as float).
    """
    angle = float(angle_degrees)
    volume = load_dicom_volume(input_dir)
    baseline = crop_head_neck(sagittal_mip(volume))
    deformed = preview_deform_2d(baseline, angle)

    png_buffer = BytesIO()
    fit_image(deformed, 720, 520).save(png_buffer, format="PNG")
    payload = base64.b64encode(png_buffer.getvalue()).decode("ascii")
    return {
        "image": f"data:image/png;base64,{payload}",
        "source": str(Path(input_dir).resolve()),
        "angleDegrees": float(angle_degrees),
    }
def zip_metadata(zip_path):
    """Describe an archive on disk: resolved ``path``, file ``name`` and
    ``size`` in bytes."""
    archive = Path(zip_path).resolve()
    return dict(
        path=str(archive),
        name=archive.name,
        size=archive.stat().st_size,
    )
def serialize_outputs(output_paths, preview_paths, zip_path=None, state_zips=None):
    """Turn deformation results into a JSON-ready dict of absolute paths.

    Args:
        output_paths: Mapping of state key to output location.
        preview_paths: Mapping that must contain ``comparison`` and
            ``screenshots``.
        zip_path: Optional archive of the whole output.
        state_zips: Optional mapping of state key to per-state archive.

    Returns:
        A dict with ``outputs``, ``previews``, ``zip`` (metadata or ``None``)
        and ``stateZips`` (metadata per state, possibly empty).
    """
    def absolute(location):
        return str(Path(location).resolve())

    outputs = {}
    for key, location in output_paths.items():
        outputs[key] = absolute(location)

    previews = {
        "comparison": absolute(preview_paths["comparison"]),
        "screenshots": absolute(preview_paths["screenshots"]),
    }

    bundle = None
    if zip_path:
        bundle = zip_metadata(zip_path)

    per_state = {}
    for key, archive in (state_zips or {}).items():
        per_state[key] = zip_metadata(archive)

    return {
        "outputs": outputs,
        "previews": previews,
        "zip": bundle,
        "stateZips": per_state,
    }
class Handler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
print("%s - %s" % (self.address_string(), format % args))
def do_OPTIONS(self):
self.send_response(204)
self.send_cors_headers()
self.end_headers()
def do_GET(self):
parsed = urlparse(self.path)
if parsed.path == "/api/health":
self.send_json({"ok": True, "service": "Head CT Morph backend"})
return
if parsed.path == "/api/defaults":
self.send_json(
{
"backend": f"http://127.0.0.1:{PORT}",
}
)
return
if parsed.path == "/api/library":
self.send_json({"items": list_library()})
return
if parsed.path == "/api/library/preview":
params = parse_qs(parsed.query)
item_id = params.get("id", [""])[0]
index = params.get("index", ["0"])[0]
self.send_json(make_library_slice_preview(item_id, index))
return
if parsed.path == "/api/library/info":
params = parse_qs(parsed.query)
item_id = params.get("id", [""])[0]
self.send_json(make_library_info(item_id))
return
if parsed.path == "/api/job":
params = parse_qs(parsed.query)
job_id = params.get("id", [""])[0]
job = get_job(job_id)
if not job:
self.send_json({"error": "任务不存在。"}, status=404)
return
self.send_json(job)
return
if parsed.path == "/api/file":
params = parse_qs(parsed.query)
file_path = Path(unquote(params.get("path", [""])[0])).resolve()
if not file_path.exists() or not file_path.is_file():
self.send_json({"error": "文件不存在。"}, status=404)
return
content_type = "application/octet-stream"
if file_path.suffix.lower() in [".png", ".jpg", ".jpeg"]:
content_type = "image/png" if file_path.suffix.lower() == ".png" else "image/jpeg"
elif file_path.suffix.lower() == ".mp4":
content_type = "video/mp4"
elif file_path.suffix.lower() == ".zip":
content_type = "application/zip"
data = file_path.read_bytes()
self.send_response(200)
self.send_cors_headers()
self.send_header("Content-Type", content_type)
if file_path.suffix.lower() == ".zip":
self.send_header("Content-Disposition", f'attachment; filename="{file_path.name}"')
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
return
self.send_json({"error": "接口不存在。"}, status=404)
def do_POST(self):
parsed = urlparse(self.path)
try:
if parsed.path == "/api/library/upload":
body = self.read_bytes()
self.send_json(upload_library_item(self.headers, body), status=201)
return
body = self.read_json()
if parsed.path == "/api/preview":
self.send_json(make_preview(body["inputDir"], body.get("angleDegrees", 12)))
return
if parsed.path == "/api/deformation":
input_dir = body["inputDir"]
angle_degrees = float(body.get("angleDegrees", 12))
transition_width = int(body.get("transitionWidth", 90))
def worker(job_id):
job_root = RESULT_DIR / job_id
output_dir = job_root / "four_state_output"
reset_dir(job_root)
def progress(message):
set_job(job_id, message=message)
output_paths, preview_paths = run_deformation(
input_dir,
output_dir,
angle_degrees,
transition_width,
progress,
)
set_job(job_id, message="正在打包各状态 DICOM ZIP...")
state_zips = {}
for state_key in [
"original",
"hard_boundary",
"gaussian_smooth",
"soft_transition",
]:
state_zips[state_key] = zip_folder(
output_paths[state_key],
job_root / f"{state_key}_{job_id}.zip",
)
set_job(job_id, message="正在打包四状态输出 ZIP...")
zip_path = zip_folder(
output_dir,
job_root / f"head_ct_morph_{job_id}.zip",
)
return serialize_outputs(output_paths, preview_paths, zip_path, state_zips)
self.send_json(start_job("deformation", worker), status=202)
return
if parsed.path == "/api/video":
input_dir = body["inputDir"]
max_angle = float(body.get("maxAngle", 20))
duration = float(body.get("durationSeconds", 6))
def worker(job_id):
job_root = RESULT_DIR / job_id
reset_dir(job_root)
output_file = job_root / f"head_extension_{job_id}.mp4"
set_job(job_id, message="正在生成 0° 到目标角度的视频。")
output = generate_video(input_dir, output_file, max_angle, duration)
output = Path(output).resolve()
return {
"video": {
"path": str(output),
"name": output.name,
"size": output.stat().st_size,
}
}
self.send_json(start_job("video", worker), status=202)
return
self.send_json({"error": "接口不存在。"}, status=404)
except Exception as exc:
self.send_json(
{
"error": str(exc),
"traceback": traceback.format_exc(),
},
status=500,
)
def do_DELETE(self):
parsed = urlparse(self.path)
if parsed.path != "/api/library":
self.send_json({"error": "接口不存在。"}, status=404)
return
params = parse_qs(parsed.query)
item_id = params.get("id", [""])[0]
items = list_library()
target = next((item for item in items if item["id"] == item_id), None)
if not target:
self.send_json({"error": "影像不存在。"}, status=404)
return
remaining = [item for item in items if item["id"] != item_id]
write_library_meta(remaining)
upload_root = Path(target["dicomPath"]).resolve().parent
if upload_root.exists() and upload_root.is_relative_to(LIBRARY_DIR.resolve()):
DICOM_FILE_CACHE.pop(str(Path(target["dicomPath"]).resolve()), None)
shutil.rmtree(upload_root)
preview_cache = PREVIEW_CACHE_DIR / item_id
if preview_cache.exists():
shutil.rmtree(preview_cache)
self.send_json({"ok": True, "items": remaining})
def read_bytes(self):
length = int(self.headers.get("content-length", "0"))
if length == 0:
return b""
return self.rfile.read(length)
def read_json(self):
body = self.read_bytes()
if not body:
return {}
return json.loads(body.decode("utf-8"))
def send_cors_headers(self):
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
def send_json(self, payload, status=200):
data = json.dumps(payload, ensure_ascii=False, default=json_default).encode("utf-8")
self.send_response(status)
self.send_cors_headers()
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def main():
    """Create the working directories and serve the API until interrupted."""
    required_dirs = (
        APP_DIR / "app_output",
        APP_DIR / "ppt_video",
        LIBRARY_DIR,
        RESULT_DIR,
    )
    for folder in required_dirs:
        safe_mkdir(folder)

    address = (HOST, PORT)
    httpd = ThreadingHTTPServer(address, Handler)
    print(f"Head CT Morph backend running at http://{HOST}:{PORT}")
    httpd.serve_forever()


if __name__ == "__main__":
    main()