You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

405 lines
13 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
"""
快递单解析模块
将分散的 OCR 文本块合并并解析成结构化的快递单信息
"""
import re
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from .engine import TextBlock
@dataclass
class ExpressInfo:
    """Structured information extracted from an express waybill.

    Attributes:
        tracking_number: Waybill / tracking number.
        sender_name: Sender's name.
        sender_phone: Sender's phone number.
        sender_address: Sender's address.
        receiver_name: Receiver's name.
        receiver_phone: Receiver's phone number.
        receiver_address: Receiver's address.
        courier_company: Courier company name.
        raw_text: The merged source text (kept for debugging).
        confidence: Average recognition confidence.
        extra_fields: Any other recognised fields, keyed by name.
    """

    tracking_number: Optional[str] = None
    sender_name: Optional[str] = None
    sender_phone: Optional[str] = None
    sender_address: Optional[str] = None
    receiver_name: Optional[str] = None
    receiver_phone: Optional[str] = None
    receiver_address: Optional[str] = None
    courier_company: Optional[str] = None
    raw_text: str = ""
    confidence: float = 0.0
    extra_fields: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a nested dictionary.

        Sender and receiver details are grouped under the ``"sender"`` and
        ``"receiver"`` keys. ``extra_fields`` is exported as a copy so that
        editing the returned dictionary cannot modify this record.

        Returns:
            A dictionary with the keys ``tracking_number``, ``sender``,
            ``receiver``, ``courier_company``, ``confidence``,
            ``extra_fields`` and ``raw_text``.
        """
        return {
            "tracking_number": self.tracking_number,
            "sender": {
                "name": self.sender_name,
                "phone": self.sender_phone,
                "address": self.sender_address,
            },
            "receiver": {
                "name": self.receiver_name,
                "phone": self.receiver_phone,
                "address": self.receiver_address,
            },
            "courier_company": self.courier_company,
            "confidence": self.confidence,
            # Copy: handing out the live dict would let callers mutate the record.
            "extra_fields": dict(self.extra_fields),
            "raw_text": self.raw_text,
        }

    @property
    def is_valid(self) -> bool:
        """Whether the record holds usable waybill information.

        A record counts as valid when it has a tracking number, a receiver
        name, or a receiver phone number.
        """
        return bool(self.tracking_number or self.receiver_name or self.receiver_phone)
@dataclass
class TextLine:
    """One visual line of text assembled from several OCR text blocks.

    Attributes:
        text: The merged text of the line.
        blocks: The original text blocks that make up the line.
        y_center: Y coordinate of the line's centre.
        x_min: X coordinate where the line starts.
    """

    text: str
    blocks: List[TextBlock]
    y_center: float
    x_min: float

    @property
    def confidence(self) -> float:
        """Mean confidence of the line's blocks; ``0.0`` when there are none."""
        members = self.blocks
        return (
            sum(member.confidence for member in members) / len(members)
            if members
            else 0.0
        )
class ExpressParser:
    """Parse OCR text blocks from an express waybill into structured fields.

    Scattered text blocks are grouped into lines by vertical position, the
    lines are joined into one text, and the individual fields are extracted
    from that text with the regular expressions in ``FIELD_PATTERNS``.
    """

    # Keyword found on the waybill -> courier company name to report.
    # Matching is case-insensitive and the first keyword found wins.
    COURIER_KEYWORDS = {
        "顺丰": "顺丰速运",
        "SF": "顺丰速运",
        "圆通": "圆通速递",
        "中通": "中通快递",
        "韵达": "韵达快递",
        "申通": "申通快递",
        "极兔": "极兔速递",
        "京东": "京东物流",
        "JD": "京东物流",
        "邮政": "中国邮政",
        "EMS": "中国邮政EMS",
        "百世": "百世快递",
        "德邦": "德邦快递",
        "天天": "天天快递",
        "宅急送": "宅急送",
    }

    # Field name -> regular expressions tried in order; group 1 is the value.
    # Labels accept both the ASCII colon and the full-width colon (U+FF1A),
    # since Chinese waybills are commonly printed with the latter.
    FIELD_PATTERNS = {
        "tracking_number": [
            r"运单号[::]\s*(\w+)",
            r"单号[::]\s*(\w+)",
            r"快递单号[::]\s*(\w+)",
            r"物流单号[::]\s*(\w+)",
            r"^(\d{10,20})$",  # a line holding only digits
            r"^([A-Z]{2}\d{9,13}[A-Z]{2})$",  # international format
        ],
        "receiver_name": [
            r"收件人[::]\s*(.+?)(?:\s|电话|手机|地址|$)",
            r"收货人[::]\s*(.+?)(?:\s|电话|手机|地址|$)",
            r"收[::]\s*(.+?)(?:\s|电话|手机|地址|$)",
        ],
        "receiver_phone": [
            r"收件人.*?电话[::]\s*(\d{11})",
            r"收件人.*?手机[::]\s*(\d{11})",
            # Digit boundaries keep this from taking the first 11 digits of a
            # longer number (e.g. a tracking number on the same line).
            r"收.*?(?<![0-9])(\d{11})(?![0-9])",
            r"电话[::]\s*(\d{11})",
            r"手机[::]\s*(\d{11})",
            r"(?<![0-9])(\d{11})(?![0-9])",  # stand-alone 11-digit number
        ],
        "receiver_address": [
            r"收件地址[::]\s*(.+?)(?:寄件|发件|$)",
            r"收货地址[::]\s*(.+?)(?:寄件|发件|$)",
            r"地址[::]\s*(.+?)(?:寄件|发件|电话|$)",
        ],
        "sender_name": [
            r"寄件人[::]\s*(.+?)(?:\s|电话|手机|地址|$)",
            r"发件人[::]\s*(.+?)(?:\s|电话|手机|地址|$)",
            r"寄[::]\s*(.+?)(?:\s|电话|手机|地址|$)",
        ],
        "sender_phone": [
            r"寄件人.*?电话[::]\s*(\d{11})",
            r"寄件人.*?手机[::]\s*(\d{11})",
        ],
        "sender_address": [
            r"寄件地址[::]\s*(.+?)(?:收件|$)",
            r"发件地址[::]\s*(.+?)(?:收件|$)",
        ],
    }

    # Administrative-region characters (province / city / district / county).
    # An unlabelled line must contain one of these to be taken as an address.
    _REGION_KEYWORDS = ("省", "市", "区", "县")
    # Every character that marks a line as address-like. "号" is deliberately
    # absent: it also occurs in tracking-number labels such as "运单号".
    _ADDRESS_KEYWORDS = _REGION_KEYWORDS + ("街", "路", "镇", "村")
    # Label characters used to locate the receiver / sender section.
    _RECEIVER_CONTEXT = "收"
    _SENDER_CONTEXT = "寄"
    # Number of lines examined starting at the context line.
    _CONTEXT_WINDOW = 3
    # Leading "label:" prefix of a line (ASCII or full-width colon).
    _LABEL_PREFIX = re.compile(r'^[^::]*[::]\s*')
    # Bracket characters stripped from extracted values.
    _NOISE_CHARS = re.compile(r'[【】\[\]()]')

    def __init__(
        self,
        line_merge_threshold: float = 0.6,
        horizontal_gap_threshold: float = 2.0
    ):
        """Create a parser.

        Args:
            line_merge_threshold: Maximum vertical distance between two block
                centres for them to share a line, as a fraction of the block
                height.
            horizontal_gap_threshold: Minimum horizontal gap that inserts a
                space between neighbouring blocks, as a multiple of the
                previous block's average character width.
        """
        self._line_merge_threshold = line_merge_threshold
        self._horizontal_gap_threshold = horizontal_gap_threshold

    def parse(self, text_blocks: List[TextBlock]) -> ExpressInfo:
        """Extract waybill information from a list of OCR text blocks.

        Args:
            text_blocks: Text blocks produced by OCR.

        Returns:
            The structured waybill information; an empty ``ExpressInfo`` when
            ``text_blocks`` is empty.
        """
        if not text_blocks:
            return ExpressInfo()

        # 1. Group blocks into lines.
        lines = self._merge_blocks_to_lines(text_blocks)
        # 2. Build the full text used for regex matching.
        full_text = self._lines_to_text(lines)
        # 3. Extract the structured fields.
        info = self._extract_info(full_text, lines)
        # 4. Record the mean block confidence and the source text.
        info.confidence = sum(b.confidence for b in text_blocks) / len(text_blocks)
        info.raw_text = full_text
        return info

    def _merge_blocks_to_lines(self, blocks: List[TextBlock]) -> List[TextLine]:
        """Group text blocks into lines by their vertical position.

        Blocks are visited in order of centre Y. A block joins the current
        line when its centre Y is within ``height * line_merge_threshold`` of
        the Y of the block that opened the line; otherwise it opens a new one.

        Args:
            blocks: Text blocks to group.

        Returns:
            The lines from top to bottom; an empty list for empty input.
        """
        if not blocks:
            return []

        sorted_blocks = sorted(blocks, key=lambda b: b.center[1])
        lines: List[TextLine] = []
        current_line_blocks: List[TextBlock] = [sorted_blocks[0]]
        # Reference Y of the current line: the centre of its first block.
        current_y = sorted_blocks[0].center[1]

        for block in sorted_blocks[1:]:
            block_y = block.center[1]
            threshold = block.height * self._line_merge_threshold
            if abs(block_y - current_y) <= threshold:
                current_line_blocks.append(block)
            else:
                # Close the current line and start a new one at this block.
                lines.append(self._create_line(current_line_blocks))
                current_line_blocks = [block]
                current_y = block_y

        # The last line is still open when the loop ends.
        lines.append(self._create_line(current_line_blocks))
        return lines

    def _create_line(self, blocks: List[TextBlock]) -> TextLine:
        """Build a ``TextLine`` from the blocks of one line.

        Blocks are ordered by centre X and their texts concatenated; a single
        space is inserted where the gap to the previous block exceeds
        ``horizontal_gap_threshold`` average character widths.

        Args:
            blocks: The blocks of the line; must not be empty.

        Returns:
            The merged line.
        """
        sorted_blocks = sorted(blocks, key=lambda b: b.center[0])

        text_parts = []
        prev_block = None
        for block in sorted_blocks:
            if prev_block is not None:
                # Horizontal gap between the previous block's right edge and
                # this block's left edge.
                prev_right = max(p[0] for p in prev_block.bbox)
                curr_left = min(p[0] for p in block.bbox)
                gap = curr_left - prev_right
                # max(..., 1) guards against an empty previous text.
                avg_char_width = prev_block.width / max(len(prev_block.text), 1)
                if gap > avg_char_width * self._horizontal_gap_threshold:
                    text_parts.append(" ")
            text_parts.append(block.text)
            prev_block = block

        return TextLine(
            text="".join(text_parts),
            blocks=sorted_blocks,
            y_center=sum(b.center[1] for b in sorted_blocks) / len(sorted_blocks),
            x_min=min(min(p[0] for p in b.bbox) for b in sorted_blocks),
        )

    def _lines_to_text(self, lines: List[TextLine]) -> str:
        """Join the line texts with newlines into one string."""
        return "\n".join(line.text for line in lines)

    def _extract_info(self, full_text: str, lines: List[TextLine]) -> ExpressInfo:
        """Extract all waybill fields from the merged text.

        Args:
            full_text: The complete merged text.
            lines: The lines the text was built from.

        Returns:
            The structured waybill information.
        """
        info = ExpressInfo()
        info.courier_company = self._extract_courier_company(full_text)

        for field_name, patterns in self.FIELD_PATTERNS.items():
            value = self._extract_field(full_text, patterns)
            if value:
                setattr(info, field_name, value)

        # Fall back to address-looking lines when no labelled address matched.
        if not info.receiver_address:
            info.receiver_address = self._extract_address_from_context(
                lines, self._RECEIVER_CONTEXT
            )
        if not info.sender_address:
            # Only infer a sender address when a sender label is present;
            # scanning the whole waybill would just copy the receiver's.
            info.sender_address = self._extract_address_from_context(
                lines, self._SENDER_CONTEXT, require_context=True
            )
        return info

    def _extract_courier_company(self, text: str) -> Optional[str]:
        """Return the courier company whose keyword occurs in ``text``.

        The comparison is case-insensitive; keywords are tried in the order
        of ``COURIER_KEYWORDS``. Returns ``None`` when none is found.
        """
        text_upper = text.upper()
        for keyword, company in self.COURIER_KEYWORDS.items():
            if keyword.upper() in text_upper:
                return company
        return None

    def _extract_field(self, text: str, patterns: List[str]) -> Optional[str]:
        """Return the first non-empty value captured by any of ``patterns``.

        Patterns are searched in order with ``re.MULTILINE`` and
        ``re.IGNORECASE``. Group 1 of a match is stripped of surrounding
        whitespace and of bracket characters; if nothing remains, the next
        pattern is tried.

        Args:
            text: The text to search.
            patterns: Regular expressions with the value in group 1.

        Returns:
            The extracted value, or ``None`` when no pattern yields one.
        """
        for pattern in patterns:
            match = re.search(pattern, text, re.MULTILINE | re.IGNORECASE)
            if match:
                value = self._NOISE_CHARS.sub('', match.group(1).strip())
                if value:
                    return value
        return None

    def _extract_address_from_context(
        self,
        lines: List[TextLine],
        context_keyword: str,
        require_context: bool = False
    ) -> Optional[str]:
        """Infer an address from lines that look like one.

        The first line containing ``context_keyword`` and the lines that
        follow it (``_CONTEXT_WINDOW`` lines in total) are examined. A line
        contributes when it contains an address keyword and either carries a
        ``label:`` prefix, which is removed, or contains a region keyword.

        Args:
            lines: The lines to search.
            context_keyword: Label text marking the relevant section. An
                empty string means "no context".
            require_context: When true, return ``None`` if no line contains
                ``context_keyword``; otherwise every line is examined in
                that case.

        Returns:
            The concatenated address text, or ``None`` when nothing matched.
        """
        context_line_idx = -1
        # "" is a substring of every string, so an empty keyword must not be
        # searched for: it would always "match" the first line.
        if context_keyword:
            for i, line in enumerate(lines):
                if context_keyword in line.text:
                    context_line_idx = i
                    break

        if context_line_idx >= 0:
            candidates = lines[context_line_idx:context_line_idx + self._CONTEXT_WINDOW]
        elif require_context:
            return None
        else:
            candidates = lines

        address_parts = []
        for line in candidates:
            line_text = line.text
            if not any(kw in line_text for kw in self._ADDRESS_KEYWORDS):
                continue
            # Drop a leading label such as "地址:".
            cleaned = self._LABEL_PREFIX.sub('', line_text)
            if cleaned and cleaned != line_text:
                address_parts.append(cleaned)
            elif any(kw in line_text for kw in self._REGION_KEYWORDS):
                address_parts.append(line_text)

        if address_parts:
            return "".join(address_parts)
        return None

    def merge_text_blocks(self, text_blocks: List[TextBlock]) -> str:
        """Merge text blocks into the complete text without extracting fields.

        Args:
            text_blocks: The text blocks to merge.

        Returns:
            The merged text, one line per detected text line.
        """
        lines = self._merge_blocks_to_lines(text_blocks)
        return self._lines_to_text(lines)