Compare commits

..

1 Commits

Author SHA1 Message Date
HugeFrog24 4a5b0a6ee3 Multi-gooner support because who knows 2026-03-01 01:36:01 +01:00
15 changed files with 3103 additions and 17277 deletions
-5
View File
@@ -13,11 +13,6 @@ PINKCUFFS_USERNAME=your-email-or-username
PINKCUFFS_PASSWORD=your-password
PINKCUFFS_LOGIN_COOKIE=wordpress_logged_in_<hash>=<value>
# femuniverse.com credentials (separate membership)
FEMUNIVERSE_USERNAME=your-email-or-username
FEMUNIVERSE_PASSWORD=your-password
FEMUNIVERSE_LOGIN_COOKIE=wordpress_logged_in_<hash>=<value>
# PeerTube upload target
PEERTUBE_URL=https://your-peertube-instance.example
PEERTUBE_USER=admin
@@ -0,0 +1,14 @@
---
alwaysApply: true
always_on: true
trigger: always_on
applyTo: "**"
description: Snyk Security At Inception
---
# Project security best practices
- Always run snyk_code_scan tool for new first party code that is generated in a Snyk-supported language.
- If any security issues are found based on newly introduced or modified code or dependencies, attempt to fix the issues using the results context from Snyk.
- Rescan the code after fixing the issues to ensure that the issues were fixed and that there are no newly introduced issues.
- Repeat this process until no new issues are found.
-2
View File
@@ -38,8 +38,6 @@ jobs:
JAILBIRDZ_PASSWORD: ${{ secrets.JAILBIRDZ_PASSWORD }}
PINKCUFFS_USERNAME: ${{ secrets.PINKCUFFS_USERNAME }}
PINKCUFFS_PASSWORD: ${{ secrets.PINKCUFFS_PASSWORD }}
FEMUNIVERSE_USERNAME: ${{ secrets.FEMUNIVERSE_USERNAME }}
FEMUNIVERSE_PASSWORD: ${{ secrets.FEMUNIVERSE_PASSWORD }}
- name: Commit updated video_map.json
if: always() # save progress even if main.py crashed or timed out
-3
View File
@@ -1,6 +1,3 @@
# Local Claude instructions
.claude/settings.local.json
# Temporary cache
__pycache__/
.ruff_cache/
+8 -16
View File
@@ -1,6 +1,6 @@
# 𝒥𝒶𝒾𝓁𝒷𝒾𝓇𝒹𝓏-𝒹𝓁
Jailbirdz.com, Pinkcuffs.com, and Femuniverse.com are Arizona-based subscription video sites publishing arrest and jail roleplay scenarios featuring women. This tool scrapes the member area of any combination of these sites, downloads the videos, and re-hosts them on a self-owned PeerTube instance.
Jailbirdz.com and Pinkcuffs.com are Arizona-based subscription video sites publishing arrest and jail roleplay scenarios featuring women. This tool scrapes the member area of one or both sites, downloads the videos, and re-hosts them on a self-owned PeerTube instance.
> [!NOTE]
> This tool does not bypass authentication, modify the site, or intercept anything it isn't entitled to. A valid, paid membership is required. The scraper authenticates using your own session cookie and accesses only content your account can already view in a browser.
@@ -23,9 +23,9 @@ cp .env.example .env
Set credentials for whichever sites you have a membership on. You don't need both.
**Option A — credentials (recommended):** set `JAILBIRDZ_USERNAME` + `JAILBIRDZ_PASSWORD` (and/or the `PINKCUFFS_*` / `FEMUNIVERSE_*` equivalents) in `.env`. `main.py` logs in automatically on startup.
**Option A — credentials (recommended):** set `JAILBIRDZ_USERNAME` + `JAILBIRDZ_PASSWORD` (and/or the `PINKCUFFS_*` equivalents) in `.env`. `main.py` logs in automatically on startup.
**Option B — manual cookie:** set `JAILBIRDZ_LOGIN_COOKIE` (and/or `PINKCUFFS_LOGIN_COOKIE` / `FEMUNIVERSE_LOGIN_COOKIE`) yourself. Get the value from browser DevTools → Storage → Cookies — copy the full `name=value` of the `wordpress_logged_in_*` cookie.
**Option B — manual cookie:** set `JAILBIRDZ_LOGIN_COOKIE` (and/or `PINKCUFFS_LOGIN_COOKIE`) yourself. Get the value from browser DevTools → Storage → Cookies — copy the full `name=value` of the `wordpress_logged_in_*` cookie.
Sites with no credentials are skipped automatically when running `python main.py`.
@@ -35,8 +35,6 @@ Sites with no credentials are skipped automatically when running `python main.py
- `JAILBIRDZ_LOGIN_COOKIE` — jailbirdz.com session cookie (fallback).
- `PINKCUFFS_USERNAME` / `PINKCUFFS_PASSWORD` — pinkcuffs.com login.
- `PINKCUFFS_LOGIN_COOKIE` — pinkcuffs.com session cookie (fallback).
- `FEMUNIVERSE_USERNAME` / `FEMUNIVERSE_PASSWORD` — femuniverse.com login.
- `FEMUNIVERSE_LOGIN_COOKIE` — femuniverse.com session cookie (fallback).
- `PEERTUBE_URL` — base URL of your PeerTube instance.
- `PEERTUBE_USER` — PeerTube username.
- `PEERTUBE_CHANNEL` — channel to upload to.
@@ -46,13 +44,12 @@ Sites with no credentials are skipped automatically when running `python main.py
### 1. Scrape
Discovers all post URLs via the WordPress REST API, then visits each page with a headless Firefox browser to intercept video network requests (MP4, MOV, WebM, AVI, M4V, HLS/M3U8).
Discovers all post URLs via the WordPress REST API, then visits each page with a headless Firefox browser to intercept video network requests (MP4, MOV, WebM, AVI, M4V).
```bash
python main.py # scrape all sites you have credentials for
python main.py --site jailbirdz # scrape one site only
python main.py --site pinkcuffs --site jailbirdz # explicit multi-site
python main.py --site femuniverse # femuniverse only
```
Results are written to `video_map.json`. Safe to re-run — already-scraped posts are skipped.
@@ -65,11 +62,11 @@ python download.py [options]
Options:
-o, --output DIR Download directory (default: downloads)
-t, --titles Name files by post title
--original Name files by original filename derived from the video URL (default)
--original Name files by original CloudFront filename (default)
--reorganize Rename existing files to match current naming mode
-w, --workers N Concurrent downloads (default: 4)
-n, --dry-run Print what would be downloaded
--site SITE Limit to one site (jailbirdz, pinkcuffs, or femuniverse); repeatable
--site SITE Limit to one site (jailbirdz or pinkcuffs); repeatable
```
Resumes partial downloads. The chosen naming mode is saved to `.naming_mode` inside the output directory and persists across runs. Filenames that would clash are placed into subfolders.
@@ -108,10 +105,6 @@ gh secret set JAILBIRDZ_PASSWORD
# pinkcuffs (if you have a membership)
gh secret set PINKCUFFS_USERNAME
gh secret set PINKCUFFS_PASSWORD
# femuniverse (if you have a membership)
gh secret set FEMUNIVERSE_USERNAME
gh secret set FEMUNIVERSE_PASSWORD
```
**Seed CI with your current progress before the first run:**
@@ -135,11 +128,10 @@ Lists filenames that map to more than one source URL, with sizes.
### Estimate total download size
```bash
python total_size.py # read cached sizes and print summary
python total_size.py --write # probe uncached/stale URLs and refresh the cache
python total_size.py
```
Reads cached file sizes from `video_map.json` and prints a summary (total, smallest, largest, average). The default mode never hits the network. Use `--write` to probe any missing or stale entries and persist the results.
Fetches `Content-Length` for every video URL in `video_map.json` and prints a size summary. Does not download anything.
## Data files
+27 -79
View File
@@ -18,7 +18,7 @@ Importable functions:
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path, PurePosixPath
from typing import Any, cast
from typing import Any, Optional, cast
from collections.abc import Callable
from urllib.parse import urlparse, unquote
@@ -121,21 +121,18 @@ def save_video_map(
def build_url_referers(video_map: dict[str, Any]) -> dict[str, str]:
"""Pure function: return {cdn_video_url: referer} from a flat video map.
"""Pure function: return {cdn_video_url: site_referer} from a flat video map.
Bunny.net CDN URLs require https://player.mediadelivery.net/ as referer.
All other URLs use the scheme+netloc of the page they were found on.
The flat video map has page URLs as keys; the scheme+netloc of each page URL
is used as the Referer for all CDN video URLs found in that entry.
"""
result: dict[str, str] = {}
for page_url, entry in video_map.items():
parsed = urlparse(page_url)
site_referer = f"{parsed.scheme}://{parsed.netloc}/"
referer = f"{parsed.scheme}://{parsed.netloc}/"
for vid in cast(dict[str, Any], entry).get("videos", []):
vid_url = vid["url"]
if urlparse(vid_url).netloc.endswith(".b-cdn.net"):
result.setdefault(vid_url, "https://player.mediadelivery.net/")
else:
result.setdefault(vid_url, site_referer)
if isinstance(vid, str):
result.setdefault(vid, referer)
return result
@@ -151,17 +148,8 @@ def fmt_size(b: float | int) -> str:
return f"{b:.1f} TB"
def is_hls_url(url: str) -> bool:
"""True if url is an HLS master playlist (.m3u8)."""
return urlparse(url).path.endswith(".m3u8")
def url_to_filename(url: str) -> str:
path = PurePosixPath(urlparse(url).path)
# Bunny.net HLS: .../guid/playlist.m3u8 → guid.mp4
if path.name == "playlist.m3u8":
return unquote(path.parent.name) + ".mp4"
return unquote(path.name)
return unquote(PurePosixPath(urlparse(url).path).name)
def find_clashes(urls: list[str]) -> dict[str, list[str]]:
@@ -178,72 +166,27 @@ def find_clashes(urls: list[str]) -> dict[str, list[str]]:
}
def _path_folders(url: str) -> list[str]:
"""Decoded URL path segments above the filename (filename excluded)."""
parts = [unquote(p) for p in urlparse(url).path.split("/") if p]
return parts[:-1]
def _disambiguate_group(group: list[str]) -> dict[str, tuple[str, ...]]:
"""Find the smallest depth of trailing folder segments that gives every URL in the group
a unique subfolder path. Returns {url: subfolder_segments}.
Comparison is case-insensitive so the result is safe on NTFS/APFS as well as ext4.
"""
folders = {u: _path_folders(u) for u in group}
max_depth = max((len(f) for f in folders.values()), default=0)
for depth in range(1, max_depth + 1):
keys = {u: tuple(p.lower() for p in folders[u][-depth:]) for u in group}
if len(set(keys.values())) == len(group):
return {u: tuple(folders[u][-depth:]) for u in group}
raise RuntimeError(
f"Cannot disambiguate URL group sharing filename and full parent path: {group}"
)
def _clash_subfolder(url: str) -> str:
"""Parent path segment used as disambiguator for clashing filenames."""
parts = urlparse(url).path.rstrip("/").split("/")
return unquote(parts[-2]) if len(parts) >= 2 else "unknown"
def build_download_paths(
urls: list[str],
output_dir: str | Path,
) -> dict[str, Path]:
"""Map each URL to a unique local file path.
"""Map each URL to a local file path. Flat layout; clashing names get a subfolder."""
clashes = find_clashes(urls)
clash_lower = {name.lower() for name in clashes}
Unique filenames go directly under output_dir. Filenames that clash
(case-insensitively) get the smallest tail of their URL path prepended
that makes every URL in the clashing group unique — e.g. /2018/Daisy/foo.mp4
and /2023/Daisy/foo.mp4 land at 2018/Daisy/foo.mp4 and 2023/Daisy/foo.mp4
rather than colliding at Daisy/foo.mp4.
"""
by_lower: defaultdict[str, list[str]] = defaultdict(list)
paths = {}
for url in urls:
by_lower[url_to_filename(url).lower()].append(url)
base = Path(output_dir)
paths: dict[str, Path] = {}
for group in by_lower.values():
if len(group) == 1:
url = group[0]
paths[url] = base / url_to_filename(url)
continue
subfolders = _disambiguate_group(group)
for url in group:
paths[url] = base.joinpath(*subfolders[url]) / url_to_filename(url)
# Defensive: every URL must map to a distinct destination path.
# Case-fold the comparison since callers commonly run on NTFS/APFS where
# "Daisy/foo" and "daisy/foo" are the same file on disk.
seen: dict[str, str] = {}
for url, p in paths.items():
key = str(p).lower()
if key in seen:
raise RuntimeError(
f"Path collision after disambiguation: {url!r} and {seen[key]!r} "
f"both map to {p}"
)
seen[key] = url
filename = url_to_filename(url)
if filename.lower() in clash_lower:
paths[url] = Path(output_dir) / _clash_subfolder(url) / filename
else:
paths[url] = Path(output_dir) / filename
return paths
@@ -309,7 +252,12 @@ def fetch_sizes(
def main() -> None:
vm = load_video_map()
urls = [vid["url"] for entry in vm.values() for vid in entry.get("videos", [])]
urls = [
u
for entry in vm.values()
for u in entry.get("videos", [])
if u.startswith("http")
]
clashes = find_clashes(urls)
-8
View File
@@ -1,9 +1,6 @@
# config.py
from typing import Final
# How long a cached file size stays valid. 0 = always re-probe; large = effectively forever.
SIZE_CACHE_TTL: Final[int] = 9_999_999 # seconds (~115 days)
SITES: Final[dict[str, dict[str, str]]] = {
"jailbirdz": {
"base_url": "https://www.jailbirdz.com",
@@ -15,9 +12,4 @@ SITES: Final[dict[str, dict[str, str]]] = {
"cookie_domain": "pinkcuffs.com",
"env_prefix": "PINKCUFFS",
},
"femuniverse": {
"base_url": "https://www.femuniverse.com",
"cookie_domain": "femuniverse.com",
"env_prefix": "FEMUNIVERSE",
},
}
+880 -3881
View File
File diff suppressed because one or more lines are too long
+20 -108
View File
@@ -14,13 +14,10 @@ import argparse
from pathlib import Path
import re
import shutil
import subprocess
import sys
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any
from typing import Any, Optional
import requests
import time
from check_clashes import (
make_session,
@@ -31,9 +28,7 @@ from check_clashes import (
build_url_referers,
fetch_sizes,
load_video_map,
save_video_map,
is_valid_url,
is_hls_url,
VIDEO_MAP_FILE,
)
from config import SITES
@@ -209,30 +204,6 @@ def reorganize(
# ── Download ─────────────────────────────────────────────────────────
def download_hls(url: str, dest: Path, referer: str = "") -> tuple[str, int]:
"""Download an HLS stream via yt-dlp. Returns (status, bytes_written)."""
dest.parent.mkdir(parents=True, exist_ok=True)
if dest.exists():
return "ok", 0
cmd = [
sys.executable, "-m", "yt_dlp",
"--quiet", "--no-warnings",
"--referer", referer or "https://player.mediadelivery.net/",
"-o", str(dest),
url,
]
try:
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
lines = (proc.stderr or proc.stdout).strip().splitlines()
return f"error: {lines[-1] if lines else 'yt-dlp failed'}", 0
if not dest.exists():
return "error: output file missing after yt-dlp", 0
return "ok", dest.stat().st_size
except Exception as e:
return f"error: {e}", 0
def download_one(
session: requests.Session,
url: str,
@@ -293,13 +264,12 @@ def download_one(
def collect_urls(video_map: dict[str, Any]) -> list[str]:
urls, seen, skipped = [], set(), 0
for entry in video_map.values():
for vid in entry.get("videos", []):
u = vid["url"]
if u in seen:
for video_url in entry.get("videos", []):
if video_url in seen:
continue
seen.add(u)
if is_valid_url(u):
urls.append(u)
seen.add(video_url)
if is_valid_url(video_url):
urls.append(video_url)
else:
skipped += 1
if skipped:
@@ -311,41 +281,12 @@ def build_url_title_map(video_map: dict[str, Any]) -> dict[str, str]:
url_title = {}
for entry in video_map.values():
title = entry.get("title", "")
for vid in entry.get("videos", []):
if vid["url"] not in url_title:
url_title[vid["url"]] = title
for video_url in entry.get("videos", []):
if video_url not in url_title:
url_title[video_url] = title
return url_title
def _persist_fetched_sizes(newly_fetched: dict[str, int | None]) -> None:
"""Write newly probed sizes back to video_map.json (successful probes only)."""
now = int(time.time())
for site_key in SITES:
vm_site = load_video_map(site_key)
changed = False
for entry in vm_site.values():
for vid in entry.get("videos", []):
if vid["url"] in newly_fetched and vid.get("size") is None and newly_fetched[vid["url"]] is not None:
vid["size"] = newly_fetched[vid["url"]]
vid["size_checked_at"] = now
changed = True
if changed:
save_video_map(vm_site, site_key)
n_saved = sum(1 for s in newly_fetched.values() if s is not None)
if n_saved:
print(f"[+] Cached {n_saved} newly probed size(s).")
def build_url_to_site() -> dict[str, str]:
"""Return {cdn_video_url: site_key} by loading each site's map in turn."""
result: dict[str, str] = {}
for site_key in SITES:
for entry in load_video_map(site_key).values():
for vid in entry.get("videos", []):
result[vid["url"]] = site_key
return result
# ── Main ─────────────────────────────────────────────────────────────
@@ -400,7 +341,11 @@ def main() -> None:
url_referers = build_url_referers(video_map)
urls = collect_urls(video_map)
url_to_site = build_url_to_site()
url_to_site: dict[str, str] = {}
for site_key in SITES:
for entry in load_video_map(site_key).values():
for vid_url in entry.get("videos", []):
url_to_site[vid_url] = site_key
if args.sites:
selected = set(args.sites)
@@ -411,7 +356,7 @@ def main() -> None:
saved = read_mode(args.output)
mode_changed = saved is not None and saved != mode
print(f"[+] {len(urls)} video URLs from {VIDEO_MAP_FILE}")
print(f"[+] {len(urls)} MP4 URLs from {VIDEO_MAP_FILE}")
print(f"[+] Naming mode: {mode}" + (" (changed!)" if mode_changed else ""))
# Handle reorganize
@@ -464,45 +409,17 @@ def main() -> None:
print(f" … and {len(pending) - 20} more")
return
cached_sizes: dict[str, int] = {
vid["url"]: vid["size"]
for entry in video_map.values()
for vid in entry.get("videos", [])
if vid.get("size") is not None
}
newly_fetched: dict[str, int | None] = {}
uncached_pending = [u for u in pending if u not in cached_sizes and not is_hls_url(u)]
print("\n[+] Fetching remote file sizes…")
session = make_session()
if uncached_pending:
print(
f"\n[+] Fetching remote file sizes ({len(uncached_pending)} uncached, {len(pending) - len(uncached_pending)} cached)…"
)
fetched_pending = fetch_sizes(uncached_pending, workers=20, url_referers=url_referers)
newly_fetched.update(fetched_pending)
remote_sizes: dict[str, int | None] = {**cached_sizes, **fetched_pending}
else:
print(f"\n[+] All {len(pending)} pending sizes cached — skipping probe.")
remote_sizes = dict(cached_sizes)
remote_sizes = fetch_sizes(pending, workers=20, url_referers=url_referers)
sized = {u: s for u, s in remote_sizes.items() if s is not None}
total_bytes = sum(sized.values())
print(f"[+] Download size: {fmt_size(total_bytes)} across {len(pending)} files")
already_sizes: dict[str, int | None] = {}
already_to_verify = [u for u in already if not is_hls_url(u)]
if already_to_verify:
uncached_already = [u for u in already_to_verify if u not in cached_sizes]
if uncached_already:
print(
f"[+] Verifying {len(already_to_verify)} existing files ({len(uncached_already)} uncached)…"
)
fetched_already = fetch_sizes(uncached_already, workers=20, url_referers=url_referers)
newly_fetched.update(fetched_already)
already_sizes = {**cached_sizes, **fetched_already}
else:
print(f"[+] Verifying {len(already_to_verify)} existing files (all sizes cached)…")
already_sizes = dict(cached_sizes)
if already:
print(f"[+] Verifying {len(already)} existing files…")
already_sizes = fetch_sizes(already, workers=20, url_referers=url_referers)
mismatched = 0
for url in already:
@@ -521,9 +438,6 @@ def main() -> None:
if mismatched:
print(f"[!] {mismatched} file(s) will be re-downloaded due to size mismatch")
if newly_fetched:
_persist_fetched_sizes(newly_fetched)
print(f"\n[⚡] Downloading with {args.workers} threads…\n")
completed = 0
@@ -534,8 +448,6 @@ def main() -> None:
def do_download(url: str) -> tuple[str, tuple[str, int]]:
dest = paths[url]
if is_hls_url(url):
return url, download_hls(url, dest, url_referers.get(url, ""))
expected = remote_sizes.get(url)
return url, download_one(
session, url, dest, expected, url_referers.get(url, "")
-5
View File
@@ -14,12 +14,10 @@ import os
from pathlib import Path
from typing import Literal
import requests
from dotenv import load_dotenv
from config import SITES
ENV_FILE = Path(".env")
COOKIE_PREFIX = "wordpress_logged_in_"
load_dotenv(dotenv_path=ENV_FILE)
def update_env(
@@ -74,9 +72,6 @@ def login_and_get_cookie(
"Referer": f"{base_url}/",
"Origin": base_url,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"X-Requested-With": "XMLHttpRequest",
"Accept": "*/*",
},
timeout=30,
)
+8 -73
View File
@@ -6,7 +6,7 @@ import signal
import asyncio
import requests
from pathlib import PurePosixPath
from typing import Any
from typing import Any, Optional
from urllib.parse import urlparse
from dotenv import load_dotenv
from playwright.async_api import async_playwright, BrowserContext
@@ -335,46 +335,6 @@ def extract_title_from_html(html: str) -> str | None:
return None
def _is_bunny_playlist(url: str) -> bool:
"""True if url is the root Bunny.net HLS playlist (not a sub-playlist)."""
parsed = urlparse(url)
return (
parsed.netloc.endswith(".b-cdn.net")
and parsed.path.endswith("/playlist.m3u8")
)
def _is_bunny_junk(url: str) -> bool:
"""True if url is a Bunny.net CDN init segment (not a usable video URL)."""
parsed = urlparse(url)
return parsed.netloc.endswith(".b-cdn.net") and PurePosixPath(
parsed.path
).name in {"init.mp4", "init.dmp4"}
def extract_bunny_embed_url(html: str) -> str | None:
"""Return a tokenless Bunny.net embed URL found in an iframe, or None."""
m = re.search(
r'<iframe[^>]+src="(https://player\.mediadelivery\.net/embed/[^"?]+)',
html,
)
return m.group(1) if m else None
def _clear_junk_video_entries(video_map: dict[str, Any], site_key: str) -> int:
"""Reset entries whose only stored videos are CDN init segments. Returns count fixed."""
cleared = 0
for entry in video_map.values():
videos = entry.get("videos", [])
if videos and all(_is_bunny_junk(v["url"]) for v in videos):
entry["videos"] = []
entry.pop("scraped_at", None)
cleared += 1
if cleared:
save_video_map(video_map, site_key)
return cleared
MAX_RETRIES = 2
@@ -401,9 +361,7 @@ async def worker(
page.on(
"response",
lambda resp: video_hits.add(resp.url)
if (_is_video_url(resp.url) or _is_bunny_playlist(resp.url))
else None,
lambda resp: video_hits.add(resp.url) if _is_video_url(resp.url) else None,
)
try:
@@ -418,7 +376,7 @@ async def worker(
print(f"[W{worker_id}] ({idx + 1}/{total}) {url}{label}")
try:
await page.goto(url, wait_until="load", timeout=60000)
await page.goto(url, wait_until="networkidle", timeout=60000)
except Exception as e:
print(f"[W{worker_id}] Navigation error: {e}")
if expects_video(url) and attempt < MAX_RETRIES:
@@ -493,29 +451,18 @@ async def worker(
found = set(html_videos) | set(video_hits)
video_hits.clear()
print(f"[W{worker_id}] network hits raw: {found or '(empty)'}")
all_videos = [
m
for m in found
if is_valid_url(m)
and not _is_bunny_junk(m)
and m
not in (
f"{base_url}/wp-content/plugins/easy-video-player/lib/blank.mp4",
)
]
if not all_videos:
embed_url = extract_bunny_embed_url(html)
if embed_url:
print(
f"[W{worker_id}] No network hit — iframe fallback: {embed_url}"
)
all_videos = [embed_url]
async with map_lock:
new_found = set(all_videos) - known
new_found = found - known
if new_found:
print(f"[W{worker_id}] Found {len(new_found)} new video URLs")
known.update(new_found)
@@ -529,13 +476,9 @@ async def worker(
entry = video_map.get(url, {})
if title:
entry["title"] = title
existing_dict: dict[str, Any] = {
vid["url"]: vid for vid in entry.get("videos", [])
}
for vid_url in all_videos:
if vid_url not in existing_dict:
existing_dict[vid_url] = {"url": vid_url}
entry["videos"] = sorted(existing_dict.values(), key=lambda v: v["url"])
existing_videos = set(entry.get("videos", []))
existing_videos.update(all_videos)
entry["videos"] = sorted(existing_videos)
mark_done = bool(all_videos) or not expects_video(url)
if mark_done:
entry["scraped_at"] = int(time.time())
@@ -572,12 +515,6 @@ async def run_for_site(
urls = load_post_urls(site_key, base_url, wp_api, req_headers)
video_map = load_video_map(site_key)
junk_cleared = _clear_junk_video_entries(video_map, site_key)
if junk_cleared:
print(
f"[{site_key}] Cleared {junk_cleared} entries with junk CDN init segments — will re-scrape."
)
if any(
u not in video_map
or not video_map[u].get("title")
@@ -589,9 +526,7 @@ async def run_for_site(
site_key, base_url, wp_api, video_map, urls, req_headers
)
known = {
vid["url"] for entry in video_map.values() for vid in entry.get("videos", [])
}
known = {u for entry in video_map.values() for u in entry.get("videos", [])}
total = len(urls)
pending = []
-2
View File
@@ -1,5 +1,3 @@
playwright==1.58.0
python-dotenv==1.2.1
Requests==2.32.5
yt-dlp>=2026.3.17
pycryptodomex>=3.23.0
+21 -111
View File
@@ -4,19 +4,15 @@ Importable function:
summarize_sizes(sizes) - return dict with total, smallest, largest, average, failed
"""
import argparse
import time
from typing import Any, TypedDict
from typing import Optional, TypedDict
from check_clashes import (
fmt_size,
fetch_sizes,
load_video_map,
save_video_map,
build_url_referers,
VIDEO_MAP_FILE,
)
from config import SITES, SIZE_CACHE_TTL
class SizeStats(TypedDict):
@@ -29,7 +25,7 @@ class SizeStats(TypedDict):
failed: list[str]
def summarize_sizes(sizes: dict[str, int | None]) -> SizeStats:
def summarize_sizes(sizes: dict[str, Optional[int]]) -> SizeStats:
"""Given {url: size_or_None}, return a stats dict."""
known = {u: s for u, s in sizes.items() if s is not None}
failed = [u for u, s in sizes.items() if s is None]
@@ -55,13 +51,6 @@ def summarize_sizes(sizes: dict[str, int | None]) -> SizeStats:
}
def _is_stale(vid: dict[str, Any], now: int) -> bool:
"""True if the cached size is absent or older than SIZE_CACHE_TTL seconds."""
if vid.get("size") is None:
return True
return (now - int(vid.get("size_checked_at", 0))) >= SIZE_CACHE_TTL
# --------------- CLI ---------------
@@ -70,7 +59,24 @@ def _progress(done: int, total: int) -> None:
print(f" {done}/{total}")
def _print_stats(stats: SizeStats) -> None:
def main() -> None:
vm = load_video_map()
urls: list[str] = [
u
for entry in vm.values()
for u in entry.get("videos", [])
if u.startswith("http")
]
url_referers = build_url_referers(vm)
print(f"[+] {len(urls)} URLs in {VIDEO_MAP_FILE}")
print("[+] Fetching file sizes (20 threads)…\n")
sizes = fetch_sizes(
urls, workers=20, on_progress=_progress, url_referers=url_referers
)
stats = summarize_sizes(sizes)
print(f"\n{'=' * 45}")
print(f" Sized: {stats['sized']}/{stats['total']} files")
print(f" Total: {fmt_size(stats['total_bytes'])}")
@@ -78,108 +84,12 @@ def _print_stats(stats: SizeStats) -> None:
print(f" Largest: {fmt_size(stats['largest'])}")
print(f" Average: {fmt_size(stats['average'])}")
print(f"{'=' * 45}")
if stats["failed"]:
print(f"\n[!] {len(stats['failed'])} URL(s) could not be sized:")
for u in stats["failed"]:
print(f" {u}")
def _cache_hint(fresh: int, stale: int, missing: int) -> str:
parts = [label for count, label in [(fresh, f"{fresh} fresh"), (stale, f"{stale} stale"), (missing, f"{missing} missing")] if count]
if stale or missing:
suffix = " — run --write to refresh" if stale else " — run --write to probe missing"
else:
suffix = " — all current"
return f"Cache: {', '.join(parts)}{suffix}"
def _run_stats() -> None:
vm = load_video_map()
now = int(time.time())
sizes: dict[str, int | None] = {}
fresh = stale = missing = 0
for entry in vm.values():
for vid in entry.get("videos", []):
url = vid["url"]
if url in sizes:
continue
sizes[url] = vid.get("size")
if vid.get("size") is None:
missing += 1
elif _is_stale(vid, now):
stale += 1
else:
fresh += 1
print(f"[+] {len(sizes)} URLs in {VIDEO_MAP_FILE}")
print(f" {_cache_hint(fresh, stale, missing)}")
_print_stats(summarize_sizes(sizes))
def _apply_fetched(vm: dict[str, Any], fetched: dict[str, int | None], now: int) -> None:
for entry in vm.values():
for vid in entry.get("videos", []):
if vid["url"] in fetched:
vid["size"] = fetched[vid["url"]]
vid["size_checked_at"] = now
def _run_write() -> None:
"""Probe uncached sizes and write them into video_map.json."""
now = int(time.time())
all_fetched: dict[str, int | None] = {}
for site_key in SITES:
vm = load_video_map(site_key)
if not vm:
continue
url_referers = build_url_referers(vm)
to_probe: list[str] = [
vid["url"]
for entry in vm.values()
for vid in entry.get("videos", [])
if _is_stale(vid, now)
]
cached_count = sum(
1
for entry in vm.values()
for vid in entry.get("videos", [])
if not _is_stale(vid, now)
)
print(f"[{site_key}] {cached_count} cached, {len(to_probe)} to probe…")
fetched: dict[str, int | None] = {}
if to_probe:
fetched = fetch_sizes(
to_probe, workers=20, on_progress=_progress, url_referers=url_referers
)
_apply_fetched(vm, fetched, now)
save_video_map(vm, site_key)
all_fetched.update(fetched)
print(f"[{site_key}] Written.")
if all_fetched:
_print_stats(summarize_sizes(all_fetched))
def main() -> None:
parser = argparse.ArgumentParser(description="Calculate total video download size")
parser.add_argument(
"--write",
"-w",
action="store_true",
help="Probe uncached sizes and write them into video_map.json",
)
args = parser.parse_args()
if args.write:
_run_write()
else:
_run_stats()
if __name__ == "__main__":
main()
+15 -14
View File
@@ -32,11 +32,11 @@ import requests
from dotenv import load_dotenv
from check_clashes import fmt_size, url_to_filename, VIDEO_EXTS, load_video_map
from config import SITES
from download import (
collect_urls,
get_paths_for_mode,
read_mode,
build_url_to_site,
MODE_ORIGINAL,
DEFAULT_OUTPUT,
)
@@ -49,7 +49,6 @@ DEFAULT_BATCH_SIZE = 1
DEFAULT_POLL = 30
UPLOADED_FILE = ".uploaded"
PT_NAME_MAX = 120
PT_DESC_MIN = 3 # PeerTube rejects descriptions shorter than this
# ── Text helpers ─────────────────────────────────────────────────────
@@ -63,11 +62,6 @@ def clean_description(raw: str) -> str:
text = re.sub(r"<[^>]+>", "", text)
text = html.unescape(text)
text = re.sub(r"\n{3,}", "\n\n", text).strip()
# PeerTube enforces a 3-char minimum on descriptions; a sub-minimum
# description (e.g. a stray ".") makes the upload-init 400. Drop it so
# it's omitted from the request rather than rejected.
if len(text) < PT_DESC_MIN:
return ""
return text[:10000]
@@ -131,12 +125,11 @@ def get_channel_id(base: str, token: str, channel_name: str) -> int:
def get_channel_video_names(base: str, token: str, channel_name: str) -> Counter[str]:
"""Paginate through the channel and return a Counter of video names."""
counts: Counter[str] = Counter()
page_size = 25
start = 0
while True:
r = requests.get(
f"{base}/api/v1/video-channels/{channel_name}/videos",
params={"start": start, "count": page_size},
params={"start": start, "count": 100},
headers=api_headers(token),
timeout=30,
)
@@ -144,7 +137,7 @@ def get_channel_video_names(base: str, token: str, channel_name: str) -> Counter
data = r.json()
for v in data.get("data", []):
counts[v["name"]] += 1
start += page_size
start += 100
if start >= data.get("total", 0):
break
return counts
@@ -419,7 +412,11 @@ def build_path_to_meta(
urls = collect_urls(video_map)
mode = read_mode(input_dir) or MODE_ORIGINAL
url_to_site = build_url_to_site()
url_to_site: dict[str, str] = {}
for site_key in SITES:
for entry in load_video_map(site_key).values():
for vid_url in entry.get("videos", []):
url_to_site[vid_url] = site_key
paths = get_paths_for_mode(mode, urls, video_map, input_dir, url_to_site)
@@ -432,9 +429,13 @@ def build_path_to_meta(
title = t if isinstance(t, str) else ""
desc = d if isinstance(d, str) else ""
for vid in entry.get("videos", []):
if vid["url"] not in url_meta:
url_meta[vid["url"]] = {"title": title, "description": desc}
videos_any = entry.get("videos", [])
if isinstance(videos_any, list):
for video_url_any in videos_any:
if not isinstance(video_url_any, str):
continue
if video_url_any not in url_meta:
url_meta[video_url_any] = {"title": title, "description": desc}
result: dict[Path, dict[str, str]] = {}
for url, abs_path in paths.items():
+1833 -12693
View File
File diff suppressed because one or more lines are too long