"""Functions to calculate the shortest temporal path."""
import itertools
from collections import deque

import numpy as np
import pandas as pd

from teneto.utils import process_input
[docs]
def seqpath_to_path(pairseq, source):
    """Yield chains of node pairs linking the first pair of ``pairseq`` to ``source``.

    Parameters
    ----------
    pairseq : sequence of int
        Flat sequence of node indices read two at a time, i.e.
        ``[a0, b0, a1, b1, ...]`` describes the pairs ``[a0, b0], [a1, b1], ...``.
    source : int
        Node index that the chain has to reach.

    Yields
    ------
    list of [int, int]
        The pairs making up one chain, ordered from the pair that contains
        ``source`` to the first pair of ``pairseq``. Chains are produced by a
        breadth-first search, so they come out in order of non-decreasing
        number of pairs. Nothing is yielded when ``pairseq`` is empty or when
        no chain reaches ``source``.
    """
    # Convert the flat sequence into an (n, 2) array with one pair per row.
    pairrows = np.reshape(pairseq, [len(pairseq) // 2, 2])
    # No pairs means nothing to search (and no row 0 to index below).
    if len(pairrows) == 0:
        return
    # If source is already in the first pair, that pair alone is a chain.
    if source in pairrows[0]:
        yield [pairrows[0].tolist()]
    # Each queue entry is (index of the last row in the chain, row indices so far).
    queue = deque([(0, [0])])
    while queue:
        node, path = queue.popleft()
        # Rows sharing a node with the current row, restricted to rows that
        # come after it in the sequence.
        iterset = set(np.where((pairrows == pairrows[node, 0]) | (
            pairrows == pairrows[node, 1]))[0]) - set(range(node + 1))
        for nextset in iterset:
            if source in pairrows[nextset]:
                # Rows were collected starting from row 0, so reverse them to
                # start the chain at the pair containing source.
                yield list(reversed(pairrows[path + [nextset]].tolist()))
            else:
                queue.append((nextset, path + [nextset]))
[docs]
def shortest_path_from_pairseq(pairseq, source):
    """Return the first path yielded by ``seqpath_to_path(pairseq, source)``.

    Returns ``None`` when that generator yields nothing.
    """
    candidates = seqpath_to_path(pairseq, source)
    # The default argument of next() replaces the explicit StopIteration handler.
    return next(candidates, None)
[docs]
def _as_index_list(value, default_size, name):
    """Normalise an ``i``/``j``/``it`` argument into a sequence of indices."""
    if value is None:
        return np.arange(default_size)
    # Accept numpy integer scalars as well as built-in ints.
    if isinstance(value, (int, np.integer)):
        return [value]
    if isinstance(value, list):
        return value
    if isinstance(value, (tuple, np.ndarray)):
        return np.ravel(value).tolist()
    raise ValueError(
        'Unknown {} input. Should be None, int or list'.format(name))


def shortest_temporal_path(tnet, steps_per_t='all', i=None, j=None, it=None, minimise='temporal_distance'):
    """
    Shortest temporal path

    Parameters
    --------------

    tnet : tnet obj, array or dict
        input network. nettype: bu, bd.

    steps_per_t : int or str
        If str, should be 'all'.
        How many edges can be travelled during a single time-point.

    i : None, int, list
        Node index or list of node indices to restrict analysis.
        These are the nodes the paths start from. Default is all nodes.
        Numpy integers, tuples and 1d arrays are accepted as well.

    j : None, int, list
        Node index or list of node indices to restrict analysis.
        These are the nodes the paths end on. Default is all nodes.
        Numpy integers, tuples and 1d arrays are accepted as well.

    it : None, int, list
        Time points the paths start at.
        Either None (default) which takes all time points,
        an integer to indicate which time point to start at,
        or a list of time points, each of which is used as a start time.

    minimise : str
        Can be "temporal_distance", returns the path that has the smallest temporal distance.
        It is possible there can be a path that is a smaller
        topological distance (this option currently not available).

    Returns
    -------------------

    paths : pandas df
        Dataframe with one row per (source, target, start time) combination
        and the columns 'from', 'to', 't_start', 'temporal-distance',
        'topological-distance' and 'path includes'.
        The last three are NaN when no path was found.
        'path includes' holds a list of [node, node] pairs, or a list of such
        lists when several equally short paths were found.

    Raises
    ------

    ValueError
        If i, j or it is not None, an integer or a list-like of integers,
        or if the second character of the network's nettype is neither 'u' nor 'd'.

    Notes
    ---------------

    The shortest temporal path calculates the temporal and topological distance of a path between nodes.

    The argument steps_per_t allows for multiple edges to be travelled per time-point.

    Topological distance is the number of edges that are travelled. Temporal distance is the number of time-points.

    This function returns the path that is the shortest temporal distance away.

    Examples
    --------

    Let us start by creating a small network.

    >>> import numpy as np
    >>> import matplotlib.pyplot as plt
    >>> import teneto
    >>> G = np.zeros([4, 4, 3])
    >>> G[0, 1, [0, 2]] = 1
    >>> G[0, 3, [2]] = 1
    >>> G[1, 2, [1]] = 1
    >>> G[2, 3, [1]] = 1

    Let us look at this network to see what is there.

    >>> fig, ax = plt.subplots(1)
    >>> ax = teneto.plot.slice_plot(G, ax, nodelabels=[0,1,2,3], timelabels=[0,1,2], cmap='Set2')
    >>> plt.tight_layout()
    >>> fig.show()

    .. plot::

        import numpy as np
        import matplotlib.pyplot as plt
        import teneto
        G = np.zeros([4, 4, 3])
        G[0, 1, [0, 2]] = 1
        G[0, 3, [2]] = 1
        G[1, 2, [1]] = 1
        G[2, 3, [1]] = 1
        fig,ax = plt.subplots(1)
        teneto.plot.slice_plot(G,ax,nodelabels=[0,1,2,3],timelabels=[0,1,2],cmap='Set2')
        plt.tight_layout()
        fig.show()

    Here we can visualize what the shortest paths are.
    Starting at node 0, we want to find the path to node 3, starting at time 0. To do this we write:

    >>> sp = teneto.networkmeasures.shortest_temporal_path(G, i=0, j=3, it=0)
    >>> sp['temporal-distance']
    0    2
    Name: temporal-distance, dtype: int64
    >>> sp['topological-distance']
    0    3
    Name: topological-distance, dtype: int64
    >>> sp['path includes']
    0    [[0, 1], [1, 2], [2, 3]]
    Name: path includes, dtype: object

    Here we see that the shortest path takes 3 steps (topological distance of 3) at 2 time points.

    It starts by going from node 0 to 1 at t=0, then 1 to 2 and 2 to 3 at t=1.
    We can see all the nodes that were travelled in the "path includes" list.

    In the above example, it was possible to traverse multiple edges at a single time-point.
    It is possible to restrain that by setting the steps_per_t argument

    >>> sp = teneto.networkmeasures.shortest_temporal_path(G, i=0, j=3, it=0, steps_per_t=1)
    >>> sp['temporal-distance']
    0    3
    Name: temporal-distance, dtype: int64
    >>> sp['topological-distance']
    0    1
    Name: topological-distance, dtype: int64
    >>> sp['path includes']
    0    [[0, 3]]
    Name: path includes, dtype: object

    Here we see that the path is now only one edge, 0 to 3 at t=2.
    The quicker path is no longer possible.

    """
    tnet = process_input(tnet, ['C', 'G', 'TN'], 'TN')

    # Normalise i, j and it; None expands to every node / every time point.
    source_nodes = _as_index_list(i, tnet.netshape[0], 'i')
    target_nodes = _as_index_list(j, tnet.netshape[0], 'j')
    time_points = _as_index_list(it, tnet.netshape[1], 'it')

    # Two step process.
    # First, get what the network can reach per timepoint.
    # Second, check all possible sequences of what the network can reach for the shortest sequence.
    paths = []
    for source in source_nodes:
        for target in target_nodes:
            if target == source:
                continue
            for tstart in time_points:
                # Part 1: grow the set of reached nodes (ij) from the source,
                # recording the edges found in each expansion step.
                ij = [source]
                t = tstart
                step = 1
                lenij = 1
                pairs = []
                stop = 0
                while stop == 0:
                    # Only select i if directed, ij if undirected.
                    if tnet.nettype[1] == 'u':
                        network = tnet.get_network_when(ij=list(ij), t=t)
                    elif tnet.nettype[1] == 'd':
                        network = tnet.get_network_when(i=list(ij), t=t)
                    else:
                        raise ValueError(
                            "Unsupported nettype. Second character should be 'u' or 'd'")
                    new_nodes = network[['i', 'j']].values
                    if len(new_nodes) != 0:
                        pairs.append(new_nodes.tolist())
                    new_nodes = new_nodes.flatten()
                    ij = np.hstack([ij, new_nodes])
                    ij = np.unique(ij)
                    if minimise == 'temporal_distance' and target in ij:
                        stop = 1
                    elif minimise == 'topology' and t == tnet.netshape[1] and target in ij:
                        stop = 1
                    elif t == tnet.netshape[1]:
                        # Ran past the last time point without reaching the target.
                        t = np.nan
                        ij = [target]
                        stop = 1
                    else:
                        if len(ij) == lenij:
                            # No new node was reached, so move to the next time point.
                            t += 1
                            step = 1
                        elif steps_per_t == 'all':
                            # Unlimited steps: keep expanding at the same time point.
                            pass
                        elif step < steps_per_t:
                            step += 1
                        else:
                            # Step budget for this time point is used up.
                            t += 1
                            step = 1
                        if t == tnet.netshape[1]:
                            # Ran past the last time point without reaching the target.
                            t = np.nan
                            ij = [target]
                            stop = 1
                        lenij = len(ij)
                # t is the index of the last time point visited; add one so that
                # t - tstart counts time points (NaN stays NaN).
                t += 1
                # Part 2: pick one edge from each expansion step (latest step
                # first) and keep the shortest chain(s) linking target to source.
                path = np.nan
                path_length = np.nan
                for n in itertools.product(*reversed(pairs)):
                    a = np.array(n).flatten()
                    if source not in a or target not in a:
                        continue
                    pathtmp = shortest_path_from_pairseq(a, source)
                    if not pathtmp:
                        continue
                    if not isinstance(path, list):
                        # First path found.
                        path = pathtmp
                        path_length = len(path)
                    elif len(pathtmp) < path_length:
                        path = pathtmp
                        path_length = len(path)
                    elif len(pathtmp) == path_length:
                        # Equally short path: path[0][0] is a list when path
                        # already holds several paths, otherwise it is a node.
                        if isinstance(path[0][0], list):
                            if pathtmp not in path:
                                path.append(pathtmp)
                        elif path != pathtmp:
                            path = [path, pathtmp]
                paths.append([source, target, tstart, t - tstart, path_length, path])

    paths = pd.DataFrame(data=paths, columns=[
        'from', 'to', 't_start', 'temporal-distance', 'topological-distance', 'path includes'])
    return paths