diff --git a/.gitignore b/.gitignore index bec4f9f..af03091 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,10 @@ .DS_Store /Data/fsoco_segmentation_train /Segmentation/Yolact_minimal +/DepthEstimation/TestData +/DepthEstimation/Lite-Mono +/DepthEstimation/output +/DepthEstimation/output_best +/DepthEstimation/weights /SampleData/driverless.mp4 *.txt diff --git a/DepthEstimation/README.md b/DepthEstimation/README.md new file mode 100644 index 0000000..d01f7d4 --- /dev/null +++ b/DepthEstimation/README.md @@ -0,0 +1,17 @@ +# Lite Mono archive +The model was deemed 'not good enough' to continue use. It is too slow, and just not good enough at large depths +Clone +``` +git@github.com:noahzn/Lite-Mono.git +``` +into this folder and run +``` +python main.py +``` +in order to get depth estimation + +A good test video source for /TestData/ is +``` +https://www.youtube.com/watch?v=o5vES5QaeiQ +``` + diff --git a/DepthEstimation/ground_plane_ransac.py b/DepthEstimation/ground_plane_ransac.py new file mode 100644 index 0000000..4f1a6dc --- /dev/null +++ b/DepthEstimation/ground_plane_ransac.py @@ -0,0 +1,417 @@ +import os +import sys +import argparse +import cv2 +import numpy as np +import matplotlib +import matplotlib.pyplot as plt +from pathlib import Path +from sklearn.linear_model import RANSACRegressor +import json + +GOPRO_PRESETS = { + 'wide': {'fov': 118, 'description': 'GoPro Wide (SuperView) - 118 deg'}, + 'linear': {'fov': 85, 'description': 'GoPro Linear - 85 deg'}, + 'medium': {'fov': 95, 'description': 'GoPro Medium - 95 deg'}, + 'narrow': {'fov': 65, 'description': 'GoPro Narrow - 65 deg'}, +} +DEFAULT_GOPRO_MODE = 'wide' + +def convertDisparityToDepth(disparity, scale=1.0): + epsilon = 1e-6 + depth = scale / (disparity + epsilon) + depth[disparity <= 0] = 0 + return depth + +def getCameraIntrinsics(width, height, fx=None, fy=None, cx=None, cy=None, fov=None, goproMode=None): + if goproMode is not None and goproMode in GOPRO_PRESETS: + fov = GOPRO_PRESETS[goproMode]['fov'] + elif fov is None: + fov = GOPRO_PRESETS[DEFAULT_GOPRO_MODE]['fov'] + + if fx is None or fy is None: + focalLength = width / (2 * np.tan(np.radians(fov) / 2)) + fx = fy = focalLength + + if cx is None: + cx = width / 2 + if cy is None: + cy = height / 2 + + return {'fx': fx, 'fy': fy, 'cx': cx, 'cy': cy, 'fov': fov} + +def loadExclusionAreas(jsonPath): + with open(jsonPath, 'r') as f: + exclusions = json.load(f) + return exclusions + +def estimateGroundPlane(depthMap, cameraIntrinsics, exclusionAreas=None, subsampleStep=5, + inlierThreshold=0.1, maxTrials=100, maxTiltAngle=45, + depthScale=10.0): + metricDepth = convertDisparityToDepth(depthMap, depthScale) + + H, W = metricDepth.shape + fx = cameraIntrinsics['fx'] + fy = cameraIntrinsics['fy'] + cx = cameraIntrinsics['cx'] + cy = cameraIntrinsics['cy'] + + if exclusionAreas is None: + exclusionAreas = [] + + def isInExclusionArea(u, v, exclusions): + for box in exclusions: + if box['x1'] <= u <= box['x2'] and box['y1'] <= v <= box['y2']: + return True + return False + + points3d = [] + pointIndices = [] + + for v in range(0, H, subsampleStep): + for u in range(0, W, subsampleStep): + if isInExclusionArea(u, v, exclusionAreas): + continue + d = metricDepth[v, u] + if d > 0 and np.isfinite(d) and d < 100: + x = (u - cx) * d / fx + y = (v - cy) * d / fy + z = d + points3d.append([x, y, z]) + pointIndices.append((v, u)) + + if len(points3d) < 3: + return None, 0.0, None + + points3d = np.array(points3d) + + X = points3d[:, :2] + y = points3d[:, 2] + + ransac = 
RANSACRegressor( + residual_threshold=inlierThreshold, + max_trials=maxTrials, + random_state=42 + ) + + try: + ransac.fit(X, y) + except Exception as e: + return None, 0.0, None + + a, b = ransac.estimator_.coef_ + c = -1 + d = ransac.estimator_.intercept_ + + norm = np.sqrt(a**2 + b**2 + c**2) + planeParams = np.array([a, b, c, -d]) / norm + + normal = planeParams[:3] + upVector = np.array([0, -1, 0]) + dotProduct = np.abs(np.dot(normal, upVector)) + tiltAngle = np.degrees(np.arccos(np.clip(dotProduct, 0, 1))) + + inlierRatio = ransac.inlier_mask_.sum() / len(points3d) + + fullInlierMask = np.zeros(depthMap.shape, dtype=bool) + for idx, isInlier in enumerate(ransac.inlier_mask_): + v, u = pointIndices[idx] + fullInlierMask[v, u] = isInlier + + return planeParams, inlierRatio, fullInlierMask + +def generatePlaneGrid(planeParams, cameraIntrinsics, depthMap, gridSpacing=2.0, depthScale=10.0): + a, b, c, d = planeParams + + if abs(b) < 1e-6: + return [], 0, 0 + + metricDepth = convertDisparityToDepth(depthMap, depthScale) + validDepths = metricDepth[(metricDepth > 0) & (metricDepth < 100)] + if len(validDepths) == 0: + return [], 0, 0 + + minZ = max(np.percentile(validDepths, 5), 0.5) + maxZ = min(np.percentile(validDepths, 95), 50.0) + + fov = cameraIntrinsics.get('fov', 90) + maxX = maxZ * np.tan(np.radians(fov / 2)) + + zVals = np.arange(minZ, maxZ, gridSpacing) + xVals = np.arange(-maxX, maxX, gridSpacing) + + gridPoints = [] + for x in xVals: + row = [] + for z in zVals: + y = (d - a * x - c * z) / b + if -5 < y < 5: + row.append([x, y, z]) + else: + row.append(None) + gridPoints.append(row) + + return gridPoints, len(xVals), len(zVals) + +def projectPointsTo2D(points3d, cameraIntrinsics, imageShape): + H, W = imageShape + fx = cameraIntrinsics['fx'] + fy = cameraIntrinsics['fy'] + cx = cameraIntrinsics['cx'] + cy = cameraIntrinsics['cy'] + + points2d = [] + validIndices = [] + + for idx, (x, y, z) in enumerate(points3d): + if z > 0: + u = int(fx * x / z + cx) + v = int(fy * y / z + cy) + if 0 <= u < W and 0 <= v < H: + points2d.append([u, v]) + validIndices.append(idx) + + return np.array(points2d), validIndices + +def createVisualization(depthMap, inlierMask, planeParams, inlierRatio, originalFrame=None, + cameraIntrinsics=None, exclusionAreas=None, showInliers=True, depthScale=10.0): + H, W = depthMap.shape + + depthMin = depthMap.min() + depthMax = depthMap.max() + depthNorm = (depthMap - depthMin) / (depthMax - depthMin + 1e-8) * 255.0 + depthNorm = depthNorm.astype(np.uint8) + + cmap = matplotlib.colormaps.get_cmap('Spectral_r') + depthVis = (cmap(depthNorm)[:, :, :3] * 255)[:, :, ::-1].astype(np.uint8) + + barWidth = 30 + barHeight = 200 + barX = W - barWidth - 20 + barY = 50 + + for i in range(barHeight): + colorVal = int(255 * (i / barHeight)) + colorRgb = cmap(colorVal)[:3] + colorBgr = (int(colorRgb[2] * 255), int(colorRgb[1] * 255), int(colorRgb[0] * 255)) + cv2.rectangle(depthVis, (barX, barY + i), (barX + barWidth, barY + i + 1), colorBgr, -1) + + cv2.rectangle(depthVis, (barX, barY), (barX + barWidth, barY + barHeight), (255, 255, 255), 2) + + font = cv2.FONT_HERSHEY_SIMPLEX + fontScale = 0.4 + thickness = 1 + + numLabels = 5 + for i in range(numLabels): + distance = depthMin + (depthMax - depthMin) * i / (numLabels - 1) + labelY = barY + barHeight - int(barHeight * i / (numLabels - 1)) + label = f"{distance:.1f}m" + textSize = cv2.getTextSize(label, font, fontScale, thickness)[0] + cv2.putText(depthVis, label, (barX - textSize[0] - 5, labelY + 4), + font, fontScale, 
(255, 255, 255), thickness) + + if exclusionAreas is not None and len(exclusionAreas) > 0: + for box in exclusionAreas: + x1, y1, x2, y2 = box['x1'], box['y1'], box['x2'], box['y2'] + cv2.rectangle(depthVis, (x1, y1), (x2, y2), (0, 0, 255), 2) + cv2.putText(depthVis, "EXCLUDED FROM RANSAC", (x1 + 5, y1 + 20), font, fontScale*0.8, (0, 0, 255), thickness) + + if originalFrame is not None: + overlayVis = originalFrame.copy() + else: + overlayVis = depthVis.copy() + + if inlierMask is not None and showInliers: + inlierOverlay = np.zeros_like(overlayVis) + inlierOverlay[inlierMask] = (255, 0, 255) + + overlayVis = cv2.addWeighted(overlayVis, 1.0, inlierOverlay, 0.25, 0) + + if cameraIntrinsics is not None and planeParams is not None: + gridPoints, numX, numZ = generatePlaneGrid(planeParams, cameraIntrinsics, depthMap, gridSpacing=2.0, depthScale=depthScale) + if numX > 0 and numZ > 0: + fx = cameraIntrinsics['fx'] + fy = cameraIntrinsics['fy'] + cx = cameraIntrinsics['cx'] + cy = cameraIntrinsics['cy'] + + points2d = [[None for _ in range(numZ)] for _ in range(numX)] + + for xi in range(numX): + for zi in range(numZ): + if gridPoints[xi][zi] is not None: + x, y, z = gridPoints[xi][zi] + if z > 0.1: + u = int(fx * x / z + cx) + v = int(fy * y / z + cy) + if -200 <= u < W + 200 and -200 <= v < H + 200: + points2d[xi][zi] = (u, v) + + wireframeColor = (0, 255, 0) + wireframeThickness = 2 + + for xi in range(numX): + for zi in range(numZ - 1): + if points2d[xi][zi] and points2d[xi][zi + 1]: + pt1 = points2d[xi][zi] + pt2 = points2d[xi][zi + 1] + retval, p1, p2 = cv2.clipLine((0, 0, W, H), pt1, pt2) + if retval: + cv2.line(overlayVis, p1, p2, wireframeColor, wireframeThickness) + + for xi in range(numX - 1): + for zi in range(numZ): + if points2d[xi][zi] and points2d[xi + 1][zi]: + pt1 = points2d[xi][zi] + pt2 = points2d[xi + 1][zi] + retval, p1, p2 = cv2.clipLine((0, 0, W, H), pt1, pt2) + if retval: + cv2.line(overlayVis, p1, p2, wireframeColor, wireframeThickness) + + a, b, c, d = planeParams + + separator = np.ones((H, 20, 3), dtype=np.uint8) * 255 + + visualization = cv2.hconcat([depthVis, separator, overlayVis]) + + return visualization + +def processDepthFile(depthPath, outputDir, cameraIntrinsics, args): + basename = Path(depthPath).stem + try: + depthMap = np.load(depthPath) + except Exception as e: + print(f"ERROR loading {depthPath}: {e}") + return None + + planeParams, inlierRatio, inlierMask = estimateGroundPlane( + depthMap, + cameraIntrinsics, + exclusionAreas=args.exclusion_areas, + subsampleStep=args.subsample, + inlierThreshold=args.inlier_threshold, + maxTrials=args.max_trials, + maxTiltAngle=args.max_tilt, + depthScale=args.depth_scale, + ) + + if planeParams is None: + return None + + originalFrame = None + import re + frameMatch = re.search(r'frame_(\d+)', basename) + if frameMatch and os.path.exists(args.video): + frameNum = int(frameMatch.group(1)) + cap = cv2.VideoCapture(args.video) + cap.set(cv2.CAP_PROP_POS_FRAMES, frameNum) + ret, originalFrame = cap.read() + cap.release() + if ret and originalFrame is not None: + originalFrame = cv2.resize(originalFrame, (depthMap.shape[1], depthMap.shape[0])) + + planeOutputPath = os.path.join(outputDir, f"{basename}_plane.npy") + np.save(planeOutputPath, planeParams) + + visualization = createVisualization(depthMap, inlierMask, planeParams, inlierRatio, originalFrame, + cameraIntrinsics, args.exclusion_areas, args.show_inliers, + args.depth_scale) + + visOutputPath = os.path.join(outputDir, f"{basename}_ground_plane.png") + 
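+    # Saved PNG is a side-by-side panel: left is the colorized depth map with its
+    # distance legend, right is the source frame (or the depth map when no frame is
+    # available) overlaid with the RANSAC inlier points and the fitted plane wireframe.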
cv2.imwrite(visOutputPath, visualization) + + return planeParams + +def processDirectory(inputDir, outputDir, cameraIntrinsics, args): + import glob + + depthFiles = sorted(glob.glob(os.path.join(inputDir, "*_depth.npy"))) + + successCount = 0 + + for i, depthPath in enumerate(depthFiles): + planeParams = processDepthFile( + depthPath, + outputDir, + cameraIntrinsics, + args, + ) + + if planeParams is not None: + successCount += 1 + +def main(): + parser = argparse.ArgumentParser() + + parser.add_argument('--input', type=str, default='depth_output', + help='Input directory with .npy depth files (default: depth_output)') + parser.add_argument('--output', type=str, default='ground_plane_output', + help='Output directory (default: ground_plane_output)') + parser.add_argument('--frame', type=int, default=None, + help='Process specific frame number (optional, otherwise all)') + parser.add_argument('--video', type=str, default='TestData/test1.mp4', + help='Path to source video file for overlay visualization (default: TestData/test1.mp4)') + + parser.add_argument('--fx', type=float, default=None, + help='Camera focal length x (optional)') + parser.add_argument('--fy', type=float, default=None, + help='Camera focal length y (optional)') + parser.add_argument('--cx', type=float, default=None, + help='Camera principal point x (optional)') + parser.add_argument('--cy', type=float, default=None, + help='Camera principal point y (optional)') + parser.add_argument('--gopro-mode', type=str, default='wide', + choices=['wide', 'linear', 'medium', 'narrow'], + help='GoPro lens mode preset (default: wide=118deg)') + parser.add_argument('--fov', type=float, default=None, + help='Custom FOV in degrees (overrides --gopro-mode if set)') + + parser.add_argument('--exclusions', type=str, required=True, + help='Path to JSON file with exclusion areas (format: [{"x1": int, "y1": int, "x2": int, "y2": int}])') + parser.add_argument('--subsample', type=int, default=5, + help='Subsample every Nth pixel (default: 5)') + parser.add_argument('--inlier-threshold', type=float, default=0.1, + help='RANSAC inlier threshold in meters (default: 0.1)') + parser.add_argument('--max-trials', type=int, default=100, + help='Maximum RANSAC iterations (default: 100)') + parser.add_argument('--max-tilt', type=float, default=45, + help='Maximum plane tilt from horizontal in degrees (default: 45)') + parser.add_argument('--depth-scale', type=float, default=10.0, + help='Scale factor for disparity-to-depth conversion (default: 10.0)') + + parser.add_argument('--show-inliers', action='store_true', default=True, + help='Highlight inlier points in visualization (default: True)') + + args = parser.parse_args() + + args.exclusion_areas = loadExclusionAreas(args.exclusions) + + os.makedirs(args.output, exist_ok=True) + + sampleDepthFile = None + import glob + depthFiles = glob.glob(os.path.join(args.input, "*_depth.npy")) + if depthFiles: + sampleDepthFile = depthFiles[0] + sampleDepth = np.load(sampleDepthFile) + H, W = sampleDepth.shape + + cameraIntrinsics = getCameraIntrinsics( + W, H, + fx=args.fx, fy=args.fy, + cx=args.cx, cy=args.cy, + fov=args.fov, + goproMode=args.gopro_mode if args.fov is None else None + ) + + if args.frame is not None: + depthPath = os.path.join(args.input, f"frame_{args.frame:06d}_depth.npy") + + processDepthFile(depthPath, args.output, cameraIntrinsics, args) + else: + processDirectory(args.input, args.output, cameraIntrinsics, args) + print("DONE!") + +if __name__ == '__main__': + main() diff --git 
a/DepthEstimation/main.py b/DepthEstimation/main.py new file mode 100644 index 0000000..53f7675 --- /dev/null +++ b/DepthEstimation/main.py @@ -0,0 +1,171 @@ +import os +import sys +import argparse +import numpy as np +import cv2 +import torch +from torchvision import transforms +import matplotlib.cm as cm +import matplotlib.colors as colors +from PIL import Image + +sys.path.append('./Lite-Mono') +import networks +from layers import disp_to_depth + +MODEL_CONFIGS = { + 'tiny': { + 'name': 'lite-mono-tiny', + 'folder': 'lite-mono-tiny_640x192', + 'description': 'Lightest/fastest (2.2M params, 640x192)' + }, + 'best': { + 'name': 'lite-mono-8m', + 'folder': 'lite-mono-8m_1024x320', + 'description': 'Best quality/heaviest (8.7M params, 1024x320)' + } +} + + +class DepthEstimator: + def __init__(self, weightsFolder, model="lite-mono-tiny", useCuda=True): + self.device = torch.device("cuda" if torch.cuda.is_available() and useCuda else "cpu") + + encoderPath = os.path.join(weightsFolder, "encoder.pth") + decoderPath = os.path.join(weightsFolder, "depth.pth") + + encoderDict = torch.load(encoderPath, map_location=self.device) + decoderDict = torch.load(decoderPath, map_location=self.device) + + self.feedHeight = encoderDict['height'] + self.feedWidth = encoderDict['width'] + + print(f"Loading {model} model ({self.feedWidth}x{self.feedHeight}) on {self.device}") + + self.encoder = networks.LiteMono(model=model, height=self.feedHeight, width=self.feedWidth) + modelDict = self.encoder.state_dict() + self.encoder.load_state_dict({k: v for k, v in encoderDict.items() if k in modelDict}) + self.encoder.to(self.device) + self.encoder.eval() + + self.depthDecoder = networks.DepthDecoder(self.encoder.num_ch_enc, scales=range(3)) + depthModelDict = self.depthDecoder.state_dict() + self.depthDecoder.load_state_dict({k: v for k, v in decoderDict.items() if k in depthModelDict}) + self.depthDecoder.to(self.device) + self.depthDecoder.eval() + + self.toTensor = transforms.ToTensor() + + def processFrame(self, frame): + originalHeight, originalWidth = frame.shape[:2] + + frameRgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + framePil = Image.fromarray(frameRgb) + frameResized = framePil.resize((self.feedWidth, self.feedHeight), Image.LANCZOS) + + inputTensor = self.toTensor(frameResized).unsqueeze(0).to(self.device) + + with torch.no_grad(): + features = self.encoder(inputTensor) + outputs = self.depthDecoder(features) + + disp = outputs[("disp", 0)] + dispResized = torch.nn.functional.interpolate( + disp, (originalHeight, originalWidth), mode="bilinear", align_corners=False) + + dispNp = dispResized.squeeze().cpu().numpy() + + vmax = np.percentile(dispNp, 95) + normalizer = colors.Normalize(vmin=dispNp.min(), vmax=vmax) + mapper = cm.ScalarMappable(norm=normalizer, cmap='magma') + colormapped = (mapper.to_rgba(dispNp)[:, :, :3] * 255).astype(np.uint8) + + return colormapped, dispNp + + def processVideo(self, videoPath, outputFolder="output", saveFrames=True, displayLive=False): + os.makedirs(outputFolder, exist_ok=True) + + cap = cv2.VideoCapture(videoPath) + if not cap.isOpened(): + raise ValueError(f"Cannot open video: {videoPath}") + + fps = cap.get(cv2.CAP_PROP_FPS) + frameCount = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + + print(f"Processing video: {frameCount} frames at {fps:.2f} FPS ({width}x{height})") + + outputVideoPath = os.path.join(outputFolder, "depth_output.mp4") + fourcc = 
cv2.VideoWriter_fourcc(*'mp4v') + outVideo = cv2.VideoWriter(outputVideoPath, fourcc, fps, (width, height)) + + frameIdx = 0 + while True: + ret, frame = cap.read() + if not ret: + break + + depthColor, depthRaw = self.processFrame(frame) + depthBgr = cv2.cvtColor(depthColor, cv2.COLOR_RGB2BGR) + + outVideo.write(depthBgr) + + if saveFrames and frameIdx % 30 == 0: + cv2.imwrite(os.path.join(outputFolder, f"frame_{frameIdx:06d}_depth.png"), depthBgr) + np.save(os.path.join(outputFolder, f"frame_{frameIdx:06d}_depth.npy"), depthRaw) + + if displayLive: + combined = np.hstack((frame, depthBgr)) + cv2.imshow('Original | Depth', combined) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + frameIdx += 1 + if frameIdx % 30 == 0: + print(f"Processed {frameIdx}/{frameCount} frames ({100*frameIdx/frameCount:.1f}%)") + + cap.release() + outVideo.release() + if displayLive: + cv2.destroyAllWindows() + + print(f"\nComplete! Processed {frameIdx} frames") + print(f"Output video: {outputVideoPath}") + return outputVideoPath + + +def main(): + parser = argparse.ArgumentParser(description='Lite-Mono Depth Estimation') + parser.add_argument('--model', type=str, default='tiny', choices=['tiny', 'best'], + help='Model to use: tiny (fastest) or best (highest quality)') + parser.add_argument('--video', type=str, default='./TestData/test1.mp4', + help='Path to input video') + parser.add_argument('--output', type=str, default=None, + help='Output folder (default: ./output_)') + parser.add_argument('--cuda', action='store_true', default=True, + help='Use CUDA if available') + + args = parser.parse_args() + + config = MODEL_CONFIGS[args.model] + weightsFolder = f"./weights/{config['folder']}" + outputFolder = args.output or f"./output_{args.model}" + + print(f"Using model: {config['description']}") + + if not os.path.exists(weightsFolder): + print(f"Error: Weights folder not found: {weightsFolder}") + print("Please download the model weights first") + return + + if not os.path.exists(args.video): + print(f"Error: Video not found: {args.video}") + return + + estimator = DepthEstimator(weightsFolder, model=config['name'], useCuda=args.cuda) + estimator.processVideo(args.video, outputFolder=outputFolder, saveFrames=True, displayLive=False) + + +if __name__ == '__main__': + main() diff --git a/Integration/coneSegmentor.py b/Integration/coneSegmentor.py new file mode 100644 index 0000000..06ef833 --- /dev/null +++ b/Integration/coneSegmentor.py @@ -0,0 +1,72 @@ +import sys +import os +sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'Segmentation-YOLO')) + +import torch +import numpy as np +from ultralytics import YOLO + +class ConeSegmentor: + def __init__(self, weightsPath=None, confThreshold=0.25, device='auto'): + if weightsPath is None: + weightsPath = os.path.join( + os.path.dirname(__file__), + '..', + 'Segmentation-YOLO', + 'runs', + 'segment', + 'cone-segmentation', + 'weights', + 'best.pt' + ) + + if device == 'auto': + if torch.cuda.is_available(): + self.device = 'cuda' + elif torch.backends.mps.is_available(): + self.device = 'mps' + else: + self.device = 'cpu' + else: + self.device = device + + self.confThreshold = confThreshold + self.model = YOLO(weightsPath) + + def segment(self, image): + if not isinstance(image, np.ndarray): + raise ValueError("Input image must be a numpy array") + + if len(image.shape) != 3 or image.shape[2] != 3: + raise ValueError("Input image must be a 3-channel RGB image (H, W, 3)") + + results = self.model.predict( + source=image, + conf=self.confThreshold, + 
save=False, + device=self.device, + verbose=False + ) + + result = results[0] + + output = { + 'masks': None, + 'boxes': None, + 'classes': None, + 'confidences': None, + 'numDetections': 0 + } + + if result.masks is not None: + numDetections = len(result.masks) + output['numDetections'] = numDetections + output['masks'] = result.masks.data.cpu().numpy() + output['boxes'] = result.boxes.xyxy.cpu().numpy() + output['classes'] = [{ + 'id': int(cls), + 'name': result.names[int(cls)] + } for cls in result.boxes.cls] + output['confidences'] = result.boxes.conf.cpu().numpy() + + return output diff --git a/Integration/depthEstimator.py b/Integration/depthEstimator.py new file mode 100644 index 0000000..5469abe --- /dev/null +++ b/Integration/depthEstimator.py @@ -0,0 +1,46 @@ +import sys +import os +sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'DepthEstimation', 'Prior-Depth-Anything')) + +import torch +import numpy as np +from prior_depth_anything import PriorDepthAnything + +class DepthEstimator: + def __init__(self, device='auto', modelSize='vits'): + if device == 'auto': + if torch.cuda.is_available(): + self.device = "cuda:0" + elif torch.backends.mps.is_available(): + self.device = "mps" + else: + self.device = "cpu" + else: + self.device = device + + self.model = PriorDepthAnything( + device=self.device, + frozen_model_size=modelSize, + conditioned_model_size=modelSize + ) + + def estimateDepth(self, image, pattern=1000): + if not isinstance(image, np.ndarray): + raise ValueError("Input image must be a numpy array") + + if len(image.shape) != 3 or image.shape[2] != 3: + raise ValueError("Input image must be a 3-channel RGB image (H, W, 3)") + + depthTensor = self.model.infer_one_sample( + image=image, + prior=image, + pattern=pattern, + visualize=False + ) + + if isinstance(depthTensor, torch.Tensor): + depthMap = depthTensor.cpu().numpy() + else: + depthMap = depthTensor + + return depthMap diff --git a/PathPlanning/README.md b/PathPlanning/README.md new file mode 100644 index 0000000..8d924c7 --- /dev/null +++ b/PathPlanning/README.md @@ -0,0 +1,306 @@ +# Path Planning Module + +This module implements real-time path planning for the UCSC FSAE driverless vehicle, generating optimal racing paths at 25 Hz. + +**Based on**: AMZ Driverless (ETH Zurich) boundary estimation algorithm +- **Paper**: "AMZ Driverless: The Full Autonomous Racing System" (Kabzan et al., 2019) +- **Competition Results**: 1st place FSG 2017, 2018, FSI 2018 +- **Performance**: 28.48s fastest lap, 1.5g lateral acceleration +- **Reference**: arXiv:1905.05150v1, Section 4.3 +- **GitHub**: https://github.com/AMZ-Driverless + +## Module Overview + +### Core Files + +1. **config.py** - Configuration and constants + - Track constraints (min width 3m, turning radius 4.5m) + - Performance parameters (25 Hz, beam width) + - Cost function weights (qc, qw, qs, qcolor, ql) + - Vehicle dynamics parameters + - Sensor range (~10m) + +2. **delaunay.py** - Delaunay Triangulation **[IN PROGRESS]** + - Uses `scipy.spatial.Delaunay` for triangulation (NOT CGAL) + - Creates triangulation from cone [x, y] positions + - Extracts midpoint graph for waypoints + - **Important**: Handles duplicate midpoints (interior edges shared by 2 triangles) + - Visualization utilities (matplotlib) + - **AMZ Implementation**: Used CGAL library, we use scipy for simplicity + - **Status**: In Progress (Nov 8-15) + +3. 
**path_tree.py** - Path Tree Population (Breadth-First) + - Builds breadth-first tree of possible paths through midpoint graph + - Starting node: current vehicle position + - Manages PathNode objects with parent/child relationships + - Each path connects to center points of surrounding edges + - Limited tree depth to meet 40ms cycle time constraint + - **AMZ approach**: Breadth-first manor, fixed iteration limit + - **Status**: Not Started (Deadline: Nov 22) + +4. **cost_functions.py** - Path Cost Evaluation (5 AMZ Terms) + + Implements the exact AMZ cost function with 5 weighted terms: + + - **Term 1**: Maximum angle change between path segments + - Rationale: Sharp corners use multiple cones, sudden angles unlikely + + - **Term 2**: Track width standard deviation (~3m expected) + - Rationale: Rules specify 3m min width, unlikely to change drastically + + - **Term 3**: Cone spacing standard deviation (~5m max expected) + - Rationale: Cones typically spaced evenly along boundaries + + - **Term 4**: Wrong color probability (blue=left, yellow=right) + - Rationale: Uses probabilistic color estimates from perception + - Becomes zero if no color information available + + - **Term 5**: Path length deviation from sensor range (~10m) + - Rationale: Penalize too short/long paths, prefer sensor range length + + **Cost Function Formula**: + ```python + cost = qc * normalize(max_angle_change)² + + qw * normalize(track_width_stddev)² + + qs * normalize(cone_spacing_stddev)² + + qcolor * normalize(wrong_color_prob)² + + ql * normalize(path_length_diff)² + ``` + + - Each term normalized, squared, and weighted + - Weights (qc, qw, qs, qcolor, ql) tuned experimentally + - **Status**: Not Started (Deadline: Dec 3) + +5. **beam_search.py** - Beam Search Pruning + - Prunes path tree to keep only top-k best paths at each level + - Prevents exponential growth of search tree + - Limits computational complexity for real-time performance + - Integrates cost function evaluation + - **AMZ result**: Only 4.2% of paths left track, mostly at >7m distance + - **Status**: Not Started (Deadline: Nov 29) + +6. **path_smoother.py** - Spline Interpolation + - Smooths discrete waypoints using scipy.interpolate + - Options: splprep/splev (parametric splines) or CubicSpline + - Calculates curvature profiles for velocity planning + - Generates velocity profiles respecting dynamics + - Resampling utilities for control planning + - **AMZ approach**: Feeds smoothed path to pure pursuit controller + - **Status**: Not Started (Deadline: Dec 3) + +7. **path_planner.py** - Main Integration Module + - 25 Hz planning loop (40ms cycle time) + - Integrates all 5 phases of algorithm + - SLAM interface (input): receives cone positions at 25 Hz + - Control planning interface (output): sends path data at 25 Hz + - Real-time performance monitoring + - **AMZ timing**: 50ms in SLAM mode, 40ms in racing mode + - **Status**: Not Started (Deadline: Dec 6) + +## Algorithm Pipeline + +Based on AMZ Section 4.3: Boundary Estimation + +``` +SLAM -> Cones -> Delaunay -> Waypoints -> Path Tree -> Beam Search -> Best Path -> Spline Smooth -> Control + | | + +------------------------------------ 25 Hz Loop (40ms cycle time) ----------------------------------+ +``` + +### Phase 1: Graph Generation (Delaunay Triangulation) +- **Input**: Cone positions [x, y] from SLAM +- **Library**: scipy.spatial.Delaunay (AMZ uses CGAL) +- **Process**: + 1. Create Delaunay triangulation of all visible cones + 2. Triangles maximize minimum internal angles + 3. 
Extract midpoints of all triangle edges + 4. Handle duplicate midpoints from shared interior edges +- **Output**: Midpoint waypoint graph +- **AMZ approach**: Discretizes X-Y space into connected triangles + +### Phase 2: Tree Population (Breadth-First Search) +- **Input**: Waypoints + current vehicle position +- **Process**: + 1. Start from vehicle position + 2. Build tree by connecting to surrounding edge midpoints + 3. Each level represents one waypoint ahead + 4. Branches represent alternative route choices + 5. Limit depth by iteration count (computational budget) +- **Output**: Tree of all possible paths +- **AMZ approach**: Breadth-first manor, explores all possible center lines + +### Phase 3: Beam Search (Cost-Based Pruning) +- **Input**: Path tree from Phase 2 +- **Process**: + 1. Evaluate each path using 5-term cost function + 2. Normalize, square, and weight each cost term + 3. Keep only top-k paths with lowest cost at each level + 4. Discard worst paths (beam search pruning) +- **Output**: Top-k best paths +- **AMZ result**: 4.2% invalid paths at FSG 2018, mostly >7m away + +### Phase 4: Path Selection & Smoothing +- **Input**: Best path waypoints from Phase 3 +- **Process**: + 1. Select path with absolute lowest cost + 2. Fit spline through discrete waypoints + 3. Calculate curvature along path +- **Output**: Smooth continuous path with curvature +- **AMZ approach**: Passes to pure pursuit controller + +### Phase 5: Control Interface +- **Input**: Smooth path + curvatures +- **Process**: + 1. Calculate velocity profile (optional, may be control team's job) + 2. Package waypoints, velocities, curvatures + 3. Send to control planning at 25 Hz +- **Output**: Path data for control planning +- **Coordinate with**: Ray Zou, Nathan Margolis (Control Planning team) + +## Running Tests + +Each module has standalone tests in its `if __name__ == "__main__"` block: + +```bash +# Test Delaunay triangulation (CURRENTLY WORKING) +python PathPlanning/delaunay.py + +# Test path tree (after implementation) +python PathPlanning/path_tree.py + +# Test cost functions (after implementation) +python PathPlanning/cost_functions.py + +# Test beam search (after implementation) +python PathPlanning/beam_search.py + +# Test path smoother (after implementation) +python PathPlanning/path_smoother.py + +# Test full integrated system (after implementation) +python PathPlanning/path_planner.py +``` + +### Delaunay Test Patterns + +The delaunay.py script includes multiple test patterns (modify `test_cone_data` variable): + +```python +# Available test patterns: +test_cone_data = oval_track # Realistic oval racing circuit +test_cone_data = simple_track # Basic straight-to-turn layout +test_cone_data = slalom # Chicane/slalom pattern +test_cone_data = grid # Stress test with regular grid +test_cone_data = random_scatter # Random cones for robustness test +``` + +## Performance Requirements + +- **Frequency**: 25 Hz (40ms cycle time) +- **Hardware**: Raspberry Pi (limited CPU/memory) +- **Real-time**: Must complete before next SLAM update +- **Robustness**: Handle noisy monocular vision data +- **AMZ benchmark**: 50ms cycle time in SLAM mode, achieved 1.5g lateral acceleration + +### Computational Budget + +- Delaunay triangulation: ~5-10ms (scipy is fast) +- Tree building: Variable (limit iterations) +- Cost evaluation: ~5-10ms (vectorize with NumPy) +- Beam search: ~5ms (fixed beam width) +- Spline smoothing: ~5ms +- **Total target**: <40ms on Raspberry Pi + +## Key Implementation Differences from AMZ + +| 
Aspect | AMZ (ETH Zurich) | Our Implementation | +|--------|------------------|-------------------| +| Triangulation Library | CGAL (C++) | scipy.spatial.Delaunay (Python) | +| Update Frequency | 50ms (20 Hz) SLAM mode | 40ms (25 Hz) target | +| Hardware | Industrial PC + GPU | Raspberry Pi (CPU only) | +| Sensors | LiDAR + Stereo Camera | Monocular Camera only | +| Programming Language | C++ / ROS | Python / ROS | +| Sensor Range | ~10m (LiDAR) | ~10m (monocular depth estimation) | + +**Key Challenge**: Monocular vision provides noisier/sparser cone detections than LiDAR, so our cost function needs to be more robust. + +## Integration Points + +### Input from Localization/SLAM +- **Team**: Parker Costa, Hansika Nerusa, Chanchal Mukeshsingh +- **Format**: TBD - coordinate on cone position data structure + - Need: [x, y] positions in global frame + - Need: Color probabilities [p_blue, p_yellow, p_orange, p_unknown] + - Need: Position uncertainties (covariance matrices) +- **Update rate**: 25 Hz + +### Output to Control Planning +- **Team**: Ray Zou, Nathan Margolis +- **Format**: TBD - coordinate on path/velocity data structure + - Provide: Waypoint coordinates [x, y] array + - Provide: Desired velocities at each waypoint (optional) + - Provide: Curvature at each waypoint + - Provide: Path confidence/quality metric +- **Update rate**: 25 Hz + +## Development Timeline + +| Task | Deadline | Status | +|------|----------|--------| +| Delaunay Triangulation (scipy) | Nov 8 | **IN PROGRESS** | +| Test & Visualize | Nov 15 | **IN PROGRESS** | +| Path Tree Population | Nov 22 | Not Started | +| Beam Search Pruning | Nov 29 | Not Started | +| Cost Functions (5 AMZ terms) | Dec 3 | Not Started | +| Path Smoothing (scipy splines) | Dec 3 | Not Started | +| Control Interface | Dec 6 | Not Started | + +## Dependencies + +```python +import numpy as np +from scipy.spatial import Delaunay # Delaunay triangulation +from scipy.interpolate import splprep, splev, CubicSpline # Path smoothing +import matplotlib.pyplot as plt # Visualization +``` + +No CGAL required! Everything uses standard scipy/numpy. + +## AMZ Cost Function Tuning Guide + +The 5 cost weights need experimental tuning on your specific tracks: + +```python +# In config.py - starting values (tune these!) +qc = 1.0 # Angle change weight +qw = 1.0 # Track width variance weight +qs = 1.0 # Cone spacing variance weight +qcolor = 2.0 # Color mismatch weight (higher = trust colors more) +ql = 0.5 # Path length deviation weight +``` + +**Tuning strategy**: +1. Start with all weights = 1.0 +2. Test on simple_track and oval_track patterns +3. Visualize which paths are selected +4. Increase weights for terms that should be more important +5. qcolor should be high if perception gives good colors, low if noisy +6. AMZ used squared and normalized costs, so relative weights matter more than absolute + +## Team + +- Nathan Yee +- Suhani Agarwal +- Aedan Benavides + +## References + +**Primary**: +- Kabzan et al. "AMZ Driverless: The Full Autonomous Racing System", 2019 +- Section 4.3: Boundary Estimation (our algorithm) +- Section 5: MPC Control (Control Planning team's reference) + +**Additional**: +- Delaunay, B. 
"Sur la sphère vide", 1934 (original Delaunay paper) +- scipy.spatial.Delaunay documentation +- Formula Student Germany Rules (track specifications) diff --git a/PathPlanning/beam_search.py b/PathPlanning/beam_search.py new file mode 100644 index 0000000..2741bab --- /dev/null +++ b/PathPlanning/beam_search.py @@ -0,0 +1,9 @@ +""" +Beam Search Path Pruning +Deadline: Nov 29, 2025 +""" + +import numpy as np +from typing import List +from path_tree import PathNode +import config diff --git a/PathPlanning/config.py b/PathPlanning/config.py new file mode 100644 index 0000000..7142bcd --- /dev/null +++ b/PathPlanning/config.py @@ -0,0 +1,45 @@ +""" +Path Planning Configuration +Contains all constants and parameters for the path planning system +""" + +# Performance Requirements +PLANNING_FREQUENCY_HZ = 25 # Hz - must compute new path 25 times/sec +CYCLE_TIME_MS = 40 # ms - max computation time (1000/25) + +# Track Constraints (from Formula Student Driverless rules) +MIN_TRACK_WIDTH = 3.0 # meters (DD.2.2.2.c) +MIN_TURNING_DIAMETER = 9.0 # meters (DD.2.2.2.d) +MIN_TURNING_RADIUS = MIN_TURNING_DIAMETER / 2 # 4.5 meters +MAX_STRAIGHT_LENGTH = 80.0 # meters (DD.2.2.2.a) +LAP_LENGTH_MIN = 200.0 # meters (DD.2.2.2.e) +LAP_LENGTH_MAX = 500.0 # meters (DD.2.2.2.e) + +# Cone Colors (from DD.1.1) +CONE_COLOR_BLUE = 0 # Left track border +CONE_COLOR_YELLOW = 1 # Right track border +CONE_COLOR_ORANGE_SMALL = 2 # Entry/exit lanes +CONE_COLOR_ORANGE_LARGE = 3 # Start/finish lines + +# Beam Search Parameters +BEAM_WIDTH = 10 # Number of best paths to keep at each tree level +MAX_TREE_DEPTH = 15 # Maximum number of waypoints to look ahead + +# Cost Function Weights +WEIGHT_BOUNDARY_VIOLATION = 1000.0 # Heavily penalize leaving track +WEIGHT_TURNING_RADIUS = 100.0 # Penalize sharp turns +WEIGHT_CURVATURE_CHANGE = 50.0 # Prefer smooth paths +WEIGHT_DISTANCE = 1.0 # Prefer shorter paths +WEIGHT_CENTER_LINE = 10.0 # Prefer staying near center of track + +# Path Smoothing Parameters +SPLINE_DEGREE = 3 # Cubic spline +SPLINE_SMOOTHING_FACTOR = 0.1 # Lower = smoother, higher = closer to waypoints + +# Vehicle Dynamics (estimated - coordinate with Control Planning team) +VEHICLE_MAX_SPEED = 20.0 # m/s (72 km/h - from competitor data) +VEHICLE_MAX_ACCELERATION = 15.0 # m/s^2 (0-100 km/h in ~2s) + +# Noise/Robustness Parameters +MAX_CONE_POSITION_ERROR = 0.5 # meters - expected SLAM noise from monocular vision +MIN_CONES_FOR_VALID_PATH = 4 # Minimum cones needed to plan a path diff --git a/PathPlanning/cost_functions.py b/PathPlanning/cost_functions.py new file mode 100644 index 0000000..66ef316 --- /dev/null +++ b/PathPlanning/cost_functions.py @@ -0,0 +1,8 @@ +""" +Cost Function Heuristics for Path Planning +Deadline: Dec 3, 2025 +""" + +import numpy as np +from typing import List, Tuple +import config diff --git a/PathPlanning/delaunay.py b/PathPlanning/delaunay.py new file mode 100644 index 0000000..8d7343d --- /dev/null +++ b/PathPlanning/delaunay.py @@ -0,0 +1,135 @@ +from scipy.spatial import Delaunay +from collections import defaultdict +import numpy as np +import matplotlib.pyplot as plt +import time + +def get_midpoints(cones): + # Get a midpoint graph from the Delaunay Triangulation + points = np.array(cones) + tri = Delaunay(points) + waypoint_graph = defaultdict(set) + + # Vectorize this for speedup instead of using a loop + # Get coordinates of each point + p1 = points[tri.simplices[:,0]] + p2 = points[tri.simplices[:,1]] + p3 = points[tri.simplices[:,2]] + + # Compute midpoints of each edge and store + 
wayp1p2 = ((p1 + p2) / 2) + wayp1p3 = ((p1 + p3) / 2) + wayp2p3 = ((p2 + p3) / 2) + + for i in range(len(wayp1p2)): + wp1, wp2, wp3 = wayp1p2[i], wayp1p3[i], wayp2p3[i] + # Convert to tuples (hashable for dict keys) + wp1_tuple = tuple(np.round(wp1, decimals=6)) + wp2_tuple = tuple(np.round(wp2, decimals=6)) + wp3_tuple = tuple(np.round(wp3, decimals=6)) + # Each waypoint connects to the other 2 in the triangle + waypoint_graph[wp1_tuple].add(wp2_tuple) + waypoint_graph[wp1_tuple].add(wp3_tuple) + waypoint_graph[wp2_tuple].add(wp1_tuple) + waypoint_graph[wp2_tuple].add(wp3_tuple) + waypoint_graph[wp3_tuple].add(wp1_tuple) + waypoint_graph[wp3_tuple].add(wp2_tuple) + + # Stack all the waypoint arrays into one vertically + waypoints = np.unique(np.vstack([wayp1p2, wayp1p3, wayp2p3]), axis=0) + + return waypoints, waypoint_graph, tri + +def visualize(test_cases): + # Visualize one or more tracks with triangulation and waypoint graph + # test_cases: list of (name, cone_data) tuples + + n = len(test_cases) + cols = min(n, 3) + rows = (n + cols - 1) // cols + fig, axes = plt.subplots(rows, cols, figsize=(6*cols, 5*rows), squeeze=False) + axes = axes.flatten() + + for i, (name, cone_data) in enumerate(test_cases): + ax = axes[i] + points = np.array(cone_data) + waypoints, waypoint_graph, tri = get_midpoints(cone_data) + + # Plot triangulation and cones + ax.triplot(points[:, 0], points[:, 1], tri.simplices, 'b-', linewidth=0.5) + ax.plot(points[:, 0], points[:, 1], 'ro', markersize=6, label="Cones") + ax.plot(waypoints[:, 0], waypoints[:, 1], 'go', markersize=4, label="Waypoints") + + # Draw waypoint graph connections + for wp, neighbors in waypoint_graph.items(): + for neighbor in neighbors: + ax.plot([wp[0], neighbor[0]], [wp[1], neighbor[1]], 'g-', linewidth=0.8, alpha=0.4) + + ax.set_title(f'{name} ({len(points)} cones)') + ax.legend(fontsize=8) + ax.grid(True, alpha=0.3) + ax.set_aspect('equal', adjustable='box') + + # Hide unused subplots + for j in range(n, len(axes)): + axes[j].set_visible(False) + + plt.tight_layout() + plt.show() + +if __name__ == "__main__": + # Test data option 1: Simple track-like pattern + simple_track = [ + [0, 0], [1, 0], [2, 0.5], [3, 1], [4, 1.5], + [4, -0.5], [3, -1], [2, -1.5], [1, -1], [0, -0.5] + ] + + # Test data option 2: Oval/loop track (more realistic) + oval_track = [ + # Outer boundary + [0, 0], [2, -1], [4, -1.5], [6, -1.5], [8, -1], [10, 0], + [10, 2], [8, 3], [6, 3.5], [4, 3.5], [2, 3], [0, 2], + # Inner boundary + [2, 0.5], [4, 0], [6, 0], [8, 0.5], [8, 1.5], [6, 2], + [4, 2], [2, 1.5] + ] + + # Test data option 3: Slalom/chicane pattern + slalom = [ + [0, 0], [1, 1], [2, -1], [3, 1.5], [4, -1.5], + [5, 2], [6, -2], [7, 2.5], [8, -2.5], [9, 0], + [0, -0.5], [1, -2], [2, 1], [3, -2.5], [4, 2], + [5, -3], [6, 2.5], [7, -3.5], [8, 3], [9, 0.5] + ] + + # Test data option 4: Grid pattern (stress test) + grid = [] + for x in range(0, 10, 2): + for y in range(-4, 5, 2): + grid.append([x, y]) + + # Test data option 5: Random scattered (realistic chaos) + import random + random.seed(42) + random_scatter = [[random.uniform(0, 10), random.uniform(-3, 3)] for _ in range(30)] + + # Choose which test data to use + test_cases = [ + ("Random 30", random_scatter), + ("Simple Track", simple_track), + ("Slalom", slalom), + ("Grid", grid), + ("Oval Track", oval_track) + ] + + for name, cone_data in test_cases: + points = np.array(cone_data) + + start = time.perf_counter() + waypoints, waypoint_graph, tri = get_midpoints(cone_data) + elapsed = time.perf_counter() - 
start + + print(f"{name:15} | Cones: {len(points):3} | Time: {elapsed*1000:6.2f} ms | Hz: {1/elapsed:6.1f}") + + # Visualize all test tracks + visualize(test_cases) diff --git a/PathPlanning/path_planner.py b/PathPlanning/path_planner.py new file mode 100644 index 0000000..a4828b8 --- /dev/null +++ b/PathPlanning/path_planner.py @@ -0,0 +1,13 @@ +""" +Main Path Planning Module +Deadline: Dec 6, 2025 +""" + +import numpy as np +from typing import Tuple +from delaunay import create_delaunay_triangulation, get_midpoint_graph +from path_tree import PathTree +from cost_functions import PathCostEvaluator +from beam_search import BeamSearchPathPlanner +from path_smoother import PathSmoother +import config diff --git a/PathPlanning/path_smoother.py b/PathPlanning/path_smoother.py new file mode 100644 index 0000000..f98ed55 --- /dev/null +++ b/PathPlanning/path_smoother.py @@ -0,0 +1,9 @@ +""" +Path Smoothing using Spline Interpolation +Deadline: Dec 3, 2025 +""" + +import numpy as np +from typing import List, Tuple +from scipy.interpolate import splprep, splev, CubicSpline +import config diff --git a/PathPlanning/path_tree.py b/PathPlanning/path_tree.py new file mode 100644 index 0000000..524ff6d --- /dev/null +++ b/PathPlanning/path_tree.py @@ -0,0 +1,9 @@ +""" +Path Tree Population Module +Deadline: Nov 22, 2025 +""" + +import numpy as np +from typing import List, Optional +from dataclasses import dataclass +import config diff --git a/SampleData/extractFrames.py b/SampleData/extractFrames.py new file mode 100644 index 0000000..156d83c --- /dev/null +++ b/SampleData/extractFrames.py @@ -0,0 +1,30 @@ +import cv2 +import os + +videoPath = 'driverless.mp4' +outputFolder = 'driverless-10fps' +targetFps = 10 + +cap = cv2.VideoCapture(videoPath) +videoFps = cap.get(cv2.CAP_PROP_FPS) +frameInterval = int(videoFps / targetFps) + +os.makedirs(outputFolder, exist_ok=True) + +frameCount = 0 +savedCount = 0 + +while True: + ret, frame = cap.read() + if not ret: + break + + if frameCount % frameInterval == 0: + framePath = os.path.join(outputFolder, f'frame_{savedCount:04d}.jpg') + cv2.imwrite(framePath, frame) + savedCount += 1 + + frameCount += 1 + +cap.release() +print('done') diff --git a/Segmentation-YOLO/batchInference.py b/Segmentation-YOLO/batchInference.py new file mode 100644 index 0000000..939b130 --- /dev/null +++ b/Segmentation-YOLO/batchInference.py @@ -0,0 +1,95 @@ +import torch +from ultralytics import YOLO +import argparse +from pathlib import Path +from tqdm import tqdm + +def runBatchInference(weightsPath, sourceDir, confThreshold=0.25, saveResults=True, outputDir=None): + """ + Run YOLOv8 segmentation inference on multiple images + """ + device = 'mps' if torch.backends.mps.is_available() else 'cpu' + print(f"Loading model from {weightsPath}") + print(f"Using device: {device}") + + model = YOLO(weightsPath) + + sourcePath = Path(sourceDir) + if not sourcePath.exists(): + print(f"Error: Source directory not found: {sourceDir}") + return + + imageExtensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'} + imageFiles = [f for f in sourcePath.iterdir() + if f.is_file() and f.suffix.lower() in imageExtensions] + + if len(imageFiles) == 0: + print(f"No images found in {sourceDir}") + return + + print(f"\nFound {len(imageFiles)} images") + print(f"Confidence threshold: {confThreshold}") + + totalDetections = 0 + classDetections = {} + + results = model.predict( + source=str(sourcePath), + conf=confThreshold, + save=saveResults, + device=device, + verbose=False, + stream=True + ) + + 
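+    # stream=True makes predict() return a generator, so results are consumed one image
+    # at a time in the loop below instead of being accumulated in memory.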
print("\nProcessing images...") + for result in tqdm(results, total=len(imageFiles)): + if result.masks is not None: + numDetections = len(result.masks) + totalDetections += numDetections + + for cls in result.boxes.cls: + className = result.names[int(cls)] + classDetections[className] = classDetections.get(className, 0) + 1 + + print("\n" + "="*60) + print("Batch Inference Complete!") + print("="*60) + print(f"\nTotal images processed: {len(imageFiles)}") + print(f"Total detections: {totalDetections}") + print(f"Average detections per image: {totalDetections/len(imageFiles):.2f}") + + if classDetections: + print("\nDetections by class:") + for className, count in sorted(classDetections.items(), key=lambda x: x[1], reverse=True): + print(f" {className}: {count}") + + if saveResults: + print(f"\nResults saved to: runs/segment/predict") + +def main(): + parser = argparse.ArgumentParser(description='YOLOv8 Batch Segmentation Inference') + parser.add_argument('--weights', type=str, required=True, + help='Path to trained weights (.pt file)') + parser.add_argument('--source', type=str, default='dataset/images/test', + help='Path to directory containing images') + parser.add_argument('--conf', type=float, default=0.25, + help='Confidence threshold') + parser.add_argument('--save', action='store_true', default=True, + help='Save results') + + args = parser.parse_args() + + if not Path(args.weights).exists(): + print(f"Error: Weights file not found: {args.weights}") + return + + runBatchInference( + weightsPath=args.weights, + sourceDir=args.source, + confThreshold=args.conf, + saveResults=args.save + ) + +if __name__ == '__main__': + main() diff --git a/Segmentation-YOLO/convertDataset.py b/Segmentation-YOLO/convertDataset.py new file mode 100644 index 0000000..e4b7772 --- /dev/null +++ b/Segmentation-YOLO/convertDataset.py @@ -0,0 +1,271 @@ +import json +import os +import shutil +from pathlib import Path +import numpy as np +import cv2 +from tqdm import tqdm +from pycocotools import mask as cocoMask + +def decodeUncompressedRLE(rleList, imageHeight, imageWidth): + """ + Decode uncompressed RLE format to binary mask + Format: [start_index, run_length, start_index, run_length, ...] 
+ Where start_index = y * width + x (flattened coordinates) + """ + mask = np.zeros((imageHeight, imageWidth), dtype=np.uint8) + + for i in range(0, len(rleList), 2): + if i + 1 >= len(rleList): + break + + startIndex = int(rleList[i]) + runLength = int(rleList[i + 1]) + + for j in range(runLength): + index = startIndex + j + if index >= imageHeight * imageWidth: + continue + + y = index // imageWidth + x = index % imageWidth + + if 0 <= y < imageHeight and 0 <= x < imageWidth: + mask[y, x] = 1 + + return mask + +def isUncompressedRLE(poly, imageWidth, imageHeight): + """ + Detect if polygon is actually uncompressed RLE format + RLE has very large values (flattened indices) alternating with small values (run lengths) + """ + if len(poly) < 4: + return False + + maxVal = max(poly) + threshold = max(imageWidth, imageHeight) * 2 + + if maxVal <= threshold: + return False + + evenVals = [poly[i] for i in range(0, len(poly), 2)] + oddVals = [poly[i] for i in range(1, len(poly), 2)] + + avgEven = sum(evenVals) / len(evenVals) if evenVals else 0 + avgOdd = sum(oddVals) / len(oddVals) if oddVals else 0 + + return avgEven > threshold and avgOdd < max(imageWidth, imageHeight) + +def decodeCocoSegmentation(segmentation, imageHeight, imageWidth): + """ + Decode COCO segmentation to polygon coordinates + Handles RLE (compressed and uncompressed) and polygon formats + """ + if isinstance(segmentation, dict): + if 'counts' in segmentation: + mask = cocoMask.decode(segmentation) + else: + return [] + elif isinstance(segmentation, list): + if len(segmentation) == 0: + return [] + + polygons = [] + + if isinstance(segmentation[0], list): + for poly in segmentation: + if len(poly) < 6: + continue + + if len(poly) % 2 != 0: + poly = poly[:-1] + + if isUncompressedRLE(poly, imageWidth, imageHeight): + mask = decodeUncompressedRLE(poly, imageHeight, imageWidth) + elif max(poly) < max(imageWidth, imageHeight) * 1.5: + polygons.append(poly) + continue + else: + try: + rle = cocoMask.frPyObjects([poly], imageHeight, imageWidth) + mask = cocoMask.decode(rle) + except: + continue + + contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + for contour in contours: + if len(contour) >= 3: + epsilon = 0.002 * cv2.arcLength(contour, True) + approx = cv2.approxPolyDP(contour, epsilon, True) + + if len(approx) >= 3: + polygon = approx.reshape(-1).astype(float).tolist() + if len(polygon) >= 6: + polygons.append(polygon) + + return polygons + else: + if len(segmentation) % 2 != 0: + segmentation = segmentation[:-1] + + if isUncompressedRLE(segmentation, imageWidth, imageHeight): + mask = decodeUncompressedRLE(segmentation, imageHeight, imageWidth) + elif max(segmentation) < max(imageWidth, imageHeight) * 1.5: + return [segmentation] + else: + try: + rle = cocoMask.frPyObjects([segmentation], imageHeight, imageWidth) + mask = cocoMask.decode(rle) + except: + return [] + else: + return [] + + contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + polygons = [] + for contour in contours: + if len(contour) >= 3: + epsilon = 0.002 * cv2.arcLength(contour, True) + approx = cv2.approxPolyDP(contour, epsilon, True) + + if len(approx) >= 3: + polygon = approx.reshape(-1).astype(float).tolist() + if len(polygon) >= 6: + polygons.append(polygon) + + return polygons + +def convertCocoToYolo(cocoJsonPath, imagesRootPath, outputPath, trainRatio=0.8, valRatio=0.1): + """ + Convert COCO segmentation format to YOLO segmentation format + """ + 
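+    # Output label format (one .txt per image, one line per polygon):
+    #   <class_id> x1 y1 x2 y2 ... xn yn
+    # with vertex coordinates normalized to [0, 1] by image width/height.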
print(f"Loading COCO annotations from {cocoJsonPath}") + with open(cocoJsonPath, 'r') as f: + cocoData = json.load(f) + + categories = {cat['id']: idx for idx, cat in enumerate(cocoData['categories'])} + categoryNames = [cat['name'] for cat in sorted(cocoData['categories'], key=lambda x: x['id'])] + + print(f"Found {len(categories)} classes: {categoryNames}") + + imageIdToFilename = {img['id']: img['file_name'] for img in cocoData['images']} + imageIdToDimensions = {img['id']: (img['width'], img['height']) for img in cocoData['images']} + + imageAnnotations = {} + for ann in cocoData['annotations']: + imageId = ann['image_id'] + if imageId not in imageAnnotations: + imageAnnotations[imageId] = [] + imageAnnotations[imageId].append(ann) + + print(f"Processing {len(imageAnnotations)} images with annotations") + + outputPath = Path(outputPath) + trainImgPath = outputPath / 'images' / 'train' + valImgPath = outputPath / 'images' / 'val' + testImgPath = outputPath / 'images' / 'test' + trainLblPath = outputPath / 'labels' / 'train' + valLblPath = outputPath / 'labels' / 'val' + testLblPath = outputPath / 'labels' / 'test' + + for path in [trainImgPath, valImgPath, testImgPath, trainLblPath, valLblPath, testLblPath]: + path.mkdir(parents=True, exist_ok=True) + + allImageIds = list(imageAnnotations.keys()) + np.random.seed(42) + np.random.shuffle(allImageIds) + + trainSplit = int(len(allImageIds) * trainRatio) + valSplit = int(len(allImageIds) * (trainRatio + valRatio)) + + trainIds = allImageIds[:trainSplit] + valIds = allImageIds[trainSplit:valSplit] + testIds = allImageIds[valSplit:] + + print(f"Split: Train={len(trainIds)}, Val={len(valIds)}, Test={len(testIds)}") + + def processSplit(imageIds, imgPath, lblPath, splitName): + print(f"\nProcessing {splitName} split...") + skippedCount = 0 + totalPolygons = 0 + failedDecodes = 0 + + for imageId in tqdm(imageIds, desc=f"Converting {splitName}"): + filename = imageIdToFilename[imageId] + width, height = imageIdToDimensions[imageId] + + sourcePath = Path(imagesRootPath) / filename + if not sourcePath.exists(): + skippedCount += 1 + continue + + stem = Path(filename).stem + targetImgPath = imgPath / Path(filename).name + shutil.copy(sourcePath, targetImgPath) + + yoloAnnotations = [] + for ann in imageAnnotations[imageId]: + if 'segmentation' not in ann or not ann['segmentation']: + continue + + classId = categories[ann['category_id']] + segmentation = ann['segmentation'] + + try: + polygons = decodeCocoSegmentation(segmentation, height, width) + except Exception as e: + failedDecodes += 1 + continue + + for polygon in polygons: + if len(polygon) < 6: + continue + + normalizedPolygon = [] + for i in range(0, len(polygon), 2): + x = max(0.0, min(1.0, polygon[i] / width)) + y = max(0.0, min(1.0, polygon[i + 1] / height)) + normalizedPolygon.extend([x, y]) + + if len(normalizedPolygon) >= 6: + polyStr = ' '.join(f'{coord:.6f}' for coord in normalizedPolygon) + yoloAnnotations.append(f"{classId} {polyStr}") + totalPolygons += 1 + + if yoloAnnotations: + labelPath = lblPath / f"{stem}.txt" + with open(labelPath, 'w') as f: + f.write('\n'.join(yoloAnnotations)) + + print(f"Converted {totalPolygons} polygons") + if failedDecodes > 0: + print(f"Failed to decode {failedDecodes} annotations") + if skippedCount > 0: + print(f"Skipped {skippedCount} images (not found)") + + processSplit(trainIds, trainImgPath, trainLblPath, 'train') + processSplit(valIds, valImgPath, valLblPath, 'val') + processSplit(testIds, testImgPath, testLblPath, 'test') + + 
print(f"\nDataset conversion complete!") + print(f"Output directory: {outputPath}") + print(f"Classes: {categoryNames}") + + return categoryNames + +def main(): + cocoJsonPath = '../Data/fsoco_segmentation_train/train_coco.json' + imagesRootPath = '../Data/fsoco_segmentation_train' + outputPath = './dataset' + + categoryNames = convertCocoToYolo(cocoJsonPath, imagesRootPath, outputPath) + + print(f"\nNext steps:") + print(f"1. Verify dataset structure in {outputPath}") + print(f"2. Run train.py to start training") + +if __name__ == '__main__': + main() diff --git a/Segmentation-YOLO/inference.py b/Segmentation-YOLO/inference.py new file mode 100644 index 0000000..c4559b5 --- /dev/null +++ b/Segmentation-YOLO/inference.py @@ -0,0 +1,84 @@ +import torch +from ultralytics import YOLO +import argparse +import cv2 +from pathlib import Path + +def runInference(weightsPath, sourcePath, confThreshold=0.25, saveResults=True, showResults=False): + """ + Run YOLOv8 segmentation inference on a single image + """ + device = 'mps' if torch.backends.mps.is_available() else 'cpu' + print(f"Loading model from {weightsPath}") + print(f"Using device: {device}") + + model = YOLO(weightsPath) + + print(f"\nRunning inference on: {sourcePath}") + print(f"Confidence threshold: {confThreshold}") + + results = model.predict( + source=sourcePath, + conf=confThreshold, + save=saveResults, + device=device, + show=showResults, + verbose=True + ) + + result = results[0] + + if result.masks is not None: + numDetections = len(result.masks) + print(f"\nDetected {numDetections} cones:") + + for i, (box, cls, conf, mask) in enumerate(zip( + result.boxes.xyxy, + result.boxes.cls, + result.boxes.conf, + result.masks.data + )): + className = result.names[int(cls)] + print(f" {i+1}. {className}: {conf:.2f}") + + if saveResults: + outputPath = results[0].save_dir + print(f"\nResults saved to: {outputPath}") + else: + print("\nNo detections found") + + return results + +def main(): + parser = argparse.ArgumentParser(description='YOLOv8 Segmentation Inference') + parser.add_argument('--weights', type=str, required=True, + help='Path to trained weights (.pt file)') + parser.add_argument('--source', type=str, required=True, + help='Path to input image') + parser.add_argument('--conf', type=float, default=0.25, + help='Confidence threshold') + parser.add_argument('--save', action='store_true', default=True, + help='Save results') + parser.add_argument('--show', action='store_true', + help='Display results') + + args = parser.parse_args() + + if not Path(args.weights).exists(): + print(f"Error: Weights file not found: {args.weights}") + return + + if not Path(args.source).exists(): + print(f"Error: Source image not found: {args.source}") + return + + runInference( + weightsPath=args.weights, + sourcePath=args.source, + confThreshold=args.conf, + saveResults=args.save, + showResults=args.show + ) + +if __name__ == '__main__': + main() diff --git a/Segmentation-YOLO/train.py b/Segmentation-YOLO/train.py new file mode 100644 index 0000000..7537026 --- /dev/null +++ b/Segmentation-YOLO/train.py @@ -0,0 +1,111 @@ +import torch +from ultralytics import YOLO +import argparse + +def checkMpsAvailability(): + """ + Check if MPS (Metal Performance Shaders) is available for M-series acceleration + """ + if torch.backends.mps.is_available(): + print("MPS (Apple Silicon GPU) is available and will be used for training") + return 'mps' + else: + print("MPS not available, falling back to CPU") + return 'cpu' + +def trainModel( + modelSize='n', + 
epochs=100, + imgSize=416, + batchSize=8, + patience=20, + name='cone-segmentation', + resume=False +): + """ + Train YOLOv8 segmentation model on cone dataset + """ + device = checkMpsAvailability() + + modelName = f'yolov8{modelSize}-seg.pt' + print(f"\nLoading {modelName} pretrained model") + model = YOLO(modelName) + + print(f"\nTraining configuration:") + print(f" Model: {modelName}") + print(f" Device: {device}") + print(f" Epochs: {epochs}") + print(f" Image size: {imgSize}") + print(f" Batch size: {batchSize}") + print(f" Patience: {patience}") + print(f" Project name: {name}") + + results = model.train( + data='dataset.yaml', + epochs=epochs, + imgsz=imgSize, + batch=batchSize, + device=device, + patience=patience, + save=True, + plots=True, + project='runs/segment', + name=name, + exist_ok=True, + resume=resume, + verbose=True, + workers=4, + amp=False, + cache=False + ) + + print("\n" + "="*60) + print("Training Complete!") + print("="*60) + + metrics = model.val() + + print(f"\nValidation Results:") + print(f" Box mAP@0.5: {metrics.box.map50:.4f}") + print(f" Box mAP@0.5:0.95: {metrics.box.map:.4f}") + print(f" Mask mAP@0.5: {metrics.seg.map50:.4f}") + print(f" Mask mAP@0.5:0.95: {metrics.seg.map:.4f}") + + bestWeightsPath = f'runs/segment/{name}/weights/best.pt' + print(f"\nBest weights saved to: {bestWeightsPath}") + print(f"\nTo run inference:") + print(f" python inference.py --weights {bestWeightsPath} --source ") + + return results + +def main(): + parser = argparse.ArgumentParser(description='Train YOLOv8 Segmentation on Cone Dataset') + parser.add_argument('--model', type=str, default='n', choices=['n', 's', 'm', 'l', 'x'], + help='Model size: n(nano), s(small), m(medium), l(large), x(xlarge)') + parser.add_argument('--epochs', type=int, default=100, + help='Number of training epochs') + parser.add_argument('--img-size', type=int, default=416, + help='Training image size') + parser.add_argument('--batch', type=int, default=8, + help='Batch size') + parser.add_argument('--patience', type=int, default=20, + help='Early stopping patience (epochs without improvement)') + parser.add_argument('--name', type=str, default='cone-segmentation', + help='Experiment name') + parser.add_argument('--resume', action='store_true', + help='Resume training from last checkpoint') + + args = parser.parse_args() + + trainModel( + modelSize=args.model, + epochs=args.epochs, + imgSize=args.img_size, + batchSize=args.batch, + patience=args.patience, + name=args.name, + resume=args.resume + ) + +if __name__ == '__main__': + main() diff --git a/Segmentation-YOLO/utils.py b/Segmentation-YOLO/utils.py new file mode 100644 index 0000000..6c3dd25 --- /dev/null +++ b/Segmentation-YOLO/utils.py @@ -0,0 +1,319 @@ +import cv2 +import numpy as np +from pathlib import Path +from typing import Tuple, List +import matplotlib.pyplot as plt + +def xywhToXyxy(x: float, y: float, w: float, h: float, imgWidth: int, imgHeight: int) -> Tuple[int, int, int, int]: + centerX = x * imgWidth + centerY = y * imgHeight + width = w * imgWidth + height = h * imgHeight + + x1 = int(centerX - width / 2) + y1 = int(centerY - height / 2) + x2 = int(centerX + width / 2) + y2 = int(centerY + height / 2) + + return x1, y1, x2, y2 + +def xyxyToXywh(x1: int, y1: int, x2: int, y2: int, imgWidth: int, imgHeight: int) -> Tuple[float, float, float, float]: + width = x2 - x1 + height = y2 - y1 + centerX = x1 + width / 2 + centerY = y1 + height / 2 + + x = centerX / imgWidth + y = centerY / imgHeight + w = width / imgWidth + h = height / 
imgHeight + + return x, y, w, h + +def visualizeYoloLabel(imgPath: str, labelPath: str, classNames: List[str], savePath: str = None): + img = cv2.imread(imgPath) + + if img is None: + print(f"Error: Could not read image at {imgPath}") + return None + + imgHeight, imgWidth = img.shape[:2] + + colors = [ + (255, 127, 0), + (0, 127, 255), + (255, 165, 0), + (255, 255, 0), + ] + + with open(labelPath, 'r') as f: + lines = f.readlines() + + for line in lines: + parts = line.strip().split() + + if len(parts) < 5: + continue + + classId = int(parts[0]) + x, y, w, h = map(float, parts[1:5]) + + x1, y1, x2, y2 = xywhToXyxy(x, y, w, h, imgWidth, imgHeight) + + color = colors[classId % len(colors)] + className = classNames[classId] + + cv2.rectangle(img, (x1, y1), (x2, y2), color, 2) + + label = className + labelSize, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) + + cv2.rectangle( + img, + (x1, y1 - labelSize[1] - 4), + (x1 + labelSize[0], y1), + color, + -1 + ) + + cv2.putText( + img, + label, + (x1, y1 - 2), + cv2.FONT_HERSHEY_SIMPLEX, + 0.5, + (255, 255, 255), + 1 + ) + + if savePath: + cv2.imwrite(savePath, img) + print(f"Saved visualization to {savePath}") + + return img + +def analyzeDataset(datasetDir: str, classNames: List[str]): + datasetDir = Path(datasetDir) + + print("\n" + "=" * 60) + print("Dataset Analysis") + print("=" * 60) + + for split in ['train', 'val']: + labelDir = datasetDir / split / 'labels' + imgDir = datasetDir / split / 'images' + + if not labelDir.exists(): + print(f"\n{split.upper()}: Directory not found") + continue + + labelFiles = list(labelDir.glob('*.txt')) + imgFiles = list(imgDir.glob('*.[jp][pn]g')) if imgDir.exists() else [] + + classCount = {i: 0 for i in range(len(classNames))} + totalBoxes = 0 + boxSizes = [] + imgsWithCones = 0 + + for labelFile in labelFiles: + with open(labelFile, 'r') as f: + lines = f.readlines() + + if len(lines) > 0: + imgsWithCones += 1 + + for line in lines: + parts = line.strip().split() + + if len(parts) >= 5: + classId = int(parts[0]) + w, h = float(parts[3]), float(parts[4]) + + classCount[classId] += 1 + totalBoxes += 1 + boxSizes.append((w, h)) + + print(f"\n{split.upper()}:") + print(f" Images: {len(imgFiles)}") + print(f" Label files: {len(labelFiles)}") + print(f" Images with cones: {imgsWithCones}") + print(f" Total bounding boxes: {totalBoxes}") + + if totalBoxes > 0: + avgBoxesPerImg = totalBoxes / max(1, imgsWithCones) + print(f" Average boxes per image: {avgBoxesPerImg:.2f}") + + print(f"\n Class distribution:") + for classId, count in classCount.items(): + className = classNames[classId] + percentage = (count / totalBoxes * 100) if totalBoxes > 0 else 0 + print(f" {className}: {count} ({percentage:.1f}%)") + + if boxSizes: + widths = [w for w, h in boxSizes] + heights = [h for w, h in boxSizes] + + print(f"\n Bounding box sizes (normalized):") + print(f" Width - Mean: {np.mean(widths):.4f}, Std: {np.std(widths):.4f}") + print(f" Height - Mean: {np.mean(heights):.4f}, Std: {np.std(heights):.4f}") + + print("\n" + "=" * 60) + +def plotClassDistribution(datasetDir: str, classNames: List[str], savePath: str = None): + datasetDir = Path(datasetDir) + + trainClassCount = {i: 0 for i in range(len(classNames))} + valClassCount = {i: 0 for i in range(len(classNames))} + + for split, classCount in [('train', trainClassCount), ('val', valClassCount)]: + labelDir = datasetDir / split / 'labels' + + if not labelDir.exists(): + continue + + labelFiles = list(labelDir.glob('*.txt')) + + for labelFile in labelFiles: + 
with open(labelFile, 'r') as f: + lines = f.readlines() + + for line in lines: + parts = line.strip().split() + + if len(parts) >= 5: + classId = int(parts[0]) + classCount[classId] += 1 + + x = np.arange(len(classNames)) + width = 0.35 + + fig, ax = plt.subplots(figsize=(10, 6)) + + trainCounts = [trainClassCount[i] for i in range(len(classNames))] + valCounts = [valClassCount[i] for i in range(len(classNames))] + + ax.bar(x - width/2, trainCounts, width, label='Train', color='#3498db') + ax.bar(x + width/2, valCounts, width, label='Validation', color='#e74c3c') + + ax.set_xlabel('Cone Class') + ax.set_ylabel('Count') + ax.set_title('Class Distribution in Dataset') + ax.set_xticks(x) + ax.set_xticklabels(classNames, rotation=45, ha='right') + ax.legend() + ax.grid(axis='y', alpha=0.3) + + plt.tight_layout() + + if savePath: + plt.savefig(savePath, dpi=300, bbox_inches='tight') + print(f"Saved class distribution plot to {savePath}") + + plt.show() + + return fig + +def validateDataset(datasetDir: str): + datasetDir = Path(datasetDir) + + print("\n" + "=" * 60) + print("Dataset Validation") + print("=" * 60) + + issues = [] + + for split in ['train', 'val']: + labelDir = datasetDir / split / 'labels' + imgDir = datasetDir / split / 'images' + + if not labelDir.exists(): + issues.append(f"{split}/labels directory not found") + continue + + if not imgDir.exists(): + issues.append(f"{split}/images directory not found") + continue + + labelFiles = list(labelDir.glob('*.txt')) + imgFiles = {f.stem: f for f in imgDir.glob('*.[jp][pn]g')} + + print(f"\n{split.upper()}:") + + missingImages = 0 + invalidLabels = 0 + + for labelFile in labelFiles: + stem = labelFile.stem + + if stem not in imgFiles: + missingImages += 1 + issues.append(f"Missing image for label: {labelFile.name}") + + with open(labelFile, 'r') as f: + lines = f.readlines() + + for lineNum, line in enumerate(lines, 1): + parts = line.strip().split() + + if len(parts) < 5: + invalidLabels += 1 + issues.append(f"Invalid label format in {labelFile.name} line {lineNum}") + continue + + try: + classId = int(parts[0]) + x, y, w, h = map(float, parts[1:5]) + + if not (0 <= x <= 1 and 0 <= y <= 1 and 0 <= w <= 1 and 0 <= h <= 1): + invalidLabels += 1 + issues.append(f"Out of range values in {labelFile.name} line {lineNum}") + + except ValueError: + invalidLabels += 1 + issues.append(f"Invalid number format in {labelFile.name} line {lineNum}") + + print(f" Total labels: {len(labelFiles)}") + print(f" Missing images: {missingImages}") + print(f" Invalid label lines: {invalidLabels}") + + print("\n" + "=" * 60) + + if issues: + print(f"\nFound {len(issues)} issues") + print("\nFirst 10 issues:") + for issue in issues[:10]: + print(f" - {issue}") + else: + print("\nNo issues found! 
Dataset is valid.") + + print("=" * 60) + + return len(issues) == 0 + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser(description='Dataset utilities for YOLO cone detection') + parser.add_argument('--action', type=str, required=True, choices=['analyze', 'validate', 'visualize', 'plot'], + help='Action to perform') + parser.add_argument('--dataset', type=str, default='datasets/cones', help='Path to dataset directory') + parser.add_argument('--classes', type=str, nargs='+', + default=['orange_cone', 'blue_cone', 'large_orange_cone', 'yellow_cone'], + help='Class names') + parser.add_argument('--image', type=str, help='Path to image (for visualize)') + parser.add_argument('--label', type=str, help='Path to label (for visualize)') + parser.add_argument('--output', type=str, help='Output path') + + args = parser.parse_args() + + if args.action == 'analyze': + analyzeDataset(args.dataset, args.classes) + elif args.action == 'validate': + validateDataset(args.dataset) + elif args.action == 'visualize': + if not args.image or not args.label: + print("Error: --image and --label required for visualize action") + else: + visualizeYoloLabel(args.image, args.label, args.classes, args.output) + elif args.action == 'plot': + plotClassDistribution(args.dataset, args.classes, args.output)