处理器

在 Nornir 中 处理器(Processors) 是一种可以通过自定义代码处理某些事件的插件,它就是一个可以处理任务的装饰器,它在不改变任务结果的前提下,让用户可以自己编写代码对任务结果进行加工,为处理任务提供了更多的扩展性。它有一些优点:

  1. 由于处理器是基于事件(event-based)的,所以可以异步处理事件,例如在某台主机完成任务后马上处理该主机的结果,不用需等待其它主机完成任务。

  2. 基于事件编写的代码更简洁,更容易理解。

来通过几个例子来看看处理器是如何工作的,先初始化一个 nornir 对象:

[1]:
from typing import Dict
from nornir import InitNornir

nr = InitNornir(config_file="files/config.yaml")

编写一个处理器,它的作用是打印一些有关任务执行的信息:

[2]:
from nornir.core import Nornir
from nornir.core.inventory import Host
from nornir.core.task import Task, AggregatedResult, MultiResult, Result, Task

class PrintResult:
    # 任务开始运行时执行的动作
    def task_started(self, task: Task) -> None:
        print(f" 任务[{task.name}] 开始执行 ".center(79, "="))

    # 任务运行结束后执行的动作
    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        print(f" 任务[{task.name}]执行结束 ".center(79, "="))

    # 任务分配给单台主机运行时执行的动作
    def task_instance_started(self, task: Task, host: Host) -> None:
        print(f"任务[{task.name}]分配给主机[{host.name}]开始执行.\n")

    # 任务分配给单台主机运行完成后执行的动作
    def task_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        print(f"任务[{task.name}]分配给主机[{host.name}]执行完成,执行结果:{result.result} \n")

    # 子任务开始运行时执行的动作
    def subtask_instance_started(self, task: Task, host: Host) -> None:
        pass

    # 子任务结束运行时执行的动作
    def subtask_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        pass

编写一个简单的任务,让自定义的处理器 PrintResult 来处理结果:

[3]:
def greeter(task: Task, greet: str) -> Result:
    return Result(
        host=task.host,
        result=f"{greet}! My name is {task.host.name}!"
    )

要使用自定义的处理器,需要用到 nornir 对象的 with_processors 方法,这个方法需要传递一个 Processer 的列表对象 Processers,然后返回一个带有 Processers 的 nornir 对象:

[4]:
# 为了保持简洁,这里使用过滤器过滤所有角色是 `spine` 的主机来执行任务
nr = nr.filter(role="spine")
nr_with_processors = nr.with_processors([PrintResult()])

nr_with_processors.run(
    task=greeter,
    greet="Hi",
    name="Hi",
)

nr_with_processors.run(
    task=greeter,
    greet="Bye",
    name="Bye",
)
================================= 任务[Hi] 开始执行 =================================
任务[Hi]分配给主机[spine00.bj]开始执行.

任务[Hi]分配给主机[spine00.bj]执行完成,执行结果:Hi! My name is spine00.bj!

任务[Hi]分配给主机[spine01.bj]开始执行.

任务[Hi]分配给主机[spine01.gz]开始执行.
任务[Hi]分配给主机[spine01.bj]执行完成,执行结果:Hi! My name is spine01.bj!


任务[Hi]分配给主机[spine01.gz]执行完成,执行结果:Hi! My name is spine01.gz!

================================== 任务[Hi]执行结束 =================================
================================= 任务[Bye] 开始执行 ================================
任务[Bye]分配给主机[spine00.bj]开始执行.
任务[Bye]分配给主机[spine01.bj]开始执行.

任务[Bye]分配给主机[spine01.bj]执行完成,执行结果:Bye! My name is spine01.bj!


任务[Bye]分配给主机[spine01.gz]开始执行.

任务[Bye]分配给主机[spine01.gz]执行完成,执行结果:Bye! My name is spine01.gz!

任务[Bye]分配给主机[spine00.bj]执行完成,执行结果:Bye! My name is spine00.bj!

================================= 任务[Bye]执行结束 =================================
[4]:
AggregatedResult (Bye): {'spine00.bj': MultiResult: [Result: "Bye"], 'spine01.bj': MultiResult: [Result: "Bye"], 'spine01.gz': MultiResult: [Result: "Bye"]}

可以看到,任务执行完成后,它的过程都被打印出来了,这是由自定义的处理器 PrintResult 来完成的。

打印结果是无序的,因为默认情况下 nornir 的任务是多线程异步执行的。

前面说到 with_processors 方法需要传递一个 Processers 对象,这个对象是由 Processer 组成的列表。

现在来再定义一个处理器,它的任务是将任务的信息保存在字典中。

[5]:
class SaveResultToDict:
    def __init__(self, data: Dict[str, None]) -> None:
        self.data = data

    def task_started(self, task: Task) -> None:
        self.data[task.name] = {}
        self.data[task.name]["started"] = True
        print(f"任务开始信息已经保存到 {self.data.keys()}!")

    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        self.data[task.name]["completed"] = True
        print(f"任务完成信息已经保存到 {self.data.keys()}!")

    def task_instance_started(self, task: Task, host: Host) -> None:
        self.data[task.name][host.name] = {"started": True}
        print(f"主机[{host.name}]任务开始信息已经保存到 {self.data.keys()}!")

    def task_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        self.data[task.name][host.name] = {
            "completed": True,
            "result": result.result,
        }
        print(f"主机[{host.name}]任务完成信息已经保存到 {self.data.keys()}!")

    def subtask_instance_started(self, task: Task, host: Host) -> None:
        pass

    def subtask_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        pass

现在来再次执行任务 greeter,这次使用两个处理器 SaveResultToDictPrintResult 来对任务进行处理:

[6]:
data = {}

nr_with_processors = nr.with_processors([PrintResult(),SaveResultToDict(data)])

nr_with_processors.run(
    task=greeter,
    greet="Hi",
    name="Hi",
)

nr_with_processors.run(
    task=greeter,
    greet="Bye",
    name="Bye",
)
================================= 任务[Hi] 开始执行 =================================
任务开始信息已经保存到 dict_keys(['Hi'])!
任务[Hi]分配给主机[spine00.bj]开始执行.

主机[spine00.bj]任务开始信息已经保存到 dict_keys(['Hi'])!任务[Hi]分配给主机[spine01.bj]开始执行.


任务[Hi]分配给主机[spine00.bj]执行完成,执行结果:Hi! My name is spine00.bj!

主机[spine00.bj]任务完成信息已经保存到 dict_keys(['Hi'])!
任务[Hi]分配给主机[spine01.gz]开始执行.

主机[spine01.gz]任务开始信息已经保存到 dict_keys(['Hi'])!
任务[Hi]分配给主机[spine01.gz]执行完成,执行结果:Hi! My name is spine01.gz!

主机[spine01.gz]任务完成信息已经保存到 dict_keys(['Hi'])!
主机[spine01.bj]任务开始信息已经保存到 dict_keys(['Hi'])!
任务[Hi]分配给主机[spine01.bj]执行完成,执行结果:Hi! My name is spine01.bj!

主机[spine01.bj]任务完成信息已经保存到 dict_keys(['Hi'])!
================================== 任务[Hi]执行结束 =================================
任务完成信息已经保存到 dict_keys(['Hi'])!
================================= 任务[Bye] 开始执行 ================================
任务开始信息已经保存到 dict_keys(['Hi', 'Bye'])!
任务[Bye]分配给主机[spine00.bj]开始执行.

主机[spine00.bj]任务开始信息已经保存到 dict_keys(['Hi', 'Bye'])!
任务[Bye]分配给主机[spine00.bj]执行完成,执行结果:Bye! My name is spine00.bj!

主机[spine00.bj]任务完成信息已经保存到 dict_keys(['Hi', 'Bye'])!
任务[Bye]分配给主机[spine01.bj]开始执行.
任务[Bye]分配给主机[spine01.gz]开始执行.

主机[spine01.gz]任务开始信息已经保存到 dict_keys(['Hi', 'Bye'])!
任务[Bye]分配给主机[spine01.gz]执行完成,执行结果:Bye! My name is spine01.gz!

主机[spine01.gz]任务完成信息已经保存到 dict_keys(['Hi', 'Bye'])!

主机[spine01.bj]任务开始信息已经保存到 dict_keys(['Hi', 'Bye'])!
任务[Bye]分配给主机[spine01.bj]执行完成,执行结果:Bye! My name is spine01.bj!

主机[spine01.bj]任务完成信息已经保存到 dict_keys(['Hi', 'Bye'])!
================================= 任务[Bye]执行结束 =================================
任务完成信息已经保存到 dict_keys(['Hi', 'Bye'])!
[6]:
AggregatedResult (Bye): {'spine00.bj': MultiResult: [Result: "Bye"], 'spine01.bj': MultiResult: [Result: "Bye"], 'spine01.gz': MultiResult: [Result: "Bye"]}

任务已经成功执行,并且两个处理器都按照预期进行工作,任务执行的最后也打印出了最后的结果: AggregatedResult 对象,事实上如果处理器里面已经对结果进行除了,这个对象也不需要再给它赋值然后再使用 print_result 打印出来了。

这里注意一点,因为 Processers 是一个列表,所以它里面 Processer 的执行顺序是按照列表的顺序来运行的。

接下来看一下处理器 SaveResultToDictdata 做的操作:

[7]:
import json
print(json.dumps(data, indent=4))
{
    "Hi": {
        "started": true,
        "spine00.bj": {
            "completed": true,
            "result": "Hi! My name is spine00.bj!"
        },
        "spine01.gz": {
            "completed": true,
            "result": "Hi! My name is spine01.gz!"
        },
        "spine01.bj": {
            "completed": true,
            "result": "Hi! My name is spine01.bj!"
        },
        "completed": true
    },
    "Bye": {
        "started": true,
        "spine00.bj": {
            "completed": true,
            "result": "Bye! My name is spine00.bj!"
        },
        "spine01.gz": {
            "completed": true,
            "result": "Bye! My name is spine01.gz!"
        },
        "spine01.bj": {
            "completed": true,
            "result": "Bye! My name is spine01.bj!"
        },
        "completed": true
    }
}

通过以上两个示例,可以看到 处理器(Processers) 处理器的强大功能,通过它来操作处理任务结果更加简单,也无需通过 print_result 来查看任务结果。

一些想法

借助处理器还可以做哪些其他事情?

  1. 将任务执行事件发送到 slack/IRC/logging_system

  2. 让使用者可以关注到正在的执行的任务情况而无需等待所有主机的任务执行完成(尤其是当设备数量很多时)

  3. 如果某些任务失败,及时通知/发出警报

  4. 根据业务场景尽情发挥吧!

Nornir 基础教程到这里就结束了,如果想要更加深入的了解,请继续阅读进阶部分。