| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527 |
- """
- OCR识别模块
- 负责人体分割和编号OCR识别
- """
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np

from config import OCR_CONFIG
@dataclass
class OCRResult:
    """One OCR detection.

    Attributes:
        text: Recognized text.
        confidence: Recognition confidence score.
        bbox: Bounding box as (x, y, w, h).
        location: Human-readable position label on the body
            (e.g. "胸部", "背部"); empty when unknown.
    """
    text: str
    confidence: float
    bbox: Tuple[int, int, int, int]
    location: str = ""
@dataclass
class PersonInfo:
    """Aggregated detection info for one person.

    Attributes:
        person_id: Person identifier (-1 until assigned).
        person_bbox: Person bounding box as (x, y, w, h).
        number_text: Recognized ID number, or None if not found.
        number_confidence: Confidence of the recognized number.
        number_location: Body region where the number was found.
        ocr_results: All raw OCR hits collected for this person.
    """
    person_id: int
    person_bbox: Tuple[int, int, int, int]
    number_text: Optional[str] = None
    number_confidence: float = 0.0
    number_location: str = ""
    # Fix: was `= None`, which contradicted the List annotation and forced
    # every consumer to None-check before appending. default_factory gives
    # each instance its own fresh list.
    ocr_results: List[OCRResult] = field(default_factory=list)
class PersonSegmenter:
    """Segments a person from the background.

    Uses a YOLO11 segmentation model when available; degrades gracefully
    (segment_person() returns None) when the model cannot be loaded.
    """

    def __init__(self, use_gpu: bool = True):
        """Initialize the segmenter.

        Args:
            use_gpu: Whether to use the GPU. NOTE(review): currently only
                stored — the YOLO loader below does not consume it; confirm
                device selection is handled elsewhere.
        """
        self.use_gpu = use_gpu
        self.segmentor = None
        self._load_model()

    def _load_model(self):
        """Load the YOLO11 segmentation model; leave segmentor as None on failure."""
        try:
            from ultralytics import YOLO
            self.segmentor = YOLO('yolo11n-seg.pt')  # YOLO11 segmentation weights
            print("成功加载YOLO11人体分割模型")
        except Exception as e:
            print(f"加载分割模型失败: {e}")
            self.segmentor = None

    def segment_person(self, frame: np.ndarray,
                       person_bbox: Tuple[int, int, int, int]) -> Optional[np.ndarray]:
        """Segment the person inside a bounding box.

        Args:
            frame: Input image (H, W, C).
            person_bbox: Person bounding box as (x, y, w, h).

        Returns:
            Binary mask (uint8, 0/255) of size (h, w), or None when the
            model is unavailable, the box is empty, or segmentation fails.
        """
        if self.segmentor is None:
            return None

        x, y, w, h = person_bbox

        # Fix: clamp the box to the frame. The previous unclamped slice let
        # negative coordinates wrap around and oversized boxes produce
        # truncated/empty ROIs that were fed to the model anyway.
        x1 = max(0, x)
        y1 = max(0, y)
        x2 = min(frame.shape[1], x + w)
        y2 = min(frame.shape[0], y + h)
        if x2 <= x1 or y2 <= y1:
            return None

        person_roi = frame[y1:y2, x1:x2]

        try:
            # class 0 = person in COCO-trained YOLO models
            results = self.segmentor(person_roi, classes=[0], verbose=False)

            if results and len(results) > 0 and results[0].masks is not None:
                masks = results[0].masks.data
                if len(masks) > 0:
                    # Take the first mask, resize to the requested box size,
                    # and binarize to a 0/255 uint8 mask.
                    mask = masks[0].cpu().numpy()
                    mask = cv2.resize(mask, (w, h))
                    mask = (mask > 0.5).astype(np.uint8) * 255
                    return mask
        except Exception as e:
            print(f"分割错误: {e}")

        return None

    def extract_person_region(self, frame: np.ndarray,
                              person_bbox: Tuple[int, int, int, int],
                              padding: float = 0.1) -> Tuple[np.ndarray, Tuple[int, int]]:
        """Crop the person region with padding, clipped to the frame.

        Args:
            frame: Input image.
            person_bbox: Person bounding box as (x, y, w, h).
            padding: Extra margin as a fraction of the box size.

        Returns:
            (cropped region, (x_offset, y_offset)) where the offset maps
            region coordinates back into frame coordinates.
        """
        x, y, w, h = person_bbox

        pad_w = int(w * padding)
        pad_h = int(h * padding)

        x1 = max(0, x - pad_w)
        y1 = max(0, y - pad_h)
        x2 = min(frame.shape[1], x + w + pad_w)
        y2 = min(frame.shape[0], y + h + pad_h)

        person_region = frame[y1:y2, x1:x2]
        offset = (x1, y1)

        return person_region, offset
class OCRRecognizer:
    """OCR recognizer backed by a llama-server chat-completions API.

    Sends the image as a base64 data URL to /v1/chat/completions and
    extracts the number text from the model's reply.
    """

    def __init__(self, config: Dict = None):
        """Initialize the recognizer.

        Args:
            config: API configuration dict; falls back to OCR_CONFIG.
        """
        self.config = config or OCR_CONFIG
        self.api_host = self.config.get('api_host', 'localhost')
        self.api_port = self.config.get('api_port', 8111)
        self.model = self.config.get('model', 'PaddleOCR-VL-1.5-GGUF.gguf')
        self.prompt = self.config.get('prompt', '请识别图片中的数字编号,只返回数字,不要其他内容')
        self.temperature = self.config.get('temperature', 0.3)
        self.timeout = self.config.get('timeout', 30)

        # Probe the server once so misconfiguration is reported early.
        self._check_api()

    def _connection(self, timeout: int):
        """Open an HTTP(S) connection to the configured API host.

        localhost/127.0.0.1 are assumed to serve plain HTTP; any other
        host is treated as HTTPS. (Was duplicated in _check_api and
        recognize; extracted here.)
        """
        import http.client
        use_https = self.api_host not in ['localhost', '127.0.0.1']
        conn_class = http.client.HTTPSConnection if use_https else http.client.HTTPConnection
        return conn_class(self.api_host, self.api_port, timeout=timeout)

    def _check_api(self):
        """Check whether the llama-server API is reachable (logs only, never raises)."""
        conn = None
        try:
            conn = self._connection(timeout=5)
            conn.request("GET", "/")
            conn.getresponse()
            print(f"llama-server API已连接: {self.api_host}:{self.api_port}")
        except Exception as e:
            print(f"连接llama-server失败: {e}")
            print(f"请确保llama-server运行在 {self.api_host}:{self.api_port}")
        finally:
            # Fix: the connection was previously leaked when request() or
            # getresponse() raised.
            if conn is not None:
                conn.close()

    def _image_to_base64(self, image: np.ndarray) -> str:
        """Encode an image as a JPEG base64 data URL.

        Args:
            image: Input image.

        Returns:
            "data:image/jpeg;base64,..." string.
        """
        import base64
        _, buffer = cv2.imencode('.jpg', image)
        base64_str = base64.b64encode(buffer).decode('utf-8')
        return f"data:image/jpeg;base64,{base64_str}"

    def recognize(self, image: np.ndarray,
                  prompt: str = None,
                  detect_only_numbers: bool = True,
                  max_retries: int = 3) -> List[OCRResult]:
        """Recognize text in an image via the llama-server API.

        Args:
            image: Input image; None yields an empty result.
            prompt: Custom prompt; defaults to self.prompt.
            detect_only_numbers: Keep only the first alphanumeric ID token
                (e.g. "A12", "007") from the reply.
            max_retries: Attempts before giving up.

        Returns:
            List with at most one OCRResult; the API reports no per-box
            geometry, so the bbox spans the whole image.
        """
        if image is None:
            return []

        import json
        import re

        results: List[OCRResult] = []
        last_error = None

        for attempt in range(max_retries):
            conn = None
            try:
                image_base64 = self._image_to_base64(image)
                use_prompt = prompt or self.prompt

                payload = {
                    "model": self.model,
                    "messages": [
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": use_prompt},
                                {"type": "image_url", "image_url": {"url": image_base64}},
                            ],
                        }
                    ],
                    "temperature": self.temperature,
                    "stream": False,
                }
                headers = {
                    'Content-Type': 'application/json',
                    'Accept': 'application/json',
                }

                conn = self._connection(timeout=self.timeout)
                conn.request("POST", "/v1/chat/completions",
                             json.dumps(payload), headers)
                data = conn.getresponse().read()

                response = json.loads(data.decode('utf-8'))

                if 'choices' in response and len(response['choices']) > 0:
                    text = response['choices'][0]['message']['content'].strip()

                    if detect_only_numbers:
                        # Extract digit/alphanumeric ID tokens from the reply.
                        numbers = re.findall(r'[A-Za-z]*\d+[A-Za-z0-9]*', text)
                        if numbers:
                            text = numbers[0]

                    if text:
                        results.append(OCRResult(
                            text=text,
                            confidence=1.0,  # API reports no confidence; assume 1.0
                            bbox=(0, 0, image.shape[1], image.shape[0])
                        ))
                    return results  # got a parseable reply: no retry needed

            except Exception as e:
                last_error = e
                print(f"OCR API识别错误 (尝试 {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    import time
                    # Linearly growing backoff: 0.5s, 1.0s, ... (the old
                    # comment claimed exponential; the formula is linear).
                    time.sleep(0.5 * (attempt + 1))
            finally:
                # Fix: the connection used to leak whenever the request,
                # read, or JSON parsing raised.
                if conn is not None:
                    conn.close()

        if last_error:
            print(f"OCR API识别最终失败: {last_error}")

        return results

    def recognize_number(self, image: np.ndarray) -> Optional[str]:
        """Recognize the ID number in an image.

        Args:
            image: Input image.

        Returns:
            The number text of the first hit, or None when nothing is found.
        """
        results = self.recognize(image, detect_only_numbers=True)
        if results:
            return results[0].text
        return None
class OCRRecognizerLocal:
    """Local OCR recognizer (fallback) based on PaddleOCR.

    Degrades gracefully: when PaddleOCR is missing or fails to load,
    recognize() returns an empty list.
    """

    def __init__(self, use_gpu: bool = True, languages: List[str] = None):
        """Initialize the OCR engine.

        Args:
            use_gpu: Whether to use the GPU.
            languages: Language codes to support; defaults to ['ch', 'en'].
        """
        self.use_gpu = use_gpu
        self.languages = languages or ['ch', 'en']
        self.ocr = None
        self._load_ocr()

    def _load_ocr(self):
        """Load PaddleOCR; leave self.ocr as None on any failure."""
        try:
            from paddleocr import PaddleOCR
            self.ocr = PaddleOCR(
                use_angle_cls=True,
                lang='ch' if 'ch' in self.languages else 'en',
                use_gpu=self.use_gpu,
                show_log=False
            )
            print("成功加载PaddleOCR")
        except ImportError:
            print("未安装PaddleOCR")
            self.ocr = None
        except Exception as e:
            print(f"加载OCR失败: {e}")
            # Explicitly mark the engine unusable (was left implicit before).
            self.ocr = None

    def recognize(self, image: np.ndarray,
                  detect_only_numbers: bool = True,
                  min_confidence: float = 0.5) -> List[OCRResult]:
        """Recognize text in an image with PaddleOCR.

        Args:
            image: Input image; None (or an unloaded engine) yields [].
            detect_only_numbers: Kept for API parity with OCRRecognizer;
                PaddleOCR returns all text, filtering is left to callers.
            min_confidence: Drop detections below this confidence
                (generalized from the previously hard-coded 0.5).

        Returns:
            OCRResult list with (x, y, w, h) bounding boxes.
        """
        if self.ocr is None or image is None:
            return []

        results: List[OCRResult] = []

        try:
            ocr_results = self.ocr.ocr(image, cls=True)
            if ocr_results and len(ocr_results) > 0:
                for line in ocr_results[0]:
                    if line is None:
                        continue
                    bbox_points, (text, conf) = line

                    if conf < min_confidence:
                        continue

                    # Convert the quadrilateral to an axis-aligned (x, y, w, h) box.
                    x1 = int(min(p[0] for p in bbox_points))
                    y1 = int(min(p[1] for p in bbox_points))
                    x2 = int(max(p[0] for p in bbox_points))
                    y2 = int(max(p[1] for p in bbox_points))

                    results.append(OCRResult(
                        text=text,
                        confidence=conf,
                        bbox=(x1, y1, x2 - x1, y2 - y1)
                    ))
        except Exception as e:
            print(f"OCR识别错误: {e}")

        return results
class NumberDetector:
    """Detects ID numbers printed on a person's body.

    Crops the person out of the frame, runs OCR over a fixed set of
    candidate regions (chest, abdomen, upper back), and keeps the hit
    with the highest confidence as the person's number. OCR goes through
    the llama-server API by default.
    """

    def __init__(self, use_api: bool = True, ocr_config: Dict = None):
        """Set up segmentation and the OCR backend.

        Args:
            use_api: If True, use the llama-server API recognizer;
                otherwise fall back to the local PaddleOCR recognizer.
            ocr_config: Configuration passed to the API recognizer.
        """
        self.segmenter = PersonSegmenter(use_gpu=False)

        if use_api:
            self.ocr = OCRRecognizer(ocr_config)
            print("使用llama-server API进行OCR识别")
        else:
            self.ocr = OCRRecognizerLocal()
            print("使用本地OCR进行识别")

        # Candidate areas where a number may appear, expressed as ratios
        # of the person bounding box.
        self.search_regions = [
            {'name': '胸部', 'y_ratio': (0.2, 0.5), 'x_ratio': (0.2, 0.8)},
            {'name': '腹部', 'y_ratio': (0.5, 0.8), 'x_ratio': (0.2, 0.8)},
            {'name': '背部上方', 'y_ratio': (0.1, 0.4), 'x_ratio': (0.1, 0.9)},
        ]

    def detect_number(self, frame: np.ndarray,
                      person_bbox: Tuple[int, int, int, int]) -> PersonInfo:
        """Detect the ID number of one person.

        Args:
            frame: Full input image.
            person_bbox: Person bounding box as (x, y, w, h).

        Returns:
            PersonInfo carrying all OCR hits plus the best number found.
        """
        _, _, box_w, box_h = person_bbox

        # Padded crop of the person; `shift` maps crop coords back to frame coords.
        person_region, shift = self.segmenter.extract_person_region(
            frame, person_bbox
        )

        info = PersonInfo(
            person_id=-1,
            person_bbox=person_bbox,
            ocr_results=[]
        )

        best_hit = None
        best_conf = 0

        region_rows = person_region.shape[0]
        region_cols = person_region.shape[1]

        for spec in self.search_regions:
            # Ratios are relative to the (unpadded) person box; clip the
            # resulting window into the padded crop.
            top = min(max(int(box_h * spec['y_ratio'][0]), 0), region_rows)
            bottom = min(max(int(box_h * spec['y_ratio'][1]), 0), region_rows)
            left = min(max(int(box_w * spec['x_ratio'][0]), 0), region_cols)
            right = min(max(int(box_w * spec['x_ratio'][1]), 0), region_cols)

            if bottom <= top or right <= left:
                continue

            roi = person_region[top:bottom, left:right]

            for hit in self.ocr.recognize(roi):
                # Translate the hit's bbox into full-frame coordinates.
                hit.bbox = (
                    hit.bbox[0] + left + shift[0],
                    hit.bbox[1] + top + shift[1],
                    hit.bbox[2],
                    hit.bbox[3],
                )
                hit.location = spec['name']
                info.ocr_results.append(hit)

                if hit.confidence > best_conf:
                    best_conf = hit.confidence
                    best_hit = hit

        if best_hit:
            info.number_text = best_hit.text
            info.number_confidence = best_hit.confidence
            info.number_location = best_hit.location

        return info

    def detect_numbers_batch(self, frame: np.ndarray,
                             person_bboxes: List[Tuple[int, int, int, int]]) -> List[PersonInfo]:
        """Detect numbers for a batch of people.

        Args:
            frame: Full input image.
            person_bboxes: List of person bounding boxes.

        Returns:
            One PersonInfo per box, with person_id set to the box index.
        """
        people = []
        for idx, box in enumerate(person_bboxes):
            info = self.detect_number(frame, box)
            info.person_id = idx
            people.append(info)
        return people
def preprocess_for_ocr(image: np.ndarray) -> np.ndarray:
    """Enhance an image for OCR: grayscale -> CLAHE -> denoise -> Otsu binarize.

    Args:
        image: Input image, BGR or already single-channel. May be None.

    Returns:
        Binarized (0/255) image, or None when the input is None.
    """
    if image is None:
        return None

    # Collapse to a single channel when given a color image.
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image

    # Boost local contrast with adaptive histogram equalization.
    equalized = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)).apply(gray)

    # Suppress noise so thresholding is cleaner.
    smooth = cv2.fastNlMeansDenoising(equalized, None, 10)

    # Otsu chooses the binarization threshold automatically.
    _, binarized = cv2.threshold(smooth, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    return binarized
|