子解释器不是沙箱,异步爬虫不是 Demo:Python 多租户脚本隔离与一周不崩的工程实践

很多 Python 工程问题,最危险的地方不在“能不能写出来”,而在“你以为它已经安全、稳定、可维护”。子解释器(subinterpreters)和异步爬虫正好是两类典型场景:前者看起来像进程隔离,后者看起来像 asyncio.Queue + worker 就完事;但真正上线以后,坑往往来自隔离边界、资源泄漏、失败风暴、队列堆积和不可观测。

从 Python 3.14 开始,标准库新增了 concurrent.interpreters,用于在同一进程中管理多个解释器;官方文档也明确提醒:解释器默认隔离、不会隐式创建线程,并且并非所有 PyPI 包都已经支持多解释器环境。(Python documentation) 这让“同进程内运行多个租户脚本”有了更清晰的工程选项,但它绝不是“安全沙箱”的代名词。


一、子解释器适合解决什么问题?

子解释器最适合解决的是同一进程内的逻辑隔离和受控协作。你可以把它理解成:比线程更隔离,比进程更轻量,但没有进程级安全边界。

适合的场景主要有四类。

第一,插件系统或租户脚本执行环境。例如一个数据平台允许不同团队提交 Python 转换脚本,你希望每个脚本拥有独立的 sys.modules、全局变量、导入状态和运行上下文,避免 A 租户污染 B 租户。官方文档把解释器定义为 Python 运行时的执行上下文,包含导入状态、内置对象等运行程序所需状态。(Python documentation)

第二,Actor/CSP 风格的并发模型。多个解释器之间默认不共享可变对象,更适合通过消息传递协作。官方文档也提到,多解释器隔离带来的一个副作用,是它更接近 CSP 或 actor 模型。(Python documentation)

第三,CPU 密集任务的多核并行。从 Python 3.12 起,充分隔离的解释器不共享 GIL;结合多个线程使用时,可以实现真正的多核并行。Python 3.14 的“What’s New”也将多解释器描述为“进程的隔离性 + 线程的效率”。(Python documentation)

第四,减少进程数量带来的部署和内存成本。如果你的服务需要同时运行许多相互独立的小型执行上下文,子解释器可能比 multiprocessing 更省资源。但要注意,Python 3.14 文档也列出当前限制:解释器启动尚未完全优化、每个解释器仍有额外内存开销、第三方扩展包兼容性还在演进。(Python documentation)

一个极简示例:

from concurrent import interpreters
from textwrap import dedent

interp = interpreters.create()

interp.exec(dedent("""
    tenant_name = "tenant-a"
    result = sum(range(10_000))
    print(f"{tenant_name}: {result}")
"""))

interp.close()

更接近工程实践的封装:

from concurrent import interpreters
from textwrap import dedent


class TenantScriptRunner:
    def __init__(self, tenant_id: str):
        self.tenant_id = tenant_id
        self.interp = interpreters.create()

    def run(self, user_code: str) -> None:
        bootstrap = f"""
TENANT_ID = {self.tenant_id!r}
"""
        self.interp.exec(bootstrap)
        self.interp.exec(dedent(user_code))

    def close(self) -> None:
        self.interp.close()


runner = TenantScriptRunner("tenant-a")
try:
    runner.run("""
import math

value = math.sqrt(144)
print(TENANT_ID, value)
""")
finally:
    runner.close()

这个例子适合可信或半可信的内部脚本隔离,不适合执行恶意代码。


二、子解释器不适合解决什么问题?

最重要的一句话:子解释器不是安全边界。

官方文档解释得非常直接:同一进程中的解释器在技术上无法严格隔离,因为同一进程内的内存访问限制很少;Python 运行时会尽力隔离,但扩展模块可能轻易破坏这种隔离,因此不应在安全敏感场景中使用多解释器。(Python documentation)

所以它不适合这些问题:

问题 为什么不适合
执行恶意用户代码 同进程共享地址空间,C 扩展、ctypes、文件描述符等都可能突破隔离
权限强隔离 子解释器不等于容器、虚拟机、独立用户、seccomp 或沙箱
隔离文件系统、网络、环境变量 这些是进程/系统级资源,不是解释器天然边界
依赖大量 C 扩展的复杂科学计算栈 部分扩展模块可能尚未适配多解释器
高频创建销毁极小任务 启动和通信成本可能抵消收益
需要共享大量可变对象 多解释器推荐消息传递;可变对象不会自动同步

C API 文档还提醒,因为子解释器属于同一进程,隔离并不完美;例如低级文件操作可能影响彼此打开的文件,某些扩展模块也可能因单阶段初始化或静态全局变量而无法正常工作。(Python documentation)

因此,如果你的场景是“运行陌生人上传的 Python 代码”,优先考虑:

容器 / 微虚拟机 / 独立进程 / 最小权限用户 / seccomp / cgroups / 网络隔离 / 文件系统隔离

子解释器可以做工程隔离,不能做安全沙箱


三、同进程多租户脚本执行的推荐架构

一个更稳妥的多租户脚本系统可以拆成五层:

API 提交脚本

静态检查与配额校验

任务调度器

子解释器执行池

结果队列

指标/日志/追踪

失败队列

关键设计点:

  1. 入口限流:不要让租户无限提交任务。
  2. 执行超时:每个脚本必须有最大运行时间。
  3. 内存和 CPU 配额:这部分子解释器本身无法完整解决,必要时上升到进程或容器。
  4. 禁止把安全假设建立在 exec 包装上
  5. 通信只传数据,不传复杂可变对象

官方文档也说明,多解释器之间的通信通常依赖消息传递;concurrent.interpreters 提供了跨解释器队列,且大多数对象传递时会通过 pickle 复制,可变对象不会自动保持同步。(Python documentation)


四、从子解释器切到爬虫:真正要担心什么?

你给出的异步爬虫示例很典型:

import asyncio

async def worker(name, queue):
    while True:
        url = await queue.get()
        try:
            await fetch(url)
        finally:
            queue.task_done()

async def main(urls):
    queue = asyncio.Queue(maxsize=1000)
    async with asyncio.TaskGroup() as tg:
        for i in range(20):
            tg.create_task(worker(f"w{i}", queue))
        for u in urls:
            await queue.put(u)
        await queue.join()

这段代码作为 demo 很好,但作为“跑一周也不崩”的系统,第一眼我会担心:worker 是无限循环,TaskGroup 无法自然退出。

TaskGroup 会在退出异步上下文时等待组内任务完成;如果 worker 永远 while True,即使 queue.join() 完成,async with 也会一直等。官方文档说明,TaskGroup 会在退出上下文时等待所有任务完成,并在子任务异常时取消其余任务。(Python documentation)

修正版可以用哨兵值:

import asyncio
from dataclasses import dataclass

STOP = object()


@dataclass
class CrawlJob:
    url: str
    retry: int = 0


async def fetch(url: str) -> str:
    # 示例:真实项目中用 aiohttp/httpx,并设置连接池、超时、代理等
    await asyncio.sleep(0.1)
    return f"<html>{url}</html>"


async def parse(html: str) -> dict:
    return {"title": "demo", "length": len(html)}


async def save(item: dict) -> None:
    await asyncio.sleep(0.01)


async def worker(name: str, queue: asyncio.Queue, failed: asyncio.Queue):
    while True:
        job = await queue.get()
        try:
            if job is STOP:
                return

            try:
                async with asyncio.timeout(10):
                    html = await fetch(job.url)
                    item = await parse(html)
                    await save(item)

            except TimeoutError:
                if job.retry < 3:
                    await queue.put(CrawlJob(job.url, job.retry + 1))
                else:
                    await failed.put((job.url, "timeout"))

            except Exception as exc:
                await failed.put((job.url, repr(exc)))

        finally:
            queue.task_done()


async def crawl(urls: list[str], workers: int = 20):
    queue = asyncio.Queue(maxsize=1000)
    failed = asyncio.Queue()

    async with asyncio.TaskGroup() as tg:
        for i in range(workers):
            tg.create_task(worker(f"w{i}", queue, failed))

        for url in urls:
            await queue.put(CrawlJob(url))

        await queue.join()

        for _ in range(workers):
            await queue.put(STOP)

    return failed

asyncio.Queue(maxsize=1000) 的意义不是“缓存多一点”,而是背压。官方文档说明,asyncio.Queue 在达到 maxsize 后,put() 会等待直到队列有空位;join() 会等待所有已入队任务被处理,并依赖消费者调用 task_done()。(Python documentation)


五、跑一周也不崩的爬虫架构

一个可靠爬虫不是“20 个 worker 同时 fetch”,而是下面这套闭环:

URL Seed

去重/规范化

优先级队列

按域名限速

Fetch

解析

存储

重试队列

失败队列/DLQ

指标/日志/追踪

我最先关心这七件事。

1. 限速:不要把对方网站打挂,也不要把自己打挂。

按域名设置并发限制,而不是全局开 1000 个协程:

from collections import defaultdict
from urllib.parse import urlparse
import asyncio

host_limits = defaultdict(lambda: asyncio.Semaphore(3))

async def limited_fetch(url: str) -> str:
    host = urlparse(url).netloc
    async with host_limits[host]:
        return await fetch(url)

2. 超时:所有外部 I/O 必须有 deadline。

async def safe_fetch(url: str) -> str:
    async with asyncio.timeout(8):
        return await limited_fetch(url)

asyncio.timeout() 会通过取消当前任务并将取消转换为 TimeoutError 来处理超时,适合给网络请求设置边界。(Python documentation)

3. 重试:只重试瞬时错误,不重试永久错误。

def should_retry(status_code: int) -> bool:
    return status_code in {408, 429, 500, 502, 503, 504}


def backoff_seconds(retry: int) -> float:
    return min(60, 2 ** retry)

真实系统还要加 jitter,避免所有任务同时重试形成“重试风暴”。

4. 失败队列:不要让失败消失在日志里。

@dataclass
class FailedJob:
    url: str
    reason: str
    retry: int


async def send_to_dlq(failed: asyncio.Queue, job: CrawlJob, reason: str):
    await failed.put(FailedJob(job.url, reason, job.retry))

5. 幂等存储:同一个 URL 重复抓取不能写出脏数据。

存储层应使用唯一键,例如:

source + normalized_url + content_hash

6. 可恢复:进程重启后不能从零开始。

内存队列适合单进程 demo;长期任务建议用 Redis Streams、Kafka、RabbitMQ、数据库任务表或对象存储 checkpoint。

7. 优雅停机:收到 SIGTERM 后停止接新任务,等待当前任务完成,未完成任务回写队列。


六、可观测性:不要靠 print 猜问题

“跑一周也不崩”的系统,必须回答这些问题:

  • 当前队列积压多少?
  • 每分钟成功多少、失败多少?
  • 哪些域名最慢?
  • 重试是否突然升高?
  • 解析失败是代码问题还是页面结构变化?
  • worker 是否卡死?
  • 存储层是否成为瓶颈?

最小可用指标可以这样设计:

from prometheus_client import Counter, Gauge, Histogram, start_http_server

FETCH_TOTAL = Counter(
    "crawler_fetch_total",
    "Total fetch attempts",
    ["status"]
)

FETCH_LATENCY = Histogram(
    "crawler_fetch_latency_seconds",
    "Fetch latency in seconds",
    ["host"]
)

QUEUE_SIZE = Gauge(
    "crawler_queue_size",
    "Current crawl queue size"
)


async def observed_fetch(url: str, queue: asyncio.Queue) -> str:
    from urllib.parse import urlparse
    host = urlparse(url).netloc
    QUEUE_SIZE.set(queue.qsize())

    with FETCH_LATENCY.labels(host=host).time():
        try:
            html = await safe_fetch(url)
            FETCH_TOTAL.labels(status="success").inc()
            return html
        except Exception:
            FETCH_TOTAL.labels(status="error").inc()
            raise


if __name__ == "__main__":
    start_http_server(8000)

Prometheus 官方文档强调,服务要被监控,需要在代码中通过客户端库埋点,并通过 HTTP endpoint 暴露内部指标;Python 客户端也支持用装饰器或上下文记录请求次数和耗时。(Prometheus)

日志也要结构化:

import logging
import json
import time

logger = logging.getLogger("crawler")


def log_event(event: str, **fields):
    record = {
        "event": event,
        "ts": time.time(),
        **fields,
    }
    logger.info(json.dumps(record, ensure_ascii=False))


log_event(
    "fetch_failed",
    url="https://example.com/a",
    retry=2,
    reason="timeout",
    worker="w3",
)

更进一步,可以接入 OpenTelemetry,把日志、指标、链路追踪统一起来。OpenTelemetry Python 文档说明,它可以用 Python API 和 SDK 生成、收集 metrics、logs 和 traces;其中 traces 和 metrics 处于 stable 状态,logs 仍标为 development。(OpenTelemetry)


七、子解释器与爬虫结合时的边界

假设你要做一个“多租户可编程爬虫平台”:平台负责抓取、限速、重试、监控;租户只提交解析脚本。

这时一个合理分工是:

主进程:
  - URL 调度
  - 限速
  - fetch
  - 重试
  - 失败队列
  - 指标与追踪

子解释器:
  - 执行租户解析逻辑
  - 隔离租户 import/global/module 状态
  - 通过消息传递接收 HTML 和返回结构化数据

不要让租户脚本直接控制网络、文件、环境变量和系统调用。即便用了子解释器,也应该把“危险能力”收回平台层。

一个工程原则是:

子解释器隔离 Python 运行时状态;
进程/容器隔离操作系统资源;
权限系统隔离数据访问;
可观测性隔离故障排查成本。

八、最佳实践清单

上线前,我会逐项检查:

  1. worker 是否能退出?
  2. 所有外部 I/O 是否有超时?
  3. 队列是否有上限?
  4. 是否有按域名限速?
  5. 重试是否有上限、退避和 jitter?
  6. 失败任务是否进入 DLQ?
  7. 存储是否幂等?
  8. 指标是否覆盖吞吐、延迟、失败率、队列长度?
  9. 日志是否结构化,并带 urlhosttenant_idtrace_id
  10. 租户脚本是否被限制在可控能力范围内?
  11. 是否错误地把子解释器当成安全沙箱?
  12. 第三方扩展库是否验证过多解释器兼容性?

结语

子解释器让 Python 在“同进程隔离”和“多核并行”之间多了一把新工具;异步爬虫让我们用很少的线程处理大量 I/O。但工程世界里,工具本身从不自动带来可靠性。

真正的可靠性来自边界感:知道子解释器隔离什么、不隔离什么;知道 asyncio 并发什么、不保证什么;知道日志只是线索,指标和追踪才是系统的仪表盘。

写 Python 越久,我越相信一句话:优雅不是代码短,而是系统在出错时仍然清楚、克制、可恢复。你的爬虫能跑一天,不代表它能跑一周;你的脚本能隔离变量,不代表它能隔离恶意。把这些边界想清楚,才是从“会写 Python”走向“会用 Python 构建系统”的分水岭。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐