English Русский 中文 Español 日本語 Português
preview
Datenkennzeichnung für die Zeitreihenanalyse (Teil 3):Beispiel für die Verwendung von Datenkennzeichnungen

Datenkennzeichnung für die Zeitreihenanalyse (Teil 3):Beispiel für die Verwendung von Datenkennzeichnungen

MetaTrader 5Experten | 30 Januar 2024, 12:55
182 0
Yuqiang Pan
Yuqiang Pan

Einführung

Dieser Artikel stellt vor, wie man PyTorch Lightning und PyTorch Forecasting Framework über die MetaTrader5-Handelsplattform verwendet, um finanzielle Zeitreihenprognosen auf der Grundlage neuronaler Netzwerke zu implementieren.

In diesem Artikel werden wir auch die Gründe für die Wahl dieser beiden Rahmen und das von uns verwendete Datenformat erläutern.

Was die Daten anbelangt, so können Sie die Daten verwenden, die durch die Datenkennzeichnung (data labeling) in meinen beiden vorherigen Artikeln erzeugt wurden. Da sie dasselbe Format haben, können Sie sie problemlos nach der in diesem Dokument beschriebenen Methode erweitern.

Die Links zu den beiden vorangegangenen Artikeln lauten: 

  1. Datenkennzeichnung für Zeitreihenanalyse (Teil 1): Erstellen eines Datensatzes mit Trendmarkierungen durch den EA auf einem Chart
  2. Datenkennzeichnung für Zeitreihenanalyse (Teil 2): Datensätze mit Trendmarkern mit Python erstellen

Inhaltsverzeichnis


Mehrere wichtige Python-Bibliotheken

Zunächst wollen wir die wichtigsten Python-Bibliotheken vorstellen, die wir verwenden werden.

1. PyTorch Lightning

PyTorch Lightning ist ein Deep-Learning-Framework, das speziell für professionelle KI-Forscher und Ingenieure für maschinelles Lernen entwickelt wurde, die ein Höchstmaß an Flexibilität benötigen, ohne Kompromisse bei der Skalierbarkeit einzugehen.

Das zentrale Konzept besteht darin, akademischen Code (wie Modelldefinitionen, Vorwärts-/Rückwärtspropagation, Optimierer, Validierung usw.) von technischem Code (wie for-Schleifen, Speichermechanismen, TensorBoard-Protokolle, Trainingsstrategien usw.) zu trennen, was zu einem schlankeren und verständlicheren Code führt.

Die wichtigsten Vorteile sind:

  • Hohe Wiederverwendbarkeit - Das Design der Software ermöglicht die Wiederverwendung des Codes in verschiedenen Projekten.
  • Einfache Wartung - Dank des strukturierten Designs wird die Wartung des Codes einfacher.
  • Klare Logik - Durch die Abstrahierung von Standard-Engineering-Code ist der Code für maschinelles Lernen leichter zu erkennen und zu verstehen.

Insgesamt ist PyTorch Lightning eine äußerst leistungsfähige Bibliothek, die eine effiziente Methode zur Organisation und Verwaltung Ihres PyTorch-Codes bietet. Darüber hinaus bietet es einen strukturierten Ansatz für die Bewältigung gängiger, aber komplizierter Aufgaben wie Modelltraining, -validierung und -test.

Die detaillierte Verwendung dieser Bibliothek kann in der offiziellen Dokumentation nachgelesen werden: https://lightning.ai/docs.

2. PyTorch Vorhersage

Es handelt sich um eine Python-Bibliothek, die speziell für Zeitreihenprognosen entwickelt wurde. Da es auf PyTorch aufbaut, können Sie die leistungsstarken automatischen Differenzierungs- und Optimierungsbibliotheken von PyTorch nutzen und gleichzeitig von den Vorteilen profitieren, die PyTorch Forecasting für Zeitreihenprognosen bietet.

In PyTorch Forecasting finden Sie Implementierungen einer Vielzahl von Vorhersagemodellen, einschließlich, aber nicht beschränkt auf autoregressive Modelle (AR, ARIMA), Zustandsraummodelle (SARIMAX), neuronale Netze (LSTM, GRU) und Ensemblemethoden (Prophet, N-Beats). Dies bedeutet, dass Sie mit verschiedenen Vorhersageansätzen innerhalb desselben Rahmens experimentieren und diese vergleichen können, ohne dass Sie für jeden Ansatz einen umfangreichen Boilerplate-Code schreiben müssen.

Die Bibliothek bietet auch eine Reihe von Werkzeugen für die Datenvorverarbeitung, die Sie bei der Bearbeitung gängiger Aufgaben in Zeitreihen unterstützen können. Diese Werkzeuge umfassen unter anderem die Imputation fehlender Werte, Skalierung, Merkmalsextraktion und Rolling-Window-Transformationen. Dies bedeutet, dass Sie sich mehr auf den Entwurf und die Optimierung Ihres Modells konzentrieren können, ohne viel Zeit für die Datenverarbeitung aufwenden zu müssen.

Außerdem bietet es eine einheitliche Schnittstelle für die Bewertung der Modellleistung. Es implementiert Verlustfunktionen und Validierungsmetriken für Zeitreihen wie QuantileLoss und SMAPE und unterstützt Trainingsmethoden wie Early Stopping und Kreuzvalidierung. So können Sie die Leistung Ihres Modells bequem verfolgen und verbessern.

Wenn Sie eine Methode suchen, um die Effizienz und Wartbarkeit Ihres Zeitreihenvorhersageprojekts zu verbessern, dann könnte PyTorch Forecasting eine ausgezeichnete Wahl sein. Es bietet ein effektives und flexibles Mittel zur Organisation und Verwaltung Ihres PyTorch-Codes, sodass Sie sich auf den wichtigsten Aspekt konzentrieren können - das maschinelle Lernmodell selbst.

Die detaillierte Verwendung dieser Bibliothek kann in der offiziellen Dokumentation nachgelesen werden: https://pytorch-forecasting.readthedocs.io/en/stable.

3. Über das Modell N-HiTS

Das N-HiTS-Modell befasst sich mit den Problemen der Vorhersagevolatilität und der Rechenkomplexität bei langfristigen Vorhersagen, indem es innovative hierarchische Interpolations- und Multiraten-Datenstichprobenverfahren einführt. Dadurch kann das N-HiTS-Modell einen beliebig langen Vorhersagebereich effektiv annähern.

Darüber hinaus haben umfangreiche Experimente mit großen Datensätzen gezeigt, dass das N-HiTS-Modell die Genauigkeit im Vergleich zur neuesten Transformer-Architektur um durchschnittlich fast 20 % verbessert und gleichzeitig die Berechnungszeit um eine Größenordnung (50-mal) reduziert.

Hier der Link zu dem Papier: https://doi.org/10.48550/arXiv.2201.12886.


Initialisierung

Zunächst müssen wir die erforderlichen Bibliotheken importieren. Zu diesen Bibliotheken gehören MetaTrader5 (für die Interaktion mit dem MT5-Terminal), PyTorch Lightning (für das Training des Modells) und einige andere Bibliotheken für die Datenverarbeitung und Visualisierung.

import MetaTrader5 as mt5
import lightning.pytorch as pl
from lightning.pytorch.callbacks import EarlyStopping
import matplotlib.pyplot as plt
import pandas as pd
from pytorch_forecasting import Baseline, NHiTS, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder
from pytorch_forecasting.metrics import MAE, SMAPE, MQF2DistributionLoss, QuantileLoss
from lightning.pytorch.tuner import Tuner

Als Nächstes müssen wir MetaTrader5 initialisieren. Dies geschieht durch den Aufruf der Funktion mt.initialize(). Wenn Sie die Funktion nicht einfach initialisieren können, müssen Sie den Pfad des MT5-Terminals als Parameter an diese Funktion übergeben (im Beispiel ist "D:\Project\mt\MT5\terminal64.exe" mein persönlicher Pfad, in der tatsächlichen Anwendung müssen Sie dafür Ihren eigenen Pfad angeben). Wenn die Initialisierung erfolgreich war, gibt die Funktion True zurück, andernfalls False.

if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
    print('initialize() failed!')
else:
    print(mt.version())

Die Funktion mt.symbols_total() wird verwendet, um die Gesamtzahl der im MT5-Terminal verfügbaren handelbaren Varianten zu ermitteln. Wir können damit beurteilen, ob wir Daten korrekt erhalten können. Wenn die Gesamtzahl größer als 0 ist, können wir die Funktion mt.copy_rates_from_pos() verwenden, um historische Daten des angegebenen handelbaren Finanzinstruments zu erhalten. In diesem Beispiel haben wir die jüngste Länge „mt_data_len“ der M15 (15 Minuten) Perioden-Daten von „GOLD_micro“ erhalten.

sb=mt.symbols_total()
rts=None
if sb > 0:
    rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
    mt.shutdown()

Dann verwenden wir die Funktion mt.shutdown(), um die Verbindung mit dem MT5-Terminal zu schließen und die erhaltenen Daten in das Format Pandas DataFrame zu konvertieren.

mt.shutdown()
rts_fm=pd.DataFrame(rts)

Lassen Sie uns nun besprechen, wie man die vom MT5-Terminal erhaltenen Daten vorverarbeitet.

Zunächst müssen wir die Zeitstempel in Daten umwandeln:

rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 

Hier wird nicht mehr beschrieben, wie die Daten zu kennzeichnen sind. Die Methoden finden Sie in meinen beiden früheren Artikeln (die Links zu den Artikeln finden Sie in der Einleitung dieses Artikels). Für eine kurze Demonstration der Verwendung von Vorhersagemodellen teilen wir einfach alle „max_encoder_length+2max_prediction_length“ Daten in eine Gruppe ein. Jede Gruppe hat eine Sequenz von 0 bis „max_encoder_length+2max_prediction_length-1“, und füllt sie. Auf diese Weise fügen wir den Originaldaten die erforderlichen Kennzeichnungen hinzu. Zunächst muss der ursprüngliche Zeitindex (d. h. der Index des DataFrame) in einen Zeitindex umgewandelt werden. Wir berechnen den Rest des ursprünglichen Zeitindexes geteilt durch (max_encoder_length+2max_prediction_length), und verwenden das Ergebnis als neuen Zeitindex. Damit wird der Zeitindex auf einen Bereich von 0 bis „max_encoder_length+2*max_prediction_length-1“ abgebildet:

rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 

Wir müssen auch den ursprünglichen Zeitindex in eine Gruppe umwandeln. Wir berechnen den ursprünglichen Zeitindex geteilt durch „max_encoder_length+2*max_prediction_length“, und verwenden das Ergebnis als neue Gruppe:

rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)

Wir kapseln den Teil der Datenvorverarbeitung in eine Funktion ein. Wir brauchen ihm nur die Länge der Daten zu übergeben, die wir benötigen, und es kann die Vorverarbeitung der Daten abschließen:

def get_data(mt_data_len:int):
    if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
        print('initialize() failed!') 
    else:
        print(mt.version())
        sb=mt.symbols_total()
        rts=None
        if sb > 0:
            rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
        mt.shutdown()
        # print(len(rts))
    rts_fm=pd.DataFrame(rts)
    rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 
    rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 
    rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
return rts_fm


Umschreiben der Klasse pytorch_forecasting.TimeSeriesDataSet

Umschreiben der Funktion to_dataloader() in pytorch_forecasting. Damit können wir steuern, ob die Daten gemischt werden und ob die letzte Gruppe eines Stapels verworfen werden soll (hauptsächlich, um unvorhersehbare Fehler zu vermeiden, die durch eine unzureichende Länge der letzten Datengruppe verursacht werden). So können wir es machen:

class New_TmSrDt(TimeSeriesDataSet):
    def to_dataloader(self, train: bool = True, 
                      batch_size: int = 64, 
                      batch_sampler: Sampler | str = None, 
                      shuffle:bool=False,
                      drop_last:bool=False,
                      **kwargs) -> DataLoader:
        default_kwargs = dict(
            shuffle=shuffle,
            drop_last=drop_last, #modification
            collate_fn=self._collate_fn,
            batch_size=batch_size,
            batch_sampler=batch_sampler,
        )
        default_kwargs.update(kwargs)
        kwargs = default_kwargs
        if kwargs["batch_sampler"] is not None:
            sampler = kwargs["batch_sampler"]
            if isinstance(sampler, str):
                if sampler == "synchronized":
                    kwargs["batch_sampler"] = TimeSynchronizedBatchSampler(
                        SequentialSampler(self),
                        batch_size=kwargs["batch_size"],
                        shuffle=kwargs["shuffle"],
                        drop_last=kwargs["drop_last"],
                    )
                else:
                    raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler")
            del kwargs["batch_size"]
            del kwargs["shuffle"]
            del kwargs["drop_last"]

        return DataLoader(self,**kwargs)
Dieser Code erstellt eine neue Klasse New_TmSrDt, die von TimeSeriesDataSet erbt. Die Funktion to_dataloader() wird dann in dieser neuen Klasse überschrieben, um die Parameter shuffle und drop_last aufzunehmen. Auf diese Weise haben wir mehr Kontrolle über Ihren Datenladeprozess. Denken Sie daran, Instanzen von TimeSeriesDataSet durch New_TmSrDt in Ihrem Code zu ersetzen.


Erstellen von Trainings- und Validierungsdatensätzen

Zunächst müssen wir den Abschneidepunkt für die Trainingsdaten festlegen. Dazu wird die maximale Vorhersagelänge vom maximalen Wert „time_idx“ subtrahiert.

max_encoder_length = 2*96
max_prediction_length = 30
training_cutoff = rts_fm["time_idx"].max() - max_prediction_length

Dann verwenden wir die Klasse New_TmSrDt (die von uns neu geschriebene TimeSeriesDataSet-Klasse), um einen Trainingsdatensatz zu erstellen. Diese Klasse benötigt die folgenden Parameter:

  • DataFrame (in diesem Fall 'rts_fm'),
  • die Spalte "time_idx", die eine fortlaufende Ganzzahlsequenz ist,
  • die Zielspalte (in diesem Fall „close“), die den Wert darstellt, den wir vorhersagen wollen
  • die Gruppenspalte (in diesem Fall „series“), die verschiedene Zeitreihen darstellt
  • Die maximalen Längen des Encoders und des Prädiktors.
context_length = max_encoder_length
prediction_length = max_prediction_length

training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )
validation = New_TmSrDt.from_dataset(training, 
                                  data, 
                                  min_prediction_idx=training_cutoff + 1)

Als Nächstes verwenden wir die Funktion New_TmSrDt.from_dataset(), um einen Validierungsdatensatz zu erstellen. Diese Funktion benötigt die folgenden Parameter:

  • den Trainingsdatensatz,
  • DataFrame,
  • den minimalen Vorhersageindex, der um 1 größer sein sollte als der maximale Wert von „time_idx“ der Trainingsdaten.

validation = New_TmSrDt.from_dataset(training, rts_fm, min_prediction_idx=training_cutoff + 1)

Schließlich verwenden wir die Funktion to_dataloader(), um die Trainings- und Validierungsdatensätze in PyTorch DataLoader-Objekte zu konvertieren. Diese Funktion benötigt folgende Parameter:

  • den Parameter „train“, der angibt, ob die Daten gemischt werden sollen,
  • den Parameter „batch_size“, der die Anzahl der Proben pro Batch angibt,
  • den Parameter „num_workers“, der die Anzahl der Arbeitsprozesse für das Laden der Daten angibt.

train_dataloader = training.to_dataloader(train=True,
                                          shuffle=t_shuffle, 
                                          drop_last=t_drop_last,
                                          batch_size=batch_size, 
                                          num_workers=0,)
val_dataloader = validation.to_dataloader(train=False, 
                                          shuffle=v_shuffle,
                                          drop_last=v_drop_last,
                                          batch_size=batch_size, 
                                          num_workers=0)

Zuletzt kapseln wir diesen Teil des Codes in eine Funktion spilt_data(data:pd.DataFrame,t_drop_last:bool,t_shuffle:bool,v_drop_last:bool,v_shuffle:bool) , und geben folgenden Parameter an:

  • den Parameter „data“, der den zu bearbeitenden Datensatz enthält,
  • den Parameter „t_drop_last“, der angibt, ob die letzte Gruppe des Trainingsdatensatzes verworfen werden soll,
  • den Parameter „t_shuffle“, der angibt, ob die Trainingsdaten gemischt werden sollen,
  • den Parameter „v_drop_last“, der angibt, ob die letzte Gruppe des Validierungsdatensatzes fallen gelassen werden soll,
  • den Parameter „v_shuffle“, der angibt, ob die Validierungsdaten gemischt werden sollen.

Wir machen train_dataloader (eine Instanz von dataloader für den Trainingsdatensatz), val_dataloader (eine Instanz von dataloader für den Validierungsdatensatz) und training (eine Instanz von TimeSeriesDataSet für den Datensatz) als Rückgabewerte dieser Funktion, da sie später verwendet werden.

def spilt_data(data:pd.DataFrame,
               t_drop_last:bool,
               t_shuffle:bool,
               v_drop_last:bool,
               v_shuffle:bool):
    training_cutoff = data["time_idx"].max() - max_prediction_length #max:95
    context_length = max_encoder_length
    prediction_length = max_prediction_length
    training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )

    validation = New_TmSrDt.from_dataset(training, 
                                         data, 
                                         min_prediction_idx=training_cutoff + 1)
    
    train_dataloader = training.to_dataloader(train=True,
                                              shuffle=t_shuffle, 
                                              drop_last=t_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0,)
    val_dataloader = validation.to_dataloader(train=False, 
                                              shuffle=v_shuffle,
                                              drop_last=v_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0)
    return train_dataloader,val_dataloader,training


Modellerstellung und Training

Jetzt beginnen wir mit der Erstellung des NHiTS-Modells. In diesem Teil wird gezeigt, wie man seine Parameter einstellt und wie man es trainiert.

1. Finden der besten Lernrate

Bevor wir mit der Modellerstellung beginnen, verwenden wir das Tuner-Objekt von PyTorch Lightning, um die beste Lernrate zu finden.

Zunächst müssen wir ein Trainer-Objekt von PyTorch Lightning erstellen, wobei der Parameter „accelerator“ verwendet wird, um den Gerätetyp anzugeben, und „gradient_clip_val“, um eine Gradientenexplosion zu verhindern.

pl.seed_everything(42)
trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1)

Als Nächstes verwenden wir die Funktion NHiTS.from_dataset(), um ein NHiTS-Modellnetz zu erstellen. Diese Funktion benötigt folgende Parameter:

  • den Trainingsdatensatz,
  • die Lernrate,
  • die Abnahme der Gewichte,
  • die Verlustfunktion,
  • die Größe der verborgenen Schicht,
  • der Optimierer.
net = NHiTS.from_dataset(
    training,
    learning_rate=3e-2,
    weight_decay=1e-2,
    loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    backcast_loss_ratio=0.0,
    hidden_size=64,
    optimizer="AdamW",
)

Dann instanziieren wir die Klasse Tuner und rufen die Funktion lr_find() auf. Diese Funktion trainiert das Modell mit einer Reihe von Lernraten auf der Grundlage unseres Datensatzes und vergleicht den Verlust jeder Lernrate, um die beste Lernrate zu ermitteln.

res = Tuner(trainer).lr_find(
    net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, min_lr=1e-5, max_lr=1e-1
)
lr_=res.suggestion()

In ähnlicher Weise kapseln wir diesen Teil des Codes, der die beste Lernrate ermittelt, in eine Funktion get_learning_rate() ein und machen die erhaltene beste Lernrate zu ihrem Rückgabewert:

def get_learning_rate():
    
    pl.seed_everything(42)
    trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False)
    net = NHiTS.from_dataset(
        training,
        learning_rate=3e-2,
        weight_decay=1e-2,
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
    )
    res = Tuner(trainer).lr_find(
        net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1
    )
    lr_=res.suggestion()
    return lr_

Wenn man die Lernrate visualisieren möchten, kann man den folgenden Code hinzufügen:

print(f"suggested learning rate: {res.suggestion()}")
fig = res.plot(show=True, suggest=True)
fig.show()

Das Ergebnis in diesem Beispiel ist wie folgt:

lr

vorgeschlagene Lernrate: 0.003981071705534973.

2. Definieren des Callbacks von EarlyStopping

Dieser Callback wird hauptsächlich verwendet, um den Validierungsverlust zu überwachen und das Training abzubrechen, wenn sich der Verlust in mehreren aufeinanderfolgenden Epochen nicht verbessert hat. Dies kann eine Überanpassung des Modells verhindern.

early_stop_callback = EarlyStopping(monitor="val_loss", 
                                    min_delta=1e-4, 
                                    patience=10,  
                                    verbose=True, 
                                    mode="min")

Der Parameter, der hier zu beachten ist, ist „patience“, der hauptsächlich steuert, wann das Training abgebrochen wird, wenn sich der Verlust in mehreren aufeinanderfolgenden Epochen nicht verbessert hat. Wir stellen ihn auf 10.

3. Definieren des Callbacks von ModelCheckpoint

Dieser Callback wird hauptsächlich zur Steuerung der Modellarchivierung und des Namens des Archivs verwendet. Wir setzen hauptsächlich diese beiden Variablen.

ck_callback=ModelCheckpoint(monitor='val_loss',
                            mode="min",
                            save_top_k=1,  
                            filename='{epoch}-{val_loss:.2f}')

Mit „save_top_k“ wird die Speicherung der besten Modelle gesteuert. Wir setzen ihn auf 1 und speichern nur das beste Modell.

4. Definition des Trainingsmodells

Zunächst müssen wir eine Trainerklasse in lightning.pytorch instanziieren und die beiden Callbacks hinzufügen, die wir zuvor definiert haben.

trainer = pl.Trainer(
    max_epochs=ep,
    accelerator="cpu",
    enable_model_summary=True,
    gradient_clip_val=1.0,
    callbacks=[early_stop_callback,ck_callback],
    limit_train_batches=30,
    enable_checkpointing=True,
)

Die Parameter, auf die wir hier achten müssen, sind „max_epochs“ (maximale Anzahl von Trainingsepochen), „gradient_clip_val“ (zur Vermeidung einer Gradientenexplosion) und „callbacks“. Hier verwendet „max_epochs“ ep, eine globale Variable, die wir später definieren werden, und „callbacks“ ist unsere Kollektion von Rückrufen.

Als Nächstes müssen wir auch das NHiTS-Modell definieren und instanziieren:

net = NHiTS.from_dataset(
    training,
    learning_rate=lr,
    log_interval=10,
    log_val_interval=1,
    weight_decay=1e-2,
    backcast_loss_ratio=0.0,
    hidden_size=64,
    optimizer="AdamW",
    loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
)

Hier müssen die Parameter in der Regel nicht geändert werden, sondern es genügt, die Standardwerte zu verwenden. Hier ändern wir nur „loss“ in die Verlustfunktion MQF2DistributionLoss.

5. Trainingsmodul 

Wir verwenden die Funktion fit() des Trainer-Objekts, um das Modell zu trainieren:

trainer.fit(
    net,
    train_dataloaders=train_dataloader,
    val_dataloaders=val_dataloader,
)

In ähnlicher Weise kapseln wir diesen Teil des Codes in eine Funktion train() :

def train():
    early_stop_callback = EarlyStopping(monitor="val_loss", 
                                        min_delta=1e-4, 
                                        patience=10,  # The number of times without improvement will stop
                                        verbose=True, 
                                        mode="min")
    ck_callback=ModelCheckpoint(monitor='val_loss',
                                mode="min",
                                save_top_k=1,  # Save the top few best ones
                                filename='{epoch}-{val_loss:.2f}')
    trainer = pl.Trainer(
        max_epochs=ep,
        accelerator="cpu",
        enable_model_summary=True,
        gradient_clip_val=1.0,
        callbacks=[early_stop_callback,ck_callback],
        limit_train_batches=30,
        enable_checkpointing=True,
    )
    net = NHiTS.from_dataset(
        training,
        learning_rate=lr,
        log_interval=10,
        log_val_interval=1,
        weight_decay=1e-2,
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    )
    trainer.fit(
        net,
        train_dataloaders=t_loader,
        val_dataloaders=v_loader,
        # ckpt_path='best'
    )
return trainer

Diese Funktion gibt ein trainiertes Modell zurück, das Sie für Vorhersageaufgaben verwenden können.


Definieren der Ausführungslogik

1. Definieren von globalen Variablen:

ep=200
__train=False
mt_data_len=200000
max_encoder_length = 2*96
max_prediction_length = 30
batch_size = 128

__train wird verwendet, um zu kontrollieren, ob wir das Modell gerade trainieren oder testen.

Es ist erwähnenswert, dass ep verwendet wird, um die maximale Trainingsepoche zu kontrollieren. Da wir EarlyStopping eingestellt haben, kann dieser Wert etwas größer gewählt werden, da das Modell automatisch stoppt, wenn es nicht mehr konvergiert.

mt_data_len ist die Anzahl der jüngsten Zeitreihendaten, die vom Client bezogen wurden.

max_encoder_length und max_prediction_length sind die maximale Kodierungslänge bzw. die maximale Vorhersagelänge.

2. Training

Wir müssen auch die aktuellen optimalen Trainingsergebnisse in einer lokalen Datei speichern, wenn das Training abgeschlossen ist, also definieren wir eine json-Datei, um diese Informationen zu speichern:

info_file='results.json'

Um unseren Trainingsprozess übersichtlicher zu gestalten, müssen wir vermeiden, dass während des Trainings unnötige Warnhinweise ausgegeben werden, daher fügen wir den folgenden Code hinzu:

warnings.filterwarnings("ignore")

Als Nächstes folgt unsere Trainingslogik:

dt=get_data(mt_data_len=mt_data_len)
if __train:
    # print(dt)
    # dt=get_data(mt_data_len=mt_data_len)
    t_loader,v_loader,training=spilt_data(dt,
                                    t_shuffle=False,t_drop_last=True,
                                    v_shuffle=False,v_drop_last=True)
    lr=get_learning_rate()
    trainer__=train()
    m_c_back=trainer__.checkpoint_callback
    m_l_back=trainer__.early_stopping_callback
    best_m_p=m_c_back.best_model_path
    best_m_l=m_l_back.best_score.item()
    # print(best_m_p)
    if os.path.exists(info_file):
        with open(info_file,'r+') as f1:
            last=json.load(fp=f1)
            last_best_model=last['last_best_model']
            last_best_score=last['last_best_score']
            if last_best_score > best_m_l:
                last['last_best_model']=best_m_p
                last['last_best_score']=best_m_l
                json.dump(last,fp=f1)
    else:               
        with open(info_file,'w') as f2:
            json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)

Wenn das Training abgeschlossen ist, finden wir den Speicherort unseres besten Modells und die beste Punktzahl in der Datei results.json im Stammverzeichnis.

Während des Trainingsprozesses sieht man einen Fortschrittsbalken, der den Fortschritt jeder Epoche anzeigt. 

Training:

training

Training abgeschlossen:

ts

3. Validierung des Modells

Nach dem Training wollen wir das Modell validieren und es visualisieren. Wir können den folgenden Code hinzufügen:

best_model = NHiTS.load_from_checkpoint(best_m_p)
predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True)
raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False))
for idx in range(10):  # plot 10 examples
    best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True)
    # sample 500 paths
samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0]

# plot prediction
fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True)
ax = fig.get_axes()[0]
# plot first two sampled paths
ax.plot(samples[:, 0], color="g", label="Sample 1")
ax.plot(samples[:, 1], color="r", label="Sample 2")
fig.legend()
plt.show()

Sie können auch TensorBoard verwenden, um die Visualisierung der Trainingssituation in Echtzeit während des Trainings zu sehen, wir machen hier keine Demonstration.

Ergebnis:

ref

4. Testen des trainierten Modells

Zunächst öffnen wir die json-Datei, um den optimalen Speicherort für das Modell zu finden:

with open(info_file) as f:
    best_m_p=json.load(fp=f)['last_best_model']
print('model path is:',best_m_p)

Dann laden wir das Modell:

best_model = NHiTS.load_from_checkpoint(best_m_p)

Danach erhalten wir vom Client die Daten in Echtzeit, um das Modell zu testen:

offset=1
dt=dt.iloc[-max_encoder_length-offset:-offset,:]
last_=dt.iloc[-1] #get the last group of data
# print(len(dt))
for i in range(1,max_prediction_length+1):
    dt.loc[dt.index[-1]+1]=last_
dt['series']=0
# dt['time_idx']=dt.apply(lambda x:x.index,args=1)
dt['time_idx']=dt.index-dt.index[0]
# dt=get_data(mt_data_len=max_encoder_length)
predictions=best_model.predict(dt,mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True)
best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False)
plt.show()

Das Ergebnis sieht wie folgt aus:

pref

5.  Bewertung des Modells

Natürlich können wir einige Metriken in der PyTorch Forecasting-Bibliothek verwenden, um die Leistung des Modells zu bewerten. Im Folgenden wird beschrieben, wie man den mittleren absoluten Fehler (MAE) und den symmetrischen mittleren absoluten Fehler in Prozent (SMAPE) auswerten und die Ergebnisse ausgibt:

from pytorch_forecasting.metrics import MAE, SMAPE
mae = MAE()(raw_predictions["prediction"], raw_predictions["target"])
print(f"Mean Absolute Error: {mae}")
smape = SMAPE()(raw_predictions["prediction"], raw_predictions["target"])
print(f"Symmetric Mean Absolute Percentage Error: {smape}")

In diesem Codeschnipsel importieren wir zunächst die MAE- und SMAPE-Metriken. Anschließend wird anhand dieser Metriken der Fehler zwischen den vorhergesagten Werten ( raw_predictions["prediction"] ) und den tatsächlichen Werten ( raw_predictions["target"] ) berechnet. Diese Metriken können uns helfen, die Leistung unseres Modells zu verstehen und Hinweise zur weiteren Verbesserung unseres Modells zu geben.



Schlussfolgerung

In diesem Artikel haben wir uns angeschaut, wie die in den beiden vorangegangenen Artikeln erwähnten Datenkennzeichnungen verwendet werden können, und demonstriert, wie ein Modell N-HiTs mit unseren Daten erstellt werden kann. Dann haben wir das Modell trainiert und überprüft. Und aus dem Ergebnisdiagramm können wir leicht erkennen, dass unsere Ergebnisse gut sind. Wir haben auch gezeigt, wie dieses Modell im MT5 verwendet werden kann, um Vorhersagen für 30 Kerzen zu treffen. Natürlich haben wir nicht erwähnt, wie man auf der Grundlage der Vorhersageergebnisse Orders platziert, denn für den realen Handel müssen die Leser eine Menge Tests entsprechend ihrer tatsächlichen Situation durchführen und die entsprechenden Handelsregeln festlegen.

Zum Schluss: Viel Spaß!


Anhang:

Der vollständige Code:

# Copyright 2021, MetaQuotes Ltd.
# https://www.mql5.com


# from typing import Union
import lightning.pytorch as pl
import os
from lightning.pytorch.callbacks import EarlyStopping,ModelCheckpoint
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# import torch
from pytorch_forecasting import NHiTS, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder,timeseries
from pytorch_forecasting.metrics import MQF2DistributionLoss
from pytorch_forecasting.data.samplers import TimeSynchronizedBatchSampler
from lightning.pytorch.tuner import Tuner
import MetaTrader5 as mt
import warnings
import json


from torch.utils.data import DataLoader
from torch.utils.data.sampler import Sampler,SequentialSampler

class New_TmSrDt(TimeSeriesDataSet):
    '''
    rewrite dataset class
    '''
    def to_dataloader(self, train: bool = True, 
                      batch_size: int = 64, 
                      batch_sampler: Sampler | str = None, 
                      shuffle:bool=False,
                      drop_last:bool=False,
                      **kwargs) -> DataLoader:

        default_kwargs = dict(
            shuffle=shuffle,
            # drop_last=train and len(self) > batch_size,
            drop_last=drop_last, #
            collate_fn=self._collate_fn,
            batch_size=batch_size,
            batch_sampler=batch_sampler,
        )
        default_kwargs.update(kwargs)
        kwargs = default_kwargs
        # print(kwargs['drop_last'])
        if kwargs["batch_sampler"] is not None:
            sampler = kwargs["batch_sampler"]
            if isinstance(sampler, str):
                if sampler == "synchronized":
                    kwargs["batch_sampler"] = TimeSynchronizedBatchSampler(
                        SequentialSampler(self),
                        batch_size=kwargs["batch_size"],
                        shuffle=kwargs["shuffle"],
                        drop_last=kwargs["drop_last"],
                    )
                else:
                    raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler")
            del kwargs["batch_size"]
            del kwargs["shuffle"]
            del kwargs["drop_last"]

        return DataLoader(self,**kwargs)

def get_data(mt_data_len:int):
    if not mt.initialize():
        print('initialize() failed!') 
    else:
        print(mt.version())
        sb=mt.symbols_total()
        rts=None
        if sb > 0:
            rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
        mt.shutdown()
        # print(len(rts))
    rts_fm=pd.DataFrame(rts)
    rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 

    rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 
    rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
    return rts_fm


def spilt_data(data:pd.DataFrame,
               t_drop_last:bool,
               t_shuffle:bool,
               v_drop_last:bool,
               v_shuffle:bool):
    training_cutoff = data["time_idx"].max() - max_prediction_length #max:95
    context_length = max_encoder_length
    prediction_length = max_prediction_length
    training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )

    validation = New_TmSrDt.from_dataset(training, 
                                         data, 
                                         min_prediction_idx=training_cutoff + 1)
    
    train_dataloader = training.to_dataloader(train=True,
                                              shuffle=t_shuffle, 
                                              drop_last=t_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0,)
    val_dataloader = validation.to_dataloader(train=False, 
                                              shuffle=v_shuffle,
                                              drop_last=v_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0)
    return train_dataloader,val_dataloader,training

def get_learning_rate():
    
    pl.seed_everything(42)
    trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False)
    net = NHiTS.from_dataset(
        training,
        learning_rate=3e-2,
        weight_decay=1e-2,
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
    )
    res = Tuner(trainer).lr_find(
        net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1
    )
    # print(f"suggested learning rate: {res.suggestion()}")
    lr_=res.suggestion()
    return lr_
def train():
    early_stop_callback = EarlyStopping(monitor="val_loss", 
                                        min_delta=1e-4, 
                                        patience=10,  
                                        verbose=True, 
                                        mode="min")
    ck_callback=ModelCheckpoint(monitor='val_loss',
                                mode="min",
                                save_top_k=1,  
                                filename='{epoch}-{val_loss:.2f}')
    trainer = pl.Trainer(
        max_epochs=ep,
        accelerator="cpu",
        enable_model_summary=True,
        gradient_clip_val=1.0,
        callbacks=[early_stop_callback,ck_callback],
        limit_train_batches=30,
        enable_checkpointing=True,
    )
    net = NHiTS.from_dataset(
        training,
        learning_rate=lr,
        log_interval=10,
        log_val_interval=1,
        weight_decay=1e-2,
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    )
    trainer.fit(
        net,
        train_dataloaders=t_loader,
        val_dataloaders=v_loader,
        # ckpt_path='best'
    )
    return trainer

if __name__=='__main__':
    ep=200
    __train=False
    mt_data_len=200000
    max_encoder_length = 2*96
    max_prediction_length = 30
    batch_size = 128
    info_file='results.json'
    warnings.filterwarnings("ignore")
    dt=get_data(mt_data_len=mt_data_len)
    if __train:
        # print(dt)
        # dt=get_data(mt_data_len=mt_data_len)
        t_loader,v_loader,training=spilt_data(dt,
                                              t_shuffle=False,t_drop_last=True,
                                              v_shuffle=False,v_drop_last=True)
        lr=get_learning_rate()
        trainer__=train()
        m_c_back=trainer__.checkpoint_callback
        m_l_back=trainer__.early_stopping_callback
        best_m_p=m_c_back.best_model_path
        best_m_l=m_l_back.best_score.item()

        # print(best_m_p)
        
        if os.path.exists(info_file):
            with open(info_file,'r+') as f1:
                last=json.load(fp=f1)
                last_best_model=last['last_best_model']
                last_best_score=last['last_best_score']
                if last_best_score > best_m_l:
                    last['last_best_model']=best_m_p
                    last['last_best_score']=best_m_l
                    json.dump(last,fp=f1)
        else:               
            with open(info_file,'w') as f2:
                json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)

        best_model = NHiTS.load_from_checkpoint(best_m_p)
        predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True)
        raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False))
    
        for idx in range(10):  # plot 10 examples
            best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True)
        samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0]

        # plot prediction
        fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True)
        ax = fig.get_axes()[0]
        # plot first two sampled paths
        ax.plot(samples[:, 0], color="g", label="Sample 1")
        ax.plot(samples[:, 1], color="r", label="Sample 2")
        fig.legend()
        plt.show()
    else:
        with open(info_file) as f:
            best_m_p=json.load(fp=f)['last_best_model']
        print('model path is:',best_m_p)
        
        best_model = NHiTS.load_from_checkpoint(best_m_p)

        offset=1
        dt=dt.iloc[-max_encoder_length-offset:-offset,:]
        last_=dt.iloc[-1] 
        # print(len(dt))
        for i in range(1,max_prediction_length+1):
            dt.loc[dt.index[-1]+1]=last_
        dt['series']=0
        # dt['time_idx']=dt.apply(lambda x:x.index,args=1)
        dt['time_idx']=dt.index-dt.index[0]
        # dt=get_data(mt_data_len=max_encoder_length)
        predictions = best_model.predict(dt, mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True)
        best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False)
        plt.show()


Übersetzt aus dem Englischen von MetaQuotes Ltd.
Originalartikel: https://www.mql5.com/en/articles/13255

Beigefügte Dateien |
n_hits.py (9.7 KB)
ALGLIB Bibliothek für numerische Analysen in MQL5 ALGLIB Bibliothek für numerische Analysen in MQL5
Der Artikel wirft einen kurzen Blick auf die numerische Analysebibliothek ALGLIB 3.19, ihre Anwendungen und neue Algorithmen, die die Effizienz der Finanzdatenanalyse verbessern können.
Fertige Vorlagen für die Verwendung von Indikatoren in Expert Advisors (Teil 1): Oszillatoren Fertige Vorlagen für die Verwendung von Indikatoren in Expert Advisors (Teil 1): Oszillatoren
Der Artikel berücksichtigt Standardindikatoren aus der Kategorie der Oszillatoren. Wir werden gebrauchsfertige Vorlagen für ihre Verwendung in EAs erstellen — Deklaration und Einstellung von Parametern, Initialisierung und Deinitialisierung von Indikatoren sowie das Abrufen von Daten und Signalen aus den Indikatorpuffern in den EAs.
ONNX meistern: Der Game-Changer für MQL5-Händler ONNX meistern: Der Game-Changer für MQL5-Händler
Tauchen Sie ein in die Welt von ONNX, dem leistungsstarken offenen Standardformat für den Austausch von Modellen für maschinelles Lernen. Entdecken Sie, wie der Einsatz von ONNX den algorithmischen Handel in MQL5 revolutionieren kann, indem er es Händlern ermöglicht, hochmoderne KI-Modelle nahtlos zu integrieren und ihre Strategien auf ein neues Niveau zu heben. Entdecken Sie die Geheimnisse der plattformübergreifenden Kompatibilität und lernen Sie, wie Sie das volle Potenzial von ONNX in Ihren MQL5-Handelsbestrebungen ausschöpfen können. Verbessern Sie Ihr Trading-Spiel mit diesem umfassenden Leitfaden zur Beherrschung von ONNX:
Wie man einen einfachen EA für mehrere Währungen mit MQL5 erstellt (Teil 2): Indikator-Signale: Multi-Zeitrahmen Parabolic SAR Indikator Wie man einen einfachen EA für mehrere Währungen mit MQL5 erstellt (Teil 2): Indikator-Signale: Multi-Zeitrahmen Parabolic SAR Indikator
Der Expert Advisor für mehrere Währungen in diesem Artikel ist ein Expert Advisor oder Handelsroboter, der handeln kann (z.B. Aufträge öffnen, schließen und verwalten, Trailing Stop Loss und Trailing Profit) für mehr als 1 Symbolpaar von nur einem Symbolchart aus. Dieses Mal werden wir nur 1 Indikator verwenden, nämlich den Parabolic SAR oder iSAR in mehreren Zeitrahmen von PERIOD_M15 bis PERIOD_D1.