办公自动化场景实战:用 OpenClaw 接管邮件、日程、文档和报表
·
标签:
办公自动化OpenClawMCPAI AgentPython邮件自动化日程管理
阅读时长:约 20 分钟
📌 写在前面
你是否每天都在重复这些操作:
- 手动整理几十封邮件,逐一回复、分类、归档
- 打开日历应用手动排期,反复确认会议时间
- 复制粘贴数据,在 Excel 里做出同样格式的周报
- 在多个系统之间来回切换,寻找会议纪要模板
OpenClaw 是一个基于 MCP(Model Context Protocol)协议构建的 AI 办公自动化框架,能让大语言模型(LLM)直接调用系统级工具,真正接管你的日常办公流程。
本文将聚焦 4 个核心场景的实现方法,全程图文详实,代码可直接复用。
🗺️ 整体架构速览
用户意图(自然语言)
↓
OpenClaw Agent
↓
┌─────────────────────────────────────┐
│ MCP Tool Router │
└──┬──────┬──────────┬───────────────┘
↓ ↓ ↓ ↓
邮件 日历/日程 文档处理 数据报表
Tool Tool Tool Tool
↓ ↓ ↓ ↓
Gmail Google Word/Notion Excel/CSV
Calendar 可视化
OpenClaw 的核心是一个 工具路由层:接收自然语言指令,拆解为子任务,分发给对应的 MCP 工具,最后聚合结果返回给用户。
场景一:邮件自动化 📧
1.1 实现目标
- 自动读取未读邮件并按优先级分类
- 根据上下文自动起草回复草稿
- 批量归档已处理邮件
1.2 核心实现
Step 1:注册邮件 MCP 工具
# tools/email_tool.py
from openclaw.core import MCPTool, ToolSchema
from openclaw.connectors import GmailConnector
class EmailTool(MCPTool):
name = "email_manager"
description = "读取、分类、回复和归档邮件"
schema = ToolSchema(
actions=["list_unread", "read_email", "draft_reply", "archive", "label"],
required_auth=["gmail_oauth2"]
)
def __init__(self):
self.connector = GmailConnector()
def list_unread(self, max_results: int = 20) -> list[dict]:
"""获取未读邮件列表"""
messages = self.connector.users().messages().list(
userId="me",
q="is:unread",
maxResults=max_results
).execute()
result = []
for msg in messages.get("messages", []):
detail = self.connector.get_message(msg["id"])
result.append({
"id": msg["id"],
"subject": detail.get("subject"),
"sender": detail.get("from"),
"snippet": detail.get("snippet"),
"date": detail.get("date"),
})
return result
Step 2:构建邮件分类 Agent
# agents/email_classifier_agent.py
from openclaw.agent import OpenClawAgent
from tools.email_tool import EmailTool
CLASSIFICATION_PROMPT = """
你是一个邮件分类助手。根据邮件的主题、发件人和摘要,将邮件归类为:
- 🔴 urgent:需要当天回复的紧急邮件
- 🟡 action_required:需要操作但不紧急
- 🟢 informational:仅供知晓,无需回复
- 🔵 newsletter:订阅类邮件
请以 JSON 格式返回分类结果:
{"id": "邮件ID", "category": "分类", "reason": "分类理由", "suggested_reply": "建议回复摘要(如需)"}
"""
class EmailClassifierAgent(OpenClawAgent):
tools = [EmailTool()]
def run(self, max_emails: int = 20):
# 1. 获取未读邮件
emails = self.tools[0].list_unread(max_results=max_emails)
print(f"📬 共获取到 {len(emails)} 封未读邮件")
classified = []
for email in emails:
# 2. 调用 LLM 分类
result = self.llm.chat(
system=CLASSIFICATION_PROMPT,
user=f"邮件信息:{email}"
)
classified.append(result)
# 3. 按优先级排序
priority_order = {"urgent": 0, "action_required": 1,
"informational": 2, "newsletter": 3}
classified.sort(key=lambda x: priority_order.get(x["category"], 99))
return classified
Step 3:自动起草回复
# agents/email_reply_agent.py
REPLY_DRAFT_PROMPT = """
你是一位专业的商务助理,请根据原始邮件内容,起草一封简洁、专业的中文回复邮件。
要求:
1. 语气礼貌正式
2. 直接回应邮件中的核心问题
3. 结尾使用:「如有疑问,欢迎继续沟通。祝好,[签名]」
"""
def draft_reply(email_id: str, context: str = "") -> str:
"""
为指定邮件生成回复草稿
:param email_id: 邮件 ID
:param context: 额外上下文(如附加说明、参考资料等)
"""
email_tool = EmailTool()
email_content = email_tool.read_email(email_id)
user_prompt = f"""
原始邮件:
发件人:{email_content['sender']}
主题:{email_content['subject']}
正文:{email_content['body']}
额外上下文:{context}
"""
draft = llm.chat(system=REPLY_DRAFT_PROMPT, user=user_prompt)
# 保存草稿到 Gmail
email_tool.save_draft(
to=email_content['sender'],
subject=f"Re: {email_content['subject']}",
body=draft
)
print(f"✅ 回复草稿已保存:{email_content['subject']}")
return draft
1.3 运行效果示意
📬 共获取到 18 封未读邮件
分类结果:
┌────────────────────────────────────────────┬──────────────────┐
│ 邮件主题 │ 优先级 │
├────────────────────────────────────────────┼──────────────────┤
│ [紧急] 服务器告警 - 磁盘空间不足 │ 🔴 urgent │
│ Q3 财务报告审批请求 │ 🟡 action_required│
│ 团队周会通知 - 本周五 14:00 │ 🟢 informational │
│ ProductHunt 每日推送 │ 🔵 newsletter │
└────────────────────────────────────────────┴──────────────────┘
✅ 已为 2 封 urgent 邮件自动生成回复草稿
场景二:日程智能管理 📅
2.1 实现目标
- 解析自然语言创建日程事件
- 自动检测冲突并建议替代时间
- 生成每日/每周日程摘要
2.2 核心实现
Step 1:日历 MCP 工具封装
# tools/calendar_tool.py
from openclaw.core import MCPTool
from googleapiclient.discovery import build
from datetime import datetime, timedelta
import pytz
class CalendarTool(MCPTool):
name = "calendar_manager"
description = "管理 Google Calendar 日程,支持增删查改和冲突检测"
def __init__(self, timezone: str = "Asia/Shanghai"):
self.service = build("calendar", "v3", credentials=self._get_creds())
self.tz = pytz.timezone(timezone)
def get_events(self, days_ahead: int = 7) -> list[dict]:
"""获取未来 N 天的日程"""
now = datetime.now(self.tz).isoformat()
end = (datetime.now(self.tz) + timedelta(days=days_ahead)).isoformat()
events_result = self.service.events().list(
calendarId="primary",
timeMin=now,
timeMax=end,
singleEvents=True,
orderBy="startTime"
).execute()
return events_result.get("items", [])
def check_conflict(self, start: str, end: str) -> list[dict]:
"""检测指定时间段是否有冲突"""
existing = self.get_events()
conflicts = []
for event in existing:
ev_start = event["start"].get("dateTime")
ev_end = event["end"].get("dateTime")
# 时间段重叠判断
if ev_start < end and ev_end > start:
conflicts.append(event)
return conflicts
def create_event(self, title: str, start: str, end: str,
attendees: list = None, description: str = "") -> dict:
"""创建新日程事件"""
event_body = {
"summary": title,
"description": description,
"start": {"dateTime": start, "timeZone": "Asia/Shanghai"},
"end": {"dateTime": end, "timeZone": "Asia/Shanghai"},
}
if attendees:
event_body["attendees"] = [{"email": a} for a in attendees]
created = self.service.events().insert(
calendarId="primary",
body=event_body,
sendUpdates="all"
).execute()
return created
def suggest_free_slots(self, duration_minutes: int = 60,
search_days: int = 3) -> list[dict]:
"""根据现有日程,推荐空闲时间段"""
events = self.get_events(days_ahead=search_days)
# 构建已占用时间块
busy_blocks = [(e["start"]["dateTime"], e["end"]["dateTime"])
for e in events if "dateTime" in e.get("start", {})]
free_slots = []
work_start = 9 # 09:00
work_end = 18 # 18:00
for day_offset in range(search_days):
day = datetime.now(self.tz) + timedelta(days=day_offset)
slot_start = day.replace(hour=work_start, minute=0, second=0)
while slot_start.hour < work_end:
slot_end = slot_start + timedelta(minutes=duration_minutes)
slot_start_str = slot_start.isoformat()
slot_end_str = slot_end.isoformat()
# 检查是否与已有日程冲突
is_free = not any(
bs < slot_end_str and be > slot_start_str
for bs, be in busy_blocks
)
if is_free:
free_slots.append({
"date": day.strftime("%Y-%m-%d %A"),
"start": slot_start.strftime("%H:%M"),
"end": slot_end.strftime("%H:%M"),
})
slot_start += timedelta(minutes=30)
return free_slots[:5] # 返回前5个推荐时段
Step 2:自然语言解析日程
# agents/schedule_agent.py
SCHEDULE_PARSE_PROMPT = """
将用户的自然语言转化为日程事件的结构化数据,以 JSON 格式返回:
{
"title": "事件标题",
"date": "YYYY-MM-DD",
"start_time": "HH:MM",
"end_time": "HH:MM",
"attendees": ["邮箱1", "邮箱2"],
"description": "备注"
}
今天的日期是 {today},处于时区 Asia/Shanghai。
如果用户说"明天"、"下周五"等相对时间,请转换为具体日期。
"""
def parse_and_create_schedule(user_input: str) -> dict:
"""
输入:「明天下午3点和产品团队开需求评审会,持续1小时」
输出:自动创建日程并返回确认信息
"""
from datetime import date
today = date.today().strftime("%Y-%m-%d")
# 1. 解析自然语言
parsed = llm.chat(
system=SCHEDULE_PARSE_PROMPT.format(today=today),
user=user_input
)
# 2. 检测时间冲突
cal = CalendarTool()
start_dt = f"{parsed['date']}T{parsed['start_time']}:00+08:00"
end_dt = f"{parsed['date']}T{parsed['end_time']}:00+08:00"
conflicts = cal.check_conflict(start_dt, end_dt)
if conflicts:
print(f"⚠️ 检测到 {len(conflicts)} 个时间冲突:")
for c in conflicts:
print(f" - {c['summary']} ({c['start']['dateTime']})")
# 推荐替代时间
suggestions = cal.suggest_free_slots(
duration_minutes=60, search_days=3
)
print("\n📌 推荐的空闲时段:")
for s in suggestions:
print(f" {s['date']} {s['start']} - {s['end']}")
return {"status": "conflict", "suggestions": suggestions}
# 3. 创建日程
event = cal.create_event(
title=parsed["title"],
start=start_dt,
end=end_dt,
attendees=parsed.get("attendees", []),
description=parsed.get("description", "")
)
print(f"✅ 日程已创建:{event['summary']} {event['start']['dateTime']}")
return {"status": "created", "event": event}
Step 3:每日日程摘要推送
# agents/daily_briefing.py
def generate_daily_briefing() -> str:
"""
生成今日日程摘要,可自动发送到邮件或企业微信
"""
cal = CalendarTool()
events = cal.get_events(days_ahead=1)
if not events:
return "📅 今日暂无日程,好好享受空闲时光!"
events_text = "\n".join([
f"- {e['start'].get('dateTime', '全天')[11:16]} "
f"{e['summary']} "
f"({e.get('location', '无地点')})"
for e in events
])
briefing = llm.chat(
system="你是一位贴心的AI助理,根据日程列表生成简洁友好的每日简报。",
user=f"今日日程如下:\n{events_text}\n\n请生成一份日程摘要,并提醒重要事项。"
)
return briefing
2.3 运行效果示意
用户输入:「后天上午10点半和设计部 Lisa、Tom 开 UI 评审,大概要1.5小时」
🔍 解析结果:
- 事件:UI 评审会议
- 时间:2026-03-15 10:30 - 12:00
- 参与者:lisa@company.com, tom@company.com
✅ 无冲突,日程已创建并向参与者发送邀请!
--- 今日摘要 ---
📅 您今天共有 3 个日程:
• 09:00 晨会 - 工程部周同步(约30分钟)
• 14:00 产品 Demo 演示(会议室A)
• 16:30 1on1 与 Manager Jack
⚠️ 注意:14:00 的 Demo 前请提前准备好演示材料!
场景三:文档智能处理 📄
3.1 实现目标
- 批量读取 Word/PDF 文件并提取关键信息
- 根据模板自动生成会议纪要
- 实现跨文档信息检索
3.2 核心实现
Step 1:文档解析工具
# tools/document_tool.py
import os
import json
from pathlib import Path
from openclaw.core import MCPTool
import docx
import pdfplumber
class DocumentTool(MCPTool):
name = "document_processor"
description = "读取、解析和生成 Word/PDF 文档"
def read_docx(self, filepath: str) -> dict:
"""读取 Word 文档内容"""
doc = docx.Document(filepath)
full_text = []
metadata = {"headings": [], "tables": []}
for para in doc.paragraphs:
if para.style.name.startswith("Heading"):
metadata["headings"].append({
"level": para.style.name,
"text": para.text
})
if para.text.strip():
full_text.append(para.text)
for table in doc.tables:
table_data = []
for row in table.rows:
table_data.append([cell.text for cell in row.cells])
metadata["tables"].append(table_data)
return {
"filename": Path(filepath).name,
"content": "\n".join(full_text),
"metadata": metadata
}
def read_pdf(self, filepath: str) -> dict:
"""读取 PDF 文档内容"""
pages_content = []
with pdfplumber.open(filepath) as pdf:
for i, page in enumerate(pdf.pages):
text = page.extract_text() or ""
tables = page.extract_tables()
pages_content.append({
"page": i + 1,
"text": text,
"tables": tables
})
return {
"filename": Path(filepath).name,
"pages": len(pages_content),
"content": "\n".join(p["text"] for p in pages_content),
"raw_pages": pages_content
}
def batch_read(self, folder_path: str,
extensions: list = None) -> list[dict]:
"""批量读取文件夹下的文档"""
if extensions is None:
extensions = [".docx", ".pdf"]
folder = Path(folder_path)
results = []
for file in folder.iterdir():
if file.suffix.lower() in extensions:
try:
if file.suffix == ".docx":
doc_data = self.read_docx(str(file))
else:
doc_data = self.read_pdf(str(file))
results.append(doc_data)
print(f"✅ 已读取:{file.name}")
except Exception as e:
print(f"❌ 读取失败:{file.name} - {e}")
return results
Step 2:自动生成会议纪要
# agents/meeting_minutes_agent.py
MINUTES_TEMPLATE_PROMPT = """
你是一位专业的会议纪要撰写助理。根据以下会议录音转写文本,生成标准格式的会议纪要。
输出格式:
## 会议纪要
**会议主题**:[从文本中提取]
**会议时间**:[从文本中提取或用今天的日期]
**参会人员**:[列出所有提到的人名]
**主持人**:[如有]
---
### 📋 主要议题
[按议题分点列出]
### ✅ 决议事项
| 事项 | 负责人 | 截止时间 |
|------|--------|---------|
| ... | ... | ... |
### 📌 待确认事项
[列出未决事项]
### 📅 下次会议
[如有提及]
"""
def generate_meeting_minutes(transcript_text: str = None,
audio_file: str = None) -> str:
"""
从会议录音或文字稿生成会议纪要
:param transcript_text: 直接传入转写文本
:param audio_file: 传入音频文件路径(自动调用 Whisper 转写)
"""
if audio_file and not transcript_text:
# 调用 OpenAI Whisper 进行语音转写
import openai
with open(audio_file, "rb") as f:
transcript = openai.Audio.transcribe("whisper-1", f)
transcript_text = transcript["text"]
print(f"🎤 语音转写完成,共 {len(transcript_text)} 字")
# 调用 LLM 生成会议纪要
minutes = llm.chat(
system=MINUTES_TEMPLATE_PROMPT,
user=f"会议记录:\n\n{transcript_text}"
)
# 保存为 Word 文档
output_path = save_minutes_to_docx(minutes)
print(f"📄 会议纪要已保存:{output_path}")
return minutes
def save_minutes_to_docx(content: str, output_dir: str = "./output") -> str:
"""将 Markdown 格式的纪要保存为 Word 文档"""
import docx
from datetime import date
doc = docx.Document()
doc.add_heading("会议纪要", level=0)
for line in content.split("\n"):
if line.startswith("## "):
doc.add_heading(line[3:], level=1)
elif line.startswith("### "):
doc.add_heading(line[4:], level=2)
elif line.startswith("**") and line.endswith("**"):
p = doc.add_paragraph()
p.add_run(line[2:-2]).bold = True
elif line.startswith("| "):
# 表格行(简化处理)
doc.add_paragraph(line)
elif line.strip():
doc.add_paragraph(line)
filename = f"meeting_minutes_{date.today()}.docx"
filepath = os.path.join(output_dir, filename)
os.makedirs(output_dir, exist_ok=True)
doc.save(filepath)
return filepath
Step 3:跨文档语义检索
# agents/doc_search_agent.py
from sentence_transformers import SentenceTransformer
import numpy as np
class DocSearchAgent:
"""基于向量嵌入的跨文档语义检索"""
def __init__(self):
self.model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")
self.doc_chunks = []
self.embeddings = []
def index_documents(self, folder_path: str):
"""索引文件夹中的所有文档"""
doc_tool = DocumentTool()
docs = doc_tool.batch_read(folder_path)
for doc in docs:
# 按段落切分文档
chunks = [c.strip() for c in doc["content"].split("\n")
if len(c.strip()) > 30]
for chunk in chunks:
self.doc_chunks.append({
"text": chunk,
"source": doc["filename"]
})
# 生成向量嵌入
texts = [c["text"] for c in self.doc_chunks]
self.embeddings = self.model.encode(texts, show_progress_bar=True)
print(f"📚 已索引 {len(docs)} 个文档,{len(self.doc_chunks)} 个文本块")
def search(self, query: str, top_k: int = 5) -> list[dict]:
"""语义搜索"""
query_embedding = self.model.encode([query])
# 计算余弦相似度
similarities = np.dot(self.embeddings, query_embedding.T).flatten()
top_indices = similarities.argsort()[-top_k:][::-1]
results = []
for idx in top_indices:
results.append({
"text": self.doc_chunks[idx]["text"],
"source": self.doc_chunks[idx]["source"],
"score": float(similarities[idx])
})
return results
3.3 运行效果示意
用户:「帮我从上个月所有会议录音里找出关于'API 限流方案'的讨论」
🔍 正在检索 24 个会议纪要文档...
找到 3 处相关内容:
1. [相关度: 0.94] 来源:2026-02-18-架构评审会.docx
"针对 API 限流方案,团队决定采用令牌桶算法,
限制每个租户 1000 req/min..."
2. [相关度: 0.87] 来源:2026-02-25-技术周会.docx
"限流方案测试结果:P99 延迟控制在 50ms 以内..."
3. [相关度: 0.81] 来源:2026-03-03-产品研讨会.docx
"产品侧提出需要对 VIP 客户提供限流豁免机制..."
场景四:数据报表自动生成 📊
4.1 实现目标
- 自动从多数据源拉取数据
- 按固定模板生成周报/月报
- 将报表自动发送至相关人员
4.2 核心实现
Step 1:数据源适配器
# tools/data_source_tool.py
import pandas as pd
import sqlite3
import requests
class DataSourceTool:
"""统一的多数据源适配器"""
def from_csv(self, filepath: str,
date_col: str = None,
date_range: tuple = None) -> pd.DataFrame:
"""从 CSV 文件读取数据"""
df = pd.read_csv(filepath)
if date_col and date_range:
df[date_col] = pd.to_datetime(df[date_col])
mask = (df[date_col] >= date_range[0]) & (df[date_col] <= date_range[1])
df = df[mask]
return df
def from_sql(self, db_path: str, query: str) -> pd.DataFrame:
"""从 SQLite 数据库查询数据"""
conn = sqlite3.connect(db_path)
df = pd.read_sql_query(query, conn)
conn.close()
return df
def from_api(self, url: str, headers: dict = None,
params: dict = None) -> pd.DataFrame:
"""从 REST API 获取数据"""
resp = requests.get(url, headers=headers, params=params)
resp.raise_for_status()
data = resp.json()
# 自动展平嵌套 JSON
return pd.json_normalize(data if isinstance(data, list) else data.get("data", []))
Step 2:报表生成引擎
# agents/report_generator.py
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams["font.family"] = "SimHei" # 中文字体支持
from pathlib import Path
from openpyxl import Workbook
from openpyxl.chart import BarChart, Reference
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.utils.dataframe import dataframe_to_rows
class WeeklyReportGenerator:
"""自动周报生成器"""
def __init__(self, output_dir: str = "./reports"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
def generate(self, data: dict, week_label: str = "本周") -> str:
"""
data 格式:
{
"sales": DataFrame, # 销售数据
"user_metrics": DataFrame, # 用户指标
"issues": DataFrame # 问题单数据
}
"""
wb = Workbook()
# ---- Sheet 1: 执行摘要 ----
ws_summary = wb.active
ws_summary.title = "执行摘要"
self._build_summary_sheet(ws_summary, data, week_label)
# ---- Sheet 2: 销售明细 ----
ws_sales = wb.create_sheet("销售明细")
self._build_data_sheet(ws_sales, data["sales"], "销售数据明细")
# ---- Sheet 3: 用户指标 ----
ws_metrics = wb.create_sheet("用户指标")
self._build_metrics_sheet(ws_metrics, data["user_metrics"])
filename = self.output_dir / f"weekly_report_{week_label}.xlsx"
wb.save(filename)
print(f"📊 报表已生成:{filename}")
return str(filename)
def _build_summary_sheet(self, ws, data: dict, week_label: str):
"""构建执行摘要页"""
# 标题样式
header_font = Font(bold=True, size=14, color="FFFFFF")
header_fill = PatternFill(fill_type="solid", fgColor="2F5496")
ws["A1"] = f"{week_label} 运营数据周报"
ws["A1"].font = Font(bold=True, size=18)
ws.merge_cells("A1:F1")
# KPI 卡片区域
kpis = self._calculate_kpis(data)
kpi_headers = ["指标", "本周数值", "上周数值", "环比变化", "目标值", "达成率"]
for col, header in enumerate(kpi_headers, start=1):
cell = ws.cell(row=3, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = Alignment(horizontal="center")
for row_idx, (metric, values) in enumerate(kpis.items(), start=4):
ws.cell(row=row_idx, column=1, value=metric)
ws.cell(row=row_idx, column=2, value=values["current"])
ws.cell(row=row_idx, column=3, value=values["last_week"])
change = values["current"] - values["last_week"]
change_pct = f"{change/values['last_week']*100:+.1f}%"
cell = ws.cell(row=row_idx, column=4, value=change_pct)
# 涨红跌绿
cell.font = Font(color="C00000" if change > 0 else "00B050")
ws.cell(row=row_idx, column=5, value=values.get("target", "-"))
ws.cell(row=row_idx, column=6,
value=f"{values['current']/values.get('target', 1)*100:.1f}%")
# 插入趋势图
self._add_trend_chart(ws, data["sales"])
def _calculate_kpis(self, data: dict) -> dict:
"""计算核心 KPI"""
sales_df = data["sales"]
metrics_df = data["user_metrics"]
return {
"总销售额(万元)": {
"current": sales_df["amount"].sum() / 10000,
"last_week": sales_df["amount_lw"].sum() / 10000,
"target": 500
},
"新增用户数": {
"current": int(metrics_df["new_users"].sum()),
"last_week": int(metrics_df["new_users_lw"].sum()),
"target": 2000
},
"订单转化率": {
"current": metrics_df["conversion_rate"].mean(),
"last_week": metrics_df["conversion_rate_lw"].mean(),
"target": 0.15
},
}
def _add_trend_chart(self, ws, sales_df: pd.DataFrame):
"""在 Sheet 中插入销售趋势柱状图"""
chart = BarChart()
chart.type = "col"
chart.title = "本周每日销售额趋势"
chart.y_axis.title = "销售额(元)"
chart.x_axis.title = "日期"
chart.height = 15
chart.width = 25
# 写入图表数据到隐藏区域
start_row = 20
ws.cell(row=start_row, column=1, value="日期")
ws.cell(row=start_row, column=2, value="销售额")
for i, (_, row) in enumerate(sales_df.iterrows()):
ws.cell(row=start_row + 1 + i, column=1, value=str(row["date"]))
ws.cell(row=start_row + 1 + i, column=2, value=row["amount"])
data_ref = Reference(ws,
min_col=2, max_col=2,
min_row=start_row,
max_row=start_row + len(sales_df))
chart.add_data(data_ref, titles_from_data=True)
ws.add_chart(chart, "H3")
def _build_data_sheet(self, ws, df: pd.DataFrame, title: str):
"""构建数据明细 Sheet"""
ws["A1"] = title
ws["A1"].font = Font(bold=True, size=14)
for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=True), start=3):
for c_idx, value in enumerate(row, start=1):
cell = ws.cell(row=r_idx, column=c_idx, value=value)
if r_idx == 3: # 表头
cell.font = Font(bold=True, color="FFFFFF")
cell.fill = PatternFill(fill_type="solid", fgColor="4472C4")
Step 3:AI 智能报表解读
# agents/report_analyst_agent.py
ANALYST_PROMPT = """
你是一位资深数据分析师,请根据以下周报数据:
1. 指出本周最显著的 3 个亮点(正面)
2. 识别 2-3 个需要关注的风险点
3. 给出下周的具体行动建议
请用简洁的商业语言,以 Markdown 格式输出分析报告。
"""
def generate_report_analysis(report_data: dict) -> str:
"""为生成的报表自动添加 AI 分析解读"""
data_summary = {
"sales_total": report_data["sales"]["amount"].sum(),
"sales_wow": (report_data["sales"]["amount"].sum() /
report_data["sales"]["amount_lw"].sum() - 1),
"new_users": report_data["user_metrics"]["new_users"].sum(),
"conversion": report_data["user_metrics"]["conversion_rate"].mean(),
"open_issues": len(report_data.get("issues", [])),
}
analysis = llm.chat(
system=ANALYST_PROMPT,
user=f"本周核心数据:{data_summary}"
)
return analysis
def auto_send_report(report_path: str, analysis: str,
recipients: list[str]):
"""自动将报表和分析通过邮件发送给相关人员"""
email_tool = EmailTool()
body = f"""
Hi Team,
{analysis}
完整报表请见附件。
Best Regards,
AI 报表系统
"""
email_tool.send_with_attachment(
to=recipients,
subject=f"📊 {week_label} 运营数据周报 - 自动生成",
body=body,
attachment=report_path
)
print(f"📧 报表已发送至 {len(recipients)} 位收件人")
4.3 运行效果示意
📊 开始生成本周数据报表...
✅ 销售数据已读取:1,247 条记录
✅ 用户指标已读取:7 天 × 12 个维度
✅ 问题单数据已读取:34 条
📄 正在生成 Excel 报表...
📊 报表已生成:./reports/weekly_report_W11.xlsx
🤖 AI 分析报告:
━━━━━━━━━━━━━━━━━━━━━━━━
✨ 本周亮点:
1. 总销售额达 487 万元,环比上涨 12.3%,创近 8 周新高
2. 新增用户 2,341 人,超额完成目标 117%
3. 华东区转化率提升至 18.2%,高出全国均值 3.1 个百分点
⚠️ 风险提示:
1. 华北区销售额连续 3 周环比下滑,需重点关注
2. 客服响应时长均值达 4.2 小时,超出 SLA 要求
📌 下周建议:
1. 针对华北区启动专项分析,建议本周五前提交原因报告
2. 扩充客服班次或接入 AI 客服预处理
━━━━━━━━━━━━━━━━━━━━━━━━
📧 报表已发送至 5 位收件人
🔗 将四大场景串联:编排 Workflow
OpenClaw 支持通过 Workflow 将多个 Agent 串联成完整的自动化流水线:
# workflows/monday_morning_workflow.py
from openclaw.workflow import Workflow, WorkflowStep
class MondayMorningWorkflow(Workflow):
"""
每周一早 8:00 自动执行的"晨间例行"工作流:
1. 生成本周日程摘要
2. 读取周末积压邮件并分类
3. 生成上周数据报表
4. 汇总所有内容推送到企业微信
"""
name = "monday_morning_briefing"
schedule = "0 8 * * 1" # Cron 表达式:每周一 08:00
steps = [
WorkflowStep(
name="calendar_briefing",
agent="ScheduleAgent",
action="generate_daily_briefing",
params={"days_ahead": 5}
),
WorkflowStep(
name="email_triage",
agent="EmailClassifierAgent",
action="run",
params={"max_emails": 50}
),
WorkflowStep(
name="weekly_report",
agent="WeeklyReportGenerator",
action="generate",
depends_on=[] # 与前两步并行执行
),
WorkflowStep(
name="push_notification",
agent="NotificationAgent",
action="push_to_wecom",
# 接收前三步的输出
input_from=["calendar_briefing", "email_triage", "weekly_report"]
),
]
# 注册并运行
workflow = MondayMorningWorkflow()
workflow.register() # 注册定时任务
workflow.run_now() # 立即执行一次(测试用)
工作流执行日志示意
[08:00:01] 🚀 MondayMorningWorkflow 开始执行
[08:00:02] ├─ Step 1: calendar_briefing ... ✅ (0.8s)
[08:00:03] ├─ Step 2: email_triage ... ✅ 处理 42 封邮件 (3.2s)
[08:00:05] ├─ Step 3: weekly_report ... ✅ 报表已生成 (4.1s)
[08:00:09] └─ Step 4: push_notification ... ✅ 推送至企业微信 (0.3s)
[08:00:09] ✅ Workflow 执行完成,总耗时 8.4s
⚡ 最佳实践总结
| 场景 | 关键实现要点 | 推荐工具 |
|---|---|---|
| 邮件自动化 | OAuth2 认证 + 优先级分类 + 草稿生成 | Gmail API + LLM |
| 日程管理 | 自然语言解析 + 冲突检测 + 空闲推荐 | Google Calendar API |
| 文档处理 | 多格式解析 + 向量检索 + 模板生成 | python-docx + pdfplumber + sentence-transformers |
| 数据报表 | 多源聚合 + 自动图表 + AI 解读 + 推送 | openpyxl + matplotlib + LLM |
💡 核心设计原则
- 工具原子化:每个 MCP 工具只做一件事,便于复用和测试
- 错误降级:单步失败不影响整体工作流,记录日志并跳过
- 人工审核钩子:重要操作(如发送邮件)前插入确认步骤
- 异步并行:无依赖的步骤并行执行,提升整体吞吐
- 可观测性:每步操作记录执行时间、输入输出,便于排查
🧩 扩展方向
- 企业微信/飞书 Bot:接入 Webhook,让 AI 回复同事消息
- Notion 知识库同步:自动将会议纪要推送至团队知识库
- 多模型路由:根据任务类型自动选择 GPT-4 / Claude / 本地模型
- 审计日志:记录所有 AI 操作,满足合规要求
📚 参考资料
💬 如果这篇文章对你有帮助,欢迎点赞收藏!有问题可以在评论区交流。
🔔 关注专栏获取更多 AI 办公自动化实战内容。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)