-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsteiner_tree.py
167 lines (133 loc) · 5.7 KB
/
steiner_tree.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
from itertools import combinations, chain, permutations
from functools import reduce
from networkx.utils import pairwise, not_implemented_for
import networkx as nx
from tqdm import tqdm
__all__ = ['metric_closure', 'steiner_tree']
# NOTE: this is just source code from networkx; we've copied it here so we can add progress updates
# SOURCE: https://networkx.github.io/documentation/stable/_modules/networkx/algorithms/approximation/steinertree.html
@not_implemented_for('directed')
def metric_closure(G, weight='weight', verbose=False):
    """Return the metric closure of a graph.

    The metric closure of a graph *G* is the complete graph in which each edge
    is weighted by the shortest path distance between the nodes in *G*.

    Parameters
    ----------
    G : NetworkX graph
    weight : optional (default='weight')
        Edge weight specification, forwarded unchanged to
        ``nx.all_pairs_dijkstra``.
    verbose : bool, optional (default=False)
        If True, wrap the loops in ``tqdm`` progress bars and print a banner
        before the all-pairs loop.

    Returns
    -------
    NetworkX graph
        Metric closure of the graph `G`. Every edge ``(u, v)`` stores the
        shortest path length in its ``distance`` attribute and the
        corresponding shortest path in its ``path`` attribute. If `G` has no
        nodes, an empty graph is returned.

    Raises
    ------
    NetworkXError
        If `G` is not connected.
    """
    M = nx.Graph()
    remaining = set(G)

    all_paths_iter = nx.all_pairs_dijkstra(G, weight=weight)
    try:
        u, (distance, path) = next(all_paths_iter)
    except StopIteration:
        # G has no nodes: there is nothing to connect, and letting the bare
        # StopIteration escape would be confusing for callers.
        return M

    # Check for a connected graph while processing the first node: every node
    # must be reachable from it.
    if remaining - set(distance):
        msg = "G is not a connected graph. metric_closure is not defined."
        raise nx.NetworkXError(msg)

    remaining.remove(u)
    for v in (tqdm(remaining) if verbose else remaining):
        M.add_edge(u, v, distance=distance[v], path=path[v])

    # First node done -- now process the rest.
    if verbose:
        print("\nmetric_closure for-loop #2\n")
        # NOTE: this loop takes FOREVER on large graphs. The generator has no
        # length, so tell tqdm how many sources are left; otherwise the bar
        # cannot show a percentage or an ETA.
        all_paths_iter = tqdm(all_paths_iter, total=len(remaining))
    for u, (distance, path) in all_paths_iter:
        # Each unordered pair only needs one edge, so skip nodes that have
        # already acted as a source.
        remaining.remove(u)
        for v in remaining:
            M.add_edge(u, v, distance=distance[v], path=path[v])
    return M
def coopers_steiner_tree(G, terminal_nodes, weight='weight', verbose=False):
    """Approximate a Steiner tree using only terminal-to-terminal distances.

    Instead of building the metric closure of the whole graph, only the
    shortest paths between pairs of terminal nodes are computed. The minimum
    spanning tree of that terminal-only distance graph is then expanded back
    into the edges of `G` that make up its shortest paths.

    Parameters
    ----------
    G : NetworkX graph
    terminal_nodes : list
        A list of terminal nodes for which minimum steiner tree is
        to be found.
    weight : optional (default='weight')
        Edge weight specification, forwarded unchanged to the Dijkstra call.
    verbose : bool, optional (default=False)
        Currently unused; kept so the signature matches ``steiner_tree``.

    Returns
    -------
    NetworkX graph
        The edge subgraph of `G` made of every edge that lies on a shortest
        path selected by the spanning tree. With fewer than two terminal
        nodes there are no pairs, so the result has no edges.
    """
    H = nx.Graph()
    for u, v in combinations(terminal_nodes, 2):
        # A single targeted Dijkstra run returns both the length and the
        # path, so the search is not repeated for each of them.
        distance, path = nx.single_source_dijkstra(
            G, u, target=v, weight=weight
        )
        H.add_edge(u, v, distance=distance, path=path)

    mst_edges = nx.minimum_spanning_edges(H, weight='distance', data=True)
    # Create an iterator over each edge in each shortest path; repeats are okay
    edges = chain.from_iterable(pairwise(d['path']) for _, _, d in mst_edges)
    return G.edge_subgraph(edges)
def igraph_steiner_tree(G, terminal_vertices, weight='weight'):
    """Approximate a Steiner tree over `terminal_vertices` of an igraph graph.

    Parameters
    ----------
    G : igraph graph
        Graph whose edges carry the `weight` attribute.
    terminal_vertices : list of igraph.Vertex
        Terminal vertices to connect. Each vertex is looked up by its
        ``'name'`` attribute when edges are added to the helper graph.
    weight : str, optional (default='weight')
        Name of the edge attribute of `G` used both for the shortest-path
        search and for summing up path lengths.

    Returns
    -------
    (H, steiner_edge_idxs)
        ``H`` is the helper graph on the terminal vertices; each of its edges
        stores the shortest path length in ``'weight'`` and the list of edge
        indices of `G` forming that path in ``'path'``.
        ``steiner_edge_idxs`` is the concatenation of the ``'path'`` lists of
        the spanning-tree edges of ``H`` (indices may repeat).
    """
    # Build the closed graph of terminal_vertices where each weight is the
    # shortest path distance.
    # NOTE(review): PlanarGraph is not defined or imported in the visible part
    # of this file -- confirm it is in scope wherever this function is called.
    H = PlanarGraph()
    for u, v in combinations(terminal_vertices, 2):
        path_idxs = G.get_shortest_paths(u, v, weights=weight, output='epath')
        path_edge_idxs = path_idxs[0]
        # sum() copes with an empty edge path (distance 0), where a reduce()
        # without an initial value would raise.
        path_distance = sum(edge[weight] for edge in G.es[path_edge_idxs])
        H.add_edge(u['name'], v['name'],
                   weight=path_distance, path=path_edge_idxs)

    # Now get the MST of that complete graph of only terminal_vertices
    mst_edge_idxs = H.spanning_tree(weights='weight', return_tree=False)

    # Now, we join the paths for all the mst_edge_idxs
    steiner_edge_idxs = list(
        chain.from_iterable(H.es[i]['path'] for i in mst_edge_idxs)
    )
    # Left over from the original author (apparently how the subgraph could be
    # materialised from the returned indices):
    # steiner_subgraph = G.subgraph_edges(steiner_edge_idxs, delete_vertices=True)
    return H, steiner_edge_idxs
@not_implemented_for('multigraph')
@not_implemented_for('directed')
def steiner_tree(G, terminal_nodes, weight='weight', verbose=False):
    """Return an approximation to the minimum Steiner tree of a graph.

    Parameters
    ----------
    G : NetworkX graph
    terminal_nodes : list
        A list of terminal nodes for which minimum steiner tree is
        to be found.
    weight : optional (default='weight')
        Edge weight specification, forwarded to ``metric_closure`` and used
        as the edge attribute for the cycle-breaking spanning tree.
    verbose : bool, optional (default=False)
        If True, print progress messages and let ``metric_closure`` show its
        progress bars.

    Returns
    -------
    NetworkX graph
        Approximation to the minimum steiner tree of `G` induced by
        `terminal_nodes`, returned as an edge subgraph of `G`. With fewer
        than two terminal nodes the result has no edges.

    Notes
    -----
    Steiner tree can be approximated by computing the minimum spanning
    tree of the subgraph of the metric closure of the graph induced by the
    terminal nodes, where the metric closure of *G* is the complete graph in
    which each edge is weighted by the shortest path distance between the
    nodes in *G* .
    This algorithm produces a tree whose weight is within a (2 - (2 / t))
    factor of the weight of the optimal Steiner tree where *t* is number of
    terminal nodes.

    The shortest paths behind different spanning-tree edges may overlap, so
    their plain union can contain cycles. A second spanning-tree pass over
    that union removes them, and non-terminal leaves left behind by that
    pass are pruned, so the result is always a tree.
    """
    # M is the metric closure of G; H is its subgraph induced by the
    # terminal nodes.
    M = metric_closure(G, weight=weight, verbose=verbose)
    H = M.subgraph(terminal_nodes)
    # Taken from H rather than from the argument so that only terminals that
    # actually exist in the graph are considered.
    terminals = set(H)

    # Use the 'distance' attribute of each edge provided by the metric closure
    # graph.
    if verbose:
        print("Begin min span edges")
    mst_edges = nx.minimum_spanning_edges(H, weight='distance', data=True)

    # Create an iterator over each edge in each shortest path; repeats are okay
    if verbose:
        print("Begin iterator thing")
    path_edges = chain.from_iterable(
        pairwise(d['path']) for _, _, d in mst_edges
    )

    # Break any cycles created by overlapping shortest paths.
    tree_edges = list(
        nx.minimum_spanning_edges(
            G.edge_subgraph(path_edges), weight=weight, data=False
        )
    )

    # Breaking a cycle can leave a non-terminal node dangling as a leaf; such
    # nodes only add weight, so strip them repeatedly until none remain.
    neighbours = {}
    for u, v in tree_edges:
        neighbours.setdefault(u, set()).add(v)
        neighbours.setdefault(v, set()).add(u)
    dangling = [
        node for node, adjacent in neighbours.items()
        if len(adjacent) == 1 and node not in terminals
    ]
    while dangling:
        leaf = dangling.pop()
        (parent,) = neighbours.pop(leaf)
        neighbours[parent].discard(leaf)
        if len(neighbours[parent]) == 1 and parent not in terminals:
            dangling.append(parent)

    kept_edges = [
        (u, v) for u, v in tree_edges if v in neighbours.get(u, ())
    ]
    return G.edge_subgraph(kept_edges)