123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- """Multi-Ojective Evolutionary Algorithm with Scalar Decomposition algorithm
- """
- # main imports
- import pkgutil
- import logging
- import math
- import numpy as np
- import sys
- from ...utils.color import macop_text, macop_line, macop_progress
- # module imports
- from ..Algorithm import Algorithm
- from .MOSubProblem import MOSubProblem
- # import all available solutions
- for loader, module_name, is_pkg in pkgutil.walk_packages(
- path=['macop/solutions'], prefix='macop.solutions.'):
- _module = loader.find_module(module_name).load_module(module_name)
- globals()[module_name] = _module
- def moEvaluator(_solution, _evaluator, _weights):
- scores = [eval(_solution) for eval in _evaluator]
- # associate objectives scores to solution
- _solution.scores = scores
- return sum([scores[i] for i, w in enumerate(_weights)])
- class MOEAD(Algorithm):
- """Multi-Ojective Evolutionary Algorithm with Scalar Decomposition
- Attributes:
- mu: {int} -- number of sub problems
- T: {[float]} -- number of neightbors for each sub problem
- nObjectives: {int} -- number of objectives (based of number evaluator)
- initalizer: {function} -- basic function strategy to initialize solution
- evaluator: {[function]} -- list of basic function in order to obtained fitness (multiple objectives)
- operators: {[Operator]} -- list of operator to use when launching algorithm
- policy: {Policy} -- Policy class implementation strategy to select operators
- validator: {function} -- basic function to check if solution is valid or not under some constraints
- maximise: {bool} -- specify kind of optimization problem
- population: [{Solution}] -- population of solution, one for each sub problem
- pfPop: [{Solution}] -- pareto front population
- weights: [[{float}]] -- random weights used for custom mu sub problems
- callbacks: {[Callback]} -- list of Callback class implementation to do some instructions every number of evaluations and `load` when initializing algorithm
- """
- def __init__(self,
- _mu,
- _T,
- _initalizer,
- _evaluator,
- _operators,
- _policy,
- _validator,
- _maximise=True,
- _parent=None):
- # redefinition of constructor to well use `initRun` method
- self.initializer = _initalizer
- self.evaluator = _evaluator
- self.operators = _operators
- self.policy = _policy
- self.validator = _validator
- self.callbacks = []
- # by default
- self.numberOfEvaluations = 0
- self.maxEvaluations = 0
- self.nObjectives = len(_evaluator)
- # other parameters
- self.parent = _parent # parent algorithm if it's sub algorithm
- #self.maxEvaluations = 0 # by default
- self.maximise = _maximise
- # track reference of algo into operator (keep an eye into best solution)
- for operator in self.operators:
- operator.setAlgo(self)
- # also track reference for policy
- self.policy.setAlgo(self)
- if _mu < _T:
- raise ValueError('`mu` cannot be less than `T`')
- self.mu = _mu
- self.T = _T
- # initialize neighbors for each sub problem
- self.setNeighbors()
- weights = []
- if self.nObjectives == 2:
- for i in range(self.mu):
- angle = math.pi / 2 * i / (self.mu - 1)
- weights.append([math.cos(angle), math.sin(angle)])
- elif self.nObjectives >= 3:
- # random weights using uniform
- for i in range(self.mu):
- w_i = np.random.uniform(0, 1, self.nObjectives)
- weights.append(w_i / sum(w_i))
- else:
- raise ValueError('Unvalid number of objectives')
- self.weights = weights
- self.subProblems = []
- for i in range(self.mu):
- # compute weight sum from solution
- sub_evaluator = lambda _solution: moEvaluator(
- _solution, _evaluator, weights[i])
- # intialize each sub problem
- subProblem = MOSubProblem(i, weights[i], _initalizer,
- sub_evaluator, _operators, _policy,
- _validator, _maximise, self)
- self.subProblems.append(subProblem)
- self.population = [None for n in range(self.mu)]
- self.pfPop = []
- # ref point based on number of evaluators
- if self.maximise:
- self.refPoint = [0 for _ in range(self.nObjectives)]
- else:
- self.refPoint = [
- sys.float_info.max for _ in range(self.nObjectives)
- ]
- def initRun(self):
- """
- Method which initialiazes or re-initializes the whole algorithm context specifically for MOEAD
- """
- # initialization is done during run method
- pass
- def run(self, _evaluations):
- """
- Run the local search algorithm
- Args:
- _evaluations: {int} -- number of Local search evaluations
-
- Returns:
- {Solution} -- best solution found
- """
- # by default use of mother method to initialize variables
- super().run(_evaluations)
- # initialize each sub problem
- for i in range(self.mu):
- self.subProblems[i].run(1)
- self.population[i] = self.subProblems[i].bestSolution
- self.pfPop.append(self.subProblems[i].bestSolution)
- # enable callback resume for MOEAD
- self.resume()
- # MOEAD algorithm implementation
- while not self.stop():
- for i in range(self.mu):
- # run 1 iteration into sub problem `i`
- self.subProblems[i].run(1)
- spBestSolution = self.subProblems[i].bestSolution
- self.updateRefPoint(spBestSolution)
- # for each neighbor of current sub problem update solution if better
- improvment = False
- for j in self.neighbors[i]:
- if spBestSolution.fitness(
- ) > self.subProblems[j].bestSolution.fitness():
- # create new solution based on current new if better, computes fitness associated to new solution for sub problem
- class_name = type(spBestSolution).__name__
- newSolution = getattr(
- globals()['macop.solutions.' + class_name],
- class_name)(spBestSolution.data,
- len(spBestSolution.data))
- # evaluate solution for new sub problem and update as best solution
- self.subProblems[j].evaluate(newSolution)
- self.subProblems[j].bestSolution = newSolution
- # update population solution for this sub problem
- self.population[j] = newSolution
- improvment = True
- # add new solution if improvment is idenfity
- if improvment:
- self.pfPop.append(spBestSolution)
- # update pareto front
- self.pfPop = self.paretoFront(self.pfPop)
- # add progress here
- self.progress()
- # stop algorithm if necessary
- if self.stop():
- break
- logging.info("End of %s, best solution found %s" %
- (type(self).__name__, self.population))
- self.end()
- return self.pfPop
- def progress(self):
- """
- Log progress and apply callbacks if necessary
- """
- if len(self.callbacks) > 0:
- for callback in self.callbacks:
- callback.run()
- macop_progress(self.getGlobalEvaluation(),
- self.getGlobalMaxEvaluation())
- logging.info(
- "-- %s evaluation %s of %s (%s%%)" %
- (type(self).__name__, self.numberOfEvaluations,
- self.maxEvaluations, "{0:.2f}".format(
- (self.numberOfEvaluations) / self.maxEvaluations * 100.)))
- def setNeighbors(self):
- dmin = dmax = 0
- if self.T % 2 == 1:
- dmin = -int(self.T / 2)
- dmax = int(self.T / 2) + 1
- else:
- dmin = -int(self.T / 2) + 1
- dmax = +self.T / 2
- # init neighbord list
- self.neighbors = [[] for n in range(self.mu)]
- for direction in range(0, -dmin):
- for i in range(self.T):
- self.neighbors[direction].append(i)
- for direction in range(-dmin, self.mu - dmax):
- for i in range(direction + dmin, direction + dmax):
- self.neighbors[direction].append(i)
- for direction in range(self.mu - dmax, self.mu):
- for i in range(self.mu - self.T, self.mu):
- self.neighbors[direction].append(i)
- def updateRefPoint(self, _solution):
- if self.maximise:
- for i in range(len(self.evaluator)):
- if _solution.scores[i] > self.refPoint[i]:
- self.refPoint[i] = _solution.scores[i]
- else:
- for i in range(len(self.evaluator)):
- if _solution.scores[i] < self.refPoint[i]:
- self.refPoint[i] = _solution.scores[i]
- def paretoFront(self, _population):
- paFront = []
- indexes = []
- nObjectives = len(self.evaluator)
- nSolutions = len(_population)
- # find dominated solution
- for i in range(nSolutions):
- for j in range(nSolutions):
- if j in indexes:
- continue
- nDominated = 0
- # check number of dominated objectives of current solution by the others solution
- for k in range(len(self.evaluator)):
- if self.maximise:
- if _population[i].scores[k] < _population[j].scores[k]:
- nDominated += 1
- else:
- if _population[i].scores[k] > _population[j].scores[k]:
- nDominated += 1
- if nDominated == nObjectives:
- indexes.append(i)
- break
- # store the non dominated solution into pareto front
- for i in range(nSolutions):
- if i not in indexes:
- paFront.append(_population[i])
- return paFront
- def end(self):
- """Display end message into `run` method
- """
- print(
- macop_text('({}) Found after {} evaluations'.format(
- type(self).__name__, self.numberOfEvaluations)))
- for i, solution in enumerate(self.pfPop):
- print(' - [{}] {} : {}'.format(i, solution.scores, solution))
- print(macop_line())
- def information(self):
- logging.info("-- Pareto front :")
- for i, solution in enumerate(self.pfPop):
- logging.info("-- %s] SCORE %s - %s" %
- (i, solution.scores, solution))
- def __str__(self):
- return "%s using %s" % (type(self).__name__, type(
- self.population).__name__)
|