Source code for pta.sampling.commons

"""Common classes and methods for sampling algorithms."""

import abc
import datetime
import math
from typing import Any, Callable

import _pta_python_binaries as pb
import numpy as np
import pandas as pd
import psutil

from ..constants import (
    default_max_psrf,
    default_max_threads,
    default_min_chains,
    min_samples_per_chain,
)


[docs]class SamplingException(Exception): """Raised when an exception occurs during sampling."""
[docs]class SamplingResult: """Encapsulates the result of a sampler. Parameters ---------- samples : pd.DataFrame Data frame containing the samples as rows. psrf : pd.Series Series containing the PSRF of each variable. basis_samples : pd.DataFrame, optional The samples in the basis. chains : np.ndarray, optional The simulated chains. """ def __init__( self, samples: pd.DataFrame, psrf: pd.Series, basis_samples: pd.DataFrame = None, chains: np.ndarray = None, ): self._samples = samples self._psrf = psrf self._basis_samples = basis_samples self._chains = chains @property
[docs] def basis_samples(self) -> pd.DataFrame: """Gets a data frame containing the samples in the basis.""" return self._basis_samples
@property
[docs] def samples(self) -> pd.DataFrame: """Gets a data frame containing the samples.""" return self._samples
@property
[docs] def chains(self) -> np.ndarray: """Get the simulated chains.""" return self._chains
@property
[docs] def psrf(self) -> pd.Series: """Gets the Potential Scale Reduction Factor (PSRF) of each variable.""" return self._psrf
[docs] def check_convergence(self, max_psrf: float = default_max_psrf) -> bool: """Checks whether the chains converged according to the given criteria. Parameters ---------- max_psrf : float, optional Maximum PSRF value for convergence. Returns ------- bool True if the test succeeded, false otherwise. """ return np.all(self.psrf <= max_psrf)
[docs]class SamplerInterface(metaclass=abc.ABCMeta): """Interface for an MCMC sampler.""" @property @abc.abstractmethod
[docs] def dimensionality(self) -> int: """Get the dimensionality of the sampling space."""
@abc.abstractmethod
[docs] def simulate( self, settings: pb.SamplerSettings, initial_points: np.ndarray, directions_transform: np.ndarray, ) -> Any: """Run the sampler with the given parameters. Parameters ---------- settings : pb.SamplerSettings Sampling settings. initial_points : np.ndarray The initial points for the chains. directions_transform : np.ndarray The transform for the directions sampler. """
@abc.abstractmethod
[docs] def compute_psrf(self, result: Any) -> pd.Series: """Compute the potential scale reduction factors for the variables of interest on a given set of chains. Parameters ---------- result : Any The result of the sampling function. Returns ------- pd.Series The computed potential scale reduction factors. """
@abc.abstractmethod
[docs] def get_chains(self, result: Any) -> np.ndarray: """Extract the simulated chains from a given result. Parameters ---------- result : Any The result of the native sampling function. Returns ------- np.ndarray The simulated chains. """
[docs]def sample_from_chains(chains: np.ndarray, num_samples: int) -> np.ndarray: """Draws samples from a given set of chains. Parameters ---------- chains : np.ndarray Numpy 3D array containing the chains. num_samples : int Number of samples to draw. Returns ------- np.ndarray Array of samples. """ samples = np.hstack(chains) indices = np.random.choice(samples.shape[1], num_samples, False) return samples[:, indices].T
[docs]def split_chains(chains: np.ndarray) -> np.ndarray: """Split the chains in two, threating each half as a new chain. This is often used to detect systematic trends in a random walk. Parameters ---------- chains : np.ndarray Numpy 3D array containing the input chains. Returns ------- np.ndarray Numpy 3D array containing the resulting chains. """ half_chain_idx = math.floor(chains.shape[0] / 2) return np.dstack( [ chains[:half_chain_idx, :, :], chains[half_chain_idx : 2 * half_chain_idx, :, :],
] )
[docs]def apply_to_chains( chains: np.ndarray, f: Callable[[np.ndarray], np.ndarray] ) -> np.ndarray: """Apply a function to each sample of a group of chains. Parameters ---------- chains : np.ndarray Numpy 3D array containing the input chains. f : Callable[[np.ndarray], np.ndarray] Function to apply to each sample. Returns ------- np.ndarray Numpy 3D array containing the transformed samples. """ n_chains = chains.shape[2] transformed_chains = [] for i in range(n_chains): transformed_chains.append(f(chains[:, :, i].T).T) return np.dstack(transformed_chains)
[docs]def split_R(chains: np.ndarray) -> np.ndarray: """Compute the split-R (or Potential Scale Reduction Factor) of each variable in the given chains. Parameters ---------- chains : np.ndarray Numpy 3D array containing the input chains. Returns ------- np.ndarray Vector containing the split-R value for each variable. """ EPSILON = 1e8 chains = split_chains(chains) n_steps, n_params, n_chains = chains.shape R = np.empty(n_params) # Iterate over parameters. for i in range(n_params): chain_means = np.mean(chains[:, i, :], axis=0, keepdims=True) sample_mean = np.mean(chain_means) sample_variance = ( 1 / (n_steps) * np.sum((chains[:, i, :] - chain_means) ** 2, axis=0) ) # Compute between-chains variance. B = n_steps / (n_chains - 1) * np.sum((chain_means - sample_mean) ** 2) # Compute within-chain variance. W = 1 / n_chains * np.sum(sample_variance) if ( np.all(np.abs(sample_variance / np.abs(chain_means) + EPSILON) < EPSILON) and np.abs(B / np.abs(sample_mean) + EPSILON) < EPSILON ): # If the variance-to-mean ratios within each chain and between all chains # are very low, we can assume that the variable is practically constant. Set # R to one in order to avoid numerical artifacts. R[i] = 1 else: # Compute pooled variance. V = (n_steps - 1) / n_steps * W + 1 / n_steps * B # Compute potential scale reduction factor estimate. R[i] = np.sqrt(V / W) return R
[docs]def fill_common_sampling_settings( settings: pb.SamplerSettings, num_samples: int, num_steps: int, num_chains: int = -1, num_warmup_steps: int = -1, max_threads: int = default_max_threads, ): """Fills default values for common sampling settings. Parameters ---------- settings : SamplerSettings The settings object to be filled. log_directory : str Directory in which the sampling logs should be stored. num_samples : int Number of samples to draw. num_steps : int Number of steps to take with each chain. num_chains : int, optional Number of chains to use. Will be set to the number of CPUs by default. num_warmup_steps : int, optional Number of burn-in steps to discard. Will be set to half the number of steps by default. log_interval : datetime.timedelta, optional Interval between each logging event. Raises ------ ValueError If the inputs are inconsistent. """ if num_samples < 0: raise ValueError("The number of samples must be positive") if num_chains == 0: raise ValueError("Sampling requires at least one chain") settings.max_threads = min(psutil.cpu_count(logical=False), max_threads) # If the number of chains is not specified, use at least as many as the number of # CPUs. if num_chains < 0: num_chains = max(settings.max_threads, default_min_chains) # If the number of warmup steps is not specified, set it to half the number of # steps. min_steps_per_chain = math.ceil(num_samples / num_chains) if num_warmup_steps < 0: num_warmup_steps = math.ceil(num_steps / 2) if num_warmup_steps + min_steps_per_chain > num_steps: raise ValueError( "The chosen number of steps is insufficient for the chosen number of " "samples" ) # Compute the steps thinning and adjust the total steps count if needed. samples_per_chain = max(math.ceil(num_samples / num_chains), min_samples_per_chain) steps_thinning = math.ceil((num_steps - num_warmup_steps) / samples_per_chain) num_steps = num_warmup_steps + samples_per_chain * steps_thinning settings.num_steps = num_steps settings.num_chains = num_chains settings.steps_thinning = steps_thinning settings.num_skipped_steps = num_warmup_steps settings.log_interval = datetime.timedelta(seconds=1) settings.log_directory = ""