Python 实现调用 Dify 工作流实战博客
·
Python 实现调用 Dify 工作流实战博客
简介
Dify 是开源低代码 AI 应用搭建平台,支持可视化拖拽搭建 AI 工作流,可自由组合大模型问答、工具调用、数据处理、条件分支等节点。工作流发布后可通过官方 API 实现外部程序调用。本篇文章使用 Python 语言,基于 requests 库完成 Dify 工作流同步调用、流式调用实操,搭配参数表格解析、实例代码、返回值说明,零基础也能快速上手对接 Dify 工作流。
一、前期环境准备
1.1 Dify 端准备工作
- 部署 Dify 服务(本地部署 / 线上服务均可)
- 新建工作流应用,编排业务流程并调试运行正常
- 将工作流正式发布,未发布应用无法调用 API
- 获取应用 API Key、Dify 服务访问地址
1.2 Python 环境依赖
仅需安装网络请求库 requests,用于发起 Http 接口请求
pip install requests
二、Dify 工作流核心调用参数说明
调用 Dify 工作流 /workflows/run 接口存在固定请求头、请求体参数,下方用表格清晰标注参数作用与填写规则。
2.1 请求头参数表
| 参数键名 | 参数值格式 | 作用 |
|---|---|---|
| Authorization | Bearer + APIKey | 接口身份鉴权,校验调用权限 |
| Content-Type | application/json | 声明请求数据格式为 JSON |
2.2 请求体核心参数表
| 参数名 | 可选值 | 说明 |
|---|---|---|
| inputs | 字典格式 | 传入工作流自定义变量,键名必须和工作流变量名一致 |
| response_mode | block / stream | block 同步一次性返回结果;stream 流式实时返回数据 |
| user | 自定义字符串 | 标记调用用户标识,用于后台统计调用记录 |
2.3 接口基础地址
| 部署方式 | 接口地址 |
|---|---|
| 本地部署 | http://localhost:8000/v1/workflows/run |
| 线上部署 | https://你的域名/v1/workflows/run |
2.4 其他常用 API 接口
| 接口 | 方法 | 用途 |
|---|---|---|
/workflows/run |
POST | 执行工作流 |
/workflows/run/:workflow_run_id |
GET | 查询工作流执行状态 |
/files/upload |
POST | 上传文件(供工作流使用) |
/parameters |
GET | 获取工作流输入参数定义 |
三、同步模式调用工作流(block)
同步调用会等待整个工作流所有节点执行完成后,一次性返回最终结果,适合执行速度快、无需实时查看过程的工作流,也是日常开发测试最常用方式。
3.1 完整实例代码
import requests
# ============ 基础配置项 ============
# Dify服务地址
BASE_URL = "http://localhost:8000/v1"
# 应用API密钥
API_KEY = "sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx"
# 工作流调用接口
RUN_WORKFLOW_URL = f"{BASE_URL}/workflows/run"
# 构造请求头
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
# 构造请求参数
# query为工作流内定义好的入参变量名
request_data = {
"inputs": {
"query": "简单介绍下Python编程语言特点"
},
"response_mode": "block",
"user": "python_call_01"
}
def call_workflow_block():
"""同步调用Dify工作流"""
try:
# 发起POST请求
res = requests.post(url=RUN_WORKFLOW_URL, headers=headers, json=request_data)
result = res.json()
# 判断请求状态
if res.status_code == 200 and result.get("data", {}).get("status") == "succeeded":
print("工作流执行成功")
# 提取工作流最终输出结果
output_data = result["data"]["outputs"]
print("最终返回结果:", output_data)
else:
print("工作流执行失败", result)
except Exception as e:
print("请求出现异常:", str(e))
if __name__ == "__main__":
call_workflow_block()
3.2 同步返回结果字段解析表
| 返回字段 | 含义 |
|---|---|
| workflow_run_id | 本次工作流运行唯一编号 |
| status | 运行状态,succeeded 代表执行成功 |
| elapsed_time | 工作流执行耗时 (秒) |
| outputs | 工作流最终输出内容 |
| inputs | 本次调用传入的入参数据 |
四、流式模式调用工作流(stream)
流式调用会实时推送工作流执行日志、大模型生成内容,逐段返回数据,模拟打字机输出效果,适合生成大段文本内容的工作流。
4.1 完整实例代码
import requests
import json
# ============ 基础配置项 ============
BASE_URL = "http://localhost:8000/v1"
API_KEY = "sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx"
RUN_WORKFLOW_URL = f"{BASE_URL}/workflows/run"
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
request_data = {
"inputs": {
"query": "说说日常学习编程有哪些实用技巧"
},
"response_mode": "stream",
"user": "python_call_02"
}
def call_workflow_stream():
"""流式调用Dify工作流"""
# 开启流式接收数据
res = requests.post(url=RUN_WORKFLOW_URL, headers=headers, json=request_data, stream=True)
# 循环读取流式数据
for line in res.iter_lines(decode_unicode=True):
if line and line.startswith("data:"):
# 清洗多余标识字符
stream_data = line.replace("data:", "").strip()
print(stream_data)
if __name__ == "__main__":
call_workflow_stream()
4.2 流式数据特点
- 数据会以
data:开头持续推送 - 会返回节点执行日志、节点运行结果、最终输出
- 适合前端实时展示 AI 生成内容场景
五、文件上传与工作流调用
当工作流需要处理文件(如文档分析、图片识别)时,需要先上传文件再调用工作流。
5.1 文件上传代码
import requests
BASE_URL = "http://localhost:8000/v1"
API_KEY = "sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx"
headers = {
"Authorization": f"Bearer {API_KEY}"
}
def upload_file(file_path: str, user: str = "python_user") -> dict:
"""
上传文件到 Dify
Args:
file_path: 本地文件路径
user: 用户标识
Returns:
上传结果,包含 file_id 等信息
"""
try:
with open(file_path, "rb") as f:
files = {"file": (file_path.split("/")[-1], f)}
data = {"user": user}
res = requests.post(
f"{BASE_URL}/files/upload",
headers=headers,
files=files,
data=data
)
result = res.json()
if res.status_code == 201:
print(f"✅ 文件上传成功")
print(f"📄 文件ID: {result['id']}")
print(f"📄 文件名称: {result['name']}")
return result
else:
print(f"❌ 文件上传失败: {result}")
return None
except FileNotFoundError:
print(f"❌ 文件不存在: {file_path}")
return None
except Exception as e:
print(f"⚠️ 上传异常: {str(e)}")
return None
# 使用示例
# file_info = upload_file("./document.pdf")
# file_id = file_info["id"] # 用于后续工作流调用
5.2 带文件输入的工作流调用
import requests
BASE_URL = "http://localhost:8000/v1"
API_KEY = "sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx"
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
def call_workflow_with_file(file_id: str, query: str = "总结这份文档的主要内容"):
"""
调用需要文件输入的工作流
Args:
file_id: 上传文件后获取的 file_id
query: 对文件的提问或处理指令
"""
request_data = {
"inputs": {
"query": query,
"file": {
"transfer_method": "local_file",
"upload_file_id": file_id,
"type": "document" # 可选: document / image / audio / video
}
},
"response_mode": "block",
"user": "python_file_user"
}
try:
res = requests.post(
f"{BASE_URL}/workflows/run",
headers=headers,
json=request_data
)
result = res.json()
if res.status_code == 200:
outputs = result.get("data", {}).get("outputs", {})
print("📝 处理结果:", outputs)
return outputs
else:
print("❌ 调用失败:", result)
return None
except Exception as e:
print(f"⚠️ 异常: {str(e)}")
return None
六、生产级封装:Dify 工作流调用工具类
"""
Dify Workflow SDK
生产级封装,支持同步/流式调用、重试机制、超时控制、日志记录
"""
import requests
import json
import time
import logging
from typing import Optional, Dict, Any, Callable, Generator
from dataclasses import dataclass
from enum import Enum
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ResponseMode(Enum):
"""响应模式枚举"""
BLOCK = "blocking"
STREAM = "streaming"
@dataclass
class DifyConfig:
"""Dify 配置类"""
base_url: str
api_key: str
timeout: int = 60
max_retries: int = 3
retry_interval: int = 2
@dataclass
class WorkflowResult:
"""工作流执行结果"""
success: bool
workflow_run_id: Optional[str] = None
outputs: Optional[Dict[str, Any]] = None
elapsed_time: Optional[float] = None
error_message: Optional[str] = None
raw_response: Optional[Dict] = None
class DifyWorkflowClient:
"""
Dify 工作流调用客户端
使用示例:
config = DifyConfig(
base_url="http://localhost:8000/v1",
api_key="sk-xxxxxxxx"
)
client = DifyWorkflowClient(config)
# 同步调用
result = client.run_sync(inputs={"query": "你好"})
# 流式调用
for chunk in client.run_stream(inputs={"query": "你好"}):
print(chunk, end="")
"""
def __init__(self, config: DifyConfig):
self.config = config
self.base_url = config.base_url.rstrip("/")
self.headers = {
"Authorization": f"Bearer {config.api_key}",
"Content-Type": "application/json"
}
logger.info(f"Dify客户端初始化完成,服务地址: {self.base_url}")
def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
"""发送请求并处理重试"""
url = f"{self.base_url}{endpoint}"
kwargs.setdefault("timeout", self.config.timeout)
kwargs.setdefault("headers", self.headers)
for attempt in range(self.config.max_retries):
try:
logger.debug(f"请求 {method} {url},第 {attempt + 1} 次尝试")
response = requests.request(method, url, **kwargs)
return response
except requests.exceptions.Timeout:
logger.warning(f"请求超时,第 {attempt + 1} 次重试...")
if attempt < self.config.max_retries - 1:
time.sleep(self.config.retry_interval)
else:
raise
except requests.exceptions.ConnectionError as e:
logger.warning(f"连接错误: {str(e)},第 {attempt + 1} 次重试...")
if attempt < self.config.max_retries - 1:
time.sleep(self.config.retry_interval)
else:
raise
raise Exception("请求失败,已超出最大重试次数")
def run_sync(
self,
inputs: Dict[str, Any],
user: str = "default_user",
timeout: Optional[int] = None
) -> WorkflowResult:
"""
同步调用工作流
Args:
inputs: 工作流输入参数
user: 用户标识
timeout: 自定义超时时间(秒)
Returns:
WorkflowResult: 执行结果
"""
request_data = {
"inputs": inputs,
"response_mode": ResponseMode.BLOCK.value,
"user": user
}
try:
response = self._make_request(
"POST",
"/workflows/run",
json=request_data,
timeout=timeout or self.config.timeout
)
result = response.json()
if response.status_code != 200:
return WorkflowResult(
success=False,
error_message=f"HTTP {response.status_code}: {result}",
raw_response=result
)
data = result.get("data", {})
status = data.get("status")
if status == "succeeded":
return WorkflowResult(
success=True,
workflow_run_id=data.get("workflow_run_id"),
outputs=data.get("outputs"),
elapsed_time=data.get("elapsed_time"),
raw_response=result
)
else:
return WorkflowResult(
success=False,
workflow_run_id=data.get("workflow_run_id"),
error_message=f"工作流执行状态: {status}",
raw_response=result
)
except requests.exceptions.Timeout:
logger.error("同步调用超时")
return WorkflowResult(
success=False,
error_message="请求超时,请检查工作流执行时间或增加超时设置"
)
except Exception as e:
logger.error(f"同步调用异常: {str(e)}")
return WorkflowResult(
success=False,
error_message=str(e)
)
def run_stream(
self,
inputs: Dict[str, Any],
user: str = "default_user",
on_event: Optional[Callable[[str, Dict], None]] = None
) -> Generator[str, None, WorkflowResult]:
"""
流式调用工作流
Args:
inputs: 工作流输入参数
user: 用户标识
on_event: 事件回调函数,接收 (event_type, event_data)
Yields:
str: 文本片段
Returns:
WorkflowResult: 最终结果
"""
request_data = {
"inputs": inputs,
"response_mode": ResponseMode.STREAM.value,
"user": user
}
full_text = ""
final_result = WorkflowResult(success=False)
try:
response = self._make_request(
"POST",
"/workflows/run",
json=request_data,
stream=True
)
for line in response.iter_lines(decode_unicode=True):
if not line or not line.startswith("data:"):
continue
try:
json_str = line.replace("data:", "").strip()
if not json_str:
continue
event = json.loads(json_str)
event_type = event.get("event")
# 触发回调
if on_event:
on_event(event_type, event)
# 处理不同事件类型
if event_type == "workflow_started":
final_result.workflow_run_id = event.get("workflow_run_id")
logger.info(f"工作流开始: {final_result.workflow_run_id}")
elif event_type == "text_chunk":
text = event.get("data", {}).get("text", "")
full_text += text
yield text
elif event_type == "workflow_finished":
data = event.get("data", {})
final_result.success = data.get("status") == "succeeded"
final_result.elapsed_time = data.get("elapsed_time")
final_result.outputs = data.get("outputs")
logger.info(f"工作流完成,状态: {data.get('status')}")
elif event_type == "error":
final_result.error_message = event.get("message", "未知错误")
logger.error(f"工作流错误: {final_result.error_message}")
except json.JSONDecodeError:
continue
except Exception as e:
logger.warning(f"解析流数据异常: {str(e)}")
continue
final_result.raw_response = {"full_text": full_text}
return final_result
except Exception as e:
logger.error(f"流式调用异常: {str(e)}")
final_result.error_message = str(e)
return final_result
def get_status(self, workflow_run_id: str) -> Optional[Dict]:
"""
查询工作流执行状态
Args:
workflow_run_id: 工作流运行ID
Returns:
状态信息字典
"""
try:
response = self._make_request(
"GET",
f"/workflows/run/{workflow_run_id}"
)
return response.json().get("data")
except Exception as e:
logger.error(f"查询状态异常: {str(e)}")
return None
def upload_file(self, file_path: str, user: str = "default_user") -> Optional[Dict]:
"""
上传文件
Args:
file_path: 本地文件路径
user: 用户标识
Returns:
文件信息
"""
try:
with open(file_path, "rb") as f:
files = {"file": (file_path.split("/")[-1], f)}
data = {"user": user}
response = self._make_request(
"POST",
"/files/upload",
headers={"Authorization": f"Bearer {self.config.api_key}"},
files=files,
data=data
)
if response.status_code == 201:
return response.json()
else:
logger.error(f"文件上传失败: {response.text}")
return None
except Exception as e:
logger.error(f"文件上传异常: {str(e)}")
return None
# ============ 使用示例 ============
if __name__ == "__main__":
# 初始化配置
config = DifyConfig(
base_url="http://localhost:8000/v1",
api_key="sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
timeout=120,
max_retries=3
)
client = DifyWorkflowClient(config)
# 示例1: 同步调用
print("=" * 50)
print("同步调用示例")
print("=" * 50)
result = client.run_sync(
inputs={"query": "用Python写个快速排序"},
user="demo_user_01"
)
if result.success:
print(f"执行成功,耗时: {result.elapsed_time}s")
print(f"结果: {result.outputs}")
else:
print(f"执行失败: {result.error_message}")
# 示例2: 流式调用
print("\n" + "=" * 50)
print("流式调用示例")
print("=" * 50)
def on_event(event_type, event_data):
if event_type == "node_started":
print(f"\n[节点开始] {event_data.get('data', {}).get('title', '')}")
print("📝 AI回答: ", end="")
final = None
for text in client.run_stream(
inputs={"query": "介绍Python的GIL机制"},
user="demo_user_02",
on_event=on_event
):
print(text, end="", flush=True)
final = text # 最后一个 yield 会返回 WorkflowResult
print("\n")
# 示例3: 带文件的工作流
print("\n" + "=" * 50)
print("文件处理示例")
print("=" * 50)
file_info = client.upload_file("./sample.pdf")
if file_info:
file_id = file_info["id"]
result = client.run_sync(
inputs={
"query": "总结文档内容",
"file": {
"transfer_method": "local_file",
"upload_file_id": file_id,
"type": "document"
}
},
user="demo_user_03"
)
print(f"📄 文档处理结果: {result.outputs}")
七、异步调用(asyncio + aiohttp)
对于高并发场景,使用异步方式调用:
import asyncio
import aiohttp
import json
from typing import Dict, Any
class AsyncDifyClient:
"""异步 Dify 客户端"""
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url.rstrip("/")
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
async def run_workflow(
self,
inputs: Dict[str, Any],
user: str = "async_user",
session: aiohttp.ClientSession = None
) -> Dict:
"""异步调用工作流"""
close_session = False
if session is None:
session = aiohttp.ClientSession()
close_session = True
try:
async with session.post(
f"{self.base_url}/workflows/run",
headers=self.headers,
json={
"inputs": inputs,
"response_mode": "blocking",
"user": user
}
) as response:
result = await response.json()
return result
finally:
if close_session:
await session.close()
async def batch_run(
self,
tasks: list,
max_concurrency: int = 5
) -> list:
"""
批量异步调用
Args:
tasks: [(inputs, user), ...] 任务列表
max_concurrency: 最大并发数
"""
semaphore = asyncio.Semaphore(max_concurrency)
async def _run_with_limit(inputs, user):
async with semaphore:
return await self.run_workflow(inputs, user)
results = await asyncio.gather(
*[_run_with_limit(inp, usr) for inp, usr in tasks],
return_exceptions=True
)
return results
# 使用示例
async def main():
client = AsyncDifyClient(
base_url="http://localhost:8000/v1",
api_key="sk-xxxxxxxx"
)
# 单个调用
result = await client.run_workflow(
inputs={"query": "Python异步编程介绍"}
)
print(result)
# 批量调用
tasks = [
({"query": f"问题{i}"}, f"user_{i}")
for i in range(10)
]
results = await client.batch_run(tasks, max_concurrency=3)
print(f"完成 {len(results)} 个任务")
# 运行
# asyncio.run(main())
八、常见问题排查对照表
| 报错状态码 | 异常现象 | 解决方案 |
|---|---|---|
| 401 | 鉴权失败 | 检查 API Key 是否填写错误,确认应用已发布 |
| 404 | 接口不存在 | 核对 Dify 服务地址,确认接口路径携带 v1 版本号 |
| 400 | 参数错误 | 检查 inputs 参数格式是否正确,变量名是否匹配 |
| 429 | 请求过于频繁 | 降低请求频率,或联系管理员调整限流策略 |
| 500 | 服务器内部错误 | 检查 Dify 服务日志,查看工作流节点是否报错 |
| 200 无结果 | 有响应无输出 | 检查 inputs 内变量名和工作流定义变量不一致 |
| 请求超时 | 长时间无响应 | 工作流节点过多,增加请求超时时间,优化工作流流程 |
| 流式中断 | 流式输出突然停止 | 检查网络稳定性,增加重试机制 |
九、总结
| 要点 | 说明 |
|---|---|
| 核心接口 | Python 调用 Dify 工作流核心使用 POST 请求调用官方 /workflows/run 接口 |
| 调用模式 | 同步模式适合获取最终结果,流式模式适合实时查看生成内容 |
| 参数匹配 | inputs 入参变量名必须严格匹配工作流配置变量,是调用成功核心要点 |
| 生产建议 | 建议使用封装类管理调用,增加重试、超时、日志等机制 |
| 应用场景 | 学会基础调用后,可将该逻辑整合进后端项目、自动化脚本、批量处理任务中,复用 Dify 搭建好的 AI 工作流能力 |
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)