任务的阻塞与执行

前面的文章中在讲到任务模型的时候有提到阻塞任务,那这个在工作流中是如何形成的呢?阻塞任务和任务模型之间又有什么关系呢?下面我们来做个简单的分析。这里还是以最简单的流程为例:

流程定义

流程定义

流程说明

节点名称显示名称模型类
start开始StartModel
apply请假申请TaskModel
approveDept部门领导审批TaskModel
end结束EndModel

流程执行分析

  • 发起一条流程,即拿到StartModel节点,调用其执行方法,startModel.execute(...)
  1. 此时因为输出边的目标节点为TaskModel,所以需要生成一条阻塞任务(请假申请),注意,这里只是生成任务,并不执行。
  2. 假设生成的任务存放在如下任务表内:
    任务ID任务名称显示名称任务状态
    1apply请假申请进行中
  • 阻塞任务一般有两个状态:进行中和已完成
  • 完成一个任务(请假申请)需要进行如下处理
  1. 找到该任务ID对应的记录
  2. 将任务状态由进行中修改为已完成
  3. 将该任务对应的流程定义文件转成流程模型
  4. 从流程模型中找到任务名称相同的任务模型,即名称为上图的请假申请节点
  5. 调用该任务模型的执行方法,applyTaskModel.execute(...)
  6. 此时,因为输出边的目标节点依然为TaskModel,所以依然会生成阻塞任务(部门领导审批)

此时任务表如下:

任务ID任务名称显示名称任务状态
1apply请假申请已完成
2approveDept部门领导审批进行中
  • 重复上一步完成一个任务(部门领导审批)
  1. 找到该任务ID对应的记录
  2. 将任务状态由进行中修改为已完成
  3. 将该任务对应的流程定义文件转成流程模型
  4. 从流程模型中找到任务名称相同的任务模型,即名称为上图的部门领导审批节点
  5. 调用该任务模型的执行方法,approveDeptTaskModel.execute(...)
  6. 此时,因为输出边的目标节点为结束节点,流程结束

最终流程结束,任务表如下:

任务ID任务名称显示名称任务状态
1apply请假申请已完成
2approveDept部门领导审批已完成

从上面的流程分析中我们可以得出如下说明:

  • 阻塞任务即某个用户要做的事,会记录在任务表上
  • 阻塞任务关联某个流程模型的任务模型节点,当完成任务时,需从关联的流程模型中找到其任务模型节点,并调用节点执行方法

类图

为了通用性,我们可以按如下方式来组织代码: 类图

代码实现

新增流程各模型操作处理接口handlers/IHandler.java,主要用于对节点执行方法的扩充。

package com.mldong.flow.engine.handlers;


import com.mldong.flow.engine.core.Execution;

/**
 * 流程各模型操作处理接口
 * @author mldong
 * @date 2023/5/17
 */
public interface IHandler {
    /**
     * 子类需要实现的方法,来处理具体的操作
     * @param execution 执行对象
     */
    void handle(Execution execution);
}

基础模型类中增加调用方法fire,方便子类调用models/BaseModel.java

package com.mldong.flow.engine.model;

import com.mldong.flow.engine.core.Execution;
import com.mldong.flow.engine.handlers.IHandler;
import lombok.Data;
/**
 *
 * 模型基类
 * @author mldong
 * @date 2023/4/25
 */
@Data
public class BaseModel {
    private String name; // 唯一编码
    private String displayName; // 显示名称
    /**
     * 将执行对象execution交给具体的处理器处理
     * @param handler
     * @param execution
     */
    protected void fire(IHandler handler, Execution execution) {
        handler.handle(execution);
    }
}

增加任务实体类entity/ProcessTask.java,暂时只定义几个必要字段,后面讲到数据库设计的时候,再完善字段。

package com.mldong.flow.engine.entity;

import lombok.Data;

import java.io.Serializable;

/**
 *
 * 流程任务
 * @author mldong
 * @date 2023/5/17
 */
@Data
public class ProcessTask implements Serializable {
    private Long id; // 主键
    private String taskName; // 任务名称
    private String displayName; // 任务显示名称
    private Integer taskState; // 任务状态
}

对应的任务状态枚举类enums/TaskStateEnum.java

package com.mldong.flow.engine.enums;
/**
 *
 * 任务状态枚举
 * @author mldong
 * @date 2023/5/17
 */
public enum TaskStateEnum {
    DOING(10,"进行中"),
    FINISHED(20, "已完成"),
    ;
    private final Integer code;

    private final String message;


    TaskStateEnum(Integer code, String message) {
        this.code = code;
        this.message = message;
    }

    public Integer getCode() {
        return code;
    }

    public String getMessage() {
        return message;
    }
}

流程执行参数core/Execution.java增加

  • 当前流程模型属性
  • 当前流程任务属性
  • 流程任务列表属性
  • 添加任务到任务集合方法
  • 获取正在进行中的任务列表方法
package com.mldong.flow.engine.core;

import cn.hutool.core.lang.Dict;
import com.mldong.flow.engine.entity.ProcessTask;
import com.mldong.flow.engine.enums.TaskStateEnum;
import com.mldong.flow.engine.model.ProcessModel;
import lombok.Data;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 *
 * 执行对象参数
 * @author mldong
 * @date 2023/4/25
 */
@Data
public class Execution {
    // 流程实例ID
    private String processInstanceId;
    // 当前流程任务ID
    private String processTaskId;
    // 执行对象扩展参数
    private Dict args;
    // 当前流程模型
    private ProcessModel processModel;
    // 当前任务
    private ProcessTask processTask;
    // 所有任务集合
    private List<ProcessTask> processTaskList = new ArrayList<>();

    /**
     * 添加任务到任务集合
     * @param processTask
     */
    public void addTask(ProcessTask processTask) {
        this.processTaskList.add(processTask);
    }

    /**
     * 获取正在进行中的任务列表
     * @return
     */
    public List<ProcessTask> getDoingTaskList() {
        return this.processTaskList.stream().filter(item->{
            return TaskStateEnum.DOING.getCode().equals(item.getTaskState());
        }).collect(Collectors.toList());
    }
 }

流程模型类model/ProcesssModel.java增加获取process定义的指定节点名称的节点模型方法

package com.mldong.flow.engine.model;

import lombok.Data;

import java.util.ArrayList;
import java.util.List;
/**
 *
 * 流程模型
 * @author mldong
 * @date 2023/4/25
 */
@Data
public class ProcessModel extends BaseModel {
    private String type; // 流程定义分类
    private String instanceUrl; // 启动实例要填写的表单key
    private String expireTime; // 期待完成时间变量key
    private String instanceNoClass; // 实例编号生成器实现类
    // 流程定义的所有节点
    private List<NodeModel> nodes = new ArrayList<NodeModel>();
    // 流程定义的所有任务节点
    private List<TaskModel> tasks = new ArrayList<TaskModel>();

    /**
     * 获取开始节点
     * @return
     */
    public StartModel getStart() {
        StartModel startModel = null;
        for (int i = 0; i < nodes.size(); i++) {
            NodeModel nodeModel = nodes.get(i);
            if(nodeModel instanceof StartModel) {
                startModel = (StartModel) nodeModel;
                break;
            }
        }
        return startModel;
    }
    /**
     * 获取process定义的指定节点名称的节点模型
     * @param nodeName 节点名称
     * @return
     */
    public NodeModel getNode(String nodeName) {
        for(NodeModel node : nodes) {
            if(node.getName().equals(nodeName)) {
                return node;
            }
        }
        return null;
    }
}

新增创建任务处理器类handlers/impl/CreateTaskHandler.java实现IHandler接口,主要用于创建流程任务,创建的流程任务追加到执行对象的任务列表中,将将当前任务绑定到执行参数上。

package com.mldong.flow.engine.handlers.impl;


import cn.hutool.core.util.RandomUtil;
import com.mldong.flow.engine.core.Execution;
import com.mldong.flow.engine.entity.ProcessTask;
import com.mldong.flow.engine.enums.TaskStateEnum;
import com.mldong.flow.engine.handlers.IHandler;
import com.mldong.flow.engine.model.TaskModel;

/**
 * 创建任务处理器
 * @author mldong
 * @date 2023/5/16
 */
public class CreateTaskHandler implements IHandler {
    private TaskModel taskModel;
    public CreateTaskHandler(TaskModel taskModel) {
        this.taskModel = taskModel;
    }
    @Override
    public void handle(Execution execution) {
        ProcessTask processTask = new ProcessTask();
        processTask.setId(RandomUtil.randomLong());
        processTask.setTaskName(this.taskModel.getName());
        processTask.setDisplayName(this.taskModel.getDisplayName());
        processTask.setTaskState(TaskStateEnum.DOING.getCode());
        execution.setProcessTask(processTask);
        System.out.println("创建任务:"+processTask.getTaskName()+","+processTask.getDisplayName());
        execution.addTask(processTask);
    }
}

调整model/TransitionModel.java方法,当目标节点为任务模型时,调用创建任务处理器,创建对应任务。 对于任务模型节点,执行周期本质上为:生成对应的阻塞任务->完成阻塞任务->调用该任务模型节点的执行方法

package com.mldong.flow.engine.model;

import com.mldong.flow.engine.Action;
import com.mldong.flow.engine.core.Execution;
import com.mldong.flow.engine.handlers.impl.CreateTaskHandler;
import lombok.Data;
/**
 *
 * 边模型
 * @author mldong
 * @date 2023/4/25
 */
@Data
public class TransitionModel extends BaseModel implements Action {
    private NodeModel source; // 边源节点引用
    private NodeModel target; // 边目标节点引用
    private String to; // 目标节点名称
    private String expr; // 边表达式
    private String g; // 边点坐标集合(x1,y1;x2,y2,x3,y3……)开始、拐角、结束
    private boolean enabled; // 是否可执行
    @Override
    public void execute(Execution execution) {
        if(!enabled) return;
        if(target instanceof TaskModel) {
            // 创建阻塞任务
            fire(new CreateTaskHandler((TaskModel) target),execution);
        } else {
            target.execute(execution);
        }
    }
}

增加一下模拟执行任务全流程的类MockExecuteTask.java

  • 循环判断是否存在正在进行中的任务
  • 如果存在,取第一个未完成的任务
  • 将任务状态由进行中修改为已完成
  • 从流程模型中找到任务名称相同的任务模型
  • 调用节点模型执行方法

当然,为了模拟任务的阻塞性,我们增加了休眠方法,休眠1sThreadUtil.safeSleep(1000)再执行任务

package com.mldong.flow.engine;

import cn.hutool.core.thread.ThreadUtil;
import com.mldong.flow.engine.core.Execution;
import com.mldong.flow.engine.entity.ProcessTask;
import com.mldong.flow.engine.model.NodeModel;

import java.util.List;

/**
 *
 * 模拟执行任务全流程
 * @author mldong
 * @date 2023/5/17
 */
public class MockExecuteTask {
    private Execution execution;
    public MockExecuteTask(Execution execution) {
        this.execution = execution;
    }
    public void run() {
        while (!execution.getDoingTaskList().isEmpty()) {
            // 休眠1s
            ThreadUtil.safeSleep(1000);
            // 找到未完成的任务
            List<ProcessTask> doingTaskList = execution.getDoingTaskList();
            // 取第一个未完成的任务
            ProcessTask processTask = doingTaskList.get(0);
            // 将任务状态由进行中修改为已完成
            processTask.setTaskState(20);
            System.out.println("设置任务状态为已完成:"+processTask.getTaskName()+","+processTask.getDisplayName());
            execution.setProcessTask(processTask);
            // 从流程模型中找到任务名称相同的任务模型
            NodeModel nodeModel = execution.getProcessModel().getNode(processTask.getTaskName());
            // 调用节点模型执行方法
            nodeModel.execute(execution);
        }
    }
}

调整单元测试类ExecuteTest.java

package com.mldong.flow;

import cn.hutool.core.io.IoUtil;
import cn.hutool.core.lang.Dict;
import com.mldong.flow.engine.MockExecuteTask;
import com.mldong.flow.engine.cfg.Configuration;
import com.mldong.flow.engine.core.Execution;
import com.mldong.flow.engine.model.ProcessModel;
import com.mldong.flow.engine.parser.ModelParser;
import org.junit.Test;
/**
 *
 * 执行测试
 * @author mldong
 * @date 2023/5/1
 */
public class ExecuteTest {
    @Test
    public void executeLeave_01() {
        new Configuration();
        // 将流程定义文件解析成流程模型
        ProcessModel processModel = ModelParser.parse(IoUtil.readBytes(this.getClass().getResourceAsStream("/leave.json")));
        // 构造执行参数
        Execution execution = new Execution();
        // 设置当前流程模型
        execution.setProcessModel(processModel);
        // 设置扩展属性
        execution.setArgs(Dict.create());
        // 拿到开始节点调用执行方法
        processModel.getStart().execute(execution);
        // 执行模拟执行任务方法
        new MockExecuteTask(execution).run();
    }
}

最终执行结果:

调用模型节点执行方法:model:StartModel,name:start,displayName:开始
创建任务:apply,请假申请
设置任务状态为已完成:apply,请假申请
调用模型节点执行方法:model:TaskModel,name:apply,displayName:请假申请
创建任务:approveDept,部门领导审批
设置任务状态为已完成:approveDept,部门领导审批
调用模型节点执行方法:model:TaskModel,name:approveDept,displayName:部门领导审批
调用模型节点执行方法:model:EndModel,name:end,displayName:结束

小结

本章节主要阐述了任务的阻塞与执行,其本质上就是阻塞任务和任务模型节点之间的关系。在工作流里面,阻塞任务即我们个人办公里面的我的待办/已办任务,进行中任务则为待办任务,已完成任务则为已办任务。当我们对进行中的任务进行处理时,就是修改进行中的任务状态为已完成,并调用任务绑定的任务模型节点执行方法以驱动流程往下一节点进行。这里为了方便,使用MockExecuteTask去模拟该过程,后续我们做到数据库设计时再进一步完善。

相关源码

mldong-flow-demo-05open in new window

全栈工程师带你从0~1设计实现轻量级工作流引擎