"""Progress event payloads and Redis publication helpers.""" from __future__ import annotations import json import logging from datetime import datetime, timezone from typing import Any from redis_client import get_redis_client from statuses import TASK_STATUS_CANCELLED, TASK_STATUS_FAILED, TASK_STATUS_SUCCESS logger = logging.getLogger(__name__) PROGRESS_CHANNEL = "seg:progress" def _iso_now() -> str: return datetime.now(timezone.utc).isoformat() def _event_type(task_status: str) -> str: if task_status == TASK_STATUS_SUCCESS: return "complete" if task_status == TASK_STATUS_CANCELLED: return "cancelled" if task_status == TASK_STATUS_FAILED: return "error" return "progress" def task_progress_payload(task: Any) -> dict[str, Any]: """Build the WebSocket payload from a persisted processing task.""" project = getattr(task, "project", None) project_name = getattr(project, "name", None) status = getattr(task, "status", "") updated_at = getattr(task, "updated_at", None) timestamp = updated_at.isoformat() if updated_at is not None else _iso_now() message = getattr(task, "message", None) return { "type": _event_type(status), "taskId": f"task-{task.id}", "task_id": task.id, "project_id": getattr(task, "project_id", None), "projectName": project_name, "filename": project_name, "progress": getattr(task, "progress", 0), "status": message or status, "message": message, "error": getattr(task, "error", None), "timestamp": timestamp, } def publish_progress_event(payload: dict[str, Any]) -> None: """Publish a JSON progress event without failing the worker on Redis errors.""" try: get_redis_client().publish(PROGRESS_CHANNEL, json.dumps(payload, ensure_ascii=False)) except Exception as exc: # noqa: BLE001 logger.warning("Failed to publish progress event: %s", exc) def publish_task_progress_event(task: Any) -> None: """Publish a progress event for a ProcessingTask ORM object.""" publish_progress_event(task_progress_payload(task))