Source code for dataloaders.loaders

"""
Module for loading data into the ltu-ili pipeline.
"""

import yaml
from abc import ABC, abstractmethod
from typing import Any, List, Tuple, Optional, Union
from pathlib import Path
import numpy as np
import json
import pandas as pd
from ili.utils import Dataset, update

try:
    from sbi.simulators.simutils import simulate_in_batches
    from torch import Tensor
    from torch.utils.data import DataLoader
except ModuleNotFoundError:
    DataLoader, Tensor = Any, Any


class _BaseLoader(ABC):
    """Abstract interface implemented by every data loader in this module.

    Concrete loaders must report their size and expose the training
    data/parameters as well as the observed data and fiducial parameters.
    """

    @classmethod
    def from_config(
        cls,
        config_path: Union[str, Path],
        **kwargs
    ) -> "_BaseLoader":
        """Create a data loader from a yaml config file

        Args:
            config_path (str, Path): path to config file.
            **kwargs: optional keyword arguments to overload config file

        Returns:
            _BaseLoader: instance of ``cls`` built by passing the
                (optionally overloaded) config entries as keyword
                arguments to its constructor
        """
        with open(config_path, "r") as fd:
            # safe_load yields None for an empty document; fall back to an
            # empty mapping so that kwargs alone can configure the loader.
            config = yaml.safe_load(fd) or {}

        # optionally overload config file with kwargs
        update(config, **kwargs)

        return cls(**config)

    @abstractmethod
    def __len__(self) -> int:
        """Returns the total number of data points in the dataset

        Returns:
            int: length of dataset
        """
        return NotImplemented

    @abstractmethod
    def get_all_data(self) -> Any:
        """Returns all the loaded data

        Returns:
            Any: data
        """
        return NotImplemented

    @abstractmethod
    def get_all_parameters(self) -> Any:
        """Returns all the loaded parameters

        Returns:
            Any: parameters
        """
        return NotImplemented

    @abstractmethod
    def get_obs_data(self) -> Any:
        """Returns the observed data

        Returns:
            Any: data
        """
        return NotImplemented

    @abstractmethod
    def get_fid_parameters(self) -> Any:
        """Returns the fiducial parameters which we expect the
        observed data to resemble

        Returns:
            Any: parameters
        """
        return NotImplemented


class NumpyLoader(_BaseLoader):
    r"""A class for loading in-memory data using numpy arrays.

    Args:
        x (np.array): Array of training data of shape
            (Ndata, \*data.shape)
        theta (np.array): Array of training parameters of shape
            (Ndata, \*parameters.shape)
        xobs (Optional[np.array]): Array of observed data of shape
            (\*data.shape). Defaults to None.
        thetafid (Optional[np.array]): Array of fiducial parameters of
            shape (\*parameters.shape). Defaults to None.

    Raises:
        Exception: if ``x`` and ``theta`` do not have the same length.
    """

    def __init__(
        self,
        x: np.ndarray,
        theta: np.ndarray,
        xobs: Optional[np.ndarray] = None,
        thetafid: Optional[np.ndarray] = None
    ) -> None:
        self.x = x
        self.theta = theta
        # Every data point must be paired with exactly one parameter set.
        if len(self.x) != len(self.theta):
            raise Exception(
                "Stored data and parameters are not of same length.")
        self.xobs = xobs
        self.thetafid = thetafid

    def __len__(self) -> int:
        """Returns the total number of data points in the dataset

        Returns:
            int: length of dataset (0 if no data is stored)
        """
        if self.x is None:
            return 0
        return len(self.x)

    def get_all_data(self) -> np.ndarray:
        """Returns all the loaded data for training

        Returns:
            np.array: data
        """
        return self.x

    def get_all_parameters(self) -> np.ndarray:
        """Returns all the loaded parameters for training

        Returns:
            np.array: parameters
        """
        return self.theta

    def get_obs_data(self) -> Optional[np.ndarray]:
        """Returns the observed data

        Returns:
            np.array: data, or None if no observation was provided
        """
        return self.xobs

    def get_fid_parameters(self) -> Optional[np.ndarray]:
        """Returns the fiducial parameters which we expect the
        observed data to resemble

        Returns:
            np.array: parameters, or None if none were provided
        """
        return self.thetafid


class StaticNumpyLoader(NumpyLoader):
    """Numpy loader whose arrays are read from individual files on disk.

    Args:
        in_dir (str): directory containing the stored files
        x_file (str): name of the file holding the training data
        theta_file (str): name of the file holding the training parameters
        xobs_file (Optional[str]): name of the file holding the observed
            data. Nothing is loaded when None.
        thetafid_file (Optional[str]): name of the file holding the
            fiducial parameters. Nothing is loaded when None.
    """

    def __init__(
        self,
        in_dir: str,
        x_file: str,
        theta_file: str,
        xobs_file: Optional[str] = None,
        thetafid_file: Optional[str] = None
    ) -> None:
        root = Path(in_dir)
        self.in_dir = root
        self.x_path = root / x_file
        self.theta_path = root / theta_file

        def _read(path):
            # All arrays are read with pickling enabled.
            return np.load(path, allow_pickle=True)

        # Mandatory training arrays
        train_x = _read(self.x_path)
        train_theta = _read(self.theta_path)

        # Optional arrays: the path attribute stays None when no file
        # name was given, and nothing is read from disk in that case.
        self.xobs_path = None if xobs_file is None else root / xobs_file
        observed = None if self.xobs_path is None else _read(self.xobs_path)

        self.thetafid_path = (
            None if thetafid_file is None else root / thetafid_file
        )
        fiducial = (
            None if self.thetafid_path is None
            else _read(self.thetafid_path)
        )

        super().__init__(
            x=train_x, theta=train_theta, xobs=observed, thetafid=fiducial
        )


class SBISimulator(NumpyLoader):
    r"""Class to run simulations of data and parameters and save
    results to numpy files. Only works for sbi backend.

    Args:
        in_dir (str): path to the location of stored data
        xobs_file (str): filename used for observed x values
        num_simulations (int): number of simulations to run at each call
        simulator (callable): function taking the parameters as an
            argument and returns data. NOTE: This must take a tuple of
            parameters and output a torch.Tensor of shape
            (1, *data.shape).
        save_simulated (Optional[bool]): whether to save simulated data.
            Concatenates to previous data if True. Defaults to False.
        x_file (Optional[str]): filename of the stored first-round
            training data
        theta_file (Optional[str]): filename of the stored first-round
            training parameters
        thetafid_file (Optional[str]): filename used for fiducial
            parameters

    Raises:
        Exception: if ``save_simulated`` is True but ``x_file`` or
            ``theta_file`` is not given.
    """

    def __init__(
        self,
        in_dir: str,
        xobs_file: str,
        num_simulations: int,
        simulator: Optional[callable] = None,
        save_simulated: Optional[bool] = False,
        x_file: Optional[str] = None,
        theta_file: Optional[str] = None,
        thetafid_file: Optional[str] = None,
    ):
        self.in_dir = Path(in_dir)
        self.xobs_path = self.in_dir / xobs_file
        self.num_simulations = num_simulations
        self.simulator = simulator
        self.save_simulated = save_simulated

        # If save_simulated, check that x_file and theta_file are specified
        if save_simulated and (x_file is None or theta_file is None):
            raise Exception(
                "If save_simulated is True, x_file and theta_file must be "
                "specified."
            )

        # Load stored data (if specified). Training arrays start empty and
        # are only replaced when the corresponding file already exists.
        xobs = np.load(self.xobs_path, allow_pickle=True)
        x = np.array([])
        theta = np.array([])
        thetafid = None

        if x_file is None:
            self.x_path = None
        else:
            self.x_path = self.in_dir / x_file
            if self.x_path.is_file():
                x = np.load(self.x_path, allow_pickle=True)

        if theta_file is None:
            self.theta_path = None
        else:
            self.theta_path = self.in_dir / theta_file
            if self.theta_path.is_file():
                # NOTE(review): unlike the other loads, this one does not
                # pass allow_pickle=True -- confirm whether intentional.
                theta = np.load(self.theta_path)

        if thetafid_file is None:
            self.thetafid_path = None
        else:
            self.thetafid_path = self.in_dir / thetafid_file
            thetafid = np.load(self.thetafid_path, allow_pickle=True)

        super().__init__(x=x, theta=theta, xobs=xobs, thetafid=thetafid)

    def set_simulator(self, simulator: callable):
        """Set the simulator to be used in the inference

        Args:
            simulator (callable): function taking the parameters as an
                argument and returns data
        """
        self.simulator = simulator

    def simulate(self, proposal: Any) -> Tuple[np.ndarray, np.ndarray]:
        r"""Run simulations given a proposal and returns ($\theta, x$)
        pairs obtained from sampling the proposal and simulating.

        The new pairs are appended to the stored training set and, if
        ``save_simulated`` is set, the full training set is written to
        ``theta_path`` and ``x_path``.

        Args:
            proposal (Any): Distribution to sample parameters from

        Returns:
            Tuple[np.array, np.array]: Sampled parameters $\theta$ and
                simulation-outputs $x$.

        Raises:
            TypeError: if no callable simulator has been set.
        """
        # Fail early with a clear message instead of deep inside sbi.
        if not callable(self.simulator):
            raise TypeError(
                "No callable simulator is set. Pass `simulator` to the "
                "constructor or call `set_simulator` before `simulate`."
            )

        theta = proposal.sample((self.num_simulations,)).cpu()
        x = simulate_in_batches(self.simulator, theta)

        # Get device returns -1 for cpu, integers for CUDA tensors
        if x.get_device() != -1:
            x = x.cpu()
        theta, x = theta.numpy(), x.numpy()

        # Save simulated data (concatenates to previous data)
        if len(self) == 0:
            self.theta, self.x = theta, x
        else:
            self.theta = np.concatenate((self.theta, theta))
            self.x = np.concatenate((self.x, x))

        if self.save_simulated:
            np.save(self.theta_path, self.theta)
            np.save(self.x_path, self.x)

        return theta, x


class SummarizerDatasetLoader(NumpyLoader):
    """Loader pairing an ili-summarizer ``Dataset`` with a parameter table.

    The data are accessed through ``Dataset`` (netCDF files according to
    the original description -- confirm against ili-summarizer), while the
    parameters are read from a space-separated text table and restricted
    to the nodes belonging to the requested stage.

    Args:
        in_dir (str): path to data directory
        stage (str): which split to load (train, test or val)
        x_root (str): root of the data files
        theta_file (str): name of the parameter file
        train_test_split_file (str): name of the JSON file storing the
            node indices of each split
        param_names (List[str]): names of the parameters to fit
        xobs_file (Optional[str]): filename used for observed x values
        thetafid_file (Optional[str]): filename used for fiducial
            parameters

    Raises:
        Exception: when data and parameters don't have the same length
    """

    def __init__(
        self,
        in_dir: str,
        stage: str,
        x_root: str,
        theta_file: str,
        train_test_split_file: str,
        param_names: List[str],
        xobs_file: Optional[str] = None,
        thetafid_file: Optional[str] = None
    ):
        self.in_dir = Path(in_dir)

        # Node indices for the requested split drive both data and
        # parameter selection.
        self.nodes = self.get_nodes_for_stage(
            stage=stage, train_test_split_file=train_test_split_file
        )
        self.x = Dataset(
            nodes=self.nodes, path_to_data=self.in_dir, root_file=x_root
        )
        self.theta = self.load_parameters(
            param_file=theta_file, nodes=self.nodes, param_names=param_names
        )

        n_data, n_params = len(self.x), len(self.theta)
        if n_data != n_params:
            raise Exception(
                "Stored data and parameters are not of same length.")

        # Optional observation: path and array both stay None when no
        # file name is supplied.
        has_xobs = xobs_file is not None
        self.xobs_path = self.in_dir / xobs_file if has_xobs else None
        self.xobs = (
            np.load(self.xobs_path, allow_pickle=True) if has_xobs else None
        )

        # Optional fiducial parameters, handled the same way.
        has_fid = thetafid_file is not None
        self.thetafid_path = (
            self.in_dir / thetafid_file if has_fid else None
        )
        self.thetafid = (
            np.load(self.thetafid_path, allow_pickle=True)
            if has_fid else None
        )

    def __len__(self) -> int:
        """Number of data points, i.e. the number of selected nodes.

        Returns:
            int: length of dataset
        """
        return len(self.nodes)

    def get_all_data(self) -> np.array:
        """Load the data and flatten each entry to one row per node.

        Returns:
            np.array: data reshaped to (len(self), -1)
        """
        loaded = self.x.load()
        return loaded.reshape((len(self), -1))

    def get_nodes_for_stage(
        self, stage: str, train_test_split_file: str
    ) -> List[int]:
        """Look up the node indices stored for a stage.

        Args:
            stage (str): either train, test or val
            train_test_split_file (str): JSON file (inside ``in_dir``)
                mapping each stage to its node indices

        Returns:
            List[int]: list of idx for stage
        """
        split_path = self.in_dir / train_test_split_file
        with open(split_path) as handle:
            splits = json.load(handle)
        return splits[stage]

    def load_parameters(
        self, param_file: str, nodes: List[int], param_names: List[str]
    ) -> np.array:
        """Read the parameter table and keep the requested rows/columns.

        Args:
            param_file (str): file (inside ``in_dir``) holding the
                parameters of the latin hypercube
            nodes (List[int]): positional row indices to read
            param_names (List[str]): columns (parameters) to use

        Returns:
            np.array: array of parameters
        """
        table = pd.read_csv(
            self.in_dir / param_file, sep=" ", skipinitialspace=True
        )
        selected_rows = table.iloc[nodes]
        return selected_rows[param_names].values


class TorchLoader(_BaseLoader):
    """Loader backed by torch ``DataLoader`` objects.

    Args:
        train_loader (DataLoader): dataloader for training outputting
            (data, parameters)
        val_loader (DataLoader): dataloader for validation outputting
            (data, parameters). Defaults to None.
        xobs (Optional[Tensor]): observed data. Defaults to None.
        thetafid (Optional[Tensor]): fiducial parameters. Defaults to
            None.
    """

    def __init__(
        self,
        train_loader: DataLoader,
        val_loader: DataLoader = None,
        xobs: Optional[Tensor] = None,
        thetafid: Optional[Tensor] = None
    ) -> None:
        self.train_loader, self.val_loader = train_loader, val_loader
        self.xobs, self.thetafid = xobs, thetafid

    def __len__(self) -> int:
        """Size of the dataset wrapped by the training dataloader.

        Returns:
            int: length of dataset
        """
        training_set = self.train_loader.dataset
        return len(training_set)

    def get_all_data(self) -> Tensor:
        """Return the first tensor of the training dataset (the data).
        May need to be redefined for complex dataloaders.

        Returns:
            Tensor: data
        """
        stored = self.train_loader.dataset.tensors
        return stored[0]

    def get_all_parameters(self):
        """Return the second tensor of the training dataset (the
        parameters). May need to be redefined for complex dataloaders.

        Returns:
            Tensor: parameters
        """
        stored = self.train_loader.dataset.tensors
        return stored[1]

    def get_obs_data(self) -> Tensor:
        """Return the observed data given at construction.

        Returns:
            Tensor: data
        """
        return self.xobs

    def get_fid_parameters(self):
        """Return the fiducial parameters which we expect the observed
        data to resemble.

        Returns:
            Tensor: parameters
        """
        return self.thetafid


# TODO: Add loaders which load dynamically from many files, so
# that everything doesn't need to be stored in memory