Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License. See License.txt in the project root for
license information.
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/
package com.microsoft.azure.arm.dag;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
Type representing a DAG (directed acyclic graph).
each node in a DAG is represented by DAGNode
Type parameters:
/**
* Type representing a DAG (directed acyclic graph).
* <p>
* each node in a DAG is represented by {@link DAGNode}
*
* @param <DataT> the type of the data stored in the graph nodes
* @param <NodeT> the type of the nodes in the graph
*/
public class DAGraph<DataT, NodeT extends DAGNode<DataT, NodeT>> extends Graph<DataT, NodeT> {
the root node in the graph. this#nodeTable
contains all the nodes in this graph with this as the root. /**
* the root node in the graph.
* {@link this#nodeTable} contains all the nodes in this graph with this as the root.
*/
private final NodeT rootNode;
the immediate parent graphs of this graph. A parent graph is the one with it's root
depends on this graph's root.
/**
* the immediate parent graphs of this graph. A parent graph is the one with it's root
* depends on this graph's root.
*/
protected List<DAGraph<DataT, NodeT>> parentDAGs;
to perform topological sort on the graph. During sorting queue contains the nodes which
are ready to invoke.
/**
* to perform topological sort on the graph. During sorting queue contains the nodes which
* are ready to invoke.
*/
protected ConcurrentLinkedQueue<String> queue;
Creates a new DAG.
Params: - rootNode – the root node of this DAG
/**
* Creates a new DAG.
*
* @param rootNode the root node of this DAG
*/
public DAGraph(NodeT rootNode) {
this.parentDAGs = new ArrayList<>();
this.rootNode = rootNode;
this.queue = new ConcurrentLinkedQueue<>();
this.rootNode.setPreparer(true);
this.addNode(rootNode);
}
Returns: true if this DAG is merged with one or more DAG and hence has parents
/**
* @return <tt>true</tt> if this DAG is merged with one or more DAG and hence has parents
*/
public boolean hasParents() {
return this.parentDAGs.size() > 0;
}
Returns: the root node of the DAG.
/**
* @return the root node of the DAG.
*/
protected NodeT root() {
return this.rootNode;
}
Checks whether the given node is root node of this DAG.
Params: - node – the node
DAGNode
to be checked
Returns: true if the given node is root node
/**
* Checks whether the given node is root node of this DAG.
*
* @param node the node {@link DAGNode} to be checked
* @return <tt>true</tt> if the given node is root node
*/
public boolean isRootNode(NodeT node) {
return this.rootNode == node;
}
Returns: true if this dag is the preparer responsible for
preparing the DAG for traversal.
/**
* @return <tt>true</tt> if this dag is the preparer responsible for
* preparing the DAG for traversal.
*/
public boolean isPreparer() {
return this.rootNode.isPreparer();
}
Gets a node from the graph with the given key.
Params: - key – the key of the node
Returns: the node
/**
* Gets a node from the graph with the given key.
* @param key the key of the node
* @return the node
*/
public NodeT getNode(String key) {
return nodeTable.get(key);
}
Mark root of this DAG depends on given DAG's root.
Params: - dependencyGraph – the dependency DAG
/**
* Mark root of this DAG depends on given DAG's root.
*
* @param dependencyGraph the dependency DAG
*/
public void addDependencyGraph(DAGraph<DataT, NodeT> dependencyGraph) {
this.rootNode.addDependency(dependencyGraph.rootNode.key());
Map<String, NodeT> sourceNodeTable = dependencyGraph.nodeTable;
Map<String, NodeT> targetNodeTable = this.nodeTable;
this.merge(sourceNodeTable, targetNodeTable);
dependencyGraph.parentDAGs.add(this);
if (this.hasParents()) {
this.bubbleUpNodeTable(this, new LinkedList<String>());
}
}
Mark root of the given DAG depends on this DAG's root.
Params: - dependentGraph – the dependent DAG
/**
* Mark root of the given DAG depends on this DAG's root.
*
* @param dependentGraph the dependent DAG
*/
public void addDependentGraph(DAGraph<DataT, NodeT> dependentGraph) {
dependentGraph.addDependencyGraph(this);
}
Prepares this DAG for node enumeration using getNext method, each call to getNext returns next node
in the DAG with no dependencies.
/**
* Prepares this DAG for node enumeration using getNext method, each call to getNext returns next node
* in the DAG with no dependencies.
*/
public void prepareForEnumeration() {
if (isPreparer()) {
for (NodeT node : nodeTable.values()) {
// Prepare each node for traversal
node.initialize();
if (!this.isRootNode(node)) {
// Mark other sub-DAGs as non-preparer
node.setPreparer(false);
}
}
initializeDependentKeys();
initializeQueue();
}
}
Gets next node in the DAG which has no dependency or all of it's dependencies are resolved and
ready to be consumed.
Returns: next node or null if all the nodes have been explored or no node is available at this moment.
/**
* Gets next node in the DAG which has no dependency or all of it's dependencies are resolved and
* ready to be consumed.
*
* @return next node or null if all the nodes have been explored or no node is available at this moment.
*/
public NodeT getNext() {
String nextItemKey = queue.poll();
if (nextItemKey == null) {
return null;
}
return nodeTable.get(nextItemKey);
}
Reports that a node is resolved hence other nodes depends on it can consume it.
Params: - completed – the node ready to be consumed
/**
* Reports that a node is resolved hence other nodes depends on it can consume it.
*
* @param completed the node ready to be consumed
*/
public void reportCompletion(NodeT completed) {
completed.setPreparer(true);
String dependency = completed.key();
for (String dependentKey : nodeTable.get(dependency).dependentKeys()) {
DAGNode<DataT, NodeT> dependent = nodeTable.get(dependentKey);
dependent.lock().lock();
try {
dependent.onSuccessfulResolution(dependency);
if (dependent.hasAllResolved()) {
queue.add(dependent.key());
}
} finally {
dependent.lock().unlock();
}
}
}
Reports that a node is faulted.
Params: - faulted – the node faulted
- throwable – the reason for fault
/**
* Reports that a node is faulted.
*
* @param faulted the node faulted
* @param throwable the reason for fault
*/
public void reportError(NodeT faulted, Throwable throwable) {
faulted.setPreparer(true);
String dependency = faulted.key();
for (String dependentKey : nodeTable.get(dependency).dependentKeys()) {
DAGNode<DataT, NodeT> dependent = nodeTable.get(dependentKey);
dependent.lock().lock();
try {
dependent.onFaultedResolution(dependency, throwable);
if (dependent.hasAllResolved()) {
queue.add(dependent.key());
}
} finally {
dependent.lock().unlock();
}
}
}
Initializes dependents of all nodes.
The DAG will be explored in DFS order and all node's dependents will be identified,
this prepares the DAG for traversal using getNext method, each call to getNext returns next node
in the DAG with no dependencies.
/**
* Initializes dependents of all nodes.
* <p>
* The DAG will be explored in DFS order and all node's dependents will be identified,
* this prepares the DAG for traversal using getNext method, each call to getNext returns next node
* in the DAG with no dependencies.
*/
private void initializeDependentKeys() {
visit(new Visitor<NodeT>() {
@Override
public void visitNode(NodeT node) {
if (node.dependencyKeys().isEmpty()) {
return;
}
String dependentKey = node.key();
for (String dependencyKey : node.dependencyKeys()) {
nodeTable.get(dependencyKey)
.addDependent(dependentKey);
}
}
@Override
public void visitEdge(String fromKey, String toKey, EdgeType edgeType) {
if (edgeType == EdgeType.BACK) {
throw new IllegalStateException("Detected circular dependency: " + findPath(fromKey, toKey));
}
}
});
}
Initializes the queue that tracks the next set of nodes with no dependencies or
whose dependencies are resolved.
/**
* Initializes the queue that tracks the next set of nodes with no dependencies or
* whose dependencies are resolved.
*/
private void initializeQueue() {
this.queue.clear();
for (Map.Entry<String, NodeT> entry: nodeTable.entrySet()) {
if (!entry.getValue().hasDependencies()) {
this.queue.add(entry.getKey());
}
}
if (queue.isEmpty()) {
throw new IllegalStateException("Detected circular dependency");
}
}
Copies entries in the source map to target map.
Params: - source – source map
- target – target map
/**
* Copies entries in the source map to target map.
*
* @param source source map
* @param target target map
*/
private void merge(Map<String, NodeT> source, Map<String, NodeT> target) {
for (Map.Entry<String, NodeT> entry : source.entrySet()) {
String key = entry.getKey();
if (!target.containsKey(key)) {
target.put(key, entry.getValue());
}
}
}
Propagates node table of given DAG to all of it ancestors.
/**
* Propagates node table of given DAG to all of it ancestors.
*/
private void bubbleUpNodeTable(DAGraph<DataT, NodeT> from, LinkedList<String> path) {
if (path.contains(from.rootNode.key())) {
path.push(from.rootNode.key()); // For better error message
throw new IllegalStateException("Detected circular dependency: " + StringUtils.join(path, " -> "));
}
path.push(from.rootNode.key());
for (DAGraph<DataT, NodeT> to : from.parentDAGs) {
this.merge(from.nodeTable, to.nodeTable);
this.bubbleUpNodeTable(to, path);
}
path.pop();
}
}