share deformation job progress by account

This commit is contained in:
2026-05-03 01:23:10 +08:00
parent eb03bea7d4
commit 0aa9cffb97
2 changed files with 173 additions and 9 deletions

View File

@@ -42,6 +42,8 @@ DICOM_FILE_CACHE = {}
LIBRARY_DIR = APP_DIR / "web_library"
LIBRARY_META = LIBRARY_DIR / "library.json"
RESULT_DIR = APP_DIR / "web_results"
JOBS_META = RESULT_DIR / "jobs.json"
USER_TASKS_META = RESULT_DIR / "user_tasks.json"
PREVIEW_CACHE_DIR = LIBRARY_DIR / "_preview_cache"
@@ -59,6 +61,27 @@ def safe_filename(name):
return "".join(char if char.isalnum() or char in "._-" else "_" for char in Path(name).name)
def normalized_username(username):
username = str(username or "").strip()
return username or "anonymous"
def read_json_file(path, default):
path = Path(path)
if not path.exists():
return default
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return default
def write_json_file(path, value):
path = Path(path)
safe_mkdir(path.parent)
path.write_text(json.dumps(value, ensure_ascii=False, indent=2, default=json_default), encoding="utf-8")
def read_library_meta():
safe_mkdir(LIBRARY_DIR)
if LIBRARY_META.exists():
@@ -441,11 +464,76 @@ def zip_result_bundle(zip_path, state_zips, preview_paths):
return zip_path
def read_user_tasks():
tasks = read_json_file(USER_TASKS_META, {})
return tasks if isinstance(tasks, dict) else {}
def write_user_tasks(tasks):
write_json_file(USER_TASKS_META, tasks)
def set_user_task(username, kind, job_id):
username = normalized_username(username)
tasks = read_user_tasks()
tasks.setdefault(username, {})[kind] = job_id
write_user_tasks(tasks)
def get_user_task_job(username, kind):
username = normalized_username(username)
tasks = read_user_tasks()
job_id = tasks.get(username, {}).get(kind)
if not job_id:
return None
return get_job(job_id)
def persist_jobs_locked():
write_json_file(JOBS_META, JOBS)
def load_persisted_jobs():
saved_jobs = read_json_file(JOBS_META, {})
if not isinstance(saved_jobs, dict):
return
now_text = time.strftime("%Y-%m-%d %H:%M:%S")
with JOBS_LOCK:
for job_id, job in saved_jobs.items():
if not isinstance(job, dict):
continue
if job.get("status") == "running":
job = {
**job,
"status": "failed",
"message": "后端已重启,运行中的任务已中断。",
"error": "后端服务重启后无法继续运行中的任务,请重新提交。",
"updatedAt": now_text,
}
JOBS[job_id] = job
persist_jobs_locked()
def deformation_progress_for_message(message):
if "已复制" in message:
return 20
if "正在生成四种状态" in message:
return 35
if "正在应用形变" in message:
return 55
if "正在写出四种状态" in message:
return 72
if "正在生成四状态过程对比图" in message:
return 82
return None
def set_job(job_id, **updates):
with JOBS_LOCK:
job = JOBS[job_id]
job.update(updates)
job["updatedAt"] = time.strftime("%Y-%m-%d %H:%M:%S")
persist_jobs_locked()
def get_job(job_id):
@@ -454,24 +542,30 @@ def get_job(job_id):
return dict(job) if job else None
def start_job(kind, worker):
def start_job(kind, worker, owner=None, params=None):
job_id = uuid.uuid4().hex[:12]
owner = normalized_username(owner)
with JOBS_LOCK:
JOBS[job_id] = {
"id": job_id,
"kind": kind,
"owner": owner,
"status": "running",
"message": "任务已启动。",
"progress": 1,
"params": params or {},
"result": None,
"error": None,
"createdAt": time.strftime("%Y-%m-%d %H:%M:%S"),
"updatedAt": time.strftime("%Y-%m-%d %H:%M:%S"),
}
persist_jobs_locked()
set_user_task(owner, kind, job_id)
def run():
try:
result = worker(job_id)
set_job(job_id, status="completed", message="任务完成。", result=result)
set_job(job_id, status="completed", message="任务完成。", progress=100, result=result)
except Exception as exc:
set_job(
job_id,
@@ -575,6 +669,16 @@ class Handler(BaseHTTPRequestHandler):
self.send_json(job)
return
if parsed.path == "/api/user/job":
params = parse_qs(parsed.query)
username = params.get("username", [""])[0]
kind = params.get("kind", ["deformation"])[0]
if kind not in ["deformation", "video"]:
self.send_json({"error": "未知任务类型。"}, status=400)
return
self.send_json({"job": get_user_task_job(username, kind)})
return
if parsed.path == "/api/file":
params = parse_qs(parsed.query)
file_path = Path(unquote(params.get("path", [""])[0])).resolve()
@@ -618,6 +722,7 @@ class Handler(BaseHTTPRequestHandler):
input_dir = body["inputDir"]
angle_degrees = float(body.get("angleDegrees", 12))
transition_width = int(body.get("transitionWidth", 90))
username = normalized_username(body.get("username"))
def worker(job_id):
job_root = RESULT_DIR / job_id
@@ -625,7 +730,11 @@ class Handler(BaseHTTPRequestHandler):
reset_dir(job_root)
def progress(message):
set_job(job_id, message=message)
progress_value = deformation_progress_for_message(message)
updates = {"message": message}
if progress_value is not None:
updates["progress"] = progress_value
set_job(job_id, **updates)
output_paths, preview_paths = run_deformation(
input_dir,
@@ -634,7 +743,7 @@ class Handler(BaseHTTPRequestHandler):
transition_width,
progress,
)
set_job(job_id, message="正在打包各状态 DICOM ZIP...")
set_job(job_id, message="正在打包各状态 DICOM ZIP...", progress=88)
state_zips = {}
for state_key in [
"original",
@@ -646,7 +755,7 @@ class Handler(BaseHTTPRequestHandler):
output_paths[state_key],
job_root / f"{state_key}_{job_id}.zip",
)
set_job(job_id, message="正在整理四状态总 ZIP...")
set_job(job_id, message="正在整理四状态总 ZIP...", progress=96)
zip_path = zip_result_bundle(
job_root / f"head_ct_morph_{job_id}.zip",
state_zips,
@@ -654,7 +763,19 @@ class Handler(BaseHTTPRequestHandler):
)
return serialize_outputs(output_paths, preview_paths, zip_path, state_zips)
self.send_json(start_job("deformation", worker), status=202)
self.send_json(
start_job(
"deformation",
worker,
owner=username,
params={
"inputDir": input_dir,
"angleDegrees": angle_degrees,
"transitionWidth": transition_width,
},
),
status=202,
)
return
if parsed.path == "/api/video":
@@ -747,6 +868,7 @@ def main():
safe_mkdir(APP_DIR / "ppt_video")
safe_mkdir(LIBRARY_DIR)
safe_mkdir(RESULT_DIR)
load_persisted_jobs()
server = ThreadingHTTPServer((HOST, PORT), Handler)
print(f"Head CT Morph backend running at http://{HOST}:{PORT}")
server.serve_forever()