小红书数据采集服务深度解析与实践

一、项目架构设计与演进

1.1 架构设计理念

xhs-data-collector-mcp 采用分层架构设计,严格遵循单一职责原则,确保模块职责清晰、易于维护、易于扩展。

┌─────────────────────────────────────────────────────────────────┐
│                        架构分层                                 │
├─────────────────────────────────────────────────────────────────┤
│  Layer 1: MCP Protocol Layer                                   │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │ 工具注册 / 请求解析 / 响应序列化 / 错误处理              │   │
│  └─────────────────────────────────────────────────────────┘   │
├─────────────────────────────────────────────────────────────────┤
│  Layer 2: Service Layer                                        │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐         │
│  │ Collector    │  │ Analyzer     │  │ Storage      │         │
│  │ (数据采集)   │  │ (数据分析)   │  │ (数据存储)   │         │
│  └──────────────┘  └──────────────┘  └──────────────┘         │
├─────────────────────────────────────────────────────────────────┤
│  Layer 3: Client Layer                                         │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │ HTTP Client / API Wrapper / Rate Limiter               │   │
│  └─────────────────────────────────────────────────────────┘   │
├─────────────────────────────────────────────────────────────────┤
│  Layer 4: Infrastructure Layer                                 │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐         │
│  │ Config       │  │ Logging      │  │ Metrics      │         │
│  │ (配置管理)   │  │ (日志系统)   │  │ (指标监控)   │         │
│  └──────────────┘  └──────────────┘  └──────────────┘         │
└─────────────────────────────────────────────────────────────────┘

做数据采集最怕 写死结构,分层架构最大好处就是高内聚、低耦合。上层不用关心底层怎么爬、怎么解析,底层改接口、换规则也不影响上层调用。这种结构从单体到微服务都能平滑过渡,后期扩展抖音、微博等平台时,直接新增采集层即可,上层逻辑几乎不用动。

1.2 核心设计模式

表格

设计模式 应用场景 实现价值
策略模式 数据分析算法切换 支持不同分析策略动态切换
工厂模式 采集器创建 统一创建入口,降低耦合
装饰器模式 请求限流、日志增强 无侵入扩展功能
观察者模式 数据变化通知 实时响应数据更新
单例模式 HTTP 客户端、缓存 资源复用,避免重复创建

爬虫项目最容易乱的就是各种重复代码、各种零散逻辑。用设计模式把公共逻辑抽出来,比如限流、重试、缓存、日志,一次封装、到处复用,后期维护成本极低。

二、核心模块实现详解

2.1 MCP 协议层实现

2.1.1 工具注册机制
from fastmcp import FastMCP, tool
from typing import List, Optional

app = FastMCP(
    name="xhs-data-collector-mcp",
    version="0.1.0",
    description="小红书数据采集 MCP 服务"
)

@tool
async def collect_post_data(
    post_url: str,
    include_comments: bool = False,
    comment_limit: int = 10
) -> dict:
    """采集帖子详情数据"""
    collector = PostCollector()
    return await collector.fetch(post_url, include_comments, comment_limit)

@tool
async def batch_collect_posts(
    post_urls: List[str],
    parallel: bool = True
) -> List[dict]:
    """批量采集帖子数据"""
    collector = PostCollector()
    if parallel:
        tasks = [collector.fetch(url) for url in post_urls]
        return await asyncio.gather(*tasks)
    return [await collector.fetch(url) for url in post_urls]

MCP 最大的价值就是统一接口、统一调用方式。把所有采集能力包装成标准工具,上层 AI 调度系统不用关心底层是爬小红书还是其他平台,直接调用工具即可,真正做到能力即服务;。

2.1.2 请求处理流程
async def handle_request(tool_name: str, arguments: dict) -> dict:
    """
    请求处理核心流程
    1. 参数校验 → 2. 权限检查 → 3. 业务处理 → 4. 结果封装
    """
    validator = get_validator(tool_name)
    if not validator.validate(arguments):
        return {"error": "Invalid parameters", "details": validator.errors}

    if not await check_permission(tool_name, arguments):
        return {"error": "Permission denied"}

    try:
        result = await execute_tool(tool_name, arguments)
    except Exception as e:
        return {"error": str(e), "type": type(e).__name__}

    return {"success": True, "data": result}

标准请求流程就是安全 + 稳定。参数校验防止脏数据,权限检查保证安全,异常捕获避免服务崩溃。这一套流程写好,服务稳定性直接上一个台阶。

2.2 数据采集层设计

2.2.1 采集器基类
from abc import ABC, abstractmethod
from typing import Any, Dict

class BaseCollector(ABC):
    """采集器基类,定义统一接口"""
    def __init__(self):
        self.http_client = XHSHTTPClient()
        self.cache = DataCache()

    @abstractmethod
    async def fetch(self, *args, **kwargs) -> Dict[str, Any]:
        pass

    async def fetch_with_cache(self, key: str, fetch_func, *args, **kwargs):
        cached = self.cache.get(key)
        if cached:
            return cached
        result = await fetch_func(*args, **kwargs)
        self.cache.set(key, result)
        return result

基类 + 抽象方法是复用代码、统一规范的最佳实践。所有采集器都继承基类,自动拥有 HTTP 客户端、缓存、日志能力,不用每个采集器重复写。

2.2.2 帖子采集器实现
class PostCollector(BaseCollector):
    """帖子数据采集器"""
    async def fetch(
        self, 
        post_url: str,
        include_comments: bool = False,
        comment_limit: int = 10
    ) -> dict:
        post_id = self._extract_post_id(post_url)
        base_info = await self._get_post_base_info(post_id)
        interaction_data = await self._get_post_interaction(post_id)
        result = {**base_info, **interaction_data}
        if include_comments:
            result['comments'] = await self._get_post_comments(post_id, comment_limit)
        return result

    def _extract_post_id(self, url: str) -> str:
        import re
        match = re.search(r'/note/(\d+)/', url)
        return match.group(1) if match else url

    async def _get_post_base_info(self, post_id: str) -> dict:
        url = f"https://api.xiaohongshu.com/note/{post_id}"
        data = await self.http_client.get(url)
        return {
            'post_id': data.get('id'),
            'title': data.get('title', ''),
            'content': data.get('content', ''),
            'author_id': data.get('user', {}).get('id'),
            'author_name': data.get('user', {}).get('nickname', ''),
            'category': data.get('category', ''),
            'create_time': data.get('create_time'),
            'images': [img.get('url') for img in data.get('images', [])],
            'tags': [tag.get('name') for tag in data.get('tags', [])]
        }

    async def _get_post_interaction(self, post_id: str) -> dict:
        url = f"https://api.xiaohongshu.com/note/{post_id}/interaction"
        data = await self.http_client.get(url)
        return {
            'likes': data.get('likes', 0),
            'comments': data.get('comments', 0),
            'shares': data.get('shares', 0),
            'favorites': data.get('favorites', 0),
            'views': data.get('views', 0)
        }

    async def _get_post_comments(self, post_id: str, limit: int) -> list:
        url = f"https://api.xiaohongshu.com/note/{post_id}/comments"
        params = {'limit': limit, 'sort': 'hot'}
        data = await self.http_client.get(url)
        return [
            {
                'comment_id': comment.get('id'),
                'user_id': comment.get('user', {}).get('id'),
                'user_name': comment.get('user', {}).get('nickname'),
                'content': comment.get('content', ''),
                'likes': comment.get('likes', 0),
                'create_time': comment.get('create_time')
            }
            for comment in data.get('comments', [])[:limit]
        ]

采集器核心就是拆解流程、分步获取、合并结果。先拿 ID、再拿基础信息、再拿互动数据、可选拿评论,逻辑清晰、便于维护,后期接口变了只改对应方法即可。

2.3 数据分析层实现

2.3.1 趋势分析器
class TrendAnalyzer:
    """趋势分析器"""
    def __init__(self):
        self.collector = SearchCollector()

    async def analyze(self, topic: str, days: int = 7) -> dict:
        history_data = await self._get_history_data(topic, days)
        return {
            'topic': topic,
            'period': f'{days}天',
            'total_posts': history_data['total_posts'],
            'avg_likes': history_data['avg_likes'],
            'growth_rate': self._calculate_growth_rate(history_data),
            'peak_hours': self._find_peak_hours(history_data),
            'related_topics': await self._find_related_topics(topic),
            'sentiment': self._analyze_sentiment(history_data),
            'engagement_rate': self._calculate_engagement_rate(history_data)
        }

    def _calculate_growth_rate(self, data: dict) -> float:
        if data.get('previous_period_posts', 0) == 0:
            return 0.0
        return ((data['total_posts'] - data['previous_period_posts']) / 
                data['previous_period_posts']) * 100

    def _find_peak_hours(self, data: dict) -> list:
        hourly_data = data.get('hourly_data', {})
        if not hourly_data:
            return []
        sorted_hours = sorted(hourly_data.items(), key=lambda x: x[1], reverse=True)
        return [int(hour) for hour, _ in sorted_hours[:3]]

    def _calculate_engagement_rate(self, data: dict) -> float:
        views = data.get('total_views', 0)
        if views == 0:
            return 0.0
        interactions = data.get('total_likes', 0) + data.get('total_comments', 0)
        return (interactions / views) * 100

数据分析不是简单的 算个数,而是从数据里挖规律、找趋势、给决策。增长率、高峰时段、情感倾向、互动率,这些指标直接指导运营:什么时候发、发什么内容、效果好不好。

2.3.2 热点检测算法
class HotTopicDetector:
    """热点话题检测器"""
    def __init__(self):
        self.trending_threshold = 1000
        self.growth_threshold = 50

    async def detect_hot_topics(self, count: int = 20) -> list:
        topics = await self._get_topic_ranking()
        hot_topics = []
        for topic in topics[:count * 2]:
            if self._is_hot(topic):
                hot_topics.append(topic)
                if len(hot_topics) >= count:
                    break
        return hot_topics

    def _is_hot(self, topic: dict) -> bool:
        if topic.get('heat', 0) < self.trending_threshold:
            return False
        if topic.get('growth_rate', 0) < self.growth_threshold:
            return False
        return True

个人理解:热点检测关键是双阈值判断:热度够高、增长够快,才是真热点。避免把老热门、伪热门 误判,给运营提供精准的选题方向。

三、HTTP 客户端与请求优化

3.1 客户端设计

import aiohttp
from typing import Dict, Any, Optional

class XHSHTTPClient:
    """小红书 API HTTP 客户端"""
    def __init__(self, timeout: int = 30, max_retries: int = 3):
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        self.session: Optional[aiohttp.ClientSession] = None
        self._initialize_session()

    def _initialize_session(self):
        connector = aiohttp.TCPConnector(
            limit_per_host=10,
            ttl_dns_cache=300,
            ssl=False
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
            headers=self._build_headers(),
            cookie_jar=aiohttp.CookieJar(unsafe=True)
        )

    def _build_headers(self) -> Dict[str, str]:
        return {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Referer': 'https://www.xiaohongshu.com/',
            'Origin': 'https://www.xiaohongshu.com',
            'X-Requested-With': 'XMLHttpRequest'
        }

    async def get(self, url: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        for attempt in range(self.max_retries):
            try:
                async with self.session.get(url, params=params) as response:
                    response.raise_for_status()
                    return await response.json()
            except aiohttp.ClientError as e:
                if attempt == self.max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)

    async def close(self):
        if self.session:
            await self.session.close()

爬虫最怕被封、超时、连接失败。HTTP 客户端做好异步、连接池、超时、重试、伪装请求头,就是爬虫的防弹衣,稳定是爬虫的生命线。

3.2 限流与熔断机制

from collections import defaultdict
from datetime import datetime, timedelta
from functools import wraps
import asyncio

class RateLimiter:
    """请求限流器"""
    def __init__(self, max_requests: int = 100, time_window: int = 60):
        self.max_requests = max_requests
        self.time_window = timedelta(seconds=time_window)
        self.requests = defaultdict(list)

    async def acquire(self, key: str = 'default'):
        now = datetime.now()
        self.requests[key] = [
            req_time for req_time in self.requests[key]
            if now - req_time < self.time_window
        ]
        if len(self.requests[key]) >= self.max_requests:
            oldest_time = self.requests[key][0]
            wait_time = (oldest_time + self.time_window - now).total_seconds()
            if wait_time > 0:
                await asyncio.sleep(wait_time)
        self.requests[key].append(datetime.now())

def rate_limited(max_requests: int = 100, time_window: int = 60):
    limiter = RateLimiter(max_requests, time_window)
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            await limiter.acquire()
            return await func(*args, **kwargs)
        return wrapper
    return decorator

限流不是限制自己;,而是保护自己、保护目标网站。高频请求必被封,合理限流 + 随机间隔,是爬虫长期稳定运行的关键。

四、数据模型与类型安全

4.1 核心数据模型

from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from datetime import datetime

class User(BaseModel):
    user_id: str
    nickname: str
    avatar: Optional[str]
    bio: Optional[str]
    followers: int = 0
    followings: int = 0
    posts: int = 0
    likes: int = 0
    verified: bool = False
    create_time: Optional[datetime]

class Comment(BaseModel):
    comment_id: str
    user: User
    content: str
    likes: int = 0
    create_time: datetime
    reply_to: Optional[str]

class Post(BaseModel):
    post_id: str
    title: str
    content: str
    author: User
    category: str
    tags: List[str] = []
    images: List[str] = []
    video_url: Optional[str]
    likes: int = 0
    comments: int = 0
    shares: int = 0
    favorites: int = 0
    views: int = 0
    create_time: datetime
    comment_list: Optional[List[Comment]]

class TrendData(BaseModel):
    topic: str
    period: str
    total_posts: int
    avg_likes: float
    avg_comments: float
    growth_rate: float
    peak_hours: List[int]
    related_topics: List[str]
    sentiment: str
    engagement_rate: float

class HotTopic(BaseModel):
    rank: int
    title: str
    heat: int
    growth_rate: float
    trend: str
    related_posts: int

数据模型就是数据的说明书。用 Pydantic 做类型校验,避免脏数据、字段混乱、类型错误,前后端、服务之间数据交互更安全、清晰、易维护。

4.2 请求响应结构

class CollectRequest(BaseModel):
    url: str
    options: Optional[Dict[str, Any]]

class CollectResponse(BaseModel):
    success: bool
    data: Optional[Dict[str, Any]]
    error: Optional[str]
    timestamp: datetime = Field(default_factory=datetime.now)

class BatchCollectRequest(BaseModel):
    urls: List[str]
    parallel: bool = True
    timeout: Optional[int]

class BatchCollectResponse(BaseModel):
    success: bool
    results: List[CollectResponse]
    success_count: int
    failed_count: int

统一请求响应格式,调用方不用猜返回结构,成功 / 失败、数据 / 错误一目了然,对接更简单、排错更高效。

五、配置管理与运行时环境

5.1 配置体系

from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Optional

class Settings(BaseSettings):
    mcp_host: str = "0.0.0.0"
    mcp_port: int = 8006
    request_timeout: int = 30
    max_retries: int = 3
    max_requests_per_minute: int = 100
    cache_ttl: int = 300
    cache_maxsize: int = 1000
    log_level: str = "INFO"
    log_format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    data_dir: str = "./data"
    enable_persistence: bool = False
    enable_metrics: bool = True
    metrics_port: int = 9090

    model_config = SettingsConfigDict(env_file=".env", env_prefix="XHS_")

settings = Settings()

配置和代码分离,开发 / 测试 / 生产环境一键切换,不用改代码、不用重启服务,运维更轻松、部署更灵活。

5.2 .env 配置文件示例

XHS_MCP_HOST=0.0.0.0
XHS_MCP_PORT=8006
XHS_REQUEST_TIMEOUT=30
XHS_MAX_RETRIES=3
XHS_MAX_REQUESTS_PER_MINUTE=100
XHS_CACHE_TTL=300
XHS_CACHE_MAXSIZE=1000
XHS_LOG_LEVEL=INFO
XHS_DATA_DIR=./data
XHS_ENABLE_PERSISTENCE=false
XHS_ENABLE_METRICS=true
XHS_METRICS_PORT=9090

六、部署与运维

6.1 本地开发环境

uv sync
uv shell
uv run python -m xhs_data_collector_mcp.main --reload
curl http://localhost:8006/health
curl http://localhost:8006/tools

用 uv 管理依赖,安装快、依赖锁定、环境干净,开发体验比 pip 好太多,推荐所有 Python 项目都用。

6.2 生产环境部署

6.2.1 systemd 部署
[Unit]
Description=XHS Data Collector MCP Service
After=network.target

[Service]
User=appuser
WorkingDirectory=/opt/xhs-data-collector-mcp
ExecStart=/opt/xhs-data-collector-mcp/.venv/bin/python -m xhs_data_collector_mcp.main
Restart=always
RestartSec=5
Environment="XHS_LOG_LEVEL=INFO"
Environment="XHS_ENABLE_PERSISTENCE=true"

[Install]
WantedBy=multi-user.target

systemd 托管服务,开机自启、崩溃自动重启、日志统一管理,生产环境稳定运行的标配。

6.2.2 Docker 部署
FROM python:3.11-slim
WORKDIR /app
COPY pyproject.toml uv.lock ./
RUN pip install uv && uv sync --frozen
COPY src/ ./src/
EXPOSE 8006
CMD ["uv", "run", "python", "-m", "xhs_data_collector_mcp.main"]
docker build -t xhs-data-collector-mcp .
docker run -d \
  -p 8006:8006 \
  -v ./data:/app/data \
  --env-file .env \
  xhs-data-collector-mcp

Docker 容器化部署,环境隔离、一致性好、一键部署、扩容方便,微服务架构下必选方案。

6.3 监控与日志

6.3.1 指标监控
from prometheus_client import Counter, Gauge, Histogram, start_http_server

REQUEST_COUNT = Counter(
    'xhs_collector_requests_total',
    'Total number of requests',
    ['tool', 'status']
)

REQUEST_DURATION = Histogram(
    'xhs_collector_request_duration_seconds',
    'Request duration in seconds',
    ['tool']
)

ACTIVE_REQUESTS = Gauge(
    'xhs_collector_active_requests',
    'Number of active requests'
)

CACHE_HITS = Counter(
    'xhs_collector_cache_hits_total',
    'Number of cache hits'
)

CACHE_MISSES = Counter(
    'xhs_collector_cache_misses_total',
    'Number of cache misses'
)

事前预警、事中监控、事后分析。请求数、耗时、缓存命中率、活跃请求数,这些指标能直观看到服务健康状态。

6.3.2 结构化日志
import json
from loguru import logger

class StructuredLogger:
    def __init__(self, service_name: str):
        self.service_name = service_name
        logger.remove()
        logger.add(
            lambda msg: print(json.dumps(msg, ensure_ascii=False)),
            format="{message}",
            level="INFO"
        )

    def info(self, message: str, **kwargs):
        log_data = {
            'service': self.service_name,
            'level': 'INFO',
            'message': message,
            **kwargs
        }
        logger.info(log_data)

    def error(self, message: str, **kwargs):
        log_data = {
            'service': self.service_name,
            'level': 'ERROR',
            'message': message,
            **kwargs
        }
        logger.error(log_data)

结构化日志(JSON 格式)便于机器解析、便于日志分析、便于问题定位,比普通文本日志更适合生产环境。

七、安全与合规

7.1 请求伪装与反爬策略

import random
from typing import Dict

class RequestDisguise:
    USER_AGENTS = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64) Edge/120.0.0.0',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Safari/605.1.15'
    ]

    REFERRERS = [
        'https://www.xiaohongshu.com/',
        'https://www.xiaohongshu.com/search',
        'https://www.xiaohongshu.com/user/profile',
        'https://www.xiaohongshu.com/explore'
    ]

    @classmethod
    def get_random_user_agent(cls) -> str:
        return random.choice(cls.USER_AGENTS)

    @classmethod
    def get_random_referer(cls) -> str:
        return random.choice(cls.REFERRERS)

    @classmethod
    def build_headers(cls) -> Dict[str, str]:
        return {
            'User-Agent': cls.get_random_user_agent(),
            'Referer': cls.get_random_referer(),
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'X-Requested-With': 'XMLHttpRequest'
        }

伪装请求头、随机 UA、随机 Referer,模拟真实用户行为,降低被识别为爬虫的概率,提高稳定性。

7.2 合规使用规范

表格

规范项 要求
请求频率 单 IP 每分钟不超过 100 次请求
User-Agent 使用真实浏览器 UA,定期轮换
请求间隔 随机间隔 1-3 秒
数据用途 仅供学习研究,禁止商业用途
数据存储 敏感数据加密存储,定期清理
隐私保护 不采集用户隐私信息

合规是底线。遵守平台规则、控制频率、保护隐私、不商用,才能长期稳定运行,避免法律风险。

八、与其他 MCP 服务的集成实践

8.1 服务协作架构

┌─────────────────────────────────────────────────────────────────────────┐
│                        AI Social Scheduler                              │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                      决策引擎                                   │   │
│  │  ┌──────────┐    ┌──────────┐    ┌──────────┐                │   │
│  │  │ 策略分析  │───▶│ 任务调度  │───▶│ 结果评估  │                │   │
│  │  └──────────┘    └──────────┘    └──────────┘                │   │
│  └─────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────┬───────────────────────────────────────┘
                                  │ MCP Protocol
        ┌─────────────────────────┼─────────────────────────┐
        ↓                         ↓                         ↓
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│ xhs-data-       │     │ xhs-content-    │     │ xhs-browser-    │
│ collector-mcp   │     │ generator-mcp   │     │ automation-mcp  │
│                 │     │                 │     │                 │
│ - 数据采集      │     │ - 内容生成      │     │ - 内容发布      │
│ - 趋势分析      │     │ - 文案优化      │     │ - 用户互动      │
│ - 热点检测      │     │ - 标题生成      │     │ - 账户管理      │
└────────┬────────┘     └────────┬────────┘     └────────┬────────┘
         │                       │                       │
         └───────────────────────┴───────────────────────┘
                          │
                          ↓
                 ┌─────────────────┐
                 │   小红书平台     │
                 └─────────────────┘

MCP 生态的核心是各司其职、协同工作。数据采集服务提供 &#34;弹药&#34;,内容生成、发布服务提供 战斗力,调度系统做指挥官,形成完整的 AI 运营闭环。

8.2 数据流转示例

async def execute_operation_flow():
    # 1. 获取热点话题
    hot_topics = await data_collector.call_tool("get_hot_topics", {"count": 10})
    selected_topic = select_best_topic(hot_topics)
    
    # 2. 分析趋势
    trend = await data_collector.call_tool("analyze_trend", {"topic": selected_topic, "days": 7})
    
    # 3. 生成内容
    content = await content_generator.call_tool(
        "generate_content",
        {"topic": selected_topic, "style": "casual", "length": "medium", "trend_data": trend}
    )
    
    # 4. 发布内容
    result = await browser_automation.call_tool(
        "publish_content",
        {"title": content["title"], "content": content["content"], "tags": content["tags"], "images": content["images"]}
    )
    
    # 5. 监控效果
    await monitor_publish_result(result["post_id"])

完整运营流程就是数据驱动决策、决策指导创作、创作发布、效果反馈、迭代优化。数据采集服务是整个流程的起点,也是核心。

总结

xhs-data-collector-mcp 是一套完整、现代化、可落地的小红书数据采集服务。从架构分层、设计模式、核心模块、HTTP 优化、数据模型、配置管理、部署运维、安全合规到服务集成,覆盖全链路工程化实践。

个人总结:做爬虫 / 数据采集项目,核心就三点:稳定、规范、可扩展。

稳定:做好限流、重试、伪装、异常处理,长期不崩;

规范:分层架构、设计模式、类型安全、统一接口,好维护;

可扩展:模块化、标准化、MCP 协议,轻松加平台、加能力。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐