Python 实现调用 Dify 工作流实战博客

简介

Dify 是开源低代码 AI 应用搭建平台,支持可视化拖拽搭建 AI 工作流,可自由组合大模型问答、工具调用、数据处理、条件分支等节点。工作流发布后可通过官方 API 实现外部程序调用。本篇文章使用 Python 语言,基于 requests 库完成 Dify 工作流同步调用、流式调用实操,搭配参数表格解析、实例代码、返回值说明,零基础也能快速上手对接 Dify 工作流。


一、前期环境准备

1.1 Dify 端准备工作

  • 部署 Dify 服务(本地部署 / 线上服务均可)
  • 新建工作流应用,编排业务流程并调试运行正常
  • 将工作流正式发布,未发布应用无法调用 API
  • 获取应用 API Key、Dify 服务访问地址

1.2 Python 环境依赖

仅需安装网络请求库 requests,用于发起 Http 接口请求

pip install requests

二、Dify 工作流核心调用参数说明

调用 Dify 工作流 /workflows/run 接口存在固定请求头、请求体参数,下方用表格清晰标注参数作用与填写规则。

2.1 请求头参数表

参数键名 参数值格式 作用
Authorization Bearer + APIKey 接口身份鉴权,校验调用权限
Content-Type application/json 声明请求数据格式为 JSON

2.2 请求体核心参数表

参数名 可选值 说明
inputs 字典格式 传入工作流自定义变量,键名必须和工作流变量名一致
response_mode block / stream block 同步一次性返回结果;stream 流式实时返回数据
user 自定义字符串 标记调用用户标识,用于后台统计调用记录

2.3 接口基础地址

部署方式 接口地址
本地部署 http://localhost:8000/v1/workflows/run
线上部署 https://你的域名/v1/workflows/run

2.4 其他常用 API 接口

接口 方法 用途
/workflows/run POST 执行工作流
/workflows/run/:workflow_run_id GET 查询工作流执行状态
/files/upload POST 上传文件(供工作流使用)
/parameters GET 获取工作流输入参数定义

三、同步模式调用工作流(block)

同步调用会等待整个工作流所有节点执行完成后,一次性返回最终结果,适合执行速度快、无需实时查看过程的工作流,也是日常开发测试最常用方式。

3.1 完整实例代码

import requests

# ============ 基础配置项 ============
# Dify服务地址
BASE_URL = "http://localhost:8000/v1"
# 应用API密钥
API_KEY = "sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx"
# 工作流调用接口
RUN_WORKFLOW_URL = f"{BASE_URL}/workflows/run"

# 构造请求头
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

# 构造请求参数
# query为工作流内定义好的入参变量名
request_data = {
    "inputs": {
        "query": "简单介绍下Python编程语言特点"
    },
    "response_mode": "block",
    "user": "python_call_01"
}

def call_workflow_block():
    """同步调用Dify工作流"""
    try:
        # 发起POST请求
        res = requests.post(url=RUN_WORKFLOW_URL, headers=headers, json=request_data)
        result = res.json()

        # 判断请求状态
        if res.status_code == 200 and result.get("data", {}).get("status") == "succeeded":
            print("工作流执行成功")
            # 提取工作流最终输出结果
            output_data = result["data"]["outputs"]
            print("最终返回结果:", output_data)
        else:
            print("工作流执行失败", result)
    except Exception as e:
        print("请求出现异常:", str(e))


if __name__ == "__main__":
    call_workflow_block()

3.2 同步返回结果字段解析表

返回字段 含义
workflow_run_id 本次工作流运行唯一编号
status 运行状态,succeeded 代表执行成功
elapsed_time 工作流执行耗时 (秒)
outputs 工作流最终输出内容
inputs 本次调用传入的入参数据

四、流式模式调用工作流(stream)

流式调用会实时推送工作流执行日志、大模型生成内容,逐段返回数据,模拟打字机输出效果,适合生成大段文本内容的工作流。

4.1 完整实例代码

import requests
import json

# ============ 基础配置项 ============
BASE_URL = "http://localhost:8000/v1"
API_KEY = "sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx"
RUN_WORKFLOW_URL = f"{BASE_URL}/workflows/run"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

request_data = {
    "inputs": {
        "query": "说说日常学习编程有哪些实用技巧"
    },
    "response_mode": "stream",
    "user": "python_call_02"
}

def call_workflow_stream():
    """流式调用Dify工作流"""
    # 开启流式接收数据
    res = requests.post(url=RUN_WORKFLOW_URL, headers=headers, json=request_data, stream=True)
    # 循环读取流式数据
    for line in res.iter_lines(decode_unicode=True):
        if line and line.startswith("data:"):
            # 清洗多余标识字符
            stream_data = line.replace("data:", "").strip()
            print(stream_data)


if __name__ == "__main__":
    call_workflow_stream()

4.2 流式数据特点

  • 数据会以 data: 开头持续推送
  • 会返回节点执行日志、节点运行结果、最终输出
  • 适合前端实时展示 AI 生成内容场景

五、文件上传与工作流调用

当工作流需要处理文件(如文档分析、图片识别)时,需要先上传文件再调用工作流。

5.1 文件上传代码

import requests

BASE_URL = "http://localhost:8000/v1"
API_KEY = "sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}"
}

def upload_file(file_path: str, user: str = "python_user") -> dict:
    """
    上传文件到 Dify
    
    Args:
        file_path: 本地文件路径
        user: 用户标识
    
    Returns:
        上传结果,包含 file_id 等信息
    """
    try:
        with open(file_path, "rb") as f:
            files = {"file": (file_path.split("/")[-1], f)}
            data = {"user": user}
            
            res = requests.post(
                f"{BASE_URL}/files/upload",
                headers=headers,
                files=files,
                data=data
            )
            
            result = res.json()
            
            if res.status_code == 201:
                print(f"✅ 文件上传成功")
                print(f"📄 文件ID: {result['id']}")
                print(f"📄 文件名称: {result['name']}")
                return result
            else:
                print(f"❌ 文件上传失败: {result}")
                return None
                
    except FileNotFoundError:
        print(f"❌ 文件不存在: {file_path}")
        return None
    except Exception as e:
        print(f"⚠️ 上传异常: {str(e)}")
        return None

# 使用示例
# file_info = upload_file("./document.pdf")
# file_id = file_info["id"]  # 用于后续工作流调用

5.2 带文件输入的工作流调用

import requests

BASE_URL = "http://localhost:8000/v1"
API_KEY = "sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

def call_workflow_with_file(file_id: str, query: str = "总结这份文档的主要内容"):
    """
    调用需要文件输入的工作流
    
    Args:
        file_id: 上传文件后获取的 file_id
        query: 对文件的提问或处理指令
    """
    request_data = {
        "inputs": {
            "query": query,
            "file": {
                "transfer_method": "local_file",
                "upload_file_id": file_id,
                "type": "document"  # 可选: document / image / audio / video
            }
        },
        "response_mode": "block",
        "user": "python_file_user"
    }
    
    try:
        res = requests.post(
            f"{BASE_URL}/workflows/run",
            headers=headers,
            json=request_data
        )
        result = res.json()
        
        if res.status_code == 200:
            outputs = result.get("data", {}).get("outputs", {})
            print("📝 处理结果:", outputs)
            return outputs
        else:
            print("❌ 调用失败:", result)
            return None
            
    except Exception as e:
        print(f"⚠️ 异常: {str(e)}")
        return None

六、生产级封装:Dify 工作流调用工具类

"""
Dify Workflow SDK
生产级封装,支持同步/流式调用、重试机制、超时控制、日志记录
"""

import requests
import json
import time
import logging
from typing import Optional, Dict, Any, Callable, Generator
from dataclasses import dataclass
from enum import Enum

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class ResponseMode(Enum):
    """响应模式枚举"""
    BLOCK = "blocking"
    STREAM = "streaming"


@dataclass
class DifyConfig:
    """Dify 配置类"""
    base_url: str
    api_key: str
    timeout: int = 60
    max_retries: int = 3
    retry_interval: int = 2


@dataclass
class WorkflowResult:
    """工作流执行结果"""
    success: bool
    workflow_run_id: Optional[str] = None
    outputs: Optional[Dict[str, Any]] = None
    elapsed_time: Optional[float] = None
    error_message: Optional[str] = None
    raw_response: Optional[Dict] = None


class DifyWorkflowClient:
    """
    Dify 工作流调用客户端
    
    使用示例:
        config = DifyConfig(
            base_url="http://localhost:8000/v1",
            api_key="sk-xxxxxxxx"
        )
        client = DifyWorkflowClient(config)
        
        # 同步调用
        result = client.run_sync(inputs={"query": "你好"})
        
        # 流式调用
        for chunk in client.run_stream(inputs={"query": "你好"}):
            print(chunk, end="")
    """
    
    def __init__(self, config: DifyConfig):
        self.config = config
        self.base_url = config.base_url.rstrip("/")
        self.headers = {
            "Authorization": f"Bearer {config.api_key}",
            "Content-Type": "application/json"
        }
        logger.info(f"Dify客户端初始化完成,服务地址: {self.base_url}")
    
    def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """发送请求并处理重试"""
        url = f"{self.base_url}{endpoint}"
        kwargs.setdefault("timeout", self.config.timeout)
        kwargs.setdefault("headers", self.headers)
        
        for attempt in range(self.config.max_retries):
            try:
                logger.debug(f"请求 {method} {url},第 {attempt + 1} 次尝试")
                response = requests.request(method, url, **kwargs)
                return response
                
            except requests.exceptions.Timeout:
                logger.warning(f"请求超时,第 {attempt + 1} 次重试...")
                if attempt < self.config.max_retries - 1:
                    time.sleep(self.config.retry_interval)
                else:
                    raise
                    
            except requests.exceptions.ConnectionError as e:
                logger.warning(f"连接错误: {str(e)},第 {attempt + 1} 次重试...")
                if attempt < self.config.max_retries - 1:
                    time.sleep(self.config.retry_interval)
                else:
                    raise
        
        raise Exception("请求失败,已超出最大重试次数")
    
    def run_sync(
        self,
        inputs: Dict[str, Any],
        user: str = "default_user",
        timeout: Optional[int] = None
    ) -> WorkflowResult:
        """
        同步调用工作流
        
        Args:
            inputs: 工作流输入参数
            user: 用户标识
            timeout: 自定义超时时间(秒)
        
        Returns:
            WorkflowResult: 执行结果
        """
        request_data = {
            "inputs": inputs,
            "response_mode": ResponseMode.BLOCK.value,
            "user": user
        }
        
        try:
            response = self._make_request(
                "POST",
                "/workflows/run",
                json=request_data,
                timeout=timeout or self.config.timeout
            )
            
            result = response.json()
            
            if response.status_code != 200:
                return WorkflowResult(
                    success=False,
                    error_message=f"HTTP {response.status_code}: {result}",
                    raw_response=result
                )
            
            data = result.get("data", {})
            status = data.get("status")
            
            if status == "succeeded":
                return WorkflowResult(
                    success=True,
                    workflow_run_id=data.get("workflow_run_id"),
                    outputs=data.get("outputs"),
                    elapsed_time=data.get("elapsed_time"),
                    raw_response=result
                )
            else:
                return WorkflowResult(
                    success=False,
                    workflow_run_id=data.get("workflow_run_id"),
                    error_message=f"工作流执行状态: {status}",
                    raw_response=result
                )
                
        except requests.exceptions.Timeout:
            logger.error("同步调用超时")
            return WorkflowResult(
                success=False,
                error_message="请求超时,请检查工作流执行时间或增加超时设置"
            )
        except Exception as e:
            logger.error(f"同步调用异常: {str(e)}")
            return WorkflowResult(
                success=False,
                error_message=str(e)
            )
    
    def run_stream(
        self,
        inputs: Dict[str, Any],
        user: str = "default_user",
        on_event: Optional[Callable[[str, Dict], None]] = None
    ) -> Generator[str, None, WorkflowResult]:
        """
        流式调用工作流
        
        Args:
            inputs: 工作流输入参数
            user: 用户标识
            on_event: 事件回调函数,接收 (event_type, event_data)
        
        Yields:
            str: 文本片段
        
        Returns:
            WorkflowResult: 最终结果
        """
        request_data = {
            "inputs": inputs,
            "response_mode": ResponseMode.STREAM.value,
            "user": user
        }
        
        full_text = ""
        final_result = WorkflowResult(success=False)
        
        try:
            response = self._make_request(
                "POST",
                "/workflows/run",
                json=request_data,
                stream=True
            )
            
            for line in response.iter_lines(decode_unicode=True):
                if not line or not line.startswith("data:"):
                    continue
                
                try:
                    json_str = line.replace("data:", "").strip()
                    if not json_str:
                        continue
                        
                    event = json.loads(json_str)
                    event_type = event.get("event")
                    
                    # 触发回调
                    if on_event:
                        on_event(event_type, event)
                    
                    # 处理不同事件类型
                    if event_type == "workflow_started":
                        final_result.workflow_run_id = event.get("workflow_run_id")
                        logger.info(f"工作流开始: {final_result.workflow_run_id}")
                    
                    elif event_type == "text_chunk":
                        text = event.get("data", {}).get("text", "")
                        full_text += text
                        yield text
                    
                    elif event_type == "workflow_finished":
                        data = event.get("data", {})
                        final_result.success = data.get("status") == "succeeded"
                        final_result.elapsed_time = data.get("elapsed_time")
                        final_result.outputs = data.get("outputs")
                        logger.info(f"工作流完成,状态: {data.get('status')}")
                    
                    elif event_type == "error":
                        final_result.error_message = event.get("message", "未知错误")
                        logger.error(f"工作流错误: {final_result.error_message}")
                        
                except json.JSONDecodeError:
                    continue
                except Exception as e:
                    logger.warning(f"解析流数据异常: {str(e)}")
                    continue
            
            final_result.raw_response = {"full_text": full_text}
            return final_result
            
        except Exception as e:
            logger.error(f"流式调用异常: {str(e)}")
            final_result.error_message = str(e)
            return final_result
    
    def get_status(self, workflow_run_id: str) -> Optional[Dict]:
        """
        查询工作流执行状态
        
        Args:
            workflow_run_id: 工作流运行ID
        
        Returns:
            状态信息字典
        """
        try:
            response = self._make_request(
                "GET",
                f"/workflows/run/{workflow_run_id}"
            )
            return response.json().get("data")
        except Exception as e:
            logger.error(f"查询状态异常: {str(e)}")
            return None
    
    def upload_file(self, file_path: str, user: str = "default_user") -> Optional[Dict]:
        """
        上传文件
        
        Args:
            file_path: 本地文件路径
            user: 用户标识
        
        Returns:
            文件信息
        """
        try:
            with open(file_path, "rb") as f:
                files = {"file": (file_path.split("/")[-1], f)}
                data = {"user": user}
                
                response = self._make_request(
                    "POST",
                    "/files/upload",
                    headers={"Authorization": f"Bearer {self.config.api_key}"},
                    files=files,
                    data=data
                )
                
                if response.status_code == 201:
                    return response.json()
                else:
                    logger.error(f"文件上传失败: {response.text}")
                    return None
                    
        except Exception as e:
            logger.error(f"文件上传异常: {str(e)}")
            return None


# ============ 使用示例 ============

if __name__ == "__main__":
    # 初始化配置
    config = DifyConfig(
        base_url="http://localhost:8000/v1",
        api_key="sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
        timeout=120,
        max_retries=3
    )
    
    client = DifyWorkflowClient(config)
    
    # 示例1: 同步调用
    print("=" * 50)
    print("同步调用示例")
    print("=" * 50)
    
    result = client.run_sync(
        inputs={"query": "用Python写个快速排序"},
        user="demo_user_01"
    )
    
    if result.success:
        print(f"执行成功,耗时: {result.elapsed_time}s")
        print(f"结果: {result.outputs}")
    else:
        print(f"执行失败: {result.error_message}")
    
    # 示例2: 流式调用
    print("\n" + "=" * 50)
    print("流式调用示例")
    print("=" * 50)
    
    def on_event(event_type, event_data):
        if event_type == "node_started":
            print(f"\n[节点开始] {event_data.get('data', {}).get('title', '')}")
    
    print("📝 AI回答: ", end="")
    final = None
    for text in client.run_stream(
        inputs={"query": "介绍Python的GIL机制"},
        user="demo_user_02",
        on_event=on_event
    ):
        print(text, end="", flush=True)
        final = text  # 最后一个 yield 会返回 WorkflowResult
    
    print("\n")
    
    # 示例3: 带文件的工作流
    print("\n" + "=" * 50)
    print("文件处理示例")
    print("=" * 50)
    
    file_info = client.upload_file("./sample.pdf")
    if file_info:
        file_id = file_info["id"]
        result = client.run_sync(
            inputs={
                "query": "总结文档内容",
                "file": {
                    "transfer_method": "local_file",
                    "upload_file_id": file_id,
                    "type": "document"
                }
            },
            user="demo_user_03"
        )
        print(f"📄 文档处理结果: {result.outputs}")

七、异步调用(asyncio + aiohttp)

对于高并发场景,使用异步方式调用:

import asyncio
import aiohttp
import json
from typing import Dict, Any


class AsyncDifyClient:
    """异步 Dify 客户端"""
    
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    async def run_workflow(
        self,
        inputs: Dict[str, Any],
        user: str = "async_user",
        session: aiohttp.ClientSession = None
    ) -> Dict:
        """异步调用工作流"""
        close_session = False
        if session is None:
            session = aiohttp.ClientSession()
            close_session = True
        
        try:
            async with session.post(
                f"{self.base_url}/workflows/run",
                headers=self.headers,
                json={
                    "inputs": inputs,
                    "response_mode": "blocking",
                    "user": user
                }
            ) as response:
                result = await response.json()
                return result
        finally:
            if close_session:
                await session.close()
    
    async def batch_run(
        self,
        tasks: list,
        max_concurrency: int = 5
    ) -> list:
        """
        批量异步调用
        
        Args:
            tasks: [(inputs, user), ...] 任务列表
            max_concurrency: 最大并发数
        """
        semaphore = asyncio.Semaphore(max_concurrency)
        
        async def _run_with_limit(inputs, user):
            async with semaphore:
                return await self.run_workflow(inputs, user)
        
        results = await asyncio.gather(
            *[_run_with_limit(inp, usr) for inp, usr in tasks],
            return_exceptions=True
        )
        return results


# 使用示例
async def main():
    client = AsyncDifyClient(
        base_url="http://localhost:8000/v1",
        api_key="sk-xxxxxxxx"
    )
    
    # 单个调用
    result = await client.run_workflow(
        inputs={"query": "Python异步编程介绍"}
    )
    print(result)
    
    # 批量调用
    tasks = [
        ({"query": f"问题{i}"}, f"user_{i}")
        for i in range(10)
    ]
    results = await client.batch_run(tasks, max_concurrency=3)
    print(f"完成 {len(results)} 个任务")


# 运行
# asyncio.run(main())

八、常见问题排查对照表

报错状态码 异常现象 解决方案
401 鉴权失败 检查 API Key 是否填写错误,确认应用已发布
404 接口不存在 核对 Dify 服务地址,确认接口路径携带 v1 版本号
400 参数错误 检查 inputs 参数格式是否正确,变量名是否匹配
429 请求过于频繁 降低请求频率,或联系管理员调整限流策略
500 服务器内部错误 检查 Dify 服务日志,查看工作流节点是否报错
200 无结果 有响应无输出 检查 inputs 内变量名和工作流定义变量不一致
请求超时 长时间无响应 工作流节点过多,增加请求超时时间,优化工作流流程
流式中断 流式输出突然停止 检查网络稳定性,增加重试机制

九、总结

要点 说明
核心接口 Python 调用 Dify 工作流核心使用 POST 请求调用官方 /workflows/run 接口
调用模式 同步模式适合获取最终结果,流式模式适合实时查看生成内容
参数匹配 inputs 入参变量名必须严格匹配工作流配置变量,是调用成功核心要点
生产建议 建议使用封装类管理调用,增加重试、超时、日志等机制
应用场景 学会基础调用后,可将该逻辑整合进后端项目、自动化脚本、批量处理任务中,复用 Dify 搭建好的 AI 工作流能力
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐