123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- # main imports
- import os, sys
- import argparse
- import json
- import numpy as np
- import shutil
- # PNG images
- from PIL import Image
- # others import
- from ipfml import utils
- from scipy.signal import savgol_filter
- '''
- Display progress information as progress bar
- '''
- def write_progress(progress):
- barWidth = 180
- output_str = "["
- pos = barWidth * progress
- for i in range(barWidth):
- if i < pos:
- output_str = output_str + "="
- elif i == pos:
- output_str = output_str + ">"
- else:
- output_str = output_str + " "
- output_str = output_str + "] " + str(int(progress * 100.0)) + " %\r"
- print(output_str)
- sys.stdout.write("\033[F")
def extract_index(filepath):
    """Return the integer sample index encoded in a filename.

    Expects names shaped like ``<scene>_<index>.<ext>`` and reads the
    digits found between the last underscore and the first dot after it.
    """
    tail = filepath.rsplit('_', 1)[-1]
    return int(tail.split('.', 1)[0])
def extracts_linear_indices(images_path, n_expected=50, indices_step=20, start_at=20, smooth_arr=False):
    """Pick image indices whose variance follows a linear convergence profile.

    The pixel variance of each image is computed and normalized into [0, 1];
    that curve is then sampled at ``n_expected`` linearly decreasing levels,
    and for each level the last image index still above the level is kept.
    Indices are scaled by ``indices_step``, offset by ``start_at`` and made
    strictly increasing before being returned.

    Parameters
    ----------
    images_path : list of str
        Paths of the PNG images to analyse (sorted before processing).
    n_expected : int
        Number of output indices to produce.
    indices_step : int
        Sample-count step between two consecutive stored images.
    start_at : int
        Absolute sample index of the first image in ``images_path``.
    smooth_arr : bool
        When True, smooth the variance curve with a Savitzky-Golay filter.

    Returns
    -------
    numpy.ndarray
        Strictly increasing absolute sample indices.
    """
    # offset so returned indices are absolute (images_path may be a suffix
    # slice of the full scene, starting at `start_at`)
    default_add = start_at - indices_step

    # variance of each image — used as a proxy for rendering convergence
    var_arr = []
    n_images = len(images_path)

    for n_counter, p in enumerate(sorted(images_path)):
        img = Image.open(p)
        var_arr.append(np.var(img))
        # progress runs from 1/n up to exactly 1.0; the previous version
        # incremented the counter AND added 1, overshooting 100 %
        write_progress((n_counter + 1) / n_images)

    # normalize variance values into [0, 1]
    norm_arr = np.array(utils.normalize_arr_with_range(var_arr))

    if smooth_arr:
        # Savitzky-Golay smoothing, window 201, polynomial order 3.
        # Clamp the window to the largest odd value not exceeding the array
        # length so scenes with fewer than 201 images no longer crash.
        n = len(norm_arr)
        window = min(201, n if n % 2 == 1 else n - 1)
        norm_arr = utils.normalize_arr_with_range(savgol_filter(norm_arr, window, 3))

    # expected linear convergence levels, from 1 decreasing towards 0
    # (using n_expected output images)
    linear_steps = utils.normalize_arr_with_range(1 - (np.arange(n_expected) / n_expected))

    # for each linear level, store the LAST index where the variance curve
    # is still above that level
    indices_found = []
    for level in linear_steps:

        find_index = 0

        for index, y in enumerate(norm_arr):
            if level <= y:
                find_index = index

        indices_found.append(find_index + 1)

    indices = np.array(indices_found) * indices_step

    # enforce strictly increasing indices: when an index repeats the
    # previous one, push it one step further.
    # Example with step of 20: [20, 20, 20, 100, 200] => [20, 40, 60, 100, 200]
    final_indices = []
    for index, value in enumerate(indices):
        if index > 0 and value <= indices[index - 1]:
            value = indices[index - 1] + indices_step
        indices[index] = value
        final_indices.append(value)

    return np.array(final_indices) + default_add
def main():
    """Entry point: copy, per scene, the PNG images matching custom indices.

    Reads a ';'-separated stats file (``scene;n_users;min_index;...``,
    produced by `utils/extract_stats_freq_and_min.py`) and, for every scene
    validated by at least ``--users`` participants, extracts linearly-spaced
    image indices via `extracts_linear_indices` and copies the matching PNG
    files from ``--png_folder`` into ``--output``.
    """
    parser = argparse.ArgumentParser(description="Compute new dataset scene")

    parser.add_argument('--file', type=str, help='file data extracted from `utils/extract_stats_freq_and_min.py` script', required=True)
    parser.add_argument('--png_folder', type=str, help='png dataset folder with scene', required=True)
    parser.add_argument('--users', type=int, help='min number of users required per scene', required=True, default=10)
    parser.add_argument('--output', type=str, help='output image folder', required=True)

    args = parser.parse_args()

    p_file = args.file
    p_png_folder = args.png_folder
    p_users = args.users
    p_output = args.output

    with open(p_file, 'r') as f:
        # one line per scene part: "<scene>;<n_users>;<min_index>;..."
        for line in f:
            data = line.split(';')

            scene = data[0]
            n_users = int(data[1])
            min_index = int(data[2])

            # remove the trailing "_partX" token from the scene name
            scene_name = '_'.join(scene.split('_')[:-1])

            output_scene_dir = os.path.join(p_output, scene)

            if os.path.exists(output_scene_dir):
                print('Extraction of custom indices already done for', scene)
                continue

            if n_users < p_users:
                print('Only', n_users, 'users who passed the experiment for', scene)
                continue

            print('Extract custom indices based on minimum index for', scene)

            png_folder_scene = os.path.join(p_png_folder, scene)

            if not os.path.exists(png_folder_scene):
                print(png_folder_scene, 'png folder does not exist')
            else:
                # all PNG files of this scene, sorted by filename
                png_files = [os.path.join(png_folder_scene, p) for p in sorted(os.listdir(png_folder_scene))]

                # highest sample count for this scene, encoded in the last filename
                _, filename = os.path.split(png_files[-1])
                max_samples = extract_index(filename)

                # sample-count step between two consecutive image files
                input_step = int(max_samples / len(png_files))

                # indices selected from the variance curve, starting at min_index
                indices = extracts_linear_indices(
                    png_files[int(min_index / input_step):],
                    n_expected=50,
                    indices_step=input_step,
                    start_at=min_index,
                    smooth_arr=True)

                os.makedirs(output_scene_dir, exist_ok=True)

                # copy each selected image into the output scene folder
                for index in indices:
                    # filenames carry 5-digit zero-padded sample indices
                    image_name = scene_name + '_' + str(index).zfill(5) + '.png'

                    png_image_path = os.path.join(png_folder_scene, image_name)
                    output_img_filepath = os.path.join(output_scene_dir, image_name)

                    shutil.copy2(png_image_path, output_img_filepath)

    print('\n---------------------------------------------')


if __name__ == "__main__":
    main()
|