Initial Commit
This commit is contained in:
280
app.py
Normal file
280
app.py
Normal file
@@ -0,0 +1,280 @@
|
||||
import asyncio
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from itertools import combinations
|
||||
from typing import TypedDict
|
||||
|
||||
from slow import JSONAPI, App, JSONResponse, render
|
||||
|
||||
|
||||
class Coord(TypedDict):
|
||||
x: int
|
||||
y: int
|
||||
|
||||
|
||||
app = App()
|
||||
|
||||
|
||||
class Color(Enum):
|
||||
WHITE = 0
|
||||
BLACK = 1
|
||||
|
||||
|
||||
class Piece(Enum):
|
||||
NONE = ""
|
||||
EMPTY = "E"
|
||||
|
||||
# WHITE
|
||||
WHITE_PAWN = "P"
|
||||
WHITE_ROOK = "R"
|
||||
WHITE_BISHOP = "B"
|
||||
WHITE_KING = "K"
|
||||
WHITE_QUEEN = "Q"
|
||||
WHITE_CASTLE = "C"
|
||||
|
||||
# BLACK
|
||||
BLACK_PAWN = "p"
|
||||
BLACK_ROOK = "r"
|
||||
BLACK_BISHOP = "b"
|
||||
BLACK_KING = "k"
|
||||
BLACK_QUEEN = "q"
|
||||
BLACK_CASTLE = "c"
|
||||
|
||||
|
||||
def xy_to_coord(xy: Coord) -> None | str:
|
||||
if xy.x < 0 or xy.x > 7 or xy.y < 0 or xy.y > 7:
|
||||
return None
|
||||
return "abcdefgh"[xy.x] + "12345678"[xy.y]
|
||||
|
||||
|
||||
def coord_to_xy(pos: str) -> tuple[int, int]:
|
||||
return ("abcdefgh".inedx(pos[0]), "12345678".inedx(pos[1]))
|
||||
|
||||
|
||||
class Board:
|
||||
grid: list[list[Piece]]
|
||||
|
||||
def __init__(self):
|
||||
self.grid = [[Piece.EMPTY] * 8] * 8
|
||||
self.grid[0] = [
|
||||
Piece.WHITE_CASTLE,
|
||||
Piece.WHITE_ROOK,
|
||||
Piece.WHITE_BISHOP,
|
||||
Piece.WHITE_KING,
|
||||
Piece.WHITE_QUEEN,
|
||||
Piece.WHITE_BISHOP,
|
||||
Piece.WHITE_ROOK,
|
||||
Piece.WHITE_CASTLE,
|
||||
]
|
||||
self.grid[1] = [Piece.WHITE_PAWN] * 8
|
||||
|
||||
self.grid[6] = [Piece.BLACK_PAWN] * 8
|
||||
self.grid[7] = [
|
||||
Piece.BLACK_CASTLE,
|
||||
Piece.BLACK_ROOK,
|
||||
Piece.BLACK_BISHOP,
|
||||
Piece.BLACK_KING,
|
||||
Piece.BLACK_QUEEN,
|
||||
Piece.BLACK_BISHOP,
|
||||
Piece.BLACK_ROOK,
|
||||
Piece.BLACK_CASTLE,
|
||||
]
|
||||
|
||||
def index(self, pos: str) -> Piece:
|
||||
let = "abcdefgh".index(pos[0])
|
||||
num = "12345678".index(pos[1])
|
||||
return self.grid[num][let]
|
||||
|
||||
def index_xy(self, xy: Coord) -> Piece:
|
||||
if xy.x < 0 or xy.x > 7 or xy.y < 0 or xy.y > 7:
|
||||
return Piece.NONE
|
||||
return self.grid[xy.y][xy.x]
|
||||
|
||||
def serialize(self) -> dict:
|
||||
return {"grid": self.grid}
|
||||
|
||||
|
||||
class Room:
|
||||
board: Board = Board()
|
||||
turn: Color = Color.WHITE
|
||||
players: list[uuid.UUID] = []
|
||||
last_move: datetime
|
||||
|
||||
def start(self):
|
||||
self.last_move = datetime.now()
|
||||
|
||||
def add_player(self) -> None | tuple[uuid.UUID, Color]:
|
||||
np: int = len(self.players)
|
||||
if np >= 2:
|
||||
return None
|
||||
elif np == 1:
|
||||
uid = uuid.uuid4()
|
||||
self.players.append(uid)
|
||||
return uid, Color.BLACK
|
||||
else:
|
||||
uid = uuid.uuid4()
|
||||
self.players.append(uid)
|
||||
return uid, Color.WHITE
|
||||
|
||||
|
||||
rooms: dict[str, Room] = {}
|
||||
|
||||
|
||||
@app.GET("/")
|
||||
async def home(request):
|
||||
return render("index.html")
|
||||
|
||||
|
||||
@app.POST("/join/<id>")
|
||||
async def join(request, room_id):
|
||||
room: Room = rooms.get(room_id, Room())
|
||||
player = room.add_player()
|
||||
if player:
|
||||
return JSONResponse(
|
||||
{
|
||||
"code": "JOIN",
|
||||
"id": player[0],
|
||||
"color": player[1],
|
||||
"board": room.board.serialize(),
|
||||
}
|
||||
)
|
||||
else:
|
||||
return JSONResponse(
|
||||
{"code": "FULL", "error": "Room Full", "board": room.board.serialize()}
|
||||
)
|
||||
|
||||
|
||||
def parse(body: str) -> dict | None:
|
||||
try:
|
||||
return json.loads(body)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
@app.POST("/move/<id>")
|
||||
@JSONAPI
|
||||
async def move(request, room_id):
|
||||
req = parse(request.body)
|
||||
if not req:
|
||||
return 400, {}
|
||||
if "uid" not in req or not isinstance(req["uid"], str):
|
||||
return 400, {}
|
||||
if "from" not in req or not isinstance(req["from"], str):
|
||||
return 400, {}
|
||||
if "to" not in req or not isinstance(req["to"], str):
|
||||
return 400, {}
|
||||
if room_id not in rooms:
|
||||
return 400, {"code": "NOEX", "error": "Room does not exist"}
|
||||
room = rooms[room_id]
|
||||
if len(room.players) != 2:
|
||||
return 400, {"code": "PRES", "error": "Game has not been started"}
|
||||
board = room.board
|
||||
uid = req["uid"]
|
||||
src = req["from"].lower()
|
||||
dst = req["to"].lower()
|
||||
color = Color.WHITE if room.players[0] == uid else Color.BLACK
|
||||
turn = room.turn == color
|
||||
if not turn:
|
||||
return 400, {"code": "OTHR", "error": "Not your turn."}
|
||||
if (
|
||||
len(src) != 2
|
||||
or len(dst) != 2
|
||||
or src[0] not in "abcdefgh"
|
||||
or src[1] not in "12345678"
|
||||
or dst[0] not in "abcdefgh"
|
||||
or dst[1] not in "12345678"
|
||||
or board.index(src) != Piece.EMPTY
|
||||
):
|
||||
return 400, {"code": "LMOV", "error": "Bad Move"}
|
||||
srcp = board.index(src)
|
||||
dstp = board.index(dst)
|
||||
piece_kind = srcp.value.lower()
|
||||
is_white = srcp.value.isupper()
|
||||
if (is_white and color == Color.BLACK) or (not is_white and color == Color.WHITE):
|
||||
return 400, {"code": "NOTU", "error": "Not your piece"}
|
||||
if dstp != Piece.EMPTY and (
|
||||
(dstp.value.isupper() and color == Color.WHITE)
|
||||
or (dstp.value.islower() and color == Color.BLACK)
|
||||
):
|
||||
return 400, {"code": "HTME", "error": "Cannot move to your own piece"}
|
||||
valids: list[Coord] = []
|
||||
if piece_kind == "p":
|
||||
dir = 1 if is_white else -1
|
||||
x, y = coord_to_xy(src)
|
||||
if board.index_xy(x, y + dir) == Piece.EMPTY:
|
||||
valids.append({"x": x, "y": y + dir})
|
||||
if (y == 1 or y == 6) and board.index_xy(x, y + 2 * dir) == Piece.EMPTY:
|
||||
valids.append({"x": x, "y": y + 2 * dir})
|
||||
if (
|
||||
board.index_xy(x + 1, y + dir)
|
||||
not in [Piece.EMPTY, Piece.NONE, Piece.BLACK_KING, Piece.WHITE_KING]
|
||||
and board.index_xy(x + 1, y + dir).value.upper() != is_white
|
||||
):
|
||||
valids.append({"x": x + 1, "y": y + 1})
|
||||
if (
|
||||
board.index_xy(x - 1, y + dir)
|
||||
not in [Piece.EMPTY, Piece.NONE, Piece.BLACK_KING, Piece.WHITE_KING]
|
||||
and board.index_xy(x - 1, y + dir).value.upper() != is_white
|
||||
):
|
||||
valids.append({"x": x - 1, "y": y + 1})
|
||||
if piece_kind == "b":
|
||||
x, y = coord_to_xy(src)
|
||||
comb = combinations([1, -1], 2)
|
||||
for dir in comb:
|
||||
scl = 1
|
||||
target: Coord = {"x": x + dir[0] * scl, "y": y + dir[1] * scl}
|
||||
while board.index_xy(target) not in [Piece.EMPTY, Piece.NONE]:
|
||||
valids.append(target.copy())
|
||||
scl += 1
|
||||
target: Coord = {"x": x + dir[0] * scl, "y": y + dir[1] * scl}
|
||||
if (
|
||||
p := board.index_xy(target)
|
||||
) != Piece.NONE and p.value.isupper() != is_white:
|
||||
valids.append(target.copy())
|
||||
if piece_kind == "q":
|
||||
x, y = coord_to_xy(src)
|
||||
comb = combinations([1, -1, 0], 2)
|
||||
for dir in comb:
|
||||
if dir[0] ** 2 + dir[1] ** 2 == 0:
|
||||
continue # x=0 y=0 cannot move => invalid
|
||||
scl = 1
|
||||
target: Coord = {"x": x + dir[0] * scl, "y": y + dir[1] * scl}
|
||||
while board.index_xy(target) not in [Piece.EMPTY, Piece.NONE]:
|
||||
valids.append(target.copy())
|
||||
scl += 1
|
||||
target: Coord = {"x": x + dir[0] * scl, "y": y + dir[1] * scl}
|
||||
if (
|
||||
p := board.index_xy(target)
|
||||
) != Piece.NONE and p.value.isupper() != is_white:
|
||||
valids.append(target.copy())
|
||||
if piece_kind == "c":
|
||||
x, y = coord_to_xy(src)
|
||||
comb = [(-1, 0), (1, 0), (0, -1), (0, 1)]
|
||||
for dir in comb:
|
||||
scl = 1
|
||||
target: Coord = {"x": x + dir[0] * scl, "y": y + dir[1] * scl}
|
||||
while board.index_xy(target) not in [Piece.EMPTY, Piece.NONE]:
|
||||
valids.append(target.copy())
|
||||
scl += 1
|
||||
target: Coord = {"x": x + dir[0] * scl, "y": y + dir[1] * scl}
|
||||
if (
|
||||
p := board.index_xy(target)
|
||||
) != Piece.NONE and p.value.isupper() != is_white:
|
||||
valids.append(target.copy())
|
||||
if piece_kind == "k":
|
||||
x, y = coord_to_xy(src)
|
||||
comb = combinations([-1, 0, 1], 2)
|
||||
for dir in comb:
|
||||
if dir[0] ** 2 + dir[1] ** 2 == 0:
|
||||
continue # x=0 y=0 cannot move => invalid
|
||||
target: Coord = {"x": x + dir[0], "y": y + dir[1]}
|
||||
if (
|
||||
p := board.index_xy(target)
|
||||
) != Piece.NONE and p.value.isupper() != is_white:
|
||||
valids.append(target.copy())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(app.run(port=8080))
|
||||
79
index.html
Normal file
79
index.html
Normal file
@@ -0,0 +1,79 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>Document</title>
|
||||
<style>
|
||||
* {
|
||||
padding: 0;
|
||||
margin: 0;
|
||||
box-sizing: border-box;
|
||||
font-family: Arial, Helvetica, sans-serif;
|
||||
color: 262942;
|
||||
}
|
||||
body {
|
||||
background-color: #f8e8d3;
|
||||
}
|
||||
.title {
|
||||
font-family: 'Times New Roman', Times, serif;
|
||||
text-transform: lowercase;
|
||||
letter-spacing: 0.25rem;
|
||||
font-size: 4rem;
|
||||
font-weight: 100;
|
||||
color: #48517b;
|
||||
padding: 0 0.5rem 0.5rem 0.5rem;
|
||||
display: block;
|
||||
width: min-content;
|
||||
border-bottom: 2px solid #48517b;
|
||||
}
|
||||
header {
|
||||
display: flex;
|
||||
padding: 1rem;
|
||||
justify-content: center;
|
||||
width: 100vw;
|
||||
user-select: none;
|
||||
}
|
||||
#board {
|
||||
width: 38rem;
|
||||
height: 38rem;
|
||||
display: grid;
|
||||
grid-template-columns: repeat(8, 1fr);
|
||||
cursor: not-allowed;
|
||||
}
|
||||
.center {
|
||||
display: flex;
|
||||
justify-content: center;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<header>
|
||||
<h1 class="title">shatranj</h1>
|
||||
</header>
|
||||
<div class="center">
|
||||
<div id="board">
|
||||
|
||||
</div>
|
||||
</div>
|
||||
<script>
|
||||
const container = document.getElementById("board");
|
||||
|
||||
const cols = ["A", "B", "C", "D", "E", "F", "G", "H"];
|
||||
|
||||
for (let i = 0; i < 64; i++) {
|
||||
let col = i % 8;
|
||||
let row = Math.floor(i/8);
|
||||
const div = document.createElement("div");
|
||||
div.className = "house";
|
||||
div.id = cols[col] + (row+1).toString()
|
||||
if (i % 2 == row % 2) {
|
||||
div.style.backgroundColor = "#262942"
|
||||
} else {
|
||||
div.style.backgroundColor = "#faf5f0"
|
||||
}
|
||||
container.appendChild(div);
|
||||
}
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
3
slow/__init__.py
Normal file
3
slow/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .slow import JSONAPI, App, HTTPResponse, JSONResponse, render
|
||||
|
||||
__all__ = ["JSONAPI", "App", "HTTPResponse", "JSONResponse", "render"]
|
||||
176
slow/slow.py
Normal file
176
slow/slow.py
Normal file
@@ -0,0 +1,176 @@
|
||||
import asyncio
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
PR = re.compile(r"\<([a-zA-Z_]+)\>")
|
||||
|
||||
|
||||
async def _default_404_route(request):
|
||||
return "HTTP/1.1 404 Not Found\r\nContent-Type: text/html\r\n\r\n404 Not Found contact admin"
|
||||
|
||||
|
||||
async def _default_405_route(request):
|
||||
return "HTTP/1.1 405 Method Not Allowed\r\nContent-Type: text/html\r\n\r\n405 Method Not Allowed"
|
||||
|
||||
|
||||
class App:
|
||||
def __init__(self):
|
||||
self.routes: dict[re.Pattern[str], dict[str, callable]] = {}
|
||||
self.error_routes: dict[int, callable] = {
|
||||
404: _default_404_route,
|
||||
405: _default_405_route,
|
||||
}
|
||||
|
||||
def _pattern_to_regex(self, temp) -> re.Pattern[str]:
|
||||
re_temp = temp
|
||||
iter = PR.finditer(temp)
|
||||
for m in iter:
|
||||
name = m[1]
|
||||
re_temp = re.sub(m[0], r"(?P<" + name + r">[a-zA-Z0-9_-]+)", re_temp)
|
||||
return re.compile(re_temp)
|
||||
|
||||
def _serve(self, path, method, func):
|
||||
if method not in ["GET", "POST", "PUT", "DELETE"]:
|
||||
raise RuntimeError(f'Invalid method "{method}".')
|
||||
pat = self._pattern_to_regex(path)
|
||||
if pat not in self.routes:
|
||||
self.routes[pat] = {}
|
||||
if method in self.routes[pat]:
|
||||
raise RuntimeWarning(f'Path "{path}" already exists.')
|
||||
self.routes[pat][method] = func
|
||||
|
||||
def GET(self, path):
|
||||
"""Decorator to register a GET HTTP route."""
|
||||
|
||||
def decorator(func):
|
||||
self._serve(path, "GET", func)
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
def POST(self, path):
|
||||
"""Decorator to register a POST HTTP route."""
|
||||
|
||||
def decorator(func):
|
||||
self._serve(path, "POST", func)
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
def PUT(self, path):
|
||||
"""Decorator to register a PUT HTTP route."""
|
||||
|
||||
def decorator(func):
|
||||
self._serve(path, "PUT", func)
|
||||
return func
|
||||
|
||||
def DELETE(self, path):
|
||||
"""Decorator to register a DELETE HTTP route."""
|
||||
|
||||
def decorator(func):
|
||||
self._serve(path, "DELETE", func)
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
def error(self, code):
|
||||
"""Decorator to register an error route."""
|
||||
|
||||
def decorator(func):
|
||||
self.error_routes[code] = func
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
def resolve(self, path, method) -> tuple[callable, dict]:
|
||||
for pattern, route in self.routes.items():
|
||||
if m := pattern.fullmatch(path):
|
||||
if method not in route:
|
||||
return self.error_routes[405], {}
|
||||
return route[method], m.groupdict()
|
||||
return self.error_routes[404], {}
|
||||
|
||||
async def handle_client(
|
||||
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
|
||||
):
|
||||
"""Handle an incoming connection (HTTP or WebSocket)."""
|
||||
try:
|
||||
# Read the initial HTTP request line
|
||||
request_line = await reader.readline()
|
||||
if not request_line:
|
||||
return
|
||||
|
||||
# Parse request line
|
||||
parts = request_line.decode(encoding="utf-8").strip().split()
|
||||
if len(parts) < 3:
|
||||
return
|
||||
|
||||
method, path, protocol = parts[0], parts[1], parts[2]
|
||||
|
||||
assert protocol == "HTTP/1.1"
|
||||
|
||||
headers = {}
|
||||
|
||||
content_length = int(headers.get("Content-Length", 0))
|
||||
body = await reader.read(content_length) if content_length else b""
|
||||
|
||||
route, kwargs = self.resolve(path)
|
||||
response = await route(
|
||||
request={
|
||||
"method": method,
|
||||
"path": path,
|
||||
"headers": headers,
|
||||
"body": body,
|
||||
},
|
||||
**kwargs,
|
||||
)
|
||||
writer.write(response.encode(encoding="utf-8"))
|
||||
|
||||
await writer.drain()
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
finally:
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
|
||||
async def run(self, host="127.0.0.1", port=80):
|
||||
"""Start the async server."""
|
||||
|
||||
server = await asyncio.start_server(self.handle_client, host, port)
|
||||
|
||||
print(f"Serving on {host}:{port}")
|
||||
async with server:
|
||||
await server.serve_forever()
|
||||
|
||||
|
||||
def HTTPResponse(content: str, status=200, content_type="text/plain; charset=utf-8"):
|
||||
return f"HTTP/1.1 {status} OK\r\nContent-Type: {content_type}\r\n\r\n{content}"
|
||||
|
||||
|
||||
def render(file: str | Path):
|
||||
if type(file) is str:
|
||||
file = Path(file)
|
||||
content: str = file.read_text()
|
||||
return HTTPResponse(content, content_type="text/html; charset=utf-8")
|
||||
|
||||
|
||||
def JSONResponse(d: dict, status=200):
|
||||
return HTTPResponse(
|
||||
json.dumps(d), status=status, content_type="text/json; charset=utf-8"
|
||||
)
|
||||
|
||||
|
||||
def JSONAPI(func):
|
||||
def wrapper(*args, **kwargs):
|
||||
result = func(*args, **kwargs)
|
||||
if not isinstance(result, dict):
|
||||
if (
|
||||
isinstance(result, tuple)
|
||||
and len(result) == 2
|
||||
and isinstance(result[1], dict)
|
||||
and isinstance(result[0], int)
|
||||
):
|
||||
return JSONResponse(result[1], result[0])
|
||||
raise RuntimeError("Return value of JSONAPI route is not a dictionary")
|
||||
return JSONResponse(result)
|
||||
Reference in New Issue
Block a user