"""YOLO11-based construction-site safety detection (helmet / reflective vest).

Detects people, safety helmets and reflective vests in an image, a video
file or an RTSP stream.  In video mode, when a person is detected without
a helmet and/or vest, the annotated frame is uploaded to OSS and a safety
event is created on the remote server (rate-limited to one upload per 2 s).
"""

import argparse
import json
import os
import sys
import time

import cv2
import numpy as np
import requests
from PIL import Image, ImageDraw, ImageFont
from ultralytics import YOLO

# Class-id -> display label for the custom safety model
# (0 = helmet, 3 = person, 4 = reflective vest).
LABEL_MAP = {0: '安全帽', 4: '安全衣', 3: '人'}

PERSON_CLS = 3
HELMET_CLS = 0
VEST_CLS = 4

# Per-class confidence thresholds: a person must exceed 0.8, gear 0.5.
# NOTE: the --conf CLI argument is accepted for backward compatibility but
# these hard-coded per-class thresholds are what the filtering actually uses
# (same behavior as the original script).
PERSON_CONF = 0.8
ITEM_CONF = 0.5

# Minimum seconds between two consecutive event uploads.
UPLOAD_INTERVAL = 2


def put_text_chinese(img, text, position, font_size=20, color=(255, 0, 0)):
    """Draw Chinese *text* onto BGR image *img* in place at *position*.

    OpenCV's putText cannot render CJK glyphs, so the image is round-tripped
    through PIL; only the rectangle actually covered by the text is copied
    back so the rest of *img* is untouched.  *color* is BGR (OpenCV order).
    """
    img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img_pil)
    font_path = "Alibaba_PuHuiTi_2.0_35_Thin_35_Thin.ttf"
    # Fall back through two bundled fonts; ImageFont.truetype raises OSError
    # when the file is missing or unreadable (the original used bare except).
    try:
        font = ImageFont.truetype(font_path, font_size)
    except OSError:
        try:
            font = ImageFont.truetype("MiSans-Thin.ttf", font_size)
        except OSError:
            font = ImageFont.load_default()
    # PIL expects RGB; callers pass BGR.
    color_rgb = (color[2], color[1], color[0])
    draw.text(position, text, font=font, fill=color_rgb)
    img_cv2 = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
    x, y = position
    text_width = draw.textlength(text, font=font)
    text_height = font_size
    # Copy back only the text region (numpy slicing clips at image borders,
    # and both sides use identical slices, so the shapes always match).
    img[y:y + text_height, x:x + int(text_width)] = \
        img_cv2[y:y + text_height, x:x + int(text_width)]


def upload_image(image_path):
    """Upload *image_path* to the OSS endpoint.

    Returns the public URL (``purl``) on success, or None on any failure.
    Fix vs. original: the HTTPS connection is now always closed.
    """
    conn = None
    try:
        import http.client
        import mimetypes
        from codecs import encode

        filename = os.path.basename(image_path)
        conn = http.client.HTTPSConnection("jtjai.device.wenhq.top", 8583)

        # Build the multipart/form-data body by hand (matches the server's
        # expectations as captured from Apifox).
        boundary = 'wL36Yn8afVp8Ag7AmP8qZ0SA4n1v9T'
        data_list = [
            encode('--' + boundary),
            encode('Content-Disposition: form-data; name=file; filename={0}'.format(filename)),
        ]
        file_type = mimetypes.guess_type(image_path)[0] or 'application/octet-stream'
        data_list.append(encode('Content-Type: {}'.format(file_type)))
        data_list.append(encode(''))
        with open(image_path, 'rb') as f:
            data_list.append(f.read())
        data_list.append(encode('--' + boundary + '--'))
        data_list.append(encode(''))
        body = b'\r\n'.join(data_list)

        headers = {
            'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
            'Accept': '*/*',
            'Host': 'jtjai.device.wenhq.top:8583',
            'Connection': 'keep-alive',
            'Content-Type': 'multipart/form-data; boundary={}'.format(boundary)
        }

        conn.request("POST", "/api/resource/oss/upload", body, headers)
        res = conn.getresponse()
        data = res.read()

        if res.status == 200:
            result = json.loads(data.decode("utf-8"))
            if result.get('code') == 200:
                return result.get('data', {}).get('purl')
        print(f"上传图片失败: {data.decode('utf-8')}")
    except Exception as e:
        # Best-effort upload: log and report failure via None.
        print(f"上传图片异常: {e}")
    finally:
        if conn is not None:
            conn.close()  # fix: original leaked the connection
    return None


def create_event(addr, purl):
    """Create a safety event on the remote server.

    *addr* is the human-readable alert description; *purl* is the uploaded
    image URL.  Returns True on success, False otherwise.
    """
    try:
        url = "https://jtjai.device.wenhq.top:8583/api/system/event"
        create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        data = {
            "createTime": create_time,
            "addr": addr,
            "ext1": json.dumps([purl]),
            "ext2": json.dumps({"lx": "工地安全"})
        }
        # SECURITY: verify=False disables TLS certificate validation — the
        # server presumably uses a self-signed cert; confirm and pin it if
        # possible.  timeout added so a dead server cannot hang the loop.
        response = requests.post(url, json=data, verify=False, timeout=10)
        if response.status_code == 200:
            result = response.json()
            if result.get('code') == 200:
                print(f"事件创建成功: {addr}")
                return True
        print(f"创建事件失败: {response.text}")
    except Exception as e:
        print(f"创建事件异常: {e}")
    return False


def _parse_args():
    """Parse command-line arguments (same flags/defaults as the original)."""
    parser = argparse.ArgumentParser(description='YOLO11 目标检测')
    parser.add_argument('--input', type=str,
                        default='b30090c8d3e9bf75f97b2f51a4b3cdd2.jpg',
                        help='输入图片或视频路径')
    parser.add_argument('--output', type=str, default='',
                        help='输出结果路径(不提供则不保存)')
    parser.add_argument('--type', type=str, default='image',
                        choices=['image', 'video'], help='输入类型')
    parser.add_argument('--conf', type=float, default=0.5, help='置信度阈值')
    parser.add_argument('--fps', type=int, default=2, help='每秒处理的帧数')
    return parser.parse_args()


def _filter_box(box):
    """Apply per-class confidence filtering to one YOLO box.

    Returns (cls, conf, x1, y1, x2, y2) with int coordinates when the box
    passes its class threshold and has a known label, else None.
    """
    cls = int(box.cls[0])
    conf = float(box.conf[0])
    threshold = PERSON_CONF if cls == PERSON_CLS else ITEM_CONF
    if conf > threshold and cls in LABEL_MAP:
        x1, y1, x2, y2 = (int(v) for v in box.xyxy[0].tolist())
        return cls, conf, x1, y1, x2, y2
    return None


def _draw_detection(img, cls, conf, x1, y1, x2, y2):
    """Draw one bounding box plus its Chinese label/confidence onto *img*."""
    cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)
    text = f"{LABEL_MAP[cls]}: {conf:.2f}"
    text_y = max(15, y1 - 20)  # keep the label on-screen near the top edge
    put_text_chinese(img, text, (x1, text_y), font_size=20, color=(255, 0, 0))


def _person_has_helmet(person_box, helmet_detections):
    """True if any helmet's center point lies inside *person_box* (x1,y1,x2,y2)."""
    px1, py1, px2, py2 = person_box
    for hx1, hy1, hx2, hy2, _conf in helmet_detections:
        cx = (hx1 + hx2) / 2
        cy = (hy1 + hy2) / 2
        if px1 <= cx <= px2 and py1 <= cy <= py2:
            return True
    return False


def _person_has_vest(person_box, vest_detections):
    """True if any vest rectangle overlaps *person_box* (x1,y1,x2,y2)."""
    px1, py1, px2, py2 = person_box
    for vx1, vy1, vx2, vy2, _conf in vest_detections:
        ox1, oy1 = max(px1, vx1), max(py1, vy1)
        ox2, oy2 = min(px2, vx2), min(py2, vy2)
        if ox1 < ox2 and oy1 < oy2:
            return True
    return False


def _open_stream(input_path):
    """Open a video file or RTSP stream; return the capture or None on failure."""
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print(f"无法打开视频或RTSP流: {input_path}")
        return None
    print(f"成功打开: {input_path}")
    return cap


def _process_image(model, input_path, output_path):
    """Run detection on a single image, draw results, optionally save, and show."""
    print("正在进行检测...")
    results = model(input_path)
    print(results)

    image = cv2.imread(input_path)
    if image is None:
        print(f"无法读取图片: {input_path}")
        sys.exit(1)

    print("正在绘制检测结果...")
    for result in results:
        for box in result.boxes:
            parsed = _filter_box(box)
            if parsed is None:
                continue
            cls, conf, x1, y1, x2, y2 = parsed
            print(f" {cls} detected: confidence={conf:.2f}, box=[{x1:.0f}, {y1:.0f}, {x2:.0f}, {y2:.0f}]")
            _draw_detection(image, cls, conf, x1, y1, x2, y2)

    if output_path:
        cv2.imwrite(output_path, image)
        print(f"检测结果已保存到: {output_path}")
    else:
        print("未指定输出路径,跳过保存")

    cv2.imshow('YOLO11 Detection Result', image)
    cv2.waitKey(0)
    cv2.destroyAllWindows()


def _process_video(model, input_path, output_path, process_fps):
    """Process a video file or RTSP stream frame by frame.

    Detection runs on every ``frame_interval``-th frame; the latest
    detections are redrawn on intermediate frames.  When a person without a
    helmet and/or vest is seen, the annotated frame is uploaded and an event
    is created, at most once every UPLOAD_INTERVAL seconds.  Automatically
    reconnects to dropped streams (up to a maximum retry count).
    """
    cap = _open_stream(input_path)
    if not cap:
        sys.exit(1)

    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    is_rtsp = input_path.startswith('rtsp://')
    if is_rtsp:
        print(f"RTSP流信息: {width}x{height}")
        # RTSP streams often report no FPS; assume a sane default.
        if fps <= 0:
            fps = 30.0
        print(f"使用帧率: {fps:.2f} FPS")
    else:
        print(f"视频信息: {width}x{height}, {fps:.2f} FPS")

    frame_count = 0
    out = None
    frame_interval = int(round(fps / process_fps)) if fps > 0 else 1
    print(f"帧间隔: {frame_interval} 帧")

    if output_path:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        print(f"将保存结果到: {output_path}")
    else:
        print("未指定输出路径,跳过保存")

    # State carried across frames: last detections (redrawn on skipped
    # frames) and the latest alert decision.
    last_detections = []
    person_detections = []
    need_alert = False
    alert_addr = None
    last_upload_time = 0
    reconnect_count = 0
    max_reconnects = 10

    try:
        while True:
            # --- reconnect handling -------------------------------------
            if not cap or not cap.isOpened():
                print(f"连接已断开,尝试重新连接... ({reconnect_count}/{max_reconnects})")
                if cap:
                    cap.release()
                cap = _open_stream(input_path)
                if not cap:
                    reconnect_count += 1
                    if reconnect_count > max_reconnects:
                        print("达到最大重连次数,退出")
                        break
                    time.sleep(2)  # back off before retrying
                    continue
                reconnect_count = 0
                # Stream properties may have changed across the reconnect.
                fps = cap.get(cv2.CAP_PROP_FPS)
                width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                frame_interval = int(round(fps / process_fps)) if fps > 0 else 1
                print(f"重连成功,新的帧间隔: {frame_interval} 帧")
                if output_path and not out:
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
                    print(f"重新创建视频写入对象")

            ret, frame = cap.read()
            if not ret:
                print("读取帧失败,可能流已断开")
                # FIX: release the capture so the reconnect branch above
                # actually triggers.  The original just `continue`d, which
                # could spin forever while isOpened() stayed True.
                cap.release()
                continue

            frame_count += 1

            # --- detection (only every frame_interval frames) -----------
            if frame_count % frame_interval == 0:
                print(f"处理第 {frame_count} 帧...")
                try:
                    results = model(frame)

                    last_detections = []
                    person_detections = []
                    helmet_detections = []
                    vest_detections = []
                    for result in results:
                        for box in result.boxes:
                            parsed = _filter_box(box)
                            if parsed is None:
                                continue
                            cls, conf, x1, y1, x2, y2 = parsed
                            last_detections.append((cls, conf, x1, y1, x2, y2))
                            if cls == PERSON_CLS:
                                person_detections.append((x1, y1, x2, y2, conf))
                            elif cls == HELMET_CLS:
                                helmet_detections.append((x1, y1, x2, y2, conf))
                            elif cls == VEST_CLS:
                                vest_detections.append((x1, y1, x2, y2, conf))

                    # Check every detected person for missing safety gear.
                    need_alert = False
                    alert_addr = None
                    for px1, py1, px2, py2, pconf in person_detections:
                        pbox = (px1, py1, px2, py2)
                        has_helmet = _person_has_helmet(pbox, helmet_detections)
                        has_vest = _person_has_vest(pbox, vest_detections)
                        if not has_helmet or not has_vest:
                            need_alert = True
                            if not has_helmet and not has_vest:
                                alert_addr = "反光衣和安全帽都没戴"
                            elif not has_helmet:
                                alert_addr = "未戴安全帽"
                            else:
                                alert_addr = "未穿反光衣"
                            print(f"警告: {alert_addr},置信度: {pconf:.2f}")
                except Exception as e:
                    # Keep the previous detections and carry on.
                    print(f"检测过程中出错: {e}")

            # --- draw latest known detections on every frame -------------
            for cls, conf, x1, y1, x2, y2 in last_detections:
                _draw_detection(frame, cls, conf, x1, y1, x2, y2)

            # --- alert upload (requires a detected person, rate-limited) --
            if person_detections and need_alert and alert_addr:
                current_time = time.time()
                if current_time - last_upload_time >= UPLOAD_INTERVAL:
                    print(f"检测到人,触发告警上传")
                    temp_image_path = f"alert_frame_{frame_count}.jpg"
                    cv2.imwrite(temp_image_path, frame)
                    purl = upload_image(temp_image_path)
                    if purl:
                        create_event(alert_addr, purl)
                        last_upload_time = current_time
                    # Always remove the temp frame, even if upload failed.
                    if os.path.exists(temp_image_path):
                        os.remove(temp_image_path)
                else:
                    print(f"2秒内已上传过事件,跳过本次上传")

            # --- output -------------------------------------------------
            if out:
                try:
                    out.write(frame)
                except Exception as e:
                    print(f"写入视频失败: {e}")

            try:
                cv2.imshow('YOLO11 Detection Result', frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            except Exception as e:
                # e.g. headless environment without GUI support
                print(f"显示帧失败: {e}")
    finally:
        cap.release()
        if out:
            out.release()
        cv2.destroyAllWindows()
        if output_path:
            print(f"视频处理完成,结果已保存到: {output_path}")
        else:
            print("视频处理完成")


def main():
    """Entry point: parse arguments, load the model, dispatch on input type."""
    args = _parse_args()

    # Custom-trained safety model (helmet / person / vest classes).
    model = YOLO('yolo11m_safety.pt')

    print(f"使用置信度阈值: {args.conf}")
    print(f"每秒处理帧数: {args.fps}")

    if args.type == 'image':
        _process_image(model, args.input, args.output)
    elif args.type == 'video':
        _process_video(model, args.input, args.output, args.fps)


if __name__ == "__main__":
    main()