Compare commits
2 Commits
v2026.05.0
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 4909705a2d | |||
| b55112b457 |
@@ -32,6 +32,7 @@ SORT_ORDERS = {
|
||||
"asc": "\u5347\u5e8f",
|
||||
"desc": "\u964d\u5e8f",
|
||||
}
|
||||
MAX_PREVIEW_ROWS = 10000
|
||||
|
||||
app = FastAPI(title=APP_TITLE)
|
||||
|
||||
@@ -68,7 +69,7 @@ def index() -> str:
|
||||
</div>
|
||||
<div>
|
||||
<label for="preview_rows">\u6bcf\u4e2a\u5de5\u4f5c\u8868\u9884\u89c8\u884c\u6570</label>
|
||||
<input id="preview_rows" name="preview_rows" type="number" min="5" max="200" value="20">
|
||||
<input id="preview_rows" name="preview_rows" type="number" min="5" max="{MAX_PREVIEW_ROWS}" value="20">
|
||||
</div>
|
||||
</div>
|
||||
<button type="submit">\u5f00\u59cb\u5904\u7406</button>
|
||||
@@ -122,7 +123,7 @@ async def process(
|
||||
@app.get("/result/{job_id}", response_class=HTMLResponse)
|
||||
def result_page(
|
||||
job_id: str,
|
||||
preview_rows: int = Query(20, ge=5, le=200),
|
||||
preview_rows: int = Query(20, ge=5, le=MAX_PREVIEW_ROWS),
|
||||
include_basic_sheets: bool = Query(True),
|
||||
include_unmatched_items: bool = Query(True),
|
||||
include_summary_sheet: bool = Query(True),
|
||||
@@ -225,7 +226,7 @@ def _get_job_dir(job_id: str) -> Path:
|
||||
|
||||
|
||||
def _clean_preview_rows(preview_rows: int) -> int:
|
||||
return max(5, min(int(preview_rows or 20), 200))
|
||||
return max(5, min(int(preview_rows or 20), MAX_PREVIEW_ROWS))
|
||||
|
||||
|
||||
def _checked(value: bool) -> str:
|
||||
@@ -282,7 +283,7 @@ def _render_result(
|
||||
</section>
|
||||
<form class="row-control" method="get" action="/result/{html.escape(result.job_id)}">
|
||||
<label for="preview_rows">\u6bcf\u4e2a\u5de5\u4f5c\u8868\u9884\u89c8\u884c\u6570</label>
|
||||
<input id="preview_rows" name="preview_rows" type="number" min="5" max="200" value="{preview_rows}">
|
||||
<input id="preview_rows" name="preview_rows" type="number" min="5" max="{MAX_PREVIEW_ROWS}" value="{preview_rows}">
|
||||
<button type="submit">\u5237\u65b0\u9884\u89c8</button>
|
||||
</form>
|
||||
<section class="hint">\u7ed3\u679c\u4e2d\u201c{UNMATCHED}\u201d\u5217\u4e3a\u989d\u5916\u8f85\u52a9\u4fe1\u606f\uff0c\u7528\u6765\u6807\u660e\u672a\u5f52\u5165\u6807\u51c6\u9879\u76ee\u7684\u68c0\u6d4b\u5185\u5bb9\u3002</section>
|
||||
|
||||
@@ -16,8 +16,10 @@ from openpyxl.styles import Font
|
||||
|
||||
PROCESSOR_DIR = Path(__file__).resolve().parent / "processors"
|
||||
SUMMARY_SHEET_NAME = "\u672a\u68c0\u6d4b\u5230\u5185\u5bb9\u6c47\u603b"
|
||||
UNASSIGNED_SHEET_NAME = "\u672a\u5f52\u5c5e\u68c0\u6d4b\u5185\u5bb9"
|
||||
UNMATCHED_HEADER = "\u672a\u5339\u914d\u68c0\u6d4b\u5185\u5bb9"
|
||||
UNMATCHED_FILL = PatternFill(fill_type="solid", fgColor="FCE4D6")
|
||||
MAX_PREVIEW_ROWS = 10000
|
||||
SORT_FIELDS = {"none", "name", "sample_time", "reason"}
|
||||
SORT_ORDERS = {"asc", "desc"}
|
||||
|
||||
@@ -462,6 +464,7 @@ def _postprocess_workbook(
|
||||
workbook = load_workbook(path)
|
||||
try:
|
||||
_normalize_unmatched_columns(workbook)
|
||||
_deduplicate_regular_rows(workbook)
|
||||
summary_records = _collect_summary_records(workbook)
|
||||
_remove_not_found_rows(workbook)
|
||||
_remove_empty_unmatched_columns(workbook)
|
||||
@@ -553,10 +556,29 @@ def _remove_not_found_rows(workbook) -> None:
|
||||
sheet.delete_rows(row_index, 1)
|
||||
|
||||
|
||||
def _deduplicate_regular_rows(workbook) -> None:
|
||||
for sheet in workbook.worksheets:
|
||||
if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 3:
|
||||
continue
|
||||
|
||||
seen = set()
|
||||
for row_index in range(sheet.max_row, 1, -1):
|
||||
values = tuple(
|
||||
_cell_text(sheet.cell(row_index, col).value)
|
||||
for col in range(1, sheet.max_column + 1)
|
||||
)
|
||||
if not any(values):
|
||||
continue
|
||||
if values in seen:
|
||||
sheet.delete_rows(row_index, 1)
|
||||
else:
|
||||
seen.add(values)
|
||||
|
||||
|
||||
def _collect_summary_records(workbook) -> list[dict[str, object]]:
|
||||
records: list[dict[str, object]] = []
|
||||
for sheet in workbook.worksheets:
|
||||
if sheet.title == SUMMARY_SHEET_NAME or sheet.max_row < 2:
|
||||
if sheet.title != UNASSIGNED_SHEET_NAME or sheet.max_row < 2:
|
||||
continue
|
||||
header = [_cell_text(sheet.cell(1, col).value) for col in range(1, sheet.max_column + 1)]
|
||||
unmatched_col = _find_header_index(header, UNMATCHED_HEADER)
|
||||
@@ -570,7 +592,7 @@ def _collect_summary_records(workbook) -> list[dict[str, object]]:
|
||||
_cell_text(sheet.cell(row_index, col).value)
|
||||
for col in range(5, standard_end + 1)
|
||||
]
|
||||
if not _all_standard_values_missing(standard_values):
|
||||
if standard_values and not _all_standard_values_missing(standard_values):
|
||||
continue
|
||||
item_values = _summary_item_values(sheet, row_index, header, unmatched_col, standard_end)
|
||||
if not item_values:
|
||||
@@ -874,7 +896,7 @@ def _summarize_workbook(path: Path, output_dir: Path, preview_rows: int) -> Exce
|
||||
try:
|
||||
for sheet in workbook.worksheets:
|
||||
preview: list[list[str]] = []
|
||||
for row in sheet.iter_rows(max_row=max(2, min(preview_rows, 200)), values_only=True):
|
||||
for row in sheet.iter_rows(max_row=max(2, min(preview_rows, MAX_PREVIEW_ROWS)), values_only=True):
|
||||
preview.append([_cell_to_text(value) for value in row])
|
||||
sheets.append(
|
||||
SheetSummary(
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
import csv, sys, os, copy, re
|
||||
import os.path as osp
|
||||
from openpyxl import Workbook, load_workbook
|
||||
from dynamic_router import UNASSIGNED_SHEET_NAME, UNMATCHED_HEADER, append_routed_report, route_detail_rows
|
||||
|
||||
# 向特定excel的sheet中添加内容
|
||||
workbook = None # 全局变量,初始值为 None
|
||||
@@ -158,8 +159,8 @@ ALL_tests = [# 血细胞 # (1):血细胞分析+五分类;(2):血细胞分析+
|
||||
{'test_rptunitid':[str(3), str(110),str(90)], 'test_check_name':'凝血', 'test_check_list':["凝血酶原时间", "凝血酶原活动度", "血浆凝血酶原时间比值", "凝血酶原国际标准化比值", "活化部分凝血活酶时间", "活化部分凝血活酶比值", "凝血酶时间", "凝血酶时间比值", "纤维蛋白原含量", "D-二聚体测定", "纤维蛋白原降解产物", "凝血酶生成时间","凝血速率","血小板功能"],\
|
||||
'test_check_list_all':["凝血酶原时间*", ["凝血酶原活动度", r"凝血酶原活度\(%\)"], ["凝血酶原比值", "血浆凝血酶原时间比值"], ["凝血酶原标准化比值", "凝血酶原国际标准化比值"], ["活化部分凝血活酶时间*", "活化部分凝血酶时间*"], "活化部分凝血活酶比值", "凝血酶时间", "凝血酶时间比值", "纤维蛋白原含量", [r"D-二聚体\(sysmex\)", "D-二聚体测定*"], [r"纤维蛋白\(原\)降解产物", "纤维蛋白原降解产物"], "凝血酶生成时间","凝血速率","血小板功能"], 'test_result_col_name':"result_str"},
|
||||
# 肝功 # (5):平诊肝功十四项+平诊电解质八项+平诊肾功七项;(20)急诊肝功十二项_急诊肾功五项[复]_急诊电解质七项[复];东院区:(108):传染性指标检测八项
|
||||
{'test_rptunitid':[str(5), str(20), str(108)], 'test_check_name':'肝功', 'test_check_list':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", "γ谷氨酰氨转肽酶", "总胆红素", "直接胆红素", "间接胆红素", "胆碱脂酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", "eGFR(CKD-EPI)", "a-l岩藻糖苷酶", "超氧化物歧化酶", "eGFR(MDRD)", r"eGFR单位ml/min/1.73m^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], \
|
||||
'test_check_list_all':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", ["γ谷氨酰氨转肽酶", "γ-谷氨酰转肽酶"], "总胆红素", "直接胆红素", "间接胆红素", "胆碱脂酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", r"eGFR\(CKD-EPI\)", "a-l岩藻糖苷酶", "超氧化物歧化酶", r"eGFR\(MDRD\)", r"eGFR单位ml/min/1.73m\^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], 'test_result_col_name':"result_str"},
|
||||
{'test_rptunitid':[str(5), str(20), str(108)], 'test_check_name':'肝功', 'test_check_list':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", "γ谷氨酰氨转肽酶", "总胆红素", "直接胆红素", "间接胆红素", "胆碱酯酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", "eGFR(CKD-EPI)", "a-l岩藻糖苷酶", "超氧化物歧化酶", "eGFR(MDRD)", r"eGFR单位ml/min/1.73m^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], \
|
||||
'test_check_list_all':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", ["γ谷氨酰氨转肽酶", "γ-谷氨酰转肽酶"], "总胆红素", "直接胆红素", "间接胆红素", "胆碱酯酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", r"eGFR\(CKD-EPI\)", "a-l岩藻糖苷酶", "超氧化物歧化酶", r"eGFR\(MDRD\)", r"eGFR单位ml/min/1.73m\^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], 'test_result_col_name':"result_str"},
|
||||
# 各类肿瘤标志物 # (6):各类肿瘤标志物,e.g.肿瘤标志物肺癌六项[复]、肿瘤标志物十项(男);东院区:(108):肿瘤标志物肺癌六项[复]
|
||||
{'test_rptunitid':[str(6), str(108)], 'test_check_name':'各类肿瘤标志物', 'test_check_list':["胃泌素释放肽前体(罗氏)", "鳞状上皮细胞癌抗原(罗氏)", "癌胚抗原", "甲胎蛋白", "糖类抗原125", "糖类抗原199", "糖类抗原724", "细胞角蛋白19片段", "神经元特异性烯醇化酶测定", "总前列腺特异性抗原", "游离前列腺抗原", "游离/总", "糖类抗原153","人附睾蛋白","绝经前 ROMA","绝经后 ROMA","铁蛋白","叶酸","维生素B12","高尔基体蛋白73测定","胃蛋白酶原Ⅰ","胃蛋白酶原Ⅱ","胃蛋白酶原Ⅰ/Ⅱ","降钙素","甲状腺球蛋白"],\
|
||||
'test_check_list_all':[r"胃泌素释放肽前体\(罗氏\)", ["鳞状上皮细胞癌相关抗原", r"鳞状上皮细胞癌抗原\(罗氏\)"], "癌胚抗原", "甲胎蛋白", "糖类抗原125", "糖类抗原199", "糖类抗原724", "细胞角蛋白19片段", "神经元特异性烯醇化酶测定", "总前列腺特异性抗原", "游离前列腺抗原", "游离/总", "糖类抗原153","人附睾蛋白","绝经前 ROMA","绝经后 ROMA","铁蛋白","叶酸","维生素B12","高尔基体蛋白73测定","胃蛋白酶原Ⅰ","胃蛋白酶原Ⅱ","胃蛋白酶原Ⅰ/Ⅱ","降钙素","甲状腺球蛋白"], 'test_result_col_name':"result_str"},
|
||||
@@ -215,6 +216,7 @@ for test in ALL_tests: # 各项检测信息
|
||||
add_content_to_excel(result_save_pth, test['test_check_name'], front_content + basic_content + test["test_check_list"] + ['未匹配检测内容'])
|
||||
else:
|
||||
add_content_to_excel(result_save_pth, test['test_check_name'], front_content + basic_content + test["test_check_list"])
|
||||
add_content_to_excel(result_save_pth, UNASSIGNED_SHEET_NAME, front_content + basic_content + [UNMATCHED_HEADER])
|
||||
|
||||
|
||||
####### 获取所有患者头部信息(Front_line) #######
|
||||
@@ -251,113 +253,36 @@ for pat_no in pat_no_col:
|
||||
Error(str(pat_no)+"的excel_head为空")
|
||||
exit()
|
||||
print("__处理患者头为__:", excel_head)
|
||||
# 遍历所有rptunitid
|
||||
for test in ALL_tests:
|
||||
# 特定检查rptunitid
|
||||
test_rptunitid = test['test_rptunitid']
|
||||
if not isinstance(test_rptunitid, list) and not isinstance(test_rptunitid, tuple):
|
||||
test_rptunitid = [test_rptunitid]
|
||||
# 检查名
|
||||
test_check_name = test['test_check_name']
|
||||
# 检查项
|
||||
test_check_list_all = test['test_check_list_all']
|
||||
# 检查项_名
|
||||
test_check_list = test['test_check_list']
|
||||
if len(test_check_list_all) != len(test_check_list):
|
||||
Error("test_check_list_all长度和test_check_list不同")
|
||||
print(f"test_check_list_all:{test_check_list_all}\ntest_check_list:{test_check_list}")
|
||||
exit()
|
||||
# 检查结果所在列的列名
|
||||
test_result_col_name = test['test_result_col_name']
|
||||
# 存放检查结果的dict
|
||||
test_check_result = {}
|
||||
|
||||
print("获取患者 ", test_check_name, " 检查结果")
|
||||
|
||||
### 打开patno/pat_no.csv文件获取更详细信息 ###
|
||||
with open(osp.join(patno_dir, pat_no+'.csv'), "r", encoding='utf-8-sig') as file:
|
||||
reader = csv.DictReader(file) # 读取文件
|
||||
# 汇总特定检查行
|
||||
test_rptunitid_rows = [ row for row in reader if row['rptunitid'] in test_rptunitid ]
|
||||
print("患者检查 ", test_check_name, "次数为", len(test_rptunitid_rows))
|
||||
reader = list(csv.DictReader(file))
|
||||
print("患者检查总次数为", len(reader))
|
||||
|
||||
# 遍历所有特定检查行,提取关键信息
|
||||
rows_not_match = [] # 每一行内不匹配的内容
|
||||
for i in range(len(test_rptunitid_rows)):
|
||||
for i, row_1 in enumerate(reader):
|
||||
print("处理患者第", i+1, "次检查")
|
||||
row_1 = test_rptunitid_rows[i]
|
||||
sampled_dt = row_1['sampled_dt'] # 获取时间信息
|
||||
###### basic信息头生成 ######
|
||||
excel_basic = []
|
||||
for item in Basic_line:
|
||||
excel_basic.append(row_1[Basic_line[item]])
|
||||
|
||||
### 打开Patient_detail_infos/pat_no/文件 ###,获取特定检查文件存储路径
|
||||
if data_type == "zhuyuanhao":
|
||||
row_file_path = os.path.join(Patient_detail_infos_dir, pat_no, 'None' + "_" + row_1['reporttype'] + "_" + row_1['rptunitid'] + "_" + row_1['reportid'].replace("_","-") + "_" + row_1['rechkdt'].replace(" ","-").replace(":","-").replace("_","-") +".csv")
|
||||
elif data_type == "pat_no":
|
||||
row_file_path = os.path.join(Patient_detail_infos_dir, pat_no, pat_no + "_" + row_1['reporttype'] + "_" + row_1['rptunitid'] + "_" + row_1['reportid'].replace("_","-") + "_" + row_1['rechkdt'].replace(" ","-").replace(":","-").replace("_","-") +".csv")
|
||||
with open(row_file_path, "r") as row_file:
|
||||
row_reader_ = csv.DictReader(row_file) # 读取文件,迭代器只能读取一次
|
||||
row_reader = []
|
||||
for r in row_reader_:
|
||||
row_reader.append(r)
|
||||
# 寻找所有检查项的值
|
||||
match = False # 检测内容中是否含有检测项目
|
||||
rows_not_match = [] # 每一行内不匹配的内容
|
||||
# 遍历所有行,查看对应检查结果
|
||||
for row_2 in row_reader:
|
||||
row_test_name_exist = False
|
||||
row_test_name = row_2['rpt_itemname']
|
||||
# 遍历所有待检查项
|
||||
for j in range(len(test_check_list_all)):
|
||||
# 检查项名称
|
||||
test_result_name = test_check_list[j]
|
||||
# 遍历所有完整版检查项目
|
||||
test_checks = test_check_list_all[j]
|
||||
if isinstance(test_checks, str):
|
||||
test_checks = [test_checks]
|
||||
for test_check in test_checks:
|
||||
# print(test_check, row_2['rpt_itemname'],"___")
|
||||
if match_re(row_test_name, test_check):# test_check == row_test_name: # 检测项目属于其中
|
||||
match = True
|
||||
row_test_name_exist = True
|
||||
# temp临时存储变量,如果其为'.',则结果变为'None'
|
||||
temp = row_2[test_result_col_name]
|
||||
if (temp == '.' or temp == '') :
|
||||
temp = 'None'
|
||||
test_check_result[test_result_name] = temp
|
||||
break
|
||||
# 如果结果中存在row_test_name_exist的话,退出循环
|
||||
if row_test_name_exist == True:
|
||||
break
|
||||
# 如果没有row_test_name_exist的话,将信息加入行信息中
|
||||
if row_test_name_exist == False:
|
||||
rows_not_match.append(f"{row_test_name}:{row_2.get(test_result_col_name, '')}")
|
||||
|
||||
# 如果没有寻找到对应test_check_result的话,设置为Not_Find
|
||||
for test_result_name in test_check_list:
|
||||
if not test_result_name in test_check_result:
|
||||
test_check_result[test_result_name] = 'Not_Find'
|
||||
with open(row_file_path, "r", encoding='utf-8-sig') as row_file:
|
||||
detail_rows = list(csv.DictReader(row_file))
|
||||
|
||||
# 如果没有检测到相匹配内容,且不输出所有信息,continue
|
||||
if (match == False and show_all_infos == False):
|
||||
continue
|
||||
# 进行进一步操作 ing...
|
||||
|
||||
|
||||
# 抽取结果格式转换
|
||||
excel_fromat_result = []
|
||||
for test_item in test['test_check_list']:
|
||||
excel_fromat_result.append(test_check_result[test_item])
|
||||
# print("输出结果:", excel_fromat_result)
|
||||
if show_not_match == True:
|
||||
add_content_to_excel(result_save_pth, test['test_check_name'], excel_head + excel_basic + excel_fromat_result + rows_not_match)
|
||||
else:
|
||||
add_content_to_excel(result_save_pth, test['test_check_name'], excel_head + excel_basic + excel_fromat_result)
|
||||
sheet_results, unassigned_items = route_detail_rows(detail_rows, ALL_tests, row_1.get("req_reason", ""))
|
||||
append_routed_report(
|
||||
add_content_to_excel,
|
||||
result_save_pth,
|
||||
ALL_tests,
|
||||
excel_head,
|
||||
excel_basic,
|
||||
sheet_results,
|
||||
unassigned_items,
|
||||
show_not_match,
|
||||
)
|
||||
|
||||
# 每处理一个患者数据,保存相关信息
|
||||
save_excel()
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import csv, sys, os, copy, re, argparse
|
||||
import os.path as osp
|
||||
from openpyxl import Workbook, load_workbook
|
||||
from V2_Data import Front_line, Basic_line, ALL_tests
|
||||
from dynamic_router import UNASSIGNED_SHEET_NAME, UNMATCHED_HEADER, append_routed_report, route_detail_rows
|
||||
|
||||
# 向特定excel的sheet中添加内容
|
||||
workbook = None # 全局变量,初始值为 None
|
||||
@@ -192,6 +193,7 @@ for pat_no in os.listdir(file_dir): # 遍历 file_dir 下的所有文件和文
|
||||
add_content_to_excel(result_save_pth, test['test_check_name'], front_content + basic_content + test["test_check_list"] + ['未匹配检测内容'])
|
||||
else:
|
||||
add_content_to_excel(result_save_pth, test['test_check_name'], front_content + basic_content + test["test_check_list"])
|
||||
add_content_to_excel(result_save_pth, UNASSIGNED_SHEET_NAME, front_content + basic_content + [UNMATCHED_HEADER])
|
||||
|
||||
# 信息头生成
|
||||
excel_head = []
|
||||
@@ -216,110 +218,35 @@ for pat_no in os.listdir(file_dir): # 遍历 file_dir 下的所有文件和文
|
||||
####### 获取所有患者检测信息(base_line,pat_no/pato_no.csv) #######
|
||||
# 遍历所有患者(pat_no)
|
||||
|
||||
# 遍历所有rptunitid
|
||||
for test in ALL_tests:
|
||||
# 特定检查rptunitid
|
||||
test_rptunitid = test['test_rptunitid']
|
||||
if not isinstance(test_rptunitid, list) and not isinstance(test_rptunitid, tuple):
|
||||
test_rptunitid = [test_rptunitid]
|
||||
# 检查名
|
||||
test_check_name = test['test_check_name']
|
||||
# 检查项
|
||||
test_check_list_all = test['test_check_list_all']
|
||||
# 检查项_名
|
||||
test_check_list = test['test_check_list']
|
||||
if len(test_check_list_all) != len(test_check_list):
|
||||
Error("test_check_list_all长度和test_check_list不同", error_dir=osp.join(pat_file_dir, "Error.txt"))
|
||||
print(f"test_check_list_all:{test_check_list_all}\ntest_check_list:{test_check_list}")
|
||||
exit()
|
||||
# 检查结果所在列的列名
|
||||
test_result_col_name = test['test_result_col_name']
|
||||
# 存放检查结果的dict
|
||||
test_check_result = {}
|
||||
|
||||
print("获取患者 ", test_check_name, " 检查结果")
|
||||
|
||||
### 打开patno/pat_no.csv文件获取更详细信息 ###
|
||||
with open(patno_pth, "r", encoding="utf-8-sig") as file:
|
||||
reader = csv.DictReader(file) # 读取文件
|
||||
# 汇总特定检查行
|
||||
test_rptunitid_rows = [ row for row in reader if row['rptunitid'] in test_rptunitid ]
|
||||
print("患者检查 ", test_check_name, "次数为", len(test_rptunitid_rows))
|
||||
report_rows = list(csv.DictReader(file))
|
||||
print("患者检查总次数为", len(report_rows))
|
||||
|
||||
# 遍历所有特定检查行,提取关键信息
|
||||
rows_not_match = [] # 每一行内不匹配的内容
|
||||
for i in range(len(test_rptunitid_rows)):
|
||||
for i, row_1 in enumerate(report_rows):
|
||||
print("处理患者第", i+1, "次检查")
|
||||
row_1 = test_rptunitid_rows[i]
|
||||
sampled_dt = row_1['sampled_dt'] # 获取时间信息
|
||||
###### basic信息头生成 ######
|
||||
excel_basic = []
|
||||
for item in Basic_line:
|
||||
excel_basic.append(row_1[Basic_line[item]])
|
||||
|
||||
### 打开Patient_detail_infos/pat_no/文件 ###,获取特定检查文件存储路径
|
||||
if data_type == "zhuyuanhao":
|
||||
row_file_path = os.path.join(Patient_detail_infos_dir, 'None' + "_" + row_1['reporttype'] + "_" + row_1['rptunitid'] + "_" + row_1['reportid'].replace("_","-") + "_" + row_1['rechkdt'].replace(" ","-").replace(":","-").replace("_","-") +".csv")
|
||||
elif data_type == "pat_no":
|
||||
row_file_path = os.path.join(Patient_detail_infos_dir, pat_no + "_" + row_1['reporttype'] + "_" + row_1['rptunitid'] + "_" + row_1['reportid'].replace("_","-") + "_" + row_1['rechkdt'].replace(" ","-").replace(":","-").replace("_","-") +".csv")
|
||||
|
||||
with open(row_file_path, "r", encoding="utf-8-sig") as row_file:
|
||||
row_reader_ = csv.DictReader(row_file) # 读取文件,迭代器只能读取一次
|
||||
row_reader = []
|
||||
for r in row_reader_:
|
||||
row_reader.append(r)
|
||||
# 寻找所有检查项的值
|
||||
match = False # 检测内容中是否含有检测项目
|
||||
rows_not_match = [] # 每一行内不匹配的内容
|
||||
# 遍历所有行,查看对应检查结果
|
||||
for row_2 in row_reader:
|
||||
row_test_name_exist = False
|
||||
row_test_name = row_2['rpt_itemname']
|
||||
# 遍历所有待检查项
|
||||
for j in range(len(test_check_list_all)):
|
||||
# 检查项名称
|
||||
test_result_name = test_check_list[j]
|
||||
# 遍历所有完整版检查项目
|
||||
test_checks = test_check_list_all[j]
|
||||
if isinstance(test_checks, str):
|
||||
test_checks = [test_checks]
|
||||
for test_check in test_checks:
|
||||
# print(test_check, row_2['rpt_itemname'],"___")
|
||||
if match_re(row_test_name, test_check):# test_check == row_test_name: # 检测项目属于其中
|
||||
match = True
|
||||
row_test_name_exist = True
|
||||
# temp临时存储变量,如果其为'.',则结果变为'None'
|
||||
temp = row_2[test_result_col_name]
|
||||
if (temp == '.' or temp == '') :
|
||||
temp = 'None'
|
||||
test_check_result[test_result_name] = temp
|
||||
break
|
||||
# 如果结果中存在row_test_name_exist的话,退出循环
|
||||
if row_test_name_exist == True:
|
||||
break
|
||||
# 如果没有row_test_name_exist的话,将信息加入行信息中
|
||||
if row_test_name_exist == False:
|
||||
rows_not_match.append(f"{row_test_name}:{row_2.get(test_result_col_name, '')}")
|
||||
detail_rows = list(csv.DictReader(row_file))
|
||||
|
||||
# 如果没有寻找到对应test_check_result的话,设置为Not_Find
|
||||
for test_result_name in test_check_list:
|
||||
if not test_result_name in test_check_result:
|
||||
test_check_result[test_result_name] = 'Not_Find'
|
||||
|
||||
# 如果没有检测到相匹配内容,且不输出所有信息,continue
|
||||
if (match == False and show_all_infos == False):
|
||||
continue
|
||||
# 进行进一步操作 ing...
|
||||
|
||||
|
||||
# 抽取结果格式转换
|
||||
excel_fromat_result = []
|
||||
for test_item in test['test_check_list']:
|
||||
excel_fromat_result.append(test_check_result[test_item])
|
||||
# print("输出结果:", excel_fromat_result)
|
||||
if show_not_match == True:
|
||||
add_content_to_excel(result_save_pth, test['test_check_name'], excel_head + excel_basic + excel_fromat_result + rows_not_match)
|
||||
else:
|
||||
add_content_to_excel(result_save_pth, test['test_check_name'], excel_head + excel_basic + excel_fromat_result)
|
||||
sheet_results, unassigned_items = route_detail_rows(detail_rows, ALL_tests, row_1.get("req_reason", ""))
|
||||
append_routed_report(
|
||||
add_content_to_excel,
|
||||
result_save_pth,
|
||||
ALL_tests,
|
||||
excel_head,
|
||||
excel_basic,
|
||||
sheet_results,
|
||||
unassigned_items,
|
||||
show_not_match,
|
||||
)
|
||||
|
||||
# 每处理一个患者数据,保存相关信息
|
||||
save_excel()
|
||||
|
||||
@@ -15,8 +15,8 @@ ALL_tests = [# 血细胞 # (1):血细胞分析+五分类;(2):血细胞分析+
|
||||
{'test_rptunitid':[str(3), str(110),str(90)], 'test_check_name':'凝血', 'test_check_list':["凝血酶原时间", "凝血酶原活动度", "血浆凝血酶原时间比值", "凝血酶原国际标准化比值", "活化部分凝血活酶时间", "活化部分凝血活酶比值", "凝血酶时间", "凝血酶时间比值", "纤维蛋白原含量", "D-二聚体测定", "纤维蛋白原降解产物", "凝血酶生成时间","凝血速率","血小板功能"],\
|
||||
'test_check_list_all':["凝血酶原时间*", ["凝血酶原活动度", r"凝血酶原活度\(%\)"], ["凝血酶原比值", "血浆凝血酶原时间比值"], ["凝血酶原标准化比值", "凝血酶原国际标准化比值"], ["活化部分凝血活酶时间*", "活化部分凝血酶时间*"], "活化部分凝血活酶比值", "凝血酶时间", "凝血酶时间比值", "纤维蛋白原含量", [r"D-二聚体\(sysmex\)", "D-二聚体测定*"], [r"纤维蛋白\(原\)降解产物", "纤维蛋白原降解产物"], "凝血酶生成时间","凝血速率","血小板功能"], 'test_result_col_name':"result_str"},
|
||||
# 肝功 # (5):平诊肝功十四项+平诊电解质八项+平诊肾功七项;(20)急诊肝功十二项_急诊肾功五项[复]_急诊电解质七项[复];东院区:(108):传染性指标检测八项
|
||||
{'test_rptunitid':[str(5), str(20), str(108)], 'test_check_name':'肝功', 'test_check_list':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", "γ谷氨酰氨转肽酶", "总胆红素", "直接胆红素", "间接胆红素", "胆碱脂酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", "eGFR(CKD-EPI)", "a-l岩藻糖苷酶", "超氧化物歧化酶", "eGFR(MDRD)", r"eGFR单位ml/min/1.73m^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], \
|
||||
'test_check_list_all':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", ["γ谷氨酰氨转肽酶", "γ-谷氨酰转肽酶"], "总胆红素", "直接胆红素", "间接胆红素", "胆碱脂酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", r"eGFR\(CKD-EPI\)", "a-l岩藻糖苷酶", "超氧化物歧化酶", r"eGFR\(MDRD\)", r"eGFR单位ml/min/1.73m\^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], 'test_result_col_name':"result_str"},
|
||||
{'test_rptunitid':[str(5), str(20), str(108)], 'test_check_name':'肝功', 'test_check_list':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", "γ谷氨酰氨转肽酶", "总胆红素", "直接胆红素", "间接胆红素", "胆碱酯酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", "eGFR(CKD-EPI)", "a-l岩藻糖苷酶", "超氧化物歧化酶", "eGFR(MDRD)", r"eGFR单位ml/min/1.73m^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], \
|
||||
'test_check_list_all':["谷草转氨酶", "谷丙转氨酶", "谷草/谷丙", "碱性磷酸酶", ["γ谷氨酰氨转肽酶", "γ-谷氨酰转肽酶"], "总胆红素", "直接胆红素", "间接胆红素", "胆碱酯酶", "总胆固醇", "总蛋白", "白蛋白", "球蛋白", "白球比", "总胆汁酸", "a-L-岩藻糖苷酶", "前白蛋白", "超氧化物歧化酶", "尿素", "肌酐", "胱抑素C", "葡萄糖", "尿酸", "钾", "钠", "氯", "磷", "钙", "镁", "二氧化碳结合率", "离子间隙", "糖化白蛋白", "视黄醇结合蛋白", r"eGFR\(CKD-EPI\)", "a-l岩藻糖苷酶", "超氧化物歧化酶", r"eGFR\(MDRD\)", r"eGFR单位ml/min/1.73m\^2", "8597", "乳酸测定", "乳酸脱氢酶", "羟丁酸脱氢酶", "肌酸激酶", "肌酸激酶同工酶", "载脂蛋白A", "载脂蛋白B", "载脂蛋白E", "脂蛋白(a)", "缺血修饰白蛋白", "甘油三酯", "高密度脂蛋白", "低密度脂蛋白", ], 'test_result_col_name':"result_str"},
|
||||
# 各类肿瘤标志物 # (6):各类肿瘤标志物,e.g.肿瘤标志物肺癌六项[复]、肿瘤标志物十项(男);东院区:(108):肿瘤标志物肺癌六项[复]
|
||||
{'test_rptunitid':[str(6), str(108)], 'test_check_name':'各类肿瘤标志物', 'test_check_list':["胃泌素释放肽前体(罗氏)", "鳞状上皮细胞癌抗原(罗氏)", "癌胚抗原", "甲胎蛋白", "糖类抗原125", "糖类抗原199", "糖类抗原724", "细胞角蛋白19片段", "神经元特异性烯醇化酶测定", "总前列腺特异性抗原", "游离前列腺抗原", "游离/总", "糖类抗原153","人附睾蛋白","绝经前 ROMA","绝经后 ROMA","铁蛋白","叶酸","维生素B12","高尔基体蛋白73测定","胃蛋白酶原Ⅰ","胃蛋白酶原Ⅱ","胃蛋白酶原Ⅰ/Ⅱ","降钙素","甲状腺球蛋白"],\
|
||||
'test_check_list_all':[r"胃泌素释放肽前体\(罗氏\)", ["鳞状上皮细胞癌相关抗原", r"鳞状上皮细胞癌抗原\(罗氏\)"], "癌胚抗原", "甲胎蛋白", "糖类抗原125", "糖类抗原199", "糖类抗原724", "细胞角蛋白19片段", "神经元特异性烯醇化酶测定", "总前列腺特异性抗原", "游离前列腺抗原", "游离/总", "糖类抗原153","人附睾蛋白","绝经前 ROMA","绝经后 ROMA","铁蛋白","叶酸","维生素B12","高尔基体蛋白73测定","胃蛋白酶原Ⅰ","胃蛋白酶原Ⅱ","胃蛋白酶原Ⅰ/Ⅱ","降钙素","甲状腺球蛋白"], 'test_result_col_name':"result_str"},
|
||||
|
||||
137
app/processors/dynamic_router.py
Normal file
137
app/processors/dynamic_router.py
Normal file
@@ -0,0 +1,137 @@
|
||||
import re
|
||||
|
||||
|
||||
UNMATCHED_HEADER = "未匹配检测内容"
|
||||
UNASSIGNED_SHEET_NAME = "未归属检测内容"
|
||||
|
||||
CATEGORY_REASON_KEYWORDS = {
|
||||
"血细胞": ["血细胞", "血常规"],
|
||||
"凝血": ["凝血"],
|
||||
"肝功": ["肝功", "肾功", "电解质", "葡萄糖", "心肌酶"],
|
||||
"各类肿瘤标志物": ["肿瘤", "标志物", "癌胚", "甲状旁腺", "降钙素", "鳞状细胞"],
|
||||
"七抗": ["七抗", "自身抗体"],
|
||||
"传染指标": ["传染"],
|
||||
"血气分析+生化分析": ["血气"],
|
||||
"感染指标": ["感染", "新冠", "冠状病毒", "结核", "细菌", "病毒", "HPV", "C反应蛋白", "降钙素原"],
|
||||
"基因检测指标": ["基因", "CYP"],
|
||||
"心衰系列": ["心衰", "B型", "BNP", "Pro-BNP", "钠尿肽", "肌钙蛋白"],
|
||||
"普通指标": ["血型", "隐血", "卡式"],
|
||||
"免疫系列": ["甲功", "甲状腺", "促甲状腺", "抗甲状腺"],
|
||||
"特殊指标": ["细胞因子", "白介素", "血管内皮"],
|
||||
"内分泌代谢系列": ["内分泌", "代谢", "儿茶酚胺", "ANCA"],
|
||||
"用药指导": ["用药", "VKORC", "CYP2C9", "ALDH2", "ApoE", "SLCO"],
|
||||
}
|
||||
|
||||
|
||||
def match_re(value, pattern):
|
||||
return re.match(str(pattern), str(value or "")) is not None
|
||||
|
||||
|
||||
def clean_result(value):
|
||||
if value in (None, "", "."):
|
||||
return "None"
|
||||
return str(value)
|
||||
|
||||
|
||||
def detail_value(row):
|
||||
for key in ("result_str", "result_ref", "result_txt", "result1"):
|
||||
value = row.get(key, "")
|
||||
if value not in (None, ""):
|
||||
return clean_result(value)
|
||||
return ""
|
||||
|
||||
|
||||
def route_detail_rows(detail_rows, all_tests, reason=""):
|
||||
sheet_results = {}
|
||||
unassigned_items = []
|
||||
tests_by_name = {test["test_check_name"]: test for test in all_tests}
|
||||
candidates_by_row = []
|
||||
candidate_counts = {}
|
||||
|
||||
for detail_row in detail_rows:
|
||||
candidates = _match_candidates(detail_row, all_tests)
|
||||
candidates_by_row.append((detail_row, candidates))
|
||||
for candidate in candidates:
|
||||
candidate_counts[candidate["sheet_name"]] = candidate_counts.get(candidate["sheet_name"], 0) + 1
|
||||
|
||||
for detail_row, candidates in candidates_by_row:
|
||||
item_name = detail_row.get("rpt_itemname", "")
|
||||
if not candidates:
|
||||
unassigned_items.append(f"{item_name}:{detail_value(detail_row)}")
|
||||
continue
|
||||
|
||||
candidate = _choose_candidate(candidates, reason, candidate_counts)
|
||||
test = tests_by_name[candidate["sheet_name"]]
|
||||
sheet_results.setdefault(candidate["sheet_name"], {})[candidate["result_name"]] = clean_result(
|
||||
detail_row.get(test["test_result_col_name"], "")
|
||||
)
|
||||
|
||||
return sheet_results, unassigned_items
|
||||
|
||||
|
||||
def _match_candidates(detail_row, all_tests):
|
||||
item_name = detail_row.get("rpt_itemname", "")
|
||||
candidates = []
|
||||
|
||||
for test_index, test in enumerate(all_tests):
|
||||
test_check_list = test["test_check_list"]
|
||||
test_check_list_all = test["test_check_list_all"]
|
||||
|
||||
for item_index, checks in enumerate(test_check_list_all):
|
||||
if isinstance(checks, str):
|
||||
checks = [checks]
|
||||
if any(match_re(item_name, pattern) for pattern in checks):
|
||||
candidates.append(
|
||||
{
|
||||
"sheet_name": test["test_check_name"],
|
||||
"result_name": test_check_list[item_index],
|
||||
"test_index": test_index,
|
||||
"item_index": item_index,
|
||||
}
|
||||
)
|
||||
break
|
||||
|
||||
return candidates
|
||||
|
||||
|
||||
def _choose_candidate(candidates, reason, candidate_counts):
|
||||
if len(candidates) == 1:
|
||||
return candidates[0]
|
||||
|
||||
reason = str(reason or "")
|
||||
|
||||
def score(candidate):
|
||||
sheet_name = candidate["sheet_name"]
|
||||
keywords = CATEGORY_REASON_KEYWORDS.get(sheet_name, [])
|
||||
reason_score = 100 if any(keyword and keyword in reason for keyword in keywords) else 0
|
||||
density_score = candidate_counts.get(sheet_name, 0)
|
||||
return (reason_score, density_score, -candidate["test_index"], -candidate["item_index"])
|
||||
|
||||
return max(candidates, key=score)
|
||||
|
||||
|
||||
def append_routed_report(
|
||||
add_content_to_excel,
|
||||
result_save_path,
|
||||
all_tests,
|
||||
excel_head,
|
||||
excel_basic,
|
||||
sheet_results,
|
||||
unassigned_items,
|
||||
show_not_match,
|
||||
):
|
||||
for test in all_tests:
|
||||
sheet_name = test["test_check_name"]
|
||||
if sheet_name not in sheet_results:
|
||||
continue
|
||||
|
||||
result_values = sheet_results[sheet_name]
|
||||
row = excel_head + excel_basic + [
|
||||
result_values.get(test_item, "Not_Find") for test_item in test["test_check_list"]
|
||||
]
|
||||
if show_not_match:
|
||||
row += unassigned_items
|
||||
add_content_to_excel(result_save_path, sheet_name, row)
|
||||
|
||||
if unassigned_items and not sheet_results:
|
||||
add_content_to_excel(result_save_path, UNASSIGNED_SHEET_NAME, excel_head + excel_basic + unassigned_items)
|
||||
209
tests/verify_dynamic_routing.py
Normal file
209
tests/verify_dynamic_routing.py
Normal file
@@ -0,0 +1,209 @@
|
||||
#!/usr/bin/env python3
|
||||
import csv
|
||||
import sys
|
||||
import tempfile
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
|
||||
from openpyxl import Workbook, load_workbook
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
from app.main import _clean_preview_rows
|
||||
from app.processor import _summarize_workbook, run_processing
|
||||
|
||||
|
||||
REPORT_HEADERS = [
|
||||
"rptunitid",
|
||||
"rechkdt",
|
||||
"reportid",
|
||||
"reporttype",
|
||||
"req_reason",
|
||||
"specimen_code",
|
||||
"specimen_name",
|
||||
"rptunitname",
|
||||
"resultclass",
|
||||
"pat_diag",
|
||||
"alter_flag",
|
||||
"emer_flag",
|
||||
"sampled_dt",
|
||||
]
|
||||
|
||||
DETAIL_HEADERS = ["reportid", "rpt_itemname", "result_str", "result_ref"]
|
||||
|
||||
|
||||
def write_csv(path, headers, rows):
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with path.open("w", encoding="utf-8-sig", newline="") as file:
|
||||
writer = csv.DictWriter(file, fieldnames=headers)
|
||||
writer.writeheader()
|
||||
writer.writerows(rows)
|
||||
|
||||
|
||||
def detail_filename(patient_id, row):
|
||||
return (
|
||||
f"{patient_id}_{row['reporttype']}_{row['rptunitid']}_"
|
||||
f"{row['reportid'].replace('_', '-')}_"
|
||||
f"{row['rechkdt'].replace(' ', '-').replace(':', '-').replace('_', '-')}.csv"
|
||||
)
|
||||
|
||||
|
||||
def build_fixture(root):
|
||||
patient_id = "0000000001"
|
||||
write_csv(root / "Patients_info.csv", ["pat_name", "pat_no"], [{"pat_name": "验证患者", "pat_no": "1"}])
|
||||
|
||||
reports = [
|
||||
{
|
||||
"rptunitid": "20",
|
||||
"rechkdt": "2026-01-01 10:00:00",
|
||||
"reportid": "20260101-20-1",
|
||||
"reporttype": "10",
|
||||
"req_reason": "肝功十项[复]_电解质五项[复]_肾功三项[复]",
|
||||
"sampled_dt": "2026-01-01 08:00:00",
|
||||
},
|
||||
{
|
||||
"rptunitid": "20",
|
||||
"rechkdt": "2026-01-01 10:05:00",
|
||||
"reportid": "20260101-20-2",
|
||||
"reporttype": "10",
|
||||
"req_reason": "B型钠尿肽前体(Pro-BNP)测定",
|
||||
"sampled_dt": "2026-01-01 08:05:00",
|
||||
},
|
||||
{
|
||||
"rptunitid": "20",
|
||||
"rechkdt": "2026-01-01 10:10:00",
|
||||
"reportid": "20260101-20-3",
|
||||
"reporttype": "10",
|
||||
"req_reason": "未知组合检测",
|
||||
"sampled_dt": "2026-01-01 08:10:00",
|
||||
},
|
||||
{
|
||||
"rptunitid": "161",
|
||||
"rechkdt": "2026-01-01 10:15:00",
|
||||
"reportid": "20260101-161-4",
|
||||
"reporttype": "10",
|
||||
"req_reason": "甲功七项(化学发光法)[复]",
|
||||
"sampled_dt": "2026-01-01 08:15:00",
|
||||
},
|
||||
{
|
||||
"rptunitid": "161",
|
||||
"rechkdt": "2026-01-01 10:15:00",
|
||||
"reportid": "20260101-161-4",
|
||||
"reporttype": "10",
|
||||
"req_reason": "甲功七项(化学发光法)[复]",
|
||||
"sampled_dt": "2026-01-01 08:15:00",
|
||||
},
|
||||
]
|
||||
for row in reports:
|
||||
for header in REPORT_HEADERS:
|
||||
row.setdefault(header, "")
|
||||
|
||||
write_csv(root / "Tests_List" / f"{patient_id}.csv", REPORT_HEADERS, reports)
|
||||
|
||||
details = [
|
||||
[{"reportid": reports[0]["reportid"], "rpt_itemname": "谷草转氨酶", "result_str": "21"}],
|
||||
[{"reportid": reports[1]["reportid"], "rpt_itemname": "B型前脑尿钠肽", "result_str": "57.20"}],
|
||||
[{"reportid": reports[2]["reportid"], "rpt_itemname": "神秘检测项目", "result_str": "42"}],
|
||||
[
|
||||
{"reportid": reports[3]["reportid"], "rpt_itemname": "甲状腺球蛋白", "result_str": "6.02"},
|
||||
],
|
||||
[
|
||||
{"reportid": reports[4]["reportid"], "rpt_itemname": "甲状腺球蛋白", "result_str": "6.02"},
|
||||
],
|
||||
]
|
||||
detail_dir = root / "Tests_Detail_List" / patient_id
|
||||
for report, rows in zip(reports, details):
|
||||
for row in rows:
|
||||
row.setdefault("result_ref", "")
|
||||
write_csv(detail_dir / detail_filename(patient_id, report), DETAIL_HEADERS, rows)
|
||||
|
||||
|
||||
def zip_dir(source, target):
|
||||
with zipfile.ZipFile(target, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||
for path in source.rglob("*"):
|
||||
if path.is_file():
|
||||
zf.write(path, path.relative_to(source).as_posix())
|
||||
|
||||
|
||||
def rows_for(ws):
|
||||
return [["" if value is None else str(value) for value in row] for row in ws.iter_rows(values_only=True)]
|
||||
|
||||
|
||||
def assert_dynamic_routing(workbook_path):
|
||||
workbook = load_workbook(workbook_path, read_only=True, data_only=True)
|
||||
try:
|
||||
liver_rows = rows_for(workbook["肝功"])
|
||||
heart_rows = rows_for(workbook["心衰系列"])
|
||||
immune_rows = rows_for(workbook["免疫系列"])
|
||||
summary_text = "\n".join("\t".join(row) for row in rows_for(workbook["未检测到内容汇总"]))
|
||||
|
||||
liver_data = [row for row in liver_rows[1:] if row and row[0]]
|
||||
heart_data = [row for row in heart_rows[1:] if row and row[0]]
|
||||
tumor_data = [row for row in rows_for(workbook["各类肿瘤标志物"])[1:] if row and row[0]]
|
||||
immune_data = [row for row in immune_rows[1:] if row and row[0]]
|
||||
|
||||
assert len(liver_data) == 1, liver_data
|
||||
assert liver_data[0][3] == "肝功十项[复]_电解质五项[复]_肾功三项[复]"
|
||||
assert "21" in liver_data[0]
|
||||
assert "57.20" not in liver_data[0]
|
||||
|
||||
assert len(heart_data) == 1, heart_data
|
||||
assert heart_data[0][3] == "B型钠尿肽前体(Pro-BNP)测定"
|
||||
assert "57.20" in heart_data[0]
|
||||
assert "肝功十项[复]_电解质五项[复]_肾功三项[复]" not in "\n".join("\t".join(row) for row in heart_data)
|
||||
|
||||
assert "神秘检测项目" in summary_text
|
||||
assert "谷草转氨酶" not in summary_text
|
||||
assert "B型前脑尿钠肽" not in summary_text
|
||||
assert len(tumor_data) == 0, tumor_data
|
||||
assert len(immune_data) == 1, immune_data
|
||||
assert immune_data[0][3] == "甲功七项(化学发光法)[复]"
|
||||
finally:
|
||||
workbook.close()
|
||||
|
||||
|
||||
def assert_preview_can_exceed_200(tmp_path):
|
||||
assert _clean_preview_rows(500) == 500
|
||||
|
||||
workbook_path = tmp_path / "preview.xlsx"
|
||||
workbook = Workbook()
|
||||
sheet = workbook.active
|
||||
sheet.title = "预览"
|
||||
for index in range(250):
|
||||
sheet.append([index])
|
||||
workbook.save(workbook_path)
|
||||
|
||||
summary = _summarize_workbook(workbook_path, tmp_path, 250)
|
||||
assert len(summary.sheets[0].preview) == 250
|
||||
|
||||
|
||||
def main():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
tmp_path = Path(tmp)
|
||||
data_dir = tmp_path / "data"
|
||||
build_fixture(data_dir)
|
||||
zip_path = tmp_path / "fixture.zip"
|
||||
zip_dir(data_dir, zip_path)
|
||||
|
||||
job_dir = tmp_path / "job"
|
||||
result = run_processing(
|
||||
zip_path=zip_path,
|
||||
job_dir=job_dir,
|
||||
mode="auto",
|
||||
data_type="auto",
|
||||
result_name="Verify",
|
||||
show_not_match=True,
|
||||
show_all_infos=True,
|
||||
preview_rows=500,
|
||||
)
|
||||
assert result.mode == "v1"
|
||||
assert_dynamic_routing(job_dir / "output" / "Verify.xlsx")
|
||||
assert_preview_can_exceed_200(tmp_path)
|
||||
|
||||
print("dynamic routing verification passed")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user