3 changes: 3 additions & 0 deletions .gitignore
@@ -0,0 +1,3 @@
fullpipeline_results/*
gt/*
keys
129 changes: 67 additions & 62 deletions baseline.py
@@ -4,10 +4,9 @@
import time
from collections import deque
from tqdm import tqdm
import pdb
import math
import base64, json, os
import openai, pdb, cv2
import openai, cv2
from utils import img_proc_utils, mobilesam, file_utils
from utils import process_utils
from ast import literal_eval
@@ -20,10 +19,21 @@
from PIL import Image
from io import BytesIO

class VLM:
from utils.ros_vlm import VLM

def frame_paths_from_folder(folder_path):
frames= []
for f in os.listdir(folder_path):
frame = os.path.join(folder_path,f)
frames.append(frame)
return frames

class Baseline_VLM:
def __init__(self, config, args) -> None:
self.root = args.root
self.config = file_utils.load_yaml(config)
self.config['root'] = self.root

self.img_queue = deque(maxlen=self.config['exp']['prompt_img_len'])
self.video_name = None
self.model_name = self.config['exp']['model_name']
@@ -110,49 +120,33 @@ def create_crops(self, img_path=None):
if not os.path.exists(f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}"):
os.makedirs(f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}")

try:
temp_crop_name_list = list()
rot_im_lst , bbox_lst, conf_lst = img_proc_utils.get_rotated_image_crops(img_path, self.crop_model)
for idx, rot_im in enumerate(rot_im_lst):
cv2.imwrite(f"{rot_img_path[:-4]}_{idx}.jpg", rot_im)
self.img_dict['full'] = f"{img_path}"
temp_crop_name_list.append(f"{rot_img_path[:-4]}_{idx}.jpg")
self.img_dict['rot_crops'] = temp_crop_name_list
self.img_dict['bbox'] = bbox_lst
self.img_dict['conf'] = conf_lst

except Exception as e:
print(e)
print(f"while running {self.config['sam']['model_name']} on {img_path}")
print('cropping messup - using full image for this!')
print('not saved any rotation img as NONE generated')
pdb.set_trace()
temp_crop_name_list = list()
rot_im_lst , bbox_lst, conf_lst = img_proc_utils.get_rotated_image_crops(img_path, self.crop_model)
for idx, rot_im in enumerate(rot_im_lst):
cv2.imwrite(f"{rot_img_path[:-4]}_{idx}.jpg", rot_im)
self.img_dict['full'] = f"{img_path}"
temp_crop_name_list.append(f"{rot_img_path[:-4]}_{idx}.jpg")
self.img_dict['rot_crops'] = temp_crop_name_list
self.img_dict['bbox'] = bbox_lst
self.img_dict['conf'] = conf_lst

elif isinstance(img_path, deque):
for x, img_p in enumerate(img_path):
rot_img_path = f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}/{os.path.basename(img_p)}"
if not os.path.exists(f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}"):
os.makedirs(f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}")

try:
temp_crop_name_list = list()
rot_im_lst , bbox_lst , conf_lst = img_proc_utils.get_image_crops(img_p, self.crop_model)
for idx, rot_im in enumerate(rot_im_lst):
cv2.imwrite(f"{rot_img_path[:-4]}_{idx}.jpg", rot_im)
self.img_dict[x]['full'] = f"{img_p}"
temp_crop_name_list.append(f"{rot_img_path[:-4]}_{idx}.jpg")
self.img_dict[x]['rot_crops'] = temp_crop_name_list
self.img_dict[x]['bbox'] = bbox_lst
self.img_dict[x]['conf'] = conf_lst

except Exception as e:
print(e)
print(f"UNSUCCESS-- at crop generation")
print(f"while running {self.config['sam']['model_name']} on {img_p}")
print('cropping messup - using full image for this!')
print('not saved any rotation img as NONE generated')
pdb.set_trace()
temp_crop_name_list = list()
rot_im_lst , bbox_lst , conf_lst = img_proc_utils.get_image_crops(img_p, self.crop_model)
for idx, rot_im in enumerate(rot_im_lst):
cv2.imwrite(f"{rot_img_path[:-4]}_{idx}.jpg", rot_im)
self.img_dict[x]['full'] = f"{img_p}"
temp_crop_name_list.append(f"{rot_img_path[:-4]}_{idx}.jpg")
self.img_dict[x]['rot_crops'] = temp_crop_name_list
self.img_dict[x]['bbox'] = bbox_lst
self.img_dict[x]['conf'] = conf_lst


def create_message(self, image_path):

self.create_prompt()
@@ -205,13 +199,6 @@ def create_message(self, image_path):
all_messages.append(messages)
return all_messages

def frame_paths_from_folder(self, folder_path):
frames= []
for f in os.listdir(folder_path):
frame = os.path.join(folder_path,f)
frames.append(frame)
return frames

def post_process_gemini_response(self, text, width, height):
lines = text.splitlines()
for i, line in enumerate(lines):
@@ -232,10 +219,12 @@ def process_gpt_output(self, resp):
temp_3 = [te.replace('python','') for te in temp_2]
temp_4 = [te.replace('\t','') for te in temp_3]
temp_final = [json.loads(te) for te in temp_4]
# print(">>>")
# print(temp_final)
# print("<<<")
return temp_final

def prompt_model(self, image_path):

if self.config['exp']['prompt_img_len'] == 1:
self.list_message = self.create_message(image_path)
mega_resp = []
@@ -250,6 +239,8 @@
n=self.config['exp']['voting_iter_count']
)
prc_resp = self.process_gpt_output(completion.choices)
print(prc_resp)
print(type(prc_resp), type(prc_resp[0]))
mega_resp.append(prc_resp)
break
except Exception as e:
@@ -275,31 +266,33 @@ def get_image_string(self, image_path):
parser = argparse.ArgumentParser(description="baseline")
parser.add_argument('--root', type=str, help='/path/to/Sign-Understanding')
args = parser.parse_args()

root = args.root

while True:
try:
r = input("Recognition or Full-Pipeline Evaluation? R/F")
if r.upper() == 'R':
config = os.path.join(root, 'config/recognition_eval_config.yaml')
elif r.upper() == 'F':
config = os.path.join(root,'config/full_pipeline_eval_config.yaml')
r = input("Recognition or Full-Pipeline Evaluation? R/F\n")
if r.upper() == 'R':
config = os.path.join(root, 'config/recognition_eval_config.yaml')
break
elif r.upper() == 'F':
config = os.path.join(root,'config/full_pipeline_eval_config.yaml')
break
except Exception as e:
print('Please enter valid response....')
print('Please enter valid response....')

vlm = VLM(config=config, args)
vlm = VLM(config, args)
# vlm = VLM(config)
print(f"You are using this config: {config}")
print(f"You are using this model: {vlm.config['exp']['model_name']} and version {vlm.config['exp']['model_version']}")
print(f"You are using this prompt: {vlm.config['exp']['prompt_file']} and symbol_list {vlm.config['exp']['symbols']}")
print('Do you agree (c) or disagree (q)?')
pdb.set_trace()
confidence_tries = vlm.config['exp']['voting_iter_count']

if vlm.config['exp']['source'] == 'selected-frames' and vlm.config['name'] == 'recognition':
names = [f"{vlm.config['exp']['model_name']}"]
elif vlm.config['exp']['source'] == 'selected-frames' and vlm.config['name'] == 'full-pipeline':
names = [f"{vlm.config['exp']['crop_gen_model']}-{vlm.config['exp']['model_name']}"]

for nm in tqdm(names):
print("Starting eval...")
for nm in names:
vlm.video_name = nm
if vlm.config['name'] == 'full-pipeline':
vlm.crop_model.video_name = nm
@@ -327,24 +320,36 @@ def get_image_string(self, image_path):
'symbol labels' : ann['symbol labels']})
bbox_gt_dict[item['imagePath']] = gt_boxes #xyxy list
gt_resp_dict[item['imagePath']] = recg_ann
frame_paths = vlm.frame_paths_from_folder(all_frame_folder)
frame_paths = frame_paths_from_folder(all_frame_folder)

base = 0
correct = 0
history = list()
match_history = list()
vlm.img_queue = deque(maxlen=vlm.config['exp']['prompt_img_len'])

print(f"Evaluating {len(frame_paths)} frames...")
bbox_preds = dict()
for cnt, frame_path in tqdm(enumerate(frame_paths)):
if cnt == 1:
break
result = dict()
vlm_decider_flag = True
vlm.img_queue.append(frame_path)
vlm.img_dict = {i: {'full': None, 'rot_crops': None, 'bbox': None} for i in range(len(vlm.img_queue))}
vlm.last_message = None
resp = vlm.prompt_model(vlm.img_queue)
# resp = vlm.prompt_model(vlm.img_queue)

if vlm.config['exp']['rot_crops']:
if len(vlm.img_queue) != 1:
raise NotImplementedError
# Should only take a single image path
tmp_deq = deque(maxlen=1)
tmp_deq.append(frame_path)
vlm.create_crops(tmp_deq)

resp = []
for rot_im in vlm.img_dict[0]['rot_crops']:
individual_resp = vlm.prompt_model([rot_im], return_json=False)
resp.append(individual_resp)

outputs = []
for r in resp:
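For context, a minimal sketch of the per-crop prompting flow this diff introduces: a single frame is wrapped in a one-element deque for create_crops, and the model is then prompted once per rotated crop. Names follow the diff above; the return_json flag is assumed from its use in this patch, not a documented API.

from collections import deque

def prompt_rotated_crops(vlm, frame_path):
    # create_crops expects a deque of image paths; wrap the single frame
    tmp_deq = deque(maxlen=1)
    tmp_deq.append(frame_path)
    vlm.img_dict = {0: {'full': None, 'rot_crops': None, 'bbox': None}}
    vlm.create_crops(tmp_deq)

    # Prompt the VLM once per rotated crop, collecting raw (non-JSON) replies
    responses = []
    for rot_im in vlm.img_dict[0]['rot_crops']:
        responses.append(vlm.prompt_model([rot_im], return_json=False))
    return responses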
12 changes: 3 additions & 9 deletions utils/file_utils.py
@@ -1,14 +1,8 @@
import yaml, json, os

def load_yaml(filepath):
try:
with open(f"{filepath}", "r") as file:
# print(os.getcwd())
data = yaml.safe_load(file) # Use safe_load to avoid potential security issues
# print(data)
except FileNotFoundError:
print(os.getcwd())
print("File not found.")
with open(f"{filepath}", "r") as file:
data = yaml.safe_load(file) # Use safe_load to avoid potential security issues
return data

def save_file_json(file_path, data):
@@ -40,4 +34,4 @@ def read_json(filepath):

def makeCheck(fol_path):
if not os.path.exists(fol_path):
os.makedirs(fol_path)
os.makedirs(fol_path)
88 changes: 84 additions & 4 deletions utils/img_proc_utils.py
@@ -1,8 +1,6 @@
import numpy as np
from scipy.ndimage import label, center_of_mass
import cv2, os, pdb
from utils.mobilesam import GroundedSAM
from utils import file_utils
import cv2

def crop_buffer_bbox(img_path, bbox_cords, buffer = 10):
'''
@@ -75,4 +73,86 @@ def greedy_match(preds, gts, iou_threshold=0.75):
matched_pred_indices.add(i)
matched_gt_indices.add(j)

return matches
return matches


def convert_to_binary(img, bbox=None, mode = "bbox"):
if mode == "bbox":
assert len(bbox), 'provide bbox if you choose bbox type for binary conversion'
img_bin = np.zeros((img.shape[0], img.shape[1]), dtype=np.uint8)
x_min , y_min, x_max , y_max = bbox
img_bin[int(y_min):int(y_max) + 1 , int(x_min):int(x_max)+1] = 255
return img_bin

elif mode == 'irregular':
raise NotImplementedError

def calculate_orientation(binary_image):
"""
Calculate the orientation vector of a 2D shape in a binary image.
Returns angle in radians and the unit vector of orientation.
"""
# Calculate moments
y_coords, x_coords = np.nonzero(binary_image)
x_bar, y_bar = np.mean(x_coords), np.mean(y_coords)

# Calculate central moments
u20 = np.sum((x_coords - x_bar) ** 2)
u02 = np.sum((y_coords - y_bar) ** 2)
u11 = np.sum((x_coords - x_bar) * (y_coords - y_bar))

# Calculate orientation angle
theta = 0.5 * np.arctan2(2 * u11, u20 - u02)

# Calculate unit vector
direction_vector = np.array([np.cos(theta), np.sin(theta)])

return theta, direction_vector

def get_shape_properties(binary_image):
"""
Get basic properties of the shape including centroid and orientation.
"""
# Find centroid
labeled_array, num_features = label(binary_image)
cy , cx = center_of_mass(binary_image)

# Get orientation
theta, direction = calculate_orientation(binary_image)

return {
'centroid': (cx, cy),
'angle_rad': theta,
'angle_deg': np.degrees(theta),
'direction_vector': direction
}

def rotate_sign_to_align_bbox(crop_img, bbox_cords1 , irregular_binary_mask, ablation = False):
x_min , y_min, x_max , y_max = bbox_cords1 # regular shape
bbox_binary = convert_to_binary(crop_img, bbox_cords1)
center = get_shape_properties(irregular_binary_mask)['centroid'] # center of irregular
angle_bbox = np.degrees(calculate_orientation(bbox_binary)[0])
angle_irr = np.degrees(calculate_orientation(irregular_binary_mask)[0])
# print(f"BBOX ANGLE: {angle_bbox}")
# print(f"IRR ANAGLE: {angle_irr}")
if angle_irr > 0 :
angle = min(10,angle_irr)
else:
angle = max(-10,angle_irr)

scale = 1
rotation_matrix = cv2.getRotationMatrix2D(center, angle, scale)
rotated_image = cv2.warpAffine(crop_img, rotation_matrix, (crop_img.shape[1], crop_img.shape[0]))
if ablation:
return angle_irr, crop_img
return angle_irr, rotated_image

def get_rotated_image_crops(img_path, crop_model, ocr_map_queue=None, ablation=True):
#ablation True means we are not canonicalizing the crop
ctd, area = crop_model.execute_model(img_path, ocr_map_queue, type='box')
crop_img = crop_buffer_bbox(img_path, crop_model.detections.xyxy[0])
# if not ablation:
# crop_model.execute_model(crop_img,ocr_map_queue, type='mask')
irregular_binary_mask = crop_model.detections.mask[0].astype(np.uint8)*255
ang , rot_img = rotate_sign_to_align_bbox(crop_img, crop_model.detections.xyxy[0], irregular_binary_mask, ablation)
return ctd, area, ang, rot_img
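
As a sanity check on the moment-based orientation used above, the following self-contained sketch (assuming only numpy and OpenCV, and a synthetic mask in place of a real SAM detection) builds a rotated rectangle, recovers its angle with the same theta = 0.5 * atan2(2*u11, u20 - u02) formula, and applies the clamped rotation the way rotate_sign_to_align_bbox does:

import numpy as np
import cv2

# Synthetic binary mask: a 200x60 rectangle rotated by 8 degrees
canvas = np.zeros((400, 400), dtype=np.uint8)
box = cv2.boxPoints(((200.0, 200.0), (200.0, 60.0), 8.0)).astype(np.int32)
cv2.fillPoly(canvas, [box], 255)

# Second-order central moments, as in calculate_orientation
ys, xs = np.nonzero(canvas)
xb, yb = xs.mean(), ys.mean()
u20 = np.sum((xs - xb) ** 2)
u02 = np.sum((ys - yb) ** 2)
u11 = np.sum((xs - xb) * (ys - yb))
theta = 0.5 * np.arctan2(2 * u11, u20 - u02)
angle = np.degrees(theta)  # roughly 8 degrees; sign follows the image y-down convention

# Clamp to +/-10 degrees and rotate, mirroring rotate_sign_to_align_bbox
angle = max(-10.0, min(10.0, angle))
M = cv2.getRotationMatrix2D((float(xb), float(yb)), angle, 1.0)
aligned = cv2.warpAffine(canvas, M, (canvas.shape[1], canvas.shape[0]))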
16 changes: 8 additions & 8 deletions utils/mobilesam.py
@@ -9,7 +9,7 @@

class GroundedSAM:

def __init__(self, config, args, model ='mobile_sam'):
def __init__(self, config, model ='mobile_sam'):
self.config = config
self.model_name = model
self.video_name = None
@@ -84,10 +84,10 @@ def get_masks(self, image):

def save_binary_annotations(self):
self.binary_mask = self.detections.mask[0].astype(np.uint8)*255
if not os.path.exists(f"{args.root}/{self.config['sam']['output_binary_mask_folder']}/{self.video_name}/"):
os.makedirs(f"{args.root}/{self.config['sam']['output_binary_mask_folder']}/{self.video_name}/")
if not os.path.exists(f"{self.config['root']}/{self.config['sam']['output_binary_mask_folder']}/{self.video_name}/"):
os.makedirs(f"{self.config['root']}/{self.config['sam']['output_binary_mask_folder']}/{self.video_name}/")

cv2.imwrite(f"{args.root}/{self.config['sam']['output_binary_mask_folder']}/{self.video_name}/{os.path.basename(self.temp_image_path)}", self.binary_mask)
cv2.imwrite(f"{self.config['root']}/{self.config['sam']['output_binary_mask_folder']}/{self.video_name}/{os.path.basename(self.temp_image_path)}", self.binary_mask)

def save_rgb_annotations(self, image):
box_annotator = sv.BoxAnnotator()
@@ -99,10 +99,10 @@ def save_rgb_annotations(self, image):
in self.detections]
annotated_frame = box_annotator.annotate(scene=image.copy(), detections=self.detections)
annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=self.detections, labels=labels)
if not os.path.exists(f"{args.root}/{self.config['sam']['output_ann_box_folder']}/{self.video_name}/"):
os.makedirs(f"{args.root}/{self.config['sam']['output_ann_box_folder']}/{self.video_name}/")
if not os.path.exists(f"{self.config['root']}/{self.config['sam']['output_ann_box_folder']}/{self.video_name}/"):
os.makedirs(f"{self.config['root']}/{self.config['sam']['output_ann_box_folder']}/{self.video_name}/")

cv2.imwrite(f"{args.root}/{self.config['sam']['output_ann_box_folder']}/{self.video_name}/{os.path.basename(self.temp_image_path)}", annotated_frame)
cv2.imwrite(f"{self.config['root']}/{self.config['sam']['output_ann_box_folder']}/{self.video_name}/{os.path.basename(self.temp_image_path)}", annotated_frame)

def max_conf_process(self):
conf = np.array([c for c in self.detections.confidence])
@@ -158,4 +158,4 @@ def largest_box_process(self):
self.largest_area = areas[idx]
self.detections.confidence = np.array([self.detections.confidence[idx]])
self.detections.class_id = np.array([self.detections.class_id[idx]])
self.detections.xyxy = np.reshape(self.detections.xyxy[idx], (1,4))
self.detections.xyxy = np.reshape(self.detections.xyxy[idx], (1,4))
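
The args.root to self.config['root'] changes above depend on the caller stashing the CLI root into the loaded config, as Baseline_VLM.__init__ now does (self.config['root'] = self.root). A minimal, hypothetical sketch of that wiring; the class and folder names here are illustrative, not the repository's:

import os

class SamWrapperSketch:
    # Output paths are derived from the config alone, so no args parameter is needed
    def __init__(self, config):
        self.config = config
        self.video_name = 'demo'

    def mask_out_dir(self):
        return os.path.join(self.config['root'],
                            self.config['sam']['output_binary_mask_folder'],
                            self.video_name)

config = {'root': '/path/to/Sign-Understanding',
          'sam': {'output_binary_mask_folder': 'binary_masks'}}
sam = SamWrapperSketch(config)
os.makedirs(sam.mask_out_dir(), exist_ok=True)  # safe if the folder already exists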