B站流媒体分段爬取进阶教程:分段请求下载与容错优化
声明与提示
-
本文仅用于技术学习与交流,请勿用于商业用途,爬取内容需严格遵守《哔哩哔哩用户协议》及相关法律法规,不得侵犯B站及内容创作者的合法权益,严禁滥用技术爬取付费、版权保护内容。
-
提示:本文为进阶教程,聚焦分段爬取的优化的底层逻辑与代码实现,基础爬虫语法、请求头构造、HTML解析等入门内容,请参考前两天的基础教程,建议先掌握基础内容后再学习本文。
一、本次优化核心点(对比上一版本)
上一版本仅实现了基础的分段下载与音视频合成功能,本次优化针对B站反爬机制、网络稳定性、程序容错性三大核心问题做了针对性升级,所有优化均围绕“提升下载成功率、规避反爬拦截、增强程序健壮性”展开,具体优化项如下:
| 优化项 | 上一版本存在的问题 | 进阶优化解决方案 |
|---|---|---|
| Referer校验优化 | 下载请求Referer与目标视频页面不匹配,导致B站CDN拦截,返回404错误 | 将下载请求的Referer动态匹配目标视频页面,严格对齐B站CDN的权限校验规则 |
| 分段下载容错机制 | 单个分片下载失败后,程序直接终止,无法完成下载 | 新增「分段下载失败→整文件下载」兜底逻辑,分段请求失败时自动切换下载方式 |
| 全链路权限验证 | 部分下载请求未携带Cookie,导致权限不足,无法获取CDN资源 | 所有请求(页面解析、文件大小获取、分片下载、整文件下载)统一携带登录Cookie,确保通过B站权限校验 |
| 网络稳定性优化 | 无重试机制,网络抖动、临时连接失败时直接报错终止 | 关键请求(获取文件大小、分片下载)增加重试机制(获取大小3次重试,分片下载2次重试),降低网络波动影响 |
| 合成安全性校验 | 未校验临时音视频文件是否存在,空文件强行合成,导致FFmpeg报错 | 音视频合成前,先校验临时文件的存在性,避免无效合成操作 |
| 临时资源清理 | 下载失败或程序异常时,残留不完整的临时文件,占用资源且影响下次运行 | 新增异常捕获后的临时文件清理逻辑,无论程序成功与否,均确保临时文件不残留 |
| 日志优化 | FFmpeg运行时输出大量冗余日志,影响阅读 | 给FFmpeg添加-loglevel error参数,仅输出错误日志,精简控制台输出 |
二、分段请求爬取的底层逻辑
1. 核心原理
分段请求(Range Request)是HTTP/1.1协议提供的核心功能,其核心目的是实现“断点续传”和“分块下载”,适用于大文件(如视频、音频)的下载场景。
客户端通过在请求头中携带 Range: bytes=start-end 字段,向服务端(此处为B站CDN)请求文件的指定字节范围数据;服务端收到请求后,会返回对应范围的文件数据,并通过响应头 Content-Range 告知客户端当前返回的字节范围及文件总大小,客户端将每次获取的分段数据追加写入本地文件,最终拼接成完整文件。
2. 底层执行流程
3. 适用场景
-
大文件下载:流媒体文件(视频、音频)通常体积较大,分段下载可避免单次请求超时,同时支持断点续传(若程序中断,下次可从已下载分段继续)。
-
反爬规避:部分网站(如B站)会对单次请求的文件大小进行限制,分段下载可拆分请求,降低被反爬拦截的概率(需遵守网站规则,不可滥用)。
-
网络不稳定场景:网络波动较大时,分段下载可减少单次请求的失败影响,某一分段失败仅需重新下载该分段,无需重新下载整个文件。
三、完整Python代码实现(进阶版)
说明:代码中未包含任何真实B站视频URL、Cookie等敏感信息,仅提供底层逻辑实现,使用时需自行替换合法的Cookie、目标视频相关配置,且需遵守B站用户协议。
import json
import re
import requests
import os
import subprocess
import time
# -------------------------- 基础配置(需自行替换合法信息) --------------------------
# 提示:Cookie需从自身合法登录的B站账号获取,仅用于技术学习,请勿泄露或滥用
cookies = {
'_uuid': '替换为自身合法Cookie字段',
'buvid_fp': '替换为自身合法Cookie字段',
'buvid3': '替换为自身合法Cookie字段',
'b_nut': '替换为自身合法Cookie字段',
'CURRENT_QUALITY': '0',
'rpdid': '替换为自身合法Cookie字段',
'bili_ticket': '替换为自身合法Cookie字段',
'bili_ticket_expires': '替换为自身合法Cookie字段',
'buvid4': '替换为自身合法Cookie字段',
'sid': '替换为自身合法Cookie字段',
# 其余Cookie字段请自行补充,确保完整且合法
}
# 页面请求头(用于解析视频页面,获取音视频流信息)
page_headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'max-age=0',
'priority': 'u=0, i',
'referer': 'https://www.bilibili.com/',
'sec-ch-ua': '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'same-origin',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36',
}
# 下载请求头(关键优化:Referer需匹配目标视频页面,确保权限校验通过)
download_headers = {
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Origin': 'https://www.bilibili.com',
'Referer': 'https://www.bilibili.com/video/替换为目标视频BV号/', # 需替换为真实目标视频页面URL
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'cross-site',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36',
'sec-ch-ua': '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'Range': 'bytes=0-', # 初始Range头,用于分段请求
}
# 分段下载配置
CHUNK_SIZE = 4 * 1024 * 1024 # 4MB每段(可根据网络情况调整)
VIDEO_TMP = "temp_video.m4s" # 视频临时文件(流媒体分片格式)
AUDIO_TMP = "temp_audio.m4s" # 音频临时文件(流媒体分片格式)
FINAL_VIDEO = "final_video.mp4" # 最终合成的视频文件
# -------------------------- 1. 解析视频页面,获取音视频流URL(底层逻辑:HTML解析) --------------------------
def get_video_audio_urls():
"""
解析B站视频页面,提取音视频流的baseUrl
底层逻辑:通过正则表达式匹配页面中的window.__playinfo__字段,解析出JSON格式的音视频流信息
"""
# 目标视频页面参数(需替换为真实参数)
params = {
'spm_id_from': '替换为目标视频页面的spm参数',
}
# 发送请求,获取视频页面HTML(携带Cookie,确保有权限解析)
response = requests.get(
'https://www.bilibili.com/video/替换为目标视频BV号/', # 需替换为真实目标视频URL
params=params,
cookies=cookies,
headers=page_headers
)
html = response.text
# 正则匹配window.__playinfo__字段(存储音视频流信息的核心字段)
pattern = re.compile(r"window\.__playinfo__=(.+?)")
json_str = pattern.findall(html)[0]
json_data = json.loads(json_str)
# 提取最高分辨率视频流URL(按height降序排序,取第一个)
video_streams = json_data["data"]["dash"]["video"]
sorted_video_streams = sorted(video_streams, key=lambda x: x["height"], reverse=True)
highest_res_video_url = sorted_video_streams[0]["baseUrl"]
print(f"最高分辨率视频流URL获取成功(已隐藏真实地址)")
print(f"分辨率: {sorted_video_streams[0]['width']}x{sorted_video_streams[0]['height']}")
# 提取音频流URL(通常仅一个音频流,取第一个即可)
audio_streams = json_data["data"]["dash"]["audio"]
audio_url = audio_streams[0]["baseUrl"]
print(f"音频流URL获取成功(已隐藏真实地址)")
return highest_res_video_url, audio_url
# -------------------------- 2. 分段下载核心逻辑(含兜底策略) --------------------------
def download_file_fallback(url, save_path):
"""
带容错的下载函数:先尝试分段下载,失败则切换为整文件下载
底层逻辑:优先使用Range请求实现分段下载,提升效率;分段失败则取消Range请求,整文件流式下载
"""
# 先尝试分段下载
try:
return download_file_chunked(url, save_path)
except Exception as e:
print(f"分段下载失败: {str(e)},尝试兜底方案:直接整文件下载...")
# 兜底方案:直接整文件下载(移除Range头,避免分段请求)
try:
headers = download_headers.copy()
headers.pop('Range', None) # 移除分段请求头
response = requests.get(
url,
headers=headers,
cookies=cookies,
stream=True, # 流式下载,避免一次性加载到内存
timeout=60
)
response.raise_for_status() # 若状态码非200,抛出异常
with open(save_path, 'wb') as f:
total_size = int(response.headers.get('content-length', 0))
downloaded_size = 0
# 流式写入文件,避免内存溢出
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
if total_size > 0:
progress = (downloaded_size / total_size) * 100
print(f"\r直接下载进度: {progress:.1f}%", end='')
print(f"\n{save_path} 直接下载完成!")
return True
except Exception as e2:
print(f"直接下载也失败: {str(e2)}")
return False
def get_total_size(url):
"""
获取文件总大小(用于分段计算)
底层逻辑:发送HEAD请求(仅获取响应头,不获取文件内容),解析Content-Range或Content-Length字段
"""
headers = download_headers.copy()
headers.pop('Range', None) # 移除Range头,确保HEAD请求能获取完整文件大小
retry = 3 # 3次重试
while retry > 0:
try:
response = requests.head(
url,
headers=headers,
cookies=cookies,
timeout=30,
allow_redirects=True # 允许重定向,避免CDN重定向导致失败
)
response.raise_for_status()
# 两种方式获取文件总大小,兼容不同服务端响应
if "Content-Range" in response.headers:
return int(response.headers["Content-Range"].split("/")[1])
elif "Content-Length" in response.headers:
return int(response.headers["Content-Length"])
else:
raise ValueError("无法获取文件大小,服务端未返回相关响应头")
except:
retry -= 1
time.sleep(1) # 重试间隔1秒
raise Exception("获取文件大小失败,已达到最大重试次数")
def download_chunk(url, start, end, chunk_num):
"""
下载单个分片
底层逻辑:构造指定范围的Range请求头,请求对应字节段,返回分片数据
"""
headers = download_headers.copy()
headers['range'] = f'bytes={start}-{end}' # 构造当前分片的Range请求
retry = 2 # 2次重试
while retry > 0:
try:
response = requests.get(
url,
headers=headers,
cookies=cookies,
stream=True,
timeout=30,
allow_redirects=True
)
response.raise_for_status()
return response.content # 返回分片二进制数据
except Exception as e:
retry -= 1
time.sleep(1)
print(f"分片 {chunk_num} 重试({2-retry}/2)失败: {str(e)}")
return None
def download_file_chunked(url, save_path):
"""
分段下载文件(核心函数)
底层逻辑:1. 获取文件总大小;2. 计算分段起止字节;3. 循环下载每个分片;4. 追加写入本地文件
"""
try:
total_size = get_total_size(url)
print(f"\n开始分段下载 {save_path} (总大小: {total_size / 1024 / 1024:.2f} MB)")
# 若临时文件已存在,先删除(避免残留脏数据)
if os.path.exists(save_path):
os.remove(save_path)
start = 0 # 初始分段起始字节
chunk_num = 1 # 分片序号
# 以追加模式打开文件,逐段写入
with open(save_path, 'ab') as f:
while start < total_size:
# 计算当前分片的结束字节(避免最后一个分片超出总大小)
end = min(start + CHUNK_SIZE - 1, total_size - 1)
print(f"正在下载分片 {chunk_num}: {start} - {end} 字节")
# 下载当前分片
chunk_data = download_chunk(url, start, end, chunk_num)
if chunk_data:
f.write(chunk_data)
print(f"分片 {chunk_num} 下载完成,大小: {len(chunk_data)/1024:.2f} KB")
else:
raise Exception(f"分片 {chunk_num} 下载失败,已达到最大重试次数")
# 更新下一分段的起始字节
start = end + 1
chunk_num += 1
print(f"{save_path} 分段下载完成!最终大小: {os.path.getsize(save_path)/1024/1024:.2f} MB")
return True
except Exception as e:
# 下载失败,清理不完整的临时文件
if os.path.exists(save_path):
os.remove(save_path)
raise e # 抛出异常,触发兜底策略
# -------------------------- 3. 音视频合成(FFmpeg) --------------------------
def merge_video_audio():
"""
使用FFmpeg合成音视频(m4s格式→mp4格式)
底层逻辑:将分离的视频流(m4s)和音频流(m4s)通过FFmpeg复制流的方式合成,不重新编码,提升效率
"""
# 合成前先检查临时文件是否存在,避免报错
if not os.path.exists(VIDEO_TMP):
raise Exception(f"视频临时文件 {VIDEO_TMP} 不存在,无法合成")
if not os.path.exists(AUDIO_TMP):
raise Exception(f"音频临时文件 {AUDIO_TMP} 不存在,无法合成")
# 检查FFmpeg是否安装(需提前安装并配置环境变量)
try:
subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
raise RuntimeError("未找到FFmpeg,请先安装并添加到系统环境变量")
# FFmpeg合成命令(-c:v copy -c:a copy 表示复制流,不重新编码)
cmd = [
'ffmpeg', '-y', '-loglevel', 'error', # -y覆盖已有文件,-loglevel error仅输出错误
'-i', VIDEO_TMP, # 输入视频临时文件
'-i', AUDIO_TMP, # 输入音频临时文件
'-c:v', 'copy', # 视频流复制
'-c:a', 'copy', # 音频流复制
'-shortest', # 按最短的流截断(避免音视频时长不一致)
FINAL_VIDEO # 最终输出文件
]
print("\n开始合成音视频...")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"合成失败: {result.stderr}")
print(f"合成完成!最终文件: {FINAL_VIDEO}")
# -------------------------- 4. 主流程(串联所有逻辑) --------------------------
if __name__ == "__main__":
try:
# 1. 解析视频页面,获取音视频流URL
video_url, audio_url = get_video_audio_urls()
# 2. 下载音视频(带容错兜底)
video_ok = download_file_fallback(video_url, VIDEO_TMP)
audio_ok = download_file_fallback(audio_url, AUDIO_TMP)
# 检查下载结果,若有一个失败,终止合成
if not video_ok or not audio_ok:
raise Exception("音视频下载失败,终止合成操作")
# 3. 合成音视频
merge_video_audio()
# 4. 清理临时文件(避免占用资源)
os.remove(VIDEO_TMP)
os.remove(AUDIO_TMP)
print("临时文件已清理")
print(f"\n✅ 全部完成!最终视频文件: {FINAL_VIDEO}")
except Exception as e:
print(f"\n❌ 执行失败: {str(e)}")
# 异常时也清理临时文件,避免残留
try:
if os.path.exists(VIDEO_TMP):
os.remove(VIDEO_TMP)
if os.path.exists(AUDIO_TMP):
os.remove(AUDIO_TMP)
except:
pass
# -------------------------- 作者备注 --------------------------
# 作者备注:本文代码及逻辑仅用于B站流媒体爬取的技术学习,所有操作均需遵守B站用户协议及相关法律法规。
# 作者在B站平台相关技术交流中,始终倡导“合法学习、合规爬取”,严禁利用本文技术侵犯B站及创作者权益。
四、curl转换工具推荐(实用技巧)
在B站爬虫调试过程中,若遇到过长的curl(bash)命令无法手动转换为Python脚本,推荐使用curlconverter 网站(可直接在浏览器搜索“curlconverter”找到)。该网站为万能curl转换工具,支持将任意长度、任意格式的curl命令一键转换为Python脚本格式,代码中需要自行添加的Cookie、请求头、参数等字段,均可通过该网站转换后直接复制填入,无需手动编写请求逻辑,大幅提升调试效率,确保请求参数与curl命令完全一致,避免因参数遗漏、格式错误导致的爬取失败。
五、关键注意事项
-
权限与合规:所有Cookie、视频URL需从自身合法登录的B站账号获取,仅用于技术学习,不得泄露、分享或用于商业用途,严禁爬取付费、版权保护内容。
-
环境依赖:代码依赖FFmpeg(用于音视频合成),需提前安装并配置系统环境变量,否则会报错“未找到FFmpeg”。
-
参数替换:代码中所有“替换为XXX”的字段(Cookie、视频BV号、Referer等),需自行替换为真实、合法的信息,否则无法正常运行。
-
反爬规避:请勿频繁、批量爬取B站内容,避免触发B站反爬机制(如IP封禁、账号限制),尊重平台规则。
-
基础前提:本文为进阶教程,若未掌握基础的爬虫请求、HTML解析、Cookie使用等知识,建议先学习前两天的基础教程,再使用本文代码。
六、作者备注
作者备注:本文代码及逻辑仅用于B站流媒体爬取的技术学习,所有操作均需遵守B站用户协议及相关法律法规。作者在B站平台相关技术交流中,始终倡导“合法学习、合规爬取”,严禁利用本文技术侵犯B站及创作者权益。再次提醒:本文为进阶教程,基础操作请参考前两天的基础教程,请勿跳过基础直接使用进阶代码。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)