# Copyright 2022 Ant Group Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This is a single-party bi-classification report.
from typing import List, Tuple, Union
import jax.numpy as jnp
import pandas as pd
from .utils import equal_obs, equal_range
class Report:
"""Report containing all other reports for bi-classification evaluation
Attributes:
summary_report: SummaryReport
group_reports: List[GroupReport]
eq_frequent_bin_report: List[EqBinReport]
eq_range_bin_report: List[EqBinReport]
head_report: List[PrReport]
reports for fpr = 0.001, 0.005, 0.01, 0.05, 0.1, 0.2
"""
    def __init__(
self,
eq_frequent_result_arr_list,
eq_range_result_arr_list,
summary_report_arr,
head_prs,
):
self.eq_frequent_bin_report = [
EqBinReport(a) for a in eq_frequent_result_arr_list
]
        self.eq_range_bin_report = [
            EqBinReport(a) for a in eq_range_result_arr_list
        ]
self.summary_report = SummaryReport(summary_report_arr)
        self.head_report = [PrReport(a) for a in head_prs]
class PrReport:
"""Precision Related statistics Report.
Attributes:
fpr: float
FP/(FP+TN)
precision: float
TP/(TP+FP)
recall: float
TP/(TP+FN)
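    Example:
        Illustrative only (hypothetical numbers); wraps a length-3 array
        ordered as [fpr, precision, recall]:
            PrReport(jnp.array([0.05, 0.9, 0.6]))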
"""
    def __init__(self, arr):
assert arr.size == PR_REPORT_STATISTICS_ENTRY_COUNT
self.fpr = arr[0]
self.precision = arr[1]
self.recall = arr[2]
PR_REPORT_STATISTICS_ENTRY_COUNT = 3
class SummaryReport:
"""Summary Report for bi-classification evaluation.
Attributes:
total_samples: int
positive_samples: int
negative_samples: int
auc: float
            area under the curve: https://developers.google.com/machine-learning/crash-course/classification/roc-and-auc
ks: float
Kolmogorov-Smirnov statistics: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
f1_score: float
harmonic mean of precision and recall: https://en.wikipedia.org/wiki/F-score
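    Example:
        Illustrative only (hypothetical numbers); wraps a length-6 array
        ordered as listed above:
            SummaryReport(jnp.array([100, 30, 70, 0.85, 0.4, 0.6]))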
"""
    def __init__(self, arr):
assert arr.size == SUMMARY_REPORT_STATISTICS_ENTRY_COUNT
self.total_samples = arr[0]
self.positive_samples = arr[1]
self.negative_samples = arr[2]
self.auc = arr[3]
self.ks = arr[4]
self.f1_score = arr[5]
SUMMARY_REPORT_STATISTICS_ENTRY_COUNT = 6
class GroupReport:
"""Report for each group"""
group_name: str
summary: SummaryReport
class EqBinReport:
"""Statistics Report for each bin.
Attributes:
start_value: float
end_value: float
positive: int
negative: int
total: int
precision: float
recall: float
false_positive_rate: float
f1_score: float
lift: float
see https://en.wikipedia.org/wiki/Lift_(data_mining)
        predicted_positive_ratio: float
            positive samples in this bin / total positive samples.
        predicted_negative_ratio: float
            negative samples in this bin / total negative samples.
cumulative_percent_of_positive: float
cumulative_percent_of_negative: float
total_cumulative_percent: float
ks: float
avg_score: float
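    Example:
        Illustrative only; wraps a length-17 statistics array in the field
        order listed above:
            EqBinReport(jnp.arange(17.0))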
"""
    def __init__(self, arr):
        assert arr.size == BIN_REPORT_STATISTICS_ENTRY_COUNT, "{}, {}".format(
            arr.size, BIN_REPORT_STATISTICS_ENTRY_COUNT
        )
self.start_value = arr[0]
self.end_value = arr[1]
self.positive = arr[2]
self.negative = arr[3]
self.total = arr[4]
self.precision = arr[5]
self.recall = arr[6]
self.false_positive_rate = arr[7]
self.f1_score = arr[8]
        self.lift = arr[9]
        self.predicted_positive_ratio = arr[10]
        self.predicted_negative_ratio = arr[11]
self.cumulative_percent_of_positive = arr[12]
self.cumulative_percent_of_negative = arr[13]
self.total_cumulative_percent = arr[14]
self.ks = arr[15]
self.avg_score = arr[16]
HEAD_FPR = [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]
BIN_REPORT_STATISTICS_ENTRY_COUNT = 17
def gen_all_reports(
y_true: Union[pd.DataFrame, jnp.array],
y_score: Union[pd.DataFrame, jnp.array],
bin_size: int,
):
"""Generate all reports.
Args:
y_true: Union[pd.DataFrame, jnp.array]
            should be of shape n * 1 with binary entries;
            1 means a positive sample
        y_score: Union[pd.DataFrame, jnp.array]
            should be of shape n * 1 with each entry in [0, 1],
            the probability of being positive
bin_size: int
number of bins to evaluate
    Returns:
        report: Report
    """
if isinstance(y_true, pd.DataFrame):
y_true = y_true.to_numpy()
if isinstance(y_score, pd.DataFrame):
y_score = y_score.to_numpy()
sorted_label_score_pair_arr = create_sorted_label_score_pair(y_true, y_score)
pos_count = jnp.sum(y_true)
eq_frequent_result_arr_list = eq_frequent_bin_evaluate(
sorted_label_score_pair_arr, pos_count, bin_size
)
eq_range_result_arr_list = eq_range_bin_evaluate(
sorted_label_score_pair_arr, pos_count, bin_size
)
# fill summary report
# positive has index 2
positive_samples = jnp.sum(
jnp.array([bin[2] for bin in eq_frequent_result_arr_list])
)
# negative has index 3
negative_samples = jnp.sum(
jnp.array([bin[3] for bin in eq_frequent_result_arr_list])
)
# ks has index 15
ks = jnp.max(jnp.array([bin[15] for bin in eq_frequent_result_arr_list]))
# f1 has index 8
f1 = jnp.max(jnp.array([bin[8] for bin in eq_frequent_result_arr_list]))
total_samples = positive_samples + negative_samples
auc = binary_roc_auc(sorted_label_score_pair_arr)
summary_report_arr = jnp.array(
[total_samples, positive_samples, negative_samples, auc, ks, f1]
)
# fill head prs
head_prs = gen_pr_reports(sorted_label_score_pair_arr, jnp.array(HEAD_FPR))
return Report(
eq_frequent_result_arr_list,
eq_range_result_arr_list,
summary_report_arr,
head_prs,
)
def create_sorted_label_score_pair(y_true: jnp.array, y_score: jnp.array):
"""produce an n * 2 shaped array with the second column as the sorted scores, in decreasing order"""
unsorted_array = jnp.concatenate([y_true, y_score], axis=1)
return unsorted_array[jnp.argsort(unsorted_array[:, 1])[::-1]]
def eq_frequent_bin_evaluate(
sorted_pairs: jnp.array, pos_count: int, bin_size: int
) -> List[jnp.array]:
"""Fill eq frequent bin report.
Args:
sorted_pairs: jnp.array
Should be of shape n * 2 and with second col sorted
pos_count: int
Total number of positive samples
bin_size: int
Total number of bins
Returns:
bin_reports: List[jnp.array]
"""
# split points have length bin_size + 1
split_points = equal_obs(sorted_pairs[:, 1], bin_size)
# split points should be reversed to become a decreasing sequence
split_points = jnp.flip(split_points)
# Each bin has domain (split_left, split_right]
return evaluate_bins(sorted_pairs, pos_count, split_points)
def eq_range_bin_evaluate(
sorted_pairs: jnp.array, pos_count: int, bin_size: int
) -> List[jnp.array]:
"""Fill eq range bin report.
Args:
sorted_pairs: jnp.array
Should be of shape n * 2 and with second col sorted.
pos_count: int
Total number of positive samples
bin_size: int
Total number of bins
Returns:
bin_reports: List[jnp.array]
"""
# split points have length bin_size + 1
split_points = equal_range(sorted_pairs[:, 1], bin_size)
# split points should be reversed to become a decreasing sequence
split_points = jnp.flip(split_points)
# Each bin has domain (split_left, split_right]
return evaluate_bins(sorted_pairs, pos_count, split_points)
def evaluate_bins(
sorted_pairs: jnp.array, pos_count: int, split_points
) -> List[jnp.array]:
"""evaluate bins given sorted pairs, pos_count and split_points (in decreasing order)"""
n_samples = sorted_pairs.shape[0]
neg_count = n_samples - pos_count
cumulative_pos_count = 0
cumulative_neg_count = 0
start_pos = 0
end_pos = 0
bins = []
for shard in range(len(split_points)):
while (end_pos < n_samples) and (
sorted_pairs[end_pos, 1] > split_points[shard]
):
end_pos += 1
t = bin_evaluate(
sorted_pairs,
start_pos,
end_pos,
pos_count,
neg_count,
cumulative_pos_count,
cumulative_neg_count,
)
bin_report_arr, cumulative_pos_count, cumulative_neg_count = t[0], t[1], t[2]
bins.append(bin_report_arr)
start_pos = end_pos
# last bin
bin_report_arr, _, _ = bin_evaluate(
sorted_pairs,
start_pos,
n_samples,
pos_count,
neg_count,
cumulative_pos_count,
cumulative_neg_count,
)
bins.append(bin_report_arr)
return bins
def bin_evaluate(
sorted_pairs,
start_pos,
end_pos,
total_pos_count,
total_neg_count,
cumulative_pos_count,
cumulative_neg_count,
) -> Tuple[jnp.array, int, int]:
"""Evaluate statistics for a bin.
Returns:
bin_report_arr: jnp.array
an array of size BIN_REPORT_STATISTICS_ENTRY_COUNT
cumulative_pos_count: int
cumulative_neg_count: int
"""
    if end_pos == start_pos:
        # Empty bin: return zeroed statistics and unchanged cumulative counts.
        return (
            jnp.zeros(BIN_REPORT_STATISTICS_ENTRY_COUNT),
            cumulative_pos_count,
            cumulative_neg_count,
        )
# compute new f1
(
true_positive,
true_negative,
false_positive,
false_negative,
) = confusion_matrix_from_cum_counts(
cumulative_pos_count, cumulative_neg_count, total_neg_count, total_pos_count
)
pos_count = jnp.sum(sorted_pairs[start_pos:end_pos, 0])
neg_count = end_pos - start_pos - pos_count
score_sum = jnp.sum(sorted_pairs[start_pos:end_pos, 1])
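    # The counts from confusion_matrix_from_cum_counts treat only the samples
    # before start_pos as predicted positive; the updates below move this
    # bin's samples from the predicted-negative side (FN, TN) to the
    # predicted-positive side (TP, FP).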
false_negative -= pos_count
true_positive += pos_count
true_negative -= neg_count
false_positive += neg_count
f1_score = compute_f1_score(true_positive, false_positive, false_negative)
# fill in rest of eq_bin_reports
start_value = float(sorted_pairs[end_pos - 1, 1])
end_value = float(sorted_pairs[start_pos, 1])
positive = int(pos_count)
negative = int(neg_count)
total = int(end_pos - start_pos)
precision, recall, false_positive_rate = precision_recall_false_positive_rate(
true_positive, false_positive, false_negative, true_negative
)
f1_score = float(f1_score)
lift = float(precision * (total_pos_count + total_neg_count) / total_pos_count)
predicted_positive_ratio = float(pos_count / total_pos_count)
predicted_negative_ratio = float(neg_count / total_neg_count)
cumulative_percent_of_positive = float(
(pos_count + cumulative_pos_count) / total_pos_count
)
cumulative_percent_of_negative = float(
(neg_count + cumulative_neg_count) / total_neg_count
)
total_cumulative_percent = float(
(pos_count + cumulative_pos_count + neg_count + cumulative_neg_count)
/ (total_pos_count + total_neg_count)
)
ks = abs(float(cumulative_percent_of_positive - cumulative_percent_of_negative))
avg_score = float(score_sum / total)
# pack into a single array
bin_report_arr = jnp.array(
[
start_value,
end_value,
positive,
negative,
total,
precision,
recall,
false_positive_rate,
f1_score,
lift,
predicted_positive_ratio,
predicted_negative_ratio,
cumulative_percent_of_positive,
cumulative_percent_of_negative,
total_cumulative_percent,
ks,
avg_score,
]
)
assert bin_report_arr.size == BIN_REPORT_STATISTICS_ENTRY_COUNT, "{}, {}".format(
bin_report_arr.size, BIN_REPORT_STATISTICS_ENTRY_COUNT
)
# update cumulative values
cumulative_pos_count += pos_count
cumulative_neg_count += neg_count
return bin_report_arr, cumulative_pos_count, cumulative_neg_count
def gen_pr_reports(sorted_pairs: jnp.array, thresholds: jnp.array) -> List[jnp.array]:
"""Generate pr report per specified threshold.
Args:
sorted_pairs: jnp.array
            y_true y_score pairs sorted by y_score in decreasing order
shape n_samples * 2
thresholds: 1d jnp.ndarray
prediction thresholds on which to evaluate
Returns:
pr_report_arr: List[jnp.array]
            a list of length-3 jnp arrays ([fpr, precision, recall]); list len = len(thresholds)
"""
tps, fps, all_thresholds = binary_clf_curve(sorted_pairs)
n_positive = tps[-1]
n_negative = fps[-1]
result = []
for t in thresholds:
i = jnp.sum(all_thresholds < t)
precision = tps[i] / (tps[i] + fps[i])
recall = tps[i] / n_positive
false_positive_rate = fps[i] / n_negative
pr_report = jnp.array([false_positive_rate, precision, recall])
result.append(pr_report)
return result
# section of statistics
def precision_recall_false_positive_rate(
true_positive, false_positive, false_negative, true_negative
) -> Tuple[float, float, float]:
precision = true_positive / (true_positive + false_positive)
recall = true_positive / (true_positive + false_negative)
false_positive_rate = false_positive / (false_positive + true_negative)
return float(precision), float(recall), float(false_positive_rate)
def confusion_matrix_from_cum_counts(
cumulative_pos_count, cumulative_neg_count, total_neg_count, total_pos_count
):
"""Compute the confusion matrix.
Args:
cumulative_pos_count: int
cumulative_neg_count: int
total_neg_count: int
total_pos_count: int
Returns:
true_positive: int
true_negative: int
false_positive: int
false_negative: int
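    Example:
        With cumulative_pos_count=3, cumulative_neg_count=1,
        total_neg_count=10, total_pos_count=5 (illustrative numbers), every
        sample counted so far is treated as predicted positive, giving
        (TP, TN, FP, FN) = (3, 9, 1, 2).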
"""
true_positive = cumulative_pos_count
true_negative = total_neg_count - cumulative_neg_count
false_positive = cumulative_neg_count
false_negative = total_pos_count - cumulative_pos_count
return true_positive, true_negative, false_positive, false_negative
def binary_clf_curve(sorted_pairs: jnp.array) -> Tuple[jnp.array, jnp.array, jnp.array]:
"""Calculate true and false positives per binary classification
threshold (can be used for roc curve or precision/recall curve).
Args:
sorted_pairs: jnp.array
y_true y_score pairs sorted by y_score in decreasing order
Returns:
fps: 1d ndarray
False positives counts, index i records the number
of negative samples that got assigned a
score >= thresholds[i].
The total number of negative samples is equal to
fps[-1] (thus true negatives are given by fps[-1] - fps)
tps: 1d ndarray
True positives counts, index i records the number
of positive samples that got assigned a
score >= thresholds[i].
The total number of positive samples is equal to
tps[-1] (thus false negatives are given by tps[-1] - tps)
thresholds : 1d ndarray
Distinct predicted score sorted in decreasing order
References:
Github: scikit-learn _binary_clf_curve.
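    Example:
        A small walk-through with toy data:
            sorted_pairs = jnp.array([[1, 0.9], [0, 0.7], [1, 0.7], [0, 0.3]])
            fps, tps, thresholds = binary_clf_curve(sorted_pairs)
            # thresholds -> [0.9, 0.7, 0.3]
            # tps        -> [1, 2, 2]
            # fps        -> [0, 1, 2]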
"""
# y_score typically consists of tied values. Here we extract
# the indices associated with the distinct values. We also
# concatenate a value for the end of the curve
distinct_indices = jnp.where(jnp.diff(sorted_pairs[:, 1]))[0]
end = jnp.array([sorted_pairs.shape[0] - 1])
threshold_indices = jnp.hstack((distinct_indices, end))
thresholds = sorted_pairs[threshold_indices, 1]
tps = jnp.cumsum(sorted_pairs[:, 0])[threshold_indices]
    # (1 + threshold_indices) = the cumulative number of samples scored at
    # or above each distinct threshold, so that count minus true positives
    # = false positives
fps = (1 + threshold_indices) - tps
return fps, tps, thresholds
def roc_curve(sorted_pairs: jnp.array) -> Tuple[jnp.array, jnp.array, jnp.array]:
"""Compute Receiver operating characteristic (ROC).
    Compared to the sklearn implementation, this one omits most conditionals and input-validation checks.
Args:
sorted_pairs: jnp.array
y_true y_score pairs sorted by y_score in decreasing order
Returns:
fpr: ndarray of shape (>2,)
Increasing false positive rates such that element i is the false
positive rate of predictions with score >= `thresholds[i]`.
tpr: ndarray of shape (>2,)
Increasing true positive rates such that element `i` is the true
positive rate of predictions with score >= `thresholds[i]`.
thresholds: ndarray of shape = (n_thresholds,)
Decreasing thresholds on the decision function used to compute
fpr and tpr. `thresholds[0]` represents no instances being predicted
and is arbitrarily set to `max(y_score) + 1`.
References:
Github: scikit-learn roc_curve.
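    Example:
        Continuing the binary_clf_curve toy data above:
            fpr, tpr, thresholds = roc_curve(sorted_pairs)
            # fpr        -> [0, 0, 0.5, 1]
            # tpr        -> [0, 0.5, 1, 1]
            # thresholds -> [1.9, 0.9, 0.7, 0.3]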
"""
fps, tps, thresholds = binary_clf_curve(sorted_pairs)
tps = jnp.r_[0, tps]
fps = jnp.r_[0, fps]
thresholds = jnp.r_[thresholds[0] + 1, thresholds]
fpr = fps / fps[-1]
tpr = tps / tps[-1]
return fpr, tpr, thresholds
def auc(x, y):
"""Compute Area Under the Curve (AUC) using the trapezoidal rule.
Args:
x: ndarray of shape (n,)
monotonic X coordinates
y: ndarray of shape, (n,)
Y coordinates
Returns:
auc: float
Area Under the Curve
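    Example:
        Trapezoidal area under three points:
            auc(jnp.array([0.0, 0.5, 1.0]), jnp.array([0.0, 0.75, 1.0]))  # -> 0.625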
"""
direction = 1
dx = jnp.diff(x)
if jnp.any(dx < 0):
if jnp.all(dx <= 0):
direction = -1
else:
raise ValueError("x is neither increasing nor decreasing : {}.".format(x))
area = direction * jnp.trapz(y, x)
return area
def binary_roc_auc(sorted_pairs: jnp.array) -> float:
"""
Compute Area Under the Curve (AUC) for ROC from labels and prediction scores in sorted_pairs.
    Compared to the sklearn implementation, this one supports fewer options and
    omits most conditionals and input-validation checks.
Args:
sorted_pairs: jnp.array
y_true y_score pairs sorted by y_score in decreasing order,
and it has shape n_samples * 2.
Returns:
roc_auc: float
References:
Github: scikit-learn _binary_roc_auc_score.
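    Example:
        With the binary_clf_curve toy data above
        (scores [0.9, 0.7, 0.7, 0.3], labels [1, 0, 1, 0]):
            binary_roc_auc(sorted_pairs)  # -> 0.875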
"""
fpr, tpr, _ = roc_curve(sorted_pairs)
return auc(fpr, tpr)
def compute_f1_score(
true_positive: int, false_positive: int, false_negative: int
) -> float:
"""Calculate the F1 score."""
    if true_positive == 0:
        # With no true positives, precision or recall is 0 (or undefined),
        # so F1 is reported as 0; this also guards the divisions below.
        return 0
precision = true_positive / (true_positive + false_positive)
recall = true_positive / (true_positive + false_negative)
return 2 * precision * recall / (precision + recall)