From d21dae32317a013da622f7218e6955f25c4ea8f5 Mon Sep 17 00:00:00 2001
From: admin <572701190@qq.com>
Date: Fri, 8 May 2026 22:39:42 +0800
Subject: [PATCH] auto detect patient id type
---
README.md | 2 ++
app/main.py | 3 ++-
app/processor.py | 64 +++++++++++++++++++++++++++++++++++++++++++++---
3 files changed, 64 insertions(+), 5 deletions(-)
diff --git a/README.md b/README.md
index 4c0c6d8..9626ca8 100644
--- a/README.md
+++ b/README.md
@@ -31,6 +31,8 @@ V1:zip 解压后包含 `Patients_info.csv`、`Tests_List`、`Tests_Detail_List
V2:zip 解压后包含 `Patients_info.csv`,并按患者目录分别保存检测汇总和具体检测,输出多个患者 Excel。
+患者编号类型可选择自动识别。自动识别会读取 `Patients_info.csv` 中的 `pat_no`,并与 `Tests_List` 文件名或患者目录名比对:若更匹配 10 位补零编号,则使用 `pat_no`;若更匹配原始编号,则使用 `zhuyuanhao`。
+
导出的压缩包默认只包含 Excel 结果,不包含处理日志。系统默认输出全部检测记录,并可选择是否保留:
- 基本工作表
diff --git a/app/main.py b/app/main.py
index cb4381b..6eff762 100644
--- a/app/main.py
+++ b/app/main.py
@@ -48,6 +48,7 @@ def index() -> str:
@@ -78,7 +79,7 @@ def index() -> str:
async def process(
file: UploadFile = File(...),
mode: str = Form("auto"),
- data_type: str = Form("pat_no"),
+ data_type: str = Form("auto"),
result_name: str = Form("Result"),
preview_rows: int = Form(20),
include_basic_sheets: str | None = Form(None),
diff --git a/app/processor.py b/app/processor.py
index 79173ba..f2736d6 100644
--- a/app/processor.py
+++ b/app/processor.py
@@ -1,3 +1,4 @@
+import csv
import os
import shutil
import subprocess
@@ -56,7 +57,7 @@ def run_processing(
) -> ProcessingResult:
if mode not in {"auto", "v1", "v2"}:
raise ProcessingError("处理模式不正确。")
- if data_type not in {"pat_no", "zhuyuanhao"}:
+ if data_type not in {"auto", "pat_no", "zhuyuanhao"}:
raise ProcessingError("患者编号类型不正确。")
extract_dir = job_dir / "input"
@@ -67,6 +68,7 @@ def run_processing(
_safe_extract(zip_path, extract_dir)
data_dir = _find_data_root(extract_dir)
selected_mode = _detect_mode(data_dir) if mode == "auto" else mode
+ selected_data_type = _detect_data_type(data_dir, selected_mode) if data_type == "auto" else data_type
clean_name = _clean_result_name(result_name)
if selected_mode == "v1":
@@ -78,7 +80,7 @@ def run_processing(
str(result_path),
str(show_not_match),
str(show_all_infos),
- data_type,
+ selected_data_type,
]
elif selected_mode == "v2":
cmd = [
@@ -93,7 +95,7 @@ def run_processing(
"--show_all_infos",
str(show_all_infos),
"--data_type",
- data_type,
+ selected_data_type,
]
else:
raise ProcessingError("无法识别数据目录结构,请手动选择 V1 或 V2。")
@@ -115,7 +117,7 @@ def run_processing(
)
(job_dir / "process.log").write_text(
- "mode=" + selected_mode + "\n\n" + completed.stdout,
+ "mode=" + selected_mode + "\n" + "data_type=" + selected_data_type + "\n\n" + completed.stdout,
encoding="utf-8",
)
if completed.returncode != 0:
@@ -238,6 +240,60 @@ def _detect_mode(data_dir: Path) -> str:
raise ProcessingError("无法自动识别 V1/V2 数据结构。")
+def _detect_data_type(data_dir: Path, selected_mode: str) -> str:
+ raw_ids = _read_patient_ids(data_dir / "Patients_info.csv")
+ if not raw_ids:
+ return "pat_no"
+
+ raw_id_set = set(raw_ids)
+ padded_ids = {_pad_patient_id(value) for value in raw_ids}
+
+ if selected_mode == "v1":
+ evidence_names = {
+ path.stem
+ for path in (data_dir / "Tests_List").glob("*.csv")
+ if path.is_file()
+ }
+ elif selected_mode == "v2":
+ evidence_names = {path.name for path in data_dir.iterdir() if path.is_dir()}
+ else:
+ evidence_names = set()
+
+ if not evidence_names:
+ return "pat_no"
+
+ raw_score = len(evidence_names & raw_id_set)
+ padded_score = len(evidence_names & padded_ids)
+ if raw_score > padded_score:
+ return "zhuyuanhao"
+ return "pat_no"
+
+
+def _read_patient_ids(patients_info_path: Path) -> list[str]:
+ if not patients_info_path.exists():
+ return []
+ try:
+ return _read_patient_ids_with_encoding(patients_info_path, "utf-8-sig")
+ except UnicodeDecodeError:
+ return _read_patient_ids_with_encoding(patients_info_path, "gb18030")
+
+
+def _read_patient_ids_with_encoding(patients_info_path: Path, encoding: str) -> list[str]:
+ with patients_info_path.open("r", encoding=encoding, newline="") as file:
+ return [
+ str(row.get("pat_no", "")).strip()
+ for row in csv.DictReader(file)
+ if str(row.get("pat_no", "")).strip()
+ ]
+
+
+def _pad_patient_id(value: str) -> str:
+ try:
+ return f"{int(value):010}"
+ except ValueError:
+ return value
+
+
def _clean_result_name(result_name: str) -> str:
name = (result_name or "Result").strip()
if name.lower().endswith(".xlsx"):