Source code for micom.solution

"""A community solution object."""

import numpy as np
import pandas as pd
from collections import Counter
from optlang.interface import (
from optlang.symbolics import Zero
from itertools import chain
from functools import partial
from cobra.exceptions import OptimizationError
from cobra.core import Solution
from cobra.util import interface_to_str, get_context
from micom.logger import logger
from micom.util import reset_min_community_growth, _apply_min_growth
from swiglpk import glp_adv_basis

"""Solver states that permit returning the solution."""
[docs]def _group_taxa(values, ids, taxa, what="reaction"): """Format a list of values by id and taxa.""" df = pd.DataFrame({ values, what: ids, "compartment": taxa}) df = df.pivot(index="compartment", columns=what, = return df
[docs]class CommunitySolution(Solution): """An FBA solution for an entire community. Attributes ---------- objective_value : float The (optimal) value for the objective function. members : pandas.Series Contains basic info about the individual compartments/members of the community such as id, abundance and growth rates. Will also include one row for the external medium (without abundance and growth rate). growth_rate : float The overall growth rate for the community normalized to 1 gDW. status : str The solver status related to the solution. strategy : str The optimization strategy used to obtain the solution (may be empty). fluxes : pandas.DataFrame Contains the reaction fluxes (primal values of variables) stratified by compartment. Columns denote individual fluxes and rows denote compartments: one for every taxon plus one for the external medium. Fluxes will be NA if the reaction does not exist in the organism. reduced_costs : pandas.Series Contains reaction reduced costs (dual values of variables) stratified by taxa. Columns denote individual fluxes and rows denote taxa. Reduced costs will be NA if the reaction does not exist in the organism. shadow_prices : pandas.Series Contains metabolite shadow prices (dual values of constraints) stratified by taxa. Columns denote individual metabolites and rows denote taxa. Shadow prices will be NA if the metabolite does not exist in the organism. """ def __init__(self, community, slim=False, reactions=None, metabolites=None): """Get the solution from a community model.""" if reactions is None: reactions = community.reactions if metabolites is None: metabolites = community.metabolites rids = np.array([(r.global_id, r.community_id) for r in reactions]) mids = np.array([(m.global_id, m.community_id) for m in metabolites]) if not slim: var_primals = community.solver.primal_values fluxes = pd.Series( [var_primals[] - var_primals[r.reverse_id] for r in reactions], name="fluxes", ) super(CommunitySolution, self).__init__( community.solver.objective.value, community.solver.status, _group_taxa(fluxes, rids[:, 0], rids[:, 1]), None, None, ) else: super(CommunitySolution, self).__init__( community.solver.objective.value, community.solver.status, None, None, None, ) gcs = pd.Series(dtype="float64") for sp in community.taxa: gcs[sp] = community.constraints["objective_" + sp].primal # Workaround for an optlang bug (PR #120) if interface_to_str(community.problem) == "gurobi": gcs = gcs.abs() self.strategy = community.modification self.members = pd.DataFrame( { "abundance": community.abundances, "growth_rate": gcs, "reactions": pd.Series(Counter(rids[:, 1])), "metabolites": pd.Series(Counter(mids[:, 1])), } ) = "compartments" self.growth_rate = sum(community.abundances * gcs) # Save estimated accuracy for osqp if interface_to_str(community.problem) == "osqp": self.primal_residual = self.dual_residual =
[docs] def _repr_html_(self): if self.status in good: with pd.option_context("display.max_rows", 10): html = ( "<strong>community growth:</strong> {:.3f}" "<br><strong>status:</strong> {}" "<br><strong>taxa:</strong>{}".format( sum( self.members.abundance.dropna() * self.members.growth_rate.dropna() ), self.status, self.members._repr_html_(), ) ) else: html = "<strong>{}</strong> solution :(".format(self.status) return html
[docs] def __repr__(self): """Convert CommunitySolution instance to string representation.""" if self.status not in good: return "<CommunitySolution {0:s} at 0x{1:x}>".format(self.status, id(self)) return "<CommunitySolution {0:.3f} at 0x{1:x}>".format( self.growth_rate, id(self) )
[docs]def add_pfba_objective(community, atol=1e-6, rtol=1e-6): """Add pFBA objective. Add objective to minimize the summed flux of all reactions to the current objective. This one will work with any objective (even non-linear ones). See Also -------- pfba Parameters ---------- community : micom.Community The community to add the objective to. """ # Fix all growth rates rates = { sp: community.constraints["objective_" + sp].primal for sp in community.taxa } _apply_min_growth(community, rates, atol, rtol) if == "_pfba_objective": raise ValueError("model already has pfba objective") reaction_variables = ( (rxn.forward_variable, rxn.reverse_variable) for rxn in community.reactions ) variables = chain(*reaction_variables) community.objective = Zero community.objective_direction = "min" community.objective.set_linear_coefficients(dict.fromkeys(variables, 1.0)) if interface_to_str(community.solver.interface) == "osqp": community.objective += 1e-6 * community.variables.community_objective ** 2 if community.modification is None: community.modification = "pFBA" else: community.modification += " and pFBA"
[docs]def solve(community, fluxes=True, pfba=True, raise_error=False, atol=1e-6, rtol=1e-6): """Get all fluxes stratified by taxa.""" solver_name = interface_to_str(community.solver.interface) term = None if solver_name == "osqp" and community.objective.is_Linear: # This improves OSQP by soooo much term = ( 1e-6 * community.solver.problem.direction * community.variables.community_objective ** 2 ) community.objective += term community.solver.optimize() status = community.solver.status if status in good: if status != OPTIMAL: if raise_error: raise OptimizationError("solver returned the status %s." % status) else: "solver returned the status %s," % status + " returning the solution anyway." ) if pfba and fluxes: add_pfba_objective(community, atol, rtol) community.solver.optimize() if fluxes: sol = CommunitySolution(community) else: sol = CommunitySolution(community, slim=True) if term: correction = 1e-6 * community.variables.community_objective.primal ** 2 sol.objective_value -= community.solver.problem.direction * correction community.objective -= term return sol logger.warning("solver encountered an error %s" % status) return None
[docs]def reset_solver(community): """Reset the solver.""" interface = interface_to_str(community.solver.interface)"resetting solver, hoping for the best.") if interface == "cplex": community.solver.configuration.lp_method = "network" community.solver.configuration.lp_method = "barrier" elif interface == "gurobi": community.solver.problem.reset() elif interface == "glpk": glp_adv_basis(community.solver.problem, 0) elif interface == "osqp": community.solver.problem.reset()
[docs]def optimize_with_retry(com, message="could not get optimum."): """Try to reset the solver.""" sol = com.optimize() if sol is None: reset_solver(com) sol = com.optimize() if sol is None: raise OptimizationError(message) else: return sol.objective_value
[docs]def crossover(community, sol, fluxes=False): """Get the crossover solution.""" gcs = sol.members.growth_rate.drop("medium") com_growth = sol.growth_rate"Starting crossover...") with community as com:"constraining growth rates.") context = get_context(community) if context is not None: context(partial(reset_min_community_growth, com)) reset_min_community_growth(com) = 0.0 com.variables.community_objective.ub = com_growth + 1e-6 com.objective = com.scale * com.variables.community_objective for sp in com.taxa: const = com.constraints["objective_" + sp] const.ub = gcs[sp]"finding closest feasible solution") s = com.optimize() if s is None: reset_solver(com) s = com.optimize() if s is not None: s = CommunitySolution(com, slim=not fluxes) for sp in com.taxa: com.constraints["objective_" + sp].ub = None if s is None: raise OptimizationError( "crossover could not converge (status = %s)." % community.solver.status ) s.objective_value /= com.scale return s
[docs]def optimize_with_fraction(com, fraction, growth_rate=None, fluxes=False): """Optimize with a constrained community growth rate.""" = 0 com.variables.community_objective.ub = None if growth_rate is None: with com: com.objective = com.scale * com.variables.community_objective growth_rate = optimize_with_retry( com, message="could not get community growth rate." ) growth_rate /= com.scale = fraction * growth_rate com.variables.community_objective.ub = growth_rate sol = com.optimize() if sol.status != OPTIMAL: sol = crossover(com, sol, fluxes=fluxes) else: sol = CommunitySolution(com, slim=not fluxes) return sol