auto detect patient id type

This commit is contained in:
2026-05-08 22:39:42 +08:00
parent 11600e7d09
commit d21dae3231
3 changed files with 64 additions and 5 deletions

View File

@@ -48,6 +48,7 @@ def index() -> str:
<div>
<label for="data_type">\u60a3\u8005\u7f16\u53f7\u7c7b\u578b</label>
<select id="data_type" name="data_type">
<option value="auto">\u81ea\u52a8\u8bc6\u522b</option>
<option value="pat_no">\u60a3\u8005\u53f7 pat_no</option>
<option value="zhuyuanhao">\u4f4f\u9662\u53f7 zhuyuanhao</option>
</select>
@@ -78,7 +79,7 @@ def index() -> str:
async def process(
file: UploadFile = File(...),
mode: str = Form("auto"),
data_type: str = Form("pat_no"),
data_type: str = Form("auto"),
result_name: str = Form("Result"),
preview_rows: int = Form(20),
include_basic_sheets: str | None = Form(None),

View File

@@ -1,3 +1,4 @@
import csv
import os
import shutil
import subprocess
@@ -56,7 +57,7 @@ def run_processing(
) -> ProcessingResult:
if mode not in {"auto", "v1", "v2"}:
raise ProcessingError("处理模式不正确。")
if data_type not in {"pat_no", "zhuyuanhao"}:
if data_type not in {"auto", "pat_no", "zhuyuanhao"}:
raise ProcessingError("患者编号类型不正确。")
extract_dir = job_dir / "input"
@@ -67,6 +68,7 @@ def run_processing(
_safe_extract(zip_path, extract_dir)
data_dir = _find_data_root(extract_dir)
selected_mode = _detect_mode(data_dir) if mode == "auto" else mode
selected_data_type = _detect_data_type(data_dir, selected_mode) if data_type == "auto" else data_type
clean_name = _clean_result_name(result_name)
if selected_mode == "v1":
@@ -78,7 +80,7 @@ def run_processing(
str(result_path),
str(show_not_match),
str(show_all_infos),
data_type,
selected_data_type,
]
elif selected_mode == "v2":
cmd = [
@@ -93,7 +95,7 @@ def run_processing(
"--show_all_infos",
str(show_all_infos),
"--data_type",
data_type,
selected_data_type,
]
else:
raise ProcessingError("无法识别数据目录结构,请手动选择 V1 或 V2。")
@@ -115,7 +117,7 @@ def run_processing(
)
(job_dir / "process.log").write_text(
"mode=" + selected_mode + "\n\n" + completed.stdout,
"mode=" + selected_mode + "\n" + "data_type=" + selected_data_type + "\n\n" + completed.stdout,
encoding="utf-8",
)
if completed.returncode != 0:
@@ -238,6 +240,60 @@ def _detect_mode(data_dir: Path) -> str:
raise ProcessingError("无法自动识别 V1/V2 数据结构。")
def _detect_data_type(data_dir: Path, selected_mode: str) -> str:
raw_ids = _read_patient_ids(data_dir / "Patients_info.csv")
if not raw_ids:
return "pat_no"
raw_id_set = set(raw_ids)
padded_ids = {_pad_patient_id(value) for value in raw_ids}
if selected_mode == "v1":
evidence_names = {
path.stem
for path in (data_dir / "Tests_List").glob("*.csv")
if path.is_file()
}
elif selected_mode == "v2":
evidence_names = {path.name for path in data_dir.iterdir() if path.is_dir()}
else:
evidence_names = set()
if not evidence_names:
return "pat_no"
raw_score = len(evidence_names & raw_id_set)
padded_score = len(evidence_names & padded_ids)
if raw_score > padded_score:
return "zhuyuanhao"
return "pat_no"
def _read_patient_ids(patients_info_path: Path) -> list[str]:
if not patients_info_path.exists():
return []
try:
return _read_patient_ids_with_encoding(patients_info_path, "utf-8-sig")
except UnicodeDecodeError:
return _read_patient_ids_with_encoding(patients_info_path, "gb18030")
def _read_patient_ids_with_encoding(patients_info_path: Path, encoding: str) -> list[str]:
with patients_info_path.open("r", encoding=encoding, newline="") as file:
return [
str(row.get("pat_no", "")).strip()
for row in csv.DictReader(file)
if str(row.get("pat_no", "")).strip()
]
def _pad_patient_id(value: str) -> str:
try:
return f"{int(value):010}"
except ValueError:
return value
def _clean_result_name(result_name: str) -> str:
name = (result_name or "Result").strip()
if name.lower().endswith(".xlsx"):