Files

352 lines
11 KiB
Python
Raw Permalink Normal View History

2026-01-14 08:40:36 +00:00
import asyncio
import json
import re
import urllib.parse
2026-01-20 20:20:20 +03:30
from json.decoder import JSONDecodeError
2026-01-14 08:40:36 +00:00
from pathlib import Path
from typing import Any, Awaitable, Callable, Optional
2026-01-14 08:40:36 +00:00
2026-01-21 01:55:33 +03:30
from . import _errors
2026-01-20 21:06:59 +03:30
from .responses import HTTPResponse, JSONResponse, Response
# Matches one `<name>` placeholder in a route template; group 1 captures the
# identifier between the angle brackets.
PR = re.compile(r"\<([a-zA-Z_][a-zA-Z0-9_]*)\>")
2026-01-14 08:40:36 +00:00
2026-01-17 18:12:18 +03:30
class CORS:
    """Cross-origin settings consulted by the request handler.

    Attributes:
        Origins: origins that a request's ``Origin`` header is checked against.
        Methods: method names advertised in ``Access-Control-Allow-Methods``.
        Disabled: boolean switch; starts out as ``False``.
    """

    Origins: set[str]
    Methods: set[str]
    Disabled: bool

    def __init__(self):
        self.Disabled = False
        # Must be ``set()``: an empty ``{}`` literal creates a dict, which has
        # no ``add`` and does not match the declared ``set[str]`` type.
        self.Origins: set[str] = set()
        self.Methods: set[str] = {"GET", "POST", "PUT", "DELETE", "PATCH", "HEAD"}
2026-01-17 18:12:18 +03:30
class Headers:
def __init__(self):
self._d: dict[str, str] = {}
def get(self, key: str, default: Optional[Any] = None) -> str | Any:
return self._d.get(key.lower(), default)
def set(self, key: str, value: str) -> None:
self._d[key.lower()] = value
def __str__(self):
return str(self._d)
2026-01-17 18:12:34 +03:30
def __contains__(self, key):
return self._d.__contains__(key)
class Request:
def __init__(self, method: str, path: str, headers: Headers, body: bytes):
self.method = method
self.path = path
self.headers = headers
self.body = body
def __str__(self):
return str(
{
"method": self.method,
"path": self.path,
"headers": self.headers,
"body": self.body,
}
)
2026-01-14 08:40:36 +00:00
2026-01-20 20:20:20 +03:30
def json(self) -> dict | None:
try:
return json.loads(self.body)
except JSONDecodeError:
return None
2026-01-14 08:40:36 +00:00
async def _default_404_route(request: Request):
    """Fallback handler: answer with a "404 Not Found" body and status 404.

    *request* is accepted for handler-signature compatibility and not read.
    """
    not_found = HTTPResponse("404 Not Found", status=404)
    return not_found
async def _default_405_route(request: Request):
    """Fallback handler: answer with an empty body and status 405.

    *request* is accepted for handler-signature compatibility and not read.
    """
    not_allowed = HTTPResponse(b"", status=405)
    return not_allowed
2026-01-14 08:40:36 +00:00
class App:
    """Asyncio HTTP application: a route table plus the connection handler.

    Handlers are registered with the verb decorators (``GET``, ``POST``, ...)
    and stored in ``routes`` keyed first by compiled path pattern and then by
    method name. ``error_routes`` maps a status code to the handler used when
    no pattern matches (404) or the matched path lacks the method (405).
    """

    def __init__(self):
        self.routes: dict[
            re.Pattern[str],
            dict[
                str,
                Callable[[Request, ...], Awaitable[Response | None]],
            ],
        ] = {}
        self.error_routes: dict[
            int, Callable[[Request, ...], Awaitable[Response | None]]
        ] = {
            404: _default_404_route,
            405: _default_405_route,
        }
        self.CORS = CORS()

    def _pattern_to_regex(self, temp) -> re.Pattern[str]:
        """Compile a route template into a regular expression.

        Each ``<name>`` placeholder becomes a named group matching one or
        more characters from ``a-zA-Z0-9-._~:%&=``. Everything else in the
        template is used as regex source unchanged.
        """
        # A callable replacement is inserted verbatim, so the backslash in
        # the character class is not subject to template-escape processing.
        source = PR.sub(
            lambda m: "(?P<" + m[1] + r">[a-zA-Z0-9\-._~:%&=]+)", temp
        )
        return re.compile(source)

    def _serve(
        self,
        path: str,
        method: str,
        func: Callable[[Request, ...], Awaitable[Response | None]],
    ):
        """Register *func* as the handler for *method* requests to *path*.

        Raises:
            RuntimeError: *method* is not one of the supported HTTP verbs.
            RuntimeWarning: *path* already has a handler for *method*.
        """
        if method not in ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD"]:
            raise RuntimeError(f'Invalid method "{method}".')
        # NOTE(review): presumably rejects non-coroutine handlers; the helper
        # lives in the sibling `_errors` module - confirm there.
        _errors.error_not_async(func)
        pat = self._pattern_to_regex(path)
        handlers = self.routes.setdefault(pat, {})
        if method in handlers:
            raise RuntimeWarning(f'Path "{path}" already exists.')
        handlers[method] = func

    def _method_decorator(self, path: str, method: str):
        """Build a decorator that registers its target for *method* on *path*.

        Shared by the verb decorators so that each of them returns the
        decorator and hands the original function back to the caller.
        """

        def decorator(func: Callable[[Request, ...], Awaitable[Response | None]]):
            self._serve(path, method, func)
            return func

        return decorator

    def GET(self, path: str):
        """Decorator to register a GET HTTP route.

        The same path is also registered for HEAD through a wrapper that
        calls the handler and sets ``no_header = True`` on a ``Response``
        result.
        """

        def decorator(func: Callable[[Request, ...], Awaitable[Response | None]]):
            self._serve(path, "GET", func)

            async def wrapper(*args, **kwargs):
                res = await func(*args, **kwargs)
                if isinstance(res, Response):
                    res.no_header = True
                return res

            self._serve(path, "HEAD", wrapper)
            return func

        return decorator

    def POST(self, path: str):
        """Decorator to register a POST HTTP route."""
        return self._method_decorator(path, "POST")

    def PUT(self, path: str):
        """Decorator to register a PUT HTTP route."""
        # Previously this method fell off the end and returned None, so
        # using it as a decorator raised TypeError.
        return self._method_decorator(path, "PUT")

    def DELETE(self, path: str):
        """Decorator to register a DELETE HTTP route."""
        return self._method_decorator(path, "DELETE")

    def PATCH(self, path: str):
        """Decorator to register a PATCH HTTP route."""
        return self._method_decorator(path, "PATCH")

    def HEAD(self, path: str):
        """Decorator to register a HEAD HTTP route."""
        return self._method_decorator(path, "HEAD")

    def error(self, code):
        """Decorator to register an error route."""

        def decorator(func):
            self.error_routes[code] = func
            return func

        return decorator

    def resolve(self, path, method) -> tuple[Callable[..., Awaitable[Response]], dict]:
        """Find the handler for *method* on *path*.

        Returns:
            ``(handler, kwargs)`` where *kwargs* holds the URL-decoded values
            of the path's named groups. The first pattern that fully matches
            *path* decides: if it lacks *method* the 405 error route is
            returned; if no pattern matches, the 404 error route is returned.
            Error routes come with empty kwargs.
        """
        for pattern, route in self.routes.items():
            if m := pattern.fullmatch(path):
                if method not in route or method == "websocket":
                    return self.error_routes[405], {}
                return route[method], {
                    k: urllib.parse.unquote(v) for k, v in m.groupdict().items()
                }  # ty:ignore[invalid-return-type]
        return self.error_routes[404], {}

    def methods(self, path) -> set[str]:
        """Return the method names registered for the first pattern matching
        *path* (excluding ``"websocket"``), or an empty set when none match."""
        for pattern, route in self.routes.items():
            if pattern.fullmatch(path):
                return set(route.keys() - {"websocket"})
        return set()

    async def handle_client(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ):
        """Handle an incoming connection (HTTP or WebSocket).

        Reads one request (request line, headers, optional body), writes one
        response and closes the connection. Malformed or unsupported request
        lines and unusable ``Content-Length`` values close the connection
        without a response. Any other exception is reported on stdout and
        re-raised after the connection is closed.
        """
        try:
            # Read the initial HTTP request line
            request_line = await reader.readline()
            if not request_line:
                return

            # Parse request line
            parts = request_line.decode(encoding="utf-8").strip().split()
            if len(parts) < 3:
                return
            method, path, protocol = parts[0], parts[1], parts[2]
            if protocol != "HTTP/1.1":
                # Explicit check instead of `assert`, which is stripped
                # under `python -O`; treated like a malformed request line.
                return

            headers: Headers = Headers()
            while True:
                line = await reader.readline()
                if line == b"\r\n" or line == b"\n" or not line:  # End of headers
                    break
                line = line.decode("utf-8").strip()
                if ":" in line:
                    key, value = line.split(":", 1)
                    headers.set(key.strip(), value.strip())

            try:
                content_length = int(headers.get("Content-Length", 0))
            except ValueError:
                return
            if content_length < 0:
                return
            try:
                # read(n) may return fewer than n bytes; readexactly waits
                # for the whole declared body.
                body = (
                    await reader.readexactly(content_length)
                    if content_length
                    else b""
                )
            except asyncio.IncompleteReadError:
                # Peer closed the connection before sending the full body.
                return

            if (
                method == "OPTIONS"
            ):  # FIXME Should handle responding with available methods as well
                if "origin" in headers and headers.get("origin") in self.CORS.Origins:
                    origin = headers.get("origin")
                    head = [
                        "Content-Type: text/plain",
                        "Content-Length: 0",
                        f"Access-Control-Allow-Origin: {origin}",
                        f"Access-Control-Allow-Methods: {','.join(self.CORS.Methods)}",
                        "Access-Control-Allow-Headers: Content-Type,Authorization",  # CORS
                        "Vary: Origin",
                    ]
                    writer.write(Response(200, head, b"").render(self))
                else:
                    writer.write(Response(403, ["Vary: Origin"], b"").render(self))
                await writer.drain()
                return

            if (
                method == "GET"
                and "connection" in headers
                and headers.get("connection") == "Upgrade"
                and "upgrade" in headers
            ):
                # Upgrade requests are refused. The 426 reply has to be
                # written to the socket; returning the rendered bytes from
                # this callback would send nothing to the client.
                writer.write(
                    Response(
                        status=426, headers=["Connection: close"], body=b""
                    ).render(self)
                )
                await writer.drain()
                return

            route, kwargs = self.resolve(path, method)
            # NOTE(review): handlers are typed as possibly returning None;
            # the lines below assume a Response - confirm the intended
            # behaviour for a None result.
            response: Response = await route(
                request=Request(method=method, path=path, headers=headers, body=body),
                **kwargs,
            )
            response.headers.append("Vary: Origin")
            if (
                "origin" in headers
                and not self.CORS.Disabled
                and headers.get("origin") in self.CORS.Origins
            ):  # CORS - only applied while CORS handling is not disabled
                response.headers.append(
                    f"Access-Control-Allow-Origin: {headers.get('origin')}"
                )
                response.headers.append(
                    f"Access-Control-Allow-Methods: {','.join(self.CORS.Methods & self.methods(path))}"
                )
                # No trailing CRLF here: every other header entry is a bare
                # "Name: value" line, so line endings are left to render().
                response.headers.append(
                    "Access-Control-Allow-Headers: Content-Type,Authorization"
                )
            writer.write(response.render(self))
            await writer.drain()
        except Exception as e:
            # Report first, then re-raise with the original traceback.
            print(f"Internal Server Error: {e}")
            raise
        finally:
            writer.close()
            await writer.wait_closed()

    async def run(self, host="127.0.0.1", port=8000):
        """Start the async server."""
        server = await asyncio.start_server(self.handle_client, host, port)
        print(f"Serving on http://{host}:{port}")
        async with server:
            await server.serve_forever()
# Matches a `{{ name }}` template placeholder, with optional whitespace inside
# the braces; group 1 captures the variable name.
_value_pattern = re.compile(r"\{\{\s*([a-zA-Z_][a-zA-Z_0-9]*)\s*\}\}")
2026-01-20 21:06:59 +03:30
def render(
    file: str | Path, variables: dict[str, Any] | None = None
) -> Response:  # TODO Move to another module
    """Read a template file and fill in its ``{{ name }}`` placeholders.

    Args:
        file: path of the template, read as UTF-8 text.
        variables: values by placeholder name. Each value is converted with
            ``str()`` and inserted literally. Placeholders whose name is not
            in *variables* are left in the output unchanged.

    Returns:
        An ``HTTPResponse`` carrying the rendered text with content type
        ``text/html; charset=utf-8``.
    """
    if isinstance(file, str):
        file = Path(file)
    content: str = file.read_text(encoding="utf-8")
    values = variables or {}

    def fill(match):
        name = match[1]
        if name in values:
            return str(values[name])
        return match[0]

    # One pass with a callable replacement: values are inserted verbatim
    # (backslashes and "\g<...>" are not treated as regex template escapes)
    # and placeholder-looking text inside a value is not expanded again.
    # NOTE: values are not HTML-escaped; callers must escape untrusted data.
    content = _value_pattern.sub(fill, content)
    return HTTPResponse(content, content_type="text/html; charset=utf-8")
2026-01-14 08:40:36 +00:00
2026-01-20 21:06:59 +03:30
def redirect(location: str):  # TODO Move to another module
    """Build a 307 response with a ``Location`` header set to *location*.

    Raises:
        ValueError: *location* contains a carriage return or line feed,
            which would let the value inject extra header lines.
    """
    if "\r" in location or "\n" in location:
        raise ValueError("Redirect location must not contain CR or LF characters.")
    return Response(307, [f"Location: {location}"], b"")
2026-01-20 19:15:57 +03:30
2026-01-20 21:06:59 +03:30
def JSONAPI(func):  # TODO Move to another module
    """Wrap an async handler so its return value becomes a ``JSONResponse``.

    The handler may return either a ``dict`` (sent as the JSON body) or a
    ``(status, dict)`` tuple whose first item is an ``int`` status code.

    Raises (from the wrapped call):
        RuntimeError: the handler returned anything else.
    """
    import functools

    # wraps() keeps the handler's __name__/__doc__ on the wrapper.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        result = await func(*args, **kwargs)
        if isinstance(result, dict):
            return JSONResponse(result)
        if (
            isinstance(result, tuple)
            and len(result) == 2
            and isinstance(result[1], dict)
            and isinstance(result[0], int)
        ):
            return JSONResponse(result[1], result[0])
        raise RuntimeError("Return value of JSONAPI route is not a dictionary")

    return wrapper