Files
shatranj/app.py
2026-01-20 19:59:08 +03:30

676 lines
20 KiB
Python

import asyncio
import json
import random
import re
import string
import uuid
from collections import deque
from collections.abc import Iterator
from datetime import datetime, timedelta, timezone
from enum import Enum
from itertools import product
from pathlib import Path
from typing import Any
from libs.slow import (
JSONAPI,
App,
HTTPResponse,
JSONResponse,
Request,
redirect,
render,
)
class Coord:
x: int
y: int
def __init__(self, x: int, y: int):
self.x = x
self.y = y
def __iter__(self) -> Iterator[int]:
yield self.x
yield self.y
def copy(self):
return Coord(self.x, self.y)
def __str__(self):
return str(coord_to_pos(self))
def __eq__(self, other):
if isinstance(other, Coord):
return self.x == other.x and self.y == other.y
return False
def __repr__(self):
return f"Coord({self.x}, {self.y})"
app = App()
class Color(Enum):
WHITE = 0
BLACK = 1
class State(Enum):
NOT_FINISHED = -1
TIE = 0
WHITE_WIN = 1
BLACK_WIN = 2
class Piece(Enum):
NONE = ""
EMPTY = "E"
# WHITE
WHITE_PAWN = "P"
WHITE_ROOK = "R"
WHITE_BISHOP = "B"
WHITE_KING = "K"
WHITE_QUEEN = "Q"
WHITE_CASTLE = "C"
# BLACK
BLACK_PAWN = "p"
BLACK_ROOK = "r"
BLACK_BISHOP = "b"
BLACK_KING = "k"
BLACK_QUEEN = "q"
BLACK_CASTLE = "c"
def coord_to_pos(xy: Coord) -> None | str:
if xy.x < 0 or xy.x > 7 or xy.y < 0 or xy.y > 7:
return None
return "abcdefgh"[xy.x] + "12345678"[xy.y]
def coord_to_pos_safe(xy: Coord) -> str:
return "abcdefgh"[xy.x] + "12345678"[xy.y]
def xy_to_pos(x: int, y: int) -> None | str:
if x < 0 or x > 7 or y < 0 or y > 7:
return None
return "abcdefgh"[x] + "12345678"[y]
def xy_to_pos_safe(x: int, y: int) -> str:
return "abcdefgh"[x] + "12345678"[y]
def pos_to_coord(pos: str) -> Coord:
return Coord("abcdefgh".index(pos[0]), "12345678".index(pos[1]))
class Board:
grid: list[list[Piece]]
def __init__(self):
self.grid: list[list[Piece]] = [[Piece.EMPTY] * 8 for _ in range(8)]
self.grid[0] = [
Piece.WHITE_CASTLE,
Piece.WHITE_ROOK,
Piece.WHITE_BISHOP,
Piece.WHITE_QUEEN,
Piece.WHITE_KING,
Piece.WHITE_BISHOP,
Piece.WHITE_ROOK,
Piece.WHITE_CASTLE,
]
self.grid[1] = [Piece.WHITE_PAWN] * 8
self.grid[6] = [Piece.BLACK_PAWN] * 8
self.grid[7] = [
Piece.BLACK_CASTLE,
Piece.BLACK_ROOK,
Piece.BLACK_BISHOP,
Piece.BLACK_QUEEN,
Piece.BLACK_KING,
Piece.BLACK_BISHOP,
Piece.BLACK_ROOK,
Piece.BLACK_CASTLE,
]
def index(self, pos: str) -> Piece:
let = "abcdefgh".index(pos[0])
num = "12345678".index(pos[1])
return self.grid[num][let]
def index_coord(self, coord: Coord) -> Piece:
if coord.x < 0 or coord.x > 7 or coord.y < 0 or coord.y > 7:
return Piece.NONE
return self.grid[coord.y][coord.x]
def index_xy(self, x: int, y: int) -> Piece:
if x < 0 or x > 7 or y < 0 or y > 7:
return Piece.NONE
return self.grid[y][x]
def copy(self) -> "Board":
board = Board()
board.grid = [[Piece(p.value) for p in r] for r in self.grid]
return board
def serialize(self) -> dict:
return {"grid": [[p.value for p in r] for r in self.grid]}
def __str__(self):
return "\n".join([" ".join([p.value for p in self.grid[i]]) for i in range(8)])
class Room:
board: Board
turn: Color
players: list[str]
last_move: datetime
game_start: datetime | None
state: State
def __init__(self):
self.turn = Color.WHITE
self.board = Board()
self.players = []
self.last_move = datetime.now(timezone.utc)
self.game_start = None
self.state = State.NOT_FINISHED
def start(self):
self.last_move = datetime.now(timezone.utc)
self.game_start = datetime.now(timezone.utc)
def add_player(self) -> None | tuple[str, Color]:
np: int = len(self.players)
if np >= 2:
return None
elif np == 1:
uid = str(uuid.uuid4())
self.players.append(uid)
self.start()
return uid, Color.BLACK
else:
uid = str(uuid.uuid4())
self.players.append(uid)
return uid, Color.WHITE
rooms: dict[str, Room] = {}
favicon = Path("favicon.ico").read_bytes()
favicon_update = Path("favicon_update.ico").read_bytes()
@app.GET("/favicon.ico")
async def favicon_s(request):
return HTTPResponse(request, favicon, content_type="image/x-icon")
@app.GET("/favicon_update.ico")
async def favicon_update_s(request):
return HTTPResponse(request, favicon_update, content_type="image/x-icon")
letters = string.ascii_lowercase + string.digits
def room_key():
key: str = "".join(random.choices(letters, k=4))
while key in rooms.keys():
key: str = "".join(random.choices(letters, k=4))
return key
@app.POST("/create_room")
async def new_room(request):
if len(rooms) == len(letters) ** 4:
return HTTPResponse(request, "Out of service", status=501)
key = room_key()
rooms[key] = Room()
return JSONResponse(
request,
{
"id": key,
},
)
quick_queue: deque[str]
quick_map: dict[str, Room]
lock = asyncio.Lock()
@app.POST("/quick")
async def quick_match(request: Request):
global quick_queue, quick_map
data = parse(request.body)
async with lock:
if (
data
and len(data) == 1
and "queue_id" in data
and data["queue_id"] in quick_queue
):
first = None
second = None
position: int = 0
# UPDATE LOGIC
while position < len(quick_queue):
if quick_queue[position] not in quick_map:
if not first:
first = quick_queue[position]
else:
second = quick_queue[position]
break
position += 1
if first and second:
room = Room()
k = room_key()
rooms[k] = room
quick_map[first] = room
quick_map[second] = room
qid = data["queue_id"]
if qid in quick_map:
return JSONResponse(request, {"room_id": quick_map[qid]})
else:
return JSONResponse(
request, {}
) # Client handles empty as continue to wait
else:
qid = str(uuid.uuid4())
quick_queue.append(qid)
return JSONResponse(request, {"queue_id": qid})
@app.GET("/")
async def home(request):
return render(request, "home.html")
@app.GET("/<id>")
async def game(request, id):
if not re.match(r"[a-z0-9]{4}", id):
return redirect("/")
if id not in rooms:
return redirect("/")
return render(request, "game.html")
@app.POST("/join/<room_id>")
async def join(request, room_id):
if room_id not in rooms:
return JSONResponse(request, {"code": "NOGO", "error": "Room not found."}, 404)
room: Room = rooms[room_id]
player = room.add_player()
if player:
return JSONResponse(
request,
{
"code": "JOIN",
"id": player[0],
"color": player[1].value,
"turn": room.turn.value,
"board": room.board.serialize(),
"ready": len(room.players) == 2,
"state": room.state.value,
"start_time": (
room.game_start or datetime.now(timezone.utc)
).isoformat(),
},
)
else:
return JSONResponse(
request,
{
"code": "FULL",
"error": "Room Full",
"board": room.board.serialize(),
"ready": len(room.players) == 2,
"turn": room.turn.value,
"state": room.state.value,
"start_time": (
room.game_start or datetime.now(timezone.utc)
).isoformat(),
},
)
def parse(body: str | bytes) -> dict[str, Any] | None:
try:
return json.loads(body)
except Exception:
return None
def get_piece_moves(piece_kind, board: Board, is_white, src: str) -> list[Coord]:
valids: list[Coord] = []
if piece_kind == "p":
dir = 1 if is_white else -1
x, y = pos_to_coord(src)
if board.index_xy(x, y + dir) == Piece.EMPTY:
valids.append(Coord(x=x, y=y + dir))
if (y == 1 or y == 6) and board.index_xy(x, y + 2 * dir) == Piece.EMPTY:
valids.append(Coord(x=x, y=y + 2 * dir))
if (
board.index_xy(x + 1, y + dir)
not in [Piece.EMPTY, Piece.NONE, Piece.BLACK_KING, Piece.WHITE_KING]
and board.index_xy(x + 1, y + dir).value.isupper() != is_white
):
valids.append(Coord(x=x + 1, y=y + dir))
if (
board.index_xy(x - 1, y + dir)
not in [Piece.EMPTY, Piece.NONE, Piece.BLACK_KING, Piece.WHITE_KING]
and board.index_xy(x - 1, y + dir).value.isupper() != is_white
):
valids.append(Coord(x=x - 1, y=y + dir))
elif piece_kind == "b":
x, y = pos_to_coord(src)
comb = product([1, -1], repeat=2)
for dir in comb:
target: Coord = Coord(x=x + dir[0], y=y + dir[1])
while (
board.index_coord(target) == Piece.EMPTY
and board.index_coord(target) != Piece.NONE
):
valids.append(target.copy())
target.x += dir[0]
target.y += dir[1]
if (
p := board.index_coord(target)
) != Piece.NONE and p.value.isupper() != is_white:
valids.append(target.copy())
elif piece_kind == "q":
x, y = pos_to_coord(src)
comb = product([1, -1, 0], repeat=2)
for dir in comb:
if dir[0] ** 2 + dir[1] ** 2 == 0:
continue # x=0 y=0 cannot move => invalid
target: Coord = Coord(x=x + dir[0], y=y + dir[1])
while (
board.index_coord(target) == Piece.EMPTY
and board.index_coord(target) != Piece.NONE
):
valids.append(target.copy())
target.x += dir[0]
target.y += dir[1]
if (
p := board.index_coord(target)
) != Piece.NONE and p.value.isupper() != is_white:
valids.append(target.copy())
elif piece_kind == "c":
x, y = pos_to_coord(src)
comb = [(-1, 0), (1, 0), (0, -1), (0, 1)]
for dir in comb:
target: Coord = Coord(x=x + dir[0], y=y + dir[1])
while (
board.index_coord(target) == Piece.EMPTY
and board.index_coord(target) != Piece.NONE
):
valids.append(target.copy())
target.x += dir[0]
target.y += dir[1]
if (
p := board.index_coord(target)
) != Piece.NONE and p.value.isupper() != is_white:
valids.append(target.copy())
elif piece_kind == "k":
x, y = pos_to_coord(src)
comb = product([-1, 0, 1], repeat=2)
for dir in comb:
if dir[0] ** 2 + dir[1] ** 2 == 0:
continue # x=0 y=0 cannot move => invalid
target: Coord = Coord(x=x + dir[0], y=y + dir[1])
if (p := board.index_coord(target)) != Piece.NONE and (
p.value.isupper() != is_white or p.value == "E"
):
valids.append(target.copy())
elif piece_kind == "r":
x, y = pos_to_coord(src)
moves = [(2, 1), (2, -1), (-2, 1), (-2, -1), (1, 2), (-1, 2), (1, -2), (-1, -2)]
for m in moves:
target: Coord = Coord(x=x + m[0], y=y + m[1])
if (p := board.index_coord(target)) != Piece.NONE and (
p.value.isupper() != is_white or p.value == "E"
):
valids.append(target.copy())
v_temp = [a for a in valids]
valids.clear()
for a in v_temp:
if board.index_coord(a).value.lower != "k":
valids.append(a)
return valids
def generate_valid_moves(
piece_kind: str, board: Board, is_white: bool, src: str
) -> list[Coord]:
possible_moves: list[Coord] = get_piece_moves(piece_kind, board, is_white, src)
valids: list[Coord] = []
king: Coord
for i in range(8):
for j in range(8):
p = board.grid[i][j]
if (
p in [Piece.BLACK_KING, Piece.WHITE_KING]
and p.value.isupper() == is_white
):
king = Coord(j, i)
break
for m in possible_moves:
fake_board = board.copy()
fake_board.grid[m.y][m.x] = fake_board.index(src)
sx, sy = pos_to_coord(src)
fake_board.grid[sy][sx] = Piece.EMPTY
king_safe = True
for i in range(8):
for j in range(8):
p = board.grid[i][j]
if p != Piece.EMPTY and p.value.isupper() != is_white:
# Enemy
enemy_moves = get_piece_moves(
p.value.lower(), board, not is_white, xy_to_pos_safe(j, i)
)
if king not in enemy_moves:
continue
ni = i
nj = j
if j == sx and i == sy:
ni = m.y
nj = m.x
new_enemy_moves = get_piece_moves(
p.value.lower(),
fake_board,
not is_white,
xy_to_pos_safe(nj, ni),
)
if king in new_enemy_moves:
king_safe = False
break
if not king_safe:
break
if king_safe:
valids.append(m)
return valids
@app.POST("/move/<room_id>")
@JSONAPI
async def move(request: Request, room_id):
req = parse(request.body)
if not req:
return 400, {}
if "uid" not in req or not isinstance(req["uid"], str):
return 400, {}
if "from" not in req or not isinstance(req["from"], str):
return 400, {}
if "to" not in req or not isinstance(req["to"], str):
return 400, {}
if len(req) > 3:
return 400, {}
if room_id not in rooms:
return 400, {"code": "NOEX", "error": "Room does not exist"}
room = rooms[room_id]
if len(room.players) != 2:
return 400, {"code": "PRES", "error": "Game has not been started"}
if room.state != State.NOT_FINISHED:
return 400, {"code": "FINI", "error": "Game has not finished."}
board = room.board
uid = req["uid"]
src = req["from"].lower()
dst = req["to"].lower()
color = Color.WHITE if room.players[0] == uid else Color.BLACK
turn = room.turn == color
if not turn:
return 400, {"code": "OTHR", "error": "Not your turn."}
if (
len(src) != 2
or len(dst) != 2
or src[0] not in "abcdefgh"
or src[1] not in "12345678"
or dst[0] not in "abcdefgh"
or dst[1] not in "12345678"
or board.index(src) == Piece.EMPTY
):
return 400, {"code": "LMOV", "error": "Bad Move 1"}
srcp = board.index(src)
dstp = board.index(dst)
piece_kind = srcp.value.lower()
is_white = srcp.value.isupper()
if (is_white and color == Color.BLACK) or (not is_white and color == Color.WHITE):
return 400, {"code": "NOTU", "error": "Not your piece"}
if dstp != Piece.EMPTY and (
(dstp.value.isupper() and color == Color.WHITE)
or (dstp.value.islower() and color == Color.BLACK)
):
return 400, {"code": "HTME", "error": "Cannot move to your own piece"}
valid_moves = generate_valid_moves(piece_kind, board, is_white, src)
if (c := pos_to_coord(dst)) in valid_moves:
board.grid[c.y][c.x] = srcp
sx, sy = pos_to_coord(src)
board.grid[sy][sx] = Piece.EMPTY
if (c.y == 0 or c.y == 7) and piece_kind == "p":
board.grid[c.y][c.x] = Piece.WHITE_QUEEN if is_white else Piece.BLACK_QUEEN
room.turn = Color.BLACK if color == Color.WHITE else Color.WHITE
room.last_move = datetime.now(timezone.utc)
opp_checkmate = True
for i in range(8):
for j in range(8):
P = board.index_xy(i, j)
if P != "E" and P.value.isupper() != is_white:
moves = generate_valid_moves(
P.value.lower(), board, not is_white, xy_to_pos_safe(i, j)
)
if len(moves) > 0:
opp_checkmate = False
break
if not opp_checkmate:
break
if opp_checkmate:
room.state = State.BLACK_WIN if is_white else State.WHITE_WIN
return {
"code": "MOVD",
"color": color.value,
"turn": room.turn.value,
"state": room.state.value,
"board": board.serialize(),
}
else:
return 400, {"code": "LMOV", "error": "Bad Move 2"}
@app.GET("/poll/<room_id>")
@JSONAPI
async def poll(request, room_id):
for key in rooms:
room = rooms[key]
if (datetime.now(timezone.utc) - room.last_move) >= timedelta(hours=24):
del rooms[key]
if (
room.game_start
and (datetime.now(timezone.utc) - room.game_start) >= timedelta(minutes=30)
and room.state == State.NOT_FINISHED
):
room.state = State.TIE
if room_id not in rooms:
return 400, {"code": "NOEX", "error": "Room does not exist"}
room = rooms[room_id]
return {
"code": "STAT",
"turn": room.turn.value,
"board": room.board.serialize(),
"ready": len(room.players) == 2,
"state": room.state.value,
"start_time": (room.game_start or datetime.now(timezone.utc)).isoformat(),
}
@app.POST("/moves/<room_id>")
@JSONAPI
async def generate_moves(request: Request, room_id):
req = parse(request.body)
if not req:
return 400, {}
if "from" not in req or not isinstance(req["from"], str):
return 400, {}
if len(req) > 1:
return 400, {}
if room_id not in rooms:
return 400, {"code": "NOEX", "error": "Room does not exist"}
room = rooms[room_id]
if len(room.players) != 2:
return 400, {"code": "PRES", "error": "Game has not been started"}
board = room.board
src = req["from"].lower()
srcp = board.index(src)
color = Color.WHITE if srcp.value.isupper() else Color.BLACK
if (
len(src) != 2
or src[0] not in "abcdefgh"
or src[1] not in "12345678"
or srcp == Piece.EMPTY
):
return 400, {"code": "LMOV", "error": "Bad Move"}
piece_kind = srcp.value.lower()
is_white = srcp.value.isupper()
if (is_white and color == Color.BLACK) or (not is_white and color == Color.WHITE):
return 400, {"code": "NOTU", "error": "Not your piece"}
valid_moves = generate_valid_moves(piece_kind, board, is_white, src)
return {
"moves": ",".join([coord_to_pos_safe(c) for c in valid_moves]),
}
def sanitize_filename(filename: str, base: Path = Path.cwd().resolve()) -> Path:
p = (base / filename).resolve()
if not p.is_relative_to(base):
raise ValueError("Filename leaves current directory")
return p
@app.GET("/static/<fn>")
async def static(request, fn):
try:
path = sanitize_filename(fn, Path("static/").resolve())
return HTTPResponse(
request, path.read_bytes(), content_type="application/octet-stream"
)
except Exception:
return HTTPResponse(request, "404 File Not Found", status=404)
if __name__ == "__main__":
asyncio.run(app.run(port=8989))