| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445 |
- import cv2
- import numpy as np
- import argparse
- import requests
- import json
- import time
- import os
- from PIL import Image, ImageDraw, ImageFont
- from ultralytics import YOLO
def put_text_chinese(img, text, position, font_size=20, color=(255, 0, 0)):
    """Draw UTF-8 (e.g. Chinese) text onto a BGR OpenCV image in place.

    cv2.putText cannot render CJK glyphs, so the text is rasterized with
    Pillow on a copy and only the rectangle covered by the text is copied
    back into *img*.

    Args:
        img: BGR image (numpy array); modified in place. Returns None.
        text: string to draw (may contain CJK characters).
        position: (x, y) top-left corner of the text, in pixels.
        font_size: font height in pixels.
        color: BGR color tuple (OpenCV convention).
    """
    img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img_pil)
    font_path = "Alibaba_PuHuiTi_2.0_35_Thin_35_Thin.ttf"
    # Font fallback chain. ImageFont.truetype raises OSError when the font
    # file is missing/unreadable; the original bare `except:` also swallowed
    # unrelated errors (KeyboardInterrupt, typos), so catch OSError only.
    try:
        font = ImageFont.truetype(font_path, font_size)
    except OSError:
        try:
            font = ImageFont.truetype("MiSans-Thin.ttf", font_size)
        except OSError:
            font = ImageFont.load_default()

    # Pillow expects RGB; the caller passes BGR, so swap channels.
    color_rgb = (color[2], color[1], color[0])
    draw.text(position, text, font=font, fill=color_rgb)

    img_cv2 = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)

    # Copy back only the text rectangle. Clamp to the image bounds so a
    # label near an edge (or a negative coordinate) cannot produce a
    # wrapped/mismatched slice.
    x, y = position
    text_width = int(draw.textlength(text, font=font))
    text_height = font_size
    img_h, img_w = img.shape[:2]
    x0, y0 = max(0, x), max(0, y)
    x1, y1 = min(img_w, x + text_width), min(img_h, y + text_height)
    if x0 < x1 and y0 < y1:
        img[y0:y1, x0:x1] = img_cv2[y0:y1, x0:x1]
# Upload an image to OSS; returns the public URL on success, else None.
def upload_image(image_path):
    """Upload the file at *image_path* as multipart/form-data.

    Builds the multipart body by hand over ``http.client``.

    Returns:
        The ``purl`` (public URL) field from the service response, or
        None on any failure (HTTP error, bad payload, network problem).
    """
    import http.client
    import mimetypes
    from codecs import encode

    conn = None
    try:
        filename = os.path.basename(image_path)

        conn = http.client.HTTPSConnection("jtjai.device.wenhq.top", 8583)

        # Assemble the multipart/form-data body by hand.
        boundary = 'wL36Yn8afVp8Ag7AmP8qZ0SA4n1v9T'
        dataList = []
        dataList.append(encode('--' + boundary))
        dataList.append(encode('Content-Disposition: form-data; name=file; filename={0}'.format(filename)))

        # Guess the MIME type from the file extension.
        fileType = mimetypes.guess_type(image_path)[0] or 'application/octet-stream'
        dataList.append(encode('Content-Type: {}'.format(fileType)))
        dataList.append(encode(''))

        # File payload (raw bytes, no encoding).
        with open(image_path, 'rb') as f:
            dataList.append(f.read())

        dataList.append(encode('--' + boundary + '--'))
        dataList.append(encode(''))

        # multipart parts are joined with CRLF per RFC 2046.
        body = b'\r\n'.join(dataList)

        headers = {
            'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
            'Accept': '*/*',
            'Host': 'jtjai.device.wenhq.top:8583',
            'Connection': 'keep-alive',
            'Content-Type': 'multipart/form-data; boundary={}'.format(boundary)
        }

        conn.request("POST", "/api/resource/oss/upload", body, headers)
        res = conn.getresponse()
        data = res.read()

        # Success requires both HTTP 200 and an application-level code 200.
        if res.status == 200:
            result = json.loads(data.decode("utf-8"))
            if result.get('code') == 200:
                return result.get('data', {}).get('purl')
        print(f"上传图片失败: {data.decode('utf-8')}")
    except Exception as e:
        print(f"上传图片异常: {e}")
    finally:
        # The original leaked the connection on every call; always close it.
        if conn is not None:
            conn.close()
    return None
# Create an alert event on the backend referencing an uploaded image.
def create_event(addr, purl):
    """POST an event record to the device backend.

    Args:
        addr: human-readable alert description (stored as the event address).
        purl: public URL of the uploaded evidence image.

    Returns:
        True when the service acknowledges the event (HTTP 200 and
        application code 200), False otherwise.
    """
    try:
        url = "https://jtjai.device.wenhq.top:8583/api/system/event"
        create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        data = {
            "createTime": create_time,
            "addr": addr,
            "ext1": json.dumps([purl]),
            "ext2": json.dumps({"lx": "工地安全"})
        }
        # NOTE(security): verify=False disables TLS certificate validation —
        # presumably the device endpoint uses a self-signed cert; confirm
        # before exposing beyond the local network.
        # timeout added: requests has no default timeout, so without one a
        # dead backend would hang the detection loop forever. A Timeout is
        # caught by the broad except below and reported as a failure.
        response = requests.post(url, json=data, verify=False, timeout=10)
        if response.status_code == 200:
            result = response.json()
            if result.get('code') == 200:
                print(f"事件创建成功: {addr}")
                return True
        print(f"创建事件失败: {response.text}")
    except Exception as e:
        print(f"创建事件异常: {e}")
    return False
# Parse command-line arguments.
parser = argparse.ArgumentParser(description='YOLO11 目标检测')
parser.add_argument('--input', type=str, default='b30090c8d3e9bf75f97b2f51a4b3cdd2.jpg', help='输入图片或视频路径')
parser.add_argument('--output', type=str, default='', help='输出结果路径(不提供则不保存)')
parser.add_argument('--type', type=str, default='image', choices=['image', 'video'], help='输入类型')
parser.add_argument('--conf', type=float, default=0.5, help='置信度阈值')
parser.add_argument('--fps', type=int, default=2, help='每秒处理的帧数')
args = parser.parse_args()
# Load the fine-tuned safety-equipment YOLO weights (local .pt file).
model = YOLO('yolo11m_safety.pt')
# Unpack the parsed options into module-level names read by both the
# image branch and the video branch below — do not rename.
input_path = args.input
output_path = args.output
input_type = args.type
# NOTE(review): conf_threshold is printed but never used for filtering —
# both branches hard-code per-class thresholds (0.8 person / 0.5 gear).
# Confirm whether --conf is meant to override them.
conf_threshold = args.conf
process_fps = args.fps
# Class-id -> display label for the classes of interest.
# NOTE(review): ids assume the custom model's class order
# (0 = helmet, 3 = person, 4 = safety vest) — verify against model.names.
label_map = {0: '安全帽', 4: '安全衣', 3: '人'}
print(f"使用置信度阈值: {conf_threshold}")
print(f"每秒处理帧数: {process_fps}")
# Alert rate-limiting state: at most one event upload per upload_interval.
last_upload_time = 0
upload_interval = 2  # seconds between uploads
if input_type == 'image':
    # Read the image first so an unreadable path fails fast; the original
    # ran model inference before this check, wasting a full forward pass
    # (and printing its results) for an image it then refused to process.
    image = cv2.imread(input_path)
    if image is None:
        print(f"无法读取图片: {input_path}")
        exit(1)

    print("正在进行检测...")
    results = model(input_path)

    print(results)

    # Draw every accepted detection onto the image.
    print("正在绘制检测结果...")
    for result in results:
        boxes = result.boxes
        for box in boxes:
            cls = int(box.cls[0])
            conf = float(box.conf[0])

            # Per-class confidence thresholds:
            # person (cls 3) must exceed 0.8; helmet / reflective vest 0.5.
            if cls == 3:
                conf_threshold_item = 0.8
            else:
                conf_threshold_item = 0.5

            if conf > conf_threshold_item and cls in label_map:
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
                print(f" {cls} detected: confidence={conf:.2f}, box=[{x1:.0f}, {y1:.0f}, {x2:.0f}, {y2:.0f}]")

                # Bounding box.
                cv2.rectangle(image, (x1, y1), (x2, y2), (255, 0, 0), 2)

                # Chinese label + confidence, nudged to stay inside the frame.
                text = f"{label_map[cls]}: {conf:.2f}"
                text_y = max(15, y1 - 20)
                put_text_chinese(image, text, (x1, text_y), font_size=20, color=(255, 0, 0))

    # Save only when an output path was given.
    if output_path:
        cv2.imwrite(output_path, image)
        print(f"检测结果已保存到: {output_path}")
    else:
        print("未指定输出路径,跳过保存")

    # Show the annotated result until any key is pressed.
    cv2.imshow('YOLO11 Detection Result', image)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
elif input_type == 'video':
    # Open the video file or RTSP stream; returns the capture or None.
    def open_stream():
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            print(f"无法打开视频或RTSP流: {input_path}")
            return None
        print(f"成功打开: {input_path}")
        return cap

    cap = open_stream()
    if not cap:
        exit(1)

    # Source properties (may report 0 for some RTSP servers).
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Detect RTSP by URL scheme.
    is_rtsp = input_path.startswith('rtsp://')

    if is_rtsp:
        print(f"RTSP流信息: {width}x{height}")
        # RTSP streams often report no FPS; assume a fixed rate.
        if fps <= 0:
            fps = 30.0
        print(f"使用帧率: {fps:.2f} FPS")
    else:
        print(f"视频信息: {width}x{height}, {fps:.2f} FPS")

    # Frame counter and lazily-created video writer.
    frame_count = 0
    out = None

    # Run detection on every frame_interval-th frame so roughly
    # process_fps frames are analyzed per second of source video.
    frame_interval = int(round(fps / process_fps)) if fps > 0 else 1
    print(f"帧间隔: {frame_interval} 帧")

    # Create the writer only when an output path was supplied.
    if output_path:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        print(f"将保存结果到: {output_path}")
    else:
        print("未指定输出路径,跳过保存")

    # Detections from the most recently analyzed frame; redrawn on the
    # skipped frames in between so boxes do not flicker.
    last_detections = []

    # Reconnection bookkeeping for flaky RTSP sources.
    reconnect_count = 0
    max_reconnects = 10

    try:
        while True:
            # Reconnect when the capture has dropped.
            if not cap or not cap.isOpened():
                print(f"连接已断开,尝试重新连接... ({reconnect_count}/{max_reconnects})")

                # Release the stale capture before reopening.
                if cap:
                    cap.release()

                cap = open_stream()
                if not cap:
                    reconnect_count += 1
                    if reconnect_count > max_reconnects:
                        print("达到最大重连次数,退出")
                        break
                    # Back off briefly before the next attempt.
                    # NOTE(review): redundant import — `time` is already
                    # imported at the top of the file.
                    import time
                    time.sleep(2)
                    continue

                # Connected again: reset the failure counter.
                reconnect_count = 0

                # Re-read stream properties; they may differ after reconnect.
                fps = cap.get(cv2.CAP_PROP_FPS)
                width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                frame_interval = int(round(fps / process_fps)) if fps > 0 else 1
                print(f"重连成功,新的帧间隔: {frame_interval} 帧")

                # Recreate the writer if saving was requested and none exists.
                if output_path and not out:
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
                    print(f"重新创建视频写入对象")

            # Grab the next frame.
            ret, frame = cap.read()
            if not ret:
                print("读取帧失败,可能流已断开")
                # NOTE(review): this only re-enters the loop; reconnection
                # happens solely via the isOpened() check above, so a stream
                # that stays "opened" but yields no frames spins here.
                continue

            frame_count += 1

            # Analyze only every frame_interval-th frame.
            if frame_count % frame_interval == 0:
                print(f"处理第 {frame_count} 帧...")

                try:
                    # Run inference on this frame.
                    results = model(frame)

                    # Accepted detections for drawing (cls, conf, box).
                    last_detections = []
                    # Person boxes, checked against gear boxes below.
                    person_detections = []
                    # Helmet and safety-vest boxes.
                    helmet_detections = []
                    safety_clothes_detections = []

                    for result in results:
                        boxes = result.boxes
                        for box in boxes:
                            cls = int(box.cls[0])
                            conf = float(box.conf[0])

                            # Per-class confidence thresholds:
                            # person (cls 3) > 0.8, helmet / vest > 0.5.
                            if cls == 3:
                                conf_threshold_item = 0.8
                            else:
                                conf_threshold_item = 0.5

                            if conf > conf_threshold_item and cls in label_map:
                                x1, y1, x2, y2 = box.xyxy[0].tolist()
                                x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
                                last_detections.append((cls, conf, x1, y1, x2, y2))

                                # Bucket by class for the safety checks.
                                if cls == 3:    # person
                                    person_detections.append((x1, y1, x2, y2, conf))
                                elif cls == 0:  # helmet
                                    helmet_detections.append((x1, y1, x2, y2, conf))
                                elif cls == 4:  # safety vest
                                    safety_clothes_detections.append((x1, y1, x2, y2, conf))

                    # Alert state for this analyzed frame.
                    need_alert = False
                    alert_addr = None

                    # For each person, check helmet and vest association.
                    for person_x1, person_y1, person_x2, person_y2, person_conf in person_detections:
                        # Helmet check: helmet box center inside person box.
                        has_helmet = False
                        for helmet_x1, helmet_y1, helmet_x2, helmet_y2, helmet_conf in helmet_detections:
                            helmet_center_x = (helmet_x1 + helmet_x2) / 2
                            helmet_center_y = (helmet_y1 + helmet_y2) / 2
                            if (helmet_center_x >= person_x1 and helmet_center_x <= person_x2 and
                                helmet_center_y >= person_y1 and helmet_center_y <= person_y2):
                                has_helmet = True
                                break

                        # Vest check: any rectangle overlap with the person box.
                        has_safety_clothes = False
                        for clothes_x1, clothes_y1, clothes_x2, clothes_y2, clothes_conf in safety_clothes_detections:
                            overlap_x1 = max(person_x1, clothes_x1)
                            overlap_y1 = max(person_y1, clothes_y1)
                            overlap_x2 = min(person_x2, clothes_x2)
                            overlap_y2 = min(person_y2, clothes_y2)

                            if overlap_x1 < overlap_x2 and overlap_y1 < overlap_y2:
                                has_safety_clothes = True
                                break

                        # Missing gear -> raise an alert; the message
                        # reflects which item(s) are missing.
                        # NOTE(review): a later person overwrites alert_addr,
                        # so only the last offending person's message is kept.
                        if not has_helmet or not has_safety_clothes:
                            need_alert = True
                            if not has_helmet and not has_safety_clothes:
                                alert_addr = "反光衣和安全帽都没戴"
                            elif not has_helmet:
                                alert_addr = "未戴安全帽"
                            else:
                                alert_addr = "未穿反光衣"

                            print(f"警告: {alert_addr},置信度: {person_conf:.2f}")
                except Exception as e:
                    print(f"检测过程中出错: {e}")
                    # Keep going and reuse the previous detections.

            # Draw the latest detections (also on skipped frames).
            for cls, conf, x1, y1, x2, y2 in last_detections:
                # Bounding box.
                cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)

                # Chinese label + confidence, kept inside the frame.
                text = f"{label_map[cls]}: {conf:.2f}"
                text_y = max(15, y1 - 20)
                put_text_chinese(frame, text, (x1, text_y), font_size=20, color=(255, 0, 0))

            # Upload an alert only when a person was actually detected.
            # NOTE(review): the locals() guards protect against the first
            # iterations before any frame was analyzed (the names are only
            # bound inside the detection branch) — fragile; consider
            # initializing these before the loop instead.
            if 'person_detections' in locals() and person_detections and 'need_alert' in locals() and need_alert and alert_addr:
                # Rate-limit: at most one upload per upload_interval seconds.
                current_time = time.time()
                if current_time - last_upload_time >= upload_interval:
                    print(f"检测到人,触发告警上传")
                    # Save the annotated frame as evidence.
                    temp_image_path = f"alert_frame_{frame_count}.jpg"
                    cv2.imwrite(temp_image_path, frame)

                    # Upload it, then create the backend event on success.
                    purl = upload_image(temp_image_path)
                    if purl:
                        create_event(alert_addr, purl)
                        # Only a successful upload advances the rate limiter.
                        last_upload_time = current_time

                    # Remove the temporary evidence file.
                    if os.path.exists(temp_image_path):
                        os.remove(temp_image_path)
                else:
                    print(f"2秒内已上传过事件,跳过本次上传")

            # Write the frame only when saving was requested.
            if out:
                try:
                    out.write(frame)
                except Exception as e:
                    print(f"写入视频失败: {e}")

            # Live preview; 'q' quits.
            try:
                cv2.imshow('YOLO11 Detection Result', frame)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            except Exception as e:
                print(f"显示帧失败: {e}")
                # Headless environments land here; keep processing.
    finally:
        # Always release capture, writer and windows, even on Ctrl-C.
        cap.release()
        if out:
            out.release()
        cv2.destroyAllWindows()

    if output_path:
        print(f"视频处理完成,结果已保存到: {output_path}")
    else:
        print("视频处理完成")
|