Source code for secretflow.utils.compressor

import numpy as np
from abc import ABC, abstractmethod
from scipy import sparse
from typing import Union, List, Any


[docs]class Compressor(ABC): """Abstract base class for cross device data compressor"""
[docs] @abstractmethod def compress( self, data: Union[np.ndarray, List[np.ndarray]] ) -> Union[Any, List[Any]]: """Compress data before send. Args: data (Union[np.ndarray, List[np.ndarray]]): data need to compress. Returns: Union[Any, List[Any]]: compressed data. """ raise NotImplementedError()
[docs] @abstractmethod def decompress( self, data: Union[Any, List[Any]] ) -> Union[np.ndarray, List[np.ndarray]]: """Decompress data after receive. Args: data (Union[Any, List[Any]]): data need to decompress. Returns: Union[np.ndarray, List[np.ndarray]]: decompressed data. """ raise NotImplementedError()
[docs]class SparseCompressor(Compressor):
[docs] def __init__(self, sparse_rate: float): """Initialize Args: sparse_rate: the percentage of cells are zero. """ assert ( 0 <= sparse_rate <= 1 ), f'sparse rate should between 0 and 1, but get {sparse_rate}' self.sparse_rate = sparse_rate self.fuse_sparse_masks = []
@abstractmethod def _compress_one(self, data: np.ndarray) -> sparse.spmatrix: """Compress one data to sparse matrix. Args: data (np.ndarray): data need to compress. Returns: sparse.spmatrix: compressed sparse matrix. """ raise NotImplementedError() # sample random element from original List[np.ndarray]
[docs] def compress( self, data: Union[np.ndarray, List[np.ndarray]], ) -> Union[sparse.spmatrix, List[sparse.spmatrix]]: """Compress data to sparse matrix before send. Args: data (Union[np.ndarray, List[np.ndarray]]): data need to compress. Returns: Union[sparse.spmatrix, List[sparse.spmatrix]]: compressed data. """ # there is no need for sparsification in evaluate/predict. is_list = True if isinstance(data, np.ndarray): is_list = False data = [data] elif not isinstance(data, (list, tuple)): assert False, f'invalid data: {type(data)}' out = list(map(lambda d: self._compress_one(d), data)) return out if is_list else out[0]
[docs] def decompress( self, data: Union[sparse.spmatrix, List[sparse.spmatrix]] ) -> Union[np.ndarray, List[np.ndarray]]: """Decompress data from sparse matrix to dense after received. Args: data (Union[sparse.spmatrix, List[sparse.spmatrix]]): data need to decompress. Returns: Union[np.ndarray, List[np.ndarray]]: decompressed data. """ # there is no need for sparsification in evaluate/predict. is_list = True if sparse.issparse(data): is_list = False data = [data] elif not isinstance(data, (list, tuple)): assert False, f'invalid data: {type(data)}' data = list(map(lambda d: d.todense(), data)) return data if is_list else data[0]
[docs]class RandomSparse(SparseCompressor): """Random sparse compressor compress data by randomly set element to zero."""
[docs] def __init__(self, sparse_rate: float): super().__init__(sparse_rate)
def _compress_one(self, data): data_shape = data.shape data_flat = data.flatten() data_len = data_flat.shape[0] mask_num = round((1 - self.sparse_rate) * data_len) rng = np.random.default_rng() mask_index = rng.choice(data_len, mask_num) row, col = np.unravel_index(mask_index, data_shape) matrix = sparse.coo_matrix( (data_flat[mask_index], (row, col)), shape=data_shape ) return matrix.tocsr()
[docs]class TopkSparse(SparseCompressor): """Topk sparse compressor use topK algorithm to transfer dense matrix into sparse matrix."""
[docs] def __init__(self, sparse_rate: float): super().__init__(sparse_rate)
def _compress_one(self, data): data_shape = data.shape data_flat = data.flatten() data_len = data_flat.shape[0] mask_num = round((1 - self.sparse_rate) * data_len) mask_index = np.argpartition(np.abs(data), -mask_num, axis=None)[-mask_num:] row, col = np.unravel_index(mask_index, data_shape) matrix = sparse.coo_matrix( (data_flat[mask_index], (row, col)), shape=data_shape ) return matrix.tocsr()