Files
shatranj/app.py

676 lines
20 KiB
Python
Raw Normal View History

2026-01-15 16:06:25 +03:30
import asyncio
import json
2026-01-20 19:57:19 +03:30
import random
import re
import string
2026-01-15 16:06:25 +03:30
import uuid
2026-01-20 19:57:38 +03:30
from collections import deque
2026-01-17 01:59:04 +03:30
from collections.abc import Iterator
2026-01-18 19:06:57 +03:30
from datetime import datetime, timedelta, timezone
2026-01-15 16:06:25 +03:30
from enum import Enum
2026-01-17 01:59:04 +03:30
from itertools import product
from pathlib import Path
from typing import Any
2026-01-15 16:06:25 +03:30
2026-01-20 19:57:19 +03:30
from libs.slow import (
JSONAPI,
App,
HTTPResponse,
JSONResponse,
Request,
redirect,
render,
)
2026-01-15 16:06:25 +03:30
2026-01-17 01:59:04 +03:30
class Coord:
    """Mutable board coordinate.

    ``x`` indexes the file letters ``a``-``h`` and ``y`` the rank digits
    ``1``-``8`` (both zero-based), as used by ``coord_to_pos``.

    Because ``__eq__`` is defined without ``__hash__``, instances are
    unhashable; they are compared by value and may be mutated in place.
    """

    x: int
    y: int

    def __init__(self, x: int, y: int):
        self.x = x
        self.y = y

    def __iter__(self) -> Iterator[int]:
        """Yield ``x`` then ``y`` so that ``x, y = coord`` unpacks."""
        yield from (self.x, self.y)

    def copy(self):
        """Return an independent ``Coord`` with the same components."""
        return Coord(self.x, self.y)

    def __str__(self):
        # Off-board coordinates render as the string "None".
        return str(coord_to_pos(self))

    def __eq__(self, other):
        if not isinstance(other, Coord):
            return False
        return (self.x, self.y) == (other.x, other.y)

    def __repr__(self):
        return f"Coord({self.x}, {self.y})"
2026-01-15 16:06:25 +03:30
# Application object; HTTP routes below are registered on it through the
# @app.GET / @app.POST decorators.
app = App()
class Color(Enum):
    """Side of a player; the integer value is what the JSON API sends."""

    WHITE = 0  # first player to join a room; moves first (Room.turn starts here)
    BLACK = 1  # second player to join a room
2026-01-17 01:59:04 +03:30
class State(Enum):
    """Outcome of a room's game; the integer value is sent to clients."""

    NOT_FINISHED = -1  # initial state of every Room
    TIE = 0  # set by the poll handler once the game has run for 30 minutes
    WHITE_WIN = 1
    BLACK_WIN = 2
2026-01-15 16:06:25 +03:30
class Piece(Enum):
    """Contents of a board square, encoded as a single character.

    Upper-case values are white pieces and lower-case values are black ones;
    the lower-cased value is the "piece kind" handled by ``get_piece_moves``.
    Note the naming: kind ``"r"`` (ROOK) is given knight-style jumps there,
    while kind ``"c"`` (CASTLE) slides along ranks and files.
    """

    NONE = ""  # returned by Board.index_xy / index_coord for off-board squares
    EMPTY = "E"  # an empty square on the board
    # WHITE
    WHITE_PAWN = "P"
    WHITE_ROOK = "R"
    WHITE_BISHOP = "B"
    WHITE_KING = "K"
    WHITE_QUEEN = "Q"
    WHITE_CASTLE = "C"
    # BLACK
    BLACK_PAWN = "p"
    BLACK_ROOK = "r"
    BLACK_BISHOP = "b"
    BLACK_KING = "k"
    BLACK_QUEEN = "q"
    BLACK_CASTLE = "c"
2026-01-17 01:59:04 +03:30
def coord_to_pos(xy: Coord) -> None | str:
    """Convert a coordinate to algebraic notation such as ``"e4"``.

    Returns ``None`` when either component lies outside ``0..7``.
    """
    on_board = 0 <= xy.x <= 7 and 0 <= xy.y <= 7
    if not on_board:
        return None
    file_letter = "abcdefgh"[xy.x]
    rank_digit = "12345678"[xy.y]
    return file_letter + rank_digit
2026-01-17 01:59:04 +03:30
def coord_to_pos_safe(xy: Coord) -> str:
    """Convert a coordinate to algebraic notation without a bounds check.

    The caller must guarantee both components are in ``0..7``: larger values
    raise ``IndexError`` and negative ones wrap around like any Python index.
    """
    file_letter = "abcdefgh"[xy.x]
    rank_digit = "12345678"[xy.y]
    return file_letter + rank_digit
def xy_to_pos(x: int, y: int) -> None | str:
if x < 0 or x > 7 or y < 0 or y > 7:
return None
return "abcdefgh"[x] + "12345678"[y]
def xy_to_pos_safe(x: int, y: int) -> str:
    """Convert zero-based file/rank indices to notation without bounds checks.

    The caller must guarantee both indices are in ``0..7``: larger values
    raise ``IndexError`` and negative ones wrap around like any Python index.
    """
    file_letter = "abcdefgh"[x]
    rank_digit = "12345678"[y]
    return file_letter + rank_digit
def pos_to_coord(pos: str) -> Coord:
    """Parse notation such as ``"e4"`` into a ``Coord``.

    Only the first two characters are read. ``str.index`` raises
    ``ValueError`` for a character outside ``a``-``h`` / ``1``-``8``, and a
    string shorter than two characters raises ``IndexError``.
    """
    file_index = "abcdefgh".index(pos[0])
    rank_index = "12345678".index(pos[1])
    return Coord(file_index, rank_index)
2026-01-15 16:06:25 +03:30
class Board:
    """8x8 grid of ``Piece`` values, addressed as ``grid[rank][file]``.

    Row 0 is rank ``1`` (white's back rank) and row 7 is rank ``8``.
    """

    grid: list[list[Piece]]

    def __init__(self):
        """Set up the starting position."""
        white_back_rank = [
            Piece.WHITE_CASTLE,
            Piece.WHITE_ROOK,
            Piece.WHITE_BISHOP,
            Piece.WHITE_QUEEN,
            Piece.WHITE_KING,
            Piece.WHITE_BISHOP,
            Piece.WHITE_ROOK,
            Piece.WHITE_CASTLE,
        ]
        black_back_rank = [
            Piece.BLACK_CASTLE,
            Piece.BLACK_ROOK,
            Piece.BLACK_BISHOP,
            Piece.BLACK_QUEEN,
            Piece.BLACK_KING,
            Piece.BLACK_BISHOP,
            Piece.BLACK_ROOK,
            Piece.BLACK_CASTLE,
        ]
        self.grid: list[list[Piece]] = [
            white_back_rank,
            [Piece.WHITE_PAWN] * 8,
            # Four distinct empty rows between the two armies.
            *([Piece.EMPTY] * 8 for _ in range(4)),
            [Piece.BLACK_PAWN] * 8,
            black_back_rank,
        ]

    def index(self, pos: str) -> Piece:
        """Return the piece at a notation square such as ``"e2"``.

        No validation is performed; malformed input raises from ``str.index``
        or from string indexing.
        """
        file_index = "abcdefgh".index(pos[0])
        rank_index = "12345678".index(pos[1])
        return self.grid[rank_index][file_index]

    def index_coord(self, coord: Coord) -> Piece:
        """Return the piece at ``coord``, or ``Piece.NONE`` when off the board."""
        return self.index_xy(coord.x, coord.y)

    def index_xy(self, x: int, y: int) -> Piece:
        """Return the piece at file ``x`` / rank ``y``, or ``Piece.NONE`` off board."""
        if 0 <= x <= 7 and 0 <= y <= 7:
            return self.grid[y][x]
        return Piece.NONE

    def copy(self) -> "Board":
        """Return a new board whose grid rows are independent lists."""
        clone = Board()
        clone.grid = [[Piece(cell.value) for cell in row] for row in self.grid]
        return clone

    def serialize(self) -> dict:
        """Return the grid as nested lists of piece characters for JSON."""
        return {"grid": [[cell.value for cell in row] for row in self.grid]}

    def __str__(self):
        # One text line per row, row 0 first, cells separated by spaces.
        lines = (" ".join(cell.value for cell in self.grid[i]) for i in range(8))
        return "\n".join(lines)
2026-01-15 16:06:25 +03:30
class Room:
    """State of one game: board, side to move, joined players and timing."""

    board: Board
    turn: Color
    players: list[str]  # player uids in join order: index 0 is white, 1 is black
    last_move: datetime
    game_start: datetime | None  # None until the second player has joined
    state: State

    def __init__(self):
        self.board = Board()
        self.turn = Color.WHITE
        self.players = []
        self.state = State.NOT_FINISHED
        self.game_start = None
        self.last_move = datetime.now(timezone.utc)

    def start(self):
        """Stamp the game start and reset the last-move clock (UTC)."""
        self.last_move = datetime.now(timezone.utc)
        self.game_start = datetime.now(timezone.utc)

    def add_player(self) -> None | tuple[str, Color]:
        """Seat a new player and return ``(uid, color)``.

        The first caller becomes white. The second becomes black and starts
        the game. Returns ``None`` once two players are seated.
        """
        seated: int = len(self.players)
        if seated >= 2:
            return None
        uid = str(uuid.uuid4())
        self.players.append(uid)
        if seated == 1:
            self.start()
            return uid, Color.BLACK
        return uid, Color.WHITE
# In-memory registry of live rooms, keyed by the room key that appears in URLs.
rooms: dict[str, Room] = {}
2026-01-18 19:07:25 +03:30
# Icon bytes are read once at import time, relative to the working directory;
# a missing file makes the import fail.
favicon = Path("favicon.ico").read_bytes()
favicon_update = Path("favicon_update.ico").read_bytes()
@app.GET("/favicon.ico")
async def favicon_s(request):
    """Serve the preloaded default favicon bytes."""
    icon_response = HTTPResponse(request, favicon, content_type="image/x-icon")
    return icon_response
@app.GET("/favicon_update.ico")
async def favicon_update_s(request):
    """Serve the preloaded alternate ("update") favicon bytes."""
    icon_response = HTTPResponse(request, favicon_update, content_type="image/x-icon")
    return icon_response
2026-01-20 19:57:01 +03:30
# Alphabet for room keys: lower-case ASCII letters followed by the digits.
letters = string.ascii_lowercase + string.digits
def room_key():
    """Return a random 4-character key that is not yet present in ``rooms``.

    Keeps drawing until an unused key is found, so it does not terminate when
    every possible key is already taken.
    """
    while True:
        candidate: str = "".join(random.choices(letters, k=4))
        if candidate not in rooms:
            return candidate
@app.POST("/create_room")
async def new_room(request):
    """Create an empty room and return its key as ``{"id": key}``.

    Responds with status 501 when every possible 4-character key is in use.
    """
    key_space = len(letters) ** 4
    if len(rooms) == key_space:
        return HTTPResponse(request, "Out of service", status=501)
    key = room_key()
    rooms[key] = Room()
    payload = {"id": key}
    return JSONResponse(request, payload)
2026-01-20 19:57:38 +03:30
# Quick-match state. A bare annotation such as ``quick_queue: deque[str]``
# only records a type and binds no name, so each container is given an
# actual value here.
#
# Queue ids of quick-match clients, oldest first.
quick_queue: deque[str] = deque()
# Queue id -> the room that ticket has been paired into.
quick_map: dict[str, Room] = {}
# Serialises access to the two quick-match containers above.
lock = asyncio.Lock()
@app.POST("/quick")
async def quick_match(request: Request):
    """Quick-match endpoint: hand out queue tickets and pair waiting clients.

    * A body that is not ``{"queue_id": <known ticket>}`` enrols the caller:
      a new ticket is queued and returned as ``{"queue_id": ...}``.
    * A body carrying a known ticket is a poll. The two oldest unmatched
      tickets (if any) are paired into a fresh room, then the caller gets
      ``{"room_id": <room key>}`` if its ticket is matched, or ``{}`` to
      signal "keep waiting".
    """
    data = parse(request.body)
    async with lock:
        # isinstance guards against JSON arrays/strings, for which
        # `"queue_id" in data` can be true while `data["queue_id"]` raises.
        is_ticket_poll = (
            isinstance(data, dict)
            and len(data) == 1
            and "queue_id" in data
            and data["queue_id"] in quick_queue
        )
        if not is_ticket_poll:
            qid = str(uuid.uuid4())
            quick_queue.append(qid)
            return JSONResponse(request, {"queue_id": qid})

        qid = data["queue_id"]

        # Pair the two oldest tickets that have not been matched yet.
        waiting: list[str] = []
        for ticket in quick_queue:
            if ticket not in quick_map:
                waiting.append(ticket)
                if len(waiting) == 2:
                    break
        if len(waiting) == 2:
            room = Room()
            rooms[room_key()] = room
            for ticket in waiting:
                quick_map[ticket] = room
        # NOTE(review): matched tickets are never removed from quick_queue or
        # quick_map, so both grow for the lifetime of the process.

        matched_room = quick_map.get(qid)
        if matched_room is None:
            # Client handles empty as continue to wait
            return JSONResponse(request, {})

        # quick_map stores Room objects, but the client needs the room *key*
        # (a Room is not JSON-serialisable), so look the key up by identity.
        room_id = next(
            (key for key, candidate in rooms.items() if candidate is matched_room),
            None,
        )
        if room_id is None:
            # The room is no longer registered; release the ticket so it can
            # be paired again on a later poll.
            del quick_map[qid]
            return JSONResponse(request, {})
        return JSONResponse(request, {"room_id": room_id})
2026-01-15 16:06:25 +03:30
@app.GET("/")
async def home(request):
    """Render the landing page template."""
    page = render(request, "home.html")
    return page
2026-01-17 02:01:19 +03:30
@app.GET("/<id>")
async def game(request, id):
    """Render the game page for an existing room; otherwise redirect home."""
    # re.match only anchors at the start, so longer ids pass the pattern too;
    # they are then rejected by the registry lookup.
    looks_like_key = re.match(r"[a-z0-9]{4}", id)
    if looks_like_key and id in rooms:
        return render(request, "game.html")
    return redirect("/")
2026-01-15 16:06:25 +03:30
2026-01-17 01:59:04 +03:30
@app.POST("/join/<room_id>")
async def join(request, room_id):
    """Seat the caller in a room and return the current game snapshot.

    Responds with code ``NOGO`` (status 404) for an unknown room, ``JOIN``
    plus the new player's uid and color on success, or ``FULL`` with a
    snapshot only when both seats are already taken.
    """
    if room_id not in rooms:
        return JSONResponse(request, {"code": "NOGO", "error": "Room not found."}, 404)

    room: Room = rooms[room_id]
    seat = room.add_player()

    # Values shared by both responses, read after the seat attempt.
    board_state = room.board.serialize()
    ready = len(room.players) == 2
    # Before the game starts there is no start time yet; report "now".
    start_time = (room.game_start or datetime.now(timezone.utc)).isoformat()

    if not seat:
        return JSONResponse(
            request,
            {
                "code": "FULL",
                "error": "Room Full",
                "board": board_state,
                "ready": ready,
                "turn": room.turn.value,
                "state": room.state.value,
                "start_time": start_time,
            },
        )

    uid, color = seat
    return JSONResponse(
        request,
        {
            "code": "JOIN",
            "id": uid,
            "color": color.value,
            "turn": room.turn.value,
            "board": board_state,
            "ready": ready,
            "state": room.state.value,
            "start_time": start_time,
        },
    )
2026-01-17 01:59:04 +03:30
def parse(body: str | bytes) -> dict[str, Any] | None:
2026-01-15 16:06:25 +03:30
try:
return json.loads(body)
except Exception:
return None
2026-01-17 01:59:04 +03:30
# Step tables, listed in the order in which moves are emitted.
_DIAGONAL_STEPS = ((1, 1), (1, -1), (-1, 1), (-1, -1))
_ORTHOGONAL_STEPS = ((-1, 0), (1, 0), (0, -1), (0, 1))
_QUEEN_STEPS = tuple(s for s in product((1, -1, 0), repeat=2) if s != (0, 0))
_KING_STEPS = tuple(s for s in product((-1, 0, 1), repeat=2) if s != (0, 0))
_KNIGHT_JUMPS = (
    (2, 1), (2, -1), (-2, 1), (-2, -1), (1, 2), (-1, 2), (1, -2), (-1, -2),
)
# Piece kinds that slide until blocked, mapped to their directions.
_SLIDER_STEPS = {"b": _DIAGONAL_STEPS, "q": _QUEEN_STEPS, "c": _ORTHOGONAL_STEPS}


def _slide(board: Board, is_white: bool, x: int, y: int, steps) -> list[Coord]:
    """Collect squares reachable by sliding from (x, y) along each step."""
    reached: list[Coord] = []
    for dx, dy in steps:
        target = Coord(x + dx, y + dy)
        # Walk over empty squares; off-board lookups yield Piece.NONE and stop.
        while board.index_coord(target) == Piece.EMPTY:
            reached.append(target.copy())
            target.x += dx
            target.y += dy
        blocker = board.index_coord(target)
        # The first occupied square is reachable only if it holds an enemy.
        if blocker != Piece.NONE and blocker.value.isupper() != is_white:
            reached.append(target.copy())
    return reached


def _hop(board: Board, is_white: bool, x: int, y: int, offsets) -> list[Coord]:
    """Collect single-jump targets that are empty or hold an enemy piece."""
    reached: list[Coord] = []
    for dx, dy in offsets:
        target = Coord(x + dx, y + dy)
        occupant = board.index_coord(target)
        if occupant == Piece.NONE:
            continue  # off the board
        if occupant == Piece.EMPTY or occupant.value.isupper() != is_white:
            reached.append(target)
    return reached


def _pawn_moves(board: Board, is_white: bool, x: int, y: int) -> list[Coord]:
    """Collect pawn pushes and diagonal captures from (x, y)."""
    reached: list[Coord] = []
    forward = 1 if is_white else -1
    start_row = 1 if is_white else 6
    if board.index_xy(x, y + forward) == Piece.EMPTY:
        reached.append(Coord(x=x, y=y + forward))
        # The double step needs both squares ahead to be empty, so it is only
        # considered once the single step is known to be free.
        if y == start_row and board.index_xy(x, y + 2 * forward) == Piece.EMPTY:
            reached.append(Coord(x=x, y=y + 2 * forward))
    not_capturable = (Piece.EMPTY, Piece.NONE, Piece.BLACK_KING, Piece.WHITE_KING)
    for dx in (1, -1):
        victim = board.index_xy(x + dx, y + forward)
        if victim not in not_capturable and victim.value.isupper() != is_white:
            reached.append(Coord(x=x + dx, y=y + forward))
    return reached


def get_piece_moves(piece_kind, board: Board, is_white, src: str) -> list[Coord]:
    """Return the pseudo-legal destinations of one piece.

    "Pseudo-legal" means geometry and blocking only; whether the mover's own
    king ends up attacked is not considered here.

    Args:
        piece_kind: lower-case piece letter: ``"p"`` pawn, ``"b"`` diagonal
            slider, ``"q"`` eight-direction slider, ``"c"`` orthogonal
            slider, ``"k"`` one-step king, ``"r"`` knight-style jumper.
            Any other value yields an empty list.
        board: position to read.
        is_white: side the moving piece belongs to.
        src: square of the piece in notation such as ``"e2"``.

    For every kind except the pawn, a square holding the *enemy king* stays in
    the result: ``generate_valid_moves`` tests king-square membership in this
    list to detect attacks. An earlier trailing filter compared the bound
    method ``.value.lower`` (never called) to ``"k"`` and therefore removed
    nothing; it has been dropped rather than "fixed" because calling it would
    silently disable that attack detection.
    """
    if piece_kind not in _SLIDER_STEPS and piece_kind not in ("p", "k", "r"):
        return []
    x, y = pos_to_coord(src)
    if piece_kind == "p":
        return _pawn_moves(board, is_white, x, y)
    if piece_kind == "k":
        return _hop(board, is_white, x, y, _KING_STEPS)
    if piece_kind == "r":
        return _hop(board, is_white, x, y, _KNIGHT_JUMPS)
    return _slide(board, is_white, x, y, _SLIDER_STEPS[piece_kind])
def _find_king(board: Board, is_white: bool) -> Coord | None:
    """Return the square of the given side's king, or None if it is absent."""
    wanted = Piece.WHITE_KING if is_white else Piece.BLACK_KING
    for y in range(8):
        for x in range(8):
            if board.grid[y][x] == wanted:
                return Coord(x, y)
    return None


def _is_square_attacked(board: Board, square: Coord, by_white: bool) -> bool:
    """Return True if any piece of the ``by_white`` side attacks ``square``."""
    for y in range(8):
        for x in range(8):
            piece = board.grid[y][x]
            # EMPTY must be skipped explicitly: its value "E" is upper-case
            # and would otherwise look like a white piece.
            if piece in (Piece.EMPTY, Piece.NONE):
                continue
            if piece.value.isupper() != by_white:
                continue
            kind = piece.value.lower()
            if kind == "p":
                # get_piece_moves never lists a king square for pawns, so
                # pawn attacks are checked geometrically: one rank forward,
                # one file to either side.
                forward = 1 if by_white else -1
                if square.y == y + forward and abs(square.x - x) == 1:
                    return True
            elif square in get_piece_moves(kind, board, by_white, xy_to_pos_safe(x, y)):
                return True
    return False


def generate_valid_moves(
    piece_kind: str, board: Board, is_white: bool, src: str
) -> list[Coord]:
    """Return the legal destinations of the piece standing on ``src``.

    Each pseudo-legal move from ``get_piece_moves`` is played on a copy of the
    board and kept only if the mover's king is not attacked afterwards. The
    king is located on the *resulting* position and every enemy piece is
    consulted, so king moves, pinned pieces, captures of the checking piece
    and pawn checks are all judged correctly.

    Moves that would land on a king are dropped. If the mover has no king on
    the board, every pseudo-legal move is accepted.
    """
    origin = pos_to_coord(src)
    mover = board.index(src)
    legal: list[Coord] = []
    for m in get_piece_moves(piece_kind, board, is_white, src):
        if board.index_coord(m) in (Piece.WHITE_KING, Piece.BLACK_KING):
            continue  # kings are never captured
        trial = board.copy()
        trial.grid[m.y][m.x] = mover
        trial.grid[origin.y][origin.x] = Piece.EMPTY
        king = _find_king(trial, is_white)
        if king is None or not _is_square_attacked(trial, king, not is_white):
            legal.append(m)
    return legal
def _side_has_legal_move(board: Board, white: bool) -> bool:
    """Return True if any piece of the given side has at least one legal move."""
    for x in range(8):
        for y in range(8):
            piece = board.index_xy(x, y)
            # Compare against the enum member; EMPTY's value "E" is upper-case
            # and must not be mistaken for a white piece.
            if piece == Piece.EMPTY or piece.value.isupper() != white:
                continue
            if generate_valid_moves(
                piece.value.lower(), board, white, xy_to_pos_safe(x, y)
            ):
                return True
    return False


@app.POST("/move/<room_id>")
@JSONAPI
async def move(request: Request, room_id):
    """Play one move in a room.

    Expects a JSON object with exactly the string fields ``uid``, ``from``
    and ``to`` (squares in notation such as ``"e2"``). Returns the updated
    snapshot with code ``MOVD`` on success, or ``(400, {...})`` with an error
    code: ``NOEX`` unknown room, ``PRES`` game not started, ``FINI`` game
    over, ``NOTP`` uid is not a player of the room, ``OTHR`` not the caller's
    turn, ``NOTU`` not the caller's piece, ``HTME`` destination holds an own
    piece, ``LMOV`` malformed or illegal move.

    A pawn reaching the first or last rank becomes a queen. If the opponent
    is left without any legal move, the mover is recorded as the winner.
    """
    req = parse(request.body)
    if not isinstance(req, dict) or not req:
        return 400, {}
    for field in ("uid", "from", "to"):
        if field not in req or not isinstance(req[field], str):
            return 400, {}
    if len(req) > 3:
        return 400, {}
    if room_id not in rooms:
        return 400, {"code": "NOEX", "error": "Room does not exist"}
    room = rooms[room_id]
    if len(room.players) != 2:
        return 400, {"code": "PRES", "error": "Game has not been started"}
    if room.state != State.NOT_FINISHED:
        return 400, {"code": "FINI", "error": "Game has finished."}

    board = room.board
    uid = req["uid"]
    src = req["from"].lower()
    dst = req["to"].lower()

    # Only the two seated players may move; previously any unknown uid was
    # silently treated as black.
    if uid not in room.players:
        return 400, {"code": "NOTP", "error": "Not a player in this room"}
    color = Color.WHITE if room.players[0] == uid else Color.BLACK
    if room.turn != color:
        return 400, {"code": "OTHR", "error": "Not your turn."}

    if (
        len(src) != 2
        or len(dst) != 2
        or src[0] not in "abcdefgh"
        or src[1] not in "12345678"
        or dst[0] not in "abcdefgh"
        or dst[1] not in "12345678"
        or board.index(src) == Piece.EMPTY
    ):
        return 400, {"code": "LMOV", "error": "Bad Move 1"}

    srcp = board.index(src)
    dstp = board.index(dst)
    piece_kind = srcp.value.lower()
    is_white = srcp.value.isupper()
    if is_white != (color == Color.WHITE):
        return 400, {"code": "NOTU", "error": "Not your piece"}
    if dstp != Piece.EMPTY and (
        (dstp.value.isupper() and color == Color.WHITE)
        or (dstp.value.islower() and color == Color.BLACK)
    ):
        return 400, {"code": "HTME", "error": "Cannot move to your own piece"}

    target = pos_to_coord(dst)
    if target not in generate_valid_moves(piece_kind, board, is_white, src):
        return 400, {"code": "LMOV", "error": "Bad Move 2"}

    # Apply the move, promoting a pawn that reaches either end rank.
    sx, sy = pos_to_coord(src)
    board.grid[target.y][target.x] = srcp
    board.grid[sy][sx] = Piece.EMPTY
    if piece_kind == "p" and target.y in (0, 7):
        board.grid[target.y][target.x] = (
            Piece.WHITE_QUEEN if is_white else Piece.BLACK_QUEEN
        )
    room.turn = Color.BLACK if color == Color.WHITE else Color.WHITE
    room.last_move = datetime.now(timezone.utc)

    # The opponent having no legal reply ends the game in the mover's favour.
    if not _side_has_legal_move(board, not is_white):
        room.state = State.WHITE_WIN if is_white else State.BLACK_WIN

    return {
        "code": "MOVD",
        "color": color.value,
        "turn": room.turn.value,
        "state": room.state.value,
        "board": board.serialize(),
    }
@app.GET("/poll/<room_id>")
@JSONAPI
async def poll(request, room_id):
    """Return the current snapshot of a room (code ``STAT``).

    Every poll also performs housekeeping over all rooms: rooms idle for 24
    hours are removed, and unfinished games older than 30 minutes are
    declared a tie. An unknown (or just-removed) room yields
    ``(400, {"code": "NOEX", ...})``.
    """
    now = datetime.now(timezone.utc)
    # Iterate over a snapshot: deleting from a dict while iterating it
    # raises RuntimeError.
    for key, candidate in list(rooms.items()):
        if now - candidate.last_move >= timedelta(hours=24):
            del rooms[key]
            continue
        if (
            candidate.game_start
            and now - candidate.game_start >= timedelta(minutes=30)
            and candidate.state == State.NOT_FINISHED
        ):
            candidate.state = State.TIE

    if room_id not in rooms:
        return 400, {"code": "NOEX", "error": "Room does not exist"}
    room = rooms[room_id]
    return {
        "code": "STAT",
        "turn": room.turn.value,
        "board": room.board.serialize(),
        "ready": len(room.players) == 2,
        "state": room.state.value,
        # Before the game starts there is no start time yet; report "now".
        "start_time": (room.game_start or now).isoformat(),
    }
@app.POST("/moves/<room_id>")
@JSONAPI
async def generate_moves(request: Request, room_id):
    """List the legal destinations of the piece on a given square.

    Expects a JSON object with the single string field ``from``. Returns
    ``{"moves": "e3,e4"}`` (comma-separated squares, possibly empty), or
    ``(400, {...})`` with code ``NOEX`` (unknown room), ``PRES`` (game not
    started) or ``LMOV`` (malformed or empty square).
    """
    req = parse(request.body)
    if not isinstance(req, dict) or not req:
        return 400, {}
    if "from" not in req or not isinstance(req["from"], str):
        return 400, {}
    if len(req) > 1:
        return 400, {}
    if room_id not in rooms:
        return 400, {"code": "NOEX", "error": "Room does not exist"}
    room = rooms[room_id]
    if len(room.players) != 2:
        return 400, {"code": "PRES", "error": "Game has not been started"}

    board = room.board
    src = req["from"].lower()
    # Validate the square *before* reading the board: Board.index raises on
    # malformed input.
    if len(src) != 2 or src[0] not in "abcdefgh" or src[1] not in "12345678":
        return 400, {"code": "LMOV", "error": "Bad Move"}
    srcp = board.index(src)
    if srcp == Piece.EMPTY:
        return 400, {"code": "LMOV", "error": "Bad Move"}

    # No ownership check: the request carries no uid, and the previous
    # "Not your piece" test compared the piece's color with itself.
    piece_kind = srcp.value.lower()
    is_white = srcp.value.isupper()
    valid_moves = generate_valid_moves(piece_kind, board, is_white, src)
    return {
        "moves": ",".join(coord_to_pos_safe(c) for c in valid_moves),
    }
def sanitize_filename(filename: str, base: Path = Path.cwd().resolve()) -> Path:
    """Resolve ``filename`` under ``base`` and refuse paths that escape it.

    The default ``base`` is the working directory captured when the function
    is defined. Raises ``ValueError`` when the resolved path is not inside
    ``base`` (for example via ``..`` or an absolute ``filename``).
    """
    candidate = base.joinpath(filename).resolve()
    if candidate.is_relative_to(base):
        return candidate
    raise ValueError("Filename leaves current directory")
@app.GET("/static/<fn>")
async def static(request, fn):
    """Serve a file from the ``static/`` directory as a binary download.

    Any failure (path escaping the directory, missing or unreadable file)
    is reported as a 404.
    """
    try:
        static_root = Path("static/").resolve()
        payload = sanitize_filename(fn, static_root).read_bytes()
        return HTTPResponse(request, payload, content_type="application/octet-stream")
    except Exception:
        return HTTPResponse(request, "404 File Not Found", status=404)
2026-01-15 16:06:25 +03:30
if __name__ == "__main__":
    # Run the application on port 8989 when executed as a script.
    server = app.run(port=8989)
    asyncio.run(server)