English Русский Español Deutsch 日本語 Português
preview
为 MetaTrader 5 开发一款 MQTT 客户端:TDD 方式 - 第2部分

为 MetaTrader 5 开发一款 MQTT 客户端:TDD 方式 - 第2部分

MetaTrader 5积分 | 24 四月 2024, 09:46
283 0
Jocimar Lopes
Jocimar Lopes

概述

“过早的优化是万恶之源。”(唐纳德·克努思)

上一篇文章中,我们介绍了MQTT,这是一种高效的二进制发布/订阅消息传递协议。我们讨论了什么是MQTT,为什么它的开发始于25年前,以及它现在在几个行业中的用途,从汽车到物联网,从航空航天到简单的聊天应用程序。我们看到MQTT在任何需要内容类型无关的消息共享协议的上下文中都是有用的,包括交易应用程序的上下文。我们说明了在我们的代码库中包含用于MQTT的本地MQL5客户端的好处,并且我们确实使用在WSL(Windows Subsystem for Linux)上运行的Mosquito开源MQTT代理程序设置了最低限度的开发环境。

我们开始开发我们的本地客户端,只使用硬编码的固定头生成器功能,并达到了我们能够连接到本地Mosquitto代理的程度,但由于协议错误,服务器立即重置了连接。

引用上一篇文章底部的一句话:

“所以,是的,我们的CONNECT固定头被Mosquito识别,但由于协议错误,<unknown>客户端立即断开连接”。发生此错误是因为我们还没有包括变量头、协议名称、协议级别和其他相关元数据。我们将在下一步解决这个问题。”

图01-Metaeditor专家日志连接响应失败

图01-显示连接响应的Metaeditor专家选项卡日志:失败


由于我们使用的是测试驱动开发(Test-Driven Development,TDD)方法,我们将通过为CONNECT数据包生成器编写一个测试来开始这一步骤,该生成器包括那些相关的元数据,并且不会“由于协议错误”而被拒绝。

甚至在我们编写要测试的代码之前编写这个测试,对我们许多人来说可能是违反直觉的。但如果我们将测试视为项目需求的客观描述,如果我们将其视为开发人员所能达到的目标的最客观定义,那么这似乎是非常自然的,即使是在任何企业之前要完成的预期步骤。

“单元测试是文档。它们描述了系统的最底层设计。它们毫不含糊,准确无误,用观众能理解的语言写成,而且非常正式,以至于执行起来。它们是现存的最好的底层文档。什么专业人员不提供此类文件?”(Robert Martin,《整洁编码》,2011年)


代码组织:OOP和头文件

如上所述,我们通过用“正确”的值硬编码字节数组,开始构建连接数据包固定头。然后,我们尝试通过将硬编码的字节数组发送到本地代理来连接我们的客户端。“由于协议错误”,我们的连接尝试失败得很惨。但与此同时,我们了解了一些关于我们的开发环境,对于我们的日志,写了我们的第一个测试,最重要的是,我们开始做一些有效的事情,至少是小步前进。

正如你所看到的,那次失败是故意的。但我们知道,从长远来看,这种开发复杂应用程序的方式是不可持续的。

构建一致的MQTT数据包只是编写健壮和可维护的客户端过程中的第一步,可以说是最简单的部分。当涉及到操作行为规范时,协议的所有复杂性都会显现出来。这种复杂性将要求我们作为开发人员承担更多的工作。除了发送好的数据包,我们还需要处理大量不同的服务器响应和不同的应用程序状态。在这一点上,硬编码的字节数组——或者任何硬编码的东西——都是不够的。

幸运的是,MQL5是一种面向对象的编程语言,我们不在MQTT最初设计的内存/CPU约束环境中工作。因此,我们可以利用面向对象(OOP)范式的所有好处,以实现:

  • 通过选择正确的抽象级别,轻松推理协议
  • 简单的代码读取(请记住,读取代码的次数比写入的次数多得多)
  • 简单的代码维护,
  • 以及简单的测试

MQL5的参考文档对面向对象编程的MQL5提供了广泛的支持,它的整个部分都专门讨论该主题。 


协议定义

消息共享协议是在两个或多个实体之间建立共同理解基础的一组规则。在我们的情况下,在两个或多个设备之间。这些规则中的许多都是考虑到以前做过的事情来关注该做什么。它们是有状态的。为了选择下一个操作,我们的代码必须评估应用程序的当前状态。用MQTT协议的说法,这些是操作行为规则。

除了有状态规则——事实上,在它们之前——还有独立于应用程序状态的术语、值和计算的定义。它们通常是常量、枚举和评估算法,分别是MQTT协议名称、控制数据包类型和固定头剩余长度字节值。

我们将在两个不同的头文件中收集这两组不同的规则。其中第一个只是用于我们的文件中共享的术语和值的定义。毫无疑问,我们将把它命名为Defines.mqh。这些术语和值通常是常量,并且该文件应该几乎不发生任何变化。

另一个头文件将包含一些共享枚举、结构和函数。它将被命名为MQTT.mqh。这些枚举、结构和函数将有很大的发展,而且不仅仅是在我们开发第一个版本的时候。每当我们进行改进、优化和bug修复时,此文件都会更改。也有可能此文件将被细分为其他更具体的文件。

使用头文件进行代码组织的做法与面向对象编程无关。事实上,我们已经在Brian Kernighan和Dennis Ritchie的经典著作《C编程语言》中找到了关于它们的有用注释。 

“(…)文件之间共享的定义和声明。我们希望尽可能集中这一点,这样随着程序的发展,只有一个副本可以获得并保持正确。(…)在程序大小适中的情况下,最好有一个头文件,其中包含要在程序的任何两个部分之间共享的所有内容(…)。对于更大的程序,需要更多的组织和头文件。”

但是,在面向对象编程中,以小编译单元组织代码的做法才是真正的亮点。此外,由于我们正在构建一个库,几乎所有的代码都将在头文件中。

Defines 头文件

此时,协议名称和协议级别定义仅用于CONNECT数据包。因此,如果我们愿意,我们可以将它们放在特定的CPktConnect类中(见下文)。但为了保持一致性,我们将它们保留在Defines头文件中。尽管目前它们仅用于CONNECT数据包,但以后可能会用于其他文件。

关于该协议的注释是对官方标准规范的字面引用。

//+------------------------------------------------------------------+
//|                                                      Defines.mqh |
//|            ********* WORK IN PROGRESS **********                 |
//| **** PART OF ARTICLE https://www.mql5.com/en/articles/13334 **** |
//+------------------------------------------------------------------+
//+------------------------------------------------------------------+
//|              PROTOCOL NAME AND VERSION                           |
//+------------------------------------------------------------------+
#define MQTT_PROTOCOL_NAME_LENGTH_MSB           0x00
#define MQTT_PROTOCOL_NAME_LENGTH_LSB           0x04
#define MQTT_PROTOCOL_NAME_BYTE_3               'M'
#define MQTT_PROTOCOL_NAME_BYTE_4               'Q'
#define MQTT_PROTOCOL_NAME_BYTE_5               'T'
#define MQTT_PROTOCOL_NAME_BYTE_6               'T'
#define MQTT_PROTOCOL_VERSION                   0x05
//+------------------------------------------------------------------+
//|              PROPERTIES                                          |
//+------------------------------------------------------------------+
/*
The last field in the Variable Header of the CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC,
PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, DISCONNECT, and
AUTH packet is a set of Properties. In the CONNECT packet there is also an optional set of Properties in
the Will Properties field with the Payload
*/
#define MQTT_PROPERTY_PAYLOAD_FORMAT_INDICATOR          0x01 // (1) Byte                  
#define MQTT_PROPERTY_MESSAGE_EXPIRY_INTERVAL           0x02 // (2) Four Byte Integer     
#define MQTT_PROPERTY_CONTENT_TYPE                      0x03 // (3) UTF-8 Encoded String  
#define MQTT_PROPERTY_RESPONSE_TOPIC                    0x08 // (8) UTF-8 Encoded String  
#define MQTT_PROPERTY_CORRELATION_DATA                  0x09 // (9) Binary Data           
#define MQTT_PROPERTY_SUBSCRIPTION_IDENTIFIER           0x0B // (11) Variable Byte Integer
#define MQTT_PROPERTY_SESSION_EXPIRY_INTERVAL           0x11 // (17) Four Byte Integer    
#define MQTT_PROPERTY_ASSIGNED_CLIENT_IDENTIFIER        0x12 // (18) UTF-8 Encoded String  
#define MQTT_PROPERTY_SERVER_KEEP_ALIVE                 0x13 // (19) Two Byte Integer      
#define MQTT_PROPERTY_AUTHENTICATION_METHOD             0x15 // (21) UTF-8 Encoded String 
#define MQTT_PROPERTY_AUTHENTICATION_DATA               0x16 // (22) Binary Data          
#define MQTT_PROPERTY_REQUEST_PROBLEM_INFORMATION       0x17 // (23) Byte                  
#define MQTT_PROPERTY_WILL_DELAY_INTERVAL               0x18 // (24) Four Byte Integer    
#define MQTT_PROPERTY_REQUEST_RESPONSE_INFORMATION      0x19 // (25) Byte                  
#define MQTT_PROPERTY_RESPONSE_INFORMATION              0x1A // (26) UTF-8 Encoded String  
#define MQTT_PROPERTY_SERVER_REFERENCE                  0x1C // (28) UTF-8 Encoded String 
#define MQTT_PROPERTY_REASON_STRING                     0x1F // (31) UTF-8 Encoded String
#define MQTT_PROPERTY_RECEIVE_MAXIMUM                   0x21 // (33) Two Byte Integer     
#define MQTT_PROPERTY_TOPIC_ALIAS_MAXIMUM               0x22 // (34) Two Byte Integer     
#define MQTT_PROPERTY_TOPIC_ALIAS                       0x23 // (35) Two Byte Integer     
#define MQTT_PROPERTY_MAXIMUM_QOS                       0x24 // (36) Byte                 
#define MQTT_PROPERTY_RETAIN_AVAILABLE                  0x25 // (37) Byte                 
#define MQTT_PROPERTY_USER_PROPERTY                     0x26 // (38) UTF-8 String Pair   
#define MQTT_PROPERTY_MAXIMUM_PACKET_SIZE               0x27 // (39) Four Byte Integer    
#define MQTT_PROPERTY_WILDCARD_SUBSCRIPTION_AVAILABLE   0x28 // (40) Byte                  
#define MQTT_PROPERTY_SUBSCRIPTION_IDENTIFIER_AVAILABLE 0x29 // (41) Byte                  
#define MQTT_PROPERTY_SHARED_SUBSCRIPTION_AVAILABLE     0x2A // (42) Byte 
//+------------------------------------------------------------------+
//|              REASON CODES                                        |
//+------------------------------------------------------------------+
/*
A Reason Code is a one byte unsigned value that indicates the result of an operation. Reason Codes less
than 0x80 indicate successful completion of an operation. The normal Reason Code for success is 0.
Reason Code values of 0x80 or greater indicate failure.

The CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, DISCONNECT and AUTH Control Packets
have a single Reason Code as part of the Variable Header. The SUBACK and UNSUBACK packets
contain a list of one or more Reason Codes in the Payload.
*/
#define MQTT_REASON_CODE_SUCCESS                                0x00 // (0)
#define MQTT_REASON_CODE_NORMAL_DISCONNECTION                   0x00 // (0)
#define MQTT_REASON_CODE_GRANTED_QOS_0                          0x00 // (0)
#define MQTT_REASON_CODE_GRANTED_QOS_1                          0x01 // (1)
#define MQTT_REASON_CODE_GRANTED_QOS_2                          0x02 // (2)
#define MQTT_REASON_CODE_DISCONNECT_WITH_WILL_MESSAGE           0x04 // (4)
#define MQTT_REASON_CODE_NO_MATCHING_SUBSCRIBERS                0x10 // (16)
#define MQTT_REASON_CODE_NO_SUBSCRIPTION_EXISTED                0x11 // (17)
#define MQTT_REASON_CODE_CONTINUE_AUTHENTICATION                0x18 // (24)
#define MQTT_REASON_CODE_RE_AUTHENTICATE                        0x19 // (25)
#define MQTT_REASON_CODE_UNSPECIFIED_ERROR                      0x80 // (128)
#define MQTT_REASON_CODE_MALFORMED_PACKET                       0x81 // (129)
#define MQTT_REASON_CODE_PROTOCOL_ERROR                         0x82 // (130)
#define MQTT_REASON_CODE_IMPLEMENTATION_SPECIFIC_ERROR          0x83 // (131)
#define MQTT_REASON_CODE_UNSUPPORTED_PROTOCOL_VERSION           0x84 // (132)
#define MQTT_REASON_CODE_CLIENT_IDENTIFIER_NOT_VALID            0x85 // (133)
#define MQTT_REASON_CODE_BAD_USER_NAME_OR_PASSWORD              0x86 // (134)
#define MQTT_REASON_CODE_NOT_AUTHORIZED                         0x87 // (135)
#define MQTT_REASON_CODE_SERVER_UNAVAILABLE                     0x88 // (136)
#define MQTT_REASON_CODE_SERVER_BUSY                            0x89 // (137)
#define MQTT_REASON_CODE_BANNED                                 0x8A // (138)
#define MQTT_REASON_CODE_SERVER_SHUTTING_DOWN                   0x8B // (139)
#define MQTT_REASON_CODE_BAD_AUTHENTICATION_METHOD              0x8C // (140)
#define MQTT_REASON_CODE_KEEP_ALIVE_TIMEOUT                     0x8D // (141)
#define MQTT_REASON_CODE_SESSION_TAKEN_OVER                     0x8E // (142)
#define MQTT_REASON_CODE_TOPIC_FILTER_INVALID                   0x8F // (143)
#define MQTT_REASON_CODE_TOPIC_NAME_INVALID                     0x90 // (144)
#define MQTT_REASON_CODE_PACKET_IDENTIFIER_IN_USE               0x91 // (145)
#define MQTT_REASON_CODE_PACKET_IDENTIFIER_NOT_FOUND            0x92 // (146)
#define MQTT_REASON_CODE_RECEIVE_MAXIMUM_EXCEEDED               0x93 // (147)
#define MQTT_REASON_CODE_TOPIC_ALIAS_INVALID                    0x94 // (148)
#define MQTT_REASON_CODE_PACKET_TOO_LARGE                       0x95 // (149)
#define MQTT_REASON_CODE_MESSAGE_RATE_TOO_HIGH                  0x96 // (150)
#define MQTT_REASON_CODE_QUOTA_EXCEEDED                         0x97 // (151)
#define MQTT_REASON_CODE_ADMINISTRATIVE_ACTION                  0x98 // (152)
#define MQTT_REASON_CODE_PAYLOAD_FORMAT_INVALID                 0x99 // (153)
#define MQTT_REASON_CODE_RETAIN_NOT_SUPPORTED                   0x9A // (154)
#define MQTT_REASON_CODE_QOS_NOT_SUPPORTED                      0x9B // (155)
#define MQTT_REASON_CODE_USE_ANOTHER_SERVER                     0x9C // (156)
#define MQTT_REASON_CODE_SERVER_MOVED                           0x9D // (157)
#define MQTT_REASON_CODE_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED     0x9E // (158)
#define MQTT_REASON_CODE_CONNECTION_RATE_EXCEEDED               0x9F // (159)
#define MQTT_REASON_CODE_MAXIMUM_CONNECT_TIME                   0xA0 // (160)
#define MQTT_REASON_CODE_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED 0xA1 // (161)
#define MQTT_REASON_CODE_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED   0xA2 // (162)

请注意,我们在所有特定于协议的定义前面加上了MQTT。这是为了将它们与将会未来加入的我们自己的定义区分开来。还请注意,在协议名称和版本的Definition.mqh文件顶部,我们正在尝试在命名的标识符中尽可能明确。这是为了应对所谓的整洁准则的原则。这种做法应该有助于使我们的代码更易于阅读、调试和IDE友好,也就是说,更易于搜索,非常适合利用现代IDE的自动完成功能。


MQTT 头文件

//+------------------------------------------------------------------+
//|                                                         MQTT.mqh |
//|            ********* WORK IN PROGRESS **********                 |
//| **** PART OF ARTICLE https://www.mql5.com/en/articles/13334 **** |
//+------------------------------------------------------------------+
#include "Defines.mqh"
//+------------------------------------------------------------------+
//|              MQTT - CONTROL PACKET - TYPES                       |
//+------------------------------------------------------------------+
/*
Position: byte 1, bits 7-4.
Represented as a 4-bit unsigned value, the values are shown below.
*/
enum ENUM_PKT_TYPE
  {
   CONNECT     =  0x01, // Connection request
   CONNACK     =  0x02, // Connection Acknowledgment
   PUBLISH     =  0x03, // Publish message
   PUBACK      =  0x04, // Publish acknowledgment (QoS 1)
   PUBREC      =  0x05, // Publish received (QoS 2 delivery part 1)
   PUBREL      =  0x06, // Publish release (QoS 2 delivery part 2)
   PUBCOMP     =  0x07, // Publish complete (QoS 2 delivery part 3)
   SUBSCRIBE   =  0x08, // Subscribe request
   SUBACK      =  0x09, // Subscribe acknowledgment
   UNSUBSCRIBE =  0x0A, // Unsubscribe request
   UNSUBACK    =  0x0B, // Unsubscribe acknowledgment
   PINGREQ     =  0x0C, // PING request
   PINGRESP    =  0x0D, // PING response
   DISCONNECT  =  0x0E, // Disconnect notification
   AUTH        =  0x0F, // Authentication exchange
  };
//+------------------------------------------------------------------+
//|             CONNECT - VARIABLE HEADER - CONNECT FLAGS            |
//+------------------------------------------------------------------+
/*
The Connect Flags byte contains several parameters specifying the behavior of the MQTT connection. It
also indicates the presence or absence of fields in the Payload.
*/
enum ENUM_CONNECT_FLAGS
  {
   RESERVED       = 0x00,
   CLEAN_START    = 0x02,
   WILL_FLAG      = 0x04,
   WILL_QOS_1     = 0x08,
   WILL_QOS_2     = 0x10,
   WILL_RETAIN    = 0x20,
   PASSWORD_FLAG  = 0x40,
   USER_NAME_FLAG = 0x80
  };
//+------------------------------------------------------------------+
//|             CONNECT - VARIABLE HEADER - QoS LEVELS               |
//+------------------------------------------------------------------+
/*
Position: bits 4 and 3 of the Connect Flags.
These two bits specify the QoS level to be used when publishing the Will Message.
If the Will Flag is set to 0, then the Will QoS MUST be set to 0 (0x00) [MQTT-3.1.2-11].
If the Will Flag is set to 1, the value of Will QoS can be 0 (0x00), 1 (0x01), or 2 (0x02) [MQTT-3.1.2-12].
*/
enum ENUM_QOS_LEVEL
  {
   AT_MOST_ONCE   = 0x00,
   AT_LEAST_ONCE  = 0x01,
   EXACTLY_ONCE   = 0x02
  };
//+------------------------------------------------------------------+
//|                   SetProtocolVersion                             |
//+------------------------------------------------------------------+
void SetProtocolVersion(uchar& dest_buf[])
  {
   dest_buf[8] = MQTT_PROTOCOL_VERSION;
  }
//+------------------------------------------------------------------+
//|                     SetProtocolName                              |
//+------------------------------------------------------------------+
void SetProtocolName(uchar& dest_buf[])
  {
   dest_buf[2] = MQTT_PROTOCOL_NAME_LENGTH_MSB;
   dest_buf[3] = MQTT_PROTOCOL_NAME_LENGTH_LSB;
   dest_buf[4] = MQTT_PROTOCOL_NAME_BYTE_3;
   dest_buf[5] = MQTT_PROTOCOL_NAME_BYTE_4;
   dest_buf[6] = MQTT_PROTOCOL_NAME_BYTE_5;
   dest_buf[7] = MQTT_PROTOCOL_NAME_BYTE_6;
  }
//+------------------------------------------------------------------+
//|                     SetFixedHeader                               |
//+------------------------------------------------------------------+
void SetFixedHeader(ENUM_PKT_TYPE pkt_type, uchar& buf[], uchar& dest_buf[])
  {
   dest_buf[0] = (uchar)pkt_type << 4;
   dest_buf[1] = GetRemainingLength(buf);
  }
//+------------------------------------------------------------------+
//|                    GetRemainingLength                            |
//+------------------------------------------------------------------+
/*
Position: starts at byte 2.
The Remaining Length is a Variable Byte Integer that represents the number of bytes remaining within the
current Control Packet, including data in the Variable Header and the Payload. The Remaining Length
does not include the bytes used to encode the Remaining Length. The packet size is the total number of
bytes in an MQTT Control Packet, this is equal to the length of the Fixed Header plus the Remaining
Length.
*/
uchar GetRemainingLength(uchar &buf[])
  {
   uint x;
   x = ArraySize(buf);
   uint rem_len;
   do
     {
      rem_len = x % 128;
      x = (x / 128);
      if(x > 0)
        {
         rem_len = rem_len | 128;
        }
     }
   while(x > 0);
   return (uchar)rem_len;
  };

//+------------------------------------------------------------------+


类和结构

MQTT控制数据包接口

在这里,我们有一个设计选择:用抽象类或接口启动控制数据包(Control Packets)的对象层次结构。我们可以从一个非常适合任何控制包的通用基类开始。这个抽象类将专门用于更具体的派生控制包类。或者,我们可以从一个简单的接口开始,由这些控制包类来实现。

我们从一个接口IcontrolPacket开始。这个接口将有一个简单的方法。当实现协议的操作行为(Operational Behavior)部分时,此选择可能会更改。我们可能会将此接口更改为具有一些虚拟函数的抽象类

//+------------------------------------------------------------------+
//|                                               IControlPacket.mqh |
//|            ********* WORK IN PROGRESS **********                 |
//| **** PART OF ARTICLE https://www.mql5.com/en/articles/13334 **** |
//+------------------------------------------------------------------+
#include "MQTT.mqh"
//+------------------------------------------------------------------+
//|       Interface IControlPacket                                   |
//|       The root of object hierarchy                               |
//+------------------------------------------------------------------+
interface IControlPacket
  {

   bool              IsControlPacket();

  };
//+------------------------------------------------------------------+

如上所述,目前该接口的唯一目的是充当MQTT数据包对象层次结构的根。在这一点上,它只不过是一个花哨的占位符。

MQTT控制数据包连接类

CONNECT控制数据包是写入要求最高的数据包。除了我们还没有熟悉协议这一事实之外,这个特定的数据包在版本5.0中得到了最显著的改进,即连接属性(Connect Properties)和用户属性(User Properties)。

//+------------------------------------------------------------------+
//|                                                   PktConnect.mqh |
//|            ********* WORK IN PROGRESS **********                 |
//| **** PART OF ARTICLE https://www.mql5.com/en/articles/13334 **** |
//+------------------------------------------------------------------+
#include "MQTT.mqh"
#include "Defines.mqh"
#include "IControlPacket.mqh"
//+------------------------------------------------------------------+
//|        CONNECT VARIABLE HEADER                                   |
//+------------------------------------------------------------------+
/*
The Variable Header for the CONNECT Packet contains the following fields in this order:
Protocol Name,Protocol Level, Connect Flags, Keep Alive, and Properties.
*/
struct MqttClientIdentifierLength
  {
   uchar             msb;
   uchar             lsb;
  } clientIdLen;
//---
struct MqttKeepAlive
  {
   uchar             msb;
   uchar             lsb;
  } keepAlive;
//---
struct MqttConnectProperties
  {
   uint              prop_len;
   uchar             session_expiry_interval_id;
   uint              session_expiry_interval;
   uchar             receive_maximum_id;
   ushort            receive_maximum;
   uchar             maximum_packet_size_id;
   ushort            maximum_packet_size;
   uchar             topic_alias_maximum_id;
   ushort            topic_alias_maximum;
   uchar             request_response_information_id;
   uchar             request_response_information;
   uchar             request_problem_information_id;
   uchar             request_problem_information;
   uchar             user_property_id;
   string            user_property_key;
   string            user_property_value;
   uchar             authentication_method_id;
   string            authentication_method;
   uchar             authentication_data_id;
  } connectProps;
//---
struct MqttConnectPayload
  {
   uchar             client_id_len;
   string            client_id;
   ushort            will_properties_len;
   uchar             will_delay_interval_id;
   uint              will_delay_interval;
   uchar             payload_format_indicator_id;
   uchar             payload_format_indicator;
   uchar             message_expiry_interval_id;
   uint              message_expiry_interval;
   uchar             content_type_id;
   string            content_type;
   uchar             response_topic_id; // for request/response
   string            response_topic;
   uchar             correlation_data_id; // for request/response
   ulong             correlation_data[]; // binary data
   uchar             user_property_id;
   string            user_property_key;
   string            user_property_value;
   uchar             will_topic_len;
   string            will_topic;
   uchar             will_payload_len;
   ulong             will_payload[]; // binary data
   uchar             user_name_len;
   string            user_name;
   uchar             password_len;
   ulong             password; // binary data
  } connectPayload;
//+------------------------------------------------------------------+
//| Class CPktConnect.                                               |
//| Purpose: Class of MQTT Connect Control Packets.                  |
//|          Implements IControlPacket                               |
//+------------------------------------------------------------------+
class CPktConnect : public IControlPacket
  {
private:
   bool              IsControlPacket() {return true;}
protected:
   void              InitConnectFlags() {ByteArray[9] = 0;}
   void              InitKeepAlive() {ByteArray[10] = 0; ByteArray[11] = 0;}
   void              InitPropertiesLength() {ByteArray[12] = 0;}
   uchar             m_connect_flags;

public:
                     CPktConnect();
                     CPktConnect(uchar &buf[]);
                    ~CPktConnect();
   //--- methods for setting Connect Flags
   void              SetCleanStart(const bool cleanStart);
   void              SetWillFlag(const bool willFlag);
   void              SetWillQoS_1(const bool willQoS_1);
   void              SetWillQoS_2(const bool willQoS_2);
   void              SetWillRetain(const bool willRetain);
   void              SetPasswordFlag(const bool passwordFlag);
   void              SetUserNameFlag(const bool userNameFlag);
   void              SetKeepAlive(ushort seconds);
   void              SetClientIdentifierLength(string clientId);
   void              SetClientIdentifier(string clientId);

   //--- member for getting the byte array
   uchar             ByteArray[];
  };
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
CPktConnect::CPktConnect(uchar &buf[])
  {
   ArrayFree(ByteArray);
   ArrayResize(ByteArray, buf.Size() + 2, UCHAR_MAX);
   SetFixedHeader(CONNECT, buf, ByteArray);
   SetProtocolName(ByteArray);
   SetProtocolVersion(ByteArray);
   InitConnectFlags();
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
void CPktConnect::SetClientIdentifier(string clientId)
  {
   SetClientIdentifierLength(clientId);
   StringToCharArray(clientId, ByteArray,
                     ByteArray.Size() - StringLen(clientId), StringLen(clientId));
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
void CPktConnect::SetClientIdentifierLength(string clientId)
  {
   clientIdLen.msb = (char)StringLen(clientId) >> 8;
   clientIdLen.lsb = (char)StringLen(clientId) % 256;
   ByteArray[12] = clientIdLen.msb;
   ByteArray[13] = clientIdLen.lsb;
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
void CPktConnect::SetKeepAlive(ushort seconds) // MQTT max is 65,535 sec
  {
   keepAlive.msb = (uchar)(seconds >> 8) & 255;
   keepAlive.lsb = (uchar)seconds & 255;
   ByteArray[10] = keepAlive.msb;
   ByteArray[11] = keepAlive.lsb;
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
void CPktConnect::SetPasswordFlag(const bool passwordFlag)
  {
   passwordFlag ? m_connect_flags |= PASSWORD_FLAG : m_connect_flags &= ~PASSWORD_FLAG;
   ArrayFill(ByteArray, sizeof(ByteArray), 1, m_connect_flags);
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
void CPktConnect::SetUserNameFlag(const bool userNameFlag)
  {
   userNameFlag ? m_connect_flags |= USER_NAME_FLAG : m_connect_flags &= (uchar) ~USER_NAME_FLAG;
   ArrayFill(ByteArray, sizeof(ByteArray), 1, m_connect_flags);
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
void CPktConnect::SetWillRetain(const bool willRetain)
  {
   willRetain ? m_connect_flags |= WILL_RETAIN : m_connect_flags &= ~WILL_RETAIN;
   ArrayFill(ByteArray, sizeof(ByteArray), 1, m_connect_flags);
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
void CPktConnect::SetWillQoS_2(const bool willQoS_2)
  {
   willQoS_2 ? m_connect_flags |= WILL_QOS_2 : m_connect_flags &= ~WILL_QOS_2;
   ArrayFill(ByteArray, sizeof(ByteArray), 1, m_connect_flags);
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
void CPktConnect::SetWillQoS_1(const bool willQoS_1)
  {
   willQoS_1 ? m_connect_flags |= WILL_QOS_1 : m_connect_flags &= ~WILL_QOS_1;
   ArrayFill(ByteArray, sizeof(ByteArray), 1, m_connect_flags);
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
void CPktConnect::SetWillFlag(const bool willFlag)
  {
   willFlag ? m_connect_flags |= WILL_FLAG : m_connect_flags &= ~WILL_FLAG;
   ArrayFill(ByteArray, sizeof(ByteArray), 1, m_connect_flags);
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
void CPktConnect::SetCleanStart(const bool cleanStart)
  {
   cleanStart ? m_connect_flags |= CLEAN_START : m_connect_flags &= ~CLEAN_START;
   ArrayFill(ByteArray, 9, 1, m_connect_flags);
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
CPktConnect::CPktConnect()
  {
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
CPktConnect::~CPktConnect()
  {
  }
//+------------------------------------------------------------------+


测试我们的第一个类

CPktConnect类的唯一目的是构建一个格式良好的MQTT CONNECT数据包。因此,为了测试它,我们需要从生成一种“fixture”开始,这表示一个格式良好的CONNECT数据包的样本字节数组。但是我们怎么能确定我们组成的字节数组表示一个格式良好的CONNECT数据包呢最后,这是我们将用来从头开始上课的测试。如果我们的 fixture 代表了一个错误的封包,我们的许多甚至全部辛勤工作都白费了。

协议开发人员和维护人员OASIS来拯救我们了。在第3.1.2.12节中,我们将发现CONNECT数据包的可变头的非规范示例。由于我们已经测试了我们的固定头生成器(请参阅上一篇文章),所以这个OASIS示例就足够开始了。它将使我们能够确保我们的类生成一个格式良好的数据包,具有一些不同的配置,如布尔CleanSession和Keep-Alive请求的时间跨度。

然后将此硬编码手动生成的字节数组与CpktConnect生成的数据包进行比较。

//+------------------------------------------------------------------+
//|                                  TEST_CControlPacket_Connect.mq5 |
//|                                                                  |
//|            ********* WORK IN PROGRESS **********                 |
//| **** PART OF ARTICLE https://www.mql5.com/en/articles/13334 **** |
//+------------------------------------------------------------------+
#include <MQTT\CPktConnect.mqh>

//+------------------------------------------------------------------+
//| Tests for CControlPacketConnect class                            |
//+------------------------------------------------------------------+
void OnStart()
  {
   Print(TEST_SetCleanStart_KeepAlive_ClientIdentifier());
   Print(TEST_SetClientIdentifier());
   Print(TEST_SetClientIdentifierLength());
   Print(TEST_SetCleanStart_and_SetKeepAlive());
   Print(TEST_SetKeepAlive());
   Print(TEST_SetCleanStart());
  }
/* REFERENCE ARRAY (FIXTURE)
{16, 24, 0, 4, 77, 81, 84, 84, 5, 2, 0, 10, 0, 4, 7, 17, 0, 0, 0, 10, 25, 1, 77, 81, 76, 53}
*/
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
bool TEST_SetCleanStart_KeepAlive_ClientIdentifier()
  {
   Print(__FUNCTION__);
//--- Arrange
   static uchar expected[] =
     {16, 16, 0, 4, 77, 81, 84, 84, 5, 2, 0, 10, 0, 4, 77, 81, 76, 53};
   uchar buf[expected.Size() - 2];
   CPktConnect *cut = new CPktConnect(buf);
//--- Act
   cut.SetCleanStart(true);
   cut.SetKeepAlive(10);//10 sec
   cut.SetClientIdentifier("MQL5");
   uchar result[];
   ArrayCopy(result, cut.ByteArray);
//--- Assert
   bool isTrue = Assert(expected, result);
//--- cleanup
   delete cut;
   ZeroMemory(result);
   return  isTrue ? true : false;
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
bool TEST_SetClientIdentifier()
  {
   Print(__FUNCTION__);
//--- Arrange
   static uchar expected[] =
     {16, 16, 0, 4, 77, 81, 84, 84, 5, 0, 0, 0, 0, 4, 77, 81, 76, 53};
   uchar buf[expected.Size() - 2];
   CPktConnect *cut = new CPktConnect(buf);
//--- Act
   cut.SetClientIdentifier("MQL5");
   uchar result[];
   ArrayCopy(result, cut.ByteArray);
//--- Assert
   bool isTrue = Assert(expected, result);
//--- cleanup
   delete cut;
   ZeroMemory(result);
   return  isTrue ? true : false;
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
bool TEST_SetClientIdentifierLength()
  {
   Print(__FUNCTION__);
//--- Arrange
   static uchar expected[] =
     {16, 12, 0, 4, 77, 81, 84, 84, 5, 0, 0, 0, 0, 4};
   uchar buf[expected.Size() - 2];
   CPktConnect *cut = new CPktConnect(buf);
//--- Act
   cut.SetClientIdentifierLength("MQL5");
   uchar result[];
   ArrayCopy(result, cut.ByteArray);
//--- Assert
   bool isTrue = Assert(expected, result);
//--- cleanup
   delete cut;
   ZeroMemory(result);
   return  isTrue ? true : false;
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
bool TEST_SetCleanStart_and_SetKeepAlive()
  {
   Print(__FUNCTION__);
//--- Arrange
   static uchar expected[] =
     {16, 10, 0, 4, 77, 81, 84, 84, 5, 2, 0, 10};
   uchar buf[expected.Size() - 2];
   CPktConnect *cut = new CPktConnect(buf);
//--- Act
   cut.SetCleanStart(true);
   cut.SetKeepAlive(10); //10 secs
   uchar result[];
   ArrayCopy(result, cut.ByteArray);
//--- Assert
   bool isTrue = Assert(expected, result);
//--- cleanup
   delete cut;
   ZeroMemory(result);
   return  isTrue ? true : false;
  }
//+------------------------------------------------------------------+
bool TEST_SetKeepAlive()
  {
   Print(__FUNCTION__);
//--- Arrange
   static uchar expected[] =
     {16, 10, 0, 4, 77, 81, 84, 84, 5, 0, 0, 10};
   uchar buf[expected.Size() - 2];
   CPktConnect *cut = new CPktConnect(buf);
//--- Act
   cut.SetKeepAlive(10); //10 secs
   uchar result[];
   ArrayCopy(result, cut.ByteArray);
//--- Assert
   bool isTrue = Assert(expected, result);
//--- cleanup
   delete cut;
   ZeroMemory(result);
   return  isTrue ? true : false;
  }
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
bool TEST_SetCleanStart()
  {
   Print(__FUNCTION__);
//--- Arrange
   static uchar expected[] =
     {16, 8, 0, 4, 77, 81, 84, 84, 5, 2};
   uchar buf[expected.Size() - 2];
   CPktConnect *cut = new CPktConnect(buf);
//--- Act
   cut.SetCleanStart(true);
   uchar result[];
   ArrayCopy(result, cut.ByteArray);
//--- Assert
   bool isTrue = Assert(expected, result);
//--- cleanup
   delete cut;
//ZeroMemory(result);
   return  isTrue ? true : false;
  }
//+------------------------------------------------------------------+
bool Assert(uchar& expected[], uchar& result[])
  {
   if(!ArrayCompare(expected, result) == 0)
     {
      for(uint i = 0; i < expected.Size(); i++)
        {
         printf("expected\t%d\t\t%d result", expected[i], result[i]);
        }
      printf("expected size %d <=> %d result size", expected.Size(), result.Size());
      Print("Expected");
      ArrayPrint(expected);
      Print("Result");
      ArrayPrint(result);
      return false;
     }
   return true;
  }
//+------------------------------------------------------------------+

注意:如您所知,为“让我们看看头数组中是否有我刚刚放入的数据”之类的内容编写测试似乎是在浪费时间。但事实并非如此。这组“显而易见”的测试将永远伴随我们的代码。人们可以将其视为一种连续的自动调试工具,当您发现某种回归错误,甚至是错误的复制粘贴等愚蠢错误时,它将证明其价值。这就是为什么我们不测试连接操作是否有效的原因。在测试连接操作之前,我们希望确保我们的头部格式良好。值得记住的是,TDD是一个过程。在我们有了第一个工作版本的代码之前,这些测试中的许多(如果不是全部的话)都会被重写甚至删除。但那些留下来的,可能会永远留下。

只有当ArrayCompare(d)与我们的引用字节数组,即我们的fixture时,CPktConnect'生成的字节数组返回0(零)时,此测试才会通过。    ​

在我们测试了一些基本连接属性的组合后,数据包将被发送到代理,并且不能“由于协议错误”而被拒绝

图02 - Metaeditor 专家日志 TEST_CPktConnect 成功

图02 - 显示CPktConnect类测试结果的Metaeditor专家选项卡日志


与我们的本地MQTT代理进行检查

现在,我们可以在WSL上运行Mosquitto本地代理,以检查我们的MQTT连接是否成功。

如果按照默认安装,Mosquito应该在Linux上作为服务运行。因此,您只需要“重拨”端口(80→ 1883),并在Metatrader 5选项中包含允许的URL上的主机名。

图03 - WSL Mosquitto 日志连接/断开连接成功

图03 - Mosquito登录WSL,显示连接/断开状态:成功。


 太好了!我们的连接尝试没有返回协议错误。现在我们可以尝试在客户端和服务器之间交换消息。


结论

在下一步中,我们将处理CONNACK的回应。在这一步中,我们将有一个坚实的基础开始发布我们的第一条消息。当然,我们将开始为它编写测试!:)继续关注!


本文由MetaQuotes Ltd译自英文
原文地址: https://www.mql5.com/en/articles/13334

附加的文件 |
Defines.mqh (8.03 KB)
MQTT.mqh (5.01 KB)
CPktConnect.mqh (9.97 KB)
MQL5中的范畴论(第20部分):自我注意的迂回与转换 MQL5中的范畴论(第20部分):自我注意的迂回与转换
我们暂时离开我们的系列文章,考虑一下 chatGPT 中的部分算法。有没有从自然变换中借鉴的相似之处或概念?我们尝试用信号类格式的代码,在一篇有趣的文章中回答这些和其他问题。
MQL5 中的范畴论 (第 17 部分):函子与幺半群 MQL5 中的范畴论 (第 17 部分):函子与幺半群
本文是我们系列文章的最后一篇,将函子作为一个主题来讨论,且把幺半群作为一个范畴来重新审视。幺半群已在我们的系列中多次讲述,于此配合多层感知器帮助确定持仓规模。
将ML模型与策略测试器集成(结论):实现价格预测的回归模型 将ML模型与策略测试器集成(结论):实现价格预测的回归模型
本文描述了一个基于决策树的回归模型的实现。该模型应预测金融资产的价格。我们已经准备好了数据,对模型进行了训练和评估,并对其进行了调整和优化。然而,需要注意的是,该模型仅用于研究目的,不应用于实际交易。
神经网络变得轻松(第五十四部分):利用随机编码器(RE3)进行高效研究 神经网络变得轻松(第五十四部分):利用随机编码器(RE3)进行高效研究
无论何时我们研究强化学习方法时,我们都会面对有效探索环境的问题。解决这个问题通常会导致算法更复杂性,以及训练额外模型。在本文中,我们将看看解决此问题的替代方法。