Autogen_core源码:_cancellation_token.py
类提供了一种机制,允许用户在需要时取消挂起的异步调用。通过添加回调函数和关联Future对象,当调用cancel方法时,所有与该取消令牌关联的操作都将被取消。同时,使用线程锁确保了在多线程环境下的线程安全。
·
目录
_cancellation_token.py代码
import threading
from asyncio import Future
from typing import Any, Callable, List
class CancellationToken:
"""A token used to cancel pending async calls"""
def __init__(self) -> None:
self._cancelled: bool = False
self._lock: threading.Lock = threading.Lock()
self._callbacks: List[Callable[[], None]] = []
def cancel(self) -> None:
"""Cancel pending async calls linked to this cancellation token."""
with self._lock:
if not self._cancelled:
self._cancelled = True
for callback in self._callbacks:
callback()
def is_cancelled(self) -> bool:
"""Check if the CancellationToken has been used"""
with self._lock:
return self._cancelled
def add_callback(self, callback: Callable[[], None]) -> None:
"""Attach a callback that will be called when cancel is invoked"""
with self._lock:
if self._cancelled:
callback()
else:
self._callbacks.append(callback)
def link_future(self, future: Future[Any]) -> Future[Any]:
"""Link a pending async call to a token to allow its cancellation"""
with self._lock:
if self._cancelled:
future.cancel()
else:
def _cancel() -> None:
future.cancel()
self._callbacks.append(_cancel)
return future
代码解释
这段Python代码定义了一个名为CancellationToken的类,其主要功能是提供一种机制,用于取消挂起的异步调用。下面详细解释代码的逻辑和功能:
类的初始化
def __init__(self) -> None:
self._cancelled: bool = False
self._lock: threading.Lock = threading.Lock()
self._callbacks: List[Callable[[], None]] = []
_cancelled:一个布尔类型的私有变量,用于标记该取消令牌是否已经被使用(即是否已经调用了cancel方法),初始值为False。_lock:一个线程锁对象,用于确保在多线程环境下对共享资源(如_cancelled和_callbacks)的访问是线程安全的。_callbacks:一个列表,用于存储当调用cancel方法时需要执行的回调函数。
取消操作
def cancel(self) -> None:
"""Cancel pending async calls linked to this cancellation token."""
with self._lock:
if not self._cancelled:
self._cancelled = True
for callback in self._callbacks:
callback()
cancel方法用于取消与该取消令牌关联的所有挂起的异步调用。- 使用
with self._lock语句确保在修改_cancelled状态和执行回调函数时不会出现竞态条件。 - 只有当
_cancelled为False时,才会将其设置为True,并依次执行_callbacks列表中的所有回调函数。
检查取消状态
def is_cancelled(self) -> bool:
"""Check if the CancellationToken has been used"""
with self._lock:
return self._cancelled
is_cancelled方法用于检查该取消令牌是否已经被使用。- 使用
with self._lock语句确保在读取_cancelled状态时不会出现竞态条件。 - 返回
_cancelled的值。
添加回调函数
def add_callback(self, callback: Callable[[], None]) -> None:
"""Attach a callback that will be called when cancel is invoked"""
with self._lock:
if self._cancelled:
callback()
else:
self._callbacks.append(callback)
add_callback方法用于添加一个回调函数,当调用cancel方法时,该回调函数将被执行。- 使用
with self._lock语句确保在检查_cancelled状态和修改_callbacks列表时不会出现竞态条件。 - 如果
_cancelled为True,说明取消操作已经发生,直接执行回调函数;否则,将回调函数添加到_callbacks列表中。
关联异步Future对象
def link_future(self, future: Future[Any]) -> Future[Any]:
"""Link a pending async call to a token to allow its cancellation"""
with self._lock:
if self._cancelled:
future.cancel()
else:
def _cancel() -> None:
future.cancel()
self._callbacks.append(_cancel)
return future
link_future方法用于将一个异步Future对象与该取消令牌关联起来,以便可以取消该异步调用。- 使用
with self._lock语句确保在检查_cancelled状态和修改_callbacks列表时不会出现竞态条件。 - 如果
_cancelled为True,说明取消操作已经发生,直接取消Future对象;否则,定义一个内部函数_cancel,用于取消Future对象,并将其添加到_callbacks列表中。 - 最后返回
Future对象。
总结
CancellationToken类提供了一种机制,允许用户在需要时取消挂起的异步调用。通过添加回调函数和关联Future对象,当调用cancel方法时,所有与该取消令牌关联的操作都将被取消。同时,使用线程锁确保了在多线程环境下的线程安全。
代码示例
示例 1:基本的取消操作
import asyncio
from typing import Any, Callable, List
import threading
from asyncio import Future
class CancellationToken:
"""A token used to cancel pending async calls"""
def __init__(self) -> None:
self._cancelled: bool = False
self._lock: threading.Lock = threading.Lock()
self._callbacks: List[Callable[[], None]] = []
def cancel(self) -> None:
"""Cancel pending async calls linked to this cancellation token."""
with self._lock:
if not self._cancelled:
self._cancelled = True
for callback in self._callbacks:
callback()
def is_cancelled(self) -> bool:
"""Check if the CancellationToken has been used"""
with self._lock:
return self._cancelled
def add_callback(self, callback: Callable[[], None]) -> None:
"""Attach a callback that will be called when cancel is invoked"""
with self._lock:
if self._cancelled:
callback()
else:
self._callbacks.append(callback)
def link_future(self, future: Future[Any]) -> Future[Any]:
"""Link a pending async call to a token to allow its cancellation"""
with self._lock:
if self._cancelled:
future.cancel()
else:
def _cancel() -> None:
future.cancel()
self._callbacks.append(_cancel)
return future
async def long_running_task():
print("Task started")
await asyncio.sleep(5)
print("Task completed")
async def main():
token = CancellationToken()
task = asyncio.create_task(long_running_task())
token.link_future(task)
# 模拟一段时间后取消任务
await asyncio.sleep(2)
token.cancel()
try:
await task
except asyncio.CancelledError:
print("Task was cancelled")
await main()
Task started
Task was cancelled
示例 2:添加回调函数
def callback_function():
print("Callback function was called")
async def main():
token = CancellationToken()
token.add_callback(callback_function)
# 取消令牌
token.cancel()
await main()
Callback function was called
示例 3:检查令牌是否已取消
async def main():
token = CancellationToken()
print(f"Is cancelled before cancel: {token.is_cancelled()}")
token.cancel()
print(f"Is cancelled after cancel: {token.is_cancelled()}")
await main()
Is cancelled before cancel: False
Is cancelled after cancel: True
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)