Compare commits
5 Commits
24334d1def
...
a7f0001659
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a7f0001659 | ||
|
|
66f5eb102b | ||
|
|
4e8a5851d5 | ||
|
|
db8d5c4454 | ||
|
|
e67ef7d790 |
@@ -1,9 +1,8 @@
|
||||
from . import responses
|
||||
from .slow import (
|
||||
JSONAPI,
|
||||
App,
|
||||
Headers,
|
||||
HTTPResponse,
|
||||
JSONResponse,
|
||||
Request,
|
||||
redirect,
|
||||
render,
|
||||
@@ -12,10 +11,9 @@ from .slow import (
|
||||
__all__ = [
|
||||
"JSONAPI",
|
||||
"App",
|
||||
"HTTPResponse",
|
||||
"JSONResponse",
|
||||
"render",
|
||||
"Request",
|
||||
"Headers",
|
||||
"redirect",
|
||||
"responses",
|
||||
]
|
||||
|
||||
54
slow/responses.py
Normal file
54
slow/responses.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import http.client
|
||||
import json
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .slow import App
|
||||
|
||||
|
||||
class Response:
|
||||
status: int
|
||||
headers: list[str]
|
||||
body: bytes
|
||||
|
||||
def __init__(self, status: int, headers: list[str], body: bytes):
|
||||
self.status = status
|
||||
self.headers = headers
|
||||
self.body = body
|
||||
self.no_header = False
|
||||
|
||||
def header(self, app: App) -> bytes:
|
||||
header = f"HTTP/1.1 {self.status} {http.client.responses.get(self.status, 'Unkown Status Code')}\r\n"
|
||||
header += f"Content-Length: {len(self.body)}\r\n"
|
||||
for h in self.headers:
|
||||
header += h + "\r\n"
|
||||
|
||||
header += "\r\n"
|
||||
|
||||
return header.encode(encoding="utf-8")
|
||||
|
||||
def render(self, app: "App") -> bytes:
|
||||
if self.no_header:
|
||||
return self.header(app)
|
||||
return self.header(app) + self.body
|
||||
|
||||
|
||||
def HTTPResponse(
|
||||
content: str | bytes,
|
||||
status=200,
|
||||
content_type="text/plain; charset=utf-8",
|
||||
headers=[],
|
||||
) -> Response:
|
||||
if isinstance(content, str):
|
||||
content_bytes = content.encode(encoding="utf-8")
|
||||
else:
|
||||
content_bytes = content
|
||||
return Response(
|
||||
status, [f"Content-Type: {content_type}", *headers], body=content_bytes
|
||||
)
|
||||
|
||||
|
||||
def JSONResponse(d: dict, status=200) -> Response:
|
||||
return HTTPResponse(
|
||||
json.dumps(d), status=status, content_type="text/json; charset=utf-8"
|
||||
)
|
||||
161
slow/slow.py
161
slow/slow.py
@@ -1,23 +1,25 @@
|
||||
import asyncio
|
||||
import http.client
|
||||
import json
|
||||
import re
|
||||
import urllib.parse
|
||||
from json.decoder import JSONDecodeError
|
||||
from pathlib import Path
|
||||
from typing import Any, Awaitable, Callable, Optional
|
||||
|
||||
from .responses import HTTPResponse, JSONResponse, Response
|
||||
|
||||
PR = re.compile(r"\<([a-zA-Z_][a-zA-Z0-9_]*)\>")
|
||||
|
||||
|
||||
class CORS:
|
||||
Origins: list[str]
|
||||
Methods: list[str]
|
||||
Origins: set[str]
|
||||
Methods: set[str]
|
||||
Disabled: bool
|
||||
|
||||
def __init__(self):
|
||||
self.Disabled = False
|
||||
self.Origins = []
|
||||
self.Methods = ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD"]
|
||||
self.Origins: set[str] = {}
|
||||
self.Methods: set[str] = {"GET", "POST", "PUT", "DELETE", "PATCH", "HEAD"}
|
||||
|
||||
|
||||
class Headers:
|
||||
@@ -38,14 +40,11 @@ class Headers:
|
||||
|
||||
|
||||
class Request:
|
||||
def __init__(
|
||||
self, method: str, path: str, headers: Headers, body: bytes, app: "App"
|
||||
):
|
||||
def __init__(self, method: str, path: str, headers: Headers, body: bytes):
|
||||
self.method = method
|
||||
self.path = path
|
||||
self.headers = headers
|
||||
self.body = body
|
||||
self.app = app
|
||||
|
||||
def __str__(self):
|
||||
return str(
|
||||
@@ -57,6 +56,12 @@ class Request:
|
||||
}
|
||||
)
|
||||
|
||||
def json(self) -> dict | None:
|
||||
try:
|
||||
return json.loads(self.body)
|
||||
except JSONDecodeError:
|
||||
return None
|
||||
|
||||
|
||||
async def _default_404_route(request: Request):
|
||||
return "HTTP/1.1 404 Not Found\r\nContent-Type: text/html\r\n\r\n404 Not Found contact admin".encode(
|
||||
@@ -73,9 +78,9 @@ async def _default_405_route(request: Request):
|
||||
class App:
|
||||
def __init__(self):
|
||||
self.routes: dict[
|
||||
re.Pattern[str], dict[str, Callable[[Request, ...], Awaitable[bytes]]]
|
||||
re.Pattern[str], dict[str, Callable[[Request, ...], Awaitable[Response]]]
|
||||
] = {}
|
||||
self.error_routes: dict[int, Callable[[Request, ...], Awaitable[bytes]]] = {
|
||||
self.error_routes: dict[int, Callable[[Request, ...], Awaitable[Response]]] = {
|
||||
404: _default_404_route,
|
||||
405: _default_405_route,
|
||||
}
|
||||
@@ -90,7 +95,10 @@ class App:
|
||||
return re.compile(re_temp)
|
||||
|
||||
def _serve(
|
||||
self, path: str, method: str, func: Callable[[Request, ...], Awaitable[bytes]]
|
||||
self,
|
||||
path: str,
|
||||
method: str,
|
||||
func: Callable[[Request, ...], Awaitable[Response]],
|
||||
):
|
||||
if method not in ["GET", "POST", "PUT", "DELETE"]:
|
||||
raise RuntimeError(f'Invalid method "{method}".')
|
||||
@@ -104,8 +112,15 @@ class App:
|
||||
def GET(self, path: str):
|
||||
"""Decorator to register a GET HTTP route."""
|
||||
|
||||
def decorator(func: Callable[[Request, ...], Awaitable[bytes]]):
|
||||
def decorator(func: Callable[[Request, ...], Awaitable[Response]]):
|
||||
self._serve(path, "GET", func)
|
||||
|
||||
async def wrapper(*args, **kwargs):
|
||||
res: Response = await func(*args, **kwargs)
|
||||
res.no_header = True
|
||||
return res
|
||||
|
||||
self._serve(path, "HEAD", wrapper)
|
||||
return func
|
||||
|
||||
return decorator
|
||||
@@ -113,7 +128,7 @@ class App:
|
||||
def POST(self, path: str):
|
||||
"""Decorator to register a POST HTTP route."""
|
||||
|
||||
def decorator(func: Callable[[Request, ...], Awaitable[bytes]]):
|
||||
def decorator(func: Callable[[Request, ...], Awaitable[Response]]):
|
||||
self._serve(path, "POST", func)
|
||||
return func
|
||||
|
||||
@@ -122,14 +137,14 @@ class App:
|
||||
def PUT(self, path: str):
|
||||
"""Decorator to register a PUT HTTP route."""
|
||||
|
||||
def decorator(func: Callable[[Request, ...], Awaitable[bytes]]):
|
||||
def decorator(func: Callable[[Request, ...], Awaitable[Response]]):
|
||||
self._serve(path, "PUT", func)
|
||||
return func
|
||||
|
||||
def DELETE(self, path: str):
|
||||
"""Decorator to register a DELETE HTTP route."""
|
||||
|
||||
def decorator(func: Callable[[Request, ...], Awaitable[bytes]]):
|
||||
def decorator(func: Callable[[Request, ...], Awaitable[Response]]):
|
||||
self._serve(path, "DELETE", func)
|
||||
return func
|
||||
|
||||
@@ -138,7 +153,7 @@ class App:
|
||||
def PATCH(self, path: str):
|
||||
"""Decorator to register a PATCH HTTP route."""
|
||||
|
||||
def decorator(func: Callable[[Request, ...], Awaitable[bytes]]):
|
||||
def decorator(func: Callable[[Request, ...], Awaitable[Response]]):
|
||||
self._serve(path, "PATCH", func)
|
||||
return func
|
||||
|
||||
@@ -147,7 +162,7 @@ class App:
|
||||
def HEAD(self, path: str):
|
||||
"""Decorator to register a HEAD HTTP route."""
|
||||
|
||||
def decorator(func: Callable[[Request, ...], Awaitable[bytes]]):
|
||||
def decorator(func: Callable[[Request, ...], Awaitable[Response]]):
|
||||
self._serve(path, "HEAD", func)
|
||||
return func
|
||||
|
||||
@@ -162,7 +177,7 @@ class App:
|
||||
|
||||
return decorator
|
||||
|
||||
def resolve(self, path, method) -> tuple[Callable[..., Awaitable[bytes]], dict]:
|
||||
def resolve(self, path, method) -> tuple[Callable[..., Awaitable[Response]], dict]:
|
||||
for pattern, route in self.routes.items():
|
||||
if m := pattern.fullmatch(path):
|
||||
if method not in route:
|
||||
@@ -172,6 +187,12 @@ class App:
|
||||
}
|
||||
return self.error_routes[404], {}
|
||||
|
||||
def methods(self, path) -> set[str]:
|
||||
for pattern, route in self.routes.items():
|
||||
if pattern.fullmatch(path):
|
||||
return set(route.keys())
|
||||
return set()
|
||||
|
||||
async def handle_client(
|
||||
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
|
||||
):
|
||||
@@ -204,42 +225,55 @@ class App:
|
||||
content_length = int(headers.get("Content-Length", 0))
|
||||
body = await reader.read(content_length) if content_length else b""
|
||||
|
||||
if method == "OPTIONS":
|
||||
if (
|
||||
method == "OPTIONS"
|
||||
): # FIXME Should handle responding with available methods as well
|
||||
if "origin" in headers and headers.get("origin") in self.CORS.Origins:
|
||||
origin = headers.get("origin")
|
||||
response = "HTTP/1.1 200 OK\r\n"
|
||||
response += "Content-Type: text/plain\r\n"
|
||||
response += "Content-Length: 0\r\n"
|
||||
response += f"Access-Control-Allow-Origin: {origin}\r\n"
|
||||
response += f"Access-Control-Allow-Methods: {','.join(self.CORS.Methods)}\r\n"
|
||||
response += "Access-Control-Allow-Headers: Content-Type,Authorization\r\n" # CORS
|
||||
response += "Vary: Origin\r\n"
|
||||
response += "\r\n"
|
||||
head = [
|
||||
"Content-Type: text/plain",
|
||||
"Content-Length: 0",
|
||||
f"Access-Control-Allow-Origin: {origin}",
|
||||
f"Access-Control-Allow-Methods: {','.join(self.CORS.Methods)}",
|
||||
"Access-Control-Allow-Headers: Content-Type,Authorization", # CORS
|
||||
"Vary: Origin",
|
||||
]
|
||||
|
||||
writer.write(response.encode(encoding="utf-8"))
|
||||
writer.write(Response(200, head, b"").render(self))
|
||||
|
||||
await writer.drain()
|
||||
|
||||
else:
|
||||
response = "HTTP/1.1 403 Forbidden\r\n"
|
||||
response += "Content-Length: 0\r\n"
|
||||
response += "Vary: Origin\r\n"
|
||||
response += "\r\n"
|
||||
|
||||
writer.write(response.encode(encoding="utf-8"))
|
||||
writer.write(Response(403, ["Vary: Origin"], b"").render(self))
|
||||
|
||||
await writer.drain()
|
||||
|
||||
else:
|
||||
route, kwargs = self.resolve(path, method)
|
||||
|
||||
response = await route(
|
||||
response: Response = await route(
|
||||
request=Request(
|
||||
method=method, path=path, headers=headers, body=body, app=self
|
||||
method=method, path=path, headers=headers, body=body
|
||||
),
|
||||
**kwargs,
|
||||
)
|
||||
writer.write(response)
|
||||
response.headers.append("Vary: Origin")
|
||||
if (
|
||||
"origin" in headers
|
||||
and self.CORS.Disabled
|
||||
and headers.get("origin") in self.CORS.Origins
|
||||
): # CORS
|
||||
response.headers.append(
|
||||
f"Access-Control-Allow-Origin: {headers.get('origin')}"
|
||||
)
|
||||
response.headers.append(
|
||||
f"Access-Control-Allow-Methods: {','.join(self.CORS.Methods & self.methods(path))}"
|
||||
)
|
||||
response.headers.append(
|
||||
"Access-Control-Allow-Headers: Content-Type,Authorization\r\n"
|
||||
)
|
||||
|
||||
writer.write(response.render(self))
|
||||
|
||||
await writer.drain()
|
||||
except Exception as e:
|
||||
@@ -259,59 +293,26 @@ class App:
|
||||
await server.serve_forever()
|
||||
|
||||
|
||||
def HTTPResponse(
|
||||
request: Request,
|
||||
content: str | bytes,
|
||||
status=200,
|
||||
content_type="text/plain; charset=utf-8",
|
||||
headers=[],
|
||||
) -> bytes:
|
||||
content_bytes = content
|
||||
if isinstance(content, str):
|
||||
content_bytes = content.encode(encoding="utf-8")
|
||||
head: str = f"HTTP/1.1 {status} {http.client.responses.get(status, 'Unkown Status Code')}\r\nContent-Type: {content_type}\r\nContent-Length: {len(content_bytes)}\r\n"
|
||||
if (
|
||||
"origin" in request.headers
|
||||
and not request.app.CORS.Disabled
|
||||
and request.headers.get("origin") in request.app.CORS.Origins
|
||||
):
|
||||
head += (
|
||||
f"Access-Control-Allow-Origin: {request.headers.get('origin')}\r\n" # CORS
|
||||
)
|
||||
head += f"Access-Control-Allow-Methods: {','.join(request.app.CORS.Methods)}\r\n" # CORS
|
||||
head += "Access-Control-Allow-Headers: Content-Type,Authorization\r\n" # CORS
|
||||
head += "Vary: Origin\r\n"
|
||||
head += "\r\n".join(headers) + ("\r\n" if len(headers) > 0 else "")
|
||||
head += "\r\n"
|
||||
return head.encode(encoding="utf-8") + content_bytes
|
||||
|
||||
|
||||
_value_pattern = re.compile(r"\{\{\s*([a-zA-Z_][a-zA-Z_0-9]*)\s*\}\}")
|
||||
|
||||
|
||||
def render(request: Request, file: str | Path, variables: dict[str, Any] = {}) -> bytes:
|
||||
def render(
|
||||
file: str | Path, variables: dict[str, Any] = {}
|
||||
) -> Response: # TODO Move to another module
|
||||
if isinstance(file, str):
|
||||
file = Path(file)
|
||||
content: str = file.read_text(encoding="utf-8")
|
||||
for m in _value_pattern.findall(content):
|
||||
if m in variables:
|
||||
content = re.sub(r"\{\{\s*" + m + r"\s*\}\}", variables[m], content)
|
||||
return HTTPResponse(request, content, content_type="text/html; charset=utf-8")
|
||||
return HTTPResponse(content, content_type="text/html; charset=utf-8")
|
||||
|
||||
|
||||
def redirect(location: str):
|
||||
return f"HTTP/1.1 307 Temporary Redirect\r\nContent-Length: 0\r\nLocation: {location}".encode(
|
||||
encoding="utf-8"
|
||||
)
|
||||
def redirect(location: str): # TODO Move to another module
|
||||
return Response(307, ["Location: {location}"], b"")
|
||||
|
||||
|
||||
def JSONResponse(request: Request, d: dict, status=200) -> bytes:
|
||||
return HTTPResponse(
|
||||
request, json.dumps(d), status=status, content_type="text/json; charset=utf-8"
|
||||
)
|
||||
|
||||
|
||||
def JSONAPI(func):
|
||||
def JSONAPI(func): # TODO Move to another module
|
||||
async def wrapper(*args, **kwargs):
|
||||
result = await func(*args, **kwargs)
|
||||
if not isinstance(result, dict):
|
||||
@@ -321,8 +322,8 @@ def JSONAPI(func):
|
||||
and isinstance(result[1], dict)
|
||||
and isinstance(result[0], int)
|
||||
):
|
||||
return JSONResponse(kwargs["request"], result[1], result[0])
|
||||
return JSONResponse(result[1], result[0])
|
||||
raise RuntimeError("Return value of JSONAPI route is not a dictionary")
|
||||
return JSONResponse(kwargs["request"], result)
|
||||
return JSONResponse(result)
|
||||
|
||||
return wrapper
|
||||
|
||||
Reference in New Issue
Block a user