secretflow.ml.nn.fl package#

Subpackages#

Submodules#

secretflow.ml.nn.fl.compress module#

Functions:

stc_compress(compressor, server_weights, ...)

do_compress([strategy, sparsity, ...])

secretflow.ml.nn.fl.compress.stc_compress(compressor, server_weights, agg_updates, res)[source]#
secretflow.ml.nn.fl.compress.do_compress(strategy='batch', sparsity=0.0, server_weights=None, updates=None, res=None)[source]#

secretflow.ml.nn.fl.fl_model module#

FedModel

Classes:

FLModel([server, device_list, model, ...])

class secretflow.ml.nn.fl.fl_model.FLModel(server=None, device_list: List[PYU] = [], model: Union[TorchModel, Callable[[], tensorflow.keras.Model]] = None, aggregator=None, strategy='fed_avg_w', consensus_num=1, backend='tensorflow', **kwargs)[source]#

Bases: object

Methods:

__init__([server, device_list, model, ...])

init_workers(model, device_list, strategy, ...)

initialize_weights()

handle_file(train_dict, label[, batch_size, ...])

handle_data(train_x[, train_y, batch_size, ...])

fit(x, y[, batch_size, batch_sampling_rate, ...])

Horizontal federated training interface

predict(x[, batch_size, label_decoder, ...])

Horizontal federated offline prediction interface

evaluate(x[, y, batch_size, sample_weight, ...])

Horizontal federated offline evaluation interface

save_model(model_path[, is_test])

Horizontal federated save model interface

load_model(model_path[, is_test])

Horizontal federated load model interface

__init__(server=None, device_list: List[PYU] = [], model: Union[TorchModel, Callable[[], tensorflow.keras.Model]] = None, aggregator=None, strategy='fed_avg_w', consensus_num=1, backend='tensorflow', **kwargs)[source]#
init_workers(model, device_list, strategy, backend)[source]#
initialize_weights()[source]#
handle_file(train_dict: Dict[PYU, str], label: str, batch_size: Union[int, Dict[PYU, int]] = 32, sampling_rate=None, shuffle=False, random_seed=1234, epochs=1, stage='train', label_decoder=None, max_batch_size=20000, prefetch_buffer_size=None)[source]#
handle_data(train_x: Union[HDataFrame, FedNdarray], train_y: Optional[Union[HDataFrame, FedNdarray]] = None, batch_size: Union[int, Dict[PYU, int]] = 32, sampling_rate=None, shuffle=False, random_seed=1234, epochs=1, sample_weight: Optional[Union[HDataFrame, FedNdarray]] = None, sampler_method='batch', stage='train')[source]#
fit(x: Union[HDataFrame, FedNdarray, Dict[PYU, str]], y: Union[HDataFrame, FedNdarray, str], batch_size: Union[int, Dict[PYU, int]] = 32, batch_sampling_rate: Optional[float] = None, epochs: int = 1, verbose: int = 1, callbacks=None, validation_data=None, shuffle=False, class_weight=None, sample_weight=None, validation_freq=1, aggregate_freq=1, label_decoder=None, max_batch_size=20000, prefetch_buffer_size=None, sampler_method='batch', random_seed=None, dp_spent_step_freq=None, audit_log_dir=None) History[source]#

Horizontal federated training interface

Parameters
  • x – Feature data: FedNdarray, HDataFrame, or Dict {PYU: model_path}

  • y – Label: FedNdarray, HDataFrame, or str (column name of the label)

  • batch_size – Number of samples per gradient update, int or Dict; 64 or more is recommended for safety

  • batch_sampling_rate – Ratio of samples per batch, float

  • epochs – Number of epochs to train the model

  • verbose – Verbosity mode, 0 or 1

  • callbacks – List of keras.callbacks.Callback instances.

  • validation_data – Data on which to evaluate

  • shuffle – whether to shuffle the training data

  • class_weight – Dict mapping class indices (integers) to a weight (float)

  • sample_weight – weights for the training samples

  • validation_freq – specifies how many training epochs to run before a new validation run is performed

  • aggregate_freq – Number of training steps between weight aggregations

  • label_decoder – Only used for CSV reading; preprocesses the label column

  • max_batch_size – Upper limit of the batch size

  • prefetch_buffer_size – An int specifying the number of feature batches to prefetch for performance improvement. Only used by the CSV reader

  • sampler_method – The name of the sampler method

  • random_seed – PRNG seed for shuffling

  • dp_spent_step_freq – Specifies how many training steps to run before checking the DP budget

  • audit_log_dir – Path of the audit log directory; a checkpoint will be saved if audit_log_dir is not None

Returns

A History object. Its global_history attribute is an aggregated record of training loss values and metrics, while its local_history attribute records the training loss values and metrics of each party.
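
A minimal usage sketch follows. The device names, the create_model builder, the aggregator choice, and the CSV paths are illustrative assumptions rather than part of this reference, and setup details such as the sf.init arguments vary by SecretFlow version:

    import secretflow as sf
    from secretflow.ml.nn import FLModel
    from secretflow.security.aggregation import SecureAggregator
    from tensorflow import keras

    # Hypothetical cluster: two data parties and one server party.
    sf.init(['alice', 'bob', 'charlie'], address='local')
    alice, bob, charlie = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('charlie')

    def create_model():
        # Any compiled tf.keras.Model works; the layer sizes here are arbitrary.
        model = keras.Sequential([
            keras.Input(shape=(4,)),
            keras.layers.Dense(32, activation='relu'),
            keras.layers.Dense(2, activation='softmax'),
        ])
        model.compile(loss='sparse_categorical_crossentropy',
                      optimizer='adam', metrics=['accuracy'])
        return model

    fl_model = FLModel(
        server=charlie,
        device_list=[alice, bob],
        model=create_model,
        aggregator=SecureAggregator(charlie, [alice, bob]),
        strategy='fed_avg_w',
        backend='tensorflow',
    )

    # Hypothetical per-party CSV files; 'label' names the label column in them.
    data_paths = {alice: '/path/to/alice.csv', bob: '/path/to/bob.csv'}
    history = fl_model.fit(data_paths, 'label', epochs=10, batch_size=128,
                           aggregate_freq=1)
    print(history.global_history)          # aggregated loss/metrics per epoch
    print(history.local_history['alice'])  # alice's local loss/metrics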

predict(x: Union[HDataFrame, FedNdarray, Dict], batch_size=None, label_decoder=None, sampler_method='batch', random_seed=1234) Dict[PYU, PYUObject][source]#

Horizontal federated offline prediction interface

Parameters
  • x – feature, FedNdArray or HDataFrame

  • batch_size – Number of samples per batch, int or Dict

  • label_decoder – Only used for CSV reading; preprocesses the label column

  • sampler_method – The name of the sampler method

  • random_seed – PRNG seed for shuffling

Returns

Prediction results: a Dict[PYU, PYUObject] keyed by party, each wrapping that party's predictions as a numpy.array
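
A hedged sketch of collecting predictions, continuing the hypothetical fl_model and data_paths from the fit sketch above; sf.reveal is used purely for inspection and pulls plaintext to the driver:

    # Each party's predictions stay on its own device as a PYUObject.
    pred_objs = fl_model.predict(data_paths, batch_size=128)

    # For local inspection only.
    local_preds = {device: sf.reveal(obj) for device, obj in pred_objs.items()}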

evaluate(x: Union[HDataFrame, FedNdarray, Dict], y: Optional[Union[HDataFrame, FedNdarray, str]] = None, batch_size: Union[int, Dict[PYU, int]] = 32, sample_weight: Optional[Union[HDataFrame, FedNdarray]] = None, label_decoder=None, return_dict=False, sampler_method='batch', random_seed=None) Tuple[Union[List[Metric], Dict[str, Metric]], Union[Dict[str, List[Metric]], Dict[str, Dict[str, Metric]]]][source]#

Horizontal federated offline evaluation interface

Parameters
  • x – Input data. It could be a FedNdarray, an HDataFrame, or a Dict {PYU: model_path}

  • y – Label. It could be a FedNdarray, an HDataFrame, or a str (column name in the CSV)

  • batch_size – Integer or Dict. Number of samples per batch of computation. If unspecified, batch_size will default to 32.

  • sample_weight – Optional Numpy array of weights for the test samples, used for weighting the loss function.

  • label_decoder – User-defined handling of the label column when using the CSV reader

  • sampler_method – The name of the sampler method

  • return_dict – If True, loss and metric results are returned as a dict, with each key being the name of the metric. If False, they are returned as a list.

Returns

A tuple of two objects. The first is an aggregated record of metrics, and the second records the loss values and metrics of each party.
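
A hedged sketch, again continuing the hypothetical fl_model and data_paths above; with return_dict=True both elements of the returned tuple are keyed by metric name:

    global_metrics, local_metrics = fl_model.evaluate(
        data_paths, 'label', batch_size=128, return_dict=True
    )
    print(global_metrics)          # e.g. {'loss': ..., 'accuracy': ...}
    print(local_metrics['alice'])  # metrics computed on alice's local data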

save_model(model_path: Union[str, Dict[PYU, str]], is_test=False)[source]#

Horizontal federated save model interface

Parameters
  • model_path – Model path; only a format like 'a/b/c' is supported, where c is the model name

  • is_test – Whether this is test mode

load_model(model_path: Union[str, Dict[PYU, str]], is_test=False)[source]#

Horizontal federated load model interface

Parameters
  • model_path – Model path

  • is_test – Whether this is test mode
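
A hedged sketch of saving and restoring the per-party models, continuing the hypothetical fl_model above; the paths are hypothetical, and per the signatures a single str path may be passed instead of a per-PYU dict:

    model_paths = {alice: '/tmp/alice/fl_model', bob: '/tmp/bob/fl_model'}
    fl_model.save_model(model_path=model_paths)

    # Later, e.g. before offline prediction or evaluation:
    fl_model.load_model(model_path=model_paths)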

secretflow.ml.nn.fl.metrics module#

keras global evaluation metrics

Classes:

Metric()

Default(name, total, count)

Mean(name, total, count)

Federated keras.metrics.Mean

AUC(name, thresholds, true_positives, ...[, ...])

Federated keras.metrics.AUC

Precision(name, thresholds, true_positives, ...)

Federated keras.metrics.Precision

Recall(name, thresholds, true_positives, ...)

Federated keras.metrics.Recall

Functions:

aggregate_metrics(local_metrics)

Aggregate Model metrics values of each party and calculate global metrics.

class secretflow.ml.nn.fl.metrics.Metric[source]#

Bases: ABC

Methods:

result()

abstract result()[source]#
class secretflow.ml.nn.fl.metrics.Default(name: str, total: float, count: float)[source]#

Bases: Metric

Attributes:

name

total

count

Methods:

result()

__init__(name, total, count)

name: str#
total: float#
count: float#
result()[source]#
__init__(name: str, total: float, count: float) None#
class secretflow.ml.nn.fl.metrics.Mean(name: str, total: float, count: float)[source]#

Bases: Metric

Federated keras.metrics.Mean

total#

sum of metrics

Type

float

count#

num of samples

Type

float

Attributes:

name

total

count

Methods:

result()

__init__(name, total, count)

name: str#
total: float#
count: float#
result()[source]#
__init__(name: str, total: float, count: float) None#
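
A sketch of the idea behind this dataclass, assuming result() returns total / count as in keras.metrics.Mean (the numbers are illustrative, not library internals): each party reports its local partial sums, and the global mean is the ratio of their element-wise sums.

    from secretflow.ml.nn.fl.metrics import Mean

    # Hypothetical per-party partial sums for a 'loss' metric.
    alice_loss = Mean(name='loss', total=46.0, count=100.0)
    bob_loss = Mean(name='loss', total=92.0, count=200.0)

    # Global mean = sum of totals / sum of counts.
    global_loss = Mean(
        name='loss',
        total=alice_loss.total + bob_loss.total,
        count=alice_loss.count + bob_loss.count,
    )
    print(global_loss.result())  # expected (46 + 92) / (100 + 200) = 0.46
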
class secretflow.ml.nn.fl.metrics.AUC(name: str, thresholds: List[float], true_positives: List[float], true_negatives: List[float], false_positives: List[float], false_negatives: List[float], curve=None)[source]#

Bases: Metric

Federated keras.metrics.AUC

thresholds#

Thresholds of the buckets, same as tf.keras.metrics.AUC; must contain 0 and 1.

true_positives#

num samples of true positive.

true_negatives#

num samples of true negative.

false_positives#

num samples of false positive.

false_negatives#

num samples of false negative.

curve#

Type of the AUC curve, same as tf.keras.metrics.AUC; it can be 'ROC' or 'PR'.

Methods:

__init__(name, thresholds, true_positives, ...)

result()

__init__(name: str, thresholds: List[float], true_positives: List[float], true_negatives: List[float], false_positives: List[float], false_negatives: List[float], curve=None)[source]#
result()[source]#
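
A hedged arithmetic sketch of why these bucketed counts suffice for a federated AUC (not this class's implementation, which follows tf.keras.metrics.AUC): per-threshold confusion counts are additive across parties, so after summing them a ROC curve and its area can be recovered.

    import numpy as np

    # Hypothetical summed counts at thresholds [0.0, 0.5, 1.0]
    # (element-wise sums of every party's true_positives, etc.).
    tp = np.array([100.0, 60.0, 0.0])
    fn = np.array([0.0, 40.0, 100.0])
    fp = np.array([200.0, 20.0, 0.0])
    tn = np.array([0.0, 180.0, 200.0])

    tpr = tp / (tp + fn)   # recall at each threshold
    fpr = fp / (fp + tn)
    # Thresholds ascend, so reverse to integrate along increasing fpr.
    auc = np.trapz(tpr[::-1], fpr[::-1])
    print(auc)  # 0.75 for these made-up counts
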
class secretflow.ml.nn.fl.metrics.Precision(name: str, thresholds: float, true_positives: float, false_positives: float)[source]#

Bases: Metric

Federated keras.metrics.Precision

thresholds#

value of threshold, float or list, in [0, 1].

Type

float

true_positives#

num samples of true positive

Type

float

false_positives#

num samples of false positive

Type

float

Attributes:

name

thresholds

true_positives

false_positives

Methods:

result()

__init__(name, thresholds, true_positives, ...)

name: str#
thresholds: float#
true_positives: float#
false_positives: float#
result()[source]#
__init__(name: str, thresholds: float, true_positives: float, false_positives: float) None#
class secretflow.ml.nn.fl.metrics.Recall(name: str, thresholds: float, true_positives: float, false_negatives: float)[source]#

Bases: Metric

Federated keras.metrics.Recall

thresholds#

value of threshold, float or list, in [0, 1].

Type

float

true_positives#

num samples of true positive

Type

float

false_negatives#

num samples of false negative

Type

float

Attributes:

name

thresholds

true_positives

false_negatives

Methods:

result()

__init__(name, thresholds, true_positives, ...)

name: str#
thresholds: float#
true_positives: float#
false_negatives: float#
result()[source]#
__init__(name: str, thresholds: float, true_positives: float, false_negatives: float) None#
secretflow.ml.nn.fl.metrics.aggregate_metrics(local_metrics: List[List]) List[source]#

Aggregate Model metrics values of each party and calculate global metrics.

Parameters

local_metrics – Model metric values of each party.

Returns

A list of metrics aggregated across all parties.
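
A hedged usage sketch with hypothetical per-party Mean values; each inner list holds one party's Metric objects, and the expected result assumes the aggregation sums totals and counts as described for Mean above:

    from secretflow.ml.nn.fl.metrics import Mean, aggregate_metrics

    local_metrics = [
        [Mean(name='loss', total=46.0, count=100.0)],   # alice
        [Mean(name='loss', total=92.0, count=200.0)],   # bob
    ]
    global_metrics = aggregate_metrics(local_metrics)
    print(global_metrics[0].result())  # expected (46 + 92) / (100 + 200) = 0.46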

secretflow.ml.nn.fl.sparse module#

Classes:

STCSparse(sparse_rate)

STC sparsifier: samples the top-k elements from the original weights. TODO: complete the docstring

SCRSparse(threshold)

SCR sparsifier: samples the top-k elements from the original weights. TODO: complete the docstring

class secretflow.ml.nn.fl.sparse.STCSparse(sparse_rate: float)[source]#

Bases: object

STC sparsifier: samples the top-k elements from the original weights. TODO: complete the docstring

Methods:

__init__(sparse_rate)

__init__(sparse_rate: float)[source]#
class secretflow.ml.nn.fl.sparse.SCRSparse(threshold: float)[source]#

Bases: object

SCR sparsifier: samples the top-k elements from the original weights. TODO: complete the docstring

Methods:

__init__(threshold)

get_dimension(index_value, threshold)

__init__(threshold: float)[source]#
get_dimension(index_value, threshold)[source]#
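
An illustrative top-k sparsification on a plain numpy array, sketching what an STC-style sparsifier does. This is a conceptual example, not these classes' internal code, and whether sparse_rate denotes the kept or the zeroed fraction is an assumption here:

    import numpy as np

    def topk_sparsify(weights: np.ndarray, sparse_rate: float) -> np.ndarray:
        # Keep the largest-magnitude (1 - sparse_rate) fraction of entries,
        # zero out the rest.
        flat = weights.ravel()
        k = max(1, int(round(flat.size * (1.0 - sparse_rate))))
        keep = np.argpartition(np.abs(flat), -k)[-k:]
        mask = np.zeros(flat.shape, dtype=bool)
        mask[keep] = True
        return np.where(mask, flat, 0.0).reshape(weights.shape)

    w = np.array([[0.1, -2.0, 0.3], [1.5, -0.05, 0.7]])
    print(topk_sparsify(w, sparse_rate=0.5))  # keeps -2.0, 1.5 and 0.7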

secretflow.ml.nn.fl.strategy_dispatcher module#

Classes:

Dispatcher()

Functions:

register_strategy([_cls, strategy_name, backend])

Register a new strategy

dispatch_strategy(name, backend, *args, **kwargs)

strategy dispatcher

class secretflow.ml.nn.fl.strategy_dispatcher.Dispatcher[source]#

Bases: object

Methods:

__init__()

register(name, cls)

dispatch(name, backend, *args, **kwargs)

__init__()[source]#
register(name, cls)[source]#
dispatch(name, backend, *args, **kwargs)[source]#
secretflow.ml.nn.fl.strategy_dispatcher.register_strategy(_cls=None, *, strategy_name=None, backend=None)[source]#

Register a new strategy

Parameters
  • _cls

  • strategy_name – name of strategy

Returns:
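
A hedged registration sketch derived from the signature above; the strategy class, its body, and the strategy name are hypothetical:

    from secretflow.ml.nn.fl.strategy_dispatcher import register_strategy

    @register_strategy(strategy_name='my_fed_avg', backend='tensorflow')
    class MyFedAvg:
        # Hypothetical worker-side training strategy; in practice a strategy
        # implements the train-step/aggregation hooks expected by FLModel.
        ...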

secretflow.ml.nn.fl.strategy_dispatcher.dispatch_strategy(name, backend, *args, **kwargs)[source]#

strategy dispatcher

Parameters
  • name – name of strategy, str

  • *args

  • **kwargs

Returns:
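
A matching lookup sketch for the hypothetical strategy registered above; what the dispatcher returns for these arguments is not documented here, so the result is left unnamed:

    from secretflow.ml.nn.fl.strategy_dispatcher import dispatch_strategy

    # Looks up the strategy registered under ('my_fed_avg', 'tensorflow')
    # and forwards any remaining *args/**kwargs to it.
    dispatch_strategy('my_fed_avg', backend='tensorflow')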

secretflow.ml.nn.fl.utils module#

Classes:

History([local_history, ...])

Functions:

metric_wrapper(func, *args, **kwargs)

optim_wrapper(func, *args, **kwargs)

class secretflow.ml.nn.fl.utils.History(local_history: Dict[str, Dict[str, List[float]]] = <factory>, local_detailed_history: Dict[str, Dict[str, List[secretflow.ml.nn.fl.metrics.Metric]]] = <factory>, global_history: Dict[str, List[float]] = <factory>, global_detailed_history: Dict[str, List[secretflow.ml.nn.fl.metrics.Metric]] = <factory>)[source]#

Bases: object

Attributes:

local_history

Examples: >>> { 'alice': {'loss': [0.46011224], 'accuracy': [0.8639647]}, 'bob': {'loss': [0.46011224], 'accuracy': [0.8639647]}, }

local_detailed_history

Examples: >>> { 'alice': { 'mean': [Mean()] }, 'bob': { 'mean': [Mean()] }, }

global_history

Examples: >>> { 'loss': [0.46011224], 'accuracy': [0.8639647] }

global_detailed_history

Examples: >>> { 'loss': [Loss(name='loss')], 'precision': [Precision(name='precision')], }

Methods:

__init__([local_history, ...])

record_local_history(party, metrics[, stage])

record_global_history(metrics[, stage])

local_history: Dict[str, Dict[str, List[float]]]#

Examples:

>>> {
...     'alice': {'loss': [0.46011224], 'accuracy': [0.8639647]},
...     'bob': {'loss': [0.46011224], 'accuracy': [0.8639647]},
... }

local_detailed_history: Dict[str, Dict[str, List[Metric]]]#

Examples:

>>> {
...     'alice': {'mean': [Mean()]},
...     'bob': {'mean': [Mean()]},
... }

global_history: Dict[str, List[float]]#

Examples:

>>> {
...     'loss': [0.46011224],
...     'accuracy': [0.8639647]
... }

global_detailed_history: Dict[str, List[Metric]]#

Examples:

>>> {
...     'loss': [Loss(name='loss')],
...     'precision': [Precision(name='precision')],
... }

__init__(local_history: Dict[str, Dict[str, List[float]]] = <factory>, local_detailed_history: Dict[str, Dict[str, List[secretflow.ml.nn.fl.metrics.Metric]]] = <factory>, global_history: Dict[str, List[float]] = <factory>, global_detailed_history: Dict[str, List[secretflow.ml.nn.fl.metrics.Metric]] = <factory>) None#
record_local_history(party, metrics: List[Metric], stage='train')[source]#
record_global_history(metrics: List[Metric], stage='train')[source]#
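
A small hedged sketch of filling a History by hand (FLModel normally populates it during fit; the exact stored shapes are assumptions based on the examples above):

    from secretflow.ml.nn.fl.metrics import Mean
    from secretflow.ml.nn.fl.utils import History

    history = History()
    history.record_local_history('alice', [Mean(name='loss', total=46.0, count=100.0)])
    history.record_global_history([Mean(name='loss', total=138.0, count=300.0)])
    print(history.local_history)   # assumed shape: {'alice': {'loss': [0.46]}}
    print(history.global_history)  # assumed shape: {'loss': [0.46]}
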
secretflow.ml.nn.fl.utils.metric_wrapper(func, *args, **kwargs)[source]#
secretflow.ml.nn.fl.utils.optim_wrapper(func, *args, **kwargs)[source]#

Module contents#