add unmatched summary export options

This commit is contained in:
2026-05-08 22:35:21 +08:00
parent 1be6cb287e
commit 11600e7d09
5 changed files with 213 additions and 9 deletions

View File

@@ -7,6 +7,7 @@ from dataclasses import dataclass
from pathlib import Path
from openpyxl import load_workbook
from openpyxl.styles import Font
PROCESSOR_DIR = Path(__file__).resolve().parent / "processors"
@@ -49,6 +50,9 @@ def run_processing(
show_not_match: bool,
show_all_infos: bool,
preview_rows: int = 20,
include_basic_sheets: bool = True,
include_unmatched_items: bool = True,
include_summary_sheet: bool = True,
) -> ProcessingResult:
if mode not in {"auto", "v1", "v2"}:
raise ProcessingError("处理模式不正确。")
@@ -126,6 +130,12 @@ def run_processing(
for xlsx_file in xlsx_files:
_remove_default_empty_sheet(xlsx_file)
_postprocess_workbook(
xlsx_file,
include_basic_sheets=include_basic_sheets,
include_unmatched_items=include_unmatched_items,
include_summary_sheet=include_summary_sheet,
)
result_zip = job_dir / "result.zip"
_create_result_zip(output_dir, result_zip)
@@ -267,6 +277,164 @@ def _remove_default_empty_sheet(path: Path) -> None:
workbook.close()
def _postprocess_workbook(
path: Path,
include_basic_sheets: bool,
include_unmatched_items: bool,
include_summary_sheet: bool,
) -> None:
workbook = load_workbook(path)
try:
summary_records = _collect_summary_records(workbook)
if not include_unmatched_items:
_remove_unmatched_columns(workbook)
if include_summary_sheet:
_replace_summary_sheet(workbook, summary_records)
if not include_basic_sheets:
for sheet in list(workbook.worksheets):
if sheet.title != "未检测到内容汇总":
workbook.remove(sheet)
if not workbook.worksheets:
workbook.create_sheet("未检测到内容汇总")
workbook.save(path)
finally:
workbook.close()
def _collect_summary_records(workbook) -> list[dict[str, object]]:
records: list[dict[str, object]] = []
for sheet in workbook.worksheets:
if sheet.title == "未检测到内容汇总" or sheet.max_row < 2:
continue
header = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)]
unmatched_col = _find_header_index(header, "未匹配检测内容")
if unmatched_col is None:
continue
for row_index in range(2, sheet.max_row + 1):
base_values = [_cell_text(sheet.cell(row_index, col).value) for col in range(1, 5)]
if not any(base_values):
continue
standard_values = [
_cell_text(sheet.cell(row_index, col).value)
for col in range(5, unmatched_col + 1)
]
unmatched_values = [
_cell_text(sheet.cell(row_index, col).value)
for col in range(unmatched_col + 1, sheet.max_column + 1)
]
unmatched_values = [value for value in unmatched_values if value]
if not unmatched_values:
continue
if not _all_standard_values_missing(standard_values):
continue
records.append(
{
"name": base_values[0],
"patient_id": base_values[1],
"sample_time": base_values[2],
"reason": base_values[3],
"sheet": sheet.title,
"items": _parse_unmatched_items(unmatched_values),
}
)
return records
def _replace_summary_sheet(workbook, records: list[dict[str, object]]) -> None:
if "未检测到内容汇总" in workbook.sheetnames:
workbook.remove(workbook["未检测到内容汇总"])
summary = workbook.create_sheet("未检测到内容汇总", 0)
if not records:
summary.append(["姓名", "住院号", "采样时间", "检测原因"])
return
records = sorted(records, key=lambda item: (_cell_text(item["reason"]), _cell_text(item["sample_time"])))
reasons = []
for record in records:
reason = _cell_text(record["reason"])
if reason not in reasons:
reasons.append(reason)
for reason in reasons:
group = [record for record in records if _cell_text(record["reason"]) == reason]
item_names: list[str] = []
for record in group:
for item_name in record["items"]:
if item_name not in item_names:
item_names.append(item_name)
reason_label = f"检测原因(下方都是{reason}原因)" if reason else "检测原因"
header = ["姓名", "住院号", "采样时间", reason_label] + item_names
summary.append(header)
for cell in summary[summary.max_row]:
cell.font = Font(bold=True)
for record in group:
item_values = record["items"]
summary.append(
[
record["name"],
record["patient_id"],
record["sample_time"],
record["reason"],
]
+ [item_values.get(item_name, "") for item_name in item_names]
)
summary.append([])
for column_cells in summary.columns:
max_length = max(len(_cell_text(cell.value)) for cell in column_cells)
summary.column_dimensions[column_cells[0].column_letter].width = min(max(max_length + 2, 12), 36)
def _remove_unmatched_columns(workbook) -> None:
for sheet in workbook.worksheets:
if sheet.title == "未检测到内容汇总" or sheet.max_row < 1:
continue
header = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)]
unmatched_col = _find_header_index(header, "未匹配检测内容")
if unmatched_col is not None:
sheet.delete_cols(unmatched_col + 1, sheet.max_column - unmatched_col)
def _find_header_index(header: list[str], name: str) -> int | None:
for index, value in enumerate(header):
if value == name:
return index
return None
def _all_standard_values_missing(values: list[str]) -> bool:
non_empty_values = [value for value in values if value]
return bool(non_empty_values) and all(value == "Not_Find" for value in non_empty_values)
def _parse_unmatched_items(values: list[str]) -> dict[str, str]:
items: dict[str, list[str]] = {}
for value in values:
item_name, item_value = _split_unmatched_value(value)
if not item_name:
continue
items.setdefault(item_name, [])
if item_value and item_value not in items[item_name]:
items[item_name].append(item_value)
return {name: "".join(item_values) for name, item_values in items.items()}
def _split_unmatched_value(value: str) -> tuple[str, str]:
for separator in ("", ":"):
if separator in value:
name, result = value.split(separator, 1)
return name.strip(), result.strip()
return value.strip(), ""
def _is_empty_sheet(sheet) -> bool:
for row in sheet.iter_rows(values_only=True):
for value in row:
@@ -306,3 +474,9 @@ def _cell_to_text(value: object) -> str:
return ""
text = str(value)
return text if len(text) <= 80 else text[:77] + "..."
def _cell_text(value: object) -> str:
if value is None:
return ""
return str(value).strip()