如何让Agent停止输出,使用strands agents sdk
本文介绍了为Strands智能体实现停止请求(取消/中断)的多种方法。主要内容包括:1)基础取消功能:使用asyncio创建可取消任务,处理CancelledError异常;2)Web API实现:通过FastAPI构建支持取消的REST和WebSocket接口;3)A2A通信取消:在智能体间通信中实现任务取消机制;4)高级模式:优雅关闭、超时控制和协作式取消。关键要点包括使用asyncio.cr
停止请求(也称为取消或中断)允许你在智能体执行过程中中止运行。这对于长时间运行的任务、用户发起的取消操作和资源管理至关重要。让我展示几种实现方法:
获取知识
完美!让我为你展示为 Strands 智能体实现停止请求(取消/中断)的全面方法。这对于需要取消长时间运行操作的生产环境应用至关重要。
1. 基础 Asyncio 取消功能
1.1 简单任务取消
import asyncio
from strands import Agent, tool
import time
@tool
def long_running_task(duration: int = 10) -> str:
"""需要长时间运行的任务"""
for i in range(duration):
time.sleep(1) # 模拟工作
print(f"工作中... {i+1}/{duration}")
return f"任务在 {duration} 秒后完成"
agent = Agent(
name="可取消智能体",
tools=[long_running_task],
prompt="你执行可取消的长时间运行任务。"
)
class CancellableAgent:
def __init__(self, agent: Agent):
self.agent = agent
self.current_task = None
self.is_cancelled = False
async def run_with_cancellation(self, message: str, timeout: float = None):
"""运行支持取消的智能体"""
try:
# 创建可取消任务
self.current_task = asyncio.create_task(
self.agent.run_async(message)
)
if timeout:
# 带超时运行
result = await asyncio.wait_for(self.current_task, timeout=timeout)
else:
# 无超时运行
result = await self.current_task
return result
except asyncio.CancelledError:
print("❌ 智能体执行被取消")
return "任务被用户取消"
except asyncio.TimeoutError:
print("⏰ 智能体执行超时")
self.cancel()
return "任务超时"
except Exception as e:
print(f"❌ 错误: {e}")
return f"错误: {e}"
def cancel(self):
"""取消正在运行的智能体"""
if self.current_task and not self.current_task.done():
print("🛑 取消智能体执行...")
self.current_task.cancel()
self.is_cancelled = True
return True
return False
# 使用示例
async def cancellation_example():
"""智能体取消示例"""
cancellable = CancellableAgent(agent)
# 启动长时间运行任务
task = asyncio.create_task(
cancellable.run_with_cancellation("启动一个10秒的任务")
)
# 等待一会,然后取消
await asyncio.sleep(3)
print("用户要求取消...")
cancelled = cancellable.cancel()
if cancelled:
print("✅ 取消请求已发送")
# 等待任务完成(或被取消)
result = await task
print(f"最终结果: {result}")
# 运行示例
asyncio.run(cancellation_example())
1.2 支持取消的流式处理
python
import asyncio
from strands import Agent, tool
@tool
def streaming_task(steps: int = 20) -> str:
"""可在流式处理期间取消的任务"""
import time
results = []
for i in range(steps):
time.sleep(0.5) # 模拟工作
results.append(f"步骤 {i+1} 完成")
return f"所有 {steps} 个步骤完成: {', '.join(results)}"
streaming_agent = Agent(
name="流式处理智能体",
tools=[streaming_task],
prompt="你逐步执行流式处理任务。"
)
class CancellableStreamingAgent:
def __init__(self, agent: Agent):
self.agent = agent
self.current_stream = None
self.is_cancelled = False
async def stream_with_cancellation(self, message: str):
"""支持取消的流式处理"""
try:
self.is_cancelled = False
async for chunk in self.agent.stream_async(message):
# 检查取消
if self.is_cancelled:
print("\n🛑 流被用户取消")
break
print(chunk, end="", flush=True)
# 小延迟以允许取消检查
await asyncio.sleep(0.1)
if not self.is_cancelled:
print("\n✅ 流完成")
except asyncio.CancelledError:
print("\n❌ 流被取消")
except Exception as e:
print(f"\n❌ 流错误: {e}")
def cancel_stream(self):
"""取消当前流"""
self.is_cancelled = True
print("🛑 取消请求已发送...")
# 使用示例
async def streaming_cancellation_example():
"""流式处理取消示例"""
streaming_cancellable = CancellableStreamingAgent(streaming_agent)
# 启动流式处理任务
stream_task = asyncio.create_task(
streaming_cancellable.stream_with_cancellation("启动一个20步的流式处理任务")
)
# 3秒后取消
await asyncio.sleep(3)
streaming_cancellable.cancel_stream()
# 等待流结束
await stream_task
asyncio.run(streaming_cancellation_example())
2. 支持停止请求的 Web API
2.1 支持取消的 FastAPI
python
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.responses import StreamingResponse
from strands import Agent, tool
import asyncio
import uuid
from typing import Dict
import json
app = FastAPI()
# 全局任务注册表
running_tasks: Dict[str, asyncio.Task] = {}
@tool
def long_computation(complexity: int = 10) -> str:
"""执行长时间计算"""
import time
total = 0
for i in range(complexity * 1000000):
total += i
if i % 100000 == 0:
time.sleep(0.01) # 允许取消点
return f"计算结果: {total}"
web_agent = Agent(
name="Web智能体",
tools=[long_computation],
prompt="你执行可取消的计算任务。"
)
@app.post("/start-task")
async def start_task(message: str):
"""启动可取消任务"""
task_id = str(uuid.uuid4())
async def run_agent():
try:
result = await web_agent.run_async(message)
return {"status": "completed", "result": str(result)}
except asyncio.CancelledError:
return {"status": "cancelled", "result": "任务被取消"}
except Exception as e:
return {"status": "error", "result": str(e)}
# 启动任务
task = asyncio.create_task(run_agent())
running_tasks[task_id] = task
return {"task_id": task_id, "status": "started"}
@app.post("/cancel-task/{task_id}")
async def cancel_task(task_id: str):
"""取消运行中的任务"""
if task_id not in running_tasks:
raise HTTPException(status_code=404, detail="任务未找到")
task = running_tasks[task_id]
if task.done():
return {"status": "already_completed"}
# 取消任务
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# 清理
del running_tasks[task_id]
return {"status": "cancelled", "task_id": task_id}
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
"""获取任务状态"""
if task_id not in running_tasks:
raise HTTPException(status_code=404, detail="任务未找到")
task = running_tasks[task_id]
if task.done():
try:
result = await task
del running_tasks[task_id] # 清理已完成任务
return result
except asyncio.CancelledError:
del running_tasks[task_id]
return {"status": "cancelled"}
else:
return {"status": "running"}
@app.get("/stream-task")
async def stream_task(message: str):
"""流式处理可取消任务"""
task_id = str(uuid.uuid4())
async def generate_stream():
try:
async for chunk in web_agent.stream_async(message):
yield f"data: {json.dumps({'type': 'chunk', 'data': chunk, 'task_id': task_id})}\n\n"
await asyncio.sleep(0.1) # 允许取消
yield f"data: {json.dumps({'type': 'complete', 'task_id': task_id})}\n\n"
except asyncio.CancelledError:
yield f"data: {json.dumps({'type': 'cancelled', 'task_id': task_id})}\n\n"
return StreamingResponse(
generate_stream(),
media_type="text/event-stream",
headers={"X-Task-ID": task_id}
)
# 客户端使用示例:
"""
# 启动任务
response = requests.post("http://localhost:8000/start-task",
params={"message": "执行复杂度为50的复杂计算"})
task_id = response.json()["task_id"]
# 检查状态
status = requests.get(f"http://localhost:8000/task-status/{task_id}")
# 如果需要则取消
cancel_response = requests.post(f"http://localhost:8000/cancel-task/{task_id}")
"""
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
2.2 支持实时取消的 WebSocket
python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from strands import Agent, tool
import asyncio
import json
import uuid
app = FastAPI()
@tool
def interactive_task(steps: int = 10) -> str:
"""可取消的交互式任务"""
import time
for i in range(steps):
time.sleep(1)
print(f"交互步骤 {i+1}/{steps}")
return f"交互任务完成,共 {steps} 步"
ws_agent = Agent(
name="WebSocket智能体",
tools=[interactive_task],
prompt="你通过WebSocket执行交互式任务。"
)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""支持取消的WebSocket"""
await websocket.accept()
current_task = None
try:
while True:
# 接收消息
data = await websocket.receive_text()
message_data = json.loads(data)
if message_data["type"] == "start":
# 如果之前有任务在运行,则取消
if current_task and not current_task.done():
current_task.cancel()
# 启动新任务
async def run_task():
try:
async for chunk in ws_agent.stream_async(message_data["message"]):
await websocket.send_text(json.dumps({
"type": "chunk",
"data": chunk
}))
await asyncio.sleep(0.1)
await websocket.send_text(json.dumps({
"type": "complete"
}))
except asyncio.CancelledError:
await websocket.send_text(json.dumps({
"type": "cancelled"
}))
current_task = asyncio.create_task(run_task())
elif message_data["type"] == "cancel":
if current_task and not current_task.done():
current_task.cancel()
await websocket.send_text(json.dumps({
"type": "cancelled"
}))
else:
await websocket.send_text(json.dumps({
"type": "no_task_running"
}))
except WebSocketDisconnect:
if current_task and not current_task.done():
current_task.cancel()
# 客户端JavaScript示例:
"""
const ws = new WebSocket('ws://localhost:8000/ws');
// 启动任务
ws.send(JSON.stringify({
type: 'start',
message: '启动一个15步的交互式任务'
}));
// 取消任务
ws.send(JSON.stringify({
type: 'cancel'
}));
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
console.log('收到:', data);
};
"""
3. 支持停止请求的 A2A
3.1 可取消的 A2A 客户端
python
from strands_tools.a2a_client import A2AClientToolProvider
from strands import Agent
import asyncio
import httpx
class CancellableA2AClient:
"""支持取消的A2A客户端"""
def __init__(self, agent_urls: list):
self.provider = A2AClientToolProvider(known_agent_urls=agent_urls)
self.client_agent = Agent(
name="可取消A2A客户端",
tools=self.provider.tools,
prompt="你与远程智能体协调,并且可以被取消。"
)
self.current_request = None
async def call_with_cancellation(self, message: str, timeout: float = 30):
"""调用支持取消的A2A智能体"""
try:
# 创建可取消任务
self.current_request = asyncio.create_task(
self.client_agent.run_async(message)
)
# 带超时等待
result = await asyncio.wait_for(self.current_request, timeout=timeout)
return {"status": "success", "result": str(result)}
except asyncio.CancelledError:
return {"status": "cancelled", "result": "请求被取消"}
except asyncio.TimeoutError:
self.cancel_current_request()
return {"status": "timeout", "result": f"请求在 {timeout} 秒后超时"}
except Exception as e:
return {"status": "error", "result": str(e)}
def cancel_current_request(self):
"""取消当前A2A请求"""
if self.current_request and not self.current_request.done():
self.current_request.cancel()
return True
return False
# 使用示例
async def a2a_cancellation_example():
"""A2A取消示例"""
a2a_client = CancellableA2AClient([
"http://localhost:9000",
"http://localhost:9001"
])
# 启动A2A请求
request_task = asyncio.create_task(
a2a_client.call_with_cancellation("执行复杂的多智能体计算", timeout=10)
)
# 3秒后取消
await asyncio.sleep(3)
cancelled = a2a_client.cancel_current_request()
if cancelled:
print("🛑 A2A请求已取消")
result = await request_task
print(f"结果: {result}")
# asyncio.run(a2a_cancellation_example())
3.2 可取消的 A2A 服务器
python
from strands import Agent, tool
from strands.multiagent.a2a import A2AServer
import asyncio
import threading
import time
class CancellableA2AServer:
"""支持取消的A2A服务器"""
def __init__(self, port: int):
self.current_operations = {} # 跟踪运行中的操作
self.agent = Agent(
name="可取消A2A服务器",
tools=[
self.cancellable_operation,
self.cancel_operation,
self.list_operations
],
prompt="你执行可通过操作ID取消的操作。"
)
self.server = A2AServer(agent=self.agent, port=port)
@tool
def cancellable_operation(self, operation_name: str, duration: int = 10) -> str:
"""启动可取消操作"""
import uuid
operation_id = str(uuid.uuid4())
# 创建取消事件
cancel_event = threading.Event()
self.current_operations[operation_id] = {
"name": operation_name,
"cancel_event": cancel_event,
"status": "running"
}
def long_operation():
"""实际长时间运行的操作"""
for i in range(duration):
if cancel_event.is_set():
self.current_operations[operation_id]["status"] = "cancelled"
return f"操作 {operation_name} 在第 {i} 步被取消"
time.sleep(1) # 模拟工作
self.current_operations[operation_id]["status"] = "completed"
return f"操作 {operation_name} 成功完成"
# 在后台线程中启动操作
thread = threading.Thread(target=long_operation, daemon=True)
thread.start()
return f"已启动操作 '{operation_name}',ID: {operation_id}"
@tool
def cancel_operation(self, operation_id: str) -> str:
"""取消运行中的操作"""
if operation_id not in self.current_operations:
return f"操作 {operation_id} 未找到"
operation = self.current_operations[operation_id]
if operation["status"] != "running":
return f"操作 {operation_id} 未在运行 (状态: {operation['status']})"
# 发送取消信号
operation["cancel_event"].set()
operation["status"] = "cancelling"
return f"已请求取消操作 {operation_id}"
@tool
def list_operations(self) -> str:
"""列出所有操作及其状态"""
if not self.current_operations:
return "没有运行中的操作"
operations = []
for op_id, op_info in self.current_operations.items():
operations.append(f"ID: {op_id}, 名称: {op_info['name']}, 状态: {op_info['status']}")
return "操作:\n" + "\n".join(operations)
def start_server(self):
"""启动可取消A2A服务器"""
print(f"🚀 在端口 {self.server.port} 上启动可取消A2A服务器")
self.server.serve()
# 使用示例
if __name__ == "__main__":
server = CancellableA2AServer(9000)
server.start_server()
4. 高级取消模式
4.1 带清理的优雅关闭
python
import asyncio
import signal
from strands import Agent, tool
import atexit
class GracefulAgent:
"""支持优雅关闭和清理的智能体"""
def __init__(self):
self.running_tasks = set()
self.is_shutting_down = False
# 注册信号处理器
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
atexit.register(self.cleanup)
self.agent = Agent(
name="优雅智能体",
tools=[self.managed_task],
prompt="你执行支持优雅关闭的任务。"
)
@tool
def managed_task(self, task_name: str, duration: int = 5) -> str:
"""支持优雅关闭的任务"""
import time
for i in range(duration):
if self.is_shutting_down:
return f"任务 '{task_name}' 在第 {i} 步优雅停止"
time.sleep(1)
print(f"任务 '{task_name}' - 步骤 {i+1}/{duration}")
return f"任务 '{task_name}' 成功完成"
async def run_with_tracking(self, message: str):
"""运行带任务跟踪的智能体"""
task = asyncio.create_task(self.agent.run_async(message))
self.running_tasks.add(task)
try:
result = await task
return result
finally:
self.running_tasks.discard(task)
def signal_handler(self, signum, frame):
"""处理关闭信号"""
print(f"\n🛑 收到信号 {signum},启动优雅关闭...")
self.is_shutting_down = True
# 取消所有运行中的任务
for task in self.running_tasks:
if not task.done():
task.cancel()
def cleanup(self):
"""退出时清理"""
print("🧹 执行清理...")
self.is_shutting_down = True
# 使用示例
async def graceful_shutdown_example():
"""优雅关闭示例"""
graceful_agent = GracefulAgent()
try:
# 启动多个任务
tasks = [
graceful_agent.run_with_tracking(f"启动受管任务 'Task-{i}',时长10秒")
for i in range(3)
]
# 等待所有任务
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
print(f"任务 {i} 结果: {result}")
except KeyboardInterrupt:
print("被用户中断")
# 使用 Ctrl+C 运行以测试优雅关闭
# asyncio.run(graceful_shutdown_example())
4.2 基于超时的取消
python
import asyncio
from strands import Agent, tool
from typing import Optional
class TimeoutAgent:
"""可配置超时的智能体"""
def __init__(self, default_timeout: float = 30):
self.default_timeout = default_timeout
self.agent = Agent(
name="超时智能体",
tools=[self.timed_operation],
prompt="你执行支持超时的操作。"
)
@tool
def timed_operation(self, operation: str, duration: int = 5) -> str:
"""可能超时的操作"""
import time
start_time = time.time()
for i in range(duration):
time.sleep(1)
elapsed = time.time() - start_time
print(f"操作 '{operation}' - 已用时 {elapsed:.1f}秒")
return f"操作 '{operation}' 在 {duration} 秒内完成"
async def run_with_timeout(self, message: str, timeout: Optional[float] = None) -> dict:
"""带超时运行智能体"""
timeout = timeout or self.default_timeout
try:
result = await asyncio.wait_for(
self.agent.run_async(message),
timeout=timeout
)
return {
"status": "success",
"result": str(result),
"timeout": timeout
}
except asyncio.TimeoutError:
return {
"status": "timeout",
"result": f"操作在 {timeout} 秒后超时",
"timeout": timeout
}
except Exception as e:
return {
"status": "error",
"result": str(e),
"timeout": timeout
}
# 使用示例
async def timeout_example():
"""基于超时的取消示例"""
timeout_agent = TimeoutAgent(default_timeout=5)
# 这个会超时
result1 = await timeout_agent.run_with_timeout(
"启动定时操作 'LongTask',时长10秒",
timeout=3
)
print(f"结果 1: {result1}")
# 这个会完成
result2 = await timeout_agent.run_with_timeout(
"启动定时操作 'ShortTask',时长2秒",
timeout=5
)
print(f"结果 2: {result2}")
asyncio.run(timeout_example())
关键要点:
-
使用
asyncio.create_task()创建可取消任务 -
调用
task.cancel()请求取消 -
在代码中处理
asyncio.CancelledError -
使用
asyncio.wait_for()实现基于超时的取消 -
在长时间运行的操作中实现协作式取消
-
跟踪运行中的任务以进行适当清理
-
处理信号以实现优雅关闭
-
向用户提供取消状态的反馈
这些模式让你完全控制智能体执行,并允许用户在需要时停止操作!🛑
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)