第一步:安装 PyCharm

第二步:安装 Python

第三步:在 PyCharm 终端依次运行:

pip install requests beautifulsoup4
pip install requests
pip list

如果是macOS系统就运行

pip3 install bs4
pip3 install requests

第四步:在 main.py 中粘贴下面的代码,并把其中的网址修改成你自己想抓取的页面

"""
从 finestcatchchamber.shop 商品页抓取并下载嵌入视频。

用法:
    python main.py
    python main.py "https://finestcatchchamber.shop/某个商品页/"
    python main.py --proxy http://127.0.0.1:7890
"""

from __future__ import annotations

import argparse
import os
import re
import sys
from pathlib import Path
from urllib.parse import unquote, urlparse

import requests

# Page scraped when no URL is passed on the command line.
# This is a placeholder: replace it with the product page you want to scrape.
# The scheme must be followed by "//"; without it the URL has no host part
# and cannot be requested.
DEFAULT_URL = (
    "https://xxxxxxxxx.shop/"
    "calm-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxditation/"
)
# Directory that contains this script; used as the default download folder.
OUTPUT_DIR = Path(__file__).resolve().parent

# Headers installed on every session created by build_session().
# "Referer" is a placeholder and should point at the site being scraped.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    "Referer": "https://xxxxxxxxxxxxxxxxxxxxxx.shop/",
}

# Regular expressions used to locate video URLs in the page HTML.
# Each pattern has exactly one capture group: the absolute video URL.
VIDEO_PATTERNS = (
    # data-video="..." attribute pointing at an mp4/webm/m3u8 URL.
    r'data-video="(https?://[^"]+\.(?:mp4|webm|m3u8)[^"]*)"',
    # <video ... src="..."> tag pointing at an mp4/webm/m3u8 URL.
    r'<video[^>]+src="(https?://[^"]+\.(?:mp4|webm|m3u8)[^"]*)"',
    # Double-quoted Vimeo progressive_redirect playback URL ending in .mp4.
    r'"(https?://player\.vimeo\.com/progressive_redirect/playback/[^"]+\.mp4[^"]*)"',
)


# Ports on 127.0.0.1 probed, in this order, by detect_local_proxy().
DEFAULT_PROXY_PORTS = (7897, 7890, 10809, 1080, 8080)


def detect_local_proxy() -> str | None:
    """Return an HTTP proxy URL for the first local port that accepts a connection.

    Every port in ``DEFAULT_PROXY_PORTS`` is tried in order on 127.0.0.1 with
    a 0.5 second connect timeout.

    Returns:
        ``"http://127.0.0.1:<port>"`` for the first port that accepts a TCP
        connection, or ``None`` when none of them does.
    """
    import socket

    def _accepts_connections(port: int) -> bool:
        # A successful TCP connect is the only evidence used; nothing is sent.
        try:
            with socket.create_connection(("127.0.0.1", port), timeout=0.5):
                return True
        except OSError:
            return False

    open_port = next(
        (port for port in DEFAULT_PROXY_PORTS if _accepts_connections(port)),
        None,
    )
    if open_port is None:
        return None
    return f"http://127.0.0.1:{open_port}"


def build_session(proxy: str | None = None) -> requests.Session:
    """Create a ``requests`` session carrying the module-level ``HEADERS``.

    Args:
        proxy: Proxy URL used for both HTTP and HTTPS traffic. When empty or
            ``None`` the session's proxy settings are left untouched.

    Returns:
        The configured session.
    """
    http_session = requests.Session()
    http_session.headers.update(HEADERS)

    if not proxy:
        return http_session

    # The same proxy endpoint serves both schemes.
    http_session.proxies.update(dict.fromkeys(("http", "https"), proxy))
    return http_session


def fetch_page(session: requests.Session, url: str) -> str:
    """Download ``url`` through ``session`` and return the response body as text.

    The request uses a 30 second timeout. ``raise_for_status()`` is called
    before the body is read, so any exception it raises propagates to the
    caller.
    """
    page = session.get(url, timeout=30)
    page.raise_for_status()
    body = page.text
    return body


def extract_video_urls(
    html: str,
    patterns: tuple[str, ...] | None = None,
) -> list[str]:
    """Return the unique video URLs found in ``html``, in order of discovery.

    Args:
        html: Raw page markup to search.
        patterns: Regular expressions to apply, each with exactly one capture
            group holding the URL. Defaults to the module-level
            ``VIDEO_PATTERNS``. Matching is case-insensitive.

    Returns:
        De-duplicated URLs; results are ordered by pattern first, then by
        position in the page.
    """
    if patterns is None:
        patterns = VIDEO_PATTERNS

    found: list[str] = []
    seen: set[str] = set()

    for pattern in patterns:
        for match in re.findall(pattern, html, flags=re.IGNORECASE):
            # Attribute values are HTML-escaped in the page source, so "&" in
            # a query string arrives as "&amp;" (or "&#038;" / "&#x26;").
            # Only the ampersand entity is decoded: a general entity decoder
            # would also rewrite raw query names such as "&copy=".
            raw_url = re.sub(
                r"&(?:amp|#0*38|#x0*26);",
                "&",
                match.strip(),
                flags=re.IGNORECASE,
            )
            # NOTE(review): unquote() is kept from the original; it also
            # decodes escapes such as %26 / %2F inside the URL. Confirm this
            # is intended for the target site.
            video_url = unquote(raw_url)
            # Matching is case-insensitive, so the scheme check must be too.
            if not video_url.lower().startswith("http"):
                continue
            if video_url not in seen:
                seen.add(video_url)
                found.append(video_url)

    return found


def slug_from_page_url(page_url: str) -> str:
    """Derive a file-name prefix from the last path segment of ``page_url``.

    The query string and fragment are ignored so that characters such as
    ``?`` and ``#`` never end up in the generated file names.

    Args:
        page_url: Address of the page being scraped.

    Returns:
        The last non-empty path segment; the host part when the URL has no
        path segment; ``"video"`` when neither is available.
    """
    try:
        parsed = urlparse(page_url)
    except ValueError:
        # urlparse rejects some malformed hosts; fall back to plain splitting
        # so that this helper itself never raises.
        return page_url.rstrip("/").split("/")[-1] or "video"

    slug = parsed.path.rstrip("/").rsplit("/", 1)[-1]
    return slug or parsed.netloc or "video"


def filename_from_url(url: str, page_slug: str, index: int) -> str:
    """Build the local file name for the ``index``-th video of a page.

    Args:
        url: Video URL. A ``/rendition/<name>/`` path segment, when present,
            supplies the rendition label; otherwise ``"video"`` is used.
        page_slug: Prefix derived from the page URL.
        index: 1-based position of the video on the page.

    Returns:
        ``<slug>_<rendition><ext>`` for the first video when a slug is
        available, otherwise ``<slug>_<rendition>_<index><ext>``. ``<ext>``
        follows the URL path (``.mp4``, ``.webm`` or ``.m3u8``) and falls
        back to ``.mp4`` for anything else.
    """
    parsed = urlparse(url)
    rendition = "video"
    match = re.search(r"/rendition/([^/]+)/", parsed.path)
    if match:
        rendition = match.group(1)

    # Keep the real container extension instead of labelling everything .mp4;
    # the extraction patterns also accept .webm and .m3u8 URLs.
    suffix = Path(parsed.path).suffix.lower()
    if suffix not in (".mp4", ".webm", ".m3u8"):
        suffix = ".mp4"

    if index == 1 and page_slug:
        return f"{page_slug}_{rendition}{suffix}"
    return f"{page_slug}_{rendition}_{index}{suffix}"


def download_video(
    session: requests.Session,
    url: str,
    output_path: Path,
    chunk_size: int = 1024 * 1024,
) -> None:
    """Stream ``url`` to ``output_path`` while printing progress.

    The data is first written to ``<output_path>.part`` and only renamed to
    ``output_path`` after the whole body has been received. An interrupted
    download therefore never leaves a truncated file under the final name,
    where a later run would mistake it for a finished download.

    Args:
        session: Session used to issue the streaming GET request
            (120 second timeout).
        url: Address of the video.
        output_path: Final location of the downloaded file.
        chunk_size: Number of bytes requested per chunk.

    Raises:
        Any exception raised by the request, by ``raise_for_status()`` or
        while writing is propagated after the partial file has been removed.
    """
    partial_path = output_path.with_name(output_path.name + ".part")

    try:
        with session.get(url, stream=True, timeout=120) as response:
            response.raise_for_status()
            # A missing Content-Length yields 0, which disables the progress line.
            total = int(response.headers.get("Content-Length", 0))
            downloaded = 0

            with partial_path.open("wb") as file:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if not chunk:
                        continue
                    file.write(chunk)
                    downloaded += len(chunk)
                    if total:
                        percent = downloaded * 100 // total
                        print(f"\r下载进度: {percent}% ({downloaded}/{total} bytes)", end="")

        # The file handle is closed at this point, so the rename is safe on
        # every platform; replace() overwrites an existing target.
        partial_path.replace(output_path)
    except BaseException:
        # Includes KeyboardInterrupt: never leave a half-written file behind.
        try:
            partial_path.unlink()
        except OSError:
            pass
        raise

    print(f"\n已保存: {output_path}")


def scrape_and_download(
    page_url: str = DEFAULT_URL,
    output_dir: Path = OUTPUT_DIR,
    proxy: str | None = None,
) -> list[Path]:
    """Download every video linked from ``page_url`` into ``output_dir``.

    Args:
        page_url: Page whose HTML is searched for video URLs.
        output_dir: Destination folder; created when missing.
        proxy: Proxy URL. When empty, the ``VIDEO_PROXY`` environment variable
            is consulted, then ``detect_local_proxy()``.

    Returns:
        Paths of all videos, including files that already existed and were
        skipped.

    Raises:
        RuntimeError: No video URL was found in the page.
        requests.RequestException: A request failed and no fallback proxy
            could be used.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # Proxy precedence: explicit argument, environment variable, local probe.
    if not proxy:
        proxy = os.environ.get("VIDEO_PROXY")
    if not proxy:
        proxy = detect_local_proxy()
    if proxy:
        print(f"使用代理: {proxy}")

    http = build_session(proxy)
    slug = slug_from_page_url(page_url)

    print(f"正在抓取页面: {page_url}")
    links = extract_video_urls(fetch_page(http, page_url))
    if not links:
        raise RuntimeError("未在页面中找到视频链接,请检查页面结构是否变化。")

    total = len(links)
    print(f"找到 {total} 个视频链接")

    results: list[Path] = []
    for position, link in enumerate(links, start=1):
        target = output_dir / filename_from_url(link, slug, position)
        already_downloaded = target.exists() and target.stat().st_size > 0

        if already_downloaded:
            print(f"文件已存在,跳过: {target}")
        else:
            print(f"\n[{position}/{total}] 开始下载")
            print(f"视频地址: {link}")
            try:
                download_video(http, link, target)
            except requests.RequestException:
                # Only a direct connection may retry through a proxy; if one
                # was already in use, or none is found, the error stands.
                fallback_proxy = None if proxy else detect_local_proxy()
                if not fallback_proxy:
                    raise
                print(f"直连失败,改用代理: {fallback_proxy}")
                download_video(build_session(fallback_proxy), link, target)

        results.append(target)

    return results


def parse_args() -> argparse.Namespace:
    """Parse the command line.

    Returns:
        A namespace with ``url`` (optional positional, defaults to
        ``DEFAULT_URL``) and ``proxy`` (``--proxy``, defaults to the
        ``VIDEO_PROXY`` environment variable).
    """
    env_proxy = os.environ.get("VIDEO_PROXY")

    cli = argparse.ArgumentParser(description="抓取商品页视频并保存到本地")
    cli.add_argument("url", nargs="?", default=DEFAULT_URL, help="商品页 URL")
    cli.add_argument("--proxy", default=env_proxy, help="代理地址,例如 http://127.0.0.1:7890")
    return cli.parse_args()


def main() -> int:
    """Run the scraper from the command line.

    Returns:
        ``0`` when the downloads finished, ``1`` when a network request
        failed or no video link was found. Errors are reported on stderr.
    """
    options = parse_args()

    try:
        saved = scrape_and_download(options.url, proxy=options.proxy)
    except requests.RequestException as exc:
        error_lines = (
            f"网络请求失败: {exc}",
            "\n提示: 该页面视频托管在 Vimeo,若连接超时,请开启 VPN/代理后重试,例如:",
            "  python main.py --proxy http://127.0.0.1:7890",
        )
        for line in error_lines:
            print(line, file=sys.stderr)
        return 1
    except RuntimeError as exc:
        print(exc, file=sys.stderr)
        return 1
    else:
        print(f"\n完成,共保存 {len(saved)} 个文件到: {OUTPUT_DIR}")
        return 0


# When executed as a script, run main() and use its return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())

第五步:将代码中的 DEFAULT_URL(以及 HEADERS 里的 Referer)改成你自己想抓取的网站地址,然后运行 python main.py

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐