Source code for diffupy.matrix

# -*- coding: utf-8 -*-

"""Main Matrix Class."""

import logging
import os

import numpy as np
import pandas as pd
from diffupy.constants import CSV
from networkx import DiGraph

from .utils import get_label_ix_mapping, get_label_list_graph, get_laplacian, decode_labels, get_idx_scores_mapping, \
    get_repeated_labels, from_dataframe_file, from_nparray_to_df

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# Names exported by a star-import of this module.
# NOTE(review): the MatrixFrom* constructor classes defined further down in
# this module are not listed here - confirm whether that is intentional.
__all__ = [
    'Matrix',
    'LaplacianMatrix',
]


class Matrix:
    """Two-dimensional numpy array with labelled rows and columns.

    ``mat`` is stored row-major: ``mat[i]`` is the row labelled
    ``rows_labels[i]`` and ``mat[:, j]`` is the column labelled
    ``cols_labels[j]``. When ``quadratic`` is true the row labels are also
    used as column labels.

    Label -> index mappings are built lazily on first access and refreshed by
    :meth:`update_ix_mappings` after mutations.

    The iteration cursor (``i``, ``j``) and the iteration flags
    (``get_labels``, ``get_indices``) live on the instance itself, so two
    simultaneous iterations over the same object interfere with each other.
    """

    def __init__(
            self,
            mat=None,
            rows_labels=None,
            cols_labels=None,
            graph=None,
            quadratic=False,
            name='',
            init_value=None,
    ):
        """Initialize matrix.

        :param mat: matrix values (anything accepted by ``np.array``)
        :param rows_labels: row labels (list, set or numpy array)
        :param cols_labels: column labels (list, set or numpy array); optional when ``quadratic``
        :param graph: graph whose node ``'name'`` labels are used when no ``rows_labels`` are given
        :param quadratic: if true, the row labels are also used as column labels
        :param name: name of the matrix
        :param init_value: if not None, every cell is initialized with this value and ``mat`` is ignored
        :raises ValueError: if no row labels, no column labels (non-quadratic case)
            or no values can be determined from the arguments
        """
        label_containers = (list, set, np.ndarray)

        # Explicit labels: matrix representing input scores.
        if isinstance(rows_labels, label_containers):
            self.rows_labels = list(rows_labels)
        # No explicit labels: take them from the graph's 'name' attribute.
        elif graph:
            self.rows_labels = list(get_label_list_graph(graph, 'name'))
        else:
            raise ValueError('No rows_labels list provided.')

        if isinstance(cols_labels, label_containers):
            self._cols_labels = list(cols_labels)
        elif not quadratic:
            raise ValueError('No cols_labels list provided.')

        self.name = name
        self.quadratic = quadratic

        # init_value takes precedence over an explicitly passed ``mat``.
        if init_value is not None and self.rows_labels and list(self.cols_labels):
            mat = np.full((len(self.rows_labels), len(self.cols_labels)), init_value)
        elif mat is None:
            raise ValueError('A path matrix or initialization should be provided.')

        self.mat = np.array(mat)

        # Defaults for what iteration yields besides the cell value.
        self.get_labels = True
        self.get_indices = False

        self.validate_labels()

    def __str__(self):
        """Return a string representation of the Matrix."""
        header = f" {self.cols_labels}"
        body = "".join(
            f"\n {row_label} {self.mat[i]} "
            for i, row_label in enumerate(self.rows_labels)
        )
        return f"\nmatrix {self.name} \n {header}{body} \n "

    def __iter__(self, **attr):
        """Reset the iteration cursor and return the matrix itself as iterator.

        :param attr: optional ``get_indices`` / ``get_labels`` flags selecting what
            each step yields in addition to the cell value
        """
        self.i = -1
        self.j = 0

        if 'get_indices' in attr:
            self.get_indices = attr['get_indices']

        if 'get_labels' in attr:
            self.get_labels = attr['get_labels']

        return self

    def __next__(self):
        """Advance the cursor (rows first, then columns) and return the next cell.

        Yields the bare value when neither indices nor labels are requested,
        otherwise a tuple ``(value[, i, j][, row_label, col_label])``.
        The flags are restored to their defaults when the iteration ends.
        """
        n_rows = len(self.rows_labels)
        n_cols = len(self.cols_labels)

        exhausted = self.i >= n_rows - 1 and self.j >= n_cols - 1
        # An empty axis means there is nothing to yield; without this guard an
        # empty axis combined with two or more labels on the other one would
        # index out of range.
        if n_rows == 0 or n_cols == 0 or exhausted:
            self.get_labels = True
            self.get_indices = False
            raise StopIteration

        if self.i >= n_rows - 1:
            self.i = 0
            self.j += 1
        else:
            self.i += 1

        if n_rows == 1:
            value = self.mat[0, self.j]
        elif n_cols == 1:
            value = self.mat[self.i, 0]
        else:
            value = self.mat[self.i][self.j]

        nxt = (value,)
        if self.get_indices:
            nxt += (self.i, self.j)
        if self.get_labels:
            nxt += (self.rows_labels[self.i], self.cols_labels[self.j])

        return nxt[0] if len(nxt) == 1 else nxt

    def __copy__(self):
        """Return a new plain Matrix with the same values, labels, name and quadratic flag."""
        return Matrix(
            self.mat,
            rows_labels=self.rows_labels,
            cols_labels=self.cols_labels,
            name=self.name,
            quadratic=self.quadratic,
        )

    # ------------------------------------------------------------------
    # Validators
    # ------------------------------------------------------------------

    def validate_labels(self):
        """Decode the labels and check that neither axis contains duplicates.

        :raises Exception: if row or column labels are duplicated
        """
        if self.rows_labels:
            self.rows_labels = decode_labels(self.rows_labels)

            if len(self.rows_labels) != len(set(self.rows_labels)):
                dup = get_repeated_labels(self.rows_labels)
                raise Exception(
                    'Duplicate row labels in Matrix. \n duplicated number: {} \n duplicated list: {}'.format(
                        len(dup), dup
                    )
                )

        if hasattr(self, '_cols_labels'):
            # Decode the stored column labels themselves; going through the
            # ``cols_labels`` property would return the row labels for a
            # quadratic matrix and overwrite the stored ones.
            self._cols_labels = decode_labels(self._cols_labels)

            if len(self._cols_labels) != len(set(self._cols_labels)):
                raise Exception('Duplicate column labels in Matrix.')

    def update_ix_mappings(self):
        """Rebuild the label -> index mappings that have already been created."""
        if hasattr(self, '_rows_labels_ix_mapping') and self.rows_labels:
            self._rows_labels_ix_mapping = get_label_ix_mapping(self.rows_labels)

        if hasattr(self, '_cols_labels_ix_mapping') and hasattr(self, '_cols_labels'):
            self._cols_labels_ix_mapping = get_label_ix_mapping(self._cols_labels)

    def validate_labels_and_update_ix_mappings(self):
        """Validate labels and refresh mappings; called after the Matrix mutates."""
        self.validate_labels()
        self.update_ix_mappings()

    # ------------------------------------------------------------------
    # Getters and setters
    # ------------------------------------------------------------------

    @property
    def cols_labels(self):
        """Return the column labels (the row labels for a quadratic matrix)."""
        if self.quadratic:
            return self.rows_labels

        return self._cols_labels

    @cols_labels.setter
    def cols_labels(self, cols_labels):
        """Set the column labels (the row labels for a quadratic matrix)."""
        if self.quadratic:
            self.rows_labels = list(cols_labels)
        else:
            self._cols_labels = list(cols_labels)

    @property
    def rows_labels_ix_mapping(self):
        """Return the row label -> index mapping, building it on first access."""
        if hasattr(self, '_rows_labels_ix_mapping'):
            return self._rows_labels_ix_mapping

        self._rows_labels_ix_mapping = get_label_ix_mapping(self.rows_labels)
        return self._rows_labels_ix_mapping

    @rows_labels_ix_mapping.setter
    def rows_labels_ix_mapping(self, rows_labels_ix_mapping):
        """Set the row label -> index mapping."""
        self._rows_labels_ix_mapping = rows_labels_ix_mapping

    @property
    def cols_labels_ix_mapping(self):
        """Return the column label -> index mapping, building it on first access.

        For a quadratic matrix this is the row mapping.
        """
        if self.quadratic:
            return self.rows_labels_ix_mapping

        if hasattr(self, '_cols_labels_ix_mapping'):
            return self._cols_labels_ix_mapping

        self._cols_labels_ix_mapping = get_label_ix_mapping(self.cols_labels)
        return self._cols_labels_ix_mapping

    @cols_labels_ix_mapping.setter
    def cols_labels_ix_mapping(self, cols_labels_ix_mapping):
        """Set the column label -> index mapping (and the row one if quadratic)."""
        if self.quadratic:
            self._rows_labels_ix_mapping = cols_labels_ix_mapping

        self._cols_labels_ix_mapping = cols_labels_ix_mapping

    @property
    def rows_idx_scores_mapping(self):
        """Return the row index -> scores mapping, computed once and then cached.

        The cached value is not refreshed when ``mat`` changes afterwards.
        """
        if hasattr(self, '_rows_idx_scores_mapping'):
            return self._rows_idx_scores_mapping

        self._rows_idx_scores_mapping = get_idx_scores_mapping(self.mat)
        return self._rows_idx_scores_mapping

    @rows_idx_scores_mapping.setter
    def rows_idx_scores_mapping(self, rows_idx_scores_mapping):
        """Set the row index -> scores mapping."""
        self._rows_idx_scores_mapping = rows_idx_scores_mapping

    @property
    def cols_idx_scores_mapping(self):
        """Return the column index -> scores mapping, computed once and then cached.

        The cached value is not refreshed when ``mat`` changes afterwards.
        """
        if hasattr(self, '_cols_idx_scores_mapping'):
            return self._cols_idx_scores_mapping

        self._cols_idx_scores_mapping = get_idx_scores_mapping(self.mat.transpose())
        return self._cols_idx_scores_mapping

    @cols_idx_scores_mapping.setter
    def cols_idx_scores_mapping(self, cols_idx_scores_mapping):
        """Set the column index -> scores mapping (and the row one if quadratic)."""
        if self.quadratic:
            self._rows_idx_scores_mapping = cols_idx_scores_mapping

        self._cols_idx_scores_mapping = cols_idx_scores_mapping

    # ------------------------------------------------------------------
    # Access by label
    # ------------------------------------------------------------------

    def get_row_from_label(self, label):
        """Return the row with the given label."""
        return self.mat[self.rows_labels_ix_mapping[label]]

    def set_row_from_label(self, label, x):
        """Overwrite the row with the given label with ``x``."""
        self.mat[self.rows_labels_ix_mapping[label]] = x

    def delete_row_from_label(self, label):
        """Delete the row with the given label, in place."""
        row_ix = self.rows_labels_ix_mapping[label]
        self.mat = np.delete(self.mat, row_ix, 0)
        self.rows_labels.remove(label)
        self.update_ix_mappings()

    def get_col_from_label(self, label):
        """Return the column with the given label."""
        return self.mat[:, self.cols_labels_ix_mapping[label]]

    def delete_col_from_label(self, label):
        """Delete the column with the given label, in place.

        NOTE(review): for a quadratic matrix the column labels are the row
        labels, so the label is removed from both axes while only the column
        is removed from ``mat`` - confirm the intended behaviour for that case.
        """
        # The mapping is a plain label -> index lookup; the index is the
        # column to drop along axis 1.
        col_ix = self.cols_labels_ix_mapping[label]
        self.mat = np.delete(self.mat, col_ix, 1)
        self.cols_labels.remove(label)
        self.update_ix_mappings()

    def set_cell_from_labels(self, row_label, col_label, x):
        """Set the cell addressed by a row and a column label.

        With a single row (or a single column) the corresponding label is not
        looked up and index 0 is used.
        """
        if len(self.rows_labels) == 1:
            self.mat[0, self.cols_labels_ix_mapping[col_label]] = x
        elif len(self.cols_labels) == 1:
            self.mat[self.rows_labels_ix_mapping[row_label], 0] = x
        else:
            self.mat[self.rows_labels_ix_mapping[row_label], self.cols_labels_ix_mapping[col_label]] = x

    def get_cell_from_labels(self, row_label, col_label):
        """Return the cell addressed by a row and a column label.

        With a single row (or a single column) the corresponding label is not
        looked up and index 0 is used.
        """
        if len(self.rows_labels) == 1:
            return self.mat[0, self.cols_labels_ix_mapping[col_label]]
        elif len(self.cols_labels) == 1:
            return self.mat[self.rows_labels_ix_mapping[row_label], 0]
        else:
            return self.mat[self.rows_labels_ix_mapping[row_label], self.cols_labels_ix_mapping[col_label]]

    # ------------------------------------------------------------------
    # Binds
    # ------------------------------------------------------------------

    def row_bind(self, rows=None, rows_labels=None, matrix=None):
        """Append rows to this matrix, in place.

        :param rows: values of the rows to append
        :param rows_labels: labels of the rows to append
        :param matrix: Matrix whose values and row labels are appended (overrides the other two)
        """
        if matrix is not None:
            rows = matrix.mat
            rows_labels = matrix.rows_labels

        # ``rows`` defaults to None; treat that like an empty input instead of
        # failing on ``list(None)``.
        rows = [] if rows is None else list(rows)
        if not rows:
            log.warning('No row given to concatenate to matrix.')
            return

        self.mat = np.concatenate((self.mat, np.array(rows)), axis=0)
        self.rows_labels += rows_labels
        self.validate_labels_and_update_ix_mappings()

    def col_bind(self, cols=None, cols_labels=None, matrix=None):
        """Append columns to this matrix, in place.

        :param cols: values of the columns to append
        :param cols_labels: labels of the columns to append
        :param matrix: Matrix whose values and column labels are appended (overrides the other two)
        """
        if matrix is not None:
            cols = matrix.mat
            cols_labels = matrix.cols_labels

        # ``cols`` defaults to None; treat that like an empty input instead of
        # failing on ``list(None)``.
        cols = [] if cols is None else list(cols)
        if not cols:
            log.warning('No column given to concatenate to matrix.')
            return

        self.mat = np.concatenate((self.mat, np.array(cols)), axis=1)
        self.cols_labels += cols_labels
        self.validate_labels_and_update_ix_mappings()

    # ------------------------------------------------------------------
    # Match matrices
    # ------------------------------------------------------------------

    @staticmethod
    def _missing_labels(reference_labels, present_labels):
        """Return the reference labels absent from ``present_labels``, deduplicated, in reference order."""
        present = set(present_labels)
        return [label for label in dict.fromkeys(reference_labels) if label not in present]

    def match_rows(self, reference_matrix):
        """Return a copy whose rows follow the row order of ``reference_matrix``.

        For a quadratic matrix this delegates to :meth:`match_mat`, since the
        columns share the row labels.
        """
        if self.quadratic:
            log.warning('Changing rows of a symmetric Matrix implies changing also columns.')
            return self.match_mat(reference_matrix, True)

        mat_match = self.__copy__()
        # Copy the list so the result never aliases the reference's labels.
        mat_match.rows_labels = list(reference_matrix.rows_labels)

        for row_label in reference_matrix.rows_labels:
            target_ix = reference_matrix.rows_labels_ix_mapping[row_label]
            mat_match.mat[target_ix] = self.get_row_from_label(row_label)

        mat_match.validate_labels_and_update_ix_mappings()

        return mat_match

    def match_cols(self, reference_matrix):
        """Return a matrix whose columns follow the column order of ``reference_matrix``.

        Returns ``self`` when the column labels already agree. For a quadratic
        matrix this delegates to :meth:`match_mat`.
        """
        # Compare against *this* matrix's labels (comparing the reference
        # with itself is always true and short-circuits the whole method).
        if reference_matrix.cols_labels == self.cols_labels:
            return self

        if self.quadratic:
            log.warning('Changing columns of a symmetric Matrix implies changing also rows.')
            return self.match_mat(reference_matrix, True)

        mat_match = self.__copy__()
        mat_match.cols_labels = reference_matrix.cols_labels

        for col_label in reference_matrix.cols_labels:
            target_ix = reference_matrix.cols_labels_ix_mapping[col_label]
            # Columns live on axis 1.
            mat_match.mat[:, target_ix] = self.get_col_from_label(col_label)

        mat_match.validate_labels_and_update_ix_mappings()

        return mat_match

    def match_mat(self, reference_matrix, match_quadratic=None):
        """Return a matrix whose rows and columns follow the order of ``reference_matrix``.

        :param reference_matrix: Matrix providing the target label order
        :param match_quadratic: if true, only the row labels are taken from the
            reference; defaults to ``reference_matrix.quadratic``
        """
        if reference_matrix.cols_labels == self.cols_labels and reference_matrix.rows_labels == self.rows_labels:
            return self

        mat_match = self.__copy__()
        # Copy the list so the result never aliases the reference's labels.
        mat_match.rows_labels = list(reference_matrix.rows_labels)

        if match_quadratic is None:
            match_quadratic = reference_matrix.quadratic

        if not match_quadratic:
            mat_match.cols_labels = reference_matrix.cols_labels
        else:
            log.warning('Matching quadratic matrix: Same columns and row labels.')

        rows_ix = reference_matrix.rows_labels_ix_mapping
        cols_ix = reference_matrix.cols_labels_ix_mapping
        for row_label in reference_matrix.rows_labels:
            for col_label in reference_matrix.cols_labels:
                mat_match.mat[rows_ix[row_label], cols_ix[col_label]] = \
                    self.get_cell_from_labels(row_label, col_label)

        mat_match.validate_labels_and_update_ix_mappings()

        return mat_match

    def match_missing_rows(self, reference_labels, missing_fill=0):
        """Return a matrix extended with the reference rows it lacks, filled with ``missing_fill``.

        Returns ``self`` when ``reference_labels`` equals the current row labels.
        """
        if reference_labels == self.rows_labels:
            return self

        missing_labels = self._missing_labels(reference_labels, self.rows_labels)

        mat_match = self.__copy__()
        mat_match.rows_labels += missing_labels

        missing_values = np.full((len(missing_labels), len(self.cols_labels)), missing_fill)
        mat_match.mat = np.concatenate((mat_match.mat, missing_values), axis=0)

        mat_match.validate_labels_and_update_ix_mappings()

        return mat_match

    def match_delete_rows(self, reference_labels):
        """Return a matrix without the rows whose labels are not in ``reference_labels``.

        Returns ``self`` when ``reference_labels`` equals the current row labels.
        """
        if reference_labels == self.rows_labels:
            return self

        mat_match = self.__copy__()
        over_labels = set(mat_match.rows_labels) - set(reference_labels)

        for label in over_labels:
            mat_match.delete_row_from_label(label)

        mat_match.validate_labels_and_update_ix_mappings()

        return mat_match

    def match_missing_cols(self, reference_labels, missing_fill=0):
        """Return a matrix extended with the reference columns it lacks, filled with ``missing_fill``.

        Returns ``self`` when ``reference_labels`` equals the current column labels.
        """
        if reference_labels == self.cols_labels:
            return self

        missing_labels = self._missing_labels(reference_labels, self.cols_labels)

        mat_match = self.__copy__()
        mat_match.cols_labels = mat_match.cols_labels + missing_labels

        # One filled column per missing label, one row per existing row.
        missing_values = np.full((len(self.rows_labels), len(missing_labels)), missing_fill)
        mat_match.mat = np.concatenate((mat_match.mat, missing_values), axis=1)

        mat_match.validate_labels_and_update_ix_mappings()

        return mat_match

    # ------------------------------------------------------------------
    # Order
    # ------------------------------------------------------------------

    def order_rows(self, reverse=True, col_ref_idx=None):
        """Return a copy with the rows ordered by cell value.

        :param reverse: order from highest to lowest when true
        :param col_ref_idx: index of the column used as ordering key; required
            (as an ``int``) unless the matrix has exactly one column
        :raises ValueError: if the matrix has several columns and ``col_ref_idx`` is not an ``int``
        """
        # Row index -> scores mapping.
        mapping = self.rows_idx_scores_mapping

        if len(self.mat[0]) != 1:
            if not isinstance(col_ref_idx, int):
                raise ValueError('Please use integers as indexes')
            mapping = {k: v[col_ref_idx] for k, v in mapping.items()}

        # Row indexes ordered by their score.
        idx_order = sorted(mapping, key=mapping.get, reverse=reverse)

        # Write into a copy so the source rows are not overwritten while
        # they are still being read.
        ordered_mat = self.__copy__()

        for i, idx in enumerate(idx_order):
            ordered_mat.mat[i] = self.mat[idx]
            ordered_mat.rows_labels[i] = self.rows_labels[idx]

        return ordered_mat

    def len_not_null(self):
        """Return the number of cells that are neither ``0`` nor ``-1``."""
        return sum(
            1
            for x in self.__iter__(get_labels=False, get_indices=False)
            if x != -1 and x != 0
        )

    def binarize(self, null_value=-1, threshold=0, positive_value=1):
        """Binarize the matrix in place.

        Every cell that is neither ``null_value`` nor ``positive_value`` is set
        to ``positive_value`` if it is above ``threshold`` and to ``null_value``
        otherwise. A warning is logged if any cell had to be changed.
        """
        binarize = False

        for score, i, j in self.__iter__(get_labels=False, get_indices=True):
            if score not in [null_value, positive_value]:
                binarize = True
                if score > threshold:
                    self.mat[i, j] = positive_value
                else:
                    self.mat[i, j] = null_value

        if binarize:
            log.warning(
                f"The input scores must be binary. The array {self.cols_labels} has been automatically binarized.")

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------

    def to_dict(self, ordered=True):
        """Export the matrix as ``{'rows_labels': [...], col_label: column, ...}``.

        :param ordered: order the rows with :meth:`order_rows` (default arguments) first;
            that call raises ``ValueError`` for a matrix with more than one column
        """
        mat = self.order_rows() if ordered else self

        # The row labels go first, followed by one entry per column.
        d = {'rows_labels': mat.rows_labels}

        for col_label in mat.cols_labels:
            d[col_label] = mat.get_col_from_label(col_label)

        return d

    def as_pd_dataframe(self, ordered=True):
        """Export the matrix as a data frame indexed by the row labels, one column per column label."""
        d = self.to_dict(ordered)
        rows_labels = d.pop('rows_labels')

        # ``DataFrame.set_axis`` returns a new frame on current pandas, so
        # the index is set at construction time instead.
        return pd.DataFrame(d, index=rows_labels)

    def as_csv(self, path, index=True, ordered=True):
        """Export the matrix to a csv file using the row and column labels as headers."""
        self.as_pd_dataframe(ordered).to_csv(path, index=index)

    def to_nx_graph(self):
        """Export a quadratic matrix as a directed graph with one edge per non-zero cell.

        :raises ValueError: if the matrix is not quadratic
        """
        if len(self.cols_labels) != len(self.rows_labels) or not self.quadratic:
            raise ValueError('The matrix cannot be converted as a graph since it is not quadratic, which '
                             'it is the used representation of a network (usually a kernel) as a Matrix.')

        graph = DiGraph()

        for score, sub_name, obj_name in self.__iter__(get_labels=True, get_indices=False):
            if score != 0:
                graph.add_edge(
                    sub_name,
                    obj_name,
                )

        return graph
class MatrixFromDict(Matrix):
    """Constructor matrix class for Dictionary data structure to Matrix conversion."""

    def __init__(self, d, name=''):
        """Initialize a Matrix from a dictionary.

        :param d: dictionary with a ``'rows_labels'`` entry holding the row labels
            and one entry per column (column label -> column values), i.e. the
            layout produced by ``Matrix.to_dict``
        :param name: name of the matrix
        """
        # Work on a shallow copy so the caller's dictionary keeps its
        # 'rows_labels' entry.
        columns = dict(d)
        rows = list(columns.pop('rows_labels'))
        cols = list(columns.keys())

        # Each value is one column; stacking them yields (columns x rows),
        # so transpose to the (rows x columns) layout Matrix works with.
        values = np.array(list(columns.values())).T

        Matrix.__init__(
            self,
            mat=values,
            rows_labels=rows,
            cols_labels=cols,
            quadratic=len(cols) == len(rows),
            name=name,
        )


class MatrixFromDataFrame(Matrix):
    """Constructor matrix class for DataFrame to Matrix conversion."""

    def __init__(self, df, name=''):
        """Initialize a Matrix from a data frame.

        :param df: data frame providing the values (``df.to_numpy()``) and the labels
        :param name: name of the matrix
        """
        # NOTE(review): labels are read from ``df.rows`` / ``df.cols``, which a
        # plain pandas DataFrame only provides if it has columns with those
        # names - confirm whether ``df.index`` / ``df.columns`` was intended.
        rows = list(df.rows.values)
        cols = list(df.cols.values)

        Matrix.__init__(
            self,
            mat=df.to_numpy(),
            rows_labels=rows,
            cols_labels=cols,
            quadratic=len(cols) == len(rows),
            name=name,
        )


class MatrixFromNumpyArray(Matrix):
    """Constructor matrix class for numpy array to Matrix conversion."""

    def __init__(self, nparray, name=''):
        """Initialize a Matrix from a numpy array.

        :param nparray: array converted to a data frame with ``from_nparray_to_df``
        :param name: name of the matrix
        """
        df = from_nparray_to_df(nparray)

        # NOTE(review): see MatrixFromDataFrame regarding ``df.rows`` / ``df.cols``.
        rows = list(df.rows.values)
        cols = list(df.cols.values)

        Matrix.__init__(
            self,
            mat=df.to_numpy(),
            rows_labels=rows,
            cols_labels=cols,
            quadratic=len(cols) == len(rows),
            name=name,
        )


class MatrixFromCSV(Matrix):
    """Constructor matrix class for CSV to Matrix conversion."""

    def __init__(self, csv_path, fmt=CSV, name=None):
        """Initialize a Matrix from a file.

        :param csv_path: path of the file, read with ``from_dataframe_file``
        :param fmt: file format passed to ``from_dataframe_file``
        :param name: name of the matrix; defaults to the file's base name without ``'.csv'``
        """
        df = from_dataframe_file(csv_path, fmt)

        if name is None:
            name = str(os.path.basename(csv_path).replace('.csv', ''))

        # NOTE(review): see MatrixFromDataFrame regarding ``df.rows`` / ``df.cols``.
        rows = list(df.rows.values)
        cols = list(df.cols.values)

        Matrix.__init__(
            self,
            mat=df.to_numpy(),
            rows_labels=rows,
            cols_labels=cols,
            quadratic=len(cols) == len(rows),
            name=name,
        )


class MatrixFromGraph(Matrix):
    """Constructor matrix class for nx.Graph to Matrix conversion."""

    # TODO : move instances initialization from global argument graph to here
    def __init__(self, graph, node_argument='name', name=''):
        """Initialize a quadratic Matrix labelled by the graph nodes, every cell set to 1.

        :param graph: graph whose node labels become the row (and column) labels
        :param node_argument: node attribute used as label
        :param name: name of the matrix
        """
        rows = list(get_label_list_graph(graph, node_argument))

        Matrix.__init__(
            self,
            rows_labels=rows,
            init_value=1,
            quadratic=True,
            name=name,
        )
class LaplacianMatrix(Matrix):
    """Quadratic Matrix holding the Laplacian of a graph."""

    def __init__(self, graph, normalized=False, node_argument='name', name=''):
        """Build the Laplacian of ``graph`` and label it with the graph's nodes.

        :param graph: graph the Laplacian is computed from (``get_laplacian``)
        :param normalized: flag forwarded to ``get_laplacian``
        :param node_argument: node attribute used as row (and column) label
        :param name: name of the matrix
        """
        super().__init__(
            mat=get_laplacian(graph, normalized),
            rows_labels=list(get_label_list_graph(graph, node_argument)),
            quadratic=True,
            name=name,
        )