89 lines
2.5 KiB
Python
89 lines
2.5 KiB
Python
import asyncio
import re
from urllib.parse import urlsplit

import aiohttp
from aiohttp import ClientError
|
|
|
|
# Captures the href target of a sub-folder link (a name ending in "/")
# when the anchor is immediately followed by a <code> element.
folder_hyperlink_pat: re.Pattern = re.compile(
    r"\<a href=\"\/?"  # opening anchor, optional leading slash
    r"([a-zA-Z0-9_ -]+\/)"  # group 1: folder name with trailing slash
    r"\"\>\s*\<code\>"  # end of the tag, then the <code> marker
)
|
|
|
|
# Captures the href target of a video-file link when the anchor is
# immediately followed by a <code> element. Group 1 is the file name,
# group 2 is the bare extension.
movie_hyperlink_pat: re.Pattern = re.compile(
    r"\<a href=\"\/?"  # opening anchor, optional leading slash
    r"([a-zA-Z0-9_. -]+\.?(mp4|mkv|avi|mov|wmv|webm))"  # file name + extension
    r"\"\>\s*\<code\>"  # end of the tag, then the <code> marker
)
|
|
|
|
|
|
async def fetch(url, tries=0):
    """Download *url* and return the response body as text.

    A fresh ``aiohttp.ClientSession`` is opened for every attempt. When the
    request fails with an aiohttp ``ClientError`` the download is retried
    until the attempt counter reaches six.

    Args:
        url: Address to request with HTTP GET.
        tries: Number of attempts that have already failed; callers normally
            leave this at 0.

    Returns:
        The decoded response body, or ``None`` if an exception other than
        ``ClientError`` was raised (the error is printed).

    Raises:
        RuntimeError: If the request still fails with ``ClientError`` once
            ``tries`` has reached 5, i.e. on the sixth failed attempt.
    """
    # Retry in a loop. The previous recursive version returned the coroutine
    # object of the retry without awaiting it, so callers received a
    # coroutine instead of the page text.
    while True:
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return await response.text()
        except ClientError as err:
            print(f"Failed ({tries + 1})...", end="")
            if tries >= 5:
                raise RuntimeError(
                    f"Failed to fetch URL {url} after 6 tries"
                ) from err
            tries += 1
        except Exception as e:
            # Best effort: report the problem and let the caller decide what
            # to do with a missing page.
            print(f"Unexpected error: {e}")
            return None
|
|
|
|
|
|
def join_url(a: str, b: str) -> str:
    """Concatenate two URL parts with exactly one "/" between them.

    Trailing slashes are removed from *a* and leading slashes from *b*
    before the two are joined.
    """
    head = a.rstrip("/")
    tail = b.lstrip("/")
    return f"{head}/{tail}"
|
|
|
|
|
|
async def traverse(pool: list[str], url: str, verbose=False) -> None:
    """Recursively collect movie links found under the listing page *url*.

    The page is fetched, every sub-folder link matched by
    ``folder_hyperlink_pat`` is traversed depth-first, and afterwards every
    file link matched by ``movie_hyperlink_pat`` on this page is appended to
    *pool* as an absolute URL.

    Args:
        pool: List that receives the discovered movie URLs (mutated in place).
        url: Address of the listing page to scan.
        verbose: When true, print each folder that is entered and each movie
            URL (with the current size of *pool*) as it is found.
    """
    page = await fetch(url)
    if page is None:
        # fetch() signals an unexpected error by returning None; skip this
        # page rather than letting findall() raise TypeError and abort the
        # whole crawl.
        return

    folders = folder_hyperlink_pat.findall(page)
    files = movie_hyperlink_pat.findall(page)

    for folder in folders:
        folder_url = join_url(url, folder)
        if verbose:
            print(f" -> {folder_url}")
        await traverse(pool, folder_url, verbose=verbose)

    # The movie pattern has two groups, so findall() yields
    # (file name, extension) tuples; only the file name is needed.
    for file_name, _extension in files:
        movie_url = join_url(url, file_name)
        if verbose:
            print(f"{movie_url} ({len(pool)})")
        pool.append(movie_url)
|
|
|
|
|
|
# Listing pages to crawl; main() schedules one traverse() call per entry.
URLS: list[str] = ["https://berlin.saymyname.website/Movies/"]
|
|
|
|
# Shared result list: main() hands it to traverse() as the pool, and its
# contents are written to movies.txt at the end of the script.
movies: list[str] = []
|
|
|
|
|
|
def ask(q: str, default=True) -> bool:
    """Prompt on stdin with a yes/no question until a usable answer is given.

    The prompt shows ``[Y|n]`` or ``[y|N]``, with the capital letter marking
    the default. An empty reply returns *default*; "y" or "n" (in either
    case) returns True or False. Any other reply repeats the question.
    """
    hint = "Y|n" if default else "y|N"
    prompt = f"{q} [{hint}]: "
    replies = {"": default, "y": True, "n": False}
    while True:
        reply = input(prompt).lower()
        if reply in replies:
            return replies[reply]
|
|
|
|
|
|
# Asked once, when this module-level statement executes; main() forwards the
# answer to traverse() as its verbose flag.
is_verbose: bool = ask("Verbose?")
|
|
|
|
|
|
async def main():
    """Ask how each site in ``URLS`` is rendered, then crawl them all.

    For every URL the user is asked whether the site is "fancy". If the
    answer is no, the module-level link patterns are replaced with variants
    that do not require a ``<code>`` element after the anchor. One
    ``traverse`` coroutine is created per URL and all of them are awaited
    together; results accumulate in the module-level ``movies`` list.
    """
    global folder_hyperlink_pat, movie_hyperlink_pat

    tasks = []
    for url in URLS:
        # Show only the host in the question. urlsplit() replaces manual
        # find/rfind slicing that broke when the path contained a dot or
        # there was no slash after the last dot.
        host = urlsplit(url).netloc or url
        is_fancy = ask(f"Is {host} fancy?")
        if not is_fancy:
            # NOTE(review): the patterns are module globals, so this swap
            # applies to every URL being crawled, not only this one; confirm
            # that is acceptable before adding more entries to URLS.
            folder_hyperlink_pat = re.compile(r"\<a href=\"\/?([a-zA-Z0-9_ -]+\/)\"\>")
            movie_hyperlink_pat = re.compile(
                r"\<a href=\"\/?([a-zA-Z0-9_. -]+\.?(mp4|mkv|avi|mov|wmv|webm))\"\>"
            )
        tasks.append(traverse(movies, url, is_verbose))

    await asyncio.gather(*tasks)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the crawl to completion; main() fills the module-level movies list.
    asyncio.run(main())

    # Write one URL per line. The context manager closes (and flushes) the
    # file deterministically instead of relying on garbage collection.
    with open("movies.txt", mode="w", encoding="utf-8") as out_file:
        out_file.write("\n".join(movies))