Skip to content

Issue 164 Make Device Handling Explicit #187

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/contributing.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ The tests are located in the `tests/`-directory. We use `pytest-cov` to measure
!!! todo "Add tests"
We are currently only at 60% coverage. So the lines above are currently pure fiction.

Test that use a GPU are located in the `tests/gpu`-directory.
They are currently disabled for CI but can be manually executed with
```
pytest -m gpu
```

## Benchmarking

For optimal runtime, we continually measure the execution time of our core functions using pytest benchmarks. These benchmarks are located in `tests/benchmarks/` and are unit-tests that utilize the `benchmark` fixture from [`pytest-benchmark`](https://pytest-benchmark.readthedocs.io/en/latest/index.html). All of them are marked with the benchmark decorator (`@pytest.mark.benchmark`) to exclude them from the normal unit-tests. You can run all benchmarks in the command line using
Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ name = "pathpyG"
pythonpath = ["src"]
testpaths = "tests"
markers = [
"benchmark: marks tests as benchmarking tests (deselect with '-m \"not benchmark\"')"
"benchmark: marks tests as benchmarking tests (deselect with '-m \"not benchmark\"')",
"gpu: marks tests that require GPU support (deselect with '-m \"not gpu\"')"
]
addopts = "--cov=src -m \"not benchmark\""
addopts = "--cov=src -m \"not benchmark and not gpu\""

[tool.coverage.run]
branch = true
Expand Down
30 changes: 14 additions & 16 deletions src/pathpyG/algorithms/temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
from pathpyG.core.IndexMap import IndexMap
from pathpyG.core.MultiOrderModel import MultiOrderModel

from pathpyG import config

device = config['torch']['device']

def lift_order_temporal(g: TemporalGraph, delta: int = 1):

Expand All @@ -35,45 +32,46 @@ def lift_order_temporal(g: TemporalGraph, delta: int = 1):

# find indices of all source edges that occur at unique timestamp t
src_time_mask = (timestamps == t)
src_edges = edge_index[:,src_time_mask]
src_edges = edge_index[:, src_time_mask]
src_edge_idx = indices[src_time_mask]

# find indices of all edges that can possibly continue edges occurring at time t for the given delta
dst_time_mask = (timestamps > t) & (timestamps <= t+delta)
dst_edges = edge_index[:,dst_time_mask]
dst_edges = edge_index[:, dst_time_mask]
dst_edge_idx = indices[dst_time_mask]

if dst_edge_idx.size(0)>0 and src_edge_idx.size(0)>0:
if dst_edge_idx.size(0) > 0 and src_edge_idx.size(0)>0:

# compute second-order edges between src and dst idx for all edges where dst in src_edges matches src in dst_edges
x = torch.cartesian_prod(src_edge_idx, dst_edge_idx).t()
src_edges = torch.index_select(edge_index, dim=1, index=x[0])
dst_edges = torch.index_select(edge_index, dim=1, index=x[1])
ho_edge_index = x[:,torch.where(src_edges[1,:] == dst_edges[0,:])[0]]
ho_edge_index = x[:, torch.where(src_edges[1, :] == dst_edges[0, :])[0]]
second_order.append(ho_edge_index)

ho_index = torch.cat(second_order, dim=1)
return ho_index


def temporal_shortest_paths(g: TemporalGraph, delta: int):
# generate temporal event DAG
edge_index = lift_order_temporal(g, delta).to(device)
edge_index = lift_order_temporal(g, delta)

# Add indices of first-order nodes as src and dst of paths in augmented
# temporal event DAG
src_edges_src = (g.data.edge_index[0] + g.M).to(device)
src_edges_dst = (torch.arange(0, g.data.edge_index.size(1))).to(device)
src_edges_src = g.data.edge_index[0] + g.M
src_edges_dst = torch.arange(0, g.data.edge_index.size(1), device=edge_index.device)

dst_edges_src = torch.arange(0, g.data.edge_index.size(1)).to(device)
dst_edges_dst = (g.data.edge_index[1] + g.M + g.N).to(device)
dst_edges_src = torch.arange(0, g.data.edge_index.size(1), device=edge_index.device)
dst_edges_dst = (g.data.edge_index[1] + g.M + g.N)

# add edges from source to edges and from edges to destinations
src_edges = torch.stack([src_edges_src, src_edges_dst]).to(device)
dst_edges = torch.stack([dst_edges_src, dst_edges_dst]).to(device)
src_edges = torch.stack([src_edges_src, src_edges_dst])
dst_edges = torch.stack([dst_edges_src, dst_edges_dst])
edge_index = torch.cat([edge_index, src_edges, dst_edges], dim=1)

# create sparse scipy matrix
event_graph = Graph.from_edge_index(edge_index, num_nodes=g.M + 2 * g.N)
event_graph = Graph.from_edge_index(edge_index, num_nodes=g.M + 2 * g.N)
m = event_graph.get_sparse_adj_matrix()

#print(f"Created temporal event DAG with {event_graph.N} nodes and {event_graph.M} edges")
Expand All @@ -82,7 +80,7 @@ def temporal_shortest_paths(g: TemporalGraph, delta: int):
dist, pred = dijkstra(m, directed=True, indices=np.arange(g.M, g.M+g.N), return_predecessors=True, unweighted=True)

# limit to first-order destinations and correct distances
dist_fo = dist[:, g.M+g.N:] - 1
dist_fo = dist[:, g.M+g.N:] - 1
pred_fo = pred[:, g.N+g.M:]
np.fill_diagonal(dist_fo, 0)

Expand Down
3 changes: 0 additions & 3 deletions src/pathpyG/config.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ enabled = True
leave = False
min_iter = 1000

[torch]
device = cpu

[attributes]
history = True
multiple = False
Expand Down
35 changes: 21 additions & 14 deletions src/pathpyG/core/Graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from torch_geometric.transforms.to_undirected import ToUndirected
from torch_geometric.utils import is_undirected

from pathpyG.utils.config import config
from pathpyG.core.IndexMap import IndexMap


Expand All @@ -36,7 +35,7 @@ class Graph:
def __init__(self, data: Data, mapping: Optional[IndexMap] = None):
"""Generate graph instance from a pyG `Data` object.

Generate a Graph instance from a `torch_geometric.Data` object that contains an EdgeIndex as well as
Generate a Graph instance from a `torch_geometric.Data` object that contains an EdgeIndex as well as
optional node-, edge- or graph-level attributes. An optional mapping can be used to transparently map
node indices to string identifiers.

Expand Down Expand Up @@ -67,7 +66,7 @@ def __init__(self, data: Data, mapping: Optional[IndexMap] = None):

# turn edge index tensor into EdgeIndex object
if not isinstance(data.edge_index, EdgeIndex):
data.edge_index = EdgeIndex(data=data.edge_index, sparse_size=(data.num_nodes, data.num_nodes))
data.edge_index = EdgeIndex(data=data.edge_index, sparse_size=(data.num_nodes, data.num_nodes)).to(device=data.edge_index.device)

if data.edge_index.get_sparse_size(dim=0) != data.num_nodes or data.edge_index.get_sparse_size(dim=1) != data.num_nodes:
raise Exception('sparse size of EdgeIndex should match number of nodes!')
Expand All @@ -87,10 +86,12 @@ def __init__(self, data: Data, mapping: Optional[IndexMap] = None):
((self.row_ptr, self.col), _) = self.data.edge_index.get_csr()
((self.col_ptr, self.row), _) = self.data.edge_index.get_csc()


@staticmethod
def from_edge_index(edge_index: torch.Tensor, mapping: Optional[IndexMap] = None, num_nodes=None) -> Graph:
"""Construct a graph from a torch Tensor containing an edge index. An optional mapping can
be used to transparently map node indices to string identifiers.
The device is defined by edge_index.

Args:
edge_index: torch.Tensor or torch_geometric.EdgeIndex object containing an edge_index
Expand All @@ -115,14 +116,15 @@ def from_edge_index(edge_index: torch.Tensor, mapping: Optional[IndexMap] = None
d = Data(edge_index=edge_index)
else:
d = Data(edge_index=edge_index, num_nodes=num_nodes)

return Graph(
d,
mapping=mapping
)


@staticmethod
def from_edge_list(edge_list: Iterable[Tuple[str, str]], is_undirected: bool = False, mapping: IndexMap = None, num_nodes=None) -> Graph:
def from_edge_list(edge_list: Iterable[Tuple[str, str]], is_undirected: bool = False, mapping: IndexMap = None, num_nodes=None, device: Optional[torch.device] = None) -> Graph:
"""Generate a Graph based on an edge list.

Edges can be given as string or integer tuples. If strings are used and no mapping is given,
Expand Down Expand Up @@ -169,7 +171,7 @@ def from_edge_list(edge_list: Iterable[Tuple[str, str]], is_undirected: bool = F
if num_nodes is None:
num_nodes = mapping.num_ids()

edge_index = EdgeIndex([sources, targets], sparse_size=(num_nodes, num_nodes), is_undirected=is_undirected, device=config['torch']['device'])
edge_index = EdgeIndex([sources, targets], sparse_size=(num_nodes, num_nodes), is_undirected=is_undirected, device=device)
return Graph(
Data(edge_index=edge_index, num_nodes=num_nodes),
mapping=mapping
Expand Down Expand Up @@ -203,9 +205,18 @@ def to_undirected(self) -> Graph:

def to_weighted_graph(self) -> Graph:
"""Coalesces multi-edges to single-edges with an additional weight attribute"""
i, w = torch_geometric.utils.coalesce(self.data.edge_index, torch.ones(self.M).to(config["torch"]["device"]))
i, w = torch_geometric.utils.coalesce(self.data.edge_index, torch.ones(self.M, device=self.data.edge_index.device))
return Graph(Data(edge_index=i, edge_weight=w), mapping=self.mapping)

def to(self, device: torch.device) -> Graph:
"""Moves `edge_index` to the given device. To move whole underlying data use `graph.data.to(device)`."""
self.data.to(device, 'edge_index')
self.row = self.row.to(device)
self.row_ptr = self.row_ptr.to(device)
self.col = self.col.to(device)
self.col_ptr = self.col_ptr.to(device)
return self

@staticmethod
def attr_types(attr: Dict) -> Dict:
"""
Expand Down Expand Up @@ -422,9 +433,7 @@ def add_node_ohe(self, attr_name: str, dim: int = 0) -> None:
"""
if dim == 0:
dim = self.N
self.data[attr_name] = torch.eye(dim, dtype=torch.float).to(
config["torch"]["device"]
)[: self.N]
self.data[attr_name] = torch.eye(dim, dtype=torch.float).to(self.data.edge_index.device)[: self.N]

def add_edge_ohe(self, attr_name: str, dim: int = 0) -> None:
"""Add one-hot encoding of edges to edge attribute.
Expand All @@ -435,9 +444,7 @@ def add_edge_ohe(self, attr_name: str, dim: int = 0) -> None:
"""
if dim == 0:
dim = self.M
self.data[attr_name] = torch.eye(dim, dtype=torch.float).to(
config["torch"]["device"]
)[: self.M]
self.data[attr_name] = torch.eye(dim, dtype=torch.float).to(self.data.edge_index.device)[: self.M]

def __getitem__(self, key: Union[tuple, str]) -> Any:
"""Return node, edge, or graph attribute.
Expand Down Expand Up @@ -576,8 +583,8 @@ def __add__(self, other: Graph) -> Graph:
# apply index translation to d2
# fast dictionary based mapping using torch
palette, key = zip(*d2_idx_translation.items())
key = torch.tensor(key)
palette = torch.tensor(palette)
key = torch.tensor(key, device=d2.edge_index.device)
palette = torch.tensor(palette, device=d2.edge_index.device)

index = torch.bucketize(d2.edge_index.ravel(), palette)
d2.edge_index = key[index].reshape(d2.edge_index.shape)
Expand Down
8 changes: 4 additions & 4 deletions src/pathpyG/core/IndexMap.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Dict, List, Union
from typing import TYPE_CHECKING, Dict, List, Union, Optional

import torch
import numpy as np
Expand Down Expand Up @@ -65,12 +65,12 @@ def to_idx(self, node: Union[str, int]) -> int:
else:
return node

def to_idxs(self, nodes: list | tuple) -> torch.Tensor:
def to_idxs(self, nodes: list | tuple, device: Optional[torch.device] = None) -> torch.Tensor:
"""Map list of arguments (IDs or indices) to indices if mapping is defined, return argument otherwise."""
if self.has_ids:
return torch.tensor([self.id_to_idx[node] for node in nodes])
return torch.tensor([self.id_to_idx[node] for node in nodes], device=device)
else:
return torch.tensor(nodes)
return torch.tensor(nodes, device=device)

def __str__(self) -> str:
s = ''
Expand Down
10 changes: 8 additions & 2 deletions src/pathpyG/core/MultiOrderModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from torch_geometric.loader import DataLoader
from torch_geometric.utils import cumsum, coalesce, degree, sort_edge_index

from pathpyG.utils.config import config
from pathpyG.core.Graph import Graph
from pathpyG.core.path_data import PathData
from pathpyG.core.TemporalGraph import TemporalGraph
Expand All @@ -24,6 +23,13 @@ def __str__(self) -> str:
max_order = max(list(self.layers.keys())) if self.layers else 0
s = f"MultiOrderModel with max. order {max_order}"
return s

def to(self, device: torch.device) -> MultiOrderModel:
"""Converts the graph layers to the given device."""
for g in self.layers.values():
g.to(device)
return self


@staticmethod
def aggregate_edge_weight(ho_index: torch.Tensor, edge_weight: torch.Tensor, aggr: str = "src") -> torch.Tensor:
Expand Down Expand Up @@ -246,7 +252,7 @@ def from_PathData(
m = MultiOrderModel()

# We assume that paths are sorted
path_graph = next(iter(DataLoader(path_data.paths, batch_size=len(path_data.paths)))).to(config["torch"]["device"])
path_graph = next(iter(DataLoader(path_data.paths, batch_size=len(path_data.paths)))).to(path_data.device)
edge_index = path_graph.edge_index
node_sequence = path_graph.node_sequence
if path_graph.edge_attr is None:
Expand Down
20 changes: 12 additions & 8 deletions src/pathpyG/core/TemporalGraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

from pathpyG import Graph
from pathpyG.core.IndexMap import IndexMap
from pathpyG.utils.config import config


class TemporalGraph(Graph):
Expand Down Expand Up @@ -41,7 +40,7 @@ def __init__(self, data: TemporalData, mapping: IndexMap = None) -> None:
src=data.src[sort_index],
dst=data.dst[sort_index],
t=t_sorted
).to(config['torch']['device'])
).to(data.edge_index.device)

if mapping is not None:
self.mapping = mapping
Expand All @@ -63,7 +62,7 @@ def __init__(self, data: TemporalData, mapping: IndexMap = None) -> None:
# ).tocsr()

@staticmethod
def from_edge_list(edge_list) -> TemporalGraph:
def from_edge_list(edge_list, device: Optional[torch.device] = None) -> TemporalGraph:
sources = []
targets = []
ts = []
Expand All @@ -79,14 +78,14 @@ def from_edge_list(edge_list) -> TemporalGraph:

return TemporalGraph(
data=TemporalData(
src=torch.Tensor(sources).long(),
dst=torch.Tensor(targets).long(),
t=torch.Tensor(ts)),
src=torch.Tensor(sources, device=device).long(),
dst=torch.Tensor(targets, device=device).long(),
t=torch.Tensor(ts, device=device)),
mapping=index_map
)

@staticmethod
def from_csv(file, timestamp_format='%Y-%m-%d %H:%M:%S', time_rescale=1) -> TemporalGraph:
def from_csv(file, timestamp_format='%Y-%m-%d %H:%M:%S', time_rescale=1, device: Optional[torch.device] = None) -> TemporalGraph:
tedges = []
with open(file, "r", encoding="utf-8") as f:
for line in f:
Expand All @@ -100,7 +99,7 @@ def from_csv(file, timestamp_format='%Y-%m-%d %H:%M:%S', time_rescale=1) -> Temp
x = datetime.datetime.strptime(timestamp, timestamp_format)
t = int(mktime(x.timetuple()))
tedges.append((fields[0], fields[1], int(t/time_rescale)))
return TemporalGraph.from_edge_list(tedges)
return TemporalGraph.from_edge_list(tedges, device=device)

@property
def temporal_edges(self) -> Generator[Tuple[int, int, int], None, None]:
Expand All @@ -109,6 +108,11 @@ def temporal_edges(self) -> Generator[Tuple[int, int, int], None, None]:
for e in self.data.edge_index.t():
yield self.mapping.to_id(e[0].item()), self.mapping.to_id(e[1].item()), self.data.t[i].item() # type: ignore
i += 1

def to(self, device: torch.device) -> Graph:
"""Moves all attributes to the given device. """
self.data.to(device)
return self

def shuffle_time(self) -> None:
"""Randomly shuffles the temporal order of edges by randomly permuting timestamps."""
Expand Down
Loading
Loading