Created
October 11, 2024 13:11
-
-
Save simin75simin/c3835df2441e8cea8878130036c24afc to your computer and use it in GitHub Desktop.
simple 2048 agent with 80 percent winrate by simulation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# To use: run `pip install gymnasium-2048` first, then overwrite the package's environment file with the second file of this gist (the TwentyFortyEightEnv class below).
from typing import List | |
import gymnasium as gym | |
from tqdm import trange | |
import numpy as np | |
np.random.seed(0) | |
import pandas as pd | |
from copy import deepcopy | |
from multiprocessing import Pool, cpu_count | |
# env = gym.make("gymnasium_2048:gymnasium_2048/TwentyFortyEight-v0", size=4, max_pow=11)
# print('observation space = {}'.format(env.observation_space))
# print('action space = {}'.format(env.action_space))
# print('observation range = {} ~ {}'.format(env.observation_space.low,
#                                            env.observation_space.high))
# print('number of actions = {}'.format(env.action_space.n))
class SimAgent:
    """Monte Carlo rollout agent.

    For each candidate action the agent deep-copies the live environment,
    plays that action, then continues with uniformly random actions for up to
    ``sim_depth`` further steps, summing the rewards. The action with the
    highest total over all of its rollouts is chosen.

    Win rates noted by the original author:
        n_sim=12, depth=9 -> 46.67%
        n_sim=12, depth=3 -> 33.33%
        n_sim=32, depth=3 -> 46.67%
        n_sim=32, depth=9 -> 80.00%
    """

    def __init__(self, env, n_sim=32, sim_depth=9):
        """Store the environment and the rollout budget.

        Args:
            env: Environment to plan for. It must expose ``action_space.n``
                and ``step(action)``, and must be deep-copyable.
            n_sim: Total rollout budget per decision, split evenly across
                the actions.
            sim_depth: Number of random follow-up steps in each rollout.
        """
        self.env = env
        # Always run at least one rollout per action: with a budget smaller
        # than the number of actions the integer division yields 0, every
        # score stays 0 and argmax would always pick action 0.
        self.sim_per_action = max(1, n_sim // env.action_space.n)
        self.sim_depth = sim_depth

    def decide(self, observation):
        """Return the index of the action with the best summed rollout reward.

        Args:
            observation: Accepted for interface compatibility but not read;
                the rollouts start from a copy of ``self.env`` instead.

        Returns:
            The result of ``np.argmax`` over the per-action scores.
        """
        n_actions = self.env.action_space.n
        action_scores = []
        for action in range(n_actions):
            score = 0
            for _ in range(self.sim_per_action):
                # Simulate on a private copy so the real game is untouched.
                sim_env = deepcopy(self.env)
                _, reward, terminated, truncated, _ = sim_env.step(action)
                for _ in range(self.sim_depth):
                    # Do not keep stepping an episode that has already ended.
                    if terminated or truncated:
                        break
                    _, r, terminated, truncated, _ = sim_env.step(
                        np.random.choice(n_actions)
                    )
                    reward += r
                score += reward
            action_scores.append(score)
        # Progress marker: one dot per decision.
        print('.', end='')
        return np.argmax(action_scores)

    def learn(self, *args):
        """Do nothing; this agent has no trainable state."""
        pass
# agent = SimAgent(env) | |
def rand_subset(board: List[int], num: int = 10) -> List[np.ndarray]:
    """Draw ``num`` random subsets of ``board`` without replacement.

    Each subset has a random size between 1 and ``len(board)`` inclusive, so
    the function works for boards of any length (for a 16-cell board this is
    the same 1..16 range as before).

    Args:
        board: Sequence of values to sample from.
        num: Number of subsets to draw.

    Returns:
        A list of ``num`` NumPy arrays, each holding distinct positions'
        values sampled from ``board``.

    Raises:
        ValueError: If ``board`` is empty.
    """
    n_cells = len(board)
    if n_cells == 0:
        raise ValueError("board must contain at least one element")
    return [
        # randint's upper bound is exclusive, hence n_cells + 1.
        np.random.choice(board, np.random.randint(1, n_cells + 1), replace=False)
        for _ in range(num)
    ]
# def play_montecarlo(env, agent, render=False, train=False): | |
def play_montecarlo(i):
    """Play one 2048 episode with a fresh SimAgent and report whether it was won.

    Args:
        i: Run index (as passed by ``Pool.map``). It seeds NumPy's global
            RNG so that each run draws its own stream of random rollout
            actions.

    Returns:
        True if the environment set ``info['won']`` during the episode,
        False if the episode terminated or was truncated first.
    """
    # The module seeds np.random once at import time, so without re-seeding
    # every worker would start from the same RNG state and draw the same
    # sequence of rollout actions.
    np.random.seed(i)
    env = gym.make(
        "gymnasium_2048:gymnasium_2048/TwentyFortyEight-v0", size=4, max_pow=12
    )
    try:
        agent = SimAgent(env)
        observation, _ = env.reset()  # start a new episode
        while True:  # loop until the episode ends
            action = agent.decide(observation)
            observation, _, terminated, truncated, info = env.step(action)
            if info['won']:
                return True
            if terminated or truncated:
                return False
    finally:
        # Release the environment's resources even if a step raises.
        env.close()
# episode_reward = play_montecarlo(env, agent, render=True)
# print('episode reward = {}'.format(episode_reward))
# env.close()  # this statement closes the graphical window
# episode_rewards = [play_montecarlo(env, agent) for _ in trange(100)]
# print('mean episode reward = {}'.format(np.mean(episode_rewards)))
if __name__ == '__main__':
    # Leave one core free, but never ask for fewer than one worker:
    # on a single-core machine cpu_count() - 1 is 0 and Pool(0) raises
    # ValueError.
    n_process = max(1, cpu_count() - 1)
    # One episode per worker process.
    n_runs = n_process
    # Message: "Using {n_process} processes to run {n_runs} simulations".
    print(f'使用 {n_process} 个进程进行 {n_runs} 次模拟')
    with Pool(n_process) as p:
        wins = p.map(play_montecarlo, range(n_runs))
    # Fraction of won episodes, as a percentage ("胜率" = win rate).
    winrate = np.mean(wins) * 100
    print(f'胜率 = {winrate:.2f}%')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Any | |
import gymnasium as gym | |
import numpy as np | |
import pygame | |
from gymnasium import spaces | |
from gymnasium.core import ActType, ObsType, RenderFrame, SupportsFloat | |
# --- Window geometry (pixels) and background colour -------------------------
WINDOW_WIDTH = 400
WINDOW_HEIGHT = 400
# Extra strip below the board area where the score text is drawn.
WINDOW_SCORE_HEIGHT = 60
WINDOW_BG_COLOR = (250, 248, 238)

# --- Board and tile layout --------------------------------------------------
BOARD_PADDING = 20
BOARD_BG_COLOR = (186, 172, 160)
TILE_PADDING = 5
BORDER_RADIUS = 4

# --- Tile colours (RGB), keyed by the displayed tile value; 0 = empty cell --
TILE_COLOR_MAP = {
    0: (204, 193, 178),
    2: (238, 228, 218),
    4: (237, 224, 200),
    8: (242, 177, 121),
    16: (245, 149, 99),
    32: (246, 124, 95),
    64: (246, 94, 59),
    128: (237, 207, 114),
    256: (237, 204, 97),
    512: (237, 200, 80),
    1024: (237, 197, 63),
    2048: (237, 194, 46),
}
# Fallback colour for any tile value that has no entry in TILE_COLOR_MAP.
TILE_COLOR_DEFAULT = (60, 58, 50)

# --- Fonts --------------------------------------------------------------------
FONT_NAME = "Comic Sans MS"
FONT_SIZE = 40
FONT_DARK_COLOR = (119, 110, 101)
FONT_LIGHT_COLOR = (249, 246, 242)
FONT_SCORE_COLOR = (0, 0, 0)
class TwentyFortyEightEnv(gym.Env):
    """Gymnasium environment for the game 2048.

    The board is a ``size x size`` array of exponents: 0 is an empty cell and
    ``k > 0`` is the tile ``2 ** k``. Observations are a one-hot encoding of
    those exponents with shape ``(size, size, max_pow)``. Actions are
    0: up, 1: right, 2: down, 3: left. The reward of a step is the sum of the
    face values of the tiles created by merges in that step.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    # Exponent of the winning tile (2 ** 11 == 2048).
    WIN_POW = 11

    def __init__(
        self,
        render_mode: str | None = None,
        size: int = 4,
        max_pow: int = 16,
    ) -> None:
        """Create the environment.

        Args:
            render_mode: ``None``, ``"human"`` or ``"rgb_array"``.
            size: Board side length; must be at least 2.
            max_pow: Depth of the one-hot observation, i.e. the number of
                distinct exponents (0 .. max_pow - 1) that can be encoded.
        """
        assert size >= 2, "size must be greater of equal than 2"

        self.observation_space = spaces.Box(
            low=0,
            high=1,
            shape=(size, size, max_pow),
            dtype=np.uint16,
        )

        # 0: up, 1: right, 2: down, 3: left
        self.action_space = spaces.Discrete(4)

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        # pygame resources are created lazily by render_frame().
        self.window = None
        self.clock = None
        self.font = None

    def _get_obs(self) -> ObsType:
        """Return the one-hot encoding of the current board.

        Raises:
            IndexError: If a cell's exponent is not smaller than ``max_pow``.
        """
        observation = np.zeros(
            self.observation_space.shape,
            dtype=self.observation_space.dtype,
        )
        # For every (row, col), set the channel given by that cell's exponent.
        rows, cols = np.indices(self.board.shape)
        observation[rows, cols, self.board] = 1
        return observation

    def _get_info(self) -> dict[str, Any]:
        """Return the auxiliary info dict for the current state."""
        max_exponent = np.max(self.board)
        return {
            "board": self.board,
            "step_score": self.step_score,
            "total_score": self.total_score,
            "max": max_exponent,
            "won": max_exponent >= self.WIN_POW,
            "is_legal": self.is_legal,
            "illegal_count": self.illegal_count,
        }

    def _spawn_tile(self) -> None:
        """Place a new tile on a randomly chosen empty cell.

        The new cell gets exponent 1 (tile 2) unless the random draw is at
        most 0.1, in which case it gets exponent 2 (tile 4).
        """
        rows, cols = np.where(self.board == 0)
        index = self.np_random.choice(len(rows))
        value = 1 if self.np_random.random() > 0.1 else 2
        self.board[rows[index], cols[index]] = value

    def reset(
        self,
        *,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[ObsType, dict[str, Any]]:
        """Start a new game with an empty board and two spawned tiles.

        Args:
            seed: Forwarded to ``gym.Env.reset`` to seed ``self.np_random``.
            options: Accepted for API compatibility; not read.

        Returns:
            The initial observation and info dict. No frame is rendered here.
        """
        super().reset(seed=seed)

        self.board = np.zeros(
            (self.observation_space.shape[0], self.observation_space.shape[1]),
            dtype=np.uint16,
        )
        self.step_score = 0
        self.total_score = 0
        self.is_legal = True
        self.illegal_count = 0

        self._spawn_tile()
        self._spawn_tile()

        observation = self._get_obs()
        info = self._get_info()
        return observation, info

    @staticmethod
    def _transpose(board: np.ndarray) -> np.ndarray:
        """Transpose a matrix."""
        return np.transpose(board)

    @staticmethod
    def _reverse(board: np.ndarray) -> np.ndarray:
        """Reverse the row order of a matrix (flip it upside down)."""
        return np.flipud(board)

    @staticmethod
    def _cover_up(board: np.ndarray) -> np.ndarray:
        """Return a new board with each column's non-zero cells packed to the top."""
        cover_board = np.zeros_like(board)
        for col in range(board.shape[1]):
            up = 0  # next free row in this column
            for row in range(board.shape[0]):
                if board[row, col] != 0:
                    cover_board[up, col] = board[row, col]
                    up += 1
        return cover_board

    @staticmethod
    def _merge(board: np.ndarray) -> tuple[np.ndarray, int]:
        """Merge vertically adjacent equal cells upwards, in place.

        Args:
            board: Board of exponents; it is modified in place.

        Returns:
            The same board object and the score gained, i.e. the sum of the
            face values of the merged tiles, as a Python ``int``.
        """
        score = 0
        for row in range(1, board.shape[0]):
            for col in range(board.shape[1]):
                if board[row, col] != 0 and board[row, col] == board[row - 1, col]:
                    # Convert to a Python int first: doing the power on the
                    # uint16 cell value can keep the result (and every score
                    # accumulated from it) in uint16, which wraps at 65536.
                    score += 2 ** (int(board[row, col]) + 1)
                    board[row - 1, col] = board[row - 1, col] + 1
                    board[row, col] = 0
        return board, score

    @classmethod
    def _up(cls, board: np.ndarray) -> tuple[np.ndarray, int]:
        """Slide and merge all tiles upwards; return the new board and score."""
        next_board = cls._cover_up(board)
        next_board, score = cls._merge(next_board)
        # Merging leaves gaps; pack the columns again.
        next_board = cls._cover_up(next_board)
        return next_board, score

    @classmethod
    def _right(cls, board: np.ndarray) -> tuple[np.ndarray, int]:
        """Slide and merge all tiles to the right via an "up" move on a re-oriented board."""
        next_board = cls._reverse(cls._transpose(board))
        next_board = cls._cover_up(next_board)
        next_board, score = cls._merge(next_board)
        next_board = cls._cover_up(next_board)
        # Undo the re-orientation in the opposite order.
        next_board = cls._transpose(cls._reverse(next_board))
        return next_board, score

    @classmethod
    def _down(cls, board: np.ndarray) -> tuple[np.ndarray, int]:
        """Slide and merge all tiles downwards via an "up" move on the flipped board."""
        next_board = cls._reverse(board)
        next_board = cls._cover_up(next_board)
        next_board, score = cls._merge(next_board)
        next_board = cls._cover_up(next_board)
        next_board = cls._reverse(next_board)
        return next_board, score

    @classmethod
    def _left(cls, board: np.ndarray) -> tuple[np.ndarray, int]:
        """Slide and merge all tiles to the left via an "up" move on the transposed board."""
        next_board = cls._transpose(board)
        next_board = cls._cover_up(next_board)
        next_board, score = cls._merge(next_board)
        next_board = cls._cover_up(next_board)
        next_board = cls._transpose(next_board)
        return next_board, score

    @classmethod
    def apply_action(
        cls,
        board: np.ndarray,
        action: ActType,
    ) -> tuple[np.ndarray, int, bool]:
        """Apply an action to the board without spawning a new tile.

        Args:
            board: Current board of exponents; it is not modified.
            action: 0: up, 1: right, 2: down, 3: left.

        Returns:
            The resulting board, the score gained, and whether the move was
            legal (i.e. whether it changed the board).
        """
        action_func = (cls._up, cls._right, cls._down, cls._left)
        next_board, score = action_func[action](board)
        is_legal = not np.array_equal(board, next_board)
        return next_board, score, is_legal

    @staticmethod
    def is_terminated(board: np.ndarray) -> bool:
        """Check if the game is terminated or not.

        The checks are applied in this order:
        1. any empty cell -> not terminated;
        2. (board is full) any cell at or above the winning exponent -> terminated;
        3. otherwise terminated only if no two adjacent cells are equal.
        """
        if (board == 0).any():
            return False

        if (board >= TwentyFortyEightEnv.WIN_POW).any():
            return True

        # A merge is possible iff some horizontally or vertically adjacent
        # pair of cells holds the same value.
        horizontal_merge = (board[:, 1:] == board[:, :-1]).any()
        vertical_merge = (board[1:, :] == board[:-1, :]).any()
        return not (horizontal_merge or vertical_merge)

    def step(
        self,
        action: ActType,
    ) -> tuple[ObsType, SupportsFloat, bool, bool, dict[str, Any]]:
        """Play one move.

        A legal move updates the board and spawns a new tile; an illegal move
        (one that does not change the board) leaves the board untouched and
        increments ``illegal_count``.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where the
            reward is this step's merge score and ``truncated`` is always
            False.
        """
        assert self.action_space.contains(
            action
        ), f"{action!r} ({type(action)}) invalid"

        next_board, self.step_score, self.is_legal = self.apply_action(
            board=self.board,
            action=action,
        )
        self.total_score += self.step_score

        if self.is_legal:
            self.board = next_board
            self._spawn_tile()
        else:
            self.illegal_count += 1

        observation = self._get_obs()
        reward = self.step_score
        terminated = self.is_terminated(board=self.board)
        info = self._get_info()

        if self.render_mode == "human":
            self.render_frame()

        return observation, reward, terminated, False, info

    def render(self) -> RenderFrame | list[RenderFrame] | None:
        """Return an RGB frame in ``"rgb_array"`` mode, otherwise ``None``."""
        if self.render_mode == "rgb_array":
            return self.render_frame()

    def _get_value(self, row: int, col: int) -> int:
        """Return the face value shown at ``(row, col)``; 0 for an empty cell."""
        exponent = int(self.board[row, col])
        # Python-int power avoids uint16 overflow for large exponents.
        return 2 ** exponent if exponent > 0 else 0

    @staticmethod
    def _get_background_color(value: int) -> tuple[int, int, int]:
        """Return the tile colour for a face value, or the default colour if unmapped."""
        return TILE_COLOR_MAP.get(value, TILE_COLOR_DEFAULT)

    @staticmethod
    def _get_text_color(value: int) -> tuple[int, int, int]:
        """Return the dark font colour for values below 8, the light one otherwise."""
        return FONT_DARK_COLOR if value < 8 else FONT_LIGHT_COLOR

    def _draw_board(self, canvas: pygame.Surface) -> None:
        """Draw the board background and every tile onto ``canvas``."""
        board_left = BOARD_PADDING
        board_top = BOARD_PADDING
        board_width = WINDOW_WIDTH - 2 * BOARD_PADDING
        board_height = WINDOW_HEIGHT - 2 * BOARD_PADDING
        tile_width = (board_width - 2 * TILE_PADDING) // self.board.shape[1]
        tile_height = (board_height - 2 * TILE_PADDING) // self.board.shape[0]

        pygame.draw.rect(
            surface=canvas,
            color=BOARD_BG_COLOR,
            rect=(board_left, board_top, board_width, board_height),
            border_radius=BORDER_RADIUS,
        )

        for row in range(self.board.shape[0]):
            for col in range(self.board.shape[1]):
                value = self._get_value(row=row, col=col)
                rect = pygame.Rect(
                    board_left + col * tile_width + 2 * TILE_PADDING,
                    board_top + row * tile_height + 2 * TILE_PADDING,
                    tile_width - 2 * TILE_PADDING,
                    tile_height - 2 * TILE_PADDING,
                )
                pygame.draw.rect(
                    surface=canvas,
                    color=self._get_background_color(value=value),
                    rect=rect,
                    border_radius=BORDER_RADIUS,
                )
                # Empty cells get a background but no number.
                if value == 0:
                    continue
                text_surface = self.font.render(
                    str(value),
                    True,
                    self._get_text_color(value=value),
                )
                text_rect = text_surface.get_rect(center=rect.center)
                canvas.blit(source=text_surface, dest=text_rect)

    def _draw_score(self, canvas: pygame.Surface) -> None:
        """Draw the total score in the strip below the board area."""
        board_width = WINDOW_WIDTH - 2 * BOARD_PADDING
        score_surface = self.font.render(
            f"Score: {self.total_score}",
            True,
            FONT_SCORE_COLOR,
        )
        score_height = self.font.get_height()
        score_rect = pygame.Rect(
            BOARD_PADDING,
            # Vertically centre the text inside the score strip.
            WINDOW_HEIGHT + (WINDOW_SCORE_HEIGHT - score_height) // 2,
            board_width,
            score_height,
        )
        canvas.blit(source=score_surface, dest=score_rect)

    def render_frame(
        self,
        board=None,
        total_score=None,
    ) -> RenderFrame | list[RenderFrame] | None:
        """Render the current state, optionally overriding it first.

        Args:
            board: If given, replaces ``self.board`` before drawing.
            total_score: If given, replaces ``self.total_score`` before
                drawing.

        Both arguments are optional so that ``step()`` and ``render()``, which
        call this method without arguments, draw the environment's own state.

        Returns:
            In ``"human"`` mode the frame is shown in the pygame window and
            ``None`` is returned; otherwise the frame is returned as an RGB
            array.
        """
        if board is not None:
            self.board = board
        if total_score is not None:
            self.total_score = total_score

        if self.window is None and self.render_mode == "human":
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode(
                (WINDOW_WIDTH, WINDOW_HEIGHT + WINDOW_SCORE_HEIGHT)
            )
            pygame.display.set_caption("2048")

        if self.clock is None and self.render_mode == "human":
            self.clock = pygame.time.Clock()

        if self.font is None:
            pygame.font.init()
            self.font = pygame.font.SysFont(FONT_NAME, FONT_SIZE)

        canvas = pygame.Surface((WINDOW_WIDTH, WINDOW_HEIGHT + WINDOW_SCORE_HEIGHT))
        canvas.fill(WINDOW_BG_COLOR)

        self._draw_board(canvas=canvas)
        self._draw_score(canvas=canvas)

        if self.render_mode == "human":
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()
            # Cap the frame rate at metadata["render_fps"].
            self.clock.tick(self.metadata["render_fps"])
        else:  # rgb_array
            # Swap the first two axes of pygame's pixel array.
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )

    def close(self) -> None:
        """Shut down pygame if a window was opened and drop the stale handles."""
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()
            # Forget the handles so a later render_frame() re-creates them
            # instead of using objects that belong to a shut-down pygame.
            self.window = None
            self.clock = None
            self.font = None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment