Source code for maboss.simulation

"""Class that handles the parameters of a MaBoSS simulation.
"""

from __future__ import print_function
import collections
from functools import reduce
from sys import stderr, stdout, version_info
if version_info[0] < 3:
    from contextlib2 import ExitStack
else:
    from contextlib import ExitStack

from colomoto import ModelState

from .result import Result
from .cmabossresult import CMaBoSSResult
import os
import uuid
import subprocess
import tempfile
import shutil
from colomoto_jupyter import import_colomoto_tool
from colomoto_jupyter.sessionfiles import new_output_file

_default_parameter_list = collections.OrderedDict([
    ('time_tick', 0.1),
    ('max_time', 4),
    ('sample_count', 10000),
    ('discrete_time', 0),
    ('use_physrandgen', 0),
    ('seed_pseudorandom', 0),
    ('display_traj', 0),
    ('statdist_traj_count', 0),
    ('statdist_cluster_threshold', 1),
    ('thread_count', 1),
    ('statdist_similarity_cache_max_size', 20000)
])

[docs] class Simulation(object): """ Class that handles MaBoSS simulations. .. py:attribute:: network A Network object, that will be translated in a bnd file .. py:attribute:: mutations A list of nodes for which mutation can be triggered by modifying the cfg file .. py:attribute:: palette A mapping of nodes to color for plotting the results of the simulation. .. py:attribute:: param A dictionary that contains global variables (keys starting with a '$'), and simulation parameters (keys not starting with a '$'). """
[docs] def __init__(self, nt, parameters=collections.OrderedDict({}), command=None, mutations=[], mutationsTypes={}, **kwargs): """ Initialize the Simulation object. :param nt: the network associated with the simulation. :type nt: :py:class:`Network` :param dict kwargs: parameters of the simulation """ self.param = _default_parameter_list.copy() self.palette = {} for cfg in (parameters, kwargs): for p in cfg: if p in _default_parameter_list or p[0] == '$': self.param[p] = cfg[p] elif p == "palette": self.palette = kwargs[p] else: print("Warning: unused parameter %s" % p, file=stderr) self.network = nt self.mutations = mutations self.mutationTypes = mutationsTypes self.refstate = {} self.workdir = None self.overwrite = False errors = self.check(command=command) if len(errors) > 0: print(errors)
[docs] def update_parameters(self, **kwargs): """Add elements to ``self.param``.""" for p in kwargs: if p in _default_parameter_list or p[0] == '$': self.param[p] = kwargs[p] else: print("Warning: unused parameter %s" % p, file=stderr)
def copy(self): new_network = self.network.copy() result = Simulation(new_network, self.param, mutations=self.mutations.copy(), mutationsTypes=self.mutationTypes.copy(), palette=self.palette) result.refstate = self.refstate.copy() return result def check(self, command=None): try: path = tempfile.mkdtemp() cfg_fd, cfg_path = tempfile.mkstemp(dir=path, suffix='.cfg') os.close(cfg_fd) bnd_fg, bnd_path = tempfile.mkstemp(dir=path, suffix='.bnd') os.close(bnd_fg) with ExitStack() as stack: bnd_file = stack.enter_context(open(bnd_path, 'w')) cfg_file = stack.enter_context(open(cfg_path, 'w')) self.print_bnd(out=bnd_file) self.print_cfg(out=cfg_file) maboss_cmd = self.get_maboss_cmd() if command is not None: maboss_cmd = command proc = subprocess.Popen( [maboss_cmd, "--check", "-c", cfg_path, bnd_path], cwd=path, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) (t_stdout, t_stderr) = proc.communicate() shutil.rmtree(path) messages = [] if len(t_stderr) != 0: messages = [mess.replace("MaBoSS: ", "") for mess in list(set(t_stderr.decode().split("\n"))) if mess != ''] return messages except ValueError as e: return [str(e)] def get_maboss_cmd(self): maboss_cmd = "MaBoSS" l = len(self.network) assert l <= 1024, "Models with more than 1024 nodes are not compatible with this version of MaBoSS" if l <= 64: pass elif l <= 128: maboss_cmd = "MaBoSS_128n" elif l <= 256: maboss_cmd = "MaBoSS_256n" elif l <= 512: maboss_cmd = "MaBoSS_512n" else: maboss_cmd = "MaBoSS_1024n" import platform if platform.platform == "Windows": maboss_cmd += ".exe" return maboss_cmd def get_cmaboss(self): l = len(self.network) assert l <= 1024, "Models with more than 1024 nodes are not compatible with this version of MaBoSS" if l <= 64: return __import__("cmaboss") elif l <= 128: return __import__("cmaboss_128n") elif l <= 256: return __import__("cmaboss_256n") elif l <= 512: return __import__("cmaboss_512n") else: return __import__("cmaboss_1024n")
[docs] def print_bnd(self, out=stdout): """Produce the content of the bnd file associated to the simulation.""" print(self.network, file=out)
[docs] def print_cfg(self, out=stdout): """Produce the content of the cfg file associated to the simulation.""" print(self.str_cfg(), file=out)
def str_cfg(self): res = "" # Filtering out unused values if len(self.mutations) > 0: res = "$nb_mutable = %d;\n" % len(self.mutations) for p in self.param: if p[0] == '$' and p != "$nb_mutable": res += "%s = %s;\n" % (p, self.param[p]) res += self.network.str_istate() + "\n" res += "\n" for p in self.param: if p[0] != '$': res += "%s = %s;\n" % (p, self.param[p]) for name in self.network.names: # Filtering out default values if str(self.network[name].is_internal).lower() not in ["false", "0", "0.0"]: res += "%s.is_internal = %s;\n" % (name, self.network[name].is_internal) for name in self.network.names: # Filtering out default values if str(self.network[name].in_graph).lower() not in ["false", "0", "0.0"]: res += "%s.in_graph = %s;\n" % (name, self.network[name].in_graph) for nd in self.refstate: # Filtering out default values if str(self.refstate[nd]) not in ["-1", "-1.0"]: res += "%s.refstate = %s;\n" % (nd, self.refstate[nd]) for name in self.network.names: if self.network[name].get_schedule() is not None and len(self.network[name].get_schedule()) > 0: schedule = ", ".join([f"{time}: {value}" for time, value in self.network[name].get_schedule().items()]) res += "%s.schedule = %s;\n" % (name, schedule) return res def get_logical_rules(self): path = tempfile.mkdtemp() cfg_fd, cfg_path = tempfile.mkstemp(dir=path, suffix='.cfg') os.close(cfg_fd) bnd_fd, bnd_path = tempfile.mkstemp(dir=path, suffix='.bnd') os.close(bnd_fd) with ExitStack() as stack: bnd_file = stack.enter_context(open(bnd_path, 'w')) cfg_file = stack.enter_context(open(cfg_path, 'w')) self.print_bnd(out=bnd_file) self.print_cfg(out=cfg_file) proc = subprocess.Popen( [self.get_maboss_cmd(), "-c", cfg_path, "-l", bnd_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) out, err = proc.communicate() if proc.returncode != 0: raise Exception(err.decode()) rules = {} for line in out.decode().split("\n"): if ":" in line: node, rule = line.split(" : ", 1) rules.update({node.strip(): rule.strip()}) return rules
[docs] def run(self, command=None, workdir=None, overwrite=False, prefix="res", cmaboss=False, only_final_state=False): """Run the simulation with MaBoSS and return a Result object. :param command: specify a MaBoSS command, default to None for automatic selection :rtype: :py:class:`Result` """ if cmaboss: return CMaBoSSResult(self, workdir=workdir, overwrite=overwrite, only_final_state=only_final_state) if workdir is not None: self.workdir = workdir if overwrite: self.overwrite = overwrite return Result(self, command, self.workdir, self.overwrite, prefix)
[docs] def mutate(self, node, state): """ Trigger or untrigger mutation for a node. :param node: The :py:class:`Node` to be modified :type node: :py:class:`Node` :param str State: * ``'ON'`` (always up) * ``'OFF'`` (always down) * ``'WT'`` (mutable but with normal behaviour) The node will appear as a mutable node in the bnd file. This means that its rate will be of the form: ``rate_up = $LowNode ? 0 :($HighNode ? 1: (@logic ? rt_up : 0))`` If the node is already mutable, this method will simply set $HighNode and $LowNode accordingly to the desired mutation. """ if node not in self.network: print("Error, unknown node %s" % node, file=stderr) return nd = self.network[node] if not nd.is_mutant: self.network[node]=_make_mutant_node(nd) self.mutations.append(nd.name) self.mutationTypes.update({nd.name: state}) lowvar = "$Low_"+node highvar = "$High_"+node if state == "ON": self.param[lowvar] = 0 self.param[highvar] = 1 elif state == "OFF": self.param[lowvar] = 1 self.param[highvar] = 0 elif state == "WT": self.param[lowvar] = 0 self.param[highvar] = 0 else: print("Error, state must be ON, OFF or WT", file=stderr) return
[docs] def continue_from_result(self, result): """Set the initial state from as the last state from result.""" node_prob = result.get_nodes_probtraj() nodes = node_prob.iloc[-1] for i in nodes.index: if i != "<nil>": prob = nodes[i] self.network.set_istate(i, [1 - prob, prob])
[docs] def get_initial_state(self): """ TODO """ istate = ModelState() def _simplify(states): if len(states) == 1: return states.pop() return list(sorted(states)) for key, val in self.network.get_istate().items(): if str in map(type, val.values()): raise NotImplementedError("This MaBoSS initial state contains macros which is not supported yet") states = [i for i, prob in val.items() if prob > 0] if isinstance(key, tuple): pre = {i: set() for i in key} for subs in states: for (i,state) in zip(key, subs): pre[i].add(state) count = reduce(int.__mul__, map(len, pre.values())) if count != len(states): raise ValueError("This MaBoSS initial state cannot be converted to a single hypercube") for key, states in pre.items(): istate[key] = _simplify(states) else: istate[key] = _simplify(states) return istate
def get_mutations(self): return self.mutationTypes def set_workdir(self, path, overwrite=False): self.workdir = path self.overwrite = overwrite
def _make_mutant_node(nd): """Create a new logic for mutation that can be activated from .cfg file.""" curent_rt_up = nd.rt_up curent_rt_down = nd.rt_down lowvar = "$Low_"+nd.name highvar = "$High_"+nd.name rt_up = (lowvar+" ? 0 : (" + highvar + " ? 1E308/$nb_mutable : (" + curent_rt_up + "))") rt_down = (highvar + " ? 0 : (" + lowvar + " ? 1E308/$nb_mutable : (" + curent_rt_down + "))") # Once this is done, the mutation can be activated by modifying the value # of $Low_nodename and $High_nodename in the .cfg file newNode = nd.copy() newNode.rt_up = rt_up newNode.rt_down = rt_down newNode.is_mutant = True return newNode def set_nodes_istate(masim, nodes, istate): for n in nodes: masim.network.set_istate(n, istate) def set_output(masim, output): masim.network.set_output(output) def copy_and_mutate(masim, nodes, mut): masim2 = masim.copy() for node in nodes: masim2.mutate(node, mut) return masim2 def copy_and_update_parameters(sim, parameters): new_sim = sim.copy() new_sim.update_parameters( **parameters ) return new_sim def to_biolqm(maboss_model): biolqm = import_colomoto_tool("biolqm") bnet_filename = new_output_file("bnet") with open(bnet_filename, "w") as bnet_file: for node, rule in maboss_model.get_logical_rules().items(): bnet_file.write("%s, %s\n" % (node, rule)) return biolqm.load(bnet_filename) def to_minibn(maboss_model): from colomoto import minibn bnet_filename = new_output_file("bnet") with open(bnet_filename, "w") as bnet_file: for node, rule in maboss_model.get_logical_rules().items(): bnet_file.write("%s, %s\n" % (node, rule)) return minibn.BooleanNetwork.load(bnet_filename) def sbml_to_bnd_and_cfg(sbml_filename, bnd_filename, cfg_filename, use_sbml_names=False): from .gsparser import loadSBML model = loadSBML(sbml_filename, use_sbml_names=use_sbml_names, cmaboss=True) with open(bnd_filename, "w") as bnd_file: model.print_bnd(bnd_file) with open(cfg_filename, "w") as cfg_file: model.print_cfg(cfg_file) def bnet_to_bnd_and_cfg(bnet_filename, bnd_filename, cfg_filename): from .gsparser import loadBNet model = loadBNet(bnet_filename, cmaboss=True) with open(bnd_filename, "w") as bnd_file: model.print_bnd(bnd_file) with open(cfg_filename, "w") as cfg_file: model.print_cfg(cfg_file) def sbml_to_maboss(sbml_filename, cfg_filename=None, use_sbml_names=False): from .gsparser import load bnd_filename = new_output_file("bnd") default_cfg_filename = new_output_file("cfg") sbml_to_bnd_and_cfg(sbml_filename, bnd_filename, default_cfg_filename, use_sbml_names) sim = load(bnd_filename, cfg_filename if cfg_filename is not None else default_cfg_filename) if cfg_filename is None: for node in sim.network: sim.network.set_istate(node, [1, 0]) return sim def bnet_to_maboss(sbml_filename, cfg_filename=None): from .gsparser import load bnd_filename = new_output_file("bnd") default_cfg_filename = new_output_file("cfg") bnet_to_bnd_and_cfg(sbml_filename, bnd_filename, default_cfg_filename) sim = load(bnd_filename, cfg_filename if cfg_filename is not None else default_cfg_filename) if cfg_filename is None: for node in sim.network: sim.network.set_istate(node, [1, 0]) return sim