Add Bow histogram computation #26

Merged (2 commits) on Nov 2, 2023
Empty file added src/python/bow/__init__.py
Empty file.
243 changes: 243 additions & 0 deletions src/python/bow/bow.py
@@ -0,0 +1,243 @@
import numpy as np
import cv2
import argparse
from pathlib import Path

from sklearn import preprocessing
from sklearn.cluster import KMeans
from sklearn.neighbors import KDTree

kDefaultWidth = 640 # px
kDefaultClusterSize = 400


def listImagesInFolder(folderPath):
    """Lists all .jpg and .png images in the given folder."""
    trainImageFiles = list(folderPath.glob("*.jpg"))
    trainImageFiles.extend(list(folderPath.glob("*.png")))
    return trainImageFiles


def rescaleImageIfNeeded(image):
Review comment: nit: very minor, but you seem to have docs for the other functions and not for these. Maybe add a docstring to these too?

"""Rescales the image to have a maximum kDefaultWidth and keeps the aspect ratio.

Args:
image (np.array): image

Returns:
np.array: rescaled or original image
"""
height, width = image.shape
if width > kDefaultWidth:
newHeight = (height * kDefaultWidth) / width
image = cv2.resize(image, (kDefaultWidth, int(newHeight)))
print("Resized image from", height, width, "to", image.shape)
return image


def extractSiftsFromImage(imageFile):
    """Extracts SIFT features from an image.

    Args:
        imageFile (Path): path to the image file

    Returns:
        np.array: NxD array of descriptors, or None if no features were found
    """
    image = cv2.imread(imageFile.as_posix(), cv2.IMREAD_GRAYSCALE)
    image = rescaleImageIfNeeded(image)
    sift = cv2.SIFT_create()
    keypoints, descriptors = sift.detectAndCompute(image, None)
    return descriptors


def computeIDF(descriptorsPerImage, clusters):
    """Computes the inverse document frequency (IDF). In the visual BoW context, IDF measures in how many images each word occurs.

    Args:
        descriptorsPerImage (list(np.array)): list of NxD descriptor arrays, one per image
        clusters (np.array): CxD array of clusters (words)
    Returns:
        np.array: Cx1 array; for each word, the ratio N / (number of images in which the word occurs)
    """
    clusterOccurenceInImages = [set() for index in range(clusters.shape[0])]
    N = len(descriptorsPerImage)
    clustersTree = KDTree(clusters)
    for imageId in range(len(descriptorsPerImage)):
        dist, nearestClusters = clustersTree.query(descriptorsPerImage[imageId], k=1)
        for clusterId in nearestClusters.squeeze():
            if clusterId < 0 or clusterId >= clusters.shape[0]:
                print("Error: cluster ids outside bounds")
                continue
            clusterOccurenceInImages[clusterId].add(imageId)

    # reweight by number of images
    clusterOccurence = [0] * clusters.shape[0]
    for clusterId in range(len(clusterOccurenceInImages)):
        if len(clusterOccurenceInImages[clusterId]) <= 0:
            print("WARNING: word", clusterId, "is not represented in any image")
            continue
        clusterOccurence[clusterId] = N / len(clusterOccurenceInImages[clusterId])
    return np.array(clusterOccurence)


def trainVocabulary(imageFiles, outputFile=""):
    """Trains a vocabulary from the given image paths.

    Args:
        imageFiles (list(Path)): paths to images
        outputFile (str): optional path; if non-empty, the vocabulary and IDFs are saved there as an .npz file
    Returns:
        (np.array, np.array): A pair of values: CxD array of computed words and Cx1 inverse word occurrence
    """
    descriptorsPerImage = []
    for imageFile in imageFiles:
        sifts = extractSiftsFromImage(imageFile)
        descriptorsPerImage.append(sifts)

    # flatten the descriptors list
    descriptors = [
        descriptor
        for imageDescriptors in descriptorsPerImage
        for descriptor in imageDescriptors
    ]
    descriptors = np.array(descriptors)

    descriptorsNormalized = preprocessing.normalize(descriptors)
    kmeans = KMeans(n_clusters=kDefaultClusterSize, random_state=0, n_init="auto")
    kmeans.fit(descriptorsNormalized)
    words = kmeans.cluster_centers_

    idfs = computeIDF(descriptorsPerImage, words)

    if outputFile:
        np.savez(outputFile, vocabulary=words, idfs=idfs)
        print("Vocabulary was saved to", outputFile)
    return words, idfs


def trainVocabularyFromFolder(folderPath, outputFile=""):
    """Trains a vocabulary from all images found in the given folder."""
    return trainVocabulary(listImagesInFolder(folderPath), outputFile)


def getVocabulary(imageTrainFolder, vocabularyFile):
    """Trains a vocabulary from images in imageTrainFolder, or loads it if vocabularyFile already exists.

    Args:
        imageTrainFolder (Path): path to the folder with images to be used for training
        vocabularyFile (Path): file with the vocabulary. If the file doesn't exist, a new vocabulary will be computed

    Returns:
        (np.array, np.array) | None: A pair of values: CxD array of computed words and Cx1 inverse word occurrence,
        or None if it was impossible to read or compute the vocabulary
    """
    if vocabularyFile:
        if vocabularyFile.exists():
            print("Vocabulary exists and will be loaded")
            data = np.load(vocabularyFile)
            return data["vocabulary"], data["idfs"]
        elif imageTrainFolder is None:
            print("Vocabulary doesn't exist, please provide images to train on.")
            return None
        else:
            return trainVocabularyFromFolder(imageTrainFolder, vocabularyFile)
    elif imageTrainFolder:
        return trainVocabularyFromFolder(imageTrainFolder)
    else:
        print("No vocabulary or image_train data is provided.")
        return None


def reweightHistogram(wordOccurences, idfs):
    """Reweights a word histogram with the inverse document frequencies (TF-IDF).

    Args:
        wordOccurences (np.array): Cx1 array of word counts for one image
        idfs (np.array): Cx1 array, inverse document frequency (IDF), i.e. how often every word occurs in the training database.

    Returns:
        np.array: Reweighted histogram
    """
    totalNumberOfWordOccurences = np.sum(wordOccurences)
    reweightedHistogram = np.zeros(wordOccurences.shape)
    for idx in range(wordOccurences.shape[0]):
        if idx < 0 or idx >= idfs.shape[0]:
            print("Error: index is outside the idfs range")
            continue
        reweightedHistogram[idx] = (
            wordOccurences[idx] / totalNumberOfWordOccurences * np.log(idfs[idx])
        )
    return reweightedHistogram
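
# Note on reweightHistogram: this is the standard TF-IDF weighting. A word that
# occurs in every training image has idf = N/N = 1, so np.log(1) = 0 and the word
# is suppressed, while rarer words are boosted. A quick sketch with hypothetical
# values:
#   counts = np.array([4, 1, 0]); idfs = np.array([1.0, 3.0, 3.0])
#   counts / counts.sum() * np.log(idfs)  # -> [0.0, 0.2197..., 0.0]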


def computeImageHistogram(imagePath, vocabularyTree, numberOfWords, idfs):
    """Computes the histogram of visual word occurrences for one image.

    Args:
        imagePath (Path): path to an image
        vocabularyTree (KDTree): KDTree built over the CxD array of words, where C is the number of clusters
        numberOfWords (int): number of words in the vocabulary
        idfs (np.array): Cx1 array of "learned" word occurrence

    Returns:
        np.array: reweighted histogram of word occurrences
    """
    wordHistogram = [0] * numberOfWords
    descriptors = extractSiftsFromImage(imagePath)
    if descriptors is None:
        print("Descriptors are empty", descriptors)
        return wordHistogram
    descriptorsNormalized = preprocessing.normalize(descriptors)

    for descriptor in descriptorsNormalized:
        dist, wordId = vocabularyTree.query(descriptor.reshape(1, -1), k=1)
        wordHistogram[np.squeeze(wordId)] += 1
    return reweightHistogram(np.array(wordHistogram), idfs)


def main():
    parser = argparse.ArgumentParser(
        description="Compute Bag Of visual Words (BoW) with SIFT."
    )
    parser.add_argument("--image_train_dir", required=False, type=Path)
    parser.add_argument("--vocabulary_file", required=False, type=Path)
    parser.add_argument(
        "--images",
        required=False,
        type=Path,
        help="Path to the image directory for which the histograms should be computed.",
    )
    parser.add_argument(
        "--output_file",
        required=False,
        type=Path,
        help="Filename where BoW features will be stored, .csv recommended.",
    )

    args = parser.parse_args()

    vocabularyData = getVocabulary(args.image_train_dir, args.vocabulary_file)
    if vocabularyData is None:
        # getVocabulary already printed the reason; nothing more to do without a vocabulary.
        return
    vocabulary, idfs = vocabularyData

    numberOfWords = vocabulary.shape[0]
    vocabularyTree = KDTree(vocabulary)
    if args.images:
        if not args.output_file:
            print(
                "WARNING: The output file is not specified. The features will not be stored."
            )
        imagesPath = listImagesInFolder(args.images)
        imagesPath = sorted(imagesPath)
        # TODO(olga): Make sure that the order is preserved by using a map or something. Would be better even to use the image name.
        histograms = []
        for imagePath in imagesPath:
            print("Processing", imagePath)
            histogram = computeImageHistogram(
                imagePath, vocabularyTree, numberOfWords, idfs
            )
            histograms.append(histogram)
        print("Processing done")
        histograms = np.array(histograms)
        if args.output_file:
            np.savetxt(args.output_file, histograms)
            print("Features were saved to", args.output_file)


if __name__ == "__main__":
    main()
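
For reviewers who want to try the module outside the CLI, here is a minimal sketch of programmatic use; the import path and the names train_images/, query.jpg and vocabulary.npz are placeholders:

    from pathlib import Path
    from sklearn.neighbors import KDTree
    from bow import trainVocabularyFromFolder, computeImageHistogram

    # Train a vocabulary (saved to vocabulary.npz) and build a KDTree over the words.
    vocabulary, idfs = trainVocabularyFromFolder(Path("train_images"), "vocabulary.npz")
    vocabularyTree = KDTree(vocabulary)

    # Reweighted BoW histogram for a single query image.
    histogram = computeImageHistogram(
        Path("query.jpg"), vocabularyTree, vocabulary.shape[0], idfs
    )
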
4 changes: 4 additions & 0 deletions src/python/bow/requirements.txt
@@ -0,0 +1,4 @@
numpy==1.24.4
opencv-python==4.8.1.78
scikit-learn==1.3.1
pytest==7.4.2
21 changes: 21 additions & 0 deletions src/python/bow/test_bow.py
@@ -0,0 +1,21 @@
from .bow import computeIDF
from .bow import reweightHistogram

import numpy as np
import pytest


def test_computeIDF():
    # 3 images with 2 descriptors of dimension 2
    descriptorsPerImage = [[[1, 2], [5, 6]], [[0, 0], [6, 5]], [[0, 0], [10, 9]]]
    # 3 clusters
    clusters = np.array([[0, 0], [6, 7], [10, 10]])
    occurance = computeIDF(descriptorsPerImage, clusters)
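    # With Euclidean nearest neighbours: cluster 0 ([0, 0]) is hit in all 3 images,
    # cluster 1 ([6, 7]) in images 0 and 1, and cluster 2 ([10, 10]) only in image 2,
    # so the expected IDF values are 3/3 = 1.0, 3/2 = 1.5 and 3/1 = 3.0.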
    np.testing.assert_array_almost_equal(occurance, [1.0, 1.5, 3.0])


def test_reweightHistogram():
    wordOccurences = np.array([5, 2, 1, 0, 0])
    idfs = 4 / np.array([4, 3, 4, 1, 1])
    reweightedHistogram = reweightHistogram(wordOccurences, idfs)
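    # Word 1 contributes 2/8 * ln(4/3) ≈ 0.07192; words 0 and 2 have idf = 1, so
    # log(1) = 0, and words 3 and 4 have zero counts.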
    np.testing.assert_array_almost_equal(reweightedHistogram, [0, 0.07192052, 0, 0, 0])