【CV学习笔记】多目标跟踪之bytetrack_v1
1、前言
ByteTrack是一个通用的基于检测的多目标跟踪的方法,能够适用于各种框架,本文将会对算法原理、代码进一步的学习。
2、原理简介
与deepsort原理类似,但是目标跟踪时,仅仅使用了卡尔曼滤波来预测目标框,然后利用匈牙利算法来匹配检测框与轨迹。ByteTrack中采用多次匹配的方法:首先将得分较高的目标框与历史轨迹相匹配,然后将得分较低的目标框与第一次没有匹配上的轨迹匹配,用于应对目标被遮挡的情形。相对于deepsort,直接去掉了ReID模型,更加方便移动端的部署。
由于代码里面存在多种轨迹,因此,首先需要对轨迹进行分类,避免在代码阅读时出现混淆的情形。
轨迹的状态可分为4种:
- New:当前帧发现的新轨迹
- Tracked:已跟踪轨迹,连续两帧都跟踪上的轨迹
- Lost:丢失轨迹,当前帧未匹配上的轨迹,会保留下来参与后续帧的匹配
- Removed:即将删除的轨迹,连续多帧(超过设定阈值)都未匹配上的轨迹
而轨迹的活跃状态分为两种:
- is_activated:True,为当前帧匹配上的轨迹
- is_activated:False,为当前帧未匹配上的轨迹
ByteTrack的主要步骤已经在代码里面进行了详细的说明,跟着代码一步一步就能捋顺算法的步骤了。
3、代码解析
3.1、tools/demo_track.py
def imageflow_demo(predictor: Predictor, vis_folder, current_time, args):
    """Run detection + tracking over a video stream (excerpt)."""
    ...
    while True:
        if ret_val:
            # --- detection ---
            # outputs: [num_boxes, 7]
            outputs, img_info = predictor.inference(frame, timer)
            # --- tracking ---
            if outputs[0] is not None:
                online_targets = tracker.update(
                    outputs[0],
                    [img_info['height'], img_info['width']],
                    exp.test_size)  # -> yolox/tracker/byte_tracker.py
3.2 yolox/tracker/byte_tracker.py
class STrack(BaseTrack):
    # One Kalman filter shared by all tracks, used for batched prediction.
    shared_kalman = KalmanFilter()

    def __init__(self, tlwh, score):
        """Create a new (not yet activated) track from one detection.

        tlwh  -- detection box as (top-left x, top-left y, width, height)
        score -- detection confidence
        """
        # Track attributes.
        # FIX: np.float was removed in NumPy >= 1.24; use the builtin float.
        self._tlwh = np.asarray(tlwh, dtype=float)
        self.kalman_filter = None
        # Kalman state (mean, covariance); filled on activate().
        self.mean, self.covariance = None, None
        # A freshly created track is inactive.
        self.is_activated = False
        self.score = score
        # Number of consecutive frames this track has been matched.
        self.tracklet_len = 0

    def predict(self):
        """Advance this track's Kalman state by one frame."""
        mean_state = self.mean.copy()
        if self.state != TrackState.Tracked:
            # Zero the height velocity for non-tracked states.
            mean_state[7] = 0
        # Update the mean and covariance with the motion model.
        # (FIX: this comment was fused into the code as bare text in the snippet.)
        self.mean, self.covariance = self.kalman_filter.predict(mean_state, self.covariance)

    @staticmethod
    def multi_predict(stracks):
        """Batch-predict the Kalman state of several tracks at once."""
        if len(stracks) > 0:
            multi_mean = np.asarray([st.mean.copy() for st in stracks])       # means
            multi_covariance = np.asarray([st.covariance for st in stracks])  # covariances
            for i, st in enumerate(stracks):
                if st.state != TrackState.Tracked:  # not currently tracked
                    multi_mean[i][7] = 0
            multi_mean, multi_covariance = STrack.shared_kalman.multi_predict(multi_mean, multi_covariance)
            for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)):
                stracks[i].mean = mean
                stracks[i].covariance = cov

    def activate(self, kalman_filter, frame_id):
        """Start a new tracklet."""
        # Attach a Kalman filter for this track.
        self.kalman_filter = kalman_filter
        # Assign a fresh track id.
        self.track_id = self.next_id()
        # Initialise the Kalman state from the detection box.
        self.mean, self.covariance = self.kalman_filter.initiate(self.tlwh_to_xyah(self._tlwh))
        # Reset the matched-frame counter.
        self.tracklet_len = 0
        self.state = TrackState.Tracked  # mark state as "tracked"
        # NOTE(review): upstream ByteTrack also records frame_id here; the
        # snippet elides it — confirm against the original source.

    def re_activate(self, new_track, frame_id, new_id=False):
        """Revive a previously lost track with a newly matched detection."""
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_track.tlwh))
        self.tracklet_len = 0                # reset matched-frame counter
        self.state = TrackState.Tracked      # mark state as "tracked"
        self.is_activated = True             # mark as an active track

    def update(self, new_track, frame_id):
        """Update an already tracked track with its matched detection."""
        self.tracklet_len += 1               # one more matched frame
        new_tlwh = new_track.tlwh            # new detection box
        # Correct self.mean / self.covariance with the new measurement.
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_tlwh))
        self.state = TrackState.Tracked      # mark state as "tracked"
        self.is_activated = True             # active track
class BYTETracker(object):

    def __init__(self, args, frame_rate=30):
        ...

    def update(self, output_results, img_info, img_size):
        """Run one tracking step on the current frame's detections.

        output_results -- raw detector output for this frame
        Returns the list of active tracks for the current frame.
        """
        activated_starcks = []  # active tracks of the current frame (matched & tracked)
        refind_stracks = []     # previously lost tracks re-matched in this frame
        lost_stracks = []       # tracks that matched nothing in this frame
        removed_stracks = []    # tracks to be removed after this frame

        if output_results.shape[1] == 5:
            ...
        else:
            output_results = output_results.cpu().numpy()
            # columns: x1, y1, x2, y2, objectness, class_score, class
            scores = output_results[:, 4] * output_results[:, 5]  # objectness * class prob
            bboxes = output_results[:, :4]                        # x1, y1, x2, y2

        remain_inds = scores > self.args.track_thresh   # high-score boxes
        inds_low = scores > 0.1                         # boxes above the floor
        inds_high = scores < self.args.track_thresh     # boxes below the tracking threshold
        # 0.1 < score < track_thresh: low-score boxes, used to match tracked but
        # occluded / low-confidence tracks in the second association.
        inds_second = np.logical_and(inds_low, inds_high)
        dets_second = bboxes[inds_second]               # low-score boxes
        dets = bboxes[remain_inds]                      # high-score boxes
        scores_keep = scores[remain_inds]               # their scores
        scores_second = scores[inds_second]             # low-score box scores

        if len(dets) > 0:
            # One STrack per high-score detection.
            detections = [STrack(STrack.tlbr_to_tlwh(tlbr), s)
                          for (tlbr, s) in zip(dets, scores_keep)]
        else:
            ...

        unconfirmed = []        # tracks not yet confirmed (inactive)
        tracked_stracks = []    # tracks matched in previous frames
        # Split the currently tracked set into active / inactive.
        for track in self.tracked_stracks:
            if not track.is_activated:
                unconfirmed.append(track)
            else:
                tracked_stracks.append(track)

        # ---- first association ----
        # Pool active tracks together with lost tracks.
        strack_pool = joint_stracks(tracked_stracks, self.lost_stracks)
        # Predict each pooled track's mean / covariance for the current frame.
        STrack.multi_predict(strack_pool)
        # IoU cost between pooled tracks and high-score detections.
        dists = matching.iou_distance(strack_pool, detections)
        # Hungarian assignment:
        #   matches     -- (track index, detection index) pairs
        #   u_track     -- pooled tracks left unmatched
        #   u_detection -- detections left unmatched
        matches, u_track, u_detection = matching.linear_assignment(
            dists, thresh=self.args.match_thresh)

        for itracked, idet in matches:
            track = strack_pool[itracked]
            det = detections[idet]
            if track.state == TrackState.Tracked:
                # Already tracked: refresh state, is_activated=True, tracklet_len += 1.
                track.update(detections[idet], self.frame_id)
                # FIX: the snippet dropped this append; without it matched
                # tracks never re-enter self.tracked_stracks.
                activated_starcks.append(track)
            else:
                # Lost track found again: refresh state, tracklet_len reset to 0.
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)

        # ---- second association: low-score detections ----
        if len(dets_second) > 0:
            # One STrack per low-score detection.
            detections_second = [STrack(STrack.tlbr_to_tlwh(tlbr), s)
                                 for (tlbr, s) in zip(dets_second, scores_second)]
        else:
            ...
        # Tracks unmatched in round one but still in Tracked state
        # (IoU dropped due to motion / occlusion).
        r_tracked_stracks = [strack_pool[i] for i in u_track
                             if strack_pool[i].state == TrackState.Tracked]
        # IoU cost between those tracks and the low-score detections.
        dists = matching.iou_distance(r_tracked_stracks, detections_second)
        matches, u_track, u_detection_second = matching.linear_assignment(dists, thresh=0.5)
        for itracked, idet in matches:
            track = r_tracked_stracks[itracked]
            det = detections_second[idet]
            if track.state == TrackState.Tracked:
                # Round-one leftover matched a low-score detection.
                track.update(det, self.frame_id)
                activated_starcks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)

        # Tracks unmatched in both rounds: mark Lost, retry next frame
        # (they rejoin via the lost-track pool at the top of update()).
        for it in u_track:
            track = r_tracked_stracks[it]  # FIX: binding was missing in the snippet
            if not track.state == TrackState.Lost:
                track.mark_lost()
                lost_stracks.append(track)

        # Detections that matched no history: candidate new tracks.
        detections = [detections[i] for i in u_detection]
        # Match them against the unconfirmed (inactive) tracks.
        dists = matching.iou_distance(unconfirmed, detections)
        matches, u_unconfirmed, u_detection = matching.linear_assignment(dists, thresh=0.7)
        for itracked, idet in matches:
            # Unconfirmed track confirmed by a fresh detection.
            unconfirmed[itracked].update(detections[idet], self.frame_id)
            activated_starcks.append(unconfirmed[itracked])
        for it in u_unconfirmed:
            track = unconfirmed[it]
            track.mark_removed()  # mark state Removed: track to be deleted
            removed_stracks.append(track)

        # Remaining detections matched nothing at all; treat high-score
        # ones as brand-new targets.
        for inew in u_detection:
            track = detections[inew]
            if track.score < self.det_thresh:
                continue
            # Start a new tracklet.
            track.activate(self.kalman_filter, self.frame_id)
            activated_starcks.append(track)

        # Drop lost tracks that have been missing too long.
        for track in self.lost_stracks:
            if self.frame_id - track.end_frame > self.max_time_lost:
                track.mark_removed()
                removed_stracks.append(track)

        # Keep only tracks still in Tracked state.
        self.tracked_stracks = [t for t in self.tracked_stracks
                                if t.state == TrackState.Tracked]
        # Merge in this frame's activated tracks.
        self.tracked_stracks = joint_stracks(self.tracked_stracks, activated_starcks)
        # Merge in the re-found tracks.
        self.tracked_stracks = joint_stracks(self.tracked_stracks, refind_stracks)
        # Remove anything now tracked from the lost pool.
        self.lost_stracks = sub_stracks(self.lost_stracks, self.tracked_stracks)
        # Add this frame's newly lost tracks.
        self.lost_stracks.extend(lost_stracks)
        # Strip removed tracks from the lost pool.
        self.lost_stracks = sub_stracks(self.lost_stracks, self.removed_stracks)
        # Record this frame's removed tracks.
        self.removed_stracks.extend(removed_stracks)
        # De-duplicate tracked vs lost.
        self.tracked_stracks, self.lost_stracks = remove_duplicate_stracks(
            self.tracked_stracks, self.lost_stracks)
        # Return the active tracks of the current frame.
        output_stracks = [track for track in self.tracked_stracks if track.is_activated]
        return output_stracks  # FIX: the snippet dropped the `return`
3.3、matching.py
def iou_distance(atracks, btracks):
    """
    Compute cost based on IoU
    :type atracks: list[STrack]
    :type btracks: list[STrack]
    :rtype cost_matrix np.ndarray
    """
    # Either raw boxes (ndarrays) or STrack objects may be passed.
    if (len(atracks) > 0 and isinstance(atracks[0], np.ndarray)) or \
       (len(btracks) > 0 and isinstance(btracks[0], np.ndarray)):
        atlbrs = atracks
        btlbrs = btracks
    else:
        # Take each track's box in tlbr form.
        atlbrs = [track.tlbr for track in atracks]
        btlbrs = [track.tlbr for track in btracks]
    # Pairwise IoU between atlbrs and btlbrs.
    # FIX: a stray character inside this call made it a syntax error.
    _ious = ious(atlbrs, btlbrs)
    # Cost = 1 - IoU (higher overlap -> lower cost).
    cost_matrix = 1 - _ious
    return cost_matrix


def ious(atlbrs, btlbrs):
    """Pairwise IoU between two lists of boxes in (x1, y1, x2, y2) form."""
    # FIX: np.float was removed in NumPy >= 1.24; use the builtin float.
    ious = np.zeros((len(atlbrs), len(btlbrs)), dtype=float)
    ious = bbox_ious(
        np.ascontiguousarray(atlbrs, dtype=float),
        np.ascontiguousarray(btlbrs, dtype=float))
    return ious


def linear_assignment(cost_matrix, thresh):
    """Assign rows (existing tracks) to columns (current detections).

    Returns (matches, unmatched_a, unmatched_b).
    """
    matches = []  # FIX: list was never initialised in the snippet
    # cost: total assignment cost
    # x: for each row, index of the matched column (or -1)
    # y: for each column, index of the matched row (or -1)
    cost, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
    for ix, mx in enumerate(x):
        if mx >= 0:
            # Row ix matched column mx.
            matches.append([ix, mx])
    # Rows (existing tracks) that matched nothing.
    unmatched_a = np.where(x < 0)[0]
    # Columns (current detections) that matched nothing.
    unmatched_b = np.where(y < 0)[0]
    return matches, unmatched_a, unmatched_b
3.4、yolox/tracker/kalman_filter.py
class KalmanFilter:
    """Constant-velocity Kalman filter over an 8-dimensional state.

    State vector: x, y, a, h, vx, vy, va, vh;
    the observed part of the state is (x, y, a, h).
    """

    def __init__(self):
        ndim, dt = 4, 1.
        # Motion matrix F = I + dt * (upper off-diagonal block coupling
        # each position component to its velocity component).
        self._motion_mat = np.eye(2 * ndim) + dt * np.eye(2 * ndim, k=ndim)
        # Observation matrix H selects the (x, y, a, h) part of the state.
        self._update_mat = np.eye(ndim, 2 * ndim)
        # Relative noise weights for position and velocity.
        self._std_weight_position = 1. / 20
        self._std_weight_velocity = 1. / 160
```
4、总结
从官方给出的结果中可以看出,其效果还是可以的。最近ByteTrackV2也已经出来了,并且还支持3D框的跟踪,包括BEV视角下的目标跟踪,等代码出来后,再一起学习吧!