-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathon_drone.py
More file actions
240 lines (186 loc) · 7.99 KB
/
on_drone.py
File metadata and controls
240 lines (186 loc) · 7.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 添加多线程,发布和订阅
import numpy as np
import math
import rospy
import cv2
import threading
from ctrl_msgs.msg import command
kernel = np.ones((5, 5), np.uint8)
frame = None
frame_lock = threading.Lock()
velocity_pub = None
cnt = 0
body_center = (640, 320)
def calculate_angle(center, new_center):
new_center = (640, 320) # 示例定义
delta_x = center[0] - new_center[0]
delta_y = -(center[1] - new_center[1])
angle = np.arctan2(delta_y, delta_x) * (180 / np.pi) # 转换为度
return angle
def calculate_velocity(x1, y1, c1, c2, max_speed):
# 计算目标中心与图像中心的偏差
delta_x = c1 - x1
delta_y = c2 - y1
# 计算与目标的距离
distance = np.sqrt(delta_x**2 + delta_y**2)
# 线性减速,距离越小,速度越小
# 可以通过一个常数调整速度衰减的效果
arr = distance/300
if arr>1:
arr = 1
speed = arr * max_speed
# 计算角度
angle = np.arctan2(delta_y, delta_x) # 计算与 x 轴的角度
# 计算速度分量
vy = np.cos(angle) * speed # x 方向的速度分量
vz = np.sin(angle) * speed # y 方向的速度分量
return vy, vz
def detect_red_blue_circles(frame):
flag = 0
finishcv_flag = 0
body_center = (640, 320)
# 将帧调整为 HSV 色彩空间
#frame = cv2.resize(frame, (640, 360), interpolation=cv2.INTER_LINEAR)
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# 定义红色和蓝色的 HSV 范围
lower_red2 = np.array([155, 70, 70])
upper_red2 = np.array([180, 255, 255])
lower_blue = np.array([91, 38, 175])
upper_blue = np.array([121, 111, 255])
# 创建掩模
mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2)
mask_blue = cv2.inRange(hsv, lower_blue, upper_blue)
# 膨胀操作
kernel = np.ones((5, 5), np.uint8) # 添加膨胀核
mask_red_dilated = cv2.dilate(mask_red2, kernel, iterations=5)
mask_blue_dilated = cv2.dilate(mask_blue, kernel, iterations=5)
contours_red, _ = cv2.findContours(mask_red_dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
contours_blue, _ = cv2.findContours(mask_blue_dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
center_red = None
center_blue = None
if contours_red:
largest_red = max(contours_red, key=cv2.contourArea)
if cv2.contourArea(largest_red) > 500:
x, y, w, h = cv2.boundingRect(largest_red)
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
center_red = (x + w // 2, y + h // 2)
cv2.circle(frame, center_red, 5, (0, 255, 0), -1)
if contours_blue:
largest_blue = max(contours_blue, key=cv2.contourArea)
if cv2.contourArea(largest_blue) > 500:
x, y, w, h = cv2.boundingRect(largest_blue)
cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)
center_blue = (x + w // 2, y + h // 2)
cv2.circle(frame, center_blue, 5, (255, 0, 0), -1)
# 如果检测到红色和蓝色圆圈
if center_red and center_blue:
body_center = ((center_red[0] + center_blue[0]) // 2, (center_red[1] + center_blue[1]) // 2 -30)
cv2.line(frame, center_red, center_blue, (255, 255, 255), 2)
cv2.circle(frame, body_center, 5, (255, 255, 255), -1)
# 检查连线的长度
width = frame.shape[1]
distance = np.linalg.norm(np.array(center_red) - np.array(center_blue))
if distance > (width * 3/ 4):
# 在左上角绘制红色小圆圈
cv2.circle(frame, (20, 20), 5, (0, 0, 255), -1)
finishcv_flag = 2
# 计算图像中心
#image_center = (frame.shape[1] // 2, frame.shape[0] // 2)
# 计算图像中心与 body_center 之间的距离
if body_center is not None:
# 将 image_center 替换为确定的坐标 (320, 150)
new_center = (640, 320)
dist_to_body_center = np.linalg.norm(np.array(new_center) - np.array(body_center))
# 更新 bingo 值
if dist_to_body_center < 100:
flag += 1
else:
flag = 0
return frame, body_center, flag, finishcv_flag
def CV_flag_cb(msg):
global CV_flag
rospy.loginfo("Received data: %d", msg.CV_flag)
CV_flag = msg.CV_flag
class CameraViewer:
def __init__(self):
rospy.init_node('camera_viewer', anonymous=True)
self.cap = cv2.VideoCapture(6) # 打开指定摄像头
self.running = True
def capture_frame(self):
global frame
while self.running:
ret, captured_frame = self.cap.read()
if ret:
with frame_lock:
frame = captured_frame
else:
rospy.logerr("Failed to capture image")
def process_frame(self):
global frame,cnt,body_center
finishcv_flag = 0
rate = rospy.Rate(8) # 设置发布频率为20Hz
while self.running:
with frame_lock:
if frame is not None:
# 在这里添加图像处理逻辑
#processed_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # 转换为灰度图像
frame , body_center ,flag ,fflag = detect_red_blue_circles(frame)
finishcv_flag = fflag
# 更新 flag 值
if flag == 1:
cnt += 1
else:
cnt = 0
# 检查 flag 值
if cnt > 2:
finishcv_flag = 1
if fflag == 2:
finishcv_flag = 2
# 发布消息
velocity_msg = command()
new_center = (640, 320)
if body_center != (0, 0):
Vy, Vz = calculate_velocity(body_center[0], body_center[1], new_center[0], new_center[1], 0.40)
# 限制速度
Vy = np.clip(Vy, -0.25, 0.25)
Vz = np.clip(Vz, -0.2, 0.2)
velocity_msg.vy = Vy
velocity_msg.vz = Vz
velocity_msg.Finishcv_flag = finishcv_flag
rospy.loginfo(f"vy: {velocity_msg.vy}, vz: {velocity_msg.vz}, x: {body_center[0]}, y: {body_center[1]}")
# 绘制速度矢量
arrow_start = (int(body_center[0]), int(body_center[1])) # 目标中心
arrow_end = (int(new_center[0]), int(new_center[1])) # 速度矢量的终点
# 在图像上绘制箭头
cv2.arrowedLine(frame, arrow_start, arrow_end, (0, 255, 0), 2, tipLength=0.1)
# 标注速度值
speed_text = f"Speed (Vy, Vz): ({Vy:.2f}, {Vz:.2f})"
cv2.putText(frame, speed_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
# 显示图像
cv2.imshow("Camera Feed", frame)
velocity_pub.publish(velocity_msg) # 发布速度消息
rate.sleep() # 控制循环频率
if cv2.waitKey(1) & 0xFF == ord('q'): # 添加退出条件
break
def run(self):
global velocity_pub
velocity_pub = rospy.Publisher("task/cv_task", command, queue_size=1)
rospy.Subscriber("task/task_pub", command, CV_flag_cb)
capture_thread = threading.Thread(target=self.capture_frame)
capture_thread.start()
# 在主线程中处理帧
self.process_frame()
# 等待线程结束
capture_thread.join()
def cleanup(self):
self.running = False
self.cap.release()
cv2.destroyAllWindows()
if __name__ == '__main__':
viewer = CameraViewer()
try:
viewer.run()
except rospy.ROSInterruptException:
viewer.cleanup()