exodus-stock/torch_model.py

590 lines
18 KiB
Python
Raw Normal View History

2022-04-06 03:56:09 -04:00
import torch
import torch.nn as nn
from torch.utils.tensorboard import SummaryWriter
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
import numpy as np
import time, math
import yfinance as yf
from indicator_MACD import *
import os
def preproc(df):
    """Min-max scale positional columns 2, 3 and 4 of *df*.

    Args:
        df: DataFrame with at least five columns; the columns at positions
            2, 3 and 4 must be castable to ``float32``.

    Returns:
        A ``(scaled, scaler)`` tuple. ``scaled`` is a new DataFrame (default
        integer index and column labels) holding the transformed values;
        ``scaler`` is the fitted ``MinMaxScaler`` so callers can
        ``inverse_transform`` predictions back to the original scale.
    """
    # NOTE(review): the original comment described this slice as
    # "Close, Volume, and all"; which columns positions 2:5 really are depends
    # on the caller's frame layout - confirm against the downloaded data.
    features = df.iloc[:, 2:5].astype('float32')
    minmax = MinMaxScaler()
    # fit_transform is equivalent to the former fit(...) + transform(...) on
    # the same slice. The stray blocking input() and the no-op df_log.head()
    # that used to follow were debugging leftovers and have been removed.
    df_log = pd.DataFrame(minmax.fit_transform(features))
    return df_log, minmax
# --- Script setup: download prices, plot the MACD gap, scale the features ---
# Prompts for a ticker symbol on stdin and builds a yfinance Ticker for it.
symbol=yf.Ticker(input("Enter the Ticker : "))
# Daily bars covering a four-year period.
data=symbol.history(interval="1d",period="4y")
# Drop the 7 most recent rows from the downloaded history.
data=data[:-7]
# Column at position 3 of the history frame.
# NOTE(review): presumably "Close" (see the commented-out line below) - confirm.
real_data = data.iloc[:, 3]
real_data = pd.DataFrame(real_data)
# real_data = real_data["Close"]
print("")
print("Real Data ")
print(real_data[-7:])
# macd_data comes from the star-import of indicator_MACD; the code below only
# relies on its result exposing `.index` and a 'Gap' column.
tick_macd = macd_data(real_data)
# Upper panel: price line with the MACD gap bars drawn on the same axes.
plt.subplot(2, 1, 1)
ax = plt.plot(real_data)
ax = plt.bar(tick_macd.index, tick_macd['Gap'], width=0.8)
# Lower panel: the same MACD gap bars on their own.
plt.subplot(2,1,2)
ax = plt.bar(tick_macd.index, tick_macd['Gap'], width=0.8)
plt.show()
df = pd.DataFrame(data)
# df = tick_macd
# df.index = pd.to_datetime(df.index)
# print((df))
#df = pd.read_csv("./data/AAPL_2020-10-31.csv", header=0, index_col=0)
#df = df[-365:]
# df = df.iloc[:, 3]
# df = df["Close"]
print(data.head())
print(df.head())
# input()
# print(df)
# plt.plot(df)
# plt.show()
# ktest_size = 300
# ktest_data =df[:-test_size]
# ktrain_data = df[-test_size:]
# scaler = MinMaxScaler()
# Keep a reference to the unscaled frame before `df` is rebound below.
df_test = df
# From here on `df` is the scaled three-column frame returned by preproc and
# `scaler` is the fitted MinMaxScaler used later for inverse transforms.
df, scaler = preproc(df)
# scaler = scaler.fit(np.expand_dims(df, axis=1))
# df = scaler.transform(np.expand_dims(df, axis=1))
# train_data = scaler.transform(np.expand_dims(train_data, axis=1))
# test_data = scaler.transform(np.expand_dims(test_data, axis=1))
class LSTM(nn.Module):
    """Stacked LSTM whose last time-step output feeds a linear layer.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_dim: Number of values produced by the final linear layer.
    """

    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(LSTM, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        # batch_first=True: inputs are laid out (batch, seq_len, input_dim).
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return a ``(batch, output_dim)`` tensor for ``x`` of shape ``(batch, seq_len, input_dim)``."""
        # Allocate the zero initial states on the input's device and dtype so
        # the module also works after .to("cuda") or .double(); previously
        # they were always CPU float32 tensors.
        h0 = torch.zeros(
            self.num_layers, x.size(0), self.hidden_dim,
            device=x.device, dtype=x.dtype,
        )
        c0 = torch.zeros_like(h0)
        out, _ = self.lstm(x, (h0, c0))
        # Only the output at the last time step is used for the prediction.
        return self.fc(out[:, -1, :])
class GRU(nn.Module):
    """Stacked GRU whose last time-step output feeds a linear layer.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the GRU hidden state.
        num_layers: Number of stacked GRU layers.
        output_dim: Number of values produced by the final linear layer.
    """

    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(GRU, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        # batch_first=True: inputs are laid out (batch, seq_len, input_dim).
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return a ``(batch, output_dim)`` tensor for ``x`` of shape ``(batch, seq_len, input_dim)``."""
        # Allocate the zero initial state on the input's device and dtype so
        # the module also works after .to("cuda") or .double(); previously it
        # was always a CPU float32 tensor.
        h0 = torch.zeros(
            self.num_layers, x.size(0), self.hidden_dim,
            device=x.device, dtype=x.dtype,
        )
        out, _ = self.gru(x, h0)
        # Only the output at the last time step is used for the prediction.
        return self.fc(out[:, -1, :])
# def window(data, seq_length):
# xs = []
# ys = []
# # print(data)
# for i in range(len(data)-seq_length-1):
# x = data[i:(i+seq_length)]
# y = data[i+seq_length]
# xs.append(x)
# ys.append(y)
# #plt.plot(ys)
# #plt.plot(ys[0]+xs[0])
# #plt.show()
# print((xs[0]))
# print((ys[0]))
# print("------------")
# print(data[29])
# print(data[30])
# input()
# return np.array(xs), np.array(ys)
def split_window_mod(my_data, lookback, verbose=False):
    """Cut *my_data* into sliding windows and split them 70/30 into train/test.

    Every window spans ``lookback + 1`` consecutive rows: the first
    ``lookback`` rows are the model input, the last row is a candidate target.

    Args:
        my_data: Two-dimensional table exposing ``to_numpy()`` (e.g. a pandas
            DataFrame) with more than ``lookback`` rows.
        lookback: Number of time steps in each input sequence.
        verbose: When true, print the sizes of the produced arrays. The
            blocking ``input()`` pauses and full-array dumps that used to run
            unconditionally were debugging aids and have been removed.

    Returns:
        ``[x_train, y_train, x_test, y_test]`` as NumPy arrays. The ``x``
        arrays have shape ``(n, lookback, n_features)`` and the ``y`` arrays
        ``(n, n_features)``.

    Raises:
        ValueError: If ``my_data`` has ``lookback`` rows or fewer, so that no
            window can be formed.
    """
    data_raw = my_data.to_numpy()
    n_windows = len(data_raw) - lookback
    if n_windows < 1:
        raise ValueError(
            "need more than lookback=%d rows to build a window, got %d"
            % (lookback, len(data_raw))
        )

    data = np.array(
        [data_raw[start:start + lookback + 1] for start in range(n_windows)]
    )

    test_set_size = int(np.round(0.3 * data.shape[0]))
    train_set_size = data.shape[0] - test_set_size

    # The last input window of each partition is dropped so that inputs and
    # targets have the same length.
    x_train = data[:train_set_size - 1, :-1, :] if train_set_size else data[:0, :-1, :]
    x_test = data[train_set_size:, :-1][:-1]

    # NOTE(review): y_train[i] is the final row of window i + 1 (one step
    # beyond the row that directly follows x_train[i]), whereas y_test[i] is
    # the final row of the *same* window as x_test[i]. The original flagged
    # the y_test slice as "Problematic slicing". Both offsets are preserved
    # unchanged here because downstream code depends on these exact values.
    y_train = data[1:train_set_size, -1, :]
    y_test = data[train_set_size:-1, -1, :]

    if verbose:
        print("windows:", data.shape[0], "of length", lookback + 1)
        print("train windows:", train_set_size, "test windows:", test_set_size)
        print("x_train:", len(x_train), "y_train:", len(y_train))
        print("x_test:", len(x_test), "y_test:", len(y_test))

    return [x_train, y_train, x_test, y_test]
def split_window(my_data, lookback):
    """Build sliding windows of ``lookback`` rows and split them 90/10.

    Within every window the leading ``lookback - 1`` rows form the input
    sequence and the final row is the target.

    Args:
        my_data: Two-dimensional table exposing ``to_numpy()``.
        lookback: Total number of rows per window (inputs plus target).

    Returns:
        ``[x_train, y_train, x_test, y_test]`` as NumPy arrays.
    """
    values = my_data.to_numpy()
    window_count = len(values) - lookback
    windows = np.array(
        [values[begin:begin + lookback] for begin in range(window_count)]
    )
    print("Current Data ")

    # The trailing 10% of the windows (rounded) is held out for testing.
    n_test = int(np.round(0.1 * windows.shape[0]))
    n_train = windows.shape[0] - n_test

    train_part = windows[:n_train]
    test_part = windows[n_train:]

    x_train, y_train = train_part[:, :-1, :], train_part[:, -1, :]
    x_test, y_test = test_part[:, :-1], test_part[:, -1, :]
    return [x_train, y_train, x_test, y_test]
def window(data, seq_length, horizon=7):
    """Pair each ``seq_length``-long slice of *data* with a later target value.

    For start position ``i`` the input is ``data[i:i + seq_length]`` and the
    target is ``data[i + horizon + seq_length]``. Generation stops at the
    first start position whose target lies beyond the end of *data*.

    Args:
        data: Indexable, sliceable sequence supporting ``len()``.
        seq_length: Number of elements in each input slice.
        horizon: Extra offset between the end of the input slice and the
            target. Defaults to 7, the value that used to be hard-coded.

    Returns:
        ``(xs, ys)`` as NumPy arrays; both are empty when *data* is too short
        to yield a single pair.
    """
    xs = []
    ys = []
    for i in range(len(data) - seq_length - 1):
        try:
            y = data[i + horizon + seq_length]
        except (IndexError, KeyError):
            # Ran past the end of the data (KeyError covers label-indexed
            # containers). The former bare `except:` also swallowed
            # unrelated errors such as KeyboardInterrupt.
            break
        xs.append(data[i:(i + seq_length)])
        ys.append(y)
    # Show the first pair for inspection. The hard-coded data[29] / data[37]
    # debug prints were dropped: they raised on inputs shorter than 38 items.
    if xs:
        print((xs[0]))
        print((ys[0]))
        print("------------")
    return np.array(xs), np.array(ys)
# Train a GRU on the scaled windows, evaluate/plot it, build a short forecast
# and save the model.
# NOTE(review): indentation was lost in this copy of the file, so it is not
# visible which of the statements below belong to forecasting()'s body. If all
# of them do, the later assignment `df = np.append(df, ...)` makes `df` a local
# name and the earlier `split_window_mod(df, lookback)` call would raise
# UnboundLocalError - confirm against the original layout.
def forecasting():
print("Forecasting . . .")
# Model / training hyper-parameters. input_dim and output_dim equal the three
# feature columns produced by preproc().
input_dim = 3
hidden_dim = 128
num_layers = 2
output_dim = 3
num_epochs = int(input("Epoch fof training : "))
lookback = 7
# CUDA
# NOTE(review): `device` is selected here but none of the visible statements
# below move the model or the tensors onto it.
is_cuda = torch.cuda.is_available()
if is_cuda:
device = torch.device("cuda")
torch.cuda.set_device(0)
else:
device = torch.device("cpu")
# REPLACED
# seq_length = 7
# X_train, y_train = window(train_data, seq_length)
# X_test, y_test = window(test_data, seq_length)
# X_train, y_train, X_test, y_test = split_window(df, lookback)
# a, b, c, d = split_window_mod(df_test, lookback)
# Windowed train/test split of the scaled data (NumPy arrays at this point).
X_train, y_train, X_test, y_test = split_window_mod(df, lookback)
print(X_test.shape)
#input()
# fill Nan for adjusted forecasting
# filler_X_train = np.empty_like(X_train)
# filler_X_train[:,:] = np.nan
# X_train = np.insert(X_train, 0, X_train[7:14], axis=0)
print("")
print("Fill Attempt X_train ====================================== ")
print(X_train[:8])
# Blocks until Enter is pressed (debug pause).
input()
# filler_y_train = np.empty_like(y_train)
# filler_y_train[:, :] = np.nan
# y_train = np.append(y_train, y_train[-7:], axis=0)
print("")
print("Fill Attempt y_train ====================================== ")
print(y_train[-8:])
input()
# print("")
# print("Fill Attempt X_test ====================================== ")
# print(X_test.shape)
# # filler_X_test = X_train[-7:]
# # filler_X_test = np.empty_like(X_test)
# # filler_X_test[:,:] = np.nan
# X_test = np.insert(X_test, 0, X_test[7:14], axis=0)
# print(X_test.shape)
# print(X_test[:7])
# input()
#
# filler_y_test = np.empty_like(y_test)
# filler_y_test[:, :] = np.nan
# y_test = np.append(y_test, filler_y_test[-7:], axis=0)
# print("")
# print("Fill Attempt y_test ====================================== ")
# print(y_test[-7:])
# input()
# Convert the NumPy splits to tensors of torch's default tensor type.
X_train = torch.from_numpy(X_train).type(torch.Tensor)
y_train = torch.from_numpy(y_train).type(torch.Tensor)
X_test = torch.from_numpy(X_test).type(torch.Tensor)
y_test = torch.from_numpy(y_test).type(torch.Tensor)
# X_train = torch.from_numpy(X_train).float()
# y_train = torch.from_numpy(y_train).float()
# X_test = torch.from_numpy(X_test).float()
# y_test = torch.from_numpy(y_test).float()
# X_test = torch.from_numpy(np.append(X_test, future_days)).float()
# y_test = torch.from_numpy(np.append(y_test, future_days)).float()
print(X_test.shape)
#input()
# Tensorboard Setup
writer = SummaryWriter()
model = GRU(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim, num_layers=num_layers)
# model = LSTM(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim, num_layers=num_layers)
# Mean-squared-error loss optimised with Adam (learning rate 0.01).
criterion = torch.nn.MSELoss(reduction='mean')
optimiser = torch.optim.Adam(model.parameters(), lr=0.01)
# Per-epoch loss history.
hist = np.zeros(num_epochs)
start_time = time.time()
gru = []
print("X_train : ")
print(X_train[-1:])
input()
# Training loop: one forward/backward pass over the whole of X_train per epoch.
for t in range(num_epochs):
y_train_pred = model(X_train)
loss = criterion(y_train_pred, y_train)
print("Epoch ", t+1, "MSE: ", loss.item())
# y_test_pred = model(X_test)
# loss_valid = criterion(y_test_pred, y_test)
# print("-------------------- Validation MSE : ", loss_valid.item())
hist[t] = loss.item()
optimiser.zero_grad()
loss.backward()
optimiser.step()
writer.add_scalar("Loss/Train", loss.item(), t)
# writer.add_scalar("Validation/Train", loss, t)
training_time = time.time() - start_time
print("Time Spent : {}".format(training_time))
# Map the last epoch's predictions and the targets back to the original scale.
predict = pd.DataFrame(scaler.inverse_transform(y_train_pred.detach().numpy()))
original = pd.DataFrame(scaler.inverse_transform(y_train.detach().numpy()))
# Left: column 0 of targets vs. predictions. Right: loss history.
plt.subplot(1, 2, 1)
ax = plt.plot(original.index, original[0])
ax = plt.plot(predict.index, predict[0])
plt.subplot(1,2,2)
ax = plt.plot(hist)
plt.show()
# Predict from test data
print("================== Test Data Specification ===========")
print(X_test[-1:])
print(X_test.shape)
# NOTE(review): this loop calls loss.backward() and optimiser.step() on the
# test-set loss, so the model weights are also updated on the test data, and
# hist[t] overwrites the first half of the training-loss history. Confirm this
# is intended rather than a pure evaluation pass.
for t in range(int(np.round(0.5 * num_epochs))):
y_test_pred = model(X_test)
loss = criterion(y_test_pred, y_test)
print("Epoch ", t+1, "MSE: ", loss.item())
# y_test_pred = model(X_test)
# loss_valid = criterion(y_test_pred, y_test)
# print("-------------------- Validation MSE : ", loss_valid.item())
hist[t] = loss.item()
optimiser.zero_grad()
loss.backward()
optimiser.step()
writer.add_scalar("Loss/Test", loss.item(), t)
# From here on y_test is a NumPy array in the original (unscaled) units.
y_test = scaler.inverse_transform(y_test.detach().numpy())
test_predict = pd.DataFrame(scaler.inverse_transform(y_test_pred.detach().numpy()))
# test_original = pd.DataFrame(scaler.inverse_transform(y_test.detach().numpy()))
test_original = pd.DataFrame(y_test)
print(test_original)
print(test_predict[:10])
input()
# plt.plot(test_original.index, test_original[0])
# plt.plot(test_original.index, test_original[0], label="Real Data")
# plt.plot(test_predict.index, test_predict[0], marker=".", label="Prediction")
# Bar
# Both bar series are drawn at the same x positions on the same axes.
plt.bar(test_original.index, test_original[0], label="Real Data")
plt.bar(test_predict.index, test_predict[0], label="Prediction")
plt.title("Test Result ")
plt.show()
# Invert prediction results
y_train_pred = scaler.inverse_transform(y_train_pred.detach().numpy())
y_train = scaler.inverse_transform(y_train.detach().numpy())
y_test_pred = scaler.inverse_transform(y_test_pred.detach().numpy())
# calculate mean square error
# trainScore = math.sqrt(mean_squared_error(y_train[:,0], y_train_pred[:,0]))
# testScore = math.sqrt(mean_squared_error(y_test[:,0], y_test_pred[:,0]))
# print("Train Score: %.2f " % (trainScore))
# print("Test Score: %.2f " % (trainScore))
# gru.append(trainScore)
# gru.append(testScore)
# gru.append(training_time)
# Extend the scaled data with 7 all-NaN rows; `df` becomes a NumPy array here.
filler_df = np.empty_like(df)
filler_df[:, :] = np.nan
df = np.append(df, filler_df[-7:], axis=0)
# NaN canvas with df's shape; train predictions are written from row 0.
trainPredictPlot = np.empty_like(df)
trainPredictPlot[:,:] = np.nan
print("Train reference:")
print("[1]", str(len(y_train_pred)+lookback))
print("[2]", str(len(df)-1))
print ( "[ y_train_pred] ", str(len(y_train_pred)) )
# input()
print("")
trainPredictPlot[:len(y_train_pred), :] = y_train_pred
# trainPredictPlot[lookback:len(y_train_pred)+lookback, :] = y_train_pred
print("y_test_pred shape ", y_test_pred.shape)
# Stacked prediction
# Take the last lookback-1 time steps of the final test window, add a batch
# axis, and predict one step from them. X_test is a torch tensor here, so the
# NumPy calls operate on its array conversion.
rrr_predict = X_test[len(X_test)-1][-(lookback-1):]
rrr_predict = np.expand_dims(rrr_predict, axis=0)
rrr_predict = torch.from_numpy(rrr_predict).type(torch.Tensor)
xxx_predict = model(rrr_predict)
vvv_predict = X_test[len(X_test)-1][-(lookback-1):]
print()
print("|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||")
print("Real data ")
print(X_test[-1:])
print("input data: ")
print(vvv_predict)
print("Prediction Result: ")
print(xxx_predict)
# Append the one-step prediction to those lookback-1 steps (lookback rows).
zzz_predict = np.append(vvv_predict.detach().numpy(), xxx_predict.detach().numpy(), axis=0) #
print("new X_test n+1 ") #
print(zzz_predict) #
# NOTE(review): inserting the new axis at position 1 turns the rows into a
# batch of `lookback` sequences of length 1, so the model call below produces
# one output per row rather than an autoregressive multi-step rollout -
# confirm that this is the intended "7 days" forecast.
zzz_predict = np.expand_dims(zzz_predict, axis=1) # set axis=1 to get prediction result for 7 days straight
print(zzz_predict) #
aaa_predict = torch.from_numpy(zzz_predict).type(torch.Tensor) #
print("n+1+1 prediction ")
aaa_predict = model(aaa_predict)
# tmp_future = np.empty_like(aaa_predict)
# tmp_future = np.append(tmp_future, aaa_predict.detach().numpy(), axis=0)
# print(tmp_future)
# print("***************************************************************************************")
# for x in range(lookback):
# tmp_val = tmp_future[-(lookback-1):]
#
# aaa_predict = np.expand_dims(aaa_predict.detach().numpy(), axis=0)
# aaa_predict = torch.from_numpy(tmp_val).type(torch.Tensor) #
# aaa_predict = model(aaa_predict)
# print(aaa_predict)
# # tmp_future = np.append(tmp_future[0], aaa_predict[0], axis=0)
# print("***************************************************************************************")
# Forecast rows in original units, appended after the test predictions.
aaa_future = scaler.inverse_transform(aaa_predict.detach().numpy())
y_test_pred = np.append(y_test_pred, aaa_future, axis=0)
print(y_test_pred[-2:])
# plt.plot(aaa_future)
# plt.show()
print()
print("|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||")
print()
input()
predict_test = pd.DataFrame(y_train_pred)
original_test = pd.DataFrame(y_train)
# predict_test = pd.DataFrame(y_train_pred)
# original_test = pd.DataFrame(y_train)
# NaN canvas with df's shape for the test predictions.
testPredictPlot = np.empty_like(df)
testPredictPlot[:,:] = np.nan
print("Our Problem child :")
print("[1]", str(len(y_test_pred)+lookback-1))
print("[2]", str(len(df)-1))
print("[3]", str(len(testPredictPlot)))
print("[ y_test_pre] ", str(len(y_test_pred)))
print(testPredictPlot.shape)
print(testPredictPlot[-10:])
print("")
# testPredictPlot[len(y_train_pred)+lookback-1:len(df)-3, :] = y_test_pred
# NOTE(review): the end offset 16 is hard-coded; the assignment only works
# when the slice has exactly len(test_predict) rows - verify for other
# lookback values or split ratios.
testPredictPlot[len(y_train_pred):len(df)-16, :] = test_predict
fill_tail = np.empty_like(df)
fill_tail[:, :] = np.nan
# NOTE(review): z_future_pred is rebuilt on the next statements and then
# overwritten by the model output further down, so these values are only used
# for the shape print.
z_future_pred = np.append(fill_tail, fill_tail, axis=1)
# z_future_pred = np.append(z_future_pred, fill_tail, axis=1)
z_future_pred = np.empty_like(z_future_pred)
print(fill_tail.shape)
print(z_future_pred.shape)
# print(z_future_pred[-10:])
X_future = np.empty_like(X_test[:1])
print("========= Future Days =======")
X_future[:,:] = np.nan
print(" Original shape : ", X_future.shape)
print(X_test[-1:])
# NOTE(review): X_future has a single entry along axis 0, so
# X_future[lookback:] is empty and the append adds nothing; the [:lookback]
# slice below therefore selects the first `lookback` windows of X_test, not
# future data.
X_future = np.append(X_test, X_future[lookback:], axis=0)
# print(X_future[-10:])
print("Aftern append values : ", X_future.shape)
X_future = torch.from_numpy(X_future[:lookback]).type(torch.Tensor)
print("========= Future Days =======")
print(X_future[-1:])
print(X_future.shape)
z_future_pred = model(X_future)
z_future_pred = scaler.inverse_transform(z_future_pred.detach().numpy())
# future_predictPlot[len(testPredictPlot)] =
# Scaled data (including the NaN padding rows) mapped back to original units.
original_df = scaler.inverse_transform(df)
# AAAAAAAAAAAAAAA
# NaN canvas whose last `lookback` rows hold the forecast rows.
bbb_future = np.empty_like(df)
bbb_future[:,:] = np.nan
bbb_future[-(lookback):, :] = aaa_future
# Column layout of `predictions` (three columns per group):
# 0-2 train predictions, 3-5 test predictions, 6-8 original data,
# 9-11 forecast rows.
predictions = np.append(trainPredictPlot, testPredictPlot, axis=1)
predictions = np.append(predictions, original_df, axis=1)
predictions = np.append(predictions, bbb_future, axis=1)
# predictions = np.append(predictions, z_future_pred, axis=1)
result = pd.DataFrame(predictions)
print(result[-6:])
print(real_data[-6:])
input()
# plt.subplot(2,1,2)
# Runs exactly once.
for i in range(1):
plt.title("Train and Validation")
# plt.plot(result.index, result[int(input("Index for Original : "))], label="Original", color="gray", linestyle="--", linewidth=2, alpha=0.3)
# plt.plot(result.index, result[int(input("Index for Train : "))], label="Train", color="blue", marker=".", linewidth=1)
# plt.plot(result.index, result[int(input("Index for Test : "))], label="Test", color="red", marker=".", linewidth=1)
# plt.plot(result.index, result[int(input("Index for Test : "))], label="AAA", color="yellow", marker="o", linewidth=1)
# Bar
# plt.bar(result.index, result[int(input("Index for Original : "))], label="Original", color="gray", alpha=0.3)
# plt.bar(result.index, result[int(input("Index for Train : "))], label="Train", color="blue")
# plt.bar(result.index, result[int(input("Index for Test : "))], label="Test", color="red")
# plt.bar(result.index, result[int(input("Index for Test : "))], label="AAA", color="cyan")
# Columns 8, 2, 5 and 11 are the third feature of the original, train, test
# and forecast groups respectively.
plt.bar(result.index, result[8], label="Original", color="gray", alpha=0.3)
plt.bar(result.index, result[2], label="Train", color="blue")
plt.bar(result.index, result[5], label="Test", color="red")
plt.bar(result.index, result[11], label="Prediction", color="cyan")
# plt.plot(real_data.index, real_data[int(input("Index for real data"))], label="aaaaa")
# plt.plot(result.index, result[3], color="red", marker=".", linewidth=1)
# plt.xticks(range(0,data.shape[0],500),df['Close'][0].loc[::500],rotation=45)
plt.legend()
plt.show()
# NOTE(review): np.empty_like leaves these lookback-2 rows uninitialised (the
# NaN fill below is commented out), so the first rows of future_all hold
# arbitrary values.
filler = np.empty_like(y_test_pred[:lookback-2])
# filler[:,:] = np.nan
print("====== Simple Forecasting =========")
# Concatenate filler, train predictions, test predictions (with the forecast
# rows appended earlier) and z_future_pred along the row axis.
future_all = np.append(filler, y_train_pred, axis=0)
future_all = np.append(future_all, y_test_pred, axis=0)
future_all = np.append(future_all, z_future_pred, axis=0)
future_all = pd.DataFrame(future_all)
# plt.plot(original_df)
# NOTE(review): original_df is a NumPy array, so original_df[6] selects row 6
# (three values), not a column - confirm which series was meant to be plotted.
plt.plot(original_df[6], color="gray", linestyle="--", linewidth=2, alpha=0.3)
print(future_all.head())
plt.plot(future_all.index, future_all[0], marker=".")
plt.title("Original data & Train+Test+Prediction Data")
# plt.plot(future_all.index, future_all[1], marker=".")
# plt.plot(future_all.index, future_all[2], marker=".")
plt.show()
print(result.tail())
# Persist the trained model twice: as a state_dict and as a TorchScript module.
if not os.path.exists("./models"):
os.makedirs("./models")
torch.save(model.state_dict(),"./models/my_model")
m = torch.jit.script(model)
m.save("./models/my_model.pt")
# writer.add_graph(model)