From a68be85e6e2102fb027c170f7cf380bc436227b0 Mon Sep 17 00:00:00 2001 From: Pavel Perestoronin Date: Wed, 9 Jan 2019 17:28:16 +0100 Subject: [PATCH] Unified arena solver for arenas --- CHANGELOG.md | 4 + bestmobabot/arena.py | 445 +++++---------------- bestmobabot/bot.py | 70 ++-- bestmobabot/dataclasses_.py | 9 +- bestmobabot/itertools_.py | 63 ++- bestmobabot/settings.py | 23 +- setup.cfg | 1 + setup.py | 2 +- tests/{test_arena.py => test_itertools.py} | 19 +- 9 files changed, 245 insertions(+), 391 deletions(-) rename tests/{test_arena.py => test_itertools.py} (86%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 252ed5e..afd1272 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# `2.4b3` + +* **Unified arena solver for arenas** + # `2.4b2` * Fix critical bug in `bestmobabot.dataclasses_.ShopSlot` diff --git a/bestmobabot/arena.py b/bestmobabot/arena.py index 08b986d..854a565 100644 --- a/bestmobabot/arena.py +++ b/bestmobabot/arena.py @@ -4,12 +4,10 @@ from __future__ import annotations -import math -from abc import ABC, abstractmethod -from functools import lru_cache -from itertools import chain, combinations, count, product -from operator import itemgetter -from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Tuple, TypeVar +from dataclasses import dataclass +from functools import total_ordering +from itertools import combinations, count, product +from typing import Any, Callable, Dict, Iterable, List, Optional, TypeVar import numpy from loguru import logger @@ -18,251 +16,50 @@ # noinspection PyUnresolvedReferences from numpy.random import choice, permutation, randint -from bestmobabot.constants import GRAND_SIZE, TEAM_SIZE -from bestmobabot.dataclasses_ import ArenaEnemy, BaseArenaEnemy, GrandArenaEnemy, Hero, Team -from bestmobabot.itertools_ import CoolDown +from bestmobabot.constants import TEAM_SIZE +from bestmobabot.dataclasses_ import BaseArenaEnemy, Hero, Team +from bestmobabot.itertools_ import CountDown, secretary_max, slices from bestmobabot.model import Model -from bestmobabot.settings import Settings -from dataclasses import dataclass T = TypeVar('T') -T1 = TypeVar('T1') -T2 = TypeVar('T2') - - -# Enemy selection. -# ---------------------------------------------------------------------------------------------------------------------- - -TEnemy = TypeVar('TEnemy', ArenaEnemy, GrandArenaEnemy) -TAttackers = TypeVar('TAttackers') - - -class AbstractArena(ABC, Generic[TEnemy, TAttackers]): - probability_getter = itemgetter(2) - - def __init__( - self, - *, - model: Model, - user_clan_id: Optional[str], - heroes: List[Hero], - get_enemies_page: Callable[[], List[TEnemy]], - settings: Settings, - ): - self.model = model - self.user_clan_id = user_clan_id - self.heroes = heroes - self.get_enemies_page = get_enemies_page - self.settings = settings - - self.cache: Dict[str, Tuple[TAttackers, float]] = {} - - def select_enemy(self) -> Tuple[TEnemy, TAttackers, float]: - return secretary_max( - self.iterate_enemies_pages(), - self.max_iterations, - key=self.probability_getter, - early_stop=self.settings.bot.arena.early_stop, - ) - - def iterate_enemies_pages(self) -> Iterable[Tuple[TEnemy, TAttackers, float]]: - while True: - enemies: List[Tuple[TEnemy, TAttackers, float]] = list(self.iterate_enemies(self.get_enemies_page())) - # Yes, sometimes all enemies are filtered out. - if enemies: - yield max(enemies, key=self.probability_getter) - - def iterate_enemies(self, enemies: Iterable[TEnemy]) -> Tuple[TEnemy, TAttackers, float]: - logger.info('Estimating win probability…') - for enemy in enemies: - if enemy.user is None: - # Some enemies don't have user assigned. Filter them out. - logger.debug(f'Skipped empty user.') - continue - if self.user_clan_id and enemy.user.is_from_clans((self.user_clan_id,)): - logger.debug(f'Skipped same clan: «{enemy.user.name}».') - continue - if enemy.user.is_from_clans(self.settings.bot.arena.skip_clans): - logger.debug(f'Skipped configured clan: «{enemy.user.clan_title}».') - continue - - # It appears that often enemies are repeated during the search. So don't repeat computations. - if enemy.user.id in self.cache: - attackers, probability = self.cache[enemy.user.id] - logger.info(f'«{enemy.user.name}» from «{enemy.user.clan_title}»: {100.0 * probability:.1f}% (cached)') - else: - attackers, probability = self.select_attackers(enemy) # type: TAttackers, float - self.cache[enemy.user.id] = attackers, probability - yield (enemy, attackers, probability) - - def make_hero_features(self, hero: Hero) -> ndarray: - """ - Make hero features 1D-array. - """ - return fromiter((hero.features.get(name, 0.0) for name in self.model.feature_names), numpy.float) - - def make_team_features(self, heroes: Iterable[Hero]) -> ndarray: - """ - Make team features 2D-array. Shape is number of heroes × number of features. - """ - return vstack(self.make_hero_features(hero) for hero in heroes) - - @abstractmethod - def select_attackers(self, enemy: TEnemy) -> Tuple[TAttackers, float]: - raise NotImplementedError - - @property - @abstractmethod - def max_iterations(self): - raise NotImplementedError - - -class Arena(AbstractArena[ArenaEnemy, List[Hero]]): - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.n_teams_limit = self.settings.bot.arena.teams_limit - logger.info(f'Teams count limit: {self.n_teams_limit}.') - - @property - def max_iterations(self): - return self.settings.bot.arena.max_pages - - def select_attackers(self, enemy: ArenaEnemy) -> Tuple[List[Hero], float]: - """ - Select attackers for the given enemy to maximise win probability. - """ - - hero_combinations_ = hero_combinations(len(self.heroes)) - - # Select top N candidates by team power. - selected_combinations: ndarray = numpy \ - .array([hero.power for hero in self.heroes])[hero_combinations_] \ - .sum(axis=1) \ - .argpartition(-self.n_teams_limit)[-self.n_teams_limit:] - hero_combinations_ = hero_combinations_[selected_combinations] - - # Construct features. - x: ndarray = ( - self.make_team_features(self.heroes)[hero_combinations_].sum(axis=1) - - self.make_team_features(enemy.heroes).sum(axis=0) - ) - - # Select top combination by win probability. - y: ndarray = self.model.estimator.predict_proba(x)[:, 1] - index: int = y.argmax() - logger.info(f'«{enemy.user.name}» from «{enemy.user.clan_title}»: {100.0 * y[index]:.1f}%') - return [self.heroes[i] for i in hero_combinations_[index]], y[index] - - -class GrandArena(AbstractArena[GrandArenaEnemy, List[List[Hero]]]): - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.n_generate_solutions = self.settings.bot.arena.grand_generate_solutions - self.n_keep_solutions = self.settings.bot.arena.grand_keep_solutions - - # We keep solutions in an attribute because we want to retry the best solutions across different enemies. - logger.trace('Generating initial solutions…') - self.solutions: ndarray = vstack( - permutation(len(self.heroes)) - for _ in range(self.n_keep_solutions) - ) - - @property - def max_iterations(self): - return self.settings.bot.arena.max_grand_pages - - def select_attackers(self, enemy: GrandArenaEnemy) -> Tuple[List[List[Hero]], float]: - logger.trace(f'Selecting attackers for «{enemy.user.name}»…') - - n_heroes = len(self.heroes) - hero_features = self.make_team_features(self.heroes) - defenders_features = [self.make_team_features(heroes).sum(axis=0) for heroes in enemy.heroes] - - # Used to select separate teams from the population array. - team_selectors = (slice(0, 5), slice(5, 10), slice(10, 15)) - - # Possible permutations of a single solution. - # Each permutation swaps two particular elements so that the heroes are moved to or from the teams. - swaps = vstack(swap_permutation(n_heroes, i, j) for i, j in chain( - product(range(0, 5), range(5, 10)), - product(range(0, 5), range(10, 15)), - product(range(5, 10), range(10, 15)), - product(range(GRAND_SIZE), range(GRAND_SIZE, n_heroes)), - )) - - # Let's evolve. - max_index: int = None - max_probability = 0.0 - cool_down = CoolDown(count(1), self.settings.bot.arena.grand_generations_cool_down) - - for n_generation in cool_down: - # Generate new solutions. - # Choose random solutions from the population and apply a random permutation to each of them. - new_permutations = swaps[randint(0, swaps.shape[0], self.n_generate_solutions)] - new_solutions = self.solutions[ - choice(self.solutions.shape[0], self.n_generate_solutions).reshape(-1, 1), - new_permutations - ] - - # Stack past solutions with the new ones. - self.solutions = vstack((self.solutions, new_solutions)) - - # Predict probabilities. - # Call to `predict_proba` is expensive, thus call it for all the grand teams at once. Stack and split. - x = vstack( - hero_features[self.solutions[:, selector]].sum(axis=1) - defender_features - for selector, defender_features in zip(team_selectors, defenders_features) - ) - p1, p2, p3 = numpy.split(self.model.estimator.predict_proba(x)[:, 1], 3) - probabilities = p1 * p2 * p3 + p1 * p2 * (1.0 - p3) + p2 * p3 * (1.0 - p1) + p1 * p3 * (1.0 - p2) - - # Select top ones for the next iteration. - # See also: https://stackoverflow.com/a/23734295/359730 - top_indexes = probabilities.argpartition(-self.n_keep_solutions)[-self.n_keep_solutions:] - self.solutions = self.solutions[top_indexes, :] - probabilities = probabilities[top_indexes] - p1, p2, p3 = p1[top_indexes], p2[top_indexes], p3[top_indexes] - - # Select the best one. - max_index = probabilities.argmax() - if probabilities[max_index] - max_probability > 0.00001: - # Improved solution. Give the optimizer another chance to beat the best solution. - cool_down.reset() - max_probability = probabilities[max_index] - logger.trace(f'Generation {n_generation:2}: {100.0 * max_probability:.2f}%{" +" if cool_down.is_fresh else ""}') # noqa - - # I'm feeling lucky! - if max_probability > 0.99999: - break - - # noinspection PyUnboundLocalVariable - logger.info( - f'«{enemy.user.name}» from «{enemy.user.clan_title}»:' - f' {100.0 * max_probability:.2f}%' - f' ({100 * p1[max_index]:.1f}% {100 * p2[max_index]:.1f}% {100 * p3[max_index]:.1f}%)' - ) - - return [ - [self.heroes[i] for i in self.solutions[max_index, selector]] - for selector in team_selectors - ], max_probability # Universal arena solver. -# TODO: work in progress. # ---------------------------------------------------------------------------------------------------------------------- @dataclass +@total_ordering class ArenaSolution: - enemy: BaseArenaEnemy - attackers: List[Team] - probability: float + enemy: BaseArenaEnemy # selected enemy + attackers: List[Team] # player's attacker teams + probability: float # arena win probability + probabilities: List[float] # individual battle win probabilities + + def log(self): + logger.success('Solution:') + logger.success('{}', self) + for i, (defenders, attackers) in enumerate(zip(self.enemy.teams, self.attackers), start=1): + logger.success('') + logger.success('Defenders #{}', i) + for hero in sorted(defenders, reverse=True): + logger.success('{}', hero) + logger.success('Attackers #{}', i) + for hero in sorted(attackers, reverse=True): + logger.success('{}', hero) def __lt__(self, other: ArenaSolution) -> Any: if isinstance(other, ArenaSolution): return self.probability < other.probability + if isinstance(other, float): + return self.probability < other return NotImplemented + def __str__(self): + return ( + f'{self.enemy}: {100.0 * self.probability:.1f}%' + f' ({" ".join(f"{100 * probability:.1f}%" for probability in self.probabilities)})' + ) + class ArenaSolver: """ @@ -278,11 +75,11 @@ def __init__( max_iterations: int, n_keep_solutions: int, n_generate_solutions: int, - n_generations_cool_down: int, + n_generations_count_down: int, early_stop: float, get_enemies: Callable[[], List[BaseArenaEnemy]], friendly_clans: Iterable[str], - reduce_probabilities: Callable[[List[ndarray]], ndarray], + reduce_probabilities: Callable[..., ndarray], ): """ :param model: prediction model. @@ -291,7 +88,7 @@ def __init__( :param max_iterations: maximum number of `get_enemies` calls. :param n_keep_solutions: number of the best kept solutions from each generation. :param n_generate_solutions: number of newly generated solutions in each generation. - :param n_generations_cool_down: for how much generations should a solution stay the best. + :param n_generations_count_down: for how much generations should a solution stay the best. :param early_stop: minimal probability to attack the enemy immediately. :param get_enemies: callable to fetch an enemy page. :param friendly_clans: friendly clan IDs or titles. @@ -304,7 +101,7 @@ def __init__( self.max_iterations = max_iterations self.n_keep_solutions = n_keep_solutions self.n_generate_solutions = n_generate_solutions - self.n_generations_cool_down = n_generations_cool_down + self.n_generations_count_down = n_generations_count_down self.early_stop = early_stop self.get_enemies = get_enemies self.friendly_clans = set(friendly_clans) @@ -329,7 +126,7 @@ def yield_solutions(self) -> Iterable[ArenaSolution]: logger.debug('Fetching enemies…') enemies = list(self.filter_enemies(self.get_enemies())) if enemies: - yield max(self.solve_enemy(enemy) for enemy in enemies) + yield max(self.solve_enemy_cached(enemy) for enemy in enemies) else: logger.debug('All enemies are filtered out on the current page.') @@ -345,40 +142,60 @@ def filter_enemies(self, enemies: Iterable[BaseArenaEnemy]) -> Iterable[BaseAren logger.info('Skipped enemy «{}» from your clan.', enemy.user.name) continue if enemy.user.is_from_clans(self.friendly_clans): - logger.info('Skipped enemy «{user.name}» from «{user.clan_title}».', user=enemy.user) + logger.info('Skipped enemy {}.', enemy.user) continue yield enemy + def solve_enemy_cached(self, enemy: BaseArenaEnemy) -> ArenaSolution: + """ + Makes use of the solution cache for repeated enemies. + """ + solution = self.cache.get(enemy.user_id) + if not solution: + solution = self.solve_enemy(enemy) + self.cache[enemy.user_id] = solution + else: + logger.debug('Cache hit: #{}.', enemy.user_id) + logger.success('{}', solution) + return solution + def solve_enemy(self, enemy: BaseArenaEnemy) -> ArenaSolution: """ Finds solution for the single enemy. """ - logger.debug('Solving arena for «{user.name}» from «{user.clan_title}»…', user=enemy.user) + logger.debug('Solving arena for {}…', enemy) n_heroes = len(self.heroes) - n_teams = len(enemy.teams) + n_teams = len(enemy.teams) # we will generate the same number of attacker teams + n_attackers = n_teams * TEAM_SIZE + hero_features = self.make_team_features(self.heroes) defenders_features = [self.make_team_features(team).sum(axis=0) for team in enemy.teams] - # Used to select separate teams from the population array. + # Used to speed up selection of separate attacker teams from the solutions array. team_selectors = slices(n_teams, TEAM_SIZE) - # Possible (per)mutations of a single solution. - # Each permutation swaps two particular elements so that the heroes are replaced between the teams. - swaps = vstack(swap_permutation(n_heroes, i, j) for i, j in chain( - product(range(0, 5), range(5, 10)), - product(range(0, 5), range(10, 15)), - product(range(5, 10), range(10, 15)), - product(range(GRAND_SIZE), range(GRAND_SIZE, n_heroes)), - )) # FIXME + # Generate all possible (per)mutations of a single solution. + # We will use it to speed up mutation process by selecting random rows from the `swaps` array. + # Each permutation swaps two particular elements so that the heroes are interchanged in the teams. + # In total `n_teams + 1` groups. + groups = [ + *[range(selector.start, selector.stop) for selector in team_selectors], + range(n_attackers, n_heroes), # fake group to keep unused heroes in + ] + logger.trace('{} hero groups.', len(groups)) + swaps = vstack( + swap_permutation(n_heroes, i, j) # swap these two heroes + for group_1, group_2 in combinations(groups, 2) # select two groups to interchange heroes in + for i, j in product(group_1, group_2) # select particular indexes to interchange + ) + logger.trace('Swaps shape: {}.', swaps.shape) # Let's evolve. - cool_down = CoolDown(count(1), self.n_generations_cool_down) - ys: List[ndarray] = [] - max_index: int = None - max_probability = 0.0 + count_down = CountDown(count(1), self.n_generations_count_down) + solution = ArenaSolution(enemy=enemy, attackers=[], probability=0.0, probabilities=[]) - for n_generation in cool_down: + for n_generation in count_down: # Generate new solutions. # Choose random solutions from the population and apply a random permutation to each of them. new_permutations = swaps[randint(0, swaps.shape[0], self.n_generate_solutions)] @@ -397,47 +214,43 @@ def solve_enemy(self, enemy: BaseArenaEnemy) -> ArenaSolution: for selector, defender_features in zip(team_selectors, defenders_features) ) ys = numpy.split(self.model.estimator.predict_proba(x)[:, 1], n_teams) - probabilities = self.reduce_probabilities(ys) # vector of reduced probabilities - # Select top ones for the next iteration. + # Convert individual battle probabilities to the final arena battle probabilities. + y_reduced = self.reduce_probabilities(*ys) + + # Select top solutions for the next iteration. # See also: https://stackoverflow.com/a/23734295/359730 - top_indexes = probabilities.argpartition(-self.n_keep_solutions)[-self.n_keep_solutions:] + top_indexes = y_reduced.argpartition(-self.n_keep_solutions)[-self.n_keep_solutions:] + + # All the arrays must be cut to the top indexes, otherwise their rows won't correspond to each other. self.solutions = self.solutions[top_indexes, :] - probabilities = probabilities[top_indexes] - - # Select the best one. - max_index = probabilities.argmax() - if probabilities[max_index] - max_probability > 0.00001: - # Improved solution. Give the optimizer another chance to beat the best solution. - cool_down.reset() - max_probability = probabilities[max_index] - logger.trace( - 'Generation {:2}: {:.2f}%{}', - n_generation, - 100.0 * max_probability, - (" +" if cool_down.is_fresh else ""), + y_reduced = y_reduced[top_indexes] + ys = [y[top_indexes] for y in ys] + + # Select the best solution of this generation. + old_probability = solution.probability + max_index = y_reduced.argmax() + solution = ArenaSolution( + enemy=enemy, + attackers=[ + [self.heroes[i] for i in self.solutions[max_index, selector]] + for selector in team_selectors + ], + probability=y_reduced[max_index], + probabilities=[y[max_index] for y in ys], ) + if solution.probability - old_probability >= 0.00001: + # The solution has been improved. Give the optimizer another chance to beat it. + count_down.reset() + logger.trace('Bump: +{:.3f}%.', 100.0 * (solution.probability - old_probability)) + logger.trace('Generation {:2}: {:.2f}% ({:d})', n_generation, 100.0 * solution.probability, int(count_down)) # I'm feeling lucky! - if max_probability > 0.99999: + # It makes sense to stop if the probability is already close to 100%. + if solution.probability > 0.99999: break - logger.info( - '«{}» from «{}»: {:.1f}% ({})', - enemy.user.name, - enemy.user.clan_title, - 100.0 * max_probability, - ' '.join(f'{100 * y[max_index]:.1f}%' for y in ys), - ) - - return ArenaSolution( - enemy=enemy, - attackers=[ - [self.heroes[i] for i in self.solutions[max_index, selector]] - for selector in team_selectors - ], - probability=max_probability, - ) + return solution def make_hero_features(self, hero: Hero) -> ndarray: """ @@ -456,57 +269,21 @@ def make_team_features(self, team: Team) -> ndarray: # Utilities. # ---------------------------------------------------------------------------------------------------------------------- -@lru_cache(maxsize=1) -def hero_combinations(hero_count: int) -> ndarray: - """ - Used to generate indexes of possible heroes in a team. - It it cached because hero count rarely changes. - """ - return fromiter(chain.from_iterable(combinations(range(hero_count), TEAM_SIZE)), dtype=int).reshape(-1, TEAM_SIZE) # noqa - - def swap_permutation(size: int, index_1: int, index_2: int) -> ndarray: permutation = arange(size) permutation[[index_1, index_2]] = permutation[[index_2, index_1]] return permutation -def ranges(n_ranges: int, range_size: int) -> Iterable[range]: - return (range(i * range_size, (i + 1) * range_size) for i in range(n_ranges)) - - -def slices(n_slices: int, slice_size: int) -> Tuple[slice, ...]: - return tuple(slice(range_.start, range_.stop) for range_ in ranges(n_slices, slice_size)) +def reduce_normal_arena(y: ndarray) -> ndarray: + """ + For the normal one-battle arena it's just the probabilities themselves. + """ + return y -def secretary_max( - items: Iterable[T1], - n: int, - key: Optional[Callable[[T1], T2]] = None, - early_stop: T2 = None, -) -> T1: +def reduce_grand_arena(y1: ndarray, y2: ndarray, y3: ndarray) -> ndarray: """ - Select best item while lazily iterating over the items. - https://en.wikipedia.org/wiki/Secretary_problem#Deriving_the_optimal_policy + Gives probability to win at least two of three battles. """ - r = int(n / math.e) + 1 - - max_key = None - - for i, item in enumerate(items, start=1): - item_key = key(item) if key else item - # Check early stop condition. - if early_stop is not None and item_key >= early_stop: - return item - # If it's the last item, just return it. - if i == n: - return item - # Otherwise, check if the item is better than previous ones. - if max_key is None or item_key >= max_key: - if i >= r: - # Better than (r - 1) previous ones, return it. - return item - # Otherwise, update the best key. - max_key = item_key - - raise RuntimeError('unreachable code') + return y1 * y2 * y3 + y1 * y2 * (1.0 - y3) + y2 * y3 * (1.0 - y1) + y1 * y3 * (1.0 - y2) diff --git a/bestmobabot/bot.py b/bestmobabot/bot.py index 1dc9dd8..0e91676 100644 --- a/bestmobabot/bot.py +++ b/bestmobabot/bot.py @@ -13,12 +13,13 @@ import requests -from bestmobabot import arena, constants +from bestmobabot import constants from bestmobabot.api import API, AlreadyError, InvalidResponseError, NotEnoughError, NotFoundError, OutOfRetargetDelta +from bestmobabot.arena import ArenaSolver, reduce_grand_arena, reduce_normal_arena from bestmobabot.database import Database from bestmobabot.dataclasses_ import Hero, Mission, Quests, Replay, User from bestmobabot.enums import BattleType -from bestmobabot.logging_ import log_heroes, log_rewards, logger +from bestmobabot.logging_ import log_rewards, logger from bestmobabot.model import Model from bestmobabot.resources import get_heroic_mission_ids, mission_name, shop_name from bestmobabot.settings import Settings @@ -128,7 +129,7 @@ def start(self): Task(next_run_at=Task.at(hour=21, minute=30, tz=self.user.tz), execute=self.farm_quests), # Recurring tasks. - Task(next_run_at=Task.every_n_minutes(24 * 60 // 5, self.settings.bot.arena.schedule_offset), execute=self.attack_arena), # noqa + Task(next_run_at=Task.every_n_minutes(24 * 60 // 5, self.settings.bot.arena.schedule_offset), execute=self.attack_normal_arena), # noqa Task(next_run_at=Task.every_n_minutes(24 * 60 // 5, self.settings.bot.arena.schedule_offset), execute=self.attack_grand_arena), # noqa Task(next_run_at=Task.every_n_hours(6), execute=self.farm_mail), Task(next_run_at=Task.every_n_hours(6), execute=self.check_freebie), @@ -332,37 +333,42 @@ def train_arena_model(self): n_last_battles=self.settings.bot.arena.last_battles, ).train() - def attack_arena(self): + # FIXME: refactor. + def attack_normal_arena(self): """ Совершает бой на арене. """ - logger.info('Attacking arena…') + logger.info('Attacking normal arena…') model, heroes = self.check_arena(constants.TEAM_SIZE) # Pick an enemy and select attackers. - enemy, attackers, probability = arena.Arena( + solution = ArenaSolver( model=model, user_clan_id=self.user.clan_id, heroes=heroes, - get_enemies_page=self.api.find_arena_enemies, - settings=self.settings, - ).select_enemy() - - # Debugging. - logger.info(f'Enemy name: "{enemy.user.name}".') - logger.info(f'Enemy place: {enemy.place}.') - logger.info(f'Probability: {100.0 * probability:.1f}%.') - log_heroes('Attackers:', attackers) - log_heroes(f'Defenders:', enemy.heroes) + max_iterations=self.settings.bot.arena.normal_max_pages, + n_keep_solutions=self.settings.bot.arena.normal_keep_solutions, + n_generate_solutions=self.settings.bot.arena.normal_generate_solutions, + n_generations_count_down=self.settings.bot.arena.normal_generations_count_down, + early_stop=self.settings.bot.arena.early_stop, + get_enemies=self.api.find_arena_enemies, + friendly_clans=self.settings.bot.arena.friendly_clans, + reduce_probabilities=reduce_normal_arena, + ).solve() + solution.log() # Attack! - result, quests = self.api.attack_arena(enemy.user.id, self.get_hero_ids(attackers)) + result, quests = self.api.attack_arena( + solution.enemy.user_id, + self.get_hero_ids(solution.attackers[0]), + ) # Collect results. result.log() logger.info('Current place: {}.', result.state.arena_place) self.farm_quests(quests) + # FIXME: refactor. def attack_grand_arena(self): """ Совершает бой на гранд арене. @@ -371,27 +377,25 @@ def attack_grand_arena(self): model, heroes = self.check_arena(constants.GRAND_SIZE) # Pick an enemy and select attackers. - enemy, attacker_teams, probability = arena.GrandArena( + solution = ArenaSolver( model=model, user_clan_id=self.user.clan_id, heroes=heroes, - get_enemies_page=self.api.find_grand_enemies, - settings=self.settings, - ).select_enemy() - - # Debugging. - logger.info(f'Enemy name: "{enemy.user.name}".') - logger.info(f'Enemy place: {enemy.place}.') - logger.info(f'Probability: {100.0 * probability:.1f}%.') - for i, (attackers, defenders) in enumerate(zip(attacker_teams, enemy.heroes), start=1): - logger.info(f'Battle #{i}.') - log_heroes('Attackers:', attackers) - log_heroes('Defenders:', defenders) + max_iterations=self.settings.bot.arena.grand_max_pages, + n_keep_solutions=self.settings.bot.arena.grand_keep_solutions, + n_generate_solutions=self.settings.bot.arena.grand_generate_solutions, + n_generations_count_down=self.settings.bot.arena.grand_generations_count_down, + early_stop=self.settings.bot.arena.early_stop, + get_enemies=self.api.find_grand_enemies, + friendly_clans=self.settings.bot.arena.friendly_clans, + reduce_probabilities=reduce_grand_arena, + ).solve() + solution.log() # Attack! - result, quests = self.api.attack_grand(enemy.user.id, [ - [attacker.id for attacker in attackers] - for attackers in attacker_teams + result, quests = self.api.attack_grand(solution.enemy.user_id, [ + self.get_hero_ids(attackers) + for attackers in solution.attackers ]) # Collect results. diff --git a/bestmobabot/dataclasses_.py b/bestmobabot/dataclasses_.py index 0064b8d..6dde153 100644 --- a/bestmobabot/dataclasses_.py +++ b/bestmobabot/dataclasses_.py @@ -128,7 +128,8 @@ def __lt__(self, other: Any) -> Any: return NotImplemented def __str__(self): - return f'{"⭐" * self.star} {resources.hero_name(self.id)} ({self.level}) {COLORS.get(self.color, self.color)}' + stars = '🌟' if self.star > 5 else '⭐' * self.star + return f'{stars} {resources.hero_name(self.id)} ({self.level}) {COLORS.get(self.color, self.color)}' Team = List[Hero] @@ -177,6 +178,9 @@ class Config: def is_from_clans(self, clans: Iterable[str]) -> bool: return (self.clan_id and self.clan_id in clans) or (self.clan_title and self.clan_title in clans) + def __str__(self) -> str: + return f'«{self.name}» from «{self.clan_title}»' + class BaseArenaEnemy(BaseModel): user_id: str @@ -193,6 +197,9 @@ class Config: def teams(self) -> List[Team]: raise NotImplementedError() + def __str__(self) -> str: + return f'{self.user} at place {self.place}' + class ArenaEnemy(BaseArenaEnemy): heroes: List[Hero] diff --git a/bestmobabot/itertools_.py b/bestmobabot/itertools_.py index 7aa527a..c1686e2 100644 --- a/bestmobabot/itertools_.py +++ b/bestmobabot/itertools_.py @@ -1,15 +1,26 @@ +""" +More `itertools`. +""" + from __future__ import annotations -from typing import Iterable, Iterator, TypeVar +import math +from typing import Any, Iterable, Iterator, List, TypeVar + +from loguru import logger T = TypeVar('T') -class CoolDown(Iterator[T]): - def __init__(self, iterable: Iterable[T], interval: int): +class CountDown(Iterator[T]): + """ + Limits iteration number unless `reset` is called. + """ + + def __init__(self, iterable: Iterable[T], n_iterations: int): self.iterator = iter(iterable) - self.interval = interval - self.iterations_left = interval + self.n_iterations = n_iterations + self.iterations_left = n_iterations self.is_fresh = True def __iter__(self) -> Iterator[T]: @@ -22,6 +33,46 @@ def __next__(self) -> T: self.is_fresh = False return next(self.iterator) + def __int__(self) -> int: + return self.iterations_left + def reset(self): - self.iterations_left = self.interval + self.iterations_left = self.n_iterations self.is_fresh = True + + +def secretary_max(items: Iterable[T], n: int, early_stop: Any = None) -> T: + """ + Select best item while lazily iterating over the items. + https://en.wikipedia.org/wiki/Secretary_problem#Deriving_the_optimal_policy + """ + r = int(n / math.e) + 1 + + max_item = None + + for i, item in enumerate(items, start=1): + # Check early stop condition. + if early_stop is not None and item >= early_stop: + logger.trace('Early stop.') + return item + # If it's the last item, just return it. + if i == n: + logger.trace('Last item.') + return item + # Otherwise, check if the item is better than previous ones. + if max_item is None or item >= max_item: + if i >= r: + # Better than (r - 1) previous ones, return it. + logger.trace('Better than {} previous ones.', r - 1) + return item + # Otherwise, update the best key. + max_item = item + + raise RuntimeError('unreachable code') + + +def slices(n: int, length: int) -> List[slice]: + """ + Make a list of continuous slices. E.g. `[slice(0, 2), slice(2, 4), slice(4, 6)]`. + """ + return [slice(i * length, (i + 1) * length) for i in range(n)] diff --git a/bestmobabot/settings.py b/bestmobabot/settings.py index 305e0c8..6608ce0 100644 --- a/bestmobabot/settings.py +++ b/bestmobabot/settings.py @@ -20,17 +20,24 @@ class TelegramSettings(BaseModel): class ArenaSettings(BaseModel): - skip_clans: Set[str] = [] # names or clan IDs which must be skipped during enemy search - early_stop: confloat(ge=0.0, le=1.0) = 0.95 # minimal win probability to stop enemy search + # Shared settings. schedule_offset: timedelta = timedelta() # arena task schedule offset - teams_limit: conint(ge=1) = 20000 # number of the most powerful teams tested - grand_generations_cool_down: conint(ge=1) = 25 # maximum number of GA iterations without any improvement - max_pages: conint(ge=1) = 15 # maximal number of pages during enemy search - max_grand_pages: conint(ge=1) = 15 # maximal number of pages during grand enemy search - randomize_grand_defenders: bool = False + friendly_clans: Set[str] = [] # names or clan IDs which must be skipped during enemy search + early_stop: confloat(ge=0.0, le=1.0) = 0.95 # minimal win probability to stop enemy search + last_battles: conint(ge=1) = constants.MODEL_N_LAST_BATTLES # use last N battles for training + + # Normal arena. + normal_max_pages: conint(ge=1) = 15 # maximal number of pages during normal enemy search + normal_generations_count_down: conint(ge=1) = 5 + normal_generate_solutions: conint(ge=1) = 750 + normal_keep_solutions: conint(ge=1) = 250 + + # Grand arena. + grand_max_pages: conint(ge=1) = 15 # maximal number of pages during grand enemy search + grand_generations_count_down: conint(ge=1) = 25 # maximum number of GA iterations without any improvement grand_generate_solutions: conint(ge=1) = 1250 grand_keep_solutions: conint(ge=1) = 250 - last_battles: conint(ge=1) = constants.MODEL_N_LAST_BATTLES # use last N battles for training + randomize_grand_defenders: bool = False # noinspection PyMethodParameters diff --git a/setup.cfg b/setup.cfg index 3ff2d7a..d95f29c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,6 +3,7 @@ line_length = 120 multi_line_output = 3 include_trailing_comma = true known_third_party = ujson +known_standard_library = dataclasses [flake8] max_line_length = 120 diff --git a/setup.py b/setup.py index 72088ee..56a9677 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ setuptools.setup( name='bestmobabot', - version='2.4b2', + version='2.4b3', author='Pavel Perestoronin', author_email='eigenein@gmail.com', description='Hero Wars game bot 🏆', diff --git a/tests/test_arena.py b/tests/test_itertools.py similarity index 86% rename from tests/test_arena.py rename to tests/test_itertools.py index f29a7ad..59cd45a 100644 --- a/tests/test_arena.py +++ b/tests/test_itertools.py @@ -1,8 +1,18 @@ +from __future__ import annotations + from typing import List import pytest -from bestmobabot.arena import ranges, secretary_max +from bestmobabot.itertools_ import secretary_max, slices + + +@pytest.mark.parametrize('n, length, expected', [ + (1, 5, [slice(0, 5)]), + (3, 5, [slice(0, 5), slice(5, 10), slice(10, 15)]), +]) +def test_slices(n: int, length: int, expected: List[slice]): + assert slices(n, length) == expected @pytest.mark.parametrize('items, expected, next_', [ @@ -44,10 +54,3 @@ def test_secretary_max_early_stop(items, early_stop, expected, next_): iterator = iter(items) assert secretary_max(iterator, len(items), early_stop=early_stop) == expected assert next(iterator, None) == next_ # test the iterator position - - -@pytest.mark.parametrize('n_ranges, range_size, expected', [ - (3, 5, [range(0, 5), range(5, 10), range(10, 15)]), -]) -def test_ranges(n_ranges: int, range_size: int, expected: List[range]): - assert list(ranges(n_ranges, range_size)) == expected