Source code for secretflow.utils.simulation.tf_gnn_model

import tensorflow as tf
from tensorflow.keras import activations
from tensorflow.keras import backend as K
from tensorflow.keras import constraints, initializers, regularizers
from tensorflow.keras.layers import Dropout, Layer, LeakyReLU


class GraphAttention(Layer):
    def __init__(
        self,
        F_,
        attn_heads=1,
        attn_heads_reduction='average',  # {'concat', 'average'}
        dropout_rate=0.5,
        activation='relu',
        use_bias=True,
        kernel_initializer='glorot_uniform',
        bias_initializer='zeros',
        attn_kernel_initializer='glorot_uniform',
        kernel_regularizer=None,
        bias_regularizer=None,
        attn_kernel_regularizer=None,
        activity_regularizer=None,
        kernel_constraint=None,
        bias_constraint=None,
        attn_kernel_constraint=None,
        **kwargs,
    ):
        if attn_heads_reduction not in {'concat', 'average'}:
            raise ValueError('Possible reduction methods: concat, average')

        self.F_ = F_  # Number of output features (F' in the paper)
        self.attn_heads = attn_heads  # Number of attention heads (K in the paper)
        self.attn_heads_reduction = attn_heads_reduction  # Eq. 5 and 6 in the paper
        self.dropout_rate = dropout_rate  # Internal dropout rate
        self.activation = activations.get(activation)  # Eq. 4 in the paper
        self.use_bias = use_bias

        self.kernel_initializer = initializers.get(kernel_initializer)
        self.bias_initializer = initializers.get(bias_initializer)
        self.attn_kernel_initializer = initializers.get(attn_kernel_initializer)

        self.kernel_regularizer = regularizers.get(kernel_regularizer)
        self.bias_regularizer = regularizers.get(bias_regularizer)
        self.attn_kernel_regularizer = regularizers.get(attn_kernel_regularizer)
        self.activity_regularizer = regularizers.get(activity_regularizer)

        self.kernel_constraint = constraints.get(kernel_constraint)
        self.bias_constraint = constraints.get(bias_constraint)
        self.attn_kernel_constraint = constraints.get(attn_kernel_constraint)
        self.supports_masking = False

        # Populated by build()
        self.kernels = []  # Layer kernels for attention heads
        self.biases = []  # Layer biases for attention heads
        self.attn_kernels = []  # Attention kernels for attention heads

        if attn_heads_reduction == 'concat':
            # Output will have shape (..., K * F')
            self.output_dim = self.F_ * self.attn_heads
        else:
            # Output will have shape (..., F')
            self.output_dim = self.F_

        super(GraphAttention, self).__init__(**kwargs)
    def build(self, input_shape):
        assert len(input_shape) >= 2
        F = input_shape[0][-1]

        # Initialize weights for each attention head
        for head in range(self.attn_heads):
            # Layer kernel
            kernel = self.add_weight(
                shape=(F, self.F_),
                initializer=self.kernel_initializer,
                regularizer=self.kernel_regularizer,
                constraint=self.kernel_constraint,
                name='kernel_{}'.format(head),
            )
            self.kernels.append(kernel)

            # Layer bias
            if self.use_bias:
                bias = self.add_weight(
                    shape=(self.F_,),
                    initializer=self.bias_initializer,
                    regularizer=self.bias_regularizer,
                    constraint=self.bias_constraint,
                    name='bias_{}'.format(head),
                )
                self.biases.append(bias)

            # Attention kernels
            attn_kernel_self = self.add_weight(
                shape=(self.F_, 1),
                initializer=self.attn_kernel_initializer,
                regularizer=self.attn_kernel_regularizer,
                constraint=self.attn_kernel_constraint,
                name='attn_kernel_self_{}'.format(head),
            )
            attn_kernel_neighs = self.add_weight(
                shape=(self.F_, 1),
                initializer=self.attn_kernel_initializer,
                regularizer=self.attn_kernel_regularizer,
                constraint=self.attn_kernel_constraint,
                name='attn_kernel_neigh_{}'.format(head),
            )
            self.attn_kernels.append([attn_kernel_self, attn_kernel_neighs])
        self.built = True
    def call(self, inputs):
        X = inputs[0]  # Node features (N x F)
        A = inputs[1]  # Adjacency matrix (N x N)

        outputs = []
        for head in range(self.attn_heads):
            kernel = self.kernels[head]  # W in the paper (F x F')
            attention_kernel = self.attn_kernels[
                head
            ]  # Attention kernel a in the paper (2F' x 1)

            # Compute inputs to attention network
            features = K.dot(X, kernel)  # (N x F')

            # Compute feature combinations
            # Note: [[a_1], [a_2]]^T [[Wh_i], [Wh_j]] = [a_1]^T [Wh_i] + [a_2]^T [Wh_j]
            attn_for_self = K.dot(
                features, attention_kernel[0]
            )  # (N x 1), [a_1]^T [Wh_i]
            attn_for_neighs = K.dot(
                features, attention_kernel[1]
            )  # (N x 1), [a_2]^T [Wh_j]

            # Attention head a(Wh_i, Wh_j) = a^T [[Wh_i], [Wh_j]]
            dense = attn_for_self + K.transpose(
                attn_for_neighs
            )  # (N x N) via broadcasting

            # Add nonlinearity
            dense = LeakyReLU(alpha=0.2)(dense)

            # Mask values before activation (Vaswani et al., 2017)
            mask = -10e9 * (1.0 - A)
            dense += mask

            # Apply softmax to get attention coefficients
            dense = K.softmax(dense)  # (N x N)

            # Apply dropout to features and attention coefficients
            dropout_attn = Dropout(self.dropout_rate)(dense)  # (N x N)
            dropout_feat = Dropout(self.dropout_rate)(features)  # (N x F')

            # Linear combination with neighbors' features
            node_features = K.dot(dropout_attn, dropout_feat)  # (N x F')

            if self.use_bias:
                node_features = K.bias_add(node_features, self.biases[head])

            # Add output of attention head to final output
            outputs.append(node_features)

        # Aggregate the heads' output according to the reduction method
        if self.attn_heads_reduction == 'concat':
            output = K.concatenate(outputs)  # (N x KF')
        else:
            output = K.mean(K.stack(outputs), axis=0)  # (N x F')

        output = self.activation(output)
        return output
    def compute_output_shape(self, input_shape):
        output_shape = input_shape[0][0], self.output_dim
        return output_shape
    def get_config(self):
        config = super().get_config().copy()
        config.update(
            {
                'attn_heads': self.attn_heads,
                'attn_heads_reduction': self.attn_heads_reduction,
                'F_': self.F_,
            }
        )
        return config
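# Illustrative sketch (an assumption for clarity, not part of the original
# module): the layer expects a list ``[X, A]`` where X is the (N x F)
# node-feature matrix and A the (N x N) adjacency matrix. It returns an
# (N x K*F') tensor for 'concat' reduction or (N x F') for 'average'.
#
#     gat = GraphAttention(F_=8, attn_heads=4, attn_heads_reduction='concat')
#     h = gat([X, A])  # h.shape == (N, 32)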
class ServerNet(tf.keras.layers.Layer):
    def __init__(
        self,
        in_channel: int,
        hidden_size: int,
        num_layer: int,
        num_class: int,
        dropout: float,
        **kwargs,
    ):
        # Initialize the base Layer once, before any tracked attributes are set.
        super(ServerNet, self).__init__(**kwargs)
        self.num_class = num_class
        self.num_layer = num_layer
        self.hidden_size = hidden_size
        self.in_channel = in_channel
        self.dropout = dropout
        self.layers = []
    def build(self, input_shape):
        self.layers.append(
            tf.keras.layers.Dense(self.hidden_size, input_shape=(self.in_channel,))
        )
        for i in range(self.num_layer - 2):
            self.layers.append(
                tf.keras.layers.Dense(
                    self.hidden_size, input_shape=(self.hidden_size,)
                )
            )
        self.layers.append(
            tf.keras.layers.Dense(self.num_class, input_shape=(self.hidden_size,))
        )
        super(ServerNet, self).build(input_shape)
    def call(self, inputs):
        x = inputs
        x = Dropout(self.dropout)(x)
        for i in range(self.num_layer):
            x = Dropout(self.dropout)(x)
            x = self.layers[i](x)
        return K.softmax(x)
    def compute_output_shape(self, input_shape):
        # The final Dense layer maps each sample to ``num_class`` outputs.
        output_shape = input_shape[0], self.num_class
        return output_shape
    def get_config(self):
        config = super().get_config().copy()
        config.update(
            {
                'in_channel': self.in_channel,
                'hidden_size': self.hidden_size,
                'num_layer': self.num_layer,
                'num_class': self.num_class,
                'dropout': self.dropout,
            }
        )
        return config
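

if __name__ == '__main__':
    # A minimal, self-contained sketch (not part of the original module) of how
    # the two layers compose: GraphAttention produces node embeddings from local
    # features and adjacency, and ServerNet classifies the (possibly fused)
    # embeddings. All shapes and variable names below are illustrative
    # assumptions, not values prescribed by SecretFlow.
    import numpy as np

    n_nodes, in_feats = 6, 4
    x = tf.constant(np.random.rand(n_nodes, in_feats), dtype=tf.float32)  # (N x F)
    adj = tf.constant(np.eye(n_nodes), dtype=tf.float32)  # (N x N) adjacency

    # GAT embedding: 'average' reduction keeps the output at (N x F_).
    gat = GraphAttention(F_=8, attn_heads=2, attn_heads_reduction='average')
    embedding = gat([x, adj])  # shape (n_nodes, 8)

    # Server-side classifier over the embeddings.
    server = ServerNet(
        in_channel=8, hidden_size=16, num_layer=3, num_class=3, dropout=0.1
    )
    probs = server(embedding)  # shape (n_nodes, 3), softmax probabilities
    print(probs.shape)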