import json
import os
import sys
import numpy as np
from tensorflow.keras.initializers import Constant
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.layers import Flatten, Conv1D, GlobalMaxPooling1D
from tensorflow.keras.models import Sequential
from sklearn.model_selection import train_test_split
###-------- Tokenizer parameters ----------###
# Maximum number of most-frequent words to keep
num_words = 1000
# Placeholder token for out-of-vocabulary words
oov_token = '<oov_tok>'
###-------- Padding parameters ----------###
# Maximum sequence length after padding
MAX_SEQUENCE_LENGTH = 16
# Padding position ('pre' pads at the front)
padding = 'pre'
# Truncation position ('pre' truncates from the front)
trunc_type = 'pre'
###-------- Layer parameters -----------------###
# Embedding layer output dimension
EMBEDDING_DIM = 100
###------- Training parameters -----------------####
# Number of training epochs
num_epoch = 10
# Ratio of (train + validation) set to the full dataset; the rest is the test set
train_ratio = 0.9
# Fraction of the (train + validation) set held out for validation
valid_ratio = 0.2
def parse_data(file):
    for l in open(file, 'r'):
        yield json.loads(l)

sarcasm_data = list(parse_data('./Sarcasm_Headlines_Dataset_v2.json'))
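# (Illustrative sanity check, not part of the original pipeline.) Each line of the
# v2 dataset is assumed to be a JSON object containing at least the 'headline' and
# 'is_sarcastic' fields used below; peek at the first record to confirm.
print(sarcasm_data[0].get('headline'), '->', sarcasm_data[0].get('is_sarcastic'))
print('Total records:', len(sarcasm_data))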
def split_data(data):
    X = []
    y = []
    for s in data:
        X.append(s['headline'])
        y.append(s['is_sarcastic'])
    # (train + validation) : test = 9 : 1
    trainval_size = int(len(X) * train_ratio)
    # train + validation portion
    X_trainval = X[:trainval_size]
    y_trainval = y[:trainval_size]
    # train : validation = 8 : 2
    X_train, X_valid, y_train, y_valid = train_test_split(X_trainval,
                                                          y_trainval,
                                                          test_size=valid_ratio,
                                                          random_state=12)
    # test portion
    X_test = X[trainval_size:]
    y_test = y[trainval_size:]
    # return labels as NumPy arrays so Keras can consume them directly
    return X_train, X_valid, X_test, np.array(y_train), np.array(y_valid), np.array(y_test)

train_sentences, valid_sentences, test_sentences, train_labels, valid_labels, test_labels = split_data(sarcasm_data)
# Define the tokenizer
# num_words: the maximum number of words to keep, based on word frequency.
# Only the most common num_words - 1 words will be kept.
tokenizer = Tokenizer(num_words=num_words, oov_token=oov_token)
# Fit the tokenizer on the training sentences only
tokenizer.fit_on_texts(train_sentences)
# tokenizer.word_index
word_index = tokenizer.word_index
print('Found %s unique tokens.' % len(word_index))
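# (Optional illustration.) With oov_token set, any word outside the top num_words
# vocabulary is mapped to the OOV index (1, '<oov_tok>') by texts_to_sequences;
# the sample sentence below is made up purely for demonstration.
print(tokenizer.texts_to_sequences(['granny starting to fear spiders in the garden']))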
def get_paddedSequences(token, sentences):
    # Convert sentences to integer sequences, then pad/truncate to a fixed length
    sequences = token.texts_to_sequences(sentences)
    paddedSequences = pad_sequences(sequences, padding=padding, maxlen=MAX_SEQUENCE_LENGTH, truncating=trunc_type)
    return paddedSequences
train_padded = get_paddedSequences(tokenizer,train_sentences)
valid_padded = get_paddedSequences(tokenizer,valid_sentences)
print(train_padded.shape)
print(valid_padded.shape)
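# (Optional illustration.) With padding='pre' and truncating='pre', sequences shorter
# than MAX_SEQUENCE_LENGTH are left-padded with zeros and longer ones lose tokens
# from the front; inspect one row to see the effect.
print(train_padded[0])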
def get_embedding_matrix():
    # Initialize the embedding matrix.
    # EMBEDDING_DIM must match the dimension of the pre-trained word vectors.
    embedding_matrix = np.zeros((num_words, EMBEDDING_DIM))
    # Prepare the embedding index
    embeddings_index = {}
    # Parse each word and its vector from the GloVe file and store them in a dict
    with open(os.path.join('glove.6B.100d.txt')) as f:
        for line in f:
            word, coefs = line.split(maxsplit=1)
            coefs = np.fromstring(coefs, 'f', sep=' ')
            embeddings_index[word] = coefs
    print('Found %s word vectors.' % len(embeddings_index))
    # Build the embedding matrix from the index
    for word, i in word_index.items():
        if i >= num_words:
            continue
        embedding_vector = embeddings_index.get(word)
        if embedding_vector is not None:
            # Words not found in the embedding index stay all-zeros.
            embedding_matrix[i] = embedding_vector
    return embedding_matrix

embedding_matrix = get_embedding_matrix()
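# (Optional sanity check, assuming the GloVe file loaded correctly.) Count how many
# of the num_words rows actually received a pre-trained vector; the rest stay all-zeros.
covered = int(np.count_nonzero(np.any(embedding_matrix != 0, axis=1)))
print('GloVe coverage: %d / %d words' % (covered, num_words))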
# Embedding + Flatten
model = Sequential([
    # Both num_words and EMBEDDING_DIM affect accuracy and loss.
    # To load the pre-trained GloVe vectors into the Embedding layer, use
    # embeddings_initializer=Constant(embedding_matrix) and set trainable=False
    # so the embeddings stay fixed.
    # Output shape: (None, MAX_SEQUENCE_LENGTH, EMBEDDING_DIM)
    Embedding(num_words,
              EMBEDDING_DIM,
              # embeddings_initializer=Constant(embedding_matrix),
              input_length=MAX_SEQUENCE_LENGTH,
              trainable=True),
              # trainable=False),
    Flatten(),
    Dense(6, activation='relu'),
    Dense(1, activation='sigmoid')
])
# Embedding + Conv1D + GlobalMaxPooling1D
model = Sequential([
    # Both num_words and EMBEDDING_DIM affect accuracy and loss.
    # Swap in the commented lines to use the fixed pre-trained embeddings.
    Embedding(num_words,
              EMBEDDING_DIM,
              # embeddings_initializer=Constant(embedding_matrix),
              input_length=MAX_SEQUENCE_LENGTH,
              trainable=True),
              # trainable=False),
    Conv1D(128, 5, activation='relu'),
    GlobalMaxPooling1D(),
    Dense(6, activation='relu'),
    Dense(1, activation='sigmoid')
])
# Embedding + Bidirectional LSTM
model = Sequential([
    # Both num_words and EMBEDDING_DIM affect accuracy and loss.
    Embedding(num_words,
              EMBEDDING_DIM,
              # embeddings_initializer=Constant(embedding_matrix),
              input_length=MAX_SEQUENCE_LENGTH,
              trainable=True),
              # trainable=False),
    Bidirectional(LSTM(64)),
    Dense(6, activation='relu'),
    Dense(1, activation='sigmoid')
])
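# Note: the three Sequential definitions above all rebind the same `model` variable,
# so only the last one (Embedding + Bidirectional LSTM) is compiled and trained below;
# comment out the others to experiment with a different architecture.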
model.compile(loss='binary_crossentropy',optimizer='adam',metrics=['accuracy'])
model.summary()
%%time
history = model.fit(train_padded,
                    train_labels,
                    epochs=num_epoch,
                    validation_data=(valid_padded, valid_labels),
                    verbose=2)
import matplotlib.pyplot as plt

def plot_graphs(history, string):
    plt.plot(history.history[string])
    plt.plot(history.history['val_' + string])
    plt.xlabel("Epochs")
    plt.ylabel(string)
    plt.legend([string, 'val_' + string])
    plt.show()

# tf.keras logs the metric under 'accuracy'/'val_accuracy', so plot that key
plot_graphs(history, "accuracy")
plot_graphs(history, "loss")
test_padded = get_paddedSequences(tokenizer,test_sentences)
result = model.evaluate(test_padded, test_labels)
print('Accuracy:{0:.2%}'.format(result[1]))
from sklearn.metrics import f1_score
pred_labels = model.predict(test_padded)
pred_labels_int = [1 if prob[0] > 0.5 else 0 for prob in pred_labels]
print('f1-score:{0:.2%}'.format(f1_score(test_labels, pred_labels_int)))
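# (Illustrative inference sketch; the headline string is a made-up example.)
# New text must go through the same tokenizer and padding as the training data
# before model.predict can score it.
sample = ['scientists discover that coffee is somehow both good and bad for you']
sample_padded = get_paddedSequences(tokenizer, sample)
print('P(sarcastic) = %.3f' % model.predict(sample_padded)[0][0])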