Marcado de datos en el análisis de series temporales (Parte 3): Ejemplo de uso del marcado de datos
Introducción
El presente artículo explica cómo utilizar PyTorch Lightning y el marco PyTorch Forecasting a través de la plataforma comercial MetaTrader 5 para implementar el pronóstico de series temporales financieras basada en redes neuronales.
En el artículo también explicaremos los motivos por los que hemos elegido estas dos plataformas y el formato de datos utilizado.
En cuanto a los datos, podemos usar los datos obtenidos marcando los datos de los dos artículos anteriores. Como comparten el mismo formato, podemos ampliarlo fácilmente siguiendo la metodología descrita en este artículo.
Enlaces a los dos artículos anteriores:
- Marcado de datos en el análisis de series temporales (Parte 1): Creamos un conjunto de datos con marcadores de tendencia utilizando el gráfico de un asesor
- Marcado de datos en el análisis de series temporales (Parte 2): Creando conjuntos de datos con marcadores de tendencias utilizando Python
Contenido:
- Introducción
- Algunas bibliotecas importantes de Python
- Inicialización
- Reescritura de la clase pytorch_forecasting.TimeSeriesDataSet
- Creación de conjuntos de datos para el entrenamiento y la prueba
- Creación y entrenamiento del modelo
- Definición de la lógica de ejecución
- Conclusión
Algunas bibliotecas importantes de Python
En primer lugar, vamos a introducir las bibliotecas básicas de Python que vamos a usar.
1. PyTorch Lightning
PyTorch Lightning es un entorno de aprendizaje profundo diseñado específicamente para investigadores profesionales de inteligencia artificial e ingenieros de aprendizaje automático que necesitan la máxima flexibilidad sin sacrificar la escalabilidad.
La idea básica consiste en separar el código académico (por ejemplo, las definiciones de modelos, la propagación directa/inversa, los optimizadores, la validación, etc.) del código de ingeniería (por ejemplo, los ciclos for, los mecanismos de persistencia, los diarios de registro de TensorBoard, las estrategias de aprendizaje, etc.), lo cual nos dará un código más racionalizado y comprensible.
Las principales ventajas son:
- Alta reutilización. El diseño permite reutilizar el código en distintos proyectos.
- Facilidad de mantenimiento. Su diseño estructurado facilita el mantenimiento del código.
- Lógica clara. Abstrayendo el código de ingeniería de las plantillas, el código de aprendizaje automático resulta más fácil de identificar y comprender.
En general, PyTorch Lightning supone una biblioteca extremadamente potente que ofrece un método eficiente de organizar y gestionar nuestro código PyTorch. También ofrece un enfoque estructurado de tareas comunes pero complejas, tales como el entrenamiento, la validación y la prueba de modelos.
Encontrará información detallada sobre el uso de esta biblioteca en la documentación oficial: https://lightning.ai/docs.2. PyTorch Forecasting
Es una biblioteca de Python diseñada específicamente para la previsión de series temporales. Como está construida sobre PyTorch, podemos utilizar las potentes bibliotecas de diferenciación automática y optimización de PyTorch, y aprovechar la comodidad que ofrece PyTorch Forecasting para la previsión de series temporales.
En PyTorch Forecasting podremos encontrar implementaciones de varios modelos de previsión, incluyendo (pero no solo) modelos autorregresivos (AR, ARIMA), modelos de espacio de estados (SARIMAX), redes neuronales (LSTM, GRU) y métodos de ensamblaje (Prophet, N-Beats). Esto significa que podemos experimentar y comparar diferentes enfoques predictivos en el mismo entorno sin tener que escribir un extenso código estándar para cada enfoque.
La biblioteca también ofrece una serie de herramientas de preprocesamiento de datos que pueden ayudarnos a resolver problemas típicos de series temporales. Estas herramientas incluyen la sustitución de valores perdidos, el escalado, la extracción de características y las transformaciones de ventanas deslizantes, entre otras cosas. Esto significa que podemos centrarnos más en desarrollar y optimizar nuestro modelo sin dedicar mucho tiempo a procesar datos.
La biblioteca también dispone de una interfaz unificada para evaluar el rendimiento de los modelos. Asimismo, implementa funciones de pérdida y métricas de validación para series temporales como QuantileLoss y SMAPE, y admite metodologías de aprendizaje como la detención temprana y la validación cruzada. Esto nos permite controlar y mejorar más cómodamente el rendimiento de nuestro modelo.
Si estamos buscando un método para mejorar la eficacia y la facilidad de mantenimiento de nuestro proyecto de previsión de series temporales, PyTorch Forecasting podría resultar una gran elección. La biblioteca ofrece herramientas eficientes y flexibles para organizar y gestionar nuestro código PyTorch, permitiéndonos centrarnos en el aspecto más importante: el propio modelo de aprendizaje automático.
Encontrará una descripción detallada de cómo usar la biblioteca en la documentación oficial: https://pytorch-forecasting.readthedocs.io/en/stable.
3. Modelo N-HiTS
El modelo N-HiTS resuelve los problemas de la volatilidad de las previsiones y la complejidad computacional de las previsiones a largo plazo introduciendo técnicas innovadoras de interpolación jerárquica y muestreo de datos multivariable. Esto permite al modelo N-HiTS aproximarse con eficacia a un rango de predicción de cualquier longitud.
Además, extensos experimentos realizados en conjuntos de datos a gran escala han demostrado que el modelo N-HiTS mejora la precisión en casi un 20% de media en comparación con la última arquitectura de Transformer, y reduce el tiempo de cálculo en un orden de magnitud (50 veces).
Enlace al artículo: https://doi.org/10.48550/arXiv.2201.12886.
Inicialización
Primero tendremos que importar las bibliotecas necesarias. Entre estas bibliotecas se encuentran MetaTrader 5 (para la interacción con el terminal del mismo nombre), PyTorch Lightning (para el entrenamiento de modelos) y algunas otras bibliotecas para el tratamiento y la visualización de datos.
import MetaTrader5 as mt5 import lightning.pytorch as pl from lightning.pytorch.callbacks import EarlyStopping import matplotlib.pyplot as plt import pandas as pd from pytorch_forecasting import Baseline, NHiTS, TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder from pytorch_forecasting.metrics import MAE, SMAPE, MQF2DistributionLoss, QuantileLoss from lightning.pytorch.tuner import Tuner
A continuación, deberemos inicializar MetaTrader 5. Esto se logra llamando a la función mt.initialise(). Si no podemos inicializarlo simplemente usándolo, necesitaremos transmitir la ruta al terminal MetaTrader 5 como parámetro a esta función (en el ejemplo "D:\Project\mt\MT5\terminal64.exe" es mi ruta, en una aplicación real usted necesitará establecer la suya propia). Si la inicialización se ha realizado correctamente, la función retornará True; en caso contrario, devolverá False.
if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"): print('initialize() failed!') else: print(mt.version())
La función mt.symbols_total() se utiliza para obtener el número total de variedades comerciadas disponibles en el terminal MetaTrader 5. Podemos usarlo para determinar si podemos obtener los datos correctos. Si el número total es superior a 0, podremos utilizar la función mt.copy_rates_from_pos() para recuperar los datos históricos de la variedad comerciada especificada. En este ejemplo, tendremos los datos más recientes sobre la duración del periodo "mt_data_len" M15 (15 minutos) de la variedad GOLD_micro.
sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown()
Finalmente, usaremos la función mt.shutdown() para cerrar la conexión con el terminal MetaTrader 5 y convertir los datos recibidos al formato Pandas DataFrame.
mt.shutdown() rts_fm=pd.DataFrame(rts)
Veamos ahora el preprocesamiento de los datos obtenidos del terminal.
Primero deberemos convertir las marcas de tiempo en fechas:
rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s')
Aquí ya no describiremos cómo marcar los datos. Encontrará los métodos en los dos artículos anteriores (enlazados en la introducción de este artículo). Para demostrar brevemente cómo utilizar los modelos de predicción, simplemente categorizaremos todos los fragmentos de datos de max_encoder_length+2max_prediction_length en grupos. Cada grupo tendrá una secuencia de 0 a "max_encoder_length+2max_prediction_length-1". Vamos a rellenarla. De esta forma, añadiremos las marcas necesarias a los datos originales. Primero tendremos que transformar el índice temporal original (es decir, el índice DataFrame). Luego calcularemos el resto del índice de tiempo original dividido por (max_encoder_length+2max_prediction_length), y utilizaremos el resultado como nuevo índice de tiempo. Después haremos coincidir el índice de tiempo con un rango de 0 a "max_encoder_length+2*max_prediction_length-1":
rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length)
También tendremos que convertir el índice de tiempo original en un grupo. A continuación, calcularemos el índice de tiempo original dividido por "max_encoder_length+2*max_prediction_length" y utilizaremos el resultado como nuevo grupo:
rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
Asimismo, encapsularemos la parte correspondiente al preprocesamiento de datos en una función. Solo tendremos que transmitirle la longitud de los datos que necesitamos recuperar, y ya podremos completar el trabajo de preprocesamiento de datos:
def get_data(mt_data_len:int): if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"): print('initialize() failed!') else: print(mt.version()) sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown() # print(len(rts)) rts_fm=pd.DataFrame(rts) rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length) return rts_fm
Reescritura de la clase pytorch_forecasting.TimeSeriesDataSet
La reescritura de la función to_dataloader() en pytorch_forecasting controla si los datos se mezclarán y si se eliminará el último grupo de un paquete (principalmente para evitar errores impredecibles causados por una longitud insuficiente del último grupo de datos). Aquí tiene cómo hacerlo:
class New_TmSrDt(TimeSeriesDataSet): def to_dataloader(self, train: bool = True, batch_size: int = 64, batch_sampler: Sampler | str = None, shuffle:bool=False, drop_last:bool=False, **kwargs) -> DataLoader: default_kwargs = dict( shuffle=shuffle, drop_last=drop_last, #modification collate_fn=self._collate_fn, batch_size=batch_size, batch_sampler=batch_sampler, ) default_kwargs.update(kwargs) kwargs = default_kwargs if kwargs["batch_sampler"] is not None: sampler = kwargs["batch_sampler"] if isinstance(sampler, str): if sampler == "synchronized": kwargs["batch_sampler"] = TimeSynchronizedBatchSampler( SequentialSampler(self), batch_size=kwargs["batch_size"], shuffle=kwargs["shuffle"], drop_last=kwargs["drop_last"], ) else: raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler") del kwargs["batch_size"] del kwargs["shuffle"] del kwargs["drop_last"] return DataLoader(self,**kwargs)Este código creará una nueva clase New_TmSrDt que heredará de TimeSeriesDataSet. La función to_dataloader() se sobrescribirá en esta nueva clase para incluir los parámetros shuffle y drop_last. De este modo, podremos controlar mejor el proceso de carga de datos. No se olvide de sustituir los ejemplares de TimeSeriesDataSet por New_TmSrDt en el código.
Creación de conjuntos de datos para el entrenamiento y la prueba
En primer lugar, tendremos que definir un punto de corte para los datos de entrenamiento. Esto se hará restando la longitud máxima de la previsión del valor máximo de time_idx.
max_encoder_length = 2*96 max_prediction_length = 30 training_cutoff = rts_fm["time_idx"].max() - max_prediction_length
A continuación, utilizaremos la clase New_TmSrDt (que supone la reescritura de nuestra clase TimeSeriesDataSet) para crear el conjunto de datos de entrenamiento. Esta clase requerirá los siguientes parámetros:
- DataFrame (en este caso - rts_fm)
- La columna time_idx, que supone una secuencia entera continua.
- La columna objetivo (en este caso close), que representa el valor que queremos pronosticar.
- La columna del grupo (en este caso series) que representa diferentes series temporales.
- Las longitudes máximas del codificador y el predictor
context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1)
A continuación, utilizaremos la función New_TmSrDt.from_dataset() para crear el conjunto de datos de comprobación. Los siguientes parámetros serán necesarios para esta función:
- Conjunto de datos de entrenamiento
- DataFrame
- Índice mínimo de predicción; deberá ser 1 unidad mayor que el valor máximo de time_idx de los datos de entrenamiento.
validation = New_TmSrDt.from_dataset(training, rts_fm, min_prediction_idx=training_cutoff + 1)
Por último, utilizaremos la función to_dataloader() para convertir los conjuntos de datos de entrenamiento y validación en objetos PyTorch DataLoader. Los siguientes parámetros serán necesarios para esta función:
- El parámetro train, que indica si los datos deben mezclarse o no.
- El parámetro batch_size, que indica el número de muestras del lote.
- El parámetro num_workers, que especifica el número de procesos de trabajo para cargar los datos.
train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0)
Finalmente, encapsularemos este trozo de código en la función spilt_data(data:pd.DataFrame,t_drop_last:bool,t_shuffle:bool,v_drop_last:bool,v_shuffle:bool) y especificaremos los siguientes parámetros:
- El parámetro data se utilizará para obtener el conjunto de datos que se va a procesar.
- El parámetro t_drop_last especificará si debemos eliminar el último grupo del conjunto de datos de entrenamiento.
- El parámetro t_shuffle especificará si debemos barajar los datos de entrenamiento.
- El parámetro v_drop_last indicará si debemos eliminar el último grupo del conjunto de datos de validación.
- El parámetro v_shuffle especificará si barajaremos los datos de validación.
train_dataloader (el ejemplar de dataloader para el conjunto de datos de entrenamiento), val_dataloader (el ejemplar de dataloader para el conjunto de datos de validación) y training (el ejemplar de TimeSeriesDataSet para el conjunto de datos) se utilizarán como valores de retorno de esta función porque los usaremos más adelante.
def spilt_data(data:pd.DataFrame, t_drop_last:bool, t_shuffle:bool, v_drop_last:bool, v_shuffle:bool): training_cutoff = data["time_idx"].max() - max_prediction_length #max:95 context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1) train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0) return train_dataloader,val_dataloader,training
Creación y entrenamiento del modelo
Vamos a crear el modelo NHiTS. En esta parte veremos cómo configurar nuestros propios parámetros y cómo entrenar el modelo.
1. Encuentre la mejor curva de aprendizaje
Antes de empezar a construir el modelo, utilizaremos el objeto PyTorch Lightning Tuner para encontrar la mejor velocidad de aprendizaje.
En primer lugar, tendremos que crear un objeto Trainer donde el parámetro accelerator se utilizará para especificar el tipo de dispositivo, mientras que el gradient_clip_val se utilizará para evitar que la explosión del gradiente.
pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1)
A continuación, utilizaremos la función NHiTS.from_dataset() para crear la red modelo NHiTS. Los siguientes parámetros serán necesarios para esta función:
- Conjunto de datos de entrenamiento
- Velocidad de aprendizaje
- Reducción del peso
- Función de pérdida
- Tamaño de la capa oculta
- Optimizador
net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", )
A continuación crearemos un ejemplar de la clase Tuner y llamaremos a la función lr_find(). Luego entrenaremos el modelo basándonos en una serie de velocidades de aprendizaje de nuestro conjunto de datos y compararemos la pérdida de cada velocidad de aprendizaje para obtener la mejor.
res = Tuner(trainer).lr_find( net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, min_lr=1e-5, max_lr=1e-1 ) lr_=res.suggestion()
Del mismo modo, encapsularemos esta parte del código que obtiene la mejor velocidad de aprendizaje en la función get_learning_rate() y haremos que la mejor velocidad de aprendizaje resultante sea su valor de retorno:
def get_learning_rate(): pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False) net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", ) res = Tuner(trainer).lr_find( net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1 ) lr_=res.suggestion() return lr_
Si queremos visualizar el ritmo de aprendizaje, podemos añadir el siguiente código:
print(f"suggested learning rate: {res.suggestion()}")
fig = res.plot(show=True, suggest=True)
fig.show()
El resultado en este ejemplo será el siguiente:
velocidad de entrenamiento recomendada: 0.003981071705534973.
2. Definición de EarlyStopping Callback
Esta llamada de retorno se utilizará principalmente para controlar las pérdidas de comprobación y detener el entrenamiento cuando las pérdidas no mejoren durante varias épocas consecutivas. Esto podría impedir el sobreentrenamiento el modelo.
early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=True, mode="min")
Aquí cabe destacar el parámetro patience, que básicamente determinará cuándo parar durante el entrenamiento si las pérdidas no disminuyen durante varias épocas consecutivas. Lo pondremos a 10.
3. Definición de ModelCheckpoint Callback
Esta llamada de retorno se utilizará principalmente para controlar el archivo del modelo y el nombre del archivo. Básicamente estableceremos las dos variables siguientes.
ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, filename='{epoch}-{val_loss:.2f}')
save_top_k se utilizará para controlar el guardado de varios de los mejores modelos. Fijaremos el valor en 1 y nos quedaremos solo con el mejor modelo.
4. Definición de Training Model
Primero tendremos que crear un ejemplar de la clase Trainer en Lightning.pytorch y añadir las dos llamadas de retorno que hemos definido antes.
trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, )
Aquí deberemos prestar atención a los parámetros max_epochs (número máximo de épocas de entrenamiento), gradient_clip_val (utilizado para evitar la explosión del gradiente) y callbacks. Aquí max_epochs utilizará la variable global ep, que definiremos más adelante, mientras que callbacks será nuestra colección de callbacks.
A continuación, también tendremos que definir el modelo NHiTS y crear un ejemplar del mismo:
net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), )
Aquí no tendremos que cambiar los parámetros en absoluto, bastará con utilizar los valores por defecto. Aquí solo estableceremos la pérdida en la función de pérdida MQF2DistributionLoss.
5. Módulo de entrenamiento
Utilizaremos la función fit() del objeto Trainer para entrenar el modelo:
trainer.fit( net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, )
Del mismo modo, encapsularemos esta parte del código en la función train():
def train(): early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, # The number of times without improvement will stop verbose=True, mode="min") ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, # Save the top few best ones filename='{epoch}-{val_loss:.2f}') trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, ) net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), ) trainer.fit( net, train_dataloaders=t_loader, val_dataloaders=v_loader, # ckpt_path='best' ) return trainer
Esta función retornará el modelo entrenado que se podrá utilizar para tareas de predicción.
Definición de la lógica de ejecución
1. Definición de variables globales:
ep=200 __train=False mt_data_len=200000 max_encoder_length = 2*96 max_prediction_length = 30 batch_size = 128
__train se utilizará para controlar si estamos entrenando o probando el modelo.
Cabe destacar que ep se usará para controlar la época máxima de entrenamiento. Como hemos establecido EarlyStopping, este valor podrá aumentarse porque el modelo se detendrá automáticamente cuando se detenga la convergencia.
mt_data_len — número de los últimos datos de series temporales recibidos del cliente.
max_encoder_length y max_prediction_length — longitud máxima de codificación y longitud máxima de predicción, respectivamente.
2. Entrenamiento
También necesitaremos guardar los resultados del entrenamiento óptimo actual en un archivo local una vez finalizado el entrenamiento, por lo que definiremos un archivo json para guardar esta información:
info_file='results.json'
Para que nuestro proceso de aprendizaje sea más comprensible, deberemos evitar mostrar información de advertencia innecesaria durante el entrenamiento, por lo que añadiremos el siguiente código:
warnings.filterwarnings("ignore")
A continuación vendrá nuestra lógica del aprendizaje:
dt=get_data(mt_data_len=mt_data_len) if __train: # print(dt) # dt=get_data(mt_data_len=mt_data_len) t_loader,v_loader,training=spilt_data(dt, t_shuffle=False,t_drop_last=True, v_shuffle=False,v_drop_last=True) lr=get_learning_rate() trainer__=train() m_c_back=trainer__.checkpoint_callback m_l_back=trainer__.early_stopping_callback best_m_p=m_c_back.best_model_path best_m_l=m_l_back.best_score.item() # print(best_m_p) if os.path.exists(info_file): with open(info_file,'r+') as f1: last=json.load(fp=f1) last_best_model=last['last_best_model'] last_best_score=last['last_best_score'] if last_best_score > best_m_l: last['last_best_model']=best_m_p last['last_best_score']=best_m_l json.dump(last,fp=f1) else: with open(info_file,'w') as f2: json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)
Una vez finalizado el entrenamiento, podremos encontrar dónde se almacenan nuestro mejor modelo y nuestro mejor resultado en el archivo results.json del directorio raíz.
Durante el proceso de entrenamiento, veremos una barra de progreso que muestra el progreso de cada época.
Entrenamiento:
El entrenamiento ha finalizado:
3. Comprobación del modelo
Tras el entrenamiento, deberemos validar el modelo y visualizarlo. Podemos añadir el código siguiente:
best_model = NHiTS.load_from_checkpoint(best_m_p) predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True) raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False)) for idx in range(10): # plot 10 examples best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True) # sample 500 paths samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0] # plot prediction fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True) ax = fig.get_axes()[0] # plot first two sampled paths ax.plot(samples[:, 0], color="g", label="Sample 1") ax.plot(samples[:, 1], color="r", label="Sample 2") fig.legend() plt.show()
También podremos utilizar TensorBoard para visualizar el proceso en tiempo real durante el entrenamiento.
Resultado:
4. Prueba del modelo entrenado
En primer lugar, abriremos el archivo json para encontrar la ubicación de almacenamiento óptima para el modelo:
with open(info_file) as f: best_m_p=json.load(fp=f)['last_best_model'] print('model path is:',best_m_p)
Luego cargaremos el modelo:
best_model = NHiTS.load_from_checkpoint(best_m_p)
A continuación, obtendremos los datos en tiempo real del cliente para probar el modelo:
offset=1 dt=dt.iloc[-max_encoder_length-offset:-offset,:] last_=dt.iloc[-1] #get the last group of data # print(len(dt)) for i in range(1,max_prediction_length+1): dt.loc[dt.index[-1]+1]=last_ dt['series']=0 # dt['time_idx']=dt.apply(lambda x:x.index,args=1) dt['time_idx']=dt.index-dt.index[0] # dt=get_data(mt_data_len=max_encoder_length) predictions=best_model.predict(dt,mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True) best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False) plt.show()
Y aquí tenemos el resultado:
5. Valoración del modelo
Obviamente, podemos usar algunas métricas de la biblioteca PyTorch Forecasting para evaluar el rendimiento del modelo. A continuación veremos cómo realizar una valoración utilizando el error medio absoluto (MAE) y el error porcentual absoluto medio simétrico (SMAPE) y obtener los resultados de la valoración:
from pytorch_forecasting.metrics import MAE, SMAPE mae = MAE()(raw_predictions["prediction"], raw_predictions["target"]) print(f"Mean Absolute Error: {mae}") smape = SMAPE()(raw_predictions["prediction"], raw_predictions["target"]) print(f"Symmetric Mean Absolute Percentage Error: {smape}")
En este fragmento de código, primero importaremos las métricas de MAE y SMAPE. A continuación, utilizaremos estas métricas para calcular el error entre los valores predichos (raw_predictions["prediction"]) y los valores reales (raw_predictions["target"]). Estas métricas pueden ayudarnos a ver el rendimiento de nuestro modelo y orientarnos hacia nuevas mejoras.
Conclusión
En este artículo, hemos visto cómo usar los datos de marcado mencionados en los dos artículos anteriores y hemos mostrado cómo crear un modelo de N-HiTs utilizando nuestros datos. Acto seguido, hemos entrenado el modelo y lo hemos comprobado. Como puede ver, hemos avanzado mucho. También hemos demostrado cómo utilizar este modelo en MetaTrader 5 para predecir 30 velas. Obviamente, no hemos mencionado cómo colocar órdenes basadas en los resultados de la predicción, ya que el trading real requiere que los lectores realicen muchas pruebas según su situación real y especifiquen las reglas comerciales correspondientes.¡Gracias por su atención!
App:
Código completo:
# Copyright 2021, MetaQuotes Ltd. # https://www.mql5.com # from typing import Union import lightning.pytorch as pl import os from lightning.pytorch.callbacks import EarlyStopping,ModelCheckpoint import matplotlib.pyplot as plt import numpy as np import pandas as pd # import torch from pytorch_forecasting import NHiTS, TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder,timeseries from pytorch_forecasting.metrics import MQF2DistributionLoss from pytorch_forecasting.data.samplers import TimeSynchronizedBatchSampler from lightning.pytorch.tuner import Tuner import MetaTrader5 as mt import warnings import json from torch.utils.data import DataLoader from torch.utils.data.sampler import Sampler,SequentialSampler class New_TmSrDt(TimeSeriesDataSet): ''' rewrite dataset class ''' def to_dataloader(self, train: bool = True, batch_size: int = 64, batch_sampler: Sampler | str = None, shuffle:bool=False, drop_last:bool=False, **kwargs) -> DataLoader: default_kwargs = dict( shuffle=shuffle, # drop_last=train and len(self) > batch_size, drop_last=drop_last, # collate_fn=self._collate_fn, batch_size=batch_size, batch_sampler=batch_sampler, ) default_kwargs.update(kwargs) kwargs = default_kwargs # print(kwargs['drop_last']) if kwargs["batch_sampler"] is not None: sampler = kwargs["batch_sampler"] if isinstance(sampler, str): if sampler == "synchronized": kwargs["batch_sampler"] = TimeSynchronizedBatchSampler( SequentialSampler(self), batch_size=kwargs["batch_size"], shuffle=kwargs["shuffle"], drop_last=kwargs["drop_last"], ) else: raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler") del kwargs["batch_size"] del kwargs["shuffle"] del kwargs["drop_last"] return DataLoader(self,**kwargs) def get_data(mt_data_len:int): if not mt.initialize(): print('initialize() failed!') else: print(mt.version()) sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown() # print(len(rts)) rts_fm=pd.DataFrame(rts) rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length) return rts_fm def spilt_data(data:pd.DataFrame, t_drop_last:bool, t_shuffle:bool, v_drop_last:bool, v_shuffle:bool): training_cutoff = data["time_idx"].max() - max_prediction_length #max:95 context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1) train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0) return train_dataloader,val_dataloader,training def get_learning_rate(): pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False) net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", ) res = Tuner(trainer).lr_find( net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1 ) # print(f"suggested learning rate: {res.suggestion()}") lr_=res.suggestion() return lr_ def train(): early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=True, mode="min") ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, filename='{epoch}-{val_loss:.2f}') trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, ) net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), ) trainer.fit( net, train_dataloaders=t_loader, val_dataloaders=v_loader, # ckpt_path='best' ) return trainer if __name__=='__main__': ep=200 __train=False mt_data_len=200000 max_encoder_length = 2*96 max_prediction_length = 30 batch_size = 128 info_file='results.json' warnings.filterwarnings("ignore") dt=get_data(mt_data_len=mt_data_len) if __train: # print(dt) # dt=get_data(mt_data_len=mt_data_len) t_loader,v_loader,training=spilt_data(dt, t_shuffle=False,t_drop_last=True, v_shuffle=False,v_drop_last=True) lr=get_learning_rate() trainer__=train() m_c_back=trainer__.checkpoint_callback m_l_back=trainer__.early_stopping_callback best_m_p=m_c_back.best_model_path best_m_l=m_l_back.best_score.item() # print(best_m_p) if os.path.exists(info_file): with open(info_file,'r+') as f1: last=json.load(fp=f1) last_best_model=last['last_best_model'] last_best_score=last['last_best_score'] if last_best_score > best_m_l: last['last_best_model']=best_m_p last['last_best_score']=best_m_l json.dump(last,fp=f1) else: with open(info_file,'w') as f2: json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2) best_model = NHiTS.load_from_checkpoint(best_m_p) predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True) raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False)) for idx in range(10): # plot 10 examples best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True) samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0] # plot prediction fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True) ax = fig.get_axes()[0] # plot first two sampled paths ax.plot(samples[:, 0], color="g", label="Sample 1") ax.plot(samples[:, 1], color="r", label="Sample 2") fig.legend() plt.show() else: with open(info_file) as f: best_m_p=json.load(fp=f)['last_best_model'] print('model path is:',best_m_p) best_model = NHiTS.load_from_checkpoint(best_m_p) offset=1 dt=dt.iloc[-max_encoder_length-offset:-offset,:] last_=dt.iloc[-1] # print(len(dt)) for i in range(1,max_prediction_length+1): dt.loc[dt.index[-1]+1]=last_ dt['series']=0 # dt['time_idx']=dt.apply(lambda x:x.index,args=1) dt['time_idx']=dt.index-dt.index[0] # dt=get_data(mt_data_len=max_encoder_length) predictions = best_model.predict(dt, mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True) best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False) plt.show()
Traducción del inglés realizada por MetaQuotes Ltd.
Artículo original: https://www.mql5.com/en/articles/13255
- Aplicaciones de trading gratuitas
- 8 000+ señales para copiar
- Noticias económicas para analizar los mercados financieros
Usted acepta la política del sitio web y las condiciones de uso