Source code for pta.pmo

"""Construction and solution of Probabilistic Metabolic Optimization (PMO) problems."""

import logging
import multiprocessing
import os
import pickle
import tempfile
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

import cobra
import cvxpy as cp
import numpy as np
import psutil
from scipy.stats.distributions import chi2

from .constants import (
    default_confidence_level,
    default_max_drg,
    default_max_threads,
    default_min_drg,
)
from .flux_space import FluxSpace
from .thermodynamic_space import ThermodynamicSpace, ThermodynamicSpaceBasis

logger = logging.getLogger(__name__)
_pmo_problem: "PmoProblem"


[docs]class PmoProblemPool: """Creation of a process pool for solving multiple PMO problems on the same model.""" def __init__(self, num_processes: Optional[int], *argv): """Initialize a process pool for solving multiple PMO problems. Parameters ---------- num_processes : Optional[int] Number of workers in the pool. argv Arguments to be passed to the constructor of :code:`PmoProblem` """ # Pick the number of processes based on the number of available CPUs. num_processes = min( num_processes or default_max_threads, psutil.cpu_count(logical=False) ) assert num_processes >= 1, "At least one process is needed." # Create a temporary file to copy the initialization arguments to the worker # processes. Helps performance on Windows. with tempfile.NamedTemporaryFile(delete=False) as args_file: pickle.dump(argv, open(args_file.name, "wb")) self._args_filename = args_file.name # Create the process pool, initializing a PmoProblem on each worker. self._pool = multiprocessing.Pool( num_processes, initializer=PmoProblemPool._init_process, initargs=[self._args_filename], )
[docs] def map( self, fn: Callable[["PmoProblem", Any], Any], inputs: Iterable[Any] ) -> List[Any]: """Execute a function on each input element. This is the same as the regular :code:`map` function, except that it executes in parallel on the available workers. Parameters ---------- fn : Callable[[PmoProblem, Any], Any] Function to be applied to each element. inputs : Iterable[Any] An iterable containing the input elements. Returns ------- List[Any] A list containing the result of applying the function to each input element. """ inputs_with_fn = [(fn, i) for i in inputs] return self._pool.map(PmoProblemPool._pool_task, inputs_with_fn)
[docs] def close(self): """Wait for all jobs to be done and closes the pool.""" self._pool.close() self._pool.join() if os.path.isfile(self._args_filename): os.remove(self._args_filename)
@staticmethod def _init_process(args_filename): global _pmo_problem # pylint: disable=global-statement argv = pickle.load(open(args_filename, "rb")) _pmo_problem = PmoProblem(*argv) @staticmethod def _pool_task(v: Tuple[Callable[["PmoProblem", Any], Any], Any]): global _pmo_problem # pylint: disable=global-statement return v[0](_pmo_problem, v[1])
[docs]class PmoProblem: """Construction and solution of a PMO problem. Parameters ---------- network : Union[cobra.Model, FluxSpace] Cobra model or `FluxSpace` object describing the flux space of the metabolic network. thermodynamic_space : ThermodynamicSpace Description of the thermodynamic space of the metabolic network. thermodynamic_space_basis : ThermodynamicSpaceBasis, optional A basis for the thermodynamic space. If specified, `m` will be defined in this basis. objective : Callable[ [PmoProblem], cp.problems.objective.Objective], optional A function used to set the optimization objective. By default the probability of in thermodynamic space is maximized. confidence_level : float, optional Confidence level (in the range :math:`[0.0, 1.0[`) on the joint of the thermodynamic variables, by default 0.95. min_drg : float, optional Minimum magnitude for the reaction energy of each reaction, by default 1e-1. max_drg : float, optional Maximum magnitude for the reaction energy of each reaction, by default 1000. solver : Optional[str], optional Name of the solver to use, this can be any of the solvers supported by CVXPY, by default None. solver_options : dict, optional Dictionary specifying additional options for the solver. """ _REFERENCE_COEFFICIENT = 1e2 _MAX_COEFFICIENT_RANGE = 5e8 _DEFAULT_SOLVER_OPTIONS = { cp.GUROBI: {"IntFeasTol": 1e-9, "FeasibilityTol": 1e-9, "BarQCPConvTol": 1e-8} } def __init__( self, network: Union[cobra.Model, FluxSpace], thermodynamic_space: ThermodynamicSpace, thermodynamic_space_basis: ThermodynamicSpaceBasis = None, objective: Callable[["PmoProblem"], cp.problems.objective.Objective] = None, confidence_level: float = default_confidence_level, min_drg: float = default_min_drg, max_drg: float = default_max_drg, solver: Optional[str] = None, solver_options: dict = None, ): # Create the flux space if needed. if isinstance(network, FluxSpace): self._F = network.copy() else: self._F = FluxSpace.from_cobrapy_model(network) # Create the basis of the thermodynamic space if needed. self._T = thermodynamic_space if thermodynamic_space_basis is not None: self._B = thermodynamic_space_basis else: self._B = ThermodynamicSpaceBasis(thermodynamic_space) assert self.B.to_drg is not None, ( "The thermodynamic space basis " "must have explicit reaction energies for PMO." ) # Find mapping from reactions in F to reactions in T (T only covers a subset of # the reactions in F). self._rxn_idxs_F_to_T = [ self.F.reaction_ids.index(id) for id in self.T.reaction_ids ] # Scale the flux space to make the problem numerically nicer. self._flux_scale = self._scale_flux_space(self.F) # Store numerical settings. self._confidence_level = confidence_level self._min_drg = min_drg self._max_drg = max_drg # Initialize solver settings. self.solver = solver self.solver_options = solver_options or {} # Flux, thermodynamic and direction variables. self._vs = cp.Variable((self.F.n_reactions, 1), "fluxes") self._m = cp.Variable((self.B.dimensionality, 1), "thermo_basis") if np.all( np.logical_or( self.F.lb[self._rxn_idxs_F_to_T] >= 0, self.F.ub[self._rxn_idxs_F_to_T] <= 0, ) ): # If flux bounds already fix all the directions we do not need integer # variables. self._fixed_directions = True self._d = cp.Variable((len(self.T.reaction_ids), 1), "directions") else: self._fixed_directions = False self._d = cp.Variable( (len(self.T.reaction_ids), 1), "directions", boolean=True ) # If no objective is specified, maximize the probability of the # thermodynamic variables. self._objective = objective or ( lambda p: cp.Minimize( cp.atoms.quad_form(p.m, np.identity(p.B.dimensionality)) ) ) self._make_big_M_coefficients() self._make_constraints() self._rebuild_cvxpy_problem() @property
[docs] def confidence_level(self): """Gets the confidence level on the thermodynamic space.""" return self._confidence_level
@property
[docs] def min_drg(self) -> float: """Gets the minimum absolute value of DrG allowed in the model.""" return self._min_drg
@property
[docs] def max_drg(self) -> float: """Gets the maximum absolute value of DrG in the model.""" return self._max_drg
@property
[docs] def flux_scale(self) -> np.ndarray: """Gets the vector of scaling factors for the scaled fluxes.""" return self._flux_scale
@property
[docs] def objective( self, ) -> Optional[Callable[["PmoProblem"], cp.problems.objective.Objective]]: """Gets the function that constructs the PMO objective.""" return self._objective
@objective.setter def objective( self, value: Callable[["PmoProblem"], cp.problems.objective.Objective] ): """Sets the function that constructs the PMO objective.""" self._objective = value self._rebuild_cvxpy_problem() @property
[docs] def vs(self) -> cp.Variable: """Gets the CVXPY variable representing scaled reaction fluxes.""" return self._vs
@property
[docs] def m(self) -> cp.Variable: """Gets the CVXPY variable representing the thermodynamic space in the minimal basis.""" return self._m
@property
[docs] def d(self) -> cp.Variable: """Gets the CVXPY variable representing reaction directions.""" return self._d
@property
[docs] def v(self) -> np.array: """Gets the predicted reaction fluxes.""" return self._vs.value / self.flux_scale
@property
[docs] def log_c(self) -> np.ndarray: """Gets the predicted log-concentrations. Can be none if concentrations are not represented explicitely. """ return self.B.to_log_conc(self.m.value)
@property
[docs] def drg0(self) -> np.ndarray: """Gets the predicted standard reaction energies. Can be none if standard reaction energies are not represented explicitely. """ return self.B.to_drg0(self.m.value)
@property
[docs] def drg(self) -> np.ndarray: """Gets the predicted reaction energies. Can be none if reaction energies are not represented explicitely. """ return self.B.to_drg(self.m.value)
@property
[docs] def solver(self) -> Optional[str]: """Gets the selected solver.""" return self._solver
@solver.setter def solver(self, value: Optional[str]): """Sets the selected solver.""" self._solver = value @property
[docs] def solver_options(self) -> Dict[str, Any]: """Gets the solver options.""" return self._solver_options
@solver_options.setter def solver_options(self, value: Dict[str, Any]): """Sets the solver options.""" self._solver_options = value @property
[docs] def F(self) -> FluxSpace: """Gets the flux space associated with the PMO problem.""" return self._F
@property
[docs] def T(self) -> ThermodynamicSpace: """Gets the thermodynamic space associated with the PMO problem.""" return self._T
@property
[docs] def B(self) -> ThermodynamicSpaceBasis: """Gets the thermodynamic space basis associated with the PMO problem.""" return self._B
@property
[docs] def flux_lb_constraint(self) -> cp.constraints.constraint.Constraint: """Gets the lower bound constraint for fluxes.""" return self._flux_lb_constraint
@property
[docs] def flux_ub_constraint(self) -> cp.constraints.constraint.Constraint: """Gets the upper bound constraint for fluxes.""" return self._flux_ub_constraint
@property
[docs] def steady_state_constraint(self) -> cp.constraints.constraint.Constraint: """Gets the steady state constraint.""" return self._steady_state_constraint
@property
[docs] def confidence_constraint(self) -> cp.constraints.constraint.Constraint: """Gets the constraint for the selected confidence level.""" return self._m_constraint
@property
[docs] def sign_constraints(self) -> cp.constraints.constraint.Constraint: """Gets the reaction direction constraints. The constraints at indices 0 and 1 are constraints on the reaction energies, while 2 and 3 are constraints on the fluxes.""" return self._sign_constraints
[docs] def solve(self, verbose=False) -> str: """Solves the PMO problem. The result of the optimization is stored inside the class. Parameters ---------- verbose : bool, optional True is the solver log should be printed to the console, false otherwise. By default False. Returns ------- str The CVXPY result of the optimization. """ # Get the default solver-specific options and add the user options. options = self._DEFAULT_SOLVER_OPTIONS.get(self.solver, {}) options = {**options, **self.solver_options} # Solve the optimization problem. self._problem.solve(solver=self.solver, verbose=verbose, **options) if self._problem.status in ["optimal", "optimal_inaccurate"]: assert np.all( np.sign(self.d.value - 0.5) != np.sign(self.drg) ), "The directions and the free energies in the solution are inconsistent." assert np.all( np.sign(self.d.value - 0.5) == np.sign(self.v[self._rxn_idxs_F_to_T]) ), "The directions and the fluxes in the solution are inconsistent." return self._problem.status
[docs] def rebuild_for_directions(self, directions: np.ndarray) -> "PmoProblem": """Construct a copy of this PMO problem constrained to the given reaction directions. Parameters ---------- directions : np.ndarray Vector containing the directions (0: backward, 1: forward) of the reactions in T. Returns ------- PmoProblem A copy of this problem, constrained to the given directions. """ reaction_idxs = [self.F.reaction_ids.index(id) for id in self.T.reaction_ids] new_lb = self.F.lb[ reaction_idxs, ] new_ub = self.F.ub[ reaction_idxs, ] new_lb[directions > 0.5,] = np.maximum( new_lb[ directions > 0.5, ], 0, ) new_ub[directions < 0.5,] = np.minimum( new_ub[ directions < 0.5, ], 0, ) constrained_F = self.F.copy() constrained_F.lb[reaction_idxs] = new_lb constrained_F.ub[reaction_idxs] = new_ub return PmoProblem( constrained_F, self.T, self.B, self.objective, self.confidence_level, self.min_drg, self.max_drg, self.solver, self.solver_options,
) def _scale_flux_space(self, F: FluxSpace) -> np.ndarray: """Rescale the flux space in order to reduce the range of coefficients in S, lb and ub. """ reference = np.maximum(np.max(np.abs(F.lb)), np.max(np.abs(F.ub))) max_absolute_flux = np.maximum(np.abs(F.lb), np.abs(F.ub)) # Limit the maximum scaling factor, otherwise we may obtain bad range of # coefficients in S. This is a heuristic, and we should pick the value # with a better method. col_scale = np.minimum( reference / max_absolute_flux, self._REFERENCE_COEFFICIENT ) F.lb = F.lb * col_scale F.ub = F.ub * col_scale F.S = F.S @ np.diag(1 / col_scale.ravel()) row_scale = np.max(np.abs(F.S), axis=1) if any(row_scale == 0): zero_row_mets = [ F.metabolite_ids[i] for i in np.where(row_scale == 0)[0].tolist() ] raise Exception( "One row of the stoichiometric matrix is zero, " "Please make sure that the model does not contain orphan " "metabolites. The metabolites corresponding to zero rows are " f"{','.join(zero_row_mets)}." ) F.S = np.diag(reference / row_scale) @ F.S return col_scale def _make_big_M_coefficients(self): """Creates the Big-M coefficients for flux and reaction energies based. We must pick those such that they allow a reasonably large range of coefficients but without creating numerical challenges for the solver. """ ones = np.ones((len(self.T.reaction_ids), 1)) self._big_M_r = self.max_drg * ones self._big_M_f = ( 1.01 * np.maximum(np.abs(self.F.lb), np.abs(self.F.ub))[self._rxn_idxs_F_to_T, :] ) self._epsilon_r = self.min_drg * ones self._epsilon_f = np.maximum( self._big_M_f / self._MAX_COEFFICIENT_RANGE, np.max(self._big_M_f) / self._MAX_COEFFICIENT_RANGE, ) all_bounds = np.abs(np.vstack((self.F.lb, self.F.ub))) min_flux_bound = np.min(all_bounds[np.nonzero(all_bounds)]) if min_flux_bound < np.max(self._big_M_f) / self._MAX_COEFFICIENT_RANGE * 10: logger.warning( "The coefficient range of the flux bounds " f"({np.max(self._big_M_f) / min_flux_bound}) is close or " f"larger than {self._MAX_COEFFICIENT_RANGE} " "and may lead to numerical errors. It is recommended to " "simplify the model to reduce the range." ) def _make_constraints(self): """Create the CVXPY constraints of the problem.""" # Flux constraints. self._flux_lb_constraint = self.vs >= self.F.lb self._flux_ub_constraint = self.vs <= self.F.ub self._steady_state_constraint = self.F.S @ self.vs == np.zeros( (self.F.n_metabolites, 1) ) # Mappings between fluxes and constrained fluxes. v_to_v_con = np.identity(self.F.n_reactions)[self._rxn_idxs_F_to_T, :] # Sign relations. assert self.B.to_drg_transform is not None (T, s) = self.B.to_drg_transform self._sign_constraints = [ cp.multiply(self._big_M_r, self.d) + T @ self.m + s <= self._big_M_r - self._epsilon_r, cp.multiply(self._big_M_r, self.d) + T @ self.m + s >= self._epsilon_r, cp.multiply(self._big_M_f, self.d) - v_to_v_con @ self.vs <= self._big_M_f - self._epsilon_f, cp.multiply(self._big_M_f, self.d) - v_to_v_con @ self.vs >= self._epsilon_f, ] if self._fixed_directions: directions = np.zeros((len(self.T.reaction_ids), 1)) directions[ self.F.lb[ self._rxn_idxs_F_to_T, ] >= 0 ] = 1 self._sign_constraints += [self.d == directions] # Assemble in a single set of constraints. self._constraints = [ self._flux_lb_constraint, self._flux_ub_constraint, self._steady_state_constraint, ] + self._sign_constraints # Thermodynamic space constraints. if self.confidence_level < 1.0: CI_square = chi2.ppf(self._confidence_level, self.B.dimensionality) self._m_constraint = ( cp.atoms.quad_form(self.m, np.identity(self.B.dimensionality)) <= CI_square ) self._constraints += [self._m_constraint] def _rebuild_cvxpy_problem(self): """Rebuild the CVXPY representation of the PMO problem.""" self._problem = cp.Problem(self.objective(self), self._constraints)