def scaled_dot_product_attention(query, key, value, mask): # query 크기 : (batch_size, num_heads, query의 문장 길이, d_model/num_heads) # key 크기 : (batch_size, num_heads, key의 문장 길이, d_model/num_heads) # value 크기 : (batch_size, num_heads, value의 문장 길이, d_model/num_heads) # padding_mask : (batch_size, 1, 1, key의 문장 길이) # Q와 K의 곱. 어텐션 스코어 행렬. matmul_qk = tf.matmul(query, key, transpose_b=True) # 스케일링 # dk의 루트값으로 나눠준다. depth = tf.cast(tf.shape(key)[-1], tf.float32) logits = matmul_qk / tf.math.sqrt(depth) # 마스킹. 어텐션 스코어 행렬의 마스킹 할 위치에 매우 작은 음수값을 넣는다. # 매우 작은 값이므로 소프트맥스 함수를 지나면 행렬의 해당 위치의 값은 0이 된다. if mask is not None: logits += (mask * -1e9) # 소프트맥스 함수는 마지막 차원인 key의 문장 길이 방향으로 수행된다. # attention weight : (batch_size, num_heads, query의 문장 길이, key의 문장 길이) attention_weights = tf.nn.softmax(logits, axis=-1) # output : (batch_size, num_heads, query의 문장 길이, d_model/num_heads) output = tf.matmul(attention_weights, value) return output, attention_weights
class MultiHeadAttention(tf.keras.layers.Layer): def __init__(self, d_model, num_heads, name="multi_head_attention"): super(MultiHeadAttention, self).__init__(name=name) self.num_heads = num_heads self.d_model = d_model assert d_model % self.num_heads == 0 # d_model을 num_heads로 나눈 값. # 논문 기준 : 64 self.depth = d_model // self.num_heads # WQ, WK, WV에 해당하는 밀집층 정의 self.query_dense = tf.keras.layers.Dense(units=d_model) self.key_dense = tf.keras.layers.Dense(units=d_model) self.value_dense = tf.keras.layers.Dense(units=d_model) # WO에 해당하는 밀집층 정의 self.dense = tf.keras.layers.Dense(units=d_model) # num_heads 개수만큼 q, k, v를 split하는 함수 def split_heads(self, inputs, batch_size): inputs = tf.reshape( inputs, shape=(batch_size, -1, self.num_heads, self.depth)) return tf.transpose(inputs, perm=[0, 2, 1, 3]) def call(self, inputs): query, key, value, mask = inputs['query'], inputs['key'], inputs[ 'value'], inputs['mask'] batch_size = tf.shape(query)[0] # 1. WQ, WK, WV에 해당하는 밀집층 지나기 # q : (batch_size, query의 문장 길이, d_model) # k : (batch_size, key의 문장 길이, d_model) # v : (batch_size, value의 문장 길이, d_model) # 참고) 인코더(k, v)-디코더(q) 어텐션에서는 query 길이와 key, value의 길이는 다를 수 있다. query = self.query_dense(query) key = self.key_dense(key) value = self.value_dense(value) # 2. 헤드 나누기 # q : (batch_size, num_heads, query의 문장 길이, d_model/num_heads) # k : (batch_size, num_heads, key의 문장 길이, d_model/num_heads) # v : (batch_size, num_heads, value의 문장 길이, d_model/num_heads) query = self.split_heads(query, batch_size) key = self.split_heads(key, batch_size) value = self.split_heads(value, batch_size) # 3. 스케일드 닷 프로덕트 어텐션. 앞서 구현한 함수 사용. # (batch_size, num_heads, query의 문장 길이, d_model/num_heads) scaled_attention, _ = scaled_dot_product_attention(query, key, value, mask) # (batch_size, query의 문장 길이, num_heads, d_model/num_heads) scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3]) # 4. 헤드 연결(concatenate)하기 # (batch_size, query의 문장 길이, d_model) concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model)) # 5. WO에 해당하는 밀집층 지나기 # (batch_size, query의 문장 길이, d_model) outputs = self.dense(concat_attention) return outputs
import pandas as pdimport urllib.requestimport tensorflow_datasets as tfdsimport tensorflow as tfimport timeimport numpy as npimport matplotlib.pyplot as pltimport re
['12시 땡 !', '1지망 학교 떨어졌어', '3박4일 놀러가고 싶다', '3박4일 정도 놀러가고 싶다', 'PPL 심하네']
['하루가 또 가네요 .', '위로해 드립니다 .', '여행은 언제나 좋죠 .', '여행은 언제나 좋죠 .', '눈살이 찌푸려지죠 .']
# 서브워드텍스트인코더를 사용하여 질문과 답변을 모두 포함한 단어 집합(Vocabulary) 생성tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus( questions + answers, target_vocab_size=2**13)# 시작 토큰과 종료 토큰에 대한 정수 부여.START_TOKEN, END_TOKEN = [tokenizer.vocab_size], [tokenizer.vocab_size + 1]# 시작 토큰과 종료 토큰을 고려하여 단어 집합의 크기를 + 2VOCAB_SIZE = tokenizer.vocab_size + 2
print('시작 토큰 번호 :',START_TOKEN)print('종료 토큰 번호 :',END_TOKEN)print('단어 집합의 크기 :',VOCAB_SIZE)
시작 토큰 번호 : [8178]
종료 토큰 번호 : [8179]
단어 집합의 크기 : 8180
# 서브워드텍스트인코더 토크나이저의 .encode()를 사용하여 텍스트 시퀀스를 정수 시퀀스로 변환.print('Tokenized sample question: {}'.format(tokenizer.encode(questions[20])))
# 서브워드텍스트인코더 토크나이저의 .encode()와 decode() 테스트해보기# 임의의 입력 문장을 sample_string에 저장sample_string = questions[20]# encode() : 텍스트 시퀀스 --> 정수 시퀀스tokenized_string = tokenizer.encode(sample_string)print ('정수 인코딩 후의 문장 {}'.format(tokenized_string))# decode() : 정수 시퀀스 --> 텍스트 시퀀스original_string = tokenizer.decode(tokenized_string)print ('기존 문장: {}'.format(original_string))
정수 인코딩 후의 문장 [5766, 611, 3509, 141, 685, 3747, 849]
기존 문장: 가스비 비싼데 감기 걸리겠어
# 각 정수는 각 단어와 어떻게 mapping되는지 병렬로 출력# 서브워드텍스트인코더는 의미있는 단위의 서브워드로 토크나이징한다. 띄어쓰기 단위 X 형태소 분석 단위 Xfor ts in tokenized_string: print ('{} ----> {}'.format(ts, tokenizer.decode([ts])))
5766 ----> 가스
611 ----> 비
3509 ----> 비싼
141 ----> 데
685 ----> 감기
3747 ----> 걸리
849 ----> 겠어
# 최대 길이를 40으로 정의MAX_LENGTH = 40# 토큰화 / 정수 인코딩 / 시작 토큰과 종료 토큰 추가 / 패딩def tokenize_and_filter(inputs, outputs): tokenized_inputs, tokenized_outputs = [], [] for (sentence1, sentence2) in zip(inputs, outputs): # encode(토큰화 + 정수 인코딩), 시작 토큰과 종료 토큰 추가 sentence1 = START_TOKEN + tokenizer.encode(sentence1) + END_TOKEN sentence2 = START_TOKEN + tokenizer.encode(sentence2) + END_TOKEN tokenized_inputs.append(sentence1) tokenized_outputs.append(sentence2) # 패딩 tokenized_inputs = tf.keras.preprocessing.sequence.pad_sequences( tokenized_inputs, maxlen=MAX_LENGTH, padding='post') tokenized_outputs = tf.keras.preprocessing.sequence.pad_sequences( tokenized_outputs, maxlen=MAX_LENGTH, padding='post') return tokenized_inputs, tokenized_outputs
print('단어 집합의 크기(Vocab size): {}'.format(VOCAB_SIZE))print('전체 샘플의 수(Number of samples): {}'.format(len(questions)))
단어 집합의 크기(Vocab size): 8180
전체 샘플의 수(Number of samples): 11823
# 텐서플로우 dataset을 이용하여 셔플(shuffle)을 수행하되, 배치 크기로 데이터를 묶는다.# 또한 이 과정에서 교사 강요(teacher forcing)을 사용하기 위해서 디코더의 입력과 실제값 시퀀스를 구성한다.BATCH_SIZE = 64BUFFER_SIZE = 20000# 디코더의 실제값 시퀀스에서는 시작 토큰을 제거해야 한다.dataset = tf.data.Dataset.from_tensor_slices(( { 'inputs': questions, 'dec_inputs': answers[:, :-1] # 디코더의 입력. 마지막 패딩 토큰이 제거된다. }, { 'outputs': answers[:, 1:] # 맨 처음 토큰이 제거된다. 다시 말해 시작 토큰이 제거된다. },))dataset = dataset.cache()dataset = dataset.shuffle(BUFFER_SIZE)dataset = dataset.batch(BATCH_SIZE)dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
# 임의의 샘플에 대해서 [:, :-1]과 [:, 1:]이 어떤 의미를 가지는지 테스트해본다.print(answers[0]) # 기존 샘플print(answers[:1][:, :-1]) # 마지막 패딩 토큰 제거하면서 길이가 39가 된다.print(answers[:1][:, 1:]) # 맨 처음 토큰이 제거된다. 다시 말해 시작 토큰이 제거된다. 길이는 역시 39가 된다.
def evaluate(sentence): sentence = preprocess_sentence(sentence) sentence = tf.expand_dims( START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0) output = tf.expand_dims(START_TOKEN, 0) # 디코더의 예측 시작 for i in range(MAX_LENGTH): predictions = model(inputs=[sentence, output], training=False) # 현재(마지막) 시점의 예측 단어를 받아온다. predictions = predictions[:, -1:, :] predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32) # 만약 마지막 시점의 예측 단어가 종료 토큰이라면 예측을 중단 if tf.equal(predicted_id, END_TOKEN[0]): break # 마지막 시점의 예측 단어를 출력에 연결한다. # 이는 for문을 통해서 디코더의 입력으로 사용될 예정이다. output = tf.concat([output, predicted_id], axis=-1) return tf.squeeze(output, axis=0)def predict(sentence): prediction = evaluate(sentence) predicted_sentence = tokenizer.decode( [i for i in prediction if i < tokenizer.vocab_size]) print('Input: {}'.format(sentence)) print('Output: {}'.format(predicted_sentence)) return predicted_sentence