diff --git a/README.md b/README.md index ed54b62..607c606 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,3 @@ -# shatranj +# shartanj +بازی شرطنج diff --git a/app.py b/app.py new file mode 100644 index 0000000..fc675b9 --- /dev/null +++ b/app.py @@ -0,0 +1,280 @@ +import asyncio +import json +import uuid +from datetime import datetime +from enum import Enum +from itertools import combinations +from typing import TypedDict + +from slow import JSONAPI, App, JSONResponse, render + + +class Coord(TypedDict): + x: int + y: int + + +app = App() + + +class Color(Enum): + WHITE = 0 + BLACK = 1 + + +class Piece(Enum): + NONE = "" + EMPTY = "E" + + # WHITE + WHITE_PAWN = "P" + WHITE_ROOK = "R" + WHITE_BISHOP = "B" + WHITE_KING = "K" + WHITE_QUEEN = "Q" + WHITE_CASTLE = "C" + + # BLACK + BLACK_PAWN = "p" + BLACK_ROOK = "r" + BLACK_BISHOP = "b" + BLACK_KING = "k" + BLACK_QUEEN = "q" + BLACK_CASTLE = "c" + + +def xy_to_coord(xy: Coord) -> None | str: + if xy.x < 0 or xy.x > 7 or xy.y < 0 or xy.y > 7: + return None + return "abcdefgh"[xy.x] + "12345678"[xy.y] + + +def coord_to_xy(pos: str) -> tuple[int, int]: + return ("abcdefgh".inedx(pos[0]), "12345678".inedx(pos[1])) + + +class Board: + grid: list[list[Piece]] + + def __init__(self): + self.grid = [[Piece.EMPTY] * 8] * 8 + self.grid[0] = [ + Piece.WHITE_CASTLE, + Piece.WHITE_ROOK, + Piece.WHITE_BISHOP, + Piece.WHITE_KING, + Piece.WHITE_QUEEN, + Piece.WHITE_BISHOP, + Piece.WHITE_ROOK, + Piece.WHITE_CASTLE, + ] + self.grid[1] = [Piece.WHITE_PAWN] * 8 + + self.grid[6] = [Piece.BLACK_PAWN] * 8 + self.grid[7] = [ + Piece.BLACK_CASTLE, + Piece.BLACK_ROOK, + Piece.BLACK_BISHOP, + Piece.BLACK_KING, + Piece.BLACK_QUEEN, + Piece.BLACK_BISHOP, + Piece.BLACK_ROOK, + Piece.BLACK_CASTLE, + ] + + def index(self, pos: str) -> Piece: + let = "abcdefgh".index(pos[0]) + num = "12345678".index(pos[1]) + return self.grid[num][let] + + def index_xy(self, xy: Coord) -> Piece: + if xy.x < 0 or xy.x > 7 or xy.y < 0 or xy.y > 7: + return Piece.NONE + return self.grid[xy.y][xy.x] + + def serialize(self) -> dict: + return {"grid": self.grid} + + +class Room: + board: Board = Board() + turn: Color = Color.WHITE + players: list[uuid.UUID] = [] + last_move: datetime + + def start(self): + self.last_move = datetime.now() + + def add_player(self) -> None | tuple[uuid.UUID, Color]: + np: int = len(self.players) + if np >= 2: + return None + elif np == 1: + uid = uuid.uuid4() + self.players.append(uid) + return uid, Color.BLACK + else: + uid = uuid.uuid4() + self.players.append(uid) + return uid, Color.WHITE + + +rooms: dict[str, Room] = {} + + +@app.GET("/") +async def home(request): + return render("index.html") + + +@app.POST("/join/") +async def join(request, room_id): + room: Room = rooms.get(room_id, Room()) + player = room.add_player() + if player: + return JSONResponse( + { + "code": "JOIN", + "id": player[0], + "color": player[1], + "board": room.board.serialize(), + } + ) + else: + return JSONResponse( + {"code": "FULL", "error": "Room Full", "board": room.board.serialize()} + ) + + +def parse(body: str) -> dict | None: + try: + return json.loads(body) + except Exception: + return None + + +@app.POST("/move/") +@JSONAPI +async def move(request, room_id): + req = parse(request.body) + if not req: + return 400, {} + if "uid" not in req or not isinstance(req["uid"], str): + return 400, {} + if "from" not in req or not isinstance(req["from"], str): + return 400, {} + if "to" not in req or not isinstance(req["to"], str): + return 400, {} + if room_id not in rooms: + return 400, {"code": "NOEX", "error": "Room does not exist"} + room = rooms[room_id] + if len(room.players) != 2: + return 400, {"code": "PRES", "error": "Game has not been started"} + board = room.board + uid = req["uid"] + src = req["from"].lower() + dst = req["to"].lower() + color = Color.WHITE if room.players[0] == uid else Color.BLACK + turn = room.turn == color + if not turn: + return 400, {"code": "OTHR", "error": "Not your turn."} + if ( + len(src) != 2 + or len(dst) != 2 + or src[0] not in "abcdefgh" + or src[1] not in "12345678" + or dst[0] not in "abcdefgh" + or dst[1] not in "12345678" + or board.index(src) != Piece.EMPTY + ): + return 400, {"code": "LMOV", "error": "Bad Move"} + srcp = board.index(src) + dstp = board.index(dst) + piece_kind = srcp.value.lower() + is_white = srcp.value.isupper() + if (is_white and color == Color.BLACK) or (not is_white and color == Color.WHITE): + return 400, {"code": "NOTU", "error": "Not your piece"} + if dstp != Piece.EMPTY and ( + (dstp.value.isupper() and color == Color.WHITE) + or (dstp.value.islower() and color == Color.BLACK) + ): + return 400, {"code": "HTME", "error": "Cannot move to your own piece"} + valids: list[Coord] = [] + if piece_kind == "p": + dir = 1 if is_white else -1 + x, y = coord_to_xy(src) + if board.index_xy(x, y + dir) == Piece.EMPTY: + valids.append({"x": x, "y": y + dir}) + if (y == 1 or y == 6) and board.index_xy(x, y + 2 * dir) == Piece.EMPTY: + valids.append({"x": x, "y": y + 2 * dir}) + if ( + board.index_xy(x + 1, y + dir) + not in [Piece.EMPTY, Piece.NONE, Piece.BLACK_KING, Piece.WHITE_KING] + and board.index_xy(x + 1, y + dir).value.upper() != is_white + ): + valids.append({"x": x + 1, "y": y + 1}) + if ( + board.index_xy(x - 1, y + dir) + not in [Piece.EMPTY, Piece.NONE, Piece.BLACK_KING, Piece.WHITE_KING] + and board.index_xy(x - 1, y + dir).value.upper() != is_white + ): + valids.append({"x": x - 1, "y": y + 1}) + if piece_kind == "b": + x, y = coord_to_xy(src) + comb = combinations([1, -1], 2) + for dir in comb: + scl = 1 + target: Coord = {"x": x + dir[0] * scl, "y": y + dir[1] * scl} + while board.index_xy(target) not in [Piece.EMPTY, Piece.NONE]: + valids.append(target.copy()) + scl += 1 + target: Coord = {"x": x + dir[0] * scl, "y": y + dir[1] * scl} + if ( + p := board.index_xy(target) + ) != Piece.NONE and p.value.isupper() != is_white: + valids.append(target.copy()) + if piece_kind == "q": + x, y = coord_to_xy(src) + comb = combinations([1, -1, 0], 2) + for dir in comb: + if dir[0] ** 2 + dir[1] ** 2 == 0: + continue # x=0 y=0 cannot move => invalid + scl = 1 + target: Coord = {"x": x + dir[0] * scl, "y": y + dir[1] * scl} + while board.index_xy(target) not in [Piece.EMPTY, Piece.NONE]: + valids.append(target.copy()) + scl += 1 + target: Coord = {"x": x + dir[0] * scl, "y": y + dir[1] * scl} + if ( + p := board.index_xy(target) + ) != Piece.NONE and p.value.isupper() != is_white: + valids.append(target.copy()) + if piece_kind == "c": + x, y = coord_to_xy(src) + comb = [(-1, 0), (1, 0), (0, -1), (0, 1)] + for dir in comb: + scl = 1 + target: Coord = {"x": x + dir[0] * scl, "y": y + dir[1] * scl} + while board.index_xy(target) not in [Piece.EMPTY, Piece.NONE]: + valids.append(target.copy()) + scl += 1 + target: Coord = {"x": x + dir[0] * scl, "y": y + dir[1] * scl} + if ( + p := board.index_xy(target) + ) != Piece.NONE and p.value.isupper() != is_white: + valids.append(target.copy()) + if piece_kind == "k": + x, y = coord_to_xy(src) + comb = combinations([-1, 0, 1], 2) + for dir in comb: + if dir[0] ** 2 + dir[1] ** 2 == 0: + continue # x=0 y=0 cannot move => invalid + target: Coord = {"x": x + dir[0], "y": y + dir[1]} + if ( + p := board.index_xy(target) + ) != Piece.NONE and p.value.isupper() != is_white: + valids.append(target.copy()) + + +if __name__ == "__main__": + asyncio.run(app.run(port=8080)) diff --git a/index.html b/index.html new file mode 100644 index 0000000..da3d258 --- /dev/null +++ b/index.html @@ -0,0 +1,79 @@ + + + + + + Document + + + +
+

shatranj

+
+
+
+ +
+
+ + + \ No newline at end of file diff --git a/slow/__init__.py b/slow/__init__.py new file mode 100644 index 0000000..f8b4e01 --- /dev/null +++ b/slow/__init__.py @@ -0,0 +1,3 @@ +from .slow import JSONAPI, App, HTTPResponse, JSONResponse, render + +__all__ = ["JSONAPI", "App", "HTTPResponse", "JSONResponse", "render"] diff --git a/slow/slow.py b/slow/slow.py new file mode 100644 index 0000000..52ce09c --- /dev/null +++ b/slow/slow.py @@ -0,0 +1,176 @@ +import asyncio +import json +import re +from pathlib import Path + +PR = re.compile(r"\<([a-zA-Z_]+)\>") + + +async def _default_404_route(request): + return "HTTP/1.1 404 Not Found\r\nContent-Type: text/html\r\n\r\n404 Not Found contact admin" + + +async def _default_405_route(request): + return "HTTP/1.1 405 Method Not Allowed\r\nContent-Type: text/html\r\n\r\n405 Method Not Allowed" + + +class App: + def __init__(self): + self.routes: dict[re.Pattern[str], dict[str, callable]] = {} + self.error_routes: dict[int, callable] = { + 404: _default_404_route, + 405: _default_405_route, + } + + def _pattern_to_regex(self, temp) -> re.Pattern[str]: + re_temp = temp + iter = PR.finditer(temp) + for m in iter: + name = m[1] + re_temp = re.sub(m[0], r"(?P<" + name + r">[a-zA-Z0-9_-]+)", re_temp) + return re.compile(re_temp) + + def _serve(self, path, method, func): + if method not in ["GET", "POST", "PUT", "DELETE"]: + raise RuntimeError(f'Invalid method "{method}".') + pat = self._pattern_to_regex(path) + if pat not in self.routes: + self.routes[pat] = {} + if method in self.routes[pat]: + raise RuntimeWarning(f'Path "{path}" already exists.') + self.routes[pat][method] = func + + def GET(self, path): + """Decorator to register a GET HTTP route.""" + + def decorator(func): + self._serve(path, "GET", func) + return func + + return decorator + + def POST(self, path): + """Decorator to register a POST HTTP route.""" + + def decorator(func): + self._serve(path, "POST", func) + return func + + return decorator + + def PUT(self, path): + """Decorator to register a PUT HTTP route.""" + + def decorator(func): + self._serve(path, "PUT", func) + return func + + def DELETE(self, path): + """Decorator to register a DELETE HTTP route.""" + + def decorator(func): + self._serve(path, "DELETE", func) + return func + + return decorator + + def error(self, code): + """Decorator to register an error route.""" + + def decorator(func): + self.error_routes[code] = func + return func + + return decorator + + def resolve(self, path, method) -> tuple[callable, dict]: + for pattern, route in self.routes.items(): + if m := pattern.fullmatch(path): + if method not in route: + return self.error_routes[405], {} + return route[method], m.groupdict() + return self.error_routes[404], {} + + async def handle_client( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ): + """Handle an incoming connection (HTTP or WebSocket).""" + try: + # Read the initial HTTP request line + request_line = await reader.readline() + if not request_line: + return + + # Parse request line + parts = request_line.decode(encoding="utf-8").strip().split() + if len(parts) < 3: + return + + method, path, protocol = parts[0], parts[1], parts[2] + + assert protocol == "HTTP/1.1" + + headers = {} + + content_length = int(headers.get("Content-Length", 0)) + body = await reader.read(content_length) if content_length else b"" + + route, kwargs = self.resolve(path) + response = await route( + request={ + "method": method, + "path": path, + "headers": headers, + "body": body, + }, + **kwargs, + ) + writer.write(response.encode(encoding="utf-8")) + + await writer.drain() + except Exception as e: + print(f"Error: {e}") + finally: + writer.close() + await writer.wait_closed() + + async def run(self, host="127.0.0.1", port=80): + """Start the async server.""" + + server = await asyncio.start_server(self.handle_client, host, port) + + print(f"Serving on {host}:{port}") + async with server: + await server.serve_forever() + + +def HTTPResponse(content: str, status=200, content_type="text/plain; charset=utf-8"): + return f"HTTP/1.1 {status} OK\r\nContent-Type: {content_type}\r\n\r\n{content}" + + +def render(file: str | Path): + if type(file) is str: + file = Path(file) + content: str = file.read_text() + return HTTPResponse(content, content_type="text/html; charset=utf-8") + + +def JSONResponse(d: dict, status=200): + return HTTPResponse( + json.dumps(d), status=status, content_type="text/json; charset=utf-8" + ) + + +def JSONAPI(func): + def wrapper(*args, **kwargs): + result = func(*args, **kwargs) + if not isinstance(result, dict): + if ( + isinstance(result, tuple) + and len(result) == 2 + and isinstance(result[1], dict) + and isinstance(result[0], int) + ): + return JSONResponse(result[1], result[0]) + raise RuntimeError("Return value of JSONAPI route is not a dictionary") + return JSONResponse(result)