From 5119a09c293bcd358a140b509a17d938af8aa990 Mon Sep 17 00:00:00 2001 From: 0880 <98263509+0880880@users.noreply.github.com> Date: Sat, 17 Jan 2026 01:59:12 +0330 Subject: [PATCH] SlowAPI Update --- slow/__init__.py | 12 +++- slow/slow.py | 162 +++++++++++++++++++++++++++++++++++------------ 2 files changed, 133 insertions(+), 41 deletions(-) diff --git a/slow/__init__.py b/slow/__init__.py index f8b4e01..144fdd9 100644 --- a/slow/__init__.py +++ b/slow/__init__.py @@ -1,3 +1,11 @@ -from .slow import JSONAPI, App, HTTPResponse, JSONResponse, render +from .slow import JSONAPI, App, Headers, HTTPResponse, JSONResponse, Request, render -__all__ = ["JSONAPI", "App", "HTTPResponse", "JSONResponse", "render"] +__all__ = [ + "JSONAPI", + "App", + "HTTPResponse", + "JSONResponse", + "render", + "Request", + "Headers", +] diff --git a/slow/slow.py b/slow/slow.py index 52ce09c..fd32716 100644 --- a/slow/slow.py +++ b/slow/slow.py @@ -1,23 +1,64 @@ import asyncio +import http.client import json import re +import urllib.parse from pathlib import Path +from typing import Any, Awaitable, Callable, Optional -PR = re.compile(r"\<([a-zA-Z_]+)\>") +PR = re.compile(r"\<([a-zA-Z_][a-zA-Z0-9_]*)\>") -async def _default_404_route(request): - return "HTTP/1.1 404 Not Found\r\nContent-Type: text/html\r\n\r\n404 Not Found contact admin" +class Headers: + def __init__(self): + self._d: dict[str, str] = {} + + def get(self, key: str, default: Optional[Any] = None) -> str | Any: + return self._d.get(key.lower(), default) + + def set(self, key: str, value: str) -> None: + self._d[key.lower()] = value + + def __str__(self): + return str(self._d) -async def _default_405_route(request): - return "HTTP/1.1 405 Method Not Allowed\r\nContent-Type: text/html\r\n\r\n405 Method Not Allowed" +class Request: + def __init__(self, method: str, path: str, headers: Headers, body: bytes): + self.method = method + self.path = path + self.headers = headers + self.body = body + + def __str__(self): + return str( + { + "method": self.method, + "path": self.path, + "headers": self.headers, + "body": self.body, + } + ) + + +async def _default_404_route(request: Request): + return "HTTP/1.1 404 Not Found\r\nContent-Type: text/html\r\n\r\n404 Not Found contact admin".encode( + encoding="utf-8" + ) + + +async def _default_405_route(request: Request): + return "HTTP/1.1 405 Method Not Allowed\r\nContent-Type: text/html\r\n\r\n405 Method Not Allowed".encode( + encoding="utf-8" + ) class App: def __init__(self): - self.routes: dict[re.Pattern[str], dict[str, callable]] = {} - self.error_routes: dict[int, callable] = { + self.routes: dict[ + re.Pattern[str], dict[str, Callable[[Request, ...], Awaitable[bytes]]] + ] = {} + self.error_routes: dict[int, Callable[[Request, ...], Awaitable[bytes]]] = { 404: _default_404_route, 405: _default_405_route, } @@ -27,10 +68,14 @@ class App: iter = PR.finditer(temp) for m in iter: name = m[1] - re_temp = re.sub(m[0], r"(?P<" + name + r">[a-zA-Z0-9_-]+)", re_temp) + re_temp = re.sub( + m[0], r"(?P<" + name + r">[a-zA-Z0-9\-._~/:%&=]+)", re_temp + ) return re.compile(re_temp) - def _serve(self, path, method, func): + def _serve( + self, path: str, method: str, func: Callable[[Request, ...], Awaitable[bytes]] + ): if method not in ["GET", "POST", "PUT", "DELETE"]: raise RuntimeError(f'Invalid method "{method}".') pat = self._pattern_to_regex(path) @@ -40,40 +85,49 @@ class App: raise RuntimeWarning(f'Path "{path}" already exists.') self.routes[pat][method] = func - def GET(self, path): + def GET(self, path: str): """Decorator to register a GET HTTP route.""" - def decorator(func): + def decorator(func: Callable[[Request, ...], Awaitable[bytes]]): self._serve(path, "GET", func) return func return decorator - def POST(self, path): + def POST(self, path: str): """Decorator to register a POST HTTP route.""" - def decorator(func): + def decorator(func: Callable[[Request, ...], Awaitable[bytes]]): self._serve(path, "POST", func) return func return decorator - def PUT(self, path): + def PUT(self, path: str): """Decorator to register a PUT HTTP route.""" - def decorator(func): + def decorator(func: Callable[[Request, ...], Awaitable[bytes]]): self._serve(path, "PUT", func) return func - def DELETE(self, path): + def DELETE(self, path: str): """Decorator to register a DELETE HTTP route.""" - def decorator(func): + def decorator(func: Callable[[Request, ...], Awaitable[bytes]]): self._serve(path, "DELETE", func) return func return decorator + def OPTIONS(self, path: str): + """Decorator to register a OPTIONS HTTP route.""" + + def decorator(func: Callable[[Request, ...], Awaitable[bytes]]): + self._serve(path, "OPTIONS", func) + return func + + return decorator + def error(self, code): """Decorator to register an error route.""" @@ -83,12 +137,14 @@ class App: return decorator - def resolve(self, path, method) -> tuple[callable, dict]: + def resolve(self, path, method) -> tuple[Callable[..., Awaitable[bytes]], dict]: for pattern, route in self.routes.items(): if m := pattern.fullmatch(path): if method not in route: return self.error_routes[405], {} - return route[method], m.groupdict() + return route[method], { + k: urllib.parse.unquote(v) for k, v in m.groupdict().items() + } return self.error_routes[404], {} async def handle_client( @@ -110,60 +166,86 @@ class App: assert protocol == "HTTP/1.1" - headers = {} + headers: Headers = Headers() + while True: + line = await reader.readline() + if line == b"\r\n" or line == b"\n" or not line: # End of headers + break + line = line.decode("utf-8").strip() + if ":" in line: + key, value = line.split(":", 1) + headers.set(key.strip(), value.strip()) content_length = int(headers.get("Content-Length", 0)) body = await reader.read(content_length) if content_length else b"" - route, kwargs = self.resolve(path) + route, kwargs = self.resolve(path, method) + response = await route( - request={ - "method": method, - "path": path, - "headers": headers, - "body": body, - }, + request=Request( + method=method, + path=path, + headers=headers, + body=body, + ), **kwargs, ) - writer.write(response.encode(encoding="utf-8")) + writer.write(response) await writer.drain() except Exception as e: - print(f"Error: {e}") + print(f"Internal Server Error: {e}") finally: writer.close() await writer.wait_closed() - async def run(self, host="127.0.0.1", port=80): + async def run(self, host="127.0.0.1", port=8000): """Start the async server.""" server = await asyncio.start_server(self.handle_client, host, port) - print(f"Serving on {host}:{port}") + print(f"Serving on http://{host}:{port}") async with server: await server.serve_forever() -def HTTPResponse(content: str, status=200, content_type="text/plain; charset=utf-8"): - return f"HTTP/1.1 {status} OK\r\nContent-Type: {content_type}\r\n\r\n{content}" +AccessControlAllowOrigin = "*" -def render(file: str | Path): - if type(file) is str: +def HTTPResponse( + content: str, status=200, content_type="text/plain; charset=utf-8", headers=[] +) -> bytes: + global AccessControlAllowOrigin + head: str = f"HTTP/1.1 {status} {http.client.responses.get(status, 'Unkown Status Code')}\r\nContent-Type: {content_type}\r\n" + head += f"Access-Control-Allow-Origin: {AccessControlAllowOrigin}\r\n" # CORS + head += "Access-Control-Allow-Methods: GET, POST, PUT, DELETE, OPTIONS\r\n" # CORS + head += "\r\n".join(headers) + ("\r\n" if len(headers) > 0 else "") + head += "\r\n" + return (head + content).encode(encoding="utf-8") + + +_value_pattern = re.compile(r"\{\{\s*([a-zA-Z_][a-zA-Z_0-9]*)\s*\}\}") + + +def render(file: str | Path, variables: dict[str, Any] = {}) -> bytes: + if isinstance(file, str): file = Path(file) - content: str = file.read_text() + content: str = file.read_text(encoding="utf-8") + for m in _value_pattern.findall(content): + if m in variables: + content = re.sub(r"\{\{\s*" + m + r"\s*\}\}", variables[m], content) return HTTPResponse(content, content_type="text/html; charset=utf-8") -def JSONResponse(d: dict, status=200): +def JSONResponse(d: dict, status=200) -> bytes: return HTTPResponse( json.dumps(d), status=status, content_type="text/json; charset=utf-8" ) def JSONAPI(func): - def wrapper(*args, **kwargs): - result = func(*args, **kwargs) + async def wrapper(*args, **kwargs): + result = await func(*args, **kwargs) if not isinstance(result, dict): if ( isinstance(result, tuple) @@ -174,3 +256,5 @@ def JSONAPI(func): return JSONResponse(result[1], result[0]) raise RuntimeError("Return value of JSONAPI route is not a dictionary") return JSONResponse(result) + + return wrapper