"""Run the lab-test conversion scripts on an uploaded zip and package the results."""

import os
import shutil
import subprocess
import sys
import zipfile
from dataclasses import dataclass
from pathlib import Path

from openpyxl import load_workbook

# Directory (next to this module) that holds the V1/V2 processor scripts.
PROCESSOR_DIR = Path(__file__).resolve().parent / "processors"


class ProcessingError(Exception):
    """Raised for every failure this module reports to its caller."""


@dataclass
class SheetSummary:
    """Size and a short preview of one worksheet.

    ``preview`` holds at most the first six rows, each cell rendered as text.
    """

    name: str
    rows: int
    columns: int
    preview: list[list[str]]


@dataclass
class ExcelSummary:
    """Summary of one generated workbook.

    ``relpath`` is the POSIX-style path of the workbook relative to the
    job's output directory.
    """

    filename: str
    relpath: str
    sheets: list[SheetSummary]


@dataclass
class ProcessingResult:
    """Outcome of a successful :func:`run_processing` call."""

    job_id: str
    mode: str
    output_dir: Path
    zip_path: Path
    files: list[ExcelSummary]


def run_processing(
    zip_path: Path,
    job_dir: Path,
    mode: str,
    data_type: str,
    result_name: str,
    show_not_match: bool,
    show_all_infos: bool,
) -> ProcessingResult:
    """Extract an uploaded zip, run the matching processor script, collect results.

    The archive is extracted to ``job_dir / "input"``; workbooks end up under
    ``job_dir / "output"`` and are bundled into ``job_dir / "result.zip"``.
    The script's combined stdout/stderr is written to
    ``job_dir / "process.log"``.

    Args:
        zip_path: Uploaded archive to process.
        job_dir: Working directory of this job; its name becomes ``job_id``.
        mode: ``"auto"``, ``"v1"`` or ``"v2"``.
        data_type: ``"pat_no"`` or ``"zhuyuanhao"``.
        result_name: Requested result file name (sanitised before use).
        show_not_match: Forwarded to the processor script as text.
        show_all_infos: Forwarded to the processor script as text.

    Returns:
        A :class:`ProcessingResult` describing the generated workbooks.

    Raises:
        ProcessingError: On invalid arguments, an unusable archive, an
            unrecognised data layout, a non-zero script exit code, or when
            no workbook was produced.
        subprocess.TimeoutExpired: If the script runs longer than 30 minutes.
    """
    if mode not in {"auto", "v1", "v2"}:
        raise ProcessingError("处理模式不正确。")
    if data_type not in {"pat_no", "zhuyuanhao"}:
        raise ProcessingError("患者编号类型不正确。")

    extract_dir = job_dir / "input"
    output_dir = job_dir / "output"
    extract_dir.mkdir(parents=True, exist_ok=True)
    output_dir.mkdir(parents=True, exist_ok=True)

    _safe_extract(zip_path, extract_dir)
    data_dir = _find_data_root(extract_dir)
    selected_mode = _detect_mode(data_dir) if mode == "auto" else mode
    clean_name = _clean_result_name(result_name)

    # The script runs with cwd=PROCESSOR_DIR, so a relative job_dir would be
    # interpreted against the wrong directory; always hand it absolute paths.
    data_arg = str(data_dir.resolve())

    if selected_mode == "v1":
        result_path = output_dir / f"{clean_name}.xlsx"
        cmd = [
            sys.executable,
            str(PROCESSOR_DIR / "V1-ALL_convert_Lab_Test_data.py"),
            data_arg,
            str(result_path.resolve()),
            str(show_not_match),
            str(show_all_infos),
            data_type,
        ]
    elif selected_mode == "v2":
        cmd = [
            sys.executable,
            str(PROCESSOR_DIR / "V2-Every_Pat_File_convert_Lab_Test_data.py"),
            "--file_dir",
            data_arg,
            "--result_save_file_name",
            clean_name,
            "--show_not_match",
            str(show_not_match),
            "--show_all_infos",
            str(show_all_infos),
            "--data_type",
            data_type,
        ]
    else:
        # Defensive: not reachable with the validation above and the values
        # _detect_mode returns.
        raise ProcessingError("无法识别数据目录结构,请手动选择 V1 或 V2。")

    # Force UTF-8 in the child so its output decodes consistently below.
    env = os.environ.copy()
    env["PYTHONUTF8"] = "1"
    env["PYTHONIOENCODING"] = "utf-8"
    completed = subprocess.run(
        cmd,
        cwd=PROCESSOR_DIR,
        env=env,
        text=True,
        encoding="utf-8",
        errors="replace",
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        timeout=60 * 30,
    )
    (job_dir / "process.log").write_text(
        "mode=" + selected_mode + "\n\n" + completed.stdout,
        encoding="utf-8",
    )
    if completed.returncode != 0:
        # Only the tail of the output is included in the error message.
        raise ProcessingError(
            f"处理脚本退出码 {completed.returncode}。\n{completed.stdout[-4000:]}"
        )

    if selected_mode == "v2":
        _collect_v2_outputs(data_dir, output_dir)

    xlsx_files = sorted(output_dir.rglob("*.xlsx"))
    if not xlsx_files:
        raise ProcessingError("处理完成但没有生成 Excel 文件,请检查数据结构。")

    result_zip = job_dir / "result.zip"
    _create_result_zip(output_dir, result_zip)
    return ProcessingResult(
        job_id=job_dir.name,
        mode=selected_mode,
        output_dir=output_dir,
        zip_path=result_zip,
        files=[_summarize_workbook(path, output_dir) for path in xlsx_files],
    )


def create_result_zip(job_dir: Path) -> Path:
    """(Re)build ``job_dir / "result.zip"`` from the job's output directory.

    Raises:
        ProcessingError: If ``job_dir / "output"`` does not exist.
    """
    output_dir = job_dir / "output"
    result_zip = job_dir / "result.zip"
    if not output_dir.exists():
        raise ProcessingError("结果目录不存在。")
    _create_result_zip(output_dir, result_zip)
    return result_zip


def find_output_file(job_dir: Path, relpath: str) -> Path:
    """Resolve ``relpath`` to an existing ``.xlsx`` file inside the job's output.

    Args:
        job_dir: Working directory of the job.
        relpath: Path relative to ``job_dir / "output"``.

    Returns:
        The resolved absolute path of the workbook.

    Raises:
        ProcessingError: If the path escapes the output directory, is not an
            existing file, or is not an ``.xlsx`` file.
    """
    output_dir = (job_dir / "output").resolve()
    try:
        target = (output_dir / relpath).resolve()
    except (OSError, ValueError) as exc:
        # E.g. an embedded NUL byte: the path cannot name an existing file.
        raise ProcessingError("结果文件不存在。") from exc
    # Compare path components, not string prefixes: a prefix test would also
    # accept a sibling directory such as "output_evil".
    if not target.is_relative_to(output_dir) or not target.is_file():
        raise ProcessingError("结果文件不存在。")
    if target.suffix.lower() != ".xlsx":
        raise ProcessingError("只能导出 Excel 结果文件。")
    return target


def _safe_extract(zip_path: Path, target_dir: Path) -> None:
    """Extract ``zip_path`` into ``target_dir``, rejecting escaping members.

    Every member is checked before anything is extracted.

    Raises:
        ProcessingError: If a member would resolve outside ``target_dir`` or
            the file is not a valid zip archive.
    """
    root = target_dir.resolve()
    try:
        with zipfile.ZipFile(zip_path) as zf:
            for member in zf.infolist():
                destination = (root / member.filename).resolve()
                # Component-wise containment; a string-prefix test would let
                # "<root>_suffix/..." through.
                if not destination.is_relative_to(root):
                    raise ProcessingError("zip 中包含不安全路径。")
            zf.extractall(target_dir)
    except zipfile.BadZipFile as exc:
        raise ProcessingError("zip 文件无法解压。") from exc


def _find_data_root(extract_dir: Path) -> Path:
    """Return the directory that contains ``Patients_info.csv``.

    Lookup order: a lone top-level sub-directory (when the extraction root
    holds no files), then the extraction root itself, then any nested
    directory.

    Raises:
        ProcessingError: If no ``Patients_info.csv`` exists below ``extract_dir``.
    """
    entries = list(extract_dir.iterdir())
    subdirs = [entry for entry in entries if entry.is_dir()]
    has_files = any(entry.is_file() for entry in entries)

    candidates = [extract_dir]
    if len(subdirs) == 1 and not has_files:
        # Archive made from a single wrapping folder: look inside it first.
        candidates.insert(0, subdirs[0])

    for candidate in candidates:
        if (candidate / "Patients_info.csv").exists():
            return candidate

    # Fall back to a recursive search. Pick the shallowest match (ties broken
    # lexicographically) so the result does not depend on filesystem order.
    matches = sorted(
        extract_dir.rglob("Patients_info.csv"),
        key=lambda found: (len(found.parts), found.as_posix()),
    )
    if matches:
        return matches[0].parent
    raise ProcessingError("未找到 Patients_info.csv。")


def _detect_mode(data_dir: Path) -> str:
    """Guess the data layout of ``data_dir``.

    Returns ``"v1"`` when both ``Tests_List`` and ``Tests_Detail_List``
    directories exist; ``"v2"`` when some sub-directory contains an entry
    ending in ``_检测汇总.csv`` and a directory ending in ``_具体检测``.

    Raises:
        ProcessingError: If neither layout is recognised.
    """
    if (data_dir / "Tests_List").is_dir() and (data_dir / "Tests_Detail_List").is_dir():
        return "v1"

    for patient_dir in (entry for entry in data_dir.iterdir() if entry.is_dir()):
        names = {entry.name for entry in patient_dir.iterdir()}
        has_summary = any(name.endswith("_检测汇总.csv") for name in names)
        has_detail_dir = any(
            name.endswith("_具体检测") and (patient_dir / name).is_dir()
            for name in names
        )
        if has_summary and has_detail_dir:
            return "v2"

    raise ProcessingError("无法自动识别 V1/V2 数据结构。")


def _clean_result_name(result_name: str) -> str:
    """Turn a user-supplied result name into a safe file stem.

    A trailing ``.xlsx`` is dropped, the characters ``<>:"/\\|?*`` and ASCII
    control characters are replaced with ``_``, and surrounding spaces and
    dots are stripped. Falls back to ``"Result"`` when nothing is left.
    """
    name = (result_name or "Result").strip()
    if name.lower().endswith(".xlsx"):
        name = name[:-5]
    forbidden = '<>:"/\\|?*'
    # Control characters (notably NUL) are invalid in file names and a NUL in
    # an argument makes subprocess.run raise ValueError.
    name = "".join(
        "_" if ch in forbidden or ord(ch) < 32 else ch for ch in name
    ).strip(" .")
    return name or "Result"


def _collect_v2_outputs(data_dir: Path, output_dir: Path) -> None:
    """Copy every ``.xlsx`` found under ``data_dir`` into ``output_dir / "V2患者结果"``.

    When the file name is already taken, the copy is named
    ``<parent dir>_<file name>``; if that name was itself already written by
    this call, a numeric suffix is appended so no copy is silently overwritten.
    """
    v2_dir = output_dir / "V2患者结果"
    v2_dir.mkdir(exist_ok=True)

    written: set[Path] = set()
    # Sorted so that the naming of colliding files is reproducible.
    for path in sorted(data_dir.rglob("*.xlsx")):
        if not path.is_file():
            continue
        target = v2_dir / path.name
        if target.exists():
            target = v2_dir / f"{path.parent.name}_{path.name}"
            counter = 2
            while target in written:
                target = v2_dir / f"{path.parent.name}_{path.stem}_{counter}{path.suffix}"
                counter += 1
        shutil.copy2(path, target)
        written.add(target)


def _create_result_zip(output_dir: Path, result_zip: Path) -> None:
    """Write all ``.xlsx`` files under ``output_dir`` into ``result_zip``.

    Archive member names are the paths relative to ``output_dir``.
    """
    with zipfile.ZipFile(result_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(output_dir.rglob("*.xlsx")):
            if path.is_file():
                zf.write(path, path.relative_to(output_dir))


def _summarize_workbook(path: Path, output_dir: Path) -> ExcelSummary:
    """Build an :class:`ExcelSummary` for the workbook at ``path``.

    The workbook is opened read-only with cached values (``data_only=True``);
    each sheet contributes its dimensions and a preview of up to six rows.
    """
    sheets: list[SheetSummary] = []
    workbook = load_workbook(path, read_only=True, data_only=True)
    try:
        for sheet in workbook.worksheets:
            preview = [
                [_cell_to_text(value) for value in row]
                for row in sheet.iter_rows(max_row=6, values_only=True)
            ]
            sheets.append(
                SheetSummary(
                    name=sheet.title,
                    # Fall back to 0 when the sheet reports no dimension.
                    rows=sheet.max_row or 0,
                    columns=sheet.max_column or 0,
                    preview=preview,
                )
            )
    finally:
        workbook.close()

    return ExcelSummary(
        filename=path.name,
        relpath=path.relative_to(output_dir).as_posix(),
        sheets=sheets,
    )


def _cell_to_text(value: object) -> str:
    """Render a cell value as text, truncated to 80 characters.

    ``None`` becomes ``""``; longer text is cut to 77 characters plus ``"..."``.
    """
    if value is None:
        return ""
    text = str(value)
    return text if len(text) <= 80 else text[:77] + "..."