first commit

This commit is contained in:
2026-05-02 17:40:07 +08:00
commit 664fec7485
20 changed files with 7271 additions and 0 deletions

31
.gitignore vendored Normal file
View File

@@ -0,0 +1,31 @@
# Python
__pycache__/
*.py[cod]
*.pyo
.pytest_cache/
.mypy_cache/
.ruff_cache/
.venv/
venv/
# Node/Vite
WebSite/node_modules/
WebSite/dist/
WebSite/.env.local
# Runtime outputs and uploaded data
app_output/
app_previews/
ppt_video/
web_library/
web_results/
# Local DICOM/demo data and archives
Ori_Head_CT/
*.dcm
*.zip
*.mp4
# OS/editor
.DS_Store
Thumbs.db

25
README.md Normal file
View File

@@ -0,0 +1,25 @@
# Head CT Morph
头颈部 CT 仰头形变工具与网页工作站。
## 主要入口
- `web_backend.py`: 本地 Python API 后端
- `head_extension_app.py`: 四状态 DICOM 形变核心逻辑与桌面界面
- `generate_head_extension_video.py`: 仰头角度变化 MP4 生成
- `video_generator_app.py`: 视频生成桌面界面
- `WebSite/`: React/Vite 网页工作站
## 本地运行
```bash
python -m pip install -r requirements.txt
cd WebSite
npm install
npm run backend
npm run dev
```
## 数据说明
仓库默认不提交 DICOM 数据、上传数据、运行结果、视频、ZIP 和 `node_modules`。这些内容会在本地运行时生成或由网页上传。

9
WebSite/.env.example Normal file
View File

@@ -0,0 +1,9 @@
# GEMINI_API_KEY: Required for Gemini AI API calls.
# AI Studio automatically injects this at runtime from user secrets.
# Users configure this via the Secrets panel in the AI Studio UI.
GEMINI_API_KEY="MY_GEMINI_API_KEY"
# APP_URL: The URL where this applet is hosted.
# AI Studio automatically injects this at runtime with the Cloud Run service URL.
# Used for self-referential links, OAuth callbacks, and API endpoints.
APP_URL="MY_APP_URL"

8
WebSite/.gitignore vendored Normal file
View File

@@ -0,0 +1,8 @@
node_modules/
build/
dist/
coverage/
.DS_Store
*.log
.env*
!.env.example

41
WebSite/README.md Normal file
View File

@@ -0,0 +1,41 @@
<div align="center">
<img width="1200" height="475" alt="GHBanner" src="https://github.com/user-attachments/assets/0aa67016-6eaf-458a-adb2-6e31a0763ed6" />
</div>
# Run and deploy your AI Studio app
This contains everything you need to run your app locally.
View your app in AI Studio: https://ai.studio/apps/b9650cf9-3b26-4e84-a699-78ba380bb4db
## Run Locally
**Prerequisites:** Node.js, Python 3.8+
1. Install Python dependencies in the project root:
`pip install -r ../requirements.txt`
2. Install website dependencies:
`npm install`
3. Start the local Python backend:
`npm run backend`
4. In another terminal, run the website:
`npm run dev`
The website talks to `http://127.0.0.1:8787` and maps UI actions to:
- `head_extension_app.py`: preview and four-state DICOM deformation output
- `generate_head_extension_video.py`: 0° to target-angle MP4 generation
- `video_generator_app.py`: kept as the desktop GUI wrapper for the same video generator
## Data Flow
The Image Library is now the source of DICOM data for the workstation:
1. Open `数据影像库`.
2. Click `上传文件夹` to choose a folder that contains `.dcm` files, or click `上传压缩包` to upload a `.zip` archive containing `.dcm` files.
3. Click `调阅工作站` on a library item.
4. The `影像变换工作站` will run preview, four-state deformation, and video generation from that selected library dataset.
5. Four-state deformation results are packaged by the backend as a downloadable `.zip` file under `../web_results/`.
6. Generated videos are also written under `../web_results/` and exposed as downloadable `.mp4` files.
Uploaded DICOM datasets are stored by the local backend under `../web_library/`.

13
WebSite/index.html Normal file
View File

@@ -0,0 +1,13 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>My Google AI Studio App</title>
</head>
<body>
<div id="root"></div>
<script type="module" src="/src/main.tsx"></script>
</body>
</html>

6
WebSite/metadata.json Normal file
View File

@@ -0,0 +1,6 @@
{
"name": "",
"description": "",
"requestFramePermissions": [],
"majorCapabilities": []
}

4320
WebSite/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

35
WebSite/package.json Normal file
View File

@@ -0,0 +1,35 @@
{
"name": "react-example",
"private": true,
"version": "0.0.0",
"type": "module",
"scripts": {
"dev": "vite --port=3000 --host=0.0.0.0",
"backend": "python ../web_backend.py",
"build": "vite build",
"preview": "vite preview",
"clean": "rm -rf dist",
"lint": "tsc --noEmit"
},
"dependencies": {
"@google/genai": "^1.29.0",
"@tailwindcss/vite": "^4.1.14",
"@vitejs/plugin-react": "^5.0.4",
"lucide-react": "^0.546.0",
"react": "^19.0.1",
"react-dom": "^19.0.1",
"vite": "^6.2.3",
"express": "^4.21.2",
"dotenv": "^17.2.3",
"motion": "^12.23.24"
},
"devDependencies": {
"@types/node": "^22.14.0",
"autoprefixer": "^10.4.21",
"tailwindcss": "^4.1.14",
"tsx": "^4.21.0",
"typescript": "~5.8.2",
"vite": "^6.2.3",
"@types/express": "^4.17.21"
}
}

1048
WebSite/src/App.tsx Normal file

File diff suppressed because it is too large Load Diff

1
WebSite/src/index.css Normal file
View File

@@ -0,0 +1 @@
@import "tailwindcss";

10
WebSite/src/main.tsx Normal file
View File

@@ -0,0 +1,10 @@
import {StrictMode} from 'react';
import {createRoot} from 'react-dom/client';
import App from './App.tsx';
import './index.css';
createRoot(document.getElementById('root')!).render(
<StrictMode>
<App />
</StrictMode>,
);

26
WebSite/tsconfig.json Normal file
View File

@@ -0,0 +1,26 @@
{
"compilerOptions": {
"target": "ES2022",
"experimentalDecorators": true,
"useDefineForClassFields": false,
"module": "ESNext",
"lib": [
"ES2022",
"DOM",
"DOM.Iterable"
],
"skipLibCheck": true,
"moduleResolution": "bundler",
"isolatedModules": true,
"moduleDetection": "force",
"allowJs": true,
"jsx": "react-jsx",
"paths": {
"@/*": [
"./*"
]
},
"allowImportingTsExtensions": true,
"noEmit": true
}
}

24
WebSite/vite.config.ts Normal file
View File

@@ -0,0 +1,24 @@
import tailwindcss from '@tailwindcss/vite';
import react from '@vitejs/plugin-react';
import path from 'path';
import {defineConfig, loadEnv} from 'vite';
export default defineConfig(({mode}) => {
const env = loadEnv(mode, '.', '');
return {
plugins: [react(), tailwindcss()],
define: {
'process.env.GEMINI_API_KEY': JSON.stringify(env.GEMINI_API_KEY),
},
resolve: {
alias: {
'@': path.resolve(__dirname, '.'),
},
},
server: {
// HMR is disabled in AI Studio via DISABLE_HMR env var.
// Do not modify—file watching is disabled to prevent flickering during agent edits.
hmr: process.env.DISABLE_HMR !== 'true',
},
};
});

View File

@@ -0,0 +1,190 @@
import os
import argparse
from pathlib import Path
import imageio.v2 as imageio
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from scipy.ndimage import map_coordinates
from head_extension_app import (
crop_head_neck,
fit_image,
load_dicom_volume,
sagittal_mip,
)
OUTPUT_DIR = Path("ppt_video")
FPS = 30
DURATION_SECONDS = 6
END_HOLD_SECONDS = 1
def video_soft_bend_2d(image, angle_degrees):
"""Video-only 2D deformation with a broad neck transition.
The app's fast preview uses a compact transition, which is useful for
interactive feedback but can visibly split vertebrae in animation. This
broad ramp keeps the head and upper cervical spine moving together and
blends gradually into the lower neck/shoulder region.
"""
arr = np.asarray(image.convert("L")).astype(np.float32)
height, width = arr.shape
yy, xx = np.mgrid[0:height, 0:width]
pivot_x = int(width * 0.55)
pivot_y = int(height * 0.62)
# Broad transition: nearly full motion for head/upper C-spine, then a long
# smooth blend through the lower C-spine to avoid splitting bone structures.
full_motion_y = height * 0.50
fixed_y = height * 0.92
t = np.clip((yy - full_motion_y) / (fixed_y - full_motion_y), 0, 1)
weight = 1 - (t * t * (3 - 2 * t))
# A small x-dependent term keeps the posterior contour from looking like a
# straight sliced plane while remaining deterministic and smooth.
x_soft = np.clip((xx - width * 0.15) / (width * 0.75), 0, 1)
x_soft = x_soft * x_soft * (3 - 2 * x_soft)
weight = np.clip(weight * (0.90 + 0.10 * x_soft), 0, 1)
theta = np.deg2rad(angle_degrees) * weight
cos_t = np.cos(theta)
sin_t = np.sin(theta)
dx = xx - pivot_x
dy = yy - pivot_y
src_x = pivot_x + cos_t * dx + sin_t * dy
src_y = pivot_y - sin_t * dx + cos_t * dy
warped = map_coordinates(arr, [src_y, src_x], order=1, mode="constant", cval=0)
return Image.fromarray(np.clip(warped, 0, 255).astype(np.uint8)).convert("RGB")
def get_font(size):
for path in [r"C:\Windows\Fonts\arial.ttf", r"C:\Windows\Fonts\calibri.ttf"]:
if os.path.exists(path):
return ImageFont.truetype(path, size)
return ImageFont.load_default()
def smoothstep(t):
return t * t * (3 - 2 * t)
def make_frame(before_image, angle, max_angle):
after_image = video_soft_bend_2d(before_image, angle)
frame = Image.new("RGB", (1920, 1080), (0, 0, 0))
draw = ImageDraw.Draw(frame)
left = fit_image(before_image, 760, 720)
right = fit_image(after_image, 760, 720)
frame.paste(left, (120, 245))
frame.paste(right, (1040, 245))
title_font = get_font(48)
angle_font = get_font(56)
draw.text((135, 120), "Original: 0 deg", font=title_font, fill=(255, 255, 255))
draw.text(
(1055, 120),
f"Head extension: {angle:04.1f} deg",
font=title_font,
fill=(255, 255, 255),
)
# Yellow direction arrow on the animated side.
arrow = (255, 210, 60)
x0, y0, x1, y1 = 1390, 405, 1515, 335
draw.line((x0, y0, x1, y1), fill=arrow, width=8)
draw.polygon([(x1, y1), (x1 - 34, y1 + 3), (x1 - 12, y1 + 29)], fill=arrow)
# Minimal progress bar.
bar_x, bar_y, bar_w, bar_h = 1040, 980, 760, 12
draw.rounded_rectangle(
(bar_x, bar_y, bar_x + bar_w, bar_y + bar_h),
radius=6,
fill=(70, 70, 70),
)
fill_w = int(bar_w * angle / max_angle) if max_angle else 0
draw.rounded_rectangle(
(bar_x, bar_y, bar_x + fill_w, bar_y + bar_h),
radius=6,
fill=arrow,
)
draw.text((1040, 1000), "0 deg", font=get_font(24), fill=(210, 210, 210))
draw.text((1745, 1000), f"{max_angle:g} deg", font=get_font(24), fill=(210, 210, 210))
# Invisible-looking spacer: keeps the font imported above alive for some PIL builds.
_ = angle_font
return frame
def parse_args():
parser = argparse.ArgumentParser(
description="Generate a 0-degree to target-angle head-extension MP4 video."
)
parser.add_argument(
"--input",
default="input_ct_2F",
help="Input DICOM folder. Default: input_ct_2F",
)
parser.add_argument(
"--output",
default=str(OUTPUT_DIR / "head_extension_0_to_20deg.mp4"),
help="Output MP4 file path.",
)
parser.add_argument(
"--max-angle",
type=float,
default=20.0,
help="Target head-extension angle. Default: 20",
)
parser.add_argument(
"--duration",
type=float,
default=DURATION_SECONDS,
help="Animation duration in seconds before final hold. Default: 6",
)
return parser.parse_args()
def generate_video(input_dir, output_path, max_angle=20.0, duration_seconds=6.0):
output_file = Path(output_path)
output_file.parent.mkdir(parents=True, exist_ok=True)
volume = load_dicom_volume(input_dir)
before_image = crop_head_neck(sagittal_mip(volume))
moving_frames = int(FPS * duration_seconds)
hold_frames = FPS * END_HOLD_SECONDS
with imageio.get_writer(
output_file,
format="FFMPEG",
fps=FPS,
codec="libx264",
quality=8,
macro_block_size=1,
) as writer:
for index in range(moving_frames):
t = index / (moving_frames - 1)
angle = max_angle * smoothstep(t)
writer.append_data(np.asarray(make_frame(before_image, angle, max_angle)))
for _ in range(hold_frames):
writer.append_data(np.asarray(make_frame(before_image, max_angle, max_angle)))
return output_file.resolve()
def main():
args = parse_args()
output_file = generate_video(args.input, args.output, args.max_angle, args.duration)
print(output_file)
if __name__ == "__main__":
main()

641
head_extension_app.py Normal file
View File

@@ -0,0 +1,641 @@
import json
import os
import shutil
import tempfile
import threading
import time
import uuid
from pathlib import Path
from tkinter import (
BOTH,
DISABLED,
END,
HORIZONTAL,
NORMAL,
Button,
Entry,
Frame,
Label,
Scale,
StringVar,
Tk,
filedialog,
messagebox,
)
import numpy as np
import pydicom
import SimpleITK as sitk
from PIL import Image, ImageDraw, ImageFont, ImageTk
from platipy.imaging.generation.mask import get_external_mask
from platipy.imaging.registration.utils import apply_transform
np.alen = len
from DeformHeadCT.VolumeInfo import VolumeDeformation, convert_nifti_to_dicom_series
from DeformHeadCT.deformation import HeadDeformation, generate_field_rotation
APP_DIR = Path(__file__).resolve().parent
RUNTIME_DIR = Path(tempfile.gettempdir()) / "HeadExtensionApp"
PREVIEW_DIR = APP_DIR / "app_previews"
DEFAULT_COORDINATES_CUTOFF = [[145, 305, 256], [135, 205, 256]]
DEFAULT_VERTEBRAE = {
"Oc-C1": [220, 255, 256],
"C1-C2": [208, 255, 256],
"C2-C3": [190, 255, 256],
"C3-C4": [172, 258, 256],
"C4-C5": [154, 262, 256],
"C5-C6": [136, 268, 256],
"C6-C7": [118, 274, 256],
"C7-T1": [100, 282, 256],
}
STATE_LABELS = [
("original", "Original", "ct_original"),
("hard_boundary", "Hard boundary", "ct_hard_boundary"),
("gaussian_smooth", "Gaussian smooth", "ct_gaussian_smooth"),
("soft_transition", "Soft transition", "ct_soft_transition"),
]
def safe_mkdir(path):
Path(path).mkdir(parents=True, exist_ok=True)
def copy_dicom_to_ascii_folder(input_dir, run_id):
target = RUNTIME_DIR / run_id / "input_ct"
safe_mkdir(target)
count = 0
for file_path in Path(input_dir).iterdir():
if file_path.is_file() and file_path.suffix.lower() == ".dcm":
shutil.copy2(file_path, target / file_path.name)
count += 1
if count == 0:
raise RuntimeError("输入文件夹里没有找到 .dcm 文件。")
return target, count
def load_dicom_volume(input_dir):
items = []
for file_path in Path(input_dir).iterdir():
if file_path.is_file() and file_path.suffix.lower() == ".dcm":
ds = pydicom.dcmread(str(file_path), force=True)
instance = int(getattr(ds, "InstanceNumber", len(items)))
items.append((instance, ds))
if not items:
raise RuntimeError("输入文件夹里没有找到 .dcm 文件。")
items.sort(key=lambda item: item[0])
volume = []
for _, ds in items:
image = ds.pixel_array.astype(np.float32)
image = image * float(getattr(ds, "RescaleSlope", 1))
image = image + float(getattr(ds, "RescaleIntercept", 0))
volume.append(image)
return np.stack(volume, axis=0)
def ct_window(image, low=-500, high=1200):
image = np.clip((image - low) / (high - low), 0, 1)
return (image * 255).astype(np.uint8)
def sagittal_mip(volume):
x0 = max(0, volume.shape[2] // 2 - 21)
x1 = min(volume.shape[2], volume.shape[2] // 2 + 24)
image = ct_window(np.max(volume[:, :, x0:x1], axis=2))
return Image.fromarray(image).convert("RGB")
def crop_head_neck(image):
width, height = image.size
left = int(width * 0.09)
right = int(width * 0.89)
top = 0
bottom = int(height * 0.72)
return image.crop((left, top, right, bottom))
def fit_image(image, width, height):
scale = min(width / image.width, height / image.height)
resized = image.resize(
(int(image.width * scale), int(image.height * scale)),
Image.Resampling.LANCZOS,
)
canvas = Image.new("RGB", (width, height), (0, 0, 0))
canvas.paste(resized, ((width - resized.width) // 2, (height - resized.height) // 2))
return canvas
def preview_deform_2d(image, angle_degrees):
"""Fast visual preview only. The DICOM output uses the real 3D field."""
try:
from scipy.ndimage import map_coordinates
except Exception:
return image
arr = np.asarray(image.convert("L")).astype(np.float32)
h, w = arr.shape
yy, xx = np.mgrid[0:h, 0:w]
pivot_x = int(w * 0.55)
pivot_y = int(h * 0.62)
full_motion_y = h * 0.50
fixed_y = h * 0.92
t = np.clip((yy - full_motion_y) / (fixed_y - full_motion_y), 0, 1)
weight = 1 - (t * t * (3 - 2 * t))
x_soft = np.clip((xx - w * 0.15) / (w * 0.75), 0, 1)
x_soft = x_soft * x_soft * (3 - 2 * x_soft)
weight = np.clip(weight * (0.90 + 0.10 * x_soft), 0, 1)
theta = np.deg2rad(angle_degrees) * weight
cos_t = np.cos(theta)
sin_t = np.sin(theta)
dx = xx - pivot_x
dy = yy - pivot_y
src_x = pivot_x + cos_t * dx + sin_t * dy
src_y = pivot_y - sin_t * dx + cos_t * dy
warped = map_coordinates(arr, [src_y, src_x], order=1, mode="constant", cval=0)
return Image.fromarray(np.clip(warped, 0, 255).astype(np.uint8)).convert("RGB")
def transition_weight(image, coordinates_cutoff, width_voxels):
size_x, size_y, size_z = image.GetSize()
center_z = float(np.mean([p[0] for p in coordinates_cutoff]))
edge0 = center_z - float(width_voxels)
edge1 = center_z + float(width_voxels)
z_grid = np.arange(size_z, dtype=np.float32)[:, None, None]
weight = np.clip((z_grid - edge0) / (edge1 - edge0), 0, 1)
weight = weight * weight * (3 - 2 * weight)
return np.broadcast_to(weight, (size_z, size_y, size_x)).astype(np.float32)
def hard_boundary_weight(image, coordinates_cutoff):
size_x, size_y, size_z = image.GetSize()
center_z = float(np.mean([p[0] for p in coordinates_cutoff]))
z_grid = np.arange(size_z, dtype=np.float32)[:, None, None]
weight = (z_grid >= center_z).astype(np.float32)
return np.broadcast_to(weight, (size_z, size_y, size_x)).astype(np.float32)
def image_from_dvf_array(dvf_arr, reference_image):
dvf = sitk.GetImageFromArray(dvf_arr.astype(np.float32))
dvf.CopyInformation(reference_image)
return dvf
def apply_dvf_to_ct(image_ct, dvf):
transform = sitk.DisplacementFieldTransform(sitk.Cast(dvf, sitk.sitkVectorFloat64))
return apply_transform(
image_ct,
transform=transform,
interpolator=sitk.sitkLinear,
default_value=int(sitk.GetArrayViewFromImage(image_ct).min()),
)
def reset_folder(path):
path = Path(path)
if path.exists():
shutil.rmtree(path)
path.mkdir(parents=True, exist_ok=True)
def write_dicom_series(image, reference_dicom_dir, output_dir, run_root, state_key):
output_dir = Path(output_dir)
reset_folder(output_dir)
ascii_output_dir = run_root / "dicom_output" / state_key
reset_folder(ascii_output_dir)
convert_nifti_to_dicom_series(
image=image,
reference_dcm=str(reference_dicom_dir),
output_directory=str(ascii_output_dir),
)
for dicom_path in ascii_output_dir.glob("*.dcm"):
shutil.copy2(dicom_path, output_dir / dicom_path.name)
def write_info_json(info_path, input_dir, temp_dir, output_dir, angle_degrees, transition_width):
data = {
"name": "HEAD_EXTENSION",
"InputDirectory": str(input_dir).replace("\\", "/"),
"TempDirectory": str(temp_dir).replace("\\", "/"),
"OutputDirectory": str(output_dir).replace("\\", "/"),
"axes": [0, 0, -1],
"angles": [float(angle_degrees)],
"coordinates_cutoff": DEFAULT_COORDINATES_CUTOFF,
"transition_width_voxels": int(transition_width),
}
data.update(DEFAULT_VERTEBRAE)
info_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
def run_deformation(input_dir, output_dir, angle_degrees, transition_width, progress):
run_id = time.strftime("%Y%m%d_%H%M%S_") + uuid.uuid4().hex[:8]
run_root = RUNTIME_DIR / run_id
temp_dir = run_root / "temp"
ascii_input_dir, copied_count = copy_dicom_to_ascii_folder(input_dir, run_id)
safe_mkdir(output_dir)
info_path = run_root / "head_extension.json"
write_info_json(
info_path,
ascii_input_dir,
temp_dir,
Path(output_dir),
angle_degrees,
transition_width,
)
progress(f"已复制 {copied_count} 张 DICOM开始转换体数据...")
vol_info = VolumeDeformation(InfoFile=str(info_path))
vol_info.PrepareDcmData()
patient_image_dir = vol_info.nifti_directory / vol_info.patientunderscore / "IMAGES"
if not patient_image_dir.exists():
patient_image_dir = next(vol_info.nifti_directory.glob("*/IMAGES"))
vol_info.patientunderscore = patient_image_dir.parent.name
root_image_dir = vol_info.nifti_directory / "IMAGES"
safe_mkdir(root_image_dir)
for image_path in patient_image_dir.glob("*.nii.gz"):
shutil.copy2(image_path, root_image_dir / image_path.name)
progress("正在生成四种状态的三维位移场...")
head_def = HeadDeformation(vol_info.nifti_directory, vol_info.patientunderscore, 0)
external_mask = get_external_mask(head_def.image_ct)
_, _, full_rotation_dvf, _ = generate_field_rotation(
head_def.image_ct,
external_mask,
tuple(vol_info.point_of_rotation[0]),
axis_of_rotation=vol_info.axes,
angle=-vol_info.angles[0] * np.pi / 180,
gaussian_smooth=0,
)
base_dvf_arr = sitk.GetArrayFromImage(full_rotation_dvf).astype(np.float32)
mask_arr = sitk.GetArrayFromImage(external_mask).astype(np.float32)
hard_weight = hard_boundary_weight(head_def.image_ct, vol_info.coordinates_cutoff)
soft_weight = transition_weight(
head_def.image_ct,
vol_info.coordinates_cutoff,
int(transition_width),
)
hard_dvf = image_from_dvf_array(
base_dvf_arr * (hard_weight * mask_arr)[..., None],
head_def.image_ct,
)
gaussian_dvf = sitk.SmoothingRecursiveGaussian(hard_dvf, 3)
soft_dvf = image_from_dvf_array(
base_dvf_arr * (soft_weight * mask_arr)[..., None],
head_def.image_ct,
)
soft_dvf = sitk.SmoothingRecursiveGaussian(soft_dvf, 3)
progress("正在应用形变...")
state_images = {
"original": head_def.image_ct,
"hard_boundary": apply_dvf_to_ct(head_def.image_ct, hard_dvf),
"gaussian_smooth": apply_dvf_to_ct(head_def.image_ct, gaussian_dvf),
"soft_transition": apply_dvf_to_ct(head_def.image_ct, soft_dvf),
}
progress("正在写出四种状态的完整 DICOM 序列...")
reference_dicom_dir = vol_info.nifti_directory / "dicom" / "ct"
output_paths = {}
for state_key, _, folder_name in STATE_LABELS:
state_output_dir = Path(output_dir) / folder_name
write_dicom_series(
state_images[state_key],
reference_dicom_dir,
state_output_dir,
run_root,
state_key,
)
output_paths[state_key] = state_output_dir
legacy_soft_dir = Path(output_dir) / "ct"
write_dicom_series(
state_images["soft_transition"],
reference_dicom_dir,
legacy_soft_dir,
run_root,
"legacy_soft_transition",
)
output_paths["legacy_soft"] = legacy_soft_dir
progress("正在生成四状态过程对比图...")
preview_paths = make_four_state_preview(state_images, Path(output_dir), angle_degrees)
make_output_preview_from_images(
state_images["original"],
state_images["soft_transition"],
Path(output_dir),
angle_degrees,
)
return output_paths, preview_paths
def make_output_preview(original_dir, deformed_dicom_dir, output_dir, angle_degrees):
safe_mkdir(PREVIEW_DIR)
orig = load_dicom_volume(original_dir)
deformed = load_dicom_volume(deformed_dicom_dir)[::-1]
before = crop_head_neck(sagittal_mip(orig))
after = crop_head_neck(sagittal_mip(deformed))
slide = Image.new("RGB", (2560, 1440), (0, 0, 0))
draw = ImageDraw.Draw(slide)
font_path = Path(r"C:\Windows\Fonts\arial.ttf")
title_font = ImageFont.truetype(str(font_path), 58) if font_path.exists() else ImageFont.load_default()
slide.paste(fit_image(before, 920, 675), (280, 390))
slide.paste(fit_image(after, 920, 675), (1360, 390))
draw.text((300, 190), "Before", font=title_font, fill=(255, 255, 255))
draw.text(
(1380, 190),
f"After: {angle_degrees:g} deg head extension",
font=title_font,
fill=(255, 255, 255),
)
arrow = (255, 210, 60)
x0, y0, x1, y1 = 1685, 545, 1855, 455
draw.line((x0, y0, x1, y1), fill=arrow, width=10)
draw.polygon([(x1, y1), (x1 - 42, y1 + 5), (x1 - 15, y1 + 36)], fill=arrow)
preview_path = output_dir / "before_after_preview.png"
slide.save(preview_path, quality=95)
return preview_path
def sitk_sagittal_panel(image):
volume = sitk.GetArrayFromImage(image)[::-1]
return crop_head_neck(sagittal_mip(volume))
def make_output_preview_from_images(original_image, deformed_image, output_dir, angle_degrees):
before = sitk_sagittal_panel(original_image)
after = sitk_sagittal_panel(deformed_image)
slide = Image.new("RGB", (2560, 1440), (0, 0, 0))
draw = ImageDraw.Draw(slide)
font_path = Path(r"C:\Windows\Fonts\arial.ttf")
title_font = ImageFont.truetype(str(font_path), 58) if font_path.exists() else ImageFont.load_default()
slide.paste(fit_image(before, 920, 675), (280, 390))
slide.paste(fit_image(after, 920, 675), (1360, 390))
draw.text((300, 190), "Before", font=title_font, fill=(255, 255, 255))
draw.text(
(1380, 190),
f"After: {angle_degrees:g} deg head extension",
font=title_font,
fill=(255, 255, 255),
)
arrow = (255, 210, 60)
x0, y0, x1, y1 = 1685, 545, 1855, 455
draw.line((x0, y0, x1, y1), fill=arrow, width=10)
draw.polygon([(x1, y1), (x1 - 42, y1 + 5), (x1 - 15, y1 + 36)], fill=arrow)
preview_path = Path(output_dir) / "before_after_preview.png"
slide.save(preview_path, quality=95)
return preview_path
def make_four_state_preview(state_images, output_dir, angle_degrees):
output_dir = Path(output_dir)
screenshot_dir = output_dir / "process_screenshots"
reset_folder(screenshot_dir)
panels = []
for state_key, label, _ in STATE_LABELS:
panel = sitk_sagittal_panel(state_images[state_key])
panel_path = screenshot_dir / f"{state_key}.png"
panel.save(panel_path, quality=95)
panels.append((label, panel))
slide = Image.new("RGB", (2560, 720), (0, 0, 0))
draw = ImageDraw.Draw(slide)
font_path = Path(r"C:\Windows\Fonts\arial.ttf")
title_font = ImageFont.truetype(str(font_path), 36) if font_path.exists() else ImageFont.load_default()
small_font = ImageFont.truetype(str(font_path), 24) if font_path.exists() else ImageFont.load_default()
panel_width = 560
panel_height = 430
margin = 55
gap = 70
y_image = 145
y_title = 62
for index, (label, panel) in enumerate(panels):
x = margin + index * (panel_width + gap)
slide.paste(fit_image(panel, panel_width, panel_height), (x, y_image))
draw.text((x, y_title), label, font=title_font, fill=(255, 255, 255))
draw.text(
(margin, 650),
f"Head extension angle: {angle_degrees:g} deg",
font=small_font,
fill=(190, 190, 190),
)
comparison_path = output_dir / "process_comparison_4states.png"
slide.save(comparison_path, quality=95)
return {
"comparison": comparison_path,
"screenshots": screenshot_dir,
}
class HeadExtensionApp:
def __init__(self, root):
self.root = root
self.root.title("头颈部 CT 仰头形变工具")
self.root.geometry("1180x820")
self.input_dir = StringVar(value=str(APP_DIR / "input_ct_2F"))
self.output_dir = StringVar(value=str(APP_DIR / "app_output"))
self.status = StringVar(value="请选择 DICOM 文件夹,调节角度后可先预览,再生成四状态 DICOM 和过程对比图。")
self.angle_text = StringVar(value="12.0°")
self.transition_text = StringVar(value="45")
self.cached_volume = None
self.preview_photo = None
self.preview_after_id = None
self.build_ui()
def build_ui(self):
top = Frame(self.root)
top.pack(fill="x", padx=12, pady=10)
Label(top, text="输入 DICOM 文件夹").grid(row=0, column=0, sticky="w")
Entry(top, textvariable=self.input_dir, width=92).grid(row=0, column=1, padx=8)
Button(top, text="选择", command=self.choose_input).grid(row=0, column=2)
Label(top, text="输出文件夹").grid(row=1, column=0, sticky="w", pady=6)
Entry(top, textvariable=self.output_dir, width=92).grid(row=1, column=1, padx=8)
Button(top, text="选择", command=self.choose_output).grid(row=1, column=2)
controls = Frame(self.root)
controls.pack(fill="x", padx=12)
Label(controls, text="仰头角度").grid(row=0, column=0, sticky="w")
self.angle = Scale(
controls,
from_=0,
to=20,
orient=HORIZONTAL,
resolution=0.5,
length=420,
command=self.on_angle_change,
)
self.angle.set(12)
self.angle.grid(row=0, column=1, sticky="w", padx=8)
Label(controls, textvariable=self.angle_text, width=8, anchor="w").grid(
row=0, column=2, sticky="w"
)
Label(controls, text="过渡平滑宽度").grid(row=0, column=3, sticky="w", padx=(20, 0))
self.transition = Scale(
controls,
from_=50,
to=160,
orient=HORIZONTAL,
resolution=10,
length=300,
command=self.on_transition_change,
)
self.transition.set(90)
self.transition.grid(row=0, column=4, sticky="w", padx=8)
Label(controls, textvariable=self.transition_text, width=8, anchor="w").grid(
row=0, column=5, sticky="w"
)
buttons = Frame(self.root)
buttons.pack(fill="x", padx=12, pady=8)
self.preview_button = Button(buttons, text="更新预览", command=self.update_preview, width=16)
self.preview_button.pack(side="left", padx=(0, 8))
self.run_all_button = Button(
buttons,
text="保存过程对比图+四状态DICOM",
command=self.start_run,
width=28,
)
self.run_all_button.pack(side="left")
self.preview_label = Label(self.root, bg="black")
self.preview_label.pack(fill=BOTH, expand=True, padx=12, pady=8)
Label(self.root, textvariable=self.status, anchor="w").pack(fill="x", padx=12, pady=(0, 8))
self.log = Entry(self.root)
self.log.pack(fill="x", padx=12, pady=(0, 10))
def on_angle_change(self, value):
self.angle_text.set(f"{float(value):.1f}°")
self.schedule_preview_refresh()
def on_transition_change(self, value):
self.transition_text.set(f"{int(float(value))}")
def schedule_preview_refresh(self):
if self.preview_after_id is not None:
self.root.after_cancel(self.preview_after_id)
self.preview_after_id = self.root.after(250, self.update_preview)
def choose_input(self):
path = filedialog.askdirectory(title="选择 DICOM 文件夹")
if path:
self.input_dir.set(path)
self.cached_volume = None
def choose_output(self):
path = filedialog.askdirectory(title="选择输出文件夹")
if path:
self.output_dir.set(path)
def set_busy(self, busy):
state = DISABLED if busy else NORMAL
self.preview_button.config(state=state)
self.run_all_button.config(state=state)
def update_status(self, message):
self.root.after(0, lambda: self.status.set(message))
def update_preview(self):
self.preview_after_id = None
try:
if self.cached_volume is None:
self.status.set("正在读取 DICOM 生成预览...")
self.cached_volume = load_dicom_volume(self.input_dir.get())
before = crop_head_neck(sagittal_mip(self.cached_volume))
after = preview_deform_2d(before, float(self.angle.get()))
canvas = Image.new("RGB", (1120, 610), (0, 0, 0))
draw = ImageDraw.Draw(canvas)
before_panel = fit_image(before, 520, 500)
after_panel = fit_image(after, 520, 500)
canvas.paste(before_panel, (30, 80))
canvas.paste(after_panel, (570, 80))
draw.text((40, 25), "Before", fill=(255, 255, 255))
draw.text((580, 25), f"Preview: {float(self.angle.get()):g} deg", fill=(255, 255, 255))
self.preview_photo = ImageTk.PhotoImage(canvas)
self.preview_label.config(image=self.preview_photo)
self.status.set("预览已更新。预览是快速 2D 示意,最终输出会使用 3D 位移场。")
except Exception as exc:
messagebox.showerror("预览失败", str(exc))
self.status.set("预览失败,请检查输入 DICOM 文件夹。")
def start_run(self):
self.set_busy(True)
self.status.set("开始生成四状态完整输出请稍等。300 层 CT 通常需要 1-3 分钟。")
thread = threading.Thread(target=self.run_worker, daemon=True)
thread.start()
def run_worker(self):
try:
output_paths, preview_paths = run_deformation(
self.input_dir.get(),
self.output_dir.get(),
float(self.angle.get()),
int(self.transition.get()),
self.update_status,
)
self.update_status(f"完成。四状态 DICOM 与过程对比图已输出到:{self.output_dir.get()}")
self.root.after(
0,
lambda: messagebox.showinfo(
"完成",
"四状态完整输出已生成:\n"
f"Original{output_paths['original']}\n"
f"Hard boundary{output_paths['hard_boundary']}\n"
f"Gaussian smooth{output_paths['gaussian_smooth']}\n"
f"Soft transition{output_paths['soft_transition']}\n\n"
f"过程对比图:\n{preview_paths['comparison']}\n\n"
f"兼容旧版本的 Soft transition 输出:\n{output_paths['legacy_soft']}",
),
)
except Exception as exc:
error_message = str(exc)
self.update_status("生成失败。")
self.root.after(0, lambda: messagebox.showerror("生成失败", error_message))
finally:
self.root.after(0, lambda: self.set_busy(False))
def main():
safe_mkdir(RUNTIME_DIR)
safe_mkdir(PREVIEW_DIR)
root = Tk()
app = HeadExtensionApp(root)
app.update_preview()
root.mainloop()
if __name__ == "__main__":
main()

9
requirements.txt Normal file
View File

@@ -0,0 +1,9 @@
numpy
scipy
pillow
pydicom
SimpleITK
platipy
CTHeadDeformation
imageio
imageio-ffmpeg

136
video_generator_app.py Normal file
View File

@@ -0,0 +1,136 @@
import threading
from pathlib import Path
from tkinter import (
DISABLED,
HORIZONTAL,
NORMAL,
Button,
Entry,
Frame,
Label,
Scale,
StringVar,
Tk,
filedialog,
messagebox,
)
from generate_head_extension_video import generate_video
class VideoGeneratorApp:
def __init__(self, root):
self.root = root
self.root.title("仰头角度变化视频生成工具")
self.root.geometry("760x260")
self.input_dir = StringVar(value="input_ct_2F")
self.output_file = StringVar(value=str(Path("ppt_video") / "head_extension_0_to_20deg.mp4"))
self.status = StringVar(value="请选择 DICOM 文件夹,然后点击生成视频。")
self.angle_text = StringVar(value="20°")
self.duration_text = StringVar(value="6 秒")
self.build_ui()
def build_ui(self):
top = Frame(self.root)
top.pack(fill="x", padx=12, pady=12)
Label(top, text="DICOM 文件夹").grid(row=0, column=0, sticky="w")
Entry(top, textvariable=self.input_dir, width=72).grid(row=0, column=1, padx=8)
Button(top, text="选择", command=self.choose_input).grid(row=0, column=2)
Label(top, text="输出视频").grid(row=1, column=0, sticky="w", pady=8)
Entry(top, textvariable=self.output_file, width=72).grid(row=1, column=1, padx=8)
Button(top, text="选择", command=self.choose_output).grid(row=1, column=2)
controls = Frame(self.root)
controls.pack(fill="x", padx=12, pady=4)
Label(controls, text="最大仰头角度").grid(row=0, column=0, sticky="w")
self.max_angle = Scale(
controls,
from_=5,
to=30,
orient=HORIZONTAL,
resolution=1,
length=360,
command=self.on_angle_change,
)
self.max_angle.set(20)
self.max_angle.grid(row=0, column=1, padx=8)
Label(controls, textvariable=self.angle_text, width=8, anchor="w").grid(row=0, column=2)
Label(controls, text="动画时长").grid(row=1, column=0, sticky="w")
self.duration = Scale(
controls,
from_=3,
to=12,
orient=HORIZONTAL,
resolution=1,
length=360,
command=self.on_duration_change,
)
self.duration.set(6)
self.duration.grid(row=1, column=1, padx=8)
Label(controls, textvariable=self.duration_text, width=8, anchor="w").grid(row=1, column=2)
bottom = Frame(self.root)
bottom.pack(fill="x", padx=12, pady=12)
self.generate_button = Button(bottom, text="生成视频", width=18, command=self.start_generate)
self.generate_button.pack(side="left")
Label(bottom, textvariable=self.status, anchor="w").pack(side="left", padx=12)
def choose_input(self):
path = filedialog.askdirectory(title="选择 DICOM 文件夹")
if path:
self.input_dir.set(path)
def choose_output(self):
path = filedialog.asksaveasfilename(
title="选择输出视频路径",
defaultextension=".mp4",
filetypes=[("MP4 视频", "*.mp4")],
initialfile="head_extension_0_to_20deg.mp4",
)
if path:
self.output_file.set(path)
def on_angle_change(self, value):
self.angle_text.set(f"{float(value):.0f}°")
def on_duration_change(self, value):
self.duration_text.set(f"{float(value):.0f}")
def start_generate(self):
self.generate_button.config(state=DISABLED)
self.status.set("正在生成视频,请稍等...")
thread = threading.Thread(target=self.generate_worker, daemon=True)
thread.start()
def generate_worker(self):
try:
output = generate_video(
self.input_dir.get(),
self.output_file.get(),
float(self.max_angle.get()),
float(self.duration.get()),
)
self.root.after(0, lambda: self.status.set(f"完成:{output}"))
self.root.after(0, lambda: messagebox.showinfo("完成", f"视频已生成:\n{output}"))
except Exception as exc:
error_message = str(exc)
self.root.after(0, lambda: self.status.set("生成失败。"))
self.root.after(0, lambda: messagebox.showerror("生成失败", error_message))
finally:
self.root.after(0, lambda: self.generate_button.config(state=NORMAL))
def main():
root = Tk()
VideoGeneratorApp(root)
root.mainloop()
if __name__ == "__main__":
main()

507
web_backend.py Normal file
View File

@@ -0,0 +1,507 @@
import base64
import json
import os
import shutil
import threading
import time
import traceback
import uuid
import zipfile
from email.parser import BytesParser
from email.policy import default
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from io import BytesIO
from pathlib import Path
from urllib.parse import parse_qs, unquote, urlparse
os.environ.setdefault("MPLCONFIGDIR", "/tmp/head_ct_morph_matplotlib")
from generate_head_extension_video import generate_video
from head_extension_app import (
APP_DIR,
crop_head_neck,
fit_image,
load_dicom_volume,
preview_deform_2d,
run_deformation,
sagittal_mip,
safe_mkdir,
)
HOST = "0.0.0.0"
PORT = 8787
JOBS = {}
JOBS_LOCK = threading.Lock()
LIBRARY_DIR = APP_DIR / "web_library"
LIBRARY_META = LIBRARY_DIR / "library.json"
RESULT_DIR = APP_DIR / "web_results"
def json_default(value):
if isinstance(value, Path):
return str(value.resolve())
return str(value)
def now_date():
return time.strftime("%Y-%m-%d")
def safe_filename(name):
return "".join(char if char.isalnum() or char in "._-" else "_" for char in Path(name).name)
def read_library_meta():
safe_mkdir(LIBRARY_DIR)
if LIBRARY_META.exists():
return json.loads(LIBRARY_META.read_text(encoding="utf-8"))
return []
def write_library_meta(items):
safe_mkdir(LIBRARY_DIR)
LIBRARY_META.write_text(json.dumps(items, ensure_ascii=False, indent=2), encoding="utf-8")
def folder_size_mb(path):
total = 0
for file_path in Path(path).glob("*.dcm"):
total += file_path.stat().st_size
return round(total / 1024 / 1024)
def build_library_record(item_id, patient_id, dicom_path, source="upload", version="DICOM-LIB"):
dicom_dir = Path(dicom_path).resolve()
file_count = len(list(dicom_dir.glob("*.dcm")))
return {
"id": item_id,
"patientId": patient_id,
"date": now_date(),
"version": version,
"status": "processed",
"size": folder_size_mb(dicom_dir),
"previewColor": "bg-slate-900" if source == "upload" else "bg-slate-800",
"dicomPath": str(dicom_dir),
"fileCount": file_count,
"source": source,
}
def list_library():
items = [
item for item in read_library_meta()
if item.get("source") != "seed" and item.get("id") != "seed_ori_head_ct"
]
live_items = []
changed = False
for item in items:
dicom_path = Path(item.get("dicomPath", ""))
if dicom_path.exists():
item["fileCount"] = len(list(dicom_path.glob("*.dcm")))
item["size"] = folder_size_mb(dicom_path)
live_items.append(item)
else:
changed = True
if changed:
write_library_meta(live_items)
return live_items
def parse_multipart(headers, body):
content_type = headers.get("content-type", "")
message = BytesParser(policy=default).parsebytes(
f"Content-Type: {content_type}\r\nMIME-Version: 1.0\r\n\r\n".encode("utf-8") + body
)
fields = {}
files = []
for part in message.iter_parts():
name = part.get_param("name", header="content-disposition")
filename = part.get_filename()
payload = part.get_payload(decode=True) or b""
if filename:
files.append((name, filename, payload))
elif name:
fields[name] = payload.decode("utf-8", errors="replace")
return fields, files
def write_dicom_payload(target_dir, filename, payload, index):
output_name = safe_filename(filename) or f"{index}.dcm"
output_path = target_dir / output_name
if output_path.exists():
output_path = target_dir / f"{index}_{output_name}"
output_path.write_bytes(payload)
def add_dicom_from_zip(target_dir, zip_filename, payload, start_index):
count = 0
try:
with zipfile.ZipFile(BytesIO(payload)) as archive:
for member in archive.infolist():
if member.is_dir():
continue
member_name = member.filename.replace("\\", "/")
if Path(member_name).suffix.lower() != ".dcm":
continue
if Path(member_name).is_absolute() or ".." in Path(member_name).parts:
continue
count += 1
with archive.open(member) as member_file:
write_dicom_payload(
target_dir,
f"{Path(zip_filename).stem}_{Path(member_name).name}",
member_file.read(),
start_index + count,
)
except zipfile.BadZipFile as exc:
raise RuntimeError(f"{zip_filename} 不是有效的 zip 压缩包。") from exc
return count
def upload_library_item(headers, body):
fields, files = parse_multipart(headers, body)
dicom_files = [
(filename, payload)
for _, filename, payload in files
if Path(filename).suffix.lower() == ".dcm"
]
zip_files = [
(filename, payload)
for _, filename, payload in files
if Path(filename).suffix.lower() == ".zip"
]
if not dicom_files and not zip_files:
raise RuntimeError("上传内容里没有找到 .dcm 文件或 .zip 压缩包。")
item_id = time.strftime("%Y%m%d_%H%M%S_") + uuid.uuid4().hex[:8]
first_upload_name = (dicom_files[0][0] if dicom_files else zip_files[0][0])
default_patient_id = Path(first_upload_name).stem.upper() if zip_files else f"WEB_{item_id[-8:].upper()}"
patient_id = fields.get("patientId") or default_patient_id
target_dir = LIBRARY_DIR / item_id / "dicom"
reset_dir(target_dir)
for index, (filename, payload) in enumerate(dicom_files, start=1):
write_dicom_payload(target_dir, filename, payload, index)
zip_dicom_count = 0
for filename, payload in zip_files:
zip_dicom_count += add_dicom_from_zip(
target_dir,
filename,
payload,
len(dicom_files) + zip_dicom_count,
)
total_dicom_count = len(dicom_files) + zip_dicom_count
if total_dicom_count == 0:
shutil.rmtree(target_dir.parent)
raise RuntimeError("压缩包里没有找到 .dcm 文件。")
record = build_library_record(
item_id,
patient_id,
target_dir,
source="upload",
version="DICOM-ZIP" if zip_files and not dicom_files else "DICOM-LIB",
)
items = [record, *list_library()]
write_library_meta(items)
return record
def reset_dir(path):
path = Path(path)
if path.exists():
shutil.rmtree(path)
path.mkdir(parents=True, exist_ok=True)
def zip_folder(source_dir, zip_path):
source_dir = Path(source_dir).resolve()
zip_path = Path(zip_path).resolve()
safe_mkdir(zip_path.parent)
if zip_path.exists():
zip_path.unlink()
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
for file_path in source_dir.rglob("*"):
if file_path.is_file():
archive.write(file_path, file_path.relative_to(source_dir))
return zip_path
def set_job(job_id, **updates):
with JOBS_LOCK:
job = JOBS[job_id]
job.update(updates)
job["updatedAt"] = time.strftime("%Y-%m-%d %H:%M:%S")
def get_job(job_id):
with JOBS_LOCK:
job = JOBS.get(job_id)
return dict(job) if job else None
def start_job(kind, worker):
job_id = uuid.uuid4().hex[:12]
with JOBS_LOCK:
JOBS[job_id] = {
"id": job_id,
"kind": kind,
"status": "running",
"message": "任务已启动。",
"result": None,
"error": None,
"createdAt": time.strftime("%Y-%m-%d %H:%M:%S"),
"updatedAt": time.strftime("%Y-%m-%d %H:%M:%S"),
}
def run():
try:
result = worker(job_id)
set_job(job_id, status="completed", message="任务完成。", result=result)
except Exception as exc:
set_job(
job_id,
status="failed",
message="任务失败。",
error=str(exc),
traceback=traceback.format_exc(),
)
threading.Thread(target=run, daemon=True).start()
return get_job(job_id)
def make_preview(input_dir, angle_degrees):
volume = load_dicom_volume(input_dir)
before = crop_head_neck(sagittal_mip(volume))
after = preview_deform_2d(before, float(angle_degrees))
canvas = BytesIO()
preview = fit_image(after, 720, 520)
preview.save(canvas, format="PNG")
encoded = base64.b64encode(canvas.getvalue()).decode("ascii")
return {
"image": f"data:image/png;base64,{encoded}",
"source": str(Path(input_dir).resolve()),
"angleDegrees": float(angle_degrees),
}
def serialize_outputs(output_paths, preview_paths, zip_path=None):
return {
"outputs": {key: str(Path(value).resolve()) for key, value in output_paths.items()},
"previews": {
"comparison": str(Path(preview_paths["comparison"]).resolve()),
"screenshots": str(Path(preview_paths["screenshots"]).resolve()),
},
"zip": {
"path": str(Path(zip_path).resolve()),
"name": Path(zip_path).name,
"size": Path(zip_path).stat().st_size,
} if zip_path else None,
}
class Handler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
print("%s - %s" % (self.address_string(), format % args))
def do_OPTIONS(self):
self.send_response(204)
self.send_cors_headers()
self.end_headers()
def do_GET(self):
parsed = urlparse(self.path)
if parsed.path == "/api/health":
self.send_json({"ok": True, "service": "Head CT Morph backend"})
return
if parsed.path == "/api/defaults":
self.send_json(
{
"backend": f"http://127.0.0.1:{PORT}",
}
)
return
if parsed.path == "/api/library":
self.send_json({"items": list_library()})
return
if parsed.path == "/api/job":
params = parse_qs(parsed.query)
job_id = params.get("id", [""])[0]
job = get_job(job_id)
if not job:
self.send_json({"error": "任务不存在。"}, status=404)
return
self.send_json(job)
return
if parsed.path == "/api/file":
params = parse_qs(parsed.query)
file_path = Path(unquote(params.get("path", [""])[0])).resolve()
if not file_path.exists() or not file_path.is_file():
self.send_json({"error": "文件不存在。"}, status=404)
return
content_type = "application/octet-stream"
if file_path.suffix.lower() in [".png", ".jpg", ".jpeg"]:
content_type = "image/png" if file_path.suffix.lower() == ".png" else "image/jpeg"
elif file_path.suffix.lower() == ".mp4":
content_type = "video/mp4"
elif file_path.suffix.lower() == ".zip":
content_type = "application/zip"
data = file_path.read_bytes()
self.send_response(200)
self.send_cors_headers()
self.send_header("Content-Type", content_type)
if file_path.suffix.lower() == ".zip":
self.send_header("Content-Disposition", f'attachment; filename="{file_path.name}"')
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
return
self.send_json({"error": "接口不存在。"}, status=404)
def do_POST(self):
parsed = urlparse(self.path)
try:
if parsed.path == "/api/library/upload":
body = self.read_bytes()
self.send_json(upload_library_item(self.headers, body), status=201)
return
body = self.read_json()
if parsed.path == "/api/preview":
self.send_json(make_preview(body["inputDir"], body.get("angleDegrees", 12)))
return
if parsed.path == "/api/deformation":
input_dir = body["inputDir"]
angle_degrees = float(body.get("angleDegrees", 12))
transition_width = int(body.get("transitionWidth", 90))
def worker(job_id):
job_root = RESULT_DIR / job_id
output_dir = job_root / "four_state_output"
reset_dir(job_root)
def progress(message):
set_job(job_id, message=message)
output_paths, preview_paths = run_deformation(
input_dir,
output_dir,
angle_degrees,
transition_width,
progress,
)
set_job(job_id, message="正在打包四状态输出 ZIP...")
zip_path = zip_folder(
output_dir,
job_root / f"head_ct_morph_{job_id}.zip",
)
return serialize_outputs(output_paths, preview_paths, zip_path)
self.send_json(start_job("deformation", worker), status=202)
return
if parsed.path == "/api/video":
input_dir = body["inputDir"]
max_angle = float(body.get("maxAngle", 20))
duration = float(body.get("durationSeconds", 6))
def worker(job_id):
job_root = RESULT_DIR / job_id
reset_dir(job_root)
output_file = job_root / f"head_extension_{job_id}.mp4"
set_job(job_id, message="正在生成 0° 到目标角度的视频。")
output = generate_video(input_dir, output_file, max_angle, duration)
output = Path(output).resolve()
return {
"video": {
"path": str(output),
"name": output.name,
"size": output.stat().st_size,
}
}
self.send_json(start_job("video", worker), status=202)
return
self.send_json({"error": "接口不存在。"}, status=404)
except Exception as exc:
self.send_json(
{
"error": str(exc),
"traceback": traceback.format_exc(),
},
status=500,
)
def do_DELETE(self):
parsed = urlparse(self.path)
if parsed.path != "/api/library":
self.send_json({"error": "接口不存在。"}, status=404)
return
params = parse_qs(parsed.query)
item_id = params.get("id", [""])[0]
items = list_library()
target = next((item for item in items if item["id"] == item_id), None)
if not target:
self.send_json({"error": "影像不存在。"}, status=404)
return
remaining = [item for item in items if item["id"] != item_id]
write_library_meta(remaining)
upload_root = Path(target["dicomPath"]).resolve().parent
if upload_root.exists() and upload_root.is_relative_to(LIBRARY_DIR.resolve()):
shutil.rmtree(upload_root)
self.send_json({"ok": True, "items": remaining})
def read_bytes(self):
length = int(self.headers.get("content-length", "0"))
if length == 0:
return b""
return self.rfile.read(length)
def read_json(self):
body = self.read_bytes()
if not body:
return {}
return json.loads(body.decode("utf-8"))
def send_cors_headers(self):
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
def send_json(self, payload, status=200):
data = json.dumps(payload, ensure_ascii=False, default=json_default).encode("utf-8")
self.send_response(status)
self.send_cors_headers()
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def main():
safe_mkdir(APP_DIR / "app_output")
safe_mkdir(APP_DIR / "ppt_video")
safe_mkdir(LIBRARY_DIR)
safe_mkdir(RESULT_DIR)
server = ThreadingHTTPServer((HOST, PORT), Handler)
print(f"Head CT Morph backend running at http://{HOST}:{PORT}")
server.serve_forever()
if __name__ == "__main__":
main()

191
使用说明.md Normal file
View File

@@ -0,0 +1,191 @@
# 头颈部 CT 仰头形变工具使用说明
## 1. 工具用途
本工具用于将一组头颈部 CT DICOM 序列模拟成“仰头/后伸”状态,并输出新的 DICOM 序列和一张前后对比图。
适用场景:
- PPT 展示
- 方法演示
- 头颈部体位变化的初步可视化
- 插管体位下的仰头效果模拟
注意:本工具生成的是模拟形变结果,不应直接作为临床诊断或治疗计划依据。
## 2. 文件说明
用户版文件夹内包含:
- `启动头颈CT仰头形变工具.bat`:双击启动程序
- `head_extension_app.py`:主程序
- `generate_head_extension_video.py`:视频生成脚本
- `video_generator_app.py`:视频生成图形界面
- `requirements.txt`Python 依赖包列表
- `使用说明.md`:本说明文档
- `生成0到20度仰头视频.bat`:生成角度连续变化视频
## 3. 运行环境
需要安装 Python 3.8 或以上版本。
首次使用前,在当前文件夹打开命令行,执行:
```bat
pip install -r requirements.txt
```
如果已经在原电脑环境中配置好依赖,可直接双击启动。
## 4. 启动程序
双击:
```bat
启动头颈CT仰头形变工具.bat
```
若窗口没有打开,请在命令行中运行:
```bat
python head_extension_app.py
```
这样可以看到具体报错信息。
## 5. 输入数据要求
请选择一个只包含同一套 CT 序列 `.dcm` 文件的文件夹。
建议:
- 使用轴位 CT 序列
- 同一文件夹内不要混入多个 series
- 文件扩展名最好为 `.dcm`
- 推荐使用层厚较薄、覆盖完整头颈部的序列
## 6. 操作步骤
1. 点击“选择”,选择输入 DICOM 文件夹。
2. 点击“选择”,选择输出文件夹。
3. 调节“仰头角度”滑块。
4. 调节“过渡平滑宽度”滑块。
5. 点击“更新预览”查看快速示意。
6. 点击“生成形变 DICOM”。
7. 等待完成提示。
## 7. 参数说明
### 仰头角度
控制头部后伸程度。
建议范围:
- `5°`:轻微仰头
- `10°-15°`:较明显仰头
- `20°`:展示效果较强
### 过渡平滑宽度
控制头部旋转区域和肩颈固定区域之间的过渡范围。
建议:
- 默认 `90`
- 如果连接处仍有割裂感,可增大到 `110-140`
- 如果整体形变过软,可减小到 `60-80`
- 不建议使用过小数值,否则颈椎附近可能出现上下两段不连续的视觉效果
## 8. 输出结果
程序会在输出文件夹中生成:
- `ct_original/`:原始 Original 完整 DICOM 序列
- `ct_hard_boundary/`:硬边界 Hard boundary 完整 DICOM 序列
- `ct_gaussian_smooth/`:高斯平滑 Gaussian smooth 完整 DICOM 序列
- `ct_soft_transition/`:软过渡 Soft transition 完整 DICOM 序列
- `ct/`:兼容旧版本的软过渡 Soft transition DICOM 序列
- `process_comparison_4states.png`:四种状态过程对比图
- `process_screenshots/`:四种状态的单独截图
- `before_after_preview.png`:前后对比图
其中各个 `ct_*` 文件夹均可用 DICOM 查看器打开,也可作为后续 CT 三维重建的数据基础。
## 9. 常见问题
### 预览和最终 DICOM 完全一样吗?
不完全一样。
预览是快速 2D 示意,用于快速查看角度方向和大致效果;最终输出 DICOM 使用三维软过渡位移场生成。
### 为什么输出需要一段时间?
程序需要读取 CT、生成三维位移场、重采样体数据并重新写出 DICOM。300 层 CT 通常需要 1-3 分钟。
### 为什么连接处仍可能有轻微不自然?
目前方法使用软过渡位移场模拟头颈后伸,不是完整的多节颈椎生物力学模型。如果需要更真实的临床级结果,需要进一步标注各颈椎旋转中心并建立多段运动模型。
### 中文路径会不会影响运行?
程序内部会自动把临时文件放到系统英文临时目录,尽量避免中文路径导致的第三方库兼容问题。输入和输出路径可以使用中文。
## 10. 推荐展示设置
用于 PPT 展示时,建议:
- 仰头角度:`12°-20°`
- 过渡平滑宽度:`90-120`
- 使用输出的 `process_comparison_4states.png``before_after_preview.png`
如果希望展示角度连续变化,可另行生成视频动画。
## 11. 生成 0° 到 20° 连续变化视频
用户版文件夹内提供了视频生成功能,可生成一段从 `0°` 平滑变化到 `20°` 的仰头动画。
双击:
```bat
生成0到20度仰头视频.bat
```
程序会打开一个视频生成窗口。
在窗口中可以:
- 选择 DICOM 文件夹
- 选择输出视频位置
- 调节最大仰头角度
- 调节动画时长
- 点击“生成视频”
生成的视频位于:
```bat
ppt_video\head_extension_0_to_20deg.mp4
```
视频参数:
- 分辨率:`1920 × 1080`
- 帧率:`30 fps`
- 动画时长:约 `7 秒`
- 内容:左侧为原始 0° 图像,右侧为从 0° 平滑变化到 20° 的仰头效果
### 自定义视频角度和时长
不需要使用命令行。
在视频生成窗口中直接拖动滑块即可:
- “最大仰头角度”:控制视频最终达到的角度
- “动画时长”:控制从 0° 变化到最大角度所需时间
### 视频生成方式说明
视频使用轻量二维软弯曲预览算法生成,目的是用于 PPT 展示角度连续变化。它不会为每一帧重新生成完整三维 DICOM因此速度较快。
如果需要生成某一个具体角度下的完整 DICOM 结果,请使用主程序 `启动头颈CT仰头形变工具.bat`