""" OCR识别模块 负责人体分割和编号OCR识别 """ import cv2 import numpy as np from typing import List, Optional, Tuple, Dict from dataclasses import dataclass from config import OCR_CONFIG, SEGMENTATION_CONFIG @dataclass class OCRResult: """OCR识别结果""" text: str # 识别文本 confidence: float # 置信度 bbox: Tuple[int, int, int, int] # 边界框 location: str = "" # 位置描述 (如: "胸部", "背部") @dataclass class PersonInfo: """人员信息""" person_id: int # 人员ID person_bbox: Tuple[int, int, int, int] # 人体边界框 number_text: Optional[str] = None # 编号文本 number_confidence: float = 0.0 # 编号置信度 number_location: str = "" # 编号位置 ocr_results: List[OCRResult] = None # 所有OCR结果 class PersonSegmenter: """ 人体分割器 - 使用 RKNN YOLOv8 分割模型 将人体从背景中分割出来 """ def __init__(self, use_gpu: bool = True): """ 初始化分割器 Args: use_gpu: 是否使用GPU (RKNN使用NPU,此参数保留用于兼容) """ self.use_gpu = use_gpu self.config = SEGMENTATION_CONFIG self.input_size = self.config.get('input_size', (640, 640)) self.conf_threshold = self.config.get('conf_threshold', 0.5) self.rknn = None self._load_model() def _load_model(self): """加载 RKNN 分割模型""" try: from rknnlite.api import RKNNLite model_path = self.config.get('model_path', '/home/admin/dsh/testrk3588/yolov8n-seg.rknn') self.rknn = RKNNLite() ret = self.rknn.load_rknn(model_path) if ret != 0: print(f"[错误] 加载 RKNN 分割模型失败: {model_path}") self.rknn = None return # 初始化运行时,使用所有NPU核心 ret = self.rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2) if ret != 0: print("[错误] 初始化 RKNN 运行时失败") self.rknn = None return print(f"成功加载 RKNN 人体分割模型: {model_path}") except ImportError: print("未安装 rknnlite,无法使用 RKNN 分割模型") self.rknn = None except Exception as e: print(f"加载分割模型失败: {e}") self.rknn = None def _letterbox(self, image: np.ndarray) -> tuple: """Letterbox 预处理,保持宽高比""" h0, w0 = image.shape[:2] ih, iw = self.input_size scale = min(iw / w0, ih / h0) new_w, new_h = int(w0 * scale), int(h0 * scale) pad_w = (iw - new_w) // 2 pad_h = (ih - new_h) // 2 resized = cv2.resize(image, (new_w, new_h)) canvas = np.full((ih, iw, 3), 114, 
dtype=np.uint8) canvas[pad_h:pad_h+new_h, pad_w:pad_w+new_w] = resized return canvas, scale, pad_w, pad_h, h0, w0 def _postprocess_segmentation(self, outputs, scale, pad_w, pad_h, w0, h0): """ 处理 YOLOv8 分割模型输出 YOLOv8-seg 输出格式: [检测输出, 分割输出] - 检测输出: (1, 116, 8400) - 包含边界框、类别、掩码系数 - 分割输出: (1, 32, 160, 160) - 原型掩码 """ if not outputs or len(outputs) < 2: return None # 解析输出 det_output = outputs[0] # (1, 116, 8400) - 检测输出 seg_output = outputs[1] # (1, 32, 160, 160) - 分割原型 # 处理检测输出 if len(det_output.shape) == 3: det_output = det_output[0] # (116, 8400) # YOLOv8-seg: 前 84 维是检测 (4 box + 80 classes),后 32 维是掩码系数 num_anchors = det_output.shape[1] best_idx = -1 best_conf = 0 # 寻找最佳人体检测 (class 0 = person) for i in range(num_anchors): # 类别概率 (索引 4-84 是80个类别) class_probs = det_output[4:84, i] person_conf = float(class_probs[0]) # class 0 = person if person_conf > self.conf_threshold and person_conf > best_conf: best_conf = person_conf best_idx = i if best_idx < 0: return None # 获取掩码系数 (后32维) mask_coeffs = det_output[84:116, best_idx] # (32,) # 处理分割原型 (1, 32, 160, 160) -> (32, 160, 160) if len(seg_output.shape) == 4: seg_output = seg_output[0] # 计算最终掩码: mask = coeffs @ prototypes # seg_output: (32, 160, 160), mask_coeffs: (32,) mask = np.zeros((160, 160), dtype=np.float32) for i in range(32): mask += mask_coeffs[i] * seg_output[i] # Sigmoid 激活 mask = 1 / (1 + np.exp(-mask)) # 移除 padding 并缩放到原始尺寸 mask = (mask > 0.5).astype(np.uint8) * 255 # 裁剪掉 letterbox 添加的 padding mask_h, mask_w = mask.shape pad_h_mask = int(pad_h * mask_h / self.input_size[0]) # 160/640 = 0.25 pad_w_mask = int(pad_w * mask_w / self.input_size[1]) new_h_mask = int((mask_h - 2 * pad_h_mask)) new_w_mask = int((mask_w - 2 * pad_w_mask)) if new_h_mask > 0 and new_w_mask > 0: mask = mask[pad_h_mask:pad_h_mask+new_h_mask, pad_w_mask:pad_w_mask+new_w_mask] # 缩放到原始 ROI 尺寸 mask = cv2.resize(mask, (w0, h0)) return mask def segment_person(self, frame: np.ndarray, person_bbox: Tuple[int, int, int, int]) -> 
Optional[np.ndarray]: """ 分割人体 Args: frame: 输入图像 person_bbox: 人体边界框 (x, y, w, h) Returns: 人体分割掩码 """ if self.rknn is None: return None x, y, w, h = person_bbox # 裁剪人体区域 person_roi = frame[y:y+h, x:x+w] if person_roi.size == 0: return None try: # 预处理 canvas, scale, pad_w, pad_h, h0, w0 = self._letterbox(person_roi) # RKNN 输入: NHWC (1, H, W, C), RGB, float32 normalized 0-1 img = canvas[..., ::-1].astype(np.float32) / 255.0 blob = img[None, ...] # (1, 640, 640, 3) # 推理 outputs = self.rknn.inference(inputs=[blob]) # 后处理 mask = self._postprocess_segmentation(outputs, scale, pad_w, pad_h, w0, h0) return mask except Exception as e: print(f"分割错误: {e}") return None def release(self): """释放 RKNN 资源""" if self.rknn is not None: self.rknn.release() self.rknn = None def extract_person_region(self, frame: np.ndarray, person_bbox: Tuple[int, int, int, int], padding: float = 0.1) -> Tuple[np.ndarray, Tuple[int, int]]: """ 提取人体区域 Args: frame: 输入图像 person_bbox: 人体边界框 padding: 边界填充比例 Returns: (人体区域图像, 原始位置偏移) """ x, y, w, h = person_bbox # 添加填充 pad_w = int(w * padding) pad_h = int(h * padding) x1 = max(0, x - pad_w) y1 = max(0, y - pad_h) x2 = min(frame.shape[1], x + w + pad_w) y2 = min(frame.shape[0], y + h + pad_h) person_region = frame[y1:y2, x1:x2] offset = (x1, y1) return person_region, offset class OCRRecognizer: """ OCR识别器 使用llama-server API接口进行OCR识别 """ def __init__(self, config: Dict = None): """ 初始化OCR Args: config: API配置 """ self.config = config or OCR_CONFIG self.api_host = self.config.get('api_host', 'localhost') self.api_port = self.config.get('api_port', 8111) self.model = self.config.get('model', 'PaddleOCR-VL-1.5-GGUF.gguf') self.prompt = self.config.get('prompt', '请识别图片中的数字编号,只返回数字,不要其他内容') self.temperature = self.config.get('temperature', 0.3) self.timeout = self.config.get('timeout', 30) # 检查API是否可用 self._check_api() def _check_api(self): """检查API是否可用""" try: import http.client # localhost通常使用HTTP而非HTTPS use_https = self.api_host not in ['localhost', '127.0.0.1'] 
            conn_class = (http.client.HTTPSConnection if use_https
                          else http.client.HTTPConnection)
            conn = conn_class(self.api_host, self.api_port, timeout=5)
            conn.request("GET", "/")
            res = conn.getresponse()
            conn.close()
            print(f"Connected to llama-server API: {self.api_host}:{self.api_port}")
        except Exception as e:
            print(f"Failed to connect to llama-server: {e}")
            print(f"Make sure llama-server is running at {self.api_host}:{self.api_port}")

    def _image_to_base64(self, image: np.ndarray) -> str:
        """
        Encode an image as base64.

        Args:
            image: input image

        Returns:
            base64-encoded data-URL string
        """
        import base64

        _, buffer = cv2.imencode('.jpg', image)
        base64_str = base64.b64encode(buffer).decode('utf-8')
        return f"data:image/jpeg;base64,{base64_str}"

    def recognize(self, image: np.ndarray, prompt: str = None,
                  detect_only_numbers: bool = True,
                  max_retries: int = 3) -> List[OCRResult]:
        """
        Recognize text in an image via the llama-server API.

        Args:
            image: input image
            prompt: custom prompt
            detect_only_numbers: whether to extract numeric IDs only
            max_retries: maximum number of retries

        Returns:
            List of recognition results.
        """
        if image is None:
            return []

        import http.client
        import json
        import re

        results = []
        last_error = None

        for attempt in range(max_retries):
            try:
                # Prepare the image payload
                image_base64 = self._image_to_base64(image)

                # Build the request
                use_prompt = prompt or self.prompt
                payload = {
                    "model": self.model,
                    "messages": [
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": use_prompt},
                                {"type": "image_url",
                                 "image_url": {"url": image_base64}}
                            ]
                        }
                    ],
                    "temperature": self.temperature,
                    "stream": False
                }
                headers = {
                    'Content-Type': 'application/json',
                    'Accept': 'application/json',
                }

                # Send the request - localhost uses HTTP
                use_https = self.api_host not in ['localhost', '127.0.0.1']
                conn_class = (http.client.HTTPSConnection if use_https
                              else http.client.HTTPConnection)
                conn = conn_class(self.api_host, self.api_port, timeout=self.timeout)
                conn.request("POST", "/v1/chat/completions",
                             json.dumps(payload), headers)
                res = conn.getresponse()
                data = res.read()
                conn.close()

                # Parse the response
                response = json.loads(data.decode('utf-8'))
                if 'choices' in response and len(response['choices']) > 0:
                    content = response['choices'][0]['message']['content']

                    # Extract the number/ID from the response
                    text = content.strip()

                    # When only numbers are wanted, pull out the numeric part
                    if detect_only_numbers:
                        # Match digits and alphanumeric combinations
                        numbers = re.findall(r'[A-Za-z]*\d+[A-Za-z0-9]*', text)
                        if numbers:
                            text = numbers[0]

                    # Build the result
                    if text:
                        results.append(OCRResult(
                            text=text,
                            confidence=1.0,  # the API returns no confidence; use 1.0
                            bbox=(0, 0, image.shape[1], image.shape[0])
                        ))
                return results  # success: return immediately
            except Exception as e:
                last_error = e
                print(f"OCR API error (attempt {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    import time
                    time.sleep(0.5 * (attempt + 1))  # linear backoff

        # All retries failed
        if last_error:
            print(f"OCR API recognition finally failed: {last_error}")
        return results

    def recognize_number(self, image: np.ndarray) -> Optional[str]:
        """
        Recognize the ID number in an image.

        Args:
            image: input image

        Returns:
            ID-number text, or None.
        """
        results = self.recognize(image, detect_only_numbers=True)
        if results:
            return results[0].text
        return None


class OCRRecognizerLocal:
    """
    Local OCR recognizer (fallback).
    Uses PaddleOCR (or EasyOCR) for recognition.
    """

    def __init__(self, use_gpu: bool = True, languages: List[str] = None):
        """
        Initialize the OCR engine.

        Args:
            use_gpu: whether to use the GPU
            languages: list of supported languages
        """
        self.use_gpu = use_gpu
        self.languages = languages or ['ch', 'en']
        self.ocr = None
        self._load_ocr()

    def _load_ocr(self):
        """Load the OCR engine."""
        try:
            from paddleocr import PaddleOCR

            self.ocr = PaddleOCR(
                use_angle_cls=True,
                lang='ch' if 'ch' in self.languages else 'en',
                use_gpu=self.use_gpu,
                show_log=False
            )
            print("Loaded PaddleOCR")
        except ImportError:
            print("PaddleOCR is not installed")
            self.ocr = None
        except Exception as e:
            print(f"Failed to load OCR: {e}")

    def recognize(self, image: np.ndarray,
                  detect_only_numbers: bool = True) -> List[OCRResult]:
        """Recognize text in an image."""
        if self.ocr is None or image is None:
            return []

        results = []
        try:
            ocr_results = self.ocr.ocr(image, cls=True)
            if ocr_results and len(ocr_results) > 0:
                for line in ocr_results[0]:
                    if line is None:
                        continue
                    bbox_points, (text, conf) = line
                    if conf < 0.5:
                        continue
                    x1 = int(min(p[0] for p in bbox_points))
                    y1 = int(min(p[1] for p in bbox_points))
                    x2 = int(max(p[0] for p in bbox_points))
                    y2 = int(max(p[1] for p in bbox_points))
                    results.append(OCRResult(
                        text=text,
                        confidence=conf,
                        bbox=(x1, y1, x2 - x1, y2 - y1)
                    ))
        except Exception as e:
            print(f"OCR recognition error: {e}")
        return results


class NumberDetector:
    """
    ID-number detector.
    Searches for ID numbers within a person image,
    using the llama-server API for OCR.
    """

    def __init__(self, use_api: bool = True, ocr_config: Dict = None):
        """
        Initialize the detector.

        Args:
            use_api: whether to use the API for OCR
            ocr_config: OCR configuration
        """
        self.segmenter = PersonSegmenter(use_gpu=False)

        # API OCR or local OCR
        if use_api:
            self.ocr = OCRRecognizer(ocr_config)
            print("Using llama-server API for OCR")
        else:
            self.ocr = OCRRecognizerLocal()
            print("Using local OCR")

        # Regions where the number may appear (ratios of the person bbox)
        self.search_regions = [
            {'name': 'chest', 'y_ratio': (0.2, 0.5), 'x_ratio': (0.2, 0.8)},
            {'name': 'abdomen', 'y_ratio': (0.5, 0.8), 'x_ratio': (0.2, 0.8)},
            {'name': 'upper back', 'y_ratio': (0.1, 0.4), 'x_ratio': (0.1, 0.9)},
        ]

    def detect_number(self, frame: np.ndarray,
                      person_bbox: Tuple[int, int, int, int]) -> PersonInfo:
        """
        Detect a person's ID number.

        Args:
            frame: input image
            person_bbox: person bounding box

        Returns:
            Person information.
        """
        x, y, w, h = person_bbox

        # Extract the person region
        person_region, offset = self.segmenter.extract_person_region(frame, person_bbox)

        person_info = PersonInfo(
            person_id=-1,
            person_bbox=person_bbox,
            ocr_results=[]
        )

        # Search each candidate region for a number
        best_result = None
        best_confidence = 0

        for region in self.search_regions:
            # Compute the search region (ratios are relative to the bbox size)
            y1 = int(h * region['y_ratio'][0])
            y2 = int(h * region['y_ratio'][1])
            x1 = int(w * region['x_ratio'][0])
            x2 = int(w * region['x_ratio'][1])

            # Clamp to the extracted region
            y1 = max(0, min(y1, person_region.shape[0]))
            y2 = max(0, min(y2, person_region.shape[0]))
            x1 = max(0, min(x1, person_region.shape[1]))
            x2 = max(0, min(x2, person_region.shape[1]))

            if y2 <= y1 or x2 <= x1:
                continue

            # Crop the region
            roi = person_region[y1:y2, x1:x2]

            # Run OCR
            ocr_results = self.ocr.recognize(roi)

            for result in ocr_results:
                # Map coordinates back to the original frame
                adjusted_bbox = (
                    result.bbox[0] + x1 + offset[0],
                    result.bbox[1] + y1 + offset[1],
                    result.bbox[2],
                    result.bbox[3]
                )
                result.bbox = adjusted_bbox
                result.location = region['name']
                person_info.ocr_results.append(result)

                # Track the best result
                if result.confidence > best_confidence:
                    best_confidence = result.confidence
                    best_result = result

        # Use the best result as the ID number
        if best_result:
            person_info.number_text = best_result.text
            person_info.number_confidence = best_result.confidence
            person_info.number_location = best_result.location

        return person_info

    def detect_numbers_batch(self, frame: np.ndarray,
                             person_bboxes: List[Tuple[int, int, int, int]]
                             ) -> List[PersonInfo]:
        """
        Detect ID numbers for several people at once.

        Args:
            frame: input image
            person_bboxes: list of person bounding boxes

        Returns:
            List of person information.
        """
        results = []
        for i, bbox in enumerate(person_bboxes):
            person_info = self.detect_number(frame, bbox)
            person_info.person_id = i
            results.append(person_info)
        return results

    def release(self):
        """Release resources."""
        if hasattr(self.segmenter, 'release'):
            self.segmenter.release()


def preprocess_for_ocr(image: np.ndarray) -> np.ndarray:
    """
    Preprocess an image for OCR.

    Args:
        image: input image

    Returns:
        Preprocessed image.
    """
    if image is None:
        return None

    # Convert to grayscale
    if len(image.shape) == 3:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        gray = image

    # Contrast-limited adaptive histogram equalization
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced = clahe.apply(gray)

    # Denoise
    denoised = cv2.fastNlMeansDenoising(enhanced, None, 10)

    # Binarize (Otsu)
    _, binary = cv2.threshold(denoised, 0, 255,
                              cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return binary
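

# ---------------------------------------------------------------------------
# Self-test sketch (assumption: no RKNN device or camera is available here,
# so this exercises only the pure-numpy mask-composition math performed by
# PersonSegmenter._postprocess_segmentation: combine the 32 mask coefficients
# of the best detection with the (32, 160, 160) prototype masks, apply a
# sigmoid, and threshold at 0.5; random data stands in for real model output).
if __name__ == "__main__":
    import numpy as np  # re-import so the sketch is self-contained

    rng = np.random.default_rng(0)
    coeffs = rng.standard_normal(32).astype(np.float32)              # mask coefficients
    protos = rng.standard_normal((32, 160, 160)).astype(np.float32)  # prototype masks

    # Equivalent to the per-channel accumulation loop in
    # _postprocess_segmentation: mask = coeffs @ prototypes
    mask = np.tensordot(coeffs, protos, axes=1)   # (160, 160)
    mask = 1.0 / (1.0 + np.exp(-mask))            # sigmoid
    binary = (mask > 0.5).astype(np.uint8) * 255  # hard 0/255 mask

    print("mask shape:", binary.shape)            # (160, 160)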