import copy
from itertools import repeat, product
import jittor as jt
from jittor import Var
from jittor_geometric.data import Dataset
[docs]
class InMemoryDataset(Dataset):
r"""Dataset base class for creating graph datasets which fit completely
into CPU memory.
Args:
root (string, optional): Root directory where the dataset should be
saved. (default: :obj:`None`)
transform (callable, optional): A function/transform that takes in an
:obj:`jittor_geometric.data.Data` object and returns a transformed
version. The data object will be transformed before every access.
(default: :obj:`None`)
pre_transform (callable, optional): A function/transform that takes in
an :obj:`jittor_geometric.data.Data` object and returns a
transformed version. The data object will be transformed before
being saved to disk. (default: :obj:`None`)
pre_filter (callable, optional): A function that takes in an
:obj:`jittor_geometric.data.Data` object and returns a boolean
value, indicating whether the data object should be included in the
final dataset. (default: :obj:`None`)
"""
@property
def raw_file_names(self):
r"""The name of the files to find in the :obj:`self.raw_dir` folder in
order to skip the download."""
raise NotImplementedError
@property
def processed_file_names(self):
r"""The name of the files to find in the :obj:`self.processed_dir`
folder in order to skip the processing."""
raise NotImplementedError
[docs]
def download(self):
r"""Downloads the dataset to the :obj:`self.raw_dir` folder."""
raise NotImplementedError
[docs]
def process(self):
r"""Processes the dataset to the :obj:`self.processed_dir` folder."""
raise NotImplementedError
def __init__(self, root=None, transform=None, pre_transform=None,
pre_filter=None):
super(InMemoryDataset, self).__init__(root, transform, pre_transform,
pre_filter)
self.data, self.slices = None, None
self.__data_list__ = None
@property
def num_classes(self):
r"""The number of classes in the dataset."""
if self.data.y is None:
return 0
elif self.data.y.ndim == 1:
return int(self.data.y.max().item()) + 1
else:
return self.data.y.size(-1)
[docs]
def len(self):
for item in self.slices.values():
return len(item) - 1
return 0
[docs]
def get(self, idx):
if hasattr(self, '__data_list__'):
if self.__data_list__ is None:
self.__data_list__ = self.len() * [None]
else:
data = self.__data_list__[idx]
if data is not None:
return copy.copy(data)
data = self.data.__class__()
if hasattr(self.data, '__num_nodes__'):
data.num_nodes = self.data.__num_nodes__[idx]
for key in self.data.keys:
if key=="csr":
continue
if key=="csc":
continue
item, slices = self.data[key], self.slices[key]
start, end = slices[idx].item(), slices[idx + 1].item()
if isinstance(item, Var):
s = list(repeat(slice(None), item.ndim))
cat_dim = self.data.__cat_dim__(key, item)
if cat_dim is None:
cat_dim = 0
s[cat_dim] = slice(start, end)
elif start + 1 == end:
s = slices[start]
else:
s = slice(start, end)
if key in ['smiles','name']:
data[key] = item[s]
else:
data[key] = item[tuple(s)]
if hasattr(self, '__data_list__'):
self.__data_list__[idx] = copy.copy(data)
return data
[docs]
@staticmethod
def collate(data_list):
r"""Collates a python list of data objects to the internal storage
format of :class:`torch_geometric.data.InMemoryDataset`."""
keys = data_list[0].keys
data = data_list[0].__class__()
for key in keys:
data[key] = []
slices = {key: [0] for key in keys}
for item, key in product(data_list, keys):
data[key].append(item[key])
if isinstance(item[key], Var) and item[key].ndim > 0:
cat_dim = item.__cat_dim__(key, item[key])
cat_dim = 0 if cat_dim is None else cat_dim
s = slices[key][-1] + item[key].size(cat_dim)
else:
s = slices[key][-1] + 1
slices[key].append(s)
if hasattr(data_list[0], '__num_nodes__'):
data.__num_nodes__ = []
for item in data_list:
data.__num_nodes__.append(item.num_nodes)
for key in keys:
item = data_list[0][key]
if isinstance(item, Var) and len(data_list) > 1:
if item.ndim > 0:
cat_dim = data.__cat_dim__(key, item)
cat_dim = 0 if cat_dim is None else cat_dim
data[key] = jt.concat(data[key], dim=cat_dim)
else:
data[key] = jt.stack(data[key])
elif isinstance(item, Var): # Don't duplicate attributes...
data[key] = data[key][0]
elif isinstance(item, int) or isinstance(item, float):
data[key] = jt.array(data[key])
slices[key] = jt.array(slices[key], dtype=Var.int32)
return data, slices
[docs]
def copy(self, idx=None):
if idx is None:
data_list = [self.get(i) for i in range(len(self))]
else:
data_list = [self.get(i) for i in idx]
dataset = copy.copy(self)
dataset.__indices__ = None
dataset.__data_list__ = data_list
dataset.data, dataset.slices = self.collate(data_list)
return dataset