Anotação de dados na análise de série temporal (Parte 3): Exemplo de uso de anotação de dados
Introdução
O artigo descreve como usar PyTorch Lightning e o framework PyTorch Forecasting através da plataforma de negociação MetaTrader 5 para implementar a previsão de séries temporais financeiras baseadas em redes neurais.
Neste artigo, também explicamos os motivos pelos quais escolhemos essas duas plataformas e o formato de dados que usamos.
Quanto aos dados, você pode usar os dados obtidos por meio da anotação de dados de meus dois artigos anteriores. Uma vez que eles têm o mesmo formato, você pode facilmente expandi-los, seguindo a metodologia descrita neste artigo.
Links para os dois artigos anteriores:
- Anotação de dados na análise de série temporal (Parte 1): Criação de um conjunto de dados com rótulos de tendência usando um gráfico EA
- Anotação de dados na análise de série temporal (Parte 2): Criação de conjuntos de dados com rótulos de tendência usando Python
Conteúdo:
- Introdução
- Algumas bibliotecas importantes do Python
- Inicialização
- Reescrevendo a classe pytorch_forecasting.TimeSeriesDataSet
- Criando conjuntos de dados para treinamento e validação
- Criando e treinando o modelo
- Definindo a lógica de execução
- Considerações finais
Algumas bibliotecas importantes do Python
Primeiramente, vamos apresentar as principais bibliotecas Python que usaremos.
1. PyTorch Lightning
PyTorch Lightning é um ambiente de aprendizado profundo, especialmente projetado para pesquisadores e engenheiros de inteligência artificial e aprendizado de máquina que precisam de máxima flexibilidade sem comprometer a escalabilidade.
A ideia principal é separar o código acadêmico (por exemplo, definições de modelos, propagação/retropropagação, otimizadores, validação etc.) do código de engenharia (por exemplo, laços for, mecanismos de salvamento, logs do TensorBoard, estratégias de treinamento etc.), resultando em um código mais organizado e compreensível.
Entre as principais vantagens estão:
- Alta reusabilidade. A estrutura permite reutilizar o código em vários projetos.
- Facilidade de manutenção. Graças ao design estruturado, manter o código se torna mais simples.
- Lógica clara. Com a abstração do código de engenharia padrão, o código de aprendizado de máquina se torna mais fácil de identificar e compreender.
No geral, o PyTorch Lightning é uma biblioteca extremamente poderosa que oferece um método eficiente para elaborar e gerenciar seu código PyTorch. Além disso, ele proporciona uma abordagem estruturada para resolver tarefas comuns, mas complexas, como treinamento, validação e teste do modelo.
O uso detalhado dessa biblioteca pode ser encontrado na documentação oficial: https://lightning.ai/docs.2. PyTorch Forecasting
Trata-se de uma biblioteca Python especificamente desenvolvida para a previsão de séries temporais. Como é baseada no PyTorch, você pode aproveitar as poderosas bibliotecas de diferenciação automática e otimização do PyTorch, bem como desfrutar da usabilidade que o PyTorch Forecasting oferece para a previsão de séries temporais.
No PyTorch Forecasting, você pode encontrar implementações de vários modelos de previsão, incluindo, entre outros, modelos de autorregressão (AR, ARIMA), modelos de espaço de estados (SARIMAX), redes neurais (LSTM, GRU) e métodos de ensemble (Prophet, N-Beats). Isso significa que você pode experimentar e comparar diferentes abordagens de previsão no mesmo ambiente sem a necessidade de escrever extenso código padrão para cada abordagem.
A biblioteca também oferece uma série de ferramentas de pré-processamento de dados, que podem ajudar a resolver tarefas típicas em séries temporais. Essas ferramentas incluem, entre outras, a substituição de valores ausentes, escalonamento, extração de características e transformações de janela deslizante. Isso significa que você pode se concentrar mais no desenvolvimento e otimização do seu modelo, sem gastar muito tempo no processamento de dados.
A biblioteca também possui uma interface unificada para avaliação de desempenho do modelo. Ela implementa funções de perda e métricas de validação para séries temporais, como QuantileLoss e SMAPE, e também suporta metodologias de treinamento como parada precoce e validação cruzada. Isso permite que você monitore e melhore a performance do seu modelo de maneira mais simples.
Se você está procurando uma maneira de aumentar a eficiência e a facilidade de manutenção do seu projeto de previsão de séries temporais, então o PyTorch Forecasting pode ser uma excelente escolha. A biblioteca oferece meios eficazes e flexíveis para organizar e gerenciar seu código PyTorch, permitindo que você se concentre no aspecto mais importante: o próprio modelo de aprendizado de máquina.
Uma descrição detalhada do uso da biblioteca pode ser encontrada na documentação oficial: https://pytorch-forecasting.readthedocs.io/en/stable.
3. Modelo N-HiTS
O modelo N-HiTS resolve os problemas de volatilidade das previsões e complexidade computacional em previsões de longo prazo através da implementação de métodos inovadores de interpolação hierárquica e amostragem de dados com várias taxas. Isso permite que o modelo N-HiTS aproxime eficientemente um intervalo de previsão de qualquer comprimento.
Além disso, experimentos abrangentes realizados em conjuntos de dados de grande escala mostraram que o modelo N-HiTS aumenta a precisão em média quase 20% em comparação com a última arquitetura Transformer, além de reduzir em 50 vezes o tempo de cálculo.
Link para o artigo: https://doi.org/10.48550/arXiv.2201.12886.
Inicialização
Primeiramente, precisamos importar as bibliotecas necessárias. Entre essas bibliotecas estão o MetaTrader 5 (para interagir com o terminal de mesmo nome), PyTorch Lightning (para treinar o modelo) e algumas outras bibliotecas para processamento e visualização de dados.
import MetaTrader5 as mt5 import lightning.pytorch as pl from lightning.pytorch.callbacks import EarlyStopping import matplotlib.pyplot as plt import pandas as pd from pytorch_forecasting import Baseline, NHiTS, TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder from pytorch_forecasting.metrics import MAE, SMAPE, MQF2DistributionLoss, QuantileLoss from lightning.pytorch.tuner import Tuner
Em seguida, precisamos inicializar o MetaTrader 5. Isso é feito chamando a função mt.initialize(). Se você não conseguir inicializá-lo apenas usando isso, você precisará passar o caminho para o terminal MetaTrader 5 como um parâmetro desta função (no exemplo "D:\Project\mt\MT5\terminal64.exe" — esse é o meu caminho, no aplicativo real você precisará definir seu próprio caminho). Se a inicialização for bem-sucedida, a função retornará True, caso contrário, retornará False.
if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"): print('initialize() failed!') else: print(mt.version())
A função mt.symbols_total() é usada para obter o número total de símbolos negociáveis disponíveis no terminal MetaTrader 5. Podemos usá-la para determinar se podemos obter os dados corretamente. Se o número total for maior que 0, podemos usar a função mt.copy_rates_from_pos() para obter os dados históricos do símbolo especificado. Neste exemplo, obtivemos os dados mais recentes sobre o comprimento do período "mt_data_len" M15 (15 minutos) do símbolo GOLD_micro.
sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown()
Por fim, usamos a função mt.shutdown() para fechar a conexão com o terminal MetaTrader 5 e converter os dados recebidos para o formato Pandas DataFrame.
mt.shutdown() rts_fm=pd.DataFrame(rts)
Agora, vamos considerar como pré-processar os dados obtidos do terminal.
Primeiro, precisamos converter os rótulos temporais em datas:
rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s')
Aqui, não descrevemos mais como anotar os dados. Você pode encontrar métodos nos meus dois artigos anteriores (os links estão na introdução deste artigo). Para uma breve demonstração de como usar modelos de previsão, simplesmente dividimos todos os fragmentos de dados por max_encoder_length+2max_prediction_length em grupos. Em cada grupo, há uma sequência de 0 a "max_encoder_length+2max_prediction_length-1". Vamos preenchê-la. Dessa forma, adicionamos os rótulos necessários aos dados de entrada. Primeiro, precisamos converter o índice de tempo inicial (ou seja, o índice do DataFrame). Vamos calcular o resto do índice de tempo inicial, dividido por (max_encoder_length+2max_prediction_length), e usar o resultado como o novo índice de tempo. Associaremos o índice de tempo ao intervalo de 0 a "max_encoder_length+2max_prediction_length-1":
rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length)
Também precisamos converter o índice de tempo inicial em um grupo. Calcularemos o índice de tempo inicial, dividido por "max_encoder_length+2*max_prediction_length", e usaremos o resultado como o novo grupo:
rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
Encapsularemos a parte de pré-processamento de dados em uma função. Precisamos passar para ela apenas o comprimento dos dados que precisamos recuperar, e ela poderá completar o trabalho de pré-processamento de dados:
def get_data(mt_data_len:int): if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"): print('initialize() failed!') else: print(mt.version()) sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown() # print(len(rts)) rts_fm=pd.DataFrame(rts) rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length) return rts_fm
Reescrevendo a classe pytorch_forecasting.TimeSeriesDataSet
Reescrevendo a função to_dataloader() em pytorch_forecasting. Isso permite controlar se os dados serão misturados e se o último grupo do pacote será descartado (principalmente para evitar erros imprevisíveis causados por um comprimento insuficiente do último grupo de dados). Veja como você pode fazer isso:
class New_TmSrDt(TimeSeriesDataSet): def to_dataloader(self, train: bool = True, batch_size: int = 64, batch_sampler: Sampler | str = None, shuffle:bool=False, drop_last:bool=False, **kwargs) -> DataLoader: default_kwargs = dict( shuffle=shuffle, drop_last=drop_last, #modification collate_fn=self._collate_fn, batch_size=batch_size, batch_sampler=batch_sampler, ) default_kwargs.update(kwargs) kwargs = default_kwargs if kwargs["batch_sampler"] is not None: sampler = kwargs["batch_sampler"] if isinstance(sampler, str): if sampler == "synchronized": kwargs["batch_sampler"] = TimeSynchronizedBatchSampler( SequentialSampler(self), batch_size=kwargs["batch_size"], shuffle=kwargs["shuffle"], drop_last=kwargs["drop_last"], ) else: raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler") del kwargs["batch_size"] del kwargs["shuffle"] del kwargs["drop_last"] return DataLoader(self,**kwargs)Este código cria uma nova classe New_TmSrDt, que herda de TimeSeriesDataSet. Em seguida, nesta nova classe, a função to_dataloader() é sobrescrita para incluir os parâmetros shuffle e drop_last. Dessa forma, você pode controlar melhor o processo de carregamento dos dados. Não esqueça de substituir as instâncias de TimeSeriesDataSet por New_TmSrDt no código.
Criando conjuntos de dados para treinamento e validação
Primeiramente, precisamos definir um ponto de corte para os dados de treinamento. Isso é feito subtraindo o comprimento máximo de previsão do valor máximo de time_idx.
max_encoder_length = 2*96 max_prediction_length = 30 training_cutoff = rts_fm["time_idx"].max() - max_prediction_length
Então usamos a classe New_TmSrDt (que é a classe TimeSeriesDataSet reescrita por nós) para criar o conjunto de dados de treinamento. Essa classe requer os seguintes parâmetros:
- DataFrame (neste caso, rts_fm)
- A coluna time_idx, que é uma sequência contínua de números inteiros.
- A coluna alvo (neste caso, close), que é o valor que queremos prever.
- A coluna do grupo (neste caso, series), que representa diferentes séries temporais.
- Os comprimentos máximos do codificador e do preditor
context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1)
Em seguida, usamos a função New_TmSrDt.from_dataset() para criar um conjunto de dados de validação. Esta função requer os seguintes parâmetros:
- Conjunto de dados de treinamento
- DataFrame
- O índice mínimo de previsão, que deve ser maior em um do que o valor máximo de time_idx dos dados de treinamento.
validation = New_TmSrDt.from_dataset(training, rts_fm, min_prediction_idx=training_cutoff + 1)
Finalmente, usamos a função to_dataloader() para converter os conjuntos de dados de treinamento e validação em objetos PyTorch DataLoader. Esta função requer os seguintes parâmetros:
- O parâmetro train, que indica se os dados devem ser embaralhados.
- O parâmetro batch_size, que especifica o número de amostras em um lote.
- O parâmetro num_workers, que indica o número de processos de trabalho para carregar os dados.
train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0)
Finalmente, encapsulamos esta parte do código em uma função spilt_data(data:pd.DataFrame,t_drop_last:bool,t_shuffle:bool,v_drop_last:bool,v_shuffle:bool) e especificamos os seguintes parâmetros:
- O parâmetro data é usado para obter o conjunto de dados que precisa ser processado.
- O parâmetro t_drop_last indica se o último grupo do conjunto de dados de treinamento deve ser removido.
- O parâmetro t_shuffle indica se os dados de treinamento devem ser misturados.
- O parâmetro v_drop_last indica se o último grupo do conjunto de dados de validação deve ser removido.
- O parâmetro v_shuffle indica se os dados de validação devem ser embaralhados.
train_dataloader (instância do dataloader para o conjunto de dados de treinamento), val_dataloader (instância do dataloader para o conjunto de dados de validação) e training (instância do TimeSeriesDataSet para o conjunto de dados) são usados como valores de retorno desta função, pois serão utilizados posteriormente.
def spilt_data(data:pd.DataFrame, t_drop_last:bool, t_shuffle:bool, v_drop_last:bool, v_shuffle:bool): training_cutoff = data["time_idx"].max() - max_prediction_length #max:95 context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1) train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0) return train_dataloader,val_dataloader,training
Criando e treinando o modelo
Vamos iniciar a criação do modelo NHiTS. Esta parte mostrará como configurar seus parâmetros e como treiná-lo.
1. Encontramos a melhor taxa de aprendizado
Antes de começarmos a criar o modelo, usamos o objeto Tuner do PyTorch Lightning para encontrar a melhor taxa de aprendizado.
Primeiramente, precisamos criar um objeto Trainer, onde o parâmetro accelerator é usado para indicar o tipo de dispositivo, e gradient_clip_val é usado para prevenir a explosão do gradiente.
pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1)
Em seguida, usamos a função NHiTS.from_dataset() para criar a rede do modelo NHiTS. Esta função requer os seguintes parâmetros:
- Conjunto de dados de treinamento
- Taxa de aprendizado
- Decaimento de peso
- Função de perda
- Tamanho da camada oculta
- Otimizador
net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", )
Depois, criamos uma instância da classe Tuner e chamamos a função lr_find(). Ela treinará o modelo com várias taxas de aprendizado baseadas em nosso conjunto de dados e comparará as perdas de cada taxa de aprendizado para encontrar a melhor delas.
res = Tuner(trainer).lr_find( net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, min_lr=1e-5, max_lr=1e-1 ) lr_=res.suggestion()
De forma similar, encapsulamos essa parte do código que obtém a melhor taxa de aprendizado em uma função get_learning_rate() e fazemos da taxa de aprendizado obtida seu valor de retorno:
def get_learning_rate(): pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False) net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", ) res = Tuner(trainer).lr_find( net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1 ) lr_=res.suggestion() return lr_
Se quiser visualizar a taxa de aprendizado, você pode adicionar o seguinte código:
print(f"suggested learning rate: {res.suggestion()}")
fig = res.plot(show=True, suggest=True)
fig.show()
O resultado neste exemplo é o seguinte:
Taxa de aprendizado recomendada: 0.003981071705534973.
2. Definição do EarlyStopping Callback
Esse callback é principalmente usado para monitorar as perdas de validação e parar o treinamento quando as perdas não melhoram ao longo de várias épocas consecutivas. Isso pode prevenir o sobreajuste do modelo.
early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=True, mode="min")
Aqui, é importante notar o parâmetro patience, que essencialmente define quando parar durante o treinamento, se as perdas não diminuírem ao longo de várias épocas consecutivas. Nós o definimos como 10.
3. Definição do Callback ModelCheckpoint
Esse callback é principalmente usado para gerenciar o salvamento do modelo e o nome do arquivo salvo. Principalmente, definimos as seguintes duas variáveis.
ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, filename='{epoch}-{val_loss:.2f}')
save_top_k é usado para controlar o salvamento de múltiplos melhores modelos. Definimos o valor como 1 e mantemos apenas o melhor modelo.
4. Definição do Training Model
Primeiramente, precisamos criar uma instância da classe Trainer em Lightning.pytorch e adicionar os dois callbacks que definimos anteriormente.
trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, )
Aqui, precisamos prestar atenção aos parâmetros max_epochs (número máximo de épocas de treinamento), gradient_clip_val (usado para prevenir a explosão do gradiente) e callbacks. Aqui, max_epochs usa a variável global ep, que definiremos mais tarde, e callbacks é nossa coleção de callbacks.
Em seguida, também precisamos definir o modelo NHiTS e criar sua instância:
net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), )
Aqui, não é necessário alterar os parâmetros, basta usar os valores padrão. Aqui, apenas definimos loss para a função de perda MQF2DistributionLoss.
5. Módulo de treinamento
Usamos a função fit() do objeto Trainer para treinar o modelo:
trainer.fit( net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, )
Da mesma forma, encapsulamos essa parte do código em uma função train():
def train(): early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, # The number of times without improvement will stop verbose=True, mode="min") ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, # Save the top few best ones filename='{epoch}-{val_loss:.2f}') trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, ) net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), ) trainer.fit( net, train_dataloaders=t_loader, val_dataloaders=v_loader, # ckpt_path='best' ) return trainer
Essa função retornará o modelo treinado, que você pode usar para tarefas de previsão.
Definindo a lógica de execução
1. Definição de variáveis globais:
ep=200 __train=False mt_data_len=200000 max_encoder_length = 2*96 max_prediction_length = 30 batch_size = 128
__train é usado para controlar se estamos treinando ou testando o modelo no momento.
Vale destacar que ep é usado para gerenciar a época máxima de treinamento. Como definimos EarlyStopping, esse valor pode ser aumentado, pois o modelo automaticamente parará quando a convergência cessar.
mt_data_len é o número dos últimos dados de séries temporais recebidos do cliente.
max_encoder_length e max_prediction_length são, respectivamente, o comprimento máximo de codificação e o comprimento máximo de previsão.
2. Treinamento
Também precisamos salvar os resultados ótimos atuais do treinamento em um arquivo local após a conclusão do treinamento, por isso definimos um arquivo json para salvar essas informações:
info_file='results.json'
Para tornar nosso processo de treinamento mais claro, precisamos evitar a exibição de informações de aviso desnecessárias durante o treinamento, então adicionaremos o seguinte código:
warnings.filterwarnings("ignore")
Segue nossa lógica de treinamento:
dt=get_data(mt_data_len=mt_data_len) if __train: # print(dt) # dt=get_data(mt_data_len=mt_data_len) t_loader,v_loader,training=spilt_data(dt, t_shuffle=False,t_drop_last=True, v_shuffle=False,v_drop_last=True) lr=get_learning_rate() trainer__=train() m_c_back=trainer__.checkpoint_callback m_l_back=trainer__.early_stopping_callback best_m_p=m_c_back.best_model_path best_m_l=m_l_back.best_score.item() # print(best_m_p) if os.path.exists(info_file): with open(info_file,'r+') as f1: last=json.load(fp=f1) last_best_model=last['last_best_model'] last_best_score=last['last_best_score'] if last_best_score > best_m_l: last['last_best_model']=best_m_p last['last_best_score']=best_m_l json.dump(last,fp=f1) else: with open(info_file,'w') as f2: json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)
Quando o treinamento estiver concluído, você poderá encontrar a localização da nossa melhor modelo e melhor resultado no arquivo results.json no diretório raiz.
Durante o processo de treinamento, você verá um indicador de progresso que mostra o progresso de cada época.
Treinamento:
Treinamento concluído:
3. Verificação do modelo
Após o treinamento, queremos validar o modelo e visualizá-lo. Podemos adicionar o seguinte código:
best_model = NHiTS.load_from_checkpoint(best_m_p) predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True) raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False)) for idx in range(10): # plot 10 examples best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True) # sample 500 paths samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0] # plot prediction fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True) ax = fig.get_axes()[0] # plot first two sampled paths ax.plot(samples[:, 0], color="g", label="Sample 1") ax.plot(samples[:, 1], color="r", label="Sample 2") fig.legend() plt.show()
Você também pode usar o TensorBoard para visualizar o processo em tempo real durante o treinamento.
Resultado:
4. Teste do modelo
Primeiro, abrimos o arquivo json para encontrar o local ótimo de armazenamento do modelo:
with open(info_file) as f: best_m_p=json.load(fp=f)['last_best_model'] print('model path is:',best_m_p)
Então, carregamos o modelo:
best_model = NHiTS.load_from_checkpoint(best_m_p)
Em seguida, obtemos dados em tempo real do cliente para testar o modelo:
offset=1 dt=dt.iloc[-max_encoder_length-offset:-offset,:] last_=dt.iloc[-1] #get the last group of data # print(len(dt)) for i in range(1,max_prediction_length+1): dt.loc[dt.index[-1]+1]=last_ dt['series']=0 # dt['time_idx']=dt.apply(lambda x:x.index,args=1) dt['time_idx']=dt.index-dt.index[0] # dt=get_data(mt_data_len=max_encoder_length) predictions=best_model.predict(dt,mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True) best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False) plt.show()
E aqui está o resultado:
5. Avaliação do modelo
Claro, podemos usar algumas métricas da biblioteca PyTorch Forecasting para avaliar o desempenho do modelo. Aqui está como avaliar usando o erro absoluto médio (MAE) e o Erro Absoluto Percentual Médio Simétrico (SMAPE), e exibir os resultados da avaliação:
from pytorch_forecasting.metrics import MAE, SMAPE mae = MAE()(raw_predictions["prediction"], raw_predictions["target"]) print(f"Mean Absolute Error: {mae}") smape = SMAPE()(raw_predictions["prediction"], raw_predictions["target"]) print(f"Symmetric Mean Absolute Percentage Error: {smape}")
Neste trecho de código, primeiro importamos as métricas MAE e SMAPE. Então, usamos essas métricas para calcular o erro entre os valores previstos (raw_predictions["prediction"]) e os valores reais (raw_predictions["target"]). Estas métricas podem nos ajudar a ver o desempenho do nosso modelo e indicar direções para sua melhoria contínua.
Considerações finais
Neste artigo, exploramos como usar os dados de rótulos mencionados nos dois artigos anteriores e demonstramos como criar um modelo N-HiTS usando nossos dados. Depois, treinamos e verificamos o modelo. Como vimos, alcançamos bons resultados. Também demonstramos como usar este modelo no MetaTrader 5 para previsão de 30 velas. Claro, não mencionamos como colocar ordens com base nos resultados de previsão, pois a negociação real requer que os leitores realizem uma grande quantidade de testes de acordo com sua situação real e estabeleçam regras de negociação apropriadas.Obrigado pela atenção!
Anexo:
Código completo:
# Copyright 2021, MetaQuotes Ltd. # https://www.mql5.com # from typing import Union import lightning.pytorch as pl import os from lightning.pytorch.callbacks import EarlyStopping,ModelCheckpoint import matplotlib.pyplot as plt import numpy as np import pandas as pd # import torch from pytorch_forecasting import NHiTS, TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder,timeseries from pytorch_forecasting.metrics import MQF2DistributionLoss from pytorch_forecasting.data.samplers import TimeSynchronizedBatchSampler from lightning.pytorch.tuner import Tuner import MetaTrader5 as mt import warnings import json from torch.utils.data import DataLoader from torch.utils.data.sampler import Sampler,SequentialSampler class New_TmSrDt(TimeSeriesDataSet): ''' rewrite dataset class ''' def to_dataloader(self, train: bool = True, batch_size: int = 64, batch_sampler: Sampler | str = None, shuffle:bool=False, drop_last:bool=False, **kwargs) -> DataLoader: default_kwargs = dict( shuffle=shuffle, # drop_last=train and len(self) > batch_size, drop_last=drop_last, # collate_fn=self._collate_fn, batch_size=batch_size, batch_sampler=batch_sampler, ) default_kwargs.update(kwargs) kwargs = default_kwargs # print(kwargs['drop_last']) if kwargs["batch_sampler"] is not None: sampler = kwargs["batch_sampler"] if isinstance(sampler, str): if sampler == "synchronized": kwargs["batch_sampler"] = TimeSynchronizedBatchSampler( SequentialSampler(self), batch_size=kwargs["batch_size"], shuffle=kwargs["shuffle"], drop_last=kwargs["drop_last"], ) else: raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler") del kwargs["batch_size"] del kwargs["shuffle"] del kwargs["drop_last"] return DataLoader(self,**kwargs) def get_data(mt_data_len:int): if not mt.initialize(): print('initialize() failed!') else: print(mt.version()) sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown() # print(len(rts)) rts_fm=pd.DataFrame(rts) rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length) return rts_fm def spilt_data(data:pd.DataFrame, t_drop_last:bool, t_shuffle:bool, v_drop_last:bool, v_shuffle:bool): training_cutoff = data["time_idx"].max() - max_prediction_length #max:95 context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1) train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0) return train_dataloader,val_dataloader,training def get_learning_rate(): pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False) net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", ) res = Tuner(trainer).lr_find( net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1 ) # print(f"suggested learning rate: {res.suggestion()}") lr_=res.suggestion() return lr_ def train(): early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=True, mode="min") ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, filename='{epoch}-{val_loss:.2f}') trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, ) net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), ) trainer.fit( net, train_dataloaders=t_loader, val_dataloaders=v_loader, # ckpt_path='best' ) return trainer if __name__=='__main__': ep=200 __train=False mt_data_len=200000 max_encoder_length = 2*96 max_prediction_length = 30 batch_size = 128 info_file='results.json' warnings.filterwarnings("ignore") dt=get_data(mt_data_len=mt_data_len) if __train: # print(dt) # dt=get_data(mt_data_len=mt_data_len) t_loader,v_loader,training=spilt_data(dt, t_shuffle=False,t_drop_last=True, v_shuffle=False,v_drop_last=True) lr=get_learning_rate() trainer__=train() m_c_back=trainer__.checkpoint_callback m_l_back=trainer__.early_stopping_callback best_m_p=m_c_back.best_model_path best_m_l=m_l_back.best_score.item() # print(best_m_p) if os.path.exists(info_file): with open(info_file,'r+') as f1: last=json.load(fp=f1) last_best_model=last['last_best_model'] last_best_score=last['last_best_score'] if last_best_score > best_m_l: last['last_best_model']=best_m_p last['last_best_score']=best_m_l json.dump(last,fp=f1) else: with open(info_file,'w') as f2: json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2) best_model = NHiTS.load_from_checkpoint(best_m_p) predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True) raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False)) for idx in range(10): # plot 10 examples best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True) samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0] # plot prediction fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True) ax = fig.get_axes()[0] # plot first two sampled paths ax.plot(samples[:, 0], color="g", label="Sample 1") ax.plot(samples[:, 1], color="r", label="Sample 2") fig.legend() plt.show() else: with open(info_file) as f: best_m_p=json.load(fp=f)['last_best_model'] print('model path is:',best_m_p) best_model = NHiTS.load_from_checkpoint(best_m_p) offset=1 dt=dt.iloc[-max_encoder_length-offset:-offset,:] last_=dt.iloc[-1] # print(len(dt)) for i in range(1,max_prediction_length+1): dt.loc[dt.index[-1]+1]=last_ dt['series']=0 # dt['time_idx']=dt.apply(lambda x:x.index,args=1) dt['time_idx']=dt.index-dt.index[0] # dt=get_data(mt_data_len=max_encoder_length) predictions = best_model.predict(dt, mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True) best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False) plt.show()
Traduzido do Inglês pela MetaQuotes Ltd.
Artigo original: https://www.mql5.com/en/articles/13255
- Aplicativos de negociação gratuitos
- 8 000+ sinais para cópia
- Notícias econômicas para análise dos mercados financeiros
Você concorda com a política do site e com os termos de uso