# -*- coding: utf-8 -*- """ 安全模块 提供文件验证、大小限制等安全功能 """ import base64 from pathlib import Path from typing import Optional from api.exceptions import ( FileTooLargeError, InvalidImageError, UnsupportedFormatError, ) # 安全配置常量 MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tiff", ".tif"} ALLOWED_MIME_TYPES = { "image/jpeg", "image/png", "image/bmp", "image/webp", "image/tiff", } # 图片文件魔数 (Magic Bytes) IMAGE_MAGIC_BYTES = { b"\xff\xd8\xff": "jpeg", # JPEG b"\x89PNG\r\n\x1a\n": "png", # PNG b"BM": "bmp", # BMP b"RIFF": "webp", # WebP (需要进一步检查) b"II*\x00": "tiff", # TIFF (Little Endian) b"MM\x00*": "tiff", # TIFF (Big Endian) } def validate_file_extension(filename: Optional[str]) -> bool: """ 验证文件扩展名 Args: filename: 文件名 Returns: 是否为允许的扩展名 """ if not filename: return False ext = Path(filename).suffix.lower() return ext in ALLOWED_EXTENSIONS def validate_file_size(content: bytes) -> bool: """ 验证文件大小 Args: content: 文件内容 Returns: 是否在允许的大小范围内 """ return len(content) <= MAX_FILE_SIZE def validate_image_magic_bytes(content: bytes) -> bool: """ 验证图片文件魔数 Args: content: 文件内容 Returns: 是否为有效的图片文件 """ if len(content) < 8: return False for magic, _ in IMAGE_MAGIC_BYTES.items(): if content.startswith(magic): # WebP 需要额外检查 if magic == b"RIFF" and len(content) >= 12: if content[8:12] != b"WEBP": continue return True return False def validate_mime_type(content_type: Optional[str]) -> bool: """ 验证 MIME 类型 Args: content_type: MIME 类型 Returns: 是否为允许的 MIME 类型 """ if not content_type: return False # 处理带参数的 MIME 类型,如 "image/jpeg; charset=utf-8" mime = content_type.split(";")[0].strip().lower() return mime in ALLOWED_MIME_TYPES def validate_uploaded_file( content: bytes, filename: Optional[str] = None, content_type: Optional[str] = None, ) -> bytes: """ 综合验证上传的文件 Args: content: 文件内容 filename: 文件名 content_type: MIME 类型 Returns: 验证通过的文件内容 Raises: FileTooLargeError: 文件过大 UnsupportedFormatError: 不支持的格式 InvalidImageError: 无效的图片 """ # 验证文件大小 if not validate_file_size(content): raise FileTooLargeError( f"文件大小超过限制,最大允许 {MAX_FILE_SIZE // 1024 // 1024}MB" ) # 验证扩展名 (如果提供) if filename and not validate_file_extension(filename): raise UnsupportedFormatError( f"不支持的文件格式,允许的格式: {', '.join(ALLOWED_EXTENSIONS)}" ) # 验证 MIME 类型 (如果提供) if content_type and not validate_mime_type(content_type): raise UnsupportedFormatError(f"不支持的 MIME 类型: {content_type}") # 验证文件魔数 if not validate_image_magic_bytes(content): raise InvalidImageError("文件内容不是有效的图片格式") return content def decode_base64_image(base64_string: str) -> bytes: """ 解码 Base64 图片 Args: base64_string: Base64 编码的图片字符串 Returns: 解码后的图片字节 Raises: InvalidImageError: Base64 解码失败或不是有效图片 """ # 移除可能的 Data URL 前缀 if "," in base64_string: base64_string = base64_string.split(",", 1)[1] # 移除空白字符 base64_string = base64_string.strip() try: content = base64.b64decode(base64_string) except Exception as e: raise InvalidImageError(f"Base64 解码失败: {str(e)}") # 验证解码后的内容 return validate_uploaded_file(content)