import csv import os import shutil import subprocess import sys import uuid import zipfile from dataclasses import dataclass from datetime import datetime from pathlib import Path from openpyxl import load_workbook from openpyxl.styles import PatternFill from openpyxl.styles import Font PROCESSOR_DIR = Path(__file__).resolve().parent / "processors" SUMMARY_SHEET_NAME = "\u672a\u68c0\u6d4b\u5230\u5185\u5bb9\u6c47\u603b" UNMATCHED_HEADER = "\u672a\u5339\u914d\u68c0\u6d4b\u5185\u5bb9" UNMATCHED_FILL = PatternFill(fill_type="solid", fgColor="FCE4D6") SORT_FIELDS = {"none", "name", "sample_time", "reason"} SORT_ORDERS = {"asc", "desc"} class ProcessingError(Exception): pass @dataclass class SheetSummary: name: str rows: int columns: int preview: list[list[str]] @dataclass class ExcelSummary: filename: str relpath: str sheets: list[SheetSummary] @dataclass class ProcessingResult: job_id: str mode: str output_dir: Path zip_path: Path files: list[ExcelSummary] def run_processing( zip_path: Path, job_dir: Path, mode: str, data_type: str, result_name: str, show_not_match: bool, show_all_infos: bool, preview_rows: int = 20, include_basic_sheets: bool = True, include_unmatched_items: bool = True, include_summary_sheet: bool = True, sort_by: str = "sample_time", sort_order: str = "asc", ) -> ProcessingResult: if mode not in {"auto", "v1", "v2"}: raise ProcessingError("处理模式不正确。") if data_type not in {"auto", "pat_no", "zhuyuanhao"}: raise ProcessingError("患者编号类型不正确。") extract_dir = job_dir / "input" output_dir = job_dir / "output" extract_dir.mkdir(parents=True, exist_ok=True) output_dir.mkdir(parents=True, exist_ok=True) _safe_extract(zip_path, extract_dir) data_dir = _find_data_root(extract_dir) selected_mode = _detect_mode(data_dir) if mode == "auto" else mode selected_data_type = _detect_data_type(data_dir, selected_mode) if data_type == "auto" else data_type clean_name = _clean_result_name(result_name) if selected_mode == "v1": result_path = output_dir / f"{clean_name}.xlsx" cmd = [ sys.executable, str(PROCESSOR_DIR / "V1-ALL_convert_Lab_Test_data.py"), str(data_dir), str(result_path), str(show_not_match), str(show_all_infos), selected_data_type, ] elif selected_mode == "v2": cmd = [ sys.executable, str(PROCESSOR_DIR / "V2-Every_Pat_File_convert_Lab_Test_data.py"), "--file_dir", str(data_dir), "--result_save_file_name", clean_name, "--show_not_match", str(show_not_match), "--show_all_infos", str(show_all_infos), "--data_type", selected_data_type, ] else: raise ProcessingError("无法识别数据目录结构,请手动选择 V1 或 V2。") env = os.environ.copy() env["PYTHONUTF8"] = "1" env["PYTHONIOENCODING"] = "utf-8" completed = subprocess.run( cmd, cwd=PROCESSOR_DIR, env=env, text=True, encoding="utf-8", errors="replace", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=60 * 30, ) (job_dir / "process.log").write_text( "mode=" + selected_mode + "\n" + "data_type=" + selected_data_type + "\n\n" + completed.stdout, encoding="utf-8", ) if completed.returncode != 0: raise ProcessingError(f"处理脚本退出码 {completed.returncode}。\n{completed.stdout[-4000:]}") if selected_mode == "v2": _collect_v2_outputs(data_dir, output_dir) xlsx_files = sorted(output_dir.rglob("*.xlsx")) if not xlsx_files: raise ProcessingError("处理完成但没有生成 Excel 文件,请检查数据结构。") for xlsx_file in xlsx_files: _remove_default_empty_sheet(xlsx_file) _postprocess_workbook( xlsx_file, include_basic_sheets=include_basic_sheets, include_unmatched_items=include_unmatched_items, include_summary_sheet=include_summary_sheet, sort_by=sort_by, sort_order=sort_order, ) result_zip = job_dir / "result.zip" _create_result_zip(output_dir, result_zip) return ProcessingResult( job_id=job_dir.name, mode=selected_mode, output_dir=output_dir, zip_path=result_zip, files=[_summarize_workbook(path, output_dir, preview_rows) for path in xlsx_files], ) def create_result_zip( job_dir: Path, include_basic_sheets: bool = True, include_unmatched_items: bool = True, include_summary_sheet: bool = True, sort_by: str = "sample_time", sort_order: str = "asc", ) -> Path: output_dir = job_dir / "output" result_zip = job_dir / "result.zip" if not output_dir.exists(): raise ProcessingError("结果目录不存在。") export_dir = _new_export_dir(job_dir) for path in sorted(output_dir.rglob("*.xlsx")): if path.is_file(): target = export_dir / path.relative_to(output_dir) target.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(path, target) _apply_export_options( target, include_basic_sheets=include_basic_sheets, include_unmatched_items=include_unmatched_items, include_summary_sheet=include_summary_sheet, sort_by=sort_by, sort_order=sort_order, ) result_zip = export_dir / "result.zip" _create_result_zip(export_dir, result_zip) return result_zip def summarize_job( job_dir: Path, preview_rows: int = 20, include_basic_sheets: bool = True, include_unmatched_items: bool = True, include_summary_sheet: bool = True, sort_by: str = "sample_time", sort_order: str = "asc", ) -> ProcessingResult: output_dir = job_dir / "output" if not output_dir.exists(): raise ProcessingError("结果目录不存在。") preview_dir = _new_export_dir(job_dir) for path in sorted(output_dir.rglob("*.xlsx")): if path.is_file(): target = preview_dir / path.relative_to(output_dir) target.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(path, target) _apply_export_options( target, include_basic_sheets=include_basic_sheets, include_unmatched_items=include_unmatched_items, include_summary_sheet=include_summary_sheet, sort_by=sort_by, sort_order=sort_order, ) xlsx_files = sorted(preview_dir.rglob("*.xlsx")) if not xlsx_files: raise ProcessingError("结果文件不存在。") result_zip = job_dir / "result.zip" mode = _read_mode(job_dir) return ProcessingResult( job_id=job_dir.name, mode=mode, output_dir=preview_dir, zip_path=result_zip, files=[_summarize_workbook(path, preview_dir, preview_rows) for path in xlsx_files], ) def find_output_file( job_dir: Path, relpath: str, include_basic_sheets: bool = True, include_unmatched_items: bool = True, include_summary_sheet: bool = True, sort_by: str = "sample_time", sort_order: str = "asc", ) -> Path: output_dir = (job_dir / "output").resolve() target = (output_dir / relpath).resolve() if not str(target).startswith(str(output_dir)) or not target.is_file(): raise ProcessingError("结果文件不存在。") if target.suffix.lower() != ".xlsx": raise ProcessingError("只能导出 Excel 结果文件。") export_dir = _new_export_dir(job_dir) export_target = export_dir / target.name shutil.copy2(target, export_target) _apply_export_options( export_target, include_basic_sheets=include_basic_sheets, include_unmatched_items=include_unmatched_items, include_summary_sheet=include_summary_sheet, sort_by=sort_by, sort_order=sort_order, ) return export_target def _new_export_dir(job_dir: Path) -> Path: export_root = job_dir / "exports" export_root.mkdir(exist_ok=True) export_dir = export_root / uuid.uuid4().hex export_dir.mkdir() return export_dir def _apply_export_options( path: Path, include_basic_sheets: bool, include_unmatched_items: bool, include_summary_sheet: bool, sort_by: str, sort_order: str, ) -> None: if not include_basic_sheets and not include_summary_sheet: raise ProcessingError("\u81f3\u5c11\u9700\u8981\u5bfc\u51fa\u57fa\u672c\u5de5\u4f5c\u8868\u6216\u672a\u68c0\u6d4b\u5230\u5185\u5bb9\u6c47\u603b\u8868\u3002") workbook = load_workbook(path) try: if not include_unmatched_items: _remove_unmatched_columns(workbook) if not include_summary_sheet and SUMMARY_SHEET_NAME in workbook.sheetnames: workbook.remove(workbook[SUMMARY_SHEET_NAME]) if not include_basic_sheets: for sheet in list(workbook.worksheets): if sheet.title != SUMMARY_SHEET_NAME: workbook.remove(sheet) if not workbook.worksheets: workbook.create_sheet(SUMMARY_SHEET_NAME) _sort_workbook(workbook, sort_by, sort_order) workbook.save(path) finally: workbook.close() def _read_mode(job_dir: Path) -> str: log_path = job_dir / "process.log" if not log_path.exists(): return "unknown" first_line = log_path.read_text(encoding="utf-8", errors="replace").splitlines()[0:1] if first_line and first_line[0].startswith("mode="): return first_line[0].split("=", 1)[1] return "unknown" def _safe_extract(zip_path: Path, target_dir: Path) -> None: try: with zipfile.ZipFile(zip_path) as zf: for member in zf.infolist(): destination = (target_dir / member.filename).resolve() if not str(destination).startswith(str(target_dir.resolve())): raise ProcessingError("zip 中包含不安全路径。") zf.extractall(target_dir) except zipfile.BadZipFile as exc: raise ProcessingError("zip 文件无法解压。") from exc def _find_data_root(extract_dir: Path) -> Path: candidates = [extract_dir] children = [p for p in extract_dir.iterdir() if p.is_dir()] if len(children) == 1 and not any(p.is_file() for p in extract_dir.iterdir()): candidates.insert(0, children[0]) for candidate in candidates: if (candidate / "Patients_info.csv").exists(): return candidate for path in extract_dir.rglob("Patients_info.csv"): return path.parent raise ProcessingError("未找到 Patients_info.csv。") def _detect_mode(data_dir: Path) -> str: if (data_dir / "Tests_List").is_dir() and (data_dir / "Tests_Detail_List").is_dir(): return "v1" patient_dirs = [p for p in data_dir.iterdir() if p.is_dir()] for patient_dir in patient_dirs: names = {p.name for p in patient_dir.iterdir()} has_summary = any(name.endswith("_检测汇总.csv") for name in names) has_detail_dir = any(name.endswith("_具体检测") and (patient_dir / name).is_dir() for name in names) if has_summary and has_detail_dir: return "v2" raise ProcessingError("无法自动识别 V1/V2 数据结构。") def _detect_data_type(data_dir: Path, selected_mode: str) -> str: raw_ids = _read_patient_ids(data_dir / "Patients_info.csv") if not raw_ids: return "pat_no" raw_id_set = set(raw_ids) padded_ids = {_pad_patient_id(value) for value in raw_ids} if selected_mode == "v1": evidence_names = { path.stem for path in (data_dir / "Tests_List").glob("*.csv") if path.is_file() } elif selected_mode == "v2": evidence_names = {path.name for path in data_dir.iterdir() if path.is_dir()} else: evidence_names = set() if not evidence_names: return "pat_no" raw_score = len(evidence_names & raw_id_set) padded_score = len(evidence_names & padded_ids) if raw_score > padded_score: return "zhuyuanhao" return "pat_no" def _read_patient_ids(patients_info_path: Path) -> list[str]: if not patients_info_path.exists(): return [] try: return _read_patient_ids_with_encoding(patients_info_path, "utf-8-sig") except UnicodeDecodeError: return _read_patient_ids_with_encoding(patients_info_path, "gb18030") def _read_patient_ids_with_encoding(patients_info_path: Path, encoding: str) -> list[str]: with patients_info_path.open("r", encoding=encoding, newline="") as file: return [ str(row.get("pat_no", "")).strip() for row in csv.DictReader(file) if str(row.get("pat_no", "")).strip() ] def _pad_patient_id(value: str) -> str: try: return f"{int(value):010}" except ValueError: return value def _clean_result_name(result_name: str) -> str: name = (result_name or "Result").strip() if name.lower().endswith(".xlsx"): name = name[:-5] forbidden = '<>:"/\\|?*' name = "".join("_" if ch in forbidden else ch for ch in name).strip(" .") return name or "Result" def _collect_v2_outputs(data_dir: Path, output_dir: Path) -> None: v2_dir = output_dir / "V2患者结果" v2_dir.mkdir(exist_ok=True) for path in data_dir.rglob("*.xlsx"): if path.is_file(): target = v2_dir / path.name if target.exists(): target = v2_dir / f"{path.parent.name}_{path.name}" shutil.copy2(path, target) def _create_result_zip(output_dir: Path, result_zip: Path) -> None: with zipfile.ZipFile(result_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf: for path in sorted(output_dir.rglob("*.xlsx")): if path.is_file(): zf.write(path, path.relative_to(output_dir)) def _remove_default_empty_sheet(path: Path) -> None: workbook = load_workbook(path) try: if "Sheet" in workbook.sheetnames and len(workbook.sheetnames) > 1: sheet = workbook["Sheet"] if _is_empty_sheet(sheet): workbook.remove(sheet) workbook.save(path) finally: workbook.close() def _postprocess_workbook( path: Path, include_basic_sheets: bool, include_unmatched_items: bool, include_summary_sheet: bool, sort_by: str, sort_order: str, ) -> None: workbook = load_workbook(path) try: _normalize_unmatched_columns(workbook) summary_records = _collect_summary_records(workbook) _remove_not_found_rows(workbook) _remove_empty_unmatched_columns(workbook) if not include_unmatched_items: _remove_unmatched_columns(workbook) if include_summary_sheet: _replace_summary_sheet(workbook, summary_records) if not include_basic_sheets: for sheet in list(workbook.worksheets): if sheet.title != SUMMARY_SHEET_NAME: workbook.remove(sheet) if not workbook.worksheets: workbook.create_sheet(SUMMARY_SHEET_NAME) _sort_workbook(workbook, sort_by, sort_order) workbook.save(path) finally: workbook.close() def _normalize_unmatched_columns(workbook) -> None: for sheet in workbook.worksheets: if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 2: continue header = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)] unmatched_index = _find_header_index(header, UNMATCHED_HEADER) if unmatched_index is None: continue marker_col = unmatched_index + 1 parsed_rows: list[dict[str, str]] = [] item_names: list[str] = [] for row_index in range(2, sheet.max_row + 1): raw_values = [ _cell_text(sheet.cell(row_index, col).value) for col in range(marker_col, sheet.max_column + 1) ] parsed = _parse_unmatched_items([value for value in raw_values if value]) parsed_rows.append(parsed) for item_name in parsed: if item_name not in item_names: item_names.append(item_name) if not item_names: if sheet.max_column >= marker_col: sheet.delete_cols(marker_col, sheet.max_column - marker_col + 1) continue if sheet.max_column >= marker_col: sheet.delete_cols(marker_col, sheet.max_column - marker_col + 1) sheet.cell(1, marker_col).value = UNMATCHED_HEADER sheet.cell(1, marker_col).font = Font(bold=True) sheet.cell(1, marker_col).fill = UNMATCHED_FILL for offset, item_name in enumerate(item_names, start=1): cell = sheet.cell(1, marker_col + offset) cell.value = item_name cell.font = Font(bold=True) cell.fill = UNMATCHED_FILL for row_index, parsed in enumerate(parsed_rows, start=2): sheet.cell(row_index, marker_col).value = "" sheet.cell(row_index, marker_col).fill = UNMATCHED_FILL for offset, item_name in enumerate(item_names, start=1): cell = sheet.cell(row_index, marker_col + offset) cell.value = parsed.get(item_name, "") cell.fill = UNMATCHED_FILL def _remove_not_found_rows(workbook) -> None: for sheet in workbook.worksheets: if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 2: continue header = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)] unmatched_index = _find_header_index(header, UNMATCHED_HEADER) standard_end = unmatched_index if unmatched_index is not None else sheet.max_column for row_index in range(sheet.max_row, 1, -1): standard_values = [ _cell_text(sheet.cell(row_index, col).value) for col in range(5, standard_end + 1) ] if _all_standard_values_missing(standard_values): sheet.delete_rows(row_index, 1) def _collect_summary_records(workbook) -> list[dict[str, object]]: records: list[dict[str, object]] = [] for sheet in workbook.worksheets: if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 2: continue header = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)] unmatched_col = _find_header_index(header, UNMATCHED_HEADER) standard_end = unmatched_col if unmatched_col is not None else sheet.max_column for row_index in range(2, sheet.max_row + 1): base_values = [_cell_text(sheet.cell(row_index, col).value) for col in range(1, 5)] if not any(base_values): continue standard_values = [ _cell_text(sheet.cell(row_index, col).value) for col in range(5, standard_end + 1) ] if not _all_standard_values_missing(standard_values): continue item_values = _summary_item_values(sheet, row_index, header, unmatched_col, standard_end) if not item_values: continue records.append( { "name": base_values[0], "patient_id": base_values[1], "sample_time": base_values[2], "reason": base_values[3], "sheet": sheet.title, "items": item_values, } ) return records def _summary_item_values( sheet, row_index: int, header: list[str], unmatched_col: int | None, standard_end: int, ) -> dict[str, str]: item_values: dict[str, str] = {} if unmatched_col is not None: for col in range(unmatched_col + 2, sheet.max_column + 1): item_name = _cell_text(sheet.cell(1, col).value) item_value = _cell_text(sheet.cell(row_index, col).value) if item_name and item_value: item_values[item_name] = item_value if item_values: return item_values for col in range(5, standard_end + 1): item_name = header[col - 1] if col - 1 < len(header) else "" item_value = _cell_text(sheet.cell(row_index, col).value) if item_name and item_value: item_values[item_name] = item_value return item_values def _replace_summary_sheet(workbook, records: list[dict[str, object]]) -> None: if SUMMARY_SHEET_NAME in workbook.sheetnames: workbook.remove(workbook[SUMMARY_SHEET_NAME]) summary = workbook.create_sheet(SUMMARY_SHEET_NAME, 0) if not records: summary.append(["\u59d3\u540d", "\u4f4f\u9662\u53f7", "\u91c7\u6837\u65f6\u95f4", "\u68c0\u6d4b\u539f\u56e0"]) for cell in summary[summary.max_row]: cell.font = Font(bold=True) return records = sorted(records, key=lambda item: (_cell_text(item["reason"]), _cell_text(item["sample_time"]))) reasons = [] for record in records: reason = _cell_text(record["reason"]) if reason not in reasons: reasons.append(reason) for reason in reasons: group = [record for record in records if _cell_text(record["reason"]) == reason] item_names: list[str] = [] for record in group: for item_name in record["items"]: if item_name not in item_names: item_names.append(item_name) reason_label = f"\u68c0\u6d4b\u539f\u56e0\uff08\u4e0b\u65b9\u90fd\u662f{reason}\u539f\u56e0\uff09" if reason else "\u68c0\u6d4b\u539f\u56e0" header = ["\u59d3\u540d", "\u4f4f\u9662\u53f7", "\u91c7\u6837\u65f6\u95f4", reason_label] + item_names summary.append(header) for cell in summary[summary.max_row]: cell.font = Font(bold=True) for record in group: item_values = record["items"] summary.append( [ record["name"], record["patient_id"], record["sample_time"], record["reason"], ] + [item_values.get(item_name, "") for item_name in item_names] ) for column_cells in summary.columns: max_length = max(len(_cell_text(cell.value)) for cell in column_cells) summary.column_dimensions[column_cells[0].column_letter].width = min(max(max_length + 2, 12), 36) def _remove_unmatched_columns(workbook) -> None: for sheet in workbook.worksheets: if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 1: continue header = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)] unmatched_col = _find_header_index(header, UNMATCHED_HEADER) if unmatched_col is not None: sheet.delete_cols(unmatched_col + 1, sheet.max_column - unmatched_col) def _remove_empty_unmatched_columns(workbook) -> None: for sheet in workbook.worksheets: if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 1: continue header = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)] unmatched_col = _find_header_index(header, UNMATCHED_HEADER) if unmatched_col is None: continue first_col = unmatched_col + 1 item_cols = [ col for col in range(first_col + 1, sheet.max_column + 1) if _cell_text(sheet.cell(1, col).value) ] used_item_cols = [ col for col in item_cols if any(_cell_text(sheet.cell(row, col).value) for row in range(2, sheet.max_row + 1)) ] if not used_item_cols: sheet.delete_cols(first_col, sheet.max_column - unmatched_col) continue for col in reversed(item_cols): if col not in used_item_cols: sheet.delete_cols(col, 1) def _sort_workbook(workbook, sort_by: str, sort_order: str) -> None: sort_by = sort_by if sort_by in SORT_FIELDS else "sample_time" sort_order = sort_order if sort_order in SORT_ORDERS else "asc" if sort_by == "none": return reverse = sort_order == "desc" for sheet in workbook.worksheets: if sheet.max_row < 3: continue if sheet.title == SUMMARY_SHEET_NAME: _sort_summary_sheet(sheet, sort_by, reverse) else: _sort_regular_sheet(sheet, sort_by, reverse) def _sort_regular_sheet(sheet, sort_by: str, reverse: bool) -> None: header = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)] sort_col = _sort_column_from_header(header, sort_by) if sort_col is None: return rows = _read_rows(sheet, 2, sheet.max_row) _sort_rows(rows, sort_col, sort_by, reverse) _write_rows(sheet, 2, rows) def _sort_summary_sheet(sheet, sort_by: str, reverse: bool) -> None: row_index = 1 while row_index <= sheet.max_row: row = [_cell_text(sheet.cell(row_index, col).value) for col in range(1, min(sheet.max_column, 4) + 1)] if not _is_summary_header_values(row): row_index += 1 continue header = [_cell_text(sheet.cell(row_index, col).value) for col in range(1, sheet.max_column + 1)] sort_col = _sort_column_from_header(header, sort_by) if sort_col is None: row_index += 1 continue start = row_index + 1 end = start while end <= sheet.max_row: next_row = [ _cell_text(sheet.cell(end, col).value) for col in range(1, min(sheet.max_column, 4) + 1) ] if _is_summary_header_values(next_row): break end += 1 if end > start: rows = _read_rows(sheet, start, end - 1) _sort_rows(rows, sort_col, sort_by, reverse) _write_rows(sheet, start, rows) row_index = end def _sort_column_from_header(header: list[str], sort_by: str) -> int | None: if sort_by == "name": names = ["姓名"] elif sort_by == "sample_time": names = ["采样时间"] elif sort_by == "reason": names = ["检测原因"] else: return None for index, value in enumerate(header): if any(value == name or value.startswith(name) for name in names): return index return None def _read_rows(sheet, start: int, end: int) -> list[list[object]]: return [ [sheet.cell(row_index, col).value for col in range(1, sheet.max_column + 1)] for row_index in range(start, end + 1) ] def _write_rows(sheet, start: int, rows: list[list[object]]) -> None: for offset, row in enumerate(rows): row_index = start + offset for col, value in enumerate(row, start=1): sheet.cell(row_index, col).value = value def _sort_rows(rows: list[list[object]], sort_col: int, sort_by: str, reverse: bool) -> None: rows.sort(key=lambda row: _sort_value(row[sort_col], sort_by)) if reverse: filled = [row for row in rows if _cell_text(row[sort_col])] empty = [row for row in rows if not _cell_text(row[sort_col])] filled.reverse() rows[:] = filled + empty def _sort_value(value: object, sort_by: str) -> tuple[int, object]: text = _cell_text(value) if not text: return (1, "") if sort_by == "sample_time": parsed = _parse_datetime(text) if parsed is not None: return (0, parsed.isoformat()) return (0, text) def _parse_datetime(value: str) -> datetime | None: for fmt in ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S", "%Y-%m-%d", "%Y/%m/%d"): try: return datetime.strptime(value, fmt) except ValueError: continue return None def _is_summary_header_values(row: list[str]) -> bool: return ( len(row) >= 4 and row[0] == "姓名" and row[1] == "住院号" and row[2] == "采样时间" and row[3].startswith("检测原因") ) def _find_header_index(header: list[str], name: str) -> int | None: for index, value in enumerate(header): if value == name: return index return None def _all_standard_values_missing(values: list[str]) -> bool: non_empty_values = [value for value in values if value] return bool(non_empty_values) and all(value == "Not_Find" for value in non_empty_values) def _parse_unmatched_items(values: list[str]) -> dict[str, str]: items: dict[str, list[str]] = {} for value in values: item_name, item_value = _split_unmatched_value(value) if not item_name: continue items.setdefault(item_name, []) if item_value and item_value not in items[item_name]: items[item_name].append(item_value) return {name: ";".join(item_values) for name, item_values in items.items()} def _split_unmatched_value(value: str) -> tuple[str, str]: for separator in (":", ":"): if separator in value: name, result = value.split(separator, 1) return name.strip(), result.strip() return value.strip(), "" def _is_empty_sheet(sheet) -> bool: for row in sheet.iter_rows(values_only=True): for value in row: if value not in (None, ""): return False return True def _summarize_workbook(path: Path, output_dir: Path, preview_rows: int) -> ExcelSummary: sheets: list[SheetSummary] = [] workbook = load_workbook(path, read_only=True, data_only=True) try: for sheet in workbook.worksheets: preview: list[list[str]] = [] for row in sheet.iter_rows(max_row=max(2, min(preview_rows, 200)), values_only=True): preview.append([_cell_to_text(value) for value in row]) sheets.append( SheetSummary( name=sheet.title, rows=sheet.max_row or 0, columns=sheet.max_column or 0, preview=preview, ) ) finally: workbook.close() return ExcelSummary( filename=path.name, relpath=path.relative_to(output_dir).as_posix(), sheets=sheets, ) def _cell_to_text(value: object) -> str: if value is None: return "" text = str(value) return text if len(text) <= 80 else text[:77] + "..." def _cell_text(value: object) -> str: if value is None: return "" return str(value).strip()