reinforcement.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. """Reinforcement learning policy classes implementations for Operator Selection Strategy
  2. """
  3. # main imports
  4. import logging
  5. import random
  6. import math
  7. import numpy as np
  8. # module imports
  9. from .base import Policy
  10. from ..operators.base import KindOperator
  11. class UCBPolicy(Policy):
  12. """Upper Confidence Bound (UCB) policy class which is used for applying UCB strategy when selecting and applying operator
  13. Rather than performing exploration by simply selecting an arbitrary action, chosen with a probability that remains constant,
  14. the UCB algorithm changes its exploration-exploitation balance as it gathers more knowledge of the environment.
  15. It moves from being primarily focused on exploration, when actions that have been tried the least are preferred,
  16. to instead concentrate on exploitation, selecting the action with the highest estimated reward.
  17. - Resource link: https://banditalgs.com/2016/09/18/the-upper-confidence-bound-algorithm/
  18. Attributes:
  19. operators: {[Operator]} -- list of selected operators for the algorithm
  20. C: {float} -- The second half of the UCB equation adds exploration, with the degree of exploration being controlled by the hyper-parameter `C`.
  21. exp_rate: {float} -- exploration rate (probability to choose randomly next operator)
  22. rewards: {[float]} -- list of summed rewards obtained for each operator
  23. occurrences: {[int]} -- number of use (selected) of each operator
  24. Example:
  25. >>> # operators import
  26. >>> from macop.operators.discrete.crossovers import SimpleCrossover
  27. >>> from macop.operators.discrete.mutators import SimpleMutation
  28. >>> # policy import
  29. >>> from macop.policies.reinforcement import UCBPolicy
  30. >>> # solution and algorithm
  31. >>> from macop.solutions.discrete import BinarySolution
  32. >>> from macop.algorithms.mono import IteratedLocalSearch
  33. >>> from macop.algorithms.mono import HillClimberFirstImprovment
  34. >>> # evaluator import
  35. >>> from macop.evaluators.discrete.mono import KnapsackEvaluator
  36. >>> # evaluator initialization (worths objects passed into data)
  37. >>> worths = [ random.randint(0, 20) for i in range(20) ]
  38. >>> evaluator = KnapsackEvaluator(data={'worths': worths})
  39. >>> # validator specification (based on weights of each objects)
  40. >>> weights = [ random.randint(5, 30) for i in range(20) ]
  41. >>> validator = lambda solution: True if sum([weights[i] for i, value in enumerate(solution._data) if value == 1]) < 200 else False
  42. >>> # initializer function with lambda function
  43. >>> initializer = lambda x=20: BinarySolution.random(x, validator)
  44. >>> # operators list with crossover and mutation
  45. >>> operators = [SimpleCrossover(), SimpleMutation()]
  46. >>> policy = UCBPolicy(operators)
  47. >>> local_search = HillClimberFirstImprovment(initializer, evaluator, operators, policy, validator, maximise=True, verbose=False)
  48. >>> algo = IteratedLocalSearch(initializer, evaluator, operators, policy, validator, localSearch=local_search, maximise=True, verbose=False)
  49. >>> policy._occurences
  50. [0, 0]
  51. >>> solution = algo.run(100)
  52. >>> type(solution).__name__
  53. 'BinarySolution'
  54. >>> policy._occurences # one more due to first evaluation
  55. [51, 53]
  56. """
  57. def __init__(self, operators, C=100., exp_rate=0.5):
  58. self._operators = operators
  59. self._rewards = [0. for o in self._operators]
  60. self._occurences = [0 for o in self._operators]
  61. self._C = C
  62. self._exp_rate = exp_rate
  63. def select(self):
  64. """Select using Upper Confidence Bound the next operator to use (using acquired rewards)
  65. Returns:
  66. {Operator}: the selected operator
  67. """
  68. indices = [i for i, o in enumerate(self._occurences) if o == 0]
  69. # random choice following exploration rate
  70. if np.random.uniform(0, 1) <= self._exp_rate:
  71. index = random.choice(range(len(self._operators)))
  72. return self._operators[index]
  73. elif len(indices) == 0:
  74. # if operator have at least be used one time
  75. ucbValues = []
  76. nVisits = sum(self._occurences)
  77. for i in range(len(self._operators)):
  78. ucbValue = self._rewards[i] + self._C * math.sqrt(
  79. math.log(nVisits) / (self._occurences[i] + 0.1))
  80. ucbValues.append(ucbValue)
  81. return self._operators[ucbValues.index(max(ucbValues))]
  82. else:
  83. return self._operators[random.choice(indices)]
  84. def apply(self, solution1, solution2=None):
  85. """
  86. Apply specific operator chosen to create new solution, computes its fitness and returns solution
  87. - fitness improvment is saved as rewards
  88. - selected operator occurence is also increased
  89. Args:
  90. solution1: {Solution} -- the first solution to use for generating new solution
  91. solution2: {Solution} -- the second solution to use for generating new solution (in case of specific crossover, default is best solution from algorithm)
  92. Returns:
  93. {Solution} -- new generated solution
  94. """
  95. operator = self.select()
  96. logging.info("---- Applying %s on %s" %
  97. (type(operator).__name__, solution1))
  98. # default value of solution2 is current best solution
  99. if solution2 is None and self._algo is not None:
  100. solution2 = self._algo._bestSolution
  101. # avoid use of crossover if only one solution is passed
  102. if solution2 is None and operator._kind == KindOperator.CROSSOVER:
  103. while operator._kind == KindOperator.CROSSOVER:
  104. operator = self.select()
  105. # apply operator on solution
  106. if operator._kind == KindOperator.CROSSOVER:
  107. newSolution = operator.apply(solution1, solution2)
  108. else:
  109. newSolution = operator.apply(solution1)
  110. # compute fitness of new solution
  111. newSolution.evaluate(self._algo._evaluator)
  112. # compute fitness improvment rate
  113. if self._algo._maximise:
  114. fir = (newSolution.fitness() -
  115. solution1.fitness()) / solution1.fitness()
  116. else:
  117. fir = (solution1.fitness() -
  118. newSolution.fitness()) / solution1.fitness()
  119. operator_index = self._operators.index(operator)
  120. if fir > 0:
  121. self._rewards[operator_index] += fir
  122. self._occurences[operator_index] += 1
  123. logging.info("---- Obtaining %s" % (newSolution))
  124. return newSolution