如何使用LangChain 代理和Scrapeless构建 AI 数据管道+ 访问实时网络数据
关键要点:
- 第一方LangChain集成。 langchain-scrapeless包在PyPI上提供五个现成的工具——
ScrapelessDeepSerpGoogleSearchTool、ScrapelessDeepSerpGoogleTrendsTool、ScrapelessUniversalScrapingTool、ScrapelessCrawlerCrawlTool和ScrapelessCrawlerScrapeTool——可以直接放入LangChain或LangGraph代理中。无需子进程处理,无需自定义CDP连接。 - 管道有四个步骤:发现 → 渲染 → 提取 → 存储。 使用深SERP工具在Google中搜索,使用通用抓取工具渲染任何URL(背后是抓取浏览器),使用
PydanticOutputParser提取已类型化的记录,并可选择性地嵌入向量存储,以便于后续的RAG。 - 代理决定何时抓取。 LangChain的
create_agent(背后是LangGraph运行时)让LLM在根据先前上下文回答和调用Scrapeless工具之间进行选择——云浏览器仅在真正需要新数据时启动,这样可以保持常规操作中的成本和延迟在合理范围内。 - 已类型化的记录,而非原始HTML。 将
ScrapelessUniversalScrapingTool(markdown响应)与Pydantic模式配对,可将抓取页面转化为经过验证的Product/Article/JobListing记录,后续代码可以在没有额外解析支持的情况下使用。 - 反检测云浏览器,覆盖195多个国家的住宅代理。 Scrapeless抓取浏览器处理JavaScript渲染、住宅代理流量、和每个会话的指纹随机化(用户代理、时区、WebGL、canvas),让代理专注于推理,而非规避处理。
引言:实时网页的AI数据管道
一个光秃秃的LLM仅依赖训练数据回答。对于大多数实际发 ship 的智能工作流程——竞争价格情报、线索研究、市场监控、结构化新闻摄取、实时网络上的RAG——模型需要“现在查看页面”。训练截止日期、付费墙、懒加载的单页面应用以及个性化渲染都将答案推向LLM从未看到的领域,回答要么引用过时的数字,要么礼貌地拒绝。
LangChain加云浏览器是标准答案。模型进行推理;浏览器进行获取;代理将两者结合在一起。大多数团队遇到的摩擦点在代理之下:住宅代理、JavaScript渲染、反检测指纹识别和会话生命周期等都需要解决,代理才能做任何有用的事情。直接使用Playwright通过住宅VPN运行时适用于单台笔记本电脑的作业;但无法支持生产调度。
Scrapeless抓取浏览器在平台层面处理这四个问题,langchain-scrapeless PyPI包将其作为原生LangChain工具暴露。本文将介绍如何将这些工具组合成一个四步的发现 → 渲染 → 提取 → 存储 AI数据管道,并提供一个竞争研究的实例、已类型化的Pydantic输出、受限的并发和可观察性钩子。如需在不同协议下使用相同原语,请参阅MCP集成帖子。
你可以构建的内容
langchain-scrapeless提供的五个工具覆盖了最常见的AI数据管道模式:
- 竞争价格情报。 搜索一个类别,渲染顶级零售商页面,提取带有价格、评级和评论数量的已类型化
Product记录。 - SERP监控。 使用
ScrapelessDeepSerpGoogleSearchTool根据gl和hl参数跟踪关键词排名和片段漂移。 - 市场趋势追踪。 使用
ScrapelessDeepSerpGoogleTrendsTool获取类别大小的interest_over_time和相关查询(趋势端点访问取决于你的Scrapeless计划级别)。 - 大规模的产品详情提取。 将一组URL输入
ScrapelessCrawlerScrapeTool,返回适合LLM提取器的markdown格式。 - 目录中的潜在客户生成。 使用
ScrapelessCrawlerCrawlTool抓取商业列表网站,将联系人行解析为已类型化的记录,并按域名去重。 - 结构化新闻摄取以支持RAG。 渲染出版商页面以生成干净的markdown,提取
Article记录,嵌入到LangChain向量存储中,并通过检索增强链查询。
所有六个管道都将相同的原语组合——搜索、渲染、提取、存储——底下的示例涵盖了整个链的每个环节。
为什么选择Scrapeless抓取浏览器
Scrapeless抓取浏览器是一款可定制的、反检测的云浏览器,专为网络爬虫和AI代理设计。对于LangChain代理而言,它提供:
- 覆盖195多个国家的住宅代理——地理限制查询返回本地用户所看到的列表,每个会话的轮换是自动的。
- 云端 JavaScript 渲染 — 完整的 Chromium,页面在提取前已被激活,因此单页面应用程序(SPA)、无限滚动的动态信息流和延迟加载的面板都是一流的目标。
- 每个会话的防检测指纹识别 — 用户代理、时区、语言、屏幕分辨率、WebGL 和画布在每个会话中随机化,当一致性重要时,提供自定义指纹 API 用于固定身份。
- 云浏览器层的会话持久性 通过
sessionTTL(60–900秒)和sessionName— 直接驱动 WSS 端点时可用;langchain-scrapeless工具调用在每次invoke时分配一个新会话,这是研究管道的正确默认设置。 - 第一方 LangChain 集成 — 使用
pip install langchain-scrapeless使云浏览器作为原生 LangChain 工具暴露;无需子进程包装,无需 CDP 管道,无需自定义序列化器。
在 Scrapeless 的免费计划中获取 API 密钥。完整的集成文档可在 github.com/scrapeless-ai/langchain-scrapeless 找到。
申请您的免费计划并开始抓取:
加入 Scrapeless 的活跃社区,申请 $5-10 免费计划,并与其他创新者联系:
Scrapeless 官方 Discord 社区
Scrapeless 官方 Telegram 社区
先决条件
- Python 3.10 或更新版本。
- 一个 Scrapeless 账户和 API 密钥 — 在 Scrapeless 注册并从 设置 → API 密钥管理 中复制密钥。
- 一个聊天模型 API 密钥 — 以下示例使用 OpenAI(
OPENAI_API_KEY);通过替换ChatOpenAI行,使用langchain-anthropic、langchain-google-genai、langchain-ollama或任何 LangChain 聊天模型的相同代理代码也有效。 - 对
pip和venv的基本熟悉。
安装
完整的设置分为四个子步骤。每个步骤都可以独立验证,您可以在继续之前暂停并确认。
1. 创建 venv 并安装包
bashCopy
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install langchain langchain-scrapeless langchain-openai langgraph pydantic
langchain-scrapeless 拉取 langchain-core 和 Scrapeless Python SDK 作为传递依赖。langchain 元包提供 langchain.agents.create_agent,这是现代(未弃用)ReAct 代理运行时;langgraph 提供底层 CompiledStateGraph 运行时;langchain-openai 是示例中使用的聊天模型提供者。如果您愿意,可以更换为 langchain-anthropic 或其他提供者。
2. 配置您的 API 密钥
为当前 shell 会话导出两个密钥:
bashCopy
export SCRAPELESS_API_KEY="your_api_token_here"
export OPENAI_API_KEY="your_openai_token_here"
对于永久安装,将相同的行添加到 ~/.bashrc / ~/.zshrc,或使用 .env 加载器(python-dotenv),并在进程启动时加载该文件。Scrapeless 工具自动从环境中读取 SCRAPELESS_API_KEY — 您无需将其作为构造函数参数传入。
3. 验证安装
一个简短的烟雾测试,测试搜索工具并打印一行结果。Scrapeless API 有时在冷启动后的第一次调用时返回临时 400;测试循环最多会尝试三次,因此单个重试可以解决这些情况:
pythonCopy
# verify.py
import time
from langchain_scrapeless import ScrapelessDeepSerpGoogleSearchTool
tool = ScrapelessDeepSerpGoogleSearchTool()
for attempt in range(3):
try:
result = tool.invoke({"q": "scrapeless scraping browser",
"hl": "en", "gl": "us"})
print(str(result)[:300])
break
except ValueError as e:
print(f"transient (attempt {attempt + 1}): {e}")
time.sleep(3)
运行它:python verify.py。成功运行会在几秒钟内打印一串搜索结果。如果所有三次尝试都抛出 ValueError 并显示 failed with status 401,则 API 密钥丢失或错误;请重新检查同一 shell 中的 echo $SCRAPELESS_API_KEY。如果在三次尝试后仍然持续出现 400,则表示账户可能无法访问请求的端点 — 参见有关临时错误的 FAQ。
4. (可选)固定依赖项版本
为了可重现的构建,固定您测试过的版本。在干净的 Python 3.12 环境中,由 pip install langchain langchain-scrapeless langchain-openai langgraph pydantic 解析的版本组合是:
Copy
langchain==1.2.17
langchain-core==1.3.2
langchain-openai==1.2.1
langchain-scrapeless==0.1.3
langgraph==1.1.10
pydantic==2.13.3
scrapeless==1.1.1
注意:langchain-scrapeless 0.1.3 的包元数据声明 langchain-core <0.4.0,但 pip 解析为 langchain-core 1.3.2,因为 langchain-openai 需要它;运行时导入仍然有效。为了完全避免解析器警告,请安装 langchain-scrapeless 和您需要的聊天模型提供者(例如,去掉 langchain-openai,并传递一个 ChatAnthropic 实例)。LangGraph 1.0 于 2025 年 10 月 GA;1.1.x 系列是当前稳定版本。
如何实际使用:提示您的代理
安装后,您可以通过与代理对话来构建管道,而不是每次目标网站旋转其 DOM 时重新推导 CSS 选择器。代理负责发现 → 渲染 → 提取循环,而 LLM 为每一步选择合适的工具。
您可以粘贴的提示
| 您输入 | 代理所做的 |
|---|---|
| "找到前 5 名便携式咖啡机,并以 JSON 格式返回名称、价格、评分。" | 搜索 Google,渲染顶部结果,提取类型为 Product[]。 |
"给我美国 vector database 在过去 12 个月的当前 Google 趋势兴趣。" |
调用 ScrapelessDeepSerpGoogleTrendsTool,并使用 data_type="interest_over_time"(需要对 Trends 端点的计划访问)。 |
"爬取 https://example.com/docs 到深度 2,并返回每页的 markdown。" |
调用 ScrapelessCrawlerCrawlTool 并使用 limit=...。 |
"将 https://www.amazon.com/dp/B08N5WRWNW 渲染为 markdown。" |
调用 ScrapelessUniversalScrapingTool,并使用 response_type="markdown"。 |
"搜索 2026 年金融科技的 A 轮初创公司,列出公司及其融资轮次规模。" |
搜索 → 渲染 → 类型提取自动链式进行。 |
| "提取这三家 SaaS 竞争对手的主页和定价页面,并总结差异。" | 多 URL ScrapelessCrawlerScrapeTool → LLM 总结。 |
"监视这个 Greenhouse 职业页面,并告诉我哪些角色匹配 staff engineer 或 infra。" |
渲染 → 关键词过滤 → JSON 行。 |
"从英国出口中获取关于 langchain scrapeless tutorial 的前 10 名自然结果?" |
使用 ScrapelessDeepSerpGoogleSearchTool,并使用 gl="uk",hl="en"。 |
工作示例
您输入:
找到价格低于 150 美元的 3 款便携式咖啡机,适合旅行。对每一款,返回名称、价格、平均评分、评论数量和三个关键特征。将搜索固定在美国出口。
代理的计划(简单语音):
- 调用
ScrapelessDeepSerpGoogleSearchTool,使用q="best portable espresso makers under 150",gl="us",hl="en",num=5获取自然结果 URL。 - 对于前 3 个结果 URL,调用
ScrapelessUniversalScrapingTool,并使用response_type="markdown"渲染每个页面为干净的 markdown。 - 将每个渲染的页面传递给 LLM,并使用绑定到
Product模式的PydanticOutputParser;拒绝任何解析器无法提取name和price的页面。 - 将三条
Product记录汇总成 JSON 数组并返回。
您收到的响应:
jsonCopy
[
{
"name": "Wacaco Nanopresso",
"price": 79.95,
"rating": 4.7,
"review_count": 12483,
"key_features": [
"手动手泵操作",
"高达 18 巴的提取压力",
"与磨咖啡或通过适配器兼容的 NS 胶囊"
],
"url": "https://example.com/p/wacaco-nanopresso"
},
{
"name": "Flair NEO Flex",
"price": 119.00,
"rating": 4.5,
"review_count": 2104,
"key_features": [
"杠杆驱动,无需电力",
"拥有无底把手的浓缩压力",
"可拆卸以便旅行"
],
"url": "https://example.com/p/flair-neo-flex"
},
{
"name": "Outin Nano",
"price": 129.99,
"rating": 4.6,
"review_count": 5871,
"key_features": [
"内置加热元件",
"自清洁循环",
"USB-C 充电,约 3 分钟加热"
],
"url": "https://example.com/p/outin-nano"
}
]
// 模式准确反映步骤 4 解析器输出的内容。字段值仅供参考。
设计提示词
| 表达 | 效果 |
|---|---|
"使用德语出口(gl=de)。" |
固定搜索工具代理区域;结果返回柏林用户所见的内容。 |
| "以 markdown 格式获取。" | 在通用抓取工具上使用 response_type="markdown" — 更便宜的 LLM 上下文,更稳定的选择器,而不是 HTML。 |
| "限制抓取为 25 页。" | 在 ScrapelessCrawlerCrawlTool 上使用 limit=25。 |
| "跳过没有价格的页面。" | 解析器会对缺少的字段返回 None;代理进行过滤。 |
| "并行运行三个 URL。" | 移交给下面步骤 6 的有界并发模式。 |
下面的步骤 1-6 是在后台的参考。阅读一次以了解发现 → 渲染 → 提取 → 存储模式是如何组合的;然后相信代理将其应用于操作员提交的任何查询。
架构
Copy
┌──────────────────────────────────────────────────────────────────────┐
│ LangChain create_agent (LangGraph runtime) │
│ │
│ ┌───────────────────────┐ ┌────────────────────────────────┐ │
│ │ 聊天模型 │ ──► │ 工具 (langchain-scrapeless) │ │
│ │ (OpenAI / Anthropic │ │ • DeepSerpGoogleSearch │ │
│ │ / Gemini / Ollama) │ ◄── │ • UniversalScraping │ │
│ └───────────────────────┘ │ • CrawlerCrawl │ │
│ ▲ │ • CrawlerScrape │ │
│ │ │ • DeepSerpGoogleTrends │ │
│ │ └──────────────┬─────────────────┘ │
│ │ │ │
│ │ PydanticOutputParser │ │
│ │ (类型化记录) ▼ │
│ │ ┌──────────────────────────────────┐ │
│ │ │ 无痕云浏览器 │ │
│ │ │ • 住宅代理 (195+) │ │
│ │ │ • 反检测指纹 │ │
│ │ │ • Chromium JS 渲染 │ │
│ │ │ • 会话 TTL 60–900秒 │ │
│ │ └──────────────────────────────────┘ │
│ │ │ │
│ └──────────────────────────────────┘ │
│ 类型化记录返回到代理 │
└──────────────────────────────────────────────────────────────────────┘
│
▼ (可选)
┌───────────────────────────┐
│ 向量存储 (Chroma / │
│ PGVector / pgvector) │
└───────────────────────────┘
---
## 步骤 1 — 定义类型化输出模式
Pydantic 是将抓取的 Markdown 转变为下游代码可以依赖的负载支撑原素。在第 4 步中定义目标记录一次,并将其绑定到 LLM 提取器上。
```python
# schema.py
from typing import Optional
from pydantic import BaseModel, Field, HttpUrl
class Product(BaseModel):
name: str = Field(...,
description="产品名称。始终需要 — 如果没有清晰的产品名称,使用页面标题或 H1。")
price: Optional[float] = Field(None, description="以美元计的数值价格;如果缺失为 null")
rating: Optional[float] = Field(None, description="平均评分,0–5;如果缺失为 null")
review_count: Optional[int] = Field(None, description="评论数量;如果缺失为 null")
key_features: list[str] = Field(default_factory=list, description="3–5 个简短的特色要点")
url: HttpUrl = Field(..., description="规范产品网址")
将每个可能缺失的字段标记为 Optional,并将默认列表设置为空 — 反机器人插页、地区布局差异和懒惰加载的 DOM 节点意味着页面通常会省略一两个字段,而一个非可选的模式会拒绝那些本来可以有用的行。保持 name 为必需项,并在其描述中提供替代选项(页面标题,H1),以使提取器在必需字段的情况下永远不会返回 null;这个单一的提示使得模式能够吸收嘈杂页面而不会引发错误。
步骤 2 — 使用 ScrapelessDeepSerpGoogleSearchTool 进行发现
深层 SERP 工具根据查询返回 Google 的自然结果,参数化为语言(hl)、国家(gl)和结果数量(num)。它是发现的基础构件 — 搜索在您承诺任何每页渲染预算之前扩展了 URL 的宇宙。
pythonCopy
# discover.py
from langchain_scrapeless import ScrapelessDeepSerpGoogleSearchTool
search = ScrapelessDeepSerpGoogleSearchTool()
results = search.invoke({
"q": "2026 年最佳便携式浓缩咖啡机,价格低于 150",
"hl": "zh",
"gl": "cn",
"num": 5,
})
print(results)
hl 控制结果语言,gl 控制出口国家 — 它们是地区杠杆。对于跨区域的 SERP 监控,使用不同的 gl 值(us, de, jp, br)运行相同的查询,并比较结果列表。临时的 ValueError(HTTP 400/503 被工具包装)或 TimeoutError 响应在查询量较大时是正常的;在扩展之前,请使用第 6 步中的重试装饰器包装调用。
步骤 3 — 使用 ScrapelessUniversalScrapingTool 进行渲染
通用抓取工具前端是无刮擦抓取浏览器。它接受一个 URL 并以 markdown(或 HTML,或截图)的形式返回渲染的页面。Markdown 是输入 LLM 提取器的最低成本格式——它去除了广告、导航元素和内联样式,只保留页面实际上讲述的内容。
pythonCopy
# render.py
from langchain_scrapeless import ScrapelessUniversalScrapingTool
scrape = ScrapelessUniversalScrapingTool()
markdown = scrape.invoke({
"url": "https://example.com/p/wacaco-nanopresso",
"response_type": "markdown",
})
print(markdown[:600])
每次 invoke 都会分配一个新的云浏览器会话,这对于研究管道来说是合适的默认设置——每个 URL 使用新会话更简单且对每会话的反机器人状态更具弹性。通过 sessionName 的跨调用会话重用是 CDP 级功能;如果你的工作流程需要在页面之间保持热 cookie 和登录状态,请直接通过 WSS 端点驱动云浏览器,而不是通过这个 LangChain 工具。当你需要运行自己的选择器时,工具还接受 response_type="html",对于最低成本的 LLM 上下文,使用 response_type="plaintext",或对于视觉回归管道,使用 response_type="png" / "jpeg"。
第 4 步 — 使用 PydanticOutputParser 提取
第 1 步绑定的模式直接插入到一个 LangChain 表达式语言 (LCEL) 链中,该链接收渲染的 markdown 并返回一个类型为 Product 的对象。解析器将 JSON 模式注入提示中并根据其验证 LLM 的响应。
pythonCopy
# extract.py
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from schema import Product
parser = PydanticOutputParser(pydantic_object=Product)
prompt = ChatPromptTemplate.from_messages([
("system",
"你从渲染的网页中提取产品记录。\n"
"输出严格符合此模式:\n{format_instructions}"),
("human",
"来源 URL: {url}\n\n渲染的 markdown:\n{markdown}\n\n"
"返回一个 Product 记录。`name` 字段是必需的 —— 如果没有明确的产品名称,使用页面标题或 H1。缺少时仅将可选字段 "
"(price, rating, review_count) 设置为 null。"),
]).partial(format_instructions=parser.get_format_instructions())
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
extract_chain = prompt | llm | parser
product = extract_chain.invoke({"url": "https://example.com/p/wacaco-nanopresso",
"markdown": "<第 3 步的渲染 markdown>"})
print(product.model_dump())
当以下三件事成立时,解析器失败是很少见的:(1)每个不确定字段标记为 Optional[...],(2)提示明确告知模型仅可选字段可以是 null,(3)每个必需字段在其描述中有后备(例如 name 使用页面标题或 H1)。在这三者到位的情况下,模式的可空约定处理缺少的可选字段,而提示指令防止 LLM 在嘈杂页面上将必需字段设为 null——这样链条运行得很顺利,没有任何重试包装。
第 5 步 — 组合成 create_agent
代理将三个工具结合在一起,让 LLM 决定在任何给定用户消息中调用哪一个。langchain.agents.create_agent 是截至 langchain 1.2 的标准运行时(它替代了弃用的 langgraph.prebuilt.create_react_agent,并在底层使用相同的 LangGraph 状态图)。
pythonCopy
# agent.py
from langchain.agents import create_agent
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_scrapeless import (
ScrapelessDeepSerpGoogleSearchTool,
ScrapelessUniversalScrapingTool,
ScrapelessCrawlerCrawlTool,
)
from tenacity import (retry, stop_after_attempt,
wait_exponential, retry_if_exception_type)
# Underlying first-party tools
_search = ScrapelessDeepSerpGoogleSearchTool()
_scrape = ScrapelessUniversalScrapingTool()
_crawl = ScrapelessCrawlerCrawlTool()
# The retry decorator the agent will see on every tool call.
# Scrapeless tools surface transient API errors as ValueError, so that's the filter.
_retry = retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=15),
retry=retry_if_exception_type(ValueError),
)
def _check(payload: str) -> str:
# The cloud browser sometimes returns HTTP 200 with an embedded ERR_ JSON.
# Surface it as ValueError so the @_retry decorator above kicks in.
if isinstance(payload, str) and payload.startswith('{"statusCode"') and "ERR_" in payload:
raise ValueError(f"Cloud-browser error: {payload[:200]}")
return payload
@tool
@_retry
def google_search(q: str, hl: str = "en", gl: str = "us", num: int = 5) -> str:
"""Search Google and return the top organic results as JSON."""
return _check(str(_search.invoke({"q": q, "hl": hl, "gl": gl, "num": num})))
@tool
@_retry
def render_page(url: str) -> str:
"""Render a URL with the Scrapeless cloud browser and return clean markdown."""
return _check(_scrape.invoke({"url": url, "response_type": "markdown"}))
@tool
@_retry
def crawl_site(url: str, limit: int = 10) -> str:
"""Crawl a site to a bounded page count, returning markdown for each page."""
return _check(str(_crawl.invoke({"url": url, "limit": limit})))
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
system_prompt = (
"You are a research agent that builds typed product datasets. "
"Given a category query, you: "
"(1) call google_search for the top organic URLs, "
"(2) call render_page with each promising URL, "
"(3) extract a Product record per page using the schema, "
"(4) return a JSON array of records. "
"Pin gl='us' and hl='en' unless the user asks otherwise. "
"If a page lacks a price, omit the row from the final array."
)
agent = create_agent(llm,
[google_search, render_page, crawl_site],
system_prompt=system_prompt)
for chunk in agent.stream(
{"messages": [("human",
"Find the top 3 portable espresso makers under $150 "
"and return name, price, rating, review_count, key_features, url.")]},
stream_mode="values",
):
chunk["messages"][-1].pretty_print()
@tool + @retry 装饰器栈是生产环境的负载核心。langchain langchain-scrapeless 工具会将底层 ScrapelessError 包装成 ValueError 再抛出,因此代理工具调用循环内部的瞬态 400/503 响应会以 ValueError 形式冒泡。如果没有重试机制,一次瞬态故障就会导致整个代理运行崩溃;而有了装饰器栈,每次工具调用都会使用指数抖动退避机制进行最多三次重试,之后才会放弃。( langchain-core Runnable.with_retry() 返回一个 RunnableRetry 对象,而 create_agent 不接受该对象——上面提到的 @tool 装饰器函数才是在重试 shell 中生成真正的 StructuredTool 路径。)
agent.stream(..., stream_mode="values") 会在每一步都发出完整的消息状态,因此操作员可以实时观察代理的工具调用和中间推理过程。对于单个最终答案,请切换到 agent.invoke(...) 。要将步骤 4 中的类型化 Product[] 传递给代理,请将提取链作为另一个 @tool 函数公开,并将其添加到列表中——代理会在每次渲染后调用它。`langchain langchain-scrapeless README 仍然演示了 from langgraph.prebuilt import create_react_agent ;`,该路径虽然有效,但会发出 ` LangGraphDeprecatedSinceV10 警告,因此建议使用上述现代导入方式。
第6步 — 生产强化
在笔记本中运行的研究脚本无法处理三万个URL。四种强化模式将上面的管道转变为调度程序可以无人值守运行的东西。
有限的并发
# concurrent_render.py
import asyncio
from langchain_scrapeless import ScrapelessUniversalScrapingTool
抓取 = ScrapelessUniversalScrapingTool()
SEM = asyncio.Semaphore(3) # 每个主机最多同时渲染3个
async def 渲染(url: str) -> str:
async with SEM:
return await 抓取.ainvoke({"url": url, "response_type": "markdown"})
async def 渲染所有(urls: list[str]) -> list[str]:
return await asyncio.gather(*(渲染(u) for u in urls))
每个主机三次并发渲染是最佳点——足够高以摊销每会话的预热成本,足够低以保持在大多数网站的每IP速率限制之下。在主机级别进行限制,而不是全球限制;十个不同的主机,每个主机三个工作者是可以的,但十个工作者全部访问同一零售商则不可以。
对瞬时错误进行重试
pythonCopy
# retry.py
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from langchain_scrapeless import ScrapelessUniversalScrapingTool
抓取 = ScrapelessUniversalScrapingTool()
def _raise_on_embedded_error(payload: str) -> str:
# Scrapeless 有时返回HTTP 200,但JSON主体描述了一个
# 内部浏览器错误(隧道重置,ERR_CONNECTION_RESET等)。
# 工具没有对此进行抛出;将其表面化为ValueError,以便进行重试。
if payload.startswith('{"statusCode"') and "ERR_" in payload:
raise ValueError(f"云浏览器错误: {payload[:200]}")
return payload
@retry(
stop=stop_after_attempt(4),
wait=wait_exponential(multiplier=1, min=2, max=20),
retry=retry_if_exception_type((ValueError, TimeoutError)),
)
def 带重试的渲染(url: str) -> str:
return _raise_on_embedded_error(
抓取.invoke({"url": url, "response_type": "markdown"}))
抓取浏览器会话偶尔会以两种不同的形式失败:一种是HTTP层的400/503(langchain-scrapeless包装器会将其抛出为ValueError),另一种是HTTP 200并带有描述浏览器端错误的JSON体,如ERR_TUNNEL_CONNECTION_FAILED(包装器并不会抛出——错误的JSON作为字符串返回)。上述的_raise_on_embedded_error保护会捕获第二种形式并将其转换为ValueError,这样同样的重试策略就适用。涵盖了这两种形式后,指数退避与四次尝试的组合在不埋没真正失败(反机器人阻止、404错误)之下的重试情况下,捕捉到了高90百分位数。对于代理驱动的调用,使用第5步的@tool + @retry装饰器堆栈——那里包装器函数在返回之前应应用相同的_raise_on_embedded_error检查。
使用LangSmith的可观察性
bashCopy
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY="your_langsmith_key"
export LANGCHAIN_PROJECT="scrapeless-research-agent"
设置好这三个环境变量后,每个工具调用、每个LLM调用和每个解析失败都会在LangSmith中显示,包含时间、成本以及模型看到的确切提示。对于生产运行,这是唯一的最高杠杆变化——它将“代理做了一些奇怪的事情”转变为可点击的追踪信息。
持久化到向量存储
pythonCopy
# embed.py
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
docs = [Document(page_content=p.model_dump_json(), metadata={"url": str(p.url)})
for p in products]
vs = Chroma.from_documents(docs, OpenAIEmbeddings(), persist_directory=".chroma")
向量存储是管道的可选第四阶段。当代理是一个长期运行的RAG服务,用于回答下游对数据集的问题时,它才会产生回报;对于一次性研究脚本来说,它则显得有些过分。根据操作偏好选择存储——本地文件使用langchain_chroma,托管的Postgres + pgvector使用langchain_postgres,托管的向量数据库使用langchain_pinecone。
你会收到的结果
[
{
"name": "Wacaco Nanopresso",
"price": 79.95,
"rating": 4.7,
"review_count": 12483,
"key_features": [
"手动泵操作",
"可达18巴的提取压力",
"通过适配器兼容研磨咖啡或NS胶囊"
],
"url": "https://example.com/p/wacaco-nanopresso"
},
{
"name": "Flair NEO Flex",
"price": 119.00,
"rating": 4.5,
"review_count": 2104,
"key_features": [
"杠杆驱动,无需电力",
"无底滤杯的意式压榨压力",
"可拆卸以便旅行"
],
"url": "https://example.com/p/flair-neo-flex"
},
{
"name": "Outin Nano",
"price": 129.99,
"rating": 4.6,
"review_count": 5871,
"key_features": [
"内置加热元件",
"自清洁过程",
"USB-C充电,约3分钟加热"
],
"url": "https://example.com/p/outin-nano"
}
]
// 模式准确反映第4步解析器输出。字段值为示例性样本。
关于在实际网络上运行时期望得到的结果的一些真实观察:
- 水合时间因网站而异。 通用抓取工具默认等待
domcontentloaded;对于通过第二个XHR水合价格的单页应用,标记可能在价格渲染之前到达。重新渲染一次,稍微延迟,或在字段始终为null的情况下回退到response_type="html"和自定义选择器。 - 可选字段保持可选。 一些产品页面省略明确的评分或评论数,特别是在直接面向消费者的网站上。将
null值视为信息而非失败模式,并在后续处理中进行过滤。 - LLM提取主导延迟。 整个管道对于搜索调用大约为1-3秒,页面渲染为2-4秒,LLM提取为3-5秒。在渲染和提取阶段的并发是最大的杠杆。
- 反机器人插入作为
ValueError浮现。 当一个网站预加载Cloudflare或Akamai挑战,而云浏览器无法透明地完成时,langchain-scrapeless包装器会引发ValueError,而不是无声地返回占位页面。重试装饰器可以捕捉瞬态情况;持久性案例最好通过扩大指纹或固定不同的代理区域来处理。 - 向量存储阶段是可选的。 对于返回类型记录给下游消费者的研究管道,可以完全跳过。如果相同的数据集在一段时间内回答多个下游问题,则可以添加它。
常见问题FAQ
Q: 我需要住宅代理吗?
是的,对于任何具有有意义的反机器人保护的网站,例如大多数零售商、市场和搜索引擎结果页面(SERP)端点。ScrapelessUniversalScrapingTool和深度搜索工具默认通过Scrapeless的住宅代理池进行路由;搜索工具上的gl参数指定出口国家。
Q: 关于像400或503这样的瞬态错误怎么办?langchain-scrapeless工具将瞬态API错误显示为ValueError(底层的ScrapelessError在重新抛出之前被包装)。对于直接调用,使用第6步中的tenacity装饰器,设置retry_if_exception_type=(ValueError, TimeoutError)。对于在create_agent内部的代理驱动调用,使用第5步中的@tool + @retry装饰器堆栈包装每个工具 —— 这将生成一个真实的StructuredTool,代理可以接受并在每次工具调用时应用重试策略。如果没有其中之一,单个瞬态400会导致整个代理运行失败。
Q: 一个网站返回访问被拒绝。怎么办?
首先使用第6步的装饰器重试。如果页面持续阻止,则通过将gl更改为不同的国家来扩大会话,或者在尝试之间添加简短的await asyncio.sleep(...)来让会话状态冷却。对于有持续IP层阻止的网站,联系Scrapeless支持以确认阻止是在平台级别而非账户级别。
Q: 选择器不断损坏。我该如何应对DOM旋转?
在ScrapelessUniversalScrapingTool上使用response_type="markdown",而不是使用CSS选择器解析HTML。Markdown会折叠导航框架和大部分布局偏移,因此第4步中的LLM提取器可以看到内容的稳定表示,即使底层DOM发生变化。
Q: 每个主机可以有多少个并发工作者?
三个是稳定运行的文档上限。在主机级别设置上限(第6步中的asyncio.Semaphore(3));不同主机上的工作者可以独立运行。
Q: 我可以在没有LangGraph的情况下使用这个吗?
可以。ScrapelessUniversalScrapingTool().invoke({...})是一个普通的可调用对象 — 可以从任何Python脚本、FastAPI路由或Celery任务中调用。LangGraph在其上添加了代理循环,但工具本身是框架无关的。
Q: 我可以用Claude、Gemini或本地模型替换OpenAI吗?
可以。用ChatAnthropic(model="claude-sonnet-4-6")、ChatGoogleGenerativeAI(model="gemini-2.5-pro")、ChatOllama(model="llama3.1")或任何其他LangChain聊天模型替换ChatOpenAI(model="gpt-4o-mini")。tools列表、提示和解析器保持不变。
Q: 我如何添加多轮记忆?
将一个MemorySaver检查点传递给create_agent(llm, tools, checkpointer=MemorySaver()),并在每次调用时提供一个thread_id。LangGraph在回合之间持久化对话状态,因此代理可以在不重新运行的情况下参考早期的搜索。
Q: 我在哪里查看请求跟踪?
设置LANGCHAIN_TRACING_V2=true、LANGCHAIN_API_KEY和LANGCHAIN_PROJECT(第6步)。每个工具调用、LLM调用和解析运行都会在LangSmith中显示其时间、成本和确切提示 — 这是生产部署中观察能力提升最大的变化。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)