| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- """
- 大模型 API 调用模块
- 用于安全状态判断和 OCR 编号识别
- """
import base64
import http.client
import json
import os
import re
import time
from dataclasses import dataclass
from typing import Optional, Dict, Any, List

import cv2
import numpy as np
@dataclass
class LLMResponse:
    """Result of a single LLM API call."""
    content: str          # model output text ("" on failure)
    success: bool = True  # whether the call completed successfully
    error: str = ""       # error message, empty on success
    latency: float = 0.0  # round-trip latency in seconds
class LLMClient:
    """
    LLM API client.

    Speaks the OpenAI-compatible chat-completions protocol (Qwen,
    llama-server, etc.) over ``http.client``, with simple retry support.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the client.

        Args:
            config: Configuration dictionary; all keys are optional and
                fall back to the defaults below.
        """
        self.config = config or {}

        # API endpoint configuration.
        self.api_host = self.config.get('api_host', 'localhost')
        self.api_port = self.config.get('api_port', 8111)
        self.api_key = self.config.get('api_key', '')
        self.model = self.config.get('model', 'Qwen2.5-VL-7B-Instruct')

        # Timeout and retry policy.
        self.timeout = self.config.get('timeout', 30)
        self.max_retries = self.config.get('max_retries', 3)
        self.retry_delay = self.config.get('retry_delay', 1.0)

        # Default to HTTPS for anything that is not a local endpoint.
        self.use_https = self.config.get(
            'use_https', self.api_host not in ['localhost', '127.0.0.1'])

    def _image_to_base64(self, image: np.ndarray) -> str:
        """Encode an image as a JPEG base64 data-URI; return "" on failure."""
        if image is None:
            return ""

        # cv2.imencode requires a C-contiguous array.
        image = np.ascontiguousarray(image)

        success, buffer = cv2.imencode('.jpg', image, [cv2.IMWRITE_JPEG_QUALITY, 85])
        if not success:
            return ""

        base64_str = base64.b64encode(buffer).decode('utf-8')
        return f"data:image/jpeg;base64,{base64_str}"

    def chat(self, messages: List[Dict], temperature: float = 0.3,
             max_tokens: int = 1024, stream: bool = False) -> "LLMResponse":
        """
        Send a chat-completion request, retrying on transport errors.

        Args:
            messages: OpenAI-style message list.
            temperature: Sampling temperature.
            max_tokens: Maximum number of generated tokens.
            stream: Whether to request streaming output.

        Returns:
            LLMResponse holding the model output, or ``success=False``
            with an error description. Note: a non-200 HTTP status is
            returned immediately without retrying; only transport-level
            exceptions trigger the retry loop.
        """
        payload = {
            "model": self.model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "stream": stream
        }

        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }

        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'

        last_error = None

        for attempt in range(self.max_retries):
            try:
                start_time = time.time()

                conn_class = (http.client.HTTPSConnection if self.use_https
                              else http.client.HTTPConnection)
                conn = conn_class(self.api_host, self.api_port, timeout=self.timeout)
                try:
                    conn.request("POST", "/v1/chat/completions",
                                 json.dumps(payload), headers)
                    res = conn.getresponse()
                    data = res.read()
                finally:
                    # Always release the socket, even when request/read
                    # raises (the original leaked the connection here).
                    conn.close()

                latency = time.time() - start_time

                if res.status != 200:
                    error_msg = f"HTTP {res.status}: {data.decode('utf-8', errors='ignore')}"
                    return LLMResponse(content="", success=False, error=error_msg, latency=latency)

                response = json.loads(data.decode('utf-8'))

                if 'choices' in response and len(response['choices']) > 0:
                    content = response['choices'][0]['message']['content']
                    return LLMResponse(content=content, success=True, latency=latency)
                elif 'error' in response:
                    return LLMResponse(content="", success=False,
                                       error=response['error'].get('message', 'Unknown error'),
                                       latency=latency)
                else:
                    return LLMResponse(content="", success=False,
                                       error="Invalid response format", latency=latency)

            except json.JSONDecodeError as e:
                last_error = f"JSON 解析错误: {e}"
            except http.client.HTTPException as e:
                last_error = f"HTTP 错误: {e}"
            except Exception as e:
                last_error = str(e)

            # Linear back-off between retries.
            if attempt < self.max_retries - 1:
                time.sleep(self.retry_delay * (attempt + 1))

        return LLMResponse(content="", success=False, error=last_error or "Unknown error")

    def vision_chat(self, image: np.ndarray, prompt: str,
                    temperature: float = 0.3) -> "LLMResponse":
        """
        Single-turn vision-language chat: one image plus one text prompt.

        Args:
            image: Input image (BGR ndarray, per cv2 convention).
            prompt: Text prompt sent alongside the image.
            temperature: Sampling temperature.

        Returns:
            LLMResponse from :meth:`chat`.
        """
        image_base64 = self._image_to_base64(image)

        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": image_base64}}
                ]
            }
        ]

        return self.chat(messages, temperature=temperature)

    def check_connection(self) -> bool:
        """Probe the API endpoint; True if the service appears to be up."""
        try:
            conn_class = (http.client.HTTPSConnection if self.use_https
                          else http.client.HTTPConnection)
            conn = conn_class(self.api_host, self.api_port, timeout=5)
            try:
                conn.request("GET", "/v1/models")
                res = conn.getresponse()
            finally:
                conn.close()
            return res.status in [200, 404]  # 404 still means the server is running
        except Exception:
            # Narrowed from a bare except: so Ctrl-C / SystemExit propagate.
            return False
class SafetyAnalyzer:
    """
    Safety-status analyzer.

    Uses a vision-language model to judge construction-site safety
    (presence of people, helmets, reflective vests).
    """

    # Prompt asking the model for a structured safety report.
    SAFETY_PROMPT = """你是一个施工现场安全管理助手。请分析这张图片中的安全情况。
请检查以下几点:
1. 图片中是否有人员?
2. 人员是否佩戴了安全帽?
3. 人员是否穿着反光衣/安全背心?
请以 JSON 格式回复,格式如下:
{
"has_person": true/false,
"person_count": 数字,
"safety_status": "safe" 或 "violation",
"violations": ["违规项1", "违规项2"],
"description": "简要描述",
"confidence": 0.0-1.0
}
只返回 JSON,不要其他内容。"""

    def __init__(self, llm_config: Optional[Dict[str, Any]] = None):
        """
        Initialize the analyzer.

        Args:
            llm_config: Configuration passed through to :class:`LLMClient`.
        """
        self.llm = LLMClient(llm_config)
        self.enabled = True  # set False to short-circuit analyze()

    @staticmethod
    def _extract_json(text: str) -> Any:
        """Strip optional markdown code fences from *text* and parse JSON.

        Raises json.JSONDecodeError when the payload is not valid JSON.
        """
        content = text.strip()
        if '```json' in content:
            content = content.split('```json')[1].split('```')[0]
        elif '```' in content:
            content = content.split('```')[1].split('```')[0]
        return json.loads(content.strip())

    def analyze(self, image: np.ndarray) -> Dict[str, Any]:
        """
        Analyze the safety status of a scene image.

        Args:
            image: Input image; None yields the default (unknown) result.

        Returns:
            Result dict with at least ``has_person``, ``safety_status``,
            ``violations``, ``success`` and (on success) ``latency``.
        """
        if not self.enabled or image is None:
            return self._default_result()

        # Ask the vision model for a structured report.
        response = self.llm.vision_chat(image, self.SAFETY_PROMPT, temperature=0.1)

        if not response.success:
            print(f"安全分析失败: {response.error}")
            return self._default_result()

        try:
            result = self._extract_json(response.content)

            # Backfill required fields the model may have omitted.
            result.setdefault('has_person', False)
            result.setdefault('safety_status', 'unknown')
            result.setdefault('violations', [])

            result['success'] = True
            result['latency'] = response.latency

            return result

        # TypeError/AttributeError cover a valid-JSON-but-not-a-dict payload,
        # which the original let propagate.
        except (json.JSONDecodeError, TypeError, AttributeError) as e:
            print(f"解析安全分析结果失败: {e}")
            print(f"原始响应: {response.content[:200]}")
            return self._default_result()

    def _default_result(self) -> Dict[str, Any]:
        """Return the neutral 'nothing detected / unknown' result."""
        return {
            'has_person': False,
            'person_count': 0,
            'safety_status': 'unknown',
            'violations': [],
            'description': '',
            'confidence': 0.0,
            'success': False
        }

    def check_person_safety(self, person_image: np.ndarray) -> Dict[str, Any]:
        """
        Check the safety gear of a single person.

        Args:
            person_image: Cropped image of one person.

        Returns:
            Dict with ``has_helmet``, ``has_vest``, ``is_violation``,
            ``violation_desc`` and ``confidence``; failures report a
            violation with a descriptive ``violation_desc``.
        """
        prompt = """分析这张图片中人员的安全装备佩戴情况。
请检查:
1. 是否佩戴安全帽?
2. 是否穿着反光衣/安全背心?
以 JSON 格式回复:
{
"has_helmet": true/false,
"has_vest": true/false,
"is_violation": true/false,
"violation_desc": "违规描述,如果没有违规则为空",
"confidence": 0.0-1.0
}
只返回 JSON。"""

        if person_image is None:
            return {'has_helmet': False, 'has_vest': False, 'is_violation': True,
                    'violation_desc': '无法识别', 'confidence': 0.0}

        response = self.llm.vision_chat(person_image, prompt, temperature=0.1)

        if not response.success:
            return {'has_helmet': False, 'has_vest': False, 'is_violation': True,
                    'violation_desc': '识别失败', 'confidence': 0.0}

        try:
            result = self._extract_json(response.content)
            result['success'] = True
            return result
        # Narrowed from a bare except: so Ctrl-C / SystemExit propagate.
        except (json.JSONDecodeError, TypeError, AttributeError):
            return {'has_helmet': False, 'has_vest': False, 'is_violation': True,
                    'violation_desc': '解析失败', 'confidence': 0.0}
class NumberRecognizer:
    """
    Badge-number recognizer.

    Uses a vision-language model as the OCR engine to read worker
    ID numbers off clothing.
    """

    NUMBER_PROMPT = """请识别这张图片中工作人员衣服上的编号或工号。
只返回识别到的编号数字,如果没有看到编号则返回 "无"。
不要返回其他内容。"""

    # Alphanumeric ID pattern: must contain at least one digit.
    _NUMBER_RE = r'[A-Za-z]*\d+[A-Za-z0-9]*'

    def __init__(self, llm_config: Optional[Dict[str, Any]] = None):
        """
        Initialize the recognizer.

        Args:
            llm_config: Configuration passed through to :class:`LLMClient`.
        """
        self.llm = LLMClient(llm_config)

    def recognize(self, image: np.ndarray) -> Dict[str, Any]:
        """
        Recognize an ID number in the image.

        Args:
            image: Input image; None yields a failed result.

        Returns:
            Dict of ``{'number': str|None, 'confidence': float, 'success': bool}``
            (plus ``error`` when the API call fails).
        """
        if image is None:
            return {'number': None, 'confidence': 0.0, 'success': False}

        response = self.llm.vision_chat(image, self.NUMBER_PROMPT, temperature=0.1)

        if not response.success:
            return {'number': None, 'confidence': 0.0, 'success': False,
                    'error': response.error}

        content = response.content.strip()

        # Model reported "无" (none) or returned nothing.
        if not content or '无' in content:
            return {'number': None, 'confidence': 0.0, 'success': True}

        # Pull out the first alphanumeric token that contains digits.
        matches = re.findall(self._NUMBER_RE, content)

        if matches:
            number = matches[0]
            return {'number': number, 'confidence': 0.9, 'success': True}

        # No pattern matched: fall back to the raw content, lower confidence.
        return {'number': content, 'confidence': 0.5, 'success': True}

    def recognize_person_number(self, person_image: np.ndarray,
                                search_chest: bool = True) -> Dict[str, Any]:
        """
        Recognize a person's ID number, optionally trying the chest area first.

        Args:
            person_image: Cropped image of one person.
            search_chest: Try a chest-region crop before the full image.

        Returns:
            Same dict as :meth:`recognize`, with an added ``location`` key
            ('胸部' or '全身') when a crop strategy was applied.
        """
        if person_image is None:
            return {'number': None, 'confidence': 0.0, 'success': False}

        h, w = person_image.shape[:2]

        # Only bother cropping when the image is big enough to be useful.
        if search_chest and h > 100 and w > 100:
            # Chest region: middle of the upper body.
            y1 = int(h * 0.15)
            y2 = int(h * 0.55)
            x1 = int(w * 0.15)
            x2 = int(w * 0.85)

            chest_region = person_image[y1:y2, x1:x2]

            result = self.recognize(chest_region)

            if result.get('number'):
                result['location'] = '胸部'
                return result

        # Fall back to the full image.
        result = self.recognize(person_image)
        result['location'] = '全身'
        return result
def create_safety_analyzer(config: Dict[str, Any] = None) -> SafetyAnalyzer:
    """Factory helper: build a SafetyAnalyzer from an optional LLM config."""
    analyzer = SafetyAnalyzer(config)
    return analyzer
def create_number_recognizer(config: Dict[str, Any] = None) -> NumberRecognizer:
    """Factory helper: build a NumberRecognizer from an optional LLM config."""
    recognizer = NumberRecognizer(config)
    return recognizer
|