MOEAD.py 11 KB


"""Multi-Objective Evolutionary Algorithm with Scalar Decomposition (MOEAD).
"""
  3. # main imports
  4. import pkgutil
  5. import logging
  6. import math
  7. import numpy as np
  8. import sys
  9. from ...utils.color import macop_text, macop_line, macop_progress
  10. # module imports
  11. from ..Algorithm import Algorithm
  12. from .MOSubProblem import MOSubProblem
  13. # import all available solutions
  14. for loader, module_name, is_pkg in pkgutil.walk_packages(
  15. path=['macop/solutions'], prefix='macop.solutions.'):
  16. _module = loader.find_module(module_name).load_module(module_name)
  17. globals()[module_name] = _module
  18. def moEvaluator(_solution, _evaluator, _weights):
  19. scores = [eval(_solution) for eval in _evaluator]
  20. # associate objectives scores to solution
  21. _solution.scores = scores
  22. return sum([scores[i] for i, w in enumerate(_weights)])
  23. class MOEAD(Algorithm):
  24. """Multi-Ojective Evolutionary Algorithm with Scalar Decomposition
  25. Attributes:
  26. mu: {int} -- number of sub problems
  27. T: {[float]} -- number of neightbors for each sub problem
  28. nObjectives: {int} -- number of objectives (based of number evaluator)
  29. initalizer: {function} -- basic function strategy to initialize solution
  30. evaluator: {[function]} -- list of basic function in order to obtained fitness (multiple objectives)
  31. operators: {[Operator]} -- list of operator to use when launching algorithm
  32. policy: {Policy} -- Policy class implementation strategy to select operators
  33. validator: {function} -- basic function to check if solution is valid or not under some constraints
  34. maximise: {bool} -- specify kind of optimization problem
  35. population: [{Solution}] -- population of solution, one for each sub problem
  36. pfPop: [{Solution}] -- pareto front population
  37. weights: [[{float}]] -- random weights used for custom mu sub problems
  38. callbacks: {[Callback]} -- list of Callback class implementation to do some instructions every number of evaluations and `load` when initializing algorithm
  39. """
  40. def __init__(self,
  41. _mu,
  42. _T,
  43. _initalizer,
  44. _evaluator,
  45. _operators,
  46. _policy,
  47. _validator,
  48. _maximise=True,
  49. _parent=None):
  50. # redefinition of constructor to well use `initRun` method
  51. self.initializer = _initalizer
  52. self.evaluator = _evaluator
  53. self.operators = _operators
  54. self.policy = _policy
  55. self.validator = _validator
  56. self.callbacks = []
  57. # by default
  58. self.numberOfEvaluations = 0
  59. self.maxEvaluations = 0
  60. self.nObjectives = len(_evaluator)
  61. # other parameters
  62. self.parent = _parent # parent algorithm if it's sub algorithm
  63. #self.maxEvaluations = 0 # by default
  64. self.maximise = _maximise
  65. # track reference of algo into operator (keep an eye into best solution)
  66. for operator in self.operators:
  67. operator.setAlgo(self)
  68. # also track reference for policy
  69. self.policy.setAlgo(self)
  70. if _mu < _T:
  71. raise ValueError('`mu` cannot be less than `T`')
  72. self.mu = _mu
  73. self.T = _T
  74. # initialize neighbors for each sub problem
  75. self.setNeighbors()
  76. weights = []
  77. if self.nObjectives == 2:
  78. for i in range(self.mu):
  79. angle = math.pi / 2 * i / (self.mu - 1)
  80. weights.append([math.cos(angle), math.sin(angle)])
  81. elif self.nObjectives >= 3:
  82. # random weights using uniform
  83. for i in range(self.mu):
  84. w_i = np.random.uniform(0, 1, self.nObjectives)
  85. weights.append(w_i / sum(w_i))
  86. else:
  87. raise ValueError('Unvalid number of objectives')
  88. self.weights = weights
  89. self.subProblems = []
  90. for i in range(self.mu):
  91. # compute weight sum from solution
  92. sub_evaluator = lambda _solution: moEvaluator(
  93. _solution, _evaluator, weights[i])
  94. # intialize each sub problem
  95. subProblem = MOSubProblem(i, weights[i], _initalizer,
  96. sub_evaluator, _operators, _policy,
  97. _validator, _maximise, self)
  98. self.subProblems.append(subProblem)
  99. self.population = [None for n in range(self.mu)]
  100. self.pfPop = []
  101. # ref point based on number of evaluators
  102. if self.maximise:
  103. self.refPoint = [0 for _ in range(self.nObjectives)]
  104. else:
  105. self.refPoint = [
  106. sys.float_info.max for _ in range(self.nObjectives)
  107. ]
  108. def initRun(self):
  109. """
  110. Method which initialiazes or re-initializes the whole algorithm context specifically for MOEAD
  111. """
  112. # initialization is done during run method
  113. pass
  114. def run(self, _evaluations):
  115. """
  116. Run the local search algorithm
  117. Args:
  118. _evaluations: {int} -- number of Local search evaluations
  119. Returns:
  120. {Solution} -- best solution found
  121. """
  122. # by default use of mother method to initialize variables
  123. super().run(_evaluations)
  124. # initialize each sub problem
  125. for i in range(self.mu):
  126. self.subProblems[i].run(1)
  127. self.population[i] = self.subProblems[i].bestSolution
  128. self.pfPop.append(self.subProblems[i].bestSolution)
  129. # enable callback resume for MOEAD
  130. self.resume()
  131. # MOEAD algorithm implementation
  132. while not self.stop():
  133. for i in range(self.mu):
  134. # run 1 iteration into sub problem `i`
  135. self.subProblems[i].run(1)
  136. spBestSolution = self.subProblems[i].bestSolution
  137. self.updateRefPoint(spBestSolution)
  138. # for each neighbor of current sub problem update solution if better
  139. improvment = False
  140. for j in self.neighbors[i]:
  141. if spBestSolution.fitness(
  142. ) > self.subProblems[j].bestSolution.fitness():
  143. # create new solution based on current new if better, computes fitness associated to new solution for sub problem
  144. class_name = type(spBestSolution).__name__
  145. newSolution = getattr(
  146. globals()['macop.solutions.' + class_name],
  147. class_name)(spBestSolution.data,
  148. len(spBestSolution.data))
  149. # evaluate solution for new sub problem and update as best solution
  150. self.subProblems[j].evaluate(newSolution)
  151. self.subProblems[j].bestSolution = newSolution
  152. # update population solution for this sub problem
  153. self.population[j] = newSolution
  154. improvment = True
  155. # add new solution if improvment is idenfity
  156. if improvment:
  157. self.pfPop.append(spBestSolution)
  158. # update pareto front
  159. self.pfPop = self.paretoFront(self.pfPop)
  160. # add progress here
  161. self.progress()
  162. # stop algorithm if necessary
  163. if self.stop():
  164. break
  165. logging.info("End of %s, best solution found %s" %
  166. (type(self).__name__, self.population))
  167. self.end()
  168. return self.pfPop
  169. def progress(self):
  170. """
  171. Log progress and apply callbacks if necessary
  172. """
  173. if len(self.callbacks) > 0:
  174. for callback in self.callbacks:
  175. callback.run()
  176. macop_progress(self.getGlobalEvaluation(),
  177. self.getGlobalMaxEvaluation())
  178. logging.info(
  179. "-- %s evaluation %s of %s (%s%%)" %
  180. (type(self).__name__, self.numberOfEvaluations,
  181. self.maxEvaluations, "{0:.2f}".format(
  182. (self.numberOfEvaluations) / self.maxEvaluations * 100.)))
  183. def setNeighbors(self):
  184. dmin = dmax = 0
  185. if self.T % 2 == 1:
  186. dmin = -int(self.T / 2)
  187. dmax = int(self.T / 2) + 1
  188. else:
  189. dmin = -int(self.T / 2) + 1
  190. dmax = +self.T / 2
  191. # init neighbord list
  192. self.neighbors = [[] for n in range(self.mu)]
  193. for direction in range(0, -dmin):
  194. for i in range(self.T):
  195. self.neighbors[direction].append(i)
  196. for direction in range(-dmin, self.mu - dmax):
  197. for i in range(direction + dmin, direction + dmax):
  198. self.neighbors[direction].append(i)
  199. for direction in range(self.mu - dmax, self.mu):
  200. for i in range(self.mu - self.T, self.mu):
  201. self.neighbors[direction].append(i)
  202. def updateRefPoint(self, _solution):
  203. if self.maximise:
  204. for i in range(len(self.evaluator)):
  205. if _solution.scores[i] > self.refPoint[i]:
  206. self.refPoint[i] = _solution.scores[i]
  207. else:
  208. for i in range(len(self.evaluator)):
  209. if _solution.scores[i] < self.refPoint[i]:
  210. self.refPoint[i] = _solution.scores[i]
  211. def paretoFront(self, _population):
  212. paFront = []
  213. indexes = []
  214. nObjectives = len(self.evaluator)
  215. nSolutions = len(_population)
  216. # find dominated solution
  217. for i in range(nSolutions):
  218. for j in range(nSolutions):
  219. if j in indexes:
  220. continue
  221. nDominated = 0
  222. # check number of dominated objectives of current solution by the others solution
  223. for k in range(len(self.evaluator)):
  224. if self.maximise:
  225. if _population[i].scores[k] < _population[j].scores[k]:
  226. nDominated += 1
  227. else:
  228. if _population[i].scores[k] > _population[j].scores[k]:
  229. nDominated += 1
  230. if nDominated == nObjectives:
  231. indexes.append(i)
  232. break
  233. # store the non dominated solution into pareto front
  234. for i in range(nSolutions):
  235. if i not in indexes:
  236. paFront.append(_population[i])
  237. return paFront
  238. def end(self):
  239. """Display end message into `run` method
  240. """
  241. print(
  242. macop_text('({}) Found after {} evaluations'.format(
  243. type(self).__name__, self.numberOfEvaluations)))
  244. for i, solution in enumerate(self.pfPop):
  245. print(' - [{}] {} : {}'.format(i, solution.scores, solution))
  246. print(macop_line())
  247. def information(self):
  248. logging.info("-- Pareto front :")
  249. for i, solution in enumerate(self.pfPop):
  250. logging.info("-- %s] SCORE %s - %s" %
  251. (i, solution.scores, solution))
  252. def __str__(self):
  253. return "%s using %s" % (type(self).__name__, type(
  254. self.population).__name__)