309 lines
9.4 KiB
Python
309 lines
9.4 KiB
Python
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import zipfile
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from openpyxl import load_workbook
|
|
|
|
|
|
# Directory holding the V1/V2 processor scripts. It is located next to this
# module and is also used as the working directory when a script is launched.
PROCESSOR_DIR = Path(__file__).resolve().parent / "processors"
|
|
|
|
|
|
class ProcessingError(Exception):
    """Signal that a processing job could not be completed.

    Instances carry a human-readable message describing what went wrong
    (invalid options, unreadable archive, missing results, ...).
    """
|
|
|
|
|
|
@dataclass
class SheetSummary:
    """Summary of a single worksheet inside a result workbook."""

    # Worksheet title.
    name: str
    # Row count reported by the worksheet (0 when the count is unavailable).
    rows: int
    # Column count reported by the worksheet (0 when the count is unavailable).
    columns: int
    # Leading rows of the sheet; every cell is rendered as (possibly truncated) text.
    preview: list[list[str]]
|
|
|
|
|
|
@dataclass
class ExcelSummary:
    """Summary of one generated Excel workbook."""

    # File name of the workbook, without any directory part.
    filename: str
    # POSIX-style path of the workbook relative to the job's output directory.
    relpath: str
    # One summary per worksheet in the workbook.
    sheets: list[SheetSummary]
|
|
|
|
|
|
@dataclass
class ProcessingResult:
    """Outcome of a processing job, as returned to callers of this module."""

    # Identifier of the job; taken from the job directory's name.
    job_id: str
    # Processor variant that was used ("v1"/"v2"), or "unknown" when a
    # summary is rebuilt and no mode could be read from the job's log.
    mode: str
    # Directory that contains the generated workbooks.
    output_dir: Path
    # Location of the result archive (``result.zip`` inside the job directory).
    zip_path: Path
    # Summaries of every ``.xlsx`` file found under ``output_dir``.
    files: list[ExcelSummary]
|
|
|
|
|
|
def run_processing(
    zip_path: Path,
    job_dir: Path,
    mode: str,
    data_type: str,
    result_name: str,
    show_not_match: bool,
    show_all_infos: bool,
    preview_rows: int = 20,
) -> ProcessingResult:
    """Extract an uploaded archive, run the matching processor script and package the results.

    Args:
        zip_path: Uploaded zip archive containing the raw data.
        job_dir: Working directory of the job; ``input/``, ``output/``,
            ``process.log`` and ``result.zip`` are created inside it.
        mode: ``"auto"``, ``"v1"`` or ``"v2"``. With ``"auto"`` the layout of
            the extracted data decides which processor is used.
        data_type: Patient identifier type, ``"pat_no"`` or ``"zhuyuanhao"``.
        result_name: Desired base name of the result workbook; it is sanitised
            before use.
        show_not_match: Passed to the processor script as ``str(bool)``.
        show_all_infos: Passed to the processor script as ``str(bool)``.
        preview_rows: Number of rows to include in each sheet preview.

    Returns:
        A ``ProcessingResult`` describing the generated workbooks and archive.

    Raises:
        ProcessingError: On invalid options, an unusable archive, an
            unrecognised data layout, a processor script that fails or exceeds
            the time limit, or when no Excel file was produced.
    """
    if mode not in {"auto", "v1", "v2"}:
        raise ProcessingError("处理模式不正确。")
    if data_type not in {"pat_no", "zhuyuanhao"}:
        raise ProcessingError("患者编号类型不正确。")

    extract_dir = job_dir / "input"
    output_dir = job_dir / "output"
    extract_dir.mkdir(parents=True, exist_ok=True)
    output_dir.mkdir(parents=True, exist_ok=True)

    _safe_extract(zip_path, extract_dir)
    data_dir = _find_data_root(extract_dir)
    selected_mode = _detect_mode(data_dir) if mode == "auto" else mode

    clean_name = _clean_result_name(result_name)
    if selected_mode == "v1":
        # V1 takes positional arguments and writes directly into output_dir.
        result_path = output_dir / f"{clean_name}.xlsx"
        cmd = [
            sys.executable,
            str(PROCESSOR_DIR / "V1-ALL_convert_Lab_Test_data.py"),
            str(data_dir),
            str(result_path),
            str(show_not_match),
            str(show_all_infos),
            data_type,
        ]
    elif selected_mode == "v2":
        # V2 takes named options; its workbooks are collected from data_dir
        # after the run (see _collect_v2_outputs below).
        cmd = [
            sys.executable,
            str(PROCESSOR_DIR / "V2-Every_Pat_File_convert_Lab_Test_data.py"),
            "--file_dir",
            str(data_dir),
            "--result_save_file_name",
            clean_name,
            "--show_not_match",
            str(show_not_match),
            "--show_all_infos",
            str(show_all_infos),
            "--data_type",
            data_type,
        ]
    else:
        raise ProcessingError("无法识别数据目录结构,请手动选择 V1 或 V2。")

    # Force UTF-8 in the child so that its output decodes reliably.
    env = os.environ.copy()
    env["PYTHONUTF8"] = "1"
    env["PYTHONIOENCODING"] = "utf-8"

    log_path = job_dir / "process.log"
    log_header = "mode=" + selected_mode + "\n\n"
    timeout_seconds = 60 * 30

    try:
        completed = subprocess.run(
            cmd,
            cwd=PROCESSOR_DIR,
            env=env,
            text=True,
            encoding="utf-8",
            errors="replace",
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired as exc:
        # Surface a timeout as a regular ProcessingError and keep whatever
        # output was captured; the partial output may arrive as bytes.
        partial = exc.stdout or ""
        if isinstance(partial, bytes):
            partial = partial.decode("utf-8", errors="replace")
        log_path.write_text(log_header + partial, encoding="utf-8")
        raise ProcessingError(
            f"处理脚本运行超过 {timeout_seconds // 60} 分钟,已终止。"
        ) from exc

    # The first log line records the mode so it can be read back later.
    log_path.write_text(log_header + completed.stdout, encoding="utf-8")
    if completed.returncode != 0:
        raise ProcessingError(f"处理脚本退出码 {completed.returncode}。\n{completed.stdout[-4000:]}")

    if selected_mode == "v2":
        _collect_v2_outputs(data_dir, output_dir)

    xlsx_files = sorted(output_dir.rglob("*.xlsx"))
    if not xlsx_files:
        raise ProcessingError("处理完成但没有生成 Excel 文件,请检查数据结构。")

    for xlsx_file in xlsx_files:
        _remove_default_empty_sheet(xlsx_file)

    result_zip = job_dir / "result.zip"
    _create_result_zip(output_dir, result_zip)
    return ProcessingResult(
        job_id=job_dir.name,
        mode=selected_mode,
        output_dir=output_dir,
        zip_path=result_zip,
        files=[_summarize_workbook(path, output_dir, preview_rows) for path in xlsx_files],
    )
|
|
|
|
|
|
def create_result_zip(job_dir: Path) -> Path:
    """Rebuild ``result.zip`` for a job from its ``output`` directory.

    Returns:
        Path of the archive that was written.

    Raises:
        ProcessingError: If the job has no ``output`` directory.
    """
    source_dir = job_dir / "output"
    if not source_dir.exists():
        raise ProcessingError("结果目录不存在。")

    archive_path = job_dir / "result.zip"
    _create_result_zip(source_dir, archive_path)
    return archive_path
|
|
|
|
|
|
def summarize_job(job_dir: Path, preview_rows: int = 20) -> ProcessingResult:
    """Build a ``ProcessingResult`` for a job from the files already on disk.

    Args:
        job_dir: Working directory of the job.
        preview_rows: Number of rows to include in each sheet preview.

    Raises:
        ProcessingError: If the output directory is missing or holds no
            ``.xlsx`` file.
    """
    results_root = job_dir / "output"
    if not results_root.exists():
        raise ProcessingError("结果目录不存在。")

    workbooks = sorted(results_root.rglob("*.xlsx"))
    if not workbooks:
        raise ProcessingError("结果文件不存在。")

    # The mode is recovered from the job's log rather than recomputed.
    recorded_mode = _read_mode(job_dir)

    summaries = []
    for workbook_path in workbooks:
        summaries.append(_summarize_workbook(workbook_path, results_root, preview_rows))

    return ProcessingResult(
        job_id=job_dir.name,
        mode=recorded_mode,
        output_dir=results_root,
        zip_path=job_dir / "result.zip",
        files=summaries,
    )
|
|
|
|
|
|
def find_output_file(job_dir: Path, relpath: str) -> Path:
    """Resolve a caller-supplied relative path to a result workbook.

    Args:
        job_dir: Working directory of the job.
        relpath: Path of the requested file relative to ``job_dir / "output"``.

    Returns:
        The resolved path of the requested ``.xlsx`` file.

    Raises:
        ProcessingError: If the path escapes the output directory, does not
            point to an existing file, or is not an ``.xlsx`` file.
    """
    output_dir = (job_dir / "output").resolve()
    target = (output_dir / relpath).resolve()

    # Compare path components, not string prefixes: a sibling directory such
    # as "output_evil" shares the textual prefix of "output" but lies outside it.
    if not target.is_relative_to(output_dir) or not target.is_file():
        raise ProcessingError("结果文件不存在。")
    if target.suffix.lower() != ".xlsx":
        raise ProcessingError("只能导出 Excel 结果文件。")
    return target
|
|
|
|
|
|
def _read_mode(job_dir: Path) -> str:
|
|
log_path = job_dir / "process.log"
|
|
if not log_path.exists():
|
|
return "unknown"
|
|
first_line = log_path.read_text(encoding="utf-8", errors="replace").splitlines()[0:1]
|
|
if first_line and first_line[0].startswith("mode="):
|
|
return first_line[0].split("=", 1)[1]
|
|
return "unknown"
|
|
|
|
|
|
def _safe_extract(zip_path: Path, target_dir: Path) -> None:
|
|
try:
|
|
with zipfile.ZipFile(zip_path) as zf:
|
|
for member in zf.infolist():
|
|
destination = (target_dir / member.filename).resolve()
|
|
if not str(destination).startswith(str(target_dir.resolve())):
|
|
raise ProcessingError("zip 中包含不安全路径。")
|
|
zf.extractall(target_dir)
|
|
except zipfile.BadZipFile as exc:
|
|
raise ProcessingError("zip 文件无法解压。") from exc
|
|
|
|
|
|
def _find_data_root(extract_dir: Path) -> Path:
|
|
candidates = [extract_dir]
|
|
children = [p for p in extract_dir.iterdir() if p.is_dir()]
|
|
if len(children) == 1 and not any(p.is_file() for p in extract_dir.iterdir()):
|
|
candidates.insert(0, children[0])
|
|
|
|
for candidate in candidates:
|
|
if (candidate / "Patients_info.csv").exists():
|
|
return candidate
|
|
|
|
for path in extract_dir.rglob("Patients_info.csv"):
|
|
return path.parent
|
|
|
|
raise ProcessingError("未找到 Patients_info.csv。")
|
|
|
|
|
|
def _detect_mode(data_dir: Path) -> str:
|
|
if (data_dir / "Tests_List").is_dir() and (data_dir / "Tests_Detail_List").is_dir():
|
|
return "v1"
|
|
|
|
patient_dirs = [p for p in data_dir.iterdir() if p.is_dir()]
|
|
for patient_dir in patient_dirs:
|
|
names = {p.name for p in patient_dir.iterdir()}
|
|
has_summary = any(name.endswith("_检测汇总.csv") for name in names)
|
|
has_detail_dir = any(name.endswith("_具体检测") and (patient_dir / name).is_dir() for name in names)
|
|
if has_summary and has_detail_dir:
|
|
return "v2"
|
|
|
|
raise ProcessingError("无法自动识别 V1/V2 数据结构。")
|
|
|
|
|
|
def _clean_result_name(result_name: str) -> str:
|
|
name = (result_name or "Result").strip()
|
|
if name.lower().endswith(".xlsx"):
|
|
name = name[:-5]
|
|
forbidden = '<>:"/\\|?*'
|
|
name = "".join("_" if ch in forbidden else ch for ch in name).strip(" .")
|
|
return name or "Result"
|
|
|
|
|
|
def _collect_v2_outputs(data_dir: Path, output_dir: Path) -> None:
|
|
v2_dir = output_dir / "V2患者结果"
|
|
v2_dir.mkdir(exist_ok=True)
|
|
for path in data_dir.rglob("*.xlsx"):
|
|
if path.is_file():
|
|
target = v2_dir / path.name
|
|
if target.exists():
|
|
target = v2_dir / f"{path.parent.name}_{path.name}"
|
|
shutil.copy2(path, target)
|
|
|
|
|
|
def _create_result_zip(output_dir: Path, result_zip: Path) -> None:
|
|
with zipfile.ZipFile(result_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
|
|
for path in sorted(output_dir.rglob("*.xlsx")):
|
|
if path.is_file():
|
|
zf.write(path, path.relative_to(output_dir))
|
|
|
|
|
|
def _remove_default_empty_sheet(path: Path) -> None:
    """Drop an empty worksheet named ``Sheet`` from the workbook at ``path``.

    The sheet is removed, and the file rewritten, only when the workbook has
    other sheets besides it and the sheet holds no values. The workbook is
    closed in every case.
    """
    book = load_workbook(path)
    try:
        sheet_names = book.sheetnames
        if "Sheet" not in sheet_names or len(sheet_names) <= 1:
            return

        placeholder = book["Sheet"]
        if not _is_empty_sheet(placeholder):
            return

        book.remove(placeholder)
        book.save(path)
    finally:
        book.close()
|
|
|
|
|
|
def _is_empty_sheet(sheet) -> bool:
|
|
for row in sheet.iter_rows(values_only=True):
|
|
for value in row:
|
|
if value not in (None, ""):
|
|
return False
|
|
return True
|
|
|
|
|
|
def _summarize_workbook(path: Path, output_dir: Path, preview_rows: int) -> ExcelSummary:
    """Summarise the workbook at ``path``, one ``SheetSummary`` per worksheet.

    Args:
        path: Workbook to read; opened read-only with cached values.
        output_dir: Directory that ``path`` is reported relative to.
        preview_rows: Requested preview length; clamped to the range 2..200.

    Returns:
        An ``ExcelSummary`` with the file name, its POSIX-style relative path
        and the per-sheet summaries.
    """
    row_limit = max(2, min(preview_rows, 200))
    summaries: list[SheetSummary] = []

    book = load_workbook(path, read_only=True, data_only=True)
    try:
        for worksheet in book.worksheets:
            leading_rows = worksheet.iter_rows(max_row=row_limit, values_only=True)
            preview = [[_cell_to_text(cell) for cell in cells] for cells in leading_rows]
            summary = SheetSummary(
                name=worksheet.title,
                # Dimensions can be missing; report 0 in that case.
                rows=worksheet.max_row or 0,
                columns=worksheet.max_column or 0,
                preview=preview,
            )
            summaries.append(summary)
    finally:
        book.close()

    relative_path = path.relative_to(output_dir).as_posix()
    return ExcelSummary(filename=path.name, relpath=relative_path, sheets=summaries)
|
|
|
|
|
|
def _cell_to_text(value: object) -> str:
|
|
if value is None:
|
|
return ""
|
|
text = str(value)
|
|
return text if len(text) <= 80 else text[:77] + "..."
|