Compare commits


1 Commit

Author       SHA1         Message                                  Date
HugeFrog24   4a5b0a6ee3   Multi-gooner support because who knows   2026-03-01 01:36:01 +01:00
13 changed files with 2786 additions and 11325 deletions
-14
@@ -1,14 +0,0 @@
-{
-  "permissions": {
-    "allow": [
-      "Bash(jq -r \".\"\"/api/v1/video-channels/{channelHandle}/videos\"\".get.parameters[] | $ref\")",
-      "Bash(grep '$ref')",
-      "Bash(grep -c \"^ ''/\" /c/Users/admin/Desktop/jailbirdz-dl/docs/external/peertube/openapi.yaml)",
-      "Bash(grep -c ' \"\"/' /c/Users/admin/Desktop/jailbirdz-dl/docs/external/peertube/openapi.json)",
-      "Bash(grep -c \"^ ''/\" /c/Users/admin/Desktop/jailbirdz-dl/docs/external/peertube/openapi.yaml)",
-      "Bash(git --no-pager diff HEAD~1 docs/external/peertube/openapi.json)",
-      "Bash(git --no-pager show HEAD~1:docs/external/peertube/openapi.json)",
-      "Bash(git --no-pager diff --name-only HEAD~1)"
-    ]
-  }
-}
@@ -0,0 +1,14 @@
+---
+alwaysApply: true
+always_on: true
+trigger: always_on
+applyTo: "**"
+description: Snyk Security At Inception
+---
+
+# Project security best practices
+- Always run snyk_code_scan tool for new first party code that is generated in a Snyk-supported language.
+- If any security issues are found based on newly introduced or modified code or dependencies, attempt to fix the issues using the results context from Snyk.
+- Rescan the code after fixing the issues to ensure that the issues were fixed and that there are no newly introduced issues.
+- Repeat this process until no new issues are found.
+4 -5
@@ -44,7 +44,7 @@ Sites with no credentials are skipped automatically when running `python main.py
 ### 1. Scrape
-Discovers all post URLs via the WordPress REST API, then visits each page with a headless Firefox browser to intercept video network requests (MP4, MOV, WebM, AVI, M4V, HLS/M3U8).
+Discovers all post URLs via the WordPress REST API, then visits each page with a headless Firefox browser to intercept video network requests (MP4, MOV, WebM, AVI, M4V).
 ```bash
 python main.py    # scrape all sites you have credentials for
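
For context on this hunk, the interception it documents looks roughly like the sketch below. This is a minimal illustration using Playwright's async Firefox API; `sniff` and `_is_video_url` are illustrative names, and the real helpers live in scrape.py:

```python
# Minimal sketch of Playwright response interception (illustrative names;
# the real _is_video_url and worker logic live in scrape.py).
import asyncio
from urllib.parse import urlparse
from playwright.async_api import async_playwright

VIDEO_EXTS = (".mp4", ".mov", ".webm", ".avi", ".m4v")

def _is_video_url(url: str) -> bool:
    return urlparse(url).path.lower().endswith(VIDEO_EXTS)

async def sniff(page_url: str) -> set[str]:
    hits: set[str] = set()
    async with async_playwright() as p:
        browser = await p.firefox.launch(headless=True)
        page = await browser.new_page()
        # Record every response whose URL looks like a video file.
        page.on("response", lambda r: hits.add(r.url) if _is_video_url(r.url) else None)
        await page.goto(page_url, wait_until="networkidle", timeout=60000)
        await browser.close()
    return hits

if __name__ == "__main__":
    print(asyncio.run(sniff("https://example.com/some-post/")))
```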
@@ -62,7 +62,7 @@ python download.py [options]
 Options:
   -o, --output DIR   Download directory (default: downloads)
   -t, --titles       Name files by post title
-  --original         Name files by original filename derived from the video URL (default)
+  --original         Name files by original CloudFront filename (default)
   --reorganize       Rename existing files to match current naming mode
   -w, --workers N    Concurrent downloads (default: 4)
   -n, --dry-run      Print what would be downloaded
@@ -128,11 +128,10 @@ Lists filenames that map to more than one source URL, with sizes.
 ### Estimate total download size
 ```bash
-python total_size.py            # read cached sizes and print summary
-python total_size.py --write    # probe uncached/stale URLs and refresh the cache
+python total_size.py
 ```
-Reads cached file sizes from `video_map.json` and prints a summary (total, smallest, largest, average). The default mode never hits the network. Use `--write` to probe any missing or stale entries and persist the results.
+Fetches `Content-Length` for every video URL in `video_map.json` and prints a size summary. Does not download anything.
 ## Data files
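
The probe behind this is just one HEAD request per URL; a minimal sketch of the idea (`probe_size` is a hypothetical helper, not the repo's fetch_sizes):

```python
# Minimal sketch of a Content-Length probe; probe_size is a hypothetical
# helper, not the repo's fetch_sizes.
import requests

def probe_size(url: str, referer: str = "") -> int | None:
    """Return the remote size in bytes, or None if the server omits it."""
    headers = {"Referer": referer} if referer else {}
    try:
        r = requests.head(url, headers=headers, timeout=30, allow_redirects=True)
        cl = r.headers.get("Content-Length")
        return int(cl) if cl is not None else None
    except (requests.RequestException, ValueError):
        return None
```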
+14 -21
@@ -18,7 +18,7 @@ Importable functions:
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from pathlib import Path, PurePosixPath
-from typing import Any, cast
+from typing import Any, Optional, cast
 from collections.abc import Callable
 from urllib.parse import urlparse, unquote
@@ -121,21 +121,18 @@ def save_video_map(
 def build_url_referers(video_map: dict[str, Any]) -> dict[str, str]:
-    """Pure function: return {cdn_video_url: referer} from a flat video map.
+    """Pure function: return {cdn_video_url: site_referer} from a flat video map.

-    Bunny.net CDN URLs require https://player.mediadelivery.net/ as referer.
-    All other URLs use the scheme+netloc of the page they were found on.
+    The flat video map has page URLs as keys; the scheme+netloc of each page URL
+    is used as the Referer for all CDN video URLs found in that entry.
     """
     result: dict[str, str] = {}
     for page_url, entry in video_map.items():
         parsed = urlparse(page_url)
-        site_referer = f"{parsed.scheme}://{parsed.netloc}/"
+        referer = f"{parsed.scheme}://{parsed.netloc}/"
         for vid in cast(dict[str, Any], entry).get("videos", []):
-            vid_url = vid["url"]
-            if urlparse(vid_url).netloc.endswith(".b-cdn.net"):
-                result.setdefault(vid_url, "https://player.mediadelivery.net/")
-            else:
-                result.setdefault(vid_url, site_referer)
+            if isinstance(vid, str):
+                result.setdefault(vid, referer)
     return result
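
A toy illustration of the new contract (URLs are made up; the flat-map shape matches the docstring above):

```python
# Toy input for build_url_referers under the new plain-string videos shape.
from check_clashes import build_url_referers

vm = {
    "https://www.jailbirdz.com/some-post/": {
        "title": "Some post",
        "videos": ["https://cdn.example.net/clip.mp4"],  # plain URL strings now
    }
}
# Every video URL inherits the scheme+netloc of its page as Referer.
assert build_url_referers(vm) == {
    "https://cdn.example.net/clip.mp4": "https://www.jailbirdz.com/"
}
```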
@@ -151,17 +148,8 @@ def fmt_size(b: float | int) -> str:
     return f"{b:.1f} TB"

-def is_hls_url(url: str) -> bool:
-    """True if url is an HLS master playlist (.m3u8)."""
-    return urlparse(url).path.endswith(".m3u8")

 def url_to_filename(url: str) -> str:
-    path = PurePosixPath(urlparse(url).path)
-    # Bunny.net HLS: .../guid/playlist.m3u8 → guid.mp4
-    if path.name == "playlist.m3u8":
-        return unquote(path.parent.name) + ".mp4"
-    return unquote(path.name)
+    return unquote(PurePosixPath(urlparse(url).path).name)

 def find_clashes(urls: list[str]) -> dict[str, list[str]]:
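
With the Bunny.net special case gone, the filename is simply the unquoted last path segment; for example (hypothetical URL):

```python
from check_clashes import url_to_filename

# Percent-escapes are decoded; only the last path segment survives.
assert url_to_filename("https://dxxxx.cloudfront.net/videos/My%20Clip.mp4") == "My Clip.mp4"
```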
@@ -264,7 +252,12 @@ def fetch_sizes(
 def main() -> None:
     vm = load_video_map()
-    urls = [vid["url"] for entry in vm.values() for vid in entry.get("videos", [])]
+    urls = [
+        u
+        for entry in vm.values()
+        for u in entry.get("videos", [])
+        if u.startswith("http")
+    ]
     clashes = find_clashes(urls)
-3
@@ -1,9 +1,6 @@
 # config.py
 from typing import Final

-# How long a cached file size stays valid. 0 = always re-probe; large = effectively forever.
-SIZE_CACHE_TTL: Final[int] = 9_999_999  # seconds (~115 days)

 SITES: Final[dict[str, dict[str, str]]] = {
     "jailbirdz": {
         "base_url": "https://www.jailbirdz.com",
+857 -1412
File diff suppressed because one or more lines are too long
+20 -108
@@ -14,13 +14,10 @@ import argparse
 from pathlib import Path
 import re
 import shutil
-import subprocess
-import sys
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor, as_completed
-from typing import Any
+from typing import Any, Optional
 import requests
-import time

 from check_clashes import (
     make_session,
@@ -31,9 +28,7 @@ from check_clashes import (
     build_url_referers,
     fetch_sizes,
     load_video_map,
-    save_video_map,
     is_valid_url,
-    is_hls_url,
     VIDEO_MAP_FILE,
 )
 from config import SITES
@@ -209,30 +204,6 @@ def reorganize(
 # ── Download ─────────────────────────────────────────────────────────

-def download_hls(url: str, dest: Path, referer: str = "") -> tuple[str, int]:
-    """Download an HLS stream via yt-dlp. Returns (status, bytes_written)."""
-    dest.parent.mkdir(parents=True, exist_ok=True)
-    if dest.exists():
-        return "ok", 0
-    cmd = [
-        sys.executable, "-m", "yt_dlp",
-        "--quiet", "--no-warnings",
-        "--referer", referer or "https://player.mediadelivery.net/",
-        "-o", str(dest),
-        url,
-    ]
-    try:
-        proc = subprocess.run(cmd, capture_output=True, text=True)
-        if proc.returncode != 0:
-            lines = (proc.stderr or proc.stdout).strip().splitlines()
-            return f"error: {lines[-1] if lines else 'yt-dlp failed'}", 0
-        if not dest.exists():
-            return "error: output file missing after yt-dlp", 0
-        return "ok", dest.stat().st_size
-    except Exception as e:
-        return f"error: {e}", 0

 def download_one(
     session: requests.Session,
     url: str,
@@ -293,13 +264,12 @@ def download_one(
 def collect_urls(video_map: dict[str, Any]) -> list[str]:
     urls, seen, skipped = [], set(), 0
     for entry in video_map.values():
-        for vid in entry.get("videos", []):
-            u = vid["url"]
-            if u in seen:
+        for video_url in entry.get("videos", []):
+            if video_url in seen:
                 continue
-            seen.add(u)
-            if is_valid_url(u):
-                urls.append(u)
+            seen.add(video_url)
+            if is_valid_url(video_url):
+                urls.append(video_url)
             else:
                 skipped += 1
     if skipped:
@@ -311,41 +281,12 @@ def build_url_title_map(video_map: dict[str, Any]) -> dict[str, str]:
     url_title = {}
     for entry in video_map.values():
         title = entry.get("title", "")
-        for vid in entry.get("videos", []):
-            if vid["url"] not in url_title:
-                url_title[vid["url"]] = title
+        for video_url in entry.get("videos", []):
+            if video_url not in url_title:
+                url_title[video_url] = title
     return url_title

-def _persist_fetched_sizes(newly_fetched: dict[str, int | None]) -> None:
-    """Write newly probed sizes back to video_map.json (successful probes only)."""
-    now = int(time.time())
-    for site_key in SITES:
-        vm_site = load_video_map(site_key)
-        changed = False
-        for entry in vm_site.values():
-            for vid in entry.get("videos", []):
-                if vid["url"] in newly_fetched and vid.get("size") is None and newly_fetched[vid["url"]] is not None:
-                    vid["size"] = newly_fetched[vid["url"]]
-                    vid["size_checked_at"] = now
-                    changed = True
-        if changed:
-            save_video_map(vm_site, site_key)
-    n_saved = sum(1 for s in newly_fetched.values() if s is not None)
-    if n_saved:
-        print(f"[+] Cached {n_saved} newly probed size(s).")

-def build_url_to_site() -> dict[str, str]:
-    """Return {cdn_video_url: site_key} by loading each site's map in turn."""
-    result: dict[str, str] = {}
-    for site_key in SITES:
-        for entry in load_video_map(site_key).values():
-            for vid in entry.get("videos", []):
-                result[vid["url"]] = site_key
-    return result

 # ── Main ─────────────────────────────────────────────────────────────
@@ -400,7 +341,11 @@ def main() -> None:
     url_referers = build_url_referers(video_map)
     urls = collect_urls(video_map)
-    url_to_site = build_url_to_site()
+    url_to_site: dict[str, str] = {}
+    for site_key in SITES:
+        for entry in load_video_map(site_key).values():
+            for vid_url in entry.get("videos", []):
+                url_to_site[vid_url] = site_key

     if args.sites:
         selected = set(args.sites)
@@ -411,7 +356,7 @@ def main() -> None:
     saved = read_mode(args.output)
     mode_changed = saved is not None and saved != mode

-    print(f"[+] {len(urls)} video URLs from {VIDEO_MAP_FILE}")
+    print(f"[+] {len(urls)} MP4 URLs from {VIDEO_MAP_FILE}")
     print(f"[+] Naming mode: {mode}" + (" (changed!)" if mode_changed else ""))

     # Handle reorganize
@@ -464,45 +409,17 @@ def main() -> None:
             print(f"    … and {len(pending) - 20} more")
         return

-    cached_sizes: dict[str, int] = {
-        vid["url"]: vid["size"]
-        for entry in video_map.values()
-        for vid in entry.get("videos", [])
-        if vid.get("size") is not None
-    }
-    newly_fetched: dict[str, int | None] = {}
-    uncached_pending = [u for u in pending if u not in cached_sizes and not is_hls_url(u)]
+    print("\n[+] Fetching remote file sizes…")
     session = make_session()
-    if uncached_pending:
-        print(
-            f"\n[+] Fetching remote file sizes ({len(uncached_pending)} uncached, {len(pending) - len(uncached_pending)} cached)…"
-        )
-        fetched_pending = fetch_sizes(uncached_pending, workers=20, url_referers=url_referers)
-        newly_fetched.update(fetched_pending)
-        remote_sizes: dict[str, int | None] = {**cached_sizes, **fetched_pending}
-    else:
-        print(f"\n[+] All {len(pending)} pending sizes cached — skipping probe.")
-        remote_sizes = dict(cached_sizes)
+    remote_sizes = fetch_sizes(pending, workers=20, url_referers=url_referers)

     sized = {u: s for u, s in remote_sizes.items() if s is not None}
     total_bytes = sum(sized.values())
     print(f"[+] Download size: {fmt_size(total_bytes)} across {len(pending)} files")

-    already_sizes: dict[str, int | None] = {}
-    already_to_verify = [u for u in already if not is_hls_url(u)]
-    if already_to_verify:
-        uncached_already = [u for u in already_to_verify if u not in cached_sizes]
-        if uncached_already:
-            print(
-                f"[+] Verifying {len(already_to_verify)} existing files ({len(uncached_already)} uncached)…"
-            )
-            fetched_already = fetch_sizes(uncached_already, workers=20, url_referers=url_referers)
-            newly_fetched.update(fetched_already)
-            already_sizes = {**cached_sizes, **fetched_already}
-        else:
-            print(f"[+] Verifying {len(already_to_verify)} existing files (all sizes cached)…")
-            already_sizes = dict(cached_sizes)
+    if already:
+        print(f"[+] Verifying {len(already)} existing files…")
+        already_sizes = fetch_sizes(already, workers=20, url_referers=url_referers)

     mismatched = 0
     for url in already:
@@ -521,9 +438,6 @@ def main() -> None:
     if mismatched:
         print(f"[!] {mismatched} file(s) will be re-downloaded due to size mismatch")

-    if newly_fetched:
-        _persist_fetched_sizes(newly_fetched)

     print(f"\n[⚡] Downloading with {args.workers} threads…\n")
     completed = 0
@@ -534,8 +448,6 @@ def main() -> None:
     def do_download(url: str) -> tuple[str, tuple[str, int]]:
         dest = paths[url]
-        if is_hls_url(url):
-            return url, download_hls(url, dest, url_referers.get(url, ""))
         expected = remote_sizes.get(url)
         return url, download_one(
             session, url, dest, expected, url_referers.get(url, "")
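
What remains after removing the HLS branch is a plain size-verified streaming download. A minimal sketch of that shape (`fetch_with_check` is a hypothetical helper; the repo's download_one may differ in details such as retries):

```python
# Minimal sketch of a size-verified streaming download (hypothetical helper;
# the repo's download_one may differ in details such as retries).
from pathlib import Path
import requests

def fetch_with_check(session: requests.Session, url: str, dest: Path,
                     expected: int | None, referer: str = "") -> tuple[str, int]:
    dest.parent.mkdir(parents=True, exist_ok=True)
    headers = {"Referer": referer} if referer else {}
    with session.get(url, headers=headers, stream=True, timeout=60) as r:
        r.raise_for_status()
        written = 0
        with dest.open("wb") as f:
            for chunk in r.iter_content(chunk_size=1 << 20):
                f.write(chunk)
                written += len(chunk)
    # Compare what landed on disk against the probed Content-Length.
    if expected is not None and written != expected:
        return f"error: size mismatch ({written} != {expected})", written
    return "ok", written
```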
-5
@@ -14,12 +14,10 @@ import os
 from pathlib import Path
 from typing import Literal
 import requests
-from dotenv import load_dotenv
 from config import SITES

 ENV_FILE = Path(".env")
 COOKIE_PREFIX = "wordpress_logged_in_"

-load_dotenv(dotenv_path=ENV_FILE)

 def update_env(
@@ -74,9 +72,6 @@ def login_and_get_cookie(
             "Referer": f"{base_url}/",
             "Origin": base_url,
             "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0",
-            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
-            "X-Requested-With": "XMLHttpRequest",
-            "Accept": "*/*",
         },
         timeout=30,
     )
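
For orientation, a login that captures the session cookie this module keys on (`wordpress_logged_in_`) can be sketched as below. The wp-login.php endpoint and the form field names are standard WordPress defaults and an assumption here; the repo's login_and_get_cookie may target a different endpoint:

```python
# Minimal sketch of a WordPress login capturing the session cookie.
# wp-login.php and the log/pwd field names are assumptions (WordPress defaults).
import requests

COOKIE_PREFIX = "wordpress_logged_in_"

def wp_login(base_url: str, user: str, password: str) -> str | None:
    s = requests.Session()
    s.post(
        f"{base_url}/wp-login.php",
        data={"log": user, "pwd": password, "rememberme": "forever"},
        headers={"Referer": f"{base_url}/", "Origin": base_url},
        timeout=30,
    )
    # WordPress sets wordpress_logged_in_<hash> on success.
    for name, value in s.cookies.items():
        if name.startswith(COOKIE_PREFIX):
            return f"{name}={value}"
    return None
```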
+8 -73
@@ -6,7 +6,7 @@ import signal
 import asyncio
 import requests
 from pathlib import PurePosixPath
-from typing import Any
+from typing import Any, Optional
 from urllib.parse import urlparse
 from dotenv import load_dotenv
 from playwright.async_api import async_playwright, BrowserContext
@@ -335,46 +335,6 @@ def extract_title_from_html(html: str) -> str | None:
     return None

-def _is_bunny_playlist(url: str) -> bool:
-    """True if url is the root Bunny.net HLS playlist (not a sub-playlist)."""
-    parsed = urlparse(url)
-    return (
-        parsed.netloc.endswith(".b-cdn.net")
-        and parsed.path.endswith("/playlist.m3u8")
-    )

-def _is_bunny_junk(url: str) -> bool:
-    """True if url is a Bunny.net CDN init segment (not a usable video URL)."""
-    parsed = urlparse(url)
-    return parsed.netloc.endswith(".b-cdn.net") and PurePosixPath(
-        parsed.path
-    ).name in {"init.mp4", "init.dmp4"}

-def extract_bunny_embed_url(html: str) -> str | None:
-    """Return a tokenless Bunny.net embed URL found in an iframe, or None."""
-    m = re.search(
-        r'<iframe[^>]+src="(https://player\.mediadelivery\.net/embed/[^"?]+)',
-        html,
-    )
-    return m.group(1) if m else None

-def _clear_junk_video_entries(video_map: dict[str, Any], site_key: str) -> int:
-    """Reset entries whose only stored videos are CDN init segments. Returns count fixed."""
-    cleared = 0
-    for entry in video_map.values():
-        videos = entry.get("videos", [])
-        if videos and all(_is_bunny_junk(v["url"]) for v in videos):
-            entry["videos"] = []
-            entry.pop("scraped_at", None)
-            cleared += 1
-    if cleared:
-        save_video_map(video_map, site_key)
-    return cleared

 MAX_RETRIES = 2
@@ -401,9 +361,7 @@ async def worker(
         page.on(
             "response",
-            lambda resp: video_hits.add(resp.url)
-            if (_is_video_url(resp.url) or _is_bunny_playlist(resp.url))
-            else None,
+            lambda resp: video_hits.add(resp.url) if _is_video_url(resp.url) else None,
         )

         try:
@@ -418,7 +376,7 @@ async def worker(
                 print(f"[W{worker_id}] ({idx + 1}/{total}) {url}{label}")

                 try:
-                    await page.goto(url, wait_until="load", timeout=60000)
+                    await page.goto(url, wait_until="networkidle", timeout=60000)
                 except Exception as e:
                     print(f"[W{worker_id}] Navigation error: {e}")
                     if expects_video(url) and attempt < MAX_RETRIES:
@@ -493,29 +451,18 @@ async def worker(
                 found = set(html_videos) | set(video_hits)
                 video_hits.clear()
+                print(f"[W{worker_id}] network hits raw: {found or '(empty)'}")

                 all_videos = [
                     m
                     for m in found
                     if is_valid_url(m)
-                    and not _is_bunny_junk(m)
                     and m
                     not in (
                         f"{base_url}/wp-content/plugins/easy-video-player/lib/blank.mp4",
                     )
                 ]

-                if not all_videos:
-                    embed_url = extract_bunny_embed_url(html)
-                    if embed_url:
-                        print(
-                            f"[W{worker_id}] No network hit — iframe fallback: {embed_url}"
-                        )
-                        all_videos = [embed_url]

                 async with map_lock:
-                    new_found = set(all_videos) - known
+                    new_found = found - known
                     if new_found:
                         print(f"[W{worker_id}] Found {len(new_found)} new video URLs")
                         known.update(new_found)
@@ -529,13 +476,9 @@ async def worker(
                     entry = video_map.get(url, {})
                     if title:
                         entry["title"] = title
-                    existing_dict: dict[str, Any] = {
-                        vid["url"]: vid for vid in entry.get("videos", [])
-                    }
-                    for vid_url in all_videos:
-                        if vid_url not in existing_dict:
-                            existing_dict[vid_url] = {"url": vid_url}
-                    entry["videos"] = sorted(existing_dict.values(), key=lambda v: v["url"])
+                    existing_videos = set(entry.get("videos", []))
+                    existing_videos.update(all_videos)
+                    entry["videos"] = sorted(existing_videos)
                     mark_done = bool(all_videos) or not expects_video(url)
                     if mark_done:
                         entry["scraped_at"] = int(time.time())
@@ -572,12 +515,6 @@ async def run_for_site(
     urls = load_post_urls(site_key, base_url, wp_api, req_headers)
     video_map = load_video_map(site_key)

-    junk_cleared = _clear_junk_video_entries(video_map, site_key)
-    if junk_cleared:
-        print(
-            f"[{site_key}] Cleared {junk_cleared} entries with junk CDN init segments — will re-scrape."
-        )

     if any(
         u not in video_map
         or not video_map[u].get("title")
@@ -589,9 +526,7 @@ async def run_for_site(
         site_key, base_url, wp_api, video_map, urls, req_headers
     )

-    known = {
-        vid["url"] for entry in video_map.values() for vid in entry.get("videos", [])
-    }
+    known = {u for entry in video_map.values() for u in entry.get("videos", [])}

     total = len(urls)
     pending = []
-2
@@ -1,5 +1,3 @@
 playwright==1.58.0
 python-dotenv==1.2.1
 Requests==2.32.5
-yt-dlp>=2026.3.17
-pycryptodomex>=3.23.0
+21 -111
@@ -4,19 +4,15 @@ Importable function:
     summarize_sizes(sizes) - return dict with total, smallest, largest, average, failed
 """

-import argparse
-import time
-from typing import Any, TypedDict
+from typing import Optional, TypedDict

 from check_clashes import (
     fmt_size,
     fetch_sizes,
     load_video_map,
-    save_video_map,
     build_url_referers,
     VIDEO_MAP_FILE,
 )
-from config import SITES, SIZE_CACHE_TTL

 class SizeStats(TypedDict):
@@ -29,7 +25,7 @@
     failed: list[str]

-def summarize_sizes(sizes: dict[str, int | None]) -> SizeStats:
+def summarize_sizes(sizes: dict[str, Optional[int]]) -> SizeStats:
     """Given {url: size_or_None}, return a stats dict."""
     known = {u: s for u, s in sizes.items() if s is not None}
     failed = [u for u, s in sizes.items() if s is None]
@@ -55,13 +51,6 @@ def summarize_sizes(sizes: dict[str, int | None]) -> SizeStats:
     }

-def _is_stale(vid: dict[str, Any], now: int) -> bool:
-    """True if the cached size is absent or older than SIZE_CACHE_TTL seconds."""
-    if vid.get("size") is None:
-        return True
-    return (now - int(vid.get("size_checked_at", 0))) >= SIZE_CACHE_TTL

 # --------------- CLI ---------------
@@ -70,7 +59,24 @@ def _progress(done: int, total: int) -> None:
     print(f"    {done}/{total}")

-def _print_stats(stats: SizeStats) -> None:
+def main() -> None:
+    vm = load_video_map()
+    urls: list[str] = [
+        u
+        for entry in vm.values()
+        for u in entry.get("videos", [])
+        if u.startswith("http")
+    ]
+    url_referers = build_url_referers(vm)
+
+    print(f"[+] {len(urls)} URLs in {VIDEO_MAP_FILE}")
+    print("[+] Fetching file sizes (20 threads)…\n")
+    sizes = fetch_sizes(
+        urls, workers=20, on_progress=_progress, url_referers=url_referers
+    )
+    stats = summarize_sizes(sizes)
+
     print(f"\n{'=' * 45}")
     print(f"  Sized:    {stats['sized']}/{stats['total']} files")
     print(f"  Total:    {fmt_size(stats['total_bytes'])}")
@@ -78,108 +84,12 @@ def _print_stats(stats: SizeStats) -> None:
     print(f"  Largest:  {fmt_size(stats['largest'])}")
     print(f"  Average:  {fmt_size(stats['average'])}")
     print(f"{'=' * 45}")

     if stats["failed"]:
         print(f"\n[!] {len(stats['failed'])} URL(s) could not be sized:")
         for u in stats["failed"]:
             print(f"    {u}")

-def _cache_hint(fresh: int, stale: int, missing: int) -> str:
-    parts = [label for count, label in [(fresh, f"{fresh} fresh"), (stale, f"{stale} stale"), (missing, f"{missing} missing")] if count]
-    if stale or missing:
-        suffix = " — run --write to refresh" if stale else " — run --write to probe missing"
-    else:
-        suffix = " — all current"
-    return f"Cache: {', '.join(parts)}{suffix}"

-def _run_stats() -> None:
-    vm = load_video_map()
-    now = int(time.time())
-    sizes: dict[str, int | None] = {}
-    fresh = stale = missing = 0
-    for entry in vm.values():
-        for vid in entry.get("videos", []):
-            url = vid["url"]
-            if url in sizes:
-                continue
-            sizes[url] = vid.get("size")
-            if vid.get("size") is None:
-                missing += 1
-            elif _is_stale(vid, now):
-                stale += 1
-            else:
-                fresh += 1
-    print(f"[+] {len(sizes)} URLs in {VIDEO_MAP_FILE}")
-    print(f"    {_cache_hint(fresh, stale, missing)}")
-    _print_stats(summarize_sizes(sizes))

-def _apply_fetched(vm: dict[str, Any], fetched: dict[str, int | None], now: int) -> None:
-    for entry in vm.values():
-        for vid in entry.get("videos", []):
-            if vid["url"] in fetched:
-                vid["size"] = fetched[vid["url"]]
-                vid["size_checked_at"] = now

-def _run_write() -> None:
-    """Probe uncached sizes and write them into video_map.json."""
-    now = int(time.time())
-    all_fetched: dict[str, int | None] = {}
-    for site_key in SITES:
-        vm = load_video_map(site_key)
-        if not vm:
-            continue
-        url_referers = build_url_referers(vm)
-        to_probe: list[str] = [
-            vid["url"]
-            for entry in vm.values()
-            for vid in entry.get("videos", [])
-            if _is_stale(vid, now)
-        ]
-        cached_count = sum(
-            1
-            for entry in vm.values()
-            for vid in entry.get("videos", [])
-            if not _is_stale(vid, now)
-        )
-        print(f"[{site_key}] {cached_count} cached, {len(to_probe)} to probe…")
-        fetched: dict[str, int | None] = {}
-        if to_probe:
-            fetched = fetch_sizes(
-                to_probe, workers=20, on_progress=_progress, url_referers=url_referers
-            )
-        _apply_fetched(vm, fetched, now)
-        save_video_map(vm, site_key)
-        all_fetched.update(fetched)
-        print(f"[{site_key}] Written.")
-    if all_fetched:
-        _print_stats(summarize_sizes(all_fetched))

-def main() -> None:
-    parser = argparse.ArgumentParser(description="Calculate total video download size")
-    parser.add_argument(
-        "--write",
-        "-w",
-        action="store_true",
-        help="Probe uncached sizes and write them into video_map.json",
-    )
-    args = parser.parse_args()
-    if args.write:
-        _run_write()
-    else:
-        _run_stats()

 if __name__ == "__main__":
     main()
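
A toy run of summarize_sizes (URLs and sizes are made up) shows the fields the CLI prints:

```python
# Toy illustration of summarize_sizes; URLs and sizes are made up.
from total_size import summarize_sizes

stats = summarize_sizes({
    "https://example.com/a.mp4": 1_000_000,
    "https://example.com/b.mp4": 3_000_000,
    "https://example.com/c.mp4": None,  # failed probe ends up in stats["failed"]
})
assert stats["sized"] == 2 and stats["total"] == 3
assert stats["total_bytes"] == 4_000_000
assert stats["failed"] == ["https://example.com/c.mp4"]
```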
+15 -8
@@ -32,11 +32,11 @@ import requests
 from dotenv import load_dotenv
 from check_clashes import fmt_size, url_to_filename, VIDEO_EXTS, load_video_map
+from config import SITES
 from download import (
     collect_urls,
     get_paths_for_mode,
     read_mode,
-    build_url_to_site,
     MODE_ORIGINAL,
     DEFAULT_OUTPUT,
 )
@@ -125,12 +125,11 @@ def get_channel_id(base: str, token: str, channel_name: str) -> int:
 def get_channel_video_names(base: str, token: str, channel_name: str) -> Counter[str]:
     """Paginate through the channel and return a Counter of video names."""
     counts: Counter[str] = Counter()
-    page_size = 25
     start = 0
     while True:
         r = requests.get(
             f"{base}/api/v1/video-channels/{channel_name}/videos",
-            params={"start": start, "count": page_size},
+            params={"start": start, "count": 100},
             headers=api_headers(token),
             timeout=30,
         )
@@ -138,7 +137,7 @@ def get_channel_video_names(base: str, token: str, channel_name: str) -> Counter
         data = r.json()
         for v in data.get("data", []):
             counts[v["name"]] += 1
-        start += page_size
+        start += 100
         if start >= data.get("total", 0):
             break
     return counts
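
PeerTube's list endpoints page with start/count and report a total, which is what the loop above exploits. Hypothetical usage of this function (the instance URL, token, and channel name are placeholders):

```python
# Hypothetical usage of get_channel_video_names; the instance, token, and
# channel name are placeholders.
TOKEN = "your-oauth-token"
counts = get_channel_video_names("https://peertube.example.org", TOKEN, "my_channel")
dupes = {name: n for name, n in counts.items() if n > 1}
print(f"{sum(counts.values())} videos, {len(dupes)} duplicated name(s)")
```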
@@ -413,7 +412,11 @@ def build_path_to_meta(
     urls = collect_urls(video_map)
     mode = read_mode(input_dir) or MODE_ORIGINAL
-    url_to_site = build_url_to_site()
+    url_to_site: dict[str, str] = {}
+    for site_key in SITES:
+        for entry in load_video_map(site_key).values():
+            for vid_url in entry.get("videos", []):
+                url_to_site[vid_url] = site_key

     paths = get_paths_for_mode(mode, urls, video_map, input_dir, url_to_site)
@@ -426,9 +429,13 @@ def build_path_to_meta(
         title = t if isinstance(t, str) else ""
         desc = d if isinstance(d, str) else ""
-        for vid in entry.get("videos", []):
-            if vid["url"] not in url_meta:
-                url_meta[vid["url"]] = {"title": title, "description": desc}
+        videos_any = entry.get("videos", [])
+        if isinstance(videos_any, list):
+            for video_url_any in videos_any:
+                if not isinstance(video_url_any, str):
+                    continue
+                if video_url_any not in url_meta:
+                    url_meta[video_url_any] = {"title": title, "description": desc}

     result: dict[Path, dict[str, str]] = {}
     for url, abs_path in paths.items():
+1833 -9563
File diff suppressed because one or more lines are too long