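Preface

As the saying goes, Python can't do without web crawlers, just as the West can't do without Jerusalem~~~
If you learn Python but skip web scraping, you miss a chance to improve yourself and build perseverance.

Note!!! The demo site used in this article is https://books.toscrape.com
It is a site built specifically for scraping practice, so you don't need to worry about your IP being banned.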

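I. Features

This crawler implements a production-grade asynchronous crawling system with the following core capabilities:

  • Async concurrency: built on asyncio + aiohttp, with 5 workers running concurrently; crawling is 10x+ faster than fetching pages one at a time
  • Distributed deduplication: Redis sets provide two layers of dedup (by URL and by content hash), avoiding repeated crawling and repeated storage
  • Smart retries: up to 3 attempts per request, with exponential backoff on rate limiting (HTTP 429), to ride out network hiccups
  • Proxy pool management: random proxy rotation and automatic removal of failed proxies, lowering the risk of being blocked
  • Production-grade logging: leveled output to separate destinations (console / full log file / error-only file), with automatic log rotation
  • Data persistence: crawl results and error records are exported to JSON files automatically
  • Live monitoring: statistics printed every 10 seconds (queue length, visited, saved, failed, duplicates)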


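II. Environment Setup (read before running)

1. Install Python

  • Download from the official site: python.org
  • Recommended version: 3.8 or later
  • On Windows, check "Add Python to PATH" during installation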

2. Install dependencies

pip install aiohttp beautifulsoup4 lxml redis

3. Install and start Redis

Windows

# Download Redis for Windows
# (I suggest also downloading Tiny RDM to set up the environment, so you don't have to start redis by hand every time)
# or use WSL
redis-server.exe

Mac

brew install redis
redis-server

Linux (Ubuntu/Debian)

sudo apt update
sudo apt install redis-server
sudo systemctl start redis

4. Verify the Redis connection

redis-cli ping
# a reply of PONG means Redis is running

III. Code

The complete code:

import asyncio
import aiohttp
import redis
import json
import hashlib
import time
import random
import logging
from bs4 import BeautifulSoup
from datetime import datetime
from logging.handlers import RotatingFileHandler

def set_logging():
    logger = logging.getLogger('Crawler')
    logger.setLevel(logging.DEBUG)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    file_handler = RotatingFileHandler(
        'total.log',
        maxBytes = 10*1024**2,
        backupCount=5,
        encoding='utf-8'
        )
    file_handler.setLevel(logging.DEBUG)

    error_handler = RotatingFileHandler(
        'error.log',
        maxBytes = 5*1024**2,
        backupCount = 3,
        encoding = 'utf-8'
        )
    error_handler.setLevel(logging.ERROR)

    formatter = logging.Formatter(
        fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt = '%Y-%m-%d %H:%M:%S'
        )
    console_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)
    error_handler.setFormatter(formatter)

    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.addHandler(error_handler)

    return logger
logger = set_logging()
    

USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/605.1.15',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/119.0.0.0',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/120.0.0.0'
]

class ProductionCrawler:
    def __init__(self):
        self.logger = logging.getLogger('Crawler')
        self.redis = redis.Redis(
            host = 'localhost',
            port = 6379,
            decode_responses = True
            )
        self.URL_QUEUE = 'crawler:url_queue'
        self.VISITED_SET = 'crawler:visited'
        self.CONTENT_SET = 'crawler:content_hash'
        self.RESULT_LIST = 'crawler:results'
        self.FAILED_SET = 'crawler:failed'
        self.STATS_HASH = 'crawler:stats'
        self.PROXY_KEY = 'crawler:proxies'

    def get_content_hash(self,title,price):
        return hashlib.md5(f'{title}_{price}'.encode()).hexdigest()

    def add_url(self,url):
        if not self.redis.sismember(self.VISITED_SET,url):
            self.redis.lpush(self.URL_QUEUE,url)
            self.logger.debug(f'Added to queue: {url}')

    def get_url(self):
        return self.redis.rpop(self.URL_QUEUE)

    def add_proxy(self,proxy):
        self.redis.sadd(self.PROXY_KEY,proxy)
        self.logger.debug(f'Added proxy: {proxy}')
        
    def get_proxy(self):
        proxies = list(self.redis.smembers(self.PROXY_KEY))
        return random.choice(proxies) if proxies else None

    def remove_proxy(self,proxy):
        removed = self.redis.srem(self.PROXY_KEY,proxy)
        if removed:
            self.logger.warning(f'Removed failed proxy: {proxy}')
        else:
            self.logger.debug(f'Proxy not found or already removed: {proxy}')
        
    def mark_visited(self,url):
        self.redis.sadd(self.VISITED_SET,url)
        self.redis.hincrby(self.STATS_HASH,'visited',1)
        self.logger.debug(f'Marked as visited: {url}')

    def mark_failed(self,url,error):
        self.redis.sadd(self.FAILED_SET,url)
        self.redis.hincrby(self.STATS_HASH,'failed',1)
        self.redis.hset(f'error:{url}','error',error)
        self.redis.hset(f'error:{url}','time',time.time())
        self.logger.warning(f'Request failed [{url}]: {error}')

    def save_book(self,book_data):
        content_hash = self.get_content_hash(
            book_data.get('title'),
            book_data.get('price')
            )

        result = self.redis.sismember(self.CONTENT_SET,content_hash)

        if result:
            self.redis.hincrby(self.STATS_HASH,'duplicates',1)
            self.logger.debug(f'Duplicate data found: {content_hash}')
            return False
        pipe = self.redis.pipeline()
        pipe.sadd(self.CONTENT_SET,content_hash)
        book_data['content_hash'] = content_hash
        book_data['crawled_at'] = datetime.now().isoformat()

        pipe.lpush(self.RESULT_LIST,json.dumps(book_data,ensure_ascii=False))
        pipe.hincrby(self.STATS_HASH,'saved',1)
        pipe.execute()
        self.logger.info(f'Saved book: {book_data["title"]}')
        return True

    def clear_all_data(self):
        self.logger.info('Clearing data from previous runs...')
        pipe = self.redis.pipeline()

        keys_to_delete = [
            self.URL_QUEUE,
            self.VISITED_SET,
            self.CONTENT_SET,
            self.RESULT_LIST,
            self.FAILED_SET,
            self.STATS_HASH
            ]
        for key in keys_to_delete:
            pipe.delete(key)

        error_keys = list(self.redis.scan_iter("error:*"))
        for key in error_keys:
            pipe.delete(key)

        pipe.execute()
        self.logger.info(f'Cleanup done, cleared {len(keys_to_delete)+len(error_keys)} keys')
    
        
    async def close(self):
        self.logger.info('Closing Redis connection')
        self.redis.close()

    async def crawl_book_page(self,session,url,max_retries=3):
        last_error = 'unknown error'
        for i in range(max_retries):
            proxy = None
            try:
                headers = {'User-Agent': random.choice(USER_AGENTS)}
                # A proxy is drawn from the pool, but it is NOT passed to session.get() (see the note after the code)
                proxy = self.get_proxy()
                timeout = aiohttp.ClientTimeout(total=15)
                async with session.get(url,timeout=timeout,headers=headers) as resp:
                    if resp.status == 429:
                        # Rate limited: exponential backoff, waiting 1s, 2s, 4s...
                        wait_time = 2**i
                        last_error = 'HTTP(page) 429'
                        self.logger.warning(f'Rate limited, retrying in {wait_time}s')
                        await asyncio.sleep(wait_time)
                        continue
                    if resp.status != 200:
                        # Other status codes are not retried
                        self.mark_failed(url,f'HTTP(page) {resp.status}')
                        return []
                    html = await resp.text()

                soup = BeautifulSoup(html,'lxml')
                book_urls = []
                for book in soup.find_all('article',class_='product_pod'):
                    relative_url = book.h3.a['href']
                    full_url = f'https://books.toscrape.com/catalogue/{relative_url.replace("../","")}'
                    book_urls.append(full_url)

                self.mark_visited(url)
                return book_urls
            except (asyncio.TimeoutError, aiohttp.ClientConnectorError) as e:
                if isinstance(e, asyncio.TimeoutError):
                    last_error = 'Timeout'
                elif isinstance(e, aiohttp.ClientProxyConnectionError):
                    last_error = 'Proxy connection failed'
                else:
                    last_error = str(e)
                # These errors may be caused by a bad proxy, so drop it from the pool
                if proxy:
                    self.remove_proxy(proxy)
            except Exception as e:
                last_error = str(e)
            self.logger.warning(f'Attempt {i+1}/{max_retries} failed [{url}]: {last_error}')

        # Record the failure only once, after all retries are used up
        self.mark_failed(url,last_error)
        self.logger.error(f'Giving up on {url}: {last_error}')
        return []

    async def crawl_book_detail(self,session,url,max_retries=3):
        last_error = 'unknown error'
        for i in range(max_retries):
            proxy = None
            try:
                headers = {'User-Agent': random.choice(USER_AGENTS)}
                # A proxy is drawn from the pool, but it is NOT passed to session.get() (see the note after the code)
                proxy = self.get_proxy()
                timeout = aiohttp.ClientTimeout(total=10)
                async with session.get(url,timeout=timeout,headers=headers) as resp:
                    if resp.status == 429:
                        # Rate limited: exponential backoff, waiting 1s, 2s, 4s...
                        wait_time = 2**i
                        last_error = 'HTTP(detail) 429'
                        self.logger.warning(f'Rate limited, retrying in {wait_time}s')
                        await asyncio.sleep(wait_time)
                        continue
                    if resp.status != 200:
                        # Other status codes are not retried
                        self.mark_failed(url,f'HTTP(detail) {resp.status}')
                        return None
                    html = await resp.text()

                soup = BeautifulSoup(html,'lxml')
                title = soup.h1.text if soup.h1 else ''

                price_elem = soup.find('p',class_='price_color')
                price = price_elem.text if price_elem else ''

                # The rating is encoded as a CSS class, e.g. class="star-rating Three"
                rating_elem = soup.find('p',class_='star-rating')
                rating = rating_elem['class'][1] if rating_elem else ''
                rating_map = {'One':1,'Two':2,'Three':3,'Four':4,'Five':5}
                rating = rating_map.get(rating,0)

                stock_elem = soup.find('p',class_='instock availability')
                stock = stock_elem.text.strip() if stock_elem else ''

                # Mark detail pages as visited too, so add_url() can skip them later
                self.mark_visited(url)
                return {
                    'url':url,
                    'title':title,
                    'price':price,
                    'rating':rating,
                    'stock':stock
                    }
            except (asyncio.TimeoutError, aiohttp.ClientConnectorError) as e:
                if isinstance(e, asyncio.TimeoutError):
                    last_error = 'Timeout'
                elif isinstance(e, aiohttp.ClientProxyConnectionError):
                    last_error = 'Proxy connection failed'
                else:
                    last_error = str(e)
                # These errors may be caused by a bad proxy, so drop it from the pool
                if proxy:
                    self.remove_proxy(proxy)
            except Exception as e:
                last_error = str(e)
            self.logger.warning(f'Attempt {i+1}/{max_retries} failed [{url}]: {last_error}')

        # Record the failure only once, after all retries are used up
        self.mark_failed(url,last_error)
        self.logger.error(f'Giving up on {url}: {last_error}')
        return None

    async def worker(self,worker_id):
        self.logger.info(f"Worker {worker_id} started")
        async with aiohttp.ClientSession() as session:
            while True:
                url = self.get_url()
                if not url:
                    self.logger.info(f"Worker {worker_id}: queue is empty, exiting")
                    break
                if '/catalogue/page-' in url:
                    # Listing page: collect the detail-page URLs and queue them
                    book_urls = await self.crawl_book_page(session,url)
                    for book_url in book_urls:
                        self.add_url(book_url)
                else:
                    # Detail page: extract the book data and save it
                    book_data = await self.crawl_book_detail(session,url)
                    if book_data:
                        self.save_book(book_data)
                        self.logger.info(f'[{worker_id}] √ {book_data["title"]}')
                    else:
                        # crawl_book_detail() records failures itself via mark_failed(),
                        # so the failure is not counted a second time here
                        self.logger.warning('\n'+'>'*7+'Failed to fetch book data!'+'<'*7)

                # Polite crawling: pause briefly between requests
                await asyncio.sleep(0.5)

    def get_stats(self):
        return {
            'queue_size':self.redis.llen(self.URL_QUEUE),
            'visited':int(self.redis.hget(self.STATS_HASH,'visited')or 0),
            'saved':int(self.redis.hget(self.STATS_HASH,'saved')or 0),
            'failed':int(self.redis.hget(self.STATS_HASH, 'failed') or 0),
            'duplicates': int(self.redis.hget(self.STATS_HASH, 'duplicates') or 0)
            }
    def export_results(self,filename='book.json'):
        self.logger.info(f"Exporting results to {filename}")
        results=[]
        while True:
            item = self.redis.rpop(self.RESULT_LIST)
            if not item:
                break
            results.append(json.loads(item))

        with open(filename,'w',encoding = 'utf-8') as f:
            json.dump(results,f,ensure_ascii=False,indent=2)

        self.logger.info(f'Exported {len(results)} records to {filename}')
        return None

    def export_errors(self,filename='error.json'):
        self.logger.info(f"Exporting errors to {filename}")
        errors = []
        failed_urls = self.redis.smembers(self.FAILED_SET)
        for url in failed_urls:
            error = self.redis.hget(f'error:{url}','error')
            error_time = self.redis.hget(f'error:{url}','time')
            errors.append({
                'url':url,
                'error':error,
                'time':float(error_time) if error_time else None,
                'timestamp':datetime.fromtimestamp(float(error_time)).isoformat() if error_time else None
                })
        with open(filename,'w',encoding='utf-8') as f:
            json.dump(errors,f,ensure_ascii=False,indent=2)
        self.logger.info(f'Exported {len(errors)} errors to {filename}')
        return None

async def main():
    logger = logging.getLogger('Crawler')
    logger.info("=" * 50)
    logger.info("Crawler started")
    logger.info("=" * 50)
    crawler = ProductionCrawler()
    # Start from a clean slate: all Redis keys from the previous run are deleted
    crawler.clear_all_data()
    # books.toscrape.com has 50 listing pages
    base_urls = [f'https://books.toscrape.com/catalogue/page-{i}.html' for i in range(1, 51)]
    for url in base_urls:
        crawler.add_url(url)
    base_proxies = [
        'http://39.102.214.199:80',
        'http://47.99.112.148:3129',
        'http://47.104.198.111:8008',
        'http://8.219.167.110:8082',
        'http://116.63.130.30:18081'
        ]
    for proxy in base_proxies:
        crawler.add_proxy(proxy)

    logger.info(f'Added {len(base_proxies)} proxies')
    logger.info(f'Added {len(base_urls)} seed URLs')

    workers = [crawler.worker(i+1) for i in range(5)]

    def log_stats():
        stats = crawler.get_stats()
        logger.info(f'''Stats:
queue = {stats['queue_size']},
visited = {stats['visited']},
saved = {stats['saved']},
failed = {stats['failed']},
duplicates = {stats['duplicates']}''')

    async def print_stats():
        # Report progress every 10 seconds until the task is cancelled
        while True:
            await asyncio.sleep(10)
            log_stats()
    stats_task = asyncio.create_task(print_stats())

    try:
        await asyncio.gather(*workers)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Since Python 3.11, asyncio.run() turns Ctrl+C into a cancellation of main(),
        # which arrives here as CancelledError rather than KeyboardInterrupt
        logger.warning('\n!!!!!! Crawler stopped by user !!!!!!')
    finally:
        stats_task.cancel()
        # Log the final stats and export the data while the Redis connection is still open
        log_stats()
        crawler.export_results()
        crawler.export_errors()
        await crawler.close()

    logger.info("=" * 50)
    logger.info("Crawler finished")
    logger.info("=" * 50)

if __name__ == '__main__':
    asyncio.run(main())

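Note!! The demo proxies are free ones found online and are extremely unstable, so this code does not actually send requests through a proxy, although the proxy pool is kept. To use it, just add proxy=proxy inside the session.get() call. Be aware that, as written, a timeout or connection error still removes the randomly selected proxy from the pool even though it was not used for the request. I also recommend using a reputable commercial proxy provider, such as 站大爷 (Zhandaye), which I use.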
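If you do enable proxies, the pool logic boils down to "pick one at random, drop it when it fails". Below is a minimal in-memory sketch of that logic, assuming a Python set in place of the Redis set behind PROXY_KEY. The ProxyPool class is hypothetical, and the addresses are placeholders from the 203.0.113.0/24 documentation range, not working proxies. The value returned by get() is what you would pass as session.get(url, proxy=proxy).

```python
import random


class ProxyPool:
    """Hypothetical in-memory version of add_proxy / get_proxy / remove_proxy."""

    def __init__(self, proxies=()):
        self.proxies = set(proxies)

    def add(self, proxy):
        self.proxies.add(proxy)

    def get(self):
        # Random choice spreads requests across proxies; None means "connect directly"
        return random.choice(sorted(self.proxies)) if self.proxies else None

    def remove(self, proxy):
        # Returns True if the proxy was in the pool, similar to SREM returning 1
        if proxy in self.proxies:
            self.proxies.discard(proxy)
            return True
        return False


pool = ProxyPool(['http://203.0.113.10:8080', 'http://203.0.113.11:3128'])
proxy = pool.get()
# In the crawler this would become: session.get(url, proxy=proxy, headers=headers, timeout=timeout)
pool.remove(proxy)   # e.g. after a timeout or ClientProxyConnectionError
print(pool.get())    # the proxy that is still in the pool
```

Only remove a proxy when it was really used for the failed request; otherwise healthy proxies get evicted because of unrelated network errors.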

IV. Locating the Key HTML Elements

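  1. Open the browser's built-in developer tools by pressing F12 (I'm on Windows 11), then click the Elements tab.
  2. Click the first button in the toolbar (the element picker).
  3. Move the mouse over the information you want to look up, such as the book title, and click it. The browser jumps to the corresponding HTML, and the selected element is highlighted with a darker background.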
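Before running the full crawler, you can check the selectors found with DevTools against a small HTML fragment offline. The fragment below is a hand-written, simplified imitation of a books.toscrape.com product page (an assumption, not copied from the site), and parse_book() is a hypothetical helper that repeats the find() calls used in crawl_book_detail(). It uses Python's built-in 'html.parser' so lxml is not required.

```python
from bs4 import BeautifulSoup

# Simplified, hand-written HTML imitating a product page (assumption, not the real page)
SAMPLE_HTML = """
<div class="product_main">
  <h1>A Light in the Attic</h1>
  <p class="price_color">£51.77</p>
  <p class="instock availability">
    In stock (22 available)
  </p>
  <p class="star-rating Three"></p>
</div>
"""

RATING_MAP = {'One': 1, 'Two': 2, 'Three': 3, 'Four': 4, 'Five': 5}


def parse_book(html):
    soup = BeautifulSoup(html, 'html.parser')
    title = soup.h1.text if soup.h1 else ''
    price_elem = soup.find('p', class_='price_color')
    price = price_elem.text if price_elem else ''
    # class is a multi-valued attribute, e.g. ['star-rating', 'Three']
    rating_elem = soup.find('p', class_='star-rating')
    rating = RATING_MAP.get(rating_elem['class'][1], 0) if rating_elem else 0
    # A class_ string containing a space matches the exact class attribute value
    stock_elem = soup.find('p', class_='instock availability')
    stock = stock_elem.text.strip() if stock_elem else ''
    return {'title': title, 'price': price, 'rating': rating, 'stock': stock}


print(parse_book(SAMPLE_HTML))
```

If a selector returns None here, it will also fail on the live page, so this is a cheap way to debug parsing separately from networking.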

V. Core Code

Core 1: Distributed deduplication

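At first I only deduplicated by URL, but then I found that the same book showed up several times on different pages and the data was duplicated, so I added content-hash deduplication as well.
self.redis.sismember() checks whether an element is in a set and returns True/False.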

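def add_url(self,url):
    # URL-level dedup: only queue URLs that have not been visited yet
    if not self.redis.sismember(self.VISITED_SET,url):
        self.redis.lpush(self.URL_QUEUE,url)
        self.logger.debug(f'Added to queue: {url}')

def save_book(self,book_data):
    # Content-level dedup: the same title + price gives the same MD5 hash
    content_hash = self.get_content_hash(
        book_data.get('title'),
        book_data.get('price')
        )
    result = self.redis.sismember(self.CONTENT_SET,content_hash)
    if result:
        self.redis.hincrby(self.STATS_HASH,'duplicates',1)
        self.logger.debug(f'Duplicate data found: {content_hash}')
        return False
    # ...otherwise store the hash and the record (see save_book() in the full code)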
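The two dedup layers can be tried without a Redis server. In this sketch a plain Python set stands in for each Redis set (an assumption for illustration), and the hash is built with the same recipe as get_content_hash(). The InMemoryDedup class is hypothetical.

```python
import hashlib


def get_content_hash(title, price):
    # Same recipe as the crawler: MD5 of "title_price"
    return hashlib.md5(f'{title}_{price}'.encode()).hexdigest()


class InMemoryDedup:
    """Hypothetical stand-in for the Redis sets VISITED_SET and CONTENT_SET."""

    def __init__(self):
        self.visited = set()
        self.content_hashes = set()
        self.duplicates = 0

    def should_crawl(self, url):
        # URL-level check, like add_url(): skip URLs already visited
        return url not in self.visited

    def mark_visited(self, url):
        self.visited.add(url)

    def save(self, book):
        # Content-level check, like save_book(): same title + price is a duplicate
        h = get_content_hash(book.get('title'), book.get('price'))
        if h in self.content_hashes:
            self.duplicates += 1
            return False
        self.content_hashes.add(h)
        return True


dedup = InMemoryDedup()
book = {'title': 'A Light in the Attic', 'price': '£51.77'}
print(dedup.save(book))        # True: first copy is stored
print(dedup.save(dict(book)))  # False: same title and price is rejected
```

Against a real Redis server, the check and the insert can be merged into one command: SADD returns the number of members actually added (1 for a new hash, 0 for an existing one), so testing the return value of `sadd` avoids a separate SISMEMBER call and stays atomic when several crawler processes share the same set.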

Core 2: Exception handling and retries

for i in range(max_retries):
    proxy = self.get_proxy()
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as resp:
            if resp.status == 429:  # rate limited
                await asyncio.sleep(2 ** i)  # exponential backoff: 1s, 2s, 4s
                continue
            # normal processing...
    except asyncio.TimeoutError:
        self.mark_failed(url, 'Timeout')
        if proxy:
            self.remove_proxy(proxy)  # the proxy may be dead, remove it

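When writing a crawler you may run into rate limiting, blocked proxies, or unavailable proxies. Exponential backoff deals with rate limiting: after each 429 response the wait doubles (1s, 2s, 4s...), giving the server time to recover instead of hammering it. When a request times out, first check whether a proxy was used: if so, a dead proxy is the likely cause and it is removed from the pool; if not, the delay may come from your own network or some other problem.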
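The backoff rule can be isolated into a reusable async helper. This is a minimal sketch under stated assumptions: retry_with_backoff and the RateLimited exception (standing in for an HTTP 429 response) are hypothetical names, and the demo uses a tiny base delay so it finishes quickly. Unlike the crawler, it retries only rate-limit errors and lets other exceptions propagate immediately.

```python
import asyncio


class RateLimited(Exception):
    """Hypothetical exception standing for an HTTP 429 response."""


async def retry_with_backoff(func, max_retries=3, base=1.0):
    """Await func(); on RateLimited wait base * 2**i seconds and try again.

    Returns (result, list_of_delays_used) so the schedule is visible.
    """
    delays = []
    for i in range(max_retries):
        try:
            return await func(), delays
        except RateLimited:
            if i == max_retries - 1:
                raise  # retries exhausted
            wait = base * 2 ** i  # 1x, 2x, 4x ... the base delay
            delays.append(wait)
            await asyncio.sleep(wait)


async def demo():
    calls = {'n': 0}

    async def flaky():
        # Simulates two "429" responses, then a success
        calls['n'] += 1
        if calls['n'] < 3:
            raise RateLimited()
        return 'ok'

    return await retry_with_backoff(flaky, max_retries=3, base=0.01)


print(asyncio.run(demo()))  # ('ok', [0.01, 0.02])
```

Many production clients also add a small random jitter to each delay so that many clients do not retry in lockstep.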

Core 3: Async concurrency architecture

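async def worker(self, worker_id):
    async with aiohttp.ClientSession() as session:
        while True:
            url = self.get_url()
            if not url:
                break  # queue is empty, this worker exits
            # process the request...
            await asyncio.sleep(0.5)  # polite crawling: pause between requests

# Inside async main(): start 5 workers and run them concurrently
workers = [crawler.worker(i) for i in range(5)]
await asyncio.gather(*workers)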

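In my view, crawling is all about efficiency. Fetching pages one by one is too slow; running a suitable number of workers greatly improves throughput, because while one worker is waiting on the network, the others can keep making progress.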
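The speed-up comes from overlapping the waits. This self-contained sketch swaps the Redis list for an in-process asyncio.Queue and simulates each request with asyncio.sleep(0.1) (both assumptions, no network involved), then compares 1 worker with 5. fake_fetch, worker and run are hypothetical names for illustration.

```python
import asyncio
import time


async def fake_fetch(url):
    # Simulated network I/O: while this worker waits, the event loop runs the others
    await asyncio.sleep(0.1)
    return f'done:{url}'


async def worker(queue, results):
    while True:
        try:
            url = queue.get_nowait()
        except asyncio.QueueEmpty:
            return  # queue empty: this worker exits, like worker() in the crawler
        results.append(await fake_fetch(url))


async def run(num_workers, urls):
    queue = asyncio.Queue()
    for url in urls:
        queue.put_nowait(url)
    results = []
    start = time.perf_counter()
    await asyncio.gather(*(worker(queue, results) for _ in range(num_workers)))
    return results, time.perf_counter() - start


urls = [f'page-{i}' for i in range(1, 11)]
for n in (1, 5):
    results, elapsed = asyncio.run(run(n, urls))
    print(f'{n} worker(s): {len(results)} pages in {elapsed:.2f}s')
```

With 10 simulated pages of 0.1 s each, one worker needs about 1 s and five workers about 0.2 s. Real gains depend on the site and on politeness delays such as the 0.5 s pause in the crawler.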

VI. Room for Improvement

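This async crawler still has plenty of room for improvement. For example, it does not yet support resuming an interrupted crawl (checkpointing) or incremental crawling; if you're interested, try adding them yourself.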
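As a possible starting point for checkpointing, a run can skip URLs that an earlier run already finished instead of wiping all state at startup. The sketch below shows that idea without Redis, persisting the visited set to a JSON file in a temporary directory; the file name visited.json and the helpers load_visited, save_visited and pending_urls are hypothetical.

```python
import json
import os
import tempfile


def load_visited(path):
    # URLs recorded by a previous run (empty set on the first run)
    if not os.path.exists(path):
        return set()
    with open(path, encoding='utf-8') as f:
        return set(json.load(f))


def save_visited(path, visited):
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(sorted(visited), f, ensure_ascii=False, indent=2)


def pending_urls(seed_urls, visited):
    # Only URLs not finished in an earlier run are queued again
    return [u for u in seed_urls if u not in visited]


seeds = [f'https://books.toscrape.com/catalogue/page-{i}.html' for i in range(1, 4)]
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'visited.json')
    first = pending_urls(seeds, load_visited(path))   # run 1: all 3 pages pending
    save_visited(path, set(first[:1]))                 # pretend run 1 stopped after page-1
    second = pending_urls(seeds, load_visited(path))  # run 2: page-1 is skipped
    print(len(first), len(second))  # 3 2
```

In the crawler itself, a comparable approach would be to keep VISITED_SET between runs instead of deleting it in clear_all_data(), since add_url() already skips visited URLs.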

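VII. Source Code Download

The complete code is on GitHub, with detailed explanations. Link:
https://github.com/LIBOHAN-sudo/async-crawler.git

You can also download the zip archive directly:
https://github.com/LIBOHAN-sudo/async-crawler/archive/refs/tags/v1.0.0.zip

If this helped you, a Star ⭐ is welcome~~
More updates coming...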
