Spaces:
Sleeping
Sleeping
| import tensorflow as tf | |
| import time | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| def gelu(x): | |
| return 0.5 * x * (1.0 + tf.math.erf(x / tf.sqrt(2.))) | |
| def scaled_dot_product_attention(q, k, v, mask,adjoin_matrix): | |
| """Calculate the attention weights. | |
| q, k, v must have matching leading dimensions. | |
| k, v must have matching penultimate dimension, i.e.: seq_len_k = seq_len_v. | |
| The mask has different shapes depending on its type(padding or look ahead) | |
| but it must be broadcastable for addition. | |
| Args: | |
| q: query shape == (..., seq_len_q, depth) | |
| k: key shape == (..., seq_len_k, depth) | |
| v: value shape == (..., seq_len_v, depth_v) | |
| mask: Float tensor with shape broadcastable | |
| to (..., seq_len_q, seq_len_k). Defaults to None. | |
| Returns: | |
| output, attention_weights | |
| """ | |
| matmul_qk = tf.matmul(q, k, transpose_b=True) # (..., seq_len_q, seq_len_k) | |
| # scale matmul_qk | |
| dk = tf.cast(tf.shape(k)[-1], tf.float32) | |
| scaled_attention_logits = matmul_qk / tf.math.sqrt(dk) | |
| # add the mask to the scaled tensor. | |
| if mask is not None: | |
| scaled_attention_logits += (mask * -1e9) | |
| if adjoin_matrix is not None: | |
| #adjoin_matrix1 =tf.where(adjoin_matrix>0,0.0,-1e9) | |
| #scaled_attention_logits += adjoin_matrix1 | |
| #scaled_attention_logits = scaled_attention_logits * adjoin_matrix | |
| scaled_attention_logits += adjoin_matrix | |
| # softmax is normalized on the last axis (seq_len_k) so that the scores | |
| # add up to 1. | |
| attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1) # (..., seq_len_q, seq_len_k) | |
| output = tf.matmul(attention_weights, v) # (..., seq_len_q, depth_v) | |
| return output, attention_weights | |
| class MultiHeadAttention(tf.keras.layers.Layer): | |
| def __init__(self, d_model, num_heads): | |
| super(MultiHeadAttention, self).__init__() | |
| self.num_heads = num_heads | |
| self.d_model = d_model | |
| assert d_model % self.num_heads == 0 | |
| self.depth = d_model // self.num_heads | |
| self.wq = tf.keras.layers.Dense(d_model) | |
| self.wk = tf.keras.layers.Dense(d_model) | |
| self.wv = tf.keras.layers.Dense(d_model) | |
| self.dense = tf.keras.layers.Dense(d_model) | |
| def split_heads(self, x, batch_size): | |
| """Split the last dimension into (num_heads, depth). | |
| Transpose the result such that the shape is (batch_size, num_heads, seq_len, depth) | |
| """ | |
| x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth)) | |
| return tf.transpose(x, perm=[0, 2, 1, 3]) | |
| def call(self, v, k, q, mask,adjoin_matrix): | |
| batch_size = tf.shape(q)[0] | |
| q = self.wq(q) # (batch_size, seq_len, d_model) | |
| k = self.wk(k) # (batch_size, seq_len, d_model) | |
| v = self.wv(v) # (batch_size, seq_len, d_model) | |
| q = self.split_heads(q, batch_size) # (batch_size, num_heads, seq_len_q, depth) | |
| k = self.split_heads(k, batch_size) # (batch_size, num_heads, seq_len_k, depth) | |
| v = self.split_heads(v, batch_size) # (batch_size, num_heads, seq_len_v, depth) | |
| # scaled_attention.shape == (batch_size, num_heads, seq_len_q, depth) | |
| # attention_weights.shape == (batch_size, num_heads, seq_len_q, seq_len_k) | |
| scaled_attention, attention_weights = scaled_dot_product_attention( | |
| q, k, v, mask,adjoin_matrix) | |
| scaled_attention = tf.transpose(scaled_attention, | |
| perm=[0, 2, 1, 3]) # (batch_size, seq_len_q, num_heads, depth) | |
| concat_attention = tf.reshape(scaled_attention, | |
| (batch_size, -1, self.d_model)) # (batch_size, seq_len_q, d_model) | |
| output = self.dense(concat_attention) # (batch_size, seq_len_q, d_model) | |
| return output, attention_weights | |
| def point_wise_feed_forward_network(d_model, dff): | |
| return tf.keras.Sequential([ | |
| tf.keras.layers.Dense(dff, activation=gelu), # (batch_size, seq_len, dff)tf.keras.layers.LeakyReLU(0.01) | |
| tf.keras.layers.Dense(d_model) # (batch_size, seq_len, d_model) | |
| ]) | |
| class EncoderLayer(tf.keras.layers.Layer): | |
| def __init__(self, d_model, num_heads, dff, rate=0.1): | |
| super(EncoderLayer, self).__init__() | |
| self.mha = MultiHeadAttention(d_model, num_heads) | |
| self.ffn = point_wise_feed_forward_network(d_model, dff) | |
| self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6) | |
| self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6) | |
| self.dropout1 = tf.keras.layers.Dropout(rate) | |
| self.dropout2 = tf.keras.layers.Dropout(rate) | |
| def call(self, x, training, mask,adjoin_matrix): | |
| attn_output, attention_weights = self.mha(x, x, x, mask,adjoin_matrix) # (batch_size, input_seq_len, d_model) | |
| attn_output = self.dropout1(attn_output, training=training) | |
| out1 = self.layernorm1(x + attn_output) # (batch_size, input_seq_len, d_model) | |
| ffn_output = self.ffn(out1) # (batch_size, input_seq_len, d_model) | |
| ffn_output = self.dropout2(ffn_output, training=training) | |
| out2 = self.layernorm2(out1 + ffn_output) # (batch_size, input_seq_len, d_model) | |
| return out2,attention_weights | |
| class Encoder(tf.keras.Model): | |
| def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, | |
| maximum_position_encoding, rate=0.1): | |
| super(Encoder, self).__init__() | |
| self.d_model = d_model | |
| self.num_layers = num_layers | |
| self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model) | |
| # self.pos_encoding = positional_encoding(maximum_position_encoding, | |
| # self.d_model) | |
| self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) | |
| for _ in range(num_layers)] | |
| self.dropout = tf.keras.layers.Dropout(rate) | |
| def call(self, x, training, mask,adjoin_matrix): | |
| seq_len = tf.shape(x)[1] | |
| adjoin_matrix = adjoin_matrix[:,tf.newaxis,:,:] | |
| # adding embedding and position encoding. | |
| x = self.embedding(x) # (batch_size, input_seq_len, d_model) | |
| x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32)) | |
| x = self.dropout(x, training=training) | |
| for i in range(self.num_layers): | |
| x,attention_weights = self.enc_layers[i](x, training, mask,adjoin_matrix) | |
| return x # (batch_size, input_seq_len, d_model) | |
| class Encoder_test(tf.keras.Model): | |
| def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, | |
| maximum_position_encoding, rate=0.1): | |
| super(Encoder_test, self).__init__() | |
| self.d_model = d_model | |
| self.num_layers = num_layers | |
| self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model) | |
| # self.pos_encoding = positional_encoding(maximum_position_encoding, | |
| # self.d_model) | |
| self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) | |
| for _ in range(num_layers)] | |
| self.dropout = tf.keras.layers.Dropout(rate) | |
| def call(self, x, training, mask,adjoin_matrix): | |
| seq_len = tf.shape(x)[1] | |
| adjoin_matrix = adjoin_matrix[:,tf.newaxis,:,:] | |
| # adding embedding and position encoding. | |
| x = self.embedding(x) # (batch_size, input_seq_len, d_model) | |
| x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32)) | |
| # x += self.pos_encoding[:, :seq_len, :] | |
| x = self.dropout(x, training=training) | |
| attention_weights_list = [] | |
| xs = [] | |
| for i in range(self.num_layers): | |
| x,attention_weights = self.enc_layers[i](x, training, mask,adjoin_matrix) | |
| attention_weights_list.append(attention_weights) | |
| xs.append(x) | |
| return x,attention_weights_list,xs | |
| class BertModel_test(tf.keras.Model): | |
| def __init__(self,num_layers = 6,d_model = 256,dff = 512,num_heads = 8,vocab_size = 17,dropout_rate = 0.1): | |
| super(BertModel_test, self).__init__() | |
| self.encoder = Encoder_test(num_layers=num_layers,d_model=d_model, | |
| num_heads=num_heads,dff=dff,input_vocab_size=vocab_size,maximum_position_encoding=200,rate=dropout_rate) | |
| self.fc1 = tf.keras.layers.Dense(d_model, activation=gelu) | |
| self.layernorm = tf.keras.layers.LayerNormalization(-1) | |
| self.fc2 = tf.keras.layers.Dense(vocab_size) | |
| def call(self,x,adjoin_matrix,mask,training=False): | |
| x,att,xs = self.encoder(x,training=training,mask=mask,adjoin_matrix=adjoin_matrix) | |
| x = self.fc1(x) | |
| x = self.layernorm(x) | |
| x = self.fc2(x) | |
| return x,att,xs | |
| class BertModel(tf.keras.Model): | |
| def __init__(self,num_layers = 6,d_model = 256,dff = 512,num_heads = 8,vocab_size = 17,dropout_rate = 0.1): | |
| super(BertModel, self).__init__() | |
| self.encoder = Encoder(num_layers=num_layers,d_model=d_model, | |
| num_heads=num_heads,dff=dff,input_vocab_size=vocab_size,maximum_position_encoding=200,rate=dropout_rate) | |
| self.fc1 = tf.keras.layers.Dense(d_model, activation=gelu) | |
| self.layernorm = tf.keras.layers.LayerNormalization(-1) | |
| self.fc2 = tf.keras.layers.Dense(vocab_size) | |
| def call(self,x,adjoin_matrix,mask,training=False): | |
| x = self.encoder(x,training=training,mask=mask,adjoin_matrix=adjoin_matrix) | |
| x = self.fc1(x) | |
| x = self.layernorm(x) | |
| x = self.fc2(x) | |
| return x | |
| class PredictModel(tf.keras.Model): | |
| def __init__(self,num_layers = 8,d_model = 256,dff = 512,num_heads = 8,vocab_size =17,dropout_rate = 0.1,dense_dropout=0.1): | |
| super(PredictModel, self).__init__() | |
| self.encoder = Encoder(num_layers=num_layers,d_model=d_model, | |
| num_heads=num_heads,dff=dff,input_vocab_size=vocab_size,maximum_position_encoding=200,rate=dropout_rate) | |
| self.fc1 = tf.keras.layers.Dense(256,activation=tf.keras.layers.LeakyReLU(0.25)) | |
| self.fc2 = tf.keras.layers.Dense(256,activation=tf.keras.layers.LeakyReLU(0.25)) | |
| self.dropout = tf.keras.layers.Dropout(dense_dropout) | |
| self.fc3 = tf.keras.layers.Dense(1) | |
| def call(self,x,adjoin_matrix,mask,training=False): | |
| x = self.encoder(x,training=training,mask=mask,adjoin_matrix=adjoin_matrix) | |
| x = x[:,0,:] | |
| x = self.fc1(x) | |
| x = self.dropout(x,training=training) | |
| x = self.fc2(x) | |
| x = self.fc3(x) | |
| return x | |
| class PredictModel_test(tf.keras.Model): | |
| def __init__(self,num_layers = 6,d_model = 256,dff = 512,num_heads = 8,vocab_size =17,dropout_rate = 0.1,dense_dropout=0.5): | |
| super(PredictModel_test, self).__init__() | |
| self.encoder = Encoder_test(num_layers=num_layers,d_model=d_model, | |
| num_heads=num_heads,dff=dff,input_vocab_size=vocab_size,maximum_position_encoding=200,rate=dropout_rate) | |
| self.fc1 = tf.keras.layers.Dense(256, activation=tf.keras.layers.LeakyReLU(0.1)) | |
| self.dropout = tf.keras.layers.Dropout(dense_dropout) | |
| self.fc2 = tf.keras.layers.Dense(1) | |
| def call(self,x,adjoin_matrix,mask,training=False): | |
| x,att,xs = self.encoder(x,training=training,mask=mask,adjoin_matrix=adjoin_matrix) | |
| x = x[:, 0, :] | |
| x = self.fc1(x) | |
| x = self.dropout(x, training=training) | |
| x = self.fc2(x) | |
| return x,att,xs | |