first commit

This commit is contained in:
2026-05-08 21:28:29 +08:00
commit 8598df4930
11 changed files with 1236 additions and 0 deletions

181
app/processor.py Normal file
View File

@@ -0,0 +1,181 @@
import os
import shutil
import subprocess
import sys
import zipfile
from pathlib import Path
# Absolute path of the "processors" directory that sits next to this module;
# the V1/V2 conversion scripts are looked up (and executed) from here.
PROCESSOR_DIR = Path(__file__).resolve().parent.joinpath("processors")
class ProcessingError(Exception):
    """Raised when an uploaded archive cannot be processed.

    The message is passed through unchanged; this class adds no behavior on
    top of ``Exception``.
    """
def run_processing(
    zip_path: Path,
    job_dir: Path,
    mode: str,
    data_type: str,
    result_name: str,
    show_not_match: bool,
    show_all_infos: bool,
) -> Path:
    """Extract an uploaded zip, run the matching converter script, zip the results.

    Args:
        zip_path: Uploaded archive to process.
        job_dir: Working directory for this job; ``input/``, ``output/`` and
            ``result.zip`` are created inside it.
        mode: ``"auto"``, ``"v1"`` or ``"v2"``. ``"auto"`` inspects the
            extracted directory layout to choose between v1 and v2.
        data_type: ``"pat_no"`` or ``"zhuyuanhao"``; forwarded to the script.
        result_name: Requested result file name; sanitized before use.
        show_not_match: Forwarded to the script as ``"True"``/``"False"``.
        show_all_infos: Forwarded to the script as ``"True"``/``"False"``.

    Returns:
        Path of ``result.zip`` containing every file under ``output/``.

    Raises:
        ProcessingError: On invalid arguments, an unusable archive, an
            unrecognized data layout, a script timeout, a non-zero script
            exit code, or when no ``.xlsx`` file was produced.
    """
    if mode not in {"auto", "v1", "v2"}:
        raise ProcessingError("处理模式不正确。")
    if data_type not in {"pat_no", "zhuyuanhao"}:
        raise ProcessingError("患者编号类型不正确。")

    extract_dir = job_dir / "input"
    output_dir = job_dir / "output"
    extract_dir.mkdir(parents=True, exist_ok=True)
    output_dir.mkdir(parents=True, exist_ok=True)

    _safe_extract(zip_path, extract_dir)
    data_dir = _find_data_root(extract_dir)
    selected_mode = _detect_mode(data_dir) if mode == "auto" else mode
    clean_name = _clean_result_name(result_name)

    if selected_mode == "v1":
        # The V1 script takes positional arguments and writes directly to
        # the given result path inside output_dir.
        result_path = output_dir / f"{clean_name}.xlsx"
        cmd = [
            sys.executable,
            str(PROCESSOR_DIR / "V1-ALL_convert_Lab_Test_data.py"),
            str(data_dir),
            str(result_path),
            str(show_not_match),
            str(show_all_infos),
            data_type,
        ]
    elif selected_mode == "v2":
        # The V2 script takes named flags and only receives a file *name*;
        # its .xlsx outputs are gathered from data_dir after the run.
        cmd = [
            sys.executable,
            str(PROCESSOR_DIR / "V2-Every_Pat_File_convert_Lab_Test_data.py"),
            "--file_dir",
            str(data_dir),
            "--result_save_file_name",
            clean_name,
            "--show_not_match",
            str(show_not_match),
            "--show_all_infos",
            str(show_all_infos),
            "--data_type",
            data_type,
        ]
    else:
        raise ProcessingError("无法识别数据目录结构,请手动选择 V1 或 V2。")

    # Force UTF-8 in the child interpreter so its output decodes cleanly.
    env = os.environ.copy()
    env["PYTHONUTF8"] = "1"
    env["PYTHONIOENCODING"] = "utf-8"

    log_path = output_dir / "process.log"
    timeout_seconds = 60 * 30
    try:
        completed = subprocess.run(
            cmd,
            cwd=PROCESSOR_DIR,
            env=env,
            text=True,
            encoding="utf-8",
            errors="replace",
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired as exc:
        # Without this handler the timeout would escape as a non-ProcessingError
        # and no process.log would be written. The captured output may be
        # bytes even in text mode, so normalize it before logging.
        partial = exc.stdout or ""
        if isinstance(partial, bytes):
            partial = partial.decode("utf-8", errors="replace")
        log_path.write_text(
            "mode=" + selected_mode + "\n\n" + partial,
            encoding="utf-8",
        )
        raise ProcessingError(
            f"处理脚本运行超时(超过 {timeout_seconds // 60} 分钟)。\n{partial[-4000:]}"
        ) from exc

    log_path.write_text(
        "mode=" + selected_mode + "\n\n" + completed.stdout,
        encoding="utf-8",
    )
    if completed.returncode != 0:
        # Only the tail of the output is included to keep the message bounded.
        raise ProcessingError(f"处理脚本退出码 {completed.returncode}\n{completed.stdout[-4000:]}")

    if selected_mode == "v2":
        _collect_v2_outputs(data_dir, output_dir)
    _collect_logs(data_dir, output_dir)

    if not any(output_dir.rglob("*.xlsx")):
        raise ProcessingError("处理完成但没有生成 Excel 文件,请检查数据结构和 process.log。")

    # result.zip lives in job_dir (outside output_dir), so it is never
    # picked up by the traversal that fills it.
    result_zip = job_dir / "result.zip"
    with zipfile.ZipFile(result_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for path in output_dir.rglob("*"):
            if path.is_file():
                zf.write(path, path.relative_to(output_dir))
    return result_zip
def _safe_extract(zip_path: Path, target_dir: Path) -> None:
    """Extract ``zip_path`` into ``target_dir``, rejecting escaping members.

    Every member's resolved destination must be ``target_dir`` itself or lie
    beneath it; nothing is extracted unless all members pass.

    Raises:
        ProcessingError: If a member would land outside ``target_dir`` or the
            file is not a valid zip archive.
    """
    root = target_dir.resolve()
    try:
        with zipfile.ZipFile(zip_path) as zf:
            for member in zf.infolist():
                destination = (root / member.filename).resolve()
                # Compare path components, not raw strings: a plain prefix
                # test would accept ".../input_evil" for root ".../input".
                if destination != root and root not in destination.parents:
                    raise ProcessingError("zip 中包含不安全路径。")
            zf.extractall(target_dir)
    except zipfile.BadZipFile as exc:
        raise ProcessingError("zip 文件无法解压。") from exc
def _find_data_root(extract_dir: Path) -> Path:
    """Return the directory that holds ``Patients_info.csv``.

    Lookup order: a lone wrapper sub-directory (when ``extract_dir`` contains
    exactly one directory and no files), then ``extract_dir`` itself, then the
    first match of a recursive search.

    Raises:
        ProcessingError: If no ``Patients_info.csv`` exists anywhere below
            ``extract_dir``.
    """
    marker = "Patients_info.csv"
    entries = list(extract_dir.iterdir())
    subdirs = [entry for entry in entries if entry.is_dir()]
    contains_files = any(entry.is_file() for entry in entries)

    search_order = []
    if len(subdirs) == 1 and not contains_files:
        # Archive was zipped as a single top-level folder: try it first.
        search_order.append(subdirs[0])
    search_order.append(extract_dir)

    for folder in search_order:
        if (folder / marker).exists():
            return folder

    first_hit = next(extract_dir.rglob(marker), None)
    if first_hit is None:
        raise ProcessingError("未找到 Patients_info.csv。")
    return first_hit.parent
def _detect_mode(data_dir: Path) -> str:
    """Infer the data layout ("v1" or "v2") from the directory structure.

    "v1": ``data_dir`` contains both a ``Tests_List`` and a
    ``Tests_Detail_List`` directory.
    "v2": some sub-directory of ``data_dir`` contains an entry whose name ends
    with ``_检测汇总.csv`` and a directory whose name ends with ``_具体检测``.

    Raises:
        ProcessingError: If neither layout is recognized.
    """
    v1_markers = ("Tests_List", "Tests_Detail_List")
    if all((data_dir / marker).is_dir() for marker in v1_markers):
        return "v1"

    for entry in data_dir.iterdir():
        if not entry.is_dir():
            continue
        summary_found = False
        detail_found = False
        for child in entry.iterdir():
            if child.name.endswith("_检测汇总.csv"):
                summary_found = True
            if child.name.endswith("_具体检测") and child.is_dir():
                detail_found = True
        if summary_found and detail_found:
            return "v2"

    raise ProcessingError("无法自动识别 V1/V2 数据结构。")
def _clean_result_name(result_name: str) -> str:
    """Turn a user-supplied result name into a safe file stem.

    Surrounding whitespace and a trailing ``.xlsx`` (any case) are dropped,
    each of ``<>:"/\\|?*`` becomes ``_``, and leading/trailing spaces and dots
    are stripped. An empty outcome falls back to ``"Result"``.
    """
    stem = (result_name or "Result").strip()
    if stem.lower().endswith(".xlsx"):
        stem = stem[: -len(".xlsx")]
    replacements = str.maketrans({ch: "_" for ch in '<>:"/\\|?*'})
    stem = stem.translate(replacements).strip(" .")
    if not stem:
        return "Result"
    return stem
def _collect_v2_outputs(data_dir: Path, output_dir: Path) -> None:
    """Copy every ``.xlsx`` found under ``data_dir`` into ``output_dir/V2患者结果``.

    Files are flattened into one directory. On a name clash the copy is
    renamed to ``<parent dir>_<name>``; if that is taken as well, a numeric
    suffix is appended so no earlier result is ever overwritten.
    """
    v2_dir = output_dir / "V2患者结果"
    v2_dir.mkdir(parents=True, exist_ok=True)
    # Sorted traversal makes it deterministic which file keeps the plain name.
    for path in sorted(data_dir.rglob("*.xlsx")):
        if not path.is_file():
            continue
        target = v2_dir / path.name
        if target.exists():
            target = v2_dir / f"{path.parent.name}_{path.name}"
        counter = 2
        while target.exists():
            # The parent-prefixed name can collide too (same folder name at
            # different depths); keep counting until a free name is found.
            target = v2_dir / f"{path.parent.name}_{path.stem}_{counter}{path.suffix}"
            counter += 1
        shutil.copy2(path, target)
def _collect_logs(data_dir: Path, output_dir: Path) -> None:
    """Copy every ``.txt`` file under ``data_dir`` into ``output_dir/logs``.

    The relative directory layout is preserved. The ``logs`` directory is
    only created when at least one file is copied.
    """
    logs_dir = output_dir / "logs"
    # "*.txt" already matches error.txt / Error.txt, so a single pattern is
    # enough and avoids copying those files more than once.
    for path in data_dir.rglob("*.txt"):
        if not path.is_file():
            continue
        target = logs_dir / path.relative_to(data_dir)
        # parents=True also creates logs_dir (and output_dir) when missing.
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(path, target)