Files
HIS_Sur_Data_Deal/app/processor.py

930 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import csv
import os
import shutil
import subprocess
import sys
import uuid
import zipfile
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from openpyxl import load_workbook
from openpyxl.styles import PatternFill
from openpyxl.styles import Font
# Directory next to this module that holds the V1/V2 converter scripts
# launched by run_processing (also used as their working directory).
PROCESSOR_DIR = Path(__file__).resolve().parent / "processors"
# Title of the summary worksheet that the post-processing helpers create,
# skip, or remove by name.
SUMMARY_SHEET_NAME = "\u672a\u68c0\u6d4b\u5230\u5185\u5bb9\u6c47\u603b"
# Title of the only worksheet _collect_summary_records reads records from.
UNASSIGNED_SHEET_NAME = "\u672a\u5f52\u5c5e\u68c0\u6d4b\u5185\u5bb9"
# Header text marking the first "unmatched items" column in a regular sheet.
UNMATCHED_HEADER = "\u672a\u5339\u914d\u68c0\u6d4b\u5185\u5bb9"
# Fill applied to every cell of the unmatched-items columns.
UNMATCHED_FILL = PatternFill(fill_type="solid", fgColor="FCE4D6")
# Upper bound on the number of rows _summarize_workbook reads for a preview.
MAX_PREVIEW_ROWS = 10000
# Values accepted by _sort_workbook; anything else falls back to
# "sample_time" / "asc" there.
SORT_FIELDS = {"none", "name", "sample_time", "reason"}
SORT_ORDERS = {"asc", "desc"}
class ProcessingError(Exception):
    """Domain error raised by this module when a job cannot be processed or exported."""

    pass
@dataclass
class SheetSummary:
    """Size and preview of a single worksheet, as produced by ``_summarize_workbook``."""

    # Worksheet title.
    name: str
    # Row count reported by the worksheet (0 when the sheet reports none).
    rows: int
    # Column count reported by the worksheet (0 when the sheet reports none).
    columns: int
    # Leading rows of the sheet, every cell already rendered as display text.
    preview: list[list[str]]
@dataclass
class ExcelSummary:
    """Summary of one Excel result file, as produced by ``_summarize_workbook``."""

    # File name without any directory part.
    filename: str
    # POSIX-style path of the file relative to the directory it was summarized from.
    relpath: str
    # One entry per worksheet, in workbook order.
    sheets: list[SheetSummary]
@dataclass
class ProcessingResult:
    """Outcome of a processing or summarizing call for one job directory."""

    # Name of the job directory.
    job_id: str
    # Processing mode that was used ("v1"/"v2"), or "unknown" when read back
    # from a job whose log does not record it.
    mode: str
    # Directory containing the workbooks described by ``files``.
    output_dir: Path
    # Location of the job's result archive.
    zip_path: Path
    # Per-workbook summaries.
    files: list[ExcelSummary]
def run_processing(
    zip_path: Path,
    job_dir: Path,
    mode: str,
    data_type: str,
    result_name: str,
    show_not_match: bool,
    show_all_infos: bool,
    preview_rows: int = 20,
    include_basic_sheets: bool = True,
    include_unmatched_items: bool = True,
    include_summary_sheet: bool = True,
    sort_by: str = "sample_time",
    sort_order: str = "asc",
) -> ProcessingResult:
    """Extract an uploaded archive, run the converter script and post-process its workbooks.

    The archive is unpacked into ``job_dir / "input"``, the V1 or V2 converter
    script is run as a subprocess, and every ``.xlsx`` found under
    ``job_dir / "output"`` is post-processed, zipped into
    ``job_dir / "result.zip"`` and summarized.

    Args:
        zip_path: Uploaded zip archive.
        job_dir: Working directory of the job; its name becomes the job id.
        mode: ``"auto"``, ``"v1"`` or ``"v2"``.
        data_type: ``"auto"``, ``"pat_no"`` or ``"zhuyuanhao"``.
        result_name: Requested base name of the result workbook.
        show_not_match: Forwarded to the converter script as text.
        show_all_infos: Forwarded to the converter script as text.
        preview_rows: Number of rows to include in each sheet preview.
        include_basic_sheets: Keep the non-summary worksheets.
        include_unmatched_items: Keep the unmatched-items columns.
        include_summary_sheet: Build the summary worksheet.
        sort_by: Sort field passed to the workbook sorter.
        sort_order: Sort direction passed to the workbook sorter.

    Returns:
        A ``ProcessingResult`` describing the generated workbooks.

    Raises:
        ProcessingError: On invalid arguments, unrecognized data layout,
            converter failure or timeout, or when no workbook was produced.
    """
    if mode not in {"auto", "v1", "v2"}:
        raise ProcessingError("处理模式不正确。")
    if data_type not in {"auto", "pat_no", "zhuyuanhao"}:
        raise ProcessingError("患者编号类型不正确。")

    extract_dir = job_dir / "input"
    output_dir = job_dir / "output"
    extract_dir.mkdir(parents=True, exist_ok=True)
    output_dir.mkdir(parents=True, exist_ok=True)
    _safe_extract(zip_path, extract_dir)
    data_dir = _find_data_root(extract_dir)

    selected_mode = _detect_mode(data_dir) if mode == "auto" else mode
    selected_data_type = _detect_data_type(data_dir, selected_mode) if data_type == "auto" else data_type
    clean_name = _clean_result_name(result_name)

    if selected_mode == "v1":
        result_path = output_dir / f"{clean_name}.xlsx"
        cmd = [
            sys.executable,
            str(PROCESSOR_DIR / "V1-ALL_convert_Lab_Test_data.py"),
            str(data_dir),
            str(result_path),
            str(show_not_match),
            str(show_all_infos),
            selected_data_type,
        ]
    elif selected_mode == "v2":
        cmd = [
            sys.executable,
            str(PROCESSOR_DIR / "V2-Every_Pat_File_convert_Lab_Test_data.py"),
            "--file_dir",
            str(data_dir),
            "--result_save_file_name",
            clean_name,
            "--show_not_match",
            str(show_not_match),
            "--show_all_infos",
            str(show_all_infos),
            "--data_type",
            selected_data_type,
        ]
    else:
        raise ProcessingError("无法识别数据目录结构,请手动选择 V1 或 V2。")

    env = os.environ.copy()
    env["PYTHONUTF8"] = "1"
    env["PYTHONIOENCODING"] = "utf-8"

    timeout_seconds = 60 * 30
    log_path = job_dir / "process.log"
    # The first log line must stay "mode=<mode>": _read_mode parses it later.
    log_header = "mode=" + selected_mode + "\n" + "data_type=" + selected_data_type + "\n\n"
    try:
        completed = subprocess.run(
            cmd,
            cwd=PROCESSOR_DIR,
            env=env,
            text=True,
            encoding="utf-8",
            errors="replace",
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired as exc:
        # Keep whatever the script printed before it was killed, then report
        # the timeout as a domain error like every other failure here.
        partial_output = exc.output
        if isinstance(partial_output, bytes):
            partial_output = partial_output.decode("utf-8", errors="replace")
        log_path.write_text(log_header + (partial_output or ""), encoding="utf-8")
        raise ProcessingError(f"处理脚本运行超时,已超过 {timeout_seconds} 秒。") from exc

    log_path.write_text(log_header + completed.stdout, encoding="utf-8")
    if completed.returncode != 0:
        raise ProcessingError(f"处理脚本退出码 {completed.returncode}\n{completed.stdout[-4000:]}")

    if selected_mode == "v2":
        _collect_v2_outputs(data_dir, output_dir)

    xlsx_files = sorted(output_dir.rglob("*.xlsx"))
    if not xlsx_files:
        raise ProcessingError("处理完成但没有生成 Excel 文件,请检查数据结构。")
    for xlsx_file in xlsx_files:
        _remove_default_empty_sheet(xlsx_file)
        _postprocess_workbook(
            xlsx_file,
            include_basic_sheets=include_basic_sheets,
            include_unmatched_items=include_unmatched_items,
            include_summary_sheet=include_summary_sheet,
            sort_by=sort_by,
            sort_order=sort_order,
        )

    result_zip = job_dir / "result.zip"
    _create_result_zip(output_dir, result_zip)
    return ProcessingResult(
        job_id=job_dir.name,
        mode=selected_mode,
        output_dir=output_dir,
        zip_path=result_zip,
        files=[_summarize_workbook(path, output_dir, preview_rows) for path in xlsx_files],
    )
def create_result_zip(
    job_dir: Path,
    include_basic_sheets: bool = True,
    include_unmatched_items: bool = True,
    include_summary_sheet: bool = True,
    sort_by: str = "sample_time",
    sort_order: str = "asc",
) -> Path:
    """Build a zip of the job's workbooks with the given export options applied.

    Every ``.xlsx`` under ``job_dir / "output"`` is copied into a fresh export
    directory, the options are applied to the copy, and the copies are zipped.
    The stored originals are not modified.

    Args:
        job_dir: Working directory of the job.
        include_basic_sheets: Keep the non-summary worksheets.
        include_unmatched_items: Keep the unmatched-items columns.
        include_summary_sheet: Keep the summary worksheet.
        sort_by: Sort field passed to the workbook sorter.
        sort_order: Sort direction passed to the workbook sorter.

    Returns:
        Path of the ``result.zip`` written inside the new export directory.

    Raises:
        ProcessingError: If the output directory is missing, contains no
            Excel file, or the option combination is rejected.
    """
    output_dir = job_dir / "output"
    if not output_dir.exists():
        raise ProcessingError("结果目录不存在。")

    sources = [path for path in sorted(output_dir.rglob("*.xlsx")) if path.is_file()]
    if not sources:
        # Same outcome as summarize_job: an empty archive is never useful.
        raise ProcessingError("结果文件不存在。")

    export_dir = _new_export_dir(job_dir)
    for path in sources:
        target = export_dir / path.relative_to(output_dir)
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(path, target)
        _apply_export_options(
            target,
            include_basic_sheets=include_basic_sheets,
            include_unmatched_items=include_unmatched_items,
            include_summary_sheet=include_summary_sheet,
            sort_by=sort_by,
            sort_order=sort_order,
        )

    result_zip = export_dir / "result.zip"
    _create_result_zip(export_dir, result_zip)
    return result_zip
def summarize_job(
    job_dir: Path,
    preview_rows: int = 20,
    include_basic_sheets: bool = True,
    include_unmatched_items: bool = True,
    include_summary_sheet: bool = True,
    sort_by: str = "sample_time",
    sort_order: str = "asc",
) -> ProcessingResult:
    """Summarize an already processed job with the given export options applied.

    Each stored workbook is copied into a fresh export directory, the options
    are applied to the copy, and the copies are summarized for preview.

    Raises:
        ProcessingError: If the output directory or the workbooks are missing,
            or the option combination is rejected.
    """
    source_dir = job_dir / "output"
    if not source_dir.exists():
        raise ProcessingError("结果目录不存在。")

    preview_dir = _new_export_dir(job_dir)
    export_options = {
        "include_basic_sheets": include_basic_sheets,
        "include_unmatched_items": include_unmatched_items,
        "include_summary_sheet": include_summary_sheet,
        "sort_by": sort_by,
        "sort_order": sort_order,
    }
    for original in sorted(source_dir.rglob("*.xlsx")):
        if not original.is_file():
            continue
        copy_path = preview_dir / original.relative_to(source_dir)
        copy_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(original, copy_path)
        _apply_export_options(copy_path, **export_options)

    previews = sorted(preview_dir.rglob("*.xlsx"))
    if not previews:
        raise ProcessingError("结果文件不存在。")

    return ProcessingResult(
        job_id=job_dir.name,
        mode=_read_mode(job_dir),
        output_dir=preview_dir,
        zip_path=job_dir / "result.zip",
        files=[_summarize_workbook(workbook_path, preview_dir, preview_rows) for workbook_path in previews],
    )
def find_output_file(
    job_dir: Path,
    relpath: str,
    include_basic_sheets: bool = True,
    include_unmatched_items: bool = True,
    include_summary_sheet: bool = True,
    sort_by: str = "sample_time",
    sort_order: str = "asc",
) -> Path:
    """Return an export copy of one result workbook with the options applied.

    Args:
        job_dir: Working directory of the job.
        relpath: Path of the requested workbook relative to ``job_dir / "output"``.
        include_basic_sheets: Keep the non-summary worksheets.
        include_unmatched_items: Keep the unmatched-items columns.
        include_summary_sheet: Keep the summary worksheet.
        sort_by: Sort field passed to the workbook sorter.
        sort_order: Sort direction passed to the workbook sorter.

    Returns:
        Path of the copy inside a fresh export directory.

    Raises:
        ProcessingError: If ``relpath`` escapes the output directory, does not
            name an existing ``.xlsx`` file, or the options are rejected.
    """
    output_dir = (job_dir / "output").resolve()
    target = (output_dir / relpath).resolve()
    # Compare path components, not string prefixes: a sibling directory such
    # as "output_evil" shares the "output" prefix but lies outside output_dir.
    if not target.is_relative_to(output_dir) or not target.is_file():
        raise ProcessingError("结果文件不存在。")
    if target.suffix.lower() != ".xlsx":
        raise ProcessingError("只能导出 Excel 结果文件。")

    export_dir = _new_export_dir(job_dir)
    export_target = export_dir / target.name
    shutil.copy2(target, export_target)
    _apply_export_options(
        export_target,
        include_basic_sheets=include_basic_sheets,
        include_unmatched_items=include_unmatched_items,
        include_summary_sheet=include_summary_sheet,
        sort_by=sort_by,
        sort_order=sort_order,
    )
    return export_target
def _new_export_dir(job_dir: Path) -> Path:
export_root = job_dir / "exports"
export_root.mkdir(exist_ok=True)
export_dir = export_root / uuid.uuid4().hex
export_dir.mkdir()
return export_dir
def _apply_export_options(
    path: Path,
    include_basic_sheets: bool,
    include_unmatched_items: bool,
    include_summary_sheet: bool,
    sort_by: str,
    sort_order: str,
) -> None:
    """Rewrite the workbook at ``path`` in place according to the export options.

    Raises:
        ProcessingError: If neither the basic sheets nor the summary sheet
            are requested.
    """
    if not (include_basic_sheets or include_summary_sheet):
        raise ProcessingError("\u81f3\u5c11\u9700\u8981\u5bfc\u51fa\u57fa\u672c\u5de5\u4f5c\u8868\u6216\u672a\u68c0\u6d4b\u5230\u5185\u5bb9\u6c47\u603b\u8868\u3002")

    book = load_workbook(path)
    try:
        if not include_unmatched_items:
            _remove_unmatched_columns(book)

        drop_summary = not include_summary_sheet and SUMMARY_SHEET_NAME in book.sheetnames
        if drop_summary:
            book.remove(book[SUMMARY_SHEET_NAME])

        if not include_basic_sheets:
            basic_sheets = [ws for ws in book.worksheets if ws.title != SUMMARY_SHEET_NAME]
            for ws in basic_sheets:
                book.remove(ws)

        # Never leave the workbook without any sheet (presumably because it
        # could not be saved otherwise - confirm against openpyxl).
        if not book.worksheets:
            book.create_sheet(SUMMARY_SHEET_NAME)

        _sort_workbook(book, sort_by, sort_order)
        book.save(path)
    finally:
        book.close()
def _read_mode(job_dir: Path) -> str:
log_path = job_dir / "process.log"
if not log_path.exists():
return "unknown"
first_line = log_path.read_text(encoding="utf-8", errors="replace").splitlines()[0:1]
if first_line and first_line[0].startswith("mode="):
return first_line[0].split("=", 1)[1]
return "unknown"
def _safe_extract(zip_path: Path, target_dir: Path) -> None:
try:
with zipfile.ZipFile(zip_path) as zf:
for member in zf.infolist():
destination = (target_dir / member.filename).resolve()
if not str(destination).startswith(str(target_dir.resolve())):
raise ProcessingError("zip 中包含不安全路径。")
zf.extractall(target_dir)
except zipfile.BadZipFile as exc:
raise ProcessingError("zip 文件无法解压。") from exc
def _find_data_root(extract_dir: Path) -> Path:
candidates = [extract_dir]
children = [p for p in extract_dir.iterdir() if p.is_dir()]
if len(children) == 1 and not any(p.is_file() for p in extract_dir.iterdir()):
candidates.insert(0, children[0])
for candidate in candidates:
if (candidate / "Patients_info.csv").exists():
return candidate
for path in extract_dir.rglob("Patients_info.csv"):
return path.parent
raise ProcessingError("未找到 Patients_info.csv。")
def _detect_mode(data_dir: Path) -> str:
if (data_dir / "Tests_List").is_dir() and (data_dir / "Tests_Detail_List").is_dir():
return "v1"
patient_dirs = [p for p in data_dir.iterdir() if p.is_dir()]
for patient_dir in patient_dirs:
names = {p.name for p in patient_dir.iterdir()}
has_summary = any(name.endswith("_检测汇总.csv") for name in names)
has_detail_dir = any(name.endswith("_具体检测") and (patient_dir / name).is_dir() for name in names)
if has_summary and has_detail_dir:
return "v2"
raise ProcessingError("无法自动识别 V1/V2 数据结构。")
def _detect_data_type(data_dir: Path, selected_mode: str) -> str:
    """Guess whether file/folder names use raw or zero-padded patient ids.

    The ids from ``Patients_info.csv`` are compared, raw and zero-padded,
    against the CSV stems in ``Tests_List`` (v1) or the sub-directory names
    (v2). ``"zhuyuanhao"`` is returned only when the raw ids match strictly
    more names; every other case yields ``"pat_no"``.
    """
    raw_ids = _read_patient_ids(data_dir / "Patients_info.csv")
    if not raw_ids:
        return "pat_no"

    if selected_mode == "v1":
        evidence = {
            csv_path.stem
            for csv_path in (data_dir / "Tests_List").glob("*.csv")
            if csv_path.is_file()
        }
    elif selected_mode == "v2":
        evidence = {entry.name for entry in data_dir.iterdir() if entry.is_dir()}
    else:
        evidence = set()
    if not evidence:
        return "pat_no"

    raw_hits = len(evidence & set(raw_ids))
    padded_hits = len(evidence & {_pad_patient_id(raw_id) for raw_id in raw_ids})
    return "zhuyuanhao" if raw_hits > padded_hits else "pat_no"
def _read_patient_ids(patients_info_path: Path) -> list[str]:
    """Read the patient ids from the CSV, or return ``[]`` when the file is missing.

    The file is decoded as UTF-8 (with optional BOM) first; GB18030 is used
    when that decoding fails.
    """
    if patients_info_path.exists():
        try:
            ids = _read_patient_ids_with_encoding(patients_info_path, "utf-8-sig")
        except UnicodeDecodeError:
            ids = _read_patient_ids_with_encoding(patients_info_path, "gb18030")
        return ids
    return []
def _read_patient_ids_with_encoding(patients_info_path: Path, encoding: str) -> list[str]:
with patients_info_path.open("r", encoding=encoding, newline="") as file:
return [
str(row.get("pat_no", "")).strip()
for row in csv.DictReader(file)
if str(row.get("pat_no", "")).strip()
]
def _pad_patient_id(value: str) -> str:
try:
return f"{int(value):010}"
except ValueError:
return value
def _clean_result_name(result_name: str) -> str:
name = (result_name or "Result").strip()
if name.lower().endswith(".xlsx"):
name = name[:-5]
forbidden = '<>:"/\\|?*'
name = "".join("_" if ch in forbidden else ch for ch in name).strip(" .")
return name or "Result"
def _collect_v2_outputs(data_dir: Path, output_dir: Path) -> None:
v2_dir = output_dir / "V2患者结果"
v2_dir.mkdir(exist_ok=True)
for path in data_dir.rglob("*.xlsx"):
if path.is_file():
target = v2_dir / path.name
if target.exists():
target = v2_dir / f"{path.parent.name}_{path.name}"
shutil.copy2(path, target)
def _create_result_zip(output_dir: Path, result_zip: Path) -> None:
with zipfile.ZipFile(result_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for path in sorted(output_dir.rglob("*.xlsx")):
if path.is_file():
zf.write(path, path.relative_to(output_dir))
def _remove_default_empty_sheet(path: Path) -> None:
    """Drop an empty worksheet titled ``"Sheet"`` when the workbook has other sheets.

    The workbook is saved back to ``path`` afterwards.
    """
    book = load_workbook(path)
    try:
        titles = book.sheetnames
        if len(titles) > 1 and "Sheet" in titles:
            default_sheet = book["Sheet"]
            if _is_empty_sheet(default_sheet):
                book.remove(default_sheet)
        book.save(path)
    finally:
        book.close()
def _postprocess_workbook(
    path: Path,
    include_basic_sheets: bool,
    include_unmatched_items: bool,
    include_summary_sheet: bool,
    sort_by: str,
    sort_order: str,
) -> None:
    """Clean up a freshly generated workbook in place.

    Unmatched columns are normalized, duplicate rows and all-``Not_Find`` rows
    are removed, the summary sheet is rebuilt when requested, and the sheets
    are sorted before the workbook is saved back to ``path``.
    """
    book = load_workbook(path)
    try:
        _normalize_unmatched_columns(book)
        _deduplicate_regular_rows(book)
        # Summary records come from rows that the next step deletes, so they
        # have to be collected first.
        summary_records = _collect_summary_records(book)
        _remove_not_found_rows(book)
        _remove_empty_unmatched_columns(book)

        if not include_unmatched_items:
            _remove_unmatched_columns(book)
        if include_summary_sheet:
            _replace_summary_sheet(book, summary_records)
        if not include_basic_sheets:
            basic_sheets = [ws for ws in book.worksheets if ws.title != SUMMARY_SHEET_NAME]
            for ws in basic_sheets:
                book.remove(ws)
        # Never leave the workbook without any sheet.
        if not book.worksheets:
            book.create_sheet(SUMMARY_SHEET_NAME)

        _sort_workbook(book, sort_by, sort_order)
        book.save(path)
    finally:
        book.close()
def _normalize_unmatched_columns(workbook) -> None:
    """Rewrite the free-form unmatched cells of each sheet as one column per item.

    For every non-summary sheet with an unmatched header, the cells from that
    column to the right are parsed into name/value pairs per row. The old
    columns are deleted and replaced by the marker column followed by one
    column per distinct item name, in order of first appearance.
    """
    for ws in workbook.worksheets:
        if ws.title == SUMMARY_SHEET_NAME or ws.max_row < 2:
            continue
        titles = [_cell_text(ws.cell(1, col).value) for col in range(1, ws.max_column + 1)]
        marker_index = _find_header_index(titles, UNMATCHED_HEADER)
        if marker_index is None:
            continue
        marker_col = marker_index + 1

        # Parse every data row before touching the sheet.
        per_row: list[dict[str, str]] = []
        ordered_names: dict[str, None] = {}
        for row_number in range(2, ws.max_row + 1):
            texts = [
                _cell_text(ws.cell(row_number, col).value)
                for col in range(marker_col, ws.max_column + 1)
            ]
            parsed = _parse_unmatched_items([text for text in texts if text])
            per_row.append(parsed)
            for item_name in parsed:
                ordered_names.setdefault(item_name)

        # The old unmatched area is dropped whether or not items were found.
        if ws.max_column >= marker_col:
            ws.delete_cols(marker_col, ws.max_column - marker_col + 1)
        if not ordered_names:
            continue

        header_labels = [UNMATCHED_HEADER, *ordered_names]
        for offset, label in enumerate(header_labels):
            header_cell = ws.cell(1, marker_col + offset)
            header_cell.value = label
            header_cell.font = Font(bold=True)
            header_cell.fill = UNMATCHED_FILL

        for row_number, parsed in enumerate(per_row, start=2):
            # The marker column itself stays blank in data rows.
            row_values = ["", *(parsed.get(item_name, "") for item_name in ordered_names)]
            for offset, cell_value in enumerate(row_values):
                data_cell = ws.cell(row_number, marker_col + offset)
                data_cell.value = cell_value
                data_cell.fill = UNMATCHED_FILL
def _remove_not_found_rows(workbook) -> None:
    """Delete data rows whose standard values (column 5 onward) are all ``Not_Find``.

    The standard columns end just before the unmatched header when the sheet
    has one, otherwise at the last column. The summary sheet is skipped.
    """
    for ws in workbook.worksheets:
        if ws.title == SUMMARY_SHEET_NAME or ws.max_row < 2:
            continue
        titles = [_cell_text(ws.cell(1, col).value) for col in range(1, ws.max_column + 1)]
        marker_index = _find_header_index(titles, UNMATCHED_HEADER)
        last_standard_col = ws.max_column if marker_index is None else marker_index
        # Walk bottom-up so deletions do not shift rows still to be visited.
        for row_number in reversed(range(2, ws.max_row + 1)):
            standard_texts = [
                _cell_text(ws.cell(row_number, col).value)
                for col in range(5, last_standard_col + 1)
            ]
            if _all_standard_values_missing(standard_texts):
                ws.delete_rows(row_number, 1)
def _deduplicate_regular_rows(workbook) -> None:
    """Remove duplicate data rows from every non-summary sheet.

    Rows are compared by the stripped text of all their cells. The scan runs
    bottom-up, so the lowest occurrence is kept; fully blank rows are ignored.
    """
    for ws in workbook.worksheets:
        if ws.title == SUMMARY_SHEET_NAME or ws.max_row < 3:
            continue
        seen_rows = set()
        for row_number in reversed(range(2, ws.max_row + 1)):
            signature = tuple(
                _cell_text(ws.cell(row_number, col).value)
                for col in range(1, ws.max_column + 1)
            )
            if not any(signature):
                continue
            if signature in seen_rows:
                ws.delete_rows(row_number, 1)
                continue
            seen_rows.add(signature)
def _collect_summary_records(workbook) -> list[dict[str, object]]:
    """Collect summary records from the unassigned-items sheet.

    A row contributes a record when its first four cells are not all blank,
    its standard values are absent or all ``Not_Find``, and at least one item
    value can be gathered for it.
    """
    collected: list[dict[str, object]] = []
    for ws in workbook.worksheets:
        if ws.title != UNASSIGNED_SHEET_NAME or ws.max_row < 2:
            continue
        titles = [_cell_text(ws.cell(1, col).value) for col in range(1, ws.max_column + 1)]
        marker_index = _find_header_index(titles, UNMATCHED_HEADER)
        last_standard_col = ws.max_column if marker_index is None else marker_index

        for row_number in range(2, ws.max_row + 1):
            name, patient_id, sample_time, reason = (
                _cell_text(ws.cell(row_number, col).value) for col in range(1, 5)
            )
            if not (name or patient_id or sample_time or reason):
                continue
            standard_texts = [
                _cell_text(ws.cell(row_number, col).value)
                for col in range(5, last_standard_col + 1)
            ]
            if standard_texts and not _all_standard_values_missing(standard_texts):
                continue
            items = _summary_item_values(ws, row_number, titles, marker_index, last_standard_col)
            if not items:
                continue
            collected.append(
                {
                    "name": name,
                    "patient_id": patient_id,
                    "sample_time": sample_time,
                    "reason": reason,
                    "sheet": ws.title,
                    "items": items,
                }
            )
    return collected
def _summary_item_values(
    sheet,
    row_index: int,
    header: list[str],
    unmatched_col: int | None,
    standard_end: int,
) -> dict[str, str]:
    """Return the named, non-empty item values of one row.

    Values from the columns after the unmatched marker are preferred; when
    none exist, the standard columns (5 through ``standard_end``) are used.
    """

    def gather(columns, name_of) -> dict[str, str]:
        # Keep only cells that have both a column name and a value.
        found: dict[str, str] = {}
        for col in columns:
            item_name = name_of(col)
            item_value = _cell_text(sheet.cell(row_index, col).value)
            if item_name and item_value:
                found[item_name] = item_value
        return found

    if unmatched_col is not None:
        # unmatched_col is a 0-based index; items start right after the marker column.
        unmatched_items = gather(
            range(unmatched_col + 2, sheet.max_column + 1),
            lambda col: _cell_text(sheet.cell(1, col).value),
        )
        if unmatched_items:
            return unmatched_items

    return gather(
        range(5, standard_end + 1),
        lambda col: header[col - 1] if col - 1 < len(header) else "",
    )
def _replace_summary_sheet(workbook, records: list[dict[str, object]]) -> None:
    """Rebuild the summary sheet as the first sheet of the workbook.

    Records are ordered by reason and sample time and written as one block per
    reason: a bold header row listing that group's item names, then one row
    per record. Without records only a bold four-column header is written.
    """
    if SUMMARY_SHEET_NAME in workbook.sheetnames:
        workbook.remove(workbook[SUMMARY_SHEET_NAME])
    summary = workbook.create_sheet(SUMMARY_SHEET_NAME, 0)

    def append_bold(values) -> None:
        # Append a row and render all of its cells in bold.
        summary.append(values)
        for cell in summary[summary.max_row]:
            cell.font = Font(bold=True)

    if not records:
        append_bold(["\u59d3\u540d", "\u4f4f\u9662\u53f7", "\u91c7\u6837\u65f6\u95f4", "\u68c0\u6d4b\u539f\u56e0"])
        return

    ordered = sorted(
        records,
        key=lambda item: (_cell_text(item["reason"]), _cell_text(item["sample_time"])),
    )
    # Dict insertion order keeps the groups in sorted-reason order.
    grouped: dict[str, list[dict[str, object]]] = {}
    for record in ordered:
        grouped.setdefault(_cell_text(record["reason"]), []).append(record)

    for reason, group in grouped.items():
        item_names = list(dict.fromkeys(name for record in group for name in record["items"]))
        reason_label = f"\u68c0\u6d4b\u539f\u56e0\uff08\u4e0b\u65b9\u90fd\u662f{reason}\u539f\u56e0\uff09" if reason else "\u68c0\u6d4b\u539f\u56e0"
        append_bold(["\u59d3\u540d", "\u4f4f\u9662\u53f7", "\u91c7\u6837\u65f6\u95f4", reason_label, *item_names])
        for record in group:
            values = record["items"]
            summary.append(
                [
                    record["name"],
                    record["patient_id"],
                    record["sample_time"],
                    record["reason"],
                    *(values.get(item_name, "") for item_name in item_names),
                ]
            )

    # Column width follows the longest text, clamped to the range 12..36.
    for column_cells in summary.columns:
        widest = max(len(_cell_text(cell.value)) for cell in column_cells)
        summary.column_dimensions[column_cells[0].column_letter].width = min(max(widest + 2, 12), 36)
def _remove_unmatched_columns(workbook) -> None:
    """Delete the unmatched marker column and everything to its right in each regular sheet."""
    for ws in workbook.worksheets:
        if ws.title == SUMMARY_SHEET_NAME or ws.max_row < 1:
            continue
        titles = [_cell_text(ws.cell(1, col).value) for col in range(1, ws.max_column + 1)]
        marker_index = _find_header_index(titles, UNMATCHED_HEADER)
        if marker_index is None:
            continue
        # marker_index is 0-based, so the marker sits in column marker_index + 1.
        ws.delete_cols(marker_index + 1, ws.max_column - marker_index)
def _remove_empty_unmatched_columns(workbook) -> None:
    """Drop unmatched item columns that hold no data in any row.

    When none of the named item columns contains data, the whole unmatched
    area (marker column included) is removed.
    """
    for ws in workbook.worksheets:
        if ws.title == SUMMARY_SHEET_NAME or ws.max_row < 1:
            continue
        titles = [_cell_text(ws.cell(1, col).value) for col in range(1, ws.max_column + 1)]
        marker_index = _find_header_index(titles, UNMATCHED_HEADER)
        if marker_index is None:
            continue
        marker_col = marker_index + 1

        named_cols = [
            col
            for col in range(marker_col + 1, ws.max_column + 1)
            if _cell_text(ws.cell(1, col).value)
        ]
        populated = {
            col
            for col in named_cols
            if any(_cell_text(ws.cell(row, col).value) for row in range(2, ws.max_row + 1))
        }
        if not populated:
            ws.delete_cols(marker_col, ws.max_column - marker_index)
            continue
        # Delete right-to-left so remaining column numbers stay valid.
        for col in sorted(set(named_cols) - populated, reverse=True):
            ws.delete_cols(col, 1)
def _sort_workbook(workbook, sort_by: str, sort_order: str) -> None:
    """Sort the data rows of every sheet that has at least three rows.

    Unknown ``sort_by`` / ``sort_order`` values fall back to ``"sample_time"``
    and ``"asc"``; ``sort_by == "none"`` leaves the workbook untouched.
    """
    field = sort_by if sort_by in SORT_FIELDS else "sample_time"
    order = sort_order if sort_order in SORT_ORDERS else "asc"
    if field == "none":
        return

    descending = order == "desc"
    for ws in workbook.worksheets:
        if ws.max_row < 3:
            continue
        sorter = _sort_summary_sheet if ws.title == SUMMARY_SHEET_NAME else _sort_regular_sheet
        sorter(ws, field, descending)
def _sort_regular_sheet(sheet, sort_by: str, reverse: bool) -> None:
    """Sort rows 2..end of a regular sheet by the column matching ``sort_by``.

    Nothing happens when the header row has no matching column.
    """
    titles = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)]
    key_index = _sort_column_from_header(titles, sort_by)
    if key_index is not None:
        body = _read_rows(sheet, 2, sheet.max_row)
        _sort_rows(body, key_index, sort_by, reverse)
        _write_rows(sheet, 2, body)
def _sort_summary_sheet(sheet, sort_by: str, reverse: bool) -> None:
    """Sort each block of the summary sheet independently.

    A block is a header row followed by the data rows up to the next header
    row (or the end of the sheet). Header rows stay where they are.
    """

    def leading_cells(row_number: int) -> list[str]:
        # Text of the first (up to) four cells, which identify a header row.
        width = min(sheet.max_column, 4)
        return [_cell_text(sheet.cell(row_number, col).value) for col in range(1, width + 1)]

    cursor = 1
    while cursor <= sheet.max_row:
        if not _is_summary_header_values(leading_cells(cursor)):
            cursor += 1
            continue
        titles = [_cell_text(sheet.cell(cursor, col).value) for col in range(1, sheet.max_column + 1)]
        sort_col = _sort_column_from_header(titles, sort_by)
        if sort_col is None:
            cursor += 1
            continue

        first_data = cursor + 1
        stop = first_data
        while stop <= sheet.max_row and not _is_summary_header_values(leading_cells(stop)):
            stop += 1
        if stop > first_data:
            block = _read_rows(sheet, first_data, stop - 1)
            _sort_rows(block, sort_col, sort_by, reverse)
            _write_rows(sheet, first_data, block)
        # Continue at the next header row (or past the end of the sheet).
        cursor = stop
def _sort_column_from_header(header: list[str], sort_by: str) -> int | None:
if sort_by == "name":
names = ["姓名"]
elif sort_by == "sample_time":
names = ["采样时间"]
elif sort_by == "reason":
names = ["检测原因"]
else:
return None
for index, value in enumerate(header):
if any(value == name or value.startswith(name) for name in names):
return index
return None
def _read_rows(sheet, start: int, end: int) -> list[list[object]]:
return [
[sheet.cell(row_index, col).value for col in range(1, sheet.max_column + 1)]
for row_index in range(start, end + 1)
]
def _write_rows(sheet, start: int, rows: list[list[object]]) -> None:
for offset, row in enumerate(rows):
row_index = start + offset
for col, value in enumerate(row, start=1):
sheet.cell(row_index, col).value = value
def _sort_rows(rows: list[list[object]], sort_col: int, sort_by: str, reverse: bool) -> None:
    """Sort ``rows`` in place by the value in ``sort_col``.

    Rows with an empty sort cell always end up last, in both directions.
    """
    ordered = sorted(rows, key=lambda row: _sort_value(row[sort_col], sort_by))
    if reverse:
        with_value: list[list[object]] = []
        without_value: list[list[object]] = []
        for row in ordered:
            bucket = with_value if _cell_text(row[sort_col]) else without_value
            bucket.append(row)
        # Only the filled rows are flipped; blanks keep their trailing position.
        ordered = with_value[::-1] + without_value
    rows[:] = ordered
def _sort_value(value: object, sort_by: str) -> tuple[int, object]:
    """Build the sort key for one cell value.

    Empty cells get ``(1, "")`` so they sort after every filled cell. For
    ``"sample_time"``, text that parses as a date is replaced by its ISO form.
    """
    text = _cell_text(value)
    if text == "":
        return (1, "")
    parsed = _parse_datetime(text) if sort_by == "sample_time" else None
    return (0, text if parsed is None else parsed.isoformat())
def _parse_datetime(value: str) -> datetime | None:
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S", "%Y-%m-%d", "%Y/%m/%d"):
try:
return datetime.strptime(value, fmt)
except ValueError:
continue
return None
def _is_summary_header_values(row: list[str]) -> bool:
return (
len(row) >= 4
and row[0] == "姓名"
and row[1] == "住院号"
and row[2] == "采样时间"
and row[3].startswith("检测原因")
)
def _find_header_index(header: list[str], name: str) -> int | None:
for index, value in enumerate(header):
if value == name:
return index
return None
def _all_standard_values_missing(values: list[str]) -> bool:
non_empty_values = [value for value in values if value]
return bool(non_empty_values) and all(value == "Not_Find" for value in non_empty_values)
def _parse_unmatched_items(values: list[str]) -> dict[str, str]:
    """Group raw ``name<sep>value`` texts by item name.

    Each text is split into an item name and a result. Entries without a name
    are ignored and repeated results for the same name are kept once.
    Distinct results for one name are joined with ``"; "``.

    Args:
        values: Raw non-empty cell texts of one row.

    Returns:
        Mapping of item name to its joined result text, in order of first
        appearance.
    """
    items: dict[str, list[str]] = {}
    for value in values:
        item_name, item_value = _split_unmatched_value(value)
        if not item_name:
            continue
        results = items.setdefault(item_name, [])
        if item_value and item_value not in results:
            results.append(item_value)
    # NOTE(review): the original joined with an empty string, which fuses
    # "1.2" and "3.4" into "1.23.4". The intended delimiter is not visible in
    # the source (possibly lost in extraction) - confirm "; " is acceptable.
    return {name: "; ".join(results) for name, results in items.items()}
def _split_unmatched_value(value: str) -> tuple[str, str]:
for separator in ("", ":"):
if separator in value:
name, result = value.split(separator, 1)
return name.strip(), result.strip()
return value.strip(), ""
def _is_empty_sheet(sheet) -> bool:
for row in sheet.iter_rows(values_only=True):
for value in row:
if value not in (None, ""):
return False
return True
def _summarize_workbook(path: Path, output_dir: Path, preview_rows: int) -> ExcelSummary:
    """Describe every sheet of the workbook at ``path``.

    The workbook is opened read-only with cached values. For each sheet the
    title, reported size and the leading rows as display text are recorded;
    the preview holds at least 2 and at most ``MAX_PREVIEW_ROWS`` rows.
    """
    row_limit = max(2, min(preview_rows, MAX_PREVIEW_ROWS))
    summaries: list[SheetSummary] = []
    book = load_workbook(path, read_only=True, data_only=True)
    try:
        for ws in book.worksheets:
            head_rows = [
                [_cell_to_text(value) for value in row]
                for row in ws.iter_rows(max_row=row_limit, values_only=True)
            ]
            summaries.append(
                SheetSummary(
                    name=ws.title,
                    rows=ws.max_row or 0,
                    columns=ws.max_column or 0,
                    preview=head_rows,
                )
            )
    finally:
        book.close()

    return ExcelSummary(
        filename=path.name,
        relpath=path.relative_to(output_dir).as_posix(),
        sheets=summaries,
    )
def _cell_to_text(value: object) -> str:
if value is None:
return ""
text = str(value)
return text if len(text) <= 80 else text[:77] + "..."
def _cell_text(value: object) -> str:
if value is None:
return ""
return str(value).strip()