import re
import json
import os
import time
import signal
import asyncio
import tempfile
import requests
from pathlib import Path, PurePosixPath
from urllib.parse import urlparse
from dotenv import load_dotenv
from playwright.async_api import async_playwright
from check_clashes import VIDEO_EXTS
from config import BASE_URL
load_dotenv()
def _is_video_url(url):
    """Return True when the URL's path component ends in a known video extension.

    Only the path is examined (query strings and fragments are ignored) and
    the extension comparison is case-insensitive.
    """
    suffix = PurePosixPath(urlparse(url).path).suffix
    return suffix.lower() in VIDEO_EXTS
# Root endpoint of the target site's WordPress REST API (v2).
WP_API = f"{BASE_URL}/wp-json/wp/v2"
# Entries from /wp-json/wp/v2/types that carry no scrapeable page content
# (media, menus, block templates, fonts, ...) — skipped during discovery.
SKIP_TYPES = {
    "attachment", "nav_menu_item", "wp_block", "wp_template",
    "wp_template_part", "wp_global_styles", "wp_navigation",
    "wp_font_family", "wp_font_face",
}
# JSON file mapping each post URL -> {"title", "description", "videos"}.
VIDEO_MAP_FILE = "video_map.json"
# Worker/concurrency cap — not referenced in this chunk; presumably used
# by the scraping loop elsewhere in the file (TODO confirm).
MAX_WORKERS = 4
# Browser-like headers for REST requests; the Referer and a realistic
# User-Agent help avoid hosts that reject default client UAs.
API_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0",
    "Accept": "application/json",
    "Referer": f"{BASE_URL}/",
}
def _get_login_cookie():
    """Read WP_LOGIN_COOKIE from the environment and split it into (name, value).

    Raises:
        RuntimeError: when the variable is unset, has no '=' separator, or
            its name is not a ``wordpress_logged_in_``-prefixed cookie.
    """
    # Strip stray whitespace that tends to sneak in when pasting into .env.
    cookie = os.environ.get("WP_LOGIN_COOKIE", "").strip()
    if not cookie:
        raise RuntimeError(
            "WP_LOGIN_COOKIE not set. Copy it from your browser into .env — see .env.example.")
    # partition() splits on the FIRST '=', so '=' inside the value is safe.
    name, _sep, value = cookie.partition("=")
    if not value:
        raise RuntimeError(
            "WP_LOGIN_COOKIE looks malformed (no '=' found). Expected: name=value")
    if not name.startswith("wordpress_logged_in_"):
        raise RuntimeError(
            "WP_LOGIN_COOKIE doesn't look right — expected a wordpress_logged_in_... cookie.")
    return name, value
def discover_content_types(session):
    """Query /wp-json/wp/v2/types and return a list of (name, rest_base, type_slug)
    for content types worth scraping (everything not in SKIP_TYPES that
    exposes a REST base)."""
    resp = session.get(f"{WP_API}/types", timeout=30)
    resp.raise_for_status()
    discovered = []
    for slug, info in resp.json().items():
        if slug in SKIP_TYPES:
            continue
        base = info.get("rest_base")
        if base:
            # Fall back to the slug when the type has no human-readable name.
            discovered.append((info.get("name", slug), base, slug))
    return discovered
def fetch_all_posts_for_type(session, type_name, rest_base, type_slug):
    """Paginate one content type and return (url, title, description) tuples.

    Prefers each post's `link` field; when it is missing or not absolute,
    falls back to building a URL from the slug. Pagination stops on the
    first non-OK response (WordPress answers 400 past the last page) or
    an empty page.
    """
    url_prefix = type_slug.replace("_", "-")
    collected = []
    page = 1
    while True:
        resp = session.get(
            f"{WP_API}/{rest_base}",
            params={"per_page": 100, "page": page},
            timeout=30,
        )
        if resp.status_code == 400 or not resp.ok:
            break
        posts = resp.json()
        if not posts:
            break
        for post in posts:
            link = post.get("link", "")
            if not link.startswith("http"):
                slug = post.get("slug")
                if not slug:
                    # No link and no slug — cannot build a URL for this post.
                    continue
                link = f"{BASE_URL}/{url_prefix}/{slug}/"
            # Title/content come back as {"rendered": ...} dicts from the API,
            # but tolerate plain values just in case.
            raw_title = post.get("title", {})
            title = raw_title.get("rendered", "") if isinstance(raw_title, dict) else str(raw_title)
            raw_content = post.get("content", {})
            body_html = raw_content.get("rendered", "") if isinstance(raw_content, dict) else ""
            description = html_to_text(body_html) if body_html else ""
            collected.append((link, title, description))
        print(f" {type_name} page {page}: {len(posts)} items")
        page += 1
    return collected
def fetch_post_urls_from_api(headers):
    """Auto-discover all content types via the WP REST API and collect every post URL.

    Side effect: pre-populates video_map.json with titles and descriptions
    for each discovered URL. Returns the deduplicated list of URLs.
    """
    print("[+] video_map.json empty or missing — discovering content types from REST API…")
    session = requests.Session()
    session.headers.update(headers)
    targets = discover_content_types(session)
    type_names = ', '.join(name for name, _, _ in targets)
    print(f"[+] Found {len(targets)} content types: {type_names}\n")

    all_results = []
    for type_name, rest_base, type_slug in targets:
        all_results.extend(
            fetch_all_posts_for_type(session, type_name, rest_base, type_slug))

    video_map = load_video_map()
    seen = set()
    deduped_urls = []
    for url, title, description in all_results:
        # Keep only first occurrence of each absolute URL.
        if not url.startswith("http") or url in seen:
            continue
        seen.add(url)
        deduped_urls.append(url)
        entry = video_map.get(url)
        if entry is None:
            video_map[url] = {"title": title,
                              "description": description, "videos": []}
        else:
            # Never overwrite metadata a previous run already filled in.
            if not entry.get("title"):
                entry["title"] = title
            if not entry.get("description"):
                entry["description"] = description

    save_video_map(video_map)
    print(f"\n[+] Discovered {len(deduped_urls)} unique URLs → saved to {VIDEO_MAP_FILE}")
    print(f"[+] Pre-populated {len(video_map)} entries in {VIDEO_MAP_FILE}")
    return deduped_urls
def fetch_metadata_from_api(video_map, urls, headers):
    """Populate missing titles and descriptions in video_map from the REST API.

    Mutates `video_map` in place and persists it via save_video_map().
    Returns early when every URL already has both fields.
    """
    def _needs_metadata(u):
        entry = video_map.get(u)
        return entry is None or not entry.get("title") or not entry.get("description")

    missing = [u for u in urls if _needs_metadata(u)]
    if not missing:
        return

    print(f"[+] Fetching metadata from REST API for {len(missing)} posts…")
    session = requests.Session()
    session.headers.update(headers)
    for type_name, rest_base, type_slug in discover_content_types(session):
        results = fetch_all_posts_for_type(session, type_name, rest_base, type_slug)
        for url, title, description in results:
            entry = video_map.get(url)
            if entry is None:
                video_map[url] = {"title": title,
                                  "description": description, "videos": []}
            else:
                # Fill only the blanks; keep anything already recorded.
                if not entry.get("title"):
                    entry["title"] = title
                if not entry.get("description"):
                    entry["description"] = description

    save_video_map(video_map)
    populated_t = sum(1 for u in urls if video_map.get(u, {}).get("title"))
    populated_d = sum(1 for u in urls if video_map.get(u, {}).get("description"))
    print(f"[+] Titles populated: {populated_t}/{len(urls)}")
    print(f"[+] Descriptions populated: {populated_d}/{len(urls)}")
def load_post_urls(headers):
    """Return post URLs from the saved video map, or discover them via the API.

    When video_map.json already has entries, its keys are used as-is;
    otherwise the REST API is queried (which also seeds the map).
    """
    existing = load_video_map()
    if not existing:
        return fetch_post_urls_from_api(headers)
    print(f"[+] {VIDEO_MAP_FILE} found — loading {len(existing)} post URLs.")
    return list(existing.keys())
def html_to_text(html_str):
"""Strip HTML tags, decode entities, and collapse whitespace into clean plain text."""
import html
text = re.sub(r'
', '\n', html_str)
text = text.replace('