import argparse
import re
import os
import time
import signal
import asyncio
import requests
from pathlib import PurePosixPath
from typing import Any
from urllib.parse import urlparse
from dotenv import load_dotenv
from playwright.async_api import async_playwright, BrowserContext
from check_clashes import (
VIDEO_EXTS,
load_video_map,
save_video_map,
is_valid_url,
expects_video,
)
from config import SITES
from grab_cookie import login_and_get_cookie, update_env
# Populate os.environ from .env at import time — the credential/cookie lookups
# below (e.g. *_LOGIN_COOKIE, *_USERNAME) depend on this having run first.
load_dotenv()
def _is_video_url(url: str) -> bool:
    """True if `url` ends with a recognised video extension (case-insensitive, path only)."""
    # Only the path component matters — query strings and fragments are ignored.
    path = urlparse(url).path
    extension = PurePosixPath(path).suffix.lower()
    return extension in VIDEO_EXTS
# WordPress-internal post types (reported by /wp-json/wp/v2/types) that are
# excluded from content discovery — they never map to member-facing post pages.
SKIP_TYPES = {
    "attachment",
    "nav_menu_item",
    "wp_block",
    "wp_template",
    "wp_template_part",
    "wp_global_styles",
    "wp_navigation",
    "wp_font_family",
    "wp_font_face",
}
# Cap on concurrent workers — presumably consumed by the async Playwright
# crawler later in this file; not referenced in this chunk (TODO confirm).
MAX_WORKERS = 4

# Desktop Firefox User-Agent sent on every HTTP request (API probes and page
# fetches) so the site serves its normal browser-facing responses.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0"
)
def _api_headers(base_url: str, cookie_name: str, cookie_value: str) -> dict[str, str]:
    """Build the standard header set for authenticated WP REST API requests.

    Includes the login cookie plus an age-verification cookie, and a Referer
    pointing at the site root.
    """
    cookie_header = f"{cookie_name}={cookie_value}; eav-age-verified=1"
    headers = {
        "User-Agent": _USER_AGENT,
        "Accept": "application/json",
        "Referer": f"{base_url}/",
        "Cookie": cookie_header,
    }
    return headers
def _select_probe_url(video_map: dict[str, Any]) -> str | None:
    """Pure function: return the first URL in video_map where expects_video() is True."""
    for candidate in video_map:
        if expects_video(candidate):
            return candidate
    return None
def _probe_cookie(name: str, value: str, site_key: str) -> bool:
    """HEAD request to a members-only video page. Returns True if the cookie is still valid.

    Returns False when validation is impossible or fails for any reason —
    including network errors — so the caller falls through to re-authentication
    instead of crashing on a transient timeout/DNS/connection failure.
    """
    video_map = load_video_map(site_key)
    probe_url = _select_probe_url(video_map)
    if probe_url is None:
        return False  # no video URLs yet — can't validate, fall through to re-auth
    try:
        r = requests.head(
            probe_url,
            headers={"Cookie": f"{name}={value}", "User-Agent": _USER_AGENT},
            allow_redirects=False,
            timeout=10,
        )
    except requests.RequestException:
        # Timeout / connection error / DNS failure: treat the cookie as
        # unverifiable and let the caller perform a fresh login.
        return False
    # Members-only pages redirect (or 403) unauthenticated requests; only a
    # direct 200 proves the cookie still grants access.
    return r.status_code == 200
def _get_login_cookie(site_key: str, site_cfg: dict[str, str]) -> tuple[str, str]:
    """Resolve a valid WordPress login cookie for the site.

    Preference order: a cached cookie from .env (trusted as-is in cookie-only
    mode, otherwise validated via a probe request), then a fresh Playwright
    login with username/password. Raises RuntimeError if neither is configured.
    """
    prefix = site_cfg["env_prefix"]
    base_url = site_cfg["base_url"]
    env_key = f"{prefix}_LOGIN_COOKIE"
    username = os.environ.get(f"{prefix}_USERNAME", "").strip()
    password = os.environ.get(f"{prefix}_PASSWORD", "").strip()
    has_credentials = bool(username and password)

    cached = os.environ.get(env_key, "").strip()
    if cached:
        name, _, value = cached.partition("=")
        if value and name.startswith("wordpress_logged_in_"):
            if not has_credentials:
                # Cookie-only mode: no way to re-auth, so trust it as-is.
                return name, value
            print(f"[{site_key}] Cookie found — validating…")
            if _probe_cookie(name, value, site_key):
                print(f"[{site_key}] Cookie still valid — skipping login.")
                return name, value
            print(f"[{site_key}] Cookie expired — re-authenticating…")

    if has_credentials:
        cookie_name, cookie_value = login_and_get_cookie(username, password, base_url)
        action = update_env(cookie_name, cookie_value, env_key=env_key)
        print(f"[{site_key}] Logged in: {cookie_name} ({action} in .env)")
        return cookie_name, cookie_value

    raise RuntimeError(
        f"No credentials or cookie found for {site_key}. Set either:\n"
        f" • {prefix}_USERNAME + {prefix}_PASSWORD (recommended)\n"
        f" • {prefix}_LOGIN_COOKIE (fallback — may expire)\n"
        "See .env.example."
    )
def _has_credentials(site_cfg: dict[str, str]) -> bool:
env_prefix = site_cfg["env_prefix"]
has_cookie = bool(os.environ.get(f"{env_prefix}_LOGIN_COOKIE", "").strip())
has_creds = bool(
os.environ.get(f"{env_prefix}_USERNAME", "").strip()
and os.environ.get(f"{env_prefix}_PASSWORD", "").strip()
)
return has_cookie or has_creds
def discover_content_types(
    session: requests.Session, wp_api: str
) -> list[tuple[str, str, str]]:
    """Query /wp-json/wp/v2/types and return a list of (name, rest_base, type_slug)."""
    resp = session.get(f"{wp_api}/types", timeout=30)
    resp.raise_for_status()
    discovered: list[tuple[str, str, str]] = []
    for slug, info in resp.json().items():
        if slug in SKIP_TYPES:
            continue  # internal WP type — never holds member content
        rest_base = info.get("rest_base")
        if not rest_base:
            continue  # type not exposed over REST; nothing to paginate
        discovered.append((info.get("name", slug), rest_base, slug))
    return discovered
def fetch_all_posts_for_type(
    session: requests.Session,
    wp_api: str,
    base_url: str,
    type_name: str,
    rest_base: str,
    type_slug: str,
) -> list[tuple[str, str, str]]:
    """Paginate one content type and return (url, title, description) tuples."""
    url_prefix = type_slug.replace("_", "-")
    endpoint = f"{wp_api}/{rest_base}"
    collected: list[tuple[str, str, str]] = []
    page = 1
    while True:
        resp = session.get(
            endpoint,
            params={"per_page": 100, "page": page},
            timeout=30,
        )
        # WP answers 400 once `page` runs past the last page; any other
        # failure also ends pagination.
        if resp.status_code == 400 or not resp.ok:
            break
        batch = resp.json()
        if not batch:
            break
        for post in batch:
            link = post.get("link", "")
            if not link.startswith("http"):
                # Fall back to reconstructing the permalink from the slug.
                slug = post.get("slug")
                if not slug:
                    continue  # no link and no slug — nothing usable
                link = f"{base_url}/{url_prefix}/{slug}/"
            raw_title = post.get("title", {})
            if isinstance(raw_title, dict):
                title = raw_title.get("rendered", "")
            else:
                title = str(raw_title)
            raw_content = post.get("content", {})
            if isinstance(raw_content, dict):
                content_html = raw_content.get("rendered", "")
            else:
                content_html = ""
            description = html_to_text(content_html) if content_html else ""
            collected.append((link, title, description))
        print(f" {type_name} page {page}: {len(batch)} items")
        page += 1
    return collected
def fetch_post_urls_from_api(
    site_key: str,
    base_url: str,
    wp_api: str,
    headers: dict[str, str],
) -> list[str]:
    """Auto-discover all content types via the WP REST API and collect every post URL.
    Also pre-populates video_map.json with titles."""
    print(f"[{site_key}] video_map empty — discovering content types from REST API…")
    session = requests.Session()
    session.headers.update(headers)

    targets = discover_content_types(session, wp_api)
    print(
        f"[{site_key}] Found {len(targets)} content types: "
        f"{', '.join(name for name, _, _ in targets)}\n"
    )

    # Collect (url, title, description) from every discovered type.
    all_results: list[tuple[str, str, str]] = []
    for type_name, rest_base, type_slug in targets:
        all_results.extend(
            fetch_all_posts_for_type(
                session, wp_api, base_url, type_name, rest_base, type_slug
            )
        )

    video_map = load_video_map(site_key)
    seen: set[str] = set()
    deduped_urls: list[str] = []
    for url, title, description in all_results:
        if url in seen or not url.startswith("http"):
            continue
        seen.add(url)
        deduped_urls.append(url)
        if url in video_map:
            # Never clobber titles/descriptions that already exist.
            if not video_map[url].get("title"):
                video_map[url]["title"] = title
            if not video_map[url].get("description"):
                video_map[url]["description"] = description
        else:
            video_map[url] = {
                "title": title,
                "description": description,
                "videos": [],
            }
    save_video_map(video_map, site_key)

    print(
        f"\n[{site_key}] Discovered {len(deduped_urls)} unique URLs → saved to video_map.json"
    )
    print(f"[{site_key}] Pre-populated {len(video_map)} entries")
    return deduped_urls
def fetch_metadata_from_api(
    site_key: str,
    base_url: str,
    wp_api: str,
    video_map: dict[str, Any],
    urls: list[str],
    headers: dict[str, str],
) -> None:
    """Populate missing titles and descriptions in video_map from the REST API."""
    # A URL needs fetching if it is unknown or lacks either metadata field.
    missing = [
        u
        for u in urls
        if u not in video_map
        or not video_map[u].get("title")
        or not video_map[u].get("description")
    ]
    if not missing:
        return

    print(f"[{site_key}] Fetching metadata from REST API for {len(missing)} posts…")
    session = requests.Session()
    session.headers.update(headers)

    for type_name, rest_base, type_slug in discover_content_types(session, wp_api):
        results = fetch_all_posts_for_type(
            session, wp_api, base_url, type_name, rest_base, type_slug
        )
        for url, title, description in results:
            if url in video_map:
                # Fill only the blanks — never overwrite existing metadata.
                if not video_map[url].get("title"):
                    video_map[url]["title"] = title
                if not video_map[url].get("description"):
                    video_map[url]["description"] = description
            else:
                video_map[url] = {
                    "title": title,
                    "description": description,
                    "videos": [],
                }
    save_video_map(video_map, site_key)

    with_title = sum(1 for u in urls if video_map.get(u, {}).get("title"))
    with_desc = sum(1 for u in urls if video_map.get(u, {}).get("description"))
    print(f"[{site_key}] Titles populated: {with_title}/{len(urls)}")
    print(f"[{site_key}] Descriptions populated: {with_desc}/{len(urls)}")
def load_post_urls(
    site_key: str,
    base_url: str,
    wp_api: str,
    headers: dict[str, str],
) -> list[str]:
    """Return every known post URL, preferring the cached video_map over the API."""
    cached = load_video_map(site_key)
    if not cached:
        # Empty/missing map — fall back to full REST API discovery.
        return fetch_post_urls_from_api(site_key, base_url, wp_api, headers)
    print(f"[{site_key}] video_map found — loading {len(cached)} post URLs.")
    return list(cached)
def html_to_text(html_str: str) -> str:
"""Strip HTML tags, decode entities, and collapse whitespace into clean plain text."""
import html
text = re.sub(r"
", "\n", html_str)
text = text.replace("