
時系列マイニングのためのデータラベル(第5回):ソケットを使用したEAへの応用とテスト
はじめに
前回の記事では、自分のニーズに合わせてデータにアノテーションを付け、それを使用して時系列予測モデルを訓練する方法について説明しましたが、これらのモデルをより良く使用する方法について、あまり良いアイデアをお持ちでないかもしれません。次に、作成したモデルをMetaTrader 5の履歴バックテストで検証し、EAに組み込む方法について説明します。ただし、EAでは重要なロジックとしてストラテジーが必要であり、実際に使えるストラテジーには具体的な理論的根拠が必要であり、その堅牢性を確保するために多くの検証と調整が必要であることを知っておく必要があります。
この記事の戦略は非常に単純なものであり、あくまで簡単なデモンストレーション例です。もちろん、多くの様々なライブラリのサポートにより、pythonだけでこの作業を完了することもできますが、MetaTrader 5はこのような便利で包括的なバックテストツールを提供し、より正確に取引環境をシミュレートすることができるので、バックテストのプラットフォームとしてMetaTrader 5クライアントを選択する必要があります。ただ、モデル作成環境がpythonであるのに、MetaTrader 5の履歴バックテストはMQL5で実装しなければなりません。バックテスト作業の実装は少々難しいですが、解決策がないわけではありません。この記事では、モデルの品質の向上と向上に役立てるために、MetaTrader 5環境でモデルをバックテストするための3つの異なる方法について説明します。これから何回かに分けて様々なメソッドを紹介しますが、今回はWebSocketメソッドについて説明します。
目次
実施原則
まず、PythonスクリプトにWebサーバーのインスタンスを追加し、モデル推論を追加します。次に、MQL5を使用してWebクライアントを作成し、サーバーの推論サービスをリクエストします。
EAのロジックを以下のように仮定します。
- まず、OnTick()イベントがトリガーされるたびに、最新の300バーチャートデータがクライアントを通じてサーバーに送信されます。
- 情報を受信したサーバーは、モデル推論を通じて、次の6つの棒グラフの予測トレンドをEAクライアントに送信します。ここでは、予測をトレンドに分解できるため、前回の記事で紹介したNbeatsモデルを使用します。
- 下降トレンドなら売り、上昇トレンドなら買いを実行します。
Pythonサーバー関数の実装
pythonが提供するソケットには主に以下のような関数があります。
- socket.bind():アドレス(ホスト、ポート)をソケットにバインドします。AF_INETでは、アドレスはタプル(ホスト、ポート)で表されます。
- socket.listen():TCPリスニングを開始します。backlogは、接続を拒否する前にオペレーティングシステムが中断できる接続の最大数を指定します。この値は少なくとも1であり、ほとんどのアプリケーションでは5に設定されています。
- socket.accept():TCPクライアント接続を受動的に受け入れ、(ブロックして)接続が到着するのを待機します。
- socket.connect():TCPサーバー接続を積極的に初期化します。一般的に、アドレスの形式はタプル(hostname,port)です。接続に失敗した場合は、socket.errorエラーを返します。
- socket.connect_ex():connect()関数の拡張版で、エラーが発生すると例外socket.recv()を投げる代わりにエラーコードを返します。TCPデータを受信すると、データは文字列として返されます。bufsizeは受信するデータの最大量を指定します。flagはメッセージに関する追加情報を提供しますが、通常は無視できます。
- socket.send():TCPデータを送信し、接続されたソケットに文字列でデータを送信します。戻り値は送信されるバイト数で、文字列のバイトサイズより小さい場合があります。
- socket.sendall():TCPデータを完全に送信します。接続されているソケットに文字列でデータを送信します。ただし、戻る前にすべてのデータの送信を試みます。成功すればNoneを返し、失敗すれば例外を発生させます。
- socket.recvfrom():UDPデータを受信します。recv()と似ていますが、戻り値は(data,address)です。ここで、dataは受信データを含む文字列、addressはデータを送信するソケットのアドレスです。
- socket.sendto():UDPデータを送信します。ソケットにデータを送信します。addressは(ipaddr,port)という形式のタプルで、リモートアドレスを指定します。戻り値は送信したバイト数です。
- socket.close():ソケットを閉じます。
- socket.getpeername():接続されているソケットのリモートアドレスを返します。戻り値は通常、タプル(ipaddr,port)です。
- socket.getsockname():ソケット自身のアドレスを返します。通常はタプル(ipaddr,port)です。
- socket.setsockopt(level,optname,value):与えられたソケットオプションの値を設定します。
- socket.getsockopt(level,optname[.buflen]):ソケットオプションの値を返します。
- socket.settimeout(timeout):ソケット操作のタイムアウト時間を秒単位で設定します。タイムアウトは浮動小数点数です。Noneはタイムアウトがないことを意味します。接続操作(connect()など)に使用される可能性があるため、一般的に、タイムアウト時間はソケットが生成された直後に設定すべきです。
- socket.gettimeout():現在のタイムアウト時間を秒単位で返すか、タイムアウト時間が設定されていない場合はNoneを返します。
- socket.fileno():ソケットのファイルディスクリプタを返します。
- socket.setblocking(flag):フラグが0の場合、ソケットをノンブロッキングモードに設定し、それ以外の場合、ソケットをブロッキングモード(デフォルト値)に設定します。ノンブロッキングモードの場合、recv()を呼び出した際にデータが見つからなかったり、send()を呼び出した際にすぐにデータを送信できなかったりすると、socket.error例外が発生します。
- socket.makefile():ソケットに関連するファイルを作成します。
1.必要なパッケージをインポートする
このクラスの実装では、追加のパッケージをインストールする必要はなく、ソケットライブラリは通常デフォルトで含まれています(conda環境下で)。警告メッセージが邪魔な場合は、warningsモジュールを追加し、warnings.filterwarnings("ignore")という文を追加することができます。同時に、必要なグローバル変数も定義する必要があります。
- max_encoder_length=96
- max_prediction_length=20
- info_file="results.json"
これらのグローバル変数は、前回の記事で訓練したモデルに基づいて定義されています。
コードは次の通りです。
import socket import json from time import sleep import pandas as pd import numpy as np import warnings from pytorch_forecasting import NBeats warnings.filterwarnings("ignore") max_encoder_length=96 max_prediction_length=20 info_file="results.json"
2.サーバークラスの作成
サービスクラスを作成し、以下の関数を含むソケットの基本設定を初期化します。
socket.socket():2つのパラメータをsocket.AF_INETとsocket.SOCK_STREAMに設定します。
socket.socket()のbind()メソッド:この関数はhostパラメータを127.0.0.1に、portパラメータを8989に設定します。ここで、ホストを変更することは推奨されません。8989 が使用されている場合は、ポートを他の値に設定できます。
モデルは後で紹介するので、一時的にNoneに初期化します。
サーバーポートをリッスン(self.sk.listen(1))し、TCPクライアント接続を受動的に受け入れ、接続が到着するのを待つ(self.sk_, self.ad_ = self.sock.accept())必要があります。情報を受け取るためにループする際に初期化が繰り返されるのを避けるため、クラスの初期化でこれらのタスクを完了させます。
class server_: def __init__(self, host = '127.0.0.1', port = 8989): self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.host = host self.port = port self.sk.bind((self.host, self.port)) self.re = '' self.model=None self.stop=None self.sk.listen(1) self.sk_, self.ad_ = self.sk.accept() print('server running:',self.sk_, self.ad_)
メモ: サーバーをdockerまたはdockerのようなコンテナにデプロイする場合、ホストを0.0.0.0に設定する必要があるかもしれません。そうしないと、クライアントがサーバーを見つけられない可能性があります。
3. 受信した情報を処理するロジック
クラスメソッドmsg()を定義し、受信した情報をwhileループで処理します。ここで注意しなければならないのは、受信したデータはdecode("utf-8")でデコードする必要があるということです。処理された情報は推論ロジックの処理関数で送られます(send(bytes(eva(self.re), “utf-8”)))。ここで、推論ロジックの関数はeva()と定義され、パラメータは受信した情報で、後で実装します。次に、やるべきことがもう1つあります。EAバックテストが停止するときにサーバーも停止するようにすることです。そうしないと、バックグラウンドでリソースが占有されてしまいます。これは、EAが終了した後、サーバーにstop文字列を送信し、この文字列を受信したら、サーバーにループを停止させ、プロセスを終了させることでおこなうことができます。このクラス属性はサーバークラスの初期化ですでに追加してあるので、あとはこのシグナルを受信したときにtrueに設定するだけです。
def msg(self): self.re = '' while True: data = self.sk_.recv(2374) if not data: break data=data.decode("utf-8") # print(len(data)) if data=="stop": self.stop=True break self.re+=data bt=eva(self.re, self.model) bt=bytes(bt, "utf-8") self.sk_.send(bt) return self.re
注:この例では、self.sk_.recv(2374)というパラメータを2374に設定しています。これはちょうど300浮動小数点数の長さです。受信したデータに不備がある場合は、この値を調整することができます。
4. リソースを取り戻す
サーバーが停止した後は、リソースを取り戻す必要があります。
def __del__(self):
print("server closed!")
self.sk_.close()
self.ad_.close()
self.sock.close()
5. 推論ロジックを定義する
この例の推論ロジックは非常にシンプルです。モデルを読み込み、クライアントから与えられた棒グラフを使用して結果を予測し、それをトレンドに分解してクライアントに結果を送り返すだけです。注意しなければならないのは、ここでモデルを初期化するのではなく、サーバークラスの初期化でモデルを初期化することで、モデルがプリロードされ、いつでも推論できるようになるということです。
まず、モデルをロードする関数を定義し、サーバークラスの初期化でこの関数を呼び出して、インスタンス化されたモデルを取得します。前回は、モデルの保存と読み込みの処理を紹介しました。モデルは訓練後、フォルダのルートディレクトリにあるresults.json JSONファイルに情報を保存します。モデルを読み込みんでロードすることができます。もちろん、server.pyファイルもフォルダのルートディレクトリに配置する必要があります。def load_model(): with open(info_file) as f: m_p=json.load(fp=f)['last_best_model'] model = NBeats.load_from_checkpoint(m_p) return model次に、class server_(): self.model=load_model()のinit()関数に追加して初期化し、初期化されたモデルを推論関数に渡します。
def __init__(self, host = '127.0.0.1', port = 8989): self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.host = host self.port = port self.sk.bind((self.host, self.port)) self.re = '' self.model=load_model() self.stop=None self.sk.listen(1) self.sk_, self.ad_ = self.sk.accept() print('server running:',self.sk_, self.ad_)
次に、推論関数を完成させましょう。
ここで特に注意しなければならないのは、モデルが入力する必要があるデータ形式はDataFrame形式でなければならないということです。そのため、まず受信したデータをnumpyの配列に変換(msg=np.fromstring(msg, dtype=float, sep= ‘,’))してから、DataFrameに変換(dt=pd.DataFrame(msg))する必要があります。推論が完了すると、結果が返されます。最後のトレンド値がトレンド値の平均より大きければ上昇トレンド、そうでなければ下降トレンドとします。上昇トレンドなら「買い」、下降トレンドなら「売り」を返します。具体的な推論プロセスについては、本稿では改めて説明しないので、本連載の過去記事の推論プロセスを参照してください。もう1つ、ここで強調すべき点があります。DataFrameのclose列にモデルの予測値を設定したので、DataFrameに変換されたデータにclose列を追加する必要があります(dt['close']=dt)。
def eva(msg,model): offset=1 msg=np.fromstring(msg, dtype=float, sep= ',') # print(msg) dt=pd.DataFrame(msg) dt=dt.iloc[-max_encoder_length-offset:-offset,:] last_=dt.iloc[-1] for i in range(1,max_prediction_length+1): dt.loc[dt.index[-1]+1]=last_ dt['close']=dt dt['series']=0 dt['time_idx']=dt.index-dt.index[0] print(dt) predictions = model.predict(dt, mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True) trend =predictions.output["trend"][0].detach().cpu() if (trend[-1]-trend.mean()) >= 0: return "buy" else: return "sell"
次に、メインループを追加する必要があります。
まず、サービスクラスを初期化し、次にwhileループの中に情報処理関数を追加します。終了シグナルを受信したらループを終了し、プログラムを終了します。ループの実行速度が速くなりすぎないように、sleep(0.5)を追加してループ速度を制限し、CPUの使用率が高くならないようにしています。
while True: rem=sv.msg() if sv.stop: break sleep(0.5)ここまでで、簡単なサーバーは完成したので、次はクライアントをEAに実装します。
MQL5クライアント関数の実装
1.MQL5のソケット関数
現在、ソケットモジュールには以下の関数が含まれています。
- SocketCreate:指定された識別子でソケットを作成し、そのハンドルを返す
- SocketClose:ソケットを閉じる
- SocketConnect:タイムアウト制御付きでサーバーに接続する
- SocketIsConnected:ソケットが現在接続されているか確認する
- SocketIsReadable:ソケットから読み込めるバイト数を取得する
- SocketIsWritable:現在の時刻にソケットにデータを書き込めるかどうかを確認する
- SocketTimeouts:システムソケットオブジェクトのデータ受信送信タイムアウトを設定する
- SocketRead:ソケットからデータを読み込む
- SocketSend:ソケットにデータを書き込む
- SocketTlsHandshake:TLSHandshakeプロトコルを使用して、指定されたホストとのセキュアなTLS (SSL)接続を開始する
- SocketTlsCertificate:セキュアなネットワーク接続に使用される証明書データを取得する
- SocketTlsRead:セキュアなTLS接続からデータを読み込む
- SocketTlsReadAvailable:セキュアなTLS接続から利用可能なデータをすべて読み込む
- SocketTlsSend:セキュアなTLS接続でデータを送信する
これらのメソッドを参照することで、クライアント側で簡単に機能を追加することができます。
2. EA機能の実装
まず、EAの機能ロジックについて説明しましょう。
intOnInit()でソケットを初期化します。
次に、void OnTick()で、クライアントからデータの受信し、クライアントへの現在のバーチャートデータの送信、EAのバックテストロジックを実装します。
void OnDeinit(constintreason)では、サーバーにstopメッセージを送り、ソケットを閉じる必要があります。3. EAの初期化
まず、ソケット生成後にハンドルを受け取るグローバル変数intskを定義します。
OnInit()関数では、SocketCreate()を使用してクライアントを作成します(sk=SocketCreate())。
次に、サーバーのアドレスを定義します(string host=“127.0.0.1”;)。サーバーポートを定義します(int port=8989;)。
送信するデータ長を定義します。以前、一度に300データを送信することを説明しました(int data_len=300;)。
OnInit()関数では、初期化の状況を判断する必要があります。作成に失敗すると、初期化も失敗します。
次に、サーバーとの接続を作成します(SocketConnect(sk,host,port,1000))。ここで、ポートはサーバーサイドと一致している必要があります。接続に失敗すると、初期化も失敗します。
int sk=-1; string host="127.0.0.1"; int port= 8989; int OnInit() { //--- sk=SocketCreate(); Print(sk); Print(GetLastError()); if (sk==INVALID_HANDLE) { Print("Failed to create socket"); return INIT_FAILED; } if (!SocketConnect(sk,host, port,1000)) { Print("Failed to connect to server"); return INIT_FAILED; } //--- return(INIT_SUCCEEDED); }
void OnDeinit(const int reason) { socket.Disconnect(); }
4. 取引ロジック
ここでは、メインのデータ処理ロジックと取引ロジックをvoidOnTick()で定義する必要があります。
注文タスクを実行するための変数MqlTradeRequestrequestとMqlTradeResultresultを作成します。
サーバー情報を受信するためのchar配列変数charrecv_data[]を作成します。
double priceData[300]というdouble配列変数を作成し、チャートデータをコピーします。
変数stringdataToSendとchards[]を作成し、double配列をソケットで使用可能なchar配列に変換します。
まず、送信するデータをチャートからコピーする必要があります(int nc=CopyClose(Symbol(),0,0,data_len,priceData);)。
次に、データを文字列形式に変換しますfor(for(int i=0;i<ArraySize(priceData);i++) dataToSend+=(string)priceData[i]+“,”)。各データを区切るために「,」を使用します。
次に、文字列データをソケットで使用可能なchar配列に変換します(int dsl=StringToCharArray(dataToSend,ds))。
データ変換の後、SocketIsWritable(sk)を使用してソケットがデータを送信できるかどうかを判断し、できるならSocketSend(sk,ds,dsl)を使用してデータを送信します。
サーバーから情報を読み取る必要もあります。uint len=SocketIsReadable(sk)を使用して、現在のポートに利用可能なデータがあるかどうかを確認し、情報が空でない場合は、取引ロジックを実行します(int rsp_len=SocketRead(sk,recv_data,len,500))。lenはバッファサイズ、500はタイムアウト設定(ミリ秒)です。
buyが受信されたら、買い注文を出し、リクエストを以下のように設定します。
- 取引リクエスト構造リクエストをリセット:ZeroMemory(request)
- 取引コマンドを即座に実行するように設定:request.action = TRADE_ACTION_DEAL
- 取引通貨ペアを設定:request.symbol=Symbol()
- 注文の数量:request.volume = 0.1
- 注文のタイプ:request.type = ORDER_TYPE_BUY
- SymbolInfoDouble関数には2つの入力が必要で、1つ目は通貨ペアの文字列、2つ目はENUM_SYMBOL_INFO_DOUBLE列挙の型:request.price = SymbolInfoDouble(Symbol(), SYMBOL_ASK)
- 取引の許容スリッページ:request.deviation = 5
- その後、取引注文を送信:OrderSend(request, result)
sellを受信した場合は、売り注文を出し、次のようにリクエストを設定します(設定は買い注文を参照し、ここでは詳しく説明しません)。
- ZeroMemory(request)
- request.action = TRADE_ACTION_DEAL
- request.symbol = Symbol()
- request.volume = 0.1
- request.type = ORDER_TYPE_SELL
- request.price = SymbolInfoDouble(Symbol(), SYMBOL_BID)
- request.deviation = 5
- その後、取引注文を送信:OrderSend(request, result)
ここでは、テストコードでの問題を避けるために、実際の注文送信関数をコメントアウトし、バックテストでそれを開きます。
完全なコードは、次のとおりです。
void OnTick() { MqlTradeRequest request; MqlTradeResult result; char recv_data[]; double priceData[300]; string dataToSend; char ds[]; int nc=CopyClose(Symbol(),0,0,300,priceData); for(int i=0;i<ArraySize(priceData);i++) dataToSend+=(string)priceData[i]+","; int dsl=StringToCharArray(dataToSend,ds); if (SocketIsWritable(sk)) { Print("Send data:",dsl); int ssl=SocketSend(sk,ds,dsl); } uint len=SocketIsReadable(sk); if (len) { int rsp_len=SocketRead(sk,recv_data,len,500); if(rsp_len>0) { string result; result+=CharArrayToString(recv_data,0,rsp_len); Print("The predicted value is:",result); if (StringFind(result,"buy")) { ZeroMemory(request); request.action = TRADE_ACTION_DEAL; request.symbol = Symbol(); request.volume = 0.1; request.type = ORDER_TYPE_BUY; request.price = SymbolInfoDouble(Symbol(), SYMBOL_ASK); request.deviation = 5; //OrderSend(request, result); } else{ ZeroMemory(request); request.action = TRADE_ACTION_DEAL; request.symbol = Symbol(); request.volume = 0.1; request.type = ORDER_TYPE_SELL; request.price = SymbolInfoDouble(Symbol(), SYMBOL_BID); request.deviation = 5; //OrderSend(request, result); } } } }
メモ: SocketSend()関数のbuffer_maxlenパラメータは、サーバーの設定と一致していなければなりません。この値は、StringToCharArray()関数が実行されたときに自動的に計算されて返されます。
まずserver.pyを実行し、次にMetaTrader 5クライアントのチャートにEAを追加します。
バックテストの方法
先ほどMQL5でのソケットの制限について触れましたが、次にMQL5ファイルとpythonファイルの両方でwebsocketのサポートを追加する必要があります。
1. クライアントにWebSocketサポートを追加する
バックテストでは、Windows APIのwinhttp.mqhを使用することで、欲しい機能を実現できます。このAPIの詳細については、以下を参照してください。
マイクロソフトの公式ドキュメント:https://docs.microsoft.com/en-us/windows/win32/winhttp/winhttp-functions。ここでは主な関数のみを列挙します。
- WinHttpOpen():ライブラリを初期化し、アプリケーションが使用できるようにする
- WinHttpConnect():アプリケーションが通信したいサーバーのドメイン名を設定する
- WinHttpOpenRequest():HTTPリクエストハンドルを作成する
- WinHttpSetOption:HTTP接続の各種設定オプションを設定する
- WinHttpSendRequest:サーバーにリクエストを送信する
- WinHttpReceiveResponse:リクエスト送信後、サーバーからの応答を受信する
- WinHttpWebSocketCompleteUpgrade:サーバーから受信した応答がWebSocketプロトコルに適合していることを確認する
- WinHttpCloseHandle:以前に使用されたリソース記述子を破棄する
- WinHttpWebSocketSend:WebSocket接続でデータを送信する
- WinHttpWebSocketReceive:WebSocket接続を使用してデータを受信する
- WinHttpWebSocketClose:WebSocket接続を閉じる
- WinHttpWebSocketQueryCloseStatus:サーバーから送信されたクローズステータスメッセージを確認する
winhttp.mqhファイルをダウンロードし、クライアントデータフォルダ「Include\WinAPI\」にコピーします。では、コード部分を完成させましょう。
グローバル変数HINTERNET ses_h,cnt_h,re_h,ws_hに使用するハンドル変数を追加し、OnInit()で初期化します。
- まず、NULL:ses_h=cnt_h=re_h=ws_h=NULL;に設定して乱数を避けます
- 次に、HTTPセッションを開始します(ses_h=WinHttpOpen(“MT5”,WINHTTP_ACCESS_TYPE_DEFAULT_PROXY,NULL,NULL,0))。これが失敗したら初期化に失敗します
- サーバに接続します(cnt_h=WinHttpConnect(ses_h,host,port,0))。これが失敗したら初期化に失敗します
- リクエストの初期化を実行します(re_h=WinHttpOpenRequest(cnt_h,“GET”,NULL,NULL,NULL,NULL,0))。これが失敗すると初期化に失敗します
- WebSocketを設定します(WinHttpSetOption(re_h,WINHTTP_OPTION_UPGRADE_TO_WEB_SOCKET,nullpointer,0))。これが失敗したら初期化に失敗します
- WebSocketハンドシェイクリクエストを実行します(WinHttpSendRequest( re_h,NULL, 0,nullpointer, 0, 0, 0))。これが失敗したら初期化に失敗します
- サーバーのハンドシェーク応答を受け取ります(WinHttpReceiveResponse(re_h,nullpointer))。これが失敗したら初期化に失敗します
- WebSocketにアップグレードし、初期化後にハンドルを取得します(WinHttpWebSocketCompleteUpgrade(re_h,nv))。これが失敗したら初期化に失敗します
- アップグレードが完了したら、元のリクエストハンドルが必要なくなるので、閉じます:WinHttpCloseHandle(re_h);
このようにして、クライアントとサーバー間の接続プロセス全体が完了しました。これらのプロセスは、順番に厳密に実行されなければなりません。初期化失敗文の元の設定は、バックテストプロセスで常に有効であり、初期化に失敗する原因となるため、コメントアウトする必要があります。
int sk=-1; string host="127.0.0.1"; int port= 8989; int data_len=300; HINTERNET ses_h,cnt_h,re_h,ws_h; int OnInit() { //--- ses_h=cnt_h=re_h=ws_h=NULL; ses_h=WinHttpOpen("MT5", WINHTTP_ACCESS_TYPE_DEFAULT_PROXY, NULL, NULL, 0); Print(ses_h); if (ses_h==NULL){ Print("Http open failed!"); return INIT_FAILED; } cnt_h=WinHttpConnect(ses_h, host, port, 0); Print(cnt_h); if (cnt_h==-1){ Print("Http connect failed!"); return INIT_FAILED; } re_h=WinHttpOpenRequest(cnt_h, "GET", NULL, NULL, NULL, NULL, 0); if(re_h==NULL){ Print("Request open failed!"); return INIT_FAILED; } uchar nullpointer[]= {}; if(!WinHttpSetOption(re_h,WINHTTP_OPTION_UPGRADE_TO_WEB_SOCKET,nullpointer,0)) { Print("Set web socket failed!"); return INIT_FAILED; } bool br; br = WinHttpSendRequest( re_h, NULL, 0, nullpointer, 0, 0, 0); if (!br) { Print("send request failed!"); return INIT_FAILED; } br=WinHttpReceiveResponse(re_h,nullpointer); if (!br) { Print("receive response failed!",string(kernel32::GetLastError())); return INIT_FAILED; } ulong nv=0; ws_h=WinHttpWebSocketCompleteUpgrade(re_h,nv); if (!ws_h) { Print("Web socket upgrade failed!",string(kernel32::GetLastError())); return INIT_FAILED; } WinHttpCloseHandle(re_h); re_h=NULL; sk=SocketCreate(); Print(sk); Print(GetLastError()); if (sk==INVALID_HANDLE) { Print("Failed to create socket"); //return INIT_FAILED; } if (!SocketConnect(sk,host, port,1000)) { Print("Failed to connect to server"); //return INIT_FAILED; } //--- return(INIT_SUCCEEDED); }
次に、OnTick()関数に関連するロジックコードを追加します。
まず、ソケットハンドルのグローバル変数を定義しているので、ソケットの初期化に成功したかどうかで、通常環境かテスト環境かを判断することができます。したがって、「sk!=-1」がtrueの場合、ソケットの初期化が成功したことを意味するため、コードのこの部分を変更する必要はありません。「sk!=-1」がtrueでなければ、WebSocketの作業ロジックを完成させる必要があります。
- まずサーバーにデータを送信します(WinHttpWebSocketSend(ws_h,WINHTTP_WEB_SOCKET_BINARY_MESSAGE_BUFFER_TYPE,ds,dsl))。ここで特に注意しなければならないのは、この処理が成功した場合、関数の戻り値は0であり、そうでない場合は該当するエラーコードが返されるということです。
- 成功したら、受信したデータ変数を空にします(ZeroMemory(recv_data))。
- データを受信します(get=WinHttpWebSocketReceive(ws_h,recv_data,ArraySize(recv_data),rb,st))。データ受信に成功すれば戻り値は0、そうでなければエラーコードです。
- データを受信したら、データをデコードします(pre+=CharArrayToString(recv_data,0))。
サーバーがbuyを送ってきたら買い注文を出し、そうでなければ売り注文を出します。すでに注文がある場合は、まず未決済の注文があるかどうかを判断(nutt=PositionsTotal()>0)し、ある場合は注文タイプを取得(tpt=OrderGetInteger(ORDER_TYPE))。してから、注文タイプがORDER_TYPE_SELLかORDER_TYPE_BUYかを確認します。注文タイプがサーバーから送信されたトレンドと同じであれば、操作は必要ありません。注文タイプがトレンドと反対の場合、現在の注文を決済し、トレンドに一致する注文を出します。
このプロセスを紹介するために、buyサーバー情報を例に挙げます。
tpt==ORDER_TYPE_BUYの場合はそのまま戻ります。tpt==ORDER_TYPE_SELLの場合は売り注文があるので、request.order=tikとrequest.action=TRADE_ACTION_REMOVEを設定し、OrderSend(request,result)を実行すると、売り注文が決済されます。
注文がなければ、以下を設定します。
- request.action = TRADE_ACTION_DEAL;
- request.action = TRADE_ACTION_DEAL;
- request.symbol = Symbol();
- request.volume = 0.1;
- request.type = ORDER_TYPE_BUY;
- request.price = SymbolInfoDouble(Symbol(), SYMBOL_ASK);
- request.deviation = 5;
- request.type_filling=ORDER_FILLING_IOC;
OrderSend(request,result)を実行すると、買い注文が発注されます。サーバー情報がsellの場合も同様に設定しますが、ここでは詳細には触れません。
void OnTick() { //--- MqlTradeRequest request; MqlTradeResult result; char recv_data[5]; double priceData[300]; string dataToSend; char ds[]; int nc=CopyClose(Symbol(),0,0,data_len,priceData); for(int i=0;i<ArraySize(priceData);i++) dataToSend+=(string)priceData[i]+","; int dsl=StringToCharArray(dataToSend,ds); if (sk!=-1) { if (SocketIsWritable(sk)) { Print("Send data:",dsl); int ssl=SocketSend(sk,ds,dsl); } uint len=SocketIsReadable(sk); if (len) { int rsp_len=SocketRead(sk,recv_data,len,500); if(rsp_len>0) { string result=NULL; result+=CharArrayToString(recv_data,0,rsp_len); Print("The predicted value is:",result); if (StringFind(result,"buy")) { ZeroMemory(request); request.action = TRADE_ACTION_DEAL; request.symbol = "EURUSD"; request.volume = 0.1; request.type = ORDER_TYPE_BUY; request.price = SymbolInfoDouble("EURUSD", SYMBOL_ASK); request.deviation = 5; //OrderSend(request, result); } else{ ZeroMemory(request); request.action = TRADE_ACTION_DEAL; request.symbol = "EURUSD"; request.volume = 0.1; request.type = ORDER_TYPE_SELL; request.price = SymbolInfoDouble("EURUSD", SYMBOL_BID); request.deviation = 5; //OrderSend(request, result); } } } } else { ulong send=0; if (ws_h) { send=WinHttpWebSocketSend(ws_h, WINHTTP_WEB_SOCKET_BINARY_MESSAGE_BUFFER_TYPE, ds, dsl); //Print("Send data failed!",string(kernel32::GetLastError())); if(!send) { ZeroMemory(recv_data); ulong rb=0; WINHTTP_WEB_SOCKET_BUFFER_TYPE st=-1; ulong get=WinHttpWebSocketReceive(ws_h,recv_data,ArraySize(recv_data),rb,st); if (!get) { string pre=NULL; pre+=CharArrayToString(recv_data,0); Print("The predicted value is:",pre); ulong numt=0; ulong tik=0; bool sod=false; ulong tpt=-1; numt=PositionsTotal(); if (numt>0) { tik=OrderGetTicket(numt-1); sod=OrderSelect(tik); tpt=OrderGetInteger(ORDER_TYPE);//ORDER_TYPE_BUY or ORDER_TYPE_SELL } if (pre=="buy") { if (tpt==ORDER_TYPE_BUY) return; else if(tpt==ORDER_TYPE_SELL) { request.order=tik; request.action=TRADE_ACTION_REMOVE; Print("Close sell order."); } else{ ZeroMemory(request); request.action = TRADE_ACTION_DEAL; request.symbol = Symbol(); request.volume = 1; request.type = ORDER_TYPE_BUY; request.price = SymbolInfoDouble(Symbol(), SYMBOL_ASK); request.deviation = 5; request.type_filling=ORDER_FILLING_IOC; Print("Open buy order."); } OrderSend(request, result); } else{ if (tpt==ORDER_TYPE_SELL) return; else if(tpt==ORDER_TYPE_BUY) { request.order=tik; request.action=TRADE_ACTION_REMOVE; Print("Close buy order."); } else{ ZeroMemory(request); request.action = TRADE_ACTION_DEAL; request.symbol = Symbol(); request.volume = 1; request.type = ORDER_TYPE_SELL; request.price = SymbolInfoDouble(Symbol(), SYMBOL_BID); request.deviation = 5; request.type_filling=ORDER_FILLING_IOC; Print("OPen sell order."); } OrderSend(request, result); } } } } } }
この時点で、MQL5 WebSocketクライアントの設定は完了です。
2. サーバーサイドの設定
server.pyにwebsocketのサポートを追加する必要があります。
まず、必要なライブラリを読み込む必要があります。import base64
import hashlib
import struct
主な処理はサーバークラスのmsg(self)関数でおこなわれます。まず、websockerフラグ変数wsk=Falseを追加し、受信データがマスクされているかどうかを判断します。
マスクされている場合、データの2バイト目の上位ビットは1であり、(data[1] & 0x80) >> 7の値だけを判断すればよくなります。
マスクされていない場合は、data.decode("utf-8")を使用します。
マスクされている場合は、マスクキー(mask=data[4:8])とペイロードデータ(payload=data[8:])を見つけ、マスクを解除(for i in range(len(payload)):message += chr(payload[i] ^ mask[i % 4]))し、フラグ変数wskをtrueに設定します。
マスキングの問題を解決した後は、WebSocketハンドシェイクのプロセスも追加する必要があります。
まず、ハンドシェーク処理であるかどうかを判断します(if ‘\r\n\r\n’ in data;)。
ハンドシェーク処理であれば、キー値を取得します(data.split(“\r\n”)[4].split(": ")[1];)。
Sec-WebSocket-Acceptの値を計算します(base64.b64encode(hashlib.sha1((key+GUID).encode(‘utf-8’)).digest()),)。ここでGUIDは固定値「258EAFA5-E914-47DA-95CA-C5AB0DC85B11」です。
次に、ハンドシェイク応答ヘッダーを定義します。
response_tpl="HTTP/1.1 101 Switching Protocols\r\n" \ "Upgrade:websocket\r\n" \ "Connection: Upgrade\r\n" \ "Sec-WebSocket-Accept: %s\r\n" \ "WebSocket-Location: ws://%s/\r\n\r\n"
応答ヘッダーを埋めます(response_str = response_tpl % (ac.decode(‘utf-8’), “127.0.0.1:8989”))。
最後に、ハンドシェイク応答を送信します(self.sk_.send(bytes(response_str、encoding='utf-8')))。
もう1つ追加することがあります。それは、送信する情報をWebSocketで受け入れ可能な情報として処理することです。
if wsk: tk=b'\x81' lgt=len(bt) tk+=struct.pack('B',lgt) bt=tk+bt
これで、サーバーサイドの修正が必要な部分は基本的に完了しました。
def msg(self): self.re = '' wsk=False while True: data = self.sk_.recv(2500) if not data: break if (data[1] & 0x80) >> 7: fin = (data[0] & 0x80) >> 7 # FIN bit opcode = data[0] & 0x0f # opcode masked = (data[1] & 0x80) >> 7 # mask bit mask = data[4:8] # masking key payload = data[8:] # payload data print('fin is:{},opcode is:{},mask:{}'.format(fin,opcode,masked)) message = "" for i in range(len(payload)): message += chr(payload[i] ^ mask[i % 4]) data=message wsk=True else: data=data.decode("utf-8") if '\r\n\r\n' in data: key = data.split("\r\n")[4].split(": ")[1] print(key) GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" ac = base64.b64encode(hashlib.sha1((key+GUID).encode('utf-8')).digest()) response_tpl="HTTP/1.1 101 Switching Protocols\r\n" \ "Upgrade:websocket\r\n" \ "Connection: Upgrade\r\n" \ "Sec-WebSocket-Accept: %s\r\n" \ "WebSocket-Location: ws://%s/\r\n\r\n" response_str = response_tpl % (ac.decode('utf-8'), "127.0.0.1:8989") self.sk_.send(bytes(response_str, encoding='utf-8')) data=data.split('\r\n\r\n',1)[1] if "stop" in data: self.stop=True break if len(data)<200: break self.re+=data bt=eva(self.re, self.model) bt=bytes(bt, "utf-8") if wsk: tk=b'\x81' lgt=len(bt) tk+=struct.pack('B',lgt) bt=tk+bt self.sk_.sendall(bt) return self.re
3. 使用
まず、サーバーサイドを実行する必要があります。コマンドラインでserver.pyファイルのディレクトリを探し、サービスを開始するためにpythonserver.pyを実行します。
その後、MetaTrader 5クライアントに戻り、ソースコードを開き、Ctrl+F5を直接押すか、テストボタンをクリックしてテストを開始します。


バックテストの結果は以下の通りです。
注:
- チャートで直接テストしたい場合は、ご注意ください。これまでのコードでは、WebSocketとソケットの両方を同時に初期化します。もちろん、ソケットの初期化が成功すれば、実行ロジックはWebSocketのロジック部分を実行しませんが、無用なトラブルを避けるために、この場合はOnInit()のWebSocketの初期化部分をコメントアウトすることをお勧めします。
- OnTick()を使用してメインのロジックを完成させるだけでなく、OnTimer()にロジックを実装して、15分おきにデータを送信するなど、データを送信する特定の時間を設定できるようにすることも検討できます。これにより、クウォートが届くたびに頻繁にデータを送信する必要がなくなります。本記事では具体的な実装コードを示しません。本記事の実装方法を参考にして独自の実装コードを書いてください。
結論
この記事では、サーバークライアント方式を使用して、以前に訓練したモデルをバックテストする方法と、バックテストシナリオと非バックテストシナリオの両方でシステムをテストする方法を紹介しました。この記事には、言語横断的、領域横断的な知識が多く含まれていることは否定できません。理解するのが最も難しいのは、複雑なエンジニアリングプロジェクトであるWebSocketの部分でしょう。しかし、この記事のステップに従いさえすれば、必ず成功します。この記事は、かなり単純な戦略で私たちのモデルをテストするための例を提供しているに過ぎないことを強調しておく必要があります。実際の取引には使用しないでください。実際の取引では、このシステムを安定して稼動させるために各部を最適化する必要があるでしょう。もう一度言いますが、この例を実際の取引にそのまま使用しないでください。次回は、ソケット依存を取り除き、EAで直接モデルを使用する方法について説明します。
何かを学んでいただけたなら幸いです。
参考文献
MetaQuotes Ltdにより英語から翻訳されました。
元の記事: https://www.mql5.com/en/articles/13254





- 無料取引アプリ
- 8千を超えるシグナルをコピー
- 金融ニュースで金融マーケットを探索