Files
HIS_Sur_Data_Deal/app/processor.py
2026-05-08 23:06:00 +08:00

692 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import csv
import os
import shutil
import subprocess
import sys
import uuid
import zipfile
from dataclasses import dataclass
from pathlib import Path
from openpyxl import load_workbook
from openpyxl.styles import PatternFill
from openpyxl.styles import Font
# Directory (next to this module) holding the converter scripts that
# run_processing launches as subprocesses.
PROCESSOR_DIR = Path(__file__).resolve().parent / "processors"
# Title of the generated summary worksheet; the escapes spell the Chinese
# text for "summary of content that was not detected".
SUMMARY_SHEET_NAME = "\u672a\u68c0\u6d4b\u5230\u5185\u5bb9\u6c47\u603b"
# Header text of the marker column that precedes the unmatched-item columns
# on a data sheet; the escapes spell the Chinese text for "unmatched test content".
UNMATCHED_HEADER = "\u672a\u5339\u914d\u68c0\u6d4b\u5185\u5bb9"
# Solid background fill applied to the unmatched-item header and data cells.
UNMATCHED_FILL = PatternFill(fill_type="solid", fgColor="FCE4D6")
class ProcessingError(Exception):
    """Error raised by this module when a job cannot be processed or exported."""
# Summary of one worksheet, as built by _summarize_workbook.
@dataclass
class SheetSummary:
    # Worksheet title.
    name: str
    # Row count reported by the worksheet (0 when it reports none).
    rows: int
    # Column count reported by the worksheet (0 when it reports none).
    columns: int
    # Leading rows of the sheet, each cell rendered as display text.
    preview: list[list[str]]
# Summary of one result workbook, as built by _summarize_workbook.
@dataclass
class ExcelSummary:
    # File name of the workbook (no directory part).
    filename: str
    # POSIX-style path of the workbook relative to the job's output directory.
    relpath: str
    # One entry per worksheet, in workbook order.
    sheets: list[SheetSummary]
# Outcome of a processing job, returned by run_processing and summarize_job.
@dataclass
class ProcessingResult:
    # Name of the job directory.
    job_id: str
    # Processing mode that was used ("v1"/"v2"); summarize_job may report
    # "unknown" when the mode cannot be read back from process.log.
    mode: str
    # Directory containing the generated workbooks.
    output_dir: Path
    # Path of the job's result.zip archive.
    zip_path: Path
    # Per-workbook summaries for every .xlsx under output_dir.
    files: list[ExcelSummary]
def run_processing(
    zip_path: Path,
    job_dir: Path,
    mode: str,
    data_type: str,
    result_name: str,
    show_not_match: bool,
    show_all_infos: bool,
    preview_rows: int = 20,
    include_basic_sheets: bool = True,
    include_unmatched_items: bool = True,
    include_summary_sheet: bool = True,
) -> ProcessingResult:
    """Extract an uploaded archive, run the converter script and post-process its output.

    The archive is unpacked into ``job_dir/input``; the matching converter
    script from ``PROCESSOR_DIR`` is run as a subprocess and its combined
    stdout/stderr is written to ``job_dir/process.log``. Every generated
    workbook under ``job_dir/output`` is post-processed and then bundled into
    ``job_dir/result.zip``.

    Args:
        zip_path: Uploaded archive to extract.
        job_dir: Working directory of this job.
        mode: ``"auto"``, ``"v1"`` or ``"v2"``.
        data_type: ``"auto"``, ``"pat_no"`` or ``"zhuyuanhao"``.
        result_name: Requested base name of the result workbook.
        show_not_match: Forwarded to the converter script as text.
        show_all_infos: Forwarded to the converter script as text.
        preview_rows: Number of rows to include in each sheet preview.
        include_basic_sheets: Keep the non-summary worksheets.
        include_unmatched_items: Keep the unmatched-item columns.
        include_summary_sheet: Build the summary worksheet.

    Returns:
        A ProcessingResult describing the generated files.

    Raises:
        ProcessingError: On invalid options, an unrecognised layout, a
            non-zero script exit code or when no workbook was produced.
    """
    if mode not in {"auto", "v1", "v2"}:
        raise ProcessingError("处理模式不正确。")
    if data_type not in {"auto", "pat_no", "zhuyuanhao"}:
        raise ProcessingError("患者编号类型不正确。")

    extract_dir = job_dir / "input"
    output_dir = job_dir / "output"
    for work_dir in (extract_dir, output_dir):
        work_dir.mkdir(parents=True, exist_ok=True)

    _safe_extract(zip_path, extract_dir)
    data_dir = _find_data_root(extract_dir)

    # "auto" defers to detection based on the extracted directory layout.
    selected_mode = mode if mode != "auto" else _detect_mode(data_dir)
    if data_type != "auto":
        selected_data_type = data_type
    else:
        selected_data_type = _detect_data_type(data_dir, selected_mode)
    clean_name = _clean_result_name(result_name)

    not_match_flag = str(show_not_match)
    all_infos_flag = str(show_all_infos)
    if selected_mode == "v1":
        # The V1 script takes positional arguments and writes a single workbook.
        script_path = PROCESSOR_DIR / "V1-ALL_convert_Lab_Test_data.py"
        script_args = [
            str(data_dir),
            str(output_dir / f"{clean_name}.xlsx"),
            not_match_flag,
            all_infos_flag,
            selected_data_type,
        ]
    elif selected_mode == "v2":
        # The V2 script takes named options; its workbooks are gathered from
        # the data directory once it has finished.
        script_path = PROCESSOR_DIR / "V2-Every_Pat_File_convert_Lab_Test_data.py"
        script_args = [
            "--file_dir",
            str(data_dir),
            "--result_save_file_name",
            clean_name,
            "--show_not_match",
            not_match_flag,
            "--show_all_infos",
            all_infos_flag,
            "--data_type",
            selected_data_type,
        ]
    else:
        raise ProcessingError("无法识别数据目录结构,请手动选择 V1 或 V2。")
    command = [sys.executable, str(script_path), *script_args]

    # Force UTF-8 in the child so its output decodes consistently.
    child_env = os.environ.copy()
    child_env["PYTHONUTF8"] = "1"
    child_env["PYTHONIOENCODING"] = "utf-8"
    timeout_seconds = 30 * 60
    proc = subprocess.run(
        command,
        cwd=PROCESSOR_DIR,
        env=child_env,
        text=True,
        encoding="utf-8",
        errors="replace",
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        timeout=timeout_seconds,
    )

    # The log is written before the exit code is checked so failures keep their output.
    log_text = "".join(
        ("mode=", selected_mode, "\n", "data_type=", selected_data_type, "\n\n", proc.stdout)
    )
    (job_dir / "process.log").write_text(log_text, encoding="utf-8")
    if proc.returncode != 0:
        raise ProcessingError(f"处理脚本退出码 {proc.returncode}\n{proc.stdout[-4000:]}")

    if selected_mode == "v2":
        _collect_v2_outputs(data_dir, output_dir)

    xlsx_files = sorted(output_dir.rglob("*.xlsx"))
    if not xlsx_files:
        raise ProcessingError("处理完成但没有生成 Excel 文件,请检查数据结构。")

    for workbook_path in xlsx_files:
        _remove_default_empty_sheet(workbook_path)
        _postprocess_workbook(
            workbook_path,
            include_basic_sheets=include_basic_sheets,
            include_unmatched_items=include_unmatched_items,
            include_summary_sheet=include_summary_sheet,
        )

    result_zip = job_dir / "result.zip"
    _create_result_zip(output_dir, result_zip)

    summaries = [
        _summarize_workbook(workbook_path, output_dir, preview_rows)
        for workbook_path in xlsx_files
    ]
    return ProcessingResult(
        job_id=job_dir.name,
        mode=selected_mode,
        output_dir=output_dir,
        zip_path=result_zip,
        files=summaries,
    )
def create_result_zip(
    job_dir: Path,
    include_basic_sheets: bool = True,
    include_unmatched_items: bool = True,
    include_summary_sheet: bool = True,
) -> Path:
    """Build a zip of the job's workbooks with the given export options applied.

    Every ``.xlsx`` under ``job_dir/output`` is copied into a fresh export
    directory, filtered by ``_apply_export_options`` and then archived as
    ``result.zip`` inside that export directory. The originals are untouched.

    Args:
        job_dir: Working directory of the job.
        include_basic_sheets: Keep the non-summary worksheets.
        include_unmatched_items: Keep the unmatched-item columns.
        include_summary_sheet: Keep the summary worksheet.

    Returns:
        Path of the freshly written archive.

    Raises:
        ProcessingError: If the output directory is missing or the export
            options are rejected.
    """
    output_dir = job_dir / "output"
    if not output_dir.exists():
        raise ProcessingError("结果目录不存在。")

    export_dir = _new_export_dir(job_dir)
    try:
        for source in sorted(output_dir.rglob("*.xlsx")):
            if not source.is_file():
                continue
            target = export_dir / source.relative_to(output_dir)
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source, target)
            _apply_export_options(
                target,
                include_basic_sheets=include_basic_sheets,
                include_unmatched_items=include_unmatched_items,
                include_summary_sheet=include_summary_sheet,
            )
        result_zip = export_dir / "result.zip"
        _create_result_zip(export_dir, result_zip)
    except Exception:
        # Do not leave a half-populated export directory behind on failure.
        shutil.rmtree(export_dir, ignore_errors=True)
        raise
    return result_zip
def summarize_job(job_dir: Path, preview_rows: int = 20) -> ProcessingResult:
    """Rebuild the ProcessingResult of an already processed job from disk.

    Args:
        job_dir: Working directory of the job.
        preview_rows: Number of rows to include in each sheet preview.

    Returns:
        A ProcessingResult whose mode is read back from ``process.log``.

    Raises:
        ProcessingError: If the output directory or its workbooks are missing.
    """
    output_dir = job_dir / "output"
    if not output_dir.exists():
        raise ProcessingError("结果目录不存在。")

    workbooks = sorted(output_dir.rglob("*.xlsx"))
    if not workbooks:
        raise ProcessingError("结果文件不存在。")

    recorded_mode = _read_mode(job_dir)
    summaries = []
    for workbook_path in workbooks:
        summaries.append(_summarize_workbook(workbook_path, output_dir, preview_rows))
    return ProcessingResult(
        job_id=job_dir.name,
        mode=recorded_mode,
        output_dir=output_dir,
        zip_path=job_dir / "result.zip",
        files=summaries,
    )
def find_output_file(
    job_dir: Path,
    relpath: str,
    include_basic_sheets: bool = True,
    include_unmatched_items: bool = True,
    include_summary_sheet: bool = True,
) -> Path:
    """Return an export copy of one result workbook with the options applied.

    Args:
        job_dir: Working directory of the job.
        relpath: Path of the workbook relative to ``job_dir/output``.
        include_basic_sheets: Keep the non-summary worksheets.
        include_unmatched_items: Keep the unmatched-item columns.
        include_summary_sheet: Keep the summary worksheet.

    Returns:
        Path of the filtered copy inside a fresh export directory.

    Raises:
        ProcessingError: If ``relpath`` escapes the output directory, does
            not name an existing file, or is not an ``.xlsx`` file.
    """
    output_dir = (job_dir / "output").resolve()
    target = (output_dir / relpath).resolve()
    # Compare path components, not string prefixes: a plain startswith()
    # check would accept a sibling such as ".../output_evil/x.xlsx".
    if not target.is_relative_to(output_dir) or not target.is_file():
        raise ProcessingError("结果文件不存在。")
    if target.suffix.lower() != ".xlsx":
        raise ProcessingError("只能导出 Excel 结果文件。")

    export_dir = _new_export_dir(job_dir)
    export_target = export_dir / target.name
    shutil.copy2(target, export_target)
    _apply_export_options(
        export_target,
        include_basic_sheets=include_basic_sheets,
        include_unmatched_items=include_unmatched_items,
        include_summary_sheet=include_summary_sheet,
    )
    return export_target
def _new_export_dir(job_dir: Path) -> Path:
export_root = job_dir / "exports"
export_root.mkdir(exist_ok=True)
export_dir = export_root / uuid.uuid4().hex
export_dir.mkdir()
return export_dir
def _apply_export_options(
    path: Path,
    include_basic_sheets: bool,
    include_unmatched_items: bool,
    include_summary_sheet: bool,
) -> None:
    """Filter the workbook at ``path`` in place according to the export options.

    Args:
        path: Workbook to rewrite.
        include_basic_sheets: Keep the non-summary worksheets.
        include_unmatched_items: Keep the unmatched-item columns.
        include_summary_sheet: Keep the summary worksheet.

    Raises:
        ProcessingError: If neither the basic sheets nor the summary sheet
            would be exported.
    """
    if not (include_basic_sheets or include_summary_sheet):
        raise ProcessingError("\u81f3\u5c11\u9700\u8981\u5bfc\u51fa\u57fa\u672c\u5de5\u4f5c\u8868\u6216\u672a\u68c0\u6d4b\u5230\u5185\u5bb9\u6c47\u603b\u8868\u3002")

    workbook = load_workbook(path)
    try:
        if not include_unmatched_items:
            _remove_unmatched_columns(workbook)

        drop_summary = not include_summary_sheet
        if drop_summary and SUMMARY_SHEET_NAME in workbook.sheetnames:
            workbook.remove(workbook[SUMMARY_SHEET_NAME])

        if not include_basic_sheets:
            basic_sheets = [
                sheet for sheet in workbook.worksheets if sheet.title != SUMMARY_SHEET_NAME
            ]
            for sheet in basic_sheets:
                workbook.remove(sheet)

        # Make sure at least one sheet remains before saving.
        if not workbook.worksheets:
            workbook.create_sheet(SUMMARY_SHEET_NAME)
        workbook.save(path)
    finally:
        workbook.close()
def _read_mode(job_dir: Path) -> str:
log_path = job_dir / "process.log"
if not log_path.exists():
return "unknown"
first_line = log_path.read_text(encoding="utf-8", errors="replace").splitlines()[0:1]
if first_line and first_line[0].startswith("mode="):
return first_line[0].split("=", 1)[1]
return "unknown"
def _safe_extract(zip_path: Path, target_dir: Path) -> None:
try:
with zipfile.ZipFile(zip_path) as zf:
for member in zf.infolist():
destination = (target_dir / member.filename).resolve()
if not str(destination).startswith(str(target_dir.resolve())):
raise ProcessingError("zip 中包含不安全路径。")
zf.extractall(target_dir)
except zipfile.BadZipFile as exc:
raise ProcessingError("zip 文件无法解压。") from exc
def _find_data_root(extract_dir: Path) -> Path:
candidates = [extract_dir]
children = [p for p in extract_dir.iterdir() if p.is_dir()]
if len(children) == 1 and not any(p.is_file() for p in extract_dir.iterdir()):
candidates.insert(0, children[0])
for candidate in candidates:
if (candidate / "Patients_info.csv").exists():
return candidate
for path in extract_dir.rglob("Patients_info.csv"):
return path.parent
raise ProcessingError("未找到 Patients_info.csv。")
def _detect_mode(data_dir: Path) -> str:
if (data_dir / "Tests_List").is_dir() and (data_dir / "Tests_Detail_List").is_dir():
return "v1"
patient_dirs = [p for p in data_dir.iterdir() if p.is_dir()]
for patient_dir in patient_dirs:
names = {p.name for p in patient_dir.iterdir()}
has_summary = any(name.endswith("_检测汇总.csv") for name in names)
has_detail_dir = any(name.endswith("_具体检测") and (patient_dir / name).is_dir() for name in names)
if has_summary and has_detail_dir:
return "v2"
raise ProcessingError("无法自动识别 V1/V2 数据结构。")
def _detect_data_type(data_dir: Path, selected_mode: str) -> str:
    """Guess whether patient ids are used raw or zero-padded in the data layout.

    The ids from ``Patients_info.csv`` are compared, both as written and
    zero-padded, against the CSV stems in ``Tests_List`` (v1) or the
    sub-directory names of ``data_dir`` (v2).

    Returns:
        ``"zhuyuanhao"`` when the raw ids match strictly more names than the
        padded ids, otherwise ``"pat_no"`` (also used when there is nothing
        to compare).
    """
    patient_ids = _read_patient_ids(data_dir / "Patients_info.csv")
    if not patient_ids:
        return "pat_no"

    as_written = set(patient_ids)
    zero_padded = {_pad_patient_id(patient_id) for patient_id in patient_ids}

    if selected_mode == "v1":
        observed_names = {
            csv_path.stem
            for csv_path in (data_dir / "Tests_List").glob("*.csv")
            if csv_path.is_file()
        }
    elif selected_mode == "v2":
        observed_names = {entry.name for entry in data_dir.iterdir() if entry.is_dir()}
    else:
        observed_names = set()

    if not observed_names:
        return "pat_no"

    raw_hits = len(observed_names & as_written)
    padded_hits = len(observed_names & zero_padded)
    return "zhuyuanhao" if raw_hits > padded_hits else "pat_no"
def _read_patient_ids(patients_info_path: Path) -> list[str]:
if not patients_info_path.exists():
return []
try:
return _read_patient_ids_with_encoding(patients_info_path, "utf-8-sig")
except UnicodeDecodeError:
return _read_patient_ids_with_encoding(patients_info_path, "gb18030")
def _read_patient_ids_with_encoding(patients_info_path: Path, encoding: str) -> list[str]:
with patients_info_path.open("r", encoding=encoding, newline="") as file:
return [
str(row.get("pat_no", "")).strip()
for row in csv.DictReader(file)
if str(row.get("pat_no", "")).strip()
]
def _pad_patient_id(value: str) -> str:
try:
return f"{int(value):010}"
except ValueError:
return value
def _clean_result_name(result_name: str) -> str:
name = (result_name or "Result").strip()
if name.lower().endswith(".xlsx"):
name = name[:-5]
forbidden = '<>:"/\\|?*'
name = "".join("_" if ch in forbidden else ch for ch in name).strip(" .")
return name or "Result"
def _collect_v2_outputs(data_dir: Path, output_dir: Path) -> None:
v2_dir = output_dir / "V2患者结果"
v2_dir.mkdir(exist_ok=True)
for path in data_dir.rglob("*.xlsx"):
if path.is_file():
target = v2_dir / path.name
if target.exists():
target = v2_dir / f"{path.parent.name}_{path.name}"
shutil.copy2(path, target)
def _create_result_zip(output_dir: Path, result_zip: Path) -> None:
with zipfile.ZipFile(result_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for path in sorted(output_dir.rglob("*.xlsx")):
if path.is_file():
zf.write(path, path.relative_to(output_dir))
def _remove_default_empty_sheet(path: Path) -> None:
    """Drop an empty sheet titled ``"Sheet"`` from the workbook at ``path``.

    The sheet is only removed when the workbook has other sheets as well.
    The workbook is saved back to ``path`` in every case.
    """
    workbook = load_workbook(path)
    try:
        titles = workbook.sheetnames
        if len(titles) > 1 and "Sheet" in titles:
            default_sheet = workbook["Sheet"]
            if _is_empty_sheet(default_sheet):
                workbook.remove(default_sheet)
        workbook.save(path)
    finally:
        workbook.close()
def _postprocess_workbook(
    path: Path,
    include_basic_sheets: bool,
    include_unmatched_items: bool,
    include_summary_sheet: bool,
) -> None:
    """Normalise a generated workbook in place and apply the output options.

    The unmatched-item columns are rebuilt, rows flagged as removable by
    ``_remove_empty_not_find_rows`` are dropped, and summary records are
    collected before any optional content is stripped.

    Args:
        path: Workbook to rewrite.
        include_basic_sheets: Keep the non-summary worksheets.
        include_unmatched_items: Keep the unmatched-item columns.
        include_summary_sheet: Build the summary worksheet.
    """
    workbook = load_workbook(path)
    try:
        _normalize_unmatched_columns(workbook)
        _remove_empty_not_find_rows(workbook)
        # Collected before the unmatched columns are (optionally) removed,
        # because the summary is built from exactly those columns.
        records = _collect_summary_records(workbook)

        if not include_unmatched_items:
            _remove_unmatched_columns(workbook)
        if include_summary_sheet:
            _replace_summary_sheet(workbook, records)
        if not include_basic_sheets:
            basic_sheets = [
                sheet for sheet in workbook.worksheets if sheet.title != SUMMARY_SHEET_NAME
            ]
            for sheet in basic_sheets:
                workbook.remove(sheet)

        # Make sure at least one sheet remains before saving.
        if not workbook.worksheets:
            workbook.create_sheet(SUMMARY_SHEET_NAME)
        workbook.save(path)
    finally:
        workbook.close()
def _normalize_unmatched_columns(workbook) -> None:
    """Rebuild the unmatched-item area of every data sheet as one column per item.

    For each sheet that has the ``UNMATCHED_HEADER`` marker column, the raw
    cells from the marker column to the last column are parsed into
    ``name -> value`` pairs, the old columns are deleted, and a marker column
    followed by one column per distinct item name (in first-seen order) is
    written back with the unmatched fill. Sheets whose rows contain no
    items simply lose the marker column and everything after it.
    """
    for sheet in workbook.worksheets:
        if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 2:
            continue

        header = [
            _cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)
        ]
        unmatched_index = _find_header_index(header, UNMATCHED_HEADER)
        if unmatched_index is None:
            continue
        marker_col = unmatched_index + 1  # header index is 0-based, columns are 1-based

        rows_parsed: list[dict[str, str]] = []
        seen_names: dict[str, None] = {}  # insertion-ordered set of item names
        for row_index in range(2, sheet.max_row + 1):
            texts = []
            for col in range(marker_col, sheet.max_column + 1):
                texts.append(_cell_text(sheet.cell(row_index, col).value))
            row_items = _parse_unmatched_items([text for text in texts if text])
            rows_parsed.append(row_items)
            for item_name in row_items:
                seen_names.setdefault(item_name)
        item_names = list(seen_names)

        if sheet.max_column >= marker_col:
            sheet.delete_cols(marker_col, sheet.max_column - marker_col + 1)
        if not item_names:
            sheet.delete_cols(marker_col, 1)
            continue

        # Header row: the marker column followed by one column per item name.
        for offset, title in enumerate([UNMATCHED_HEADER, *item_names]):
            header_cell = sheet.cell(1, marker_col + offset)
            header_cell.value = title
            header_cell.font = Font(bold=True)
            header_cell.fill = UNMATCHED_FILL

        # Data rows: the marker cell stays blank, item cells get their value.
        for row_index, row_items in enumerate(rows_parsed, start=2):
            marker_cell = sheet.cell(row_index, marker_col)
            marker_cell.value = ""
            marker_cell.fill = UNMATCHED_FILL
            for offset, item_name in enumerate(item_names, start=1):
                item_cell = sheet.cell(row_index, marker_col + offset)
                item_cell.value = row_items.get(item_name, "")
                item_cell.fill = UNMATCHED_FILL
def _remove_empty_not_find_rows(workbook) -> None:
    """Delete data rows that carry no result at all.

    A row is removed when its standard columns (column 5 up to the column
    before the unmatched marker, or to the last column when there is no
    marker) hold at least one value and all of them are ``Not_Find``, and
    no unmatched-item cell after the marker has a value.
    """
    for sheet in workbook.worksheets:
        if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 2:
            continue

        header = [
            _cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)
        ]
        unmatched_index = _find_header_index(header, UNMATCHED_HEADER)
        has_marker = unmatched_index is not None
        standard_end = unmatched_index if has_marker else sheet.max_column

        # Walk bottom-up so deleting a row does not shift rows still to visit.
        for row_index in reversed(range(2, sheet.max_row + 1)):
            standard_values = []
            for col in range(5, standard_end + 1):
                standard_values.append(_cell_text(sheet.cell(row_index, col).value))

            unmatched_values = []
            if has_marker:
                for col in range(unmatched_index + 2, sheet.max_column + 1):
                    unmatched_values.append(_cell_text(sheet.cell(row_index, col).value))

            if any(unmatched_values):
                continue
            if _all_standard_values_missing(standard_values):
                sheet.delete_rows(row_index, 1)
def _collect_summary_records(workbook) -> list[dict[str, object]]:
    """Gather the rows that only have unmatched items, for the summary sheet.

    A row qualifies when at least one of its first four cells has text, its
    standard columns are all ``Not_Find``, and it has at least one non-empty
    unmatched-item cell. Sheets without the unmatched marker are skipped.

    Returns:
        One dict per qualifying row with the keys ``name``, ``patient_id``,
        ``sample_time``, ``reason``, ``sheet`` and ``items``.
    """
    collected: list[dict[str, object]] = []
    for sheet in workbook.worksheets:
        if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 2:
            continue

        header = [
            _cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)
        ]
        unmatched_col = _find_header_index(header, UNMATCHED_HEADER)
        if unmatched_col is None:
            continue

        for row_index in range(2, sheet.max_row + 1):
            base_values = []
            for col in range(1, 5):
                base_values.append(_cell_text(sheet.cell(row_index, col).value))
            if not any(base_values):
                continue

            standard_values = []
            for col in range(5, unmatched_col + 1):
                standard_values.append(_cell_text(sheet.cell(row_index, col).value))

            # Map every titled item column to this row's value first, then
            # drop the empty ones (a later duplicate title overrides an earlier one).
            by_title: dict[str, str] = {}
            for col in range(unmatched_col + 2, sheet.max_column + 1):
                title = _cell_text(sheet.cell(1, col).value)
                if not title:
                    continue
                by_title[title] = _cell_text(sheet.cell(row_index, col).value)
            item_values = {title: text for title, text in by_title.items() if text}

            if not item_values:
                continue
            if not _all_standard_values_missing(standard_values):
                continue

            name, patient_id, sample_time, reason = base_values
            collected.append(
                {
                    "name": name,
                    "patient_id": patient_id,
                    "sample_time": sample_time,
                    "reason": reason,
                    "sheet": sheet.title,
                    "items": item_values,
                }
            )
    return collected
def _replace_summary_sheet(workbook, records: list[dict[str, object]]) -> None:
    """Recreate the summary sheet as the first sheet of ``workbook``.

    Records are sorted by reason and sample time and written as one block
    per reason: a bold header row (the four base columns plus every item
    name used in that block) followed by the block's rows. With no records
    only a bold base header row is written. Column widths are fitted to the
    content, clamped to the range 12..36.
    """
    if SUMMARY_SHEET_NAME in workbook.sheetnames:
        workbook.remove(workbook[SUMMARY_SHEET_NAME])
    summary = workbook.create_sheet(SUMMARY_SHEET_NAME, 0)

    def bold_last_row() -> None:
        for cell in summary[summary.max_row]:
            cell.font = Font(bold=True)

    if not records:
        summary.append(["\u59d3\u540d", "\u4f4f\u9662\u53f7", "\u91c7\u6837\u65f6\u95f4", "\u68c0\u6d4b\u539f\u56e0"])
        bold_last_row()
        return

    ordered = sorted(
        records,
        key=lambda item: (_cell_text(item["reason"]), _cell_text(item["sample_time"])),
    )
    # Group by reason text; dict insertion order keeps the sorted order.
    by_reason: dict[str, list[dict[str, object]]] = {}
    for record in ordered:
        by_reason.setdefault(_cell_text(record["reason"]), []).append(record)

    for reason, group in by_reason.items():
        names_seen: dict[str, None] = {}
        for record in group:
            for item_name in record["items"]:
                names_seen.setdefault(item_name)
        item_names = list(names_seen)

        if reason:
            reason_label = f"\u68c0\u6d4b\u539f\u56e0\uff08\u4e0b\u65b9\u90fd\u662f{reason}\u539f\u56e0\uff09"
        else:
            reason_label = "\u68c0\u6d4b\u539f\u56e0"
        summary.append(["\u59d3\u540d", "\u4f4f\u9662\u53f7", "\u91c7\u6837\u65f6\u95f4", reason_label] + item_names)
        bold_last_row()

        for record in group:
            item_values = record["items"]
            base_part = [
                record["name"],
                record["patient_id"],
                record["sample_time"],
                record["reason"],
            ]
            item_part = [item_values.get(item_name, "") for item_name in item_names]
            summary.append(base_part + item_part)

    for column_cells in summary.columns:
        longest = max(len(_cell_text(cell.value)) for cell in column_cells)
        letter = column_cells[0].column_letter
        summary.column_dimensions[letter].width = min(max(longest + 2, 12), 36)
def _remove_unmatched_columns(workbook) -> None:
    """Delete the unmatched marker column and every column after it on each data sheet."""
    for sheet in workbook.worksheets:
        if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 1:
            continue
        header = [
            _cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)
        ]
        unmatched_col = _find_header_index(header, UNMATCHED_HEADER)
        if unmatched_col is None:
            continue
        # unmatched_col is 0-based, so the marker sits in column unmatched_col + 1.
        first_col = unmatched_col + 1
        sheet.delete_cols(first_col, sheet.max_column - unmatched_col)
def _find_header_index(header: list[str], name: str) -> int | None:
for index, value in enumerate(header):
if value == name:
return index
return None
def _all_standard_values_missing(values: list[str]) -> bool:
non_empty_values = [value for value in values if value]
return bool(non_empty_values) and all(value == "Not_Find" for value in non_empty_values)
def _parse_unmatched_items(values: list[str]) -> dict[str, str]:
    """Merge raw ``name<separator>value`` cells into one value string per item name.

    Entries without a name are ignored; for each name the distinct non-empty
    values are kept in first-seen order and concatenated.
    """
    collected: dict[str, list[str]] = {}
    for raw in values:
        item_name, item_value = _split_unmatched_value(raw)
        if not item_name:
            continue
        bucket = collected.setdefault(item_name, [])
        if item_value and item_value not in bucket:
            bucket.append(item_value)
    # NOTE(review): distinct values are joined with an empty string, so
    # "1.2" and "3.4" become "1.23.4" - possibly a delimiter was lost; confirm.
    return {item_name: "".join(bucket) for item_name, bucket in collected.items()}
def _split_unmatched_value(value: str) -> tuple[str, str]:
for separator in ("", ":"):
if separator in value:
name, result = value.split(separator, 1)
return name.strip(), result.strip()
return value.strip(), ""
def _is_empty_sheet(sheet) -> bool:
for row in sheet.iter_rows(values_only=True):
for value in row:
if value not in (None, ""):
return False
return True
def _summarize_workbook(path: Path, output_dir: Path, preview_rows: int) -> ExcelSummary:
    """Build an ExcelSummary (sheet sizes plus a text preview) for one workbook.

    Args:
        path: Workbook to read (opened read-only, values only).
        output_dir: Directory that ``relpath`` is made relative to.
        preview_rows: Requested preview length; clamped to the range 2..200.
    """
    sheet_summaries: list[SheetSummary] = []
    workbook = load_workbook(path, read_only=True, data_only=True)
    try:
        for sheet in workbook.worksheets:
            row_limit = max(2, min(preview_rows, 200))
            preview = [
                [_cell_to_text(value) for value in row]
                for row in sheet.iter_rows(max_row=row_limit, values_only=True)
            ]
            summary = SheetSummary(
                name=sheet.title,
                rows=sheet.max_row or 0,
                columns=sheet.max_column or 0,
                preview=preview,
            )
            sheet_summaries.append(summary)
    finally:
        workbook.close()

    relative = path.relative_to(output_dir).as_posix()
    return ExcelSummary(filename=path.name, relpath=relative, sheets=sheet_summaries)
def _cell_to_text(value: object) -> str:
if value is None:
return ""
text = str(value)
return text if len(text) <= 80 else text[:77] + "..."
def _cell_text(value: object) -> str:
if value is None:
return ""
return str(value).strip()