Source code for maboss.upp.results

from __future__ import print_function
import os
import sys
import re
import random
import pandas as pd
import numpy as np
if sys.version_info[0] < 3:
    from contextlib2 import ExitStack
else:
    from contextlib import ExitStack
import glob
from ..results.storedresult import StoredResult
from ..server import MaBoSSClient
import shutil
from multiprocessing import Pool
from collections import OrderedDict

[docs] class UpdatePopulationResults: def __init__(self, uppModel, verbose=False, workdir=None, overwrite=False, previous_run=None, previous_run_step=-1, host=None, port=7777, nodes_init=None): self.uppModel = uppModel self.pop_ratios = pd.Series(dtype='float64') self.stepwise_probability_distribution = None self.nodes_stepwise_probability_distribution = None self.nodes_list_stepwise_probability_distribution = None self.nodes_init = nodes_init self.results = [] self.verbose = verbose self.workdir = workdir self.overwrite = overwrite self.pop_ratio = uppModel.pop_ratio self.host = host self.port = port if workdir is not None and os.path.exists(workdir) and not self.overwrite: # Restoring self.results = [None] * (self.uppModel.step_number + 1) for folder in sorted(glob.glob("%s/Step_*/" % self.workdir)): step = os.path.basename(folder[0:-1]).split("_")[-1] self.results[int(step)] = StoredResult(folder) self.pop_ratios = pd.read_csv( os.path.join(self.workdir, "PopRatios.csv"), index_col=0 ).squeeze("columns") / self.uppModel.base_ratio if previous_run: # Load the previous run final state _get_next_condition_from_trajectory(previous_run, self.uppModel.model, step=previous_run_step) else: if workdir is not None: if os.path.exists(workdir): shutil.rmtree(workdir) os.makedirs(workdir) if previous_run: # Load the previous run final state _get_next_condition_from_trajectory(previous_run, self.uppModel.model, step=previous_run_step) self._run() def _run(self): if self.verbose: print("Run MaBoSS step 0") sim_workdir = os.path.join(self.workdir, "Step_0") if self.workdir is not None else None if self.host is None: result = self.uppModel.model.run(workdir=sim_workdir) else: mbcli = MaBoSSClient(self.host, self.port) result = mbcli.run(self.uppModel.model) mbcli.close() self.results.append(result) self.pop_ratios[self.uppModel.time_shift] = self.pop_ratio modelStep = self.uppModel.model.copy() for stepIndex in range(1, self.uppModel.step_number+1): # # Update pop ratio and construct the new version of model # with result._get_probtraj_fd() as result_probtraj_fd: modelStep = self._buildUpdateCfg(modelStep, result_probtraj_fd, stepIndex) if modelStep is None: if self.verbose: print("No cells left") break else: if self.verbose: print("Running MaBoSS for step %d" % stepIndex) sim_workdir = os.path.join(self.workdir, "Step_%d" % stepIndex) if self.workdir is not None else None if self.host is None: result = modelStep.run(workdir=sim_workdir) else: mbcli = MaBoSSClient(self.host, self.port) result = mbcli.run(modelStep) mbcli.close() self.results.append(result) if self.workdir is not None: self.save_population_ratios(os.path.join(self.workdir, "PopRatios.csv"))
[docs] def get_population_ratios(self, name=None): """ .. py:method:: Returns the population ratios timeserie :param name: Optional name for the pandas series object :return: Pandas series object, with the population ratios according to time """ if name: self.pop_ratios.name = name return self.pop_ratios*self.uppModel.base_ratio
[docs] def get_stepwise_probability_distribution(self, nb_cores=1, include=None, exclude=None): """ .. py:method:: Returns the stepwise probability distribution :return: Pandas dataframe object, representing the probability distribution of the different states, as a timeserie """ if self.stepwise_probability_distribution is None: if nb_cores > 1: tables = [] with Pool(processes=nb_cores) as pool: tables = pool.map( make_stepwise_probability_distribution_line, self.results ) else: tables = [result.get_last_states_probtraj() for result in self.results] self.stepwise_probability_distribution = pd.concat(tables, axis=0, sort=False) self.stepwise_probability_distribution.fillna(0, inplace=True) self.stepwise_probability_distribution.set_index([list(range(0, len(tables)))], inplace=True) self.stepwise_probability_distribution.insert( 0, column='PopRatio', value=(self.pop_ratios*self.uppModel.base_ratio).values ) if include is None and include is None: return self.stepwise_probability_distribution else: states_filtered = self.stepwise_probability_distribution.columns if include is not None: states_filtered = [state for state in states_filtered if set(include).issubset(set(state.split(" -- ")))] if exclude is not None: states_filtered = [state for state in states_filtered if set(exclude).isdisjoint(set(state.split(" -- ")))] return self.stepwise_probability_distribution.loc[:, states_filtered]
def get_nodes_stepwise_probability_distribution(self, nodes=None, nb_cores=1, direct=True): if self.nodes_stepwise_probability_distribution is None or set(nodes) != self.nodes_list_stepwise_probability_distribution: if direct: tables = [result.get_last_nodes_probtraj(nodes) for result in self.results] self.nodes_stepwise_probability_distribution = pd.concat(tables, axis=0, sort=False) self.nodes_stepwise_probability_distribution.fillna(0, inplace=True) self.nodes_stepwise_probability_distribution.set_index([list(range(0, len(tables)))], inplace=True) self.nodes_stepwise_probability_distribution.insert( 0, column='PopRatio', value=(self.pop_ratios*self.uppModel.base_ratio).values ) self.nodes_list_stepwise_probability_distribution = list(set(nodes)) else: table = self.get_stepwise_probability_distribution(nb_cores=nb_cores) states = table.columns.values[1:].tolist() if "<nil>" in states: states.remove("<nil>") if nodes is None: nodes = get_nodes(states) else: nodes = list(set(nodes)) self.nodes_list_stepwise_probability_distribution = nodes node_dict = {} for state in states: t_nodes = state.split(" -- ") t_nodes = [node for node in t_nodes if node in nodes] if len(t_nodes) > 0: node_dict.update({state: t_nodes}) if nb_cores > 1: self.nodes_stepwise_probability_distribution = make_nodes_table_parallel(table, nodes, node_dict, nb_cores) else: self.nodes_stepwise_probability_distribution = make_nodes_table(table, nodes, node_dict) self.nodes_stepwise_probability_distribution.insert(0, column='PopRatio', value=(self.pop_ratios*self.uppModel.base_ratio).values) return self.nodes_stepwise_probability_distribution
[docs] def save(self, path): """ .. py:method:: Saves the maboss model, the population ratios timeserie, and the probility distribution timeseries in the specified path :param path: The location in which to save the results """ if not os.path.exists(path): os.mkdir(path) self.save_model(path) self.save_population_ratios(os.path.join(path, "PopRatios.csv")) self.save_stepwise_probability_distribution(os.path.join(path, "PopProbTraj.csv"))
def save_model(self, path): with ExitStack() as stack: cfg_file = stack.enter_context(open(os.path.join(path, "model.cfg"), 'w')) bnd_file = stack.enter_context(open(os.path.join(path, "model.bnd"), 'w')) self.uppModel.model.print_cfg(cfg_file) self.uppModel.model.print_bnd(bnd_file) def save_population_ratios(self, path): (self.pop_ratios*self.uppModel.base_ratio).to_csv(path, header=["PopRatio"], index_label="Step") def save_stepwise_probability_distribution(self, path): nb_cores = int(self.uppModel.model.param["thread_count"]) self.get_stepwise_probability_distribution(nb_cores=nb_cores).to_csv(path, index_label="Step") def _buildUpdateCfg(self, simulation, traj_fd, stepIndex): """Configure the MaBoSS model for the next run of UppMaBoss. In practice, _buildUpdateCfg uses the previous simulation result to compute pop ratio, death, division, parameters, nodes formulas and sets the init state of the model for the next step of simulation :param simulation: MaBoSS simulation :param traj_file: trajectory file of previous run :param stepIndex: current step of the UppMaBoss simulation """ # # Read first and last line, extract last states with respective probs # first_line, last_line = read_first_last_lines_from_trajectory (traj_fd) states, probs = get_states_probs_from_trajectory_line (first_line, last_line) # # Update pop ratio # self.pop_ratio *= self._updatePopRatio (states, probs) new_time = self.uppModel.time_shift + self.uppModel.time_step*stepIndex self.pop_ratios[new_time] = self.pop_ratio # # Normalize # states, probs = self._normalize_with_death_and_division (states, probs) if states is None: return None # # Compute formulas for parameters and nodes # parameters = self._compute_parameters(simulation, states, probs) nodes_with_formula = self._compute_nodes_formula (states, probs) # # Apply new values for parameters # simulation.param.update(parameters) # # Init states # nodes_to_init, new_istate = self._initCond_Trajline (states, probs) simulation.network.set_istate (nodes_to_init, new_istate, warnings=False) # # Init nodes having a formula # for a_node in nodes_with_formula.keys(): new_val = nodes_with_formula[a_node] simulation.network.set_istate(a_node, [1-new_val,new_val], warnings=False) return simulation def _normalize_with_death_and_division(self, states, probs): """ Take into account impact of death and division and normalize NB: if no death, nor division is defined, do nothing :param states: list of states extracted from the trajectory :param probs: list of states probabilities extracted from the trajectory """ # # speed up programm when no death, nor division # if not self.uppModel.death_node and not self.uppModel.division_node: return states, probs # # if death or division # norm_factor = 0 death_prob = 0 division_prob = 0 states_ret = [] probs_ret = [] for one_state, one_prob in zip(states, probs): one_state_ret = one_state.copy() one_prob_ret = one_prob if self.uppModel.death_node in one_state_ret: death_prob += one_prob_ret one_prob_ret = 0 else: if self.uppModel.division_node in one_state_ret: division_prob += one_prob_ret one_prob_ret *= 2.0 one_state_ret.remove(self.uppModel.division_node) norm_factor += one_prob_ret states_ret.append (one_state_ret) probs_ret.append (one_prob_ret) if self.verbose: print("Norm Factor:%g probability of death: %g probability of division: %g" \ % (norm_factor, death_prob, division_prob)) # # if norm_factor <= 0, no more cells # if norm_factor <= 0: return None, None # # If norm_factor > 0, normalize # probs_ret = np.array(probs_ret, dtype = float) probs_ret = (probs_ret / norm_factor).tolist() return states_ret, probs_ret def _compute_parameters(self, simulation, states, probs): """ Computer parameters :param simulation: MaBoss model (containing the defining of parameters) :param states: list of states extracted from the trajectory :param probs: list of states probabilities extracted from the trajectory """ parameters = {} for parameter, value in simulation.param.items(): if parameter.startswith("$") and parameter in self.uppModel.update_var.keys(): formula = self.uppModel.update_var[parameter] new_value = varDef_Upp(formula, states, probs) for match in re.findall(r"#rand", new_value): rand_number = random.uniform(0, 1) new_value = new_value.replace("#rand", str(rand_number), 1) new_value = new_value.replace("#pop_ratio", str(self.pop_ratio)) parameters.update({parameter: new_value}) if self.verbose: print("Updated variable: %s = %s" % (parameter, new_value)) return parameters def _compute_nodes_formula (self, states, probs): """ Computer nodes formula to be used as init values for next run :param states: list of states extracted from the trajectory :param probs: list of states probabilities extracted from the trajectory """ all_node_upd = {} for node_upd in self.uppModel.nodes_formula.keys(): node_formula = self.uppModel.nodes_formula[node_upd] new_value = varDef_Upp(node_formula, states, probs) for match in re.findall(r"#rand", new_value): rand_number = random.uniform(0, 1) new_value = new_value.replace("#rand", str(rand_number), 1) new_value = new_value.replace("#pop_ratio", str(self.pop_ratio)) # # Remove trailing ';' added by varDef_Upp, compute formula via eval # and convert to proba # new_value = float (eval(new_value[:-1])) new_value = np.clip(new_value, 0, 1) all_node_upd.update({node_upd: new_value}) if self.verbose: print("Updated node:", node_upd, "=", new_value) return all_node_upd def _initCond_Trajline(self, states, probs): """ Return the list of states to be initialized from states and probs The function excludes from states the nodes with formulas or (for the first run) nodes with init value supplied as parameter :param states: list of states extracted from the trajectory :param probs: list of states probabilities extracted from the trajectory :param nodes_init: dict of nodes values of the form { "NODE1" : TrueValue1, "NODE2" : TrueValue2, ... }. Nodes to exclude from InitCond as these nodes have a specific init value """ new_istate = OrderedDict() # # Remove from the list of nodes the ones having a rule or an init value # nodes_to_exclude = set(self.uppModel.nodes_formula.keys()) if self.nodes_init: nodes_to_exclude= nodes_to_exclude | set(self.nodes_init.keys()) list_nodes_to_set = list( set(self.uppModel.node_list) - nodes_to_exclude ) # Not necessary for MaBoss, only to have visually an ouput always in the same order list_nodes_to_set.sort() # # Associate an index to each node # name2idx = {name: i for i, name in enumerate(list_nodes_to_set)} # # Exclude nodes with formula or with specific init from states # states = exclude_nodes_from_states (states, nodes_to_exclude) # # Construct a list of states (each state is tuple of nodes on/off) # with the probability of each state # for state, prob in zip(states, probs): # # Inclusion of modified _str2state code # str2state = [ 0 ] * len(name2idx) if '<nil>' not in state: for node in state: str2state[name2idx[node]] = 1 state_tuple = tuple(str2state) if state_tuple in new_istate.keys(): prob += new_istate[state_tuple] new_istate.update({state_tuple: prob}) return list_nodes_to_set, new_istate def _updatePopRatio (self, states, probs): """ Return update population ratio using nodes having death or division :param states: states extracted from the trajectory :param probs: probabilities of states extracted from the trajectory """ upd_pop_ratio = 0.0 for a_state, a_prob in zip(states, probs): if self.uppModel.death_node not in a_state: if self.uppModel.division_node in a_state: upd_pop_ratio += 2 * a_prob else: upd_pop_ratio += a_prob return upd_pop_ratio
def varDef_Upp(update_line, states, probs): res_match = re.findall(r"p\[[^\[]*\]", update_line) if len(res_match) == 0: print("Syntax error in the parameter update definition : %s" % update_line, file=sys.stderr) exit() for match in res_match: lhs, rhs = match.split("=") lhs = lhs.replace("p[", "").replace("]", "").replace("(", "").replace(")", "") rhs = rhs.replace("[", "").replace("]", "").replace("(", "").replace(")", "") node_list = [token.strip() for token in lhs.split(",")] boolVal_list = [token.strip() for token in rhs.split(",")] if len(node_list) != len(boolVal_list): print("Wrong probability definitions for \"%s\"" % match) exit() upNodeList = [] downNodeList = [] for i, node in enumerate(node_list): if float(boolVal_list[i]) == 0.0: downNodeList.append(node) else: upNodeList.append(node) probValue = 0.0 for i in range(0, len(states)): upNodeProbTraj = states[i] interlength = 0 for upNodePt in upNodeProbTraj: for upNode in upNodeList: if upNodePt == upNode: interlength += 1 if interlength == len(upNodeList): interlength = 0 for upNodePt in upNodeProbTraj: for downNode in downNodeList: if upNodePt == downNode: interlength = 1 break if interlength == 1: break if interlength == 0: probValue += probs[i] update_line = update_line.replace(match, str(probValue), 1) update_line += ";" return update_line def _get_next_condition_from_trajectory(self, next_model, step=-1): """ Set the values of MaBoss model when resuming from a previous run :param next_model : MaBoss model :param step: step of the UppMaBoss simulation """ # # Extract states and probs from trajectory # with self.results[step]._get_probtraj_fd() as trajfd: first_line, last_line = read_first_last_lines_from_trajectory (trajfd) states, probs = get_states_probs_from_trajectory_line (first_line, last_line) # # Compute formulas for nodes # nodes_with_formula = self._compute_nodes_formula (states, probs) # # Init states # nodes_to_set, new_istate = self._initCond_Trajline(states, probs) next_model.network.set_istate (nodes_to_set, new_istate, warnings=False) # # Init nodes having a formula # for a_node in nodes_with_formula.keys(): new_val = nodes_with_formula[a_node] next_model.network.set_istate(a_node, [1-new_val,new_val], warnings=False) # # Init nodes having an init value # if self.nodes_init: for a_node, new_val in self.nodes_init.items(): next_model.network.set_istate(a_node, [1-new_val,new_val], warnings=False) if self.verbose: print("Starting init of node:", a_node, "=", new_val) def read_first_last_lines_from_trajectory (f): """Read first and last lines of a trajectory file :param traj_file: trajectory file """ first_line = "" last_line = "" first_pass = True # # This loop avoid to load all the file in memory # for line in f: if first_pass: first_line = line first_pass = False else: last_line = line f.close() first_line = first_line.strip("\n") last_line = last_line.strip("\n") return first_line, last_line def get_states_probs_from_trajectory_line (first_line, last_line): """Extract states and probabilities from the first and last lines of a trajectory file :param first_line: first line of trajectory file :param last_line: last line of trajectory file """ # # Split using tab separator # first_line_as_list = first_line.split("\t") last_line_as_list = last_line.split("\t") # # Locate 'State' cols # cols_state = next(i for i, col in enumerate (first_line_as_list) if col == "State") # # Extract state from 'State' cols # states = [ s.split(" -- ") for s in last_line_as_list[cols_state::3] ] # # Extract probs from 'State' cols + 1 # probs = [float(v) for v in last_line_as_list[cols_state+1::3]] return states, probs def exclude_nodes_from_states (traj_states, nodes_exluded=None): """Exclude nodes from states :param traj_states: list of states (each state is a liste of nodes) :param nodes_exluded: list/set of nodes to remove """ if (not nodes_exluded) or len(nodes_exluded) <= 0: return traj_states traj_states_ret = [] for one_traj_states in traj_states: new_traj_states = list( set(one_traj_states) - set(nodes_exluded) ) if len (new_traj_states) <= 0: new_traj_states = ["<nil>"] traj_states_ret.append (new_traj_states) return traj_states_ret def make_stepwise_probability_distribution_line(result): return result.get_last_states_probtraj() def get_nodes(states): nodes = set() for s in states: if s != '<nil>': nds = s.split(' -- ') for nd in nds: nodes.add(nd) return list(nodes) def make_node_line(row, states_table, index, node_dict): for state, nd_state in node_dict.items(): for nd in nd_state: row[nd] += states_table.iloc[index, states_table.columns.get_loc(state)] def make_nodes_table(spd, nodes, node_dict): table = pd.DataFrame( np.zeros((len(spd.index), len(nodes))), index=spd.index.values, columns=nodes ) for index, (_, row) in enumerate(table.iterrows()): make_node_line(row, spd, index, node_dict) return table def make_node_line_parallel(states_row, nodes, node_dict, row): table = pd.DataFrame( np.zeros((1, len(nodes))), index=[row], columns=nodes ) for row_index, row_state in states_row.iteritems(): if row_index in node_dict.keys() and row_state > 0.0: for nd in node_dict[row_index]: table.loc[row, nd] += row_state return table def make_nodes_table_parallel(spd, nodes, node_dict, nb_cores): tables = [] with Pool(processes=6) as pool: tables = pool.starmap( make_node_line_parallel, [(spd.iloc[index, :], nodes, node_dict, row) for index, row in enumerate(spd.index)] ) table = pd.concat(tables, axis=0, sort=False) return table