diff --git a/slow/slow.py b/slow/slow.py new file mode 100644 index 0000000..49df99d --- /dev/null +++ b/slow/slow.py @@ -0,0 +1,161 @@ +import asyncio +import json +import re +from pathlib import Path + +PR = re.compile(r"\<([a-zA-Z_]+)\>") + + +async def _default_404_route(request): + return "HTTP/1.1 404 Not Found\r\nContent-Type: text/html\r\n\r\n404 Not Found contact admin" + + +async def _default_405_route(request): + return "HTTP/1.1 405 Method Not Allowed\r\nContent-Type: text/html\r\n\r\n405 Method Not Allowed" + + +class App: + def __init__(self): + self.routes: dict[re.Pattern[str], dict[str, callable]] = {} + self.error_routes: dict[int, callable] = { + 404: _default_404_route, + 405: _default_405_route, + } + + def _pattern_to_regex(self, temp) -> re.Pattern[str]: + re_temp = temp + iter = PR.finditer(temp) + for m in iter: + name = m[1] + re_temp = re.sub(m[0], r"(?P<" + name + r">[a-zA-Z0-9_-]+)", re_temp) + return re.compile(re_temp) + + def _serve(self, path, method, func): + if method not in ["GET", "POST", "PUT", "DELETE"]: + raise RuntimeError(f'Invalid method "{method}".') + pat = self._pattern_to_regex(path) + if pat not in self.routes: + self.routes[pat] = {} + if method in self.routes[pat]: + raise RuntimeWarning(f'Path "{path}" already exists.') + self.routes[pat][method] = func + + def GET(self, path): + """Decorator to register a GET HTTP route.""" + + def decorator(func): + self._serve(path, "GET", func) + return func + + return decorator + + def POST(self, path): + """Decorator to register a POST HTTP route.""" + + def decorator(func): + self._serve(path, "POST", func) + return func + + return decorator + + def PUT(self, path): + """Decorator to register a PUT HTTP route.""" + + def decorator(func): + self._serve(path, "PUT", func) + return func + + def DELETE(self, path): + """Decorator to register a DELETE HTTP route.""" + + def decorator(func): + self._serve(path, "DELETE", func) + return func + + return decorator + + def error(self, code): + """Decorator to register an error route.""" + + def decorator(func): + self.error_routes[code] = func + return func + + return decorator + + def resolve(self, path, method) -> tuple[callable, dict]: + for pattern, route in self.routes.items(): + if m := pattern.fullmatch(path): + if method not in route: + return self.error_routes[405], {} + return route[method], m.groupdict() + return self.error_routes[404], {} + + async def handle_client( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ): + """Handle an incoming connection (HTTP or WebSocket).""" + try: + # Read the initial HTTP request line + request_line = await reader.readline() + if not request_line: + return + + # Parse request line + parts = request_line.decode(encoding="utf-8").strip().split() + if len(parts) < 3: + return + + method, path, protocol = parts[0], parts[1], parts[2] + + assert protocol == "HTTP/1.1" + + headers = {} + + content_length = int(headers.get("Content-Length", 0)) + body = await reader.read(content_length) if content_length else b"" + + route, kwargs = self.resolve(path) + response = await route( + request={ + "method": method, + "path": path, + "headers": headers, + "body": body, + }, + **kwargs, + ) + writer.write(response.encode(encoding="utf-8")) + + await writer.drain() + except Exception as e: + print(f"Error: {e}") + finally: + writer.close() + await writer.wait_closed() + + async def run(self, host="127.0.0.1", port=80): + """Start the async server.""" + + server = await asyncio.start_server(self.handle_client, host, port) + + print(f"Serving on {host}:{port}") + async with server: + await server.serve_forever() + + +def HTTPResponse(content: str, status=200, content_type="text/plain; charset=utf-8"): + return f"HTTP/1.1 {status} OK\r\nContent-Type: {content_type}\r\n\r\n{content}" + + +def render(file: str | Path): + if type(file) is str: + file = Path(file) + content: str = file.read_text() + return HTTPResponse(content, content_type="text/html; charset=utf-8") + + +def JSONResponse(d: dict, status=200): + return HTTPResponse( + json.dumps(d), status=status, content_type="text/json; charset=utf-8" + )