diff --git a/README.md b/README.md index 3e4ae23..4c0c6d8 100644 --- a/README.md +++ b/README.md @@ -31,4 +31,10 @@ V1:zip 解压后包含 `Patients_info.csv`、`Tests_List`、`Tests_Detail_List V2:zip 解压后包含 `Patients_info.csv`,并按患者目录分别保存检测汇总和具体检测,输出多个患者 Excel。 -导出的压缩包默认只包含 Excel 结果,不包含处理日志。系统默认输出全部检测记录,并保留“未匹配检测内容”列作为额外辅助信息。 +导出的压缩包默认只包含 Excel 结果,不包含处理日志。系统默认输出全部检测记录,并可选择是否保留: + +- 基本工作表 +- 未匹配检测内容项 +- 未检测到内容汇总表 + +“未检测到内容汇总”会收集标准字段全部为 `Not_Find`、但存在未匹配检测内容的记录,并按检测原因排序汇总。 diff --git a/app/main.py b/app/main.py index b68b0c1..cb4381b 100644 --- a/app/main.py +++ b/app/main.py @@ -61,6 +61,11 @@ def index() -> str: +
+ + + +
\u9ed8\u8ba4\u8f93\u51fa\u5168\u90e8\u68c0\u6d4b\u8bb0\u5f55\uff0c\u5e76\u4fdd\u7559\u201c{UNMATCHED}\u201d\u5217\uff1b\u8be5\u5217\u4e3a\u989d\u5916\u8f85\u52a9\u4fe1\u606f\uff0c\u4fbf\u4e8e\u6838\u5bf9\u672a\u5f52\u7c7b\u9879\u76ee\u3002
@@ -76,6 +81,9 @@ async def process( data_type: str = Form("pat_no"), result_name: str = Form("Result"), preview_rows: int = Form(20), + include_basic_sheets: str | None = Form(None), + include_unmatched_items: str | None = Form(None), + include_summary_sheet: str | None = Form(None), ) -> str: if not file.filename or not file.filename.lower().endswith(".zip"): raise HTTPException(status_code=400, detail="\u8bf7\u4e0a\u4f20 zip \u6587\u4ef6\u3002") @@ -97,6 +105,9 @@ async def process( show_not_match=True, show_all_infos=True, preview_rows=rows, + include_basic_sheets=include_basic_sheets == "true", + include_unmatched_items=include_unmatched_items == "true", + include_summary_sheet=include_summary_sheet == "true", ) except ProcessingError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc @@ -308,6 +319,20 @@ def _page_shell(body: str, subtitle: str = "\u4e0a\u4f20\u201c\u5f85\u5904\u7406 grid-template-columns: repeat(auto-fit, minmax(220px, 1fr)); gap: 12px; }} + .checks {{ + display: flex; + flex-wrap: wrap; + gap: 14px; + color: var(--text); + font-size: 14px; + }} + .checks label {{ + display: inline-flex; + align-items: center; + gap: 8px; + margin: 0; + font-weight: 500; + }} button, .button, .small-button, .ghost {{ display: inline-flex; align-items: center; @@ -414,8 +439,8 @@ def _page_shell(body: str, subtitle: str = "\u4e0a\u4f20\u201c\u5f85\u5904\u7406 .table-wrap {{ width: 100%; max-width: 100%; - overflow-x: auto; - overflow-y: hidden; + max-height: 620px; + overflow: auto; background: #fff; }} table {{ diff --git a/app/processor.py b/app/processor.py index a263c83..79173ba 100644 --- a/app/processor.py +++ b/app/processor.py @@ -7,6 +7,7 @@ from dataclasses import dataclass from pathlib import Path from openpyxl import load_workbook +from openpyxl.styles import Font PROCESSOR_DIR = Path(__file__).resolve().parent / "processors" @@ -49,6 +50,9 @@ def run_processing( show_not_match: bool, show_all_infos: bool, preview_rows: int = 20, + include_basic_sheets: bool = True, + include_unmatched_items: bool = True, + include_summary_sheet: bool = True, ) -> ProcessingResult: if mode not in {"auto", "v1", "v2"}: raise ProcessingError("处理模式不正确。") @@ -126,6 +130,12 @@ def run_processing( for xlsx_file in xlsx_files: _remove_default_empty_sheet(xlsx_file) + _postprocess_workbook( + xlsx_file, + include_basic_sheets=include_basic_sheets, + include_unmatched_items=include_unmatched_items, + include_summary_sheet=include_summary_sheet, + ) result_zip = job_dir / "result.zip" _create_result_zip(output_dir, result_zip) @@ -267,6 +277,164 @@ def _remove_default_empty_sheet(path: Path) -> None: workbook.close() +def _postprocess_workbook( + path: Path, + include_basic_sheets: bool, + include_unmatched_items: bool, + include_summary_sheet: bool, +) -> None: + workbook = load_workbook(path) + try: + summary_records = _collect_summary_records(workbook) + + if not include_unmatched_items: + _remove_unmatched_columns(workbook) + + if include_summary_sheet: + _replace_summary_sheet(workbook, summary_records) + + if not include_basic_sheets: + for sheet in list(workbook.worksheets): + if sheet.title != "未检测到内容汇总": + workbook.remove(sheet) + + if not workbook.worksheets: + workbook.create_sheet("未检测到内容汇总") + + workbook.save(path) + finally: + workbook.close() + + +def _collect_summary_records(workbook) -> list[dict[str, object]]: + records: list[dict[str, object]] = [] + for sheet in workbook.worksheets: + if sheet.title == "未检测到内容汇总" or sheet.max_row < 2: + continue + header = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)] + unmatched_col = _find_header_index(header, "未匹配检测内容") + if unmatched_col is None: + continue + + for row_index in range(2, sheet.max_row + 1): + base_values = [_cell_text(sheet.cell(row_index, col).value) for col in range(1, 5)] + if not any(base_values): + continue + standard_values = [ + _cell_text(sheet.cell(row_index, col).value) + for col in range(5, unmatched_col + 1) + ] + unmatched_values = [ + _cell_text(sheet.cell(row_index, col).value) + for col in range(unmatched_col + 1, sheet.max_column + 1) + ] + unmatched_values = [value for value in unmatched_values if value] + if not unmatched_values: + continue + if not _all_standard_values_missing(standard_values): + continue + records.append( + { + "name": base_values[0], + "patient_id": base_values[1], + "sample_time": base_values[2], + "reason": base_values[3], + "sheet": sheet.title, + "items": _parse_unmatched_items(unmatched_values), + } + ) + return records + + +def _replace_summary_sheet(workbook, records: list[dict[str, object]]) -> None: + if "未检测到内容汇总" in workbook.sheetnames: + workbook.remove(workbook["未检测到内容汇总"]) + summary = workbook.create_sheet("未检测到内容汇总", 0) + if not records: + summary.append(["姓名", "住院号", "采样时间", "检测原因"]) + return + + records = sorted(records, key=lambda item: (_cell_text(item["reason"]), _cell_text(item["sample_time"]))) + reasons = [] + for record in records: + reason = _cell_text(record["reason"]) + if reason not in reasons: + reasons.append(reason) + + for reason in reasons: + group = [record for record in records if _cell_text(record["reason"]) == reason] + item_names: list[str] = [] + for record in group: + for item_name in record["items"]: + if item_name not in item_names: + item_names.append(item_name) + + reason_label = f"检测原因(下方都是{reason}原因)" if reason else "检测原因" + header = ["姓名", "住院号", "采样时间", reason_label] + item_names + summary.append(header) + for cell in summary[summary.max_row]: + cell.font = Font(bold=True) + + for record in group: + item_values = record["items"] + summary.append( + [ + record["name"], + record["patient_id"], + record["sample_time"], + record["reason"], + ] + + [item_values.get(item_name, "") for item_name in item_names] + ) + summary.append([]) + + for column_cells in summary.columns: + max_length = max(len(_cell_text(cell.value)) for cell in column_cells) + summary.column_dimensions[column_cells[0].column_letter].width = min(max(max_length + 2, 12), 36) + + +def _remove_unmatched_columns(workbook) -> None: + for sheet in workbook.worksheets: + if sheet.title == "未检测到内容汇总" or sheet.max_row < 1: + continue + header = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)] + unmatched_col = _find_header_index(header, "未匹配检测内容") + if unmatched_col is not None: + sheet.delete_cols(unmatched_col + 1, sheet.max_column - unmatched_col) + + +def _find_header_index(header: list[str], name: str) -> int | None: + for index, value in enumerate(header): + if value == name: + return index + return None + + +def _all_standard_values_missing(values: list[str]) -> bool: + non_empty_values = [value for value in values if value] + return bool(non_empty_values) and all(value == "Not_Find" for value in non_empty_values) + + +def _parse_unmatched_items(values: list[str]) -> dict[str, str]: + items: dict[str, list[str]] = {} + for value in values: + item_name, item_value = _split_unmatched_value(value) + if not item_name: + continue + items.setdefault(item_name, []) + if item_value and item_value not in items[item_name]: + items[item_name].append(item_value) + return {name: ";".join(item_values) for name, item_values in items.items()} + + +def _split_unmatched_value(value: str) -> tuple[str, str]: + for separator in (":", ":"): + if separator in value: + name, result = value.split(separator, 1) + return name.strip(), result.strip() + return value.strip(), "" + + def _is_empty_sheet(sheet) -> bool: for row in sheet.iter_rows(values_only=True): for value in row: @@ -306,3 +474,9 @@ def _cell_to_text(value: object) -> str: return "" text = str(value) return text if len(text) <= 80 else text[:77] + "..." + + +def _cell_text(value: object) -> str: + if value is None: + return "" + return str(value).strip() diff --git a/app/processors/V1-ALL_convert_Lab_Test_data.py b/app/processors/V1-ALL_convert_Lab_Test_data.py index d1e9f54..9bf717b 100644 --- a/app/processors/V1-ALL_convert_Lab_Test_data.py +++ b/app/processors/V1-ALL_convert_Lab_Test_data.py @@ -332,8 +332,8 @@ for pat_no in pat_no_col: if row_test_name_exist == True: break # 如果没有row_test_name_exist的话,将信息加入行信息中 - if row_test_name_exist == False: - rows_not_match.append(row_test_name) + if row_test_name_exist == False: + rows_not_match.append(f"{row_test_name}:{row_2.get(test_result_col_name, '')}") # 如果没有寻找到对应test_check_result的话,设置为Not_Find for test_result_name in test_check_list: @@ -361,4 +361,3 @@ save_excel() - diff --git a/app/processors/V2-Every_Pat_File_convert_Lab_Test_data.py b/app/processors/V2-Every_Pat_File_convert_Lab_Test_data.py index f56601d..264e9e1 100644 --- a/app/processors/V2-Every_Pat_File_convert_Lab_Test_data.py +++ b/app/processors/V2-Every_Pat_File_convert_Lab_Test_data.py @@ -297,8 +297,8 @@ for pat_no in os.listdir(file_dir): # 遍历 file_dir 下的所有文件和文 if row_test_name_exist == True: break # 如果没有row_test_name_exist的话,将信息加入行信息中 - if row_test_name_exist == False: - rows_not_match.append(row_test_name) + if row_test_name_exist == False: + rows_not_match.append(f"{row_test_name}:{row_2.get(test_result_col_name, '')}") # 如果没有寻找到对应test_check_result的话,设置为Not_Find for test_result_name in test_check_list: @@ -322,4 +322,4 @@ for pat_no in os.listdir(file_dir): # 遍历 file_dir 下的所有文件和文 add_content_to_excel(result_save_pth, test['test_check_name'], excel_head + excel_basic + excel_fromat_result) # 每处理一个患者数据,保存相关信息 - save_excel() \ No newline at end of file + save_excel()