English 中文 Español Deutsch 日本語 Português
preview
Разметка данных в анализе временных рядов (Часть 3):Пример использования разметки данных

Разметка данных в анализе временных рядов (Часть 3):Пример использования разметки данных

MetaTrader 5Эксперты | 15 февраля 2024, 13:06
1 111 0
Yuqiang Pan
Yuqiang Pan

Введение

Статья рассказывает, как использовать PyTorch Lightning и фреймворк PyTorch Forecasting через торговую платформу MetaTrader 5 для реализации прогнозирования финансовых временных рядов на основе нейронных сетей.

В статье мы также объясним причины, по которым мы выбрали эти две платформы и используемый нами формат данных.

Что касается данных, вы можете использовать данные, полученные путем разметки данных из двух моих предыдущих статей. Поскольку они имеют один и тот же формат, вы можете легко расширить его, следуя методологии, описанной в этой статье.

Ссылки на две предыдущие статьи: 

  1. Разметка данных в анализе временных рядов (Часть 1):Создаем набор данных с маркерами тренда с помощью графика советника
  2. Разметка данных в анализе временных рядов (Часть 2):Создаем наборы данных с маркерами тренда с помощью Python

Содержание:


Несколько важных библиотек Python

Сначала давайте представим основные библиотеки Python, которые мы будем использовать.

1. PyTorch Lightning

PyTorch Lightning — это среда глубокого обучения, специально разработанная для профессиональных исследователей искусственного интеллекта и инженеров по машинному обучению, которым требуется максимальная гибкость без ущерба для масштабируемости.

Основная идея заключается в том, чтобы отделить академический код (например, определения моделей, прямое/обратное распространение, оптимизаторы, валидацию и т. д.) от инженерного кода (например, циклы for, механизмы сохранения, журналы TensorBoard, стратегии обучения и т. д.), в результате чего можно получить более упорядоченный и понятный код.

К основным преимуществам относятся:

  • Высокая возможность повторного использования. Конструкция позволяет повторно использовать код в различных проектах.
  • Простота обслуживания. Благодаря структурированному дизайну поддерживать код становится проще.
  • Четкая логика. Благодаря абстрагированию шаблонного инженерного кода код машинного обучения становится легче идентифицировать и понимать.

В целом, PyTorch Lightning — чрезвычайно мощная библиотека, предлагающая эффективный метод организации и управления вашим кодом PyTorch. Кроме того, он обеспечивает структурированный подход к решению распространенных, но сложных задач, таких как обучение, проверка и тестирование модели.

Подробное использование этой библиотеки можно найти в официальной документации: https://lightning.ai/docs.

2. PyTorch Forecasting

Библиотека Python, специально разработанная для прогнозирования временных рядов. Поскольку она создана на основе PyTorch, вы можете использовать мощные библиотеки автоматической дифференциации и оптимизации PyTorch, а также воспользоваться удобством, которое PyTorch Forecasting предлагает для прогнозирования временных рядов.

В PyTorch Forecasting вы можете найти реализации различных моделей прогнозирования, включая, помимо прочего, модели авторегрессии (AR, ARIMA), модели пространства состояний (SARIMAX), нейронные сети (LSTM, GRU) и ансамблевые методы (Prophet, N-Beats). Это означает, что вы можете экспериментировать и сравнивать различные прогнозные подходы в одной и той же среде без необходимости писать обширный стандартный код для каждого подхода.

Библиотека также предлагает ряд инструментов предварительной обработки данных, которые могут помочь в решении типичных задач во временных рядах. Эти инструменты включают, среди прочего, подстановку недостающих значений, масштабирование, извлечение признаков и преобразования скользящего окна. Это означает, что вы можете больше сосредоточиться на разработке и оптимизации вашей модели, не тратя много времени на обработку данных.

Библиотека также располагает унифицированным интерфейсом для оценки производительности модели. Она реализует функции потерь и метрики проверки для временных рядов, такие как QuantileLoss и SMAPE, а также поддерживает такие методологии обучения, как ранняя остановка и перекрестная проверка. Это позволяет вам более удобно отслеживать и повышать производительность вашей модели.

Если вы ищете метод повышения эффективности и удобства обслуживания вашего проекта прогнозирования временных рядов, то PyTorch Forecasting может стать отличным выбором. Библиотека предлагает эффективные и гибкие средства для организации вашего кода PyTorch и управления им, позволяя вам сосредоточиться на самом важном аспекте — самой модели машинного обучения.

Подробное описание использования библиотеки можно найти в официальной документации: https://pytorch-forecasting.readthedocs.io/en/stable.

3. Модель N-HiTS

Модель N-HiTS решает проблемы волатильности прогнозов и вычислительной сложности в долгосрочном прогнозировании за счет внедрения инновационных методов иерархической интерполяции и многоскоростной выборки данных. Это позволяет модели N-HiTS эффективно аппроксимировать диапазон прогнозирования любой длины.

Кроме того, обширные эксперименты, проведенные на крупномасштабных наборах данных, показали, что модель N-HiTS повышает точность в среднем почти на 20% по сравнению с последней архитектурой Transformer, а также сокращает время вычислений на порядок (в 50 раз).

Link to the paper: https://doi.org/10.48550/arXiv.2201.12886.


Инициализация

Сначала нам нужно импортировать необходимые библиотеки. К таким библиотекам относятся MetaTrader 5 (для взаимодействия с одноименным терминалом), PyTorch Lightning (для обучения модели) и некоторые другие библиотеки для обработки и визуализации данных.

import MetaTrader5 as mt5
import lightning.pytorch as pl
from lightning.pytorch.callbacks import EarlyStopping
import matplotlib.pyplot as plt
import pandas as pd
from pytorch_forecasting import Baseline, NHiTS, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder
from pytorch_forecasting.metrics import MAE, SMAPE, MQF2DistributionLoss, QuantileLoss
from lightning.pytorch.tuner import Tuner

Далее нам нужно инициализировать MetaTrader 5. Это делается путем вызова функции mt.initialize(). Если вы не можете его инициализировать, просто используя его, вам необходимо передать путь к терминалу MetaTrader 5 в качестве параметра этой функции (в примере "D:\Project\mt\MT5\terminal64.exe" — это мой путь, в реальном приложении вам необходимо установить ваш собственный путь). Если инициализация прошла успешно, функция вернет True, в противном случае — False.

if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
    print('initialize() failed!')
else:
    print(mt.version())

Функция mt.symbols_total() используется для получения общего количества торгуемых разновидностей, доступных в терминале MetaTrader 5. Мы можем использовать его, чтобы определить, сможем ли мы правильно получить данные. Если общее число больше 0, мы можем использовать функцию mt.copy_rates_from_pos() для получения исторических данных указанной торгуемой разновидности. В этом примере мы получили самые последние данные о длине периода "mt_data_len" M15 (15 минут) разновидности GOLD_micro.

sb=mt.symbols_total()
rts=None
if sb > 0:
    rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
    mt.shutdown()

Наконец, мы используем функцию mt.shutdown(), чтобы закрыть соединение с терминалом MetaTrader 5 и преобразовать полученные данные в формат Pandas DataFrame.

mt.shutdown()
rts_fm=pd.DataFrame(rts)

Теперь рассмотрим, как предварительно обработать данные, полученные с терминала.

Сначала нам нужно преобразовать временные метки в даты:

rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 

Здесь мы больше не описываем, как размечать данные. Вы можете найти методы в двух моих предыдущих статьях (ссылки на них есть во введении к этой статье). Для краткой демонстрации того, как использовать модели прогнозирования, мы просто разделим все фрагменты данных max_encoder_length+2max_prediction_length на группы. В каждой группе есть последовательность от 0 до "max_encoder_length+2max_prediction_length-1". Заполним ее. Таким образом мы добавляем к исходным данным необходимые метки. Сначала нам нужно преобразовать исходный индекс времени (то есть индекс DataFrame). Вычислим остаток исходного индекса времени, разделенный на (max_encoder_length+2max_prediction_length), и используем результат в качестве нового индекса времени. Сопоставим индекс времени с диапазоном от 0 до "max_encoder_length+2*max_prediction_length-1":

rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 

Нам также необходимо преобразовать исходный индекс времени в группу. Вычислим исходный индекс времени, разделенный на "max_encoder_length+2*max_prediction_length", и используем результат как новую группу:

rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)

Инкапсулируем часть предварительной обработки данных в функцию. Нам нужно передать ей только ту длину данных, которую нам нужно получить, и она сможет завершить работу по предварительной обработке данных:

def get_data(mt_data_len:int):
    if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
        print('initialize() failed!') 
    else:
        print(mt.version())
        sb=mt.symbols_total()
        rts=None
        if sb > 0:
            rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
        mt.shutdown()
        # print(len(rts))
    rts_fm=pd.DataFrame(rts)
    rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 
    rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 
    rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
return rts_fm


Переписывание класса pytorch_forecasting.TimeSeriesDataSet

Переписывание функции to_dataloader() в pytorch_forecasting. Это позволяет контролировать, будут ли данные перемешиваться и удалять ли последнюю группу пакета (главным образом, чтобы предотвратить непредсказуемые ошибки, вызванные недостаточной длиной последней группы данных). Вот как можно это сделать:

class New_TmSrDt(TimeSeriesDataSet):
    def to_dataloader(self, train: bool = True, 
                      batch_size: int = 64, 
                      batch_sampler: Sampler | str = None, 
                      shuffle:bool=False,
                      drop_last:bool=False,
                      **kwargs) -> DataLoader:
        default_kwargs = dict(
            shuffle=shuffle,
            drop_last=drop_last, #modification
            collate_fn=self._collate_fn,
            batch_size=batch_size,
            batch_sampler=batch_sampler,
        )
        default_kwargs.update(kwargs)
        kwargs = default_kwargs
        if kwargs["batch_sampler"] is not None:
            sampler = kwargs["batch_sampler"]
            if isinstance(sampler, str):
                if sampler == "synchronized":
                    kwargs["batch_sampler"] = TimeSynchronizedBatchSampler(
                        SequentialSampler(self),
                        batch_size=kwargs["batch_size"],
                        shuffle=kwargs["shuffle"],
                        drop_last=kwargs["drop_last"],
                    )
                else:
                    raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler")
            del kwargs["batch_size"]
            del kwargs["shuffle"]
            del kwargs["drop_last"]

        return DataLoader(self,**kwargs)
Этот код создает новый класс New_TmSrDt, который наследуется от TimeSeriesDataSet. Затем в этом новом классе переопределяется функция to_dataloader(), включающая параметры shuffle и drop_last. Таким образом, вы сможете лучше контролировать процесс загрузки данных. Не забудьте заменить в коде экземпляры TimeSeriesDataSet на New_TmSrDt.


Создание наборов данных для обучения и проверки

Во-первых, нам нужно определить точку отсечения для обучающих данных. Это делается путем вычитания максимальной длины прогноза из максимального значения time_idx.

max_encoder_length = 2*96
max_prediction_length = 30
training_cutoff = rts_fm["time_idx"].max() - max_prediction_length

Затем мы используем класс New_TmSrDt (который представляет собой переписанный нами класс TimeSeriesDataSet) для создания набора обучающих данных. Этот класс требует следующих параметров:

  • DataFrame (в данном случае - rts_fm)
  • Столбец time_idx, который представляет собой непрерывную целочисленную последовательность.
  • Целевой столбец (в данном случае close), который представляет собой значение, которое мы хотим спрогнозировать.
  • Столбец группы (в данном случае series), который представляет разные временные ряды.
  • Максимальные длины энкодера и предиктора
context_length = max_encoder_length
prediction_length = max_prediction_length

training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )
validation = New_TmSrDt.from_dataset(training, 
                                  data, 
                                  min_prediction_idx=training_cutoff + 1)

Далее мы используем функцию New_TmSrDt.from_dataset() для создания набора данных проверки. Для этой функции требуются следующие параметры:

  • Набор обучающих данных
  • DataFrame
  • Минимальный индекс прогнозирования, который должен быть на 1 больше максимального значения time_idx обучающих данных.

validation = New_TmSrDt.from_dataset(training, rts_fm, min_prediction_idx=training_cutoff + 1)

Наконец мы используем функцию  to_dataloader() , чтобы перевести обучающий и проверочный наборы данных в объекты DataLoader PyTorch. Для этой функции требуются следующие параметры:

  • Параметр train, указывающий, следует ли перемешивать данные.
  • Параметр batch_size, который указывает количество образцов в пакете.
  • Параметр num_workers, указывающий количество рабочих процессов для загрузки данных.

train_dataloader = training.to_dataloader(train=True,
                                          shuffle=t_shuffle, 
                                          drop_last=t_drop_last,
                                          batch_size=batch_size, 
                                          num_workers=0,)
val_dataloader = validation.to_dataloader(train=False, 
                                          shuffle=v_shuffle,
                                          drop_last=v_drop_last,
                                          batch_size=batch_size, 
                                          num_workers=0)

Наконец, мы инкапсулируем эту часть кода в функцию spilt_data(data:pd.DataFrame,t_drop_last:bool,t_shuffle:bool,v_drop_last:bool,v_shuffle:bool) и указываем следующие параметры:

  • Параметр data используется для получения набора данных, который необходимо обработать.
  • Параметр t_drop_last указывает, следует ли удалить последнюю группу набора обучающих данных.
  • Параметр t_shuffle указывает, следует ли перемешивать обучающие данные.
  • Параметр v_drop_last указывает, следует ли удалить последнюю группу набора проверочных данных.
  • Параметр v_shuffle указывает, следует ли перемешать данные проверки.

train_dataloader (экземпляр dataloader для набора обучающих данных), val_dataloader (экземпляр dataloader для набора проверочных данных) и training (экземпляр TimeSeriesDataSet для набора данных) используются в качестве возвращаемых значений этой функции, поскольку они будут использоваться позже.

def spilt_data(data:pd.DataFrame,
               t_drop_last:bool,
               t_shuffle:bool,
               v_drop_last:bool,
               v_shuffle:bool):
    training_cutoff = data["time_idx"].max() - max_prediction_length #max:95
    context_length = max_encoder_length
    prediction_length = max_prediction_length
    training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )

    validation = New_TmSrDt.from_dataset(training, 
                                         data, 
                                         min_prediction_idx=training_cutoff + 1)
    
    train_dataloader = training.to_dataloader(train=True,
                                              shuffle=t_shuffle, 
                                              drop_last=t_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0,)
    val_dataloader = validation.to_dataloader(train=False, 
                                              shuffle=v_shuffle,
                                              drop_last=v_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0)
    return train_dataloader,val_dataloader,training


Создание и обучение модели

Приступим к созданию модели NHiTS. В этой части будет показано, как установить ее параметры и как ее обучать.

1. Найдите лучшую скорость обучения

Прежде чем начать создание модели, мы используем объект Tuner PyTorch Lightning, чтобы найти наилучшую скорость обучения.

Во-первых, нам нужно создать объект Trainer, в котором параметр accelerator используется для указания типа устройства, а gradient_clip_val используется для предотвращения взрыва градиента.

pl.seed_everything(42)
trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1)

Далее мы используем функцию NHiTS.from_dataset() для создания сети модели NHiTS. Для этой функции требуются следующие параметры:

  • Набор обучающих данных
  • Скорость обучения
  • Снижение веса
  • Функция потерь
  • Размер скрытого слоя
  • Оптимизатор
net = NHiTS.from_dataset(
    training,
    learning_rate=3e-2,
    weight_decay=1e-2,
    loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    backcast_loss_ratio=0.0,
    hidden_size=64,
    optimizer="AdamW",
)

Затем мы создаем экземпляр класса Tuner и вызываем функцию lr_find(). Она будет обучать модель на основе ряда скоростей обучения на основе нашего набора данных и сравнивать потери каждой скорости обучения, чтобы получить наилучшую скорость обучения.

res = Tuner(trainer).lr_find(
    net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, min_lr=1e-5, max_lr=1e-1
)
lr_=res.suggestion()

Аналогично, мы инкапсулируем эту часть кода, которая получает лучшую скорость обучения, в функцию get_learning_rate() и делаем полученную лучшую скорость обучения ее возвращаемым значением:

def get_learning_rate():
    
    pl.seed_everything(42)
    trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False)
    net = NHiTS.from_dataset(
        training,
        learning_rate=3e-2,
        weight_decay=1e-2,
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
    )
    res = Tuner(trainer).lr_find(
        net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1
    )
    lr_=res.suggestion()
    return lr_

Если вы хотите визуализировать скорость обучения, вы можете добавить следующий код:

print(f"suggested learning rate: {res.suggestion()}")
fig = res.plot(show=True, suggest=True)
fig.show()

Результат в этом примере следующий:

lr

рекомендуемая скорость обучения: 0.003981071705534973.

2. Определение EarlyStopping Callback

Этот обратный вызов в основном используется для мониторинга потерь при проверке и остановки обучения, когда потери не улучшаются в течение нескольких последовательных эпох. Это может предотвратить переобучение модели.

early_stop_callback = EarlyStopping(monitor="val_loss", 
                                    min_delta=1e-4, 
                                    patience=10,  
                                    verbose=True, 
                                    mode="min")

Здесь следует отметить параметр patience, который в основном определяет, когда следует остановиться во время тренировки, если потери не уменьшаются в течение нескольких последовательных эпох. Мы установили его на 10.

3. Определение ModelCheckpoint Callback

Этот обратный вызов в основном используется для управления архивированием модели и именем архива. В основном мы устанавливаем следующие две переменные.

ck_callback=ModelCheckpoint(monitor='val_loss',
                            mode="min",
                            save_top_k=1,  
                            filename='{epoch}-{val_loss:.2f}')

save_top_k используется для управления сохранением нескольких лучших моделей. Мы устанавливаем значение 1 и сохраняем только лучшую модель.

4. Определение Training Model

Сначала нам нужно создать экземпляр класса Trainer в Lightning.pytorch и добавить два обратных вызова, которые мы определили ранее.

trainer = pl.Trainer(
    max_epochs=ep,
    accelerator="cpu",
    enable_model_summary=True,
    gradient_clip_val=1.0,
    callbacks=[early_stop_callback,ck_callback],
    limit_train_batches=30,
    enable_checkpointing=True,
)

Здесь нам нужно обратить внимание на параметры max_epochs (максимальное количество эпох обучения), gradient_clip_val (используется для предотвращения взрыва градиента) и callbacks. Здесь max_epochs использует глобальную переменную ep, которую мы определим позже, а callbacks — это наша коллекция обратных вызовов.

Далее нам также необходимо определить модель NHiTS и создать ее экземпляр:

net = NHiTS.from_dataset(
    training,
    learning_rate=lr,
    log_interval=10,
    log_val_interval=1,
    weight_decay=1e-2,
    backcast_loss_ratio=0.0,
    hidden_size=64,
    optimizer="AdamW",
    loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
)

Здесь параметры вообще не нужно изменять, просто используйте значения по умолчанию. Здесь мы только устанавливаем loss на функцию потерь MQF2DistributionLoss.

5. Модуль обучения 

Мы используем функцию fit() объекта Trainer для обучения модели:

trainer.fit(
    net,
    train_dataloaders=train_dataloader,
    val_dataloaders=val_dataloader,
)

Аналогично мы инкапсулируем эту часть кода в функцию train():

def train():
    early_stop_callback = EarlyStopping(monitor="val_loss", 
                                        min_delta=1e-4, 
                                        patience=10,  # The number of times without improvement will stop
                                        verbose=True, 
                                        mode="min")
    ck_callback=ModelCheckpoint(monitor='val_loss',
                                mode="min",
                                save_top_k=1,  # Save the top few best ones
                                filename='{epoch}-{val_loss:.2f}')
    trainer = pl.Trainer(
        max_epochs=ep,
        accelerator="cpu",
        enable_model_summary=True,
        gradient_clip_val=1.0,
        callbacks=[early_stop_callback,ck_callback],
        limit_train_batches=30,
        enable_checkpointing=True,
    )
    net = NHiTS.from_dataset(
        training,
        learning_rate=lr,
        log_interval=10,
        log_val_interval=1,
        weight_decay=1e-2,
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    )
    trainer.fit(
        net,
        train_dataloaders=t_loader,
        val_dataloaders=v_loader,
        # ckpt_path='best'
    )
return trainer

Эта функция вернет обученную модель, которую вы можете использовать для задач прогнозирования.


Определение логики выполнения

1. Определение глобальных переменных:

ep=200
__train=False
mt_data_len=200000
max_encoder_length = 2*96
max_prediction_length = 30
batch_size = 128

__train используется для контроля того, обучаем ли мы в данный момент модель или тестируем ее.

Стоит отметить, что ep используется для управления максимальной эпохой обучения. Поскольку мы установили EarlyStopping, это значение можно увеличить, потому что модель автоматически остановится, когда схождение прекратится.

mt_data_len — количество последних данных временных рядов, полученных от клиента.

max_encoder_length и max_prediction_length — это соответственно максимальная длина кодирования и максимальная длина прогнозирования.

2. Обучение

Нам также необходимо сохранить текущие оптимальные результаты обучения в локальном файле после завершения обучения, поэтому мы определяем файл json для сохранения этой информации:

info_file='results.json'

Чтобы сделать наш процесс обучения более понятным, нам нужно избегать вывода ненужной предупреждающей информации во время обучения, поэтому мы добавим следующий код:

warnings.filterwarnings("ignore")

Далее идет наша логика обучения:

dt=get_data(mt_data_len=mt_data_len)
if __train:
    # print(dt)
    # dt=get_data(mt_data_len=mt_data_len)
    t_loader,v_loader,training=spilt_data(dt,
                                    t_shuffle=False,t_drop_last=True,
                                    v_shuffle=False,v_drop_last=True)
    lr=get_learning_rate()
    trainer__=train()
    m_c_back=trainer__.checkpoint_callback
    m_l_back=trainer__.early_stopping_callback
    best_m_p=m_c_back.best_model_path
    best_m_l=m_l_back.best_score.item()
    # print(best_m_p)
    if os.path.exists(info_file):
        with open(info_file,'r+') as f1:
            last=json.load(fp=f1)
            last_best_model=last['last_best_model']
            last_best_score=last['last_best_score']
            if last_best_score > best_m_l:
                last['last_best_model']=best_m_p
                last['last_best_score']=best_m_l
                json.dump(last,fp=f1)
    else:               
        with open(info_file,'w') as f2:
            json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)

Когда обучение завершится, вы сможете найти место хранения нашей лучшей модели и лучшего результата в файле results.json в корневом каталоге.

Во время процесса обучения вы увидите индикатор выполнения, показывающий прогресс каждой эпохи. 

Обучение:

обучение

Обучение завершено:

ts

3. Проверка модели

После обучения мы хотим проверить модель и визуализировать ее. Мы можем добавить следующий код:

best_model = NHiTS.load_from_checkpoint(best_m_p)
predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True)
raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False))
for idx in range(10):  # plot 10 examples
    best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True)
    # sample 500 paths
samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0]

# plot prediction
fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True)
ax = fig.get_axes()[0]
# plot first two sampled paths
ax.plot(samples[:, 0], color="g", label="Sample 1")
ax.plot(samples[:, 1], color="r", label="Sample 2")
fig.legend()
plt.show()

Вы также можете использовать TensorBoard для визуализации процесса в реальном времени во время обучения.

Результат:

ref

4. Тестирование обученной модели

Сначала открываем json-файл, чтобы найти оптимальное место хранения модели:

with open(info_file) as f:
    best_m_p=json.load(fp=f)['last_best_model']
print('model path is:',best_m_p)

Затем загрузим модель:

best_model = NHiTS.load_from_checkpoint(best_m_p)

Затем мы получаем данные от клиента в режиме реального времени для тестирования модели:

offset=1
dt=dt.iloc[-max_encoder_length-offset:-offset,:]
last_=dt.iloc[-1] #get the last group of data
# print(len(dt))
for i in range(1,max_prediction_length+1):
    dt.loc[dt.index[-1]+1]=last_
dt['series']=0
# dt['time_idx']=dt.apply(lambda x:x.index,args=1)
dt['time_idx']=dt.index-dt.index[0]
# dt=get_data(mt_data_len=max_encoder_length)
predictions=best_model.predict(dt,mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True)
best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False)
plt.show()

И вот результат:

pref

5.  Оценка модели

Конечно, мы можем использовать некоторые метрики из библиотеки PyTorch Forecasting, чтобы оценить производительность модели. Вот как можно оценить, используя среднюю абсолютную ошибку (MAE) и симметричную среднюю абсолютную ошибку в процентах (SMAPE), и вывести результаты оценки:

from pytorch_forecasting.metrics import MAE, SMAPE
mae = MAE()(raw_predictions["prediction"], raw_predictions["target"])
print(f"Mean Absolute Error: {mae}")
smape = SMAPE()(raw_predictions["prediction"], raw_predictions["target"])
print(f"Symmetric Mean Absolute Percentage Error: {smape}")

В этом фрагменте кода мы сначала импортируем метрики MAE и SMAPE. Затем мы используем эти метрики для расчета ошибки между прогнозируемыми значениями (raw_predictions["prediction"]) и фактическими значениями (raw_predictions["target"]). Эти метрики могут помочь нам увидеть производительность нашей модели и указать направление для ее дальнейшего улучшения.



Заключение

В этой статье мы рассмотрели, как использовать данные меток, упомянутые в двух предыдущих статьях, и продемонстрировали, как создать модель N-HiTs, используя наши данные. Затем мы обучили модель и проверили ее. Как видим, мы добились хороших результатов. Мы также продемонстрировали, как использовать эту модель в MetaTrader 5 для прогнозирования 30 свечей. Конечно, мы не упомянули, как размещать ордера на основе результатов прогнозирования, поскольку реальная торговля требует от читателей проведения большого количества тестов в соответствии с вашей реальной ситуацией и указания соответствующих правил торговли.

Спасибо за внимание!


Приложение:

Полный код:

# Copyright 2021, MetaQuotes Ltd.
# https://www.mql5.com


# from typing import Union
import lightning.pytorch as pl
import os
from lightning.pytorch.callbacks import EarlyStopping,ModelCheckpoint
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# import torch
from pytorch_forecasting import NHiTS, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder,timeseries
from pytorch_forecasting.metrics import MQF2DistributionLoss
from pytorch_forecasting.data.samplers import TimeSynchronizedBatchSampler
from lightning.pytorch.tuner import Tuner
import MetaTrader5 as mt
import warnings
import json


from torch.utils.data import DataLoader
from torch.utils.data.sampler import Sampler,SequentialSampler

class New_TmSrDt(TimeSeriesDataSet):
    '''
    rewrite dataset class
    '''
    def to_dataloader(self, train: bool = True, 
                      batch_size: int = 64, 
                      batch_sampler: Sampler | str = None, 
                      shuffle:bool=False,
                      drop_last:bool=False,
                      **kwargs) -> DataLoader:

        default_kwargs = dict(
            shuffle=shuffle,
            # drop_last=train and len(self) > batch_size,
            drop_last=drop_last, #
            collate_fn=self._collate_fn,
            batch_size=batch_size,
            batch_sampler=batch_sampler,
        )
        default_kwargs.update(kwargs)
        kwargs = default_kwargs
        # print(kwargs['drop_last'])
        if kwargs["batch_sampler"] is not None:
            sampler = kwargs["batch_sampler"]
            if isinstance(sampler, str):
                if sampler == "synchronized":
                    kwargs["batch_sampler"] = TimeSynchronizedBatchSampler(
                        SequentialSampler(self),
                        batch_size=kwargs["batch_size"],
                        shuffle=kwargs["shuffle"],
                        drop_last=kwargs["drop_last"],
                    )
                else:
                    raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler")
            del kwargs["batch_size"]
            del kwargs["shuffle"]
            del kwargs["drop_last"]

        return DataLoader(self,**kwargs)

def get_data(mt_data_len:int):
    if not mt.initialize():
        print('initialize() failed!') 
    else:
        print(mt.version())
        sb=mt.symbols_total()
        rts=None
        if sb > 0:
            rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
        mt.shutdown()
        # print(len(rts))
    rts_fm=pd.DataFrame(rts)
    rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 

    rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 
    rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
    return rts_fm


def spilt_data(data:pd.DataFrame,
               t_drop_last:bool,
               t_shuffle:bool,
               v_drop_last:bool,
               v_shuffle:bool):
    training_cutoff = data["time_idx"].max() - max_prediction_length #max:95
    context_length = max_encoder_length
    prediction_length = max_prediction_length
    training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )

    validation = New_TmSrDt.from_dataset(training, 
                                         data, 
                                         min_prediction_idx=training_cutoff + 1)
    
    train_dataloader = training.to_dataloader(train=True,
                                              shuffle=t_shuffle, 
                                              drop_last=t_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0,)
    val_dataloader = validation.to_dataloader(train=False, 
                                              shuffle=v_shuffle,
                                              drop_last=v_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0)
    return train_dataloader,val_dataloader,training

def get_learning_rate():
    
    pl.seed_everything(42)
    trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False)
    net = NHiTS.from_dataset(
        training,
        learning_rate=3e-2,
        weight_decay=1e-2,
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
    )
    res = Tuner(trainer).lr_find(
        net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1
    )
    # print(f"suggested learning rate: {res.suggestion()}")
    lr_=res.suggestion()
    return lr_
def train():
    early_stop_callback = EarlyStopping(monitor="val_loss", 
                                        min_delta=1e-4, 
                                        patience=10,  
                                        verbose=True, 
                                        mode="min")
    ck_callback=ModelCheckpoint(monitor='val_loss',
                                mode="min",
                                save_top_k=1,  
                                filename='{epoch}-{val_loss:.2f}')
    trainer = pl.Trainer(
        max_epochs=ep,
        accelerator="cpu",
        enable_model_summary=True,
        gradient_clip_val=1.0,
        callbacks=[early_stop_callback,ck_callback],
        limit_train_batches=30,
        enable_checkpointing=True,
    )
    net = NHiTS.from_dataset(
        training,
        learning_rate=lr,
        log_interval=10,
        log_val_interval=1,
        weight_decay=1e-2,
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    )
    trainer.fit(
        net,
        train_dataloaders=t_loader,
        val_dataloaders=v_loader,
        # ckpt_path='best'
    )
    return trainer

if __name__=='__main__':
    ep=200
    __train=False
    mt_data_len=200000
    max_encoder_length = 2*96
    max_prediction_length = 30
    batch_size = 128
    info_file='results.json'
    warnings.filterwarnings("ignore")
    dt=get_data(mt_data_len=mt_data_len)
    if __train:
        # print(dt)
        # dt=get_data(mt_data_len=mt_data_len)
        t_loader,v_loader,training=spilt_data(dt,
                                              t_shuffle=False,t_drop_last=True,
                                              v_shuffle=False,v_drop_last=True)
        lr=get_learning_rate()
        trainer__=train()
        m_c_back=trainer__.checkpoint_callback
        m_l_back=trainer__.early_stopping_callback
        best_m_p=m_c_back.best_model_path
        best_m_l=m_l_back.best_score.item()

        # print(best_m_p)
        
        if os.path.exists(info_file):
            with open(info_file,'r+') as f1:
                last=json.load(fp=f1)
                last_best_model=last['last_best_model']
                last_best_score=last['last_best_score']
                if last_best_score > best_m_l:
                    last['last_best_model']=best_m_p
                    last['last_best_score']=best_m_l
                    json.dump(last,fp=f1)
        else:               
            with open(info_file,'w') as f2:
                json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)

        best_model = NHiTS.load_from_checkpoint(best_m_p)
        predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True)
        raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False))
    
        for idx in range(10):  # plot 10 examples
            best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True)
        samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0]

        # plot prediction
        fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True)
        ax = fig.get_axes()[0]
        # plot first two sampled paths
        ax.plot(samples[:, 0], color="g", label="Sample 1")
        ax.plot(samples[:, 1], color="r", label="Sample 2")
        fig.legend()
        plt.show()
    else:
        with open(info_file) as f:
            best_m_p=json.load(fp=f)['last_best_model']
        print('model path is:',best_m_p)
        
        best_model = NHiTS.load_from_checkpoint(best_m_p)

        offset=1
        dt=dt.iloc[-max_encoder_length-offset:-offset,:]
        last_=dt.iloc[-1] 
        # print(len(dt))
        for i in range(1,max_prediction_length+1):
            dt.loc[dt.index[-1]+1]=last_
        dt['series']=0
        # dt['time_idx']=dt.apply(lambda x:x.index,args=1)
        dt['time_idx']=dt.index-dt.index[0]
        # dt=get_data(mt_data_len=max_encoder_length)
        predictions = best_model.predict(dt, mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True)
        best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False)
        plt.show()


Перевод с английского произведен MetaQuotes Ltd.
Оригинальная статья: https://www.mql5.com/en/articles/13255

Прикрепленные файлы |
n_hits.py (9.7 KB)
Использование алгоритмов оптимизации для настройки параметров советника "на лету" Использование алгоритмов оптимизации для настройки параметров советника "на лету"
В статье рассматриваются практические аспекты использования алгоритмов оптимизации для поиска наилучших параметров советников "на лету", виртуализация торговых операций и логики советника. Данная статья может быть использована как своеобразная инструкция для внедрения алгоритмов оптимизации в торгового советника.
Разработка системы репликации (Часть 28): Проект советника — класс C_Mouse (II) Разработка системы репликации (Часть 28): Проект советника — класс C_Mouse (II)
Когда начали создаваться первые системы, способные что-то считать, всё потребовало вмешательства инженеров, обладающих обширными знаниями о том, что проектируется. Мы говорим о рассвете компьютерной техники, о времени, когда не было даже терминалов, позволяющих что-либо программировать. По мере развития и роста интереса к тому, чтобы большее число людей могли создавать что-либо, появлялись новые идеи и методы программирования этих машин, которые раньше сводились к изменению положения соединителей. Именно тогда появились первые терминалы.
Разрабатываем мультивалютный советник (Часть 3): Ревизия архитектуры Разрабатываем мультивалютный советник (Часть 3): Ревизия архитектуры
Мы уже несколько продвинулись в разработке мультивалютного советника с несколькими параллельно работающими стратегиями. С учетом накопленного опыта проведем ревизию архитектуры нашего решения и попробуем ее улучшить, пока не ушли слишком далеко вперед.
Теория категорий в MQL5 (Часть 23): Другой взгляд на двойную экспоненциальную скользящую среднюю Теория категорий в MQL5 (Часть 23): Другой взгляд на двойную экспоненциальную скользящую среднюю
В этой статье мы продолжаем рассматривать популярные торговые индикаторы под новым углом. Мы собираемся обрабатывать горизонтальную композицию естественных преобразований. Лучшим индикатором для этого является двойная экспоненциальная скользящая средняя (Double Exponential Moving Average, DEMA).