SlowAPI Update

This commit is contained in:
0880
2026-01-17 01:59:12 +03:30
parent 6089eb0b21
commit 5119a09c29
2 changed files with 133 additions and 41 deletions

View File

@@ -1,3 +1,11 @@
from .slow import JSONAPI, App, HTTPResponse, JSONResponse, render from .slow import JSONAPI, App, Headers, HTTPResponse, JSONResponse, Request, render
__all__ = ["JSONAPI", "App", "HTTPResponse", "JSONResponse", "render"] __all__ = [
"JSONAPI",
"App",
"HTTPResponse",
"JSONResponse",
"render",
"Request",
"Headers",
]

View File

@@ -1,23 +1,64 @@
import asyncio import asyncio
import http.client
import json import json
import re import re
import urllib.parse
from pathlib import Path from pathlib import Path
from typing import Any, Awaitable, Callable, Optional
PR = re.compile(r"\<([a-zA-Z_]+)\>") PR = re.compile(r"\<([a-zA-Z_][a-zA-Z0-9_]*)\>")
async def _default_404_route(request): class Headers:
return "HTTP/1.1 404 Not Found\r\nContent-Type: text/html\r\n\r\n404 Not Found contact admin" def __init__(self):
self._d: dict[str, str] = {}
def get(self, key: str, default: Optional[Any] = None) -> str | Any:
return self._d.get(key.lower(), default)
def set(self, key: str, value: str) -> None:
self._d[key.lower()] = value
def __str__(self):
return str(self._d)
async def _default_405_route(request): class Request:
return "HTTP/1.1 405 Method Not Allowed\r\nContent-Type: text/html\r\n\r\n405 Method Not Allowed" def __init__(self, method: str, path: str, headers: Headers, body: bytes):
self.method = method
self.path = path
self.headers = headers
self.body = body
def __str__(self):
return str(
{
"method": self.method,
"path": self.path,
"headers": self.headers,
"body": self.body,
}
)
async def _default_404_route(request: Request):
return "HTTP/1.1 404 Not Found\r\nContent-Type: text/html\r\n\r\n404 Not Found contact admin".encode(
encoding="utf-8"
)
async def _default_405_route(request: Request):
return "HTTP/1.1 405 Method Not Allowed\r\nContent-Type: text/html\r\n\r\n405 Method Not Allowed".encode(
encoding="utf-8"
)
class App: class App:
def __init__(self): def __init__(self):
self.routes: dict[re.Pattern[str], dict[str, callable]] = {} self.routes: dict[
self.error_routes: dict[int, callable] = { re.Pattern[str], dict[str, Callable[[Request, ...], Awaitable[bytes]]]
] = {}
self.error_routes: dict[int, Callable[[Request, ...], Awaitable[bytes]]] = {
404: _default_404_route, 404: _default_404_route,
405: _default_405_route, 405: _default_405_route,
} }
@@ -27,10 +68,14 @@ class App:
iter = PR.finditer(temp) iter = PR.finditer(temp)
for m in iter: for m in iter:
name = m[1] name = m[1]
re_temp = re.sub(m[0], r"(?P<" + name + r">[a-zA-Z0-9_-]+)", re_temp) re_temp = re.sub(
m[0], r"(?P<" + name + r">[a-zA-Z0-9\-._~/:%&=]+)", re_temp
)
return re.compile(re_temp) return re.compile(re_temp)
def _serve(self, path, method, func): def _serve(
self, path: str, method: str, func: Callable[[Request, ...], Awaitable[bytes]]
):
if method not in ["GET", "POST", "PUT", "DELETE"]: if method not in ["GET", "POST", "PUT", "DELETE"]:
raise RuntimeError(f'Invalid method "{method}".') raise RuntimeError(f'Invalid method "{method}".')
pat = self._pattern_to_regex(path) pat = self._pattern_to_regex(path)
@@ -40,40 +85,49 @@ class App:
raise RuntimeWarning(f'Path "{path}" already exists.') raise RuntimeWarning(f'Path "{path}" already exists.')
self.routes[pat][method] = func self.routes[pat][method] = func
def GET(self, path): def GET(self, path: str):
"""Decorator to register a GET HTTP route.""" """Decorator to register a GET HTTP route."""
def decorator(func): def decorator(func: Callable[[Request, ...], Awaitable[bytes]]):
self._serve(path, "GET", func) self._serve(path, "GET", func)
return func return func
return decorator return decorator
def POST(self, path): def POST(self, path: str):
"""Decorator to register a POST HTTP route.""" """Decorator to register a POST HTTP route."""
def decorator(func): def decorator(func: Callable[[Request, ...], Awaitable[bytes]]):
self._serve(path, "POST", func) self._serve(path, "POST", func)
return func return func
return decorator return decorator
def PUT(self, path): def PUT(self, path: str):
"""Decorator to register a PUT HTTP route.""" """Decorator to register a PUT HTTP route."""
def decorator(func): def decorator(func: Callable[[Request, ...], Awaitable[bytes]]):
self._serve(path, "PUT", func) self._serve(path, "PUT", func)
return func return func
def DELETE(self, path): def DELETE(self, path: str):
"""Decorator to register a DELETE HTTP route.""" """Decorator to register a DELETE HTTP route."""
def decorator(func): def decorator(func: Callable[[Request, ...], Awaitable[bytes]]):
self._serve(path, "DELETE", func) self._serve(path, "DELETE", func)
return func return func
return decorator return decorator
def OPTIONS(self, path: str):
"""Decorator to register a OPTIONS HTTP route."""
def decorator(func: Callable[[Request, ...], Awaitable[bytes]]):
self._serve(path, "OPTIONS", func)
return func
return decorator
def error(self, code): def error(self, code):
"""Decorator to register an error route.""" """Decorator to register an error route."""
@@ -83,12 +137,14 @@ class App:
return decorator return decorator
def resolve(self, path, method) -> tuple[callable, dict]: def resolve(self, path, method) -> tuple[Callable[..., Awaitable[bytes]], dict]:
for pattern, route in self.routes.items(): for pattern, route in self.routes.items():
if m := pattern.fullmatch(path): if m := pattern.fullmatch(path):
if method not in route: if method not in route:
return self.error_routes[405], {} return self.error_routes[405], {}
return route[method], m.groupdict() return route[method], {
k: urllib.parse.unquote(v) for k, v in m.groupdict().items()
}
return self.error_routes[404], {} return self.error_routes[404], {}
async def handle_client( async def handle_client(
@@ -110,60 +166,86 @@ class App:
assert protocol == "HTTP/1.1" assert protocol == "HTTP/1.1"
headers = {} headers: Headers = Headers()
while True:
line = await reader.readline()
if line == b"\r\n" or line == b"\n" or not line: # End of headers
break
line = line.decode("utf-8").strip()
if ":" in line:
key, value = line.split(":", 1)
headers.set(key.strip(), value.strip())
content_length = int(headers.get("Content-Length", 0)) content_length = int(headers.get("Content-Length", 0))
body = await reader.read(content_length) if content_length else b"" body = await reader.read(content_length) if content_length else b""
route, kwargs = self.resolve(path) route, kwargs = self.resolve(path, method)
response = await route( response = await route(
request={ request=Request(
"method": method, method=method,
"path": path, path=path,
"headers": headers, headers=headers,
"body": body, body=body,
}, ),
**kwargs, **kwargs,
) )
writer.write(response.encode(encoding="utf-8")) writer.write(response)
await writer.drain() await writer.drain()
except Exception as e: except Exception as e:
print(f"Error: {e}") print(f"Internal Server Error: {e}")
finally: finally:
writer.close() writer.close()
await writer.wait_closed() await writer.wait_closed()
async def run(self, host="127.0.0.1", port=80): async def run(self, host="127.0.0.1", port=8000):
"""Start the async server.""" """Start the async server."""
server = await asyncio.start_server(self.handle_client, host, port) server = await asyncio.start_server(self.handle_client, host, port)
print(f"Serving on {host}:{port}") print(f"Serving on http://{host}:{port}")
async with server: async with server:
await server.serve_forever() await server.serve_forever()
def HTTPResponse(content: str, status=200, content_type="text/plain; charset=utf-8"): AccessControlAllowOrigin = "*"
return f"HTTP/1.1 {status} OK\r\nContent-Type: {content_type}\r\n\r\n{content}"
def render(file: str | Path): def HTTPResponse(
if type(file) is str: content: str, status=200, content_type="text/plain; charset=utf-8", headers=[]
) -> bytes:
global AccessControlAllowOrigin
head: str = f"HTTP/1.1 {status} {http.client.responses.get(status, 'Unkown Status Code')}\r\nContent-Type: {content_type}\r\n"
head += f"Access-Control-Allow-Origin: {AccessControlAllowOrigin}\r\n" # CORS
head += "Access-Control-Allow-Methods: GET, POST, PUT, DELETE, OPTIONS\r\n" # CORS
head += "\r\n".join(headers) + ("\r\n" if len(headers) > 0 else "")
head += "\r\n"
return (head + content).encode(encoding="utf-8")
_value_pattern = re.compile(r"\{\{\s*([a-zA-Z_][a-zA-Z_0-9]*)\s*\}\}")
def render(file: str | Path, variables: dict[str, Any] = {}) -> bytes:
if isinstance(file, str):
file = Path(file) file = Path(file)
content: str = file.read_text() content: str = file.read_text(encoding="utf-8")
for m in _value_pattern.findall(content):
if m in variables:
content = re.sub(r"\{\{\s*" + m + r"\s*\}\}", variables[m], content)
return HTTPResponse(content, content_type="text/html; charset=utf-8") return HTTPResponse(content, content_type="text/html; charset=utf-8")
def JSONResponse(d: dict, status=200): def JSONResponse(d: dict, status=200) -> bytes:
return HTTPResponse( return HTTPResponse(
json.dumps(d), status=status, content_type="text/json; charset=utf-8" json.dumps(d), status=status, content_type="text/json; charset=utf-8"
) )
def JSONAPI(func): def JSONAPI(func):
def wrapper(*args, **kwargs): async def wrapper(*args, **kwargs):
result = func(*args, **kwargs) result = await func(*args, **kwargs)
if not isinstance(result, dict): if not isinstance(result, dict):
if ( if (
isinstance(result, tuple) isinstance(result, tuple)
@@ -174,3 +256,5 @@ def JSONAPI(func):
return JSONResponse(result[1], result[0]) return JSONResponse(result[1], result[0])
raise RuntimeError("Return value of JSONAPI route is not a dictionary") raise RuntimeError("Return value of JSONAPI route is not a dictionary")
return JSONResponse(result) return JSONResponse(result)
return wrapper