Разметка данных в анализе временных рядов (Часть 3):Пример использования разметки данных
Введение
Статья рассказывает, как использовать PyTorch Lightning и фреймворк PyTorch Forecasting через торговую платформу MetaTrader 5 для реализации прогнозирования финансовых временных рядов на основе нейронных сетей.
В статье мы также объясним причины, по которым мы выбрали эти две платформы и используемый нами формат данных.
Что касается данных, вы можете использовать данные, полученные путем разметки данных из двух моих предыдущих статей. Поскольку они имеют один и тот же формат, вы можете легко расширить его, следуя методологии, описанной в этой статье.
Ссылки на две предыдущие статьи:
- Разметка данных в анализе временных рядов (Часть 1):Создаем набор данных с маркерами тренда с помощью графика советника
- Разметка данных в анализе временных рядов (Часть 2):Создаем наборы данных с маркерами тренда с помощью Python
Содержание:
- Введение
- Несколько важных библиотек Python
- Инициализация
- Переписывание класса pytorch_forecasting.TimeSeriesDataSet
- Создание наборов данных для обучения и проверки
- Создание и обучение модели
- Определение логики выполнения
- Заключение
Несколько важных библиотек Python
Сначала давайте представим основные библиотеки Python, которые мы будем использовать.
1. PyTorch Lightning
PyTorch Lightning — это среда глубокого обучения, специально разработанная для профессиональных исследователей искусственного интеллекта и инженеров по машинному обучению, которым требуется максимальная гибкость без ущерба для масштабируемости.
Основная идея заключается в том, чтобы отделить академический код (например, определения моделей, прямое/обратное распространение, оптимизаторы, валидацию и т. д.) от инженерного кода (например, циклы for, механизмы сохранения, журналы TensorBoard, стратегии обучения и т. д.), в результате чего можно получить более упорядоченный и понятный код.
К основным преимуществам относятся:
- Высокая возможность повторного использования. Конструкция позволяет повторно использовать код в различных проектах.
- Простота обслуживания. Благодаря структурированному дизайну поддерживать код становится проще.
- Четкая логика. Благодаря абстрагированию шаблонного инженерного кода код машинного обучения становится легче идентифицировать и понимать.
В целом, PyTorch Lightning — чрезвычайно мощная библиотека, предлагающая эффективный метод организации и управления вашим кодом PyTorch. Кроме того, он обеспечивает структурированный подход к решению распространенных, но сложных задач, таких как обучение, проверка и тестирование модели.
Подробное использование этой библиотеки можно найти в официальной документации: https://lightning.ai/docs.2. PyTorch Forecasting
Библиотека Python, специально разработанная для прогнозирования временных рядов. Поскольку она создана на основе PyTorch, вы можете использовать мощные библиотеки автоматической дифференциации и оптимизации PyTorch, а также воспользоваться удобством, которое PyTorch Forecasting предлагает для прогнозирования временных рядов.
В PyTorch Forecasting вы можете найти реализации различных моделей прогнозирования, включая, помимо прочего, модели авторегрессии (AR, ARIMA), модели пространства состояний (SARIMAX), нейронные сети (LSTM, GRU) и ансамблевые методы (Prophet, N-Beats). Это означает, что вы можете экспериментировать и сравнивать различные прогнозные подходы в одной и той же среде без необходимости писать обширный стандартный код для каждого подхода.
Библиотека также предлагает ряд инструментов предварительной обработки данных, которые могут помочь в решении типичных задач во временных рядах. Эти инструменты включают, среди прочего, подстановку недостающих значений, масштабирование, извлечение признаков и преобразования скользящего окна. Это означает, что вы можете больше сосредоточиться на разработке и оптимизации вашей модели, не тратя много времени на обработку данных.
Библиотека также располагает унифицированным интерфейсом для оценки производительности модели. Она реализует функции потерь и метрики проверки для временных рядов, такие как QuantileLoss и SMAPE, а также поддерживает такие методологии обучения, как ранняя остановка и перекрестная проверка. Это позволяет вам более удобно отслеживать и повышать производительность вашей модели.
Если вы ищете метод повышения эффективности и удобства обслуживания вашего проекта прогнозирования временных рядов, то PyTorch Forecasting может стать отличным выбором. Библиотека предлагает эффективные и гибкие средства для организации вашего кода PyTorch и управления им, позволяя вам сосредоточиться на самом важном аспекте — самой модели машинного обучения.
Подробное описание использования библиотеки можно найти в официальной документации: https://pytorch-forecasting.readthedocs.io/en/stable.
3. Модель N-HiTS
Модель N-HiTS решает проблемы волатильности прогнозов и вычислительной сложности в долгосрочном прогнозировании за счет внедрения инновационных методов иерархической интерполяции и многоскоростной выборки данных. Это позволяет модели N-HiTS эффективно аппроксимировать диапазон прогнозирования любой длины.
Кроме того, обширные эксперименты, проведенные на крупномасштабных наборах данных, показали, что модель N-HiTS повышает точность в среднем почти на 20% по сравнению с последней архитектурой Transformer, а также сокращает время вычислений на порядок (в 50 раз).
Link to the paper: https://doi.org/10.48550/arXiv.2201.12886.
Инициализация
Сначала нам нужно импортировать необходимые библиотеки. К таким библиотекам относятся MetaTrader 5 (для взаимодействия с одноименным терминалом), PyTorch Lightning (для обучения модели) и некоторые другие библиотеки для обработки и визуализации данных.
import MetaTrader5 as mt5 import lightning.pytorch as pl from lightning.pytorch.callbacks import EarlyStopping import matplotlib.pyplot as plt import pandas as pd from pytorch_forecasting import Baseline, NHiTS, TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder from pytorch_forecasting.metrics import MAE, SMAPE, MQF2DistributionLoss, QuantileLoss from lightning.pytorch.tuner import Tuner
Далее нам нужно инициализировать MetaTrader 5. Это делается путем вызова функции mt.initialize(). Если вы не можете его инициализировать, просто используя его, вам необходимо передать путь к терминалу MetaTrader 5 в качестве параметра этой функции (в примере "D:\Project\mt\MT5\terminal64.exe" — это мой путь, в реальном приложении вам необходимо установить ваш собственный путь). Если инициализация прошла успешно, функция вернет True, в противном случае — False.
if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"): print('initialize() failed!') else: print(mt.version())
Функция mt.symbols_total() используется для получения общего количества торгуемых разновидностей, доступных в терминале MetaTrader 5. Мы можем использовать его, чтобы определить, сможем ли мы правильно получить данные. Если общее число больше 0, мы можем использовать функцию mt.copy_rates_from_pos() для получения исторических данных указанной торгуемой разновидности. В этом примере мы получили самые последние данные о длине периода "mt_data_len" M15 (15 минут) разновидности GOLD_micro.
sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown()
Наконец, мы используем функцию mt.shutdown(), чтобы закрыть соединение с терминалом MetaTrader 5 и преобразовать полученные данные в формат Pandas DataFrame.
mt.shutdown() rts_fm=pd.DataFrame(rts)
Теперь рассмотрим, как предварительно обработать данные, полученные с терминала.
Сначала нам нужно преобразовать временные метки в даты:
rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s')
Здесь мы больше не описываем, как размечать данные. Вы можете найти методы в двух моих предыдущих статьях (ссылки на них есть во введении к этой статье). Для краткой демонстрации того, как использовать модели прогнозирования, мы просто разделим все фрагменты данных max_encoder_length+2max_prediction_length на группы. В каждой группе есть последовательность от 0 до "max_encoder_length+2max_prediction_length-1". Заполним ее. Таким образом мы добавляем к исходным данным необходимые метки. Сначала нам нужно преобразовать исходный индекс времени (то есть индекс DataFrame). Вычислим остаток исходного индекса времени, разделенный на (max_encoder_length+2max_prediction_length), и используем результат в качестве нового индекса времени. Сопоставим индекс времени с диапазоном от 0 до "max_encoder_length+2*max_prediction_length-1":
rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length)
Нам также необходимо преобразовать исходный индекс времени в группу. Вычислим исходный индекс времени, разделенный на "max_encoder_length+2*max_prediction_length", и используем результат как новую группу:
rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
Инкапсулируем часть предварительной обработки данных в функцию. Нам нужно передать ей только ту длину данных, которую нам нужно получить, и она сможет завершить работу по предварительной обработке данных:
def get_data(mt_data_len:int): if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"): print('initialize() failed!') else: print(mt.version()) sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown() # print(len(rts)) rts_fm=pd.DataFrame(rts) rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length) return rts_fm
Переписывание класса pytorch_forecasting.TimeSeriesDataSet
Переписывание функции to_dataloader() в pytorch_forecasting. Это позволяет контролировать, будут ли данные перемешиваться и удалять ли последнюю группу пакета (главным образом, чтобы предотвратить непредсказуемые ошибки, вызванные недостаточной длиной последней группы данных). Вот как можно это сделать:
class New_TmSrDt(TimeSeriesDataSet): def to_dataloader(self, train: bool = True, batch_size: int = 64, batch_sampler: Sampler | str = None, shuffle:bool=False, drop_last:bool=False, **kwargs) -> DataLoader: default_kwargs = dict( shuffle=shuffle, drop_last=drop_last, #modification collate_fn=self._collate_fn, batch_size=batch_size, batch_sampler=batch_sampler, ) default_kwargs.update(kwargs) kwargs = default_kwargs if kwargs["batch_sampler"] is not None: sampler = kwargs["batch_sampler"] if isinstance(sampler, str): if sampler == "synchronized": kwargs["batch_sampler"] = TimeSynchronizedBatchSampler( SequentialSampler(self), batch_size=kwargs["batch_size"], shuffle=kwargs["shuffle"], drop_last=kwargs["drop_last"], ) else: raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler") del kwargs["batch_size"] del kwargs["shuffle"] del kwargs["drop_last"] return DataLoader(self,**kwargs)Этот код создает новый класс New_TmSrDt, который наследуется от TimeSeriesDataSet. Затем в этом новом классе переопределяется функция to_dataloader(), включающая параметры shuffle и drop_last. Таким образом, вы сможете лучше контролировать процесс загрузки данных. Не забудьте заменить в коде экземпляры TimeSeriesDataSet на New_TmSrDt.
Создание наборов данных для обучения и проверки
Во-первых, нам нужно определить точку отсечения для обучающих данных. Это делается путем вычитания максимальной длины прогноза из максимального значения time_idx.
max_encoder_length = 2*96 max_prediction_length = 30 training_cutoff = rts_fm["time_idx"].max() - max_prediction_length
Затем мы используем класс New_TmSrDt (который представляет собой переписанный нами класс TimeSeriesDataSet) для создания набора обучающих данных. Этот класс требует следующих параметров:
- DataFrame (в данном случае - rts_fm)
- Столбец time_idx, который представляет собой непрерывную целочисленную последовательность.
- Целевой столбец (в данном случае close), который представляет собой значение, которое мы хотим спрогнозировать.
- Столбец группы (в данном случае series), который представляет разные временные ряды.
- Максимальные длины энкодера и предиктора
context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1)
Далее мы используем функцию New_TmSrDt.from_dataset() для создания набора данных проверки. Для этой функции требуются следующие параметры:
- Набор обучающих данных
- DataFrame
- Минимальный индекс прогнозирования, который должен быть на 1 больше максимального значения time_idx обучающих данных.
validation = New_TmSrDt.from_dataset(training, rts_fm, min_prediction_idx=training_cutoff + 1)
Наконец мы используем функцию to_dataloader() , чтобы перевести обучающий и проверочный наборы данных в объекты DataLoader PyTorch. Для этой функции требуются следующие параметры:
- Параметр train, указывающий, следует ли перемешивать данные.
- Параметр batch_size, который указывает количество образцов в пакете.
- Параметр num_workers, указывающий количество рабочих процессов для загрузки данных.
train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0)
Наконец, мы инкапсулируем эту часть кода в функцию spilt_data(data:pd.DataFrame,t_drop_last:bool,t_shuffle:bool,v_drop_last:bool,v_shuffle:bool) и указываем следующие параметры:
- Параметр data используется для получения набора данных, который необходимо обработать.
- Параметр t_drop_last указывает, следует ли удалить последнюю группу набора обучающих данных.
- Параметр t_shuffle указывает, следует ли перемешивать обучающие данные.
- Параметр v_drop_last указывает, следует ли удалить последнюю группу набора проверочных данных.
- Параметр v_shuffle указывает, следует ли перемешать данные проверки.
train_dataloader (экземпляр dataloader для набора обучающих данных), val_dataloader (экземпляр dataloader для набора проверочных данных) и training (экземпляр TimeSeriesDataSet для набора данных) используются в качестве возвращаемых значений этой функции, поскольку они будут использоваться позже.
def spilt_data(data:pd.DataFrame, t_drop_last:bool, t_shuffle:bool, v_drop_last:bool, v_shuffle:bool): training_cutoff = data["time_idx"].max() - max_prediction_length #max:95 context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1) train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0) return train_dataloader,val_dataloader,training
Создание и обучение модели
Приступим к созданию модели NHiTS. В этой части будет показано, как установить ее параметры и как ее обучать.
1. Найдите лучшую скорость обучения
Прежде чем начать создание модели, мы используем объект Tuner PyTorch Lightning, чтобы найти наилучшую скорость обучения.
Во-первых, нам нужно создать объект Trainer, в котором параметр accelerator используется для указания типа устройства, а gradient_clip_val используется для предотвращения взрыва градиента.
pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1)
Далее мы используем функцию NHiTS.from_dataset() для создания сети модели NHiTS. Для этой функции требуются следующие параметры:
- Набор обучающих данных
- Скорость обучения
- Снижение веса
- Функция потерь
- Размер скрытого слоя
- Оптимизатор
net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", )
Затем мы создаем экземпляр класса Tuner и вызываем функцию lr_find(). Она будет обучать модель на основе ряда скоростей обучения на основе нашего набора данных и сравнивать потери каждой скорости обучения, чтобы получить наилучшую скорость обучения.
res = Tuner(trainer).lr_find( net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, min_lr=1e-5, max_lr=1e-1 ) lr_=res.suggestion()
Аналогично, мы инкапсулируем эту часть кода, которая получает лучшую скорость обучения, в функцию get_learning_rate() и делаем полученную лучшую скорость обучения ее возвращаемым значением:
def get_learning_rate(): pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False) net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", ) res = Tuner(trainer).lr_find( net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1 ) lr_=res.suggestion() return lr_
Если вы хотите визуализировать скорость обучения, вы можете добавить следующий код:
print(f"suggested learning rate: {res.suggestion()}")
fig = res.plot(show=True, suggest=True)
fig.show()
Результат в этом примере следующий:
рекомендуемая скорость обучения: 0.003981071705534973.
2. Определение EarlyStopping Callback
Этот обратный вызов в основном используется для мониторинга потерь при проверке и остановки обучения, когда потери не улучшаются в течение нескольких последовательных эпох. Это может предотвратить переобучение модели.
early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=True, mode="min")
Здесь следует отметить параметр patience, который в основном определяет, когда следует остановиться во время тренировки, если потери не уменьшаются в течение нескольких последовательных эпох. Мы установили его на 10.
3. Определение ModelCheckpoint Callback
Этот обратный вызов в основном используется для управления архивированием модели и именем архива. В основном мы устанавливаем следующие две переменные.
ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, filename='{epoch}-{val_loss:.2f}')
save_top_k используется для управления сохранением нескольких лучших моделей. Мы устанавливаем значение 1 и сохраняем только лучшую модель.
4. Определение Training Model
Сначала нам нужно создать экземпляр класса Trainer в Lightning.pytorch и добавить два обратных вызова, которые мы определили ранее.
trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, )
Здесь нам нужно обратить внимание на параметры max_epochs (максимальное количество эпох обучения), gradient_clip_val (используется для предотвращения взрыва градиента) и callbacks. Здесь max_epochs использует глобальную переменную ep, которую мы определим позже, а callbacks — это наша коллекция обратных вызовов.
Далее нам также необходимо определить модель NHiTS и создать ее экземпляр:
net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), )
Здесь параметры вообще не нужно изменять, просто используйте значения по умолчанию. Здесь мы только устанавливаем loss на функцию потерь MQF2DistributionLoss.
5. Модуль обучения
Мы используем функцию fit() объекта Trainer для обучения модели:
trainer.fit( net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, )
Аналогично мы инкапсулируем эту часть кода в функцию train():
def train(): early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, # The number of times without improvement will stop verbose=True, mode="min") ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, # Save the top few best ones filename='{epoch}-{val_loss:.2f}') trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, ) net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), ) trainer.fit( net, train_dataloaders=t_loader, val_dataloaders=v_loader, # ckpt_path='best' ) return trainer
Эта функция вернет обученную модель, которую вы можете использовать для задач прогнозирования.
Определение логики выполнения
1. Определение глобальных переменных:
ep=200 __train=False mt_data_len=200000 max_encoder_length = 2*96 max_prediction_length = 30 batch_size = 128
__train используется для контроля того, обучаем ли мы в данный момент модель или тестируем ее.
Стоит отметить, что ep используется для управления максимальной эпохой обучения. Поскольку мы установили EarlyStopping, это значение можно увеличить, потому что модель автоматически остановится, когда схождение прекратится.
mt_data_len — количество последних данных временных рядов, полученных от клиента.
max_encoder_length и max_prediction_length — это соответственно максимальная длина кодирования и максимальная длина прогнозирования.
2. Обучение
Нам также необходимо сохранить текущие оптимальные результаты обучения в локальном файле после завершения обучения, поэтому мы определяем файл json для сохранения этой информации:
info_file='results.json'
Чтобы сделать наш процесс обучения более понятным, нам нужно избегать вывода ненужной предупреждающей информации во время обучения, поэтому мы добавим следующий код:
warnings.filterwarnings("ignore")
Далее идет наша логика обучения:
dt=get_data(mt_data_len=mt_data_len) if __train: # print(dt) # dt=get_data(mt_data_len=mt_data_len) t_loader,v_loader,training=spilt_data(dt, t_shuffle=False,t_drop_last=True, v_shuffle=False,v_drop_last=True) lr=get_learning_rate() trainer__=train() m_c_back=trainer__.checkpoint_callback m_l_back=trainer__.early_stopping_callback best_m_p=m_c_back.best_model_path best_m_l=m_l_back.best_score.item() # print(best_m_p) if os.path.exists(info_file): with open(info_file,'r+') as f1: last=json.load(fp=f1) last_best_model=last['last_best_model'] last_best_score=last['last_best_score'] if last_best_score > best_m_l: last['last_best_model']=best_m_p last['last_best_score']=best_m_l json.dump(last,fp=f1) else: with open(info_file,'w') as f2: json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)
Когда обучение завершится, вы сможете найти место хранения нашей лучшей модели и лучшего результата в файле results.json в корневом каталоге.
Во время процесса обучения вы увидите индикатор выполнения, показывающий прогресс каждой эпохи.
Обучение:
Обучение завершено:
3. Проверка модели
После обучения мы хотим проверить модель и визуализировать ее. Мы можем добавить следующий код:
best_model = NHiTS.load_from_checkpoint(best_m_p) predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True) raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False)) for idx in range(10): # plot 10 examples best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True) # sample 500 paths samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0] # plot prediction fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True) ax = fig.get_axes()[0] # plot first two sampled paths ax.plot(samples[:, 0], color="g", label="Sample 1") ax.plot(samples[:, 1], color="r", label="Sample 2") fig.legend() plt.show()
Вы также можете использовать TensorBoard для визуализации процесса в реальном времени во время обучения.
Результат:
4. Тестирование обученной модели
Сначала открываем json-файл, чтобы найти оптимальное место хранения модели:
with open(info_file) as f: best_m_p=json.load(fp=f)['last_best_model'] print('model path is:',best_m_p)
Затем загрузим модель:
best_model = NHiTS.load_from_checkpoint(best_m_p)
Затем мы получаем данные от клиента в режиме реального времени для тестирования модели:
offset=1 dt=dt.iloc[-max_encoder_length-offset:-offset,:] last_=dt.iloc[-1] #get the last group of data # print(len(dt)) for i in range(1,max_prediction_length+1): dt.loc[dt.index[-1]+1]=last_ dt['series']=0 # dt['time_idx']=dt.apply(lambda x:x.index,args=1) dt['time_idx']=dt.index-dt.index[0] # dt=get_data(mt_data_len=max_encoder_length) predictions=best_model.predict(dt,mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True) best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False) plt.show()
И вот результат:
5. Оценка модели
Конечно, мы можем использовать некоторые метрики из библиотеки PyTorch Forecasting, чтобы оценить производительность модели. Вот как можно оценить, используя среднюю абсолютную ошибку (MAE) и симметричную среднюю абсолютную ошибку в процентах (SMAPE), и вывести результаты оценки:
from pytorch_forecasting.metrics import MAE, SMAPE mae = MAE()(raw_predictions["prediction"], raw_predictions["target"]) print(f"Mean Absolute Error: {mae}") smape = SMAPE()(raw_predictions["prediction"], raw_predictions["target"]) print(f"Symmetric Mean Absolute Percentage Error: {smape}")
В этом фрагменте кода мы сначала импортируем метрики MAE и SMAPE. Затем мы используем эти метрики для расчета ошибки между прогнозируемыми значениями (raw_predictions["prediction"]) и фактическими значениями (raw_predictions["target"]). Эти метрики могут помочь нам увидеть производительность нашей модели и указать направление для ее дальнейшего улучшения.
Заключение
В этой статье мы рассмотрели, как использовать данные меток, упомянутые в двух предыдущих статьях, и продемонстрировали, как создать модель N-HiTs, используя наши данные. Затем мы обучили модель и проверили ее. Как видим, мы добились хороших результатов. Мы также продемонстрировали, как использовать эту модель в MetaTrader 5 для прогнозирования 30 свечей. Конечно, мы не упомянули, как размещать ордера на основе результатов прогнозирования, поскольку реальная торговля требует от читателей проведения большого количества тестов в соответствии с вашей реальной ситуацией и указания соответствующих правил торговли.Спасибо за внимание!
Приложение:
Полный код:
# Copyright 2021, MetaQuotes Ltd. # https://www.mql5.com # from typing import Union import lightning.pytorch as pl import os from lightning.pytorch.callbacks import EarlyStopping,ModelCheckpoint import matplotlib.pyplot as plt import numpy as np import pandas as pd # import torch from pytorch_forecasting import NHiTS, TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder,timeseries from pytorch_forecasting.metrics import MQF2DistributionLoss from pytorch_forecasting.data.samplers import TimeSynchronizedBatchSampler from lightning.pytorch.tuner import Tuner import MetaTrader5 as mt import warnings import json from torch.utils.data import DataLoader from torch.utils.data.sampler import Sampler,SequentialSampler class New_TmSrDt(TimeSeriesDataSet): ''' rewrite dataset class ''' def to_dataloader(self, train: bool = True, batch_size: int = 64, batch_sampler: Sampler | str = None, shuffle:bool=False, drop_last:bool=False, **kwargs) -> DataLoader: default_kwargs = dict( shuffle=shuffle, # drop_last=train and len(self) > batch_size, drop_last=drop_last, # collate_fn=self._collate_fn, batch_size=batch_size, batch_sampler=batch_sampler, ) default_kwargs.update(kwargs) kwargs = default_kwargs # print(kwargs['drop_last']) if kwargs["batch_sampler"] is not None: sampler = kwargs["batch_sampler"] if isinstance(sampler, str): if sampler == "synchronized": kwargs["batch_sampler"] = TimeSynchronizedBatchSampler( SequentialSampler(self), batch_size=kwargs["batch_size"], shuffle=kwargs["shuffle"], drop_last=kwargs["drop_last"], ) else: raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler") del kwargs["batch_size"] del kwargs["shuffle"] del kwargs["drop_last"] return DataLoader(self,**kwargs) def get_data(mt_data_len:int): if not mt.initialize(): print('initialize() failed!') else: print(mt.version()) sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown() # print(len(rts)) rts_fm=pd.DataFrame(rts) rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length) return rts_fm def spilt_data(data:pd.DataFrame, t_drop_last:bool, t_shuffle:bool, v_drop_last:bool, v_shuffle:bool): training_cutoff = data["time_idx"].max() - max_prediction_length #max:95 context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1) train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0) return train_dataloader,val_dataloader,training def get_learning_rate(): pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False) net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", ) res = Tuner(trainer).lr_find( net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1 ) # print(f"suggested learning rate: {res.suggestion()}") lr_=res.suggestion() return lr_ def train(): early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=True, mode="min") ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, filename='{epoch}-{val_loss:.2f}') trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, ) net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), ) trainer.fit( net, train_dataloaders=t_loader, val_dataloaders=v_loader, # ckpt_path='best' ) return trainer if __name__=='__main__': ep=200 __train=False mt_data_len=200000 max_encoder_length = 2*96 max_prediction_length = 30 batch_size = 128 info_file='results.json' warnings.filterwarnings("ignore") dt=get_data(mt_data_len=mt_data_len) if __train: # print(dt) # dt=get_data(mt_data_len=mt_data_len) t_loader,v_loader,training=spilt_data(dt, t_shuffle=False,t_drop_last=True, v_shuffle=False,v_drop_last=True) lr=get_learning_rate() trainer__=train() m_c_back=trainer__.checkpoint_callback m_l_back=trainer__.early_stopping_callback best_m_p=m_c_back.best_model_path best_m_l=m_l_back.best_score.item() # print(best_m_p) if os.path.exists(info_file): with open(info_file,'r+') as f1: last=json.load(fp=f1) last_best_model=last['last_best_model'] last_best_score=last['last_best_score'] if last_best_score > best_m_l: last['last_best_model']=best_m_p last['last_best_score']=best_m_l json.dump(last,fp=f1) else: with open(info_file,'w') as f2: json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2) best_model = NHiTS.load_from_checkpoint(best_m_p) predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True) raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False)) for idx in range(10): # plot 10 examples best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True) samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0] # plot prediction fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True) ax = fig.get_axes()[0] # plot first two sampled paths ax.plot(samples[:, 0], color="g", label="Sample 1") ax.plot(samples[:, 1], color="r", label="Sample 2") fig.legend() plt.show() else: with open(info_file) as f: best_m_p=json.load(fp=f)['last_best_model'] print('model path is:',best_m_p) best_model = NHiTS.load_from_checkpoint(best_m_p) offset=1 dt=dt.iloc[-max_encoder_length-offset:-offset,:] last_=dt.iloc[-1] # print(len(dt)) for i in range(1,max_prediction_length+1): dt.loc[dt.index[-1]+1]=last_ dt['series']=0 # dt['time_idx']=dt.apply(lambda x:x.index,args=1) dt['time_idx']=dt.index-dt.index[0] # dt=get_data(mt_data_len=max_encoder_length) predictions = best_model.predict(dt, mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True) best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False) plt.show()
Перевод с английского произведен MetaQuotes Ltd.
Оригинальная статья: https://www.mql5.com/en/articles/13255
- Бесплатные приложения для трейдинга
- 8 000+ сигналов для копирования
- Экономические новости для анализа финансовых рынков
Вы принимаете политику сайта и условия использования