"""HTTP server for a two-player, chess-like board game.

Provides room creation, a quick-match queue, move validation (including
"does this move leave my king attacked" checks) and polling endpoints.
"""

import asyncio
import json
import random
import re
import string
import uuid
from collections import deque
from collections.abc import Iterator
from datetime import datetime, timedelta, timezone
from enum import Enum
from itertools import product
from pathlib import Path
from typing import Any

from libs.slow import (
    JSONAPI,
    App,
    Request,
    redirect,
    render,
)
from libs.slow.responses import HTTPResponse, JSONResponse

# File letters (x axis) and rank digits (y axis) used by the textual positions.
_FILES = "abcdefgh"
_RANKS = "12345678"


class Coord:
    """Mutable (x, y) board coordinate.

    ``x`` indexes the file letter (a-h) and ``y`` the rank digit (1-8), see
    ``coord_to_pos``. Instances compare by value; because ``__eq__`` is
    defined without ``__hash__`` they are unhashable, which suits a mutable
    type.
    """

    x: int
    y: int

    def __init__(self, x: int, y: int):
        self.x = x
        self.y = y

    def __iter__(self) -> Iterator[int]:
        """Yield ``x`` then ``y`` so a Coord can be unpacked as ``x, y = c``."""
        yield self.x
        yield self.y

    def copy(self):
        """Return a new, independent Coord with the same components."""
        return Coord(self.x, self.y)

    def __str__(self):
        # Off-board coordinates render as the string "None".
        return str(coord_to_pos(self))

    def __eq__(self, other):
        if isinstance(other, Coord):
            return self.x == other.x and self.y == other.y
        return False

    def __repr__(self):
        return f"Coord({self.x}, {self.y})"


app = App()


class Color(Enum):
    """Side to move / side owned by a player."""

    WHITE = 0
    BLACK = 1


class State(Enum):
    """Lifecycle state of a game."""

    NOT_FINISHED = -1
    TIE = 0
    WHITE_WIN = 1
    BLACK_WIN = 2


class Piece(Enum):
    """Contents of a square.

    White pieces use upper-case letters and black pieces lower-case ones.
    ``EMPTY`` is an on-board empty square; ``NONE`` is what the bounds-checked
    board accessors return for an off-board square.
    """

    NONE = ""
    EMPTY = "E"
    # WHITE
    WHITE_PAWN = "P"
    WHITE_ROOK = "R"
    WHITE_BISHOP = "B"
    WHITE_KING = "K"
    WHITE_QUEEN = "Q"
    WHITE_CASTLE = "C"
    # BLACK
    BLACK_PAWN = "p"
    BLACK_ROOK = "r"
    BLACK_BISHOP = "b"
    BLACK_KING = "k"
    BLACK_QUEEN = "q"
    BLACK_CASTLE = "c"


def _on_board(x: int, y: int) -> bool:
    """Return True when (x, y) lies inside the 8x8 grid."""
    return 0 <= x <= 7 and 0 <= y <= 7


def coord_to_pos(xy: Coord) -> None | str:
    """Convert a Coord to a position such as ``"e2"``; None when off-board."""
    if not _on_board(xy.x, xy.y):
        return None
    return _FILES[xy.x] + _RANKS[xy.y]


def coord_to_pos_safe(xy: Coord) -> str:
    """Convert a Coord to a position string without any bounds check.

    The caller must guarantee the coordinate is on the board.
    """
    return _FILES[xy.x] + _RANKS[xy.y]


def xy_to_pos(x: int, y: int) -> None | str:
    """Convert x/y indices to a position string; None when off-board."""
    if not _on_board(x, y):
        return None
    return _FILES[x] + _RANKS[y]


def xy_to_pos_safe(x: int, y: int) -> str:
    """Convert x/y indices to a position string without any bounds check."""
    return _FILES[x] + _RANKS[y]


def pos_to_coord(pos: str) -> Coord:
    """Parse a position such as ``"e2"`` into a Coord.

    Raises:
        ValueError: if a character is not a valid file/rank.
        IndexError: if ``pos`` has fewer than two characters.
    """
    return Coord(_FILES.index(pos[0]), _RANKS.index(pos[1]))


class Board:
    """8x8 grid of pieces, addressed as ``grid[y][x]`` (rank first)."""

    grid: list[list[Piece]]

    def __init__(self):
        self.grid: list[list[Piece]] = [[Piece.EMPTY] * 8 for _ in range(8)]
        self.grid[0] = [
            Piece.WHITE_CASTLE,
            Piece.WHITE_ROOK,
            Piece.WHITE_BISHOP,
            Piece.WHITE_QUEEN,
            Piece.WHITE_KING,
            Piece.WHITE_BISHOP,
            Piece.WHITE_ROOK,
            Piece.WHITE_CASTLE,
        ]
        self.grid[1] = [Piece.WHITE_PAWN] * 8
        self.grid[6] = [Piece.BLACK_PAWN] * 8
        self.grid[7] = [
            Piece.BLACK_CASTLE,
            Piece.BLACK_ROOK,
            Piece.BLACK_BISHOP,
            Piece.BLACK_QUEEN,
            Piece.BLACK_KING,
            Piece.BLACK_BISHOP,
            Piece.BLACK_ROOK,
            Piece.BLACK_CASTLE,
        ]

    def index(self, pos: str) -> Piece:
        """Return the piece at a textual position; ``pos`` is not validated."""
        let = _FILES.index(pos[0])
        num = _RANKS.index(pos[1])
        return self.grid[num][let]

    def index_coord(self, coord: Coord) -> Piece:
        """Return the piece at ``coord``, or ``Piece.NONE`` when off-board."""
        return self.index_xy(coord.x, coord.y)

    def index_xy(self, x: int, y: int) -> Piece:
        """Return the piece at (x, y), or ``Piece.NONE`` when off-board."""
        if not _on_board(x, y):
            return Piece.NONE
        return self.grid[y][x]

    def copy(self) -> "Board":
        """Return a board whose grid can be mutated independently of this one."""
        board = Board()
        # Enum members are singletons, so copying each row is a full copy.
        board.grid = [row[:] for row in self.grid]
        return board

    def serialize(self) -> dict:
        """Return a JSON-friendly dict holding the grid as piece letters."""
        return {"grid": [[p.value for p in r] for r in self.grid]}

    def __str__(self):
        return "\n".join(" ".join(p.value for p in row) for row in self.grid)


class Room:
    """One game: its board, whose turn it is, the players and timestamps."""

    board: Board
    turn: Color
    players: list[str]  # player uids; index 0 is white, index 1 is black
    last_move: datetime
    game_start: datetime | None  # None until the second player has joined
    state: State

    def __init__(self):
        self.turn = Color.WHITE
        self.board = Board()
        self.players = []
        self.last_move = datetime.now(timezone.utc)
        self.game_start = None
        self.state = State.NOT_FINISHED

    def start(self):
        """Stamp the game start (and last-move) time with the current UTC time."""
        self.last_move = datetime.now(timezone.utc)
        self.game_start = datetime.now(timezone.utc)

    def add_player(self) -> None | tuple[str, Color]:
        """Register a new player.

        Returns:
            ``(uid, color)`` for the new player -- the first joiner is white,
            the second is black and triggers ``start()`` -- or None when the
            room already has two players.
        """
        if len(self.players) >= 2:
            return None
        uid = str(uuid.uuid4())
        self.players.append(uid)
        if len(self.players) == 2:
            self.start()
            return uid, Color.BLACK
        return uid, Color.WHITE


rooms: dict[str, Room] = {}

favicon = Path("favicon.ico").read_bytes()
favicon_update = Path("favicon_update.ico").read_bytes()

# NOTE(review): several handlers below take a path argument (`id`, `room_id`,
# `fn`) although their route strings show no placeholder for it, and "/" is
# registered for both `home` and `game`. The placeholder syntax may have been
# lost somewhere upstream -- confirm against libs.slow before relying on them.


@app.GET("/favicon.ico")
async def favicon_s(request):
    """Serve the default favicon."""
    return HTTPResponse(favicon, content_type="image/x-icon")


@app.GET("/favicon_update.ico")
async def favicon_update_s(request):
    """Serve the alternate ("update") favicon."""
    return HTTPResponse(favicon_update, content_type="image/x-icon")


letters = string.ascii_lowercase + string.digits


def room_key():
    """Return a random 4-character room key that is not currently in use.

    Loops until an unused key is found, so callers must make sure the key
    space is not exhausted.
    """
    key: str = "".join(random.choices(letters, k=4))
    while key in rooms:
        key = "".join(random.choices(letters, k=4))
    return key


@app.POST("/create_room")
async def new_room(request):
    """Create an empty room and return its id."""
    if len(rooms) == len(letters) ** 4:
        # Every possible key is taken; room_key() would never terminate.
        return HTTPResponse("Out of service", status=501)
    key = room_key()
    rooms[key] = Room()
    return JSONResponse(
        {
            "id": key,
        },
    )


quick_queue: deque[str] = deque()
# queue id -> key of the room the player has been matched into
quick_map: dict[str, str] = {}
# queue id -> time of the player's most recent poll
quick_last_map: dict[str, datetime] = {}
lock = asyncio.Lock()

# A queued player that has not polled for this long is dropped from the queue.
_QUEUE_TIMEOUT = timedelta(seconds=3)


def _prune_stale_queue_entries(now: datetime) -> None:
    """Drop every queue id whose last poll is older than the queue timeout."""
    # Collect first: entries must not be deleted while iterating the dict.
    stale = [q for q, seen in quick_last_map.items() if now - seen >= _QUEUE_TIMEOUT]
    for q in stale:
        if q in quick_queue:
            quick_queue.remove(q)
        quick_map.pop(q, None)
        del quick_last_map[q]


@app.POST("/quick")
async def quick_match(request: Request):
    """Quick-match queue endpoint.

    The JSON body selects the action:

    * ``{"queue_id": id}`` for a queued id -- heartbeat/poll. Responds with
      ``{"room_id": key}`` once matched, otherwise ``{}`` (keep waiting).
    * ``{"queue_id": id, "leave": ...}`` for a queued id -- leave the queue.
    * anything else -- join the queue; responds with a new ``queue_id``.
    """
    data = parse(request.body)
    qid = data.get("queue_id") if data else None
    async with lock:
        queued = isinstance(qid, str) and qid in quick_queue
        if queued and len(data) == 1:
            now = datetime.now(timezone.utc)
            # Refresh the caller first so it can never prune itself.
            quick_last_map[qid] = now
            _prune_stale_queue_entries(now)

            # Pair the two longest-waiting players that have no room yet.
            waiting = [q for q in quick_queue if q not in quick_map]
            if len(waiting) >= 2:
                first, second = waiting[0], waiting[1]
                k = room_key()
                rooms[k] = Room()
                quick_map[first] = k
                quick_map[second] = k

            if qid in quick_map:
                return JSONResponse({"room_id": quick_map[qid]})
            return JSONResponse({})  # Client handles empty as continue to wait
        if queued and len(data) == 2 and "leave" in data:
            quick_queue.remove(qid)
            quick_map.pop(qid, None)
            quick_last_map.pop(qid, None)
            return JSONResponse({})
        qid = str(uuid.uuid4())
        quick_queue.append(qid)
        quick_last_map[qid] = datetime.now(timezone.utc)
        return JSONResponse({"queue_id": qid})


@app.GET("/")
async def home(request):
    """Render the landing page."""
    return render("home.html")


@app.GET("/")
async def game(request, id):
    """Render the game page for an existing room, else redirect home."""
    # fullmatch: the whole id, not just its prefix, must be a 4-char key.
    if not re.fullmatch(r"[a-z0-9]{4}", id):
        return redirect("/")
    if id not in rooms:
        return redirect("/")
    return render("game.html")


def _room_status(room: Room) -> dict[str, Any]:
    """Return the room fields shared by the join and poll responses."""
    return {
        "turn": room.turn.value,
        "board": room.board.serialize(),
        "ready": len(room.players) == 2,
        "state": room.state.value,
        "start_time": (room.game_start or datetime.now(timezone.utc)).isoformat(),
    }


@app.POST("/join/")
async def join(request, room_id):
    """Join a room as a player, or receive a spectator view when it is full."""
    if room_id not in rooms:
        return JSONResponse({"code": "NOGO", "error": "Room not found."}, 404)
    room: Room = rooms[room_id]
    player = room.add_player()
    if player:
        return JSONResponse(
            {
                "code": "JOIN",
                "id": player[0],
                "color": player[1].value,
                **_room_status(room),
            },
        )
    return JSONResponse(
        {
            "code": "FULL",
            "error": "Room Full",
            **_room_status(room),
        },
    )


def parse(body: str | bytes) -> dict[str, Any] | None:
    """Decode a JSON request body.

    Returns:
        The decoded object, or None when the body is not valid JSON or is
        valid JSON that is not an object (handlers index it by key).
    """
    try:
        data = json.loads(body)
    except (ValueError, TypeError, RecursionError):
        return None
    return data if isinstance(data, dict) else None


# Direction / offset tables. Their order fixes the order of generated moves.
_BISHOP_DIRS = tuple(product([1, -1], repeat=2))
_QUEEN_DIRS = tuple(d for d in product([1, -1, 0], repeat=2) if d != (0, 0))
_CASTLE_DIRS = ((-1, 0), (1, 0), (0, -1), (0, 1))
_KING_STEPS = tuple(d for d in product([-1, 0, 1], repeat=2) if d != (0, 0))
_ROOK_JUMPS = ((2, 1), (2, -1), (-2, 1), (-2, -1), (1, 2), (-1, 2), (1, -2), (-1, -2))

_SLIDERS = {"b": _BISHOP_DIRS, "q": _QUEEN_DIRS, "c": _CASTLE_DIRS}
_STEPPERS = {"k": _KING_STEPS, "r": _ROOK_JUMPS}


def _pawn_moves(board: Board, x: int, y: int, is_white: bool) -> list[Coord]:
    """Return pawn pushes onto empty squares and diagonal captures."""
    moves: list[Coord] = []
    step = 1 if is_white else -1
    start_rank = 1 if is_white else 6
    if board.index_xy(x, y + step) == Piece.EMPTY:
        moves.append(Coord(x=x, y=y + step))
        # The double step needs both squares in front of the pawn to be empty.
        if y == start_rank and board.index_xy(x, y + 2 * step) == Piece.EMPTY:
            moves.append(Coord(x=x, y=y + 2 * step))
    for dx in (1, -1):
        target = board.index_xy(x + dx, y + step)
        if target not in (Piece.EMPTY, Piece.NONE) and target.value.isupper() != is_white:
            moves.append(Coord(x=x + dx, y=y + step))
    return moves


def _sliding_moves(board: Board, x: int, y: int, directions, is_white: bool) -> list[Coord]:
    """Walk each direction over empty squares, ending on an enemy piece if any."""
    moves: list[Coord] = []
    for dx, dy in directions:
        tx, ty = x + dx, y + dy
        while board.index_xy(tx, ty) == Piece.EMPTY:
            moves.append(Coord(tx, ty))
            tx += dx
            ty += dy
        blocker = board.index_xy(tx, ty)
        if blocker != Piece.NONE and blocker.value.isupper() != is_white:
            moves.append(Coord(tx, ty))
    return moves


def _stepping_moves(board: Board, x: int, y: int, offsets, is_white: bool) -> list[Coord]:
    """Return each single-offset target that is empty or holds an enemy piece."""
    moves: list[Coord] = []
    for dx, dy in offsets:
        target = board.index_xy(x + dx, y + dy)
        if target == Piece.NONE:
            continue
        if target == Piece.EMPTY or target.value.isupper() != is_white:
            moves.append(Coord(x + dx, y + dy))
    return moves


def get_piece_moves(piece_kind, board: Board, is_white, src: str) -> list[Coord]:
    """Return the squares a piece can reach, ignoring whether its king is safe.

    Args:
        piece_kind: lower-case piece letter. ``p`` pawn, ``b`` diagonal
            slider, ``c`` orthogonal slider, ``q`` both, ``k`` one step in any
            direction, ``r`` L-shaped jumps. Any other value yields no moves.
        board: board to generate moves on.
        is_white: True when the moving piece is white.
        src: position of the piece, e.g. ``"e2"``.

    Squares holding the enemy king are deliberately kept in the result:
    ``generate_valid_moves`` detects an attacked king by looking for the
    king's square among the enemy's moves.
    """
    if piece_kind == "p":
        x, y = pos_to_coord(src)
        return _pawn_moves(board, x, y, is_white)
    if piece_kind in _SLIDERS:
        x, y = pos_to_coord(src)
        return _sliding_moves(board, x, y, _SLIDERS[piece_kind], is_white)
    if piece_kind in _STEPPERS:
        x, y = pos_to_coord(src)
        return _stepping_moves(board, x, y, _STEPPERS[piece_kind], is_white)
    return []


def _find_king(board: Board, is_white: bool) -> Coord | None:
    """Return the square of the given side's king, or None if it has none."""
    wanted = Piece.WHITE_KING if is_white else Piece.BLACK_KING
    for y, row in enumerate(board.grid):
        for x, piece in enumerate(row):
            if piece == wanted:
                return Coord(x, y)
    return None


def _square_attacked(board: Board, square: Coord, by_white: bool) -> bool:
    """Return True if any piece of the given side can move onto ``square``."""
    for y, row in enumerate(board.grid):
        for x, piece in enumerate(row):
            if piece == Piece.EMPTY or piece.value.isupper() != by_white:
                continue
            reach = get_piece_moves(
                piece.value.lower(), board, by_white, xy_to_pos_safe(x, y)
            )
            if square in reach:
                return True
    return False


def generate_valid_moves(
    piece_kind: str, board: Board, is_white: bool, src: str
) -> list[Coord]:
    """Return the moves of the piece at ``src`` that keep its own king safe.

    Each candidate from ``get_piece_moves`` is played on a copy of the board
    and kept only if no enemy piece on that copy can reach the mover's king.
    """
    possible_moves: list[Coord] = get_piece_moves(piece_kind, board, is_white, src)
    if not possible_moves:
        return []

    king_home = _find_king(board, is_white)
    sx, sy = pos_to_coord(src)
    mover = board.grid[sy][sx]

    valids: list[Coord] = []
    for m in possible_moves:
        fake_board = board.copy()
        fake_board.grid[m.y][m.x] = mover
        fake_board.grid[sy][sx] = Piece.EMPTY
        # When the king itself moves, its destination is the square to test.
        king = m if piece_kind == "k" else king_home
        # Attackers are read from the simulated board so that a piece
        # captured by this move no longer counts as an attacker.
        if king is None or not _square_attacked(fake_board, king, not is_white):
            valids.append(m)
    return valids


def _has_legal_move(board: Board, is_white: bool) -> bool:
    """Return True if the given side has at least one king-safe move."""
    for y, row in enumerate(board.grid):
        for x, piece in enumerate(row):
            if piece == Piece.EMPTY or piece.value.isupper() != is_white:
                continue
            if generate_valid_moves(
                piece.value.lower(), board, is_white, xy_to_pos_safe(x, y)
            ):
                return True
    return False


@app.POST("/move/")
@JSONAPI
async def move(request: Request, room_id):
    """Apply a player's move.

    Expects a JSON body with exactly the string fields ``uid``, ``from`` and
    ``to``. Returns the updated game on success, or ``(400, {...})`` with an
    error code otherwise.
    """
    req = parse(request.body)
    if not req:
        return 400, {}
    if "uid" not in req or not isinstance(req["uid"], str):
        return 400, {}
    if "from" not in req or not isinstance(req["from"], str):
        return 400, {}
    if "to" not in req or not isinstance(req["to"], str):
        return 400, {}
    if len(req) > 3:
        return 400, {}

    if room_id not in rooms:
        return 400, {"code": "NOEX", "error": "Room does not exist"}
    room = rooms[room_id]
    if len(room.players) != 2:
        return 400, {"code": "PRES", "error": "Game has not been started"}
    if room.state != State.NOT_FINISHED:
        return 400, {"code": "FINI", "error": "Game has finished."}

    board = room.board
    uid = req["uid"]
    src = req["from"].lower()
    dst = req["to"].lower()

    # Only the two registered players may move; without this check every
    # unknown uid would be treated as black.
    if uid not in room.players:
        return 400, {"code": "NOPL", "error": "Not a player in this room."}
    color = Color.WHITE if room.players[0] == uid else Color.BLACK
    if room.turn != color:
        return 400, {"code": "OTHR", "error": "Not your turn."}

    if (
        len(src) != 2
        or len(dst) != 2
        or src[0] not in _FILES
        or src[1] not in _RANKS
        or dst[0] not in _FILES
        or dst[1] not in _RANKS
        or board.index(src) == Piece.EMPTY
    ):
        return 400, {"code": "LMOV", "error": "Bad Move 1"}

    srcp = board.index(src)
    dstp = board.index(dst)
    piece_kind = srcp.value.lower()
    is_white = srcp.value.isupper()

    if is_white != (color == Color.WHITE):
        return 400, {"code": "NOTU", "error": "Not your piece"}
    if dstp != Piece.EMPTY and dstp.value.isupper() == (color == Color.WHITE):
        return 400, {"code": "HTME", "error": "Cannot move to your own piece"}

    valid_moves = generate_valid_moves(piece_kind, board, is_white, src)
    c = pos_to_coord(dst)
    if c not in valid_moves:
        return 400, {"code": "LMOV", "error": "Bad Move 2"}

    board.grid[c.y][c.x] = srcp
    sx, sy = pos_to_coord(src)
    board.grid[sy][sx] = Piece.EMPTY
    # A pawn reaching either end rank is promoted to a queen.
    if (c.y == 0 or c.y == 7) and piece_kind == "p":
        board.grid[c.y][c.x] = Piece.WHITE_QUEEN if is_white else Piece.BLACK_QUEEN

    room.turn = Color.BLACK if color == Color.WHITE else Color.WHITE
    room.last_move = datetime.now(timezone.utc)

    # The mover wins when the opponent is left without any king-safe move.
    # NOTE(review): this also scores a stalemate (no moves, king not attacked)
    # as a win -- confirm that is the intended rule.
    if not _has_legal_move(board, not is_white):
        room.state = State.WHITE_WIN if is_white else State.BLACK_WIN

    return {
        "code": "MOVD",
        "color": color.value,
        "turn": room.turn.value,
        "state": room.state.value,
        "board": board.serialize(),
    }


@app.GET("/poll/")
@JSONAPI
async def poll(request, room_id):
    """Return the current status of a room.

    Every poll also does housekeeping on all rooms: rooms idle for 24 hours
    are deleted and unfinished games older than 30 minutes become a tie.
    """
    now = datetime.now(timezone.utc)
    # Iterate over a snapshot: rooms are deleted inside the loop.
    for key, room in list(rooms.items()):
        if now - room.last_move >= timedelta(hours=24):
            del rooms[key]
            continue
        if (
            room.game_start
            and now - room.game_start >= timedelta(minutes=30)
            and room.state == State.NOT_FINISHED
        ):
            room.state = State.TIE

    if room_id not in rooms:
        return 400, {"code": "NOEX", "error": "Room does not exist"}
    return {"code": "STAT", **_room_status(rooms[room_id])}


@app.POST("/moves/")
@JSONAPI
async def generate_moves(request: Request, room_id):
    """List the king-safe moves of the piece on a square.

    Expects a JSON body with exactly one string field, ``from``. Responds
    with ``{"moves": "a3,a4"}`` -- comma-separated target positions.
    """
    req = parse(request.body)
    if not req:
        return 400, {}
    if "from" not in req or not isinstance(req["from"], str):
        return 400, {}
    if len(req) > 1:
        return 400, {}

    if room_id not in rooms:
        return 400, {"code": "NOEX", "error": "Room does not exist"}
    room = rooms[room_id]
    if len(room.players) != 2:
        return 400, {"code": "PRES", "error": "Game has not been started"}

    board = room.board
    src = req["from"].lower()
    # Validate the position before indexing the board with it.
    if len(src) != 2 or src[0] not in _FILES or src[1] not in _RANKS:
        return 400, {"code": "LMOV", "error": "Bad Move"}
    srcp = board.index(src)
    if srcp == Piece.EMPTY:
        return 400, {"code": "LMOV", "error": "Bad Move"}

    # The request carries no uid, so moves are listed for whichever side
    # owns the piece.
    valid_moves = generate_valid_moves(
        srcp.value.lower(), board, srcp.value.isupper(), src
    )
    return {
        "moves": ",".join(coord_to_pos_safe(c) for c in valid_moves),
    }


def sanitize_filename(filename: str, base: Path = Path.cwd().resolve()) -> Path:
    """Resolve ``filename`` under ``base`` and refuse paths that escape it.

    Note that the default ``base`` is computed once, when the module loads.

    Raises:
        ValueError: if the resolved path is not inside ``base``.
    """
    p = (base / filename).resolve()
    if not p.is_relative_to(base):
        raise ValueError("Filename leaves current directory")
    return p


@app.GET("/static/")
async def static(request, fn):
    """Serve a file from the ``static/`` directory, or a 404 response."""
    try:
        path = sanitize_filename(fn, Path("static/").resolve())
        return HTTPResponse(path.read_bytes(), content_type="application/octet-stream")
    except Exception:
        # Deliberately best-effort: any failure (path escape, missing file,
        # unreadable file) is reported to the client as a plain 404.
        return HTTPResponse("404 File Not Found", status=404)


if __name__ == "__main__":
    asyncio.run(app.run(port=8989))