| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648 |
- """
- OCR识别模块
- 负责人体分割和编号OCR识别
- """
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np

from config import OCR_CONFIG, SEGMENTATION_CONFIG
@dataclass
class OCRResult:
    """A single OCR recognition result."""
    text: str                          # recognized text
    confidence: float                  # confidence score
    bbox: Tuple[int, int, int, int]    # bounding box (x, y, w, h)
    location: str = ""                 # location description (e.g. "chest", "back")
@dataclass
class PersonInfo:
    """Information about one detected person and their ID number."""
    person_id: int                              # person ID
    person_bbox: Tuple[int, int, int, int]      # person bounding box (x, y, w, h)
    number_text: Optional[str] = None           # recognized number text
    number_confidence: float = 0.0              # confidence of the number
    number_location: str = ""                   # where the number was found
    # All OCR results. default_factory gives each instance its own list
    # (the previous `= None` contradicted the annotation and forced every
    # caller to pass [] explicitly).
    ocr_results: List["OCRResult"] = field(default_factory=list)
class PersonSegmenter:
    """
    Person segmenter backed by an RKNN YOLOv8 segmentation model.

    Separates a person from the background inside a given bounding box.
    """

    def __init__(self, use_gpu: bool = True):
        """
        Initialize the segmenter.

        Args:
            use_gpu: Whether to use the GPU (RKNN runs on the NPU; kept
                for interface compatibility).
        """
        self.use_gpu = use_gpu
        self.config = SEGMENTATION_CONFIG
        self.input_size = self.config.get('input_size', (640, 640))
        self.conf_threshold = self.config.get('conf_threshold', 0.5)
        self.rknn = None
        self._load_model()

    def _load_model(self):
        """Load the RKNN segmentation model; self.rknn stays None on failure."""
        try:
            from rknnlite.api import RKNNLite

            model_path = self.config.get('model_path', '/home/admin/dsh/testrk3588/yolov8n-seg.rknn')
            self.rknn = RKNNLite()

            ret = self.rknn.load_rknn(model_path)
            if ret != 0:
                print(f"[错误] 加载 RKNN 分割模型失败: {model_path}")
                self.rknn = None
                return

            # Initialize the runtime on all three NPU cores.
            ret = self.rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2)
            if ret != 0:
                print("[错误] 初始化 RKNN 运行时失败")
                self.rknn = None
                return

            print(f"成功加载 RKNN 人体分割模型: {model_path}")
        except ImportError:
            print("未安装 rknnlite,无法使用 RKNN 分割模型")
            self.rknn = None
        except Exception as e:
            print(f"加载分割模型失败: {e}")
            self.rknn = None

    def _letterbox(self, image: np.ndarray) -> tuple:
        """
        Letterbox preprocessing: resize keeping aspect ratio, then pad
        with gray (114) to the model input size.

        Returns:
            (canvas, scale, pad_w, pad_h, orig_h, orig_w)
        """
        h0, w0 = image.shape[:2]
        ih, iw = self.input_size
        scale = min(iw / w0, ih / h0)
        new_w, new_h = int(w0 * scale), int(h0 * scale)
        pad_w = (iw - new_w) // 2
        pad_h = (ih - new_h) // 2
        resized = cv2.resize(image, (new_w, new_h))
        canvas = np.full((ih, iw, 3), 114, dtype=np.uint8)
        canvas[pad_h:pad_h + new_h, pad_w:pad_w + new_w] = resized
        return canvas, scale, pad_w, pad_h, h0, w0

    def _postprocess_segmentation(self, outputs, scale, pad_w, pad_h, w0, h0):
        """
        Decode YOLOv8-seg outputs into a binary person mask.

        YOLOv8-seg output layout:
          - detections: (1, 116, 8400) = 4 box coords + 80 classes + 32 mask coeffs
          - prototypes: (1, 32, 160, 160) prototype masks

        Args:
            outputs: Raw model outputs [detections, prototypes].
            scale: Letterbox scale (unused; kept for interface parity).
            pad_w, pad_h: Letterbox padding in input pixels.
            w0, h0: Original ROI width and height.

        Returns:
            uint8 mask of shape (h0, w0) with values {0, 255}, or None
            when no person exceeds the confidence threshold.
        """
        if not outputs or len(outputs) < 2:
            return None

        det_output = outputs[0]  # (1, 116, 8400) detections
        seg_output = outputs[1]  # (1, 32, 160, 160) mask prototypes

        if len(det_output.shape) == 3:
            det_output = det_output[0]  # (116, 8400)

        # Class scores start at row 4; row 4 itself is class 0 (person).
        # A vectorized argmax replaces the original 8400-iteration Python
        # loop; ties resolve to the first maximum, same as before.
        person_scores = det_output[4]
        best_idx = int(np.argmax(person_scores))
        if float(person_scores[best_idx]) <= self.conf_threshold:
            return None

        # Mask coefficients are the last 32 rows.
        mask_coeffs = det_output[84:116, best_idx]  # (32,)

        if len(seg_output.shape) == 4:
            seg_output = seg_output[0]  # (32, Hm, Wm)

        # Derive prototype dimensions from the tensor instead of
        # hard-coding 32/160, so other proto resolutions also work.
        num_protos, mask_h, mask_w = seg_output.shape

        # Final mask = linear combination of prototypes: one matmul
        # instead of the 32-step accumulation loop.
        mask = mask_coeffs.reshape(1, num_protos) @ seg_output.reshape(num_protos, -1)
        mask = mask.reshape(mask_h, mask_w)

        # Sigmoid activation, then binarize at 0.5.
        mask = 1.0 / (1.0 + np.exp(-mask))
        mask = (mask > 0.5).astype(np.uint8) * 255

        # Scale the letterbox padding down to mask resolution
        # (e.g. 160/640 = 0.25) and crop it away.
        pad_h_mask = int(pad_h * mask_h / self.input_size[0])
        pad_w_mask = int(pad_w * mask_w / self.input_size[1])
        crop_h = mask_h - 2 * pad_h_mask
        crop_w = mask_w - 2 * pad_w_mask
        if crop_h > 0 and crop_w > 0:
            mask = mask[pad_h_mask:pad_h_mask + crop_h, pad_w_mask:pad_w_mask + crop_w]

        # Resize back to the original ROI size; cv2 expects (width, height).
        mask = cv2.resize(mask, (w0, h0))
        return mask

    def segment_person(self, frame: np.ndarray,
                       person_bbox: Tuple[int, int, int, int]) -> Optional[np.ndarray]:
        """
        Segment a person inside the given bounding box.

        Args:
            frame: Input image (BGR).
            person_bbox: Person bounding box (x, y, w, h).

        Returns:
            Person segmentation mask sized to the ROI, or None.
        """
        if self.rknn is None:
            return None

        x, y, w, h = person_bbox

        # Clamp the box to the frame: negative coordinates would otherwise
        # be interpreted as from-the-end indices and produce a wrong crop.
        x1, y1 = max(0, x), max(0, y)
        x2 = min(frame.shape[1], x + w)
        y2 = min(frame.shape[0], y + h)
        if x2 <= x1 or y2 <= y1:
            return None

        person_roi = frame[y1:y2, x1:x2]
        if person_roi.size == 0:
            return None

        try:
            canvas, scale, pad_w, pad_h, h0, w0 = self._letterbox(person_roi)

            # RKNN input: NHWC (1, H, W, 3), RGB, float32 normalized to [0, 1].
            img = canvas[..., ::-1].astype(np.float32) / 255.0
            blob = img[None, ...]  # (1, 640, 640, 3)

            outputs = self.rknn.inference(inputs=[blob])

            mask = self._postprocess_segmentation(outputs, scale, pad_w, pad_h, w0, h0)
            return mask

        except Exception as e:
            print(f"分割错误: {e}")

        return None

    def release(self):
        """Release RKNN resources."""
        if self.rknn is not None:
            self.rknn.release()
            self.rknn = None

    def extract_person_region(self, frame: np.ndarray,
                              person_bbox: Tuple[int, int, int, int],
                              padding: float = 0.1) -> Tuple[np.ndarray, Tuple[int, int]]:
        """
        Extract the person region with some context padding.

        Args:
            frame: Input image.
            person_bbox: Person bounding box (x, y, w, h).
            padding: Padding ratio relative to the box size.

        Returns:
            (person region image, (x, y) offset of the region in frame)
        """
        x, y, w, h = person_bbox

        # Expand the box by the padding ratio, clipped to the frame.
        pad_w = int(w * padding)
        pad_h = int(h * padding)

        x1 = max(0, x - pad_w)
        y1 = max(0, y - pad_h)
        x2 = min(frame.shape[1], x + w + pad_w)
        y2 = min(frame.shape[0], y + h + pad_h)

        person_region = frame[y1:y2, x1:x2]
        offset = (x1, y1)

        return person_region, offset
class OCRRecognizer:
    """
    OCR recognizer.

    Sends images to a llama-server chat-completions API for recognition.
    """

    def __init__(self, config: Dict = None):
        """
        Initialize the OCR client.

        Args:
            config: API configuration; defaults to OCR_CONFIG.
        """
        self.config = config or OCR_CONFIG
        self.api_host = self.config.get('api_host', 'localhost')
        self.api_port = self.config.get('api_port', 8111)
        self.model = self.config.get('model', 'PaddleOCR-VL-1.5-GGUF.gguf')
        self.prompt = self.config.get('prompt', '请识别图片中的数字编号,只返回数字,不要其他内容')
        self.temperature = self.config.get('temperature', 0.3)
        self.timeout = self.config.get('timeout', 30)

        # Probe the API so a misconfigured server is reported early.
        self._check_api()

    def _make_connection(self, timeout: int):
        """
        Build an HTTP(S) connection to the configured API host.

        localhost is assumed to serve plain HTTP; any other host HTTPS.
        (Shared by _check_api and recognize, which previously duplicated
        this selection logic.)
        """
        import http.client
        use_https = self.api_host not in ['localhost', '127.0.0.1']
        conn_class = http.client.HTTPSConnection if use_https else http.client.HTTPConnection
        return conn_class(self.api_host, self.api_port, timeout=timeout)

    def _check_api(self):
        """Check whether the llama-server API is reachable."""
        try:
            conn = self._make_connection(timeout=5)
            try:
                conn.request("GET", "/")
                conn.getresponse()
            finally:
                # Close even when the request fails, so the probe never
                # leaks a socket (the original skipped close() on error).
                conn.close()
            print(f"llama-server API已连接: {self.api_host}:{self.api_port}")
        except Exception as e:
            print(f"连接llama-server失败: {e}")
            print(f"请确保llama-server运行在 {self.api_host}:{self.api_port}")

    def _image_to_base64(self, image: np.ndarray) -> str:
        """
        Encode an image as a base64 JPEG data URL.

        Args:
            image: Input image.

        Returns:
            "data:image/jpeg;base64,..." string.
        """
        import base64
        _, buffer = cv2.imencode('.jpg', image)
        base64_str = base64.b64encode(buffer).decode('utf-8')
        return f"data:image/jpeg;base64,{base64_str}"

    def recognize(self, image: np.ndarray,
                  prompt: str = None,
                  detect_only_numbers: bool = True,
                  max_retries: int = 3) -> List["OCRResult"]:
        """
        Recognize text in an image via the llama-server API.

        Args:
            image: Input image; None yields an empty list.
            prompt: Custom prompt (defaults to the configured one).
            detect_only_numbers: Keep only the first numeric/alphanumeric code.
            max_retries: Maximum number of attempts.

        Returns:
            List with at most one OCRResult. The API reports no
            confidence, so 1.0 is used and the bbox covers the whole image.
        """
        if image is None:
            return []

        import json
        import re

        results = []
        last_error = None

        for attempt in range(max_retries):
            try:
                # Build an OpenAI-style chat completion request with the
                # image inlined as a data URL.
                payload = {
                    "model": self.model,
                    "messages": [
                        {
                            "role": "user",
                            "content": [
                                {
                                    "type": "text",
                                    "text": prompt or self.prompt
                                },
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": self._image_to_base64(image)
                                    }
                                }
                            ]
                        }
                    ],
                    "temperature": self.temperature,
                    "stream": False
                }

                headers = {
                    'Content-Type': 'application/json',
                    'Accept': 'application/json',
                }

                conn = self._make_connection(timeout=self.timeout)
                try:
                    conn.request("POST", "/v1/chat/completions",
                                 json.dumps(payload), headers)
                    res = conn.getresponse()
                    data = res.read()
                finally:
                    # Always release the socket, even if the request or
                    # the response read raises.
                    conn.close()

                response = json.loads(data.decode('utf-8'))

                if 'choices' in response and len(response['choices']) > 0:
                    content = response['choices'][0]['message']['content']
                    text = content.strip()

                    if detect_only_numbers:
                        # Extract the first digit-containing token
                        # (digits or letter/digit combinations).
                        numbers = re.findall(r'[A-Za-z]*\d+[A-Za-z0-9]*', text)
                        if numbers:
                            text = numbers[0]

                    if text:
                        results.append(OCRResult(
                            text=text,
                            confidence=1.0,  # API reports no confidence
                            bbox=(0, 0, image.shape[1], image.shape[0])
                        ))
                        return results  # success: stop retrying

            except Exception as e:
                last_error = e
                print(f"OCR API识别错误 (尝试 {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    import time
                    # Linearly increasing backoff: 0.5s, 1.0s, ...
                    time.sleep(0.5 * (attempt + 1))

        # All retries exhausted.
        if last_error:
            print(f"OCR API识别最终失败: {last_error}")

        return results

    def recognize_number(self, image: np.ndarray) -> Optional[str]:
        """
        Recognize the ID number in an image.

        Args:
            image: Input image.

        Returns:
            The number text, or None when nothing was recognized.
        """
        results = self.recognize(image, detect_only_numbers=True)
        if results:
            return results[0].text
        return None
class OCRRecognizerLocal:
    """
    Local OCR recognizer (fallback).

    Uses PaddleOCR for on-device recognition.
    """

    def __init__(self, use_gpu: bool = True, languages: List[str] = None):
        """
        Initialize the OCR engine.

        Args:
            use_gpu: Whether to use the GPU.
            languages: Supported language codes (default: ['ch', 'en']).
        """
        self.use_gpu = use_gpu
        self.languages = languages or ['ch', 'en']
        self.ocr = None
        self._load_ocr()

    def _load_ocr(self):
        """Load the PaddleOCR engine; self.ocr stays None on failure."""
        try:
            from paddleocr import PaddleOCR
            self.ocr = PaddleOCR(
                use_angle_cls=True,
                lang='ch' if 'ch' in self.languages else 'en',
                use_gpu=self.use_gpu,
                show_log=False
            )
            print("成功加载PaddleOCR")
        except ImportError:
            print("未安装PaddleOCR")
            self.ocr = None
        except Exception as e:
            print(f"加载OCR失败: {e}")

    def recognize(self, image: np.ndarray,
                  detect_only_numbers: bool = True,
                  min_confidence: float = 0.5) -> List["OCRResult"]:
        """
        Recognize text in an image.

        Args:
            image: Input image; None yields an empty list.
            detect_only_numbers: Kept for interface parity with
                OCRRecognizer; the local engine returns all text.
            min_confidence: Drop results below this confidence
                (previously a hard-coded 0.5).

        Returns:
            List of OCRResult with (x, y, w, h) bounding boxes.
        """
        if self.ocr is None or image is None:
            return []

        results = []

        try:
            ocr_results = self.ocr.ocr(image, cls=True)
            if ocr_results and len(ocr_results) > 0:
                for line in ocr_results[0]:
                    if line is None:
                        continue
                    bbox_points, (text, conf) = line

                    if conf < min_confidence:
                        continue

                    # Axis-aligned box from the quadrilateral corners.
                    x1 = int(min(p[0] for p in bbox_points))
                    y1 = int(min(p[1] for p in bbox_points))
                    x2 = int(max(p[0] for p in bbox_points))
                    y2 = int(max(p[1] for p in bbox_points))

                    results.append(OCRResult(
                        text=text,
                        confidence=conf,
                        bbox=(x1, y1, x2 - x1, y2 - y1)
                    ))
        except Exception as e:
            print(f"OCR识别错误: {e}")

        return results
class NumberDetector:
    """
    Number detector.

    Searches detected persons for an identification number, delegating
    the text recognition to a llama-server API or a local OCR engine.
    """

    def __init__(self, use_api: bool = True, ocr_config: Dict = None):
        """
        Initialize the detector.

        Args:
            use_api: Whether OCR goes through the HTTP API.
            ocr_config: OCR configuration.
        """
        self.segmenter = PersonSegmenter(use_gpu=False)

        # Select the OCR backend: remote API or local engine.
        if use_api:
            self.ocr = OCRRecognizer(ocr_config)
            print("使用llama-server API进行OCR识别")
        else:
            self.ocr = OCRRecognizerLocal()
            print("使用本地OCR进行识别")

        # Regions where a number may appear, expressed as ratios of the
        # person bounding box.
        self.search_regions = [
            {'name': '胸部', 'y_ratio': (0.2, 0.5), 'x_ratio': (0.2, 0.8)},
            {'name': '腹部', 'y_ratio': (0.5, 0.8), 'x_ratio': (0.2, 0.8)},
            {'name': '背部上方', 'y_ratio': (0.1, 0.4), 'x_ratio': (0.1, 0.9)},
        ]

    def detect_number(self, frame: np.ndarray,
                      person_bbox: Tuple[int, int, int, int]) -> PersonInfo:
        """
        Detect the number worn by one person.

        Args:
            frame: Input image.
            person_bbox: Person bounding box (x, y, w, h).

        Returns:
            PersonInfo with all OCR hits and the best one promoted to
            the number fields.
        """
        x, y, w, h = person_bbox

        # Crop the (padded) person region out of the frame.
        region_img, offset = self.segmenter.extract_person_region(frame, person_bbox)

        info = PersonInfo(
            person_id=-1,
            person_bbox=person_bbox,
            ocr_results=[]
        )

        best_hit = None
        best_conf = 0

        for region in self.search_regions:
            # Search window in region coordinates, derived from the
            # bbox-relative ratios and clipped to the cropped image.
            max_y, max_x = region_img.shape[0], region_img.shape[1]
            top = max(0, min(int(h * region['y_ratio'][0]), max_y))
            bottom = max(0, min(int(h * region['y_ratio'][1]), max_y))
            left = max(0, min(int(w * region['x_ratio'][0]), max_x))
            right = max(0, min(int(w * region['x_ratio'][1]), max_x))

            if bottom <= top or right <= left:
                continue

            window = region_img[top:bottom, left:right]

            for hit in self.ocr.recognize(window):
                # Map the hit back into frame coordinates.
                hit.bbox = (
                    hit.bbox[0] + left + offset[0],
                    hit.bbox[1] + top + offset[1],
                    hit.bbox[2],
                    hit.bbox[3],
                )
                hit.location = region['name']
                info.ocr_results.append(hit)

                # Track the highest-confidence hit across all regions.
                if hit.confidence > best_conf:
                    best_conf = hit.confidence
                    best_hit = hit

        # Promote the winner to the number fields.
        if best_hit:
            info.number_text = best_hit.text
            info.number_confidence = best_hit.confidence
            info.number_location = best_hit.location

        return info

    def detect_numbers_batch(self, frame: np.ndarray,
                             person_bboxes: List[Tuple[int, int, int, int]]) -> List[PersonInfo]:
        """
        Detect numbers for a batch of persons.

        Args:
            frame: Input image.
            person_bboxes: Person bounding boxes.

        Returns:
            One PersonInfo per box, person_id set to the box index.
        """
        infos = []
        for pid, bbox in enumerate(person_bboxes):
            entry = self.detect_number(frame, bbox)
            entry.person_id = pid
            infos.append(entry)
        return infos

    def release(self):
        """Release underlying resources."""
        segmenter = self.segmenter
        if hasattr(segmenter, 'release'):
            segmenter.release()
def preprocess_for_ocr(image: np.ndarray) -> np.ndarray:
    """
    Prepare an image for OCR.

    Converts to grayscale, boosts local contrast with CLAHE, denoises,
    and binarizes with Otsu's automatic threshold.

    Args:
        image: Input image (BGR or grayscale); may be None.

    Returns:
        Binarized image, or None when the input is None.
    """
    if image is None:
        return None

    # Collapse color images to a single luminance channel.
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image

    # Contrast-limited adaptive histogram equalization.
    equalized = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)).apply(gray)

    # Non-local-means denoising.
    smoothed = cv2.fastNlMeansDenoising(equalized, None, 10)

    # Otsu picks the binarization threshold automatically.
    _, result = cv2.threshold(smoothed, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    return result
|