Source code for memo_ms.classes

from dataclasses import dataclass, field
from collections import Counter
from spec2vec import SpectrumDocument
import pandas as pd
import numpy as np
from memo_ms import import_data
from tqdm import tqdm
import os
import copy

#pylint: disable=too-many-arguments
def filter_table(table, use_samples_pattern = False, samples_pattern = '', max_occurence = None, min_rel_occurence = 0, max_rel_occurence = 1):
    """Filter a samples x features table (samples as index, features/words as columns).

    If use_samples_pattern is True, samples whose index matches samples_pattern are used as
    reference samples (e.g. blanks): features detected in more than max_occurence of the
    matched samples are removed, the matched samples themselves are dropped, and columns left
    with only zeros are discarded. Otherwise, features present in less than
    (min_rel_occurence * 100) percent or more than (max_rel_occurence * 100) percent of the
    samples are removed.
    """
    if use_samples_pattern:
        # Select the samples matching the pattern (case-insensitive) and keep only the
        # features detected in at least one of them
        table_matched = table[table.index.str.contains(samples_pattern, case = False)]
        matched_samples = list(table_matched.index)
        table_matched = table_matched.loc[:, (table_matched != 0).any(axis=0)]
        # Number of matched samples in which each feature is NOT detected
        count_null = table_matched.replace(0, np.nan).isnull().sum()

        if max_occurence is not None:
            # Exclude features detected in more than max_occurence matched samples
            excluded_features = count_null[count_null < (len(table_matched)-max_occurence)].index
            table_filtered = table.drop(excluded_features, axis=1)
        else:
            table_filtered = table

        # Drop the matched samples themselves and any feature left with only zeros
        table_filtered = table_filtered.drop(matched_samples, axis=0)
        table_filtered = table_filtered.astype(float)
        table_filtered = table_filtered.loc[:, (table_filtered != 0).any(axis=0)]
    else:
        # Keep features detected in between min_rel_occurence and max_rel_occurence
        # (expressed as fractions) of the samples
        table_filtered = table.replace(0, np.nan)
        len_table = len(table_filtered)
        min_rel = min_rel_occurence*len_table
        max_rel = max_rel_occurence*len_table
        table_filtered = table_filtered.loc[:, table_filtered.count(axis=0) <= max_rel]
        table_filtered = table_filtered.loc[:, table_filtered.count(axis=0) >= min_rel]
        table_filtered = table_filtered.fillna(0).astype(float)

    return table_filtered
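
# A minimal usage sketch of filter_table, not part of the original module: it assumes a
# quantification table with sample filenames as index and features as columns; the toy
# values and the 'blank' pattern below are hypothetical.
#
#     >>> import pandas as pd
#     >>> table = pd.DataFrame(
#     ...     [[0, 5, 3], [2, 0, 3], [0, 0, 7]],
#     ...     index=['blank_01', 'sample_A', 'sample_B'],
#     ...     columns=['feat_1', 'feat_2', 'feat_3'])
#     >>> # remove the blank samples and every feature detected in more than 0 of them
#     >>> filter_table(table, use_samples_pattern=True, samples_pattern='blank', max_occurence=0)
#     >>> # or keep only features present in 10-90 percent of all samples
#     >>> filter_table(table, min_rel_occurence=0.1, max_rel_occurence=0.9)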
        
           
@dataclass
class SpectraDocuments:
    """Create a SpectraDocuments dataclass object containing spectra documents and metadata
    from an MzMine 2 spectra file (.mgf)

    Args:
        path (str): Path to spectra file (.mgf)
        min_relative_intensity (float): Minimal relative intensity to keep a peak
        max_relative_intensity (float): Maximal relative intensity to keep a peak
        min_peaks_required (int): Minimum number of peaks to keep a spectrum
        losses_from (int): Minimal m/z value for losses
        losses_to (int): Maximal m/z value for losses
        n_decimals (int): Number of decimals used when translating peaks/losses into words

    Returns:
        self.document (DataFrame): A table containing spectra documents and metadata
    """
    path : str
    min_relative_intensity : float = 0.01
    max_relative_intensity : float = 1.00
    min_peaks_required : int = 10
    losses_from : int = 10
    losses_to : int = 200
    n_decimals : int = 2
    spectra : list = field(init=False)
    document : pd.DataFrame = field(init=False)

    def __post_init__(self):
        self.spectra = import_data.load_and_filter_from_mgf(
            path = self.path, min_relative_intensity = self.min_relative_intensity,
            max_relative_intensity = self.max_relative_intensity,
            loss_mz_from = self.losses_from, loss_mz_to = self.losses_to,
            n_required = self.min_peaks_required
        )
        self.document = self._spec2doc()

    def _spec2doc(self) -> pd.DataFrame:
        """
        Apply filters to spectra and convert them to word vectors with the specified number
        of decimals. Returns a pd.DataFrame with spectra "documents" and metadata.
        """
        documents = [SpectrumDocument(s, n_decimals=self.n_decimals) for s in self.spectra]
        doc_with_meta = pd.DataFrame(s.metadata for s in self.spectra)
        doc_with_meta['documents'] = list(doc.words for doc in documents)
        doc_with_meta.scans = doc_with_meta.scans.astype(int)
        return doc_with_meta
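
# Usage sketch for SpectraDocuments (the .mgf path below is hypothetical, not part of the module):
#
#     >>> spectra_docs = SpectraDocuments(path='spectra.mgf', min_relative_intensity=0.01,
#     ...                                 min_peaks_required=10, n_decimals=2)
#     >>> spectra_docs.document.head()   # spectra documents plus their metadata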
@dataclass
class FeatureTable:
    """Create a FeatureTable dataclass object from a feature table

    Args:
        path (str): Path to a feature table file (.csv)
        software (str): One of [mzmine, xcms, msdial, memo]: the software used for feature detection

    Returns:
        self.feature_table (DataFrame): A cleaned feature quantification table
    """
    path : str
    software : str
    feature_table : pd.DataFrame = field(init=False)

    def __post_init__(self):
        if self.software == 'mzmine':
            self.feature_table = import_data.import_mzmine2_quant_table(path = self.path)
        elif self.software == 'xcms':
            self.feature_table = import_data.import_xcms_quant_table(path = self.path)
        elif self.software == 'msdial':
            self.feature_table = import_data.import_msdial_quant_table(path = self.path)
        elif self.software == 'memo':
            self.feature_table = import_data.import_memo_quant_table(path = self.path)
        else:
            raise ValueError(
                "software argument missing or invalid, choose one of the currently supported "
                "pre-processing tools: [mzmine, xcms, msdial, memo]"
            )
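
    # Usage sketch for FeatureTable (hypothetical .csv path); software must be one of
    # 'mzmine', 'xcms', 'msdial' or 'memo':
    #
    #     >>> feat_table = FeatureTable(path='quant_table.csv', software='mzmine')
    #     >>> feat_table.feature_table.shape   # samples x features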
    def filter(self, use_samples_pattern = False, samples_pattern = '', max_occurence = None, min_rel_occurence = 0, max_rel_occurence = 1):
        """Filter a feature table: remove samples matching samples_pattern AND remove features
        occurring in more than n = max_occurence samples matched by samples_pattern

        Args:
            use_samples_pattern (bool): Filter using a str pattern
            samples_pattern (str): The str pattern to match in samples to filter
            max_occurence (int): Maximal number of occurrences allowed in matched samples before removing a feature
            min_rel_occurence (float): Remove features contained in less than (min_rel_occurence * 100) percent of the samples
            max_rel_occurence (float): Remove features contained in more than (max_rel_occurence * 100) percent of the samples

        Returns:
            FeatureTable: A copy of the FeatureTable with its feature_table filtered
        """
        output = copy.deepcopy(self)
        output.feature_table = filter_table(output.feature_table, use_samples_pattern, samples_pattern, max_occurence, min_rel_occurence, max_rel_occurence)
        return output
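
    # Usage sketch: filter() returns a filtered copy and leaves the original object untouched.
    # The 'blank' sample-name pattern below is hypothetical.
    #
    #     >>> filtered = feat_table.filter(use_samples_pattern=True, samples_pattern='blank',
    #     ...                              max_occurence=0)
    #     >>> filtered.feature_table.shape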
    def export_matrix(self, path, sep = ','):
        """Export the feature table as a delimited text file

        Args:
            path (str): Path to export to
            sep (str): Separator

        Returns:
            None
        """
        self.feature_table.to_csv(path, sep=sep)
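
    # Usage sketch for FeatureTable.export_matrix (hypothetical output path):
    #
    #     >>> feat_table.export_matrix('feature_table_clean.csv', sep=',')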
@dataclass
class MemoMatrix:
    """Create an empty MemoMatrix dataclass object
    """
    def memo_from_aligned_samples(self, featuretable, spectradocuments):
        """
        Use a FeatureTable and a SpectraDocuments object to generate a MEMO matrix,
        stored in self.memo_matrix.

        Args:
            featuretable (FeatureTable): a FeatureTable dataclass object
            spectradocuments (SpectraDocuments): a SpectraDocuments dataclass object

        Returns:
            self.memo_matrix (DataFrame): A MEMO matrix
        """
        if featuretable is None:
            raise ValueError("featuretable argument missing")
        if spectradocuments is None:
            raise ValueError("spectradocuments argument missing")
        if not isinstance(featuretable, FeatureTable):
            raise TypeError("featuretable argument must be of type FeatureTable")
        if not isinstance(spectradocuments, SpectraDocuments):
            raise TypeError("spectradocuments argument must be of type SpectraDocuments")

        print('generating memo_matrix from input featuretable and spectradocuments')
        feature_table = featuretable.feature_table.copy()
        document = spectradocuments.document[['scans', 'documents']].set_index('scans')['documents'].to_dict()
        feature_table[feature_table == 0] = float('nan')
        results = feature_table.stack().reset_index(level=1).groupby(level=0, sort=False)['feature_id'].apply(list).to_dict()

        for sample in tqdm(results):
            # Translate each feature id into its spectrum document (list of peak/loss words),
            # drop feature ids without a matching spectrum, then flatten and count the words
            results[sample] = [document.get(item, item) for item in results[sample]]
            results[sample] = [x for x in results[sample] if not isinstance(x, int)]
            results[sample] = [item for sublist in results[sample] for item in sublist]
            results[sample] = Counter(results[sample])

        memo_matrix = pd.DataFrame(results)
        memo_matrix = memo_matrix.transpose()
        memo_matrix.fillna(0, inplace=True)
        memo_matrix.index.name = 'filename'
        self.memo_matrix = memo_matrix
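
    # Usage sketch, combining objects built as in the examples above (variable names hypothetical):
    #
    #     >>> memo = MemoMatrix()
    #     >>> memo.memo_from_aligned_samples(feat_table, spectra_docs)
    #     >>> memo.memo_matrix.head()   # samples as rows, peak/loss words as columns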
    def memo_from_unaligned_samples(self, path_to_samples_dir, pattern_to_match = '.mgf', min_relative_intensity = 0.01, max_relative_intensity = 1.00, min_peaks_required = 10, losses_from = 10, losses_to = 200, n_decimals = 2):
        """Generate a MEMO matrix from a list of individual .mgf files

        Args:
            path_to_samples_dir (str): Path to the directory where individual .mgf files are gathered. Subfolders will also be checked.
            pattern_to_match (str): Shared pattern between all spectra files to input. Will be removed in memo_matrix.index.
            min_relative_intensity (float): Minimal relative intensity to keep a peak
            max_relative_intensity (float): Maximal relative intensity to keep a peak
            min_peaks_required (int): Minimum number of peaks to keep a spectrum
            losses_from (int): Minimal m/z value for losses
            losses_to (int): Maximal m/z value for losses
            n_decimals (int): Number of decimals used when translating peaks/losses into words

        Returns:
            self.memo_matrix (DataFrame): A MEMO matrix
        """
        #pylint: disable=too-many-locals
        dic_memo = {}
        mgf_file = []
        path_to_mgf_file = []

        # Collect every file matching the pattern, including files in subfolders
        for (root, _, files) in os.walk(path_to_samples_dir, topdown=True):
            for file in files:
                if file.endswith(pattern_to_match):
                    path_to_matched_file = os.path.join(root, file)
                    path_to_mgf_file.append(path_to_matched_file)
                    mgf_file.append(file)

        # Convert each file into a counter of peak/loss words
        for path, file in tqdm(zip(path_to_mgf_file, mgf_file), total=len(path_to_mgf_file)):
            spectra = import_data.load_and_filter_from_mgf(
                path = path, min_relative_intensity = min_relative_intensity,
                max_relative_intensity = max_relative_intensity,
                loss_mz_from = losses_from, loss_mz_to = losses_to,
                n_required = min_peaks_required
            )
            documents = [SpectrumDocument(s, n_decimals = n_decimals) for s in spectra]
            documents = list(doc.words for doc in documents)
            documents = [item for sublist in documents for item in sublist]
            documents = dict(Counter(documents))
            dic_memo[file.replace(pattern_to_match, '')] = documents

        self.memo_matrix = pd.DataFrame.from_dict(dic_memo, orient='index').fillna(0)
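
    # Usage sketch (hypothetical directory containing one .mgf file per sample):
    #
    #     >>> memo_unaligned = MemoMatrix()
    #     >>> memo_unaligned.memo_from_unaligned_samples('individual_mgf/', pattern_to_match='.mgf')
    #     >>> memo_unaligned.memo_matrix.shape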
    def filter(self, use_samples_pattern = False, samples_pattern = '', max_occurence = None, min_rel_occurence = 0, max_rel_occurence = 1):
        """Filter a MEMO matrix: remove samples matching samples_pattern AND remove words
        occurring in more than n = max_occurence samples matched by samples_pattern

        Args:
            use_samples_pattern (bool): Filter using a str pattern
            samples_pattern (str): The str pattern to match in samples to filter
            max_occurence (int): Maximal number of occurrences allowed in matched samples before removing a word
            min_rel_occurence (float): Remove words contained in less than (min_rel_occurence * 100) percent of the samples
            max_rel_occurence (float): Remove words contained in more than (max_rel_occurence * 100) percent of the samples

        Returns:
            MemoMatrix: A copy of the MemoMatrix with its memo_matrix filtered
        """
        output = copy.deepcopy(self)
        output.memo_matrix = filter_table(output.memo_matrix, use_samples_pattern, samples_pattern, max_occurence, min_rel_occurence, max_rel_occurence)
        return output
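
    # Usage sketch: as for FeatureTable.filter(), a filtered copy is returned.
    #
    #     >>> memo_filtered = memo.filter(min_rel_occurence=0.1, max_rel_occurence=0.9)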
    def merge_memo(self, memomatrix_2, drop_not_in_common=False):
        """Merge two MEMO matrices

        Args:
            memomatrix_2 (MemoMatrix): MemoMatrix dataclass object containing the 2nd MEMO matrix to merge
            drop_not_in_common (bool): Drop peaks/losses not in common

        Returns:
            MemoMatrix: A MemoMatrix dataclass object containing the merged MEMO matrix
        """
        output = MemoMatrix()
        if not isinstance(memomatrix_2, MemoMatrix):
            raise TypeError("merge_memo() memomatrix_2 argument must be a MemoMatrix")

        table_left = self.memo_matrix
        table_right = memomatrix_2.memo_matrix

        # pd.concat replaces the DataFrame.append call removed in recent pandas versions
        if drop_not_in_common is True:
            result = pd.concat([table_left, table_right], sort=False).dropna(axis='columns').fillna(0)
        else:
            result = pd.concat([table_left, table_right], sort=False).fillna(0)
        output.memo_matrix = result
        return output
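
    # Usage sketch, merging the aligned and unaligned MEMO matrices built above
    # (variable names hypothetical):
    #
    #     >>> merged = memo.merge_memo(memo_unaligned, drop_not_in_common=True)
    #     >>> merged.memo_matrix.shape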
    def export_matrix(self, path, sep = ','):
        """Export the MEMO matrix as a delimited text file

        Args:
            path (str): Path to export to
            sep (str): Separator

        Returns:
            None
        """
        self.memo_matrix.to_csv(path, sep=sep)
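
    # Usage sketch for MemoMatrix.export_matrix (hypothetical output path):
    #
    #     >>> merged.export_matrix('memo_matrix.csv', sep=',')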