dify中通过api调用workflow(输入变量中包含图片)
本文介绍了如何通过API调用Dify工作流(包含图片输入变量)。主要内容包括:1)上传图片文件到Dify平台,根据文件扩展名判断MIME类型;2)执行工作流时通过"transfer_method": "local_file"参数传递已上传的文件ID;3)提供了完整的Python示例代码,包含文件上传、工作流执行和结果提取功能。代码支持处理多种文件类型(PNG
·
dify中通过api调用workflow(输入变量中包含图片)
现有一个这样的dify工作流
工作流需要上传一个图片变量img
使用api调用这个工作流的代码如下
import requests
import os
import json
# ========================
# 上传文件到 Dify
# ========================
def upload_file(file_path, user):
upload_url = "https://api.dify.ai/v1/files/upload"
headers = {
"Authorization": "Bearer **********", # 这里换成你的 App Key
}
# 根据扩展名判断 MIME 和文件类型
ext = os.path.splitext(file_path)[1].lower()
if ext == ".txt":
mime_type, file_type = "text/plain", "TXT"
elif ext == ".png":
mime_type, file_type = "image/png", "IMAGE"
elif ext == ".jpg" or ext == ".jpeg":
mime_type, file_type = "image/jpeg", "IMAGE"
elif ext == ".pdf":
mime_type, file_type = "application/pdf", "PDF"
else:
raise ValueError(f"不支持的文件类型: {ext}")
try:
print(f"上传文件中 ({file_type}) ...")
with open(file_path, 'rb') as file:
files = {
'file': (os.path.basename(file_path), file, mime_type)
}
data = {
"user": user,
"type": file_type
}
response = requests.post(upload_url, headers=headers, files=files, data=data)
if response.status_code == 201:
print("文件上传成功")
return response.json().get("id")
else:
print(f"文件上传失败,状态码: {response.status_code}, 返回: {response.text}")
return None
except Exception as e:
print(f"发生错误: {str(e)}")
return None
# ========================
# 运行工作流
# ========================
def run_workflow(file_id, user, response_mode="blocking"):
workflow_url = "https://api.dify.ai/v1/workflows/run"
headers = {
"Authorization": "Bearer *********", # 这里换成你的 App Key
"Content-Type": "application/json"
}
data = {
"inputs": {
"img": { # 这里是工作流中输入变量的名字,如果需要上传多张图片就将这里改成列表
"transfer_method": "local_file",
"upload_file_id": file_id,
"type": "image"
}
},
"response_mode": response_mode,
"user": user
}
try:
print("运行工作流...")
response = requests.post(workflow_url, headers=headers, json=data)
if response.status_code == 200:
print("工作流执行成功")
return response.json()
else:
print(f"工作流执行失败,状态码: {response.status_code}, 返回: {response.text}")
return {"status": "error", "message": response.text}
except Exception as e:
print(f"发生错误: {str(e)}")
return {"status": "error", "message": str(e)}
# ========================
# 使用示例
# ========================
if __name__ == "__main__":
file_path = "tess.png" # 比如 "test.png"
user = "difyuser"
file_id = upload_file(file_path, user)
if file_id:
result = run_workflow(file_id, user)
try:
text_output = result["data"]["outputs"]["text"]
print("提取到的 text 内容:\n")
print(text_output)
except KeyError as e:
print(f"解析失败,缺少字段: {e}")
else:
print("文件上传失败,无法执行工作流")
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)