Source code for secretflow.data.io.oss
# Copyright 2022 Ant Group Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import os
from distutils.util import strtobool
import s3fs as s3
_S3_ENDPOINT = 'DATA_STORAGE_S3_ENDPOINT'
_S3_ACCESSKEYID = 'DATA_STORAGE_S3_ACCESSKEYID'
_S3_ACCESSSECRET = 'DATA_STORAGE_S3_SECRETKEY'
_S3_VIRTUALHOSTED = 'DATA_STORAGE_S3_VIRTUAL_HOSTED'
_SCHEME = 'oss://'
[docs]def s3fs():
"""Return a s3 filesystem instance."""
endpoint = os.environ.get(_S3_ENDPOINT)
ak = os.environ.get(_S3_ACCESSKEYID)
sk = os.environ.get(_S3_ACCESSSECRET)
assert endpoint is not None, f'{_S3_ENDPOINT} not set'
assert ak is not None, f'{_S3_ACCESSKEYID} not set'
assert sk is not None, f'{_S3_ACCESSSECRET} not set'
addressing_style = 'path'
try:
if strtobool(os.environ.get(_S3_VIRTUALHOSTED)):
addressing_style = 'virtual'
except Exception:
pass
ak, sk = base64.b64decode(ak).decode("utf-8"), base64.b64decode(sk).decode("utf-8")
if not endpoint.startswith('http'):
endpoint = f'http://{endpoint}'
return s3.S3FileSystem(
anon=False,
key=ak,
secret=sk,
client_kwargs={'endpoint_url': endpoint},
config_kwargs={'s3': {'addressing_style': addressing_style}},
)
[docs]def open(path, mode='rb'):
"""Open a oss object.
Args:
path: oss file path.
mode: optional; open mode.
Returns:
A file-like object.
"""
assert path.startswith(_SCHEME), f'Invalid path: {path}, should be oss://...'
s3 = s3fs()
return s3.open(path[len(_SCHEME) :], mode)