930 lines
31 KiB
Python
930 lines
31 KiB
Python
import csv
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
import uuid
|
||
import zipfile
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
from openpyxl import load_workbook
|
||
from openpyxl.styles import PatternFill
|
||
from openpyxl.styles import Font
|
||
|
||
|
||
# Folder (next to this module) that holds the converter scripts run as subprocesses.
PROCESSOR_DIR = Path(__file__).resolve().parent / "processors"

# Worksheet titles and the header text that marks the first unmatched-item column.
SUMMARY_SHEET_NAME = "\u672a\u68c0\u6d4b\u5230\u5185\u5bb9\u6c47\u603b"
UNASSIGNED_SHEET_NAME = "\u672a\u5f52\u5c5e\u68c0\u6d4b\u5185\u5bb9"
UNMATCHED_HEADER = "\u672a\u5339\u914d\u68c0\u6d4b\u5185\u5bb9"

# Background fill applied to the unmatched-item header and cells.
UNMATCHED_FILL = PatternFill(fill_type="solid", fgColor="FCE4D6")

# Upper bound on the number of rows read per sheet when building a preview.
MAX_PREVIEW_ROWS = 10000

# Accepted values for the ``sort_by`` / ``sort_order`` options.
SORT_FIELDS = {
    "none",
    "name",
    "sample_time",
    "reason",
}
SORT_ORDERS = {
    "asc",
    "desc",
}
|
||
|
||
|
||
class ProcessingError(Exception):
    """Error raised by this module when a job cannot be processed.

    The message passed to the constructor is the human-readable explanation.
    """
|
||
|
||
|
||
@dataclass
class SheetSummary:
    """Dimensions and leading rows of a single worksheet."""

    # Worksheet title.
    name: str
    # Row count reported by the workbook (0 when unavailable).
    rows: int
    # Column count reported by the workbook (0 when unavailable).
    columns: int
    # Leading rows of the sheet, every cell rendered as text.
    preview: list[list[str]]
|
||
|
||
|
||
@dataclass
class ExcelSummary:
    """Summary of one result workbook and the sheets it contains."""

    # File name of the workbook (no directory part).
    filename: str
    # POSIX-style path of the workbook relative to the directory it was listed from.
    relpath: str
    # One entry per worksheet, in workbook order.
    sheets: list[SheetSummary]
|
||
|
||
|
||
@dataclass
class ProcessingResult:
    """Outcome of processing (or re-summarising) one job."""

    # Name of the job directory.
    job_id: str
    # Processing mode that was used ("v1", "v2", or "unknown" when read back from a log).
    mode: str
    # Directory that holds the workbooks described by ``files``.
    output_dir: Path
    # Location of the zip archive associated with the job.
    zip_path: Path
    # Summaries of every workbook found in ``output_dir``.
    files: list[ExcelSummary]
|
||
|
||
|
||
def run_processing(
    zip_path: Path,
    job_dir: Path,
    mode: str,
    data_type: str,
    result_name: str,
    show_not_match: bool,
    show_all_infos: bool,
    preview_rows: int = 20,
    include_basic_sheets: bool = True,
    include_unmatched_items: bool = True,
    include_summary_sheet: bool = True,
    sort_by: str = "sample_time",
    sort_order: str = "asc",
) -> ProcessingResult:
    """Unpack an uploaded archive, run a converter script on it and post-process the workbooks.

    The archive is extracted into ``job_dir/input`` and workbooks are gathered
    in ``job_dir/output``. The converter's combined stdout/stderr is written to
    ``job_dir/process.log`` (prefixed by the selected mode and data type), and
    all resulting workbooks are packed into ``job_dir/result.zip``.

    Args:
        zip_path: Uploaded zip archive.
        job_dir: Working directory of the job; its name becomes the job id.
        mode: ``"auto"``, ``"v1"`` or ``"v2"``.
        data_type: ``"auto"``, ``"pat_no"`` or ``"zhuyuanhao"``.
        result_name: Requested base name of the result workbook.
        show_not_match: Forwarded to the converter script as text.
        show_all_infos: Forwarded to the converter script as text.
        preview_rows: Number of rows per sheet to include in the summaries.
        include_basic_sheets: Keep the non-summary worksheets.
        include_unmatched_items: Keep the unmatched-item columns.
        include_summary_sheet: Build the summary worksheet.
        sort_by: Sort field applied to each sheet.
        sort_order: ``"asc"`` or ``"desc"``.

    Returns:
        A ``ProcessingResult`` describing the generated workbooks.

    Raises:
        ProcessingError: On an invalid mode or data type, a non-zero converter
            exit code, or when no workbook was produced.
        subprocess.TimeoutExpired: Propagated unchanged when the converter runs
            longer than the 30-minute limit.
    """
    allowed_modes = {"auto", "v1", "v2"}
    allowed_data_types = {"auto", "pat_no", "zhuyuanhao"}
    if mode not in allowed_modes:
        raise ProcessingError("处理模式不正确。")
    if data_type not in allowed_data_types:
        raise ProcessingError("患者编号类型不正确。")

    extract_dir = job_dir / "input"
    output_dir = job_dir / "output"
    for directory in (extract_dir, output_dir):
        directory.mkdir(parents=True, exist_ok=True)

    _safe_extract(zip_path, extract_dir)
    data_dir = _find_data_root(extract_dir)

    selected_mode = mode if mode != "auto" else _detect_mode(data_dir)
    if data_type == "auto":
        selected_data_type = _detect_data_type(data_dir, selected_mode)
    else:
        selected_data_type = data_type

    clean_name = _clean_result_name(result_name)
    not_match_arg = str(show_not_match)
    all_infos_arg = str(show_all_infos)

    if selected_mode == "v1":
        # The V1 script takes positional arguments, including the target workbook path.
        script = PROCESSOR_DIR / "V1-ALL_convert_Lab_Test_data.py"
        cmd = [
            sys.executable,
            str(script),
            str(data_dir),
            str(output_dir / f"{clean_name}.xlsx"),
            not_match_arg,
            all_infos_arg,
            selected_data_type,
        ]
    elif selected_mode == "v2":
        # The V2 script takes named options; its workbooks are collected from data_dir afterwards.
        script = PROCESSOR_DIR / "V2-Every_Pat_File_convert_Lab_Test_data.py"
        cmd = [
            sys.executable,
            str(script),
            "--file_dir",
            str(data_dir),
            "--result_save_file_name",
            clean_name,
            "--show_not_match",
            not_match_arg,
            "--show_all_infos",
            all_infos_arg,
            "--data_type",
            selected_data_type,
        ]
    else:
        raise ProcessingError("无法识别数据目录结构,请手动选择 V1 或 V2。")

    # Force UTF-8 in the child interpreter so its output decodes consistently.
    child_env = os.environ.copy()
    child_env.update({"PYTHONUTF8": "1", "PYTHONIOENCODING": "utf-8"})

    completed = subprocess.run(
        cmd,
        cwd=PROCESSOR_DIR,
        env=child_env,
        text=True,
        encoding="utf-8",
        errors="replace",
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        timeout=60 * 30,
    )

    log_text = "".join(
        ["mode=", selected_mode, "\n", "data_type=", selected_data_type, "\n\n", completed.stdout]
    )
    (job_dir / "process.log").write_text(log_text, encoding="utf-8")

    if completed.returncode != 0:
        raise ProcessingError(f"处理脚本退出码 {completed.returncode}。\n{completed.stdout[-4000:]}")

    if selected_mode == "v2":
        _collect_v2_outputs(data_dir, output_dir)

    xlsx_files = sorted(output_dir.rglob("*.xlsx"))
    if not xlsx_files:
        raise ProcessingError("处理完成但没有生成 Excel 文件,请检查数据结构。")

    for workbook_path in xlsx_files:
        _remove_default_empty_sheet(workbook_path)
        _postprocess_workbook(
            workbook_path,
            include_basic_sheets=include_basic_sheets,
            include_unmatched_items=include_unmatched_items,
            include_summary_sheet=include_summary_sheet,
            sort_by=sort_by,
            sort_order=sort_order,
        )

    result_zip = job_dir / "result.zip"
    _create_result_zip(output_dir, result_zip)

    summaries = [_summarize_workbook(workbook_path, output_dir, preview_rows) for workbook_path in xlsx_files]
    return ProcessingResult(
        job_id=job_dir.name,
        mode=selected_mode,
        output_dir=output_dir,
        zip_path=result_zip,
        files=summaries,
    )
|
||
|
||
|
||
def create_result_zip(
    job_dir: Path,
    include_basic_sheets: bool = True,
    include_unmatched_items: bool = True,
    include_summary_sheet: bool = True,
    sort_by: str = "sample_time",
    sort_order: str = "asc",
) -> Path:
    """Build a zip of the job's workbooks with the given export options applied.

    Every ``*.xlsx`` under ``job_dir/output`` is copied into a fresh export
    directory (the originals are left untouched), the export options are
    applied to each copy, and the copies are zipped.

    Args:
        job_dir: Working directory of the job.
        include_basic_sheets: Keep the non-summary worksheets.
        include_unmatched_items: Keep the unmatched-item columns.
        include_summary_sheet: Keep the summary worksheet.
        sort_by: Sort field applied to each sheet.
        sort_order: ``"asc"`` or ``"desc"``.

    Returns:
        Path of ``result.zip`` inside the new export directory.

    Raises:
        ProcessingError: If ``job_dir/output`` does not exist, or if the
            export options are rejected while being applied.
    """
    output_dir = job_dir / "output"
    if not output_dir.exists():
        raise ProcessingError("结果目录不存在。")

    export_dir = _new_export_dir(job_dir)
    for path in sorted(output_dir.rglob("*.xlsx")):
        if not path.is_file():
            continue
        target = export_dir / path.relative_to(output_dir)
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(path, target)
        _apply_export_options(
            target,
            include_basic_sheets=include_basic_sheets,
            include_unmatched_items=include_unmatched_items,
            include_summary_sheet=include_summary_sheet,
            sort_by=sort_by,
            sort_order=sort_order,
        )

    # The archive lives next to the exported copies; only *.xlsx files are packed.
    result_zip = export_dir / "result.zip"
    _create_result_zip(export_dir, result_zip)
    return result_zip
|
||
|
||
|
||
def summarize_job(
    job_dir: Path,
    preview_rows: int = 20,
    include_basic_sheets: bool = True,
    include_unmatched_items: bool = True,
    include_summary_sheet: bool = True,
    sort_by: str = "sample_time",
    sort_order: str = "asc",
) -> ProcessingResult:
    """Summarise an already processed job with the given export options applied.

    The workbooks under ``job_dir/output`` are copied into a fresh export
    directory, the options are applied to the copies, and the copies are
    summarised. The returned ``zip_path`` points at ``job_dir/result.zip``;
    this function does not create or refresh that archive.

    Args:
        job_dir: Working directory of the job.
        preview_rows: Number of rows per sheet to include in the summaries.
        include_basic_sheets: Keep the non-summary worksheets.
        include_unmatched_items: Keep the unmatched-item columns.
        include_summary_sheet: Keep the summary worksheet.
        sort_by: Sort field applied to each sheet.
        sort_order: ``"asc"`` or ``"desc"``.

    Returns:
        A ``ProcessingResult`` whose ``output_dir`` is the new export directory.

    Raises:
        ProcessingError: If the output directory is missing or holds no workbook.
    """
    source_dir = job_dir / "output"
    if not source_dir.exists():
        raise ProcessingError("结果目录不存在。")

    preview_dir = _new_export_dir(job_dir)
    options = {
        "include_basic_sheets": include_basic_sheets,
        "include_unmatched_items": include_unmatched_items,
        "include_summary_sheet": include_summary_sheet,
        "sort_by": sort_by,
        "sort_order": sort_order,
    }
    for original in sorted(source_dir.rglob("*.xlsx")):
        if not original.is_file():
            continue
        duplicate = preview_dir / original.relative_to(source_dir)
        duplicate.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(original, duplicate)
        _apply_export_options(duplicate, **options)

    workbooks = sorted(preview_dir.rglob("*.xlsx"))
    if not workbooks:
        raise ProcessingError("结果文件不存在。")

    archive = job_dir / "result.zip"
    recorded_mode = _read_mode(job_dir)
    summaries = [_summarize_workbook(workbook, preview_dir, preview_rows) for workbook in workbooks]
    return ProcessingResult(
        job_id=job_dir.name,
        mode=recorded_mode,
        output_dir=preview_dir,
        zip_path=archive,
        files=summaries,
    )
|
||
|
||
|
||
def find_output_file(
    job_dir: Path,
    relpath: str,
    include_basic_sheets: bool = True,
    include_unmatched_items: bool = True,
    include_summary_sheet: bool = True,
    sort_by: str = "sample_time",
    sort_order: str = "asc",
) -> Path:
    """Return an export-ready copy of one result workbook.

    ``relpath`` is resolved against ``job_dir/output`` and must stay inside
    that directory. The workbook is copied into a fresh export directory and
    the export options are applied to the copy.

    Args:
        job_dir: Working directory of the job.
        relpath: Path of the workbook relative to ``job_dir/output``.
        include_basic_sheets: Keep the non-summary worksheets.
        include_unmatched_items: Keep the unmatched-item columns.
        include_summary_sheet: Keep the summary worksheet.
        sort_by: Sort field applied to each sheet.
        sort_order: ``"asc"`` or ``"desc"``.

    Returns:
        Path of the processed copy.

    Raises:
        ProcessingError: If the path escapes the output directory, is not an
            existing file, or is not an ``.xlsx`` file.
    """
    output_dir = (job_dir / "output").resolve()
    target = (output_dir / relpath).resolve()
    # Compare path components, not string prefixes: a plain startswith() would
    # also accept sibling directories such as "output_backup".
    if not target.is_relative_to(output_dir) or not target.is_file():
        raise ProcessingError("结果文件不存在。")
    if target.suffix.lower() != ".xlsx":
        raise ProcessingError("只能导出 Excel 结果文件。")

    export_dir = _new_export_dir(job_dir)
    export_target = export_dir / target.name
    shutil.copy2(target, export_target)
    _apply_export_options(
        export_target,
        include_basic_sheets=include_basic_sheets,
        include_unmatched_items=include_unmatched_items,
        include_summary_sheet=include_summary_sheet,
        sort_by=sort_by,
        sort_order=sort_order,
    )
    return export_target
|
||
|
||
|
||
def _new_export_dir(job_dir: Path) -> Path:
|
||
export_root = job_dir / "exports"
|
||
export_root.mkdir(exist_ok=True)
|
||
export_dir = export_root / uuid.uuid4().hex
|
||
export_dir.mkdir()
|
||
return export_dir
|
||
|
||
|
||
def _apply_export_options(
    path: Path,
    include_basic_sheets: bool,
    include_unmatched_items: bool,
    include_summary_sheet: bool,
    sort_by: str,
    sort_order: str,
) -> None:
    """Rewrite the workbook at ``path`` in place according to the export options.

    Args:
        path: Workbook to modify.
        include_basic_sheets: When false, drop every sheet except the summary.
        include_unmatched_items: When false, drop the unmatched-item columns.
        include_summary_sheet: When false, drop the summary sheet.
        sort_by: Sort field applied to each sheet.
        sort_order: ``"asc"`` or ``"desc"``.

    Raises:
        ProcessingError: If both the basic sheets and the summary sheet are excluded.
    """
    if not (include_basic_sheets or include_summary_sheet):
        raise ProcessingError("\u81f3\u5c11\u9700\u8981\u5bfc\u51fa\u57fa\u672c\u5de5\u4f5c\u8868\u6216\u672a\u68c0\u6d4b\u5230\u5185\u5bb9\u6c47\u603b\u8868\u3002")

    workbook = load_workbook(path)
    try:
        if not include_unmatched_items:
            _remove_unmatched_columns(workbook)

        if not include_summary_sheet:
            if SUMMARY_SHEET_NAME in workbook.sheetnames:
                workbook.remove(workbook[SUMMARY_SHEET_NAME])

        if not include_basic_sheets:
            basic_sheets = [ws for ws in workbook.worksheets if ws.title != SUMMARY_SHEET_NAME]
            for ws in basic_sheets:
                workbook.remove(ws)

        # Never leave the workbook without any sheet: fall back to an empty summary.
        if not workbook.worksheets:
            workbook.create_sheet(SUMMARY_SHEET_NAME)

        _sort_workbook(workbook, sort_by, sort_order)
        workbook.save(path)
    finally:
        workbook.close()
|
||
|
||
|
||
def _read_mode(job_dir: Path) -> str:
|
||
log_path = job_dir / "process.log"
|
||
if not log_path.exists():
|
||
return "unknown"
|
||
first_line = log_path.read_text(encoding="utf-8", errors="replace").splitlines()[0:1]
|
||
if first_line and first_line[0].startswith("mode="):
|
||
return first_line[0].split("=", 1)[1]
|
||
return "unknown"
|
||
|
||
|
||
def _safe_extract(zip_path: Path, target_dir: Path) -> None:
|
||
try:
|
||
with zipfile.ZipFile(zip_path) as zf:
|
||
for member in zf.infolist():
|
||
destination = (target_dir / member.filename).resolve()
|
||
if not str(destination).startswith(str(target_dir.resolve())):
|
||
raise ProcessingError("zip 中包含不安全路径。")
|
||
zf.extractall(target_dir)
|
||
except zipfile.BadZipFile as exc:
|
||
raise ProcessingError("zip 文件无法解压。") from exc
|
||
|
||
|
||
def _find_data_root(extract_dir: Path) -> Path:
|
||
candidates = [extract_dir]
|
||
children = [p for p in extract_dir.iterdir() if p.is_dir()]
|
||
if len(children) == 1 and not any(p.is_file() for p in extract_dir.iterdir()):
|
||
candidates.insert(0, children[0])
|
||
|
||
for candidate in candidates:
|
||
if (candidate / "Patients_info.csv").exists():
|
||
return candidate
|
||
|
||
for path in extract_dir.rglob("Patients_info.csv"):
|
||
return path.parent
|
||
|
||
raise ProcessingError("未找到 Patients_info.csv。")
|
||
|
||
|
||
def _detect_mode(data_dir: Path) -> str:
|
||
if (data_dir / "Tests_List").is_dir() and (data_dir / "Tests_Detail_List").is_dir():
|
||
return "v1"
|
||
|
||
patient_dirs = [p for p in data_dir.iterdir() if p.is_dir()]
|
||
for patient_dir in patient_dirs:
|
||
names = {p.name for p in patient_dir.iterdir()}
|
||
has_summary = any(name.endswith("_检测汇总.csv") for name in names)
|
||
has_detail_dir = any(name.endswith("_具体检测") and (patient_dir / name).is_dir() for name in names)
|
||
if has_summary and has_detail_dir:
|
||
return "v2"
|
||
|
||
raise ProcessingError("无法自动识别 V1/V2 数据结构。")
|
||
|
||
|
||
def _detect_data_type(data_dir: Path, selected_mode: str) -> str:
    """Guess whether patient ids are used as-is or zero-padded in file names.

    The ids from ``Patients_info.csv`` are compared against names found in the
    data: CSV stems in ``Tests_List`` for ``"v1"``, sub-directory names for
    ``"v2"``. ``"zhuyuanhao"`` is returned only when the raw ids match more
    names than their 10-digit zero-padded forms; otherwise ``"pat_no"``.
    """
    patient_ids = _read_patient_ids(data_dir / "Patients_info.csv")
    if not patient_ids:
        return "pat_no"

    if selected_mode == "v1":
        evidence = {
            entry.stem
            for entry in (data_dir / "Tests_List").glob("*.csv")
            if entry.is_file()
        }
    elif selected_mode == "v2":
        evidence = {entry.name for entry in data_dir.iterdir() if entry.is_dir()}
    else:
        evidence = set()

    if not evidence:
        return "pat_no"

    exact_hits = len(evidence.intersection(patient_ids))
    padded_hits = len(evidence.intersection(_pad_patient_id(value) for value in patient_ids))
    # Ties favour "pat_no".
    return "zhuyuanhao" if exact_hits > padded_hits else "pat_no"
|
||
|
||
|
||
def _read_patient_ids(patients_info_path: Path) -> list[str]:
    """Read the ``pat_no`` column of the patient CSV.

    The file is read as UTF-8 (with optional BOM) first and as GB18030 if that
    fails to decode. A missing file yields an empty list.
    """
    if not patients_info_path.exists():
        return []
    try:
        ids = _read_patient_ids_with_encoding(patients_info_path, "utf-8-sig")
    except UnicodeDecodeError:
        ids = _read_patient_ids_with_encoding(patients_info_path, "gb18030")
    return ids
|
||
|
||
|
||
def _read_patient_ids_with_encoding(patients_info_path: Path, encoding: str) -> list[str]:
|
||
with patients_info_path.open("r", encoding=encoding, newline="") as file:
|
||
return [
|
||
str(row.get("pat_no", "")).strip()
|
||
for row in csv.DictReader(file)
|
||
if str(row.get("pat_no", "")).strip()
|
||
]
|
||
|
||
|
||
def _pad_patient_id(value: str) -> str:
|
||
try:
|
||
return f"{int(value):010}"
|
||
except ValueError:
|
||
return value
|
||
|
||
|
||
def _clean_result_name(result_name: str) -> str:
|
||
name = (result_name or "Result").strip()
|
||
if name.lower().endswith(".xlsx"):
|
||
name = name[:-5]
|
||
forbidden = '<>:"/\\|?*'
|
||
name = "".join("_" if ch in forbidden else ch for ch in name).strip(" .")
|
||
return name or "Result"
|
||
|
||
|
||
def _collect_v2_outputs(data_dir: Path, output_dir: Path) -> None:
|
||
v2_dir = output_dir / "V2患者结果"
|
||
v2_dir.mkdir(exist_ok=True)
|
||
for path in data_dir.rglob("*.xlsx"):
|
||
if path.is_file():
|
||
target = v2_dir / path.name
|
||
if target.exists():
|
||
target = v2_dir / f"{path.parent.name}_{path.name}"
|
||
shutil.copy2(path, target)
|
||
|
||
|
||
def _create_result_zip(output_dir: Path, result_zip: Path) -> None:
|
||
with zipfile.ZipFile(result_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
|
||
for path in sorted(output_dir.rglob("*.xlsx")):
|
||
if path.is_file():
|
||
zf.write(path, path.relative_to(output_dir))
|
||
|
||
|
||
def _remove_default_empty_sheet(path: Path) -> None:
    """Drop an empty sheet titled ``"Sheet"`` from the workbook at ``path``.

    The sheet is removed (and the workbook saved) only when other sheets
    exist and the ``"Sheet"`` worksheet holds no values.
    """
    workbook = load_workbook(path)
    try:
        names = workbook.sheetnames
        if len(names) > 1 and "Sheet" in names:
            candidate = workbook["Sheet"]
            if _is_empty_sheet(candidate):
                workbook.remove(candidate)
                workbook.save(path)
    finally:
        workbook.close()
|
||
|
||
|
||
def _postprocess_workbook(
    path: Path,
    include_basic_sheets: bool,
    include_unmatched_items: bool,
    include_summary_sheet: bool,
    sort_by: str,
    sort_order: str,
) -> None:
    """Clean up a freshly generated workbook in place.

    Steps, in order: normalise the unmatched-item columns, drop duplicate
    rows, gather the summary records, delete rows whose standard values are
    all ``Not_Find``, drop unused unmatched-item columns, then apply the
    export options and sorting before saving.

    Args:
        path: Workbook to rewrite.
        include_basic_sheets: When false, drop every sheet except the summary.
        include_unmatched_items: When false, drop the unmatched-item columns.
        include_summary_sheet: When true, (re)build the summary sheet.
        sort_by: Sort field applied to each sheet.
        sort_order: ``"asc"`` or ``"desc"``.
    """
    workbook = load_workbook(path)
    try:
        _normalize_unmatched_columns(workbook)
        _deduplicate_regular_rows(workbook)
        # Records must be collected before the not-found rows are deleted below.
        summary_records = _collect_summary_records(workbook)
        _remove_not_found_rows(workbook)
        _remove_empty_unmatched_columns(workbook)

        if not include_unmatched_items:
            _remove_unmatched_columns(workbook)

        if include_summary_sheet:
            _replace_summary_sheet(workbook, summary_records)

        if not include_basic_sheets:
            basic_sheets = [ws for ws in workbook.worksheets if ws.title != SUMMARY_SHEET_NAME]
            for ws in basic_sheets:
                workbook.remove(ws)

        # A workbook needs at least one sheet to be saved.
        if not workbook.worksheets:
            workbook.create_sheet(SUMMARY_SHEET_NAME)

        _sort_workbook(workbook, sort_by, sort_order)
        workbook.save(path)
    finally:
        workbook.close()
|
||
|
||
|
||
def _normalize_unmatched_columns(workbook) -> None:
    """Rewrite the free-form unmatched-item cells as one column per item name.

    For every non-summary sheet with an unmatched-item header, all cells from
    that header's column to the last column are parsed as ``name:value``
    entries. The columns are then deleted and rebuilt as: the marker column
    (header only), followed by one column per distinct item name in order of
    first appearance. Sheets where no item is found just lose those columns.
    """

    def paint(cell, value, bold: bool = False) -> None:
        # Set value and the unmatched fill; header cells are additionally bold.
        cell.value = value
        if bold:
            cell.font = Font(bold=True)
        cell.fill = UNMATCHED_FILL

    for sheet in workbook.worksheets:
        if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 2:
            continue

        titles = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)]
        unmatched_index = _find_header_index(titles, UNMATCHED_HEADER)
        if unmatched_index is None:
            continue

        marker_col = unmatched_index + 1  # 1-based column of the marker header
        parsed_rows: list[dict[str, str]] = []
        ordered_names: dict[str, None] = {}  # insertion-ordered set of item names
        for row_index in range(2, sheet.max_row + 1):
            texts = [
                _cell_text(sheet.cell(row_index, col).value)
                for col in range(marker_col, sheet.max_column + 1)
            ]
            parsed = _parse_unmatched_items([text for text in texts if text])
            parsed_rows.append(parsed)
            for item_name in parsed:
                ordered_names.setdefault(item_name, None)
        item_names = list(ordered_names)

        # The old free-form columns are removed whether or not items were found.
        if sheet.max_column >= marker_col:
            sheet.delete_cols(marker_col, sheet.max_column - marker_col + 1)
        if not item_names:
            continue

        paint(sheet.cell(1, marker_col), UNMATCHED_HEADER, bold=True)
        for offset, item_name in enumerate(item_names, start=1):
            paint(sheet.cell(1, marker_col + offset), item_name, bold=True)

        for row_index, parsed in enumerate(parsed_rows, start=2):
            paint(sheet.cell(row_index, marker_col), "")
            for offset, item_name in enumerate(item_names, start=1):
                paint(sheet.cell(row_index, marker_col + offset), parsed.get(item_name, ""))
|
||
|
||
|
||
def _remove_not_found_rows(workbook) -> None:
    """Delete data rows whose standard values are all ``Not_Find``.

    Standard values are the cells from column 5 up to the column before the
    unmatched-item header (or the last column when there is no such header).
    The summary sheet and sheets with fewer than two rows are skipped.
    """
    for sheet in workbook.worksheets:
        if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 2:
            continue

        titles = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)]
        unmatched_index = _find_header_index(titles, UNMATCHED_HEADER)
        if unmatched_index is None:
            standard_end = sheet.max_column
        else:
            standard_end = unmatched_index

        # Walk bottom-up so deletions do not shift rows still to be visited.
        for row_index in range(sheet.max_row, 1, -1):
            row_values = [
                _cell_text(sheet.cell(row_index, col).value)
                for col in range(5, standard_end + 1)
            ]
            if _all_standard_values_missing(row_values):
                sheet.delete_rows(row_index, 1)
|
||
|
||
|
||
def _deduplicate_regular_rows(workbook) -> None:
    """Remove duplicate data rows from every non-summary sheet.

    Rows are compared by the stripped text of all their cells. Scanning runs
    bottom-up, so the lowest occurrence of a duplicated row is the one kept.
    Completely blank rows are never removed.
    """
    for sheet in workbook.worksheets:
        if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 3:
            continue

        fingerprints = set()
        for row_index in range(sheet.max_row, 1, -1):
            fingerprint = tuple(
                _cell_text(sheet.cell(row_index, col).value)
                for col in range(1, sheet.max_column + 1)
            )
            if not any(fingerprint):
                continue
            if fingerprint not in fingerprints:
                fingerprints.add(fingerprint)
                continue
            sheet.delete_rows(row_index, 1)
|
||
|
||
|
||
def _collect_summary_records(workbook) -> list[dict[str, object]]:
    """Collect summary records from the unassigned-items sheet.

    Only sheets titled ``UNASSIGNED_SHEET_NAME`` are read. A row contributes a
    record when its first four cells are not all blank, its standard values
    (column 5 up to the unmatched-item header) are absent or all ``Not_Find``,
    and at least one item value can be extracted for it.

    Returns:
        Dicts with the keys ``name``, ``patient_id``, ``sample_time``,
        ``reason``, ``sheet`` and ``items``.
    """
    field_names = ("name", "patient_id", "sample_time", "reason")
    records: list[dict[str, object]] = []
    for sheet in workbook.worksheets:
        if sheet.title != UNASSIGNED_SHEET_NAME or sheet.max_row < 2:
            continue

        titles = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)]
        unmatched_col = _find_header_index(titles, UNMATCHED_HEADER)
        standard_end = sheet.max_column if unmatched_col is None else unmatched_col

        for row_index in range(2, sheet.max_row + 1):
            base_values = [_cell_text(sheet.cell(row_index, col).value) for col in range(1, 5)]
            if not any(base_values):
                continue

            standard_values = [
                _cell_text(sheet.cell(row_index, col).value)
                for col in range(5, standard_end + 1)
            ]
            # Rows that carry at least one real standard value are not summarised.
            if standard_values and not _all_standard_values_missing(standard_values):
                continue

            item_values = _summary_item_values(sheet, row_index, titles, unmatched_col, standard_end)
            if not item_values:
                continue

            record: dict[str, object] = dict(zip(field_names, base_values))
            record["sheet"] = sheet.title
            record["items"] = item_values
            records.append(record)
    return records
|
||
|
||
|
||
def _summary_item_values(
    sheet,
    row_index: int,
    header: list[str],
    unmatched_col: int | None,
    standard_end: int,
) -> dict[str, str]:
    """Return the ``{item name: value}`` pairs of one row for the summary.

    The unmatched-item columns (those after the marker column) are used when
    they yield at least one non-empty value; otherwise the standard columns
    (5 .. ``standard_end``) are used with names taken from ``header``.
    """

    def gather(columns, title_of) -> dict[str, str]:
        # Keep only the columns where both the title and the cell are non-empty.
        found: dict[str, str] = {}
        for col in columns:
            title = title_of(col)
            text = _cell_text(sheet.cell(row_index, col).value)
            if title and text:
                found[title] = text
        return found

    if unmatched_col is not None:
        # unmatched_col is a 0-based index; +2 skips the marker column itself.
        extras = gather(
            range(unmatched_col + 2, sheet.max_column + 1),
            lambda col: _cell_text(sheet.cell(1, col).value),
        )
        if extras:
            return extras

    return gather(
        range(5, standard_end + 1),
        lambda col: header[col - 1] if col - 1 < len(header) else "",
    )
|
||
|
||
|
||
def _replace_summary_sheet(workbook, records: list[dict[str, object]]) -> None:
    """Rebuild the summary sheet (as first sheet) from ``records``.

    Records are ordered by reason, then sample time, and written as one block
    per reason: a bold header row (four fixed columns plus the item names seen
    in that block) followed by the block's rows. With no records only the bold
    four-column header is written. Column widths are set from the content,
    clamped to the range 12..36.
    """
    if SUMMARY_SHEET_NAME in workbook.sheetnames:
        workbook.remove(workbook[SUMMARY_SHEET_NAME])
    summary = workbook.create_sheet(SUMMARY_SHEET_NAME, 0)

    def append_bold(values) -> None:
        summary.append(values)
        for cell in summary[summary.max_row]:
            cell.font = Font(bold=True)

    if not records:
        append_bold(["\u59d3\u540d", "\u4f4f\u9662\u53f7", "\u91c7\u6837\u65f6\u95f4", "\u68c0\u6d4b\u539f\u56e0"])
        return

    ordered = sorted(
        records,
        key=lambda item: (_cell_text(item["reason"]), _cell_text(item["sample_time"])),
    )
    # Group by reason text, keeping the sorted order of both groups and members.
    groups: dict[str, list[dict[str, object]]] = {}
    for record in ordered:
        groups.setdefault(_cell_text(record["reason"]), []).append(record)

    for reason, members in groups.items():
        seen_items: dict[str, None] = {}
        for record in members:
            for item_name in record["items"]:
                seen_items.setdefault(item_name, None)
        item_names = list(seen_items)

        if reason:
            reason_label = f"\u68c0\u6d4b\u539f\u56e0\uff08\u4e0b\u65b9\u90fd\u662f{reason}\u539f\u56e0\uff09"
        else:
            reason_label = "\u68c0\u6d4b\u539f\u56e0"
        append_bold(["\u59d3\u540d", "\u4f4f\u9662\u53f7", "\u91c7\u6837\u65f6\u95f4", reason_label] + item_names)

        for record in members:
            item_values = record["items"]
            fixed = [record["name"], record["patient_id"], record["sample_time"], record["reason"]]
            summary.append(fixed + [item_values.get(item_name, "") for item_name in item_names])

    for column_cells in summary.columns:
        longest = max(len(_cell_text(cell.value)) for cell in column_cells)
        letter = column_cells[0].column_letter
        summary.column_dimensions[letter].width = min(max(longest + 2, 12), 36)
|
||
|
||
|
||
def _remove_unmatched_columns(workbook) -> None:
    """Delete the unmatched-item header column and everything to its right.

    Applies to every non-summary sheet that has the unmatched-item header in row 1.
    """
    for sheet in workbook.worksheets:
        if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 1:
            continue
        titles = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)]
        unmatched_col = _find_header_index(titles, UNMATCHED_HEADER)
        if unmatched_col is None:
            continue
        # unmatched_col is 0-based, so the marker sits in 1-based column unmatched_col + 1.
        first_col = unmatched_col + 1
        sheet.delete_cols(first_col, sheet.max_column - unmatched_col)
|
||
|
||
|
||
def _remove_empty_unmatched_columns(workbook) -> None:
    """Drop unmatched-item columns that hold no value in any data row.

    If none of the titled item columns is used, the whole unmatched section
    (marker column included) is removed; otherwise only the unused titled
    item columns are deleted.
    """
    for sheet in workbook.worksheets:
        if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 1:
            continue
        titles = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)]
        unmatched_col = _find_header_index(titles, UNMATCHED_HEADER)
        if unmatched_col is None:
            continue

        first_col = unmatched_col + 1
        titled_cols = [
            col
            for col in range(first_col + 1, sheet.max_column + 1)
            if _cell_text(sheet.cell(1, col).value)
        ]
        in_use = {
            col
            for col in titled_cols
            if any(_cell_text(sheet.cell(row, col).value) for row in range(2, sheet.max_row + 1))
        }
        if not in_use:
            sheet.delete_cols(first_col, sheet.max_column - unmatched_col)
            continue

        # Delete right-to-left so the remaining column numbers stay valid.
        for col in reversed(titled_cols):
            if col in in_use:
                continue
            sheet.delete_cols(col, 1)
|
||
|
||
|
||
def _sort_workbook(workbook, sort_by: str, sort_order: str) -> None:
    """Sort the data rows of every sheet with at least three rows.

    Unknown ``sort_by`` values fall back to ``"sample_time"`` and unknown
    ``sort_order`` values to ``"asc"``; ``"none"`` leaves the workbook as is.
    The summary sheet is sorted block by block, other sheets as a whole.
    """
    field = sort_by if sort_by in SORT_FIELDS else "sample_time"
    direction = sort_order if sort_order in SORT_ORDERS else "asc"
    if field == "none":
        return

    descending = direction == "desc"
    for sheet in workbook.worksheets:
        if sheet.max_row < 3:
            continue
        sorter = _sort_summary_sheet if sheet.title == SUMMARY_SHEET_NAME else _sort_regular_sheet
        sorter(sheet, field, descending)
|
||
|
||
|
||
def _sort_regular_sheet(sheet, sort_by: str, reverse: bool) -> None:
    """Sort rows 2..last of ``sheet`` by the column matching ``sort_by``.

    Nothing happens when row 1 has no matching header. Only cell values are rewritten.
    """
    titles = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)]
    sort_col = _sort_column_from_header(titles, sort_by)
    if sort_col is not None:
        data_rows = _read_rows(sheet, 2, sheet.max_row)
        _sort_rows(data_rows, sort_col, sort_by, reverse)
        _write_rows(sheet, 2, data_rows)
|
||
|
||
|
||
def _sort_summary_sheet(sheet, sort_by: str, reverse: bool) -> None:
    """Sort each block of the summary sheet independently.

    A block starts at a summary header row and runs until the next header row
    (or the end of the sheet). Header rows stay in place; only the rows
    between them are reordered.
    """

    def is_header_row(index: int) -> bool:
        leading = [
            _cell_text(sheet.cell(index, col).value)
            for col in range(1, min(sheet.max_column, 4) + 1)
        ]
        return _is_summary_header_values(leading)

    cursor = 1
    while cursor <= sheet.max_row:
        if not is_header_row(cursor):
            cursor += 1
            continue

        titles = [_cell_text(sheet.cell(cursor, col).value) for col in range(1, sheet.max_column + 1)]
        sort_col = _sort_column_from_header(titles, sort_by)
        if sort_col is None:
            cursor += 1
            continue

        # Find the end of this block: the next header row or the end of the sheet.
        first = cursor + 1
        stop = first
        while stop <= sheet.max_row and not is_header_row(stop):
            stop += 1

        if stop > first:
            block = _read_rows(sheet, first, stop - 1)
            _sort_rows(block, sort_col, sort_by, reverse)
            _write_rows(sheet, first, block)
        cursor = stop
|
||
|
||
|
||
def _sort_column_from_header(header: list[str], sort_by: str) -> int | None:
|
||
if sort_by == "name":
|
||
names = ["姓名"]
|
||
elif sort_by == "sample_time":
|
||
names = ["采样时间"]
|
||
elif sort_by == "reason":
|
||
names = ["检测原因"]
|
||
else:
|
||
return None
|
||
|
||
for index, value in enumerate(header):
|
||
if any(value == name or value.startswith(name) for name in names):
|
||
return index
|
||
return None
|
||
|
||
|
||
def _read_rows(sheet, start: int, end: int) -> list[list[object]]:
|
||
return [
|
||
[sheet.cell(row_index, col).value for col in range(1, sheet.max_column + 1)]
|
||
for row_index in range(start, end + 1)
|
||
]
|
||
|
||
|
||
def _write_rows(sheet, start: int, rows: list[list[object]]) -> None:
|
||
for offset, row in enumerate(rows):
|
||
row_index = start + offset
|
||
for col, value in enumerate(row, start=1):
|
||
sheet.cell(row_index, col).value = value
|
||
|
||
|
||
def _sort_rows(rows: list[list[object]], sort_col: int, sort_by: str, reverse: bool) -> None:
    """Sort ``rows`` in place by the value at index ``sort_col``.

    Rows with a blank sort cell always end up last. For descending order the
    non-blank rows are sorted ascending and then reversed.
    """
    rows.sort(key=lambda row: _sort_value(row[sort_col], sort_by))
    if not reverse:
        return

    with_value: list[list[object]] = []
    blank: list[list[object]] = []
    for row in rows:
        bucket = with_value if _cell_text(row[sort_col]) else blank
        bucket.append(row)
    with_value.reverse()
    rows[:] = with_value + blank
|
||
|
||
|
||
def _sort_value(value: object, sort_by: str) -> tuple[int, object]:
    """Build the sort key for one cell.

    Blank cells map to ``(1, "")`` so they sort after everything else. For
    ``sort_by == "sample_time"`` a parsable date/time is keyed by its ISO
    string; in every other case the stripped text is the key.
    """
    text = _cell_text(value)
    if not text:
        return (1, "")

    key: object = text
    if sort_by == "sample_time":
        moment = _parse_datetime(text)
        if moment is not None:
            key = moment.isoformat()
    return (0, key)
|
||
|
||
|
||
def _parse_datetime(value: str) -> datetime | None:
|
||
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S", "%Y-%m-%d", "%Y/%m/%d"):
|
||
try:
|
||
return datetime.strptime(value, fmt)
|
||
except ValueError:
|
||
continue
|
||
return None
|
||
|
||
|
||
def _is_summary_header_values(row: list[str]) -> bool:
|
||
return (
|
||
len(row) >= 4
|
||
and row[0] == "姓名"
|
||
and row[1] == "住院号"
|
||
and row[2] == "采样时间"
|
||
and row[3].startswith("检测原因")
|
||
)
|
||
|
||
|
||
def _find_header_index(header: list[str], name: str) -> int | None:
|
||
for index, value in enumerate(header):
|
||
if value == name:
|
||
return index
|
||
return None
|
||
|
||
|
||
def _all_standard_values_missing(values: list[str]) -> bool:
|
||
non_empty_values = [value for value in values if value]
|
||
return bool(non_empty_values) and all(value == "Not_Find" for value in non_empty_values)
|
||
|
||
|
||
def _parse_unmatched_items(values: list[str]) -> dict[str, str]:
    """Merge ``name:value`` strings into ``{name: "v1;v2;..."}``.

    Entries without a name are ignored. Values of the same name are joined
    with ``;`` in order of first appearance, without repeats; a name seen only
    without a value maps to an empty string.
    """
    grouped: dict[str, list[str]] = {}
    for raw in values:
        name, result = _split_unmatched_value(raw)
        if not name:
            continue
        bucket = grouped.setdefault(name, [])
        if result and result not in bucket:
            bucket.append(result)
    return {name: ";".join(bucket) for name, bucket in grouped.items()}
|
||
|
||
|
||
def _split_unmatched_value(value: str) -> tuple[str, str]:
|
||
for separator in (":", ":"):
|
||
if separator in value:
|
||
name, result = value.split(separator, 1)
|
||
return name.strip(), result.strip()
|
||
return value.strip(), ""
|
||
|
||
|
||
def _is_empty_sheet(sheet) -> bool:
|
||
for row in sheet.iter_rows(values_only=True):
|
||
for value in row:
|
||
if value not in (None, ""):
|
||
return False
|
||
return True
|
||
|
||
|
||
def _summarize_workbook(path: Path, output_dir: Path, preview_rows: int) -> ExcelSummary:
    """Describe the workbook at ``path``: its sheets, their sizes and leading rows.

    The workbook is opened read-only with cached values. Each preview holds at
    least 2 and at most ``MAX_PREVIEW_ROWS`` rows, with every cell rendered as
    (possibly truncated) text.

    Args:
        path: Workbook to read.
        output_dir: Directory against which ``relpath`` is computed.
        preview_rows: Requested number of preview rows per sheet.
    """
    row_limit = max(2, min(preview_rows, MAX_PREVIEW_ROWS))
    sheet_summaries: list[SheetSummary] = []

    workbook = load_workbook(path, read_only=True, data_only=True)
    try:
        for sheet in workbook.worksheets:
            preview = [
                [_cell_to_text(value) for value in row]
                for row in sheet.iter_rows(max_row=row_limit, values_only=True)
            ]
            # Dimensions may be reported as None; fall back to 0.
            sheet_summaries.append(
                SheetSummary(
                    name=sheet.title,
                    rows=sheet.max_row or 0,
                    columns=sheet.max_column or 0,
                    preview=preview,
                )
            )
    finally:
        workbook.close()

    relative = path.relative_to(output_dir)
    return ExcelSummary(filename=path.name, relpath=relative.as_posix(), sheets=sheet_summaries)
|
||
|
||
|
||
def _cell_to_text(value: object) -> str:
|
||
if value is None:
|
||
return ""
|
||
text = str(value)
|
||
return text if len(text) <= 80 else text[:77] + "..."
|
||
|
||
|
||
def _cell_text(value: object) -> str:
|
||
if value is None:
|
||
return ""
|
||
return str(value).strip()
|