182 lines
5.7 KiB
Python
182 lines
5.7 KiB
Python
#!/usr/bin/env python3
|
|
import csv
|
|
import sys
|
|
import tempfile
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
from openpyxl import Workbook, load_workbook
|
|
|
|
# ROOT is the parent of the directory that contains this script. It is put at
# the front of sys.path (only if not already listed) so the `app.*` imports
# that follow can be resolved when the script is run directly.
# NOTE(review): presumably the `app` package lives directly under ROOT - confirm.
ROOT = Path(__file__).resolve().parents[1]
if sys.path.count(str(ROOT)) == 0:
    sys.path.insert(0, str(ROOT))
|
|
|
|
from app.main import _clean_preview_rows
|
|
from app.processor import _summarize_workbook, run_processing
|
|
|
|
|
|
# Column names, in output order, of the per-patient report list CSV.
REPORT_HEADERS = [
    "rptunitid", "rechkdt", "reportid", "reporttype",
    "req_reason", "specimen_code", "specimen_name", "rptunitname",
    "resultclass", "pat_diag", "alter_flag", "emer_flag",
    "sampled_dt",
]

# Column names, in output order, of each per-report detail CSV.
DETAIL_HEADERS = [
    "reportid",
    "rpt_itemname",
    "result_str",
    "result_ref",
]
|
|
|
|
|
|
def write_csv(path, headers, rows):
    """Write dict rows to a CSV file with a header line.

    The file is encoded as UTF-8 with a BOM (``utf-8-sig``) and any missing
    parent directories are created first.

    Args:
        path: Destination file. A ``pathlib.Path``, a ``str`` or any other
            path-like object is accepted.
        headers: Column names in output order; written as the first line.
        rows: Iterable of dicts keyed by the names in ``headers``. Keys that
            are absent from a row are written as empty cells
            (``csv.DictWriter``'s default ``restval``).

    Raises:
        ValueError: If a row has a key that is not listed in ``headers``
            (``csv.DictWriter``'s default ``extrasaction="raise"``).
    """
    # Coerce so callers may pass plain strings as well as Path objects.
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" leaves line-ending handling to the csv module.
    with path.open("w", encoding="utf-8-sig", newline="") as file:
        writer = csv.DictWriter(file, fieldnames=headers)
        writer.writeheader()
        writer.writerows(rows)
|
|
|
|
|
|
def detail_filename(patient_id, row):
    """Build the detail CSV file name for one report row.

    The name is ``<patient_id>_<reporttype>_<rptunitid>_<reportid>_<rechkdt>.csv``.
    Underscores inside ``reportid`` become dashes, and spaces, colons and
    underscores inside ``rechkdt`` become dashes, so the underscore only ever
    appears as the field separator for those two fields.
    """
    dash = "-"
    report_type = row["reporttype"]
    unit_id = row["rptunitid"]
    report_id = row["reportid"].replace("_", dash)
    # One translate pass maps all three unwanted characters to a dash.
    stamp = row["rechkdt"].translate(str.maketrans({" ": dash, ":": dash, "_": dash}))
    return f"{patient_id}_{report_type}_{unit_id}_{report_id}_{stamp}.csv"
|
|
|
|
|
|
def build_fixture(root):
    """Create a one-patient CSV data set under *root*.

    Files written (all through ``write_csv``), in this order:
      * ``Patients_info.csv`` - a single patient row.
      * ``Tests_List/0000000001.csv`` - three report rows that differ in
        ``rechkdt``, ``reportid``, ``req_reason`` and ``sampled_dt``; every
        other ``REPORT_HEADERS`` column is left empty.
      * ``Tests_Detail_List/0000000001/<detail_filename(...)>`` - one detail
        file per report, each holding a single item row with an empty
        ``result_ref``.
    """
    patient_id = "0000000001"
    write_csv(
        root / "Patients_info.csv",
        ["pat_name", "pat_no"],
        [{"pat_name": "验证患者", "pat_no": "1"}],
    )

    # One tuple per report:
    # (rechkdt, reportid, req_reason, sampled_dt, rpt_itemname, result_str)
    scenarios = (
        (
            "2026-01-01 10:00:00",
            "20260101-20-1",
            "肝功十项[复]_电解质五项[复]_肾功三项[复]",
            "2026-01-01 08:00:00",
            "谷草转氨酶",
            "21",
        ),
        (
            "2026-01-01 10:05:00",
            "20260101-20-2",
            "B型钠尿肽前体(Pro-BNP)测定",
            "2026-01-01 08:05:00",
            "B型前脑尿钠肽",
            "57.20",
        ),
        (
            "2026-01-01 10:10:00",
            "20260101-20-3",
            "未知组合检测",
            "2026-01-01 08:10:00",
            "神秘检测项目",
            "42",
        ),
    )

    reports = []
    detail_rows = []
    for checked_at, report_id, reason, sampled_at, item_name, result in scenarios:
        # Start from an all-empty row so every report column is present.
        report = dict.fromkeys(REPORT_HEADERS, "")
        report.update(
            {
                "rptunitid": "20",
                "rechkdt": checked_at,
                "reportid": report_id,
                "reporttype": "10",
                "req_reason": reason,
                "sampled_dt": sampled_at,
            }
        )
        reports.append(report)
        detail_rows.append(
            {
                "reportid": report_id,
                "rpt_itemname": item_name,
                "result_str": result,
                "result_ref": "",
            }
        )

    write_csv(root / "Tests_List" / f"{patient_id}.csv", REPORT_HEADERS, reports)

    detail_dir = root / "Tests_Detail_List" / patient_id
    for report, detail in zip(reports, detail_rows):
        write_csv(detail_dir / detail_filename(patient_id, report), DETAIL_HEADERS, [detail])
|
|
|
|
|
|
def zip_dir(source, target):
    """Pack every regular file under *source* into a deflated zip at *target*.

    Members are stored under their POSIX-style path relative to *source* and
    are added in sorted order, so the archive layout does not depend on the
    order in which the file system happens to enumerate entries. Directories
    themselves (including empty ones) are not stored.

    Args:
        source: Directory to archive (``pathlib.Path`` or ``str``).
        target: Path of the zip file to create; an existing file is
            overwritten. If it lies inside *source* it is never added to
            itself.
    """
    source = Path(source)
    target = Path(target)
    resolved_target = target.resolve()

    # Snapshot the tree before the archive is opened so a target located
    # inside `source` cannot appear while the directory is being walked.
    entries = sorted(
        (path.relative_to(source).as_posix(), path)
        for path in source.rglob("*")
        if path.is_file()
    )

    with zipfile.ZipFile(target, "w", zipfile.ZIP_DEFLATED) as zf:
        for arcname, path in entries:
            # A stale archive from an earlier run is in the snapshot; skip it.
            if path.resolve() == resolved_target:
                continue
            zf.write(path, arcname)
|
|
|
|
|
|
def rows_for(ws):
    """Return the sheet's rows as lists of strings.

    Rows come from ``ws.iter_rows(values_only=True)``; each cell value is
    converted with ``str()``, except ``None`` which becomes an empty string.
    """
    table = []
    for record in ws.iter_rows(values_only=True):
        cells = []
        for cell in record:
            cells.append(str(cell) if cell is not None else "")
        table.append(cells)
    return table
|
|
|
|
|
|
def assert_dynamic_routing(workbook_path):
    """Assert that the output workbook routed each report to the expected sheet.

    Checks, in order:
      * sheet "肝功" has exactly one data row; its 4th column is the liver
        panel request reason, it contains "21" and does not contain "57.20";
      * sheet "心衰系列" has exactly one data row; its 4th column is the
        Pro-BNP request reason, it contains "57.20" and the liver request
        reason appears nowhere in it;
      * sheet "未检测到内容汇总" mentions "神秘检测项目" but neither of the two
        routed item names.

    A data row is any row after the first whose first cell is non-empty.
    The workbook is opened read-only and is always closed afterwards.
    """
    liver_reason = "肝功十项[复]_电解质五项[复]_肾功三项[复]"
    bnp_reason = "B型钠尿肽前体(Pro-BNP)测定"

    def data_rows(table):
        # Drop the first row and any row that is empty or has an empty first cell.
        return [cells for cells in table[1:] if cells and cells[0]]

    def flatten(table):
        # Tab-join the cells, newline-join the rows, for substring checks.
        return "\n".join("\t".join(cells) for cells in table)

    book = load_workbook(workbook_path, read_only=True, data_only=True)
    try:
        liver_table = rows_for(book["肝功"])
        heart_table = rows_for(book["心衰系列"])
        summary_text = flatten(rows_for(book["未检测到内容汇总"]))

        liver_data = data_rows(liver_table)
        heart_data = data_rows(heart_table)

        assert len(liver_data) == 1, liver_data
        assert liver_data[0][3] == liver_reason
        assert "21" in liver_data[0]
        assert "57.20" not in liver_data[0]

        assert len(heart_data) == 1, heart_data
        assert heart_data[0][3] == bnp_reason
        assert "57.20" in heart_data[0]
        assert liver_reason not in flatten(heart_data)

        assert "神秘检测项目" in summary_text
        assert "谷草转氨酶" not in summary_text
        assert "B型前脑尿钠肽" not in summary_text
    finally:
        book.close()
|
|
|
|
|
|
def assert_preview_can_exceed_200(tmp_path):
    """Assert that preview row counts above 200 are honoured.

    Two checks:
      * ``_clean_preview_rows(500)`` returns 500 unchanged;
      * a 250-row, single-column workbook saved as ``tmp_path/preview.xlsx``
        and summarised with a limit of 250 yields 250 preview rows for its
        first sheet.
    """
    assert _clean_preview_rows(500) == 500

    row_count = 250
    xlsx_path = tmp_path / "preview.xlsx"

    book = Workbook()
    ws = book.active
    ws.title = "预览"
    for number in range(row_count):
        ws.append([number])
    book.save(xlsx_path)

    summary = _summarize_workbook(xlsx_path, tmp_path, row_count)
    first_sheet = summary.sheets[0]
    assert len(first_sheet.preview) == row_count
|
|
|
|
|
|
def main():
    """Run the end-to-end verification inside a throwaway directory.

    Steps: build the CSV fixture, zip it, run ``run_processing`` on the
    archive, assert the reported mode is ``"v1"``, check the workbook at
    ``<job_dir>/output/Verify.xlsx`` with ``assert_dynamic_routing`` and
    finally run ``assert_preview_can_exceed_200``. A success message is
    printed when every assertion holds.
    """
    with tempfile.TemporaryDirectory() as scratch:
        tmp_path = Path(scratch)
        fixture_dir = tmp_path / "data"
        archive = tmp_path / "fixture.zip"
        job_dir = tmp_path / "job"

        build_fixture(fixture_dir)
        zip_dir(fixture_dir, archive)

        options = dict(
            zip_path=archive,
            job_dir=job_dir,
            mode="auto",
            data_type="auto",
            result_name="Verify",
            show_not_match=True,
            show_all_infos=True,
            preview_rows=500,
        )
        result = run_processing(**options)

        assert result.mode == "v1"
        assert_dynamic_routing(job_dir / "output" / "Verify.xlsx")
        assert_preview_can_exceed_200(tmp_path)

    # NOTE(review): the original indentation was lost; assumed the message is
    # printed after the temporary directory has been cleaned up - confirm.
    print("dynamic routing verification passed")
|
|
|
|
|
|
# Run the verification only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()
|