• 钩子函数调用的举例

Pytest插件注册流程介绍了Pytest中插件是怎么被注册的,那么对于注册好的插件,其实现的钩子函数是怎么被调用的呢?我们在源码常看到形如hook.pytest_*的调用,比如下面的代码:

self.config.hook.pytest_collection_modifyitems(
     session=self, config=self.config, items=items
)

这就是钩子函数pytest_collection_modifyitems的调用方式,它会执行所有插件实现了的pytest_collection_modifyitems方法,下面我详细介绍下它的调用原理。

首先看这里的config是什么?《Pytest插件注册流程》一文中介绍了,config对象是在src/_pytest/config/__init__.py文件中的get_config()方法里创建的:

config = Config(
        pluginmanager,
        invocation_params=Config.InvocationParams(
            args=args or (),
            plugins=plugins,
            dir=pathlib.Path.cwd(),
        ),
    )

这里的hook是它的一个成员变量:

self.hook: pluggy.HookRelay = PathAwareHookProxy(self.pluginmanager.hook) 

阅读源码可以知道,config的hook对象实际指向的是pluginmanager对象的hook变量。通过《Pytest插件注册流程》我们知道,所有的钩子函数都与一个HookCaller一对一绑定在一起,绑定过程在PytestPluginManager的__init__函数中做的:

@final
class PytestPluginManager(PluginManager):
    
    def __init__(self) -> None:
        ...

        self.add_hookspecs(_pytest.hookspec)

在add_hookspecs()方法中有:

hc: HookCaller | None = getattr(self.hook, name, None)
if hc is None:
    hc = HookCaller(name, self._hookexec, module_or_class, spec_opts)
    setattr(self.hook, name, hc)

这里将函数名为name的钩子函数设置为self.hook的属性,并赋值为hc,这是个对象。钩子函数pytest_collection_modifyitems就是这样与一个HookCaller对象绑定的。然后我们看self.hook.pytest_collection_modifyitems调用会发生什么?self.hook是个PathAwareHookProxy对象但它没有pytest_collection_modifyitems属性,没关系,这时会进入PathAwareHookProxy实例的__getattr__方法里寻找它:

def __getattr__(self, key: str) -> pluggy.HookCaller:
    hook: pluggy.HookCaller = getattr(self._hook_relay, key)
    if key not in imply_paths_hooks:
        self.__dict__[key] = hook
        return hook

前面代码统过settattr()方法将钩子函数名设置为pluginmanager的hook对象的属性,并赋一个HookCaller对象为它的值,这里通过getattr()方法就将这个HookCaller对象取出来了,代码中的key就是钩子函数名。所以,这里self.hook.pytest_collection_modifyitems就会获得一个HookCaller对象,接着将这个对象作为方法来调用:self.hook.pytest_collection_modifyitems(...),这就会进入HookCaller的__call__方法,源码在pluggy/_hooks.py文件中:

  • HookCaller的__call__方法
class HookCaller:
    def __init__(
        self,
        name: str,
        hook_execute: _HookExec,
        specmodule_or_class: _Namespace | None = None,
        spec_opts: HookspecOpts | None = None,
    ) -> None:
        """:meta private:"""
        #: Name of the hook getting called.
        self.name: Final = name
        self._hookexec: Final = hook_execute
        ...

    def __call__(self, **kwargs: object) -> Any:
        assert (
            not self.is_historic()
        ), "Cannot directly call a historic hook - use call_historic instead."
        self._verify_all_args_are_provided(kwargs)
        firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
        # Copy because plugins may register other plugins during iteration (#438).
        return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)

__call__最后调用了self._hookexec实例,而self._hookexec实例值是在构造HookCaller对象时传进来的,它的实现是PluginManager的_hookexec()方法:

  • PluginManager的_hookexec方法
def _hookexec(
    self,
    hook_name: str,
    methods: Sequence[HookImpl],
    kwargs: Mapping[str, object],
    firstresult: bool,
) -> object | list[object]:
    # called from all hookcaller instances.
    # enable_tracing will set its own wrapping function at self._inner_hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)

最后又调用了它的self._inner_hookexec成员变量:

def __init__(self, project_name: str) -> None:
    ...
    self._inner_hookexec = _multicall
  • _multicall的实现
def _multicall(
    hook_name: str,
    hook_impls: Sequence[HookImpl],
    caller_kwargs: Mapping[str, object],
    firstresult: bool,
) -> object | list[object]:
    """Execute a call into multiple python functions/methods and return the
    result(s).

    ``caller_kwargs`` comes from HookCaller.__call__().
    """
    __tracebackhide__ = True
    results: list[object] = []
    exception = None
    only_new_style_wrappers = True
    try:  # run impl and wrapper setup functions in a loop
        teardowns: list[Teardown] = []
        try:
            for hook_impl in reversed(hook_impls):
                try:
                    args = [caller_kwargs[argname] for argname in hook_impl.argnames]
                except KeyError:
                    for argname in hook_impl.argnames:
                        if argname not in caller_kwargs:
                            raise HookCallError(
                                f"hook call must provide argument {argname!r}"
                            )

                if hook_impl.hookwrapper:
                    only_new_style_wrappers = False
                    try:
                        # If this cast is not valid, a type error is raised below,
                        # which is the desired response.
                        res = hook_impl.function(*args)  #hookwrapper值为True,创建的就是个生成器
                        wrapper_gen = cast(Generator[None, Result[object], None], res)
                        next(wrapper_gen)  # 将生成器移动到第一个yield
                        teardowns.append((wrapper_gen, hook_impl))
                    except StopIteration:
                        _raise_wrapfail(wrapper_gen, "did not yield")
                elif hook_impl.wrapper:
                    try:
                        # If this cast is not valid, a type error is raised below,
                        # which is the desired response.
                        res = hook_impl.function(*args)
                        function_gen = cast(Generator[None, object, object], res)
                        next(function_gen)  # first yield
                        teardowns.append(function_gen)
                    except StopIteration:
                        _raise_wrapfail(function_gen, "did not yield")
                else:
                    res = hook_impl.function(*args) #直接执行钩子函数
                    if res is not None:
                        results.append(res)
                        if firstresult:  # halt further impl calls
                            break
        except BaseException as exc:
            exception = exc
    finally:
        # Fast path - only new-style wrappers, no Result.
        if only_new_style_wrappers:
            if firstresult:  # first result hooks return a single value
                result = results[0] if results else None
            else:
                result = results

            # run all wrapper post-yield blocks
            for teardown in reversed(teardowns):
                try:
                    if exception is not None:
                        teardown.throw(exception)  # type: ignore[union-attr]
                    else:
                        #执行完yield后的语句
                        teardown.send(result)  # type: ignore[union-attr]
                    # Following is unreachable for a well behaved hook wrapper.
                    # Try to force finalizers otherwise postponed till GC action.
                    # Note: close() may raise if generator handles GeneratorExit.
                    teardown.close()  # type: ignore[union-attr]
                except StopIteration as si:
                    result = si.value
                    exception = None
                    continue
                except BaseException as e:
                    exception = e
                    continue
                _raise_wrapfail(teardown, "has second yield")  # type: ignore[arg-type]

            if exception is not None:
                raise exception.with_traceback(exception.__traceback__)
            else:
                return result

        # Slow path - need to support old-style wrappers.
        else:
            if firstresult:  # first result hooks return a single value
                outcome: Result[object | list[object]] = Result(
                    results[0] if results else None, exception
                )
            else:
                outcome = Result(results, exception)

            # run all wrapper post-yield blocks
            for teardown in reversed(teardowns):
                if isinstance(teardown, tuple):
                    try:
                        teardown[0].send(outcome)
                    except StopIteration:
                        pass
                    except BaseException as e:
                        _warn_teardown_exception(hook_name, teardown[1], e)
                        raise
                    else:
                        _raise_wrapfail(teardown[0], "has second yield")
                else:
                    try:
                        if outcome._exception is not None:
                            teardown.throw(outcome._exception)
                        else:
                            teardown.send(outcome._result)
                        # Following is unreachable for a well behaved hook wrapper.
                        # Try to force finalizers otherwise postponed till GC action.
                        # Note: close() may raise if generator handles GeneratorExit.
                        teardown.close()
                    except StopIteration as si:
                        outcome.force_result(si.value)
                        continue
                    except BaseException as e:
                        outcome.force_exception(e)
                        continue
                    _raise_wrapfail(teardown, "has second yield")

            return outcome.get_result()

在multicall函数里最终调用了插件里实现的钩子函数。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐