
Dolphinscheduler DAG Core Source Code Analysis

by William Guo, December 7th, 2024



Background

org.apache.dolphinscheduler.common.graph.DAG



Note: In Dolphinscheduler, offline tasks have a complete lifecycle (stopping, pausing, resuming from pause, rerunning, and so on), and T+1 offline tasks are organized as a DAG (Directed Acyclic Graph).

Dolphinscheduler DAG Implementation

org.apache.dolphinscheduler.common.graph.DAG

Three important data structures of DAG:

// Vertex information
private final Map<Node, NodeInfo> nodesMap;

// Edge information: maps each vertex to its outgoing edges, used to find downstream nodes and leaf nodes
private final Map<Node, Map<Node, EdgeInfo>> edgesMap;

// Reverse edge information: maps each vertex to its incoming edges, used to quickly find nodes with an in-degree of 0 (start nodes) and to obtain upstream nodes
private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
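
To make the roles of the three maps concrete, here is a minimal sketch. It is not the Dolphinscheduler class: MiniDag and its methods are hypothetical names, nodes and edge info are plain strings, and there is no locking or cycle check. It only illustrates how start nodes, leaf nodes, downstream nodes and upstream nodes can be read from a forward map and a reverse map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for the DAG class (String nodes, String edge info).
final class MiniDag {
    private final Map<String, String> nodesMap = new HashMap<>();
    private final Map<String, Map<String, String>> edgesMap = new HashMap<>();
    private final Map<String, Map<String, String>> reverseEdgesMap = new HashMap<>();

    void addNode(String node, String info) {
        nodesMap.put(node, info);
    }

    // Records the edge in both directions; this sketch performs no cycle check.
    void addEdge(String from, String to, String info) {
        edgesMap.computeIfAbsent(from, k -> new HashMap<>()).put(to, info);
        reverseEdgesMap.computeIfAbsent(to, k -> new HashMap<>()).put(from, info);
    }

    // Start nodes have in-degree 0: they never appear as a key in the reverse map.
    Set<String> startNodes() {
        Set<String> result = new TreeSet<>(nodesMap.keySet());
        result.removeAll(reverseEdgesMap.keySet());
        return result;
    }

    // Leaf nodes have out-degree 0: they never appear as a key in the forward map.
    Set<String> leafNodes() {
        Set<String> result = new TreeSet<>(nodesMap.keySet());
        result.removeAll(edgesMap.keySet());
        return result;
    }

    // Direct downstream nodes come from the forward map.
    Set<String> downstream(String node) {
        return new TreeSet<>(edgesMap.getOrDefault(node, Collections.emptyMap()).keySet());
    }

    // Direct upstream nodes come from the reverse map.
    Set<String> upstream(String node) {
        return new TreeSet<>(reverseEdgesMap.getOrDefault(node, Collections.emptyMap()).keySet());
    }
}
```

For a graph with nodes A, B, C and the single edge B -> C, startNodes() contains A and B, and leafNodes() contains A and C.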


Example below:

DAG<String, String, String> graph = new DAG<>();
graph.addNode("A", "A");
graph.addNode("B", "B");
graph.addNode("C", "C");

// Add an edge from B to C; A is still isolated
graph.addEdge("B", "C");

// Adding A -> B starts a search from B and checks whether A is reachable from it. If A is reachable, the edge A -> B would form a cycle and is rejected; otherwise it is added.
graph.addEdge("A", "B");


Source code analysis: org.apache.dolphinscheduler.common.graph.DAG#addEdge

public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) {
    lock.writeLock().lock();

    try {
        // TODO Whether the edge can be added
        if (!isLegalAddEdge(fromNode, toNode, createNode)) {
            log.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode);
            return false;
        }

        // TODO Add nodes
        addNodeIfAbsent(fromNode, null);
        addNodeIfAbsent(toNode, null);

        // TODO Add edges
        addEdge(fromNode, toNode, edge, edgesMap);
        addEdge(toNode, fromNode, edge, reverseEdgesMap);

        return true;
    } finally {
        lock.writeLock().unlock();
    }
}

private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) {
    // TODO If fromNode and toNode are the same vertex, this edge cannot be added
    if (fromNode.equals(toNode)) {
        log.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode);
        return false;
    }

    // TODO If createNode is false, fromNode and toNode must both be existing vertices
    if (!createNode) {
        if (!containsNode(fromNode) || !containsNode(toNode)) {
            log.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode);
            return false;
        }
    }

    // Whether an edge can be successfully added(fromNode -> toNode), need to determine whether the
    // DAG has a cycle!
    // TODO Get the number of nodes
    int verticesCount = getNodesCount();

    Queue<Node> queue = new LinkedList<>();

    // TODO Put toNode into the queue
    queue.add(toNode);

    // If DAG doesn't find fromNode, it's not a cycle!
    // TODO The queue is never empty on the first pass, because toNode was just added
    while (!queue.isEmpty() && (--verticesCount > 0)) {
        // TODO Get the element in the queue
        Node key = queue.poll();

        for (Node subsequentNode : getSubsequentNodes(key)) {
            // TODO Walk the downstream nodes of the current node. If one of them is fromNode, a path
            // toNode -> ... -> fromNode already exists, so adding fromNode -> toNode would close a cycle
            // and the edge is rejected. Otherwise the downstream node is queued so that its own downstream
            // nodes are checked in turn; for example, with A -> B -> C and B polled, C is put into the queue
            // TODO The core idea: search from the target node (toNode) for a path back to the source node
            // (fromNode) to judge whether a cycle would form
            if (subsequentNode.equals(fromNode)) {
                return false;
            }

            queue.add(subsequentNode);
        }
    }

    return true;
}
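
The check above can be reduced to a small standalone sketch. The class name CycleCheckSketch and the adjacency-map parameter are assumptions made for illustration; nodes are plain strings. One deliberate difference from the source: instead of bounding the loop by the vertex count, this version tracks visited nodes so each node is queued at most once.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of the "would this edge create a cycle?" check.
final class CycleCheckSketch {

    // edges maps each node to its direct downstream nodes.
    // Returns true if adding from -> to keeps the graph acyclic.
    static boolean isLegalAddEdge(Map<String, Set<String>> edges, String from, String to) {
        // A self-loop is always a cycle.
        if (from.equals(to)) {
            return false;
        }

        Queue<String> queue = new ArrayDeque<>();
        Set<String> visited = new HashSet<>();
        queue.add(to);
        visited.add(to);

        // Breadth-first search from the target node along existing edges.
        while (!queue.isEmpty()) {
            String current = queue.poll();
            for (String next : edges.getOrDefault(current, Collections.emptySet())) {
                // Reaching the source means to -> ... -> from already exists.
                if (next.equals(from)) {
                    return false;
                }
                // Set.add returns false for nodes seen before, so nothing is queued twice.
                if (visited.add(next)) {
                    queue.add(next);
                }
            }
        }
        return true;
    }
}
```

With the existing edges A -> B and B -> C, adding C -> A is rejected because the search from A reaches C, while adding A -> C is allowed because nothing is reachable from C.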

Dolphinscheduler DagHelper Explanation

The DAG class is a basic general-purpose DAG tool class, and DagHelper is a business tool class that assembles task definitions and relationships between task definitions into a DAG.


org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory#createWorkflowGraph

public IWorkflowGraph createWorkflowGraph(ProcessInstance workflowInstance) throws Exception {

    // TODO Get the task relations defined for this workflow instance's process definition code and version
    List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(
            workflowInstance.getProcessDefinitionCode(),
            workflowInstance.getProcessDefinitionVersion());

    // TODO Get the corresponding task definition log
    List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations);

    // TODO Get TaskNode
    List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);

    // generate process to get DAG info
    // TODO Parse the recovery task node codes from the command parameters
    List<Long> recoveryTaskNodeCodeList = getRecoveryTaskNodeCodeList(workflowInstance.getCommandParam());

    // TODO Parse the start node list from the command parameters; it is empty unless start nodes are specified manually
    List<Long> startNodeNameList = parseStartNodeName(workflowInstance.getCommandParam());

    // TODO Build a ProcessDag object instance
    ProcessDag processDag = DagHelper.generateFlowDag(
            taskNodeList,
            startNodeNameList,
            recoveryTaskNodeCodeList,
            workflowInstance.getTaskDependType());

    if (processDag == null) {
        log.error("ProcessDag is null");
        throw new IllegalArgumentException("Create WorkflowGraph failed, ProcessDag is null");
    }

    // TODO Generate DAG
    DAG<Long, TaskNode, TaskNodeRelation> dagGraph = DagHelper.buildDagGraph(processDag);
    log.debug("Build dag success, dag: {}", dagGraph);

    // TODO Use WorkflowGraph to encapsulate the task node list and dagGraph
    return new WorkflowGraph(taskNodeList, dagGraph);
}


org.apache.dolphinscheduler.service.utils.DagHelper#generateFlowDag

public static ProcessDag generateFlowDag(
                                             List<TaskNode> totalTaskNodeList,
                                             List<Long> startNodeNameList,
                                             List<Long> recoveryNodeCodeList,
                                             TaskDependType depNodeType) throws Exception {

    // TODO Get the nodes that belong to this run, based on the start nodes, recovery nodes and dependency type
    List<TaskNode> destTaskNodeList =
            generateFlowNodeListByStartNode(
                    totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);

    if (destTaskNodeList.isEmpty()) {
        return null;
    }

    // TODO Get the relationship between task nodes
    List<TaskNodeRelation> taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList);

    // TODO Instantiate a ProcessDag
    ProcessDag processDag = new ProcessDag();
    // TODO Set the edges of DAG
    processDag.setEdges(taskNodeRelations);
    // TODO Set the vertices of DAG
    processDag.setNodes(destTaskNodeList);
    return processDag;
}
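
The body of generateRelationListByFlowNodes is not shown here. As a rough illustration only, and assuming that each task node records the codes of its upstream tasks, edges could be derived from a node list as sketched below. RelationSketch, Task and preTaskCodes are hypothetical names and are not taken from DagHelper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: derive edges from nodes that know their upstream task codes.
final class RelationSketch {

    // Stand-in for a task node: its own code plus the codes of its upstream tasks (assumed field).
    static final class Task {
        final long code;
        final List<Long> preTaskCodes;

        Task(long code, Long... preTaskCodes) {
            this.code = code;
            this.preTaskCodes = Arrays.asList(preTaskCodes);
        }
    }

    // Returns edges as {upstreamCode, downstreamCode} pairs.
    static List<long[]> relationsOf(List<Task> tasks) {
        // Collect the codes of all tasks that take part in this run.
        Set<Long> known = new HashSet<>();
        for (Task task : tasks) {
            known.add(task.code);
        }

        List<long[]> edges = new ArrayList<>();
        for (Task task : tasks) {
            for (Long pre : task.preTaskCodes) {
                // Keep only edges whose upstream task is part of the node list.
                if (known.contains(pre)) {
                    edges.add(new long[] {pre, task.code});
                }
            }
        }
        return edges;
    }
}
```

An upstream code that does not belong to the selected node list produces no edge in this sketch, so the resulting edge list only connects nodes that are actually in the run.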


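generateFlowDag sets destTaskNodeList as the vertices and taskNodeRelations as the edges of the ProcessDag; buildDagGraph then turns that ProcessDag into a DAG.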

org.apache.dolphinscheduler.service.utils.DagHelper#buildDagGraph

public static DAG<Long, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {

    DAG<Long, TaskNode, TaskNodeRelation> dag = new DAG<>();

    // TODO Add vertices
    if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
        for (TaskNode node : processDag.getNodes()) {
            dag.addNode(node.getCode(), node);
        }
    }

    // TODO Add edges
    if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
        for (TaskNodeRelation edge : processDag.getEdges()) {
            dag.addEdge(edge.getStartNode(), edge.getEndNode());
        }
    }
    return dag;
}
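
The two-pass shape of buildDagGraph (vertices first, then edges) can be sketched with plain collections. BuildDagSketch and Relation are hypothetical stand-ins for illustration, and task codes are plain Long values. The sketch assumes that an edge with an unknown endpoint is skipped, mirroring the createNode == false branch of isLegalAddEdge shown earlier.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified version of a two-pass graph build.
final class BuildDagSketch {

    // Stand-in for an edge between two task codes.
    static final class Relation {
        final long startNode;
        final long endNode;

        Relation(long startNode, long endNode) {
            this.startNode = startNode;
            this.endNode = endNode;
        }
    }

    // Returns a map from each task code to its direct downstream task codes.
    static Map<Long, Set<Long>> build(List<Long> taskCodes, List<Relation> relations) {
        Map<Long, Set<Long>> downstream = new LinkedHashMap<>();

        // Pass 1: register every vertex, even those without edges.
        for (Long code : taskCodes) {
            downstream.put(code, new LinkedHashSet<>());
        }

        // Pass 2: add edges, skipping any whose endpoints were not registered.
        for (Relation relation : relations) {
            if (downstream.containsKey(relation.startNode) && downstream.containsKey(relation.endNode)) {
                downstream.get(relation.startNode).add(relation.endNode);
            }
        }
        return downstream;
    }
}
```

Registering vertices before edges means an isolated task still appears in the graph, and an edge can be validated against the full vertex set at the moment it is added.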