在SecretFlow中加载csv数据#

下面的代码只是示例,由于系统安全问题,请勿在 生产环境中直接使用。

这篇教程会通过几个示例来展示,如何在SecretFlow中加载CSV数据,进而可以利用SecretFlow进行数据处理,建模。

设置#

[1]:
%load_ext autoreload
%autoreload 2
[2]:
import secretflow as sf

# Check the version of your SecretFlow
print('The version of SecretFlow: {}'.format(sf.__version__))

# In case you have a running secretflow runtime already.
sf.shutdown()
sf.init(['alice', 'bob', 'charlie'], address="local", log_to_driver=True)
alice, bob, charlie = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('charlie')
2023-04-13 13:45:04,665 INFO worker.py:1538 -- Started a local Ray instance.

接口介绍#

我们在SecretFlow中提供了类似于pandas.read_csv的接口来将各方数据的CSV读取成为一个联邦概念的统一的数据。

  • 对于水平场景有 secretflow.horizontal.read_csv API

  • 对于垂直场景有 secretflow.vertical.read_csv API

通过read_csv可以读取多方的csv文件,构成一个FedDataFrame。

构建联邦表
联邦表是一个跨多方的虚拟概念。
  1. 联邦表中各方的数据存储在本地,不允许出域。

  2. 除了拥有数据的一方之外,没有人可以访问数据存储。

  3. 联邦表的任何操作都会由Driver调度给每个Worker,执行指令会逐层传递,直到特定拥有数据的Worker的Python Runtime。 框架确保只有 Worker.device 和 Object.device相同的时候才能够操作数据。

  4. 联合表旨在从中心角度管理和操作多方数据

  5. 接口方面和 pandas.DataFrame 对齐,以降低多方数据操作的成本。

vdataframe.png

数据下载,分割#

[3]:
%%capture
%%!
wget https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/iris/iris.csv
E0413 13:45:12.618292213  130484 fork_posix.cc:76]           Other threads are currently calling into gRPC, skipping fork() handlers
[4]:
import pandas as pd

alldata_df = pd.read_csv("./iris.csv")
[5]:
len(alldata_df)
[5]:
150
[6]:
h_alice_df = alldata_df.loc[:70]
h_bob_df = alldata_df.loc[70:]

将水平拆分后的dataframe分别保存成csv。

[7]:
# save the data to local file system
import tempfile

_, h_alice_path = tempfile.mkstemp()
_, h_bob_path = tempfile.mkstemp()
h_alice_df.to_csv(h_alice_path, index=False)
h_bob_df.to_csv(h_bob_path, index=False)
[8]:
v_alice_df = alldata_df.loc[:, ['sepal_length', 'sepal_width']]
v_bob_df = alldata_df.loc[:, ['petal_length', 'petal_width', 'class']]

将垂直拆分后的dataframe分别保存成csv。

[9]:
# save the data to local file system
_, v_alice_path = tempfile.mkstemp()
_, v_bob_path = tempfile.mkstemp()
v_alice_df.to_csv(v_alice_path, index=True, index_label="id")
v_bob_df.to_csv(v_bob_path, index=True, index_label="id")

接下来我们以水平为例介绍如何加载csv数据#

[10]:
from secretflow.data.horizontal import read_csv
from secretflow.security.aggregation.plain_aggregator import PlainAggregator
from secretflow.security.compare.plain_comparator import PlainComparator
from secretflow.data.split import train_test_split

首先准备好两方的数据csv文件,水平场景要求两方的schema是一致的。

  • Alice: datapath (alice机器能访问到的本地路径)

  • Bob: datapath (bob机器能访问到的本地路径)

因为水平场景相同schema的数据分布在多方,所以在进行一些dataframe操作的时候需要跨域计算。read_csv接口需要传入aggregatorcomparator,我们可以在计算的时候指定安全聚合器安全比较器来对数据隐私进行保护。

[11]:
path_dict = {alice: h_alice_path, bob: h_bob_path}

aggregator = PlainAggregator(charlie)
comparator = PlainComparator(charlie)

hdf = read_csv(filepath=path_dict, aggregator=aggregator, comparator=comparator)
[12]:
hdf.columns
[12]:
Index(['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'class'], dtype='object')

参考文档使用FedDataFrame进行数据预处理

[13]:
label = hdf["class"]
data = hdf.drop(columns="class")

得到的datalabel即可以作为输入传入FLModel或者SLModel进行建模。

隐语提供了train_test_split可以对数据进行拆分。

[14]:
train_data, test_data = train_test_split(
    data, train_size=0.8, shuffle=True, random_state=1234
)
[15]:
print(train_data.partition_shape(), test_data.partition_shape())
{alice: (56, 4), bob: (64, 4)} {alice: (15, 4), bob: (16, 4)}

接下来我们以垂直为例介绍如何加载csv数据#

首先准备好两方的数据csv文件,垂直场景两方数据并不强制要求对齐,我们在read_csv接口中提供了PSI的能力。

  • Alice: datapath (alice机器能访问到的本地路径)

  • Bob: datapath (bob机器能访问到的本地路径)

垂直场景是各方的schema不同,但是每一方都拥有每一个column的全部数据。不再需要比较器和聚合器。但是各方数据不一定是对齐的,我们需要在读取时候通过PSI来进行数据对齐。

  • path_dict:数据路径

  • spu: 用于求交使用的spu设备

  • keys: 用于求交的keys(支持多列求交)

  • drop_keys: 求交后需要删去的ID列名

[16]:
spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob']))
[17]:
spu
[17]:
<secretflow.device.device.spu.SPU at 0x7f3b808347f0>
[18]:
from secretflow.data.vertical import read_csv
[19]:
path_dict = {
    alice: v_alice_path,  # The path that alice can access
    bob: v_bob_path,  # The path that bob can access
}

# Prepare the SPU device
spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob']))

vdf = read_csv(path_dict, spu=spu, keys='id', drop_keys="id")
(SPURuntime pid=23157) 2023-04-13 13:45:34.913 [error] [context.cc:operator():132] connect to rank=1 failed with error [external/yacl/yacl/link/transport/channel_brpc.cc:368] send, rpc failed=112, message=[E111]Fail to connect Socket{id=0 addr=127.0.0.1:44893} (0x0x55850693c900): Connection refused [R1][E112]Not connected to 127.0.0.1:44893 yet, server_id=0 [R2][E112]Not connected to 127.0.0.1:44893 yet, server_id=0 [R3][E112]Not connected to 127.0.0.1:44893 yet, server_id=0
(SPURuntime pid=23518) 2023-04-13 13:45:36.308 [error] [context.cc:operator():132] connect to rank=0 failed with error [external/yacl/yacl/link/transport/channel_brpc.cc:368] send, rpc failed=112, message=[E111]Fail to connect Socket{id=0 addr=127.0.0.1:24875} (0x0x562301a903c0): Connection refused [R1][E112]Not connected to 127.0.0.1:24875 yet, server_id=0 [R2][E112]Not connected to 127.0.0.1:24875 yet, server_id=0 [R3][E112]Not connected to 127.0.0.1:24875 yet, server_id=0
(SPURuntime pid=23518) 2023-04-13 13:45:37.312 [info] [bucket_psi.cc:Init:228] bucket size set to 1048576
(SPURuntime pid=23518) 2023-04-13 13:45:37.312 [info] [bucket_psi.cc:Run:97] Begin sanity check for input file: /tmp/tmp0y82frfo, precheck_switch:true
(SPURuntime pid=23518) 2023-04-13 13:45:37.313 [info] [csv_checker.cc:CsvChecker:121] Executing duplicated scripts: LC_ALL=C sort --buffer-size=1G --temporary-directory=/tmp --stable selected-keys.1681364737312870835 | LC_ALL=C uniq -d > duplicate-keys.1681364737312870835
(SPURuntime pid=23518) 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:115] End sanity check for input file: /tmp/tmp0y82frfo, size=150
(SPURuntime pid=23518) 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:133] Skip doing psi, because dataset has been aligned!
(SPURuntime pid=23518) 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:178] Begin post filtering, indices.size=150, should_sort=true
(SPURuntime pid=23518) 2023-04-13 13:45:37.317 [info] [utils.cc:MultiKeySort:88] Executing sort scripts: tail -n +2 /tmp/tmp-sort-in-1681364737316884224 | LC_ALL=C sort --buffer-size=3G --parallel=8 --temporary-directory=./ --stable --field-separator=, --key=1,1 >>/tmp/tmp-sort-out-1681364737316884224
(SPURuntime pid=23518) 2023-04-13 13:45:37.320 [info] [utils.cc:MultiKeySort:90] Finished sort scripts: tail -n +2 /tmp/tmp-sort-in-1681364737316884224 | LC_ALL=C sort --buffer-size=3G --parallel=8 --temporary-directory=./ --stable --field-separator=, --key=1,1 >>/tmp/tmp-sort-out-1681364737316884224, ret=0
(SPURuntime pid=23518) 2023-04-13 13:45:37.320 [info] [bucket_psi.cc:Run:216] End post filtering, in=/tmp/tmp0y82frfo, out=/tmp/tmp0y82frfo.psi_output_94874
(SPURuntime pid=23517) 2023-04-13 13:45:37.312 [info] [bucket_psi.cc:Init:228] bucket size set to 1048576
(SPURuntime pid=23517) 2023-04-13 13:45:37.312 [info] [bucket_psi.cc:Run:97] Begin sanity check for input file: /tmp/tmp5xjv6qs8, precheck_switch:true
(SPURuntime pid=23517) 2023-04-13 13:45:37.313 [info] [csv_checker.cc:CsvChecker:121] Executing duplicated scripts: LC_ALL=C sort --buffer-size=1G --temporary-directory=/tmp --stable selected-keys.1681364737313158626 | LC_ALL=C uniq -d > duplicate-keys.1681364737313158626
(SPURuntime pid=23517) 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:115] End sanity check for input file: /tmp/tmp5xjv6qs8, size=150
(SPURuntime pid=23517) 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:133] Skip doing psi, because dataset has been aligned!
(SPURuntime pid=23517) 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:178] Begin post filtering, indices.size=150, should_sort=true
(SPURuntime pid=23517) 2023-04-13 13:45:37.316 [info] [utils.cc:MultiKeySort:88] Executing sort scripts: tail -n +2 /tmp/tmp-sort-in-1681364737316796390 | LC_ALL=C sort --buffer-size=3G --parallel=8 --temporary-directory=./ --stable --field-separator=, --key=1,1 >>/tmp/tmp-sort-out-1681364737316796390
(SPURuntime pid=23517) 2023-04-13 13:45:37.319 [info] [utils.cc:MultiKeySort:90] Finished sort scripts: tail -n +2 /tmp/tmp-sort-in-1681364737316796390 | LC_ALL=C sort --buffer-size=3G --parallel=8 --temporary-directory=./ --stable --field-separator=, --key=1,1 >>/tmp/tmp-sort-out-1681364737316796390, ret=0
(SPURuntime pid=23517) 2023-04-13 13:45:37.319 [info] [bucket_psi.cc:Run:216] End post filtering, in=/tmp/tmp5xjv6qs8, out=/tmp/tmp5xjv6qs8.psi_output_94874
[20]:
vdf.columns
[20]:
Index(['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'class'], dtype='object')
[21]:
label = vdf["class"]
data = vdf.drop(columns="class")

同样这里也可以通过train_test_split来进行切分。

[22]:
train_data, test_data = train_test_split(
    data, train_size=0.8, shuffle=True, random_state=1234
)

接下来,可以用自己的csv数据尝试一下#