# Source code for jittor_geometric.data.in_memory_dataset

import copy
from itertools import repeat, product

import jittor as jt
from jittor import Var
from jittor_geometric.data import Dataset


class InMemoryDataset(Dataset):
    r"""Dataset base class for creating graph datasets which fit completely
    into CPU memory.

    All examples are stored concatenated in a single ``self.data`` object;
    ``self.slices`` maps every attribute key to the offsets that delimit each
    example inside the concatenated storage.

    Args:
        root (string, optional): Root directory where the dataset should be
            saved. (default: :obj:`None`)
        transform (callable, optional): A function/transform that takes in an
            :obj:`jittor_geometric.data.Data` object and returns a transformed
            version. The data object will be transformed before every access.
            (default: :obj:`None`)
        pre_transform (callable, optional): A function/transform that takes in
            an :obj:`jittor_geometric.data.Data` object and returns a
            transformed version. The data object will be transformed before
            being saved to disk. (default: :obj:`None`)
        pre_filter (callable, optional): A function that takes in an
            :obj:`jittor_geometric.data.Data` object and returns a boolean
            value, indicating whether the data object should be included in the
            final dataset. (default: :obj:`None`)
    """

    @property
    def raw_file_names(self):
        r"""The name of the files to find in the :obj:`self.raw_dir` folder in
        order to skip the download."""
        raise NotImplementedError

    @property
    def processed_file_names(self):
        r"""The name of the files to find in the :obj:`self.processed_dir`
        folder in order to skip the processing."""
        raise NotImplementedError

    def download(self):
        r"""Downloads the dataset to the :obj:`self.raw_dir` folder."""
        raise NotImplementedError

    def process(self):
        r"""Processes the dataset to the :obj:`self.processed_dir` folder."""
        raise NotImplementedError

    def __init__(self, root=None, transform=None, pre_transform=None,
                 pre_filter=None):
        super().__init__(root, transform, pre_transform, pre_filter)
        # Concatenated storage and per-key offsets; both start out unset.
        self.data, self.slices = None, None
        # Per-index cache of already separated examples, filled by `get`.
        # (The trailing double underscore means the name is NOT mangled.)
        self.__data_list__ = None

    @property
    def num_classes(self):
        r"""The number of classes in the dataset."""
        y = self.data.y
        if y is None:
            return 0
        if y.ndim == 1:
            # One label per entry: classes are numbered 0..max.
            return int(y.max().item()) + 1
        # Otherwise the size of the last dimension is used.
        return y.size(-1)

    def len(self):
        r"""Returns the number of examples, derived from the slice offsets.

        Every entry of ``self.slices`` holds one offset more than there are
        examples, so the first entry found determines the length. Returns
        ``0`` when ``self.slices`` has no entries.
        """
        for item in self.slices.values():
            return len(item) - 1
        return 0

    def get(self, idx):
        r"""Returns the example at position :obj:`idx`.

        The example is cut out of the concatenated ``self.data`` using the
        offsets in ``self.slices``. Results are cached in
        ``self.__data_list__``; both the cached object and the returned one
        are shallow copies of each other. The keys ``"csr"`` and ``"csc"``
        are skipped and not copied into the returned object.

        Args:
            idx (int): Index of the example to separate.
        """
        if hasattr(self, '__data_list__'):
            if self.__data_list__ is None:
                self.__data_list__ = self.len() * [None]
            else:
                data = self.__data_list__[idx]
                if data is not None:
                    return copy.copy(data)

        data = self.data.__class__()
        if hasattr(self.data, '__num_nodes__'):
            data.num_nodes = self.data.__num_nodes__[idx]

        for key in self.data.keys:
            if key in ('csr', 'csc'):
                continue

            item, slices = self.data[key], self.slices[key]
            start, end = slices[idx].item(), slices[idx + 1].item()

            if isinstance(item, Var):
                # Select everything along all dimensions except the one the
                # attribute was concatenated along.
                s = list(repeat(slice(None), item.ndim))
                cat_dim = self.data.__cat_dim__(key, item)
                if cat_dim is None:
                    cat_dim = 0
                s[cat_dim] = slice(start, end)
                # Multi-dimensional indexing needs a tuple. 'smiles' and
                # 'name' keep the list form they were indexed with before.
                if key not in ('smiles', 'name'):
                    s = tuple(s)
            elif start + 1 == end:
                # Single element: index with the integer offset itself.
                # (Previously `slices[start]`, a Var that only equals `start`
                # because `collate` uses unit steps for non-Var attributes.)
                s = start
            else:
                s = slice(start, end)

            # Non-Var containers are indexed directly; wrapping a slice or an
            # integer in `tuple()` (as was done before) raises a TypeError.
            data[key] = item[s]

        if hasattr(self, '__data_list__'):
            self.__data_list__[idx] = copy.copy(data)

        return data

    @staticmethod
    def collate(data_list):
        r"""Collates a python list of data objects to the internal storage
        format of :class:`jittor_geometric.data.InMemoryDataset`.

        Args:
            data_list (list): Non-empty list of data objects; the keys of the
                first object determine which attributes are collated.

        Returns:
            tuple: ``(data, slices)`` where ``data`` holds the collated
            attributes and ``slices`` maps each key to an int32 array of
            offsets with ``len(data_list) + 1`` entries.
        """
        keys = data_list[0].keys
        data = data_list[0].__class__()

        for key in keys:
            data[key] = []
        slices = {key: [0] for key in keys}

        for item, key in product(data_list, keys):
            value = item[key]
            data[key].append(value)
            if isinstance(value, Var) and value.ndim > 0:
                # Arrays advance the offset by their extent along cat_dim.
                cat_dim = item.__cat_dim__(key, value)
                cat_dim = 0 if cat_dim is None else cat_dim
                s = slices[key][-1] + value.size(cat_dim)
            else:
                # Scalars and non-array attributes occupy exactly one slot.
                s = slices[key][-1] + 1
            slices[key].append(s)

        if hasattr(data_list[0], '__num_nodes__'):
            data.__num_nodes__ = []
            for item in data_list:
                data.__num_nodes__.append(item.num_nodes)

        for key in keys:
            item = data_list[0][key]
            if isinstance(item, Var) and len(data_list) > 1:
                if item.ndim > 0:
                    cat_dim = data.__cat_dim__(key, item)
                    cat_dim = 0 if cat_dim is None else cat_dim
                    data[key] = jt.concat(data[key], dim=cat_dim)
                else:
                    # Zero-dimensional arrays cannot be concatenated.
                    data[key] = jt.stack(data[key])
            elif isinstance(item, Var):  # Don't duplicate attributes...
                data[key] = data[key][0]
            elif isinstance(item, (int, float)):
                data[key] = jt.array(data[key])
            # Any other attribute type stays a plain python list.

            slices[key] = jt.array(slices[key], dtype=Var.int32)

        return data, slices

    def copy(self, idx=None):
        r"""Returns a copy of the dataset rebuilt from selected examples.

        Args:
            idx (iterable, optional): Indices of the examples to keep. If
                :obj:`None`, all examples are used. (default: :obj:`None`)

        Returns:
            A shallow copy of this dataset whose ``data`` and ``slices`` are
            re-collated from the selected examples and whose
            ``__indices__`` is reset to :obj:`None`.
        """
        if idx is None:
            data_list = [self.get(i) for i in range(len(self))]
        else:
            data_list = [self.get(i) for i in idx]

        dataset = copy.copy(self)
        dataset.__indices__ = None
        dataset.__data_list__ = data_list
        dataset.data, dataset.slices = self.collate(data_list)
        return dataset