"""FastAPI front end: upload a zip of detection data, preview and download the results."""

import html
import shutil
import tempfile
import uuid
from pathlib import Path
from urllib.parse import quote

from fastapi import FastAPI, File, Form, HTTPException, Query, UploadFile
from fastapi.responses import FileResponse, HTMLResponse

from .processor import (
    ProcessingError,
    ProcessingResult,
    create_result_zip,
    find_output_file,
    run_processing,
    summarize_job,
)

# Every upload gets its own sub-directory (named by a uuid4 hex id) under this root.
WORK_ROOT = Path(tempfile.gettempdir()) / "his_sur_data_deal_jobs"
WORK_ROOT.mkdir(parents=True, exist_ok=True)

APP_TITLE = "\u68c0\u6d4b\u6570\u636e\u5904\u7406"
# Header text of the column that marks where the "extra" columns start in a preview.
UNMATCHED = "\u672a\u5339\u914d\u68c0\u6d4b\u5185\u5bb9"

# Accepted ``sort_by`` values mapped to their display labels.
SORT_OPTIONS = {
    "name": "\u59d3\u540d",
    "sample_time": "\u91c7\u6837\u65f6\u95f4",
    "reason": "\u68c0\u6d4b\u539f\u56e0",
}
# Accepted ``sort_order`` values mapped to their display labels.
SORT_ORDERS = {
    "asc": "\u5347\u5e8f",
    "desc": "\u964d\u5e8f",
}

# Previews never render more than this many columns per sheet.
_MAX_PREVIEW_COLS = 18

app = FastAPI(title=APP_TITLE)


@app.get("/", response_class=HTMLResponse)
def index() -> str:
    """Render the landing page."""
    # NOTE(review): only the explanatory paragraph is present in this template;
    # presumably the upload form that posts to /process belongs here too - confirm.
    body = f"""
\u9ed8\u8ba4\u8f93\u51fa\u5168\u90e8\u68c0\u6d4b\u8bb0\u5f55\uff0c\u5e76\u4fdd\u7559\u201c{UNMATCHED}\u201d\u5217\uff1b\u8be5\u5217\u4e3a\u989d\u5916\u8f85\u52a9\u4fe1\u606f\uff0c\u4fbf\u4e8e\u6838\u5bf9\u672a\u5f52\u7c7b\u9879\u76ee\u3002
"""
    return _page_shell(body)


@app.post("/process", response_class=HTMLResponse)
async def process(
    file: UploadFile = File(...),
    mode: str = Form("auto"),
    data_type: str = Form("auto"),
    result_name: str = Form("Result"),
    preview_rows: int = Form(20),
) -> str:
    """Store an uploaded zip in a new job directory, process it and render the result.

    Raises:
        HTTPException: 400 when the upload is not named ``*.zip`` or processing
            raises ``ProcessingError``; 500 for any other processing failure.
    """
    if not file.filename or not file.filename.lower().endswith(".zip"):
        raise HTTPException(status_code=400, detail="\u8bf7\u4e0a\u4f20 zip \u6587\u4ef6\u3002")

    rows = _clean_preview_rows(preview_rows)
    job_dir = WORK_ROOT / uuid.uuid4().hex
    job_dir.mkdir(parents=True, exist_ok=True)
    upload_path = job_dir / "input.zip"

    # NOTE(review): the copy and run_processing are called synchronously inside an
    # ``async def`` handler, so they run on the event loop thread.
    try:
        with upload_path.open("wb") as out:
            shutil.copyfileobj(file.file, out)
        result = run_processing(
            zip_path=upload_path,
            job_dir=job_dir,
            mode=mode,
            data_type=data_type,
            result_name=result_name,
            show_not_match=True,
            show_all_infos=True,
            preview_rows=rows,
            include_basic_sheets=True,
            include_unmatched_items=True,
            include_summary_sheet=True,
        )
    except ProcessingError as exc:
        # The job id is never handed back on failure, so nothing could ever reach
        # this directory again; remove it instead of leaking it in the temp dir.
        shutil.rmtree(job_dir, ignore_errors=True)
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        shutil.rmtree(job_dir, ignore_errors=True)
        raise HTTPException(status_code=500, detail=f"\u5904\u7406\u5931\u8d25\uff1a{exc}") from exc

    return _render_result(result, preview_rows=rows)


@app.get("/result/{job_id}", response_class=HTMLResponse)
def result_page(
    job_id: str,
    preview_rows: int = Query(20, ge=5, le=200),
    include_basic_sheets: bool = Query(True),
    include_unmatched_items: bool = Query(True),
    include_summary_sheet: bool = Query(True),
    sort_by: str = Query("sample_time"),
    sort_order: str = Query("asc"),
) -> str:
    """Re-render the result page of an existing job with the given view options.

    Raises:
        HTTPException: 404 when the job does not exist or ``summarize_job``
            raises ``ProcessingError``.
    """
    job_dir = _get_job_dir(job_id)
    rows = _clean_preview_rows(preview_rows)
    clean_sort_by = _clean_sort_by(sort_by)
    clean_sort_order = _clean_sort_order(sort_order)
    try:
        result = summarize_job(
            job_dir,
            rows,
            include_basic_sheets=include_basic_sheets,
            include_unmatched_items=include_unmatched_items,
            include_summary_sheet=include_summary_sheet,
            sort_by=clean_sort_by,
            sort_order=clean_sort_order,
        )
    except ProcessingError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    return _render_result(
        result,
        preview_rows=rows,
        include_basic_sheets=include_basic_sheets,
        include_unmatched_items=include_unmatched_items,
        include_summary_sheet=include_summary_sheet,
        sort_by=clean_sort_by,
        sort_order=clean_sort_order,
    )


@app.get("/download/all/{job_id}")
def download_all(
    job_id: str,
    include_basic_sheets: bool = Query(True),
    include_unmatched_items: bool = Query(True),
    include_summary_sheet: bool = Query(True),
    sort_by: str = Query("sample_time"),
    sort_order: str = Query("asc"),
) -> FileResponse:
    """Build the result archive for a job and send it as ``result.zip``.

    Raises:
        HTTPException: 404 when the job does not exist or ``create_result_zip``
            raises ``ProcessingError``.
    """
    job_dir = _get_job_dir(job_id)
    try:
        result_zip = create_result_zip(
            job_dir,
            include_basic_sheets=include_basic_sheets,
            include_unmatched_items=include_unmatched_items,
            include_summary_sheet=include_summary_sheet,
            sort_by=_clean_sort_by(sort_by),
            sort_order=_clean_sort_order(sort_order),
        )
    except ProcessingError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    return FileResponse(result_zip, media_type="application/zip", filename="result.zip")


@app.get("/download/file/{job_id}")
def download_file(
    job_id: str,
    path: str = Query(...),
    include_basic_sheets: bool = Query(True),
    include_unmatched_items: bool = Query(True),
    include_summary_sheet: bool = Query(True),
    sort_by: str = Query("sample_time"),
    sort_order: str = Query("asc"),
) -> FileResponse:
    """Send one output workbook of a job, located by ``find_output_file``.

    Raises:
        HTTPException: 404 when the job does not exist or ``find_output_file``
            raises ``ProcessingError``.
    """
    job_dir = _get_job_dir(job_id)
    try:
        target = find_output_file(
            job_dir,
            path,
            include_basic_sheets=include_basic_sheets,
            include_unmatched_items=include_unmatched_items,
            include_summary_sheet=include_summary_sheet,
            sort_by=_clean_sort_by(sort_by),
            sort_order=_clean_sort_order(sort_order),
        )
    except ProcessingError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    return FileResponse(
        target,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        filename=target.name,
    )


@app.get("/health")
def health() -> dict[str, str]:
    """Return a static liveness payload."""
    return {"status": "ok"}


def _get_job_dir(job_id: str) -> Path:
    """Map a job id to its existing directory under ``WORK_ROOT``.

    Raises:
        HTTPException: 404 when the id is not purely alphanumeric, resolves
            outside ``WORK_ROOT``, or does not name an existing directory.
    """
    # isalnum() is False for the empty string and for anything containing
    # path separators or dots, so the id cannot traverse directories by itself.
    if not job_id.isalnum():
        raise HTTPException(status_code=404, detail="\u7ed3\u679c\u4e0d\u5b58\u5728\u3002")
    root = WORK_ROOT.resolve()
    job_dir = (WORK_ROOT / job_id).resolve()
    # Compare path components rather than string prefixes: a textual startswith()
    # would also accept a sibling such as "<root>_other".
    if root not in job_dir.parents or not job_dir.is_dir():
        raise HTTPException(status_code=404, detail="\u7ed3\u679c\u4e0d\u5b58\u5728\u3002")
    return job_dir


def _clean_preview_rows(preview_rows: int) -> int:
    """Clamp the preview row count to 5..200; a falsy value falls back to 20."""
    return max(5, min(int(preview_rows or 20), 200))


def _checked(value: bool) -> str:
    """Return ``"checked"`` when *value* is truthy, otherwise an empty string."""
    return "checked" if value else ""


def _selected(value: str, current: str) -> str:
    """Return ``"selected"`` when *value* equals *current*, otherwise an empty string."""
    return "selected" if value == current else ""


def _clean_sort_by(sort_by: str) -> str:
    """Return *sort_by* if it is a key of ``SORT_OPTIONS``, else ``"sample_time"``."""
    return sort_by if sort_by in SORT_OPTIONS else "sample_time"


def _clean_sort_order(sort_order: str) -> str:
    """Return *sort_order* if it is a key of ``SORT_ORDERS``, else ``"asc"``."""
    return sort_order if sort_order in SORT_ORDERS else "asc"


def _render_result(
    result: ProcessingResult,
    preview_rows: int = 20,
    include_basic_sheets: bool = True,
    include_unmatched_items: bool = True,
    include_summary_sheet: bool = True,
    sort_by: str = "sample_time",
    sort_order: str = "asc",
) -> str:
    """Render the result page: overall counters followed by one section per file."""
    # NOTE(review): preview_rows, the include_* flags, sort_by and sort_order are
    # accepted but not interpolated into the template below, and _checked /
    # _selected have no caller in this module. Presumably the options form that
    # used them is missing from the template - confirm before removing anything.
    total_sheets = sum(len(file.sheets) for file in result.files)
    # Each sheet's first row is treated as a header, so it is not counted as data.
    total_rows = sum(max(sheet.rows - 1, 0) for file in result.files for sheet in file.sheets)
    file_items = "\n".join(_render_file(file, result.job_id) for file in result.files)
    body = f"""
\u5904\u7406\u6a21\u5f0f{html.escape(result.mode.upper())}
Excel \u6587\u4ef6{len(result.files)}
\u5de5\u4f5c\u8868{total_sheets}
\u6570\u636e\u884c{total_rows}
\u5185\u5bb9\u9009\u62e9
\u7ed3\u679c\u4e2d\u201c{UNMATCHED}\u201d\u5217\u4e3a\u989d\u5916\u8f85\u52a9\u4fe1\u606f\uff0c\u7528\u6765\u6807\u660e\u672a\u5f52\u5165\u6807\u51c6\u9879\u76ee\u7684\u68c0\u6d4b\u5185\u5bb9\u3002
{file_items}
"""
    return _page_shell(
        body,
        subtitle="\u5904\u7406\u5b8c\u6210\uff0c\u53ef\u5411\u4e0b\u6eda\u52a8\u67e5\u770b\u66f4\u591a\u5de5\u4f5c\u8868\uff0c\u4e5f\u53ef\u8c03\u6574\u9884\u89c8\u884c\u6570\u540e\u5237\u65b0\u3002",
        top_action=True,
    )


def _render_file(file, job_id: str) -> str:
    """Render one output workbook: its name, sheet count and every sheet preview."""
    sheet_items = "\n".join(_render_sheet(sheet) for sheet in file.sheets)
    # NOTE(review): file_url is built but not interpolated into the template below;
    # presumably it is the target of the export link - confirm.
    file_url = f"/download/file/{html.escape(job_id)}?path={quote(file.relpath)}"
    return f"""

{html.escape(file.filename)}

{len(file.sheets)} \u4e2a\u5de5\u4f5c\u8868

\u5bfc\u51fa\u6b64 Excel
{sheet_items}
"""


def _render_sheet(sheet) -> str:
    """Render one worksheet: name, row/column counts and a preview of its rows.

    The first preview row is emitted with ``th`` cells, later rows with ``td``.
    Columns from the ``UNMATCHED`` header onwards get the ``extra-col`` class.
    """
    preview = sheet.preview
    table = "\n\u6b64\u5de5\u4f5c\u8868\u6ca1\u6709\u53ef\u9884\u89c8\u7684\u6570\u636e\u3002\n"
    if preview:
        max_cols = min(max(len(row) for row in preview), _MAX_PREVIEW_COLS)
        header = preview[0]
        unmatched_start = header.index(UNMATCHED) if UNMATCHED in header else None
        rows = []
        for index, row in enumerate(preview):
            # NOTE(review): row_class is computed but not interpolated into the row
            # string below; presumably the row wrapper markup is missing - confirm.
            row_class = ' class="group-header"' if _is_summary_group_header(row) else ""
            tag = "th" if index == 0 else "td"
            cells = []
            # Pad short rows with empty cells, then cut every row to max_cols.
            for col_index, value in enumerate((row + [""] * max_cols)[:max_cols]):
                is_extra = unmatched_start is not None and col_index >= unmatched_start
                css_class = ' class="extra-col"' if is_extra else ""
                cells.append(f"<{tag}{css_class}>{html.escape(value)}")
            rows.append(f"" + "".join(cells) + "")
        table = "\n" + "".join(rows) + "\n"
    # The separator is U+00B7 (middle dot); it had been stored as the two-character
    # mojibake produced by decoding its UTF-8 bytes with a Thai code page.
    return f"""
{html.escape(sheet.name)} {sheet.rows} \u884c \u00b7 {sheet.columns} \u5217 {table}
"""


def _page_shell(
    body: str,
    subtitle: str = "\u4e0a\u4f20\u201c\u5f85\u5904\u7406\u68c0\u6d4b\u6570\u636e.zip\u201d\uff0c\u5904\u7406\u5b8c\u6210\u540e\u5728\u7f51\u9875\u4e2d\u67e5\u770b\u7ed3\u679c\u3002",
    top_action: bool = False,
) -> str:
    """Wrap *body* in the shared page frame (title, escaped subtitle, optional action).

    *body* is inserted as-is, so callers must escape any untrusted text themselves.
    """
    action_html = ""
    if top_action:
        # NOTE(review): only the label text is present here; presumably it was a
        # link back to the start page - confirm.
        action_html = "\u9000\u51fa / \u7ee7\u7eed\u5904\u7406\u65b0\u6587\u4ef6"
    return f""" {APP_TITLE}

{APP_TITLE}

{html.escape(subtitle)}
{action_html}
{body}
"""


def _is_summary_group_header(row: list[str]) -> bool:
    """Tell whether *row* is a repeated header row inside a summary sheet.

    A row qualifies when its first three cells are exactly the name, admission
    number and sample time headers and the fourth starts with the reason header.
    """
    return (
        len(row) >= 4
        and row[0] == "\u59d3\u540d"
        and row[1] == "\u4f4f\u9662\u53f7"
        and row[2] == "\u91c7\u6837\u65f6\u95f4"
        and row[3].startswith("\u68c0\u6d4b\u539f\u56e0")
    )