Customize DataBuilder on SLModel in SecretFlow#
The following codes are demos only. It’s NOT for production due to system security concerns, please DO NOT use it directly in production.
We support federated learning in vertical scenarios on SLModel in SecretFlow. In this tutorial, we will take DeepFM model for recommendation as an example to introduce how to customize a DataBuilder on SLModel.
The main goal of this tutorial is to train DeepFM to show how to customize a DataBuilder on SLModel.
If you want to learn more about related content for DeepFM model and split plans, please move to related documents for DeepFM model.
Environment Setting#
[1]:
%load_ext autoreload
%autoreload 2
[2]:
import secretflow as sf
# Check the version of your SecretFlow
print('The version of SecretFlow: {}'.format(sf.__version__))
# In case you have a running secretflow runtime already.
sf.shutdown()
sf.init(['alice', 'bob', 'charlie'], address="local", log_to_driver=False)
alice, bob, charlie = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('charlie')
Since the GPL-licensed package `unidecode` is not installed, using Python's `unicodedata` package which yields worse results.
The version of SecretFlow: 0.8.3b0
2023-07-03 20:12:02,535 INFO worker.py:1538 -- Started a local Ray instance.
Datasets#
We will use the classic dataset MovieLens for introduction. MovieLens is an open recommendation system dataset that contains movie ratings and movie metadata information.
And we split data into several pieces:
alice: “UserID”, “Gender”, “Age”, “Occupation”, “Zip-code”
bob: “MovieID”, “Rating”, “Title”, “Genres”, “Timestamp”
Define DataBuilders#
We customize a DataBuilder with the aim to meet higher level customization demands, as the existing FedDataFrame and FedNdarray did not provide the required functionality.
In Split Learning(where all sides of the data need to be aligned), we only provide DataBuilder for CSV data for the time being. You can define what to do with each row of data in the custom DataBuilder. The operations in SLModel depend on the way you define it.
The things you should notice here:
We read MovieLens dataset in CSV format. However, it needs to be treated as sparse features. The DataBuilder will convert each column into the form of dictionary.
Bob’s score was processed into binary form using threshold.
Custom functions can be defined in the DataBuilder and then applied to each column using dataset.map.
Only CSV format is supported in the vertical scenario due to the restriction that both sides of the data should be aligned.
It returns Dataset which has been defined Batch_size and Repeat in the CSV mode so that SLModel can infer the Steps_per_epoch according to the Dataset.
1. Download the dataset and convert it to the CSV format.#
[6]:
%%capture
%%!
wget https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/movielens/ml-1m.zip
unzip ./ml-1m.zip
Read the data from the dat format and convert it to the dictionary form.
[ ]:
def load_data(filename, columns):
data = {}
with open(filename, "r", encoding="unicode_escape") as f:
for line in f:
ls = line.strip("\n").split("::")
data[ls[0]] = dict(zip(columns[1:], ls[1:]))
return data
[12]:
users_data = load_data(
"./ml-1m/users.dat",
columns=["UserID", "Gender", "Age", "Occupation", "Zip-code"],
)
movies_data = load_data("./ml-1m/movies.dat", columns=["MovieID", "Title", "Genres"])
ratings_columns = ["UserID", "MovieID", "Rating", "Timestamp"]
rating_data = load_data("./ml-1m/ratings.dat", columns=ratings_columns)
[13]:
print(len(users_data))
print(len(movies_data))
print(len(rating_data))
6040
3883
6040
Next, we join user, movie and rating, split up and assemble them into ‘alice_ml1m.csv’ and ‘bob_ml1m.csv’.
[14]:
fed_csv = {alice: "alice_ml1m.csv", bob: "bob_ml1m.csv"}
csv_writer_container = {alice: open(fed_csv[alice], "w"), bob: open(fed_csv[bob], "w")}
part_columns = {
alice: ["UserID", "Gender", "Age", "Occupation", "Zip-code"],
bob: ["MovieID", "Rating", "Title", "Genres", "Timestamp"],
}
[15]:
for device, writer in csv_writer_container.items():
writer.write("ID," + ",".join(part_columns[device]) + "\n")
[16]:
f = open("ml-1m/ratings.dat", "r", encoding="unicode_escape")
def _parse_example(feature, columns, index):
if "Title" in feature.keys():
feature["Title"] = feature["Title"].replace(",", "_")
if "Genres" in feature.keys():
feature["Genres"] = feature["Genres"].replace("|", " ")
values = []
values.append(str(index))
for c in columns:
values.append(feature[c])
return ",".join(values)
index = 0
num_sample = 1000
for line in f:
ls = line.strip().split("::")
rating = dict(zip(ratings_columns, ls))
rating.update(users_data.get(ls[0]))
rating.update(movies_data.get(ls[1]))
for device, columns in part_columns.items():
parse_f = _parse_example(rating, columns, index)
csv_writer_container[device].write(parse_f + "\n")
index += 1
if num_sample > 0 and index >= num_sample:
break
for w in csv_writer_container.values():
w.close()
So we’ve done the data processing and splitting up by now.#
And we output two files:
Alice: alice_ml1m.csv and Bob: bob_ml1m.csv
[17]:
! head alice_ml1m.csv
ID,UserID,Gender,Age,Occupation,Zip-code
0,1,F,1,10,48067
1,1,F,1,10,48067
2,1,F,1,10,48067
3,1,F,1,10,48067
4,1,F,1,10,48067
5,1,F,1,10,48067
6,1,F,1,10,48067
7,1,F,1,10,48067
8,1,F,1,10,48067
[18]:
! head bob_ml1m.csv
ID,MovieID,Rating,Title,Genres,Timestamp
0,1193,5,One Flew Over the Cuckoo's Nest (1975),Drama,978300760
1,661,3,James and the Giant Peach (1996),Animation Children's Musical,978302109
2,914,3,My Fair Lady (1964),Musical Romance,978301968
3,3408,4,Erin Brockovich (2000),Drama,978300275
4,2355,5,Bug's Life_ A (1998),Animation Children's Comedy,978824291
5,1197,3,Princess Bride_ The (1987),Action Adventure Comedy Romance,978302268
6,1287,5,Ben-Hur (1959),Action Adventure Drama,978302039
7,2804,5,Christmas Story_ A (1983),Comedy Drama,978300719
8,594,4,Snow White and the Seven Dwarfs (1937),Animation Children's Musical,978302268
1. Use plaintext engines to develop Databuilders.#
Because the data for each side of the SLModel is different, DataBuilder needs to be developed separately.
Develop Alice’s DataBuilder#
[19]:
import pandas as pd
alice_df = pd.read_csv("alice_ml1m.csv", encoding="utf-8")
[20]:
alice_df["UserID"] = alice_df["UserID"].astype("string")
alice_df = alice_df.drop(columns="ID")
alice_df
[20]:
| UserID | Gender | Age | Occupation | Zip-code | |
|---|---|---|---|---|---|
| 0 | 1 | F | 1 | 10 | 48067 |
| 1 | 1 | F | 1 | 10 | 48067 |
| 2 | 1 | F | 1 | 10 | 48067 |
| 3 | 1 | F | 1 | 10 | 48067 |
| 4 | 1 | F | 1 | 10 | 48067 |
| ... | ... | ... | ... | ... | ... |
| 995 | 10 | F | 35 | 1 | 95370 |
| 996 | 10 | F | 35 | 1 | 95370 |
| 997 | 10 | F | 35 | 1 | 95370 |
| 998 | 10 | F | 35 | 1 | 95370 |
| 999 | 10 | F | 35 | 1 | 95370 |
1000 rows × 5 columns
[21]:
import tensorflow as tf
alice_dict = dict(alice_df)
data_set = tf.data.Dataset.from_tensor_slices(alice_dict).batch(32).repeat(1)
2023-07-03 20:23:17.128798: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-07-03 20:23:19.575681: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2023-07-03 20:23:19.576067: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory
2023-07-03 20:23:19.576096: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
2023-07-03 20:23:22.559302: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2023-07-03 20:23:22.560651: W tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:265] failed call to cuInit: UNKNOWN ERROR (303)
[22]:
data_set
[22]:
<RepeatDataset element_spec={'UserID': TensorSpec(shape=(None,), dtype=tf.string, name=None), 'Gender': TensorSpec(shape=(None,), dtype=tf.string, name=None), 'Age': TensorSpec(shape=(None,), dtype=tf.int64, name=None), 'Occupation': TensorSpec(shape=(None,), dtype=tf.int64, name=None), 'Zip-code': TensorSpec(shape=(None,), dtype=tf.int64, name=None)}>
Develop Bob’s DataBuilder#
[23]:
bob_df = pd.read_csv("bob_ml1m.csv", encoding="utf-8")
bob_df = bob_df.drop(columns="ID")
bob_df["MovieID"] = bob_df["MovieID"].astype("string")
bob_df
[23]:
| MovieID | Rating | Title | Genres | Timestamp | |
|---|---|---|---|---|---|
| 0 | 1193 | 5 | One Flew Over the Cuckoo's Nest (1975) | Drama | 978300760 |
| 1 | 661 | 3 | James and the Giant Peach (1996) | Animation Children's Musical | 978302109 |
| 2 | 914 | 3 | My Fair Lady (1964) | Musical Romance | 978301968 |
| 3 | 3408 | 4 | Erin Brockovich (2000) | Drama | 978300275 |
| 4 | 2355 | 5 | Bug's Life_ A (1998) | Animation Children's Comedy | 978824291 |
| ... | ... | ... | ... | ... | ... |
| 995 | 3704 | 2 | Mad Max Beyond Thunderdome (1985) | Action Sci-Fi | 978228364 |
| 996 | 1020 | 3 | Cool Runnings (1993) | Comedy | 978228726 |
| 997 | 784 | 3 | Cable Guy_ The (1996) | Comedy | 978230946 |
| 998 | 858 | 3 | Godfather_ The (1972) | Action Crime Drama | 978224375 |
| 999 | 1022 | 5 | Cinderella (1950) | Animation Children's Musical | 979775689 |
1000 rows × 5 columns
[24]:
label = bob_df["Rating"]
data = bob_df.drop(columns="Rating")
def _parse_bob(row_sample, label):
import tensorflow as tf
y_t = label
y = tf.expand_dims(
tf.where(
y_t > 3,
tf.ones_like(y_t, dtype=tf.float32),
tf.zeros_like(y_t, dtype=tf.float32),
),
axis=1,
)
return row_sample, y
[25]:
bob_dict = tuple([dict(data), label])
data_set = tf.data.Dataset.from_tensor_slices(bob_dict).batch(32).repeat(1)
[26]:
data_set = data_set.map(_parse_bob)
[27]:
next(iter(data_set))
[27]:
({'MovieID': <tf.Tensor: shape=(32,), dtype=string, numpy=
array([b'1193', b'661', b'914', b'3408', b'2355', b'1197', b'1287',
b'2804', b'594', b'919', b'595', b'938', b'2398', b'2918', b'1035',
b'2791', b'2687', b'2018', b'3105', b'2797', b'2321', b'720',
b'1270', b'527', b'2340', b'48', b'1097', b'1721', b'1545', b'745',
b'2294', b'3186'], dtype=object)>,
'Title': <tf.Tensor: shape=(32,), dtype=string, numpy=
array([b"One Flew Over the Cuckoo's Nest (1975)",
b'James and the Giant Peach (1996)', b'My Fair Lady (1964)',
b'Erin Brockovich (2000)', b"Bug's Life_ A (1998)",
b'Princess Bride_ The (1987)', b'Ben-Hur (1959)',
b'Christmas Story_ A (1983)',
b'Snow White and the Seven Dwarfs (1937)',
b'Wizard of Oz_ The (1939)', b'Beauty and the Beast (1991)',
b'Gigi (1958)', b'Miracle on 34th Street (1947)',
b"Ferris Bueller's Day Off (1986)", b'Sound of Music_ The (1965)',
b'Airplane! (1980)', b'Tarzan (1999)', b'Bambi (1942)',
b'Awakenings (1990)', b'Big (1988)', b'Pleasantville (1998)',
b'Wallace & Gromit: The Best of Aardman Animation (1996)',
b'Back to the Future (1985)', b"Schindler's List (1993)",
b'Meet Joe Black (1998)', b'Pocahontas (1995)',
b'E.T. the Extra-Terrestrial (1982)', b'Titanic (1997)',
b'Ponette (1996)', b'Close Shave_ A (1995)', b'Antz (1998)',
b'Girl_ Interrupted (1999)'], dtype=object)>,
'Genres': <tf.Tensor: shape=(32,), dtype=string, numpy=
array([b'Drama', b"Animation Children's Musical", b'Musical Romance',
b'Drama', b"Animation Children's Comedy",
b'Action Adventure Comedy Romance', b'Action Adventure Drama',
b'Comedy Drama', b"Animation Children's Musical",
b"Adventure Children's Drama Musical",
b"Animation Children's Musical", b'Musical', b'Drama', b'Comedy',
b'Musical', b'Comedy', b"Animation Children's",
b"Animation Children's", b'Drama', b'Comedy Fantasy', b'Comedy',
b'Animation', b'Comedy Sci-Fi', b'Drama War', b'Romance',
b"Animation Children's Musical Romance",
b"Children's Drama Fantasy Sci-Fi", b'Drama Romance', b'Drama',
b'Animation Comedy Thriller', b"Animation Children's", b'Drama'],
dtype=object)>,
'Timestamp': <tf.Tensor: shape=(32,), dtype=int64, numpy=
array([978300760, 978302109, 978301968, 978300275, 978824291, 978302268,
978302039, 978300719, 978302268, 978301368, 978824268, 978301752,
978302281, 978302124, 978301753, 978302188, 978824268, 978301777,
978301713, 978302039, 978302205, 978300760, 978300055, 978824195,
978300103, 978824351, 978301953, 978300055, 978824139, 978824268,
978824291, 978300019])>},
<tf.Tensor: shape=(32, 1), dtype=float32, numpy=
array([[1.],
[0.],
[0.],
[1.],
[1.],
[0.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[0.],
[1.],
[1.],
[1.],
[0.],
[0.],
[1.],
[1.],
[0.],
[1.],
[1.],
[1.],
[1.],
[0.],
[1.],
[1.]], dtype=float32)>)
2. Wrap their DataBuilders separately.#
[28]:
# alice
def create_dataset_builder_alice(
batch_size=128,
repeat_count=5,
):
def dataset_builder(x):
import pandas as pd
import tensorflow as tf
x = [dict(t) if isinstance(t, pd.DataFrame) else t for t in x]
x = x[0] if len(x) == 1 else tuple(x)
data_set = (
tf.data.Dataset.from_tensor_slices(x).batch(batch_size).repeat(repeat_count)
)
return data_set
return dataset_builder
# bob
def create_dataset_builder_bob(
batch_size=128,
repeat_count=5,
):
def _parse_bob(row_sample, label):
import tensorflow as tf
y_t = label["Rating"]
y = tf.expand_dims(
tf.where(
y_t > 3,
tf.ones_like(y_t, dtype=tf.float32),
tf.zeros_like(y_t, dtype=tf.float32),
),
axis=1,
)
return row_sample, y
def dataset_builder(x):
import pandas as pd
import tensorflow as tf
x = [dict(t) if isinstance(t, pd.DataFrame) else t for t in x]
x = x[0] if len(x) == 1 else tuple(x)
data_set = (
tf.data.Dataset.from_tensor_slices(x).batch(batch_size).repeat(repeat_count)
)
data_set = data_set.map(_parse_bob)
return data_set
return dataset_builder
3. Construct a databuilder_dict.#
[29]:
data_builder_dict = {
alice: create_dataset_builder_alice(
batch_size=128,
repeat_count=5,
),
bob: create_dataset_builder_bob(
batch_size=128,
repeat_count=5,
),
}
Define DeepFM Model and run it#
1. Define a DeepFM model.#
[30]:
from secretflow.ml.nn.applications.sl_deep_fm import DeepFMbase, DeepFMfuse
from secretflow.ml.nn import SLModel
NUM_USERS = 6040
NUM_MOVIES = 3952
GENDER_VOCAB = ["F", "M"]
AGE_VOCAB = [1, 18, 25, 35, 45, 50, 56]
OCCUPATION_VOCAB = [i for i in range(21)]
GENRES_VOCAB = [
"Action",
"Adventure",
"Animation",
"Children's",
"Comedy",
"Crime",
"Documentary",
"Drama",
"Fantasy",
"Film-Noir",
"Horror",
"Musical",
"Mystery",
"Romance",
"Sci-Fi",
"Thriller",
"War",
"Western",
]
2. Define Alice’s basenet.#
[31]:
def create_base_model_alice():
# Create model
def create_model():
import tensorflow as tf
def preprocess():
inputs = {
"UserID": tf.keras.Input(shape=(1,), dtype=tf.string),
"Gender": tf.keras.Input(shape=(1,), dtype=tf.string),
"Age": tf.keras.Input(shape=(1,), dtype=tf.int64),
"Occupation": tf.keras.Input(shape=(1,), dtype=tf.int64),
}
user_id_output = tf.keras.layers.Hashing(
num_bins=NUM_USERS, output_mode="one_hot"
)
user_gender_output = tf.keras.layers.StringLookup(
vocabulary=GENDER_VOCAB, output_mode="one_hot"
)
user_age_out = tf.keras.layers.IntegerLookup(
vocabulary=AGE_VOCAB, output_mode="one_hot"
)
user_occupation_out = tf.keras.layers.IntegerLookup(
vocabulary=OCCUPATION_VOCAB, output_mode="one_hot"
)
outputs = {
"UserID": user_id_output(inputs["UserID"]),
"Gender": user_gender_output(inputs["Gender"]),
"Age": user_age_out(inputs["Age"]),
"Occupation": user_occupation_out(inputs["Occupation"]),
}
return tf.keras.Model(inputs=inputs, outputs=outputs)
preprocess_layer = preprocess()
model = DeepFMbase(
dnn_units_size=[256, 32],
preprocess_layer=preprocess_layer,
)
model.compile(
loss=tf.keras.losses.binary_crossentropy,
optimizer=tf.keras.optimizers.Adam(),
metrics=[
tf.keras.metrics.AUC(),
tf.keras.metrics.Precision(),
tf.keras.metrics.Recall(),
],
)
return model # need wrap
return create_model
3. Define Bob’s basenet and fusenet.#
[32]:
# bob model
def create_base_model_bob():
# Create model
def create_model():
import tensorflow as tf
# define preprocess layer
def preprocess():
inputs = {
"MovieID": tf.keras.Input(shape=(1,), dtype=tf.string),
"Genres": tf.keras.Input(shape=(1,), dtype=tf.string),
}
movie_id_out = tf.keras.layers.Hashing(
num_bins=NUM_MOVIES, output_mode="one_hot"
)
movie_genres_out = tf.keras.layers.TextVectorization(
output_mode='multi_hot', split="whitespace", vocabulary=GENRES_VOCAB
)
outputs = {
"MovieID": movie_id_out(inputs["MovieID"]),
"Genres": movie_genres_out(inputs["Genres"]),
}
return tf.keras.Model(inputs=inputs, outputs=outputs)
preprocess_layer = preprocess()
model = DeepFMbase(
dnn_units_size=[256, 32],
preprocess_layer=preprocess_layer,
)
model.compile(
loss=tf.keras.losses.binary_crossentropy,
optimizer=tf.keras.optimizers.Adam(),
metrics=[
tf.keras.metrics.AUC(),
tf.keras.metrics.Precision(),
tf.keras.metrics.Recall(),
],
)
return model # need wrap
return create_model
def create_fuse_model():
# Create model
def create_model():
import tensorflow as tf
model = DeepFMfuse(dnn_units_size=[256, 256, 32])
model.compile(
loss=tf.keras.losses.binary_crossentropy,
optimizer=tf.keras.optimizers.Adam(),
metrics=[
tf.keras.metrics.AUC(),
tf.keras.metrics.Precision(),
tf.keras.metrics.Recall(),
],
)
return model
return create_model
[33]:
base_model_dict = {alice: create_base_model_alice(), bob: create_base_model_bob()}
model_fuse = create_fuse_model()
[ ]:
from secretflow.data.vertical import read_csv as v_read_csv
vdf = v_read_csv(
{alice: "alice_ml1m.csv", bob: "bob_ml1m.csv"}, keys="ID", drop_keys="ID"
)
label = vdf["Rating"]
data = vdf.drop(columns=["Rating", "Timestamp", "Title", "Zip-code"])
data["UserID"] = data["UserID"].astype("string")
data["MovieID"] = data["MovieID"].astype("string")
sl_model = SLModel(
base_model_dict=base_model_dict,
device_y=bob,
model_fuse=model_fuse,
)
history = sl_model.fit(
data,
label,
epochs=5,
batch_size=128,
random_seed=1234,
dataset_builder=data_builder_dict,
)
INFO:root:Create proxy actor <class 'secretflow.ml.nn.sl.backend.tensorflow.sl_base.PYUSLTFModel'> with party alice.
INFO:root:Create proxy actor <class 'secretflow.ml.nn.sl.backend.tensorflow.sl_base.PYUSLTFModel'> with party bob.
INFO:root:SL Train Params: {'self': <secretflow.ml.nn.sl.sl_model.SLModel object at 0x7fc9c99f51c0>, 'x': VDataFrame(partitions={PYURuntime(alice): Partition(data=<secretflow.device.device.pyu.PYUObject object at 0x7fc9c9998160>), PYURuntime(bob): Partition(data=<secretflow.device.device.pyu.PYUObject object at 0x7fc9c998c520>)}, aligned=True), 'y': VDataFrame(partitions={PYURuntime(bob): Partition(data=<secretflow.device.device.pyu.PYUObject object at 0x7fc9c99984f0>)}, aligned=True), 'batch_size': 128, 'epochs': 5, 'verbose': 1, 'callbacks': None, 'validation_data': None, 'shuffle': False, 'sample_weight': None, 'validation_freq': 1, 'dp_spent_step_freq': None, 'dataset_builder': {PYURuntime(alice): <function create_dataset_builder_alice.<locals>.dataset_builder at 0x7fcb640b9700>, PYURuntime(bob): <function create_dataset_builder_bob.<locals>.dataset_builder at 0x7fca2c6b30d0>}, 'audit_log_dir': None, 'audit_log_params': {}, 'random_seed': 1234}
0%| | 0/8 [00:00<?, ?it/s]