"""Multi-objective algorithm classes
"""
# main imports
import logging
import math
import numpy as np
import sys
from ..utils.progress import macop_text, macop_line, macop_progress
# module imports
from .base import Algorithm
from ..evaluators.discrete.multi import WeightedSum
class MOEAD(Algorithm):
    """Multi-Objective Evolutionary Algorithm with Scalar Decomposition

    The multi-objective problem is decomposed into `mu` scalar sub problems.
    Each sub problem is a `MOSubProblem` optimising a `WeightedSum` of the
    objectives with its own weight vector. After every sub problem step, its
    best solution is offered to the `T` neighbouring sub problems and the
    Pareto front population is refreshed.

    Attributes:
        mu: {int} -- number of sub problems
        T: {int} -- number of neighbors for each sub problem
        nObjectives: {int} -- number of objectives (based on the number of evaluators)
        initializer: {function} -- basic function strategy to initialize solution
        evaluator: {[function]} -- list of basic functions used to obtain fitness (multiple objectives)
        operators: {[Operator]} -- list of operators to use when launching algorithm
        policy: {Policy} -- Policy class implementation strategy to select operators
        validator: {function} -- basic function to check if solution is valid or not under some constraints
        maximise: {bool} -- specify kind of optimisation problem
        verbose: {bool} -- verbose or not information about the algorithm
        population: [{Solution}] -- population of solutions, one for each sub problem
        pfPop: [{Solution}] -- pareto front population
        weights: [[{float}]] -- weight vectors used for the `mu` sub problems
        callbacks: {[Callback]} -- list of Callback class implementation to do some instructions every number of evaluations and `load` when initializing algorithm

    Example:

    >>> import random
    >>> # operators import
    >>> from macop.operators.discrete.crossovers import SimpleCrossover
    >>> from macop.operators.discrete.mutators import SimpleMutation
    >>> # policy import
    >>> from macop.policies.classicals import RandomPolicy
    >>> # solution and algorithm
    >>> from macop.solutions.discrete import BinarySolution
    >>> from macop.algorithms.multi import MOEAD
    >>> # evaluator import
    >>> from macop.evaluators.discrete.mono import KnapsackEvaluator
    >>> # evaluator initialization (worths objects passed into data)
    >>> problem_size = 20
    >>> worths1 = [ random.randint(0, 20) for i in range(problem_size) ]
    >>> evaluator1 = KnapsackEvaluator(data={'worths': worths1})
    >>> worths2 = [ random.randint(10, 15) for i in range(problem_size) ]
    >>> evaluator2 = KnapsackEvaluator(data={'worths': worths2})
    >>> # validator specification (based on weights of each objects)
    >>> weights = [ random.randint(5, 30) for i in range(problem_size) ]
    >>> validator = lambda solution: True if sum([weights[i] for i, value in enumerate(solution._data) if value == 1]) < 200 else False
    >>> # initializer function with lambda function
    >>> initializer = lambda x=20: BinarySolution.random(x, validator)
    >>> # operators list with crossover and mutation
    >>> operators = [SimpleCrossover(), SimpleMutation()]
    >>> policy = RandomPolicy(operators)
    >>> # MOEAD use multi-objective, hence list of evaluators with mu=20 and T=5
    >>> algo = MOEAD(20, 5, initializer, [evaluator1, evaluator2], operators, policy, validator, maximise=True, verbose=False)
    >>> # run the algorithm and get the pareto front obtained
    >>> pf_solutions = algo.run(100)
    >>> # check size of obtained pareto front (the value depends on the random draws above)
    >>> len(pf_solutions) # doctest: +SKIP
    33
    """

    def __init__(self,
                 mu,
                 T,
                 initializer,
                 evaluator,
                 operators,
                 policy,
                 validator,
                 maximise=True,
                 parent=None,
                 verbose=True):
        """Build the MOEAD context: neighbourhoods, weight vectors and sub problems

        Args:
            mu: {int} -- number of sub problems
            T: {int} -- number of neighbors for each sub problem
            initializer: {function} -- basic function strategy to initialize solution
            evaluator: {[function]} -- list of evaluators, one per objective
            operators: {[Operator]} -- list of operators to use when launching algorithm
            policy: {Policy} -- Policy class implementation strategy to select operators
            validator: {function} -- basic function to check if solution is valid or not
            maximise: {bool} -- specify kind of optimisation problem
            parent: {Algorithm} -- parent algorithm if this one is used as sub algorithm
            verbose: {bool} -- verbose or not information about the algorithm

        Raises:
            ValueError: if `mu` is less than `T`, or if fewer than 2 evaluators are given
        """
        # redefinition of constructor (the parent constructor is not called)
        # to well use `initRun` method
        self._initializer = initializer
        self._evaluator = evaluator
        self._operators = operators
        self._policy = policy
        self._validator = validator
        self._callbacks = []

        # by default
        self._numberOfEvaluations = 0
        self._maxEvaluations = 0
        self._nObjectives = len(evaluator)

        # other parameters
        self._parent = parent  # parent algorithm if it's sub algorithm
        self._maximise = maximise
        self._verbose = verbose

        # track reference of algo into operator (keep an eye into best solution)
        for operator in self._operators:
            operator.setAlgo(self)

        # by default track reference for policy
        self._policy.setAlgo(self)

        # a neighbourhood cannot be larger than the number of sub problems
        if mu < T:
            raise ValueError('`mu` cannot be less than `T`')

        self._mu = mu
        self._T = T

        # initialize neighbors for each sub problem
        self.setNeighbors()

        weights = []

        if self._nObjectives == 2:
            # weight vectors spread over a quarter of the unit circle
            # NOTE(review): `mu == 1` divides by zero here -- confirm whether
            # a single sub problem is meant to be supported
            for i in range(self._mu):
                angle = math.pi / 2 * i / (self._mu - 1)
                weights.append([math.cos(angle), math.sin(angle)])

        elif self._nObjectives >= 3:
            # random weights using uniform, normalised so that they sum to 1
            for i in range(self._mu):
                w_i = np.random.uniform(0, 1, self._nObjectives)
                weights.append(w_i / sum(w_i))
        else:
            raise ValueError('Unvalid number of objectives')

        self._weights = weights

        self._subProblems = []

        for i in range(self._mu):

            # compute weight sum from solution
            sub_evaluator = WeightedSum(data={'evaluators': evaluator, 'weights': weights[i]})

            # initialize each sub problem
            # use copy of list to keep track for each sub problem
            subProblem = MOSubProblem(i, weights[i],
                                      initializer, sub_evaluator,
                                      operators.copy(), policy, validator,
                                      maximise, self, verbose=self._verbose)

            self._subProblems.append(subProblem)

        self._population = [None for _ in range(self._mu)]
        self._pfPop = []

        # ref point based on number of evaluators: start from the worst
        # possible value so that any score improves it
        if self._maximise:
            self._refPoint = [0 for _ in range(self._nObjectives)]
        else:
            self._refPoint = [
                sys.float_info.max for _ in range(self._nObjectives)
            ]

    def initRun(self):
        """
        Method which initializes or re-initializes the whole algorithm context specifically for MOEAD

        Nothing is done here: initialization is performed inside the `run` method.
        """
        # initialization is done during run method
        pass

    def run(self, evaluations):
        """
        Run the MOEAD algorithm

        Args:
            evaluations: {int} -- number of evaluations

        Returns:
            [{Solution}] -- pareto front population found
        """

        # by default use of mother method to initialize variables
        super().run(evaluations)

        # enable callback resume for MOEAD
        self.resume()

        # initialize each sub problem if no backup
        for i in range(self._mu):

            if self._subProblems[i]._bestSolution is None:
                self._subProblems[i].run(1)
                self._population[i] = self._subProblems[i]._bestSolution

        # if no backup for pf population
        if not self._pfPop:
            for i in range(self._mu):
                self._pfPop.append(self._subProblems[i]._bestSolution)

        # MOEAD algorithm implementation
        while not self.stop():

            for i in range(self._mu):

                # run 1 iteration into sub problem `i`
                self._subProblems[i].run(1)
                spBestSolution = self._subProblems[i]._bestSolution

                self.updateRefPoint(spBestSolution)

                # for each neighbor of current sub problem update solution if better
                # NOTE(review): the comparison below always uses `>` (it does not
                # look at `self._maximise`) and compares fitness values as stored
                # on each solution -- confirm this is intended
                improvement = False
                for j in self._neighbors[i]:
                    if spBestSolution.fitness() > self._subProblems[j]._bestSolution.fitness():
                        # create new solution based on current new if better, computes fitness associated to new solution for sub problem
                        newSolution = spBestSolution.clone()

                        # evaluate solution for new sub problem and update as best solution
                        self._subProblems[j].evaluate(newSolution)
                        self._subProblems[j]._bestSolution = newSolution

                        # update population solution for this sub problem
                        self._population[j] = newSolution

                        improvement = True

                # add new solution if an improvement is identified
                if improvement:
                    self._pfPop.append(spBestSolution)

                # update pareto front
                self._pfPop = self.paretoFront(self._pfPop)

                # add progress here
                self.progress()

                # stop algorithm if necessary
                if self.stop():
                    break

        logging.info(f"End of {type(self).__name__}, best solution found {self._population}")

        self.end()

        return self._pfPop

    def progress(self):
        """
        Log progress and apply callbacks if necessary
        """
        for callback in self._callbacks:
            callback.run()

        macop_progress(self, self.getGlobalEvaluation(), self.getGlobalMaxEvaluation())

        logging.info(f"-- {type(self).__name__} evaluation {self._numberOfEvaluations} of {self._maxEvaluations} ({((self._numberOfEvaluations) / self._maxEvaluations * 100.):.2f}%)")

    def setNeighbors(self):
        """Compute, for each sub problem, the indices of its `T` neighbouring sub problems

        The neighbourhood of sub problem `i` is a window of `T` consecutive
        indices around `i`. Near the borders the window is clamped to the
        first (resp. last) `T` indices. The result is stored into
        `self._neighbors` (one list of indices per sub problem).
        """
        # offsets of the window bounds relative to the sub problem index
        if self._T % 2 == 1:
            dmin = -int(self._T / 2)
            dmax = int(self._T / 2) + 1
        else:
            dmin = -int(self._T / 2) + 1
            dmax = int(+self._T / 2)

        # init neighbors list
        self._neighbors = [[] for _ in range(self._mu)]

        # left border: window would start before index 0, use the first T indices
        for direction in range(0, -dmin):
            self._neighbors[direction].extend(range(self._T))

        # interior: window of exactly T indices starting at `direction + dmin`
        # (the previous upper bound `direction + dmax - 1` produced fewer than
        # T neighbours, and none at all for T == 2)
        for direction in range(-dmin, self._mu - dmax):
            first = direction + dmin
            self._neighbors[direction].extend(range(first, first + self._T))

        # right border: window would go past the last index, use the last T indices
        for direction in range(self._mu - dmax, self._mu):
            self._neighbors[direction].extend(range(self._mu - self._T, self._mu))

    def updateRefPoint(self, solution):
        """Update the reference point with the scores of `solution`

        For each objective the reference point keeps the highest score seen
        when maximising, the lowest one when minimising.

        Args:
            solution: {Solution} -- solution whose `_scores` are compared to the reference point
        """
        for i in range(len(self._evaluator)):
            score = solution._scores[i]

            if self._maximise:
                improved = score > self._refPoint[i]
            else:
                improved = score < self._refPoint[i]

            if improved:
                self._refPoint[i] = score

    def paretoFront(self, population):
        """Filter `population` and keep the solutions which are not dominated

        A solution is discarded when another solution with different `_data`
        (and not already discarded) has a strictly better score on every
        objective.

        Args:
            population: [{Solution}] -- solutions to filter

        Returns:
            [{Solution}] -- non dominated solutions, in their original order
        """
        nObjectives = len(self._evaluator)

        # indexes of dominated solutions (set: constant time membership test)
        dominated = set()

        for i, candidate in enumerate(population):
            for j, other in enumerate(population):

                # an already dominated solution is not used as a reference
                if j in dominated:
                    continue

                # check if solutions are the same
                if all(candidate._data[k] == other._data[k]
                       for k in range(len(candidate._data))):
                    continue

                # number of objectives on which `other` is strictly better
                if self._maximise:
                    nDominated = sum(
                        1 for k in range(nObjectives)
                        if candidate._scores[k] < other._scores[k])
                else:
                    nDominated = sum(
                        1 for k in range(nObjectives)
                        if candidate._scores[k] > other._scores[k])

                if nDominated == nObjectives:
                    dominated.add(i)
                    break

        # store the non dominated solutions into pareto front
        return [
            solution for i, solution in enumerate(population)
            if i not in dominated
        ]

    def end(self):
        """Display end message into `run` method
        """

        macop_text(self, f'({type(self).__name__}) Found after {self._numberOfEvaluations} evaluations')

        for i, solution in enumerate(self._pfPop):
            macop_text(self, f' - [{i}] {solution._scores} : {solution}')

        macop_line(self)

    def information(self):
        """Log the scores and the representation of each pareto front solution
        """

        logging.info("-- Pareto front :")

        for i, solution in enumerate(self._pfPop):
            logging.info(f"-- {i}] SCORE {solution._scores} - {solution}")

    def __str__(self):
        return f"{type(self).__name__} using {type(self._population).__name__}"
class MOSubProblem(Algorithm):
    """Specific MO sub problem used into MOEAD

    A sub problem keeps its own index and objective weights and improves its
    best solution one operator application at a time.

    Attributes:
        index: {int} -- sub problem index
        weights: {[float]} -- sub problems objectives weights
        initalizer: {function} -- basic function strategy to initialize solution
        evaluator: {function} -- basic function used to obtain fitness (mono or multiple objectives)
        operators: {[Operator]} -- list of operators to use when launching algorithm
        policy: {Policy} -- Policy class implementation strategy to select operators
        validator: {function} -- basic function to check if solution is valid or not under some constraints
        maximise: {bool} -- specify kind of optimisation problem
        verbose: {bool} -- verbose or not information about the algorithm
        currentSolution: {Solution} -- current solution managed for current evaluation
        bestSolution: {Solution} -- best solution found so far during running algorithm
        callbacks: {[Callback]} -- list of Callback class implementation to do some instructions every number of evaluations and `load` when initializing algorithm

    Example:

    >>> import random
    >>> # operators import
    >>> from macop.operators.discrete.crossovers import SimpleCrossover
    >>> from macop.operators.discrete.mutators import SimpleMutation
    >>> # policy import
    >>> from macop.policies.classicals import RandomPolicy
    >>> # solution and algorithm
    >>> from macop.solutions.discrete import BinarySolution
    >>> from macop.algorithms.multi import MOEAD, MOSubProblem
    >>> # evaluator import
    >>> from macop.evaluators.discrete.mono import KnapsackEvaluator
    >>> from macop.evaluators.discrete.multi import WeightedSum
    >>> # evaluator initialization (worths objects passed into data)
    >>> problem_size = 20
    >>> worths1 = [ random.randint(0, 20) for i in range(problem_size) ]
    >>> evaluator1 = KnapsackEvaluator(data={'worths': worths1})
    >>> worths2 = [ random.randint(10, 15) for i in range(problem_size) ]
    >>> evaluator2 = KnapsackEvaluator(data={'worths': worths2})
    >>> # validator specification (based on weights of each objects)
    >>> weights = [ random.randint(5, 30) for i in range(problem_size) ]
    >>> validator = lambda solution: True if sum([weights[i] for i, value in enumerate(solution._data) if value == 1]) < 200 else False
    >>> # initializer function with lambda function
    >>> initializer = lambda x=20: BinarySolution.random(x, validator)
    >>> # operators list with crossover and mutation
    >>> operators = [SimpleCrossover(), SimpleMutation()]
    >>> policy = RandomPolicy(operators)
    >>> algo = MOEAD(20, 5, initializer, [evaluator1, evaluator2], operators, policy, validator, maximise=True, verbose=False)
    >>> # weights of the sub problem
    >>> sub_problem_weights = [0.4, 0.6]
    >>> sub_evaluator = WeightedSum(data={'evaluators': [evaluator1, evaluator2], 'weights': sub_problem_weights})
    >>> # first parameter is the index of the MOSubProblem
    >>> subProblem = MOSubProblem(0, sub_problem_weights, initializer, sub_evaluator, operators, policy, validator, maximise=True, parent=algo, verbose=False)
    >>> # run the algorithm (the score obtained depends on the random draws above)
    >>> solution = subProblem.run(100)
    >>> solution._score # doctest: +SKIP
    133.0
    """

    def __init__(self,
                 index,
                 weights,
                 initalizer,
                 evaluator,
                 operators,
                 policy,
                 validator,
                 maximise=True,
                 parent=None,
                 verbose=True):
        """Create a sub problem identified by `index` and driven by `weights`

        Args:
            index: {int} -- sub problem index
            weights: {[float]} -- sub problems objectives weights
            initalizer: {function} -- basic function strategy to initialize solution
            evaluator: {function} -- basic function used to obtain fitness
            operators: {[Operator]} -- list of operators to use when launching algorithm
            policy: {Policy} -- Policy class implementation strategy to select operators
            validator: {function} -- basic function to check if solution is valid or not
            maximise: {bool} -- specify kind of optimisation problem
            parent: {Algorithm} -- parent algorithm of this sub problem
            verbose: {bool} -- verbose or not information about the algorithm
        """
        # generic context is delegated to the parent constructor
        super().__init__(initalizer, evaluator, operators, policy,
                         validator, maximise, parent)

        # sub problem specific context
        self._index = index
        self._weights = weights
        self._verbose = verbose

    def run(self, evaluations):
        """
        Run the sub problem search for at most `evaluations` steps

        Args:
            evaluations: {int} -- number of evaluations

        Returns:
            {Solution} -- best solution found
        """

        # mother method is in charge of initializing run variables
        super().run(evaluations)

        # a first solution is required before any update
        if self._bestSolution is None:
            self.initRun()

        # operators must reference this sub problem
        for operator in self._operators:
            operator.setAlgo(self)

        for _ in range(evaluations):

            # policy must reference this sub problem as well
            self._policy.setAlgo(self)

            # candidate obtained from the best solution using the policy
            candidate = self.update(self._bestSolution)

            # keep the candidate only if it is better than the current best
            if self.isBetter(candidate):
                self._bestSolution = candidate

            # one more evaluation done
            self.increaseEvaluation()

            self.progress()

            # leave as soon as the stopping criterion is reached
            if self.stop():
                break

            logging.info(f"---- Current {candidate} - SCORE {candidate.fitness()}")

        logging.info(f"End of {type(self).__name__}, best solution found {self._bestSolution}")

        return self._bestSolution