# 1.工作流引擎简介

首先了解一下工作流,就是工作流程的计算模型,即将工作流程中的工作如何前后组织在一起的逻辑和规则在计算机中以恰当的模型进行表示并对其实施计算。它主要解决的是“使在多个参与者之间按照某种预定义的规则传递文档、信息或任务的过程自动进行,从而实现某个预期的业务目标,或者促使此目标的实现”,而工作流引擎是指工作流作为应用系统的一部分,并为之提供对各应用系统有决定作用的根据角色、分工和条件的不同决定信息传递路由、内容等级等核心解决方案。我的理解就是其包括流程的节点管理、流向管理等重要功能的建模定义,通过这些模型我们将封装的执行能力进行自由编排从而实现业务建模场景自动化。

经过一番调研(此处调研过程略去,直奔主题),我们最终选定了一款纯python开发工作流引擎——SpiffWorkflow。其亮点如下:API完善且轻量;可塑性强;支持工作流标准建模标准BPMN;支持非BPMN标准。下一章节将重点讲解此工作流引擎框架。

# 2.工作流引擎实战

# 2.1 核心内容

# 2.1.1 SpiffWorkflow 中核心模块及概念理解

我们重点关注SpiffWorkflow.specs.WorkflowSpec和SpiffWorkflow.Workflow、SpiffWorkflow.specs.TaskSpec和SpiffWorkflow.Task实现思路和概念理解。

WorkflowSpec 和TaskSpec是用于定义实际工作流和节点规范的类。

TaskSpec是用于定义实际工作流中节点规范类,工作流中涵盖的各类任务节点都是继承TaskSpec实现的,如结束节点:Cancel。(所谓的规范即是遵照公认或符合业务的模式来规范我们节点的定义,同时我们可以去学习一下流程规范BPMN的概念,这样会更好理解什么叫流程规范),其支持python、xml、json方式定义规范流程。

Workflow是基于WorkflowSpec规范类的组合,即执行工作流的引擎,它本质上是管理所有任务分支树的执行基类,引擎的核心部分。同时,也是保存正在运行的工作流数据的地方。

Task是基于Workflow、TaskSpec规范类的组合,其会基于规范生成流程对应的派生树节点,即构成执行引擎中每个操作步骤节点任务,Workflow也是通过遍历派生树,操作Task状态变更进行步骤执行的。

image.png

注意:组合,即A类有B类的特性,通俗的说就是在一个类中以另一个类的对象作为数据属性。接下来,我们再结合下图理解一下: image.png

# 2.1.2 数据的共享与传递

TaskSpec数据存储在 TaskSpec 对象中。 其数据的改变会影响所有引擎执行组合新对象的数据,可以理解为全局变量。

Task中数据存储可以是当前任务类中,它只会影响到其子路径上的任务节点的数据使用,一定程度上可以理解为局部变量。

# 2.1.3 任务执行的hook机制

在SpiffWorkflow中,TaskSpec中有一套任务执行过程中的hook机制,可以利用此机制在任务执行某些状态下执行我们想要执行的动作。其中有_on_complete_hook(),_predict_hook()等。

_predict_hook():可以利用此hook重写改变future状态下的task,已达到当前派生树任务(Task)载入的数据的目的,也就是我们2.1.2中提到第二点。

_on_complete_hook():可以利用此hook重写任务执行具体内容,比如我们想执行一个自定义的脚本。

# 2.2 进阶内容

# 2.2.1 工作流引擎应用自动化测试的思路

从全文中其他章节我们得知很多自动化工具或平台都是由代码编程堆砌完成,而在这个过程中我们是否可以找到一种途径解决这种大量自研脚本的成本,当然是有的,我们可以将很多单点能封装成原子能力,通过工作流引擎进行自由编排完成各类场景组装,以达到一种低代码模式进行业务测试场景编排的自动化测试。 举个例子来说,我们常见的一种webUI自动化测试框架结构,如下图: image.png

在上图中,我们可以明确对于新增功能或页面,我们还需要持续的封装各类页面、测试套件、测试用例等等才能满足测试需求。但是引入工作流引擎,把操作页面、结果校验、测试报告等能力封装成原子能力之后,通过工作流进行编排,再由工作流引擎进行调度执行即可满足各类测试场景,便可省去大量的底层代码封装时间,而且在此基础上我们还可以拓展行为驱动测试,人工智能化方向测试(基于MDN等)。为了让大家更清晰的理解原子能力在工作流编排中的应用,如下图: image.png

文中介绍到这里,大家可能有以下几个疑问:

  1. 如何进行原子能力抽象封装?
  2. SpiffWorkflow是如何实现流程节点定义的?
  3. 结合SpiffWorkflow如何设计让原子能力调度执行的?

带着以上问题我们继续探索接下来重点内容。

# 2.2.2 工作流节点模型定义

限于本篇重点是工作流引擎,原子能力封装不做深入讲解,通俗点说,原子能力就是人工操作业务行为能力或操作对象抽象分解的封装。我们将重点讲解章节2.2.1中提到的问题——“如何用封装好的原子能力在流程引擎中调度使用?”。 首先我们直观感知一下工作流编排用到什么模型,用圆代表开始节点、结束节点,用矩形代表任务节点。当然流程编排中的节点模型远不止这几个,相信很多同学都用过一些画流程的软件。 其次,我们要理解SpiffWorkflow引擎中是遵从何种协议完成工作流中节点建模的?在SpiffWorkflow中定义工作流节点模型有三种方式(python、xml、json),这里以json数据格式对上图中涉及模型进行讲解:

  • 开始节点
"Start": {
  "class": "SpiffWorkflow.specs.StartTask.StartTask",
  "id" : 1,
  "name":"Start",
  "manual": false,
  "outputs": [
    "customer_define"
  ]
}
1
2
3
4
5
6
7
8
9

解读:在spiffworkflow中,上述json代表一个圆,这个圆是开始节点模型的意思,其实质是spiffworkflow流程的开始,其映射到类就是StartTask,此类是系统内置的基于TaskSpec类派生的。 Start:spiffworkflow中,开始节点名,不能随意改动 outputs:代表输出节点,即流向到工作流中节点customer_define class:是基于taskspec基类派生的开始任务节点,代码中内置的不需要调整修改 注意:json中定义节点的关键词不可随意填写,要根据spiffworkflow规则生成,示例中是基本常用参数,可直接用于定义开始节点。

  • 任务节点
"customer_define": {
  "id" : 2,
  "class": "SpiffWorkflow.specs.CustomerSimple.CustomerSimple",
  "name": "customer_define",
  "data":{},
  "inputs": [
    "Start"
  ],
  "outputs": [
    "end"
  ]
}
1
2
3
4
5
6
7
8
9
10
11
12

解读:在spiffworkflow中,上述json代表一个矩形且是我们作者自定义的 class:是我们自定义的矩形模型规则,继承TaskSpec实现的CustomerSimple name:必填,且要与节点名称一致 data:用户模型的入参定义 inputs:代表输入节点,当前输入节点是Start

  • 结束节点
"end": {
  "id" : 3,
  "class": "SpiffWorkflow.specs.Cancel.Cancel",
  "name": "end",
  "inputs": [
    "customer_define"
  ]
}
1
2
3
4
5
6
7
8

最后我们来看一下完整的工作流json定义,如下:

{
    "task_specs": {
        "Start": {
        "class": "SpiffWorkflow.specs.StartTask.StartTask",
        "id" : 1,
        "name":"Start",
        "manual": false,
        "outputs": [
          "customer_define"
        ]
     },
     "customer_define": {
        "id" : 2,
        "class": "SpiffWorkflow.specs.CustomerSimple.CustomerSimple",
        "name": "customer_define",
        "data":{},
        "inputs": [
          "Start"
        ],
        "outputs": [
          "end"
        ]
      },
     "end": {
        "id" : 3,
        "class": "SpiffWorkflow.specs.Cancel.Cancel",
        "name": "end",
        "inputs": [
          "customer_define"
        ]
      }
    },
    "description": "",
    "file": null,
    "name": "测试工作流"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

综上,通俗的说,基于节点模型构建的完整工作流json文件后,我们的工作流引擎才能读懂每个过程具体要干什么事。这里留下一个问题自行探索,我是如何知道json结构体中用到的关键词描述的?源码里由一套脚本生成json的方法,研究清楚很关键。最后,就是原子能力在上述定义的节点模型中该怎么被调度执行?接下来我们将重点讲解。

# 2.2.3 工作流引擎的二次开发设计

首页,我们了解一下整体引擎的基本工作过程,如下图 image.png

那么我们带着2.2.2中最后留下的问题思考一下,在上图中我们该在什么过程进行设计才能满足工作流编排自动化测试的需求? 为实现原子能力在工作流引擎完美工作,我们在引擎调度核心框架进行改造,方案中采用一种常见的工厂模式和python中的反射和动态加载的方式实现,如下图 image.png

#能力工厂核心片段代码
class AtomicDispatchFactory:
    def __init__(self, model_name, class_name, func_name, **kwargs):
        self.atomic_model = __import__('SpiffWorkflow.product.{}'.format(model_name), fromlist=True)
        if hasattr(self.atomic_model, class_name):
            self.atomic_class = getattr(self.atomic_model, class_name)
        self.atomic_class_obj = self.get_atomic_class_obj(class_name, **kwargs)
        if hasattr(self.atomic_class_obj, func_name):
            self.atomic_class_func = getattr(self.atomic_class_obj, func_name)
            
    def get_atomic_class_obj(self, class_name, **kwargs):
        if key in _spam_cache:
            factory_obj = _spam_cache[key]
        elif key not in _spam_cache:
            factory_obj = self.atomic_class(**kwargs)
            _spam_cache[key] = factory_obj
        return factory_obj

    def run(self, *args, **kwargs):
        if args[0] != "" and isinstance(args[0], str):
            time.sleep(int(args[0]))
        result = self.atomic_class_func(**kwargs)
        if args[1] != "" and isinstance(args[0], str):
            time.sleep(int(args[1]))
        return result
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 3.总结

本节课我们介绍了工作流引擎在自动化实战中的应用。

更新于: 12/30/2021, 2:46:39 AM