Source code for secretflow.utils.ndarray_bigint
# Copyright 2022 Ant Group Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import math
import numpy as np
from heu import numpy as hnp
[docs]def randbits(shape: tuple, bits):
items = math.prod(shape)
data = [random.getrandbits(bits) - (1 << (bits - 1)) for _ in range(items)]
return BigintNdArray(data, shape)
[docs]def randint(shape: tuple, min, max):
items = math.prod(shape)
data = [random.randint(min, max) for _ in range(items)]
return BigintNdArray(data, shape)
[docs]def arange(max):
return BigintNdArray(list(range(max)), (max,))
[docs]def zeros(shape):
return BigintNdArray([0] * math.prod(shape), shape)
[docs]class BigintNdArray:
[docs] def __init__(self, data, shape):
self.shape = shape
self.data = data
[docs] def resize(self, shape):
assert math.prod(shape) == math.prod(
self.shape
), f"cannot resize array of size {self.shape} into shape {shape}"
self.shape = shape
def __to_list(self, dim, idx): # idx is a list just to make it pass by ref
if dim == len(self.shape) - 1:
dim_res = self.data[idx[0] : idx[0] + self.shape[dim]]
idx[0] += self.shape[dim]
return dim_res
else:
return [self.__to_list(dim + 1, idx) for _ in range(self.shape[dim])]
[docs] def to_list(self):
return self.__to_list(0, [0])
[docs] def to_numpy(self):
return np.array(self.to_list())
[docs] def to_hnp(self, encoder):
return hnp.array(self.to_list(), encoder=encoder)
[docs] def to_bytes(self, bytes_per_int, byteorder='little'):
mask = (1 << bytes_per_int * 8) - 1
res = bytearray()
for d in self.data:
res += (d & mask).to_bytes(bytes_per_int, byteorder)
return bytes(res)
def __str__(self):
return str(self.to_list())
def __add__(self, other):
assert (
self.shape == other.shape
), f"Int128 arrays do not support broadcasting, their shape must be the same"
return BigintNdArray([a + b for a, b in zip(self.data, other.data)], self.shape)
def __iadd__(self, other):
assert (
self.shape == other.shape
), f"Int128 arrays do not support broadcasting, their shape must be the same"
self.data = [a + b for a, b in zip(self.data, other.data)]