import argparse
import re
import os
import time
import signal
import asyncio
import requests
from pathlib import PurePosixPath
from typing import Any, Optional
from urllib.parse import urlparse
from dotenv import load_dotenv
from playwright.async_api import async_playwright, BrowserContext
from check_clashes import (
VIDEO_EXTS,
load_video_map,
save_video_map,
is_valid_url,
expects_video,
)
from config import SITES
from grab_cookie import login_and_get_cookie, update_env
# Read .env at import time so per-site credentials/cookies are in os.environ.
load_dotenv()
def _is_video_url(url: str) -> bool:
    """Check whether the path component of `url` carries a known video suffix.

    Only the URL path is inspected (query strings and fragments are ignored)
    and the comparison is case-insensitive.
    """
    path = urlparse(url).path
    suffix = PurePosixPath(path).suffix
    return suffix.lower() in VIDEO_EXTS
# WordPress post types excluded from REST discovery — presumably template /
# navigation / asset types rather than member-facing content (NOTE(review):
# confirm against each site's /wp/v2/types output).
SKIP_TYPES = {
    "attachment",
    "nav_menu_item",
    "wp_block",
    "wp_template",
    "wp_template_part",
    "wp_global_styles",
    "wp_navigation",
    "wp_font_family",
    "wp_font_face",
}

# Concurrency cap; no usage visible in this chunk — likely consumed by the
# async scraping phase further down the file. TODO confirm.
MAX_WORKERS = 4

# Desktop Firefox User-Agent sent on every request (see _api_headers and
# _probe_cookie) so the server responds as it would to a normal browser.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0"
)
def _api_headers(base_url: str, cookie_name: str, cookie_value: str) -> dict[str, str]:
    """Build headers for authenticated WP REST API requests.

    Combines the login cookie with an age-verification cookie, a browser
    User-Agent, and a Referer pointing at the site root.
    """
    cookie_header = f"{cookie_name}={cookie_value}; eav-age-verified=1"
    headers = {
        "User-Agent": _USER_AGENT,
        "Accept": "application/json",
        "Referer": f"{base_url}/",
        "Cookie": cookie_header,
    }
    return headers
def _select_probe_url(video_map: dict[str, Any]) -> str | None:
    """Pure function: return the first key of `video_map` accepted by expects_video(), else None."""
    for candidate in video_map:
        if expects_video(candidate):
            return candidate
    return None
def _probe_cookie(name: str, value: str, site_key: str) -> bool:
    """Issue a HEAD request against a members-only video URL to test the cookie.

    Returns True only on a direct 200 (redirects are not followed — a redirect
    to a login page means the cookie is stale). Returns False when no probe
    URL exists yet, so the caller falls through to re-authentication.
    """
    probe_url = _select_probe_url(load_video_map(site_key))
    if probe_url is None:
        return False  # no video URLs yet — can't validate, fall through to re-auth
    response = requests.head(
        probe_url,
        headers={"Cookie": f"{name}={value}", "User-Agent": _USER_AGENT},
        allow_redirects=False,
        timeout=10,
    )
    return response.status_code == 200
def _get_login_cookie(site_key: str, site_cfg: dict[str, str]) -> tuple[str, str]:
    """Return a (cookie_name, cookie_value) pair for `site_key`.

    Resolution order:
      1. A cached WordPress login cookie from the environment — trusted as-is
         when no credentials are configured, otherwise validated via a probe.
      2. A fresh browser login with username/password, persisted back to .env.

    Raises:
        RuntimeError: when neither a usable cookie nor credentials exist.
    """
    env_prefix = site_cfg["env_prefix"]
    base_url = site_cfg["base_url"]
    env_key = f"{env_prefix}_LOGIN_COOKIE"
    username = os.environ.get(f"{env_prefix}_USERNAME", "").strip()
    password = os.environ.get(f"{env_prefix}_PASSWORD", "").strip()
    has_credentials = bool(username and password)

    cached = os.environ.get(env_key, "").strip()
    if cached:
        name, _, value = cached.partition("=")
        # Only trust cookies that look like a WordPress login cookie.
        if value and name.startswith("wordpress_logged_in_"):
            if not has_credentials:
                # Cookie-only mode: nothing to fall back on, so use it blindly.
                return name, value
            print(f"[{site_key}] Cookie found — validating…")
            if _probe_cookie(name, value, site_key):
                print(f"[{site_key}] Cookie still valid — skipping login.")
                return name, value
            print(f"[{site_key}] Cookie expired — re-authenticating…")

    if has_credentials:
        cookie_name, cookie_value = login_and_get_cookie(username, password, base_url)
        action = update_env(cookie_name, cookie_value, env_key=env_key)
        print(f"[{site_key}] Logged in: {cookie_name} ({action} in .env)")
        return cookie_name, cookie_value

    raise RuntimeError(
        f"No credentials or cookie found for {site_key}. Set either:\n"
        f" • {env_prefix}_USERNAME + {env_prefix}_PASSWORD (recommended)\n"
        f" • {env_prefix}_LOGIN_COOKIE (fallback — may expire)\n"
        "See .env.example."
    )
def _has_credentials(site_cfg: dict[str, str]) -> bool:
env_prefix = site_cfg["env_prefix"]
has_cookie = bool(os.environ.get(f"{env_prefix}_LOGIN_COOKIE", "").strip())
has_creds = bool(
os.environ.get(f"{env_prefix}_USERNAME", "").strip()
and os.environ.get(f"{env_prefix}_PASSWORD", "").strip()
)
return has_cookie or has_creds
def discover_content_types(
    session: requests.Session, wp_api: str
) -> list[tuple[str, str, str]]:
    """Query /wp-json/wp/v2/types and return a list of (name, rest_base, type_slug)."""
    resp = session.get(f"{wp_api}/types", timeout=30)
    resp.raise_for_status()
    discovered: list[tuple[str, str, str]] = []
    for type_slug, info in resp.json().items():
        if type_slug in SKIP_TYPES:
            continue  # structural/asset type — not content we paginate
        rest_base = info.get("rest_base")
        if not rest_base:
            continue  # type not exposed through the REST API
        discovered.append((info.get("name", type_slug), rest_base, type_slug))
    return discovered
def fetch_all_posts_for_type(
    session: requests.Session,
    wp_api: str,
    base_url: str,
    type_name: str,
    rest_base: str,
    type_slug: str,
) -> list[tuple[str, str, str]]:
    """Paginate one content type and return (url, title, description) tuples.

    Pages of 100 are fetched until the API errors (WP answers 400 past the
    last page) or returns an empty page. Posts without a usable `link` get a
    permalink rebuilt from their slug; posts with neither are dropped.
    """
    url_prefix = type_slug.replace("_", "-")
    collected: list[tuple[str, str, str]] = []
    page = 1
    while True:
        resp = session.get(
            f"{wp_api}/{rest_base}",
            params={"per_page": 100, "page": page},
            timeout=30,
        )
        # 400 is WP's "page out of range" signal; any other error also stops.
        if resp.status_code == 400 or not resp.ok:
            break
        batch = resp.json()
        if not batch:
            break
        for post in batch:
            link = post.get("link", "")
            if not link.startswith("http"):
                slug = post.get("slug")
                if not slug:
                    continue  # no link and no slug — nothing to record
                link = f"{base_url}/{url_prefix}/{slug}/"
            title_obj = post.get("title", {})
            if isinstance(title_obj, dict):
                title = title_obj.get("rendered", "")
            else:
                title = str(title_obj)
            content_obj = post.get("content", {})
            if isinstance(content_obj, dict):
                content_html = content_obj.get("rendered", "")
            else:
                content_html = ""
            description = html_to_text(content_html) if content_html else ""
            collected.append((link, title, description))
        print(f" {type_name} page {page}: {len(batch)} items")
        page += 1
    return collected
def fetch_post_urls_from_api(
    site_key: str,
    base_url: str,
    wp_api: str,
    headers: dict[str, str],
) -> list[str]:
    """Auto-discover all content types via the WP REST API and collect every post URL.

    Side effect: pre-populates (and saves) video_map.json with titles and
    descriptions for every discovered post, preserving any existing values.

    Returns the de-duplicated list of post URLs in discovery order.
    """
    print(f"[{site_key}] video_map empty — discovering content types from REST API…")
    session = requests.Session()
    session.headers.update(headers)
    targets = discover_content_types(session, wp_api)
    type_names = ", ".join(name for name, _, _ in targets)
    print(f"[{site_key}] Found {len(targets)} content types: {type_names}\n")

    all_results: list[tuple[str, str, str]] = []
    for type_name, rest_base, type_slug in targets:
        all_results.extend(
            fetch_all_posts_for_type(
                session, wp_api, base_url, type_name, rest_base, type_slug
            )
        )

    video_map = load_video_map(site_key)
    seen: set[str] = set()
    deduped: list[str] = []
    for url, title, description in all_results:
        if not url.startswith("http") or url in seen:
            continue
        seen.add(url)
        deduped.append(url)
        entry = video_map.get(url)
        if entry is None:
            video_map[url] = {
                "title": title,
                "description": description,
                "videos": [],
            }
        else:
            # Never clobber metadata that is already present.
            if not entry.get("title"):
                entry["title"] = title
            if not entry.get("description"):
                entry["description"] = description
    save_video_map(video_map, site_key)
    print(
        f"\n[{site_key}] Discovered {len(deduped)} unique URLs → saved to video_map.json"
    )
    print(f"[{site_key}] Pre-populated {len(video_map)} entries")
    return deduped
def fetch_metadata_from_api(
    site_key: str,
    base_url: str,
    wp_api: str,
    video_map: dict[str, Any],
    urls: list[str],
    headers: dict[str, str],
) -> None:
    """Populate missing titles and descriptions in `video_map` from the REST API.

    No-op when every URL already has both fields; otherwise re-walks all
    content types, fills gaps in place (never overwriting existing values),
    and saves the updated map.
    """

    def _needs_metadata(u: str) -> bool:
        # Missing entry, or entry lacking either field, triggers a refetch.
        entry = video_map.get(u)
        return entry is None or not entry.get("title") or not entry.get("description")

    missing = [u for u in urls if _needs_metadata(u)]
    if not missing:
        return
    print(f"[{site_key}] Fetching metadata from REST API for {len(missing)} posts…")
    session = requests.Session()
    session.headers.update(headers)
    for type_name, rest_base, type_slug in discover_content_types(session, wp_api):
        results = fetch_all_posts_for_type(
            session, wp_api, base_url, type_name, rest_base, type_slug
        )
        for url, title, description in results:
            entry = video_map.get(url)
            if entry is None:
                video_map[url] = {
                    "title": title,
                    "description": description,
                    "videos": [],
                }
                continue
            if not entry.get("title"):
                entry["title"] = title
            if not entry.get("description"):
                entry["description"] = description
    save_video_map(video_map, site_key)
    populated_t = sum(1 for u in urls if video_map.get(u, {}).get("title"))
    populated_d = sum(1 for u in urls if video_map.get(u, {}).get("description"))
    print(f"[{site_key}] Titles populated: {populated_t}/{len(urls)}")
    print(f"[{site_key}] Descriptions populated: {populated_d}/{len(urls)}")
def load_post_urls(
    site_key: str,
    base_url: str,
    wp_api: str,
    headers: dict[str, str],
) -> list[str]:
    """Return every known post URL for the site.

    Prefers the cached video_map on disk; only when it is empty/absent does
    this fall back to full discovery via the REST API.
    """
    cached = load_video_map(site_key)
    if not cached:
        return fetch_post_urls_from_api(site_key, base_url, wp_api, headers)
    print(f"[{site_key}] video_map found — loading {len(cached)} post URLs.")
    return list(cached)
def html_to_text(html_str: str) -> str:
"""Strip HTML tags, decode entities, and collapse whitespace into clean plain text."""
import html
text = re.sub(r"
", "\n", html_str)
text = text.replace("