diff --git a/examples/play_interactive.py b/examples/play_interactive.py index c7cde3b..e3490d5 100644 --- a/examples/play_interactive.py +++ b/examples/play_interactive.py @@ -3,7 +3,7 @@ import cv2 import gymnasium as gym -from tetris_gymnasium.envs.tetris import ACTIONS +from tetris_gymnasium.envs import Tetris if __name__ == "__main__": # Create an instance of Tetris @@ -30,19 +30,19 @@ key = cv2.waitKey(1) if key == ord("a"): - action = ACTIONS["move_left"] + action = tetris_game.unwrapped.actions.move_left elif key == ord("d"): - action = ACTIONS["move_right"] + action = tetris_game.unwrapped.actions.move_right elif key == ord("s"): - action = ACTIONS["move_down"] + action = tetris_game.unwrapped.actions.move_down elif key == ord("w"): - action = ACTIONS["rotate_counterclockwise"] + action = tetris_game.unwrapped.actions.rotate_counterclockwise elif key == ord("e"): - action = ACTIONS["rotate_clockwise"] + action = tetris_game.unwrapped.actions.rotate_clockwise elif key == ord(" "): - action = ACTIONS["hard_drop"] + action = tetris_game.unwrapped.actions.hard_drop elif key == ord("q"): - action = ACTIONS["swap"] + action = tetris_game.unwrapped.actions.swap elif key == ord("r"): tetris_game.reset(seed=42) break diff --git a/tetris_gymnasium/components/tetromino.py b/tetris_gymnasium/components/tetromino.py new file mode 100644 index 0000000..c24962c --- /dev/null +++ b/tetris_gymnasium/components/tetromino.py @@ -0,0 +1,25 @@ +"""Data structures for Tetris.""" +from dataclasses import dataclass + +import numpy as np + + +@dataclass +class Pixel: + """A single pixel of a Tetris game. + + A pixel can be part of a tetromino or part of the game board (empty, bedrock). + """ + + id: int + color_rgb: list + + +@dataclass +class Tetromino(Pixel): + """A Tetris piece. + + A tetromino is a geometric shape composed of multiple pixels. + """ + + matrix: np.ndarray diff --git a/tetris_gymnasium/components/tetromino_holder.py b/tetris_gymnasium/components/tetromino_holder.py index d2598d5..ff4419b 100644 --- a/tetris_gymnasium/components/tetromino_holder.py +++ b/tetris_gymnasium/components/tetromino_holder.py @@ -2,7 +2,7 @@ from collections import deque from typing import Optional -import numpy as np +from tetris_gymnasium.components.tetromino import Tetromino class TetrominoHolder: @@ -17,15 +17,15 @@ def __init__(self, size=1): self.size = size self.queue = deque(maxlen=size) - def _get_tetromino(self) -> Optional[np.ndarray]: + def _get_tetromino(self) -> Optional[Tetromino]: """Get the next tetromino from the holder.""" return self.queue.popleft() - def _store_tetromino(self, tetromino: np.ndarray): + def _store_tetromino(self, tetromino: Tetromino): """Store a tetromino in the holder.""" self.queue.append(tetromino) - def swap(self, tetromino: np.ndarray) -> Optional[np.ndarray]: + def swap(self, tetromino: Tetromino) -> Optional[Tetromino]: """Swap the given tetromino with the one in the holder. This implementation uses a queue to store the tetrominoes. Tetromioes are only returned once the queue is full. diff --git a/tetris_gymnasium/envs/tetris.py b/tetris_gymnasium/envs/tetris.py index acd88c6..e26185e 100644 --- a/tetris_gymnasium/envs/tetris.py +++ b/tetris_gymnasium/envs/tetris.py @@ -1,4 +1,5 @@ """Tetris environment for Gymnasium.""" +from dataclasses import fields from typing import Any, List import gymnasium as gym @@ -6,26 +7,12 @@ from gymnasium.core import ActType, RenderFrame from gymnasium.spaces import Box, Discrete +from tetris_gymnasium.components.tetromino import Pixel, Tetromino from tetris_gymnasium.components.tetromino_holder import TetrominoHolder from tetris_gymnasium.components.tetromino_queue import TetrominoQueue from tetris_gymnasium.components.tetromino_randomizer import BagRandomizer, Randomizer -from tetris_gymnasium.util.tetrominoes import STANDARD_COLORS, STANDARD_TETROMINOES - -REWARDS = { - "alife": 0.001, - "clear_line": 1, - "game_over": -2, -} - -ACTIONS = { - "move_left": 0, - "move_right": 1, - "move_down": 2, - "rotate_clockwise": 3, - "rotate_counterclockwise": 4, - "hard_drop": 5, - "swap": 6, -} +from tetris_gymnasium.mappings.actions import ActionsMapping +from tetris_gymnasium.mappings.rewards import RewardsMapping class Tetris(gym.Env): @@ -37,15 +24,50 @@ class Tetris(gym.Env): "render_human": True, } + BASE_PIXELS = [Pixel(0, [0, 0, 0]), Pixel(1, [128, 128, 128])] # Empty # Bedrock + + TETROMINOES = [ + Tetromino( + 0, + [0, 240, 240], + np.array( + [[0, 0, 0, 0], [1, 1, 1, 1], [0, 0, 0, 0], [0, 0, 0, 0]], dtype=np.uint8 + ), + ), # I + Tetromino(1, [240, 240, 0], np.array([[1, 1], [1, 1]], dtype=np.uint8)), # O + Tetromino( + 2, + [160, 0, 240], + np.array([[0, 1, 0], [1, 1, 1], [0, 0, 0]], dtype=np.uint8), + ), # T + Tetromino( + 3, [0, 240, 0], np.array([[0, 1, 1], [1, 1, 0], [0, 0, 0]], dtype=np.uint8) + ), # S + Tetromino( + 4, [240, 0, 0], np.array([[1, 1, 0], [0, 1, 1], [0, 0, 0]], dtype=np.uint8) + ), # Z + Tetromino( + 5, [0, 0, 240], np.array([[1, 0, 0], [1, 1, 1], [0, 0, 0]], dtype=np.uint8) + ), # J + Tetromino( + 6, + [240, 160, 0], + np.array([[0, 0, 1], [1, 1, 1], [0, 0, 0]], dtype=np.uint8), + ), # L + ] + def __init__( self, render_mode=None, width=10, height=20, - tetrominoes=STANDARD_TETROMINOES, randomizer: Randomizer = BagRandomizer, holder: TetrominoHolder = TetrominoHolder, queue: TetrominoQueue = TetrominoQueue, + actions_mapping=ActionsMapping(), + rewards_mapping=RewardsMapping(), + base_pixels=None, + tetrominoes=None, ): """Creates a new Tetris environment. @@ -53,33 +75,45 @@ def __init__( render_mode: The mode to use for rendering. If None, no rendering will be done. width: The width of the game board. height: The height of the game board. - tetrominoes: A list of numpy arrays representing the tetrominoes to use. randomizer: The randomizer to use for selecting tetrominoes. holder: The holder to use for storing tetrominoes. queue: The queue to use for storing tetrominoes. + actions_mapping: The mapping for the actions that the agent can take. + rewards_mapping: The mapping for the rewards that the agent can receive. + base_pixels: The base pixels to use for the environment (e.g. empty, bedrock). + tetrominoes: The tetrominoes to use for the environment. """ # Dimensions self.height: int = height self.width: int = width - # Utilities - self.queue = queue(randomizer(len(tetrominoes)), 5) - self.holder = holder() + # Base Pixels + if base_pixels is None: + self.base_pixels = self.BASE_PIXELS # Tetrominoes - self.tetrominoes: List[np.ndarray] = tetrominoes - self.active_tetromino: np.ndarray = None - # The index is used to keep track of the active tetromino without comparing arrays - self.active_tetromino_i: int = 0 + if tetrominoes is None: + tetrominoes = self.TETROMINOES + self.tetrominoes: List[Tetromino] = self.offset_tetromino_id( + tetrominoes, len(self.base_pixels) + ) + self.active_tetromino: Tetromino = None + + # Pixels + self.pixels: List[Pixel] = self.parse_pixels(self.tetrominoes) # Padding - self.padding: int = max(max(t.shape) for t in tetrominoes) + self.padding: int = max(max(t.matrix.shape) for t in self.tetrominoes) self.width_padded: int = self.width + 2 * self.padding self.height_padded: int = self.height + 2 * self.padding # Board self.board = self.create_board() + # Utilities + self.queue = queue(randomizer(len(tetrominoes)), 5) + self.holder = holder() + # Position self.x: int = 0 self.y: int = 0 @@ -89,20 +123,34 @@ def __init__( { "board": Box( low=0, - high=len(self.tetrominoes), + high=len(self.pixels), shape=(self.height, self.width), dtype=np.float32, ), - "holder": gym.spaces.MultiBinary( - (self.holder.size, len(self.tetrominoes)) + "holder": Box( + low=0, + high=len(self.pixels), + shape=(self.holder.size,), + dtype=np.uint8, ), - "queue": gym.spaces.MultiBinary( - (self.queue.size, len(self.tetrominoes)) + "queue": gym.spaces.Box( + low=0, + high=len(self.pixels), + shape=(self.queue.size,), + dtype=np.uint8, ), } ) - self.action_space = Discrete(len(ACTIONS)) - self.reward_range = (min(REWARDS.values()), max(REWARDS.values())) + + # Mappings for rewards & actions (readability in code) + self.actions = actions_mapping + self.rewards = rewards_mapping + + self.action_space = Discrete(len(fields(self.actions))) + self.reward_range = ( + min(vars(self.rewards).values()), + max(vars(self.rewards).values()), + ) assert render_mode is None or render_mode in self.metadata["render_modes"] self.render_mode = render_mode @@ -126,39 +174,39 @@ def step(self, action: ActType) -> "tuple[dict, float, bool, bool, dict]": game_over = False truncated = False # Tetris without levels will never truncate - reward = REWARDS["alife"] + reward = self.rewards.alife - if action == ACTIONS["move_left"]: + if action == self.actions.move_left: if not self.collision(self.active_tetromino, self.x - 1, self.y): self.x -= 1 - elif action == ACTIONS["move_right"]: + elif action == self.actions.move_right: if not self.collision(self.active_tetromino, self.x + 1, self.y): self.x += 1 - elif action == ACTIONS["move_down"]: + elif action == self.actions.move_down: if not self.collision(self.active_tetromino, self.x, self.y + 1): self.y += 1 - elif action == ACTIONS["rotate_clockwise"]: + elif action == self.actions.rotate_clockwise: if not self.collision( self.rotate(self.active_tetromino, True), self.x, self.y ): self.active_tetromino = self.rotate(self.active_tetromino, True) - elif action == ACTIONS["rotate_counterclockwise"]: + elif action == self.actions.rotate_counterclockwise: if not self.collision( self.rotate(self.active_tetromino, False), self.x, self.y ): self.active_tetromino = self.rotate(self.active_tetromino, False) - elif action == ACTIONS["swap"]: + elif action == self.actions.swap: if not self.has_swapped: # Swap the active tetromino with the one in the holder - self.active_tetromino_i = self.holder.swap(self.active_tetromino_i) + self.active_tetromino = self.holder.swap(self.active_tetromino) self.has_swapped = True - if self.active_tetromino_i is None: + if self.active_tetromino is None: # If the holder is empty, spawn the next tetromino # No need for collision check, as the holder is only empty at the start self.spawn_tetromino() else: self.reset_tetromino_position() - elif action == ACTIONS["hard_drop"]: + elif action == self.actions.hard_drop: # 1. Drop the tetromino and lock it in place self.drop_active_tetromino() self.place_active_tetromino() @@ -167,7 +215,7 @@ def step(self, action: ActType) -> "tuple[dict, float, bool, bool, dict]": # 2. Spawn the next tetromino and check if the game continues game_over = not self.spawn_tetromino() if game_over: - reward = REWARDS["game_over"] + reward = self.rewards.game_over # 3. Reset the swap flag (agent can swap once per tetromino) self.has_swapped = False @@ -176,7 +224,7 @@ def step(self, action: ActType) -> "tuple[dict, float, bool, bool, dict]": def reset( self, *, seed: "int | None" = None, options: "dict[str, Any] | None" = None - ) -> "tuple[np.ndarray, dict[str, Any]]": + ) -> "tuple[dict[str, Any], dict[str, Any]]": """Resets the state of the environment. Args: @@ -194,7 +242,7 @@ def reset( self.queue.reset(seed=seed) # Get the next tetromino and spawn it - self.set_active_tetromino(self.queue.get_next_tetromino()) + self.active_tetromino = self.tetrominoes[self.queue.get_next_tetromino()] self.reset_tetromino_position() # Holder @@ -222,18 +270,19 @@ def render(self) -> "RenderFrame | list[RenderFrame] | None": (self.board.shape[0], self.board.shape[1], 3), dtype=np.uint8 ) # Render the board - rgb[...] = STANDARD_COLORS[self.board] + colors = np.array(list(p.color_rgb for p in self.pixels), dtype=np.uint8) + rgb[...] = colors[self.board] # Render active tetromino (because it's not on self.board) if self.active_tetromino is not None: # Expand to 3 Dimensions for RGB active_tetromino_rgb = np.repeat( - self.active_tetromino[:, :, np.newaxis], 3, axis=2 + self.active_tetromino.matrix[:, :, np.newaxis], 3, axis=2 ) - active_tetromino_rgb[...] = STANDARD_COLORS[self.active_tetromino] + active_tetromino_rgb[...] = colors[self.active_tetromino.matrix] # Apply by masking - slices = self.get_active_tetromino_slices( + slices = self.get_tetromino_slices( self.active_tetromino, self.x, self.y ) rgb[slices] += active_tetromino_rgb @@ -249,18 +298,16 @@ def spawn_tetromino(self) -> bool: Returns True if the tetromino can be successfully spawned, False otherwise. """ - self.set_active_tetromino(self.queue.get_next_tetromino()) + self.active_tetromino = self.tetrominoes[self.queue.get_next_tetromino()] self.reset_tetromino_position() return not self.collision(self.active_tetromino, self.x, self.y) def place_active_tetromino(self): """Locks the active tetromino in place on the board.""" - slices = self.get_active_tetromino_slices(self.active_tetromino, self.x, self.y) - self.board[slices] += self.active_tetromino + self.board = self.project_active_tetromino() self.active_tetromino = None - self.active_tetromino_i = 0 - def collision(self, tetromino: np.ndarray, x: int, y: int) -> bool: + def collision(self, tetromino: Tetromino, x: int, y: int) -> bool: """Check if the tetromino collides with the board at the given position. A collision is detected if the tetromino overlaps with any non-zero cell on the board. @@ -275,13 +322,13 @@ def collision(self, tetromino: np.ndarray, x: int, y: int) -> bool: True if the tetromino collides with the board at the given position, False otherwise. """ # Extract the part of the board that the tetromino would occupy. - slices = self.get_active_tetromino_slices(tetromino, x, y) + slices = self.get_tetromino_slices(tetromino, x, y) board_subsection = self.board[slices] # Check collision using numpy element-wise operations. - return np.any(board_subsection[tetromino > 0] > 0) + return np.any(board_subsection[tetromino.matrix > 0] > 0) - def rotate(self, tetromino: np.ndarray, clockwise=True) -> np.ndarray: + def rotate(self, tetromino: Tetromino, clockwise=True) -> Tetromino: """Rotate a tetromino by 90 degrees. Args: @@ -291,7 +338,11 @@ def rotate(self, tetromino: np.ndarray, clockwise=True) -> np.ndarray: Returns: The rotated tetromino. """ - return np.rot90(tetromino, k=(1 if clockwise else -1)) + return Tetromino( + tetromino.id, + tetromino.color_rgb, + np.rot90(tetromino.matrix, k=(1 if clockwise else -1)), + ) def drop_active_tetromino(self): """Drop the active tetromino to the lowest possible position on the board.""" @@ -344,29 +395,23 @@ def crop_padding(self, matrix: np.ndarray) -> np.ndarray: """ return matrix[0 : -self.padding, self.padding : -self.padding] - def get_active_tetromino_slices( - self, tetromino: np.ndarray, x: int, y: int + def get_tetromino_slices( + self, tetromino: Tetromino, x: int, y: int ) -> "tuple(slice, slice)": """Get the slices of the active tetromino on the board. Returns: The slices of the active tetromino on the board. """ - tetromino_height, tetromino_width = tetromino.shape + tetromino_height, tetromino_width = tetromino.matrix.shape return tuple((slice(y, y + tetromino_height), slice(x, x + tetromino_width))) def reset_tetromino_position(self) -> None: """Reset the x and y position of the active tetromino to the center of the board.""" - self.x, self.y = self.width_padded // 2 - self.active_tetromino.shape[0] // 2, 0 - - def set_active_tetromino(self, index: int) -> None: - """Set the active tetromino to the given tetromino and position. - - Args: - index: The index of the tetromino to set as the active tetromino. - """ - self.active_tetromino_i = index - self.active_tetromino = self.tetrominoes[index] + self.x, self.y = ( + self.width_padded // 2 - self.active_tetromino.matrix.shape[0] // 2, + 0, + ) def project_active_tetromino(self): """Project the active tetromino on the board. @@ -375,8 +420,8 @@ def project_active_tetromino(self): on the board to render it. """ projection = self.board.copy() - slices = self.get_active_tetromino_slices(self.active_tetromino, self.x, self.y) - projection[slices] += self.active_tetromino + slices = self.get_tetromino_slices(self.active_tetromino, self.x, self.y) + projection[slices] += self.active_tetromino.matrix return projection def _get_obs(self) -> "dict[str, Any]": @@ -384,19 +429,23 @@ def _get_obs(self) -> "dict[str, Any]": # Include the active tetromino on the board for the observation. board_obs = self.project_active_tetromino() board_obs = self.crop_padding(board_obs).astype(np.float32) - - holder_obs = np.array(self.holder.get_tetrominoes()) - holder_one_hot_obs = np.zeros((self.holder.size, len(self.tetrominoes))) - if len(holder_obs) > 0: - holder_one_hot_obs += np.eye(len(self.tetrominoes))[holder_obs] - - queue_one_hot_obs = np.array(self.queue.get_queue()) - queue_one_hot_obs = np.eye(len(self.tetrominoes))[queue_one_hot_obs] + # Holder + holder_obs = np.array( + [t.id for t in self.holder.get_tetrominoes()], dtype=np.uint8 + ) + holder_obs = np.pad( + holder_obs, + (0, self.holder.size - len(holder_obs)), + mode="constant", + constant_values=0, + ) + # Queue + queue_obs = np.array(self.queue.get_queue(), dtype=np.uint8) return { "board": board_obs, - "queue": queue_one_hot_obs, - "holder": holder_one_hot_obs, + "holder": holder_obs, + "queue": queue_obs, } def _get_info(self) -> dict: @@ -412,7 +461,7 @@ def score(self, rows_cleared) -> int: Returns The score for the given number of lines cleared. """ - return rows_cleared * REWARDS["clear_line"] + return rows_cleared * self.rewards.clear_line def create_board(self) -> np.ndarray: """Create a new board with the given dimensions.""" @@ -424,3 +473,41 @@ def create_board(self) -> np.ndarray: constant_values=1, ) return board + + def parse_pixels(self, tetrominoes: "List[Tetromino]") -> "List[Pixel]": + """Creates a list of pixels from the base pixels and the tetrominoes. + + Pixels are used to represent the board and the tetrominoes in the environment. + + Args: + tetrominoes: The tetrominoes to add to the base pixels. + + Returns: + The list of pixels for the environment. + """ + return self.base_pixels + [ + Pixel(t.id + len(self.base_pixels), t.color_rgb) for t in tetrominoes + ] + + def offset_tetromino_id( + self, tetrominoes: "List[Tetromino]", offset: int + ) -> "List[Tetromino]": + """In order to make the tetrominos distinguishable, each tetromino should have a unique value. + + The tetrominoes already possess a unique ID, but the matrix should also be updated to reflect this. + Additionally, the tetrominoes should be offset by a certain value to avoid conflicts with the board. + The board already contains a number of pixels which are not part of the tetrominoes (empty cells, bedrock). + So, the tetrominoes should be offset by the number of pixels in the board that are not tetrominoes. + + Args: + tetrominoes: The tetrominoes to preprocess. + offset: The offset to apply to the tetrominoes. This is usually the number of non-tetromino pixels in the board. + + Returns: + The preprocessed tetrominoes (= id and matrix values offset by number of non-tetromino pixels). + """ + for i in range(len(tetrominoes)): + tetrominoes[i].id += offset + tetrominoes[i].matrix = tetrominoes[i].matrix * (i + offset) + + return tetrominoes diff --git a/tetris_gymnasium/mappings/__init__.py b/tetris_gymnasium/mappings/__init__.py new file mode 100644 index 0000000..1f58afa --- /dev/null +++ b/tetris_gymnasium/mappings/__init__.py @@ -0,0 +1 @@ +"""This module contains mappings for the environment, making the code more readable instead of using magic numbers.""" diff --git a/tetris_gymnasium/mappings/actions.py b/tetris_gymnasium/mappings/actions.py new file mode 100644 index 0000000..77a9936 --- /dev/null +++ b/tetris_gymnasium/mappings/actions.py @@ -0,0 +1,18 @@ +"""This module contains mappings for actions that the agent can take.""" +from dataclasses import dataclass + + +@dataclass +class ActionsMapping: + """The actions that the agent can take. + + The mapping can be extended to include additional rewards. + """ + + move_left: int = 0 + move_right: int = 1 + move_down: int = 2 + rotate_clockwise: int = 3 + rotate_counterclockwise: int = 4 + hard_drop: int = 5 + swap: int = 6 diff --git a/tetris_gymnasium/mappings/rewards.py b/tetris_gymnasium/mappings/rewards.py new file mode 100644 index 0000000..d899bac --- /dev/null +++ b/tetris_gymnasium/mappings/rewards.py @@ -0,0 +1,14 @@ +"""This module contains the mapping for the rewards that the agent can receive.""" +from dataclasses import dataclass + + +@dataclass +class RewardsMapping: + """Mapping for the rewards that the agent can receive. + + The mapping can be extended to include additional rewards. + """ + + alife: float = 0.001 + clear_line: float = 1 + game_over: float = -2 diff --git a/tetris_gymnasium/util/__init__.py b/tetris_gymnasium/util/__init__.py deleted file mode 100644 index 85cdd24..0000000 --- a/tetris_gymnasium/util/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Utility functions for the Tetris Gymnasium package.""" diff --git a/tetris_gymnasium/util/tetrominoes.py b/tetris_gymnasium/util/tetrominoes.py deleted file mode 100644 index 54d5b6c..0000000 --- a/tetris_gymnasium/util/tetrominoes.py +++ /dev/null @@ -1,50 +0,0 @@ -"""Temporary file to store standard tetrominoes and their colors.""" -from typing import List - -import numpy as np - - -def pre_process(tetrominos: List[np.ndarray]): - """In order to make the tetrominos distinguishable, each tetromino should have a unique value. - - The values start at 2, as 0 is the background and 1 is the bedrock. - - Args: - tetrominos: The masks of the tetrominos. - - Returns: - The preprocessed tetrominos. - """ - for i in range(len(tetrominos)): - tetrominos[i] = tetrominos[i] * (i + 2) - return tetrominos - - -STANDARD_TETROMINOES_MASKS = [ - np.array( - [[0, 0, 0, 0], [1, 1, 1, 1], [0, 0, 0, 0], [0, 0, 0, 0]], dtype=np.uint8 - ), # I - np.array([[1, 1], [1, 1]], dtype=np.uint8), # O - np.array([[0, 1, 0], [1, 1, 1], [0, 0, 0]], dtype=np.uint8), # T - np.array([[0, 1, 1], [1, 1, 0], [0, 0, 0]], dtype=np.uint8), # S - np.array([[1, 1, 0], [0, 1, 1], [0, 0, 0]], dtype=np.uint8), # Z - np.array([[1, 0, 0], [1, 1, 1], [0, 0, 0]], dtype=np.uint8), # L - np.array([[0, 0, 1], [1, 1, 1], [0, 0, 0]], dtype=np.uint8), # J -] - -STANDARD_TETROMINOES = pre_process(STANDARD_TETROMINOES_MASKS) - -STANDARD_COLORS = np.array( - [ - [0, 0, 0], # Background color: Black - [128, 128, 128], # Bedrock: Grey - [0, 240, 240], # I: Cyan - [240, 240, 0], # O: Yellow - [160, 0, 240], # T: Purple - [0, 240, 0], # S: Green - [240, 0, 0], # Z: Red - [0, 0, 240], # J: Blue - [240, 160, 0], # L: Orange - ], - dtype=np.uint8, -)