"""A community solution object."""
import numpy as np
import pandas as pd
from collections import Counter
from optlang.interface import (
OPTIMAL,
NUMERIC,
FEASIBLE,
SUBOPTIMAL,
ITERATION_LIMIT,
)
from optlang.symbolics import Zero
from itertools import chain
from functools import partial
from cobra.exceptions import OptimizationError
from cobra.core import Solution
from cobra.util import interface_to_str, get_context
from micom.logger import logger
from micom.util import reset_min_community_growth, _apply_min_growth
from swiglpk import glp_adv_basis
[docs]
good = [OPTIMAL, NUMERIC, FEASIBLE, SUBOPTIMAL, ITERATION_LIMIT]
"""Solver states that permit returning the solution."""
[docs]
DIRECTION = pd.Series(["import", "export"], index=[0, 1])
[docs]
def _group_taxa(values, ids, taxa, what="reaction"):
"""Format a list of values by id and taxa."""
df = pd.DataFrame({values.name: values, what: ids, "compartment": taxa})
df = df.pivot(index="compartment", columns=what, values=values.name)
df.name = values.name
return df
[docs]
def add_pfba_objective(community, atol=1e-6, rtol=1e-6):
"""Add pFBA objective.
Add objective to minimize the summed flux of all reactions to the
current objective. This one will work with any objective (even non-linear
ones).
See Also
--------
pfba
Parameters
----------
community : micom.Community
The community to add the objective to.
"""
# Fix all growth rates
rates = {
sp: community.constraints["objective_" + sp].primal for sp in community.taxa
}
_apply_min_growth(community, rates, atol, rtol)
if community.solver.objective.name == "_pfba_objective":
raise ValueError("model already has pfba objective")
reaction_variables = (
(rxn.forward_variable, rxn.reverse_variable) for rxn in community.reactions
)
variables = chain(*reaction_variables)
community.objective = Zero
community.objective_direction = "min"
community.objective.set_linear_coefficients(dict.fromkeys(variables, 1.0))
if community.modification is None:
community.modification = "pFBA"
else:
community.modification += " and pFBA"
[docs]
def solve(community, fluxes=True, pfba=True, raise_error=False, atol=1e-6, rtol=1e-6):
"""Get all fluxes stratified by taxa."""
community.solver.optimize()
status = community.solver.status
if status in good:
if status != OPTIMAL:
if raise_error:
raise OptimizationError("solver returned the status %s." % status)
else:
logger.info(
"solver returned the status %s," % status
+ " returning the solution anyway."
)
if pfba and fluxes:
add_pfba_objective(community, atol, rtol)
community.solver.optimize()
if fluxes:
sol = CommunitySolution(community)
else:
sol = CommunitySolution(community, slim=True)
return sol
logger.warning("solver encountered an error %s" % status)
return None
[docs]
def reset_solver(community):
"""Reset the solver."""
interface = interface_to_str(community.solver.interface)
logger.info("resetting solver, hoping for the best.")
if interface == "cplex":
community.solver.configuration.lp_method = "network"
community.solver.configuration.lp_method = "barrier"
elif interface == "gurobi":
community.solver.problem.reset()
elif interface == "glpk":
glp_adv_basis(community.solver.problem, 0)
elif interface == "hybrid":
community.solver.problem.reset()
[docs]
def optimize_with_retry(com, message="could not get optimum."):
"""Try to reset the solver."""
sol = com.optimize()
if sol is None:
reset_solver(com)
sol = com.optimize()
if sol is None:
raise OptimizationError(message)
else:
return sol.objective_value
[docs]
def crossover(community, sol, fluxes=False):
"""Get the crossover solution."""
gcs = sol.members.growth_rate.drop("medium")
com_growth = sol.growth_rate
logger.info("Starting crossover...")
with community as com:
logger.info("constraining growth rates.")
context = get_context(community)
if context is not None:
context(partial(reset_min_community_growth, com))
reset_min_community_growth(com)
com.variables.community_objective.lb = 0.0
com.variables.community_objective.ub = com_growth + 1e-6
com.objective = com.scale * com.variables.community_objective
for sp in com.taxa:
const = com.constraints["objective_" + sp]
const.ub = max(const.lb, gcs[sp])
logger.info("finding closest feasible solution")
s = com.optimize()
if s is None:
reset_solver(com)
s = com.optimize()
if s is not None:
s = CommunitySolution(com, slim=not fluxes)
for sp in com.taxa:
com.constraints["objective_" + sp].ub = None
if s is None:
raise OptimizationError(
"crossover could not converge (status = %s)." % community.solver.status
)
s.objective_value /= com.scale
return s
[docs]
def optimize_with_fraction(com, fraction, growth_rate=None, fluxes=False):
"""Optimize with a constrained community growth rate."""
com.variables.community_objective.lb = 0
com.variables.community_objective.ub = None
if growth_rate is None:
with com:
com.objective = com.scale * com.variables.community_objective
growth_rate = optimize_with_retry(
com, message="could not get community growth rate."
)
growth_rate /= com.scale
com.variables.community_objective.lb = fraction * growth_rate
com.variables.community_objective.ub = growth_rate
sol = com.optimize()
if sol.status != OPTIMAL:
sol = crossover(com, sol, fluxes=fluxes)
else:
sol = CommunitySolution(com, slim=not fluxes)
return sol