diff --git a/modelseedpy/core/exceptions.py b/modelseedpy/core/exceptions.py index 572355b8..32bea6dd 100644 --- a/modelseedpy/core/exceptions.py +++ b/modelseedpy/core/exceptions.py @@ -11,4 +11,8 @@ class PackageError(Exception): class GapfillingError(Exception): """Error in model gapfilling""" - pass \ No newline at end of file + pass + +class ObjectError(Exception): + """Error in the construction of a base KBase object""" + pass diff --git a/modelseedpy/core/gapfillinghelper.py b/modelseedpy/core/gapfillinghelper.py index 50c9cd29..3af8fb56 100644 --- a/modelseedpy/core/gapfillinghelper.py +++ b/modelseedpy/core/gapfillinghelper.py @@ -1,57 +1,28 @@ import logging import re -import copy - +import copy # !!! import never used +from warnings import warn from cobra.core.dictlist import DictList - -from optlang.symbolics import Zero, add -from cobra.core import Gene, Metabolite, Model, Reaction +from optlang.symbolics import Zero, add # !!! Add is never used +from cobra.core import Gene, Metabolite, Model, Reaction # !!! Gene is never used from cobrakbase.core.kbaseobject import AttrDict from cobrakbase.annotation_ontology_api.annotation_ontology_apiServiceClient import annotation_ontology_api -from numpy.f2py.cfuncs import f90modhooks +from numpy.f2py.cfuncs import f90modhooks # !!! import never used +from modelseedpy.core.exceptions import ObjectError, FeasibilityError # !!! FeasibilityError is never used logger = logging.getLogger(__name__) +def build_id(string): + if any([string.startswith(x) for x in ["M_", "M-", "R-", "R_"]]): + string = string[2:] + string_fix = string.replace('-', '__DASH__') + if not string == string_fix: + logger.debug('[Species] rename: [%s] -> [%s]', string, string_fix) + if any([string.startswith(x) for x in ["R-", "R_"]]): + return string_fix + return string - - - - -def build_cpd_id(str): - if str.startswith("M_"): - str = str[2:] - elif str.startswith("M-"): - str = str[2:] - str_fix = str - if '-' in str_fix: - str_fix = str_fix.replace('-', '__DASH__') - if not str == str_fix: - logger.debug('[Species] rename: [%s] -> [%s]', str, str_fix) - return str - -def build_rxn_id(str): - if str.startswith("R_"): - str = str[2:] - elif str.startswith("R-"): - str = str[2:] - str_fix = str - if '-' in str_fix: - str_fix = str_fix.replace('-', '__DASH__') - if not str == str_fix: - logger.debug('[Reaction] rename: [%s] -> [%s]', str, str_fix) - return str_fix - -#Adding a few exception classes to handle different types of errors -class ObjectError(Exception): - """Error in the construction of a base KBase object""" - pass - -class FeasibilityError(Exception): - """Error in FBA formulation""" - pass - -#This class caries functions designed to more easily make standard modifications to an input model - no model state is retained in this class class GapfillingHelper(): def __init__(self,blacklist = [],auto_sink = ["cpd02701_c", "cpd11416_c0", "cpd15302_c"]): self.blacklist = ["rxn12985","rxn00238","rxn07058","rxn05305","rxn00154","rxn09037","rxn10643", @@ -103,18 +74,14 @@ def __init__(self,blacklist = [],auto_sink = ["cpd02701_c", "cpd11416_c0", "cpd1 "rxn11665","rxn11666","rxn11667","rxn11698","rxn11983","rxn11986","rxn11994", "rxn12006","rxn12007","rxn12014","rxn12017","rxn12022","rxn12160","rxn12161", "rxn01267","rxn05294","rxn04656"] - for item in blacklist: - if item not in self.blacklist: - self.blacklist.append(item) + self.blacklist = set(self.blacklist).update(blacklist) self.auto_sink = [] full_id = re.compile('\d+$') - for id in auto_sink: - if full_id.search(id): - self.auto_sink.append(id) + for sink_id in auto_sink: + if full_id.search(sink_id): + self.auto_sink.append(sink_id) else: - for i in range(0,100): - newid = id + str(i) - self.auto_sink.append(newid) + self.auto_sink.extend([sink_id + str(i) for i in range(0,100)]) self.auto_exchange = "e0" self.COBRA_0_BOUND = 0 @@ -123,7 +90,7 @@ def __init__(self,blacklist = [],auto_sink = ["cpd02701_c", "cpd11416_c0", "cpd1 #FBA macro analyses def test_reaction_additions_againt_limits(self,model,reactions,tests): - filtered = DictList(); + filtered = DictList() with model: for rxn in reactions: if rxn.id() in model.reactions: @@ -134,19 +101,15 @@ def test_reaction_additions_againt_limits(self,model,reactions,tests): for test in tests: testmodel = model with testmodel: - self.apply_media_to_model(testmodel,test["media"],test["default_uptake"],test["default_excretion"]) - self.set_objective_from_target_reaction(testmodel,test["target"],test["maximize"]) - solution = testmodel.optimize() + self.apply_media_to_model(testmodel,test["media"],test["default_uptake"],test["default_excretion"]) #!!! this function is undefined + self.set_objective_from_target_reaction(testmodel,test["target"],test["maximize"]) #!!! this function is undefined + solution = testmodel.optimize() #!!! solution is unused if test.maximize == 1: if testmodel.objective.value() > test.limit: + filtered.append(test) + return filtered - - - - - def build_model_extended_for_gapfilling(self,extend_with_template = 1,source_models = [], input_templates = [],model_penalty = 1,reaction_scores = {}): - model_id = self.fbamodel["id"]+".gf" - + def build_model_extended_for_gapfilling(self,extend_with_template = True, source_models = [], input_templates = [], model_penalty = 1, reaction_scores = {}): #Determine all indecies that should be gapfilled indexlist = [0]*1000 compounds = self.fbamodel["modelcompounds"] @@ -154,24 +117,19 @@ def build_model_extended_for_gapfilling(self,extend_with_template = 1,source_mod compartment = compound['modelcompartment_ref'].split("/").pop() basecomp = compartment[0:1] if not basecomp == "e": - index = compartment[1:] - index = int(index) - indexlist[index] += 1 + indexlist[int(compartment[1:])] += 1 #Iterating over all indecies with more than 10 intracellular compounds: gapfilling_penalties = dict() - for i in range(0,1000): - if indexlist[i] > 10: - if extend_with_template == 1: - new_penalties = self.temp_extend_model_index_for_gapfilling(i,input_templates) - gapfilling_penalties.update(new_penalties) - if i < len(source_models) and source_models[i] != None: - new_penalties = self.mdl_extend_model_index_for_gapfilling(i,source_models[i],model_penalty) - gapfilling_penalties.update(new_penalties) + for i, val in enumerate(indexlist): + if val > 10: + if extend_with_template: + gapfilling_penalties.update(self.temp_extend_model_index_for_gapfilling(i,input_templates)) + if i < len(source_models) and source_models[i]: + gapfilling_penalties.update(self.mdl_extend_model_index_for_gapfilling(i,source_models[i],model_penalty)) #Rescaling penalties by reaction scores and saving genes for reaction in gapfilling_penalties: - array = reaction.split("_") - rxnid = array[0] + rxnid = reaction.split("_")[0] if rxnid in reaction_scores: highest_score = 0 for gene in reaction_scores[rxnid]: @@ -179,28 +137,18 @@ def build_model_extended_for_gapfilling(self,extend_with_template = 1,source_mod highest_score = reaction_scores[rxnid][gene] factor = 1-0.9*highest_score if "reverse" in gapfilling_penalties[reaction]: - penalties[reaction.id]["reverse"] = factor*penalties[reaction.id]["reverse"] + gapfilling_penalties[reaction.id]["reverse"] = factor*gapfilling_penalties[reaction.id]["reverse"] if "forward" in gapfilling_penalties[reaction]: - penalties[reaction.id]["forward"] = factor*penalties[reaction.id]["forward"] + gapfilling_penalties[reaction.id]["forward"] = factor*gapfilling_penalties[reaction.id]["forward"] self.cobramodel.solver.update() return gapfilling_penalties #Possible new function to add to the KBaseFBAModelToCobraBuilder to extend a model with a template for gapfilling for a specific index - def mdl_extend_model_index_for_gapfilling(self,model,index,source_model,model_penalty): - new_metabolites = {} - new_reactions = {} - new_exchange = [] - new_demand = [] - new_penalties = dict() - local_remap = {} - + def mdl_extend_model_index_for_gapfilling(self, model, index, source_model, model_penalty): + new_metabolites, new_reactions, new_penalties, local_remap = {}, {}, {}, {} + new_exchange, new_demand = [], [] comp = re.compile('(.*_*)(.)\d+$') for modelcompound in source_model.metabolites: - - - - - cobra_metabolite = self.convert_modelcompound(modelcompound) original_id = cobra_metabolite.id groups = comp.match(cobra_metabolite.compartment) @@ -212,15 +160,15 @@ def mdl_extend_model_index_for_gapfilling(self,model,index,source_model,model_pe cobra_metabolite.compartment = groups[1]+groups[2]+str(index) groups = comp.match(cobra_metabolite.id) cobra_metabolite.id = groups[1]+groups[2]+str(index) - if cobra_metabolite.id not in self.cobramodel.metabolites and cobra_metabolite.id not in new_metabolites: + if cobra_metabolite.id not in (self.cobramodel.metabolites and new_metabolites): new_metabolites[cobra_metabolite.id] = cobra_metabolite if original_id in self.auto_sink: - self.demand_compounds.add(cobra_metabolite.id) + self.demand_compounds.add(cobra_metabolite.id) #!!! where is demand_compounds defined? new_demand.append(cobra_metabolite) if cobra_metabolite.compartment == self.auto_exchange: self.exchange_compounds.add(cobra_metabolite.id) new_exchange.append(cobra_metabolite) - if cobra_metabolite.id in self.cobramodel.metabolites: + if cobra_metabolite.id in self.cobramodel.metabolites: #!!! where is cobramodel defined? cobra_metabolite = self.cobramodel.metabolites.get_by_id(cobra_metabolite.id) else:#Just in case the same compound is added twice - we want to switch the metabolite to the first new version cobra_metabolite = new_metabolites[cobra_metabolite.id] @@ -230,14 +178,14 @@ def mdl_extend_model_index_for_gapfilling(self,model,index,source_model,model_pe for modelreaction in source_model.reactions: if modelreaction.id.split("_")[0] in self.blacklist: - next + continue #cobra_reaction = self.convert_modelreaction(modelreaction) cobra_reaction = modelreaction.copy() groups = comp.match(cobra_reaction.id) cobra_reaction.id = groups[1]+groups[2]+str(index) - new_penalties[cobra_reaction.id] = dict(); + new_penalties[cobra_reaction.id] = dict() #Updating metabolites in reaction to new model - metabolites = cobra_reaction.metabolites; + metabolites = cobra_reaction.metabolites new_stoichiometry = {} for metabolite in metabolites: #Adding new coefficient: @@ -246,41 +194,37 @@ def mdl_extend_model_index_for_gapfilling(self,model,index,source_model,model_pe if local_remap[metabolite.id] != metabolite: new_stoichiometry[metabolite] = 0 cobra_reaction.add_metabolites(new_stoichiometry,combine=False) - if cobra_reaction.id not in self.cobramodel.reactions and cobra_reaction.id not in new_reactions: + rxn = self.cobramodel.reactions.get_by_id(cobra_reaction.id) + if cobra_reaction.id not in (self.cobramodel.reactions and new_reactions): new_reactions[cobra_reaction.id] = cobra_reaction - new_penalties[cobra_reaction.id]["added"] = 1 + new_penalties[cobra_reaction.id]["added"] = True if cobra_reaction.lower_bound < 0: new_penalties[cobra_reaction.id]["reverse"] = model_penalty if cobra_reaction.upper_bound > 0: new_penalties[cobra_reaction.id]["forward"] = model_penalty - elif cobra_reaction.lower_bound < 0 and self.cobramodel.reactions.get_by_id(cobra_reaction.id).lower_bound == 0: - self.cobramodel.reactions.get_by_id(cobra_reaction.id).lower_bound = cobra_reaction.lower_bound - self.cobramodel.reactions.get_by_id(cobra_reaction.id).update_variable_bounds() + elif cobra_reaction.lower_bound < 0 and rxn.lower_bound == 0: + rxn.lower_bound = cobra_reaction.lower_bound + rxn.update_variable_bounds() new_penalties[cobra_reaction.id]["reverse"] = model_penalty - new_penalties[cobra_reaction.id]["reversed"] = 1 - elif cobra_reaction.upper_bound > 0 and self.cobramodel.reactions.get_by_id(cobra_reaction.id).upper_bound == 0: - self.cobramodel.reactions.get_by_id(cobra_reaction.id).upper_bound = cobra_reaction.upper_bound - self.cobramodel.reactions.get_by_id(cobra_reaction.id).update_variable_bounds() + new_penalties[cobra_reaction.id]["reversed"] = True + elif cobra_reaction.upper_bound > 0 and rxn.upper_bound == 0: + rxn.upper_bound = cobra_reaction.upper_bound + rxn.update_variable_bounds() new_penalties[cobra_reaction.id]["forward"] = model_penalty - new_penalties[cobra_reaction.id]["reversed"] = 1 + new_penalties[cobra_reaction.id]["reversed"] = True - #Only run this on new exchanges so we don't readd for all exchanges + #Only run this on new exchanges so we don't read for all exchanges for cpd in new_exchange: drain_reaction = self.add_drain_from_metabolite_id(cpd.id) - if drain_reaction.id not in self.cobramodel.reactions and drain_reaction.id not in new_reactions: + if drain_reaction.id not in (self.cobramodel.reactions and new_reactions): new_reactions[drain_reaction.id] = drain_reaction - #Only run this on new demands so we don't readd for all exchanges + #Only run this on new demands so we don't read for all exchanges for cpd_id in new_demand: drain_reaction = self.add_drain_from_metabolite_id( - cpd_id, - lower_bound = self.COBRA_0_BOUND, - upper_bound = self.COBRA_DEFAULT_UB, - prefix = 'DM_', - prefix_name = 'Demand for ', - sbo = 'SBO:0000627' - ) - if drain_reaction.id not in self.cobramodel.reactions and drain_reaction.id not in new_reactions: + cpd_id, lower_bound = self.COBRA_0_BOUND, upper_bound = self.COBRA_DEFAULT_UB, + prefix = 'DM_', prefix_name = 'Demand for ', sbo = 'SBO:0000627') + if drain_reaction.id not in (self.cobramodel.reactions and new_reactions): new_reactions[drain_reaction.id] = drain_reaction #Adding all new reactions to the model at once (much faster than one at a time) @@ -289,32 +233,26 @@ def mdl_extend_model_index_for_gapfilling(self,model,index,source_model,model_pe #Possible new function to add to the KBaseFBAModelToCobraBuilder to extend a model with a template for gapfilling for a specific index def temp_extend_model_index_for_gapfilling(self,index,input_templates = []): - new_metabolites = {} - new_reactions = {} - new_exchange = [] - new_demand = [] - new_penalties = dict() + new_metabolites, new_reactions, new_penalties = {}, {}, {} + new_exchange, new_demand = [], [] template = None if index < len(input_templates): template = input_templates[index] - elif index in self.fbamodel['template_refs']: + elif index in self.fbamodel['template_refs']: #!!! where is fbamodel defined? template = self.kbapi.get_from_ws(self.fbamodel['template_refs'][index]) else: template = self.kbapi.get_from_ws(self.fbamodel['template_ref']) - if template.info.type != "KBaseFBA.NewModelTemplate": raise ObjectError(template.info.type+" loaded when KBaseFBA.NewModelTemplate expected") for template_compound in template.compcompounds: tempindex = index - compartment = template_compound.templatecompartment_ref.split("/").pop() - if compartment == "e": + if template_compound.templatecompartment_ref.split("/").pop() == "e": tempindex = 0 cobra_metabolite = self.convert_template_compound(template_compound,tempindex,template) - if cobra_metabolite.id not in self.cobramodel.metabolites and cobra_metabolite.id not in new_metabolites: + if cobra_metabolite.id not in (self.cobramodel.metabolites and new_metabolites): new_metabolites[cobra_metabolite.id] = cobra_metabolite - self.cobramodel.add_metabolites([cobra_metabolite]) if cobra_metabolite.id in self.auto_sink: self.demand_compounds.add(cobra_metabolite.id) new_demand.append(cobra_metabolite.id) @@ -326,20 +264,20 @@ def temp_extend_model_index_for_gapfilling(self,index,input_templates = []): for template_reaction in template.reactions: if template_reaction.id.split("_")[0] in self.blacklist: - continue - cobra_reaction = self.convert_template_reaction(template_reaction,index,template,1) - new_penalties[cobra_reaction.id] = dict(); - if cobra_reaction.id not in self.cobramodel.reactions and cobra_reaction.id not in new_reactions: + continue + cobra_reaction = self.convert_template_reaction(template_reaction,index,template,True) # !!! where is convert_template_reaction defined? + new_penalties[cobra_reaction.id] = dict() + if cobra_reaction.id not in (self.cobramodel.reactions and new_reactions): #Adding any template reactions missing from the present model new_reactions[cobra_reaction.id] = cobra_reaction if cobra_reaction.lower_bound < 0: new_penalties[cobra_reaction.id]["reverse"] = template_reaction.base_cost + template_reaction.reverse_penalty if cobra_reaction.upper_bound > 0: new_penalties[cobra_reaction.id]["forward"] = template_reaction.base_cost + template_reaction.forward_penalty - new_penalties[cobra_reaction.id]["added"] = 1 + new_penalties[cobra_reaction.id]["added"] = True elif template_reaction.GapfillDirection == "=": #Adjusting directionality as needed for existing reactions - new_penalties[cobra_reaction.id]["reversed"] = 1 + new_penalties[cobra_reaction.id]["reversed"] = True if self.cobramodel.reactions.get_by_id(cobra_reaction.id).lower_bound == 0: self.cobramodel.reactions.get_by_id(cobra_reaction.id).lower_bound = template_reaction.maxrevflux self.cobramodel.reactions.get_by_id(cobra_reaction.id).update_variable_bounds() @@ -352,19 +290,14 @@ def temp_extend_model_index_for_gapfilling(self,index,input_templates = []): #Only run this on new exchanges so we don't readd for all exchanges for cpd_id in new_exchange: drain_reaction = self.add_drain_from_metabolite_id(cpd_id) - if drain_reaction != None and drain_reaction.id not in new_reactions: + if drain_reaction is not None and drain_reaction.id not in new_reactions: new_reactions[drain_reaction.id] = drain_reaction #Only run this on new demands so we don't readd for all exchanges for cpd_id in new_demand: drain_reaction = self.add_drain_from_metabolite_id( - cpd_id, - self.COBRA_0_BOUND, - self.COBRA_DEFAULT_UB, - "DM_", - "Demand for " - ) - if drain_reaction != None and drain_reaction.id not in new_reactions: + cpd_id, self.COBRA_0_BOUND, self.COBRA_DEFAULT_UB, "DM_", "Demand for ") + if drain_reaction and drain_reaction.id not in new_reactions: new_reactions[drain_reaction.id] = drain_reaction #Adding all new reactions to the model at once (much faster than one at a time) @@ -372,32 +305,21 @@ def temp_extend_model_index_for_gapfilling(self,index,input_templates = []): return new_penalties def convert_modelreaction(self, reaction, bigg=False): - mr_id = reaction.id - name = reaction.name - annotation = reaction.annotation lower_bound, upper_bound = reaction.get_reaction_constraints() - id = build_rxn_id(mr_id) - if bigg and "bigg.reaction" in annotation: - id = annotation["bigg.reaction"] - - gpr = reaction.get_gpr() + rxn_id = build_id(reaction.id) + if bigg and "bigg.reaction" in reaction.annotation: + rxn_id = reaction.annotation["bigg.reaction"] - cobra_reaction = Reaction(id, - name=name, - lower_bound=lower_bound, - upper_bound=upper_bound) + cobra_reaction = Reaction(rxn_id, name=reaction.name, lower_bound=lower_bound, upper_bound=upper_bound) cobra_reaction.annotation[self.SBO_ANNOTATION] = "SBO:0000176" #biochemical reaction - cobra_reaction.annotation.update(annotation) - - if id.startswith('rxn'): + cobra_reaction.annotation.update(reaction.annotation) + if rxn_id.startswith('rxn'): cobra_reaction.annotation["seed.reaction"] = id.split("_")[0] cobra_reaction.add_metabolites(self.convert_modelreaction_stoichiometry(reaction)) - cobra_reaction.gene_reaction_rule = reaction.gene_reaction_rule - - for genes in gpr: + for genes in reaction.get_gpr(): for gene in genes: if not gene in self.genes: self.genes[gene] = gene @@ -405,56 +327,40 @@ def convert_modelreaction(self, reaction, bigg=False): return cobra_reaction def convert_modelcompound(self, metabolite, bigg=False): - formula = metabolite.formula - name = metabolite.name - charge = metabolite.charge - mc_id = metabolite.id - compartment = metabolite.compartment - annotation = metabolite.annotation - - id = build_cpd_id(mc_id) - - if bigg and "bigg.metabolite" in annotation: - id = annotation["bigg.metabolite"] + "_" + compartment - #print(id) - - met = Metabolite(id, - formula=formula, - name=name, - charge=charge, - compartment=compartment) + met_id = build_id(metabolite.id) + if bigg and "bigg.metabolite" in metabolite.annotation: + met_id = metabolite.annotation["bigg.metabolite"]+"_"+metabolite.compartment + met = Metabolite(met_id, formula=metabolite.formula, name=metabolite.name, + charge=metabolite.charge, compartment=metabolite.compartment) met.annotation[self.SBO_ANNOTATION] = "SBO:0000247" #simple chemical - Simple, non-repetitive chemical entity. - if id.startswith('cpd'): - met.annotation["seed.compound"] = id.split("_")[0] - met.annotation.update(annotation) + if met_id.startswith('cpd'): + met.annotation["seed.compound"] = met_id.split("_")[0] + met.annotation.update(metabolite.annotation) return met def convert_modelreaction_stoichiometry(self, reaction): object_stoichiometry = {} - s = reaction.stoichiometry - for metabolite_id in s: - if metabolite_id in self.metabolites_remap: - object_stoichiometry[self.cobramodel.metabolites.get_by_id(self.metabolites_remap[metabolite_id])] = s[metabolite_id] + for met_id in reaction.stoichiometry: + if met_id in self.metabolites_remap: + object_stoichiometry[self.cobramodel.metabolites.get_by_id(self.metabolites_remap[met_id])] = reaction.stoichiometry[met_id] return object_stoichiometry - def create_binary_variables(self,rxnobj,forward = 1,reverse = 1): - if rxnobj.id not in self.binary_flux_variables: + def create_binary_variables(self,rxnobj,forward = True,reverse = True): + if rxnobj.id not in self.binary_flux_variables: #!!! Where are binary_flux_variables and binary_flux_constraints defined? self.binary_flux_variables[rxnobj.id] = dict() self.binary_flux_constraints[rxnobj.id] = dict() - if forward == 1 and rxnobj.upper_bound > 0 and "forward" not in self.binary_flux_variables[rxnobj.id]: + if all([forward, rxnobj.upper_bound > 0, "forward" not in self.binary_flux_variables[rxnobj.id]]): self.binary_flux_variables[rxnobj.id]["forward"] = self.cobramodel.problem.Variable(rxnobj.id+"_fb", lb=0,ub=1,type="binary") self.cobramodel.add_cons_vars(self.binary_flux_variables[rxnobj.id]["forward"]) self.binary_flux_constraints[rxnobj.id]["forward"] = self.cobramodel.problem.Constraint( - 1000*self.binary_flux_variables[rxnobj.id]["forward"] - rxnobj.forward_variable,lb=0,ub=None,name=rxnobj.id+"_fb" - ) + 1000*self.binary_flux_variables[rxnobj.id]["forward"] - rxnobj.forward_variable,lb=0,ub=None,name=rxnobj.id+"_fb") self.cobramodel.add_cons_vars(self.binary_flux_constraints[rxnobj.id]["forward"]) - if reverse == 1 and rxnobj.lower_bound < 0 and "reverse" not in self.binary_flux_variables[rxnobj.id]: + if all([reverse, rxnobj.lower_bound < 0, "reverse" not in self.binary_flux_variables[rxnobj.id]]): self.binary_flux_variables[rxnobj.id]["reverse"] = self.cobramodel.problem.Variable(rxnobj.id+"_bb", lb=0,ub=1,type="binary") self.cobramodel.add_cons_vars(self.binary_flux_variables[rxnobj.id]["reverse"]) self.binary_flux_constraints[rxnobj.id]["reverse"] = self.cobramodel.problem.Constraint( - 1000*self.binary_flux_variables[rxnobj.id]["reverse"] - rxnobj.forward_variable,lb=0,ub=None,name=rxnobj.id+"_bb" - ) + 1000*self.binary_flux_variables[rxnobj.id]["reverse"] - rxnobj.forward_variable,lb=0,ub=None,name=rxnobj.id+"_bb") self.cobramodel.add_cons_vars(self.binary_flux_constraints[rxnobj.id]["reverse"]) def binary_check_gapfilling_solution(self,gapfilling_penalties,add_solution_exclusion_constraint): @@ -463,10 +369,10 @@ def binary_check_gapfilling_solution(self,gapfilling_penalties,add_solution_excl for rxnobj in self.cobramodel.reactions: if rxnobj.id in gapfilling_penalties: if "reverse" in gapfilling_penalties[rxnobj.id] and flux_values[rxnobj.id]["reverse"] > Zero: - self.create_binary_variables(rxnobj,0,1) + self.create_binary_variables(rxnobj, forward=False) objcoef[self.binary_flux_variables[rxnobj.id]["reverse"]] = 1 if "forward" in gapfilling_penalties[rxnobj.id] and flux_values[rxnobj.id]["forward"] > Zero: - self.create_binary_variables(rxnobj,1,0) + self.create_binary_variables(rxnobj, reverse=False) objcoef[self.binary_flux_variables[rxnobj.id]["forward"]] = 1 with self.cobramodel: #Setting all gapfilled reactions not in the solution to zero @@ -480,62 +386,50 @@ def binary_check_gapfilling_solution(self,gapfilling_penalties,add_solution_excl rxnobj.update_variable_bounds() #Setting the objective to be minimization of sum of binary variables self.cobramodel.objective = min_reaction_objective - min_reaction_objective.set_linear_coefficients(objcoef) + self.cobramodel.objective.set_linear_coefficients(objcoef) with open('GapfillBinary.lp', 'w') as out: out.write(str(self.cobramodel.solver)) self.cobramodel.optimize() flux_values = self.compute_flux_values_from_variables() - if add_solution_exclusion_constraint == 1: + if add_solution_exclusion_constraint: self.add_binary_solution_exclusion_constraint(flux_values) return flux_values #Adds a constraint that eliminates a gapfilled solution from feasibility so a new solution can be obtained def add_binary_solution_exclusion_constraint(self,flux_values): - count = len(self.solution_exclusion_constraints) solution_coef = {} solution_size = 0 - for reaction in self.binary_flux_variables: - for direction in self.binary_flux_variables[reaction]: - if flux_values[reaction][direction] > Zero: - solution_size += 1 - solution_coef[self.binary_flux_variables[reaction][direction]] = 1 + for reaction, direction in self.binary_flux_variables.items(): + if flux_values[reaction][direction] > Zero: + solution_size += 1 + solution_coef[self.binary_flux_variables[reaction][direction]] = 1 if len(solution_coef) > 0: new_exclusion_constraint = self.cobramodel.problem.Constraint( - Zero,lb=None,ub=(solution_size-1),name="exclusion."+str(count+1) - ) + Zero,lb=None,ub=(solution_size-1),name="exclusion."+str(len(self.solution_exclusion_constraints)+1)) self.cobramodel.add_cons_vars(new_exclusion_constraint) self.cobramodel.solver.update() new_exclusion_constraint.set_linear_coefficients(solution_coef) - self.solution_exclusion_constraints.append(new_exclusion_constraint); + self.solution_exclusion_constraints.append(new_exclusion_constraint) return new_exclusion_constraint return None - #Takes gapfilled penalties and creates and objective function minimizing gapfilled reactions + #Takes gapfilled penalties and creates an objective function minimizing gapfilled reactions def create_minimal_reaction_objective(self,penalty_hash,default_penalty = 0): - reaction_objective = self.cobramodel.problem.Objective( - Zero, - direction="min") + reaction_objective = self.cobramodel.problem.Objective(Zero, direction="min") obj_coef = dict() for reaction in self.cobramodel.reactions: + obj_coef[reaction.forward_variable] = default_penalty + obj_coef[reaction.reverse_variable] = default_penalty if reaction.id in penalty_hash: - #Minimizing gapfilled reactions if "reverse" in penalty_hash[reaction.id]: obj_coef[reaction.reverse_variable] = abs(penalty_hash[reaction.id]["reverse"]) - elif default_penalty != 0: - obj_coef[reaction.reverse_variable] = default_penalty if "forward" in penalty_hash[reaction.id]: obj_coef[reaction.forward_variable] = abs(penalty_hash[reaction.id]["forward"]) - elif default_penalty != 0: - obj_coef[reaction.forward_variable] = default_penalty - else: - obj_coef[reaction.forward_variable] = default_penalty - obj_coef[reaction.reverse_variable] = default_penalty - self.cobramodel.objective = reaction_objective - reaction_objective.set_linear_coefficients(obj_coef) + self.cobramodel.objective.set_linear_coefficients(obj_coef) #Required this function to add gapfilled compounds to a KBase model for saving gapfilled model - def convert_cobra_compound_to_kbcompound(self,cpd,kbmodel,add_to_model = 1): + def convert_cobra_compound_to_kbcompound(self, cpd, kbmodel=None, add_to_model=True): refid = "cpd00000" if re.search('cpd\d+_[a-z]+',cpd.id): refid = cpd.id @@ -554,12 +448,12 @@ def convert_cobra_compound_to_kbcompound(self,cpd,kbmodel,add_to_model = 1): "string_attributes": {} } cpd_data = AttrDict(cpd_data) - if add_to_model == 1: + if add_to_model: kbmodel.modelcompounds.append(cpd_data) return cpd_data #Required this function to add gapfilled reactions to a KBase model for saving gapfilled model - def convert_cobra_reaction_to_kbreaction(self,rxn,kbmodel,direction = "=",add_to_model = 1): + def convert_cobra_reaction_to_kbreaction(self, rxn, kbmodel, direction="=", add_to_model=True): rxnref = "~/template/reactions/id/rxn00000_c" if re.search('rxn\d+_[a-z]+',rxn.id): rxnref = "~/template/reactions/id/"+rxn.id @@ -586,26 +480,21 @@ def convert_cobra_reaction_to_kbreaction(self,rxn,kbmodel,direction = "=",add_to rxn_data = AttrDict(rxn_data) for cpd in rxn.metabolites: if cpd.id not in kbmodel.modelcompounds: - convert_cobra_compound_to_kbcompound(cpd,kbmodel,1) + self.convert_cobra_compound_to_kbcompound(cpd,kbmodel) rxn_data.modelReactionReagents.append({ "coefficient" : rxn.metabolites[cpd], "modelcompound_ref" : "~/modelcompounds/id/"+cpd.id }) - if add_to_model == 1: + if add_to_model: kbmodel.modelreactions.append(rxn_data) return rxn_data def convert_objective_to_constraint(self,lower_bound,upper_bound): old_obj_variable = self.cobramodel.problem.Variable( - name="old_objective_variable", - lb=lower_bound,ub=upper_bound - ) + name="old_objective_variable", lb=lower_bound,ub=upper_bound) old_obj_constraint = self.cobramodel.problem.Constraint( self.cobramodel.solver.objective.expression - old_obj_variable, - lb=0, - ub=0, - name="old_objective_constraint", - ) + lb=0, ub=0, name="old_objective_constraint") self.cobramodel.add_cons_vars([old_obj_variable, old_obj_constraint]) def compute_flux_values_from_variables(self): @@ -616,33 +505,26 @@ def compute_flux_values_from_variables(self): flux_values[rxnobj.id]["forward"] = rxnobj.forward_variable.primal return flux_values - def compute_gapfilled_solution(self,penalties,flux_values = None): - if flux_values == None: - flux_values = self.compute_flux_values_from_variables() - output = {"reversed" : {},"new" : {}} + def compute_gapfilled_solution(self, penalties:dict, flux_values = None): + flux_values = flux_values or self.compute_flux_values_from_variables() + directions = {"reversed" : {},"new" : {}} for reaction in self.cobramodel.reactions: if reaction.id in penalties: if flux_values[reaction.id]["forward"] > Zero and "forward" in penalties[reaction.id]: if "added" in penalties[reaction.id]: - output["new"][reaction.id] = ">" + directions["new"][reaction.id] = ">" else: - output["reversed"][reaction.id] = ">" + directions["reversed"][reaction.id] = ">" elif flux_values[reaction.id]["reverse"] > Zero and "reverse" in penalties[reaction.id]: if "added" in penalties[reaction.id]: - output["new"][reaction.id] = "<" + directions["new"][reaction.id] = "<" else: - output["reversed"][reaction.id] = "<" - return output + directions["reversed"][reaction.id] = "<" + return directions - def add_gapfilling_solution_to_kbase_model(self,newmodel,penalties,media_ref): - gfid = None - if gfid == None: - largest_index = 0 - for gapfilling in newmodel.gapfillings: - current_index = gapfilling.id.split(".").pop() - if largest_index == 0 or largest_index < current_index: - largest_index = current_index - gfid = "gf."+str(largest_index+1) + def add_gapfilling_solution_to_kbase_model(self, newmodel, penalties:dict, media_ref): + largest_index = max([gapfilling.id.split(".").pop() for gapfilling in newmodel.gapfillings]) + gfid = "gf."+str(largest_index+1) newmodel.gapfillings.append({ "gapfill_id": newmodel.id+"."+gfid, "id": gfid, @@ -665,7 +547,7 @@ def add_gapfilling_solution_to_kbase_model(self,newmodel,penalties,media_ref): gfrxn.gapfill_data[gfid] = dict() gfrxn.gapfill_data[gfid]["0"] = ["<",1,[]] - def compute_reaction_scores(self,weigh_all_events_equally = 1,weights = None): + def compute_reaction_scores(self, weigh_all_events_equally=True, weights=None): reaction_genes = {} if "genome_ref" in self.fbamodel: anno_api = annotation_ontology_api() @@ -681,7 +563,7 @@ def compute_reaction_scores(self,weigh_all_events_equally = 1,weights = None): reaction_genes[newrxn] = {} if gene not in reaction_genes[newrxn]: reaction_genes[newrxn][gene] = 0 - if weigh_all_events_equally == 1 or weights == None: + if weigh_all_events_equally or weights is None: reaction_genes[newrxn][gene] += 1 elif event["description"] in weights: reaction_genes[newrxn][gene] += weights[event["description"]] @@ -693,40 +575,31 @@ def compute_reaction_scores(self,weigh_all_events_equally = 1,weights = None): def replicate_model(self,count): newmodel = Model(self.cobramodel.id+"_rep"+str(count)) - utilities = KBaseFBAUtilities(newmodel,newmodel,self.kbapi,self.media,default_uptake = self.default_uptake,default_excretion = self.default_excretion,blacklist = self.blacklist) - metabolites = [] - reactions = [] + # utilities = KBaseFBAUtilities(newmodel, newmodel, self.kbapi, self.media, default_uptake = self.default_uptake, default_excretion = self.default_excretion, blacklist = self.blacklist) # !!! KBaseFBAUtilities is undefined + metabolites, reactions = [], [] metabolite_hash = {} for i in range(0,count): for metabolite in self.cobramodel.metabolites: metabolite = metabolite.copy() - metabolite.id = metabolite.id + "__" + str(i) + metabolite.id += "__"+str(i) metabolite_hash[metabolite.id] = metabolite metabolites.append(metabolite) for reaction in self.cobramodel.reactions: reaction = reaction.copy() - reaction.id = reaction.id + "__" + str(i) - input_metabolites = {} - for metabolite in reaction.metabolites: - newid = metabolite.id + "__" + str(i) - input_metabolites[metabolite_hash[newid]] = reaction.metabolites[metabolite] - reaction.add_metabolites(input_metabolites,combine=False) + reaction.id += "__"+str(i) + reaction.add_metabolites({metabolite_hash[met.id+"__"+str(i)]:reaction.metabolites[met] for met in reaction.metabolites},combine=False) reactions.append(reaction) newmodel.add_metabolites(metabolites) newmodel.add_reactions(reactions) - return utilities + return newmodel - def test_reaction_additions_againt_limits(self,reactions,directions,tests): - filtered_rxn = [] - filtered_direction = [] - - - #Using "with" to ensure we don't alter the model with these tests + def test_reaction_additions_againt_limits(self,reactions,tests): + filtered_tests = {} model = self.cobramodel - with model: + with model: # conserve the original model through WITH for rxn in reactions: - if rxn.id() in self.cobramodel.reactions: - rxn_obj = self.cobramodel.reactions.get_by_id(rxn.id()) + if rxn.id in self.cobramodel.reactions: + rxn_obj = self.cobramodel.reactions.get_by_id(rxn.id) else: rxn_obj = self.cobramodel.add_reactions([rxn]) self.set_reaction_bounds_from_direction(rxn_obj,reactions[rxn]) @@ -738,15 +611,16 @@ def test_reaction_additions_againt_limits(self,reactions,directions,tests): solution = self.cobramodel.optimize() if test.maximize == 1: if testmodel.objective.value() > test.limit: - + filtered_tests.update({rxn_obj:reactions[rxn]}) + return filtered_tests - def set_reaction_bounds_from_direction(self,reaction,direction,add=0): + def set_reaction_bounds_from_direction(self,reaction,direction,add=False): if direction == "<": reaction.lower_bound = -100 - if add == 0: + if not add: reaction.upper_bound = 0 if direction == ">": reaction.upper_bound = 100 - if add == 0: + if not add: reaction.lower_bound = 0 - reaction.update_variable_bounds() \ No newline at end of file + reaction.update_variable_bounds()