diff --git a/app/main.py b/app/main.py index 8c0cc3c..ff5c2ee 100644 --- a/app/main.py +++ b/app/main.py @@ -32,6 +32,7 @@ SORT_ORDERS = { "asc": "\u5347\u5e8f", "desc": "\u964d\u5e8f", } +MAX_PREVIEW_ROWS = 10000 app = FastAPI(title=APP_TITLE) @@ -68,7 +69,7 @@ def index() -> str:
- +
@@ -122,7 +123,7 @@ async def process( @app.get("/result/{job_id}", response_class=HTMLResponse) def result_page( job_id: str, - preview_rows: int = Query(20, ge=5, le=200), + preview_rows: int = Query(20, ge=5, le=MAX_PREVIEW_ROWS), include_basic_sheets: bool = Query(True), include_unmatched_items: bool = Query(True), include_summary_sheet: bool = Query(True), @@ -225,7 +226,7 @@ def _get_job_dir(job_id: str) -> Path: def _clean_preview_rows(preview_rows: int) -> int: - return max(5, min(int(preview_rows or 20), 200)) + return max(5, min(int(preview_rows or 20), MAX_PREVIEW_ROWS)) def _checked(value: bool) -> str: @@ -282,7 +283,7 @@ def _render_result(
- +
\u7ed3\u679c\u4e2d\u201c{UNMATCHED}\u201d\u5217\u4e3a\u989d\u5916\u8f85\u52a9\u4fe1\u606f\uff0c\u7528\u6765\u6807\u660e\u672a\u5f52\u5165\u6807\u51c6\u9879\u76ee\u7684\u68c0\u6d4b\u5185\u5bb9\u3002
diff --git a/app/processor.py b/app/processor.py index 26d1b1b..aa59f13 100644 --- a/app/processor.py +++ b/app/processor.py @@ -16,8 +16,10 @@ from openpyxl.styles import Font PROCESSOR_DIR = Path(__file__).resolve().parent / "processors" SUMMARY_SHEET_NAME = "\u672a\u68c0\u6d4b\u5230\u5185\u5bb9\u6c47\u603b" +UNASSIGNED_SHEET_NAME = "\u672a\u5f52\u5c5e\u68c0\u6d4b\u5185\u5bb9" UNMATCHED_HEADER = "\u672a\u5339\u914d\u68c0\u6d4b\u5185\u5bb9" UNMATCHED_FILL = PatternFill(fill_type="solid", fgColor="FCE4D6") +MAX_PREVIEW_ROWS = 10000 SORT_FIELDS = {"none", "name", "sample_time", "reason"} SORT_ORDERS = {"asc", "desc"} @@ -556,7 +558,7 @@ def _remove_not_found_rows(workbook) -> None: def _collect_summary_records(workbook) -> list[dict[str, object]]: records: list[dict[str, object]] = [] for sheet in workbook.worksheets: - if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 2: + if sheet.title != UNASSIGNED_SHEET_NAME or sheet.max_row < 2: continue header = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)] unmatched_col = _find_header_index(header, UNMATCHED_HEADER) @@ -570,7 +572,7 @@ def _collect_summary_records(workbook) -> list[dict[str, object]]: _cell_text(sheet.cell(row_index, col).value) for col in range(5, standard_end + 1) ] - if not _all_standard_values_missing(standard_values): + if standard_values and not _all_standard_values_missing(standard_values): continue item_values = _summary_item_values(sheet, row_index, header, unmatched_col, standard_end) if not item_values: @@ -874,7 +876,7 @@ def _summarize_workbook(path: Path, output_dir: Path, preview_rows: int) -> Exce try: for sheet in workbook.worksheets: preview: list[list[str]] = [] - for row in sheet.iter_rows(max_row=max(2, min(preview_rows, 200)), values_only=True): + for row in sheet.iter_rows(max_row=max(2, min(preview_rows, MAX_PREVIEW_ROWS)), values_only=True): preview.append([_cell_to_text(value) for value in row]) sheets.append( SheetSummary( diff --git a/app/processors/V1-ALL_convert_Lab_Test_data.py b/app/processors/V1-ALL_convert_Lab_Test_data.py index 9bf717b..4506918 100644 --- a/app/processors/V1-ALL_convert_Lab_Test_data.py +++ b/app/processors/V1-ALL_convert_Lab_Test_data.py @@ -3,6 +3,7 @@ import csv, sys, os, copy, re import os.path as osp from openpyxl import Workbook, load_workbook +from dynamic_router import UNASSIGNED_SHEET_NAME, UNMATCHED_HEADER, append_routed_report, route_detail_rows # 向特定excel的sheet中添加内容 workbook = None # 全局变量,初始值为 None @@ -158,8 +159,8 @@ ALL_tests = [# 血细胞 # (1):血细胞分析+五分类;(2):血细胞分析+ {'test_rptunitid':[str(3), str(110),str(90)], 'test_check_name':'凝血', 'test_check_list':["凝血酶原时间", "凝血酶原活动度", "血浆凝血酶原时间比值", "凝血酶原国际标准化比值", "活化部分凝血活酶时间", "活化部分凝血活酶比值", "凝血酶时间", "凝血酶时间比值", "纤维蛋白原含量", "D-二聚体测定", "纤维蛋白原降解产物", "凝血酶生成时间","凝血速率","血小板功能"],\ 'test_check_list_all':["凝血酶原时间*", ["凝血酶原活动度", r"凝血酶原活度\(%\)"], ["凝血酶原比值", "血浆凝血酶原时间比值"], ["凝血酶原标准化比值", "凝血酶原国际标准化比值"], ["活化部分凝血活酶时间*", "活化部分凝血酶时间*"], "活化部分凝血活酶比值", "凝血酶时间", "凝血酶时间比值", "纤维蛋白原含量", [r"D-二聚体\(sysmex\)", "D-二聚体测定*"], [r"纤维蛋白\(原\)降解产物", "纤维蛋白原降解产物"], "凝血酶生成时间","凝血速率","血小板功能"], 'test_result_col_name':"result_str"}, # 肝功 # (5):平诊肝功十四项+平诊电解质八项+平诊肾功七项;(20)急诊肝功十二项_急诊肾功五项[复]_急诊电解质七项[复];东院区:(108):传染性指标检测八项 - {'test_rptunitid':[str(5), str(20), str(108)], 'test_check_name':'肝功', 'test_check_list':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", "γ谷氨酰氨转肽酶", "总胆红素", "直接胆红素", "间接胆红素", "胆碱脂酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", "eGFR(CKD-EPI)", "a-l岩藻糖苷酶", "超氧化物歧化酶", "eGFR(MDRD)", r"eGFR单位ml/min/1.73m^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], \ - 'test_check_list_all':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", ["γ谷氨酰氨转肽酶", "γ-谷氨酰转肽酶"], "总胆红素", "直接胆红素", "间接胆红素", "胆碱脂酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", r"eGFR\(CKD-EPI\)", "a-l岩藻糖苷酶", "超氧化物歧化酶", r"eGFR\(MDRD\)", r"eGFR单位ml/min/1.73m\^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], 'test_result_col_name':"result_str"}, + {'test_rptunitid':[str(5), str(20), str(108)], 'test_check_name':'肝功', 'test_check_list':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", "γ谷氨酰氨转肽酶", "总胆红素", "直接胆红素", "间接胆红素", "胆碱酯酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", "eGFR(CKD-EPI)", "a-l岩藻糖苷酶", "超氧化物歧化酶", "eGFR(MDRD)", r"eGFR单位ml/min/1.73m^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], \ + 'test_check_list_all':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", ["γ谷氨酰氨转肽酶", "γ-谷氨酰转肽酶"], "总胆红素", "直接胆红素", "间接胆红素", "胆碱酯酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", r"eGFR\(CKD-EPI\)", "a-l岩藻糖苷酶", "超氧化物歧化酶", r"eGFR\(MDRD\)", r"eGFR单位ml/min/1.73m\^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], 'test_result_col_name':"result_str"}, # 各类肿瘤标志物 # (6):各类肿瘤标志物,e.g.肿瘤标志物肺癌六项[复]、肿瘤标志物十项(男);东院区:(108):肿瘤标志物肺癌六项[复] {'test_rptunitid':[str(6), str(108)], 'test_check_name':'各类肿瘤标志物', 'test_check_list':["胃泌素释放肽前体(罗氏)", "鳞状上皮细胞癌抗原(罗氏)", "癌胚抗原", "甲胎蛋白", "糖类抗原125", "糖类抗原199", "糖类抗原724", "细胞角蛋白19片段", "神经元特异性烯醇化酶测定", "总前列腺特异性抗原", "游离前列腺抗原", "游离/总", "糖类抗原153","人附睾蛋白","绝经前 ROMA","绝经后 ROMA","铁蛋白","叶酸","维生素B12","高尔基体蛋白73测定","胃蛋白酶原Ⅰ","胃蛋白酶原Ⅱ","胃蛋白酶原Ⅰ/Ⅱ","降钙素","甲状腺球蛋白"],\ 'test_check_list_all':[r"胃泌素释放肽前体\(罗氏\)", ["鳞状上皮细胞癌相关抗原", r"鳞状上皮细胞癌抗原\(罗氏\)"], "癌胚抗原", "甲胎蛋白", "糖类抗原125", "糖类抗原199", "糖类抗原724", "细胞角蛋白19片段", "神经元特异性烯醇化酶测定", "总前列腺特异性抗原", "游离前列腺抗原", "游离/总", "糖类抗原153","人附睾蛋白","绝经前 ROMA","绝经后 ROMA","铁蛋白","叶酸","维生素B12","高尔基体蛋白73测定","胃蛋白酶原Ⅰ","胃蛋白酶原Ⅱ","胃蛋白酶原Ⅰ/Ⅱ","降钙素","甲状腺球蛋白"], 'test_result_col_name':"result_str"}, @@ -215,6 +216,7 @@ for test in ALL_tests: # 各项检测信息 add_content_to_excel(result_save_pth, test['test_check_name'], front_content + basic_content + test["test_check_list"] + ['未匹配检测内容']) else: add_content_to_excel(result_save_pth, test['test_check_name'], front_content + basic_content + test["test_check_list"]) +add_content_to_excel(result_save_pth, UNASSIGNED_SHEET_NAME, front_content + basic_content + [UNMATCHED_HEADER]) ####### 获取所有患者头部信息(Front_line) ####### @@ -251,113 +253,37 @@ for pat_no in pat_no_col: Error(str(pat_no)+"的excel_head为空") exit() print("__处理患者头为__:", excel_head) - # 遍历所有rptunitid - for test in ALL_tests: - # 特定检查rptunitid - test_rptunitid = test['test_rptunitid'] - if not isinstance(test_rptunitid, list) and not isinstance(test_rptunitid, tuple): - test_rptunitid = [test_rptunitid] - # 检查名 - test_check_name = test['test_check_name'] - # 检查项 - test_check_list_all = test['test_check_list_all'] - # 检查项_名 - test_check_list = test['test_check_list'] - if len(test_check_list_all) != len(test_check_list): - Error("test_check_list_all长度和test_check_list不同") - print(f"test_check_list_all:{test_check_list_all}\ntest_check_list:{test_check_list}") - exit() - # 检查结果所在列的列名 - test_result_col_name = test['test_result_col_name'] - # 存放检查结果的dict - test_check_result = {} - - print("获取患者 ", test_check_name, " 检查结果") - - ### 打开patno/pat_no.csv文件获取更详细信息 ### - with open(osp.join(patno_dir, pat_no+'.csv'), "r", encoding='utf-8-sig') as file: - reader = csv.DictReader(file) # 读取文件 - # 汇总特定检查行 - test_rptunitid_rows = [ row for row in reader if row['rptunitid'] in test_rptunitid ] - print("患者检查 ", test_check_name, "次数为", len(test_rptunitid_rows)) + with open(osp.join(patno_dir, pat_no+'.csv'), "r", encoding='utf-8-sig') as file: + reader = list(csv.DictReader(file)) + print("患者检查总次数为", len(reader)) - # 遍历所有特定检查行,提取关键信息 - rows_not_match = [] # 每一行内不匹配的内容 - for i in range(len(test_rptunitid_rows)): - print("处理患者第", i+1, "次检查") - row_1 = test_rptunitid_rows[i] - sampled_dt = row_1['sampled_dt'] # 获取时间信息 - ###### basic信息头生成 ###### - excel_basic = [] - for item in Basic_line: - excel_basic.append(row_1[Basic_line[item]]) - - ### 打开Patient_detail_infos/pat_no/文件 ###,获取特定检查文件存储路径 - if data_type == "zhuyuanhao": - row_file_path = os.path.join(Patient_detail_infos_dir, pat_no, 'None' + "_" + row_1['reporttype'] + "_" + row_1['rptunitid'] + "_" + row_1['reportid'].replace("_","-") + "_" + row_1['rechkdt'].replace(" ","-").replace(":","-").replace("_","-") +".csv") - elif data_type == "pat_no": - row_file_path = os.path.join(Patient_detail_infos_dir, pat_no, pat_no + "_" + row_1['reporttype'] + "_" + row_1['rptunitid'] + "_" + row_1['reportid'].replace("_","-") + "_" + row_1['rechkdt'].replace(" ","-").replace(":","-").replace("_","-") +".csv") - with open(row_file_path, "r") as row_file: - row_reader_ = csv.DictReader(row_file) # 读取文件,迭代器只能读取一次 - row_reader = [] - for r in row_reader_: - row_reader.append(r) - # 寻找所有检查项的值 - match = False # 检测内容中是否含有检测项目 - rows_not_match = [] # 每一行内不匹配的内容 - # 遍历所有行,查看对应检查结果 - for row_2 in row_reader: - row_test_name_exist = False - row_test_name = row_2['rpt_itemname'] - # 遍历所有待检查项 - for j in range(len(test_check_list_all)): - # 检查项名称 - test_result_name = test_check_list[j] - # 遍历所有完整版检查项目 - test_checks = test_check_list_all[j] - if isinstance(test_checks, str): - test_checks = [test_checks] - for test_check in test_checks: - # print(test_check, row_2['rpt_itemname'],"___") - if match_re(row_test_name, test_check):# test_check == row_test_name: # 检测项目属于其中 - match = True - row_test_name_exist = True - # temp临时存储变量,如果其为'.',则结果变为'None' - temp = row_2[test_result_col_name] - if (temp == '.' or temp == '') : - temp = 'None' - test_check_result[test_result_name] = temp - break - # 如果结果中存在row_test_name_exist的话,退出循环 - if row_test_name_exist == True: - break - # 如果没有row_test_name_exist的话,将信息加入行信息中 - if row_test_name_exist == False: - rows_not_match.append(f"{row_test_name}:{row_2.get(test_result_col_name, '')}") + for i, row_1 in enumerate(reader): + print("处理患者第", i+1, "次检查") + excel_basic = [] + for item in Basic_line: + excel_basic.append(row_1[Basic_line[item]]) - # 如果没有寻找到对应test_check_result的话,设置为Not_Find - for test_result_name in test_check_list: - if not test_result_name in test_check_result: - test_check_result[test_result_name] = 'Not_Find' + if data_type == "zhuyuanhao": + row_file_path = os.path.join(Patient_detail_infos_dir, pat_no, 'None' + "_" + row_1['reporttype'] + "_" + row_1['rptunitid'] + "_" + row_1['reportid'].replace("_","-") + "_" + row_1['rechkdt'].replace(" ","-").replace(":","-").replace("_","-") +".csv") + elif data_type == "pat_no": + row_file_path = os.path.join(Patient_detail_infos_dir, pat_no, pat_no + "_" + row_1['reporttype'] + "_" + row_1['rptunitid'] + "_" + row_1['reportid'].replace("_","-") + "_" + row_1['rechkdt'].replace(" ","-").replace(":","-").replace("_","-") +".csv") - # 如果没有检测到相匹配内容,且不输出所有信息,continue - if (match == False and show_all_infos == False): - continue - # 进行进一步操作 ing... - - - # 抽取结果格式转换 - excel_fromat_result = [] - for test_item in test['test_check_list']: - excel_fromat_result.append(test_check_result[test_item]) - # print("输出结果:", excel_fromat_result) - if show_not_match == True: - add_content_to_excel(result_save_pth, test['test_check_name'], excel_head + excel_basic + excel_fromat_result + rows_not_match) - else: - add_content_to_excel(result_save_pth, test['test_check_name'], excel_head + excel_basic + excel_fromat_result) + with open(row_file_path, "r", encoding='utf-8-sig') as row_file: + detail_rows = list(csv.DictReader(row_file)) + + sheet_results, unassigned_items = route_detail_rows(detail_rows, ALL_tests) + append_routed_report( + add_content_to_excel, + result_save_pth, + ALL_tests, + excel_head, + excel_basic, + sheet_results, + unassigned_items, + show_not_match, + ) # 每处理一个患者数据,保存相关信息 save_excel() - diff --git a/app/processors/V2-Every_Pat_File_convert_Lab_Test_data.py b/app/processors/V2-Every_Pat_File_convert_Lab_Test_data.py index 264e9e1..6d57844 100644 --- a/app/processors/V2-Every_Pat_File_convert_Lab_Test_data.py +++ b/app/processors/V2-Every_Pat_File_convert_Lab_Test_data.py @@ -4,6 +4,7 @@ import csv, sys, os, copy, re, argparse import os.path as osp from openpyxl import Workbook, load_workbook from V2_Data import Front_line, Basic_line, ALL_tests +from dynamic_router import UNASSIGNED_SHEET_NAME, UNMATCHED_HEADER, append_routed_report, route_detail_rows # 向特定excel的sheet中添加内容 workbook = None # 全局变量,初始值为 None @@ -192,6 +193,7 @@ for pat_no in os.listdir(file_dir): # 遍历 file_dir 下的所有文件和文 add_content_to_excel(result_save_pth, test['test_check_name'], front_content + basic_content + test["test_check_list"] + ['未匹配检测内容']) else: add_content_to_excel(result_save_pth, test['test_check_name'], front_content + basic_content + test["test_check_list"]) + add_content_to_excel(result_save_pth, UNASSIGNED_SHEET_NAME, front_content + basic_content + [UNMATCHED_HEADER]) # 信息头生成 excel_head = [] @@ -216,110 +218,35 @@ for pat_no in os.listdir(file_dir): # 遍历 file_dir 下的所有文件和文 ####### 获取所有患者检测信息(base_line,pat_no/pato_no.csv) ####### # 遍历所有患者(pat_no) - # 遍历所有rptunitid - for test in ALL_tests: - # 特定检查rptunitid - test_rptunitid = test['test_rptunitid'] - if not isinstance(test_rptunitid, list) and not isinstance(test_rptunitid, tuple): - test_rptunitid = [test_rptunitid] - # 检查名 - test_check_name = test['test_check_name'] - # 检查项 - test_check_list_all = test['test_check_list_all'] - # 检查项_名 - test_check_list = test['test_check_list'] - if len(test_check_list_all) != len(test_check_list): - Error("test_check_list_all长度和test_check_list不同", error_dir=osp.join(pat_file_dir, "Error.txt")) - print(f"test_check_list_all:{test_check_list_all}\ntest_check_list:{test_check_list}") - exit() - # 检查结果所在列的列名 - test_result_col_name = test['test_result_col_name'] - # 存放检查结果的dict - test_check_result = {} - - print("获取患者 ", test_check_name, " 检查结果") - - ### 打开patno/pat_no.csv文件获取更详细信息 ### - with open(patno_pth, "r", encoding="utf-8-sig") as file: - reader = csv.DictReader(file) # 读取文件 - # 汇总特定检查行 - test_rptunitid_rows = [ row for row in reader if row['rptunitid'] in test_rptunitid ] - print("患者检查 ", test_check_name, "次数为", len(test_rptunitid_rows)) + with open(patno_pth, "r", encoding="utf-8-sig") as file: + report_rows = list(csv.DictReader(file)) + print("患者检查总次数为", len(report_rows)) - # 遍历所有特定检查行,提取关键信息 - rows_not_match = [] # 每一行内不匹配的内容 - for i in range(len(test_rptunitid_rows)): - print("处理患者第", i+1, "次检查") - row_1 = test_rptunitid_rows[i] - sampled_dt = row_1['sampled_dt'] # 获取时间信息 - ###### basic信息头生成 ###### - excel_basic = [] - for item in Basic_line: - excel_basic.append(row_1[Basic_line[item]]) - - ### 打开Patient_detail_infos/pat_no/文件 ###,获取特定检查文件存储路径 - if data_type == "zhuyuanhao": - row_file_path = os.path.join(Patient_detail_infos_dir, 'None' + "_" + row_1['reporttype'] + "_" + row_1['rptunitid'] + "_" + row_1['reportid'].replace("_","-") + "_" + row_1['rechkdt'].replace(" ","-").replace(":","-").replace("_","-") +".csv") - elif data_type == "pat_no": - row_file_path = os.path.join(Patient_detail_infos_dir, pat_no + "_" + row_1['reporttype'] + "_" + row_1['rptunitid'] + "_" + row_1['reportid'].replace("_","-") + "_" + row_1['rechkdt'].replace(" ","-").replace(":","-").replace("_","-") +".csv") - with open(row_file_path, "r", encoding="utf-8-sig") as row_file: - row_reader_ = csv.DictReader(row_file) # 读取文件,迭代器只能读取一次 - row_reader = [] - for r in row_reader_: - row_reader.append(r) - # 寻找所有检查项的值 - match = False # 检测内容中是否含有检测项目 - rows_not_match = [] # 每一行内不匹配的内容 - # 遍历所有行,查看对应检查结果 - for row_2 in row_reader: - row_test_name_exist = False - row_test_name = row_2['rpt_itemname'] - # 遍历所有待检查项 - for j in range(len(test_check_list_all)): - # 检查项名称 - test_result_name = test_check_list[j] - # 遍历所有完整版检查项目 - test_checks = test_check_list_all[j] - if isinstance(test_checks, str): - test_checks = [test_checks] - for test_check in test_checks: - # print(test_check, row_2['rpt_itemname'],"___") - if match_re(row_test_name, test_check):# test_check == row_test_name: # 检测项目属于其中 - match = True - row_test_name_exist = True - # temp临时存储变量,如果其为'.',则结果变为'None' - temp = row_2[test_result_col_name] - if (temp == '.' or temp == '') : - temp = 'None' - test_check_result[test_result_name] = temp - break - # 如果结果中存在row_test_name_exist的话,退出循环 - if row_test_name_exist == True: - break - # 如果没有row_test_name_exist的话,将信息加入行信息中 - if row_test_name_exist == False: - rows_not_match.append(f"{row_test_name}:{row_2.get(test_result_col_name, '')}") + for i, row_1 in enumerate(report_rows): + print("处理患者第", i+1, "次检查") + excel_basic = [] + for item in Basic_line: + excel_basic.append(row_1[Basic_line[item]]) - # 如果没有寻找到对应test_check_result的话,设置为Not_Find - for test_result_name in test_check_list: - if not test_result_name in test_check_result: - test_check_result[test_result_name] = 'Not_Find' + if data_type == "zhuyuanhao": + row_file_path = os.path.join(Patient_detail_infos_dir, 'None' + "_" + row_1['reporttype'] + "_" + row_1['rptunitid'] + "_" + row_1['reportid'].replace("_","-") + "_" + row_1['rechkdt'].replace(" ","-").replace(":","-").replace("_","-") +".csv") + elif data_type == "pat_no": + row_file_path = os.path.join(Patient_detail_infos_dir, pat_no + "_" + row_1['reporttype'] + "_" + row_1['rptunitid'] + "_" + row_1['reportid'].replace("_","-") + "_" + row_1['rechkdt'].replace(" ","-").replace(":","-").replace("_","-") +".csv") - # 如果没有检测到相匹配内容,且不输出所有信息,continue - if (match == False and show_all_infos == False): - continue - # 进行进一步操作 ing... - - - # 抽取结果格式转换 - excel_fromat_result = [] - for test_item in test['test_check_list']: - excel_fromat_result.append(test_check_result[test_item]) - # print("输出结果:", excel_fromat_result) - if show_not_match == True: - add_content_to_excel(result_save_pth, test['test_check_name'], excel_head + excel_basic + excel_fromat_result + rows_not_match) - else: - add_content_to_excel(result_save_pth, test['test_check_name'], excel_head + excel_basic + excel_fromat_result) + with open(row_file_path, "r", encoding="utf-8-sig") as row_file: + detail_rows = list(csv.DictReader(row_file)) + + sheet_results, unassigned_items = route_detail_rows(detail_rows, ALL_tests) + append_routed_report( + add_content_to_excel, + result_save_pth, + ALL_tests, + excel_head, + excel_basic, + sheet_results, + unassigned_items, + show_not_match, + ) # 每处理一个患者数据,保存相关信息 save_excel() diff --git a/app/processors/V2_Data.py b/app/processors/V2_Data.py index 7a9da95..edb2ca8 100644 --- a/app/processors/V2_Data.py +++ b/app/processors/V2_Data.py @@ -15,8 +15,8 @@ ALL_tests = [# 血细胞 # (1):血细胞分析+五分类;(2):血细胞分析+ {'test_rptunitid':[str(3), str(110),str(90)], 'test_check_name':'凝血', 'test_check_list':["凝血酶原时间", "凝血酶原活动度", "血浆凝血酶原时间比值", "凝血酶原国际标准化比值", "活化部分凝血活酶时间", "活化部分凝血活酶比值", "凝血酶时间", "凝血酶时间比值", "纤维蛋白原含量", "D-二聚体测定", "纤维蛋白原降解产物", "凝血酶生成时间","凝血速率","血小板功能"],\ 'test_check_list_all':["凝血酶原时间*", ["凝血酶原活动度", r"凝血酶原活度\(%\)"], ["凝血酶原比值", "血浆凝血酶原时间比值"], ["凝血酶原标准化比值", "凝血酶原国际标准化比值"], ["活化部分凝血活酶时间*", "活化部分凝血酶时间*"], "活化部分凝血活酶比值", "凝血酶时间", "凝血酶时间比值", "纤维蛋白原含量", [r"D-二聚体\(sysmex\)", "D-二聚体测定*"], [r"纤维蛋白\(原\)降解产物", "纤维蛋白原降解产物"], "凝血酶生成时间","凝血速率","血小板功能"], 'test_result_col_name':"result_str"}, # 肝功 # (5):平诊肝功十四项+平诊电解质八项+平诊肾功七项;(20)急诊肝功十二项_急诊肾功五项[复]_急诊电解质七项[复];东院区:(108):传染性指标检测八项 - {'test_rptunitid':[str(5), str(20), str(108)], 'test_check_name':'肝功', 'test_check_list':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", "γ谷氨酰氨转肽酶", "总胆红素", "直接胆红素", "间接胆红素", "胆碱脂酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", "eGFR(CKD-EPI)", "a-l岩藻糖苷酶", "超氧化物歧化酶", "eGFR(MDRD)", r"eGFR单位ml/min/1.73m^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], \ - 'test_check_list_all':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", ["γ谷氨酰氨转肽酶", "γ-谷氨酰转肽酶"], "总胆红素", "直接胆红素", "间接胆红素", "胆碱脂酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", r"eGFR\(CKD-EPI\)", "a-l岩藻糖苷酶", "超氧化物歧化酶", r"eGFR\(MDRD\)", r"eGFR单位ml/min/1.73m\^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], 'test_result_col_name':"result_str"}, + {'test_rptunitid':[str(5), str(20), str(108)], 'test_check_name':'肝功', 'test_check_list':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", "γ谷氨酰氨转肽酶", "总胆红素", "直接胆红素", "间接胆红素", "胆碱酯酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", "eGFR(CKD-EPI)", "a-l岩藻糖苷酶", "超氧化物歧化酶", "eGFR(MDRD)", r"eGFR单位ml/min/1.73m^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], \ + 'test_check_list_all':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", ["γ谷氨酰氨转肽酶", "γ-谷氨酰转肽酶"], "总胆红素", "直接胆红素", "间接胆红素", "胆碱酯酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", r"eGFR\(CKD-EPI\)", "a-l岩藻糖苷酶", "超氧化物歧化酶", r"eGFR\(MDRD\)", r"eGFR单位ml/min/1.73m\^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], 'test_result_col_name':"result_str"}, # 各类肿瘤标志物 # (6):各类肿瘤标志物,e.g.肿瘤标志物肺癌六项[复]、肿瘤标志物十项(男);东院区:(108):肿瘤标志物肺癌六项[复] {'test_rptunitid':[str(6), str(108)], 'test_check_name':'各类肿瘤标志物', 'test_check_list':["胃泌素释放肽前体(罗氏)", "鳞状上皮细胞癌抗原(罗氏)", "癌胚抗原", "甲胎蛋白", "糖类抗原125", "糖类抗原199", "糖类抗原724", "细胞角蛋白19片段", "神经元特异性烯醇化酶测定", "总前列腺特异性抗原", "游离前列腺抗原", "游离/总", "糖类抗原153","人附睾蛋白","绝经前 ROMA","绝经后 ROMA","铁蛋白","叶酸","维生素B12","高尔基体蛋白73测定","胃蛋白酶原Ⅰ","胃蛋白酶原Ⅱ","胃蛋白酶原Ⅰ/Ⅱ","降钙素","甲状腺球蛋白"],\ 'test_check_list_all':[r"胃泌素释放肽前体\(罗氏\)", ["鳞状上皮细胞癌相关抗原", r"鳞状上皮细胞癌抗原\(罗氏\)"], "癌胚抗原", "甲胎蛋白", "糖类抗原125", "糖类抗原199", "糖类抗原724", "细胞角蛋白19片段", "神经元特异性烯醇化酶测定", "总前列腺特异性抗原", "游离前列腺抗原", "游离/总", "糖类抗原153","人附睾蛋白","绝经前 ROMA","绝经后 ROMA","铁蛋白","叶酸","维生素B12","高尔基体蛋白73测定","胃蛋白酶原Ⅰ","胃蛋白酶原Ⅱ","胃蛋白酶原Ⅰ/Ⅱ","降钙素","甲状腺球蛋白"], 'test_result_col_name':"result_str"}, diff --git a/app/processors/dynamic_router.py b/app/processors/dynamic_router.py new file mode 100644 index 0000000..76cbe0f --- /dev/null +++ b/app/processors/dynamic_router.py @@ -0,0 +1,81 @@ +import re + + +UNMATCHED_HEADER = "未匹配检测内容" +UNASSIGNED_SHEET_NAME = "未归属检测内容" + + +def match_re(value, pattern): + return re.match(str(pattern), str(value or "")) is not None + + +def clean_result(value): + if value in (None, "", "."): + return "None" + return str(value) + + +def detail_value(row): + for key in ("result_str", "result_ref", "result_txt", "result1"): + value = row.get(key, "") + if value not in (None, ""): + return clean_result(value) + return "" + + +def route_detail_rows(detail_rows, all_tests): + sheet_results = {} + unassigned_items = [] + + for detail_row in detail_rows: + item_name = detail_row.get("rpt_itemname", "") + matched_any = False + + for test in all_tests: + test_result_col_name = test["test_result_col_name"] + test_check_list = test["test_check_list"] + test_check_list_all = test["test_check_list_all"] + + for index, checks in enumerate(test_check_list_all): + if isinstance(checks, str): + checks = [checks] + if any(match_re(item_name, pattern) for pattern in checks): + sheet_name = test["test_check_name"] + result_name = test_check_list[index] + sheet_results.setdefault(sheet_name, {})[result_name] = clean_result( + detail_row.get(test_result_col_name, "") + ) + matched_any = True + break + + if not matched_any: + unassigned_items.append(f"{item_name}:{detail_value(detail_row)}") + + return sheet_results, unassigned_items + + +def append_routed_report( + add_content_to_excel, + result_save_path, + all_tests, + excel_head, + excel_basic, + sheet_results, + unassigned_items, + show_not_match, +): + for test in all_tests: + sheet_name = test["test_check_name"] + if sheet_name not in sheet_results: + continue + + result_values = sheet_results[sheet_name] + row = excel_head + excel_basic + [ + result_values.get(test_item, "Not_Find") for test_item in test["test_check_list"] + ] + if show_not_match: + row += unassigned_items + add_content_to_excel(result_save_path, sheet_name, row) + + if unassigned_items and not sheet_results: + add_content_to_excel(result_save_path, UNASSIGNED_SHEET_NAME, excel_head + excel_basic + unassigned_items) diff --git a/tests/verify_dynamic_routing.py b/tests/verify_dynamic_routing.py new file mode 100644 index 0000000..8b37bad --- /dev/null +++ b/tests/verify_dynamic_routing.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python3 +import csv +import sys +import tempfile +import zipfile +from pathlib import Path + +from openpyxl import Workbook, load_workbook + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from app.main import _clean_preview_rows +from app.processor import _summarize_workbook, run_processing + + +REPORT_HEADERS = [ + "rptunitid", + "rechkdt", + "reportid", + "reporttype", + "req_reason", + "specimen_code", + "specimen_name", + "rptunitname", + "resultclass", + "pat_diag", + "alter_flag", + "emer_flag", + "sampled_dt", +] + +DETAIL_HEADERS = ["reportid", "rpt_itemname", "result_str", "result_ref"] + + +def write_csv(path, headers, rows): + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8-sig", newline="") as file: + writer = csv.DictWriter(file, fieldnames=headers) + writer.writeheader() + writer.writerows(rows) + + +def detail_filename(patient_id, row): + return ( + f"{patient_id}_{row['reporttype']}_{row['rptunitid']}_" + f"{row['reportid'].replace('_', '-')}_" + f"{row['rechkdt'].replace(' ', '-').replace(':', '-').replace('_', '-')}.csv" + ) + + +def build_fixture(root): + patient_id = "0000000001" + write_csv(root / "Patients_info.csv", ["pat_name", "pat_no"], [{"pat_name": "验证患者", "pat_no": "1"}]) + + reports = [ + { + "rptunitid": "20", + "rechkdt": "2026-01-01 10:00:00", + "reportid": "20260101-20-1", + "reporttype": "10", + "req_reason": "肝功十项[复]_电解质五项[复]_肾功三项[复]", + "sampled_dt": "2026-01-01 08:00:00", + }, + { + "rptunitid": "20", + "rechkdt": "2026-01-01 10:05:00", + "reportid": "20260101-20-2", + "reporttype": "10", + "req_reason": "B型钠尿肽前体(Pro-BNP)测定", + "sampled_dt": "2026-01-01 08:05:00", + }, + { + "rptunitid": "20", + "rechkdt": "2026-01-01 10:10:00", + "reportid": "20260101-20-3", + "reporttype": "10", + "req_reason": "未知组合检测", + "sampled_dt": "2026-01-01 08:10:00", + }, + ] + for row in reports: + for header in REPORT_HEADERS: + row.setdefault(header, "") + + write_csv(root / "Tests_List" / f"{patient_id}.csv", REPORT_HEADERS, reports) + + details = [ + [{"reportid": reports[0]["reportid"], "rpt_itemname": "谷草转氨酶", "result_str": "21"}], + [{"reportid": reports[1]["reportid"], "rpt_itemname": "B型前脑尿钠肽", "result_str": "57.20"}], + [{"reportid": reports[2]["reportid"], "rpt_itemname": "神秘检测项目", "result_str": "42"}], + ] + detail_dir = root / "Tests_Detail_List" / patient_id + for report, rows in zip(reports, details): + for row in rows: + row.setdefault("result_ref", "") + write_csv(detail_dir / detail_filename(patient_id, report), DETAIL_HEADERS, rows) + + +def zip_dir(source, target): + with zipfile.ZipFile(target, "w", zipfile.ZIP_DEFLATED) as zf: + for path in source.rglob("*"): + if path.is_file(): + zf.write(path, path.relative_to(source).as_posix()) + + +def rows_for(ws): + return [["" if value is None else str(value) for value in row] for row in ws.iter_rows(values_only=True)] + + +def assert_dynamic_routing(workbook_path): + workbook = load_workbook(workbook_path, read_only=True, data_only=True) + try: + liver_rows = rows_for(workbook["肝功"]) + heart_rows = rows_for(workbook["心衰系列"]) + summary_text = "\n".join("\t".join(row) for row in rows_for(workbook["未检测到内容汇总"])) + + liver_data = [row for row in liver_rows[1:] if row and row[0]] + heart_data = [row for row in heart_rows[1:] if row and row[0]] + + assert len(liver_data) == 1, liver_data + assert liver_data[0][3] == "肝功十项[复]_电解质五项[复]_肾功三项[复]" + assert "21" in liver_data[0] + assert "57.20" not in liver_data[0] + + assert len(heart_data) == 1, heart_data + assert heart_data[0][3] == "B型钠尿肽前体(Pro-BNP)测定" + assert "57.20" in heart_data[0] + assert "肝功十项[复]_电解质五项[复]_肾功三项[复]" not in "\n".join("\t".join(row) for row in heart_data) + + assert "神秘检测项目" in summary_text + assert "谷草转氨酶" not in summary_text + assert "B型前脑尿钠肽" not in summary_text + finally: + workbook.close() + + +def assert_preview_can_exceed_200(tmp_path): + assert _clean_preview_rows(500) == 500 + + workbook_path = tmp_path / "preview.xlsx" + workbook = Workbook() + sheet = workbook.active + sheet.title = "预览" + for index in range(250): + sheet.append([index]) + workbook.save(workbook_path) + + summary = _summarize_workbook(workbook_path, tmp_path, 250) + assert len(summary.sheets[0].preview) == 250 + + +def main(): + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + data_dir = tmp_path / "data" + build_fixture(data_dir) + zip_path = tmp_path / "fixture.zip" + zip_dir(data_dir, zip_path) + + job_dir = tmp_path / "job" + result = run_processing( + zip_path=zip_path, + job_dir=job_dir, + mode="auto", + data_type="auto", + result_name="Verify", + show_not_match=True, + show_all_infos=True, + preview_rows=500, + ) + assert result.mode == "v1" + assert_dynamic_routing(job_dir / "output" / "Verify.xlsx") + assert_preview_can_exceed_200(tmp_path) + + print("dynamic routing verification passed") + + +if __name__ == "__main__": + main()