AI 辅助 K8s 网络策略智能生成与安全审计:从手动配置到自动化防护

cover

一、K8s 网络策略的配置困境:一条规则遗漏就是一次安全事件

Kubernetes 网络策略(NetworkPolicy)是集群内部流量隔离的核心机制,但它的配置极其繁琐且容易出错。一个中型集群可能有上百个 Service、数千个 Pod,服务间的调用关系错综复杂。手动编写 NetworkPolicy 需要精确指定每个命名空间的每个标签选择器,遗漏一条规则就可能导致数据库 Pod 暴露给所有命名空间。更棘手的是,业务频繁变更——新服务上线、旧服务下线、标签修改——每次变更都可能让已有的网络策略失效。

AI 辅助网络策略生成通过分析集群的实际流量模式,自动推导服务间的合法通信关系,生成最小权限的网络策略,并持续审计策略与实际流量的偏差。

二、智能网络策略生成架构

flowchart TD
    A[集群流量采集] --> A1[Service Mesh 可观测数据]
    A --> A2[NetworkPolicy 审计日志]
    A --> A3[DNS 查询日志]
    A1 --> B[流量关系图谱]
    A2 --> B
    A3 --> B
    B --> C[AI 策略推导引擎]
    C --> C1[合法流量模式识别]
    C --> C2[异常流量检测]
    C --> C3[策略缺口发现]
    C1 --> D[NetworkPolicy 生成]
    C2 --> E[安全告警]
    C3 --> D
    D --> F[策略预览与审批]
    F --> G[灰度发布与验证]

2.1 集群流量关系采集

# traffic_collector.py — 集群流量关系采集
# 设计意图:从 Service Mesh 和 DNS 日志中提取服务间通信关系

from dataclasses import dataclass
from collections import defaultdict

@dataclass
class TrafficEdge:
    src_namespace: str
    src_service: str
    dst_namespace: str
    dst_service: str
    dst_port: int
    protocol: str
    request_count: int
    last_seen: str

class TrafficCollector:
    def __init__(self):
        self.edges: list[TrafficEdge] = []
        self.service_labels: dict[str, dict] = {}  # service → labels

    def parse_istio_access_log(self, log_line: str) -> TrafficEdge | None:
        """从 Istio 访问日志提取流量关系"""
        import json
        try:
            entry = json.loads(log_line)
        except json.JSONDecodeError:
            return None

        source = entry.get("source", {})
        destination = entry.get("destination", {})

        return TrafficEdge(
            src_namespace=source.get("namespace", "unknown"),
            src_service=source.get("workload", "unknown"),
            dst_namespace=destination.get("namespace", "unknown"),
            dst_service=destination.get("workload", "unknown"),
            dst_port=int(destination.get("port", 0)),
            protocol=entry.get("protocol", "TCP"),
            request_count=1,
            last_seen=entry.get("timestamp", ""),
        )

    def build_traffic_graph(self) -> dict[str, list[TrafficEdge]]:
        """构建命名空间维度的流量关系图"""
        graph: dict[str, list[TrafficEdge]] = defaultdict(list)
        for edge in self.edges:
            key = f"{edge.src_namespace}→{edge.dst_namespace}"
            graph[key].append(edge)
        return graph

2.2 AI 策略推导引擎

# policy_generator.py — AI 网络策略推导引擎
# 设计意图:基于流量关系图,用 AI 推导最小权限网络策略

import json

POLICY_GENERATION_PROMPT = """你是一个 Kubernetes 网络安全专家。根据以下服务间流量关系,生成最小权限的 NetworkPolicy。

命名空间: {namespace}
该命名空间的服务: {services}
入站流量来源: {inbound_edges}
出站流量目标: {outbound_edges}

当前已有的 NetworkPolicy: {existing_policies}

要求:
1. 默认拒绝所有入站和出站流量
2. 仅允许上述流量关系中存在的合法通信
3. 使用标签选择器精确匹配源和目标
4. 为每个端口和协议创建独立的规则
5. 检查已有策略是否存在缺口或冗余

输出 JSON:
{{"policies": [{{"name": "...", "spec": {{...}}}}], "gaps": [{{"description": "..."}}], "redundancies": [{{"policy": "...", "reason": "..."}}]}}"""

async def generate_network_policies(
    namespace: str,
    traffic_graph: dict,
    existing_policies: list[dict],
    llm_client,
) -> dict:
    """AI 推导网络策略"""
    inbound = [e for e in traffic_graph.get("inbound", [])]
    outbound = [e for e in traffic_graph.get("outbound", [])]
    services = list(set(
        [e.dst_service for e in inbound] + [e.src_service for e in outbound]
    ))

    prompt = POLICY_GENERATION_PROMPT.format(
        namespace=namespace,
        services=json.dumps(services),
        inbound_edges=json.dumps(inbound[:20], default=str),
        outbound_edges=json.dumps(outbound[:20], default=str),
        existing_policies=json.dumps(existing_policies),
    )

    response = await llm_client.chat(prompt, temperature=0.1)
    try:
        return json.loads(response)
    except json.JSONDecodeError:
        return {"policies": [], "gaps": [], "redundancies": []}

2.3 策略安全审计

# policy_auditor.py — 网络策略安全审计
# 设计意图:检测策略与实际流量的偏差,发现安全风险

from dataclasses import dataclass

@dataclass
class AuditFinding:
    severity: str  # critical, high, medium, low
    category: str  # gap, redundancy, misconfiguration
    description: str
    namespace: str
    policy_name: str | None
    recommendation: str

class NetworkPolicyAuditor:
    def audit(
        self,
        traffic_graph: dict,
        policies: list[dict],
    ) -> list[AuditFinding]:
        """审计网络策略与实际流量的偏差"""
        findings = []

        # 检查1: 无策略的命名空间
        covered_ns = {p["metadata"]["namespace"] for p in policies}
        all_ns = set()
        for key in traffic_graph:
            for ns in key.split("→"):
                all_ns.add(ns)

        for ns in all_ns - covered_ns:
            findings.append(AuditFinding(
                severity="critical",
                category="gap",
                description=f"命名空间 {ns} 没有任何 NetworkPolicy,所有流量默认允许",
                namespace=ns,
                policy_name=None,
                recommendation=f"为 {ns} 创建默认拒绝策略",
            ))

        # 检查2: 过于宽松的规则
        for policy in policies:
            spec = policy.get("spec", {})
            for ingress in spec.get("ingress", []):
                for rule in ingress.get("from", []):
                    if not rule.get("namespaceSelector") and not rule.get("podSelector"):
                        findings.append(AuditFinding(
                            severity="high",
                            category="misconfiguration",
                            description=f"策略 {policy['metadata']['name']} 的入站规则没有选择器,允许所有流量",
                            namespace=policy["metadata"]["namespace"],
                            policy_name=policy["metadata"]["name"],
                            recommendation="添加 namespaceSelector 或 podSelector 限制来源",
                        ))

        return sorted(findings, key=lambda x: ["critical", "high", "medium", "low"].index(x.severity))

三、灰度发布与验证流程

3.1 策略干运行模式

# policy_dryrun.py — 策略干运行验证
# 设计意图:在应用策略前模拟其效果,检测是否阻断合法流量

import subprocess
import json

def dry_run_policy(
    policy: dict,
    namespace: str,
    test_traffic: list[dict],
) -> list[dict]:
    """干运行模式验证策略效果"""
    results = []

    for traffic in test_traffic:
        src_label = traffic.get("src_labels", {})
        dst_label = traffic.get("dst_labels", {})
        port = traffic.get("port", 0)

        # 模拟策略匹配逻辑
        allowed = check_policy_allows(policy, src_label, dst_label, port)

        results.append({
            "traffic": traffic,
            "allowed": allowed,
            "expected": traffic.get("expected", True),
            "match": allowed == traffic.get("expected", True),
        })

    mismatches = [r for r in results if not r["match"]]
    return mismatches


def check_policy_allows(
    policy: dict,
    src_labels: dict,
    dst_labels: dict,
    port: int,
) -> bool:
    """模拟 NetworkPolicy 匹配逻辑"""
    spec = policy.get("spec", {})

    # 检查 podSelector 是否匹配目标
    pod_selector = spec.get("podSelector", {})
    if pod_selector.get("matchLabels"):
        for k, v in pod_selector["matchLabels"].items():
            if dst_labels.get(k) != v:
                return True  # 策略不适用于此 Pod,默认允许

    # 检查入站规则
    for ingress in spec.get("ingress", []):
        for rule in ingress.get("from", []):
            ns_sel = rule.get("namespaceSelector", {})
            pod_sel = rule.get("podSelector", {})
            if matches_selector(src_labels, ns_sel, pod_sel):
                for port_rule in ingress.get("ports", []):
                    if port_rule.get("port") == port:
                        return True

    return False  # 默认拒绝

四、边界分析与架构权衡

流量采集的盲区:Service Mesh 只能采集经过 Sidecar 代理的流量,HostNetwork 模式的 Pod 和 kube-system 命名空间的流量可能被遗漏。需要补充 CNI 层的流量日志来覆盖盲区。

AI 生成策略的可信度:AI 推导的策略可能遗漏低频但合法的流量(如定时任务、管理端口)。建议先以"审计模式"运行,只报告策略缺口,不自动应用策略,人工确认后再开启"执行模式"。

策略数量膨胀:每个服务一个策略会导致策略数量爆炸,增加 API Server 的负担。建议按命名空间维度合并策略,减少策略总数。

流量基线的冷启动:新上线的服务没有历史流量数据,AI 无法推导其合法通信关系。建议为新服务先配置宽松策略,积累 7 天流量数据后再收紧。

五、总结

AI 辅助 K8s 网络策略生成将安全配置从"手动编写"升级为"流量驱动自动推导",通过采集集群实际流量关系,用 AI 推导最小权限策略,并持续审计策略与流量的偏差。落地建议:先以审计模式运行,只报告缺口不自动应用;补充 CNI 层流量日志覆盖盲区;按命名空间合并策略减少数量;新服务先宽松后收紧,积累流量基线后再收紧策略。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐