Added Python code to compute BoW (bag-of-visual-words) histograms for images, based on SIFT descriptors. The code can also train a vocabulary of visual words.
Showing 4 changed files with 268 additions and 0 deletions.
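A typical invocation of the new script might look like the following; the bow.py filename is inferred from the tests' "from .bow import ..." statements, and the directory and output names are placeholders:

python bow.py --image_train_dir ./train_images --vocabulary_file vocab.npz --images ./query_images --output_file histograms.csv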
__init__.py
Empty file.
bow.py
import numpy as np
import cv2
import argparse
from pathlib import Path

from sklearn import preprocessing
from sklearn.cluster import KMeans
from sklearn.neighbors import KDTree

kDefaultWidth = 640  # px
kDefaultClusterSize = 400


def listImagesInFolder(folderPath):
    """Lists all .jpg and .png images in the given folder."""
    trainImageFiles = list(folderPath.glob("*.jpg"))
    trainImageFiles.extend(list(folderPath.glob("*.png")))
    return trainImageFiles


def rescaleImageIfNeeded(image):
    """Rescales the image to a maximum width of kDefaultWidth, keeping the aspect ratio.
    Args:
        image (np.array): grayscale image
    Returns:
        np.array: rescaled or original image
    """
    height, width = image.shape
    if width > kDefaultWidth:
        newHeight = (height * kDefaultWidth) / width
        image = cv2.resize(image, (kDefaultWidth, int(newHeight)))
        print("Resized image from", height, width, "to", image.shape)
    return image


def extractSiftsFromImage(imageFile):
    """Extracts SIFT features from an image.
    Args:
        imageFile (Path): path to the image file
    Returns:
        np.array: NxD array of descriptors, or None if the image could not be
            read or contains no keypoints
    """
    image = cv2.imread(imageFile.as_posix(), cv2.IMREAD_GRAYSCALE)
    if image is None:
        print("Error: could not read image", imageFile)
        return None
    image = rescaleImageIfNeeded(image)
    sift = cv2.SIFT_create()
    keypoints, descriptors = sift.detectAndCompute(image, None)
    return descriptors


def computeIDF(descriptorsPerImage, clusters):
    """Computes the inverse document frequency (IDF). In the visual BoW context, IDF measures in how many images each word occurs.
    Args:
        descriptorsPerImage (list(np.array)): list of NxD descriptor arrays, one per image
        clusters (np.array): CxD array of clusters (words)
    Returns:
        np.array: Cx1 inverse occurrence of clusters/words in images (number of
            images divided by the number of images containing the word)
    """
    clusterOccurenceInImages = [set() for index in range(clusters.shape[0])]
    N = len(descriptorsPerImage)
    clustersTree = KDTree(clusters)
    for imageId in range(len(descriptorsPerImage)):
        dist, nearestClusters = clustersTree.query(descriptorsPerImage[imageId], k=1)
        # ravel (instead of squeeze) keeps iteration working for single-descriptor images
        for clusterId in nearestClusters.ravel():
            if clusterId < 0 or clusterId >= clusters.shape[0]:
                print("Error: cluster ids outside bounds")
                continue
            clusterOccurenceInImages[clusterId].add(imageId)

    # reweight by the number of images
    clusterOccurence = [0] * clusters.shape[0]
    for clusterId in range(len(clusterOccurenceInImages)):
        if len(clusterOccurenceInImages[clusterId]) <= 0:
            print("WARNING: word", clusterId, "is not represented in any image")
            continue
        clusterOccurence[clusterId] = N / len(clusterOccurenceInImages[clusterId])
    return np.array(clusterOccurence)


def trainVocabulary(imageFiles, outputFile=""):
    """Trains a vocabulary from the given image paths.
    Args:
        imageFiles (list(Path)): paths to images
        outputFile (str): optional path; if set, the vocabulary and IDFs are saved there with np.savez
    Returns:
        (np.array, np.array): a pair of values: CxD array of computed words and Cx1 inverse word occurrence
    """
    descriptorsPerImage = []
    for imageFile in imageFiles:
        sifts = extractSiftsFromImage(imageFile)
        if sifts is None:
            print("WARNING: no descriptors extracted from", imageFile, "- skipping")
            continue
        # Normalization is row-wise, so normalizing per image here is equivalent
        # to normalizing the flattened matrix, and it ensures clustering and the
        # IDF computation below operate on the same representation.
        descriptorsPerImage.append(preprocessing.normalize(sifts))

    # flatten the descriptors list
    descriptors = np.array(
        [
            descriptor
            for imageDescriptors in descriptorsPerImage
            for descriptor in imageDescriptors
        ]
    )

    kmeans = KMeans(n_clusters=kDefaultClusterSize, random_state=0, n_init="auto")
    kmeans.fit(descriptors)
    words = kmeans.cluster_centers_

    idfs = computeIDF(descriptorsPerImage, words)

    if outputFile:
        np.savez(outputFile, vocabulary=words, idfs=idfs)
        print("Vocabulary was saved to", outputFile)
    return words, idfs


def trainVocabularyFromFolder(folderPath, outputFile=""):
    return trainVocabulary(listImagesInFolder(folderPath), outputFile)


def getVocabulary(imageTrainFolder, vocabularyFile):
    """Trains a vocabulary from images in imageTrainFolder, or loads it if vocabularyFile already exists.
    Args:
        imageTrainFolder (Path): path to the folder with images to be used for training
        vocabularyFile (Path): a file with the vocabulary. If the file doesn't exist, a new vocabulary will be computed
    Returns:
        (np.array, np.array) | None: a pair of values: CxD array of computed words and Cx1 inverse word occurrence,
            or None if it was impossible to read or compute the vocabulary
    """
    if vocabularyFile:
        if vocabularyFile.exists():
            print("Vocabulary exists and will be loaded")
            data = np.load(vocabularyFile)
            return data["vocabulary"], data["idfs"]
        elif imageTrainFolder is None:
            print("Vocabulary doesn't exist, please provide images to train on.")
            return None
        else:
            return trainVocabularyFromFolder(imageTrainFolder, vocabularyFile)
    elif imageTrainFolder:
        return trainVocabularyFromFolder(imageTrainFolder)
    else:
        print("No vocabulary or image_train data is provided.")
        return None


def reweightHistogram(wordOccurences, idfs):
    """Reweights a word histogram by the inverse document frequency (TF-IDF style).
    Args:
        wordOccurences (np.array): Cx1 array of word counts
        idfs (np.array): Cx1 array, inverse document frequency (IDF): how often every word occurs in the training database.
    Returns:
        np.array: reweighted histogram
    """
    totalNumberOfWordOccurences = np.sum(wordOccurences)
    reweightedHistogram = np.zeros(wordOccurences.shape)
    if totalNumberOfWordOccurences == 0:
        return reweightedHistogram
    for idx in range(wordOccurences.shape[0]):
        if idx >= idfs.shape[0]:
            print("Error: index is outside the idfs range")
            continue
        if idfs[idx] <= 0:
            # Words never seen during training carry no weight; also avoids log(0).
            continue
        reweightedHistogram[idx] = (
            wordOccurences[idx] / totalNumberOfWordOccurences * np.log(idfs[idx])
        )
    return reweightedHistogram


def computeImageHistogram(imagePath, vocabularyTree, numberOfWords, idfs):
    """Computes a histogram of visual word occurrences for one image.
    Args:
        imagePath (Path): path to an image
        vocabularyTree (KDTree): search tree built over the CxD vocabulary, where C is the number of clusters
        numberOfWords (int): number of words in the vocabulary
        idfs (np.array): Cx1 array of "learned" word occurrences
    Returns:
        np.array: Cx1 IDF-reweighted histogram of word occurrences
    """
    wordHistogram = [0] * numberOfWords
    descriptors = extractSiftsFromImage(imagePath)
    if descriptors is None:
        print("Descriptors are empty", descriptors)
        return np.array(wordHistogram)
    descriptorsNormalized = preprocessing.normalize(descriptors)

    for descriptor in descriptorsNormalized:
        dist, wordId = vocabularyTree.query(descriptor.reshape(1, -1), k=1)
        wordHistogram[np.squeeze(wordId)] += 1
    return reweightHistogram(np.array(wordHistogram), idfs)


def main():
    # Pass the text as description=; the first positional argument of
    # ArgumentParser is prog, not the description.
    parser = argparse.ArgumentParser(
        description="Compute Bag of Visual Words (BoW) histograms with SIFT."
    )
    parser.add_argument(
        "--image_train_dir",
        required=False,
        type=Path,
        help="Path to the image directory used to train the vocabulary.",
    )
    parser.add_argument(
        "--vocabulary_file",
        required=False,
        type=Path,
        help="Path to the vocabulary file (.npz). A newly trained vocabulary is saved here.",
    )
    parser.add_argument(
        "--images",
        required=False,
        type=Path,
        help="Path to the image directory for which the histograms should be computed.",
    )
    parser.add_argument(
        "--output_file",
        required=False,
        type=Path,
        help="Filename where BoW features will be stored, .csv recommended.",
    )

    args = parser.parse_args()

    vocabularyData = getVocabulary(args.image_train_dir, args.vocabulary_file)
    if vocabularyData is None:
        return
    vocabulary, idfs = vocabularyData

    numberOfWords = vocabulary.shape[0]
    vocabularyTree = KDTree(vocabulary)
    if args.images:
        if not args.output_file:
            print(
                "WARNING: The output file is not specified. The features will not be stored."
            )
        imagesPath = sorted(listImagesInFolder(args.images))
        # TODO(olga): Make sure that the order is preserved, e.g. by using a map or keying on the image name.
        histograms = []
        for imagePath in imagesPath:
            print("Processing", imagePath)
            histogram = computeImageHistogram(
                imagePath, vocabularyTree, numberOfWords, idfs
            )
            histograms.append(histogram)
        print("Processing done")
        histograms = np.array(histograms)
        if args.output_file:
            np.savetxt(args.output_file, histograms)
            print("Features were saved to", args.output_file)


if __name__ == "__main__":
    main()
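As a minimal usage sketch (not part of the commit): the histograms written by np.savetxt above can be reloaded and compared with cosine similarity for image retrieval. The histograms.csv name matches the invocation example earlier and is an assumption:

import numpy as np

# Load the BoW histograms written by np.savetxt (one row per image).
histograms = np.loadtxt("histograms.csv")  # assumed output of bow.py

# Cosine similarity between the first image and all images;
# the epsilon guards against all-zero histograms.
eps = 1e-12
query = histograms[0]
norms = np.linalg.norm(histograms, axis=1) + eps
similarities = histograms @ query / (norms * (np.linalg.norm(query) + eps))

# Indices of the most similar images, best first (row 0 is the query itself).
ranking = np.argsort(-similarities)
print("Most similar rows:", ranking[:5])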
requirements.txt
numpy==1.24.4 | ||
opencv-python==4.8.1.78 | ||
scikit-learn==1.3.1 | ||
pytest==7.4.2 |
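Assuming the pinned dependencies above live in a requirements.txt, they can be installed with:

pip install -r requirements.txt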
test_bow.py
from .bow import computeIDF
from .bow import reweightHistogram

import numpy as np
import pytest


def test_computeIDF():
    # 3 images with 2 descriptors of dimension 2
    descriptorsPerImage = [[[1, 2], [5, 6]], [[0, 0], [6, 5]], [[0, 0], [10, 9]]]
    # 3 clusters
    clusters = np.array([[0, 0], [6, 7], [10, 10]])
    idfs = computeIDF(descriptorsPerImage, clusters)
    np.testing.assert_array_almost_equal(idfs, [1.0, 1.5, 3.0])


def test_reweightHistogram():
    wordOccurences = np.array([5, 2, 1, 0, 0])
    idfs = 4 / np.array([4, 3, 4, 1, 1])
    reweightedHistogram = reweightHistogram(wordOccurences, idfs)
    np.testing.assert_array_almost_equal(reweightedHistogram, [0, 0.07192052, 0, 0, 0])
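Since the test module uses relative imports ("from .bow import ..."), it has to run in package context; the empty file in this commit is presumably an __init__.py serving that purpose. The tests can then be run from the repository root with:

python -m pytest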