AI Agent Harness Engineering 合同审核实践:法律场景的自动化应用

引言

在当今数字化转型的浪潮中,法律行业也正在经历前所未有的变革。合同审核作为法律工作中的核心环节,长期以来一直以其繁琐、耗时且对专业知识要求极高的特点而著称。一份复杂的商业合同可能包含数百页的内容,需要律师逐字逐句地审查,以确保没有风险点、遗漏或不利条款。这个过程不仅消耗大量的人力和时间成本,而且由于人为疲劳或疏忽,还可能存在一定的错误风险。

随着人工智能技术的快速发展,特别是大语言模型(LLM)和AI Agent技术的兴起,我们现在有机会彻底改变这一传统工作方式。AI Agent Harness Engineering(AI代理驾驭工程)作为一个新兴领域,专注于设计、构建和部署能够自主完成特定任务的AI代理系统。在合同审核场景中,这些AI代理可以被训练来识别关键条款、评估风险、比对标准模板,并最终提供专业的审核意见。

本文将深入探讨如何应用AI Agent Harness Engineering技术来构建合同审核系统,从核心概念到实际部署,从算法原理到代码实现,全方位地展示这一技术在法律场景中的应用。

1. 核心概念解析

1.1 AI Agent Harness Engineering 定义与内涵

AI Agent Harness Engineering是一门综合性的工程学科,它融合了人工智能、软件工程、系统设计等多个领域的知识,旨在设计、构建和管理能够自主执行复杂任务的AI代理系统。

核心概念

  • AI代理(Agent):具有感知环境、做出决策并执行行动能力的自主实体
  • 驾驭(Harness):有效地控制和引导AI代理的能力,使其按照预期目标工作
  • 工程(Engineering):系统化的设计、开发、测试和部署方法

在合同审核的语境下,AI Agent Harness Engineering就是要构建一个能够"理解"合同内容、识别关键信息、评估法律风险,并最终提供专业审核意见的AI代理系统。

1.2 合同审核的挑战与自动化需求

问题背景
传统合同审核流程通常涉及以下步骤:

  1. 律师收到合同文档
  2. 通读全文,理解合同背景和目的
  3. 识别关键条款(如责任限制、知识产权、终止条款等)
  4. 比对公司标准模板和政策
  5. 识别潜在风险和不利条款
  6. 撰写审核意见和修改建议
  7. 与对方律师沟通协商

这个过程存在以下痛点:

  • 时间成本高:一份复杂合同可能需要数小时甚至数天来审核
  • 人力成本高:需要专业律师参与,费用昂贵
  • 一致性问题:不同律师可能对同一条款有不同解读
  • 疲劳导致的错误:长时间审核可能导致遗漏重要风险点
  • 知识更新挑战:法律法规和判例不断更新,律师需要持续学习

1.3 AI Agent在法律科技中的应用现状

近年来,AI技术在法律领域的应用已经取得了显著进展:

  • 合同分析:使用NLP技术提取合同关键信息
  • 法律研究:AI辅助案例检索和法律条文分析
  • 文档生成:自动生成标准法律文档
  • 预测分析:预测案件结果和法律风险
  • 电子发现:在大量文档中自动识别相关信息

然而,大多数现有应用仍处于"工具"层面,需要人类律师的大量干预。AI Agent Harness Engineering则致力于构建更自主、更智能的系统,能够端到端地完成合同审核任务。

2. 合同审核AI Agent系统架构设计

2.1 系统总体架构

构建一个合同审核AI Agent系统需要多方面的技术组件协同工作。以下是我们设计的系统总体架构:

知识与数据层

专业代理层

代理协调层

用户界面层

合同上传接口

审核任务管理

审核结果展示

人工反馈接口

任务分解器

代理管理器

结果整合器

质量评估器

文档解析代理

条款识别代理

风险评估代理

标准比对代理

建议生成代理

合同模板库

法律知识库

风险案例库

公司政策库

2.2 核心概念结构与要素组成

我们的合同审核AI Agent系统由以下核心概念和要素组成:

概念 定义 核心属性 在系统中的作用
合同文档 需要审核的法律合同文件 格式多样性、内容复杂性、法律专业性 系统的输入和处理对象
审核代理 执行特定审核任务的AI实体 专业性、自主性、协作性 系统的核心执行单元
法律知识 法律法规、判例、法律原则 动态更新、领域专业性、解释多样性 代理决策的依据
风险点 合同中可能带来法律风险的条款 严重性、发生概率、影响范围 审核的主要识别对象
审核意见 对合同的分析和建议 准确性、可操作性、法律依据 系统的主要输出
反馈循环 用户对审核结果的评价和修正 实时性、有效性、持续性 系统持续优化的机制

2.3 概念之间的关系

下面是系统核心概念之间的实体关系图:

创建

包含

分配给

生成

使用

包含

识别

包含

提供

针对

更新

USER

TASK

CONTRACT

AGENT

ANALYSIS

KNOWLEDGE_BASE

RISK

RECOMMENDATION

FEEDBACK

这些概念和它们之间的关系构成了我们合同审核AI Agent系统的基础框架。在接下来的章节中,我们将深入探讨每个组件的具体实现和工作原理。

3. 合同文档预处理与解析技术

3.1 合同文档的多样性挑战

合同文档可能以多种格式存在,每种格式都有其独特的解析挑战:

  • 非结构化文本:纯文本文件,缺乏格式信息
  • 半结构化文档:Word文档、PDF文件,包含格式和布局信息
  • 扫描文档:需要OCR技术转换为可处理的文本
  • 表格数据:合同中经常包含表格,需要特殊处理
  • 多语言合同:国际化业务中常见的多语言合同

3.2 文档解析代理设计

文档解析代理是系统的第一个专业代理,负责将各种格式的合同文档转换为系统可处理的结构化数据。

PDF

Word

扫描件

纯文本

接收合同文档

文档类型判断

PDF解析器

Word解析器

OCR处理器

文本清洗器

布局分析

段落识别

标题层级识别

条款边界检测

元数据提取

结构化输出

3.3 文档解析实现代码

下面是一个简化版的文档解析代理实现,使用Python编写:

import os
import re
from typing import List, Dict, Any
from PyPDF2 import PdfReader
from docx import Document
import pytesseract
from PIL import Image
import pdf2image

class ContractParser:
    """
    合同文档解析器
    支持多种格式合同文档的解析和结构化处理
    """
    
    def __init__(self):
        self.supported_formats = ['.pdf', '.docx', '.doc', '.txt']
        self.section_pattern = re.compile(r'^(第[一二三四五六七八九十百千万]+[章节条])|^(\d+\.)|^([A-Z][a-z]+\s\d+)')
        self.title_pattern = re.compile(r'^(.{0,100})$')  # 简化的标题匹配模式
    
    def parse(self, file_path: str) -> Dict[str, Any]:
        """
        解析合同文档的主入口方法
        
        参数:
            file_path: 合同文档文件路径
            
        返回:
            包含结构化合同内容的字典
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件不存在: {file_path}")
        
        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext not in self.supported_formats:
            raise ValueError(f"不支持的文件格式: {file_ext}")
        
        # 根据文件类型选择解析方法
        if file_ext == '.pdf':
            raw_text, metadata = self._parse_pdf(file_path)
        elif file_ext in ['.docx', '.doc']:
            raw_text, metadata = self._parse_word(file_path)
        elif file_ext == '.txt':
            raw_text, metadata = self._parse_text(file_path)
        
        # 结构化处理
        structured_content = self._structure_content(raw_text)
        
        return {
            'metadata': metadata,
            'raw_text': raw_text,
            'structured_content': structured_content
        }
    
    def _parse_pdf(self, file_path: str) -> tuple:
        """解析PDF文件"""
        text = ""
        metadata = {}
        
        try:
            reader = PdfReader(file_path)
            
            # 提取元数据
            metadata = {
                'title': reader.metadata.title if reader.metadata.title else "未知标题",
                'author': reader.metadata.author if reader.metadata.author else "未知作者",
                'pages': len(reader.pages)
            }
            
            # 尝试直接提取文本
            for page in reader.pages:
                try:
                    page_text = page.extract_text()
                    if page_text and len(page_text.strip()) > 0:
                        text += page_text + "\n\n"
                except:
                    continue
            
            # 如果直接提取失败,尝试OCR
            if not text or len(text.strip()) < 100:
                text = self._ocr_pdf(file_path)
                
        except Exception as e:
            print(f"PDF解析错误: {str(e)}")
            # 尝试OCR作为备选方案
            text = self._ocr_pdf(file_path)
        
        return text, metadata
    
    def _ocr_pdf(self, file_path: str) -> str:
        """使用OCR技术解析扫描的PDF文件"""
        text = ""
        try:
            # 将PDF转换为图像
            images = pdf2image.convert_from_path(file_path)
            
            # 对每个图像进行OCR处理
            for image in images:
                page_text = pytesseract.image_to_string(image, lang='chi_sim+eng')
                text += page_text + "\n\n"
        except Exception as e:
            print(f"OCR处理错误: {str(e)}")
        
        return text
    
    def _parse_word(self, file_path: str) -> tuple:
        """解析Word文档"""
        text = ""
        metadata = {}
        
        try:
            doc = Document(file_path)
            
            # 提取元数据
            core_properties = doc.core_properties
            metadata = {
                'title': core_properties.title if core_properties.title else "未知标题",
                'author': core_properties.author if core_properties.author else "未知作者",
            }
            
            # 提取文本内容
            for para in doc.paragraphs:
                text += para.text + "\n"
                
            # 提取表格内容(简化处理)
            for table in doc.tables:
                for row in table.rows:
                    row_text = " | ".join([cell.text for cell in row.cells])
                    text += row_text + "\n"
                text += "\n"
                
        except Exception as e:
            print(f"Word文档解析错误: {str(e)}")
        
        return text, metadata
    
    def _parse_text(self, file_path: str) -> tuple:
        """解析纯文本文件"""
        text = ""
        metadata = {'title': os.path.basename(file_path)}
        
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()
        except UnicodeDecodeError:
            # 尝试其他编码
            encodings = ['gbk', 'gb2312', 'latin-1']
            for encoding in encodings:
                try:
                    with open(file_path, 'r', encoding=encoding) as f:
                        text = f.read()
                    break
                except:
                    continue
        
        return text, metadata
    
    def _structure_content(self, raw_text: str) -> List[Dict[str, Any]]:
        """
        将原始文本转换为结构化内容
        
        参数:
            raw_text: 原始合同文本
            
        返回:
            结构化的合同内容,按条款组织
        """
        lines = raw_text.split('\n')
        structured_content = []
        current_section = None
        current_subsection = None
        paragraph_buffer = []
        
        for line in lines:
            line = line.strip()
            if not line:
                # 空行,可能表示段落结束
                if paragraph_buffer:
                    self._add_to_structured_content(
                        structured_content, current_section, 
                        current_subsection, ' '.join(paragraph_buffer)
                    )
                    paragraph_buffer = []
                continue
            
            # 检查是否是新的章节/条款
            section_match = self.section_pattern.match(line)
            if section_match:
                # 先保存之前的段落
                if paragraph_buffer:
                    self._add_to_structured_content(
                        structured_content, current_section, 
                        current_subsection, ' '.join(paragraph_buffer)
                    )
                    paragraph_buffer = []
                
                # 判断是章节还是子条款
                if line.startswith('第') and ('章' in line or '节' in line or '条' in line):
                    current_section = line
                    current_subsection = None
                else:
                    current_subsection = line
                
                # 添加标题
                self._add_to_structured_content(
                    structured_content, current_section, 
                    current_subsection, line, is_title=True
                )
            else:
                # 普通文本,添加到段落缓冲区
                paragraph_buffer.append(line)
        
        # 处理最后一个段落
        if paragraph_buffer:
            self._add_to_structured_content(
                structured_content, current_section, 
                current_subsection, ' '.join(paragraph_buffer)
            )
        
        return structured_content
    
    def _add_to_structured_content(self, structured_content: List[Dict[str, Any]], 
                                    section: str, subsection: str, 
                                    content: str, is_title: bool = False):
        """将内容添加到结构化列表中"""
        item = {
            'section': section,
            'subsection': subsection,
            'content': content,
            'is_title': is_title,
            'type': 'title' if is_title else 'paragraph'
        }
        structured_content.append(item)


# 使用示例
if __name__ == "__main__":
    parser = ContractParser()
    
    # 解析示例合同
    contract_path = "sample_contract.pdf"  # 假设有一个示例合同文件
    if os.path.exists(contract_path):
        result = parser.parse(contract_path)
        print(f"合同标题: {result['metadata'].get('title', '未知')}")
        print(f"合同页数: {result['metadata'].get('pages', '未知')}")
        print(f"结构化条款数量: {len(result['structured_content'])}")
        
        # 输出前几个结构化条款
        print("\n前5个结构化条款:")
        for i, item in enumerate(result['structured_content'][:5]):
            print(f"{i+1}. [{item['section'] or '无章节'}] {item['content'][:100]}...")
    else:
        print(f"示例合同文件不存在: {contract_path}")
        print("请提供一个有效的合同文件路径进行测试")

这个文档解析器实现了基本的合同解析功能,包括:

  1. 支持多种格式的合同文档(PDF、Word、纯文本)
  2. 对扫描版PDF的OCR处理能力
  3. 基本的文档结构识别(章节、条款)
  4. 结构化输出合同内容

在实际应用中,这个解析器还需要进一步优化,特别是在处理复杂文档布局、特殊条款格式等方面。

4. 条款识别与风险评估算法

4.1 关键条款识别技术

合同中的关键条款识别是AI代理的核心能力之一。这些关键条款通常包括:

  • parties(合同双方)
  • payment terms(付款条款)
  • term and termination(期限与终止条款)
  • liability(责任条款)
  • intellectual property(知识产权条款)
  • confidentiality(保密条款)
  • dispute resolution(争议解决条款)
  • indemnification(赔偿条款)
  • force majeure(不可抗力条款)
  • governing law(适用法律条款)
4.1.1 基于规则的条款识别方法

在AI技术成熟之前,基于规则的方法是条款识别的主要手段。这种方法依赖于人工编写的规则和模式匹配:

import re
from typing import List, Dict, Any

class RuleBasedClauseIdentifier:
    """
    基于规则的合同条款识别器
    """
    
    def __init__(self):
        # 定义关键条款的正则表达式模式
        self.clause_patterns = {
            'parties': [
                r'(?:合同)?双方[::]',
                r'(?:甲方|乙方|丙方)[::]',
                r'以下简称["“](甲方|乙方|丙方)["”]',
            ],
            'payment_terms': [
                r'(?:付款|支付|价款|费用)条款?',
                r'(?:付款|支付)方式',
                r'(?:合同)?总价(?:款)?',
            ],
            'term_termination': [
                r'(?:合同)?期限',
                r'终止(?:条款)?',
                r'解除(?:合同)?',
            ],
            'liability': [
                r'违约责任?',
                r'(?:损害)?赔偿(?:责任)?',
                r'责任限制?',
            ],
            'ip': [
                r'知识产权',
                r'(?:专利|商标|著作权|版权)',
                r'技术(?:秘密|成果)',
            ],
            'confidentiality': [
                r'保密(?:条款)?',
                r'不披露',
                r'商业秘密',
            ],
            'dispute_resolution': [
                r'争议(?:解决)?',
                r'(?:仲裁|诉讼)',
                r'管辖(?:法院)?',
            ],
            'indemnification': [
                r'赔偿(?:条款)?',
                r'补偿',
                r'免责',
            ],
            'force_majeure': [
                r'不可抗力',
                r'意外事件',
            ],
            'governing_law': [
                r'适用法律',
                r'准据法',
                r'(?:合同)?签订地',
            ]
        }
        
        # 编译正则表达式
        self.compiled_patterns = {}
        for clause_type, patterns in self.clause_patterns.items():
            self.compiled_patterns[clause_type] = [
                re.compile(pattern, re.IGNORECASE) for pattern in patterns
            ]
    
    def identify_clauses(self, structured_content: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        从结构化合同内容中识别关键条款
        
        参数:
            structured_content: 结构化的合同内容
            
        返回:
            识别出的关键条款列表
        """
        identified_clauses = []
        
        for i, item in enumerate(structured_content):
            content = item['content']
            
            for clause_type, patterns in self.compiled_patterns.items():
                for pattern in patterns:
                    if pattern.search(content):
                        # 找到匹配的条款,获取上下文
                        clause_context = self._get_clause_context(structured_content, i)
                        
                        identified_clauses.append({
                            'type': clause_type,
                            'title': content if item['is_title'] else None,
                            'content': clause_context,
                            'position': i,
                            'section': item['section'],
                            'subsection': item['subsection'],
                            'match_pattern': pattern.pattern
                        })
                        break  # 一种条款类型只匹配一次
                else:
                    continue  # 内部循环正常结束,没有匹配到
                break  # 已经找到匹配,跳出当前条款类型的循环
        
        return identified_clauses
    
    def _get_clause_context(self, structured_content: List[Dict[str, Any]], 
                            position: int, context_size: int = 5) -> str:
        """
        获取条款周围的上下文内容
        
        参数:
            structured_content: 结构化的合同内容
            position: 当前条款位置
            context_size: 上下文大小(前后各多少条)
            
        返回:
            条款上下文内容
        """
        start = max(0, position - context_size)
        end = min(len(structured_content), position + context_size + 1)
        
        context_items = structured_content[start:end]
        context_text = '\n'.join([item['content'] for item in context_items])
        
        return context_text

基于规则的方法有其优势,比如实现简单、结果可解释性强,但也存在明显的局限性:

  1. 无法覆盖所有可能的条款表达方式
  2. 规则维护成本高,需要不断更新
  3. 对合同语言的变化和多样性适应能力差
4.1.2 基于机器学习的条款识别方法

随着自然语言处理技术的发展,基于机器学习的方法逐渐成为条款识别的主流。特别是预训练语言模型的出现,使得这一任务的准确率大幅提升。

import torch
from transformers import BertTokenizer, BertForSequenceClassification
from typing import List, Dict, Any
import numpy as np

class MLBasedClauseIdentifier:
    """
    基于机器学习的合同条款识别器
    使用BERT模型进行条款分类
    """
    
    def __init__(self, model_path: str = None):
        """
        初始化条款识别器
        
        参数:
            model_path: 预训练模型路径,如果为None则使用默认模型
        """
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        # 条款类型列表
        self.clause_types = [
            'parties', 'payment_terms', 'term_termination', 'liability', 
            'ip', 'confidentiality', 'dispute_resolution', 'indemnification',
            'force_majeure', 'governing_law', 'other'
        ]
        
        self.id2label = {i: label for i, label in enumerate(self.clause_types)}
        self.label2id = {label: i for i, label in enumerate(self.clause_types)}
        
        # 加载模型和分词器
        if model_path:
            # 使用自定义模型
            self.tokenizer = BertTokenizer.from_pretrained(model_path)
            self.model = BertForSequenceClassification.from_pretrained(
                model_path,
                num_labels=len(self.clause_types),
                id2label=self.id2label,
                label2id=self.label2id
            )
        else:
            # 使用默认的中文BERT模型(实际应用中应该使用在法律文本上微调过的模型)
            self.tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
            self.model = BertForSequenceClassification.from_pretrained(
                'bert-base-chinese',
                num_labels=len(self.clause_types),
                id2label=self.id2label,
                label2id=self.label2id
            )
            print("警告: 使用未经过法律文本微调的通用模型,识别准确率可能较低")
        
        self.model.to(self.device)
        self.model.eval()
    
    def identify_clauses(self, structured_content: List[Dict[str, Any]], 
                        threshold: float = 0.7) -> List[Dict[str, Any]]:
        """
        从结构化合同内容中识别关键条款
        
        参数:
            structured_content: 结构化的合同内容
            threshold: 分类置信度阈值
            
        返回:
            识别出的关键条款列表
        """
        identified_clauses = []
        
        # 批量处理文本,提高效率
        texts = [item['content'] for item in structured_content]
        
        # 分批处理,避免内存溢出
        batch_size = 8
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i+batch_size]
            
            # 分词和编码
            inputs = self.tokenizer(
                batch_texts,
                padding=True,
                truncation=True,
                max_length=512,
                return_tensors="pt"
            )
            
            # 将输入移到设备上
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            
            # 模型推理
            with torch.no_grad():
                outputs = self.model(**inputs)
                probabilities = torch.softmax(outputs.logits, dim=1)
                predictions = torch.argmax(probabilities, dim=1)
            
            # 处理结果
            for j, (pred, prob) in enumerate(zip(predictions, probabilities)):
                clause_type = self.id2label[pred.item()]
                confidence = prob[pred.item()].item()
                
                # 只保留置信度超过阈值且不是"other"类型的条款
                if clause_type != 'other' and confidence >= threshold:
                    position = i + j
                    item = structured_content[position]
                    
                    # 获取条款上下文
                    clause_context = self._get_clause_context(structured_content, position)
                    
                    identified_clauses.append({
                        'type': clause_type,
                        'title': content if item['is_title'] else None,
                        'content': clause_context,
                        'position': position,
                        'section': item['section'],
                        'subsection': item['subsection'],
                        'confidence': confidence
                    })
        
        return identified_clauses
    
    def _get_clause_context(self, structured_content: List[Dict[str, Any]], 
                            position: int, context_size: int = 5) -> str:
        """
        获取条款周围的上下文内容(与规则方法相同)
        """
        start = max(0, position - context_size)
        end = min(len(structured_content), position + context_size + 1)
        
        context_items = structured_content[start:end]
        context_text = '\n'.join([item['content'] for item in context_items])
        
        return context_text

基于机器学习的方法,特别是使用预训练语言模型的方法,在条款识别任务上通常能取得更好的效果。不过,要在法律领域取得最佳效果,通常需要在专门的法律文本语料上对模型进行微调。

4.2 风险评估模型

识别出关键条款后,下一步是评估这些条款中可能存在的法律风险。风险评估是一个复杂的任务,需要结合法律知识、商业考量和具体上下文。

4.2.1 风险评分模型

我们可以使用一个多维度的风险评分模型来评估合同条款的风险程度:

RiskScore=∑i=1nwi⋅fi(c)RiskScore = \sum_{i=1}^{n} w_i \cdot f_i(c)RiskScore=i=1nwifi(c)

其中:

  • RiskScoreRiskScoreRiskScore 是整体风险评分
  • wiw_iwi 是第 iii 个风险维度的权重
  • fi(c)f_i(c)fi(c) 是条款 ccc 在第 iii 个风险维度上的评分函数

关键风险维度可能包括:

  1. 财务风险:可能导致经济损失的风险
  2. 责任风险:可能导致法律责任的风险
  3. 运营风险:可能影响业务运营的风险
  4. 合规风险:可能违反法律法规的风险
  5. 声誉风险:可能影响公司声誉的风险

每个维度可以根据具体情况进行进一步细化和量化。

4.2.2 风险评估实现

下面是一个简化的风险评估代理实现:

import re
from typing import List, Dict, Any, Tuple
import numpy as np

class RiskAssessmentAgent:
    """
    合同风险评估代理
    """
    
    def __init__(self, company_policy: Dict[str, Any] = None):
        """
        初始化风险评估代理
        
        参数:
            company_policy: 公司政策和标准条款偏好
        """
        # 默认公司政策
        self.company_policy = company_policy or {
            'max_liability_limit': '1000000',  # 最高责任限额(元)
            'payment_terms': 30,  # 付款期限(天)
            'preferred_governing_law': ['中国法律', '中华人民共和国法律'],
            'preferred_dispute_resolution': ['仲裁', '北京仲裁委员会'],
            'ip_ownership': '我方',  # 知识产权归属偏好
        }
        
        # 风险关键词库(简化版)
        self.risk_keywords = {
            'high': [
                '无限责任', '全部损失', '任何索赔', '不可撤销', 
                '绝对保证', '全额赔偿', '承担一切责任'
            ],
            'medium': [
                '连带责任', '赔偿损失', '违约责任', '终止合同',
                '解除协议', '没收保证金', '罚款'
            ],
            'low': [
                '应当及时', '尽力协助', '原则上', '尽量',
                '协商解决', '友好协商'
            ]
        }
        
        # 风险维度权重
        self.risk_dimensions = {
            'financial': 0.3,    # 财务风险
            'liability': 0.3,    # 责任风险
            'operational': 0.15, # 运营风险
            'compliance': 0.15,  # 合规风险
            'reputation': 0.1    # 声誉风险
        }
    
    def assess_clause_risk(self, clause: Dict[str, Any]) -> Dict[str, Any]:
        """
        评估单个条款的风险
        
        参数:
            clause: 识别出的条款信息
            
        返回:
            条款风险评估结果
        """
        content = clause['content']
        clause_type = clause['type']
        
        # 初始化风险评估结果
        risk_assessment = {
            'clause_type': clause_type,
            'overall_risk_score': 0,
            'risk_level': 'low',
            'risk_dimensions': {},
            'risk_indicators': [],
            'recommendations': []
        }
        
        # 1. 基于关键词的风险检测
        keyword_risks = self._detect_keyword_risks(content)
        risk_assessment['risk_indicators'].extend(keyword_risks)
        
        # 2. 基于条款类型的专项风险评估
        specific_risks = self._assess_specific_clause_risks(clause)
        risk_assessment['risk_indicators'].extend(specific_risks)
        
        # 3. 与公司政策比对
        policy_risks = self._check_against_company_policy(clause)
        risk_assessment['risk_indicators'].extend(policy_risks)
        
        # 4. 计算各维度风险评分
        for dimension in self.risk_dimensions:
            dimension_score = self._calculate_dimension_score(
                dimension, risk_assessment['risk_indicators']
            )
            risk_assessment['risk_dimensions'][dimension] = dimension_score
        
        # 5. 计算整体风险评分
        overall_score = sum(
            self.risk_dimensions[dim] * risk_assessment['risk_dimensions'][dim]
            for dim in self.risk_dimensions
        )
        risk_assessment['overall_risk_score'] = min(10, overall_score)  # 限制在0-10分
        
        # 6. 确定风险等级
        if risk_assessment['overall_risk_score'] >= 7:
            risk_assessment['risk_level'] = 'high'
        elif risk_assessment['overall_risk_score'] >= 4:
            risk_assessment['risk_level'] = 'medium'
        else:
            risk_assessment['risk_level'] = 'low'
        
        # 7. 生成修改建议
        risk_assessment['recommendations'] = self._generate_recommendations(
            risk_assessment
        )
        
        return risk_assessment
    
    def assess_all_clauses(self, clauses: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        评估所有条款的风险,并给出合同整体风险评估
        
        参数:
            clauses: 识别出的所有条款列表
            
        返回:
            整体合同风险评估结果
        """
        clause_assessments = []
        high_risk_clauses = []
        medium_risk_clauses = []
        
        # 评估每个条款
        for clause in clauses:
            assessment = self.assess_clause_risk(clause)
            clause_assessments.append({
                'clause': clause,
                'assessment': assessment
            })
            
            if assessment['risk_level'] == 'high':
                high_risk_clauses.append(assessment)
            elif assessment['risk_level'] == 'medium':
                medium_risk_clauses.append(assessment)
        
        # 计算合同整体风险评分
        if clause_assessments:
            # 高风险条款权重更高
            weighted_scores = []
            for ca in clause_assessments:
                score = ca['assessment']['overall_risk_score']
                if ca['assessment']['risk_level'] == 'high':
                    weighted_scores.append(score * 2)  # 高风险条款权重加倍
                elif ca['assessment']['risk_level'] == 'medium':
                    weighted_scores.append(score * 1.5)  # 中等风险条款权重1.5倍
                else:
                    weighted_scores.append(score)
            
            overall_contract_risk = min(10, np.mean(weighted_scores))
        else:
            overall_contract_risk = 0
        
        # 确定合同整体风险等级
        if overall_contract_risk >= 7:
            contract_risk_level = 'high'
        elif overall_contract_risk >= 4:
            contract_risk_level = 'medium'
        else:
            contract_risk_level = 'low'
        
        return {
            'overall_contract_risk_score': overall_contract_risk,
            'overall_contract_risk_level': contract_risk_level,
            'clause_assessments': clause_assessments,
            'high_risk_clauses_count': len(high_risk_clauses),
            'medium_risk_clauses_count': len(medium_risk_clauses),
            'summary': self._generate_contract_summary(
                overall_contract_risk, contract_risk_level,
                high_risk_clauses, medium_risk_clauses
            )
        }
    
    def _detect_keyword_risks(self, content: str) -> List[Dict[str, Any]]:
        """
        基于关键词检测风险
        """
        risks = []
        
        for risk_level, keywords in self.risk_keywords.items():
            for keyword in keywords:
                if keyword in content:
                    risks.append({
                        'type': 'keyword_match',
                        'keyword': keyword,
                        'risk_level': risk_level,
                        'description': f'发现潜在风险关键词: {keyword}'
                    })
        
        return risks
    
    def _assess_specific_clause_risks(self, clause: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        基于条款类型的专项风险评估
        """
        risks = []
        clause_type = clause['type']
        content = clause['content']
        
        # 根据不同条款类型进行专项评估
        if clause_type == 'liability':
            risks.extend(self._assess_liability_clause(content))
        elif clause_type == 'payment_terms':
            risks.extend(self._assess_payment_clause(content))
        elif clause_type == 'ip':
            risks.extend(self._assess_ip_clause(content))
        elif clause_type == 'indemnification':
            risks.extend(self._assess_indemnification_clause(content))
        
        return risks
    
    def _assess_liability_clause(self, content: str) -> List[Dict[str, Any]]:
        """评估责任条款"""
        risks = []
        
        # 检查是否有无限责任
        if '无限责任' in content or '全部责任' in content:
            risks.append({
                'type': 'liability_concern',
                'risk_level': 'high',
                'description': '条款中包含无限责任或全部责任表述,可能带来过高风险',
                'dimension': 'liability'
            })
        
        # 检查责任限额
        limit_pattern = re.compile(r'最高(?:赔偿|责任)(?:不超过)?[人民币]?\s*(\d+(?:\.\d+)?)\s*([万亿]?元)', re.IGNORECASE)
        matches = limit_pattern.findall(content)
        if matches:
            amount, unit = matches[0]
            amount = float(amount)
            
            # 转换为元
            if '万' in unit:
                amount *= 10000
            elif '亿' in unit:
                amount *= 100000000
            
            max_limit = float(self.company_policy['max_liability_limit'])
            if amount > max_limit:
                risks.append({
                    'type': 'liability_concern',
                    'risk_level': 'medium',
                    'description': f'责任限额({amount}元)超过公司规定的最高限额({max_limit}元)',
                    'dimension': 'financial'
                })
        
        return risks
    
    def _assess_payment_clause(self, content: str) -> List[Dict[str, Any]]:
        """评估付款条款"""
        risks = []
        
        # 检查付款期限
        term_pattern = re.compile(r'(\d+)\s*天[内]?付款', re.IGNORECASE)
        matches = term_pattern.findall(content)
        if matches:
            days = int(matches[0])
            preferred_days = self.company_policy['payment_terms']
            
            if days < preferred_days:
                risks.append({
                    'type': 'payment_concern',
                    'risk_level': 'medium',
                    'description': f'付款期限({days}天)短于公司偏好({preferred_days}天)',
                    'dimension': 'financial'
                })
        
        return risks
    
    def _assess_ip_clause(self, content: str) -> List[Dict[str, Any]]:
        """评估知识产权条款"""
        risks = []
        
        # 检查知识产权归属
        preferred_owner = self.company_policy['ip_ownership']
        if '归' in content and '所有' in content:
            if preferred_owner == '我方':
                if '归乙方所有' in content or '归对方所有' in content:
                    risks.append({
                        'type': 'ip_concern',
                        'risk_level': 'high',
                        'description': '知识产权归属可能不利于我方',
                        'dimension': 'liability'
                    })
        
        return risks
    
    def _assess_indemnification_clause(self, content: str) -> List[Dict[str, Any]]:
        """评估赔偿条款"""
        risks = []
        
        # 检查赔偿范围
        if '全部损失' in content or '任何索赔' in content:
            risks.append({
                'type': 'indemnification_concern',
                'risk_level': 'high',
                'description': '赔偿范围过宽,可能承担过大风险',
                'dimension': 'liability'
            })
        
        return risks
    
    def _check_against_company_policy(self, clause: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        与公司政策比对
        """
        risks = []
        clause_type = clause['type']
        content = clause['content']
        
        if clause_type == 'governing_law':
            # 检查适用法律
            preferred_laws = self.company_policy['preferred_governing_law']
            law_found = any(law in content for law in preferred_laws)
            
            if not law_found:
                risks.append({
                    'type': 'policy_violation',
                    'risk_level': 'medium',
                    'description': '适用法律不符合公司偏好',
                    'dimension': 'compliance'
                })
        
        elif clause_type == 'dispute_resolution':
            # 检查争议解决方式
            preferred_resolutions = self.company_policy['preferred_dispute_resolution']
            resolution_found = any(res in content for res in preferred_resolutions)
            
            if not resolution_found:
                risks.append({
                    'type': 'policy_violation',
                    'risk_level': 'medium',
                    'description': '争议解决方式不符合公司偏好',
                    'dimension': 'compliance'
                })
        
        return risks
    
    def _calculate_dimension_score(self, dimension: str, risk_indicators: List[Dict[str, Any]]) -> float:
        """
        计算特定风险维度的评分
        """
        # 简化计算:统计相关维度的风险指标数量和严重程度
        relevant_indicators = [
            ri for ri in risk_indicators 
            if 'dimension' not in ri or ri['dimension'] == dimension
        ]
        
        if not relevant_indicators:
            return 0
        
        # 根据风险级别计算分数
        score = 0
        for indicator in relevant_indicators:
            if indicator['risk_level'] == 'high':
                score += 3
            elif indicator['risk_level'] == 'medium':
                score += 1.5
            else:
                score += 0.5
        
        # 归一化到0-10分
        return min(10, score)
    
    def _generate_recommendations(self, risk_assessment: Dict[str, Any]) -> List[str]:
        """
        生成修改建议
        """
        recommendations = []
        risk_level = risk_assessment['risk_level']
        risk_indicators = risk_assessment['risk_indicators']
        
        if risk_level == 'high':
            recommendations.append("此条款存在较高风险,建议重点关注并考虑修改")
        elif risk_level == 'medium':
            recommendations.append("此条款存在一定风险,建议评估后决定是否修改")
        
        # 根据具体风险指标生成建议
        for indicator in risk_indicators:
            if 'keyword' in indicator:
                recommendations.append(
                    f"考虑修改或删除风险关键词: '{indicator['keyword']}'"
                )
            elif indicator['type'] == 'liability_concern':
                recommendations.append(
                    "建议明确责任限制范围,避免承担无限责任"
                )
            elif indicator['type'] == 'payment_concern':
                recommendations.append(
                    "建议协商延长付款期限,符合公司资金管理要求"
                )
            elif indicator['type'] == 'ip_concern':
                recommendations.append(
                    "建议重新审核知识产权归属条款,确保公司利益"
                )
            elif indicator['type'] == 'policy_violation':
                recommendations.append(
                    "建议修改条款以符合公司标准政策"
                )
        
        return recommendations
    
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐