时间序列挖掘的数据标签(第3部分):使用标签数据的示例
概述
本文介绍了如何通过MetaTrader5交易平台使用PyTorch Lightning和PyTorch Forecasting框架来实现基于神经网络的金融时间序列预测。
在本文中,我们还将解释选择这两个框架的原因以及我们使用的数据格式。
关于数据,您可以使用我之前两篇文章中通过数据标记产生的数据。由于它们共享相同的格式,您可以按照本文中的方法轻松地扩展它。
前两篇文章的链接是:
目录:
几个重要的 Python 库
首先,让我们介绍一下我们将使用的主要Python库。
1. PyTorch Lightning
PyTorch Lightning 是一个深度学习框架,专门为专业人工智能研究人员和机器学习工程师设计,他们需要最大的灵活性而不影响可扩展性。
它的核心概念是将学术代码(如模型定义、前向/后向传播、优化器、验证等)与工程代码(如循环、保存机制、TensorBoard日志、训练策略等)分离,从而产生更精简、更易于理解的代码。
主要好处包括:
- 高可重用性 - 它的设计使代码能够在各种项目中重用。
- 易于维护 - 由于其结构化设计,维护代码变得更简单。
- 清晰的逻辑 - 通过抽象样板工程代码,机器学习代码变得更容易识别和理解。
总的来说,PyTorch Lightning是一个非常强大的库,它为组织和管理PyTorch代码提供了一种有效的方法。此外,它还提供了一种结构化的方法来处理常见但复杂的任务,如模型训练、验证和测试。
该图书馆的详细用法可在其官方文件中找到:https://lightning.ai/docs。2. PyTorch Forecasting(预测)
这是一个专门为时间序列预测设计的Python库。由于它是基于PyTorch构建的,您可以利用PyTorch强大的自动差异化和优化库,同时还可以受益于PyTorch Forecasting为时间序列预测提供的便利。
在PyTorch Forecasting中,您可以找到各种预测模型的实现,包括但不限于自回归模型(AR、ARIMA)、状态空间模型(SARIMAX)、神经网络(LSTM、GRU)和集成方法(Prophet、N-Beats)。这意味着您可以在同一框架内试验和比较不同的预测方法,而无需为每种方法编写大量的样板代码。
该库还提供了一系列数据预处理工具,可以帮助您处理时间序列中的常见任务。这些工具包括缺失值插补、缩放、特征提取和滚动窗口转换等。这意味着您可以更加专注于模型的设计和优化,而无需花费大量时间进行数据处理。
它还提供了一个用于评估模型性能的统一接口。它实现了QuantileLoss和SMAPE等时间序列的损失函数和验证指标,并支持早期停止和交叉验证等训练方法。这使您能够更方便地跟踪和增强模型的性能。
如果你正在寻找一种方法来提高时间序列预测项目的效率和可维护性,那么PyTorch Forecasting可能是一个很好的选择。它提供了一种有效而灵活的方法来组织和管理PyTorch代码,使您能够专注于最关键的方面——机器学习模型本身。
该库的详细用法可在其官方文件中找到:https://pytorch-forecasting.readthedocs.io/en/stable。
3. 关于N-HiTS模型
N-HiTS模型通过引入创新的分层插值和多速率数据采样技术,解决了长期预测中的预测波动性和计算复杂性问题。这允许N-HiTS模型有效地近似任何长度的预测范围。
此外,在大规模数据集上进行的大量实验表明,与最新的Transformer架构相比,N-HiTS模型平均提高了近20%的精度,同时还将计算时间减少了一个数量级(50倍)。
论文链接:https://doi.org/10.48550/arXiv.2201.12886。
初始化
首先,我们需要导入所需的库。这些库包括MetaTrader5(用于与MT5终端交互)、PyTorch Lightning(用于训练模型)以及其他一些用于数据处理和可视化的库。
import MetaTrader5 as mt5 import lightning.pytorch as pl from lightning.pytorch.callbacks import EarlyStopping import matplotlib.pyplot as plt import pandas as pd from pytorch_forecasting import Baseline, NHiTS, TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder from pytorch_forecasting.metrics import MAE, SMAPE, MQF2DistributionLoss, QuantileLoss from lightning.pytorch.tuner import Tuner
接下来,我们需要初始化MetaTrader5。这是通过调用mt.initialize()函数来完成的。如果你不能简单地使用它来初始化它,你需要将MT5终端的路径作为参数传递给这个函数(在示例中“D:\Project\mt\MT5\terminal64.exe”是我的个人路径位置,在实际应用中你需要将它配置到你自己的路径位置)。如果初始化成功,函数将返回True,否则返回False。
if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"): print('initialize() failed!') else: print(mt.version())
mt.symbols_total()函数用于获取MT5终端中可用的可交易品种的总数。我们可以用它来判断我们是否能够正确地获得数据。如果总数大于0,我们可以使用mt.copy_rates_from_pos()函数来获取指定交易品种的历史数据。在本例中,我们获得了“GOLD_micro”品种的“mt_data_len”M15(15分钟)周期数据的最新长度。
sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown()
最后,我们使用mt.shutdown()函数关闭与MT5终端的连接,并将获得的数据转换为Pandas DataFrame格式。
mt.shutdown() rts_fm=pd.DataFrame(rts)
现在,让我们讨论如何对从MT5终端获得的数据进行预处理。
我们需要先将时间戳转换为日期:
rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s')
在这里,我们不再描述如何标记数据。你可以在我之前的两篇文章中找到方法(本文简介中有文章链接)。为了简明地演示如何使用预测模型,我们只需将每个“max_encoder_length+2max_prediction_length”数据片段划分为一组。每组有一个从0到“max_encoder_length+2max_prediction_length-1”的序列,并填充它们。通过这种方式,我们将所需的标签添加到原始数据中。首先,我们需要将原始时间索引(即DataFrame的索引)转换为时间索引。计算原始时间索引的余数除以(max_encoder_length+2max_prediction_length),并将结果用作新的时间索引。这将时间索引映射到从0到“max_encoder_length+2*max_prediction_length-1”的范围内:
rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length)
我们还需要将原始时间索引转换为一个组。计算原始时间指数除以“max_encoder_length+2*max_prediction_length”,并将结果用作新组:
rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
我们将数据预处理部分封装到一个函数中。我们只需要将我们需要获得的数据长度传递给它,它就可以完成数据预处理工作:
def get_data(mt_data_len:int): if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"): print('initialize() failed!') else: print(mt.version()) sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown() # print(len(rts)) rts_fm=pd.DataFrame(rts) rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length) return rts_fm
重写 pytorch_forecasting.TimeSeriesDataSet 类
重写pytorch_foredicting中的to_datalader()函数。这允许您控制是否对数据进行混洗以及是否丢弃最后一组批处理(主要是为了防止最后一组数据长度不足导致的不可预测错误)。以下是您的操作方法:
class New_TmSrDt(TimeSeriesDataSet): def to_dataloader(self, train: bool = True, batch_size: int = 64, batch_sampler: Sampler | str = None, shuffle:bool=False, drop_last:bool=False, **kwargs) -> DataLoader: default_kwargs = dict( shuffle=shuffle, drop_last=drop_last, #modification collate_fn=self._collate_fn, batch_size=batch_size, batch_sampler=batch_sampler, ) default_kwargs.update(kwargs) kwargs = default_kwargs if kwargs["batch_sampler"] is not None: sampler = kwargs["batch_sampler"] if isinstance(sampler, str): if sampler == "synchronized": kwargs["batch_sampler"] = TimeSynchronizedBatchSampler( SequentialSampler(self), batch_size=kwargs["batch_size"], shuffle=kwargs["shuffle"], drop_last=kwargs["drop_last"], ) else: raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler") del kwargs["batch_size"] del kwargs["shuffle"] del kwargs["drop_last"] return DataLoader(self,**kwargs)此代码创建从TimeSeriesDataSet继承的新类new_TmSrDt。to_datalader()函数随后在这个新类中被重写,以包括shuffle和drop_last参数。这样,您就可以更好地控制数据加载过程。请记住在代码中使用New_TmSrDt替换TimeSeriesDataSet的实例。
创建训练和验证数据集
首先,我们需要确定训练数据的截止点。这是通过从最大“time_idx”值中减去最大预测长度来完成的。
max_encoder_length = 2*96 max_prediction_length = 30 training_cutoff = rts_fm["time_idx"].max() - max_prediction_length
然后,我们使用New_TmSrDt类(这是我们重写的TimeSeriesDataSet类)来创建训练数据集。此类需要以下参数:
- DataFrame(在本例中为“rts_fom”)
- “time_idx”列,它是一个连续的整数序列
- 目标列(在本例中为“close”),这是我们要预测的值
- 组列(在本例中为“序列”),表示不同的时间序列
- 编码器和预测器的最大长度
context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1)
接下来,我们使用New_TmSrD.from_dataset()函数创建一个验证数据集。此函数需要以下参数:
- 训练数据集
- DataFrame
- 最小预测指数,应比训练数据的最大“time_idx”值大1
validation = New_TmSrDt.from_dataset(training, rts_fm, min_prediction_idx=training_cutoff + 1)
最后,我们使用to_datalader()函数将训练和验证数据集转换为PyTorch DataLoader对象。此函数需要以下参数:
- “train”参数,指示数据是否应该被打乱
- “batch_size”参数,用于指定每个批次的样本数
- “num_workers”参数,用于指定数据加载的工作进程数
train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0)
最后,我们将这部分代码封装到函数spilt_data(data:pd.DataFrame,t_drop_last:bool,t_shuffle:bool,v_drop_last:bol,v_shuffle:bol)中,并指定以下参数:
- “data”参数,用于接收需要处理的数据集
- “t_drop_last”参数,指示是否应丢弃训练数据集的最后一组
- “t_shuffle”参数,指示是否应对训练数据进行混洗
- “v_drop_last”参数,指示是否应删除验证数据集的最后一组
- “v_shuffle”参数,指示是否应对验证数据进行混洗
我们将train_dataloader(用于训练数据集的dataloader的实例)、val_dataloadr(用于验证数据集的dataloader的实例)和training(用于数据集的TimeSeriesDataSet的实例)作为该函数的返回值,因为它们将在以后使用。
def spilt_data(data:pd.DataFrame, t_drop_last:bool, t_shuffle:bool, v_drop_last:bool, v_shuffle:bool): training_cutoff = data["time_idx"].max() - max_prediction_length #max:95 context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1) train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0) return train_dataloader,val_dataloader,training
模型创建和训练
现在我们开始创建NHiTS模型。这一部分将展示如何设置其参数以及如何对其进行训练。
1.找到最佳学习率
在开始创建模型之前,我们使用PyTorch Lightning的Tuner对象来找到最佳学习率。
首先,我们需要创建PyTorch Lightning的Trainer对象,其中“accelerator”参数用于指定设备类型,“gradient_clip_val”用于防止梯度爆炸。
pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1)
接下来,我们使用NHiTS.from_dataset()函数来创建一个NHiTS模型网。此函数需要以下参数:
- 训练数据集
- 学习率
- 权重衰减
- 损失函数
- 隐藏层的大小
- 优化器
net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", )
然后我们实例化Tuner类并调用lr_find()函数。该函数将基于我们的数据集在一系列学习率上训练模型,并比较每个学习率的损失,以获得最佳学习率。
res = Tuner(trainer).lr_find( net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, min_lr=1e-5, max_lr=1e-1 ) lr_=res.suggestion()
同样,我们将获得最佳学习率的这部分代码封装到函数get_learning_rate()中,并将获得的最佳学习率作为其返回值:
def get_learning_rate(): pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False) net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", ) res = Tuner(trainer).lr_find( net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1 ) lr_=res.suggestion() return lr_
如果您想可视化学习率,可以添加以下代码:
print(f"suggested learning rate: {res.suggestion()}")
fig = res.plot(show=True, suggest=True)
fig.show()
此示例中的结果如下:
建议学习率:0.003981071705534973。
2. 定义EarlyStoping回调
此回调主要用于监测验证损失,并在损失连续几个时期没有改善时停止训练。这样可以防止模型过拟合。
early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=True, mode="min")
这里需要注意的参数是“patience”,它主要控制在训练过程中,如果损失连续几个时期没有改善,何时停止。我们把它设置为10。
3. 定义ModelCheckpoint回调
此回调主要用于控制模型归档和归档的名称。我们主要设置这两个变量。
ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, filename='{epoch}-{val_loss:.2f}')
“save_top_k”用于控制保存前几个最好的模型。我们将其设置为1,只保存最好的模型。
4. 定义训练模型
我们首先需要在lightning.pytarch中实例化一个Trainer类,并添加我们之前定义的两个回调。
trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, )
这里我们需要注意的参数是“max_epochs”(最大训练时期数)、“gradient_clip_val”(用于防止梯度爆炸)和“回调”。这里“max_epochs”使用ep,这是我们稍后将定义的全局变量,而“callbacks”是我们的回调集合。
接下来,我们还需要定义NHiTS模型并实例化它:
net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), )
这里,参数通常不需要修改,只需使用默认的参数即可。这里我们只将“loss”修改为MQF2DistributionLoss 损失函数。
5. 训练模块
我们使用Trainer对象的fit()函数来训练模型:
trainer.fit( net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, )
类似地,我们将这部分代码封装到一个函数train()中:
def train(): early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, # The number of times without improvement will stop verbose=True, mode="min") ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, # Save the top few best ones filename='{epoch}-{val_loss:.2f}') trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, ) net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), ) trainer.fit( net, train_dataloaders=t_loader, val_dataloaders=v_loader, # ckpt_path='best' ) return trainer
此函数将返回一个经过训练的模型,可用于预测任务。
定义执行逻辑
1. 定义全局变量:
ep=200 __train=False mt_data_len=200000 max_encoder_length = 2*96 max_prediction_length = 30 batch_size = 128
__train用于控制我们当前是在训练还是测试模型。
值得注意的是,ep用于控制最大训练时期。由于我们已经设置了EarlyStoping,因此可以将该值设置得更大一点,因为当模型不再收敛时,它将自动停止。
mt_data_len是从客户端获得的最近时间序列数据的数量。
max_encoder_length 和 max_prediction_length 分别是最大编码长度和最大预测长度。
2.训练
当训练完成时,我们还需要将当前的最佳训练结果保存到本地文件中,因此我们定义了一个json文件来保存这些信息:
info_file='results.json'
为了使我们的训练过程更加清晰,我们需要避免在训练过程中输出一些不必要的警告信息,因此我们将添加以下代码:
warnings.filterwarnings("ignore")
接下来是我们的训练逻辑:
dt=get_data(mt_data_len=mt_data_len) if __train: # print(dt) # dt=get_data(mt_data_len=mt_data_len) t_loader,v_loader,training=spilt_data(dt, t_shuffle=False,t_drop_last=True, v_shuffle=False,v_drop_last=True) lr=get_learning_rate() trainer__=train() m_c_back=trainer__.checkpoint_callback m_l_back=trainer__.early_stopping_callback best_m_p=m_c_back.best_model_path best_m_l=m_l_back.best_score.item() # print(best_m_p) if os.path.exists(info_file): with open(info_file,'r+') as f1: last=json.load(fp=f1) last_best_model=last['last_best_model'] last_best_score=last['last_best_score'] if last_best_score > best_m_l: last['last_best_model']=best_m_p last['last_best_score']=best_m_l json.dump(last,fp=f1) else: with open(info_file,'w') as f2: json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)
训练完成后,您可以在根目录的results.json文件中找到我们最佳模型的存储位置和最佳分数。
在训练过程中,您将看到一个进度条,显示每个 epoch 的进度。
训练:
训练完成
3. 验证模型
在训练之后,我们希望验证模型并将其可视化。我们可以添加以下代码:
best_model = NHiTS.load_from_checkpoint(best_m_p) predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True) raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False)) for idx in range(10): # plot 10 examples best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True) # sample 500 paths samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0] # plot prediction fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True) ax = fig.get_axes()[0] # plot first two sampled paths ax.plot(samples[:, 0], color="g", label="Sample 1") ax.plot(samples[:, 1], color="r", label="Sample 2") fig.legend() plt.show()
您也可以在训练期间使用TensorBoard实时查看训练情况的可视化,我们在这里不做演示。
结果:
4. 测试经过训练的模型
首先,我们打开json文件,找到最佳的模型存储位置:
with open(info_file) as f: best_m_p=json.load(fp=f)['last_best_model'] print('model path is:',best_m_p)
然后加载模型:
best_model = NHiTS.load_from_checkpoint(best_m_p)
然后,我们从客户端实时获取数据,用于测试模型:
offset=1 dt=dt.iloc[-max_encoder_length-offset:-offset,:] last_=dt.iloc[-1] #get the last group of data # print(len(dt)) for i in range(1,max_prediction_length+1): dt.loc[dt.index[-1]+1]=last_ dt['series']=0 # dt['time_idx']=dt.apply(lambda x:x.index,args=1) dt['time_idx']=dt.index-dt.index[0] # dt=get_data(mt_data_len=max_encoder_length) predictions=best_model.predict(dt,mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True) best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False) plt.show()
结果如下:
5.评估模型
当然,我们可以使用PyTorch预测库中的一些度量来评估模型的性能。以下是如何使用平均绝对误差(MAE)和对称平均绝对百分比误差(SMAPE)进行评估,并输出评估结果:
from pytorch_forecasting.metrics import MAE, SMAPE mae = MAE()(raw_predictions["prediction"], raw_predictions["target"]) print(f"Mean Absolute Error: {mae}") smape = SMAPE()(raw_predictions["prediction"], raw_predictions["target"]) print(f"Symmetric Mean Absolute Percentage Error: {smape}")
在这个代码片段中,我们首先导入MAE和SMAPE度量。然后,我们使用这些度量来计算预测值(raw_predictions[“prediction”])和实际值(raw_preditions[“target”])之间的误差。这些指标可以帮助我们了解模型的性能,并为进一步改进模型提供方向。
结论
在这篇文章中,我们研究了如何使用前两篇文章中提到的标签数据,并演示了如何使用我们的数据创建N-HiTs模型。然后我们对模型进行了训练并对模型进行验证。从结果图中我们可以很容易地看出,我们的结果是好的。我们还演示了如何在MT5中使用该模型来预测30个烛形。当然,我们没有提到如何根据预测结果下单,因为真实的交易需要读者根据您的实际情况进行大量测试,并指定相应的交易规则。最后,祝你玩得愉快!
附件:
完整代码:
# Copyright 2021, MetaQuotes Ltd. # https://www.mql5.com # from typing import Union import lightning.pytorch as pl import os from lightning.pytorch.callbacks import EarlyStopping,ModelCheckpoint import matplotlib.pyplot as plt import numpy as np import pandas as pd # import torch from pytorch_forecasting import NHiTS, TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder,timeseries from pytorch_forecasting.metrics import MQF2DistributionLoss from pytorch_forecasting.data.samplers import TimeSynchronizedBatchSampler from lightning.pytorch.tuner import Tuner import MetaTrader5 as mt import warnings import json from torch.utils.data import DataLoader from torch.utils.data.sampler import Sampler,SequentialSampler class New_TmSrDt(TimeSeriesDataSet): ''' rewrite dataset class ''' def to_dataloader(self, train: bool = True, batch_size: int = 64, batch_sampler: Sampler | str = None, shuffle:bool=False, drop_last:bool=False, **kwargs) -> DataLoader: default_kwargs = dict( shuffle=shuffle, # drop_last=train and len(self) > batch_size, drop_last=drop_last, # collate_fn=self._collate_fn, batch_size=batch_size, batch_sampler=batch_sampler, ) default_kwargs.update(kwargs) kwargs = default_kwargs # print(kwargs['drop_last']) if kwargs["batch_sampler"] is not None: sampler = kwargs["batch_sampler"] if isinstance(sampler, str): if sampler == "synchronized": kwargs["batch_sampler"] = TimeSynchronizedBatchSampler( SequentialSampler(self), batch_size=kwargs["batch_size"], shuffle=kwargs["shuffle"], drop_last=kwargs["drop_last"], ) else: raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler") del kwargs["batch_size"] del kwargs["shuffle"] del kwargs["drop_last"] return DataLoader(self,**kwargs) def get_data(mt_data_len:int): if not mt.initialize(): print('initialize() failed!') else: print(mt.version()) sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown() # print(len(rts)) rts_fm=pd.DataFrame(rts) rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length) return rts_fm def spilt_data(data:pd.DataFrame, t_drop_last:bool, t_shuffle:bool, v_drop_last:bool, v_shuffle:bool): training_cutoff = data["time_idx"].max() - max_prediction_length #max:95 context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1) train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0) return train_dataloader,val_dataloader,training def get_learning_rate(): pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False) net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", ) res = Tuner(trainer).lr_find( net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1 ) # print(f"suggested learning rate: {res.suggestion()}") lr_=res.suggestion() return lr_ def train(): early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=True, mode="min") ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, filename='{epoch}-{val_loss:.2f}') trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, ) net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), ) trainer.fit( net, train_dataloaders=t_loader, val_dataloaders=v_loader, # ckpt_path='best' ) return trainer if __name__=='__main__': ep=200 __train=False mt_data_len=200000 max_encoder_length = 2*96 max_prediction_length = 30 batch_size = 128 info_file='results.json' warnings.filterwarnings("ignore") dt=get_data(mt_data_len=mt_data_len) if __train: # print(dt) # dt=get_data(mt_data_len=mt_data_len) t_loader,v_loader,training=spilt_data(dt, t_shuffle=False,t_drop_last=True, v_shuffle=False,v_drop_last=True) lr=get_learning_rate() trainer__=train() m_c_back=trainer__.checkpoint_callback m_l_back=trainer__.early_stopping_callback best_m_p=m_c_back.best_model_path best_m_l=m_l_back.best_score.item() # print(best_m_p) if os.path.exists(info_file): with open(info_file,'r+') as f1: last=json.load(fp=f1) last_best_model=last['last_best_model'] last_best_score=last['last_best_score'] if last_best_score > best_m_l: last['last_best_model']=best_m_p last['last_best_score']=best_m_l json.dump(last,fp=f1) else: with open(info_file,'w') as f2: json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2) best_model = NHiTS.load_from_checkpoint(best_m_p) predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True) raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False)) for idx in range(10): # plot 10 examples best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True) samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0] # plot prediction fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True) ax = fig.get_axes()[0] # plot first two sampled paths ax.plot(samples[:, 0], color="g", label="Sample 1") ax.plot(samples[:, 1], color="r", label="Sample 2") fig.legend() plt.show() else: with open(info_file) as f: best_m_p=json.load(fp=f)['last_best_model'] print('model path is:',best_m_p) best_model = NHiTS.load_from_checkpoint(best_m_p) offset=1 dt=dt.iloc[-max_encoder_length-offset:-offset,:] last_=dt.iloc[-1] # print(len(dt)) for i in range(1,max_prediction_length+1): dt.loc[dt.index[-1]+1]=last_ dt['series']=0 # dt['time_idx']=dt.apply(lambda x:x.index,args=1) dt['time_idx']=dt.index-dt.index[0] # dt=get_data(mt_data_len=max_encoder_length) predictions = best_model.predict(dt, mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True) best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False) plt.show()
本文由MetaQuotes Ltd译自英文
原文地址: https://www.mql5.com/en/articles/13255