Source code for macop.algorithms.multi.MOEAD

"""Multi-Ojective Evolutionary Algorithm with Scalar Decomposition algorithm
"""

# main imports
import pkgutil
import logging
import math
import numpy as np
import sys
from ...utils.color import macop_text, macop_line, macop_progress
from ...utils.modules import load_class

# module imports
from ..Algorithm import Algorithm
from .MOSubProblem import MOSubProblem


def moEvaluator(solution, evaluator, weights):

    scores = [eval(solution) for eval in evaluator]

    # associate objectives scores to solution
    solution._scores = scores

    return sum([scores[i] for i, w in enumerate(weights)])


[docs]class MOEAD(Algorithm): """Multi-Ojective Evolutionary Algorithm with Scalar Decomposition Attributes: mu: {int} -- number of sub problems T: {[float]} -- number of neightbors for each sub problem nObjectives: {int} -- number of objectives (based of number evaluator) initalizer: {function} -- basic function strategy to initialize solution evaluator: {[function]} -- list of basic function in order to obtained fitness (multiple objectives) operators: {[Operator]} -- list of operator to use when launching algorithm policy: {Policy} -- Policy class implementation strategy to select operators validator: {function} -- basic function to check if solution is valid or not under some constraints maximise: {bool} -- specify kind of optimisation problem population: [{Solution}] -- population of solution, one for each sub problem pfPop: [{Solution}] -- pareto front population weights: [[{float}]] -- random weights used for custom mu sub problems callbacks: {[Callback]} -- list of Callback class implementation to do some instructions every number of evaluations and `load` when initializing algorithm """ def __init__(self, mu, T, initalizer, evaluator, operators, policy, validator, maximise=True, parent=None): # redefinition of constructor to well use `initRun` method self._initializer = initalizer self._evaluator = evaluator self._operators = operators self._policy = policy self._validator = validator self._callbacks = [] # by default self._numberOfEvaluations = 0 self._maxEvaluations = 0 self._nObjectives = len(evaluator) # other parameters self._parent = parent # parent algorithm if it's sub algorithm #self.maxEvaluations = 0 # by default self._maximise = maximise # track reference of algo into operator (keep an eye into best solution) for operator in self._operators: operator.setAlgo(self) # by default track reference for policy self._policy.setAlgo(self) if mu < T: raise ValueError('`mu` cannot be less than `T`') self._mu = mu self._T = T # initialize neighbors for each sub problem self.setNeighbors() weights = [] if self._nObjectives == 2: for i in range(self._mu): angle = math.pi / 2 * i / (self._mu - 1) weights.append([math.cos(angle), math.sin(angle)]) elif self._nObjectives >= 3: # random weights using uniform for i in range(self._mu): w_i = np.random.uniform(0, 1, self._nObjectives) weights.append(w_i / sum(w_i)) else: raise ValueError('Unvalid number of objectives') self._weights = weights self._subProblems = [] for i in range(self._mu): # compute weight sum from solution sub_evaluator = lambda solution: moEvaluator( solution, evaluator, weights[i]) # intialize each sub problem # use copy of list to keep track for each sub problem subProblem = MOSubProblem(i, weights[i], initalizer, sub_evaluator, operators.copy(), policy, validator, maximise, self) self._subProblems.append(subProblem) self._population = [None for n in range(self._mu)] self._pfPop = [] # ref point based on number of evaluators if self._maximise: self._refPoint = [0 for _ in range(self._nObjectives)] else: self._refPoint = [ sys.float_info.max for _ in range(self._nObjectives) ]
[docs] def initRun(self): """ Method which initialiazes or re-initializes the whole algorithm context specifically for MOEAD """ # initialization is done during run method pass
[docs] def run(self, evaluations): """ Run the local search algorithm Args: evaluations: {int} -- number of Local search evaluations Returns: {Solution} -- best solution found """ # by default use of mother method to initialize variables super().run(evaluations) # enable callback resume for MOEAD self.resume() # initialize each sub problem if no backup for i in range(self._mu): if self._subProblems[i]._bestSolution is None: self._subProblems[i].run(1) self._population[i] = self._subProblems[i]._bestSolution # if no backup for pf population if len(self._pfPop) == 0: for i in range(self._mu): self._pfPop.append(self._subProblems[i]._bestSolution) # MOEAD algorithm implementation while not self.stop(): for i in range(self._mu): # run 1 iteration into sub problem `i` self._subProblems[i].run(1) spBestSolution = self._subProblems[i]._bestSolution self.updateRefPoint(spBestSolution) # for each neighbor of current sub problem update solution if better improvment = False for j in self._neighbors[i]: if spBestSolution.fitness( ) > self._subProblems[j]._bestSolution.fitness(): # create new solution based on current new if better, computes fitness associated to new solution for sub problem class_name = type(spBestSolution).__name__ # dynamically load solution class if unknown if class_name not in sys.modules: load_class(class_name, globals()) newSolution = getattr( globals()['macop.solutions.' + class_name], class_name)(spBestSolution._data, len(spBestSolution._data)) # evaluate solution for new sub problem and update as best solution self._subProblems[j].evaluate(newSolution) self._subProblems[j]._bestSolution = newSolution # update population solution for this sub problem self._population[j] = newSolution improvment = True # add new solution if improvment is idenfity if improvment: self._pfPop.append(spBestSolution) # update pareto front self._pfPop = self.paretoFront(self._pfPop) # add progress here self.progress() # stop algorithm if necessary if self.stop(): break logging.info(f"End of {type(self).__name__}, best solution found {self._population}") self.end() return self._pfPop
[docs] def progress(self): """ Log progress and apply callbacks if necessary """ if len(self._callbacks) > 0: for callback in self._callbacks: callback.run() macop_progress(self.getGlobalEvaluation(), self.getGlobalMaxEvaluation()) logging.info(f"-- {type(self).__name__} evaluation {self._numberOfEvaluations} of {self._maxEvaluations} ({((self._numberOfEvaluations) / self._maxEvaluations * 100.):.2f}%)")
def setNeighbors(self): dmin = dmax = 0 if self._T % 2 == 1: dmin = -int(self._T / 2) dmax = int(self._T / 2) + 1 else: dmin = -int(self._T / 2) + 1 dmax = +self._T / 2 # init neighbord list self._neighbors = [[] for n in range(self._mu)] for direction in range(0, -dmin): for i in range(self._T): self._neighbors[direction].append(i) for direction in range(-dmin, self._mu - dmax): for i in range(direction + dmin, direction + dmax): self._neighbors[direction].append(i) for direction in range(self._mu - dmax, self._mu): for i in range(self._mu - self._T, self._mu): self._neighbors[direction].append(i) def updateRefPoint(self, solution): if self._maximise: for i in range(len(self._evaluator)): if solution._scores[i] > self._refPoint[i]: self._refPoint[i] = solution._scores[i] else: for i in range(len(self._evaluator)): if solution.scores[i] < self._refPoint[i]: self._refPoint[i] = solution._scores[i] def paretoFront(self, population): paFront = [] indexes = [] nObjectives = len(self._evaluator) nSolutions = len(population) # find dominated solution for i in range(nSolutions): for j in range(nSolutions): if j in indexes: continue # check if solutions are the same if all([ population[i]._data[k] == population[j]._data[k] for k in range(len(population[i]._data)) ]): continue nDominated = 0 # check number of dominated objectives of current solution by the others solution for k in range(len(self._evaluator)): if self._maximise: if population[i]._scores[k] < population[j]._scores[k]: nDominated += 1 else: if population[i]._scores[k] > population[j]._scores[k]: nDominated += 1 if nDominated == nObjectives: indexes.append(i) break # store the non dominated solution into pareto front for i in range(nSolutions): if i not in indexes: paFront.append(population[i]) return paFront
[docs] def end(self): """Display end message into `run` method """ print(macop_text(f'({type(self).__name__}) Found after {self._numberOfEvaluations} evaluations')) for i, solution in enumerate(self._pfPop): print(f' - [{i}] {solution._scores} : {solution}') print(macop_line())
def information(self): logging.info("-- Pareto front :") for i, solution in enumerate(self._pfPop): logging.info(f"-- {i}] SCORE {solution._scores} - {solution}") def __str__(self): return f"{type(self).__name__} using {type(self._population).__name__}"