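Using a custom DataBuilder with SLModel in SecretFlow#

The code below is for demonstration only. For security reasons, do not use it directly in production.

SecretFlow's SLModel supports federated learning in the vertical setting. Using a DeepFM recommendation scenario as the example, this tutorial shows how to use a custom DataBuilder with SLModel. DeepFM training is only the vehicle here; the focus is the DataBuilder.

Note: for details on DeepFM and how the model is split across parties, see the DeepFM documentation.

Environment setup#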

[1]:
%load_ext autoreload
%autoreload 2
[2]:
import secretflow as sf

# Check the version of your SecretFlow
print('The version of SecretFlow: {}'.format(sf.__version__))

# In case you have a running secretflow runtime already.
sf.shutdown()
sf.init(['alice', 'bob', 'charlie'], address="local", log_to_driver=False)
alice, bob, charlie = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('charlie')
Since the GPL-licensed package `unidecode` is not installed, using Python's `unicodedata` package which yields worse results.
The version of SecretFlow: 0.8.3b0
2023-07-03 20:12:02,535 INFO worker.py:1538 -- Started a local Ray instance.

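Dataset#

We use the classic MovieLens dataset for this demonstration. MovieLens is an open recommender-system dataset that contains movie ratings and movie metadata.

The data is split by column between the two parties: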

  • alice: “UserID”, “Gender”, “Age”, “Occupation”, “Zip-code”

  • bob: “MovieID”, “Rating”, “Title”, “Genres”, “Timestamp”
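
The column split above can be pictured as cutting one joined record into two partial rows, one per party. The following is a minimal plain-Python sketch; the helper name `split_record` is hypothetical and not part of SecretFlow, and the sample record is taken from the first row of the CSV output shown later.

```python
# Column ownership, as listed above.
PART_COLUMNS = {
    "alice": ["UserID", "Gender", "Age", "Occupation", "Zip-code"],
    "bob": ["MovieID", "Rating", "Title", "Genres", "Timestamp"],
}


def split_record(record, part_columns=PART_COLUMNS):
    """Cut one joined record into per-party rows (hypothetical helper)."""
    return {
        party: {col: record[col] for col in cols}
        for party, cols in part_columns.items()
    }


record = {
    "UserID": "1", "Gender": "F", "Age": "1", "Occupation": "10",
    "Zip-code": "48067", "MovieID": "1193", "Rating": "5",
    "Title": "One Flew Over the Cuckoo's Nest (1975)", "Genres": "Drama",
    "Timestamp": "978300760",
}
parts = split_record(record)
print(parts["alice"])
print(parts["bob"])
```

Note that the label column, Rating, ends up on Bob's side, which is why Bob is the label holder in the training step.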

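Defining the DataBuilder#

A custom DataBuilder is meant for cases where the built-in FedDataFrame and FedNdarray features are not enough. It lets you implement more advanced, customized input handling.

In split learning, where the parties' data must be aligned, DataBuilder is currently supported only for CSV data. Inside a custom DataBuilder you define how each row is processed, and SLModel runs the pipeline you define.

Points to note:

  • The MovieLens data is read as CSV but has to be handled as sparse features, so the DataBuilder converts the columns into a dictionary keyed by column name.

  • On Bob's side, the rating is thresholded into a binary label.

  • You can define custom functions inside the DataBuilder and apply them to every element of the dataset with dataset.map.

  • Because the vertical setting requires the two parties' data to be aligned, only CSV mode is supported for now.

  • In CSV mode the builder only needs to return a Dataset. The Dataset must already have its batch size and repeat count set; SLModel infers steps_per_epoch from the Dataset.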
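
To make the last point concrete, here is a small arithmetic sketch of how many batches a Dataset yields for the settings used later in this tutorial (1,000 rows, batch size 128, repeat count 5, 5 epochs). The exact rule SLModel uses to infer steps_per_epoch is not shown in this tutorial; the sketch assumes the total batch count is spread evenly over the epochs, which agrees with the `0/8` progress bar in the training log at the end.

```python
import math


def batches_per_pass(num_rows: int, batch_size: int) -> int:
    """Batches in one pass over the data; the last batch may be smaller."""
    return math.ceil(num_rows / batch_size)


num_rows, batch_size, repeat_count, epochs = 1000, 128, 5, 5

per_pass = batches_per_pass(num_rows, batch_size)  # 8
total = per_pass * repeat_count                    # 40
# Assumption: the total batch count is divided evenly across the epochs.
steps_per_epoch = total // epochs                  # 8
print(per_pass, total, steps_per_epoch)
```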

1. Download the data and convert it to CSV.#

[6]:
%%capture
%%!
wget https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/movielens/ml-1m.zip
unzip ./ml-1m.zip

Read the .dat files and convert them into dictionaries.

[ ]:
def load_data(filename, columns):
    data = {}
    with open(filename, "r", encoding="unicode_escape") as f:
        for line in f:
            ls = line.strip("\n").split("::")
            data[ls[0]] = dict(zip(columns[1:], ls[1:]))
    return data
[12]:
users_data = load_data(
    "./ml-1m/users.dat",
    columns=["UserID", "Gender", "Age", "Occupation", "Zip-code"],
)
movies_data = load_data("./ml-1m/movies.dat", columns=["MovieID", "Title", "Genres"])
ratings_columns = ["UserID", "MovieID", "Rating", "Timestamp"]

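# Note: load_data keys every row by its first field (UserID here), so later
# ratings of the same user overwrite earlier ones. rating_data is not used below;
# the join step reads ratings.dat line by line instead.
rating_data = load_data("./ml-1m/ratings.dat", columns=ratings_columns)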
[13]:
print(len(users_data))
print(len(movies_data))
print(len(rating_data))
6040
3883
6040
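
The third count above equals the number of users rather than the number of ratings because `load_data` uses the first field of each line as the dictionary key. The sketch below reproduces that keying logic on a few in-memory lines; `load_lines` is a hypothetical stand-in for `load_data`, and the sample lines are illustrative.

```python
def load_lines(lines, columns):
    """Same keying logic as load_data, applied to an in-memory list of lines."""
    data = {}
    for line in lines:
        ls = line.strip("\n").split("::")
        data[ls[0]] = dict(zip(columns[1:], ls[1:]))
    return data


ratings_columns = ["UserID", "MovieID", "Rating", "Timestamp"]
sample = [
    "1::1193::5::978300760\n",
    "1::661::3::978302109\n",  # same UserID: overwrites the line above
    "2::1357::5::978298709\n",
]
by_user = load_lines(sample, ratings_columns)
print(len(by_user))   # 2 keys for 3 input lines
print(by_user["1"])   # only the last rating of user 1 survives
```

This is harmless for users.dat and movies.dat, where the first field is unique per line.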

Next we join the user, movie and rating records, split them by party, and write the results to alice_ml1m.csv and bob_ml1m.csv.

[14]:
fed_csv = {alice: "alice_ml1m.csv", bob: "bob_ml1m.csv"}
csv_writer_container = {alice: open(fed_csv[alice], "w"), bob: open(fed_csv[bob], "w")}
part_columns = {
    alice: ["UserID", "Gender", "Age", "Occupation", "Zip-code"],
    bob: ["MovieID", "Rating", "Title", "Genres", "Timestamp"],
}
[15]:
for device, writer in csv_writer_container.items():
    writer.write("ID," + ",".join(part_columns[device]) + "\n")
[16]:
f = open("ml-1m/ratings.dat", "r", encoding="unicode_escape")


def _parse_example(feature, columns, index):
    if "Title" in feature.keys():
        feature["Title"] = feature["Title"].replace(",", "_")
    if "Genres" in feature.keys():
        feature["Genres"] = feature["Genres"].replace("|", " ")
    values = []
    values.append(str(index))
    for c in columns:
        values.append(feature[c])
    return ",".join(values)


index = 0
num_sample = 1000
for line in f:
    ls = line.strip().split("::")
    rating = dict(zip(ratings_columns, ls))
    rating.update(users_data.get(ls[0]))
    rating.update(movies_data.get(ls[1]))
    for device, columns in part_columns.items():
        parse_f = _parse_example(rating, columns, index)
        csv_writer_container[device].write(parse_f + "\n")
    index += 1
    if num_sample > 0 and index >= num_sample:
        break
# Close the per-party CSV files and the ratings input file.
for w in csv_writer_container.values():
    w.close()
f.close()

That completes the data processing and splitting.#

This produces two files:

Alice: alice_ml1m.csv and Bob: bob_ml1m.csv
[17]:
! head alice_ml1m.csv
ID,UserID,Gender,Age,Occupation,Zip-code
0,1,F,1,10,48067
1,1,F,1,10,48067
2,1,F,1,10,48067
3,1,F,1,10,48067
4,1,F,1,10,48067
5,1,F,1,10,48067
6,1,F,1,10,48067
7,1,F,1,10,48067
8,1,F,1,10,48067
[18]:
! head bob_ml1m.csv
ID,MovieID,Rating,Title,Genres,Timestamp
0,1193,5,One Flew Over the Cuckoo's Nest (1975),Drama,978300760
1,661,3,James and the Giant Peach (1996),Animation Children's Musical,978302109
2,914,3,My Fair Lady (1964),Musical Romance,978301968
3,3408,4,Erin Brockovich (2000),Drama,978300275
4,2355,5,Bug's Life_ A (1998),Animation Children's Comedy,978824291
5,1197,3,Princess Bride_ The (1987),Action Adventure Comedy Romance,978302268
6,1287,5,Ben-Hur (1959),Action Adventure Drama,978302039
7,2804,5,Christmas Story_ A (1983),Comedy Drama,978300719
8,594,4,Snow White and the Seven Dwarfs (1937),Animation Children's Musical,978302268
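
The titles above contain `_` where the raw data had a comma, because the tutorial replaces commas before writing a hand-built CSV line. The sketch below shows that replacement and, as an alternative that this tutorial does not use, lets Python's standard `csv` module quote the field instead. The raw title is reconstructed from the output above.

```python
import csv
import io

title = "Bug's Life, A (1998)"
genres = "Animation|Children's|Comedy"

# The tutorial's approach: rewrite the characters that would clash.
print(title.replace(",", "_"))   # Bug's Life_ A (1998)
print(genres.replace("|", " "))  # Animation Children's Comedy

# Alternative (not used in this tutorial): let csv.writer quote the field.
row = ["4", "2355", "5", title, genres.replace("|", " "), "978824291"]
buf = io.StringIO()
csv.writer(buf).writerow(row)
line = buf.getvalue()
print(line.strip())

# Reading the line back recovers the original title, comma included.
parsed = next(csv.reader(io.StringIO(line)))
print(parsed[3])
```

Quoting keeps the original title intact, at the cost of requiring a CSV-aware reader on the consuming side.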

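1. Develop the DataBuilder functions with a plaintext engine.#

Each party in SLModel holds different data, so a DataBuilder has to be developed for each party separately.

Developing Alice's DataBuilder#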

[19]:
import pandas as pd

alice_df = pd.read_csv("alice_ml1m.csv", encoding="utf-8")
[20]:
alice_df["UserID"] = alice_df["UserID"].astype("string")
alice_df = alice_df.drop(columns="ID")
alice_df
[20]:
UserID Gender Age Occupation Zip-code
0 1 F 1 10 48067
1 1 F 1 10 48067
2 1 F 1 10 48067
3 1 F 1 10 48067
4 1 F 1 10 48067
... ... ... ... ... ...
995 10 F 35 1 95370
996 10 F 35 1 95370
997 10 F 35 1 95370
998 10 F 35 1 95370
999 10 F 35 1 95370

1000 rows × 5 columns

[21]:
import tensorflow as tf

alice_dict = dict(alice_df)
data_set = tf.data.Dataset.from_tensor_slices(alice_dict).batch(32).repeat(1)
[22]:
data_set
[22]:
<RepeatDataset element_spec={'UserID': TensorSpec(shape=(None,), dtype=tf.string, name=None), 'Gender': TensorSpec(shape=(None,), dtype=tf.string, name=None), 'Age': TensorSpec(shape=(None,), dtype=tf.int64, name=None), 'Occupation': TensorSpec(shape=(None,), dtype=tf.int64, name=None), 'Zip-code': TensorSpec(shape=(None,), dtype=tf.int64, name=None)}>
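
To illustrate what the `from_tensor_slices(...).batch(32).repeat(1)` chain does to a dictionary of columns, here is a plain-Python emulation. It is not TensorFlow code; `iter_batches` is a hypothetical helper, and the 70-row input is made up to show the smaller final batch.

```python
def iter_batches(columns, batch_size, repeat_count=1):
    """Yield dict-of-lists batches: slice by row, group into batches, repeat."""
    num_rows = len(next(iter(columns.values())))
    for _ in range(repeat_count):
        for start in range(0, num_rows, batch_size):
            yield {
                name: values[start:start + batch_size]
                for name, values in columns.items()
            }


columns = {"UserID": ["1"] * 70, "Age": [1] * 70}
batches = list(iter_batches(columns, batch_size=32))
print([len(b["UserID"]) for b in batches])  # [32, 32, 6]
print(sorted(batches[0].keys()))            # ['Age', 'UserID']
```

Each batch keeps the column names as keys, which matches the dictionary-shaped `element_spec` printed above.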

Developing Bob's DataBuilder#

[23]:
bob_df = pd.read_csv("bob_ml1m.csv", encoding="utf-8")
bob_df = bob_df.drop(columns="ID")

bob_df["MovieID"] = bob_df["MovieID"].astype("string")

bob_df
[23]:
MovieID Rating Title Genres Timestamp
0 1193 5 One Flew Over the Cuckoo's Nest (1975) Drama 978300760
1 661 3 James and the Giant Peach (1996) Animation Children's Musical 978302109
2 914 3 My Fair Lady (1964) Musical Romance 978301968
3 3408 4 Erin Brockovich (2000) Drama 978300275
4 2355 5 Bug's Life_ A (1998) Animation Children's Comedy 978824291
... ... ... ... ... ...
995 3704 2 Mad Max Beyond Thunderdome (1985) Action Sci-Fi 978228364
996 1020 3 Cool Runnings (1993) Comedy 978228726
997 784 3 Cable Guy_ The (1996) Comedy 978230946
998 858 3 Godfather_ The (1972) Action Crime Drama 978224375
999 1022 5 Cinderella (1950) Animation Children's Musical 979775689

1000 rows × 5 columns

[24]:
label = bob_df["Rating"]
data = bob_df.drop(columns="Rating")


def _parse_bob(row_sample, label):
    import tensorflow as tf

    y_t = label
    y = tf.expand_dims(
        tf.where(
            y_t > 3,
            tf.ones_like(y_t, dtype=tf.float32),
            tf.zeros_like(y_t, dtype=tf.float32),
        ),
        axis=1,
    )
    return row_sample, y
[25]:
bob_dict = tuple([dict(data), label])
data_set = tf.data.Dataset.from_tensor_slices(bob_dict).batch(32).repeat(1)
[26]:
data_set = data_set.map(_parse_bob)
[27]:
next(iter(data_set))
[27]:
({'MovieID': <tf.Tensor: shape=(32,), dtype=string, numpy=
  array([b'1193', b'661', b'914', b'3408', b'2355', b'1197', b'1287',
         b'2804', b'594', b'919', b'595', b'938', b'2398', b'2918', b'1035',
         b'2791', b'2687', b'2018', b'3105', b'2797', b'2321', b'720',
         b'1270', b'527', b'2340', b'48', b'1097', b'1721', b'1545', b'745',
         b'2294', b'3186'], dtype=object)>,
  'Title': <tf.Tensor: shape=(32,), dtype=string, numpy=
  array([b"One Flew Over the Cuckoo's Nest (1975)",
         b'James and the Giant Peach (1996)', b'My Fair Lady (1964)',
         b'Erin Brockovich (2000)', b"Bug's Life_ A (1998)",
         b'Princess Bride_ The (1987)', b'Ben-Hur (1959)',
         b'Christmas Story_ A (1983)',
         b'Snow White and the Seven Dwarfs (1937)',
         b'Wizard of Oz_ The (1939)', b'Beauty and the Beast (1991)',
         b'Gigi (1958)', b'Miracle on 34th Street (1947)',
         b"Ferris Bueller's Day Off (1986)", b'Sound of Music_ The (1965)',
         b'Airplane! (1980)', b'Tarzan (1999)', b'Bambi (1942)',
         b'Awakenings (1990)', b'Big (1988)', b'Pleasantville (1998)',
         b'Wallace & Gromit: The Best of Aardman Animation (1996)',
         b'Back to the Future (1985)', b"Schindler's List (1993)",
         b'Meet Joe Black (1998)', b'Pocahontas (1995)',
         b'E.T. the Extra-Terrestrial (1982)', b'Titanic (1997)',
         b'Ponette (1996)', b'Close Shave_ A (1995)', b'Antz (1998)',
         b'Girl_ Interrupted (1999)'], dtype=object)>,
  'Genres': <tf.Tensor: shape=(32,), dtype=string, numpy=
  array([b'Drama', b"Animation Children's Musical", b'Musical Romance',
         b'Drama', b"Animation Children's Comedy",
         b'Action Adventure Comedy Romance', b'Action Adventure Drama',
         b'Comedy Drama', b"Animation Children's Musical",
         b"Adventure Children's Drama Musical",
         b"Animation Children's Musical", b'Musical', b'Drama', b'Comedy',
         b'Musical', b'Comedy', b"Animation Children's",
         b"Animation Children's", b'Drama', b'Comedy Fantasy', b'Comedy',
         b'Animation', b'Comedy Sci-Fi', b'Drama War', b'Romance',
         b"Animation Children's Musical Romance",
         b"Children's Drama Fantasy Sci-Fi", b'Drama Romance', b'Drama',
         b'Animation Comedy Thriller', b"Animation Children's", b'Drama'],
        dtype=object)>,
  'Timestamp': <tf.Tensor: shape=(32,), dtype=int64, numpy=
  array([978300760, 978302109, 978301968, 978300275, 978824291, 978302268,
         978302039, 978300719, 978302268, 978301368, 978824268, 978301752,
         978302281, 978302124, 978301753, 978302188, 978824268, 978301777,
         978301713, 978302039, 978302205, 978300760, 978300055, 978824195,
         978300103, 978824351, 978301953, 978300055, 978824139, 978824268,
         978824291, 978300019])>},
 <tf.Tensor: shape=(32, 1), dtype=float32, numpy=
 array([[1.],
        [0.],
        [0.],
        [1.],
        [1.],
        [0.],
        [1.],
        [1.],
        [1.],
        [1.],
        [1.],
        [1.],
        [1.],
        [1.],
        [1.],
        [1.],
        [0.],
        [1.],
        [1.],
        [1.],
        [0.],
        [0.],
        [1.],
        [1.],
        [0.],
        [1.],
        [1.],
        [1.],
        [1.],
        [0.],
        [1.],
        [1.]], dtype=float32)>)
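
The label tensor above can be checked by hand. The sketch below applies the same rule as `_parse_bob` (a rating above 3 becomes 1.0, anything else 0.0) to the first five ratings in bob_ml1m.csv; `binarize` is a hypothetical plain-Python stand-in for the `tf.where` call.

```python
def binarize(ratings, threshold=3):
    """Map each rating to [1.0] if it exceeds the threshold, else [0.0]."""
    return [[1.0 if r > threshold else 0.0] for r in ratings]


first_five = [5, 3, 3, 4, 5]  # ratings of rows 0-4 in bob_ml1m.csv
print(binarize(first_five))   # [[1.0], [0.0], [0.0], [1.0], [1.0]]
```

The nested lists mirror the `(batch, 1)` shape produced by `tf.expand_dims(..., axis=1)`.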

2. Wrap each party's DataBuilder.#

[28]:
# alice
def create_dataset_builder_alice(
    batch_size=128,
    repeat_count=5,
):
    def dataset_builder(x):
        import pandas as pd
        import tensorflow as tf

        x = [dict(t) if isinstance(t, pd.DataFrame) else t for t in x]
        x = x[0] if len(x) == 1 else tuple(x)
        data_set = (
            tf.data.Dataset.from_tensor_slices(x).batch(batch_size).repeat(repeat_count)
        )

        return data_set

    return dataset_builder


# bob
def create_dataset_builder_bob(
    batch_size=128,
    repeat_count=5,
):
    def _parse_bob(row_sample, label):
        import tensorflow as tf

        y_t = label["Rating"]
        y = tf.expand_dims(
            tf.where(
                y_t > 3,
                tf.ones_like(y_t, dtype=tf.float32),
                tf.zeros_like(y_t, dtype=tf.float32),
            ),
            axis=1,
        )
        return row_sample, y

    def dataset_builder(x):
        import pandas as pd
        import tensorflow as tf

        x = [dict(t) if isinstance(t, pd.DataFrame) else t for t in x]
        x = x[0] if len(x) == 1 else tuple(x)
        data_set = (
            tf.data.Dataset.from_tensor_slices(x).batch(batch_size).repeat(repeat_count)
        )

        data_set = data_set.map(_parse_bob)

        return data_set

    return dataset_builder
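
Both wrappers above follow the same pattern: an outer factory fixes `batch_size` and `repeat_count`, and the inner function takes a single argument `x`. The sketch below isolates that pattern in plain Python. It assumes, based on the list handling in the code above, that `x` arrives as a list holding either the features alone or the features plus the label; the returned dict is only a stand-in for a real `tf.data.Dataset`.

```python
def create_builder(batch_size=128, repeat_count=5):
    """Factory: fixes the settings now, returns a one-argument builder."""

    def builder(x):
        # Assumption: x is a list, [features] or [features, label].
        x = x[0] if len(x) == 1 else tuple(x)
        # A real builder would return a tf.data.Dataset; a dict stands in here.
        return {"data": x, "batch_size": batch_size, "repeat_count": repeat_count}

    return builder


alice_like = create_builder(batch_size=32)([{"UserID": ["1", "1"]}])
bob_like = create_builder()([{"MovieID": ["1193", "661"]}, {"Rating": [5, 3]}])
print(type(alice_like["data"]).__name__, alice_like["batch_size"])  # dict 32
print(type(bob_like["data"]).__name__, len(bob_like["data"]))       # tuple 2
```

The tuple case is what lets Bob's `_parse_bob(row_sample, label)` receive the features and the label as two separate arguments in `data_set.map`.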

3. Build the data_builder_dict.#

[29]:
data_builder_dict = {
    alice: create_dataset_builder_alice(
        batch_size=128,
        repeat_count=5,
    ),
    bob: create_dataset_builder_bob(
        batch_size=128,
        repeat_count=5,
    ),
}

Next, define the DeepFM model and run it#

1. Define the DeepFM model.#

[30]:
from secretflow.ml.nn.applications.sl_deep_fm import DeepFMbase, DeepFMfuse
from secretflow.ml.nn import SLModel

NUM_USERS = 6040
NUM_MOVIES = 3952
GENDER_VOCAB = ["F", "M"]
AGE_VOCAB = [1, 18, 25, 35, 45, 50, 56]
OCCUPATION_VOCAB = [i for i in range(21)]
GENRES_VOCAB = [
    "Action",
    "Adventure",
    "Animation",
    "Children's",
    "Comedy",
    "Crime",
    "Documentary",
    "Drama",
    "Fantasy",
    "Film-Noir",
    "Horror",
    "Musical",
    "Mystery",
    "Romance",
    "Sci-Fi",
    "Thriller",
    "War",
    "Western",
]
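
The vocabularies above feed the lookup layers used in the base models below, which emit one-hot vectors. As a rough illustration of what such a lookup produces, here is a plain-Python stand-in. It assumes the Keras default of a single out-of-vocabulary slot at index 0, so the output is one element wider than the vocabulary; `one_hot_lookup` is a hypothetical helper, not a Keras API.

```python
GENDER_VOCAB = ["F", "M"]
AGE_VOCAB = [1, 18, 25, 35, 45, 50, 56]
NUM_OOV = 1  # assumed: one out-of-vocabulary slot, placed at index 0


def one_hot_lookup(value, vocabulary):
    """Plain-Python stand-in for a lookup layer with one-hot output."""
    vec = [0] * (len(vocabulary) + NUM_OOV)
    if value in vocabulary:
        vec[vocabulary.index(value) + NUM_OOV] = 1
    else:
        vec[0] = 1  # unknown values land in the OOV slot
    return vec


print(one_hot_lookup("F", GENDER_VOCAB))  # [0, 1, 0]
print(one_hot_lookup(35, AGE_VOCAB))      # [0, 0, 0, 0, 1, 0, 0, 0]
print(one_hot_lookup("X", GENDER_VOCAB))  # [1, 0, 0]
```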

2. Define Alice's base net.#

[31]:
def create_base_model_alice():
    # Create model
    def create_model():
        import tensorflow as tf

        def preprocess():
            inputs = {
                "UserID": tf.keras.Input(shape=(1,), dtype=tf.string),
                "Gender": tf.keras.Input(shape=(1,), dtype=tf.string),
                "Age": tf.keras.Input(shape=(1,), dtype=tf.int64),
                "Occupation": tf.keras.Input(shape=(1,), dtype=tf.int64),
            }
            user_id_output = tf.keras.layers.Hashing(
                num_bins=NUM_USERS, output_mode="one_hot"
            )
            user_gender_output = tf.keras.layers.StringLookup(
                vocabulary=GENDER_VOCAB, output_mode="one_hot"
            )

            user_age_out = tf.keras.layers.IntegerLookup(
                vocabulary=AGE_VOCAB, output_mode="one_hot"
            )
            user_occupation_out = tf.keras.layers.IntegerLookup(
                vocabulary=OCCUPATION_VOCAB, output_mode="one_hot"
            )

            outputs = {
                "UserID": user_id_output(inputs["UserID"]),
                "Gender": user_gender_output(inputs["Gender"]),
                "Age": user_age_out(inputs["Age"]),
                "Occupation": user_occupation_out(inputs["Occupation"]),
            }
            return tf.keras.Model(inputs=inputs, outputs=outputs)

        preprocess_layer = preprocess()
        model = DeepFMbase(
            dnn_units_size=[256, 32],
            preprocess_layer=preprocess_layer,
        )
        model.compile(
            loss=tf.keras.losses.binary_crossentropy,
            optimizer=tf.keras.optimizers.Adam(),
            metrics=[
                tf.keras.metrics.AUC(),
                tf.keras.metrics.Precision(),
                tf.keras.metrics.Recall(),
            ],
        )
        return model  # need wrap

    return create_model

3. Define Bob's base net and the fuse net.#

[32]:
# bob model
def create_base_model_bob():
    # Create model
    def create_model():
        import tensorflow as tf

        # define preprocess layer
        def preprocess():
            inputs = {
                "MovieID": tf.keras.Input(shape=(1,), dtype=tf.string),
                "Genres": tf.keras.Input(shape=(1,), dtype=tf.string),
            }

            movie_id_out = tf.keras.layers.Hashing(
                num_bins=NUM_MOVIES, output_mode="one_hot"
            )
            movie_genres_out = tf.keras.layers.TextVectorization(
                output_mode='multi_hot', split="whitespace", vocabulary=GENRES_VOCAB
            )
            outputs = {
                "MovieID": movie_id_out(inputs["MovieID"]),
                "Genres": movie_genres_out(inputs["Genres"]),
            }
            return tf.keras.Model(inputs=inputs, outputs=outputs)

        preprocess_layer = preprocess()

        model = DeepFMbase(
            dnn_units_size=[256, 32],
            preprocess_layer=preprocess_layer,
        )
        model.compile(
            loss=tf.keras.losses.binary_crossentropy,
            optimizer=tf.keras.optimizers.Adam(),
            metrics=[
                tf.keras.metrics.AUC(),
                tf.keras.metrics.Precision(),
                tf.keras.metrics.Recall(),
            ],
        )
        return model  # need wrap

    return create_model


def create_fuse_model():
    # Create model
    def create_model():
        import tensorflow as tf

        model = DeepFMfuse(dnn_units_size=[256, 256, 32])
        model.compile(
            loss=tf.keras.losses.binary_crossentropy,
            optimizer=tf.keras.optimizers.Adam(),
            metrics=[
                tf.keras.metrics.AUC(),
                tf.keras.metrics.Precision(),
                tf.keras.metrics.Recall(),
            ],
        )
        return model

    return create_model
[33]:
base_model_dict = {alice: create_base_model_alice(), bob: create_base_model_bob()}
model_fuse = create_fuse_model()
[ ]:
from secretflow.data.vertical import read_csv as v_read_csv

vdf = v_read_csv(
    {alice: "alice_ml1m.csv", bob: "bob_ml1m.csv"}, keys="ID", drop_keys="ID"
)
label = vdf["Rating"]

data = vdf.drop(columns=["Rating", "Timestamp", "Title", "Zip-code"])
data["UserID"] = data["UserID"].astype("string")
data["MovieID"] = data["MovieID"].astype("string")

sl_model = SLModel(
    base_model_dict=base_model_dict,
    device_y=bob,
    model_fuse=model_fuse,
)
history = sl_model.fit(
    data,
    label,
    epochs=5,
    batch_size=128,
    random_seed=1234,
    dataset_builder=data_builder_dict,
)
INFO:root:Create proxy actor <class 'secretflow.ml.nn.sl.backend.tensorflow.sl_base.PYUSLTFModel'> with party alice.
INFO:root:Create proxy actor <class 'secretflow.ml.nn.sl.backend.tensorflow.sl_base.PYUSLTFModel'> with party bob.
INFO:root:SL Train Params: {'self': <secretflow.ml.nn.sl.sl_model.SLModel object at 0x7fc9c99f51c0>, 'x': VDataFrame(partitions={PYURuntime(alice): Partition(data=<secretflow.device.device.pyu.PYUObject object at 0x7fc9c9998160>), PYURuntime(bob): Partition(data=<secretflow.device.device.pyu.PYUObject object at 0x7fc9c998c520>)}, aligned=True), 'y': VDataFrame(partitions={PYURuntime(bob): Partition(data=<secretflow.device.device.pyu.PYUObject object at 0x7fc9c99984f0>)}, aligned=True), 'batch_size': 128, 'epochs': 5, 'verbose': 1, 'callbacks': None, 'validation_data': None, 'shuffle': False, 'sample_weight': None, 'validation_freq': 1, 'dp_spent_step_freq': None, 'dataset_builder': {PYURuntime(alice): <function create_dataset_builder_alice.<locals>.dataset_builder at 0x7fcb640b9700>, PYURuntime(bob): <function create_dataset_builder_bob.<locals>.dataset_builder at 0x7fca2c6b30d0>}, 'audit_log_dir': None, 'audit_log_params': {}, 'random_seed': 1234}
  0%|                                                                                             | 0/8 [00:00<?, ?it/s]