![Datenkennzeichnung für die Zeitreihenanalyse (Teil 3):Beispiel für die Verwendung von Datenkennzeichnungen](https://c.mql5.com/2/58/Data_label_for_time_series_mining_V4_Impr_600x314.jpg)
Datenkennzeichnung für die Zeitreihenanalyse (Teil 3):Beispiel für die Verwendung von Datenkennzeichnungen
Einführung
Dieser Artikel stellt vor, wie man PyTorch Lightning und PyTorch Forecasting Framework über die MetaTrader5-Handelsplattform verwendet, um finanzielle Zeitreihenprognosen auf der Grundlage neuronaler Netzwerke zu implementieren.
In diesem Artikel werden wir auch die Gründe für die Wahl dieser beiden Rahmen und das von uns verwendete Datenformat erläutern.
Was die Daten anbelangt, so können Sie die Daten verwenden, die durch die Datenkennzeichnung (data labeling) in meinen beiden vorherigen Artikeln erzeugt wurden. Da sie dasselbe Format haben, können Sie sie problemlos nach der in diesem Dokument beschriebenen Methode erweitern.
Die Links zu den beiden vorangegangenen Artikeln lauten:
- Datenkennzeichnung für Zeitreihenanalyse (Teil 1): Erstellen eines Datensatzes mit Trendmarkierungen durch den EA auf einem Chart
- Datenkennzeichnung für Zeitreihenanalyse (Teil 2): Datensätze mit Trendmarkern mit Python erstellen
Inhaltsverzeichnis
- Einführung
- Mehrere wichtige Python-Bibliotheken
- Initialisierung
- Umschreiben der Klasse pytorch_forecasting.TimeSeriesDataSet
- Erstellen von Trainings- und Validierungsdatensätzen
- Modellerstellung und Training
- Definieren der Ausführungslogik
- Schlussfolgerung
Mehrere wichtige Python-Bibliotheken
Zunächst wollen wir die wichtigsten Python-Bibliotheken vorstellen, die wir verwenden werden.
1. PyTorch Lightning
PyTorch Lightning ist ein Deep-Learning-Framework, das speziell für professionelle KI-Forscher und Ingenieure für maschinelles Lernen entwickelt wurde, die ein Höchstmaß an Flexibilität benötigen, ohne Kompromisse bei der Skalierbarkeit einzugehen.
Das zentrale Konzept besteht darin, akademischen Code (wie Modelldefinitionen, Vorwärts-/Rückwärtspropagation, Optimierer, Validierung usw.) von technischem Code (wie for-Schleifen, Speichermechanismen, TensorBoard-Protokolle, Trainingsstrategien usw.) zu trennen, was zu einem schlankeren und verständlicheren Code führt.
Die wichtigsten Vorteile sind:
- Hohe Wiederverwendbarkeit - Das Design der Software ermöglicht die Wiederverwendung des Codes in verschiedenen Projekten.
- Einfache Wartung - Dank des strukturierten Designs wird die Wartung des Codes einfacher.
- Klare Logik - Durch die Abstrahierung von Standard-Engineering-Code ist der Code für maschinelles Lernen leichter zu erkennen und zu verstehen.
Insgesamt ist PyTorch Lightning eine äußerst leistungsfähige Bibliothek, die eine effiziente Methode zur Organisation und Verwaltung Ihres PyTorch-Codes bietet. Darüber hinaus bietet es einen strukturierten Ansatz für die Bewältigung gängiger, aber komplizierter Aufgaben wie Modelltraining, -validierung und -test.
Die detaillierte Verwendung dieser Bibliothek kann in der offiziellen Dokumentation nachgelesen werden: https://lightning.ai/docs.2. PyTorch Vorhersage
Es handelt sich um eine Python-Bibliothek, die speziell für Zeitreihenprognosen entwickelt wurde. Da es auf PyTorch aufbaut, können Sie die leistungsstarken automatischen Differenzierungs- und Optimierungsbibliotheken von PyTorch nutzen und gleichzeitig von den Vorteilen profitieren, die PyTorch Forecasting für Zeitreihenprognosen bietet.
In PyTorch Forecasting finden Sie Implementierungen einer Vielzahl von Vorhersagemodellen, einschließlich, aber nicht beschränkt auf autoregressive Modelle (AR, ARIMA), Zustandsraummodelle (SARIMAX), neuronale Netze (LSTM, GRU) und Ensemblemethoden (Prophet, N-Beats). Dies bedeutet, dass Sie mit verschiedenen Vorhersageansätzen innerhalb desselben Rahmens experimentieren und diese vergleichen können, ohne dass Sie für jeden Ansatz einen umfangreichen Boilerplate-Code schreiben müssen.
Die Bibliothek bietet auch eine Reihe von Werkzeugen für die Datenvorverarbeitung, die Sie bei der Bearbeitung gängiger Aufgaben in Zeitreihen unterstützen können. Diese Werkzeuge umfassen unter anderem die Imputation fehlender Werte, Skalierung, Merkmalsextraktion und Rolling-Window-Transformationen. Dies bedeutet, dass Sie sich mehr auf den Entwurf und die Optimierung Ihres Modells konzentrieren können, ohne viel Zeit für die Datenverarbeitung aufwenden zu müssen.
Außerdem bietet es eine einheitliche Schnittstelle für die Bewertung der Modellleistung. Es implementiert Verlustfunktionen und Validierungsmetriken für Zeitreihen wie QuantileLoss und SMAPE und unterstützt Trainingsmethoden wie Early Stopping und Kreuzvalidierung. So können Sie die Leistung Ihres Modells bequem verfolgen und verbessern.
Wenn Sie eine Methode suchen, um die Effizienz und Wartbarkeit Ihres Zeitreihenvorhersageprojekts zu verbessern, dann könnte PyTorch Forecasting eine ausgezeichnete Wahl sein. Es bietet ein effektives und flexibles Mittel zur Organisation und Verwaltung Ihres PyTorch-Codes, sodass Sie sich auf den wichtigsten Aspekt konzentrieren können - das maschinelle Lernmodell selbst.
Die detaillierte Verwendung dieser Bibliothek kann in der offiziellen Dokumentation nachgelesen werden: https://pytorch-forecasting.readthedocs.io/en/stable.
3. Über das Modell N-HiTS
Das N-HiTS-Modell befasst sich mit den Problemen der Vorhersagevolatilität und der Rechenkomplexität bei langfristigen Vorhersagen, indem es innovative hierarchische Interpolations- und Multiraten-Datenstichprobenverfahren einführt. Dadurch kann das N-HiTS-Modell einen beliebig langen Vorhersagebereich effektiv annähern.
Darüber hinaus haben umfangreiche Experimente mit großen Datensätzen gezeigt, dass das N-HiTS-Modell die Genauigkeit im Vergleich zur neuesten Transformer-Architektur um durchschnittlich fast 20 % verbessert und gleichzeitig die Berechnungszeit um eine Größenordnung (50-mal) reduziert.
Hier der Link zu dem Papier: https://doi.org/10.48550/arXiv.2201.12886.
Initialisierung
Zunächst müssen wir die erforderlichen Bibliotheken importieren. Zu diesen Bibliotheken gehören MetaTrader5 (für die Interaktion mit dem MT5-Terminal), PyTorch Lightning (für das Training des Modells) und einige andere Bibliotheken für die Datenverarbeitung und Visualisierung.
import MetaTrader5 as mt5 import lightning.pytorch as pl from lightning.pytorch.callbacks import EarlyStopping import matplotlib.pyplot as plt import pandas as pd from pytorch_forecasting import Baseline, NHiTS, TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder from pytorch_forecasting.metrics import MAE, SMAPE, MQF2DistributionLoss, QuantileLoss from lightning.pytorch.tuner import Tuner
Als Nächstes müssen wir MetaTrader5 initialisieren. Dies geschieht durch den Aufruf der Funktion mt.initialize(). Wenn Sie die Funktion nicht einfach initialisieren können, müssen Sie den Pfad des MT5-Terminals als Parameter an diese Funktion übergeben (im Beispiel ist "D:\Project\mt\MT5\terminal64.exe" mein persönlicher Pfad, in der tatsächlichen Anwendung müssen Sie dafür Ihren eigenen Pfad angeben). Wenn die Initialisierung erfolgreich war, gibt die Funktion True zurück, andernfalls False.
if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"): print('initialize() failed!') else: print(mt.version())
Die Funktion mt.symbols_total() wird verwendet, um die Gesamtzahl der im MT5-Terminal verfügbaren handelbaren Varianten zu ermitteln. Wir können damit beurteilen, ob wir Daten korrekt erhalten können. Wenn die Gesamtzahl größer als 0 ist, können wir die Funktion mt.copy_rates_from_pos() verwenden, um historische Daten des angegebenen handelbaren Finanzinstruments zu erhalten. In diesem Beispiel haben wir die jüngste Länge „mt_data_len“ der M15 (15 Minuten) Perioden-Daten von „GOLD_micro“ erhalten.
sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown()
Dann verwenden wir die Funktion mt.shutdown(), um die Verbindung mit dem MT5-Terminal zu schließen und die erhaltenen Daten in das Format Pandas DataFrame zu konvertieren.
mt.shutdown() rts_fm=pd.DataFrame(rts)
Lassen Sie uns nun besprechen, wie man die vom MT5-Terminal erhaltenen Daten vorverarbeitet.
Zunächst müssen wir die Zeitstempel in Daten umwandeln:
rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s')
Hier wird nicht mehr beschrieben, wie die Daten zu kennzeichnen sind. Die Methoden finden Sie in meinen beiden früheren Artikeln (die Links zu den Artikeln finden Sie in der Einleitung dieses Artikels). Für eine kurze Demonstration der Verwendung von Vorhersagemodellen teilen wir einfach alle „max_encoder_length+2max_prediction_length“ Daten in eine Gruppe ein. Jede Gruppe hat eine Sequenz von 0 bis „max_encoder_length+2max_prediction_length-1“, und füllt sie. Auf diese Weise fügen wir den Originaldaten die erforderlichen Kennzeichnungen hinzu. Zunächst muss der ursprüngliche Zeitindex (d. h. der Index des DataFrame) in einen Zeitindex umgewandelt werden. Wir berechnen den Rest des ursprünglichen Zeitindexes geteilt durch (max_encoder_length+2max_prediction_length), und verwenden das Ergebnis als neuen Zeitindex. Damit wird der Zeitindex auf einen Bereich von 0 bis „max_encoder_length+2*max_prediction_length-1“ abgebildet:
rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length)
Wir müssen auch den ursprünglichen Zeitindex in eine Gruppe umwandeln. Wir berechnen den ursprünglichen Zeitindex geteilt durch „max_encoder_length+2*max_prediction_length“, und verwenden das Ergebnis als neue Gruppe:
rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
Wir kapseln den Teil der Datenvorverarbeitung in eine Funktion ein. Wir brauchen ihm nur die Länge der Daten zu übergeben, die wir benötigen, und es kann die Vorverarbeitung der Daten abschließen:
def get_data(mt_data_len:int): if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"): print('initialize() failed!') else: print(mt.version()) sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown() # print(len(rts)) rts_fm=pd.DataFrame(rts) rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length) return rts_fm
Umschreiben der Klasse pytorch_forecasting.TimeSeriesDataSet
Umschreiben der Funktion to_dataloader() in pytorch_forecasting. Damit können wir steuern, ob die Daten gemischt werden und ob die letzte Gruppe eines Stapels verworfen werden soll (hauptsächlich, um unvorhersehbare Fehler zu vermeiden, die durch eine unzureichende Länge der letzten Datengruppe verursacht werden). So können wir es machen:
class New_TmSrDt(TimeSeriesDataSet): def to_dataloader(self, train: bool = True, batch_size: int = 64, batch_sampler: Sampler | str = None, shuffle:bool=False, drop_last:bool=False, **kwargs) -> DataLoader: default_kwargs = dict( shuffle=shuffle, drop_last=drop_last, #modification collate_fn=self._collate_fn, batch_size=batch_size, batch_sampler=batch_sampler, ) default_kwargs.update(kwargs) kwargs = default_kwargs if kwargs["batch_sampler"] is not None: sampler = kwargs["batch_sampler"] if isinstance(sampler, str): if sampler == "synchronized": kwargs["batch_sampler"] = TimeSynchronizedBatchSampler( SequentialSampler(self), batch_size=kwargs["batch_size"], shuffle=kwargs["shuffle"], drop_last=kwargs["drop_last"], ) else: raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler") del kwargs["batch_size"] del kwargs["shuffle"] del kwargs["drop_last"] return DataLoader(self,**kwargs)Dieser Code erstellt eine neue Klasse New_TmSrDt, die von TimeSeriesDataSet erbt. Die Funktion to_dataloader() wird dann in dieser neuen Klasse überschrieben, um die Parameter shuffle und drop_last aufzunehmen. Auf diese Weise haben wir mehr Kontrolle über Ihren Datenladeprozess. Denken Sie daran, Instanzen von TimeSeriesDataSet durch New_TmSrDt in Ihrem Code zu ersetzen.
Erstellen von Trainings- und Validierungsdatensätzen
Zunächst müssen wir den Abschneidepunkt für die Trainingsdaten festlegen. Dazu wird die maximale Vorhersagelänge vom maximalen Wert „time_idx“ subtrahiert.
max_encoder_length = 2*96 max_prediction_length = 30 training_cutoff = rts_fm["time_idx"].max() - max_prediction_length
Dann verwenden wir die Klasse New_TmSrDt (die von uns neu geschriebene TimeSeriesDataSet-Klasse), um einen Trainingsdatensatz zu erstellen. Diese Klasse benötigt die folgenden Parameter:
- DataFrame (in diesem Fall 'rts_fm'),
- die Spalte "time_idx", die eine fortlaufende Ganzzahlsequenz ist,
- die Zielspalte (in diesem Fall „close“), die den Wert darstellt, den wir vorhersagen wollen
- die Gruppenspalte (in diesem Fall „series“), die verschiedene Zeitreihen darstellt
- Die maximalen Längen des Encoders und des Prädiktors.
context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1)
Als Nächstes verwenden wir die Funktion New_TmSrDt.from_dataset(), um einen Validierungsdatensatz zu erstellen. Diese Funktion benötigt die folgenden Parameter:
- den Trainingsdatensatz,
- DataFrame,
- den minimalen Vorhersageindex, der um 1 größer sein sollte als der maximale Wert von „time_idx“ der Trainingsdaten.
validation = New_TmSrDt.from_dataset(training, rts_fm, min_prediction_idx=training_cutoff + 1)
Schließlich verwenden wir die Funktion to_dataloader(), um die Trainings- und Validierungsdatensätze in PyTorch DataLoader-Objekte zu konvertieren. Diese Funktion benötigt folgende Parameter:
- den Parameter „train“, der angibt, ob die Daten gemischt werden sollen,
- den Parameter „batch_size“, der die Anzahl der Proben pro Batch angibt,
- den Parameter „num_workers“, der die Anzahl der Arbeitsprozesse für das Laden der Daten angibt.
train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0)
Zuletzt kapseln wir diesen Teil des Codes in eine Funktion spilt_data(data:pd.DataFrame,t_drop_last:bool,t_shuffle:bool,v_drop_last:bool,v_shuffle:bool) , und geben folgenden Parameter an:
- den Parameter „data“, der den zu bearbeitenden Datensatz enthält,
- den Parameter „t_drop_last“, der angibt, ob die letzte Gruppe des Trainingsdatensatzes verworfen werden soll,
- den Parameter „t_shuffle“, der angibt, ob die Trainingsdaten gemischt werden sollen,
- den Parameter „v_drop_last“, der angibt, ob die letzte Gruppe des Validierungsdatensatzes fallen gelassen werden soll,
- den Parameter „v_shuffle“, der angibt, ob die Validierungsdaten gemischt werden sollen.
Wir machen train_dataloader (eine Instanz von dataloader für den Trainingsdatensatz), val_dataloader (eine Instanz von dataloader für den Validierungsdatensatz) und training (eine Instanz von TimeSeriesDataSet für den Datensatz) als Rückgabewerte dieser Funktion, da sie später verwendet werden.
def spilt_data(data:pd.DataFrame, t_drop_last:bool, t_shuffle:bool, v_drop_last:bool, v_shuffle:bool): training_cutoff = data["time_idx"].max() - max_prediction_length #max:95 context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1) train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0) return train_dataloader,val_dataloader,training
Modellerstellung und Training
Jetzt beginnen wir mit der Erstellung des NHiTS-Modells. In diesem Teil wird gezeigt, wie man seine Parameter einstellt und wie man es trainiert.
1. Finden der besten Lernrate
Bevor wir mit der Modellerstellung beginnen, verwenden wir das Tuner-Objekt von PyTorch Lightning, um die beste Lernrate zu finden.
Zunächst müssen wir ein Trainer-Objekt von PyTorch Lightning erstellen, wobei der Parameter „accelerator“ verwendet wird, um den Gerätetyp anzugeben, und „gradient_clip_val“, um eine Gradientenexplosion zu verhindern.
pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1)
Als Nächstes verwenden wir die Funktion NHiTS.from_dataset(), um ein NHiTS-Modellnetz zu erstellen. Diese Funktion benötigt folgende Parameter:
- den Trainingsdatensatz,
- die Lernrate,
- die Abnahme der Gewichte,
- die Verlustfunktion,
- die Größe der verborgenen Schicht,
- der Optimierer.
net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", )
Dann instanziieren wir die Klasse Tuner und rufen die Funktion lr_find() auf. Diese Funktion trainiert das Modell mit einer Reihe von Lernraten auf der Grundlage unseres Datensatzes und vergleicht den Verlust jeder Lernrate, um die beste Lernrate zu ermitteln.
res = Tuner(trainer).lr_find( net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, min_lr=1e-5, max_lr=1e-1 ) lr_=res.suggestion()
In ähnlicher Weise kapseln wir diesen Teil des Codes, der die beste Lernrate ermittelt, in eine Funktion get_learning_rate() ein und machen die erhaltene beste Lernrate zu ihrem Rückgabewert:
def get_learning_rate(): pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False) net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", ) res = Tuner(trainer).lr_find( net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1 ) lr_=res.suggestion() return lr_
Wenn man die Lernrate visualisieren möchten, kann man den folgenden Code hinzufügen:
print(f"suggested learning rate: {res.suggestion()}")
fig = res.plot(show=True, suggest=True)
fig.show()
Das Ergebnis in diesem Beispiel ist wie folgt:
vorgeschlagene Lernrate: 0.003981071705534973.
2. Definieren des Callbacks von EarlyStopping
Dieser Callback wird hauptsächlich verwendet, um den Validierungsverlust zu überwachen und das Training abzubrechen, wenn sich der Verlust in mehreren aufeinanderfolgenden Epochen nicht verbessert hat. Dies kann eine Überanpassung des Modells verhindern.
early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=True, mode="min")
Der Parameter, der hier zu beachten ist, ist „patience“, der hauptsächlich steuert, wann das Training abgebrochen wird, wenn sich der Verlust in mehreren aufeinanderfolgenden Epochen nicht verbessert hat. Wir stellen ihn auf 10.
3. Definieren des Callbacks von ModelCheckpoint
Dieser Callback wird hauptsächlich zur Steuerung der Modellarchivierung und des Namens des Archivs verwendet. Wir setzen hauptsächlich diese beiden Variablen.
ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, filename='{epoch}-{val_loss:.2f}')
Mit „save_top_k“ wird die Speicherung der besten Modelle gesteuert. Wir setzen ihn auf 1 und speichern nur das beste Modell.
4. Definition des Trainingsmodells
Zunächst müssen wir eine Trainerklasse in lightning.pytorch instanziieren und die beiden Callbacks hinzufügen, die wir zuvor definiert haben.
trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, )
Die Parameter, auf die wir hier achten müssen, sind „max_epochs“ (maximale Anzahl von Trainingsepochen), „gradient_clip_val“ (zur Vermeidung einer Gradientenexplosion) und „callbacks“. Hier verwendet „max_epochs“ ep, eine globale Variable, die wir später definieren werden, und „callbacks“ ist unsere Kollektion von Rückrufen.
Als Nächstes müssen wir auch das NHiTS-Modell definieren und instanziieren:
net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), )
Hier müssen die Parameter in der Regel nicht geändert werden, sondern es genügt, die Standardwerte zu verwenden. Hier ändern wir nur „loss“ in die Verlustfunktion MQF2DistributionLoss.
5. Trainingsmodul
Wir verwenden die Funktion fit() des Trainer-Objekts, um das Modell zu trainieren:
trainer.fit( net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, )
In ähnlicher Weise kapseln wir diesen Teil des Codes in eine Funktion train() :
def train(): early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, # The number of times without improvement will stop verbose=True, mode="min") ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, # Save the top few best ones filename='{epoch}-{val_loss:.2f}') trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, ) net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), ) trainer.fit( net, train_dataloaders=t_loader, val_dataloaders=v_loader, # ckpt_path='best' ) return trainer
Diese Funktion gibt ein trainiertes Modell zurück, das Sie für Vorhersageaufgaben verwenden können.
Definieren der Ausführungslogik
1. Definieren von globalen Variablen:
ep=200 __train=False mt_data_len=200000 max_encoder_length = 2*96 max_prediction_length = 30 batch_size = 128
__train wird verwendet, um zu kontrollieren, ob wir das Modell gerade trainieren oder testen.
Es ist erwähnenswert, dass ep verwendet wird, um die maximale Trainingsepoche zu kontrollieren. Da wir EarlyStopping eingestellt haben, kann dieser Wert etwas größer gewählt werden, da das Modell automatisch stoppt, wenn es nicht mehr konvergiert.
mt_data_len ist die Anzahl der jüngsten Zeitreihendaten, die vom Client bezogen wurden.
max_encoder_length und max_prediction_length sind die maximale Kodierungslänge bzw. die maximale Vorhersagelänge.
2. Training
Wir müssen auch die aktuellen optimalen Trainingsergebnisse in einer lokalen Datei speichern, wenn das Training abgeschlossen ist, also definieren wir eine json-Datei, um diese Informationen zu speichern:
info_file='results.json'
Um unseren Trainingsprozess übersichtlicher zu gestalten, müssen wir vermeiden, dass während des Trainings unnötige Warnhinweise ausgegeben werden, daher fügen wir den folgenden Code hinzu:
warnings.filterwarnings("ignore")
Als Nächstes folgt unsere Trainingslogik:
dt=get_data(mt_data_len=mt_data_len) if __train: # print(dt) # dt=get_data(mt_data_len=mt_data_len) t_loader,v_loader,training=spilt_data(dt, t_shuffle=False,t_drop_last=True, v_shuffle=False,v_drop_last=True) lr=get_learning_rate() trainer__=train() m_c_back=trainer__.checkpoint_callback m_l_back=trainer__.early_stopping_callback best_m_p=m_c_back.best_model_path best_m_l=m_l_back.best_score.item() # print(best_m_p) if os.path.exists(info_file): with open(info_file,'r+') as f1: last=json.load(fp=f1) last_best_model=last['last_best_model'] last_best_score=last['last_best_score'] if last_best_score > best_m_l: last['last_best_model']=best_m_p last['last_best_score']=best_m_l json.dump(last,fp=f1) else: with open(info_file,'w') as f2: json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)
Wenn das Training abgeschlossen ist, finden wir den Speicherort unseres besten Modells und die beste Punktzahl in der Datei results.json im Stammverzeichnis.
Während des Trainingsprozesses sieht man einen Fortschrittsbalken, der den Fortschritt jeder Epoche anzeigt.
Training:
Training abgeschlossen:
3. Validierung des Modells
Nach dem Training wollen wir das Modell validieren und es visualisieren. Wir können den folgenden Code hinzufügen:
best_model = NHiTS.load_from_checkpoint(best_m_p) predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True) raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False)) for idx in range(10): # plot 10 examples best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True) # sample 500 paths samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0] # plot prediction fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True) ax = fig.get_axes()[0] # plot first two sampled paths ax.plot(samples[:, 0], color="g", label="Sample 1") ax.plot(samples[:, 1], color="r", label="Sample 2") fig.legend() plt.show()
Sie können auch TensorBoard verwenden, um die Visualisierung der Trainingssituation in Echtzeit während des Trainings zu sehen, wir machen hier keine Demonstration.
Ergebnis:
4. Testen des trainierten Modells
Zunächst öffnen wir die json-Datei, um den optimalen Speicherort für das Modell zu finden:
with open(info_file) as f: best_m_p=json.load(fp=f)['last_best_model'] print('model path is:',best_m_p)
Dann laden wir das Modell:
best_model = NHiTS.load_from_checkpoint(best_m_p)
Danach erhalten wir vom Client die Daten in Echtzeit, um das Modell zu testen:
offset=1 dt=dt.iloc[-max_encoder_length-offset:-offset,:] last_=dt.iloc[-1] #get the last group of data # print(len(dt)) for i in range(1,max_prediction_length+1): dt.loc[dt.index[-1]+1]=last_ dt['series']=0 # dt['time_idx']=dt.apply(lambda x:x.index,args=1) dt['time_idx']=dt.index-dt.index[0] # dt=get_data(mt_data_len=max_encoder_length) predictions=best_model.predict(dt,mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True) best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False) plt.show()
Das Ergebnis sieht wie folgt aus:
5. Bewertung des Modells
Natürlich können wir einige Metriken in der PyTorch Forecasting-Bibliothek verwenden, um die Leistung des Modells zu bewerten. Im Folgenden wird beschrieben, wie man den mittleren absoluten Fehler (MAE) und den symmetrischen mittleren absoluten Fehler in Prozent (SMAPE) auswerten und die Ergebnisse ausgibt:
from pytorch_forecasting.metrics import MAE, SMAPE mae = MAE()(raw_predictions["prediction"], raw_predictions["target"]) print(f"Mean Absolute Error: {mae}") smape = SMAPE()(raw_predictions["prediction"], raw_predictions["target"]) print(f"Symmetric Mean Absolute Percentage Error: {smape}")
In diesem Codeschnipsel importieren wir zunächst die MAE- und SMAPE-Metriken. Anschließend wird anhand dieser Metriken der Fehler zwischen den vorhergesagten Werten ( raw_predictions["prediction"] ) und den tatsächlichen Werten ( raw_predictions["target"] ) berechnet. Diese Metriken können uns helfen, die Leistung unseres Modells zu verstehen und Hinweise zur weiteren Verbesserung unseres Modells zu geben.
Schlussfolgerung
In diesem Artikel haben wir uns angeschaut, wie die in den beiden vorangegangenen Artikeln erwähnten Datenkennzeichnungen verwendet werden können, und demonstriert, wie ein Modell N-HiTs mit unseren Daten erstellt werden kann. Dann haben wir das Modell trainiert und überprüft. Und aus dem Ergebnisdiagramm können wir leicht erkennen, dass unsere Ergebnisse gut sind. Wir haben auch gezeigt, wie dieses Modell im MT5 verwendet werden kann, um Vorhersagen für 30 Kerzen zu treffen. Natürlich haben wir nicht erwähnt, wie man auf der Grundlage der Vorhersageergebnisse Orders platziert, denn für den realen Handel müssen die Leser eine Menge Tests entsprechend ihrer tatsächlichen Situation durchführen und die entsprechenden Handelsregeln festlegen.Zum Schluss: Viel Spaß!
Anhang:
Der vollständige Code:
# Copyright 2021, MetaQuotes Ltd. # https://www.mql5.com # from typing import Union import lightning.pytorch as pl import os from lightning.pytorch.callbacks import EarlyStopping,ModelCheckpoint import matplotlib.pyplot as plt import numpy as np import pandas as pd # import torch from pytorch_forecasting import NHiTS, TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder,timeseries from pytorch_forecasting.metrics import MQF2DistributionLoss from pytorch_forecasting.data.samplers import TimeSynchronizedBatchSampler from lightning.pytorch.tuner import Tuner import MetaTrader5 as mt import warnings import json from torch.utils.data import DataLoader from torch.utils.data.sampler import Sampler,SequentialSampler class New_TmSrDt(TimeSeriesDataSet): ''' rewrite dataset class ''' def to_dataloader(self, train: bool = True, batch_size: int = 64, batch_sampler: Sampler | str = None, shuffle:bool=False, drop_last:bool=False, **kwargs) -> DataLoader: default_kwargs = dict( shuffle=shuffle, # drop_last=train and len(self) > batch_size, drop_last=drop_last, # collate_fn=self._collate_fn, batch_size=batch_size, batch_sampler=batch_sampler, ) default_kwargs.update(kwargs) kwargs = default_kwargs # print(kwargs['drop_last']) if kwargs["batch_sampler"] is not None: sampler = kwargs["batch_sampler"] if isinstance(sampler, str): if sampler == "synchronized": kwargs["batch_sampler"] = TimeSynchronizedBatchSampler( SequentialSampler(self), batch_size=kwargs["batch_size"], shuffle=kwargs["shuffle"], drop_last=kwargs["drop_last"], ) else: raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler") del kwargs["batch_size"] del kwargs["shuffle"] del kwargs["drop_last"] return DataLoader(self,**kwargs) def get_data(mt_data_len:int): if not mt.initialize(): print('initialize() failed!') else: print(mt.version()) sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown() # print(len(rts)) rts_fm=pd.DataFrame(rts) rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length) return rts_fm def spilt_data(data:pd.DataFrame, t_drop_last:bool, t_shuffle:bool, v_drop_last:bool, v_shuffle:bool): training_cutoff = data["time_idx"].max() - max_prediction_length #max:95 context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1) train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0) return train_dataloader,val_dataloader,training def get_learning_rate(): pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False) net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", ) res = Tuner(trainer).lr_find( net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1 ) # print(f"suggested learning rate: {res.suggestion()}") lr_=res.suggestion() return lr_ def train(): early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=True, mode="min") ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, filename='{epoch}-{val_loss:.2f}') trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, ) net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), ) trainer.fit( net, train_dataloaders=t_loader, val_dataloaders=v_loader, # ckpt_path='best' ) return trainer if __name__=='__main__': ep=200 __train=False mt_data_len=200000 max_encoder_length = 2*96 max_prediction_length = 30 batch_size = 128 info_file='results.json' warnings.filterwarnings("ignore") dt=get_data(mt_data_len=mt_data_len) if __train: # print(dt) # dt=get_data(mt_data_len=mt_data_len) t_loader,v_loader,training=spilt_data(dt, t_shuffle=False,t_drop_last=True, v_shuffle=False,v_drop_last=True) lr=get_learning_rate() trainer__=train() m_c_back=trainer__.checkpoint_callback m_l_back=trainer__.early_stopping_callback best_m_p=m_c_back.best_model_path best_m_l=m_l_back.best_score.item() # print(best_m_p) if os.path.exists(info_file): with open(info_file,'r+') as f1: last=json.load(fp=f1) last_best_model=last['last_best_model'] last_best_score=last['last_best_score'] if last_best_score > best_m_l: last['last_best_model']=best_m_p last['last_best_score']=best_m_l json.dump(last,fp=f1) else: with open(info_file,'w') as f2: json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2) best_model = NHiTS.load_from_checkpoint(best_m_p) predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True) raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False)) for idx in range(10): # plot 10 examples best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True) samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0] # plot prediction fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True) ax = fig.get_axes()[0] # plot first two sampled paths ax.plot(samples[:, 0], color="g", label="Sample 1") ax.plot(samples[:, 1], color="r", label="Sample 2") fig.legend() plt.show() else: with open(info_file) as f: best_m_p=json.load(fp=f)['last_best_model'] print('model path is:',best_m_p) best_model = NHiTS.load_from_checkpoint(best_m_p) offset=1 dt=dt.iloc[-max_encoder_length-offset:-offset,:] last_=dt.iloc[-1] # print(len(dt)) for i in range(1,max_prediction_length+1): dt.loc[dt.index[-1]+1]=last_ dt['series']=0 # dt['time_idx']=dt.apply(lambda x:x.index,args=1) dt['time_idx']=dt.index-dt.index[0] # dt=get_data(mt_data_len=max_encoder_length) predictions = best_model.predict(dt, mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True) best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False) plt.show()
Übersetzt aus dem Englischen von MetaQuotes Ltd.
Originalartikel: https://www.mql5.com/en/articles/13255
![ALGLIB Bibliothek für numerische Analysen in MQL5](https://c.mql5.com/2/58/ALGLIB_in_MQL5_avatar.png)
![Fertige Vorlagen für die Verwendung von Indikatoren in Expert Advisors (Teil 1): Oszillatoren](https://c.mql5.com/2/57/ready_made_templates_for_connecting_indicators_001_avatar.png)
![ONNX meistern: Der Game-Changer für MQL5-Händler](https://c.mql5.com/2/59/Mastering_ONNX_logo_up.png)
![Wie man einen einfachen EA für mehrere Währungen mit MQL5 erstellt (Teil 2): Indikator-Signale: Multi-Zeitrahmen Parabolic SAR Indikator](https://c.mql5.com/2/58/FXSAR_MTF_MCEA_icon.png)
![MQL5 - Sprache von Handelsstrategien, eingebaut ins Kundenterminal MetaTrader 5](https://c.mql5.com/i/registerlandings/logo-2.png)
- Freie Handelsapplikationen
- Über 8.000 Signale zum Kopieren
- Wirtschaftsnachrichten für die Lage an den Finanzmärkte
Sie stimmen der Website-Richtlinie und den Nutzungsbedingungen zu.