【爬虫实战】从零搭建生产级异步爬虫:Redis分布式去重+aiohttp高并发+自动重试
前言
正所谓Python离不开爬虫,就像西方离不开耶路撒冷~~~
学python但不学爬虫,那么你将失去提升自己,磨练意志的机会。
注意!!!本章演示网站为https://books.toscrape.com
此网站为专门练习爬虫的网站,不用担心封IP
一、功能介绍
本爬虫实现了一个生产级的异步爬虫系统,具备以下核心能力:
- 异步高并发:基于 asyncio + aiohttp,5个Worker并发工作;由于每个Worker同一时间只处理一个请求,吞吐量最多约为单个Worker串行爬取的5倍
- 分布式去重:Redis Set 实现URL级别+内容级别双重去重,避免重复爬取和重复存储
- 智能重试:请求失败自动重试,遇到服务器限流(429)时按指数退避(1秒、2秒、4秒……)等待后再重试
- 代理池管理:代理自动轮转、失效自动剔除,有效降低被封风险(演示代码默认没有在请求中启用代理,详见代码后的说明)
- 生产级日志:分级日志输出(控制台/全量文件/错误文件分离),支持日志自动轮转
- 数据持久化:爬取结果和错误信息自动导出为JSON文件
- 实时监控: 每10秒输出统计信息(队列长度、已爬、保存、失败、重复)
演示视频
Python生产级异步爬虫视频
二、环境配置(运行前必看)
1. 安装 Python
- 官网下载:python.org
- 建议版本:3.8及以上
- 安装时勾选“Add Python to PATH”
2. 安装依赖
pip install aiohttp beautifulsoup4 lxml redis
3. Redis安装与启动
Windows:
# 下载 Redis for Windows
#(这边建议下载个 Tiny RDM(Redis 图形化管理工具)用于配置环境,不用每次手动启动redis)
# 或使用 WSL
redis-server.exe
Mac:
brew install redis
redis-server
Linux (Ubuntu/Debian):
sudo apt update
sudo apt install redis-server
sudo systemctl start redis
4. 验证Redis连接
redis-cli ping
# 返回 PONG 表示正常
三、代码展示
完整代码如下:
import asyncio
import hashlib
import json
import logging
import random
import time
from datetime import datetime
from logging.handlers import RotatingFileHandler
from urllib.parse import urljoin

import aiohttp
import redis
from bs4 import BeautifulSoup
def set_logging():
    """Configure and return the shared 'Crawler' logger.

    Handlers attached on the first call:
      - console:   INFO and above
      - total.log: DEBUG and above, rotated at 10 MiB, 5 backups
      - error.log: ERROR and above, rotated at 5 MiB, 3 backups

    The function is idempotent. If the logger already has handlers, it is
    returned unchanged instead of receiving a second set of handlers,
    which would print every message twice.
    """
    logger = logging.getLogger('Crawler')
    if logger.handlers:
        return logger
    logger.setLevel(logging.DEBUG)

    formatter = logging.Formatter(
        fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    handlers = (
        (logging.StreamHandler(), logging.INFO),
        (
            RotatingFileHandler(
                'total.log', maxBytes=10 * 1024**2, backupCount=5, encoding='utf-8'
            ),
            logging.DEBUG,
        ),
        (
            RotatingFileHandler(
                'error.log', maxBytes=5 * 1024**2, backupCount=3, encoding='utf-8'
            ),
            logging.ERROR,
        ),
    )
    for handler, level in handlers:
        handler.setLevel(level)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
# Configure the shared 'Crawler' logger when the module is loaded; the crawler
# class and main() look it up again by name via logging.getLogger('Crawler').
logger = set_logging()
# Browser User-Agent strings; one is picked at random for each request.
# They are complete, well-formed strings: the Chrome/Safari variants must include
# "(KHTML, like Gecko)" and the trailing "Safari/..." token, otherwise they are
# trivially recognisable as fake.
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 '
    '(KHTML, like Gecko) Version/17.2 Safari/605.1.15',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
]
class ProductionCrawler:
    """Asynchronous crawler for books.toscrape.com whose state lives in Redis.

    Redis keys used:
      - ``crawler:url_queue``    list; LPUSH in, RPOP out (FIFO work queue)
      - ``crawler:visited``      set of URLs crawled successfully
      - ``crawler:content_hash`` set of MD5(title_price) hashes of saved books
      - ``crawler:results``      list of JSON-encoded book records
      - ``crawler:failed``       set of URLs that finally failed
      - ``crawler:stats``        hash of counters (visited/saved/failed/duplicates)
      - ``crawler:proxies``      set of proxy URLs
      - ``error:<url>``          hash with the last error message and time of a URL

    The Redis client is synchronous, so every Redis call briefly blocks the
    event loop; fine for a local Redis, worth knowing for a remote one.
    """

    # Star-rating CSS class on detail pages -> numeric rating.
    RATING_MAP = {'One': 1, 'Two': 2, 'Three': 3, 'Four': 4, 'Five': 5}
    # Prefix of the per-URL error hashes written by mark_failed().
    ERROR_PREFIX = 'error:'

    def __init__(self, host='localhost', port=6379, db=0, use_proxy=False):
        """Connect to Redis and define the key names.

        Args:
            host, port, db: Redis connection settings (defaults are the local
                instance the original code hard-coded).
            use_proxy: when True, every request goes through a random proxy
                from the pool, and proxies that time out or refuse the
                connection are evicted. Off by default, so requests are sent
                directly and no proxy is ever evicted.
        """
        self.logger = logging.getLogger('Crawler')
        self.redis = redis.Redis(host=host, port=port, db=db, decode_responses=True)
        self.use_proxy = use_proxy
        # URLs currently being processed by workers of THIS process. Idle
        # workers wait while it is non-zero, because an in-flight listing page
        # may still enqueue new URLs. (Other processes are not counted.)
        self._in_flight = 0
        self.URL_QUEUE = 'crawler:url_queue'
        self.VISITED_SET = 'crawler:visited'
        self.CONTENT_SET = 'crawler:content_hash'
        self.RESULT_LIST = 'crawler:results'
        self.FAILED_SET = 'crawler:failed'
        self.STATS_HASH = 'crawler:stats'
        self.PROXY_KEY = 'crawler:proxies'

    # ------------------------------------------------------------------ queue

    def get_content_hash(self, title, price):
        """Return the MD5 hex digest that identifies a book by title and price."""
        return hashlib.md5(f'{title}_{price}'.encode()).hexdigest()

    def add_url(self, url):
        """Queue *url* unless it is already in the visited set.

        NOTE: URLs that are queued but not yet visited are not detected, so
        the same URL can be queued more than once.
        """
        if not self.redis.sismember(self.VISITED_SET, url):
            self.redis.lpush(self.URL_QUEUE, url)
            self.logger.debug(f'{url}已添加至队列')

    def get_url(self):
        """Pop the oldest queued URL; None when the queue is empty."""
        return self.redis.rpop(self.URL_QUEUE)

    # ---------------------------------------------------------------- proxies

    def add_proxy(self, proxy):
        """Add *proxy* (e.g. 'http://host:port') to the proxy pool."""
        self.redis.sadd(self.PROXY_KEY, proxy)
        self.logger.debug(f'已添加:{proxy}代理')

    def get_proxy(self):
        """Return a random proxy from the pool, or None if the pool is empty."""
        # SRANDMEMBER picks server-side instead of downloading the whole set.
        return self.redis.srandmember(self.PROXY_KEY)

    def remove_proxy(self, proxy):
        """Evict *proxy* from the pool, logging whether it was present."""
        if self.redis.srem(self.PROXY_KEY, proxy):
            self.logger.warning(f'已移除失效代理: {proxy}')
        else:
            self.logger.debug(f'代理不存在或已移除: {proxy}')

    def _drop_proxy(self, proxy):
        """Evict *proxy* if one was used for the failed request."""
        if proxy:
            self.remove_proxy(proxy)

    # ----------------------------------------------------------- bookkeeping

    def mark_visited(self, url):
        """Record *url* as crawled successfully and bump the 'visited' counter."""
        self.redis.sadd(self.VISITED_SET, url)
        self.redis.hincrby(self.STATS_HASH, 'visited', 1)
        self.logger.debug(f'标记已访问:{url}')

    def mark_failed(self, url, error):
        """Record that *url* failed with *error* (message and timestamp)."""
        pipe = self.redis.pipeline()
        pipe.sadd(self.FAILED_SET, url)
        pipe.hincrby(self.STATS_HASH, 'failed', 1)
        pipe.hset(
            f'{self.ERROR_PREFIX}{url}',
            mapping={'error': str(error), 'time': time.time()},
        )
        pipe.execute()
        self.logger.warning(f'请求失败[{url}]:{error}')

    def save_book(self, book_data):
        """Store *book_data* unless a book with the same title+price was saved.

        Adds 'content_hash' and 'crawled_at' keys to *book_data* in place.
        Returns True if the record was stored, False if it was a duplicate.
        """
        content_hash = self.get_content_hash(
            book_data.get('title'),
            book_data.get('price'),
        )
        # SADD returns 0 if the hash already exists: check and claim in one
        # atomic command, so two concurrent workers cannot both save a book.
        if not self.redis.sadd(self.CONTENT_SET, content_hash):
            self.redis.hincrby(self.STATS_HASH, 'duplicates', 1)
            self.logger.debug(f'发现重复数据:{content_hash}')
            return False
        book_data['content_hash'] = content_hash
        book_data['crawled_at'] = datetime.now().isoformat()
        pipe = self.redis.pipeline()
        pipe.lpush(self.RESULT_LIST, json.dumps(book_data, ensure_ascii=False))
        pipe.hincrby(self.STATS_HASH, 'saved', 1)
        pipe.execute()
        self.logger.info(f'已保存书籍:{book_data.get("title")}')
        return True

    def clear_all_data(self):
        """Delete all crawler keys (except the proxy pool) and error hashes.

        NOTE(review): the 'error:*' pattern is not namespaced under 'crawler:',
        so it would also delete unrelated 'error:' keys in a shared Redis DB.
        """
        self.logger.info('正在清理历史数据...')
        keys = [
            self.URL_QUEUE,
            self.VISITED_SET,
            self.CONTENT_SET,
            self.RESULT_LIST,
            self.FAILED_SET,
            self.STATS_HASH,
        ]
        keys.extend(self.redis.scan_iter(f'{self.ERROR_PREFIX}*'))
        pipe = self.redis.pipeline()
        for key in keys:
            pipe.delete(key)
        # DEL returns how many keys it removed; sum them for an accurate count.
        deleted = sum(pipe.execute())
        self.logger.info(f'清理完成,删除了{deleted}个key')

    async def close(self):
        """Close the Redis connection pool."""
        self.logger.info('关闭Redis连接')
        self.redis.close()

    # --------------------------------------------------------------- network

    async def _fetch(self, session, url, timeout, max_retries, label):
        """Return the body of *url* as text, or None if it cannot be fetched.

        Up to *max_retries* attempts are made. HTTP 429 and network errors
        (timeouts, connection errors, other aiohttp client errors) are retried
        after an exponential back-off of 1s, 2s, 4s, ...; any other non-200
        status fails immediately. mark_failed() is called exactly once, and
        only when the URL finally fails, so 'failed' counts URLs, not attempts.

        Args:
            session: aiohttp client session to send the request with.
            url: address to download.
            timeout: total per-attempt timeout in seconds.
            max_retries: maximum number of attempts.
            label: tag used in HTTP error messages ('page' or 'detail').
        """
        last_error = 'No request attempted'
        for attempt in range(max_retries):
            headers = {'User-Agent': random.choice(USER_AGENTS)}
            # Only draw a proxy when it is really sent with the request;
            # otherwise a timeout would evict a proxy that was never used.
            proxy = self.get_proxy() if self.use_proxy else None
            rate_limited = False
            try:
                async with session.get(
                    url,
                    timeout=aiohttp.ClientTimeout(total=timeout),
                    headers=headers,
                    proxy=proxy,
                ) as resp:
                    if resp.status == 200:
                        return await resp.text()
                    if resp.status != 429:
                        self.mark_failed(url, f'HTTP({label}) {resp.status}')
                        return None
                    rate_limited = True
                    last_error = f'HTTP({label}) 429'
            # TimeoutError first: in recent aiohttp versions timeout errors are
            # also ClientErrors and must not fall into the generic branch.
            except asyncio.TimeoutError:
                last_error = 'Timeout'
                self._drop_proxy(proxy)
            except aiohttp.ClientProxyConnectionError:
                last_error = 'Proxy connection failed'
                self._drop_proxy(proxy)
            except aiohttp.ClientConnectorError as e:
                last_error = str(e) or type(e).__name__
                self._drop_proxy(proxy)
            except aiohttp.ClientError as e:
                last_error = str(e) or type(e).__name__

            # No point sleeping after the last attempt.
            if attempt + 1 < max_retries:
                wait_time = 2 ** attempt
                if rate_limited:
                    self.logger.warning(f'遇到限流,等待{wait_time}秒后重试')
                else:
                    self.logger.warning(f'请求失败({last_error}),等待{wait_time}秒后重试')
                await asyncio.sleep(wait_time)

        self.mark_failed(url, last_error)
        self.logger.error(f'错误信息:{last_error}')
        return None

    async def crawl_book_page(self, session, url, max_retries=3):
        """Fetch a catalogue listing page and return the absolute book URLs on it.

        Returns [] when the page cannot be fetched or parsed (the failure is
        recorded via mark_failed()). On success the page is marked visited.
        """
        html = await self._fetch(session, url, 15, max_retries, 'page')
        if html is None:
            return []
        try:
            soup = BeautifulSoup(html, 'lxml')
            # Resolve each relative href against the page URL rather than
            # gluing it onto a hard-coded '/catalogue/' prefix.
            book_urls = [
                urljoin(url, book.h3.a['href'])
                for book in soup.find_all('article', class_='product_pod')
            ]
        except (AttributeError, KeyError, TypeError) as e:
            self.mark_failed(url, f'Parse error(page): {e}')
            return []
        self.mark_visited(url)
        return book_urls

    async def crawl_book_detail(self, session, url, max_retries=3):
        """Fetch a book detail page and return its data as a dict, or None.

        The dict has keys url, title, price, rating (0-5, 0 if unknown) and
        stock; missing elements yield '' (or 0 for rating). On success the
        URL is marked visited; failures are recorded via mark_failed().
        """
        html = await self._fetch(session, url, 10, max_retries, 'detail')
        if html is None:
            return None
        soup = BeautifulSoup(html, 'lxml')
        title_elem = soup.h1
        price_elem = soup.find('p', class_='price_color')
        rating_elem = soup.find('p', class_='star-rating')
        stock_elem = soup.find('p', class_='instock availability')
        # The rating word is the second CSS class, e.g. class="star-rating Three".
        rating_classes = rating_elem.get('class', []) if rating_elem else []
        rating_word = rating_classes[1] if len(rating_classes) > 1 else ''
        self.mark_visited(url)
        return {
            'url': url,
            'title': title_elem.text if title_elem else '',
            'price': price_elem.text if price_elem else '',
            'rating': self.RATING_MAP.get(rating_word, 0),
            'stock': stock_elem.text.strip() if stock_elem else '',
        }

    # ---------------------------------------------------------------- worker

    async def _process_url(self, session, worker_id, url):
        """Crawl one URL: listing pages enqueue book URLs, detail pages are saved."""
        if '/catalogue/page-' in url:
            for book_url in await self.crawl_book_page(session, url):
                self.add_url(book_url)
            return
        book_data = await self.crawl_book_detail(session, url)
        if book_data is None:
            # crawl_book_detail() has already recorded the failure.
            self.logger.warning('\n'+'>'*7+'有个书本数据获取失败!'+'<'*7)
            return
        if self.save_book(book_data):
            self.logger.info(f'[{worker_id}] √ {book_data["title"]}')

    async def worker(self, worker_id, idle_wait=1.0, delay=0.5):
        """Process queued URLs until the queue is empty and no work is in flight.

        Args:
            worker_id: label used in log messages.
            idle_wait: seconds to wait before re-checking an empty queue while
                other workers of this process are still busy.
            delay: politeness pause in seconds after each URL.
        """
        self.logger.info(f"工作线程 {worker_id} 启动")
        async with aiohttp.ClientSession() as session:
            while True:
                url = self.get_url()
                if not url:
                    if self._in_flight == 0:
                        self.logger.info(f"工作线程 {worker_id} 队列为空,退出")
                        break
                    # Another worker may still enqueue URLs from a listing page.
                    await asyncio.sleep(idle_wait)
                    continue
                self._in_flight += 1
                try:
                    await self._process_url(session, worker_id, url)
                except Exception as e:
                    # Safety net: one bad URL must not kill the whole worker.
                    self.logger.exception(f'[{worker_id}] 处理 {url} 时发生未预期的错误')
                    self.mark_failed(url, f'{type(e).__name__}: {e}')
                finally:
                    self._in_flight -= 1
                await asyncio.sleep(delay)

    # ---------------------------------------------------------------- output

    def get_stats(self):
        """Return queue length and the visited/saved/failed/duplicates counters."""
        counters = self.redis.hgetall(self.STATS_HASH)
        stats = {'queue_size': self.redis.llen(self.URL_QUEUE)}
        for name in ('visited', 'saved', 'failed', 'duplicates'):
            stats[name] = int(counters.get(name) or 0)
        return stats

    def export_results(self, filename='book.json'):
        """Write all saved books to *filename* as a JSON array, then remove them
        from Redis.

        Records are written in the order they were saved. They are removed
        from Redis only after the file has been written, so a failed write
        does not lose data.
        """
        self.logger.info(f"开始导出数据到 {filename}")
        raw_items = self.redis.lrange(self.RESULT_LIST, 0, -1)
        # save_book() uses LPUSH, so the head of the list is the newest record.
        results = [json.loads(item) for item in reversed(raw_items)]
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        # Drop exactly the exported records: anything LPUSHed since the LRANGE
        # sits at the head, in front of the last len(raw_items) entries.
        self.redis.ltrim(self.RESULT_LIST, 0, -(len(raw_items) + 1))
        self.logger.info(f'已导出{len(results)}条数据到{filename}')

    def export_errors(self, filename='error.json'):
        """Write every failed URL with its last error and time to *filename*.

        Entries are sorted by URL; 'time' is the Unix timestamp and
        'timestamp' the same moment in ISO format (None if unknown).
        """
        self.logger.info(f"开始导出错误到 {filename}")
        failed_urls = sorted(self.redis.smembers(self.FAILED_SET))
        # Fetch all error hashes in one round trip.
        pipe = self.redis.pipeline(transaction=False)
        for url in failed_urls:
            pipe.hgetall(f'{self.ERROR_PREFIX}{url}')
        errors = []
        for url, info in zip(failed_urls, pipe.execute()):
            raw_time = info.get('time')
            error_time = float(raw_time) if raw_time else None
            errors.append({
                'url': url,
                'error': info.get('error'),
                'time': error_time,
                'timestamp': (
                    datetime.fromtimestamp(error_time).isoformat()
                    if error_time is not None else None
                ),
            })
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(errors, f, ensure_ascii=False, indent=2)
        self.logger.info(f'已导出{len(errors)}条错误到{filename}')
async def main():
    """Seed Redis, run the workers until the queue drains, then export.

    Results and errors are exported and the final statistics logged in the
    ``finally`` block, so this also happens when the run is interrupted.
    Redis is closed only after the last Redis access.
    """
    logger = logging.getLogger('Crawler')
    banner = "=" * 50
    logger.info(banner)
    logger.info("爬虫程序启动")
    logger.info(banner)

    crawler = ProductionCrawler()
    crawler.clear_all_data()

    base_urls = [f'https://books.toscrape.com/catalogue/page-{i}.html' for i in range(1, 51)]
    for url in base_urls:
        crawler.add_url(url)

    # Free public proxies, registered in the Redis proxy pool. Whether requests
    # actually go through them depends on the crawler's request code.
    base_proxies = [
        'http://39.102.214.199:80',
        'http://47.99.112.148:3129',
        'http://47.104.198.111:8008',
        'http://8.219.167.110:8082',
        'http://116.63.130.30:18081',
    ]
    for proxy in base_proxies:
        crawler.add_proxy(proxy)
    logger.info(f'已添加{len(base_proxies)}个代理')
    logger.info(f'已添加{len(base_urls)}个初始URL')

    def format_stats(stats):
        """Render get_stats() output as the multi-line log message."""
        return (
            '统计:\n'
            f"队列 = {stats['queue_size']},\n"
            f"已爬 = {stats['visited']},\n"
            f"保存 = {stats['saved']},\n"
            f"失败 = {stats['failed']},\n"
            f"重复={stats['duplicates']}"
        )

    async def print_stats():
        """Log the statistics every 10 seconds until cancelled."""
        while True:
            await asyncio.sleep(10)
            logger.info(format_stats(crawler.get_stats()))

    num_workers = 5
    stats_task = asyncio.create_task(print_stats())
    try:
        await asyncio.gather(*(crawler.worker(i + 1) for i in range(num_workers)))
    except (KeyboardInterrupt, asyncio.CancelledError):
        # On Python 3.11+ asyncio.run() turns Ctrl+C into cancellation of this
        # task (CancelledError), so KeyboardInterrupt alone is never seen here.
        logger.warning('\n!!!!!!用户停止爬虫!!!!!!')
    finally:
        stats_task.cancel()
        crawler.export_results()
        crawler.export_errors()
        logger.info(format_stats(crawler.get_stats()))
        # Close last: everything above still talks to Redis.
        await crawler.close()
    logger.info(banner)
    logger.info("爬虫程序结束")
    logger.info(banner)
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() re-raises Ctrl+C after main() has finished its
        # finally-block cleanup (exports, Redis close); exit without a traceback.
        pass
注意!!由于演示IP是在网上找的免费版,极其不稳定,所以此代码删除了proxy的加入,但保留了代理池。如果大家想使用直接在session.get()括号里面写入proxy=proxy即可!!!同时小编建议大家使用正规IP代理平台,比如小编使用的‘站大爷’。
四、HTML关键提取
- 打开浏览器自带的开发者工具(DevTools):按F12键即可(小编用的是Windows 11)

切换到“元素(Elements)”面板,再点击开发者工具左上角的第一个按钮(“选择元素”,箭头图标)

- 把鼠标移到你想要查询的信息并点击,比如书籍名字。这时会自动定位到HTML相关内容

被选中的内容背景会出现加深
五、核心代码
核心1:分布式去重系统
一开始小编只做了URL去重,后来发现同一本书在不同页面出现多次,数据重复了。于是加了内容哈希去重。
self.redis.sismember()会判断元素是否在集合里,并返回True/False。
def add_url(self,url):
if not self.redis.sismember(self.VISITED_SET,url):
self.redis.lpush(self.URL_QUEUE,url)
self.logger.debug(f'{url}已添加至队列')
def save_book(self,book_data):
content_hash = self.get_content_hash(
book_data.get('title'),
book_data.get('price')
)
result = self.redis.sismember(self.CONTENT_SET,content_hash)
if result:
self.redis.hincrby(self.STATS_HASH,'duplicates',1)
self.logger.debug(f'发现重复数据:{content_hash}')
return False
核心2:异常处理与重试机制
for i in range(max_retries):
try:
async with session.get(url, timeout=15) as resp:
if resp.status == 429: # 限流
await asyncio.sleep(2 ** i) # 指数退避
continue
# 正常处理...
except asyncio.TimeoutError:
self.mark_failed(url, 'Timeout')
if proxy:
self.remove_proxy(proxy) # 代理失效,移除
在写爬虫的时候,可能会遇到限流、代理被封、代理不可用等情况。遇到限流(429)时,可以用指数退避的方式逐步延长等待时间后再重试;请求超时时,先判断本次请求是否使用了代理:如果用了代理,就认为代理失效并把它从代理池中移除;如果没用代理,超时则可能是自身网络或服务器的问题。
核心3:异步并发架构
async def worker(self, worker_id):
async with aiohttp.ClientSession() as session:
while True:
url = self.get_url()
if not url:
break
# 处理请求...
await asyncio.sleep(0.5) # 礼貌爬取
# 启动5个Worker
workers = [crawler.worker(i) for i in range(5)]
await asyncio.gather(*workers)
小编认为爬虫讲究的是效率,一页页爬取太慢了,适当设置几个worker并发爬取,可以有效提高爬取效率。
六、待改进的地方
这个异步爬虫代码还有许多待改进的地方:比如并未添加断点爬取和增量爬取功能,大家喜欢的可以尝试添加。
七、源码下载
完整代码已上传 GitHub,里面有详细的讲解,下面是链接:
https://github.com/LIBOHAN-sudo/async-crawler.git
也可以直接下载压缩包:
https://github.com/LIBOHAN-sudo/async-crawler/archive/refs/tags/v1.0.0.zip
如果对你有帮助,欢迎 Star ⭐ ~~
本人持续更新中————
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)