- 新增基于 JWT 当前用户的登录恢复、角色权限、用户管理、审计日志和演示出厂重置后台接口与前端管理页。 - 重串 GT_label 导出和 GT Mask 导入逻辑:导出保留类别真实 maskid,导入仅接受灰度或 RGB 等通道 maskid 图,支持未知 maskid 策略、尺寸最近邻拉伸和导入预览。 - 统一分割结果导出体验:默认当前帧,按项目抽帧顺序和 XhXXmXXsXXXms 时间戳命名 ZIP 与图片,补齐 GT/Pro/Mix/分开 Mask 输出和映射 JSON。 - 调整工作区左侧工具栏:移除创建点/线段入口,新增画笔、橡皮擦及尺寸控制,并按绘制、布尔、导入/AI 工具分组分隔。 - 扩展 Canvas 编辑能力:画笔按语义分类绘制并可自动并入连通选中 mask,橡皮擦对选中区域扣除,优化布尔操作、选区、撤销重做和保存状态联动。 - 优化自动传播时间轴显示:同一蓝色系按传播新旧递进变暗,老传播记录达到阈值后统一旧记录色,并维护范围选择与清空后的历史显示。 - 将 AI 智能分割入口替换为更明确的 AI 元素图标,并同步侧栏、工作区和 AI 页面入口表现。 - 完善模板分类、maskid 工具函数、分类树联动、遮罩透明度、边缘平滑和传播链同步相关前端状态。 - 扩展后端项目、媒体、任务、Dashboard、模板和传播 runner 的用户隔离、任务控制、进度事件与兼容处理。 - 补充前后端测试,覆盖用户管理、GT_label 往返导入导出、GT Mask 校验和预览、画笔/橡皮擦、时间轴传播历史、导出范围、WebSocket 与 API 封装。 - 更新 AGENTS、README 和 doc 文档,记录当前接口契约、实现状态、测试计划、安装说明和 maskid/GT_label 规则。
176 lines
5.7 KiB
Python
176 lines
5.7 KiB
Python
"""Authentication endpoints and dependencies."""
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from jose import JWTError, jwt
|
|
from passlib.context import CryptContext
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.orm import Session
|
|
|
|
from config import settings
|
|
from database import get_db
|
|
from models import AuditLog, User
|
|
from schemas import LoginResponse, UserOut
|
|
|
|
router = APIRouter(prefix="/api/auth", tags=["Auth"])
|
|
password_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
|
|
bearer_scheme = HTTPBearer(auto_error=False)
|
|
|
|
|
|
class LoginRequest(BaseModel):
|
|
username: str
|
|
password: str
|
|
|
|
|
|
def hash_password(password: str) -> str:
|
|
"""Hash a plain password for storage."""
|
|
return password_context.hash(password)
|
|
|
|
|
|
def verify_password(password: str, password_hash: str) -> bool:
|
|
"""Verify a plain password against a stored hash."""
|
|
return password_context.verify(password, password_hash)
|
|
|
|
|
|
def create_access_token(user: User, expires_delta: timedelta | None = None) -> str:
|
|
"""Create a signed JWT access token for a user."""
|
|
expire = datetime.now(timezone.utc) + (
|
|
expires_delta or timedelta(minutes=settings.access_token_expire_minutes)
|
|
)
|
|
payload: dict[str, Any] = {
|
|
"sub": str(user.id),
|
|
"username": user.username,
|
|
"role": user.role,
|
|
"exp": expire,
|
|
}
|
|
return jwt.encode(payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm)
|
|
|
|
|
|
def ensure_default_admin(db: Session) -> User:
|
|
"""Create the default development admin if the user table is empty."""
|
|
existing = db.query(User).filter(User.username == settings.default_admin_username).first()
|
|
if existing:
|
|
return existing
|
|
user = User(
|
|
username=settings.default_admin_username,
|
|
password_hash=hash_password(settings.default_admin_password),
|
|
role="admin",
|
|
is_active=1,
|
|
)
|
|
db.add(user)
|
|
db.commit()
|
|
db.refresh(user)
|
|
return user
|
|
|
|
|
|
def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(bearer_scheme),
|
|
db: Session = Depends(get_db),
|
|
) -> User:
|
|
"""Resolve and validate the current user from the Bearer token."""
|
|
if credentials is None or credentials.scheme.lower() != "bearer":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Not authenticated",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
try:
|
|
payload = jwt.decode(
|
|
credentials.credentials,
|
|
settings.jwt_secret_key,
|
|
algorithms=[settings.jwt_algorithm],
|
|
)
|
|
user_id = int(payload.get("sub"))
|
|
except (JWTError, TypeError, ValueError) as exc:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid or expired token",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
) from exc
|
|
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user or not user.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Inactive or missing user",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
return user
|
|
|
|
|
|
def require_admin(current_user: User = Depends(get_current_user)) -> User:
|
|
"""Require the current user to have the administrator role."""
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin permission required")
|
|
return current_user
|
|
|
|
|
|
def require_editor(current_user: User = Depends(get_current_user)) -> User:
|
|
"""Require a user role that can modify segmentation data."""
|
|
if current_user.role not in {"admin", "annotator"}:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Edit permission required")
|
|
return current_user
|
|
|
|
|
|
def write_audit_log(
|
|
db: Session,
|
|
*,
|
|
actor: User | None,
|
|
action: str,
|
|
target_type: str | None = None,
|
|
target_id: str | int | None = None,
|
|
detail: dict[str, Any] | None = None,
|
|
) -> AuditLog:
|
|
"""Persist a compact audit event."""
|
|
log = AuditLog(
|
|
actor_user_id=actor.id if actor else None,
|
|
action=action,
|
|
target_type=target_type,
|
|
target_id=str(target_id) if target_id is not None else None,
|
|
detail=detail or {},
|
|
)
|
|
db.add(log)
|
|
db.commit()
|
|
db.refresh(log)
|
|
return log
|
|
|
|
|
|
@router.post("/login", response_model=LoginResponse)
|
|
def login(payload: LoginRequest, db: Session = Depends(get_db)) -> dict:
|
|
"""Authenticate a user and return a signed JWT."""
|
|
ensure_default_admin(db)
|
|
user = db.query(User).filter(User.username == payload.username).first()
|
|
if not user or not user.is_active or not verify_password(payload.password, user.password_hash):
|
|
write_audit_log(
|
|
db,
|
|
actor=None,
|
|
action="auth.login_failed",
|
|
target_type="user",
|
|
target_id=payload.username,
|
|
detail={"username": payload.username},
|
|
)
|
|
raise HTTPException(status_code=401, detail="Invalid credentials")
|
|
write_audit_log(
|
|
db,
|
|
actor=user,
|
|
action="auth.login_success",
|
|
target_type="user",
|
|
target_id=user.id,
|
|
detail={"username": user.username},
|
|
)
|
|
return {
|
|
"token": create_access_token(user),
|
|
"token_type": "bearer",
|
|
"username": user.username,
|
|
"user": user,
|
|
}
|
|
|
|
|
|
@router.get("/me", response_model=UserOut)
|
|
def read_current_user(current_user: User = Depends(get_current_user)) -> User:
|
|
"""Return the authenticated user profile."""
|
|
return current_user
|