2026-01-15 16:06:25 +03:30
|
|
|
import asyncio
|
|
|
|
|
import json
|
2026-01-20 19:57:19 +03:30
|
|
|
import random
|
|
|
|
|
import re
|
|
|
|
|
import string
|
2026-01-15 16:06:25 +03:30
|
|
|
import uuid
|
2026-01-20 19:57:38 +03:30
|
|
|
from collections import deque
|
2026-01-17 01:59:04 +03:30
|
|
|
from collections.abc import Iterator
|
2026-01-18 19:06:57 +03:30
|
|
|
from datetime import datetime, timedelta, timezone
|
2026-01-15 16:06:25 +03:30
|
|
|
from enum import Enum
|
2026-01-17 01:59:04 +03:30
|
|
|
from itertools import product
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import Any
|
2026-01-15 16:06:25 +03:30
|
|
|
|
2026-01-20 19:57:19 +03:30
|
|
|
from libs.slow import (
|
|
|
|
|
JSONAPI,
|
|
|
|
|
App,
|
|
|
|
|
Request,
|
|
|
|
|
redirect,
|
|
|
|
|
render,
|
|
|
|
|
)
|
2026-01-21 02:06:54 +03:30
|
|
|
from libs.slow.responses import HTTPResponse, JSONResponse
|
2026-01-15 16:06:25 +03:30
|
|
|
|
|
|
|
|
|
2026-01-17 01:59:04 +03:30
|
|
|
class Coord:
    """Mutable (x, y) pair of board indices.

    ``x`` selects a letter from "abcdefgh" and ``y`` a digit from "12345678"
    when the coordinate is rendered by ``coord_to_pos``.  Instances are
    mutated in place by the move generators, so they compare by value and
    are not hashable.
    """

    x: int
    y: int

    def __init__(self, x: int, y: int):
        self.x, self.y = x, y

    def __iter__(self) -> Iterator[int]:
        # Yielding both fields makes ``x, y = coord`` unpacking work.
        yield from (self.x, self.y)

    def copy(self):
        """Return a new, independent Coord with the same x and y."""
        return Coord(*self)

    def __str__(self):
        # coord_to_pos returns None for off-board values, which renders as "None".
        return str(coord_to_pos(self))

    def __eq__(self, other):
        if not isinstance(other, Coord):
            return False
        return (self.x, self.y) == (other.x, other.y)

    def __repr__(self):
        return f"Coord({self.x}, {self.y})"
|
|
|
|
|
|
2026-01-15 16:06:25 +03:30
|
|
|
|
|
|
|
|
# Application instance; the @app.GET / @app.POST decorators below register
# their handlers on it, and the __main__ guard runs it.
app = App()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Color(Enum):
    """Side of a player; the integer value is what the JSON responses carry."""

    WHITE = 0  # side given to the first player who joins a room
    BLACK = 1  # side given to the second player who joins a room
|
|
|
|
|
|
|
|
|
|
|
2026-01-17 01:59:04 +03:30
|
|
|
class State(Enum):
    """Outcome of a room's game; the integer value is sent as ``"state"``."""

    NOT_FINISHED = -1  # initial state set by Room.__init__
    TIE = 0  # set by poll() once a started game is 30 minutes old
    WHITE_WIN = 1
    BLACK_WIN = 2
|
|
|
|
|
|
|
|
|
|
|
2026-01-15 16:06:25 +03:30
|
|
|
class Piece(Enum):
    """Content of one board square.

    The value is the one-character code emitted by ``Board.serialize``.
    The rest of the file derives a piece's side from the case of that code
    (upper case = white, lower case = black) and its kind from the
    lower-cased code.
    """

    NONE = ""  # returned by Board.index_coord / Board.index_xy for off-board coordinates
    EMPTY = "E"  # unoccupied square; "E" is upper case, so case-based side tests must rule it out first

    # WHITE
    WHITE_PAWN = "P"
    WHITE_ROOK = "R"  # kind "r": get_piece_moves gives it L-shaped jumps
    WHITE_BISHOP = "B"
    WHITE_KING = "K"
    WHITE_QUEEN = "Q"
    WHITE_CASTLE = "C"  # kind "c": get_piece_moves slides it along ranks and files

    # BLACK
    BLACK_PAWN = "p"
    BLACK_ROOK = "r"  # kind "r": get_piece_moves gives it L-shaped jumps
    BLACK_BISHOP = "b"
    BLACK_KING = "k"
    BLACK_QUEEN = "q"
    BLACK_CASTLE = "c"  # kind "c": get_piece_moves slides it along ranks and files
|
|
|
|
|
|
|
|
|
|
|
2026-01-17 01:59:04 +03:30
|
|
|
def coord_to_pos(xy: Coord) -> None | str:
    """Render a coordinate as a two-character square name such as ``"e2"``.

    Returns ``None`` when either index lies outside 0..7.
    """
    on_board = 0 <= xy.x <= 7 and 0 <= xy.y <= 7
    if not on_board:
        return None
    files, ranks = "abcdefgh", "12345678"
    return files[xy.x] + ranks[xy.y]
|
|
|
|
|
|
|
|
|
|
|
2026-01-17 01:59:04 +03:30
|
|
|
def coord_to_pos_safe(xy: Coord) -> str:
    """Render a coordinate as a square name without any bounds check.

    The caller must already know the coordinate is on the board: an index
    above 7 raises ``IndexError`` and a negative index wraps around.
    """
    file_char = "abcdefgh"[xy.x]
    rank_char = "12345678"[xy.y]
    return "".join((file_char, rank_char))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def xy_to_pos(x: int, y: int) -> None | str:
|
|
|
|
|
if x < 0 or x > 7 or y < 0 or y > 7:
|
|
|
|
|
return None
|
|
|
|
|
return "abcdefgh"[x] + "12345678"[y]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def xy_to_pos_safe(x: int, y: int) -> str:
    """Render board indices as a square name without any bounds check.

    Intended for indices already known to be in 0..7; larger values raise
    ``IndexError`` and negative ones wrap around.
    """
    file_char = "abcdefgh"[x]
    rank_char = "12345678"[y]
    return "".join((file_char, rank_char))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def pos_to_coord(pos: str) -> Coord:
    """Parse a square name such as ``"e2"`` into a Coord.

    Only the first two characters are read.  Raises ``ValueError`` when a
    character is not a valid file/rank and ``IndexError`` when ``pos`` is
    too short, so callers validate the string first.
    """
    column = "abcdefgh".index(pos[0])
    row = "12345678".index(pos[1])
    return Coord(column, row)
|
2026-01-15 16:06:25 +03:30
|
|
|
|
|
|
|
|
|
|
|
|
|
class Board:
    """8x8 board stored as ``grid[y][x]`` (row index first, column second).

    Row 0 holds the white back rank and row 7 the black back rank.
    """

    grid: list[list[Piece]]

    def __init__(self):
        """Set up the starting position."""
        white_back_rank = [
            Piece.WHITE_CASTLE,
            Piece.WHITE_ROOK,
            Piece.WHITE_BISHOP,
            Piece.WHITE_QUEEN,
            Piece.WHITE_KING,
            Piece.WHITE_BISHOP,
            Piece.WHITE_ROOK,
            Piece.WHITE_CASTLE,
        ]
        black_back_rank = [
            Piece.BLACK_CASTLE,
            Piece.BLACK_ROOK,
            Piece.BLACK_BISHOP,
            Piece.BLACK_QUEEN,
            Piece.BLACK_KING,
            Piece.BLACK_BISHOP,
            Piece.BLACK_ROOK,
            Piece.BLACK_CASTLE,
        ]
        self.grid: list[list[Piece]] = [
            white_back_rank,
            [Piece.WHITE_PAWN] * 8,
            *([Piece.EMPTY] * 8 for _ in range(4)),
            [Piece.BLACK_PAWN] * 8,
            black_back_rank,
        ]

    def index(self, pos: str) -> Piece:
        """Return the piece on a named square such as ``"e2"``.

        No validation is done; a malformed ``pos`` raises.
        """
        column = "abcdefgh".index(pos[0])
        row = "12345678".index(pos[1])
        return self.grid[row][column]

    def index_coord(self, coord: Coord) -> Piece:
        """Return the piece at ``coord``, or ``Piece.NONE`` when off the board."""
        return self.index_xy(coord.x, coord.y)

    def index_xy(self, x: int, y: int) -> Piece:
        """Return the piece at column ``x``, row ``y``; ``Piece.NONE`` when off the board."""
        if 0 <= x <= 7 and 0 <= y <= 7:
            return self.grid[y][x]
        return Piece.NONE

    def copy(self) -> "Board":
        """Return a board whose grid can be modified without touching this one."""
        clone = Board()
        # Enum members are singletons, so copying the row lists is enough.
        clone.grid = [list(row) for row in self.grid]
        return clone

    def serialize(self) -> dict:
        """Return ``{"grid": rows}`` with every piece replaced by its code."""
        return {"grid": [[cell.value for cell in row] for row in self.grid]}

    def __str__(self):
        rows = (self.grid[r] for r in range(8))
        return "\n".join(" ".join(cell.value for cell in row) for row in rows)
|
2026-01-15 16:06:25 +03:30
|
|
|
|
|
|
|
|
|
|
|
|
|
class Room:
    """State of one game: board, side to move, player tokens and timestamps."""

    board: Board
    turn: Color
    players: list[str]  # uuid tokens; index 0 plays white, index 1 plays black
    last_move: datetime
    game_start: datetime | None  # stays None until the second player joins
    state: State

    def __init__(self):
        self.turn = Color.WHITE
        self.board = Board()
        self.players = []
        self.last_move = datetime.now(timezone.utc)
        self.game_start = None
        self.state = State.NOT_FINISHED

    def start(self):
        """Stamp the current UTC time as both the last move and the game start."""
        self.last_move = datetime.now(timezone.utc)
        self.game_start = datetime.now(timezone.utc)

    def add_player(self) -> None | tuple[str, Color]:
        """Seat a new player and return ``(token, colour)``.

        The first caller gets white, the second gets black and starts the
        game.  Returns ``None`` once two players are seated.
        """
        seated: int = len(self.players)
        if seated >= 2:
            return None
        token = str(uuid.uuid4())
        self.players.append(token)
        if seated == 0:
            return token, Color.WHITE
        self.start()
        return token, Color.BLACK
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# In-memory registry of live rooms, keyed by the room id produced by room_key().
rooms: dict[str, Room] = {}

# Icon bytes are read once at import time; the paths are relative to the
# current working directory, so a missing file makes the import fail.
favicon = Path("favicon.ico").read_bytes()
favicon_update = Path("favicon_update.ico").read_bytes()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.GET("/favicon.ico")
async def favicon_s(request):
    """Serve the preloaded ``favicon`` bytes with content type ``image/x-icon``."""
    # NOTE(review): other handlers in this file build HTTPResponse without a
    # leading ``request`` argument - confirm which signature libs.slow expects.
    icon_response = HTTPResponse(request, favicon, content_type="image/x-icon")
    return icon_response
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.GET("/favicon_update.ico")
async def favicon_update_s(request):
    """Serve the preloaded ``favicon_update`` bytes with content type ``image/x-icon``."""
    # NOTE(review): other handlers in this file build HTTPResponse without a
    # leading ``request`` argument - confirm which signature libs.slow expects.
    icon_response = HTTPResponse(
        request, favicon_update, content_type="image/x-icon"
    )
    return icon_response
|
|
|
|
|
|
|
|
|
|
|
2026-01-20 19:57:01 +03:30
|
|
|
# Alphabet for room keys: the 26 lowercase ASCII letters plus the 10 digits.
letters = string.ascii_lowercase + string.digits
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def room_key():
    """Return a random 4-character key that is not currently used in ``rooms``.

    Keys are drawn from ``letters`` until an unused one turns up, so this
    never returns if every possible key is taken; callers check capacity
    first.
    """
    while True:
        candidate: str = "".join(random.choices(letters, k=4))
        if candidate not in rooms:
            return candidate
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.POST("/create_room")
async def new_room(request):
    """Create an empty room and return its key as ``{"id": key}``.

    When every possible 4-character key is in use the request is refused
    with 503 Service Unavailable.  The condition is temporary (rooms expire),
    so 503 is used rather than 501 Not Implemented.
    """
    # ">=" rather than "==": room_key() would loop forever on a full key space.
    if len(rooms) >= len(letters) ** 4:
        return HTTPResponse("Out of service", status=503)
    key = room_key()
    rooms[key] = Room()
    # NOTE(review): this call passes ``request`` first while the HTTPResponse
    # above does not - confirm which signature libs.slow.responses expects.
    return JSONResponse(
        request,
        {
            "id": key,
        },
    )
|
|
|
|
|
|
2026-01-20 19:57:38 +03:30
|
|
|
|
|
|
|
|
# Quick-match state.  Both containers must be bound to real objects: a bare
# annotation creates no variable, so the /quick handler would raise
# NameError the first time it touched them.

# Queue ids handed out by /quick, in arrival order.
quick_queue: deque[str] = deque()

# Queue id -> the Room that player has been matched into.
quick_map: dict[str, Room] = {}

# Serialises the read-modify-write sequence inside quick_match.
lock = asyncio.Lock()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.POST("/quick")
async def quick_match(request: Request):
    """Quick-match endpoint: hand out a queue id, then pair waiting players.

    Request body (JSON):
      * anything other than ``{"queue_id": <known id>}`` - the caller is
        given a fresh queue id: ``{"queue_id": ...}``.
      * ``{"queue_id": <known id>}`` - waiting players are paired into new
        rooms; the reply is ``{"room_id": <room key>}`` once this caller has
        a room and ``{}`` while it is still waiting.

    Fixes over the previous version: the reply carries the room *key* (it
    used to put the Room object itself under "room_id"); a non-object JSON
    body can no longer reach ``data["queue_id"]``; matched ids leave the
    queue, so it holds only players who are still waiting; no room is
    created when the key space is full.
    """
    data = parse(request.body)
    async with lock:
        qid = None
        if isinstance(data, dict) and len(data) == 1:
            qid = data.get("queue_id")
        known = isinstance(qid, str) and (qid in quick_map or qid in quick_queue)

        if not known:
            # First contact (or an unknown id): issue a ticket and queue it.
            qid = str(uuid.uuid4())
            quick_queue.append(qid)
            return JSONResponse({"queue_id": qid})

        # Pair waiting players two at a time, oldest first.  The capacity
        # guard keeps room_key() from spinning when every key is taken.
        capacity = len(letters) ** 4
        while len(quick_queue) >= 2 and len(rooms) < capacity:
            room = Room()
            rooms[room_key()] = room
            quick_map[quick_queue.popleft()] = room
            quick_map[quick_queue.popleft()] = room

        matched = quick_map.get(qid)
        if matched is None:
            return JSONResponse({})  # Client handles empty as continue to wait

        # quick_map stores Room objects, so recover the key from ``rooms``.
        room_id = next((key for key, value in rooms.items() if value is matched), None)
        if room_id is None:
            # The room was removed from ``rooms`` in the meantime: put the
            # player back in the queue instead of pointing at a dead room.
            del quick_map[qid]
            quick_queue.append(qid)
            return JSONResponse({})
        return JSONResponse({"room_id": room_id})
|
2026-01-20 19:57:38 +03:30
|
|
|
|
|
|
|
|
|
2026-01-15 16:06:25 +03:30
|
|
|
@app.GET("/")
async def home(request):
    """Serve the landing page rendered from the ``home.html`` template."""
    page = render("home.html")
    return page
|
2026-01-17 02:01:19 +03:30
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.GET("/<id>")
async def game(request, id):
    """Serve the game page for an existing room, else redirect to ``/``.

    The parameter is named ``id`` because it comes from the ``<id>`` route
    placeholder.
    """
    # fullmatch: the whole id must be exactly four key characters.  re.match
    # only anchors the start, so longer strings used to pass this check.
    if re.fullmatch(r"[a-z0-9]{4}", id) is None or id not in rooms:
        return redirect("/")
    return render("game.html")
|
2026-01-15 16:06:25 +03:30
|
|
|
|
|
|
|
|
|
2026-01-17 01:59:04 +03:30
|
|
|
@app.POST("/join/<room_id>")
async def join(request, room_id):
    """Seat the caller in a room and return the room's current state.

    Replies:
      * unknown room - ``{"code": "NOGO", ...}`` with status 404;
      * a seat was free - ``{"code": "JOIN", ...}`` including the caller's
        token (``"id"``) and colour;
      * room already has two players - ``{"code": "FULL", ...}`` with the
        state but no token, so the caller can still watch.
    """
    room: Room | None = rooms.get(room_id)
    if room is None:
        return JSONResponse({"code": "NOGO", "error": "Room not found."}, 404)

    seat = room.add_player()

    # Values shared by both replies; read after add_player() so that
    # "ready" and the start time reflect the caller having joined.
    board_state = room.board.serialize()
    ready = len(room.players) == 2
    turn_value = room.turn.value
    state_value = room.state.value
    started_at = (room.game_start or datetime.now(timezone.utc)).isoformat()

    if not seat:
        payload = {
            "code": "FULL",
            "error": "Room Full",
            "board": board_state,
            "ready": ready,
            "turn": turn_value,
            "state": state_value,
            "start_time": started_at,
        }
    else:
        token, colour = seat
        payload = {
            "code": "JOIN",
            "id": token,
            "color": colour.value,
            "turn": turn_value,
            "board": board_state,
            "ready": ready,
            "state": state_value,
            "start_time": started_at,
        }
    # NOTE(review): this call passes ``request`` first while the NOGO reply
    # above does not - confirm which signature libs.slow.responses expects.
    return JSONResponse(request, payload)
|
|
|
|
|
|
|
|
|
|
|
2026-01-17 01:59:04 +03:30
|
|
|
def parse(body: str | bytes) -> dict[str, Any] | None:
|
2026-01-15 16:06:25 +03:30
|
|
|
try:
|
|
|
|
|
return json.loads(body)
|
|
|
|
|
except Exception:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
2026-01-17 01:59:04 +03:30
|
|
|
def _ray_moves(board: Board, is_white: bool, x: int, y: int, directions) -> list[Coord]:
    """Squares reachable by sliding from (x, y) along each (dx, dy) direction."""
    reachable: list[Coord] = []
    for dx, dy in directions:
        target = Coord(x + dx, y + dy)
        # index_coord returns Piece.NONE off the board, which also ends the walk.
        while board.index_coord(target) == Piece.EMPTY:
            reachable.append(target.copy())
            target.x += dx
            target.y += dy
        blocker = board.index_coord(target)
        # The first occupied square is included when it holds an enemy piece.
        if blocker != Piece.NONE and blocker.value.isupper() != is_white:
            reachable.append(target.copy())
    return reachable


def _step_moves(board: Board, is_white: bool, x: int, y: int, offsets) -> list[Coord]:
    """Squares reachable by one fixed jump from (x, y): empty or enemy-held."""
    reachable: list[Coord] = []
    for dx, dy in offsets:
        target = Coord(x + dx, y + dy)
        occupant = board.index_coord(target)
        if occupant == Piece.NONE:
            continue  # off the board
        if occupant == Piece.EMPTY or occupant.value.isupper() != is_white:
            reachable.append(target)
    return reachable


def _pawn_moves(board: Board, is_white: bool, x: int, y: int) -> list[Coord]:
    """Pawn pushes and diagonal captures from (x, y)."""
    step = 1 if is_white else -1
    start_row = 1 if is_white else 6
    moves: list[Coord] = []
    if board.index_xy(x, y + step) == Piece.EMPTY:
        moves.append(Coord(x=x, y=y + step))
        # The double step is nested under the single step so that a pawn can
        # no longer jump over a piece standing directly in front of it.
        if y == start_row and board.index_xy(x, y + 2 * step) == Piece.EMPTY:
            moves.append(Coord(x=x, y=y + 2 * step))
    # NOTE(review): king squares are excluded from pawn captures here, unlike
    # every other piece kind - confirm this asymmetry is intended.
    never_captured = (Piece.EMPTY, Piece.NONE, Piece.BLACK_KING, Piece.WHITE_KING)
    for dx in (1, -1):
        victim = board.index_xy(x + dx, y + step)
        if victim not in never_captured and victim.value.isupper() != is_white:
            moves.append(Coord(x=x + dx, y=y + step))
    return moves


def get_piece_moves(piece_kind, board: Board, is_white, src: str) -> list[Coord]:
    """Return the squares the piece on ``src`` can move to, ignoring check.

    Args:
        piece_kind: lower-cased piece code: "p" pawn, "b" bishop, "q" queen,
            "c" slides along ranks/files, "k" king, "r" L-shaped jumps.
            Any other value yields an empty list.
        board: position to generate moves on (not modified).
        is_white: side of the moving piece.
        src: square name of the moving piece, e.g. ``"e2"``.

    For every kind except the pawn, a square holding the enemy king is
    listed like any other capture; generate_valid_moves depends on that to
    detect attacks on a king.  The previous version ended with a filter
    meant to drop king squares, but it compared the bound method
    ``.value.lower`` with ``"k"`` and therefore never removed anything.  It
    has been deleted rather than "fixed", because a working filter would
    disable check detection.
    """
    if piece_kind not in ("p", "b", "q", "c", "k", "r"):
        return []
    x, y = pos_to_coord(src)

    if piece_kind == "p":
        return _pawn_moves(board, is_white, x, y)
    if piece_kind == "b":
        return _ray_moves(board, is_white, x, y, product([1, -1], repeat=2))
    if piece_kind == "q":
        # (0, 0) is not a direction: the piece would never move.
        directions = [d for d in product([1, -1, 0], repeat=2) if d != (0, 0)]
        return _ray_moves(board, is_white, x, y, directions)
    if piece_kind == "c":
        return _ray_moves(board, is_white, x, y, [(-1, 0), (1, 0), (0, -1), (0, 1)])
    if piece_kind == "k":
        offsets = [d for d in product([-1, 0, 1], repeat=2) if d != (0, 0)]
        return _step_moves(board, is_white, x, y, offsets)
    # piece_kind == "r"
    jumps = [(2, 1), (2, -1), (-2, 1), (-2, -1), (1, 2), (-1, 2), (1, -2), (-1, -2)]
    return _step_moves(board, is_white, x, y, jumps)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _find_piece(board: Board, piece: Piece) -> Coord | None:
    """Return the coordinate of the first square holding ``piece``, or None."""
    for y, row in enumerate(board.grid):
        for x, cell in enumerate(row):
            if cell == piece:
                return Coord(x, y)
    return None


def _square_attacked(board: Board, square: Coord, by_white: bool) -> bool:
    """Return True if any piece of the given side attacks ``square`` on ``board``."""
    pawn_step = 1 if by_white else -1
    for y, row in enumerate(board.grid):
        for x, cell in enumerate(row):
            if cell in (Piece.EMPTY, Piece.NONE) or cell.value.isupper() != by_white:
                continue
            kind = cell.value.lower()
            if kind == "p":
                # get_piece_moves never lists a king square for a pawn, so
                # test the two diagonal capture squares directly.
                if square.y == y + pawn_step and abs(square.x - x) == 1:
                    return True
                continue
            if square in get_piece_moves(kind, board, by_white, xy_to_pos_safe(x, y)):
                return True
    return False


def generate_valid_moves(
    piece_kind: str, board: Board, is_white: bool, src: str
) -> list[Coord]:
    """Return the moves of the piece on ``src`` that leave its own king safe.

    Each candidate from get_piece_moves is played on a copy of the board;
    it is kept only if no enemy piece attacks the mover's king afterwards.

    Fixes over the previous version:
      * every enemy piece is examined on the position *after* the move; it
        used to look only at pieces already attacking the king, so pinned
        pieces could move and the king could step into an attack;
      * the king is located on the trial board, so a move by the king
        itself is judged on its destination square, not the one it left;
      * the enemy scan no longer mixes up row and column indices;
      * enemy pawns count as attackers;
      * a board without the mover's king accepts every candidate instead of
        failing on an unbound variable.
    """
    candidates: list[Coord] = get_piece_moves(piece_kind, board, is_white, src)
    if not candidates:
        return []

    sx, sy = pos_to_coord(src)
    mover = board.index(src)
    own_king = Piece.WHITE_KING if is_white else Piece.BLACK_KING

    legal: list[Coord] = []
    for m in candidates:
        trial = board.copy()
        trial.grid[m.y][m.x] = mover  # a captured piece is overwritten here
        trial.grid[sy][sx] = Piece.EMPTY
        king = _find_piece(trial, own_king)
        if king is None or not _square_attacked(trial, king, not is_white):
            legal.append(m)
    return legal
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _has_legal_move(board: Board, white: bool) -> bool:
    """Return True if the given side has at least one legal move on ``board``."""
    for y in range(8):
        for x in range(8):
            piece = board.index_xy(x, y)
            if piece in (Piece.EMPTY, Piece.NONE) or piece.value.isupper() != white:
                continue
            if generate_valid_moves(
                piece.value.lower(), board, white, xy_to_pos_safe(x, y)
            ):
                return True
    return False


@app.POST("/move/<room_id>")
@JSONAPI
async def move(request: Request, room_id):
    """Play one move in a room.

    Request body (JSON object with exactly these string fields):
        ``uid``  - the player token returned by /join,
        ``from`` - source square, e.g. "e2",
        ``to``   - destination square, e.g. "e4".

    Returns the new state (``code`` "MOVD") on success, otherwise
    ``400, {...}`` with an error ``code``.

    Fixes over the previous version:
      * a uid that is not one of the room's players is rejected; it used to
        be treated as the black player;
      * when the opponent is left without a legal move, the side that just
        moved wins (the result was inverted);
      * empty squares are skipped with ``Piece.EMPTY`` instead of comparing
        an enum to the string "E";
      * the FINI error text now says the game has already finished.
    """
    req = parse(request.body)
    if not isinstance(req, dict) or not req:
        return 400, {}
    if len(req) > 3 or any(
        not isinstance(req.get(field), str) for field in ("uid", "from", "to")
    ):
        return 400, {}

    if room_id not in rooms:
        return 400, {"code": "NOEX", "error": "Room does not exist"}
    room = rooms[room_id]
    if len(room.players) != 2:
        return 400, {"code": "PRES", "error": "Game has not been started"}
    if room.state != State.NOT_FINISHED:
        return 400, {"code": "FINI", "error": "Game has already finished."}

    board = room.board
    uid = req["uid"]
    src = req["from"].lower()
    dst = req["to"].lower()

    if uid not in room.players:
        return 400, {"code": "NOTP", "error": "Not a player in this room"}
    # players[0] joined first and plays white.
    color = Color.WHITE if room.players[0] == uid else Color.BLACK
    if room.turn != color:
        return 400, {"code": "OTHR", "error": "Not your turn."}

    squares_ok = all(
        len(square) == 2 and square[0] in "abcdefgh" and square[1] in "12345678"
        for square in (src, dst)
    )
    if not squares_ok or board.index(src) == Piece.EMPTY:
        return 400, {"code": "LMOV", "error": "Bad Move 1"}

    srcp = board.index(src)
    dstp = board.index(dst)
    piece_kind = srcp.value.lower()
    is_white = srcp.value.isupper()

    if is_white != (color == Color.WHITE):
        return 400, {"code": "NOTU", "error": "Not your piece"}
    if dstp != Piece.EMPTY and dstp.value.isupper() == (color == Color.WHITE):
        return 400, {"code": "HTME", "error": "Cannot move to your own piece"}

    target = pos_to_coord(dst)
    if target not in generate_valid_moves(piece_kind, board, is_white, src):
        return 400, {"code": "LMOV", "error": "Bad Move 2"}

    sx, sy = pos_to_coord(src)
    board.grid[target.y][target.x] = srcp
    board.grid[sy][sx] = Piece.EMPTY
    # A pawn reaching either edge row is promoted to a queen.
    if piece_kind == "p" and target.y in (0, 7):
        board.grid[target.y][target.x] = (
            Piece.WHITE_QUEEN if is_white else Piece.BLACK_QUEEN
        )

    room.turn = Color.BLACK if color == Color.WHITE else Color.WHITE
    room.last_move = datetime.now(timezone.utc)

    # The game ends when the opponent has no legal reply.
    # NOTE(review): a position with no legal reply and no check (stalemate)
    # is also scored as a win for the mover - confirm that is intended.
    if not _has_legal_move(board, not is_white):
        room.state = State.WHITE_WIN if is_white else State.BLACK_WIN

    return {
        "code": "MOVD",
        "color": color.value,
        "turn": room.turn.value,
        "state": room.state.value,
        "board": board.serialize(),
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.GET("/poll/<room_id>")
@JSONAPI
async def poll(request, room_id):
    """Return the current state of a room (``code`` "STAT").

    Every call first sweeps all rooms: a room whose last move is 24 hours
    old or more is deleted, and a started, unfinished game that is 30
    minutes old or more is marked as a tie.  Returns ``400`` with code
    "NOEX" when the room does not exist (or was just swept).
    """
    now = datetime.now(timezone.utc)
    # Iterate over a snapshot: deleting from ``rooms`` while iterating the
    # dict itself raises "dictionary changed size during iteration".
    for key, candidate in list(rooms.items()):
        if now - candidate.last_move >= timedelta(hours=24):
            del rooms[key]
            continue
        if (
            candidate.game_start
            and now - candidate.game_start >= timedelta(minutes=30)
            and candidate.state == State.NOT_FINISHED
        ):
            candidate.state = State.TIE

    if room_id not in rooms:
        return 400, {"code": "NOEX", "error": "Room does not exist"}
    room = rooms[room_id]
    return {
        "code": "STAT",
        "turn": room.turn.value,
        "board": room.board.serialize(),
        "ready": len(room.players) == 2,
        "state": room.state.value,
        # Before the game starts there is no start time; "now" is sent instead.
        "start_time": (room.game_start or datetime.now(timezone.utc)).isoformat(),
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.POST("/moves/<room_id>")
@JSONAPI
async def generate_moves(request: Request, room_id):
    """List the legal destinations of the piece on a square.

    Request body: a JSON object with the single string field ``from``
    (e.g. ``{"from": "e2"}``).  Returns ``{"moves": "e3,e4"}`` - a
    comma-separated list, empty when the piece cannot move - or
    ``400, {...}`` on a bad request.

    The square name is validated *before* the board is consulted; the old
    order let a malformed ``from`` raise inside ``Board.index``.  The former
    "Not your piece" check compared the piece's colour with a colour derived
    from that same piece, could never fail, and has been removed.
    """
    req = parse(request.body)
    if not isinstance(req, dict) or not req:
        return 400, {}
    if len(req) > 1 or not isinstance(req.get("from"), str):
        return 400, {}

    if room_id not in rooms:
        return 400, {"code": "NOEX", "error": "Room does not exist"}
    room = rooms[room_id]
    if len(room.players) != 2:
        return 400, {"code": "PRES", "error": "Game has not been started"}

    board = room.board
    src = req["from"].lower()
    if len(src) != 2 or src[0] not in "abcdefgh" or src[1] not in "12345678":
        return 400, {"code": "LMOV", "error": "Bad Move"}
    srcp = board.index(src)
    if srcp == Piece.EMPTY:
        return 400, {"code": "LMOV", "error": "Bad Move"}

    valid_moves = generate_valid_moves(
        srcp.value.lower(), board, srcp.value.isupper(), src
    )
    return {
        "moves": ",".join(coord_to_pos_safe(c) for c in valid_moves),
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def sanitize_filename(filename: str, base: Path = Path.cwd().resolve()) -> Path:
    """Resolve ``filename`` under ``base`` and refuse paths that escape it.

    Args:
        filename: path to resolve, relative to ``base``.
        base: directory the result must stay inside.  The default is the
            working directory at the moment this function was defined.

    Returns:
        The absolute, symlink-resolved path.

    Raises:
        ValueError: if the resolved path lies outside ``base`` (``..``
            segments, an absolute ``filename``, or a symlink pointing out).
    """
    # Resolve the base too: the candidate is always absolute, so comparing
    # it with a relative or unresolved base would reject every file.
    root = base.resolve()
    candidate = (root / filename).resolve()
    if not candidate.is_relative_to(root):
        raise ValueError("Filename leaves current directory")
    return candidate
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.GET("/static/<fn>")
async def static(request, fn):
    """Serve a file from the ``static/`` directory.

    The content type is guessed from the file name, falling back to
    ``application/octet-stream``.  Sending every file as octet-stream makes
    browsers with strict MIME checking refuse stylesheets.  A path that
    escapes the directory or cannot be read yields a 404.
    """
    import mimetypes  # local import: keeps the file's import block untouched

    try:
        path = sanitize_filename(fn, Path("static/").resolve())
        payload = path.read_bytes()
    except (OSError, ValueError):
        # ValueError: path escapes static/ (or is malformed);
        # OSError: missing file, directory, permission problem.
        return HTTPResponse("404 File Not Found", status=404)

    content_type = mimetypes.guess_type(path.name)[0] or "application/octet-stream"
    return HTTPResponse(payload, content_type=content_type)
|
2026-01-15 16:06:25 +03:30
|
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry point: run the application on port 8989 when executed directly.
if __name__ == "__main__":
    asyncio.run(app.run(port=8989))
|