track_queue
dreem.inference.track_queue
¶
Module handling sliding window tracking.
TrackQueue
¶
Class handling track local queue system for sliding window.
Each trajectory has its own deque based queue of size window_size - 1
.
Elements of the queue are Instance objects that have already been tracked
and will be compared against later frames for assignment.
Source code in dreem/inference/track_queue.py
class TrackQueue:
"""Class handling track local queue system for sliding window.
Each trajectory has its own deque based queue of size `window_size - 1`.
Elements of the queue are Instance objects that have already been tracked
and will be compared against later frames for assignment.
"""
def __init__(
self, window_size: int, max_gap: int = np.inf, verbose: bool = False
) -> None:
"""Initialize track queue.
Args:
window_size: The number of instances per trajectory allowed in the
queue to be compared against.
max_gap: The number of consecutive frames a trajectory can fail to
appear in before terminating the track.
verbose: Whether to print info during operations.
"""
self._window_size = window_size
self._queues = {}
self._max_gap = max_gap
self._curr_gap = {}
if self._max_gap <= self._window_size:
self._max_gap = self._window_size
self._curr_track = -1
self._verbose = verbose
def __len__(self) -> int:
"""Get length of the queue.
Returns:
The total number of instances in every sub-queue.
"""
return sum([len(queue) for queue in self._queues.values()])
def __repr__(self) -> str:
"""Return the string representation of the TrackQueue.
Returns:
The string representation of the current state of the queue.
"""
return (
"TrackQueue("
f"window_size={self.window_size}, "
f"max_gap={self.max_gap}, "
f"n_tracks={self.n_tracks}, "
f"curr_track={self.curr_track}, "
f"queues={[(key,len(queue)) for key, queue in self._queues.items()]}, "
f"curr_gap:{self._curr_gap}"
")"
)
@property
def window_size(self) -> int:
"""The maximum number of instances allowed in a sub-queue to be compared against.
Returns:
An int representing The maximum number of instances allowed in a
sub-queue to be compared against.
"""
return self._window_size
@window_size.setter
def window_size(self, window_size: int) -> None:
"""Set the window size of the queue.
Args:
window_size: An int representing The maximum number of instances
allowed in a sub-queue to be compared against.
"""
self._window_size = window_size
@property
def max_gap(self) -> int:
"""The maximum number of consecutive frames an trajectory can fail to appear before termination.
Returns:
An int representing the maximum number of consecutive frames an trajectory can fail to
appear before termination.
"""
return self._max_gap
@max_gap.setter
def max_gap(self, max_gap: int) -> None:
"""Set the max consecutive frame gap allowed for a trajectory.
Args:
max_gap: An int representing the maximum number of consecutive frames an trajectory can fail to
appear before termination.
"""
self._max_gap = max_gap
@property
def curr_track(self) -> int:
"""The newest *created* trajectory in the queue.
Returns:
The latest *created* trajectory in the queue.
"""
return self._curr_track
@curr_track.setter
def curr_track(self, curr_track: int) -> None:
"""Set the newest *created* trajectory in the queue.
Args:
curr_track: The latest *created* trajectory in the queue.
"""
self._curr_track = curr_track
@property
def n_tracks(self) -> int:
"""The current number of trajectories in the queue.
Returns:
An int representing the current number of trajectories in the queue.
"""
return len(self._queues.keys())
@property
def tracks(self) -> list:
"""A list of the track ids currently in the queue.
Returns:
A list containing the track ids currently in the queue.
"""
return list(self._queues.keys())
@property
def verbose(self) -> bool:
"""Indicate whether or not to print outputs along operations. Mostly used for debugging.
Returns:
A boolean representing whether or not printing is turned on.
"""
return self._verbose
@verbose.setter
def verbose(self, verbose: bool) -> None:
"""Turn on/off printing.
Args:
verbose: A boolean representing whether printing should be on or off.
"""
self._verbose = verbose
def end_tracks(self, track_id: int = None) -> bool:
"""Terminate tracks and removing them from the queue.
Args:
track_id: The index of the trajectory to be ended and removed.
If `None` then then every trajectory is removed and the track queue is reset.
Returns:
True if the track is successively removed, otherwise False.
(ie if the track doesn't exist in the queue.)
"""
if track_id is None:
self._queues = {}
self._curr_gap = {}
self.curr_track = -1
else:
try:
self._queues.pop(track_id)
self._curr_gap.pop(track_id)
except KeyError:
print(f"Track ID {track_id} not found in queue!")
return False
return True
def add_frame(self, frame: Frame) -> None:
"""Add frames to the queue.
Each instance from the frame is added to the queue according to its pred_track_id.
If the corresponding trajectory is not already in the queue then create a new queue for the track.
Args:
frame: A Frame object containing instances that have already been tracked.
"""
if frame.num_detected == 0: # only add frames with instances.
return
vid_id = frame.video_id.item()
frame_id = frame.frame_id.item()
img_shape = frame.img_shape
if isinstance(frame.video, str):
vid_name = frame.video
else:
vid_name = frame.video.filename
# traj_score = frame.get_traj_score() TODO: figure out better way to save trajectory scores.
frame_meta = (vid_id, frame_id, vid_name, img_shape.cpu().tolist())
pred_tracks = []
for instance in frame.instances:
pred_track_id = instance.pred_track_id.item()
pred_tracks.append(pred_track_id)
if pred_track_id not in self._queues.keys():
self._queues[pred_track_id] = deque(
[(*frame_meta, instance)], maxlen=self.window_size - 1
) # dumb work around to retain `img_shape`
self.curr_track = pred_track_id
if self.verbose:
warnings.warn(
f"New track = {pred_track_id} on frame {frame_id}! Current number of tracks = {self.n_tracks}"
)
else:
self._queues[pred_track_id].append((*frame_meta, instance))
self.increment_gaps(
pred_tracks
) # should this be done in the tracker or the queue?
def collate_tracks(
self, track_ids: list[int] = None, device: str = None
) -> list[Frame]:
"""Merge queues into a single list of Frames containing corresponding instances.
Args:
track_ids: A list of trajectorys to merge. If None, then merge all
queues, otherwise filter queues by track_ids then merge.
device: A str representation of the device the frames should be on after merging
since all instances in the queue are kept on the cpu.
Returns:
A sorted list of Frame objects from which each instance came from,
containing the corresponding instances.
"""
if len(self._queues) == 0:
return []
frames = {}
tracks_to_convert = (
{track: queue for track, queue in self._queues if track in track_ids}
if track_ids is not None
else self._queues
)
for track, instances in tracks_to_convert.items():
for video_id, frame_id, vid_name, img_shape, instance in instances:
if (video_id, frame_id) not in frames.keys():
frame = Frame(
video_id,
frame_id,
img_shape=img_shape,
instances=[instance],
vid_file=vid_name,
)
frames[(video_id, frame_id)] = frame
else:
frames[(video_id, frame_id)].instances.append(instance)
return [frames[frame].to(device) for frame in sorted(frames.keys())]
def increment_gaps(self, pred_track_ids: list[int]) -> dict[int, bool]:
"""Keep track of number of consecutive frames each trajectory has been missing from the queue.
If a trajectory has exceeded the `max_gap` then terminate the track and remove it from the queue.
Args:
pred_track_ids: A list of track_ids to be matched against the trajectories in the queue.
If a trajectory is in `pred_track_ids` then its gap counter is reset,
otherwise its incremented by 1.
Returns:
A dictionary containing the trajectory id and a boolean value representing
whether or not it has exceeded the max allowed gap and been
terminated.
"""
exceeded_gap = {}
for track in pred_track_ids:
if track not in self._curr_gap:
self._curr_gap[track] = 0
for track in self._curr_gap:
if track not in pred_track_ids:
self._curr_gap[track] += 1
if self.verbose:
warnings.warn(
f"Track {track} has not been seen for {self._curr_gap[track]} frames."
)
else:
self._curr_gap[track] = 0
if self._curr_gap[track] >= self.max_gap:
exceeded_gap[track] = True
else:
exceeded_gap[track] = False
for track, gap_exceeded in exceeded_gap.items():
if gap_exceeded:
if self.verbose:
warnings.warn(
f"Track {track} has not been seen for {self._curr_gap[track]} frames! Terminating Track...Current number of tracks = {self.n_tracks}."
)
self._queues.pop(track)
self._curr_gap.pop(track)
return exceeded_gap
curr_track: int
property
writable
¶
The newest created trajectory in the queue.
Returns:
Type | Description |
---|---|
int
|
The latest created trajectory in the queue. |
max_gap: int
property
writable
¶
The maximum number of consecutive frames an trajectory can fail to appear before termination.
Returns:
Type | Description |
---|---|
int
|
An int representing the maximum number of consecutive frames an trajectory can fail to appear before termination. |
n_tracks: int
property
¶
The current number of trajectories in the queue.
Returns:
Type | Description |
---|---|
int
|
An int representing the current number of trajectories in the queue. |
tracks: list
property
¶
A list of the track ids currently in the queue.
Returns:
Type | Description |
---|---|
list
|
A list containing the track ids currently in the queue. |
verbose: bool
property
writable
¶
Indicate whether or not to print outputs along operations. Mostly used for debugging.
Returns:
Type | Description |
---|---|
bool
|
A boolean representing whether or not printing is turned on. |
window_size: int
property
writable
¶
The maximum number of instances allowed in a sub-queue to be compared against.
Returns:
Type | Description |
---|---|
int
|
An int representing The maximum number of instances allowed in a sub-queue to be compared against. |
__init__(window_size, max_gap=np.inf, verbose=False)
¶
Initialize track queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
window_size |
int
|
The number of instances per trajectory allowed in the queue to be compared against. |
required |
max_gap |
int
|
The number of consecutive frames a trajectory can fail to appear in before terminating the track. |
inf
|
verbose |
bool
|
Whether to print info during operations. |
False
|
Source code in dreem/inference/track_queue.py
def __init__(
self, window_size: int, max_gap: int = np.inf, verbose: bool = False
) -> None:
"""Initialize track queue.
Args:
window_size: The number of instances per trajectory allowed in the
queue to be compared against.
max_gap: The number of consecutive frames a trajectory can fail to
appear in before terminating the track.
verbose: Whether to print info during operations.
"""
self._window_size = window_size
self._queues = {}
self._max_gap = max_gap
self._curr_gap = {}
if self._max_gap <= self._window_size:
self._max_gap = self._window_size
self._curr_track = -1
self._verbose = verbose
__len__()
¶
Get length of the queue.
Returns:
Type | Description |
---|---|
int
|
The total number of instances in every sub-queue. |
__repr__()
¶
Return the string representation of the TrackQueue.
Returns:
Type | Description |
---|---|
str
|
The string representation of the current state of the queue. |
Source code in dreem/inference/track_queue.py
def __repr__(self) -> str:
"""Return the string representation of the TrackQueue.
Returns:
The string representation of the current state of the queue.
"""
return (
"TrackQueue("
f"window_size={self.window_size}, "
f"max_gap={self.max_gap}, "
f"n_tracks={self.n_tracks}, "
f"curr_track={self.curr_track}, "
f"queues={[(key,len(queue)) for key, queue in self._queues.items()]}, "
f"curr_gap:{self._curr_gap}"
")"
)
add_frame(frame)
¶
Add frames to the queue.
Each instance from the frame is added to the queue according to its pred_track_id. If the corresponding trajectory is not already in the queue then create a new queue for the track.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame |
Frame
|
A Frame object containing instances that have already been tracked. |
required |
Source code in dreem/inference/track_queue.py
def add_frame(self, frame: Frame) -> None:
"""Add frames to the queue.
Each instance from the frame is added to the queue according to its pred_track_id.
If the corresponding trajectory is not already in the queue then create a new queue for the track.
Args:
frame: A Frame object containing instances that have already been tracked.
"""
if frame.num_detected == 0: # only add frames with instances.
return
vid_id = frame.video_id.item()
frame_id = frame.frame_id.item()
img_shape = frame.img_shape
if isinstance(frame.video, str):
vid_name = frame.video
else:
vid_name = frame.video.filename
# traj_score = frame.get_traj_score() TODO: figure out better way to save trajectory scores.
frame_meta = (vid_id, frame_id, vid_name, img_shape.cpu().tolist())
pred_tracks = []
for instance in frame.instances:
pred_track_id = instance.pred_track_id.item()
pred_tracks.append(pred_track_id)
if pred_track_id not in self._queues.keys():
self._queues[pred_track_id] = deque(
[(*frame_meta, instance)], maxlen=self.window_size - 1
) # dumb work around to retain `img_shape`
self.curr_track = pred_track_id
if self.verbose:
warnings.warn(
f"New track = {pred_track_id} on frame {frame_id}! Current number of tracks = {self.n_tracks}"
)
else:
self._queues[pred_track_id].append((*frame_meta, instance))
self.increment_gaps(
pred_tracks
) # should this be done in the tracker or the queue?
collate_tracks(track_ids=None, device=None)
¶
Merge queues into a single list of Frames containing corresponding instances.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
track_ids |
list[int]
|
A list of trajectorys to merge. If None, then merge all queues, otherwise filter queues by track_ids then merge. |
None
|
device |
str
|
A str representation of the device the frames should be on after merging since all instances in the queue are kept on the cpu. |
None
|
Returns:
Type | Description |
---|---|
list[Frame]
|
A sorted list of Frame objects from which each instance came from, containing the corresponding instances. |
Source code in dreem/inference/track_queue.py
def collate_tracks(
self, track_ids: list[int] = None, device: str = None
) -> list[Frame]:
"""Merge queues into a single list of Frames containing corresponding instances.
Args:
track_ids: A list of trajectorys to merge. If None, then merge all
queues, otherwise filter queues by track_ids then merge.
device: A str representation of the device the frames should be on after merging
since all instances in the queue are kept on the cpu.
Returns:
A sorted list of Frame objects from which each instance came from,
containing the corresponding instances.
"""
if len(self._queues) == 0:
return []
frames = {}
tracks_to_convert = (
{track: queue for track, queue in self._queues if track in track_ids}
if track_ids is not None
else self._queues
)
for track, instances in tracks_to_convert.items():
for video_id, frame_id, vid_name, img_shape, instance in instances:
if (video_id, frame_id) not in frames.keys():
frame = Frame(
video_id,
frame_id,
img_shape=img_shape,
instances=[instance],
vid_file=vid_name,
)
frames[(video_id, frame_id)] = frame
else:
frames[(video_id, frame_id)].instances.append(instance)
return [frames[frame].to(device) for frame in sorted(frames.keys())]
end_tracks(track_id=None)
¶
Terminate tracks and removing them from the queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
track_id |
int
|
The index of the trajectory to be ended and removed.
If |
None
|
Returns:
Type | Description |
---|---|
bool
|
True if the track is successively removed, otherwise False. (ie if the track doesn't exist in the queue.) |
Source code in dreem/inference/track_queue.py
def end_tracks(self, track_id: int = None) -> bool:
"""Terminate tracks and removing them from the queue.
Args:
track_id: The index of the trajectory to be ended and removed.
If `None` then then every trajectory is removed and the track queue is reset.
Returns:
True if the track is successively removed, otherwise False.
(ie if the track doesn't exist in the queue.)
"""
if track_id is None:
self._queues = {}
self._curr_gap = {}
self.curr_track = -1
else:
try:
self._queues.pop(track_id)
self._curr_gap.pop(track_id)
except KeyError:
print(f"Track ID {track_id} not found in queue!")
return False
return True
increment_gaps(pred_track_ids)
¶
Keep track of number of consecutive frames each trajectory has been missing from the queue.
If a trajectory has exceeded the max_gap
then terminate the track and remove it from the queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pred_track_ids |
list[int]
|
A list of track_ids to be matched against the trajectories in the queue.
If a trajectory is in |
required |
Returns:
Type | Description |
---|---|
dict[int, bool]
|
A dictionary containing the trajectory id and a boolean value representing whether or not it has exceeded the max allowed gap and been terminated. |
Source code in dreem/inference/track_queue.py
def increment_gaps(self, pred_track_ids: list[int]) -> dict[int, bool]:
"""Keep track of number of consecutive frames each trajectory has been missing from the queue.
If a trajectory has exceeded the `max_gap` then terminate the track and remove it from the queue.
Args:
pred_track_ids: A list of track_ids to be matched against the trajectories in the queue.
If a trajectory is in `pred_track_ids` then its gap counter is reset,
otherwise its incremented by 1.
Returns:
A dictionary containing the trajectory id and a boolean value representing
whether or not it has exceeded the max allowed gap and been
terminated.
"""
exceeded_gap = {}
for track in pred_track_ids:
if track not in self._curr_gap:
self._curr_gap[track] = 0
for track in self._curr_gap:
if track not in pred_track_ids:
self._curr_gap[track] += 1
if self.verbose:
warnings.warn(
f"Track {track} has not been seen for {self._curr_gap[track]} frames."
)
else:
self._curr_gap[track] = 0
if self._curr_gap[track] >= self.max_gap:
exceeded_gap[track] = True
else:
exceeded_gap[track] = False
for track, gap_exceeded in exceeded_gap.items():
if gap_exceeded:
if self.verbose:
warnings.warn(
f"Track {track} has not been seen for {self._curr_gap[track]} frames! Terminating Track...Current number of tracks = {self.n_tracks}."
)
self._queues.pop(track)
self._curr_gap.pop(track)
return exceeded_gap