|
@@ -0,0 +1,464 @@
|
|
|
+# main imports
|
|
|
+import numpy as np
|
|
|
+import sys
|
|
|
+
|
|
|
+# image transform imports
|
|
|
+from PIL import Image
|
|
|
+from skimage import color
|
|
|
+from sklearn.decomposition import FastICA
|
|
|
+from sklearn.decomposition import IncrementalPCA
|
|
|
+from sklearn.decomposition import TruncatedSVD
|
|
|
+from numpy.linalg import svd as lin_svd
|
|
|
+from scipy.signal import medfilt2d, wiener, cwt
|
|
|
+import pywt
|
|
|
+import cv2
|
|
|
+
|
|
|
+from ipfml.processing import transform, compression, segmentation
|
|
|
+from ipfml import utils
|
|
|
+
|
|
|
+# modules and config imports
|
|
|
+sys.path.insert(0, '') # trick to enable import of main folder module
|
|
|
+
|
|
|
+import custom_config as cfg
|
|
|
+from modules.utils import data as dt
|
|
|
+
|
|
|
+
|
|
|
+def get_svd_data(data_type, block):
|
|
|
+ """
|
|
|
+ Method which returns the data type expected
|
|
|
+ """
|
|
|
+
|
|
|
+ if data_type == 'lab':
|
|
|
+
|
|
|
+ block_file_path = '/tmp/lab_img.png'
|
|
|
+ block.save(block_file_path)
|
|
|
+ data = transform.get_LAB_L_SVD_s(Image.open(block_file_path))
|
|
|
+
|
|
|
+ if data_type == 'mscn':
|
|
|
+
|
|
|
+ img_mscn_revisited = transform.rgb_to_mscn(block)
|
|
|
+
|
|
|
+ # save tmp as img
|
|
|
+ img_output = Image.fromarray(img_mscn_revisited.astype('uint8'), 'L')
|
|
|
+ mscn_revisited_file_path = '/tmp/mscn_revisited_img.png'
|
|
|
+ img_output.save(mscn_revisited_file_path)
|
|
|
+ img_block = Image.open(mscn_revisited_file_path)
|
|
|
+
|
|
|
+ # extract from temp image
|
|
|
+ data = compression.get_SVD_s(img_block)
|
|
|
+
|
|
|
+ """if data_type == 'mscn':
|
|
|
+
|
|
|
+ img_gray = np.array(color.rgb2gray(np.asarray(block))*255, 'uint8')
|
|
|
+ img_mscn = transform.calculate_mscn_coefficients(img_gray, 7)
|
|
|
+ img_mscn_norm = transform.normalize_2D_arr(img_mscn)
|
|
|
+
|
|
|
+ img_mscn_gray = np.array(img_mscn_norm*255, 'uint8')
|
|
|
+
|
|
|
+ data = compression.get_SVD_s(img_mscn_gray)
|
|
|
+ """
|
|
|
+
|
|
|
+ if data_type == 'low_bits_6':
|
|
|
+
|
|
|
+ low_bits_6 = transform.rgb_to_LAB_L_low_bits(block, 6)
|
|
|
+ data = compression.get_SVD_s(low_bits_6)
|
|
|
+
|
|
|
+ if data_type == 'low_bits_5':
|
|
|
+
|
|
|
+ low_bits_5 = transform.rgb_to_LAB_L_low_bits(block, 5)
|
|
|
+ data = compression.get_SVD_s(low_bits_5)
|
|
|
+
|
|
|
+ if data_type == 'low_bits_4':
|
|
|
+
|
|
|
+ low_bits_4 = transform.rgb_to_LAB_L_low_bits(block, 4)
|
|
|
+ data = compression.get_SVD_s(low_bits_4)
|
|
|
+
|
|
|
+ if data_type == 'low_bits_3':
|
|
|
+
|
|
|
+ low_bits_3 = transform.rgb_to_LAB_L_low_bits(block, 3)
|
|
|
+ data = compression.get_SVD_s(low_bits_3)
|
|
|
+
|
|
|
+ if data_type == 'low_bits_2':
|
|
|
+
|
|
|
+ low_bits_2 = transform.rgb_to_LAB_L_low_bits(block, 2)
|
|
|
+ data = compression.get_SVD_s(low_bits_2)
|
|
|
+
|
|
|
+ if data_type == 'low_bits_4_shifted_2':
|
|
|
+
|
|
|
+ data = compression.get_SVD_s(transform.rgb_to_LAB_L_bits(block, (3, 6)))
|
|
|
+
|
|
|
+ if data_type == 'sub_blocks_stats':
|
|
|
+
|
|
|
+ block = np.asarray(block)
|
|
|
+ width, height, _= block.shape
|
|
|
+ sub_width, sub_height = int(width / 4), int(height / 4)
|
|
|
+
|
|
|
+ sub_blocks = segmentation.divide_in_blocks(block, (sub_width, sub_height))
|
|
|
+
|
|
|
+ data = []
|
|
|
+
|
|
|
+ for sub_b in sub_blocks:
|
|
|
+
|
|
|
+ # by default use the whole lab L canal
|
|
|
+ l_svd_data = np.array(transform.get_LAB_L_SVD_s(sub_b))
|
|
|
+
|
|
|
+ # get information we want from svd
|
|
|
+ data.append(np.mean(l_svd_data))
|
|
|
+ data.append(np.median(l_svd_data))
|
|
|
+ data.append(np.percentile(l_svd_data, 25))
|
|
|
+ data.append(np.percentile(l_svd_data, 75))
|
|
|
+ data.append(np.var(l_svd_data))
|
|
|
+
|
|
|
+ area_under_curve = utils.integral_area_trapz(l_svd_data, dx=100)
|
|
|
+ data.append(area_under_curve)
|
|
|
+
|
|
|
+ # convert into numpy array after computing all stats
|
|
|
+ data = np.asarray(data)
|
|
|
+
|
|
|
+ if data_type == 'sub_blocks_stats_reduced':
|
|
|
+
|
|
|
+ block = np.asarray(block)
|
|
|
+ width, height, _= block.shape
|
|
|
+ sub_width, sub_height = int(width / 4), int(height / 4)
|
|
|
+
|
|
|
+ sub_blocks = segmentation.divide_in_blocks(block, (sub_width, sub_height))
|
|
|
+
|
|
|
+ data = []
|
|
|
+
|
|
|
+ for sub_b in sub_blocks:
|
|
|
+
|
|
|
+ # by default use the whole lab L canal
|
|
|
+ l_svd_data = np.array(transform.get_LAB_L_SVD_s(sub_b))
|
|
|
+
|
|
|
+ # get information we want from svd
|
|
|
+ data.append(np.mean(l_svd_data))
|
|
|
+ data.append(np.median(l_svd_data))
|
|
|
+ data.append(np.percentile(l_svd_data, 25))
|
|
|
+ data.append(np.percentile(l_svd_data, 75))
|
|
|
+ data.append(np.var(l_svd_data))
|
|
|
+
|
|
|
+ # convert into numpy array after computing all stats
|
|
|
+ data = np.asarray(data)
|
|
|
+
|
|
|
+ if data_type == 'sub_blocks_area':
|
|
|
+
|
|
|
+ block = np.asarray(block)
|
|
|
+ width, height, _= block.shape
|
|
|
+ sub_width, sub_height = int(width / 8), int(height / 8)
|
|
|
+
|
|
|
+ sub_blocks = segmentation.divide_in_blocks(block, (sub_width, sub_height))
|
|
|
+
|
|
|
+ data = []
|
|
|
+
|
|
|
+ for sub_b in sub_blocks:
|
|
|
+
|
|
|
+ # by default use the whole lab L canal
|
|
|
+ l_svd_data = np.array(transform.get_LAB_L_SVD_s(sub_b))
|
|
|
+
|
|
|
+ area_under_curve = utils.integral_area_trapz(l_svd_data, dx=50)
|
|
|
+ data.append(area_under_curve)
|
|
|
+
|
|
|
+ # convert into numpy array after computing all stats
|
|
|
+ data = np.asarray(data)
|
|
|
+
|
|
|
+ if data_type == 'sub_blocks_area_normed':
|
|
|
+
|
|
|
+ block = np.asarray(block)
|
|
|
+ width, height, _= block.shape
|
|
|
+ sub_width, sub_height = int(width / 8), int(height / 8)
|
|
|
+
|
|
|
+ sub_blocks = segmentation.divide_in_blocks(block, (sub_width, sub_height))
|
|
|
+
|
|
|
+ data = []
|
|
|
+
|
|
|
+ for sub_b in sub_blocks:
|
|
|
+
|
|
|
+ # by default use the whole lab L canal
|
|
|
+ l_svd_data = np.array(transform.get_LAB_L_SVD_s(sub_b))
|
|
|
+ l_svd_data = utils.normalize_arr(l_svd_data)
|
|
|
+
|
|
|
+ area_under_curve = utils.integral_area_trapz(l_svd_data, dx=50)
|
|
|
+ data.append(area_under_curve)
|
|
|
+
|
|
|
+ # convert into numpy array after computing all stats
|
|
|
+ data = np.asarray(data)
|
|
|
+
|
|
|
+ if data_type == 'mscn_var_4':
|
|
|
+
|
|
|
+ data = _get_mscn_variance(block, (100, 100))
|
|
|
+
|
|
|
+ if data_type == 'mscn_var_16':
|
|
|
+
|
|
|
+ data = _get_mscn_variance(block, (50, 50))
|
|
|
+
|
|
|
+ if data_type == 'mscn_var_64':
|
|
|
+
|
|
|
+ data = _get_mscn_variance(block, (25, 25))
|
|
|
+
|
|
|
+ if data_type == 'mscn_var_16_max':
|
|
|
+
|
|
|
+ data = _get_mscn_variance(block, (50, 50))
|
|
|
+ data = np.asarray(data)
|
|
|
+ size = int(len(data) / 4)
|
|
|
+ indices = data.argsort()[-size:][::-1]
|
|
|
+ data = data[indices]
|
|
|
+
|
|
|
+ if data_type == 'mscn_var_64_max':
|
|
|
+
|
|
|
+ data = _get_mscn_variance(block, (25, 25))
|
|
|
+ data = np.asarray(data)
|
|
|
+ size = int(len(data) / 4)
|
|
|
+ indices = data.argsort()[-size:][::-1]
|
|
|
+ data = data[indices]
|
|
|
+
|
|
|
+ if data_type == 'ica_diff':
|
|
|
+ current_image = transform.get_LAB_L(block)
|
|
|
+
|
|
|
+ ica = FastICA(n_components=50)
|
|
|
+ ica.fit(current_image)
|
|
|
+
|
|
|
+ image_ica = ica.fit_transform(current_image)
|
|
|
+ image_restored = ica.inverse_transform(image_ica)
|
|
|
+
|
|
|
+ final_image = utils.normalize_2D_arr(image_restored)
|
|
|
+ final_image = np.array(final_image * 255, 'uint8')
|
|
|
+
|
|
|
+ sv_values = utils.normalize_arr(compression.get_SVD_s(current_image))
|
|
|
+ ica_sv_values = utils.normalize_arr(compression.get_SVD_s(final_image))
|
|
|
+
|
|
|
+ data = abs(np.array(sv_values) - np.array(ica_sv_values))
|
|
|
+
|
|
|
+ if data_type == 'svd_trunc_diff':
|
|
|
+
|
|
|
+ current_image = transform.get_LAB_L(block)
|
|
|
+
|
|
|
+ svd = TruncatedSVD(n_components=30, n_iter=100, random_state=42)
|
|
|
+ transformed_image = svd.fit_transform(current_image)
|
|
|
+ restored_image = svd.inverse_transform(transformed_image)
|
|
|
+
|
|
|
+ reduced_image = (current_image - restored_image)
|
|
|
+
|
|
|
+ U, s, V = compression.get_SVD(reduced_image)
|
|
|
+ data = s
|
|
|
+
|
|
|
+ if data_type == 'ipca_diff':
|
|
|
+
|
|
|
+ current_image = transform.get_LAB_L(block)
|
|
|
+
|
|
|
+ transformer = IncrementalPCA(n_components=20, batch_size=25)
|
|
|
+ transformed_image = transformer.fit_transform(current_image)
|
|
|
+ restored_image = transformer.inverse_transform(transformed_image)
|
|
|
+
|
|
|
+ reduced_image = (current_image - restored_image)
|
|
|
+
|
|
|
+ U, s, V = compression.get_SVD(reduced_image)
|
|
|
+ data = s
|
|
|
+
|
|
|
+ if data_type == 'svd_reconstruct':
|
|
|
+
|
|
|
+ reconstructed_interval = (90, 200)
|
|
|
+ begin, end = reconstructed_interval
|
|
|
+
|
|
|
+ lab_img = transform.get_LAB_L(block)
|
|
|
+ lab_img = np.array(lab_img, 'uint8')
|
|
|
+
|
|
|
+ U, s, V = lin_svd(lab_img, full_matrices=True)
|
|
|
+
|
|
|
+ smat = np.zeros((end-begin, end-begin), dtype=complex)
|
|
|
+ smat[:, :] = np.diag(s[begin:end])
|
|
|
+ output_img = np.dot(U[:, begin:end], np.dot(smat, V[begin:end, :]))
|
|
|
+
|
|
|
+ output_img = np.array(output_img, 'uint8')
|
|
|
+
|
|
|
+ data = compression.get_SVD_s(output_img)
|
|
|
+
|
|
|
+ if 'sv_std_filters' in data_type:
|
|
|
+
|
|
|
+ # convert into lab by default to apply filters
|
|
|
+ lab_img = transform.get_LAB_L(block)
|
|
|
+ arr = np.array(lab_img)
|
|
|
+ images = []
|
|
|
+
|
|
|
+ # Apply list of filter on arr
|
|
|
+ images.append(medfilt2d(arr, [3, 3]))
|
|
|
+ images.append(medfilt2d(arr, [5, 5]))
|
|
|
+ images.append(wiener(arr, [3, 3]))
|
|
|
+ images.append(wiener(arr, [5, 5]))
|
|
|
+
|
|
|
+ # By default computation of current block image
|
|
|
+ s_arr = compression.get_SVD_s(arr)
|
|
|
+ sv_vector = [s_arr]
|
|
|
+
|
|
|
+ # for each new image apply SVD and get SV
|
|
|
+ for img in images:
|
|
|
+ s = compression.get_SVD_s(img)
|
|
|
+ sv_vector.append(s)
|
|
|
+
|
|
|
+ sv_array = np.array(sv_vector)
|
|
|
+
|
|
|
+ _, len = sv_array.shape
|
|
|
+
|
|
|
+ sv_std = []
|
|
|
+
|
|
|
+ # normalize each SV vectors and compute standard deviation for each sub vectors
|
|
|
+ for i in range(len):
|
|
|
+ sv_array[:, i] = utils.normalize_arr(sv_array[:, i])
|
|
|
+ sv_std.append(np.std(sv_array[:, i]))
|
|
|
+
|
|
|
+ indices = []
|
|
|
+
|
|
|
+ if 'lowest' in data_type:
|
|
|
+ indices = utils.get_indices_of_lowest_values(sv_std, 200)
|
|
|
+
|
|
|
+ if 'highest' in data_type:
|
|
|
+ indices = utils.get_indices_of_highest_values(sv_std, 200)
|
|
|
+
|
|
|
+ # data are arranged following std trend computed
|
|
|
+ data = s_arr[indices]
|
|
|
+
|
|
|
+ # with the use of wavelet
|
|
|
+ if 'wave_sv_std_filters' in data_type:
|
|
|
+
|
|
|
+ # convert into lab by default to apply filters
|
|
|
+ lab_img = transform.get_LAB_L(block)
|
|
|
+ arr = np.array(lab_img)
|
|
|
+ images = []
|
|
|
+
|
|
|
+ # Apply list of filter on arr
|
|
|
+ images.append(medfilt2d(arr, [3, 3]))
|
|
|
+ images.append(medfilt2d(arr, [5, 5]))
|
|
|
+ images.append(medfilt2d(arr, [7, 7]))
|
|
|
+ images.append(wiener(arr, [3, 3]))
|
|
|
+ images.append(wiener(arr, [4, 4]))
|
|
|
+ images.append(wiener(arr, [5, 5]))
|
|
|
+ images.append(w2d(arr, 'haar', 2))
|
|
|
+ images.append(w2d(arr, 'haar', 3))
|
|
|
+ images.append(w2d(arr, 'haar', 4))
|
|
|
+
|
|
|
+ # By default computation of current block image
|
|
|
+ s_arr = compression.get_SVD_s(arr)
|
|
|
+ sv_vector = [s_arr]
|
|
|
+
|
|
|
+ # for each new image apply SVD and get SV
|
|
|
+ for img in images:
|
|
|
+ s = compression.get_SVD_s(img)
|
|
|
+ sv_vector.append(s)
|
|
|
+
|
|
|
+ sv_array = np.array(sv_vector)
|
|
|
+
|
|
|
+ _, len = sv_array.shape
|
|
|
+
|
|
|
+ sv_std = []
|
|
|
+
|
|
|
+ # normalize each SV vectors and compute standard deviation for each sub vectors
|
|
|
+ for i in range(len):
|
|
|
+ sv_array[:, i] = utils.normalize_arr(sv_array[:, i])
|
|
|
+ sv_std.append(np.std(sv_array[:, i]))
|
|
|
+
|
|
|
+ indices = []
|
|
|
+
|
|
|
+ if 'lowest' in data_type:
|
|
|
+ indices = utils.get_indices_of_lowest_values(sv_std, 200)
|
|
|
+
|
|
|
+ if 'highest' in data_type:
|
|
|
+ indices = utils.get_indices_of_highest_values(sv_std, 200)
|
|
|
+
|
|
|
+ # data are arranged following std trend computed
|
|
|
+ data = s_arr[indices]
|
|
|
+
|
|
|
+ if 'filters_statistics' in data_type:
|
|
|
+
|
|
|
+ img_width, img_height = 200, 200
|
|
|
+
|
|
|
+ lab_img = transform.get_LAB_L(block)
|
|
|
+ arr = np.array(lab_img)
|
|
|
+
|
|
|
+ # compute all filters statistics
|
|
|
+ def get_stats(arr, I_filter):
|
|
|
+
|
|
|
+ e1 = np.abs(arr - I_filter)
|
|
|
+ L = np.array(e1)
|
|
|
+ mu0 = np.mean(L)
|
|
|
+ A = L - mu0
|
|
|
+ H = A * A
|
|
|
+ E = np.sum(H) / (img_width * img_height)
|
|
|
+ P = np.sqrt(E)
|
|
|
+
|
|
|
+ return mu0, P
|
|
|
+
|
|
|
+ stats = []
|
|
|
+
|
|
|
+ kernel = np.ones((3,3),np.float32)/9
|
|
|
+ stats.append(get_stats(arr, cv2.filter2D(arr,-1,kernel)))
|
|
|
+
|
|
|
+ kernel = np.ones((5,5),np.float32)/25
|
|
|
+ stats.append(get_stats(arr, cv2.filter2D(arr,-1,kernel)))
|
|
|
+
|
|
|
+ stats.append(get_stats(arr, cv2.GaussianBlur(arr, (3, 3), 0.5)))
|
|
|
+
|
|
|
+ stats.append(get_stats(arr, cv2.GaussianBlur(arr, (3, 3), 1)))
|
|
|
+
|
|
|
+ stats.append(get_stats(arr, cv2.GaussianBlur(arr, (3, 3), 1.5)))
|
|
|
+
|
|
|
+ stats.append(get_stats(arr, cv2.GaussianBlur(arr, (5, 5), 0.5)))
|
|
|
+
|
|
|
+ stats.append(get_stats(arr, cv2.GaussianBlur(arr, (5, 5), 1)))
|
|
|
+
|
|
|
+ stats.append(get_stats(arr, cv2.GaussianBlur(arr, (5, 5), 1.5)))
|
|
|
+
|
|
|
+ stats.append(get_stats(arr, medfilt2d(arr, [3, 3])))
|
|
|
+
|
|
|
+ stats.append(get_stats(arr, medfilt2d(arr, [5, 5])))
|
|
|
+
|
|
|
+ stats.append(get_stats(arr, wiener(arr, [3, 3])))
|
|
|
+
|
|
|
+ stats.append(get_stats(arr, wiener(arr, [5, 5])))
|
|
|
+
|
|
|
+ wave = w2d(arr, 'db1', 2)
|
|
|
+ stats.append(get_stats(arr, np.array(wave, 'float64')))
|
|
|
+
|
|
|
+ data = []
|
|
|
+
|
|
|
+ for stat in stats:
|
|
|
+ data.append(stat[0])
|
|
|
+
|
|
|
+ for stat in stats:
|
|
|
+ data.append(stat[1])
|
|
|
+
|
|
|
+ data = np.array(data)
|
|
|
+
|
|
|
+ return data
|
|
|
+
|
|
|
+
|
|
|
+def w2d(arr, mode='haar', level=1):
|
|
|
+ #convert to float
|
|
|
+ imArray = arr
|
|
|
+ np.divide(imArray, 255)
|
|
|
+
|
|
|
+ # compute coefficients
|
|
|
+ coeffs=pywt.wavedec2(imArray, mode, level=level)
|
|
|
+
|
|
|
+ #Process Coefficients
|
|
|
+ coeffs_H=list(coeffs)
|
|
|
+ coeffs_H[0] *= 0
|
|
|
+
|
|
|
+ # reconstruction
|
|
|
+ imArray_H = pywt.waverec2(coeffs_H, mode)
|
|
|
+ imArray_H *= 255
|
|
|
+ imArray_H = np.uint8(imArray_H)
|
|
|
+
|
|
|
+ return imArray_H
|
|
|
+
|
|
|
+def _get_mscn_variance(block, sub_block_size=(50, 50)):
|
|
|
+
|
|
|
+ blocks = segmentation.divide_in_blocks(block, sub_block_size)
|
|
|
+
|
|
|
+ data = []
|
|
|
+
|
|
|
+ for block in blocks:
|
|
|
+ mscn_coefficients = transform.get_mscn_coefficients(block)
|
|
|
+ flat_coeff = mscn_coefficients.flatten()
|
|
|
+ data.append(np.var(flat_coeff))
|
|
|
+
|
|
|
+ return np.sort(data)
|
|
|
+
|