import sys
import warnings
import joblib
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import h5py
sns.set_context('paper')
warnings.filterwarnings('ignore')
from Topyfic.topic import Topic
from Topyfic.utilsAnalyseModel import MA_plot
[docs]
class TopModel:
"""
A class that saved a model
:param name: name of class
:type name: str
:param N: number of topics
:type N: int
:param gene_weights: dataframe that has weights of genes for each topics; genes are indexes and topics are columns
:type gene_weights: pandas dataframe
:param topics: dictionary contains all topics for the topmodel
:type topics: Dictionary of Topics
:param model: store reproducible LDA model
:type model: sklearn.decomposition.LatentDirichletAllocation
"""
def __init__(self,
name,
N,
topics=None,
gene_weights=None,
gene_information=None,
model=None):
if gene_weights is None and topics is None:
sys.exit("Both gene weights and topics can not be empty at the same time!")
if topics is not None:
self.topics = topics
else:
self.topics = dict()
for i in range(N):
topic_gene_weights = pd.DataFrame(gene_weights.iloc[:, i].values,
index=gene_weights.index,
columns=[f"Topic_{i + 1}"])
self.topics[f"Topic_{i + 1}"] = Topic(topic_id=f"Topic_{i + 1}",
topic_gene_weights=topic_gene_weights,
gene_information=gene_information)
self.name = name
self.N = N
self.model = model
[docs]
def get_feature_name(self):
"""
get feature(gene) name
:return: list of feature(gene) name
:rtype: list
"""
topic1 = next(iter(self.topics.items()))[1]
return topic1.gene_information.index.tolist()
[docs]
def save_rLDA_model(self, name='rLDA', save_path="", file_format='joblib'):
"""
save rLDA model (instance of LDA model in sklearn) as a joblib/HDF5 file.
:param name: name of the joblib file (default: rLDA)
:type name: str
:param save_path: directory you want to use to save pickle file (default is saving near script)
:type save_path: str
:param file_format: format of the file you want to save (option: joblib (default), HDF5)
:type file_format: str
"""
if file_format not in ['joblib', 'HDF5']:
sys.exit(f"{file_format} is not correct! It should be 'joblib' or 'HDF5'.")
if file_format == "joblib":
print(f"Saving rLDA model as {name}_{self.N}topics.joblib")
joblib.dump(self.model, f"{save_path}{name}_{self.N}topics.joblib", compress=3)
if file_format == "HDF5":
print(f"Saving rLDA model as {name}_{self.N}topics.h5")
f = h5py.File(f"{name}_{self.N}topics.h5", "a")
f['components_'] = self.model.components_
f['exp_dirichlet_component_'] = self.model.exp_dirichlet_component_
f['n_batch_iter_'] = np.int_(self.model.n_batch_iter_)
f['n_features_in_'] = self.model.n_features_in_
f['n_iter_'] = np.int_(self.model.n_iter_)
f['bound_'] = np.float_(self.model.bound_)
f['doc_topic_prior_'] = np.float_(self.model.doc_topic_prior_)
f['topic_word_prior_'] = np.float_(self.model.topic_word_prior_)
f.close()
[docs]
def get_gene_weights(self):
"""
get feature(gene) weights
:return: dataframe contains feature(gene) weights; genes are indexes and topics are columns
:rtype: pandas dataframe
"""
gene_weights = pd.DataFrame(np.transpose(self.model.components_),
columns=[f'{self.name}_Topic_{i + 1}' for i in
range(self.model.components_.shape[0])],
index=self.get_feature_name())
return gene_weights
[docs]
def get_ranked_gene_weight(self):
"""
get sorted feature(gene) weights. each value is gene and weights on each topics
:return: dataframe contains feature(gene) and their weights; ranks are indexes and topics are columns
:rtype: pandas dataframe
"""
gene_weights = pd.DataFrame(np.transpose(self.model.components_),
columns=[f'{self.name}_Topic_{i + 1}' for i in
range(self.model.components_.shape[0])],
index=self.get_feature_name())
ranked_gene_weights = pd.DataFrame(columns=[f'{self.name}_Topic_{i + 1}' for i in
range(self.model.components_.shape[0])],
index=range(len(self.get_feature_name())))
for col in gene_weights.columns:
tmp = gene_weights.sort_values(by=col, ascending=False)
tmp[col] = ";" + tmp[col].astype(str)
tmp[col] = tmp.index.tolist() + tmp[col].astype(str)
ranked_gene_weights[col] = tmp[col].values
return ranked_gene_weights
[docs]
def get_top_model_attributes(self):
"""
get top model attributes to be able to make sklearn.decomposition.LatentDirichletAllocation
:return: three data frame which the first one is components, the second one is exp_dirichlet_component and
the last one is combining the rest of LDA attributes which put them to gather as a dataframe
:rtype: pandas dataframe, pandas dataframe, pandas dataframe
"""
feature = self.get_feature_name()
components = pd.DataFrame(self.model.components_,
index=[f"Topic_{i + 1}" for i in range(self.N)],
columns=feature)
exp_dirichlet_component = pd.DataFrame(self.model.exp_dirichlet_component_,
index=[f"Topic_{i + 1}" for i in range(self.N)],
columns=feature)
others = pd.DataFrame(
index=[f"Topic_{i + 1}" for i in range(self.N)],
columns=["n_batch_iter",
"n_features_in",
"n_iter",
"bound",
"doc_topic_prior",
"topic_word_prior"])
others.loc[[f"Topic_{i + 1}" for i in range(self.N)], "n_batch_iter"] = self.model.n_batch_iter_
others.loc[[f"Topic_{i + 1}" for i in range(self.N)], "n_features_in"] = self.model.n_features_in_
others.loc[[f"Topic_{i + 1}" for i in range(self.N)], "n_iter"] = self.model.n_iter_
others.loc[[f"Topic_{i + 1}" for i in range(self.N)], "bound"] = self.model.bound_
others.loc[[f"Topic_{i + 1}" for i in range(self.N)], "doc_topic_prior"] = self.model.doc_topic_prior_
others.loc[[f"Topic_{i + 1}" for i in range(self.N)], "topic_word_prior"] = self.model.topic_word_prior_
return components, exp_dirichlet_component, others
[docs]
def gene_weight_rank_heatmap(self,
genes=None,
topics=None,
show_rank=True,
scale=None,
save=True,
show=True,
figsize=None,
file_format="pdf",
file_name="gene_weight_rank_heatmap"):
"""
plot selected genes weights and their ranks in selected topics
:param genes: list of genes you want to see their weights (default: all genes)
:type genes: list
:param topics: list of topics
:type topics: list
:param show_rank: indicate if you want to show the rank of significant genes or not (default: True)
:type show_rank: bool
:param scale: indicate if you want to plot as log2, log10 or not (default: None which show actual value) other options is log2 and log10
:scale scale: str
:param save: indicate if you want to save the plot or not (default: True)
:type save: bool
:param show: indicate if you want to show the plot or not (default: True)
:type show: bool
:param figsize: indicate the size of plot (default: (10 * (len(category) + 1), 10))
:type figsize: tuple of int
:param file_format: indicate the format of plot (default: pdf)
:type file_format: str
:param file_name: name and path of the plot use for save (default: gene_weight_rank_heatmap)
:type file_name: str
"""
if genes is None:
genes = self.get_feature_name()
elif not (set(genes) & set(self.get_feature_name())) == set(genes):
sys.exit("some/all of genes are not part of topModel!")
if topics is None:
topics = [f'{self.name}_Topic_{i + 1}' for i in range(self.model.components_.shape[0])]
elif not (set(topics) & set(
[f'{self.name}_Topic_{i + 1}' for i in range(self.model.components_.shape[0])])) == set(topics):
sys.exit("some/all of topics are not part of topModel!")
if scale not in ["log2", "log10", None]:
sys.exit("scale is not valid!")
if figsize is None:
figsize = (self.N * 1.5, len(genes))
gene_weights = self.get_gene_weights()
gene_weights = gene_weights.loc[:, topics]
gene_rank = gene_weights.loc[genes, topics]
for topic in topics:
tmp = gene_weights.sort_values([topic], ascending=False)
tmp['rank'] = range(1, tmp.shape[0] + 1)
gene_rank.loc[genes, topic] = tmp.loc[genes, 'rank'].values
gene_weights = self.get_gene_weights()
gene_weights = gene_weights.loc[genes, topics]
gene_weights = gene_weights.reindex(genes)
gene_weights = gene_weights.reindex(topics, axis="columns")
gene_rank = gene_rank.reindex(genes)
gene_rank = gene_rank.reindex(topics, axis="columns")
gene_rank = gene_rank.astype(int)
gene_rank[gene_weights <= self.N] = 0
gene_rank = gene_rank.astype(str)
gene_rank[gene_rank == "0"] = ""
fig, ax = plt.subplots(figsize=figsize,
facecolor='white')
if scale == "log2":
gene_weights = gene_weights.applymap(np.log2)
if scale == "log10":
gene_weights = gene_weights.applymap(np.log10)
if scale is None:
scale = "no"
if show_rank:
sns.heatmap(gene_weights,
cmap='viridis',
annot=gene_rank,
linewidths=.5,
fmt='',
ax=ax)
ax.set_title(f'gene weights with rank ({scale} scale)')
else:
sns.heatmap(gene_weights,
cmap='viridis',
linewidths=.5,
ax=ax)
ax.set_title('gene weights')
if save:
plt.savefig(f"{file_name}.{file_format}")
if show:
plt.show()
else:
plt.close()
[docs]
def MA_plot(self,
topic1,
topic2,
size=None,
pseudocount=1,
threshold=1,
cutoff=2,
consistency_correction=1.4826,
#topN=None,
labels=None,
save=True,
show=True,
file_format="pdf",
file_name="MA_plot"):
"""
plot MA based on the gene weights on given topics
:param topic1: first topic to be compared
:type topic1: str
:param topic2: second topic to be compared
:type topic2: str
:param size: table contains size of dot for each genes (genes are index)
:type size: pandas dataframe
:param pseudocount: pseudocount that you want to add (default: 1)
:type pseudocount: float
:param threshold: threshold to filter genes based on A values (default: 1)
:type threshold: float
:param cutoff: cutoff for categorized genes by modified z-score (default: 2)
:type cutoff: float
:param consistency_correction: the factor converts the MAD to the standard deviation for a given distribution. The default value (1.4826) is the conversion factor if the underlying data is normally distributed
:type consistency_correction: float
:param topN: number of genes to be consider for calculating z-score based on the A value (if it's none is gonna be avarage of # genes in both topics with weights above threshold
:type topN: int
:param labels: list of gene names wish to show in MA-plot
:type labels: list
:param save: indicate if you want to save the plot or not (default: True)
:type save: bool
:param show: indicate if you want to show the plot or not (default: True)
:type show: bool
:param file_format: indicate the format of plot (default: pdf)
:type file_format: str
:param file_name: name and path of the plot use for save (default: MA_plot)
:type file_name: str
:return: return M and A values
"""
gene_weights = self.get_gene_weights().copy(deep=True)
topic1 = gene_weights[topic1]
topic2 = gene_weights[topic2]
gene_zscore = MA_plot(topic1,
topic2,
size=size,
pseudocount=pseudocount,
threshold=threshold,
cutoff=cutoff,
consistency_correction=consistency_correction,
#topN=topN,
labels=labels,
save=save,
show=show,
file_format=file_format,
file_name=file_name)
return gene_zscore
[docs]
def save_topModel(self, name=None, save_path="", file_format='pickle'):
"""
save TopModel class as a pickle/HDF5 file
:param name: name of the file (default: topModel_TopModel.name)
:type name: str
:param save_path: directory you want to use to save pickle file (default is saving near script)
:type save_path: str
:param file_format: format of the file you want to save (option: pickle (default), HDF5)
:type file_format: str
"""
if file_format not in ['pickle', 'HDF5']:
sys.exit(f"{file_format} is not correct! It should be 'pickle' or 'HDF5'.")
if name is None:
name = f"topModel_{self.name}"
if file_format == "pickle":
print(f"Saving topModel as {name}.p")
picklefile = open(f"{save_path}{name}.p", "wb")
pickle.dump(self, picklefile)
picklefile.close()
if file_format == "HDF5":
print(f"Saving topModel as {name}.h5")
f = h5py.File(f"{name}.h5", "w")
# model
model = f.create_group("model")
model['components_'] = self.model.components_
model['exp_dirichlet_component_'] = self.model.exp_dirichlet_component_
model['n_batch_iter_'] = np.int_(self.model.n_batch_iter_)
model['n_features_in_'] = self.model.n_features_in_
model['n_iter_'] = np.int_(self.model.n_iter_)
model['bound_'] = np.float_(self.model.bound_)
model['doc_topic_prior_'] = np.float_(self.model.doc_topic_prior_)
model['topic_word_prior_'] = np.float_(self.model.topic_word_prior_)
# topics
topics = f.create_group("topics")
for topic in self.topics.keys():
topic_gp = topics.create_group(self.topics[topic].id)
topic_gp['id'] = np.string_(self.topics[topic].id)
topic_gp['name'] = np.string_(self.topics[topic].name)
topic_gp['gene_weights'] = self.topics[topic].gene_weights
gene_information = self.topics[topic].gene_information.copy(deep=True)
gene_information.reset_index(inplace=True)
gene_information = gene_information.T.reset_index().T
topic_gp['gene_information'] = np.array(gene_information)
topic_information = self.topics[topic].topic_information.copy(deep=True)
topic_information.reset_index(inplace=True)
topic_information = topic_information.T.reset_index().T
topic_gp['topic_information'] = np.array(topic_information)
f['name'] = np.string_(self.name)
f['N'] = np.int_(self.N)
f.close()