import tensorflow as tf
from tensorflow.keras import activations
from tensorflow.keras import backend as K
from tensorflow.keras import constraints, initializers, regularizers
from tensorflow.keras.layers import Dropout, Layer, LeakyReLU
class GraphAttention(Layer):
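    """Graph attention (GAT) layer after Veličković et al. (2018),
    "Graph Attention Networks" (https://arxiv.org/abs/1710.10903).

    Each of the `attn_heads` heads projects the node features with its own
    kernel W (F -> F'), computes attention coefficients masked by the
    adjacency matrix, and aggregates neighbor features. Head outputs are
    concatenated (Eq. 5) or averaged (Eq. 6).

    Call on ``[X, A]``, where ``X`` is an N x F node-feature matrix and
    ``A`` is an N x N adjacency matrix.
    """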
    def __init__(
self,
F_,
attn_heads=1,
attn_heads_reduction='average', # {'concat', 'average'}
dropout_rate=0.5,
activation='relu',
use_bias=True,
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
attn_kernel_initializer='glorot_uniform',
kernel_regularizer=None,
bias_regularizer=None,
attn_kernel_regularizer=None,
activity_regularizer=None,
kernel_constraint=None,
bias_constraint=None,
attn_kernel_constraint=None,
**kwargs,
):
if attn_heads_reduction not in {'concat', 'average'}:
            raise ValueError('Possible reduction methods: concat, average')
self.F_ = F_ # Number of output features (F' in the paper)
self.attn_heads = attn_heads # Number of attention heads (K in the paper)
self.attn_heads_reduction = attn_heads_reduction # Eq. 5 and 6 in the paper
self.dropout_rate = dropout_rate # Internal dropout rate
self.activation = activations.get(activation) # Eq. 4 in the paper
self.use_bias = use_bias
self.kernel_initializer = initializers.get(kernel_initializer)
self.bias_initializer = initializers.get(bias_initializer)
self.attn_kernel_initializer = initializers.get(attn_kernel_initializer)
self.kernel_regularizer = regularizers.get(kernel_regularizer)
self.bias_regularizer = regularizers.get(bias_regularizer)
self.attn_kernel_regularizer = regularizers.get(attn_kernel_regularizer)
self.activity_regularizer = regularizers.get(activity_regularizer)
self.kernel_constraint = constraints.get(kernel_constraint)
self.bias_constraint = constraints.get(bias_constraint)
self.attn_kernel_constraint = constraints.get(attn_kernel_constraint)
self.supports_masking = False
# Populated by build()
self.kernels = [] # Layer kernels for attention heads
self.biases = [] # Layer biases for attention heads
self.attn_kernels = [] # Attention kernels for attention heads
if attn_heads_reduction == 'concat':
# Output will have shape (..., K * F')
self.output_dim = self.F_ * self.attn_heads
else:
# Output will have shape (..., F')
self.output_dim = self.F_
super(GraphAttention, self).__init__(**kwargs)
    def build(self, input_shape):
        # Expect two inputs: X (node features) and A (adjacency matrix)
        assert len(input_shape) >= 2
        F = input_shape[0][-1]  # Number of input features per node
# Initialize weights for each attention head
for head in range(self.attn_heads):
# Layer kernel
kernel = self.add_weight(
shape=(F, self.F_),
initializer=self.kernel_initializer,
regularizer=self.kernel_regularizer,
constraint=self.kernel_constraint,
name='kernel_{}'.format(head),
)
self.kernels.append(kernel)
            # Layer bias
if self.use_bias:
bias = self.add_weight(
shape=(self.F_,),
initializer=self.bias_initializer,
regularizer=self.bias_regularizer,
constraint=self.bias_constraint,
name='bias_{}'.format(head),
)
self.biases.append(bias)
# Attention kernels
attn_kernel_self = self.add_weight(
shape=(self.F_, 1),
initializer=self.attn_kernel_initializer,
regularizer=self.attn_kernel_regularizer,
constraint=self.attn_kernel_constraint,
name='attn_kernel_self_{}'.format(head),
)
attn_kernel_neighs = self.add_weight(
shape=(self.F_, 1),
initializer=self.attn_kernel_initializer,
regularizer=self.attn_kernel_regularizer,
constraint=self.attn_kernel_constraint,
name='attn_kernel_neigh_{}'.format(head),
)
self.attn_kernels.append([attn_kernel_self, attn_kernel_neighs])
self.built = True
    def call(self, inputs):
X = inputs[0] # Node features (N x F)
A = inputs[1] # Adjacency matrix (N x N)
outputs = []
for head in range(self.attn_heads):
kernel = self.kernels[head] # W in the paper (F x F')
            attention_kernel = self.attn_kernels[head]  # Attention kernel a in the paper (2F' x 1)
# Compute inputs to attention network
features = K.dot(X, kernel) # (N x F')
# Compute feature combinations
            # Note: [[a_1], [a_2]]^T [[Wh_i], [Wh_j]] = [a_1]^T [Wh_i] + [a_2]^T [Wh_j]
attn_for_self = K.dot(
features, attention_kernel[0]
) # (N x 1), [a_1]^T [Wh_i]
attn_for_neighs = K.dot(
features, attention_kernel[1]
) # (N x 1), [a_2]^T [Wh_j]
# Attention head a(Wh_i, Wh_j) = a^T [[Wh_i], [Wh_j]]
dense = attn_for_self + K.transpose(
attn_for_neighs
) # (N x N) via broadcasting
            # Add nonlinearity (LeakyReLU with alpha = 0.2, as in the paper)
            dense = LeakyReLU(alpha=0.2)(dense)
            # Mask values before activation (Vaswani et al., 2017): non-edges
            # get a large negative logit so the softmax drives them to ~0
            mask = -10e9 * (1.0 - A)
            dense += mask
# Apply softmax to get attention coefficients
dense = K.softmax(dense) # (N x N)
# Apply dropout to features and attention coefficients
dropout_attn = Dropout(self.dropout_rate)(dense) # (N x N)
dropout_feat = Dropout(self.dropout_rate)(features) # (N x F')
# Linear combination with neighbors' features
node_features = K.dot(dropout_attn, dropout_feat) # (N x F')
if self.use_bias:
node_features = K.bias_add(node_features, self.biases[head])
# Add output of attention head to final output
outputs.append(node_features)
# Aggregate the heads' output according to the reduction method
if self.attn_heads_reduction == 'concat':
output = K.concatenate(outputs) # (N x KF')
else:
            output = K.mean(K.stack(outputs), axis=0)  # (N x F')
output = self.activation(output)
return output
    def compute_output_shape(self, input_shape):
output_shape = input_shape[0][0], self.output_dim
return output_shape
    def get_config(self):
config = super().get_config().copy()
config.update(
{
'attn_heads': self.attn_heads,
'attn_heads_reduction': self.attn_heads_reduction,
'F_': self.F_,
}
)
return config
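# Illustrative shapes for the two head-reduction modes (N nodes; F_ = 8 and
# attn_heads = 2 are example values, not from the source):
#
#   GraphAttention(F_=8, attn_heads=2, attn_heads_reduction='concat')
#       # -> output shape (N, 16), heads concatenated (Eq. 5)
#   GraphAttention(F_=8, attn_heads=2, attn_heads_reduction='average')
#       # -> output shape (N, 8), heads averaged (Eq. 6)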
class ServerNet(tf.keras.layers.Layer):
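    """Server-side classification head: ``num_layer`` Dense layers, each
    preceded by dropout, followed by a softmax over ``num_class`` classes.
    Layer widths go in_channel -> hidden_size -> ... -> num_class.
    """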
    def __init__(
self,
in_channel: int,
hidden_size: int,
num_layer: int,
num_class: int,
dropout: float,
**kwargs,
):
        super(ServerNet, self).__init__(**kwargs)
        self.num_class = num_class
        self.num_layer = num_layer
        self.hidden_size = hidden_size
        self.in_channel = in_channel
        self.dropout = dropout
        self.layers = []
    def build(self, input_shape):
        # num_layer Dense layers in total: an input projection, then
        # (num_layer - 2) hidden layers, then the output layer
        self.layers.append(
            tf.keras.layers.Dense(self.hidden_size, input_shape=(self.in_channel,))
        )
        for _ in range(self.num_layer - 2):
            self.layers.append(
                tf.keras.layers.Dense(self.hidden_size, input_shape=(self.hidden_size,))
            )
        self.layers.append(
            tf.keras.layers.Dense(self.num_class, input_shape=(self.hidden_size,))
        )
        super(ServerNet, self).build(input_shape)
    def call(self, inputs):
        x = inputs
        # Apply dropout before each Dense layer, then a softmax over classes
        for i in range(self.num_layer):
            x = Dropout(self.dropout)(x)
            x = self.layers[i](x)
        return K.softmax(x)
    def compute_output_shape(self, input_shape):
        output_shape = input_shape[0], self.num_class
        return output_shape
    def get_config(self):
config = super().get_config().copy()
config.update(
{
'in_channel': self.in_channel,
'hidden_size': self.hidden_size,
'num_layer': self.num_layer,
'num_class': self.num_class,
'dropout': self.dropout,
}
)
return config
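# A minimal end-to-end sketch (not part of the original source): wires the
# two layers together on random data. All shapes and hyperparameters below
# are illustrative assumptions.
if __name__ == '__main__':
    import numpy as np

    N, F = 5, 4
    X = tf.constant(np.random.rand(N, F), dtype=tf.float32)  # node features
    A = tf.constant(np.eye(N), dtype=tf.float32)  # adjacency: self-loops only

    gat = GraphAttention(F_=8, attn_heads=2, attn_heads_reduction='concat')
    embeddings = gat([X, A])  # (N, 16): two heads of width 8, concatenated

    head = ServerNet(
        in_channel=16, hidden_size=32, num_layer=3, num_class=2, dropout=0.5
    )
    probs = head(embeddings)  # (N, 2); rows sum to 1 after the softmax
    print(probs.shape)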