diff --git a/depthai_nodes/ml/messages/creators/detection.py b/depthai_nodes/ml/messages/creators/detection.py
index 69215d5..842f57e 100644
--- a/depthai_nodes/ml/messages/creators/detection.py
+++ b/depthai_nodes/ml/messages/creators/detection.py
@@ -96,9 +96,9 @@ def create_detection_message(
     if keypoints is not None and len(keypoints) != 0:
         if not isinstance(keypoints, List):
             raise ValueError(f"keypoints should be list, got {type(keypoints)}.")
-        for pointcloud in keypoints:
-            for point in pointcloud:
-                if not isinstance(point, Tuple):
+        for object_keypoints in keypoints:
+            for point in object_keypoints:
+                if not isinstance(point, Tuple) and not isinstance(point, List):
                     raise ValueError(
                         f"keypoint pairs should be list of tuples, got {type(point)}."
                     )
diff --git a/depthai_nodes/ml/messages/img_detections.py b/depthai_nodes/ml/messages/img_detections.py
index 9c6114e..0593372 100644
--- a/depthai_nodes/ml/messages/img_detections.py
+++ b/depthai_nodes/ml/messages/img_detections.py
@@ -39,7 +39,7 @@ def keypoints(self, value: List[Tuple[Union[int, float], Union[int, float]]]):
             raise TypeError("Keypoints must be a list")
         for item in value:
             if (
-                not isinstance(item, tuple)
+                not (isinstance(item, tuple) or isinstance(item, list))
                 or len(item) != 2
                 or not all(isinstance(i, (int, float)) for i in item)
            ):
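Reviewer note: the relaxed type checks above matter because the reworked SCRFD parser (next file) passes `keypoints.tolist()`, which yields nested lists rather than tuples. A minimal sketch of what now validates; `is_valid_point` is a hypothetical stand-in for the checks in `create_detection_message` and the `keypoints` setter, not a depthai_nodes API:

```python
# Hypothetical helper mirroring the relaxed check; not part of depthai_nodes.
def is_valid_point(point) -> bool:
    return isinstance(point, (tuple, list)) and len(point) == 2

print(is_valid_point((0.1, 0.2)))  # True: tuples were accepted before this change
print(is_valid_point([0.1, 0.2]))  # True: lists (e.g. from ndarray.tolist()) now pass
```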
diff --git a/depthai_nodes/ml/parsers/scrfd.py b/depthai_nodes/ml/parsers/scrfd.py
index 40c88cc..5b4fa50 100644
--- a/depthai_nodes/ml/parsers/scrfd.py
+++ b/depthai_nodes/ml/parsers/scrfd.py
@@ -1,8 +1,8 @@
-import cv2
 import depthai as dai
 import numpy as np
 
 from ..messages.creators import create_detection_message
+from .utils.scrfd import decode_scrfd
 
 
 class SCRFDParser(dai.node.ThreadedHostNode):
@@ -20,6 +20,12 @@ class SCRFDParser(dai.node.ThreadedHostNode):
         Non-maximum suppression threshold.
     top_k : int
         Maximum number of detections to keep.
+    feat_stride_fpn : tuple
+        Tuple of the feature strides.
+    num_anchors : int
+        Number of anchors.
+    input_size : tuple
+        Input size of the model.
 
     Output Message/s
     ----------------
@@ -28,7 +34,15 @@ class SCRFDParser(dai.node.ThreadedHostNode):
     **Description**: ImgDetections message containing bounding boxes, labels, and confidence scores of detected faces.
     """
 
-    def __init__(self, score_threshold=0.5, nms_threshold=0.5, top_k=100):
+    def __init__(
+        self,
+        score_threshold=0.5,
+        nms_threshold=0.5,
+        top_k=100,
+        input_size=(640, 640),
+        feat_stride_fpn=(8, 16, 32),
+        num_anchors=2,
+    ):
         """Initializes the SCRFDParser node.
 
         @param score_threshold: Confidence score threshold for detected faces.
@@ -37,6 +51,12 @@ def __init__(self, score_threshold=0.5, nms_threshold=0.5, top_k=100):
         @type nms_threshold: float
         @param top_k: Maximum number of detections to keep.
         @type top_k: int
+        @param feat_stride_fpn: Tuple of the feature strides.
+        @type feat_stride_fpn: tuple
+        @param num_anchors: Number of anchors.
+        @type num_anchors: int
+        @param input_size: Input size of the model.
+        @type input_size: tuple
         """
         dai.node.ThreadedHostNode.__init__(self)
         self.input = dai.Node.Input(self)
@@ -46,6 +66,10 @@ def __init__(self, score_threshold=0.5, nms_threshold=0.5, top_k=100):
         self.nms_threshold = nms_threshold
         self.top_k = top_k
 
+        self.feat_stride_fpn = feat_stride_fpn
+        self.num_anchors = num_anchors
+        self.input_size = input_size
+
     def setConfidenceThreshold(self, threshold):
         """Sets the confidence score threshold for detected faces.
 
@@ -70,6 +94,30 @@ def setTopK(self, top_k):
         """
         self.top_k = top_k
 
+    def setFeatStrideFPN(self, feat_stride_fpn):
+        """Sets the feature stride of the FPN.
+
+        @param feat_stride_fpn: Feature stride of the FPN.
+        @type feat_stride_fpn: tuple
+        """
+        self.feat_stride_fpn = feat_stride_fpn
+
+    def setInputSize(self, input_size):
+        """Sets the input size of the model.
+
+        @param input_size: Input size of the model.
+        @type input_size: tuple
+        """
+        self.input_size = input_size
+
+    def setNumAnchors(self, num_anchors):
+        """Sets the number of anchors.
+
+        @param num_anchors: Number of anchors.
+        @type num_anchors: int
+        """
+        self.num_anchors = num_anchors
+
     def run(self):
         while self.isRunning():
             try:
@@ -77,101 +125,69 @@ def run(self):
             except dai.MessageQueue.QueueException:
                 break  # Pipeline was stopped
 
-            score_8 = output.getTensor("score_8").flatten().astype(np.float32)
-            score_16 = output.getTensor("score_16").flatten().astype(np.float32)
-            score_32 = output.getTensor("score_32").flatten().astype(np.float32)
+            score_8 = (
+                output.getTensor("score_8", dequantize=True)
+                .flatten()
+                .astype(np.float32)
+            )
+            score_16 = (
+                output.getTensor("score_16", dequantize=True)
+                .flatten()
+                .astype(np.float32)
+            )
+            score_32 = (
+                output.getTensor("score_32", dequantize=True)
+                .flatten()
+                .astype(np.float32)
+            )
             bbox_8 = (
-                output.getTensor("bbox_8").reshape(len(score_8), 4).astype(np.float32)
+                output.getTensor("bbox_8", dequantize=True)
+                .reshape(len(score_8), 4)
+                .astype(np.float32)
             )
             bbox_16 = (
-                output.getTensor("bbox_16").reshape(len(score_16), 4).astype(np.float32)
+                output.getTensor("bbox_16", dequantize=True)
+                .reshape(len(score_16), 4)
+                .astype(np.float32)
             )
             bbox_32 = (
-                output.getTensor("bbox_32").reshape(len(score_32), 4).astype(np.float32)
+                output.getTensor("bbox_32", dequantize=True)
+                .reshape(len(score_32), 4)
+                .astype(np.float32)
             )
             kps_8 = (
-                output.getTensor("kps_8").reshape(len(score_8), 5, 2).astype(np.float32)
+                output.getTensor("kps_8", dequantize=True)
+                .reshape(len(score_8), 10)
+                .astype(np.float32)
             )
             kps_16 = (
-                output.getTensor("kps_16")
-                .reshape(len(score_16), 5, 2)
+                output.getTensor("kps_16", dequantize=True)
+                .reshape(len(score_16), 10)
                 .astype(np.float32)
             )
             kps_32 = (
-                output.getTensor("kps_32")
-                .reshape(len(score_32), 5, 2)
+                output.getTensor("kps_32", dequantize=True)
+                .reshape(len(score_32), 10)
                 .astype(np.float32)
             )
 
-            bboxes = []
-            keypoints = []
-
-            for i in range(len(score_8)):
-                y = int(np.floor(i / 80)) * 4
-                x = (i % 160) * 4
-                bbox = bbox_8[i]
-                xmin = int(x - bbox[0] * 8)
-                ymin = int(y - bbox[1] * 8)
-                xmax = int(x + bbox[2] * 8)
-                ymax = int(y + bbox[3] * 8)
-                kps = kps_8[i]
-                kps_batch = []
-                for kp in kps:
-                    kpx = int(x + kp[0] * 8)
-                    kpy = int(y + kp[1] * 8)
-                    kps_batch.append([kpx, kpy])
-                keypoints.append(kps_batch)
-                bbox = [xmin, ymin, xmax, ymax]
-                bboxes.append(bbox)
-
-            for i in range(len(score_16)):
-                y = int(np.floor(i / 40)) * 8
-                x = (i % 80) * 8
-                bbox = bbox_16[i]
-                xmin = int(x - bbox[0] * 16)
-                ymin = int(y - bbox[1] * 16)
-                xmax = int(x + bbox[2] * 16)
-                ymax = int(y + bbox[3] * 16)
-                kps = kps_16[i]
-                kps_batch = []
-                for kp in kps:
-                    kpx = int(x + kp[0] * 16)
-                    kpy = int(y + kp[1] * 16)
-                    kps_batch.append([kpx, kpy])
-                keypoints.append(kps_batch)
-                bbox = [xmin, ymin, xmax, ymax]
-                bboxes.append(bbox)
-
-            for i in range(len(score_32)):
-                y = int(np.floor(i / 20)) * 16
-                x = (i % 40) * 16
-                bbox = bbox_32[i]
-                xmin = int(x - bbox[0] * 32)
-                ymin = int(y - bbox[1] * 32)
-                xmax = int(x + bbox[2] * 32)
-                ymax = int(y + bbox[3] * 32)
-                kps = kps_32[i]
-                kps_batch = []
-                for kp in kps:
-                    kpx = int(x + kp[0] * 32)
-                    kpy = int(y + kp[1] * 32)
-                    kps_batch.append([kpx, kpy])
-                keypoints.append(kps_batch)
-                bbox = [xmin, ymin, xmax, ymax]
-                bboxes.append(bbox)
-
-            scores = np.concatenate([score_8, score_16, score_32])
-            indices = cv2.dnn.NMSBoxes(
-                bboxes,
-                list(scores),
-                self.score_threshold,
-                self.nms_threshold,
-                top_k=self.top_k,
+            bboxes_concatenated = [bbox_8, bbox_16, bbox_32]
+            scores_concatenated = [score_8, score_16, score_32]
+            kps_concatenated = [kps_8, kps_16, kps_32]
+
+            bboxes, scores, keypoints = decode_scrfd(
+                bboxes_concatenated=bboxes_concatenated,
+                scores_concatenated=scores_concatenated,
+                kps_concatenated=kps_concatenated,
+                feat_stride_fpn=self.feat_stride_fpn,
+                input_size=self.input_size,
+                num_anchors=self.num_anchors,
+                score_threshold=self.score_threshold,
+                nms_threshold=self.nms_threshold,
+            )
+            detection_msg = create_detection_message(
+                bboxes, scores, None, keypoints.tolist()
             )
-            bboxes = np.array(bboxes)[indices]
-            keypoints = np.array(keypoints)[indices]
-            scores = scores[indices]
-
-            detection_msg = create_detection_message(bboxes, scores, None, None)
             detection_msg.setTimestamp(output.getTimestamp())
+
             self.out.send(detection_msg)
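For context, a hedged configuration sketch of the reworked parser. It assumes DepthAI v3 semantics (host nodes created inside a pipeline) and that `SCRFDParser` is exported from `depthai_nodes.ml.parsers`; the camera and NeuralNetwork wiring is elided, so treat this as illustrative rather than a runnable script:

```python
import depthai as dai

from depthai_nodes.ml.parsers import SCRFDParser

pipeline = dai.Pipeline()
parser = pipeline.create(SCRFDParser)
parser.setInputSize((640, 640))       # must match the model input resolution
parser.setFeatStrideFPN((8, 16, 32))  # one stride per score_/bbox_/kps_ head
parser.setNumAnchors(2)               # SCRFD predicts 2 anchors per grid cell
# An upstream NeuralNetwork node producing the score_*/bbox_*/kps_* tensors
# would then be linked to parser.input; detections are emitted on parser.out.
```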
diff --git a/depthai_nodes/ml/parsers/utils/scrfd.py b/depthai_nodes/ml/parsers/utils/scrfd.py
new file mode 100644
index 0000000..533f4a0
--- /dev/null
+++ b/depthai_nodes/ml/parsers/utils/scrfd.py
@@ -0,0 +1,174 @@
+import numpy as np
+
+
+def nms(dets, nms_thresh=0.5):
+    """Non-maximum suppression.
+
+    @param dets: Bounding boxes and confidence scores.
+    @type dets: np.ndarray
+    @param nms_thresh: Non-maximum suppression threshold.
+    @type nms_thresh: float
+    @return: Indices of the detections to keep.
+    @rtype: list[int]
+    """
+    thresh = nms_thresh
+    x1 = dets[:, 0]
+    y1 = dets[:, 1]
+    x2 = dets[:, 2]
+    y2 = dets[:, 3]
+    scores = dets[:, 4]
+
+    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
+    order = scores.argsort()[::-1]
+
+    keep = []
+    while order.size > 0:
+        i = order[0]
+        keep.append(i)
+        xx1 = np.maximum(x1[i], x1[order[1:]])
+        yy1 = np.maximum(y1[i], y1[order[1:]])
+        xx2 = np.minimum(x2[i], x2[order[1:]])
+        yy2 = np.minimum(y2[i], y2[order[1:]])
+
+        w = np.maximum(0.0, xx2 - xx1 + 1)
+        h = np.maximum(0.0, yy2 - yy1 + 1)
+        inter = w * h
+        ovr = inter / (areas[i] + areas[order[1:]] - inter)
+
+        inds = np.where(ovr <= thresh)[0]
+        order = order[inds + 1]
+
+    return keep
+
+
+def distance2bbox(points, distance, max_shape=None):
+    """Decode distance prediction to bounding box.
+
+    @param points: Shape (n, 2), [x, y].
+    @type points: np.ndarray
+    @param distance: Distance from the given point to 4 boundaries (left, top, right,
+        bottom).
+    @type distance: np.ndarray
+    @param max_shape: Shape of the image.
+    @type max_shape: Tuple[int, int]
+    @return: Decoded bboxes.
+    @rtype: np.ndarray
+    """
+    x1 = points[:, 0] - distance[:, 0]
+    y1 = points[:, 1] - distance[:, 1]
+    x2 = points[:, 0] + distance[:, 2]
+    y2 = points[:, 1] + distance[:, 3]
+    if max_shape is not None:
+        x1 = np.clip(x1, 0, max_shape[1])
+        y1 = np.clip(y1, 0, max_shape[0])
+        x2 = np.clip(x2, 0, max_shape[1])
+        y2 = np.clip(y2, 0, max_shape[0])
+    return np.stack([x1, y1, x2, y2], axis=-1)
+
+
+def distance2kps(points, distance, max_shape=None):
+    """Decode distance prediction to keypoints.
+
+    @param points: Shape (n, 2), [x, y].
+    @type points: np.ndarray
+    @param distance: Distances from the given point to the keypoints, interleaved
+        as (x, y) offsets.
+    @type distance: np.ndarray
+    @param max_shape: Shape of the image.
+    @type max_shape: Tuple[int, int]
+    @return: Decoded keypoints.
+    @rtype: np.ndarray
+    """
+    preds = []
+    for i in range(0, distance.shape[1], 2):
+        px = points[:, i % 2] + distance[:, i]
+        py = points[:, i % 2 + 1] + distance[:, i + 1]
+        if max_shape is not None:
+            px = np.clip(px, 0, max_shape[1])
+            py = np.clip(py, 0, max_shape[0])
+        preds.append(px)
+        preds.append(py)
+    return np.stack(preds, axis=-1)
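A quick worked example of the decoding helper above, runnable once this module is importable (pure numpy, no device needed):

```python
import numpy as np

from depthai_nodes.ml.parsers.utils.scrfd import distance2bbox

# An anchor center at (100, 100) with distances (10, 20, 30, 40) to the
# left/top/right/bottom edges decodes to the box (90, 80, 130, 140).
points = np.array([[100.0, 100.0]])
distance = np.array([[10.0, 20.0, 30.0, 40.0]])
print(distance2bbox(points, distance))  # [[ 90.  80. 130. 140.]]
```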
+
+
+def decode_scrfd(
+    bboxes_concatenated,
+    scores_concatenated,
+    kps_concatenated,
+    feat_stride_fpn,
+    input_size,
+    num_anchors,
+    score_threshold,
+    nms_threshold,
+):
+    """Decode the detection results of SCRFD.
+
+    @param bboxes_concatenated: List of bounding box predictions for each scale.
+    @type bboxes_concatenated: list[np.ndarray]
+    @param scores_concatenated: List of confidence score predictions for each scale.
+    @type scores_concatenated: list[np.ndarray]
+    @param kps_concatenated: List of keypoint predictions for each scale.
+    @type kps_concatenated: list[np.ndarray]
+    @param feat_stride_fpn: List of feature strides for each scale.
+    @type feat_stride_fpn: list[int]
+    @param input_size: Input size of the model.
+    @type input_size: tuple[int, int]
+    @param num_anchors: Number of anchors.
+    @type num_anchors: int
+    @param score_threshold: Confidence score threshold.
+    @type score_threshold: float
+    @param nms_threshold: Non-maximum suppression threshold.
+    @type nms_threshold: float
+    @return: Bounding boxes, confidence scores, and keypoints of detected objects.
+    @rtype: tuple[np.ndarray, np.ndarray, np.ndarray]
+    """
+    scores_list = []
+    bboxes_list = []
+    kps_list = []
+
+    for idx, stride in enumerate(feat_stride_fpn):
+        scores = scores_concatenated[idx]
+        bbox_preds = bboxes_concatenated[idx] * stride
+        kps_preds = kps_concatenated[idx] * stride
+
+        height = input_size[0] // stride
+        width = input_size[1] // stride
+
+        anchor_centers = np.stack(np.mgrid[:height, :width][::-1], axis=-1).astype(
+            np.float32
+        )
+        anchor_centers = (anchor_centers * stride).reshape((-1, 2))
+        if num_anchors > 1:
+            anchor_centers = np.stack([anchor_centers] * num_anchors, axis=1).reshape(
+                (-1, 2)
+            )
+
+        pos_inds = np.where(scores >= score_threshold)[0]
+        bboxes = distance2bbox(anchor_centers, bbox_preds)
+        pos_scores = scores[pos_inds]
+        pos_bboxes = bboxes[pos_inds]
+        scores_list.append(pos_scores.reshape(-1, 1))
+        bboxes_list.append(pos_bboxes)
+
+        kpss = distance2kps(anchor_centers, kps_preds)
+        kpss = kpss.reshape((kpss.shape[0], -1, 2))
+        pos_kpss = kpss[pos_inds]
+        kps_list.append(pos_kpss)
+
+    scores = np.vstack(scores_list)
+    scores_ravel = scores.ravel()
+    order = scores_ravel.argsort()[::-1]
+    bboxes = np.vstack(bboxes_list)
+    kpss = np.vstack(kps_list)
+
+    pre_det = np.hstack((bboxes, scores)).astype(np.float32, copy=False)
+    pre_det = pre_det[order, :]
+    keep = nms(pre_det, nms_threshold)
+    det = pre_det[keep, :]
+    kpss = kpss[order, :, :]
+    kpss = kpss[keep, :, :]
+
+    scores = det[:, 4]
+    bboxes = np.int32(det[:, :4])
+    keypoints = np.int32(kpss)
+    keypoints = keypoints.reshape(-1, 5, 2)
+
+    return bboxes, scores, keypoints
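Finally, a hedged smoke test for `decode_scrfd` with synthetic predictions. The shapes mirror the parser's reshape calls above (stride `s` yields `(640 // s)**2 * num_anchors` rows per head); the random values and the 0.99 threshold are made up purely to keep the toy output small:

```python
import numpy as np

from depthai_nodes.ml.parsers.utils.scrfd import decode_scrfd

rng = np.random.default_rng(0)
strides = (8, 16, 32)
counts = [(640 // s) * (640 // s) * 2 for s in strides]  # anchors per scale

scores = [rng.random(n).astype(np.float32) for n in counts]
bboxes = [rng.random((n, 4)).astype(np.float32) for n in counts]
kps = [rng.random((n, 10)).astype(np.float32) for n in counts]

boxes, confs, keypoints = decode_scrfd(
    bboxes_concatenated=bboxes,
    scores_concatenated=scores,
    kps_concatenated=kps,
    feat_stride_fpn=strides,
    input_size=(640, 640),
    num_anchors=2,
    score_threshold=0.99,
    nms_threshold=0.5,
)
print(boxes.shape, confs.shape, keypoints.shape)  # (N, 4) (N,) (N, 5, 2)
```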