Fixed pagination bug

This commit is contained in:
HugeFrog24
2026-03-14 06:49:05 +01:00
parent 80444405e9
commit 112ea70c00
17 changed files with 25346 additions and 12457 deletions
+14
View File
@@ -0,0 +1,14 @@
{
"permissions": {
"allow": [
"Bash(jq -r \".\"\"/api/v1/video-channels/{channelHandle}/videos\"\".get.parameters[] | $ref\")",
"Bash(grep '$ref')",
"Bash(grep -c \"^ ''/\" /c/Users/admin/Desktop/jailbirdz-dl/docs/external/peertube/openapi.yaml)",
"Bash(grep -c ' \"\"/' /c/Users/admin/Desktop/jailbirdz-dl/docs/external/peertube/openapi.json)",
"Bash(grep -c \"^ ''/\" /c/Users/admin/Desktop/jailbirdz-dl/docs/external/peertube/openapi.yaml)",
"Bash(git --no-pager diff HEAD~1 docs/external/peertube/openapi.json)",
"Bash(git --no-pager show HEAD~1:docs/external/peertube/openapi.json)",
"Bash(git --no-pager diff --name-only HEAD~1)"
]
}
}
+14 -5
View File
@@ -1,8 +1,17 @@
# Copy your wordpress_logged_in_... cookie from browser DevTools → Storage → Cookies.
# Paste the full name=value pair below.
# wordpress_sec_... is the wp-admin cookie — irrelevant for read-only viewers.
# __cf_bm is a Cloudflare bot-management cookie — also not needed.
WP_LOGIN_COOKIE=wordpress_logged_in_<hash>=<value>
# jailbirdz.com credentials
# These are used to obtain a fresh login cookie via the WooCommerce AJAX endpoint.
JAILBIRDZ_USERNAME=your-email-or-username
JAILBIRDZ_PASSWORD=your-password
# Alternatively, set the cookie manually (expires in ~2 weeks).
# Get it from browser DevTools → Storage → Cookies while on jailbirdz.com.
# Copy the full name=value of the wordpress_logged_in_* cookie.
JAILBIRDZ_LOGIN_COOKIE=wordpress_logged_in_<hash>=<value>
# pinkcuffs.com credentials (separate membership — different from jailbirdz)
PINKCUFFS_USERNAME=your-email-or-username
PINKCUFFS_PASSWORD=your-password
PINKCUFFS_LOGIN_COOKIE=wordpress_logged_in_<hash>=<value>
# PeerTube upload target
PEERTUBE_URL=https://your-peertube-instance.example
@@ -1,14 +0,0 @@
---
alwaysApply: true
always_on: true
trigger: always_on
applyTo: "**"
description: Snyk Security At Inception
---
# Project security best practices
- Always run snyk_code_scan tool for new first party code that is generated in a Snyk-supported language.
- If any security issues are found based on newly introduced or modified code or dependencies, attempt to fix the issues using the results context from Snyk.
- Rescan the code after fixing the issues to ensure that the issues were fixed and that there are no newly introduced issues.
- Repeat this process until no new issues are found.
+49
View File
@@ -0,0 +1,49 @@
name: Nightly Index
on:
schedule:
- cron: '0 3 * * *' # 03:00 UTC daily
workflow_dispatch: # manual trigger via GitHub UI
permissions:
contents: write # needed to push video_map.json back
concurrency:
group: nightly-index
cancel-in-progress: false # let an in-progress scrape finish; queue the next run
jobs:
index:
runs-on: ubuntu-latest
timeout-minutes: 300 # 5 h ceiling; scraper resumes where it left off on next run
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
cache: pip
- name: Install dependencies
run: pip install -r requirements.txt
- name: Install Playwright Firefox
run: playwright install firefox --with-deps
- name: Run scraper
run: python main.py
env:
JAILBIRDZ_USERNAME: ${{ secrets.JAILBIRDZ_USERNAME }}
JAILBIRDZ_PASSWORD: ${{ secrets.JAILBIRDZ_PASSWORD }}
PINKCUFFS_USERNAME: ${{ secrets.PINKCUFFS_USERNAME }}
PINKCUFFS_PASSWORD: ${{ secrets.PINKCUFFS_PASSWORD }}
- name: Commit updated video_map.json
if: always() # save progress even if main.py crashed or timed out
run: |
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
git add video_map.json
git diff --staged --quiet || git commit -m "chore: nightly index update [skip ci]"
git push
+9
View File
@@ -1,5 +1,14 @@
# Temporary cache
__pycache__/
.ruff_cache/
# Local IDE config
.vscode
# Project output & artifacts
downloads/
*.mp4
*.mp4.part
# Secrets & sensitive info
.env
-4
View File
@@ -1,4 +0,0 @@
{
"snyk.advanced.organization": "512ef4a1-6034-4537-a391-9692d282122a",
"snyk.advanced.autoSelectOrganization": true
}
+42 -15
View File
@@ -1,6 +1,6 @@
# 𝒥𝒶𝒾𝓁𝒷𝒾𝓇𝒹𝓏-𝒹𝓁
Jailbirdz.com is an Arizona-based subscription video site publishing arrest and jail roleplay scenarios featuring women. This tool scrapes the member area, downloads the videos, and re-hosts them on a self-owned PeerTube instance.
Jailbirdz.com and Pinkcuffs.com are Arizona-based subscription video sites publishing arrest and jail roleplay scenarios featuring women. This tool scrapes the member area of one or both sites, downloads the videos, and re-hosts them on a self-owned PeerTube instance.
> [!NOTE]
> This tool does not bypass authentication, modify the site, or intercept anything it isn't entitled to. A valid, paid membership is required. The scraper authenticates using your own session cookie and accesses only content your account can already view in a browser.
@@ -19,23 +19,22 @@ Jailbirdz.com is an Arizona-based subscription video site publishing arrest and
cp .env.example .env
```
### WP_LOGIN_COOKIE
### Credentials
You need to be logged into jailbirdz.com in a browser. Then either:
Set credentials for whichever sites you have a membership on. You don't need both.
**Option A — auto (recommended):** let `grab_cookie.py` read it from your browser and write it to `.env` automatically:
**Option A — credentials (recommended):** set `JAILBIRDZ_USERNAME` + `JAILBIRDZ_PASSWORD` (and/or the `PINKCUFFS_*` equivalents) in `.env`. `main.py` logs in automatically on startup.
```bash
python grab_cookie.py # tries Firefox, Chrome, Edge, Brave in order
python grab_cookie.py -b firefox # or target a specific browser
```
**Option B — manual cookie:** set `JAILBIRDZ_LOGIN_COOKIE` (and/or `PINKCUFFS_LOGIN_COOKIE`) yourself. Get the value from browser DevTools → Storage → Cookies — copy the full `name=value` of the `wordpress_logged_in_*` cookie.
> **Note:** Chrome and Edge on Windows 130+ require the script to run as Administrator due to App-bound Encryption. Firefox works without elevated privileges.
Sites with no credentials are skipped automatically when running `python main.py`.
**Option B — manual:** open `.env` and set `WP_LOGIN_COOKIE` yourself. Get the value from browser DevTools → Storage → Cookies while on jailbirdz.com — copy the full `name=value` of the `wordpress_logged_in_*` cookie.
### Other `.env` values
### `.env` values
- `JAILBIRDZ_USERNAME` / `JAILBIRDZ_PASSWORD` — jailbirdz.com login.
- `JAILBIRDZ_LOGIN_COOKIE` — jailbirdz.com session cookie (fallback).
- `PINKCUFFS_USERNAME` / `PINKCUFFS_PASSWORD` — pinkcuffs.com login.
- `PINKCUFFS_LOGIN_COOKIE` — pinkcuffs.com session cookie (fallback).
- `PEERTUBE_URL` — base URL of your PeerTube instance.
- `PEERTUBE_USER` — PeerTube username.
- `PEERTUBE_CHANNEL` — channel to upload to.
@@ -48,7 +47,9 @@ python grab_cookie.py -b firefox # or target a specific browser
Discovers all post URLs via the WordPress REST API, then visits each page with a headless Firefox browser to intercept video network requests (MP4, MOV, WebM, AVI, M4V).
```bash
python main.py
python main.py # scrape all sites you have credentials for
python main.py --site jailbirdz # scrape one site only
python main.py --site pinkcuffs --site jailbirdz # explicit multi-site
```
Results are written to `video_map.json`. Safe to re-run — already-scraped posts are skipped.
@@ -65,6 +66,7 @@ Options:
--reorganize Rename existing files to match current naming mode
-w, --workers N Concurrent downloads (default: 4)
-n, --dry-run Print what would be downloaded
--site SITE Limit to one site (jailbirdz or pinkcuffs); repeatable
```
Resumes partial downloads. The chosen naming mode is saved to `.naming_mode` inside the output directory and persists across runs. Filenames that would clash are placed into subfolders.
@@ -89,6 +91,30 @@ Options:
Uploads in resumable 10 MB chunks. After each batch, waits for transcoding and object storage to complete before uploading the next batch — this prevents disk exhaustion on the PeerTube server. Videos already present on the channel (matched by name) are skipped. Progress is tracked in `.uploaded` inside the input directory.
## CI / Nightly Indexing
`.github/workflows/nightly-index.yml` runs `main.py` at 03:00 UTC daily and commits any new `video_map.json` entries back to the repo.
**One-time setup — add repo secrets for each site you have a membership on:**
```bash
# jailbirdz (if you have a membership)
gh secret set JAILBIRDZ_USERNAME
gh secret set JAILBIRDZ_PASSWORD
# pinkcuffs (if you have a membership)
gh secret set PINKCUFFS_USERNAME
gh secret set PINKCUFFS_PASSWORD
```
**Seed CI with your current progress before the first run:**
```bash
git add video_map.json && git commit -m "chore: seed video_map"
```
**Trigger manually:** Actions → Nightly Index → Run workflow.
## Utilities
### Check for filename clashes
@@ -102,10 +128,11 @@ Lists filenames that map to more than one source URL, with sizes.
### Estimate total download size
```bash
python total_size.py
python total_size.py # read cached sizes and print summary
python total_size.py --write # probe uncached/stale URLs and refresh the cache
```
Fetches `Content-Length` for every video URL in `video_map.json` and prints a size summary. Does not download anything.
Reads cached file sizes from `video_map.json` and prints a summary (total, smallest, largest, average). The default mode never hits the network. Use `--write` to probe any missing or stale entries and persist the results.
## Data files
+160 -35
View File
@@ -5,42 +5,141 @@ Importable functions:
find_clashes(urls) - {filename: [urls]} for filenames with >1 source
build_download_paths(urls, output_dir) - {url: local_path} with clash resolution
fmt_size(bytes) - human-readable size string
get_remote_size(session, url) - file size via HEAD without downloading
fetch_sizes(urls, workers, on_progress) - bulk size lookup
get_remote_size(session, url, referer) - file size via HEAD without downloading
fetch_sizes(urls, workers, on_progress, url_referers, session) - bulk size lookup
make_session() - requests.Session with required headers
load_video_map() - load video_map.json, returns {} on missing/corrupt
load_video_map(site, path) - load video_map.json; auto-migrates old flat format
save_video_map(video_map, site_key, path) - atomic write of one site's entries
build_url_referers(video_map) - {cdn_url: referer} derived from page URL keys
is_valid_url(url) - True if url is a plain http(s) URL with no HTML artefacts
expects_video(url) - True if url is a members-only video page
"""
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path, PurePosixPath
from typing import Any, cast
from collections.abc import Callable
from urllib.parse import urlparse, unquote
import json
import os
import tempfile
import requests
from config import BASE_URL
REFERER = f"{BASE_URL}/"
VIDEO_MAP_FILE = "video_map.json"
VIDEO_EXTS = {".mp4", ".mov", ".m4v", ".webm", ".avi"}
VIDEO_MAP_FILE: str = "video_map.json"
VIDEO_EXTS: set[str] = {".mp4", ".mov", ".m4v", ".webm", ".avi"}
def load_video_map():
if Path(VIDEO_MAP_FILE).exists():
def is_valid_url(url: str) -> bool:
"""True if url is a plain http(s) URL with no HTML artefacts (<, >, href= etc.)."""
return (
url.startswith("http")
and "<" not in url
and ">" not in url
and " href=" not in url
)
def expects_video(url: str) -> bool:
"""True if url is a members-only video page that should contain a video."""
return "/pinkcuffs-videos/" in url
def _write_video_map_atomic(data: dict[str, Any], path: Path) -> None:
"""Write the full nested video_map dict to disk atomically via a temp file."""
fd, tmp = tempfile.mkstemp(dir=path.resolve().parent, suffix=".tmp")
try:
with open(VIDEO_MAP_FILE, encoding="utf-8") as f:
return json.load(f)
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
Path(tmp).replace(path)
except Exception:
try:
Path(tmp).unlink()
except OSError:
pass
raise
def load_video_map(
site: str | None = None,
path: str | Path = VIDEO_MAP_FILE,
) -> dict[str, Any]:
"""Load video_map.json.
Args:
site: If given, return only that site's inner dict {url: entry}.
If None, return a flat-merged dict across all sites.
path: Path to the JSON file (injectable for tests).
"""
p = Path(path)
if not p.exists():
return {}
try:
with open(p, encoding="utf-8") as f:
raw: Any = json.load(f)
data = cast(dict[str, Any], raw)
except (json.JSONDecodeError, OSError):
return {}
return {}
if site is not None:
return cast(dict[str, Any], data.get(site, {}))
# Merge all sites into a flat dict for backward-compat callers
merged: dict[str, Any] = {}
for site_entries in data.values():
if isinstance(site_entries, dict):
merged.update(cast(dict[str, Any], site_entries))
return merged
def make_session():
s = requests.Session()
s.headers.update({"Referer": REFERER})
return s
def save_video_map(
video_map: dict[str, Any],
site_key: str,
path: str | Path = VIDEO_MAP_FILE,
) -> None:
"""Atomically update one site's entries in the nested video_map.json.
Args:
video_map: The inner {url: entry} dict for site_key.
site_key: Which top-level key to update (e.g. "jailbirdz").
path: Path to the JSON file (injectable for tests).
"""
p = Path(path)
if p.exists():
try:
with open(p, encoding="utf-8") as f:
raw: Any = json.load(f)
full = cast(dict[str, Any], raw)
except (json.JSONDecodeError, OSError):
full = {}
else:
full = {}
full[site_key] = video_map
_write_video_map_atomic(full, p)
def fmt_size(b):
def build_url_referers(video_map: dict[str, Any]) -> dict[str, str]:
"""Pure function: return {cdn_video_url: site_referer} from a flat video map.
The flat video map has page URLs as keys; the scheme+netloc of each page URL
is used as the Referer for all CDN video URLs found in that entry.
"""
result: dict[str, str] = {}
for page_url, entry in video_map.items():
parsed = urlparse(page_url)
referer = f"{parsed.scheme}://{parsed.netloc}/"
for vid in cast(dict[str, Any], entry).get("videos", []):
result.setdefault(vid["url"], referer)
return result
def make_session() -> requests.Session:
return requests.Session()
def fmt_size(b: float | int) -> str:
for unit in ("B", "KB", "MB", "GB"):
if b < 1024:
return f"{b:.1f} {unit}"
@@ -48,30 +147,34 @@ def fmt_size(b):
return f"{b:.1f} TB"
def url_to_filename(url):
def url_to_filename(url: str) -> str:
return unquote(PurePosixPath(urlparse(url).path).name)
def find_clashes(urls):
def find_clashes(urls: list[str]) -> dict[str, list[str]]:
# Case-insensitive grouping so that e.g. "DaisyArrest.mp4" and
# "daisyarrest.mp4" are treated as a clash. This is required for
# correctness on case-insensitive filesystems (NTFS, exFAT, macOS HFS+)
# and harmless on case-sensitive ones (ext4) — the actual filenames on
# disk keep their original casing; only the clash *detection* is folded.
by_lower = defaultdict(list)
by_lower: defaultdict[str, list[str]] = defaultdict(list)
for url in urls:
by_lower[url_to_filename(url).lower()].append(url)
return {url_to_filename(srcs[0]): srcs
for srcs in by_lower.values() if len(srcs) > 1}
return {
url_to_filename(srcs[0]): srcs for srcs in by_lower.values() if len(srcs) > 1
}
def _clash_subfolder(url):
def _clash_subfolder(url: str) -> str:
"""Parent path segment used as disambiguator for clashing filenames."""
parts = urlparse(url).path.rstrip("/").split("/")
return unquote(parts[-2]) if len(parts) >= 2 else "unknown"
def build_download_paths(urls, output_dir):
def build_download_paths(
urls: list[str],
output_dir: str | Path,
) -> dict[str, Path]:
"""Map each URL to a local file path. Flat layout; clashing names get a subfolder."""
clashes = find_clashes(urls)
clash_lower = {name.lower() for name in clashes}
@@ -86,16 +189,25 @@ def build_download_paths(urls, output_dir):
return paths
def get_remote_size(session, url):
def get_remote_size(
session: requests.Session,
url: str,
referer: str = "",
) -> int | None:
extra = {"Referer": referer} if referer else {}
try:
r = session.head(url, allow_redirects=True, timeout=15)
r = session.head(url, headers=extra, allow_redirects=True, timeout=15)
if r.status_code < 400 and "Content-Length" in r.headers:
return int(r.headers["Content-Length"])
except Exception:
pass
try:
r = session.get(
url, headers={"Range": "bytes=0-0"}, stream=True, timeout=15)
url,
headers={"Range": "bytes=0-0", **extra},
stream=True,
timeout=15,
)
r.close()
cr = r.headers.get("Content-Range", "")
if "/" in cr:
@@ -105,19 +217,30 @@ def get_remote_size(session, url):
return None
def fetch_sizes(urls, workers=20, on_progress=None):
def fetch_sizes(
urls: list[str],
workers: int = 20,
on_progress: Callable[[int, int], None] | None = None,
url_referers: dict[str, str] | None = None,
session: requests.Session | None = None,
) -> dict[str, int | None]:
"""Return {url: size_or_None}. on_progress(done, total) called after each URL."""
if session is None:
session = make_session()
sizes = {}
referers = url_referers or {}
sizes: dict[str, int | None] = {}
total = len(urls)
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = {pool.submit(get_remote_size, session, u): u for u in urls}
futures = {
pool.submit(get_remote_size, session, u, referers.get(u, "")): u
for u in urls
}
done = 0
for fut in as_completed(futures):
sizes[futures[fut]] = fut.result()
done += 1
if on_progress:
if on_progress is not None:
on_progress(done, total)
return sizes
@@ -125,14 +248,15 @@ def fetch_sizes(urls, workers=20, on_progress=None):
# --------------- CLI ---------------
def main():
def main() -> None:
vm = load_video_map()
urls = [u for entry in vm.values() for u in entry.get("videos", []) if u.startswith("http")]
urls = [vid["url"] for entry in vm.values() for vid in entry.get("videos", [])]
clashes = find_clashes(urls)
print(f"Total URLs: {len(urls)}")
by_name = defaultdict(list)
by_name: defaultdict[str, list[str]] = defaultdict(list)
for url in urls:
by_name[url_to_filename(url)].append(url)
print(f"Unique filenames: {len(by_name)}")
@@ -142,8 +266,9 @@ def main():
return
clash_urls = [u for srcs in clashes.values() for u in srcs]
url_referers = build_url_referers(vm)
print(f"\n[+] Fetching file sizes for {len(clash_urls)} clashing URLs…")
sizes = fetch_sizes(clash_urls)
sizes = fetch_sizes(clash_urls, url_referers=url_referers)
print(f"\n{len(clashes)} filename clash(es):\n")
for name, srcs in sorted(clashes.items()):
+18 -2
View File
@@ -1,2 +1,18 @@
BASE_URL = "https://www.jailbirdz.com"
COOKIE_DOMAIN = "jailbirdz.com" # rookiepy domain filter (no www)
# config.py
from typing import Final
# How long a cached file size stays valid. 0 = always re-probe; large = effectively forever.
SIZE_CACHE_TTL: Final[int] = 9_999_999 # seconds (~115 days)
SITES: Final[dict[str, dict[str, str]]] = {
"jailbirdz": {
"base_url": "https://www.jailbirdz.com",
"cookie_domain": "jailbirdz.com",
"env_prefix": "JAILBIRDZ",
},
"pinkcuffs": {
"base_url": "https://www.pinkcuffs.com",
"cookie_domain": "pinkcuffs.com",
"env_prefix": "PINKCUFFS",
},
}
File diff suppressed because one or more lines are too long
+226 -84
View File
@@ -11,12 +11,14 @@ Usage:
"""
import argparse
import json
from pathlib import Path
import re
import shutil
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any
import requests
import time
from check_clashes import (
make_session,
@@ -24,33 +26,39 @@ from check_clashes import (
url_to_filename,
find_clashes,
build_download_paths,
build_url_referers,
fetch_sizes,
load_video_map,
save_video_map,
is_valid_url,
VIDEO_MAP_FILE,
)
from config import SITES
VIDEO_MAP_FILE = "video_map.json"
CHUNK_SIZE = 8 * 1024 * 1024
DEFAULT_OUTPUT = "downloads"
DEFAULT_WORKERS = 4
MODE_FILE = ".naming_mode"
MODE_ORIGINAL = "original"
MODE_TITLE = "title"
DEFAULT_OUTPUT: str = "downloads"
DEFAULT_WORKERS: int = 4
MODE_FILE: str = ".naming_mode"
MODE_ORIGINAL: str = "original"
MODE_TITLE: str = "title"
# ── Naming mode persistence ──────────────────────────────────────────
def read_mode(output_dir):
def read_mode(output_dir: str | Path) -> str | None:
p = Path(output_dir) / MODE_FILE
if p.exists():
return p.read_text().strip()
return None
def write_mode(output_dir, mode):
def write_mode(output_dir: str | Path, mode: str) -> None:
Path(output_dir).mkdir(parents=True, exist_ok=True)
(Path(output_dir) / MODE_FILE).write_text(mode)
def resolve_mode(args):
def resolve_mode(args: argparse.Namespace) -> str:
"""Determine naming mode from CLI flags + saved marker. Returns mode string."""
saved = read_mode(args.output)
@@ -69,13 +77,18 @@ def resolve_mode(args):
# ── Filename helpers ─────────────────────────────────────────────────
def sanitize_filename(title, max_len=180):
name = re.sub(r'[<>:"/\\|?*]', '', title)
name = re.sub(r'\s+', ' ', name).strip().rstrip('.')
def sanitize_filename(title: str, max_len: int = 180) -> str:
name = re.sub(r'[<>:"/\\|?*]', "", title)
name = re.sub(r"\s+", " ", name).strip().rstrip(".")
return name[:max_len].rstrip() if len(name) > max_len else name
def build_title_paths(urls, url_to_title, output_dir):
def build_title_paths(
urls: list[str],
url_to_title: dict[str, str],
output_dir: str | Path,
) -> dict[str, Path]:
name_to_urls = defaultdict(list)
url_to_base = {}
@@ -91,14 +104,33 @@ def build_title_paths(urls, url_to_title, output_dir):
base, ext = url_to_base[url]
full = base + ext
if len(name_to_urls[full]) > 1:
slug = url_to_filename(url).rsplit('.', 1)[0]
slug = url_to_filename(url).rsplit(".", 1)[0]
paths[url] = Path(output_dir) / f"{base} [{slug}]{ext}"
else:
paths[url] = Path(output_dir) / full
return paths
def get_paths_for_mode(mode, urls, video_map, output_dir):
def get_paths_for_mode(
mode: str,
urls: list[str],
video_map: dict[str, Any],
output_dir: str | Path,
url_to_site: dict[str, str] | None = None,
) -> dict[str, Path]:
if url_to_site:
by_site: dict[str, list[str]] = defaultdict(list)
for u in urls:
by_site[url_to_site.get(u, "")].append(u)
paths: dict[str, Path] = {}
url_title = build_url_title_map(video_map) if mode == MODE_TITLE else {}
for site, site_urls in by_site.items():
base = Path(output_dir) / site if site else Path(output_dir)
if mode == MODE_TITLE:
paths.update(build_title_paths(site_urls, url_title, base))
else:
paths.update(build_download_paths(site_urls, base))
return paths
if mode == MODE_TITLE:
url_title = build_url_title_map(video_map)
return build_title_paths(urls, url_title, output_dir)
@@ -107,11 +139,21 @@ def get_paths_for_mode(mode, urls, video_map, output_dir):
# ── Reorganize ───────────────────────────────────────────────────────
def reorganize(urls, video_map, output_dir, target_mode, dry_run=False):
def reorganize(
urls: list[str],
video_map: dict[str, Any],
output_dir: str | Path,
target_mode: str,
dry_run: bool = False,
url_to_site: dict[str, str] | None = None,
) -> None:
"""Rename existing files from one naming scheme to another."""
other_mode = MODE_TITLE if target_mode == MODE_ORIGINAL else MODE_ORIGINAL
old_paths = get_paths_for_mode(other_mode, urls, video_map, output_dir)
new_paths = get_paths_for_mode(target_mode, urls, video_map, output_dir)
old_paths = get_paths_for_mode(other_mode, urls, video_map, output_dir, url_to_site)
new_paths = get_paths_for_mode(
target_mode, urls, video_map, output_dir, url_to_site
)
moves = []
for url in urls:
@@ -163,21 +205,30 @@ def reorganize(urls, video_map, output_dir, target_mode, dry_run=False):
# ── Download ─────────────────────────────────────────────────────────
def download_one(session, url, dest, expected_size):
def download_one(
session: requests.Session,
url: str,
dest: str | Path,
expected_size: int | None,
referer: str = "",
) -> tuple[str, int]:
dest = Path(dest)
part = dest.parent / (dest.name + ".part")
dest.parent.mkdir(parents=True, exist_ok=True)
if dest.exists():
local = dest.stat().st_size
if expected_size and local == expected_size:
if expected_size is not None and local == expected_size:
return "ok", 0
if expected_size and local != expected_size:
if expected_size is not None and local != expected_size:
dest.unlink()
existing = part.stat().st_size if part.exists() else 0
headers = {}
if existing and expected_size and existing < expected_size:
headers: dict[str, str] = {}
if referer:
headers["Referer"] = referer
if existing and expected_size is not None and existing < expected_size:
headers["Range"] = f"bytes={existing}-"
try:
@@ -205,34 +256,23 @@ def download_one(session, url, dest, expected_size):
return f"error: {e}", written
final_size = existing + written
if expected_size and final_size != expected_size:
if expected_size is not None and final_size != expected_size:
return "size_mismatch", written
part.rename(dest)
return "ok", written
# ── Data loading ─────────────────────────────────────────────────────
def load_video_map():
with open(VIDEO_MAP_FILE, encoding="utf-8") as f:
return json.load(f)
def _is_valid_url(url):
return url.startswith(
"http") and "<" not in url and ">" not in url and " href=" not in url
def collect_urls(video_map):
def collect_urls(video_map: dict[str, Any]) -> list[str]:
urls, seen, skipped = [], set(), 0
for entry in video_map.values():
for video_url in entry.get("videos", []):
if video_url in seen:
for vid in entry.get("videos", []):
u = vid["url"]
if u in seen:
continue
seen.add(video_url)
if _is_valid_url(video_url):
urls.append(video_url)
seen.add(u)
if is_valid_url(u):
urls.append(u)
else:
skipped += 1
if skipped:
@@ -240,40 +280,105 @@ def collect_urls(video_map):
return urls
def build_url_title_map(video_map):
def build_url_title_map(video_map: dict[str, Any]) -> dict[str, str]:
url_title = {}
for entry in video_map.values():
title = entry.get("title", "")
for video_url in entry.get("videos", []):
if video_url not in url_title:
url_title[video_url] = title
for vid in entry.get("videos", []):
if vid["url"] not in url_title:
url_title[vid["url"]] = title
return url_title
def _persist_fetched_sizes(newly_fetched: dict[str, int | None]) -> None:
"""Write newly probed sizes back to video_map.json (successful probes only)."""
now = int(time.time())
for site_key in SITES:
vm_site = load_video_map(site_key)
changed = False
for entry in vm_site.values():
for vid in entry.get("videos", []):
if vid["url"] in newly_fetched and vid.get("size") is None and newly_fetched[vid["url"]] is not None:
vid["size"] = newly_fetched[vid["url"]]
vid["size_checked_at"] = now
changed = True
if changed:
save_video_map(vm_site, site_key)
n_saved = sum(1 for s in newly_fetched.values() if s is not None)
if n_saved:
print(f"[+] Cached {n_saved} newly probed size(s).")
def build_url_to_site() -> dict[str, str]:
"""Return {cdn_video_url: site_key} by loading each site's map in turn."""
result: dict[str, str] = {}
for site_key in SITES:
for entry in load_video_map(site_key).values():
for vid in entry.get("videos", []):
result[vid["url"]] = site_key
return result
# ── Main ─────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="Download videos from video_map.json")
parser.add_argument("--output", "-o", default=DEFAULT_OUTPUT,
help=f"Download directory (default: {DEFAULT_OUTPUT})")
def main() -> None:
parser = argparse.ArgumentParser(description="Download videos from video_map.json")
parser.add_argument(
"--output",
"-o",
default=DEFAULT_OUTPUT,
help=f"Download directory (default: {DEFAULT_OUTPUT})",
)
naming = parser.add_mutually_exclusive_group()
naming.add_argument("--titles", "-t", action="store_true",
help="Use title-based filenames (saved as default for this directory)")
naming.add_argument("--original", action="store_true",
help="Use original CloudFront filenames (saved as default for this directory)")
naming.add_argument(
"--titles",
"-t",
action="store_true",
help="Use title-based filenames (saved as default for this directory)",
)
naming.add_argument(
"--original",
action="store_true",
help="Use original CloudFront filenames (saved as default for this directory)",
)
parser.add_argument("--reorganize", action="store_true",
help="Rename existing files to match the current naming mode")
parser.add_argument("--dry-run", "-n", action="store_true",
help="Preview without making changes")
parser.add_argument("--workers", "-w", type=int, default=DEFAULT_WORKERS,
help=f"Concurrent downloads (default: {DEFAULT_WORKERS})")
parser.add_argument(
"--reorganize",
action="store_true",
help="Rename existing files to match the current naming mode",
)
parser.add_argument(
"--dry-run", "-n", action="store_true", help="Preview without making changes"
)
parser.add_argument(
"--workers",
"-w",
type=int,
default=DEFAULT_WORKERS,
help=f"Concurrent downloads (default: {DEFAULT_WORKERS})",
)
parser.add_argument(
"--site",
action="append",
choices=list(SITES.keys()),
dest="sites",
metavar="SITE",
help=f"Site(s) to download (default: all). Can be repeated. Choices: {', '.join(SITES)}",
)
args = parser.parse_args()
video_map = load_video_map()
url_referers = build_url_referers(video_map)
urls = collect_urls(video_map)
url_to_site = build_url_to_site()
if args.sites:
selected = set(args.sites)
urls = [u for u in urls if url_to_site.get(u) in selected]
mode = resolve_mode(args)
saved = read_mode(args.output)
@@ -287,10 +392,18 @@ def main():
if mode_changed and not args.reorganize:
print(f"\n[!] Mode changed from '{saved}' to '{mode}'.")
print(
" Use --reorganize to rename existing files, or --dry-run to preview.")
" Use --reorganize to rename existing files, or --dry-run to preview."
)
print(" Refusing to download until existing files are reorganized.")
return
reorganize(urls, video_map, args.output, mode, dry_run=args.dry_run)
reorganize(
urls,
video_map,
args.output,
mode,
dry_run=args.dry_run,
url_to_site=url_to_site,
)
if args.dry_run or args.reorganize:
return
@@ -298,12 +411,13 @@ def main():
if not args.dry_run:
write_mode(args.output, mode)
paths = get_paths_for_mode(mode, urls, video_map, args.output)
paths = get_paths_for_mode(mode, urls, video_map, args.output, url_to_site)
clashes = find_clashes(urls)
if clashes:
print(
f"[+] {len(clashes)} filename clash(es) resolved with subfolders/suffixes")
f"[+] {len(clashes)} filename clash(es) resolved with subfolders/suffixes"
)
already = [u for u in urls if paths[u].exists()]
pending = [u for u in urls if not paths[u].exists()]
@@ -316,26 +430,50 @@ def main():
return
if args.dry_run:
print(
f"\n[dry-run] Would download {len(pending)} files to {args.output}/")
print(f"\n[dry-run] Would download {len(pending)} files to {args.output}/")
for url in pending[:20]:
print(f"{paths[url].name}")
if len(pending) > 20:
print(f" … and {len(pending) - 20} more")
return
print("\n[+] Fetching remote file sizes…")
cached_sizes: dict[str, int] = {
vid["url"]: vid["size"]
for entry in video_map.values()
for vid in entry.get("videos", [])
if vid.get("size") is not None
}
newly_fetched: dict[str, int | None] = {}
uncached_pending = [u for u in pending if u not in cached_sizes]
session = make_session()
remote_sizes = fetch_sizes(pending, workers=20)
if uncached_pending:
print(
f"\n[+] Fetching remote file sizes ({len(uncached_pending)} uncached, {len(pending) - len(uncached_pending)} cached)…"
)
fetched_pending = fetch_sizes(uncached_pending, workers=20, url_referers=url_referers)
newly_fetched.update(fetched_pending)
remote_sizes: dict[str, int | None] = {**cached_sizes, **fetched_pending}
else:
print(f"\n[+] All {len(pending)} pending sizes cached — skipping probe.")
remote_sizes = dict(cached_sizes)
sized = {u: s for u, s in remote_sizes.items() if s is not None}
total_bytes = sum(sized.values())
print(
f"[+] Download size: {fmt_size(total_bytes)} across {len(pending)} files")
print(f"[+] Download size: {fmt_size(total_bytes)} across {len(pending)} files")
if already:
print(f"[+] Verifying {len(already)} existing files…")
already_sizes = fetch_sizes(already, workers=20)
uncached_already = [u for u in already if u not in cached_sizes]
if uncached_already:
print(
f"[+] Verifying {len(already)} existing files ({len(uncached_already)} uncached)…"
)
fetched_already = fetch_sizes(uncached_already, workers=20, url_referers=url_referers)
newly_fetched.update(fetched_already)
already_sizes: dict[str, int | None] = {**cached_sizes, **fetched_already}
else:
print(f"[+] Verifying {len(already)} existing files (all sizes cached)…")
already_sizes = dict(cached_sizes)
mismatched = 0
for url in already:
@@ -344,14 +482,18 @@ def main():
remote = already_sizes.get(url)
if remote and local != remote:
mismatched += 1
print(f"[!] Size mismatch: {dest.name} "
f"(local {fmt_size(local)} vs remote {fmt_size(remote)})")
print(
f"[!] Size mismatch: {dest.name} "
f"(local {fmt_size(local)} vs remote {fmt_size(remote)})"
)
pending.append(url)
remote_sizes[url] = remote
if mismatched:
print(
f"[!] {mismatched} file(s) will be re-downloaded due to size mismatch")
print(f"[!] {mismatched} file(s) will be re-downloaded due to size mismatch")
if newly_fetched:
_persist_fetched_sizes(newly_fetched)
print(f"\n[⚡] Downloading with {args.workers} threads…\n")
@@ -361,10 +503,12 @@ def main():
total = len(pending)
interrupted = False
def do_download(url):
def do_download(url: str) -> tuple[str, tuple[str, int]]:
dest = paths[url]
expected = remote_sizes.get(url)
return url, download_one(session, url, dest, expected)
return url, download_one(
session, url, dest, expected, url_referers.get(url, "")
)
try:
with ThreadPoolExecutor(max_workers=args.workers) as pool:
@@ -376,11 +520,9 @@ def main():
name = paths[url].name
if status == "ok" and written > 0:
print(
f" [{completed}/{total}] ✓ {name} ({fmt_size(written)})")
print(f" [{completed}/{total}] ✓ {name} ({fmt_size(written)})")
elif status == "ok":
print(
f" [{completed}/{total}] ✓ {name} (already complete)")
print(f" [{completed}/{total}] ✓ {name} (already complete)")
elif status == "size_mismatch":
print(f" [{completed}/{total}] ⚠ {name} (size mismatch)")
failed.append(url)
+96 -74
View File
@@ -1,113 +1,135 @@
#!/usr/bin/env python3
"""
grab_cookie.py — read the WordPress login cookie from an
installed browser and write it to .env as WP_LOGIN_COOKIE=name=value.
grab_cookie.py — log in to a site and write the session cookie to .env.
Requires {SITE}_USERNAME and {SITE}_PASSWORD to be set in the environment or .env.
Usage:
python grab_cookie.py # tries Firefox, Chrome, Edge, Brave
python grab_cookie.py --browser firefox # explicit browser
python grab_cookie.py --site jailbirdz
python grab_cookie.py --site pinkcuffs
"""
import argparse
import os
from pathlib import Path
from config import COOKIE_DOMAIN
from typing import Literal
import requests
from dotenv import load_dotenv
from config import SITES
ENV_FILE = Path(".env")
ENV_KEY = "WP_LOGIN_COOKIE"
COOKIE_PREFIX = "wordpress_logged_in_"
BROWSER_NAMES = ["firefox", "chrome", "edge", "brave"]
load_dotenv(dotenv_path=ENV_FILE)
def find_cookie(browser_name):
"""Return (name, value) for the wordpress_logged_in_* cookie, or (None, None)."""
try:
import rookiepy
except ImportError:
raise ImportError("rookiepy not installed — run: pip install rookiepy")
def update_env(
name: str,
value: str,
env_key: str = "WP_LOGIN_COOKIE",
path: Path = ENV_FILE,
) -> Literal["updated", "appended", "created"]:
"""Write env_key=name=value into the env file, replacing any existing line."""
new_line = f"{env_key}={name}={value}\n"
fn = getattr(rookiepy, browser_name, None)
if fn is None:
raise ValueError(f"rookiepy does not support '{browser_name}'.")
try:
cookies = fn([COOKIE_DOMAIN])
except PermissionError:
raise PermissionError(
f"Permission denied reading {browser_name} cookies.\n"
" Close the browser, or on Windows run as Administrator for Chrome/Edge."
)
except Exception as e:
raise RuntimeError(f"Could not read {browser_name} cookies: {e}")
for c in cookies:
if c.get("name", "").startswith(COOKIE_PREFIX):
return c["name"], c["value"]
return None, None
def update_env(name, value):
"""Write WP_LOGIN_COOKIE=name=value into .env, replacing any existing line."""
new_line = f"{ENV_KEY}={name}={value}\n"
if ENV_FILE.exists():
text = ENV_FILE.read_text(encoding="utf-8")
if path.exists():
text = path.read_text(encoding="utf-8")
lines = text.splitlines(keepends=True)
for i, line in enumerate(lines):
if line.startswith(f"{ENV_KEY}=") or line.strip() == ENV_KEY:
key, sep, _ = line.partition("=")
if key.strip() == env_key and sep:
lines[i] = new_line
ENV_FILE.write_text("".join(lines), encoding="utf-8")
path.write_text("".join(lines), encoding="utf-8")
return "updated"
# Key not present — append
if text and not text.endswith("\n"):
text += "\n"
ENV_FILE.write_text(text + new_line, encoding="utf-8")
path.write_text(text + new_line, encoding="utf-8")
return "appended"
else:
ENV_FILE.write_text(new_line, encoding="utf-8")
path.write_text(new_line, encoding="utf-8")
return "created"
def main():
def login_and_get_cookie(
username: str, password: str, base_url: str
) -> tuple[str, str]:
"""POST to wp-admin/admin-ajax.php (xootix action) and return (cookie_name, cookie_value).
No browser needed — the xootix login endpoint takes plain form fields and returns
the wordpress_logged_in_* cookie directly in the response Set-Cookie headers.
"""
session = requests.Session()
r = session.post(
f"{base_url}/wp-admin/admin-ajax.php",
data={
"xoo-el-username": username,
"xoo-el-password": password,
"xoo-el-rememberme": "forever",
"_xoo_el_form": "login",
"xoo_el_redirect": "/",
"action": "xoo_el_form_action",
"display": "popup",
},
headers={
"Referer": f"{base_url}/",
"Origin": base_url,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"X-Requested-With": "XMLHttpRequest",
"Accept": "*/*",
},
timeout=30,
)
r.raise_for_status()
result = r.json()
if result.get("error"):
raise RuntimeError(f"Login rejected by server: {result.get('notice', result)}")
for name, value in session.cookies.items():
if name.startswith(COOKIE_PREFIX):
return name, value
raise RuntimeError(
"Server accepted login but no wordpress_logged_in_* cookie was set.\n"
" Check that username and password are correct."
)
def _auto_login() -> None:
parser = argparse.ArgumentParser(
description=f"Copy the {COOKIE_DOMAIN} login cookie from your browser into .env."
description="Log in and save session cookie to .env"
)
parser.add_argument(
"--browser", "-b",
choices=BROWSER_NAMES,
metavar="BROWSER",
help=f"Browser to read from: {', '.join(BROWSER_NAMES)} (default: try all in order)",
"--site",
required=True,
choices=list(SITES.keys()),
help="Which site to authenticate with",
)
args = parser.parse_args()
order = [args.browser] if args.browser else BROWSER_NAMES
site_cfg = SITES[args.site]
env_prefix = site_cfg["env_prefix"]
base_url = site_cfg["base_url"]
env_key = f"{env_prefix}_LOGIN_COOKIE"
cookie_name = cookie_value = None
for browser in order:
print(f"[…] Trying {browser}")
try:
cookie_name, cookie_value = find_cookie(browser)
except ImportError as e:
raise SystemExit(f"[!] {e}")
except (ValueError, PermissionError, RuntimeError) as e:
print(f"[!] {e}")
continue
if cookie_name:
print(f"[+] Found in {browser}: {cookie_name}")
break
print(f" No {COOKIE_PREFIX}* cookie found in {browser}.")
if not cookie_name:
username = os.environ.get(f"{env_prefix}_USERNAME", "").strip()
password = os.environ.get(f"{env_prefix}_PASSWORD", "").strip()
if not username or not password:
raise SystemExit(
f"\n[!] No {COOKIE_PREFIX}* cookie found in any browser.\n"
f" Make sure you are logged into {COOKIE_DOMAIN}, then re-run.\n"
" Or set WP_LOGIN_COOKIE manually in .env — see .env.example."
f"[!] {env_prefix}_USERNAME and {env_prefix}_PASSWORD must be set "
"in the environment or .env — see .env.example."
)
try:
cookie_name, cookie_value = login_and_get_cookie(username, password, base_url)
except RuntimeError as e:
raise SystemExit(f"[!] {e}")
print(f"[+] Login succeeded: {cookie_name}")
action = update_env(cookie_name, cookie_value, env_key=env_key)
print(f"[✓] {env_key} {action} in {ENV_FILE}.")
action = update_env(cookie_name, cookie_value)
print(f"[✓] {ENV_KEY} {action} in {ENV_FILE}.")
def main() -> None:
_auto_login()
if __name__ == "__main__":
+398 -174
View File
@@ -1,60 +1,131 @@
import argparse
import re
import json
import os
import time
import signal
import asyncio
import tempfile
import requests
from pathlib import Path, PurePosixPath
from pathlib import PurePosixPath
from typing import Any
from urllib.parse import urlparse
from dotenv import load_dotenv
from playwright.async_api import async_playwright
from check_clashes import VIDEO_EXTS
from config import BASE_URL
from playwright.async_api import async_playwright, BrowserContext
from check_clashes import (
VIDEO_EXTS,
load_video_map,
save_video_map,
is_valid_url,
expects_video,
)
from config import SITES
from grab_cookie import login_and_get_cookie, update_env
load_dotenv()
def _is_video_url(url):
def _is_video_url(url: str) -> bool:
"""True if `url` ends with a recognised video extension (case-insensitive, path only)."""
return PurePosixPath(urlparse(url).path).suffix.lower() in VIDEO_EXTS
WP_API = f"{BASE_URL}/wp-json/wp/v2"
SKIP_TYPES = {
"attachment", "nav_menu_item", "wp_block", "wp_template",
"wp_template_part", "wp_global_styles", "wp_navigation",
"wp_font_family", "wp_font_face",
"attachment",
"nav_menu_item",
"wp_block",
"wp_template",
"wp_template_part",
"wp_global_styles",
"wp_navigation",
"wp_font_family",
"wp_font_face",
}
VIDEO_MAP_FILE = "video_map.json"
MAX_WORKERS = 4
API_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0",
_USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0"
)
def _api_headers(base_url: str, cookie_name: str, cookie_value: str) -> dict[str, str]:
return {
"User-Agent": _USER_AGENT,
"Accept": "application/json",
"Referer": f"{BASE_URL}/",
"Referer": f"{base_url}/",
"Cookie": f"{cookie_name}={cookie_value}; eav-age-verified=1",
}
def _get_login_cookie():
raw = os.environ.get("WP_LOGIN_COOKIE", "").strip() # strip accidental whitespace
if not raw:
raise RuntimeError(
"WP_LOGIN_COOKIE not set. Copy it from your browser into .env — see .env.example.")
def _select_probe_url(video_map: dict[str, Any]) -> str | None:
"""Pure function: return the first URL in video_map where expects_video() is True."""
return next((url for url in video_map if expects_video(url)), None)
def _probe_cookie(name: str, value: str, site_key: str) -> bool:
"""HEAD request to a members-only video page. Returns True if the cookie is still valid."""
video_map = load_video_map(site_key)
probe_url = _select_probe_url(video_map)
if probe_url is None:
return False # no video URLs yet — can't validate, fall through to re-auth
r = requests.head(
probe_url,
headers={"Cookie": f"{name}={value}", "User-Agent": _USER_AGENT},
allow_redirects=False,
timeout=10,
)
return r.status_code == 200
def _get_login_cookie(site_key: str, site_cfg: dict[str, str]) -> tuple[str, str]:
env_prefix = site_cfg["env_prefix"]
base_url = site_cfg["base_url"]
env_key = f"{env_prefix}_LOGIN_COOKIE"
username = os.environ.get(f"{env_prefix}_USERNAME", "").strip()
password = os.environ.get(f"{env_prefix}_PASSWORD", "").strip()
has_credentials = bool(username and password)
raw = os.environ.get(env_key, "").strip()
if raw:
name, _, value = raw.partition("=")
if not value:
raise RuntimeError(
"WP_LOGIN_COOKIE looks malformed (no '=' found). Expected: name=value")
if not name.startswith("wordpress_logged_in_"):
raise RuntimeError(
"WP_LOGIN_COOKIE doesn't look right — expected a wordpress_logged_in_... cookie.")
if value and name.startswith("wordpress_logged_in_"):
if not has_credentials:
return name, value # cookie-only mode — trust it
print(f"[{site_key}] Cookie found — validating…")
if _probe_cookie(name, value, site_key):
print(f"[{site_key}] Cookie still valid — skipping login.")
return name, value
print(f"[{site_key}] Cookie expired — re-authenticating…")
if has_credentials:
cookie_name, cookie_value = login_and_get_cookie(username, password, base_url)
action = update_env(cookie_name, cookie_value, env_key=env_key)
print(f"[{site_key}] Logged in: {cookie_name} ({action} in .env)")
return cookie_name, cookie_value
raise RuntimeError(
f"No credentials or cookie found for {site_key}. Set either:\n"
f"{env_prefix}_USERNAME + {env_prefix}_PASSWORD (recommended)\n"
f"{env_prefix}_LOGIN_COOKIE (fallback — may expire)\n"
"See .env.example."
)
def discover_content_types(session):
"""Query /wp-json/wp/v2/types and return a list of (name, rest_base, type_slug) for content types worth scraping."""
r = session.get(f"{WP_API}/types", timeout=30)
def _has_credentials(site_cfg: dict[str, str]) -> bool:
env_prefix = site_cfg["env_prefix"]
has_cookie = bool(os.environ.get(f"{env_prefix}_LOGIN_COOKIE", "").strip())
has_creds = bool(
os.environ.get(f"{env_prefix}_USERNAME", "").strip()
and os.environ.get(f"{env_prefix}_PASSWORD", "").strip()
)
return has_cookie or has_creds
def discover_content_types(
session: requests.Session, wp_api: str
) -> list[tuple[str, str, str]]:
"""Query /wp-json/wp/v2/types and return a list of (name, rest_base, type_slug)."""
r = session.get(f"{wp_api}/types", timeout=30)
r.raise_for_status()
types = r.json()
@@ -69,16 +140,22 @@ def discover_content_types(session):
return targets
def fetch_all_posts_for_type(session, type_name, rest_base, type_slug):
"""Paginate one content type and return (url, title, description) tuples.
Uses the `link` field when available; falls back to building from slug."""
def fetch_all_posts_for_type(
session: requests.Session,
wp_api: str,
base_url: str,
type_name: str,
rest_base: str,
type_slug: str,
) -> list[tuple[str, str, str]]:
"""Paginate one content type and return (url, title, description) tuples."""
url_prefix = type_slug.replace("_", "-")
results = []
page = 1
while True:
r = session.get(
f"{WP_API}/{rest_base}",
f"{wp_api}/{rest_base}",
params={"per_page": 100, "page": page},
timeout=30,
)
@@ -92,15 +169,19 @@ def fetch_all_posts_for_type(session, type_name, rest_base, type_slug):
if not link.startswith("http"):
slug = post.get("slug")
if slug:
link = f"{BASE_URL}/{url_prefix}/{slug}/"
link = f"{base_url}/{url_prefix}/{slug}/"
else:
continue
title_obj = post.get("title", {})
title = title_obj.get("rendered", "") if isinstance(
title_obj, dict) else str(title_obj)
title = (
title_obj.get("rendered", "")
if isinstance(title_obj, dict)
else str(title_obj)
)
content_obj = post.get("content", {})
content_html = content_obj.get(
"rendered", "") if isinstance(content_obj, dict) else ""
content_html = (
content_obj.get("rendered", "") if isinstance(content_obj, dict) else ""
)
description = html_to_text(content_html) if content_html else ""
results.append((link, title, description))
print(f" {type_name} page {page}: {len(data)} items")
@@ -109,66 +190,88 @@ def fetch_all_posts_for_type(session, type_name, rest_base, type_slug):
return results
def fetch_post_urls_from_api(headers):
def fetch_post_urls_from_api(
site_key: str,
base_url: str,
wp_api: str,
headers: dict[str, str],
) -> list[str]:
"""Auto-discover all content types via the WP REST API and collect every post URL.
Also builds video_map.json with titles pre-populated."""
print("[+] video_map.json empty or missing — discovering content types from REST API…")
Also pre-populates video_map.json with titles."""
print(f"[{site_key}] video_map empty — discovering content types from REST API…")
session = requests.Session()
session.headers.update(headers)
targets = discover_content_types(session)
targets = discover_content_types(session, wp_api)
print(
f"[+] Found {len(targets)} content types: {', '.join(name for name, _, _ in targets)}\n")
f"[{site_key}] Found {len(targets)} content types: "
f"{', '.join(name for name, _, _ in targets)}\n"
)
all_results = []
for type_name, rest_base, type_slug in targets:
type_results = fetch_all_posts_for_type(
session, type_name, rest_base, type_slug)
session, wp_api, base_url, type_name, rest_base, type_slug
)
all_results.extend(type_results)
seen = set()
seen: set[str] = set()
deduped_urls = []
video_map = load_video_map()
video_map = load_video_map(site_key)
for url, title, description in all_results:
if url not in seen and url.startswith("http"):
seen.add(url)
deduped_urls.append(url)
if url not in video_map:
video_map[url] = {"title": title,
"description": description, "videos": []}
video_map[url] = {
"title": title,
"description": description,
"videos": [],
}
else:
if not video_map[url].get("title"):
video_map[url]["title"] = title
if not video_map[url].get("description"):
video_map[url]["description"] = description
save_video_map(video_map)
save_video_map(video_map, site_key)
print(
f"\n[+] Discovered {len(deduped_urls)} unique URLs → saved to {VIDEO_MAP_FILE}")
print(
f"[+] Pre-populated {len(video_map)} entries in {VIDEO_MAP_FILE}")
f"\n[{site_key}] Discovered {len(deduped_urls)} unique URLs → saved to video_map.json"
)
print(f"[{site_key}] Pre-populated {len(video_map)} entries")
return deduped_urls
def fetch_metadata_from_api(video_map, urls, headers):
def fetch_metadata_from_api(
site_key: str,
base_url: str,
wp_api: str,
video_map: dict[str, Any],
urls: list[str],
headers: dict[str, str],
) -> None:
"""Populate missing titles and descriptions in video_map from the REST API."""
missing = [u for u in urls
missing = [
u
for u in urls
if u not in video_map
or not video_map[u].get("title")
or not video_map[u].get("description")]
or not video_map[u].get("description")
]
if not missing:
return
print(f"[+] Fetching metadata from REST API for {len(missing)} posts…")
print(f"[{site_key}] Fetching metadata from REST API for {len(missing)} posts…")
session = requests.Session()
session.headers.update(headers)
targets = discover_content_types(session)
targets = discover_content_types(session, wp_api)
for type_name, rest_base, type_slug in targets:
type_results = fetch_all_posts_for_type(
session, type_name, rest_base, type_slug)
session, wp_api, base_url, type_name, rest_base, type_slug
)
for url, title, description in type_results:
if url in video_map:
if not video_map[url].get("title"):
@@ -176,93 +279,90 @@ def fetch_metadata_from_api(video_map, urls, headers):
if not video_map[url].get("description"):
video_map[url]["description"] = description
else:
video_map[url] = {"title": title,
"description": description, "videos": []}
video_map[url] = {
"title": title,
"description": description,
"videos": [],
}
save_video_map(video_map)
save_video_map(video_map, site_key)
populated_t = sum(1 for u in urls if video_map.get(u, {}).get("title"))
populated_d = sum(1 for u in urls if video_map.get(
u, {}).get("description"))
print(f"[+] Titles populated: {populated_t}/{len(urls)}")
print(f"[+] Descriptions populated: {populated_d}/{len(urls)}")
populated_d = sum(1 for u in urls if video_map.get(u, {}).get("description"))
print(f"[{site_key}] Titles populated: {populated_t}/{len(urls)}")
print(f"[{site_key}] Descriptions populated: {populated_d}/{len(urls)}")
def load_post_urls(headers):
vm = load_video_map()
def load_post_urls(
site_key: str,
base_url: str,
wp_api: str,
headers: dict[str, str],
) -> list[str]:
vm = load_video_map(site_key)
if vm:
print(f"[+] {VIDEO_MAP_FILE} found — loading {len(vm)} post URLs.")
print(f"[{site_key}] video_map found — loading {len(vm)} post URLs.")
return list(vm.keys())
return fetch_post_urls_from_api(headers)
return fetch_post_urls_from_api(site_key, base_url, wp_api, headers)
def html_to_text(html_str):
def html_to_text(html_str: str) -> str:
"""Strip HTML tags, decode entities, and collapse whitespace into clean plain text."""
import html
text = re.sub(r'<br\s*/?>', '\n', html_str)
text = text.replace('</p>', '\n\n')
text = re.sub(r'<[^>]+>', '', text)
text = re.sub(r"<br\s*/?>", "\n", html_str)
text = text.replace("</p>", "\n\n")
text = re.sub(r"<[^>]+>", "", text)
text = html.unescape(text)
lines = [line.strip() for line in text.splitlines()]
text = '\n'.join(lines)
text = re.sub(r'\n{3,}', '\n\n', text)
text = "\n".join(lines)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
def extract_mp4_from_html(html):
def extract_mp4_from_html(html: str) -> list[str]:
candidates = re.findall(r'https?://[^\s"\'<>]+', html)
return [u for u in candidates if _is_video_url(u)]
def extract_title_from_html(html):
m = re.search(
r'<h1[^>]*class="entry-title"[^>]*>(.*?)</h1>', html, re.DOTALL)
def extract_title_from_html(html: str) -> str | None:
m = re.search(r'<h1[^>]*class="entry-title"[^>]*>(.*?)</h1>', html, re.DOTALL)
if m:
title = re.sub(r'<[^>]+>', '', m.group(1)).strip()
title = re.sub(r"<[^>]+>", "", m.group(1)).strip()
return title
m = re.search(r'<title>(.*?)(?:\s*[-|].*)?</title>', html, re.DOTALL)
m = re.search(r"<title>(.*?)(?:\s*[-|].*)?</title>", html, re.DOTALL)
if m:
return m.group(1).strip()
return None
def load_video_map():
if Path(VIDEO_MAP_FILE).exists():
try:
with open(VIDEO_MAP_FILE, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return {}
return {}
def save_video_map(video_map):
fd, tmp_path = tempfile.mkstemp(dir=Path(VIDEO_MAP_FILE).resolve().parent, suffix=".tmp")
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(video_map, f, indent=2, ensure_ascii=False)
Path(tmp_path).replace(VIDEO_MAP_FILE)
except Exception:
try:
Path(tmp_path).unlink()
except OSError:
pass
raise
def _expects_video(url):
return "/pinkcuffs-videos/" in url
MAX_RETRIES = 2
async def worker(worker_id, queue, context, known,
total, retry_counts, video_map, map_lock, shutdown_event):
async def worker(
worker_id: int,
queue: asyncio.Queue[tuple[int, str]],
context: BrowserContext,
known: set[str],
total: int,
retry_counts: dict[int, int],
video_map: dict[str, Any],
map_lock: asyncio.Lock,
shutdown_event: asyncio.Event,
reauth_lock: asyncio.Lock,
reauth_done: list[bool],
site_key: str,
site_cfg: dict[str, str],
) -> None:
base_url = site_cfg["base_url"]
cookie_domain = urlparse(base_url).hostname or site_cfg["cookie_domain"]
env_prefix = site_cfg["env_prefix"]
page = await context.new_page()
video_hits = set()
video_hits: set[str] = set()
page.on("response", lambda resp: video_hits.add(resp.url) if _is_video_url(resp.url) else None)
page.on(
"response",
lambda resp: video_hits.add(resp.url) if _is_video_url(resp.url) else None,
)
try:
while not shutdown_event.is_set():
@@ -279,19 +379,69 @@ async def worker(worker_id, queue, context, known,
await page.goto(url, wait_until="networkidle", timeout=60000)
except Exception as e:
print(f"[W{worker_id}] Navigation error: {e}")
if _expects_video(url) and attempt < MAX_RETRIES:
if expects_video(url) and attempt < MAX_RETRIES:
retry_counts[idx] = attempt + 1
queue.put_nowait((idx, url))
print(f"[W{worker_id}] Re-queued for retry.")
elif not _expects_video(url):
elif not expects_video(url):
async with map_lock:
entry = video_map.get(url, {})
entry["scraped_at"] = int(time.time())
video_map[url] = entry
save_video_map(video_map)
save_video_map(video_map, site_key)
else:
print(
f"[W{worker_id}] Still failing after {MAX_RETRIES} retries — will retry next run.")
f"[W{worker_id}] Still failing after {MAX_RETRIES} retries — will retry next run."
)
continue
if "NoDirectAccessAllowed" in page.url:
recovered = False
async with reauth_lock:
if not reauth_done[0]:
username = os.environ.get(f"{env_prefix}_USERNAME", "").strip()
password = os.environ.get(f"{env_prefix}_PASSWORD", "").strip()
if username and password:
print(f"[W{worker_id}] Cookie expired — re-authenticating…")
try:
new_name, new_value = await asyncio.to_thread(
login_and_get_cookie, username, password, base_url
)
update_env(
new_name,
new_value,
env_key=f"{env_prefix}_LOGIN_COOKIE",
)
await context.add_cookies(
[
{
"name": new_name,
"value": new_value,
"domain": cookie_domain,
"path": "/",
"httpOnly": True,
"secure": True,
"sameSite": "None",
}
]
)
reauth_done[0] = True
recovered = True
print(f"[W{worker_id}] Re-auth succeeded — re-queuing.")
except Exception as e:
print(f"[W{worker_id}] Re-auth failed: {e}")
shutdown_event.set()
else:
print(
f"[W{worker_id}] Cookie expired. "
f"Set {env_prefix}_USERNAME + {env_prefix}_PASSWORD "
"in .env for auto re-auth."
)
shutdown_event.set()
else:
recovered = True # another worker already re-authed
if recovered:
queue.put_nowait((idx, url))
continue
await asyncio.sleep(1.5)
@@ -301,9 +451,15 @@ async def worker(worker_id, queue, context, known,
found = set(html_videos) | set(video_hits)
video_hits.clear()
all_videos = [m for m in found if m not in (
f"{BASE_URL}/wp-content/plugins/easy-video-player/lib/blank.mp4",
)]
all_videos = [
m
for m in found
if is_valid_url(m)
and m
not in (
f"{base_url}/wp-content/plugins/easy-video-player/lib/blank.mp4",
)
]
async with map_lock:
new_found = found - known
@@ -312,63 +468,71 @@ async def worker(worker_id, queue, context, known,
known.update(new_found)
elif all_videos:
print(
f"[W{worker_id}] {len(all_videos)} video(s) already known — skipping write.")
f"[W{worker_id}] {len(all_videos)} video(s) already known — skipping write."
)
else:
print(f"[W{worker_id}] No video found on page.")
entry = video_map.get(url, {})
if title:
entry["title"] = title
existing_videos = set(entry.get("videos", []))
existing_videos.update(all_videos)
entry["videos"] = sorted(existing_videos)
mark_done = bool(all_videos) or not _expects_video(url)
existing_dict: dict[str, Any] = {
vid["url"]: vid for vid in entry.get("videos", [])
}
for vid_url in all_videos:
if vid_url not in existing_dict:
existing_dict[vid_url] = {"url": vid_url}
entry["videos"] = sorted(existing_dict.values(), key=lambda v: v["url"])
mark_done = bool(all_videos) or not expects_video(url)
if mark_done:
entry["scraped_at"] = int(time.time())
video_map[url] = entry
save_video_map(video_map)
save_video_map(video_map, site_key)
if not mark_done:
if attempt < MAX_RETRIES:
retry_counts[idx] = attempt + 1
queue.put_nowait((idx, url))
print(
f"[W{worker_id}] Re-queued for retry ({attempt + 1}/{MAX_RETRIES}).")
f"[W{worker_id}] Re-queued for retry ({attempt + 1}/{MAX_RETRIES})."
)
else:
print(
f"[W{worker_id}] No video after {MAX_RETRIES} retries — will retry next run.")
f"[W{worker_id}] No video after {MAX_RETRIES} retries — will retry next run."
)
finally:
await page.close()
async def run():
shutdown_event = asyncio.Event()
loop = asyncio.get_running_loop()
async def run_for_site(
site_key: str,
site_cfg: dict[str, str],
shutdown_event: asyncio.Event,
) -> None:
base_url = site_cfg["base_url"]
cookie_domain = urlparse(base_url).hostname or site_cfg["cookie_domain"]
wp_api = f"{base_url}/wp-json/wp/v2"
def _handle_shutdown(signum, _frame):
print(f"\n[!] Signal {signum} received — finishing active pages then exiting…")
loop.call_soon_threadsafe(shutdown_event.set)
cookie_name, cookie_value = _get_login_cookie(site_key, site_cfg)
req_headers = _api_headers(base_url, cookie_name, cookie_value)
signal.signal(signal.SIGINT, _handle_shutdown)
signal.signal(signal.SIGTERM, _handle_shutdown)
urls = load_post_urls(site_key, base_url, wp_api, req_headers)
try:
cookie_name, cookie_value = _get_login_cookie()
req_headers = {
**API_HEADERS,
"Cookie": f"{cookie_name}={cookie_value}; eav-age-verified=1",
}
urls = load_post_urls(req_headers)
video_map = load_video_map()
if any(u not in video_map
video_map = load_video_map(site_key)
if any(
u not in video_map
or not video_map[u].get("title")
or not video_map[u].get("description")
for u in urls if _expects_video(u)):
fetch_metadata_from_api(video_map, urls, req_headers)
for u in urls
if expects_video(u)
):
fetch_metadata_from_api(
site_key, base_url, wp_api, video_map, urls, req_headers
)
known = {u for entry in video_map.values() for u in entry.get("videos", [])}
known = {
vid["url"] for entry in video_map.values() for vid in entry.get("videos", [])
}
total = len(urls)
pending = []
@@ -377,62 +541,79 @@ async def run():
entry = video_map.get(u, {})
if not entry.get("scraped_at"):
pending.append((i, u))
elif _expects_video(u) and not entry.get("videos"):
elif expects_video(u) and not entry.get("videos"):
pending.append((i, u))
needs_map += 1
done_count = sum(1 for v in video_map.values() if v.get("scraped_at"))
print(f"[+] Loaded {total} post URLs.")
print(f"[+] Already have {len(known)} video URLs mapped.")
print(f"[+] Video map: {len(video_map)} entries in {VIDEO_MAP_FILE}")
print(f"[{site_key}] Loaded {total} post URLs.")
print(f"[{site_key}] Already have {len(known)} video URLs mapped.")
print(f"[{site_key}] Video map: {len(video_map)} entries in video_map.json")
if done_count:
remaining_new = len(pending) - needs_map
print(
f"[] Resuming: {done_count} done, {remaining_new} new + {needs_map} needing map data.")
f"[{site_key}] Resuming: {done_count} done, "
f"{remaining_new} new + {needs_map} needing map data."
)
if not pending:
print("[] All URLs already processed and mapped.")
print(f"[{site_key}] All URLs already processed and mapped.")
return
print(
f"[] Running with {min(MAX_WORKERS, len(pending))} concurrent workers.\n")
f"[{site_key}] Running with {min(MAX_WORKERS, len(pending))} concurrent workers.\n"
)
queue = asyncio.Queue()
queue: asyncio.Queue[tuple[int, str]] = asyncio.Queue()
for item in pending:
queue.put_nowait(item)
map_lock = asyncio.Lock()
retry_counts = {}
reauth_lock = asyncio.Lock()
reauth_done: list[bool] = [False]
retry_counts: dict[int, int] = {}
async with async_playwright() as p:
browser = await p.firefox.launch(headless=True)
context = await browser.new_context()
_cookie_domain = urlparse(BASE_URL).netloc
site_cookies = [
{
"name": cookie_name,
"value": cookie_value,
"domain": _cookie_domain,
"domain": cookie_domain,
"path": "/",
"httpOnly": True,
"secure": True,
"sameSite": "None"
"sameSite": "None",
},
{
"name": "eav-age-verified",
"value": "1",
"domain": _cookie_domain,
"path": "/"
}
"domain": cookie_domain,
"path": "/",
},
]
await context.add_cookies(site_cookies)
await context.add_cookies(site_cookies) # type: ignore[arg-type]
num_workers = min(MAX_WORKERS, len(pending))
workers = [
asyncio.create_task(
worker(i, queue, context, known,
total, retry_counts, video_map, map_lock, shutdown_event)
worker(
i,
queue,
context,
known,
total,
retry_counts,
video_map,
map_lock,
shutdown_event,
reauth_lock,
reauth_done,
site_key,
site_cfg,
)
)
for i in range(num_workers)
]
@@ -442,21 +623,64 @@ async def run():
mapped = sum(1 for v in video_map.values() if v.get("videos"))
print(
f"\n[+] Video map: {mapped} posts with videos, {len(video_map)} total entries.")
f"\n[{site_key}] Video map: {mapped} posts with videos, {len(video_map)} total entries."
)
if not shutdown_event.is_set():
print(f"[] Completed. Full map in {VIDEO_MAP_FILE}")
print(f"[{site_key}] Completed. Full map in video_map.json")
else:
done = sum(1 for v in video_map.values() if v.get("scraped_at"))
print(f"[] Paused — {done}/{total} done. Run again to resume.")
print(f"[{site_key}] Paused — {done}/{total} done. Run again to resume.")
async def run(selected_sites: list[str], explicit: bool) -> None:
shutdown_event = asyncio.Event()
loop = asyncio.get_running_loop()
def _handle_shutdown(signum: int, _: object) -> None:
print(f"\n[!] Signal {signum} received — finishing active pages then exiting…")
loop.call_soon_threadsafe(shutdown_event.set)
signal.signal(signal.SIGINT, _handle_shutdown)
signal.signal(signal.SIGTERM, _handle_shutdown)
try:
for site_key in selected_sites:
if shutdown_event.is_set():
break
site_cfg = SITES[site_key]
if not _has_credentials(site_cfg):
if explicit:
raise RuntimeError(
f"No credentials or cookie found for {site_key}. See .env.example."
)
print(f"[{site_key}] No credentials found — skipping.")
continue
print(f"\n{'=' * 60}")
print(f" Site: {site_key} ({site_cfg['base_url']})")
print(f"{'=' * 60}\n")
await run_for_site(site_key, site_cfg, shutdown_event)
finally:
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
def main():
def main() -> None:
parser = argparse.ArgumentParser(description="Scrape video URLs from member sites")
parser.add_argument(
"--site",
action="append",
choices=list(SITES.keys()),
dest="sites",
metavar="SITE",
help=f"Site(s) to scrape (default: all). Can be repeated. Choices: {', '.join(SITES)}",
)
args = parser.parse_args()
explicit = bool(args.sites)
selected = args.sites or list(SITES.keys())
try:
asyncio.run(run())
asyncio.run(run(selected, explicit))
except KeyboardInterrupt:
print("\n[!] Interrupted. Run again to resume.")
except RuntimeError as e:
-1
View File
@@ -1,4 +1,3 @@
playwright==1.58.0
python-dotenv==1.2.1
Requests==2.32.5
rookiepy==0.5.6
+140 -16
View File
@@ -4,16 +4,45 @@ Importable function:
summarize_sizes(sizes) - return dict with total, smallest, largest, average, failed
"""
from check_clashes import fmt_size, fetch_sizes, load_video_map, VIDEO_MAP_FILE
import argparse
import time
from typing import Any, TypedDict
from check_clashes import (
fmt_size,
fetch_sizes,
load_video_map,
save_video_map,
build_url_referers,
VIDEO_MAP_FILE,
)
from config import SITES, SIZE_CACHE_TTL
def summarize_sizes(sizes):
class SizeStats(TypedDict):
sized: int
total: int
total_bytes: int
smallest: int
largest: int
average: int
failed: list[str]
def summarize_sizes(sizes: dict[str, int | None]) -> SizeStats:
"""Given {url: size_or_None}, return a stats dict."""
known = {u: s for u, s in sizes.items() if s is not None}
failed = [u for u, s in sizes.items() if s is None]
if not known:
return {"sized": 0, "total": len(sizes), "total_bytes": 0,
"smallest": 0, "largest": 0, "average": 0, "failed": failed}
return {
"sized": 0,
"total": len(sizes),
"total_bytes": 0,
"smallest": 0,
"largest": 0,
"average": 0,
"failed": failed,
}
total_bytes = sum(known.values())
return {
"sized": len(known),
@@ -26,23 +55,22 @@ def summarize_sizes(sizes):
}
def _is_stale(vid: dict[str, Any], now: int) -> bool:
"""True if the cached size is absent or older than SIZE_CACHE_TTL seconds."""
if vid.get("size") is None:
return True
return (now - int(vid.get("size_checked_at", 0))) >= SIZE_CACHE_TTL
# --------------- CLI ---------------
def _progress(done, total):
def _progress(done: int, total: int) -> None:
if done % 200 == 0 or done == total:
print(f" {done}/{total}")
def main():
vm = load_video_map()
urls = [u for entry in vm.values() for u in entry.get("videos", []) if u.startswith("http")]
print(f"[+] {len(urls)} URLs in {VIDEO_MAP_FILE}")
print("[+] Fetching file sizes (20 threads)…\n")
sizes = fetch_sizes(urls, workers=20, on_progress=_progress)
stats = summarize_sizes(sizes)
def _print_stats(stats: SizeStats) -> None:
print(f"\n{'=' * 45}")
print(f" Sized: {stats['sized']}/{stats['total']} files")
print(f" Total: {fmt_size(stats['total_bytes'])}")
@@ -50,12 +78,108 @@ def main():
print(f" Largest: {fmt_size(stats['largest'])}")
print(f" Average: {fmt_size(stats['average'])}")
print(f"{'=' * 45}")
if stats["failed"]:
print(f"\n[!] {len(stats['failed'])} URL(s) could not be sized:")
for u in stats["failed"]:
print(f" {u}")
def _cache_hint(fresh: int, stale: int, missing: int) -> str:
parts = [label for count, label in [(fresh, f"{fresh} fresh"), (stale, f"{stale} stale"), (missing, f"{missing} missing")] if count]
if stale or missing:
suffix = " — run --write to refresh" if stale else " — run --write to probe missing"
else:
suffix = " — all current"
return f"Cache: {', '.join(parts)}{suffix}"
def _run_stats() -> None:
vm = load_video_map()
now = int(time.time())
sizes: dict[str, int | None] = {}
fresh = stale = missing = 0
for entry in vm.values():
for vid in entry.get("videos", []):
url = vid["url"]
if url in sizes:
continue
sizes[url] = vid.get("size")
if vid.get("size") is None:
missing += 1
elif _is_stale(vid, now):
stale += 1
else:
fresh += 1
print(f"[+] {len(sizes)} URLs in {VIDEO_MAP_FILE}")
print(f" {_cache_hint(fresh, stale, missing)}")
_print_stats(summarize_sizes(sizes))
def _apply_fetched(vm: dict[str, Any], fetched: dict[str, int | None], now: int) -> None:
for entry in vm.values():
for vid in entry.get("videos", []):
if vid["url"] in fetched:
vid["size"] = fetched[vid["url"]]
vid["size_checked_at"] = now
def _run_write() -> None:
"""Probe uncached sizes and write them into video_map.json."""
now = int(time.time())
all_fetched: dict[str, int | None] = {}
for site_key in SITES:
vm = load_video_map(site_key)
if not vm:
continue
url_referers = build_url_referers(vm)
to_probe: list[str] = [
vid["url"]
for entry in vm.values()
for vid in entry.get("videos", [])
if _is_stale(vid, now)
]
cached_count = sum(
1
for entry in vm.values()
for vid in entry.get("videos", [])
if not _is_stale(vid, now)
)
print(f"[{site_key}] {cached_count} cached, {len(to_probe)} to probe…")
fetched: dict[str, int | None] = {}
if to_probe:
fetched = fetch_sizes(
to_probe, workers=20, on_progress=_progress, url_referers=url_referers
)
_apply_fetched(vm, fetched, now)
save_video_map(vm, site_key)
all_fetched.update(fetched)
print(f"[{site_key}] Written.")
if all_fetched:
_print_stats(summarize_sizes(all_fetched))
def main() -> None:
parser = argparse.ArgumentParser(description="Calculate total video download size")
parser.add_argument(
"--write",
"-w",
action="store_true",
help="Probe uncached sizes and write them into video_map.json",
)
args = parser.parse_args()
if args.write:
_run_write()
else:
_run_stats()
if __name__ == "__main__":
main()
+193 -96
View File
@@ -26,16 +26,17 @@ from pathlib import Path
import re
import sys
import time
from typing import Any, cast
import requests
from dotenv import load_dotenv
from check_clashes import fmt_size, url_to_filename, VIDEO_EXTS
from check_clashes import fmt_size, url_to_filename, VIDEO_EXTS, load_video_map
from download import (
load_video_map,
collect_urls,
get_paths_for_mode,
read_mode,
build_url_to_site,
MODE_ORIGINAL,
DEFAULT_OUTPUT,
)
@@ -52,21 +53,21 @@ PT_NAME_MAX = 120
# ── Text helpers ─────────────────────────────────────────────────────
def clean_description(raw):
def clean_description(raw: str) -> str:
"""Strip WordPress shortcodes and HTML from a description."""
if not raw:
return ""
text = re.sub(r'\[/?[^\]]+\]', '', raw)
text = re.sub(r'<[^>]+>', '', text)
text = re.sub(r"\[/?[^\]]+\]", "", raw)
text = re.sub(r"<[^>]+>", "", text)
text = html.unescape(text)
text = re.sub(r'\n{3,}', '\n\n', text).strip()
text = re.sub(r"\n{3,}", "\n\n", text).strip()
return text[:10000]
def make_pt_name(title, fallback_filename):
def make_pt_name(title: str, fallback_filename: str) -> str:
"""Build a PeerTube-safe video name (3-120 chars)."""
name = html.unescape(title).strip(
) if title else Path(fallback_filename).stem
name = html.unescape(title).strip() if title else Path(fallback_filename).stem
if len(name) > PT_NAME_MAX:
name = name[: PT_NAME_MAX - 1].rstrip() + "\u2026"
while len(name) < 3:
@@ -76,7 +77,8 @@ def make_pt_name(title, fallback_filename):
# ── PeerTube API ─────────────────────────────────────────────────────
def get_oauth_token(base, username, password):
def get_oauth_token(base: str, username: str, password: str) -> str:
r = requests.get(f"{base}/api/v1/oauth-clients/local", timeout=15)
r.raise_for_status()
client = r.json()
@@ -93,31 +95,42 @@ def get_oauth_token(base, username, password):
timeout=15,
)
r.raise_for_status()
return r.json()["access_token"]
data_any: Any = r.json()
data = cast(dict[str, Any], data_any)
token = data.get("access_token")
if not isinstance(token, str) or not token:
raise RuntimeError("PeerTube token response missing access_token")
return token
def api_headers(token):
def api_headers(token: str) -> dict[str, str]:
return {"Authorization": f"Bearer {token}"}
def get_channel_id(base, token, channel_name):
def get_channel_id(base: str, token: str, channel_name: str) -> int:
r = requests.get(
f"{base}/api/v1/video-channels/{channel_name}",
headers=api_headers(token),
timeout=15,
)
r.raise_for_status()
return r.json()["id"]
data_any: Any = r.json()
data = cast(dict[str, Any], data_any)
cid = data.get("id")
if not isinstance(cid, int):
raise RuntimeError("PeerTube channel response missing id")
return cid
def get_channel_video_names(base, token, channel_name):
def get_channel_video_names(base: str, token: str, channel_name: str) -> Counter[str]:
"""Paginate through the channel and return a Counter of video names."""
counts = Counter()
counts: Counter[str] = Counter()
page_size = 25
start = 0
while True:
r = requests.get(
f"{base}/api/v1/video-channels/{channel_name}/videos",
params={"start": start, "count": 100},
params={"start": start, "count": page_size},
headers=api_headers(token),
timeout=30,
)
@@ -125,7 +138,7 @@ def get_channel_video_names(base, token, channel_name):
data = r.json()
for v in data.get("data", []):
counts[v["name"]] += 1
start += 100
start += page_size
if start >= data.get("total", 0):
break
return counts
@@ -135,8 +148,16 @@ CHUNK_SIZE = 10 * 1024 * 1024 # 10 MB
MAX_RETRIES = 5
def _init_resumable(base, token, channel_id, filepath, filename, name,
description="", nsfw=False):
def _init_resumable(
base: str,
token: str,
channel_id: int,
filepath: Path,
filename: str,
name: str,
description: str = "",
nsfw: bool = False,
) -> tuple[str, int]:
"""POST to create a resumable upload session. Returns upload URL."""
file_size = Path(filepath).stat().st_size
metadata = {
@@ -171,7 +192,7 @@ def _init_resumable(base, token, channel_id, filepath, filename, name,
return location, file_size
def _query_offset(upload_url, token, file_size):
def _query_offset(upload_url: str, token: str, file_size: int) -> int:
"""Ask the server how many bytes it has received so far."""
r = requests.put(
upload_url,
@@ -193,8 +214,15 @@ def _query_offset(upload_url, token, file_size):
return 0
def upload_video(base, token, channel_id, filepath, name,
description="", nsfw=False):
def upload_video(
base: str,
token: str,
channel_id: int,
filepath: Path,
name: str,
description: str = "",
nsfw: bool = False,
) -> tuple[bool, str | None]:
"""Resumable chunked upload. Returns (ok, uuid)."""
filepath = Path(filepath)
filename = filepath.name
@@ -202,8 +230,14 @@ def upload_video(base, token, channel_id, filepath, name,
try:
upload_url, _ = _init_resumable(
base, token, channel_id, filepath, filename,
name, description, nsfw,
base,
token,
channel_id,
filepath,
filename,
name,
description,
nsfw,
)
except Exception as e:
print(f" Init failed: {e}")
@@ -221,8 +255,11 @@ def upload_video(base, token, channel_id, filepath, name,
chunk = f.read(chunk_len)
pct = int(100 * (end + 1) / file_size)
print(f" {fmt_size(offset)}/{fmt_size(file_size)} ({pct}%)",
end="\r", flush=True)
print(
f" {fmt_size(offset)}/{fmt_size(file_size)} ({pct}%)",
end="\r",
flush=True,
)
try:
r = requests.put(
@@ -239,12 +276,13 @@ def upload_video(base, token, channel_id, filepath, name,
except (requests.ConnectionError, requests.Timeout) as e:
retries += 1
if retries > MAX_RETRIES:
print(
f"\n Upload failed after {MAX_RETRIES} retries: {e}")
print(f"\n Upload failed after {MAX_RETRIES} retries: {e}")
return False, None
wait = min(2**retries, 60)
print(f"\n Connection error, retry {retries}/{MAX_RETRIES} "
f"in {wait}s ...")
print(
f"\n Connection error, retry {retries}/{MAX_RETRIES} "
f"in {wait}s ..."
)
time.sleep(wait)
try:
offset = _query_offset(upload_url, token, file_size)
@@ -261,8 +299,7 @@ def upload_video(base, token, channel_id, filepath, name,
retries = 0
elif r.status_code == 200:
print(
f" {fmt_size(file_size)}/{fmt_size(file_size)} (100%)")
print(f" {fmt_size(file_size)}/{fmt_size(file_size)} (100%)")
uuid = r.json().get("video", {}).get("uuid")
return True, uuid
@@ -270,11 +307,9 @@ def upload_video(base, token, channel_id, filepath, name,
retry_after = int(r.headers.get("Retry-After", 10))
retries += 1
if retries > MAX_RETRIES:
print(
f"\n Upload failed: server returned {r.status_code}")
print(f"\n Upload failed: server returned {r.status_code}")
return False, None
print(
f"\n Server {r.status_code}, retry in {retry_after}s ...")
print(f"\n Server {r.status_code}, retry in {retry_after}s ...")
time.sleep(retry_after)
try:
offset = _query_offset(upload_url, token, file_size)
@@ -301,7 +336,7 @@ _STATE = {
}
def get_video_state(base, token, uuid):
def get_video_state(base: str, token: str, uuid: str) -> tuple[int, str]:
r = requests.get(
f"{base}/api/v1/videos/{uuid}",
headers=api_headers(token),
@@ -312,7 +347,7 @@ def get_video_state(base, token, uuid):
return state["id"], state.get("label", "")
def wait_for_published(base, token, uuid, poll_interval):
def wait_for_published(base: str, token: str, uuid: str, poll_interval: int) -> int:
"""Block until the video reaches state 1 (Published) or a failure state."""
started = time.monotonic()
while True:
@@ -329,8 +364,10 @@ def wait_for_published(base, token, uuid, poll_interval):
try:
sid, label = get_video_state(base, token, uuid)
except requests.exceptions.RequestException as e:
print(f" -> Poll error ({e.__class__.__name__}) "
f"after {elapsed_str}, retrying in {poll_interval}s …")
print(
f" -> Poll error ({e.__class__.__name__}) "
f"after {elapsed_str}, retrying in {poll_interval}s …"
)
time.sleep(poll_interval)
continue
@@ -343,13 +380,16 @@ def wait_for_published(base, token, uuid, poll_interval):
print(f" -> FAILED: {display}")
return sid
print(f" -> {display}{elapsed_str} elapsed (next check in {poll_interval}s)")
print(
f" -> {display}{elapsed_str} elapsed (next check in {poll_interval}s)"
)
time.sleep(poll_interval)
# ── State tracker ────────────────────────────────────────────────────
def load_uploaded(input_dir):
def load_uploaded(input_dir: str) -> set[Path]:
path = Path(input_dir) / UPLOADED_FILE
if not path.exists():
return set()
@@ -357,36 +397,52 @@ def load_uploaded(input_dir):
return {Path(line.strip()) for line in f if line.strip()}
def mark_uploaded(input_dir, rel_path):
def mark_uploaded(input_dir: str, rel_path: Path) -> None:
with open(Path(input_dir) / UPLOADED_FILE, "a") as f:
f.write(f"{rel_path}\n")
# ── File / metadata helpers ─────────────────────────────────────────
def build_path_to_meta(video_map, input_dir):
"""Map each expected download path (relative) to {title, description}."""
def build_path_to_meta(
video_map: dict[str, Any],
input_dir: str,
) -> dict[Path, dict[str, str]]:
"""Map each expected download path (relative) to {title, description, original_filename}."""
urls = collect_urls(video_map)
mode = read_mode(input_dir) or MODE_ORIGINAL
paths = get_paths_for_mode(mode, urls, video_map, input_dir)
url_meta = {}
for entry in video_map.values():
t = entry.get("title", "")
d = entry.get("description", "")
for video_url in entry.get("videos", []):
if video_url not in url_meta:
url_meta[video_url] = {"title": t, "description": d}
url_to_site = build_url_to_site()
result = {}
paths = get_paths_for_mode(mode, urls, video_map, input_dir, url_to_site)
url_meta: dict[str, dict[str, str]] = {}
for entry_any in video_map.values():
entry = cast(dict[str, Any], entry_any)
t = entry.get("title")
d = entry.get("description")
title = t if isinstance(t, str) else ""
desc = d if isinstance(d, str) else ""
for vid in entry.get("videos", []):
if vid["url"] not in url_meta:
url_meta[vid["url"]] = {"title": title, "description": desc}
result: dict[Path, dict[str, str]] = {}
for url, abs_path in paths.items():
rel = Path(abs_path).relative_to(input_dir)
rel = abs_path.relative_to(input_dir)
meta = url_meta.get(url, {"title": "", "description": ""})
result[rel] = {**meta, "original_filename": url_to_filename(url)}
result[rel] = {
"title": meta.get("title", ""),
"description": meta.get("description", ""),
"original_filename": url_to_filename(url),
}
return result
def find_videos(input_dir):
def find_videos(input_dir: str) -> set[Path]:
"""Walk input_dir and return a set of relative paths for all video files."""
found = set()
for root, dirs, files in os.walk(input_dir):
@@ -399,7 +455,12 @@ def find_videos(input_dir):
# ── Channel match helpers ─────────────────────────────────────────────
def _channel_match(rel, path_meta, existing):
def _channel_match(
rel: Path,
path_meta: dict[Path, dict[str, str]],
existing: set[str],
) -> tuple[bool, str]:
"""Return (matched, name) for a local file against the channel name set.
Checks both the title-derived name and the original-filename-derived name
@@ -409,38 +470,62 @@ def _channel_match(rel, path_meta, existing):
"""
meta = path_meta.get(rel, {})
name = make_pt_name(meta.get("title", ""), rel.name)
orig_fn = meta.get("original_filename", "")
raw_name = make_pt_name("", orig_fn) if orig_fn else None
matched = name in existing or (raw_name and raw_name != name and raw_name in existing)
raw_name: str | None = make_pt_name("", orig_fn) if orig_fn else None
matched = name in existing
if not matched and raw_name is not None and raw_name != name:
matched = raw_name in existing
return matched, name
# ── CLI ──────────────────────────────────────────────────────────────
def main():
def main() -> None:
ap = argparse.ArgumentParser(
description="Upload videos to PeerTube with transcoding-aware batching",
)
ap.add_argument("--input", "-i", default=DEFAULT_OUTPUT,
help=f"Directory with downloaded videos (default: {DEFAULT_OUTPUT})")
ap.add_argument("--url",
help="PeerTube instance URL (or set PEERTUBE_URL env var)")
ap.add_argument("--username", "-U",
help="PeerTube username (or set PEERTUBE_USER env var)")
ap.add_argument("--password", "-p",
help="PeerTube password (or set PEERTUBE_PASSWORD env var)")
ap.add_argument("--channel", "-C",
help="Channel to upload to (or set PEERTUBE_CHANNEL env var)")
ap.add_argument("--batch-size", "-b", type=int, default=DEFAULT_BATCH_SIZE,
help="Videos to upload before waiting for transcoding (default: 1)")
ap.add_argument("--poll-interval", type=int, default=DEFAULT_POLL,
help=f"Seconds between state polls (default: {DEFAULT_POLL})")
ap.add_argument("--skip-wait", action="store_true",
help="Upload everything without waiting for transcoding")
ap.add_argument("--nsfw", action="store_true",
help="Mark videos as NSFW")
ap.add_argument("--dry-run", "-n", action="store_true",
help="Preview what would be uploaded")
ap.add_argument(
"--input",
"-i",
default=DEFAULT_OUTPUT,
help=f"Directory with downloaded videos (default: {DEFAULT_OUTPUT})",
)
ap.add_argument("--url", help="PeerTube instance URL (or set PEERTUBE_URL env var)")
ap.add_argument(
"--username", "-U", help="PeerTube username (or set PEERTUBE_USER env var)"
)
ap.add_argument(
"--password", "-p", help="PeerTube password (or set PEERTUBE_PASSWORD env var)"
)
ap.add_argument(
"--channel", "-C", help="Channel to upload to (or set PEERTUBE_CHANNEL env var)"
)
ap.add_argument(
"--batch-size",
"-b",
type=int,
default=DEFAULT_BATCH_SIZE,
help="Videos to upload before waiting for transcoding (default: 1)",
)
ap.add_argument(
"--poll-interval",
type=int,
default=DEFAULT_POLL,
help=f"Seconds between state polls (default: {DEFAULT_POLL})",
)
ap.add_argument(
"--skip-wait",
action="store_true",
help="Upload everything without waiting for transcoding",
)
ap.add_argument("--nsfw", action="store_true", help="Mark videos as NSFW")
ap.add_argument(
"--dry-run", "-n", action="store_true", help="Preview what would be uploaded"
)
args = ap.parse_args()
url = args.url or os.environ.get("PEERTUBE_URL")
@@ -449,12 +534,16 @@ def main():
password = args.password or os.environ.get("PEERTUBE_PASSWORD")
if not args.dry_run:
missing = [label for label, val in [
missing = [
label
for label, val in [
("--url / PEERTUBE_URL", url),
("--username / PEERTUBE_USER", username),
("--channel / PEERTUBE_CHANNEL", channel),
("--password / PEERTUBE_PASSWORD", password),
] if not val]
]
if not val
]
if missing:
for label in missing:
print(f"[!] Required: {label}")
@@ -468,7 +557,8 @@ def main():
unmatched = on_disk - set(path_meta.keys())
if unmatched:
print(
f"[!] {len(unmatched)} file(s) on disk not in video_map (will use filename as title)")
f"[!] {len(unmatched)} file(s) on disk not in video_map (will use filename as title)"
)
for rel in unmatched:
path_meta[rel] = {"title": "", "description": ""}
@@ -493,10 +583,14 @@ def main():
sz = (Path(args.input) / rel).stat().st_size
total_bytes += sz
print(f" [{fmt_size(sz):>10}] {name}")
print(
f"\n Total: {fmt_size(total_bytes)} across {len(pending)} videos")
print(f"\n Total: {fmt_size(total_bytes)} across {len(pending)} videos")
return
assert url is not None
assert username is not None
assert channel is not None
assert password is not None
# ── authenticate ──
base = url.rstrip("/")
if not base.startswith("http"):
@@ -533,7 +627,9 @@ def main():
if _channel_match(rel, path_meta, existing)[0]:
pre_matched.append(rel)
if pre_matched:
print(f"\n[+] Pre-sweep: {len(pre_matched)} local file(s) already on channel — marking uploaded")
print(
f"\n[+] Pre-sweep: {len(pre_matched)} local file(s) already on channel — marking uploaded"
)
for rel in pre_matched:
mark_uploaded(args.input, rel)
pending = [rel for rel in pending if rel not in set(pre_matched)]
@@ -548,7 +644,8 @@ def main():
# ── flush batch if full ──
if not args.skip_wait and len(batch) >= args.batch_size:
print(
f"\n[+] Waiting for {len(batch)} video(s) to finish processing ...")
f"\n[+] Waiting for {len(batch)} video(s) to finish processing ..."
)
for uuid, bname in batch:
print(f"\n [{bname}]")
wait_for_published(base, token, uuid, args.poll_interval)
@@ -568,18 +665,19 @@ def main():
print(f"\n[{total_up + 1}/{len(pending)}] {name}")
print(f" File: {rel} ({fmt_size(sz)})")
ok, uuid = upload_video(
base, token, channel_id, filepath, name, desc, nsfw)
ok, uuid_opt = upload_video(
base, token, channel_id, filepath, name, desc, nsfw
)
if not ok:
continue
print(f" Uploaded uuid={uuid}")
print(f" Uploaded uuid={uuid_opt}")
mark_uploaded(args.input, rel)
total_up += 1
existing.add(name)
if uuid:
batch.append((uuid, name))
if uuid_opt is not None:
batch.append((uuid_opt, name))
# ── wait for final batch ──
if batch and not args.skip_wait:
@@ -589,8 +687,7 @@ def main():
wait_for_published(base, token, uuid, args.poll_interval)
except KeyboardInterrupt:
print(
f"\n\n[!] Interrupted after {total_up} uploads. Re-run to continue.")
print(f"\n\n[!] Interrupted after {total_up} uploads. Re-run to continue.")
sys.exit(130)
print(f"\n{'=' * 50}")
+12845 -1350
View File
File diff suppressed because one or more lines are too long