"""Receives a weighted directed graph in GML format and deletes all edges
that connects nodes that are connected via some other path. Only the longest
paths are retained.
The graph should be available in the "graphs" directory in the case
data folder.
A reduced graph will have the same title as the original file with the suffix
"_simplified".
A <casename>_graphreduce.json configuration file needs to be available in the
case directory root.
"""
# Library imports
import json
import logging
import os
import networkx as nx
import numpy as np
from faultmap import config_setup
from faultmap.type_definitions import RunModes
[docs]
class GraphReduceData:
"""Creates a data object from file and or function definitions for use in
graph reduce method.
"""
def __init__(self, mode, case):
# Get locations from configuration file
(
self.saveloc,
self.caseconfigloc,
self.casedir,
_,
) = config_setup.run_setup(mode, case)
# Load case config file
with open(os.path.join(self.caseconfigloc, "graphreduce.json")) as f:
self.caseconfig = json.load(f)
# Get scenarios
self.scenarios = self.caseconfig["scenarios"]
# Get data type
self.datatype = self.caseconfig["datatype"]
[docs]
def scenariodata(self, scenario):
"""Retrieves data particular to each scenario for the case being
investigated.
"""
self.graph = self.caseconfig[scenario]["graph"]
self.percentile = self.caseconfig[scenario]["percentile"]
self.depth = self.caseconfig[scenario]["depth"]
self.weight_discretion = self.caseconfig[scenario]["weight_discretion"]
# Provides option of not removing self loops
if "remove_self_loops" in self.caseconfig[scenario]:
self.remove_self_loops = self.caseconfig[scenario]["remove_self_loops"]
else:
self.remove_self_loops = True
def get_boxes(self, scenario, datadir, typename):
boxindexes = self.caseconfig[scenario]["boxindexes"]
if boxindexes == "all":
boxesdir = os.path.join(datadir, typename)
boxes = next(os.walk(boxesdir))[1]
self.boxes = range(len(boxes))
else:
self.boxes = boxindexes
def reduce_graph(graph_reduce_data, data_dir, typename, write_output):
graph_filename = "{}.gml"
simplified_graph_filename = "{}_simplified.gml"
lowedge_graph_filename = "{}_lowedge.gml"
boxesdir = os.path.join(data_dir, typename)
boxes = next(os.walk(boxesdir))[1]
for box in boxes:
dummiesdir = os.path.join(boxesdir, box)
dummytypes = next(os.walk(dummiesdir))[1]
for dummytype in dummytypes:
# Open the original graph
original_graph = nx.readwrite.read_gml(
os.path.join(
dummiesdir,
dummytype,
graph_filename.format(graph_reduce_data.graph),
)
)
# Get appropriate weight threshold for deleting edges from graph
# TODO: Implement elegant way of dealing with empty graphs
try:
threshold = compute_edge_threshold(
original_graph, graph_reduce_data.percentile
)
except Exception:
logging.warning("Empty graph")
break
# Delete low value edges from graph
lowedge_graph = delete_lowval_edges(
original_graph, threshold, graph_reduce_data.remove_self_loops
)
# Get simplified graph
simplified_graph = delete_loworder_edges(
lowedge_graph,
graph_reduce_data.depth,
graph_reduce_data.weight_discretion,
)
# Write simplified graph to file
if write_output:
# Write simplified graph
nx.readwrite.write_gml(
simplified_graph,
os.path.join(
dummiesdir,
dummytype,
simplified_graph_filename.format(graph_reduce_data.graph),
),
)
# Write lowedge graph
nx.readwrite.write_gml(
lowedge_graph,
os.path.join(
dummiesdir,
dummytype,
lowedge_graph_filename.format(graph_reduce_data.graph),
),
)
return None
def reduce_graph_scenarios(mode: RunModes, case: str, write_output: bool) -> None:
graph_reduce_data = GraphReduceData(mode, case)
saveloc, caseconfigdir, casedir, _ = config_setup.run_setup(mode, case)
# Create output directory
# savedir = \
# config_setup.ensure_existence(
# os.path.join(graphreducedata.saveloc,
# 'graphs'), make=True)
# Directory where subdirectories for scenarios will be found
scenarios_dir = os.path.join(saveloc, "noderank", case)
# Get list of all scenarios
scenarios = next(os.walk(scenarios_dir))[1]
for scenario in scenarios:
if scenario in graph_reduce_data.scenarios:
logging.info("Running scenario %s", scenario)
# Update scenario-specific fields graphreducedata object
graph_reduce_data.scenariodata(scenario)
else:
continue
# Iterate through every source graph that is found inside
# the scenario's subdirectories
methods_dir = os.path.join(scenarios_dir, scenario)
methods = next(os.walk(methods_dir))[1]
for method in methods:
logging.info("Processing method: %s", method)
sig_types_dir = os.path.join(methods_dir, method)
sig_types = next(os.walk(sig_types_dir))[1]
for sig_type in sig_types:
logging.info("Processing sigtype: %s", sig_type)
embed_types_dir = os.path.join(sig_types_dir, sig_type)
embed_types = next(os.walk(embed_types_dir))[1]
for embed_type in embed_types:
logging.info("Processing embedtype: %s", embed_type)
data_dir = os.path.join(embed_types_dir, embed_type)
if method[:16] == "transfer_entropy":
type_names = ["weight_absolute", "weight_directional"]
# 'signtested_weight_directional']
if sig_type == "sigtest":
type_names.append("sig_dweight_absolute")
type_names.append("sigweight_directional")
type_names.append("signtested_sigweight_directional")
else:
type_names = ["weight"]
if sig_type == "sigtest":
type_names.append("sigweight")
for typename in type_names:
logging.info("Processing typename: %s", typename)
# Start the methods here
reduce_graph(
graph_reduce_data,
data_dir,
typename,
write_output,
)
return None
[docs]
def compute_edge_threshold(graph, percentile):
"""Calculates the threshold that should be used to delete edges from the
original graph based on determined templates.
"""
# Get list of all edge weights
weight_dict = nx.get_edge_attributes(graph, "weight")
edge_weightlist = []
for edge in graph.edges():
edge_weightlist.append(weight_dict[edge])
threshold = np.percentile(np.asarray(edge_weightlist), percentile)
logging.info("The " + str(percentile) + "th percentile is: " + str(threshold))
return threshold
[docs]
def delete_lowval_edges(graph, weight_threshold, remove_self_loops=True):
"""Deletes all edges with weight below the threshold value.
Also deletes all self-looping edges.
"""
lowedge_graph = graph.copy()
if remove_self_loops:
# First, delete all self-loops
selfloop_list = list(nx.selfloop_edges(lowedge_graph))
lowedge_graph.remove_edges_from(selfloop_list)
logging.info("Deleted " + str(len(selfloop_list)) + " self-looping edges")
edge_dellist = []
edge_totlist = []
weight_dict = nx.get_edge_attributes(lowedge_graph, "weight")
for edge in lowedge_graph.edges():
edge_totlist.append(edge)
if weight_dict[edge] < weight_threshold:
edge_dellist.append(edge)
lowedge_graph.remove_edges_from(edge_dellist)
logging.info(
"Deleted "
+ str(len(edge_dellist))
+ "/"
+ str(graph.number_of_edges())
+ " low valued edges"
)
return lowedge_graph
def remove_duplicates(
intersection_list,
node,
upper_child,
simplified_graph,
weight_dict,
removed_edges,
weight_discretion,
):
for duplicate in intersection_list:
if simplified_graph.has_edge(node, duplicate):
if weight_discretion:
# Only remove the edge if its weight is less than the
# weight of the connection between the child
# and the duplicate
# Find the importance of the first order connection
firstweight = weight_dict[(node, duplicate)]
# Find the importance of the second order connection
secondweight = weight_dict[(upper_child, duplicate)]
if firstweight < secondweight:
simplified_graph.remove_edge(node, duplicate)
removed_edge = [node, duplicate]
removed_edges.append(removed_edge)
else:
simplified_graph.remove_edge(node, duplicate)
removed_edge = [node, duplicate]
removed_edges.append(removed_edge)
return simplified_graph, removed_edges
[docs]
def decompose(input_, output_):
"""Decomposes (flattens) a list of lists into a simple list."""
if type(input_) is list:
for subitem in input_:
decompose(subitem, output_)
else:
output_.append(input_)
[docs]
def delete_loworder_edges(graph, max_depth, weight_discretion):
"""Returns a simplified graph with higher order connections eliminated.
All self-loops are also deleted.
The level up to which the search for higher order connections should be
completed is indicated by the 'max_depth' parameter.
A value of 1 means that children of children will be investigated, while a
value of 2 means that children of children of children will be included in
the search, and so on.
If depth is set to "full", then the search is completed until no more
children is found.
If the 'weight_discretion' boolean is True, a higher order connection
between a source node and a child will not be eliminated if this connection
weight is higher than the weight of the connection between the last
higher-order child to the destination node under question.
"""
simplified_graph = graph.copy()
weight_dict = nx.get_edge_attributes(simplified_graph, "weight")
removed_edges = []
for index, node in enumerate(simplified_graph.nodes()):
children_lists = []
logging.info(
"Currently processing node: "
+ str(node)
+ " ("
+ str(index + 1)
+ "/"
+ str(len(simplified_graph.nodes()))
+ ")"
)
# First create a list of lists of all children at different degrees,
# up to the level where no children are returned
morechilds = True
depth = 0
child_list = list(simplified_graph.successors(node))
if len(child_list) != 0:
children_lists.append(child_list)
while morechilds:
depth += 1
# Flatten list of children
children_lists_decomp = []
decompose(children_lists[depth - 1], children_lists_decomp)
for upper_child in children_lists_decomp:
# Get list of children for each child in previous layer
upper_child_children = list(
simplified_graph.successors(upper_child)
)
if len(upper_child_children) != 0:
children_lists.append(upper_child_children)
intersection_list = [
val for val in child_list if val in upper_child_children
]
simplified_graph, removed_edges = remove_duplicates(
intersection_list,
node,
upper_child,
simplified_graph,
weight_dict,
removed_edges,
weight_discretion,
)
# If no upper children were found, set morechilds to False
if len(upper_child_children) == 0:
morechilds = False
# If the max_depth is not "full" and is greater than the
# maximum depth, set morechilds to False
if max_depth != "full":
if depth > (max_depth - 1):
morechilds = False
logging.info("Removed " + str(len(removed_edges)) + " higher connection edges")
# Remove nodes without edges
# Get full dictionary of in- and out-degree
out_deg_dict = simplified_graph.out_degree()
in_deg_dict = simplified_graph.in_degree()
# Remove nodes with sum(out+in) degree of zero
node_dellist = []
for node in simplified_graph.nodes():
if (out_deg_dict[node] == 0) and (in_deg_dict[node] == 0):
node_dellist.append(node)
simplified_graph.remove_nodes_from(node_dellist)
logging.info("Removed " + str(len(node_dellist)) + " nodes that were left hanging")
logging.info(
"Simplified graph has "
+ str(simplified_graph.number_of_nodes())
+ " nodes and "
+ str(simplified_graph.number_of_edges())
+ " edges"
)
return simplified_graph