声明与提示

  • 本文仅用于技术学习与交流,请勿用于商业用途,爬取内容需严格遵守《哔哩哔哩用户协议》及相关法律法规,不得侵犯B站及内容创作者的合法权益,严禁滥用技术爬取付费、版权保护内容。

  • 提示:本文为进阶教程,聚焦分段爬取的优化的底层逻辑与代码实现,基础爬虫语法、请求头构造、HTML解析等入门内容,请参考前两天的基础教程,建议先掌握基础内容后再学习本文。

一、本次优化核心点(对比上一版本)

上一版本仅实现了基础的分段下载与音视频合成功能,本次优化针对B站反爬机制、网络稳定性、程序容错性三大核心问题做了针对性升级,所有优化均围绕“提升下载成功率、规避反爬拦截、增强程序健壮性”展开,具体优化项如下:

优化项 上一版本存在的问题 进阶优化解决方案
Referer校验优化 下载请求Referer与目标视频页面不匹配,导致B站CDN拦截,返回404错误 将下载请求的Referer动态匹配目标视频页面,严格对齐B站CDN的权限校验规则
分段下载容错机制 单个分片下载失败后,程序直接终止,无法完成下载 新增「分段下载失败→整文件下载」兜底逻辑,分段请求失败时自动切换下载方式
全链路权限验证 部分下载请求未携带Cookie,导致权限不足,无法获取CDN资源 所有请求(页面解析、文件大小获取、分片下载、整文件下载)统一携带登录Cookie,确保通过B站权限校验
网络稳定性优化 无重试机制,网络抖动、临时连接失败时直接报错终止 关键请求(获取文件大小、分片下载)增加重试机制(获取大小3次重试,分片下载2次重试),降低网络波动影响
合成安全性校验 未校验临时音视频文件是否存在,空文件强行合成,导致FFmpeg报错 音视频合成前,先校验临时文件的存在性,避免无效合成操作
临时资源清理 下载失败或程序异常时,残留不完整的临时文件,占用资源且影响下次运行 新增异常捕获后的临时文件清理逻辑,无论程序成功与否,均确保临时文件不残留
日志优化 FFmpeg运行时输出大量冗余日志,影响阅读 给FFmpeg添加-loglevel error参数,仅输出错误日志,精简控制台输出

二、分段请求爬取的底层逻辑

1. 核心原理

分段请求(Range Request)是HTTP/1.1协议提供的核心功能,其核心目的是实现“断点续传”和“分块下载”,适用于大文件(如视频、音频)的下载场景。

客户端通过在请求头中携带 Range: bytes=start-end 字段,向服务端(此处为B站CDN)请求文件的指定字节范围数据;服务端收到请求后,会返回对应范围的文件数据,并通过响应头 Content-Range 告知客户端当前返回的字节范围及文件总大小,客户端将每次获取的分段数据追加写入本地文件,最终拼接成完整文件。

2. 底层执行流程

获取目标文件URL

发送HEAD请求,获取文件总大小

根据预设分片大小(如4MB),计算分段数量和每段的起止字节

循环构造每段的Range请求头(bytes=start-end)

发送分段请求,获取对应字节段数据

分段请求成功?

将分段数据追加写入本地临时文件

触发兜底策略:取消分段,执行整文件下载

所有分段下载完成?

执行音视频合成(若为流媒体分片)

清理临时文件,输出最终文件

3. 适用场景

  • 大文件下载:流媒体文件(视频、音频)通常体积较大,分段下载可避免单次请求超时,同时支持断点续传(若程序中断,下次可从已下载分段继续)。

  • 反爬规避:部分网站(如B站)会对单次请求的文件大小进行限制,分段下载可拆分请求,降低被反爬拦截的概率(需遵守网站规则,不可滥用)。

  • 网络不稳定场景:网络波动较大时,分段下载可减少单次请求的失败影响,某一分段失败仅需重新下载该分段,无需重新下载整个文件。

三、完整Python代码实现(进阶版)

说明:代码中未包含任何真实B站视频URL、Cookie等敏感信息,仅提供底层逻辑实现,使用时需自行替换合法的Cookie、目标视频相关配置,且需遵守B站用户协议。

import json
import re
import requests
import os
import subprocess
import time

# -------------------------- 基础配置(需自行替换合法信息) --------------------------
# 提示:Cookie需从自身合法登录的B站账号获取,仅用于技术学习,请勿泄露或滥用
cookies = {
    '_uuid': '替换为自身合法Cookie字段',
    'buvid_fp': '替换为自身合法Cookie字段',
    'buvid3': '替换为自身合法Cookie字段',
    'b_nut': '替换为自身合法Cookie字段',
    'CURRENT_QUALITY': '0',
    'rpdid': '替换为自身合法Cookie字段',
    'bili_ticket': '替换为自身合法Cookie字段',
    'bili_ticket_expires': '替换为自身合法Cookie字段',
    'buvid4': '替换为自身合法Cookie字段',
    'sid': '替换为自身合法Cookie字段',
    # 其余Cookie字段请自行补充,确保完整且合法
}

# 页面请求头(用于解析视频页面,获取音视频流信息)
page_headers = {
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
    'accept-language': 'zh-CN,zh;q=0.9',
    'cache-control': 'max-age=0',
    'priority': 'u=0, i',
    'referer': 'https://www.bilibili.com/',
    'sec-ch-ua': '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'sec-fetch-dest': 'document',
    'sec-fetch-mode': 'navigate',
    'sec-fetch-site': 'same-origin',
    'sec-fetch-user': '?1',
    'upgrade-insecure-requests': '1',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36',
}

# 下载请求头(关键优化:Referer需匹配目标视频页面,确保权限校验通过)
download_headers = {
    'Accept': '*/*',
    'Accept-Language': 'zh-CN,zh;q=0.9',
    'Connection': 'keep-alive',
    'Origin': 'https://www.bilibili.com',
    'Referer': 'https://www.bilibili.com/video/替换为目标视频BV号/',  # 需替换为真实目标视频页面URL
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'cross-site',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36',
    'sec-ch-ua': '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'Range': 'bytes=0-',  # 初始Range头,用于分段请求
}

# 分段下载配置
CHUNK_SIZE = 4 * 1024 * 1024  # 4MB每段(可根据网络情况调整)
VIDEO_TMP = "temp_video.m4s"  # 视频临时文件(流媒体分片格式)
AUDIO_TMP = "temp_audio.m4s"  # 音频临时文件(流媒体分片格式)
FINAL_VIDEO = "final_video.mp4"  # 最终合成的视频文件

# -------------------------- 1. 解析视频页面,获取音视频流URL(底层逻辑:HTML解析) --------------------------
def get_video_audio_urls():
    """
    解析B站视频页面,提取音视频流的baseUrl
    底层逻辑:通过正则表达式匹配页面中的window.__playinfo__字段,解析出JSON格式的音视频流信息
    """
    # 目标视频页面参数(需替换为真实参数)
    params = {
        'spm_id_from': '替换为目标视频页面的spm参数',
    }
    # 发送请求,获取视频页面HTML(携带Cookie,确保有权限解析)
    response = requests.get(
        'https://www.bilibili.com/video/替换为目标视频BV号/',  # 需替换为真实目标视频URL
        params=params, 
        cookies=cookies, 
        headers=page_headers
    )
    html = response.text
    
    # 正则匹配window.__playinfo__字段(存储音视频流信息的核心字段)
    pattern = re.compile(r"window\.__playinfo__=(.+?)")
    json_str = pattern.findall(html)[0]
    json_data = json.loads(json_str)

    # 提取最高分辨率视频流URL(按height降序排序,取第一个)
    video_streams = json_data["data"]["dash"]["video"]
    sorted_video_streams = sorted(video_streams, key=lambda x: x["height"], reverse=True)
    highest_res_video_url = sorted_video_streams[0]["baseUrl"]
    print(f"最高分辨率视频流URL获取成功(已隐藏真实地址)")
    print(f"分辨率: {sorted_video_streams[0]['width']}x{sorted_video_streams[0]['height']}")

    # 提取音频流URL(通常仅一个音频流,取第一个即可)
    audio_streams = json_data["data"]["dash"]["audio"]
    audio_url = audio_streams[0]["baseUrl"]
    print(f"音频流URL获取成功(已隐藏真实地址)")
    
    return highest_res_video_url, audio_url

# -------------------------- 2. 分段下载核心逻辑(含兜底策略) --------------------------
def download_file_fallback(url, save_path):
    """
    带容错的下载函数:先尝试分段下载,失败则切换为整文件下载
    底层逻辑:优先使用Range请求实现分段下载,提升效率;分段失败则取消Range请求,整文件流式下载
    """
    # 先尝试分段下载
    try:
        return download_file_chunked(url, save_path)
    except Exception as e:
        print(f"分段下载失败: {str(e)},尝试兜底方案:直接整文件下载...")
        # 兜底方案:直接整文件下载(移除Range头,避免分段请求)
        try:
            headers = download_headers.copy()
            headers.pop('Range', None)  # 移除分段请求头
            response = requests.get(
                url, 
                headers=headers, 
                cookies=cookies, 
                stream=True,  # 流式下载,避免一次性加载到内存
                timeout=60
            )
            response.raise_for_status()  # 若状态码非200,抛出异常
            
            with open(save_path, 'wb') as f:
                total_size = int(response.headers.get('content-length', 0))
                downloaded_size = 0
                # 流式写入文件,避免内存溢出
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                        downloaded_size += len(chunk)
                        if total_size > 0:
                            progress = (downloaded_size / total_size) * 100
                            print(f"\r直接下载进度: {progress:.1f}%", end='')
            print(f"\n{save_path} 直接下载完成!")
            return True
        except Exception as e2:
            print(f"直接下载也失败: {str(e2)}")
            return False

def get_total_size(url):
    """
    获取文件总大小(用于分段计算)
    底层逻辑:发送HEAD请求(仅获取响应头,不获取文件内容),解析Content-Range或Content-Length字段
    """
    headers = download_headers.copy()
    headers.pop('Range', None)  # 移除Range头,确保HEAD请求能获取完整文件大小
    retry = 3  # 3次重试
    while retry > 0:
        try:
            response = requests.head(
                url, 
                headers=headers, 
                cookies=cookies, 
                timeout=30,
                allow_redirects=True  # 允许重定向,避免CDN重定向导致失败
            )
            response.raise_for_status()
            # 两种方式获取文件总大小,兼容不同服务端响应
            if "Content-Range" in response.headers:
                return int(response.headers["Content-Range"].split("/")[1])
            elif "Content-Length" in response.headers:
                return int(response.headers["Content-Length"])
            else:
                raise ValueError("无法获取文件大小,服务端未返回相关响应头")
        except:
            retry -= 1
            time.sleep(1)  # 重试间隔1秒
    raise Exception("获取文件大小失败,已达到最大重试次数")

def download_chunk(url, start, end, chunk_num):
    """
    下载单个分片
    底层逻辑:构造指定范围的Range请求头,请求对应字节段,返回分片数据
    """
    headers = download_headers.copy()
    headers['range'] = f'bytes={start}-{end}'  # 构造当前分片的Range请求
    
    retry = 2  # 2次重试
    while retry > 0:
        try:
            response = requests.get(
                url, 
                headers=headers, 
                cookies=cookies, 
                stream=True, 
                timeout=30,
                allow_redirects=True
            )
            response.raise_for_status()
            return response.content  # 返回分片二进制数据
        except Exception as e:
            retry -= 1
            time.sleep(1)
            print(f"分片 {chunk_num} 重试({2-retry}/2)失败: {str(e)}")
    return None

def download_file_chunked(url, save_path):
    """
    分段下载文件(核心函数)
    底层逻辑:1. 获取文件总大小;2. 计算分段起止字节;3. 循环下载每个分片;4. 追加写入本地文件
    """
    try:
        total_size = get_total_size(url)
        print(f"\n开始分段下载 {save_path} (总大小: {total_size / 1024 / 1024:.2f} MB)")
        
        # 若临时文件已存在,先删除(避免残留脏数据)
        if os.path.exists(save_path):
            os.remove(save_path)
        
        start = 0  # 初始分段起始字节
        chunk_num = 1  # 分片序号
        # 以追加模式打开文件,逐段写入
        with open(save_path, 'ab') as f:
            while start < total_size:
                # 计算当前分片的结束字节(避免最后一个分片超出总大小)
                end = min(start + CHUNK_SIZE - 1, total_size - 1)
                print(f"正在下载分片 {chunk_num}: {start} - {end} 字节")
                
                # 下载当前分片
                chunk_data = download_chunk(url, start, end, chunk_num)
                if chunk_data:
                    f.write(chunk_data)
                    print(f"分片 {chunk_num} 下载完成,大小: {len(chunk_data)/1024:.2f} KB")
                else:
                    raise Exception(f"分片 {chunk_num} 下载失败,已达到最大重试次数")
                
                # 更新下一分段的起始字节
                start = end + 1
                chunk_num += 1
        
        print(f"{save_path} 分段下载完成!最终大小: {os.path.getsize(save_path)/1024/1024:.2f} MB")
        return True
        
    except Exception as e:
        # 下载失败,清理不完整的临时文件
        if os.path.exists(save_path):
            os.remove(save_path)
        raise e  # 抛出异常,触发兜底策略

# -------------------------- 3. 音视频合成(FFmpeg) --------------------------
def merge_video_audio():
    """
    使用FFmpeg合成音视频(m4s格式→mp4格式)
    底层逻辑:将分离的视频流(m4s)和音频流(m4s)通过FFmpeg复制流的方式合成,不重新编码,提升效率
    """
    # 合成前先检查临时文件是否存在,避免报错
    if not os.path.exists(VIDEO_TMP):
        raise Exception(f"视频临时文件 {VIDEO_TMP} 不存在,无法合成")
    if not os.path.exists(AUDIO_TMP):
        raise Exception(f"音频临时文件 {AUDIO_TMP} 不存在,无法合成")
    
    # 检查FFmpeg是否安装(需提前安装并配置环境变量)
    try:
        subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        raise RuntimeError("未找到FFmpeg,请先安装并添加到系统环境变量")

    # FFmpeg合成命令(-c:v copy -c:a copy 表示复制流,不重新编码)
    cmd = [
        'ffmpeg', '-y', '-loglevel', 'error',  # -y覆盖已有文件,-loglevel error仅输出错误
        '-i', VIDEO_TMP,  # 输入视频临时文件
        '-i', AUDIO_TMP,  # 输入音频临时文件
        '-c:v', 'copy',  # 视频流复制
        '-c:a', 'copy',  # 音频流复制
        '-shortest',  # 按最短的流截断(避免音视频时长不一致)
        FINAL_VIDEO  # 最终输出文件
    ]

    print("\n开始合成音视频...")
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"合成失败: {result.stderr}")

    print(f"合成完成!最终文件: {FINAL_VIDEO}")

# -------------------------- 4. 主流程(串联所有逻辑) --------------------------
if __name__ == "__main__":
    try:
        # 1. 解析视频页面,获取音视频流URL
        video_url, audio_url = get_video_audio_urls()
        
        # 2. 下载音视频(带容错兜底)
        video_ok = download_file_fallback(video_url, VIDEO_TMP)
        audio_ok = download_file_fallback(audio_url, AUDIO_TMP)
        
        # 检查下载结果,若有一个失败,终止合成
        if not video_ok or not audio_ok:
            raise Exception("音视频下载失败,终止合成操作")
        
        # 3. 合成音视频
        merge_video_audio()
        
        # 4. 清理临时文件(避免占用资源)
        os.remove(VIDEO_TMP)
        os.remove(AUDIO_TMP)
        print("临时文件已清理")
        
        print(f"\n✅ 全部完成!最终视频文件: {FINAL_VIDEO}")
        
    except Exception as e:
        print(f"\n❌ 执行失败: {str(e)}")
        # 异常时也清理临时文件,避免残留
        try:
            if os.path.exists(VIDEO_TMP):
                os.remove(VIDEO_TMP)
            if os.path.exists(AUDIO_TMP):
                os.remove(AUDIO_TMP)
        except:
            pass

# -------------------------- 作者备注 --------------------------
# 作者备注:本文代码及逻辑仅用于B站流媒体爬取的技术学习,所有操作均需遵守B站用户协议及相关法律法规。
# 作者在B站平台相关技术交流中,始终倡导“合法学习、合规爬取”,严禁利用本文技术侵犯B站及创作者权益。

四、curl转换工具推荐(实用技巧)

在B站爬虫调试过程中,若遇到过长的curl(bash)命令无法手动转换为Python脚本,推荐使用curlconverter 网站(可直接在浏览器搜索“curlconverter”找到)。该网站为万能curl转换工具,支持将任意长度、任意格式的curl命令一键转换为Python脚本格式,代码中需要自行添加的Cookie、请求头、参数等字段,均可通过该网站转换后直接复制填入,无需手动编写请求逻辑,大幅提升调试效率,确保请求参数与curl命令完全一致,避免因参数遗漏、格式错误导致的爬取失败。

五、关键注意事项

  1. 权限与合规:所有Cookie、视频URL需从自身合法登录的B站账号获取,仅用于技术学习,不得泄露、分享或用于商业用途,严禁爬取付费、版权保护内容。

  2. 环境依赖:代码依赖FFmpeg(用于音视频合成),需提前安装并配置系统环境变量,否则会报错“未找到FFmpeg”。

  3. 参数替换:代码中所有“替换为XXX”的字段(Cookie、视频BV号、Referer等),需自行替换为真实、合法的信息,否则无法正常运行。

  4. 反爬规避:请勿频繁、批量爬取B站内容,避免触发B站反爬机制(如IP封禁、账号限制),尊重平台规则。

  5. 基础前提:本文为进阶教程,若未掌握基础的爬虫请求、HTML解析、Cookie使用等知识,建议先学习前两天的基础教程,再使用本文代码。

六、作者备注

作者备注:本文代码及逻辑仅用于B站流媒体爬取的技术学习,所有操作均需遵守B站用户协议及相关法律法规。作者在B站平台相关技术交流中,始终倡导“合法学习、合规爬取”,严禁利用本文技术侵犯B站及创作者权益。再次提醒:本文为进阶教程,基础操作请参考前两天的基础教程,请勿跳过基础直接使用进阶代码。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐