English Deutsch 日本語 Português
preview
Разработка MQTT-клиента для MetaTrader 5: методология TDD (Часть 6)

Разработка MQTT-клиента для MetaTrader 5: методология TDD (Часть 6)

MetaTrader 5Интеграция | 23 июля 2024, 16:59
331 0
Jocimar Lopes
Jocimar Lopes

 

"Оптимизм — это профессиональный источник опасности для программиста: обратная связь клиента может его сгладить" (Кент Бек)

Введение

Методология разработки через тестирование (Test-Driven Development, TDD) дает множество преимуществ и имеет один существенный недостаток. Она помогает нам писать четко определенные модули и правильно названные переменные, чтобы добиться высокого охвата тестами, лучше понять предметную область, избежать чрезмерного усложнения и сосредоточиться на поставленной задаче. Главный недостаток является прямым следствием такой узкой фокусировки на поставленной задаче - чтобы не пугаться общей сложности проекта, мы, как разработчики, продолжаем решать минимально возможную задачу за раз. Если гений — это человек, который устраняет сложность путем ее решения, то разработчик TDD — это человек, который намеренно игнорирует сложность. 

Его можно сравнить с лошадью в шорах или с ослом, который гонится за морковкой.

Сложность не исчезает оттого, что мы ее проигнорировали. Игнорируя лес и внимательно рассматривая лист, мы продолжаем оставлять за собой технический долг. Мы оставляем избыточные функции, дублированные члены, бесполезные тесты, ненужные классы, нечитаемый и недоступный код. Этот технический долг, накапливающийся во время разработки, может нанести вред нашей производительности. Именно по этой причине рефакторинг является неотъемлемой частью практики TDD. На схеме ниже показаны типичные этапы TDD.

Типичные этапы TDD: красный, зеленый, рефакторинг

Рис. 01. Типичные этапы TDD: красный, зеленый, рефакторинг (источник: IBM Developer)

В следующих разделах я опишу, как я реорганизовал ранее написанные классы, и прокомментирую некоторые улучшения. Я покажу создание пакетов PUBLISH после улучшений, а также получение жизнеспособного проекта для классов построения пакетов. Первый класс, следующий новому шаблону, — это класс PUBACK. Поскольку пакеты PUBACK являются аналогом пакетов PUBLISH с QoS 1, нам нужно заняться управлением состоянием сеанса (Session State). Нашему клиенту понадобится какой-то уровень сохранения (persistence layer) для сохранения и обновления состояния. 

Слой сохранения выходит за рамки стандарта OASIS. Он зависит от приложения. Это может быть простой файл в локальной файловой системе или полностью распределенная система баз данных высокой доступности в облаке. Для наших целей будет достаточно базы данных наподобие сервера PostgreSQL, работающего локально на Windows или через WSL. Однако, поскольку у нас есть встроенная интеграция между MQL и SQLite, выбор этой однофайловой СУБД без сервера здесь очевиден. SQLite отличается простотой, масштабируемостью, надежностью и не требует обслуживания сервера. У нас даже можем быть база данных, хранящаяся только в памяти, что очень удобно для тестирования и отладки. 

Но на данном этапе я не буду реализовывать уровень сохранения, поскольку я решил тщательно протестировать запись и чтение пакетов, прежде чем приступать к управлению состоянием сеанса. Прежде чем переходить к уровню сохранения, нам необходимо убедиться, что мы правильно кодируем и декодируем различные типы данных, используемые протоколом MQTT. Для достижения этой цели я пишу обширные модульные тесты и вскоре начну проводить небольшие функциональные тесты на реальном брокере, работающем локально (брокер mosquitto с открытым исходным кодом от Eclipse Foundation).

Итак, чтобы протестировать взаимодействие PUBLISH/PUBACK, я буду использовать ненастоящую базу данных - набор функций для генерации контролируемых данных, необходимых нам для тестирования. Я представлю его ниже при описании класса CPUback.

В последующих описаниях мы используем слова НЕОБХОДИМО/ДОЛЖНО (MUST) и МОЖЕТ/МОЖНО (MAY) так, как они используются в Стандарте OASIS, который, в свою очередь, использует их, как описано в документе IETF RFC 2119.

Также, если не указано иное, все цитаты взяты из Стандарта OASIS.


Создание пакетов PUBLISH

В процессе переписывания класса CPublish мы удалили некоторые члены класса. Мы также объединили построение фиксированного и переменного заголовков в одношаговый конструктор. Эти изменения реплицируются в других классах пакетов управления.

В настоящее время наш класс CPublish имеет следующие члены и методы.

//+------------------------------------------------------------------+
//|                                                      Publish.mqh |
//|            ********* WORK IN PROGRESS **********                 |
//| **** PART OF ARTICLE https://www.mql5.com/en/articles/14391 **** |
//+------------------------------------------------------------------+
#include "IControlPacket.mqh"
//+------------------------------------------------------------------+
//|        PUBLISH VARIABLE HEADER                                   |
//+------------------------------------------------------------------+
/*
The Variable Header of the PUBLISH Packet contains the following fields in the order: Topic Name,
Packet Identifier, and Properties.
*/
//+------------------------------------------------------------------+
//| Class CPublish.                                                  |
//| Purpose: Class of MQTT Publish Control Packets.                  |
//|          Implements IControlPacket                               |
//+------------------------------------------------------------------+
class CPublish : public IControlPacket
  {
private:
   bool              IsControlPacket() {return true;}
   bool              HasWildcardChar(const string str);
protected:
   uchar             m_pubflags;
   uint              m_remlen;
   uchar             m_topname[];
   uchar             m_props[];
   uint              m_payload[];
public:
                     CPublish();
                    ~CPublish();
   //--- methods for setting Publish flags
   void              SetRetain(const bool retain);
   void              SetQoS_1(const bool QoS_1);
   void              SetQoS_2(const bool QoS_2);
   void              SetDup(const bool dup);
   //--- method for setting Topic Name
   void              SetTopicName(const string topic_name);
   //--- methods for setting Properties
   void              SetPayloadFormatIndicator(PAYLOAD_FORMAT_INDICATOR format);
   void              SetMessageExpiryInterval(uint msg_expiry_interval);
   void              SetTopicAlias(ushort topic_alias);
   void              SetResponseTopic(const string response_topic);
   void              SetCorrelationData(uchar &binary_data[]);
   void              SetUserProperty(const string key, const string val);
   void              SetSubscriptionIdentifier(uint subscript_id);
   void              SetContentType(const string content_type);
   //--- method for setting the payload
   void              SetPayload(const string payload);
   //--- method for building the final packet
   void              Build(uchar &result[]);
  };

Помимо упрощения, теперь процесс установки флагов публикации, названий тем и свойств является независимым, то есть каждый из них может быть установлен в любом порядке, при условии, что метод Build() вызывается последним.
Этот тест формализует такое поведение. Он проверяет конструктор класса с двумя установленными флагами RETAIN и QoS1 и необходимым именем темы.
bool TEST_Ctor_Retain_QoS1_TopicName1Char()
  {
   Print(__FUNCTION__);
   CPublish *cut = new CPublish();
   uchar expected[] = {51, 6, 0, 1, 'a', 0, 1, 0}; // QoS > 0 require packet ID
   uchar result[];
   cut.SetTopicName("a");
   cut.SetRetain(true);
   cut.SetQoS_1(true);
   cut.Build(result);
   bool isTrue = AssertEqual(expected, result);
   delete(cut);
   ZeroMemory(result);
   return isTrue;
  }

Теперь методы SetTopicName(), SetRetain() и SetQos1() можно вызывать в любом порядке, и полученный пакет по-прежнему действителен. Как уже говорилось, такое поведение воспроизводится во всех классах пакетов управления, и у нас есть тест для каждой комбинации флагов публикации. Все тесты представлены в приложенных файлах.

Фиксированный заголовок пакета PUBLISH

Фиксированные заголовки пакетов PUBLISH отличаются от всех других пакетов управления MQTT 5.0 в текущей версии протокола. У них есть три флага, которые НЕ зарезервированы для использования в будущем: флаги RETAIN, QoS и DUP. Подробное описание флагов PUBLISH можно найти в предыдущей (пятой) статье серии.

Флаги RETAIN, QoS Level и DUP фиксированного заголовка пакета MQTT 5.0 PUBLISH

Рис. 02. Фиксированный заголовок пакета PUBLISH MQTT 5.0 RETAIN, флаг QoS Level и DUP

Мы используем тот же шаблон для переключения любого из флагов публикации, но теперь, после рефакторинга, мы больше не вызываем SetFixedHeader() в каждом из них. Во-первых, мы определяем переключатель как логическое значение, которое передается в качестве аргумента функции.

void CPktPublish::SetRetain(const bool retain)
  {
   retain ? m_pubflags |= RETAIN_FLAG : m_pubflags &= ~RETAIN_FLAG;
  }

Затем мы проверяем, является ли логическое значение истинным или ложным.

void CPktPublish::SetQoS_1(const bool QoS_1)
  {
   QoS_1 ? m_pubflags |= QoS_1_FLAG : m_pubflags &= ~QoS_1_FLAG;
  }

Если логическое значение истинно, мы применяем побитовое присваивание ИЛИ между значением флага и элементом uchar (один байт), чтобы установить флаг.

void CPktPublish::SetQoS_2(const bool QoS_2)
  {
   QoS_2 ? m_pubflags |= QoS_2_FLAG : m_pubflags &= ~QoS_2_FLAG;
  }

Если логическое значение ложно, мы выполняем побитовое присваивание И между значением флага и тем же элементом uchar, чтобы снять флаг.

void CPktPublish::SetDup(const bool dup)
  {
   dup ? m_pubflags |= DUP_FLAG : m_pubflags &= ~DUP_FLAG;
  }

Таким образом, переменная m_pubflags содержит все установленные/снятые флаги при настройке пакета. Позже, когда вызывается метод Build(), мы снова выполняем побитовое ИЛИ, на этот раз между m_pubflags и первым байтом пакета (байт 0).

pkt[0] |= m_pubflags;


Заголовок переменной пакета PUBLISH

Переменный заголовок пакета PUBLISH содержит следующие поля в следующем порядке: имя темы, идентификатор пакета и свойства.

Название темы

Поскольку все отношения между издателями и подписчиками привязаны к названию темы публикации, это поле является обязательным в пакетах PUBLISH и не может содержать подстановочные знаки. При установке этого поля у нас есть два защитных условия: для подстановочных знаков и для строки нулевой длины, которые немедленно возвращаются и регистрируют ошибку, если какое-либо из этих условий истинно.

void CPktPublish::SetTopicName(const string topic_name)
  {
   if(HasWildcardChar(topic_name) || StringLen(topic_name) == 0)
     {
      ArrayFree(m_topname);
      return;
     }
   EncodeUTF8String(topic_name, m_topname);
  }

Если ни одно из защитных условий не выполняется, мы кодируем строку как UTF-8 и сохраняем массив символов в защищенном элементе m_topname для дальнейшего включения в окончательный пакет при вызове Build().

Идентификатор пакета

Идентификатор пакета НЕ задается пользователем и не требуется для QoS 0. Вместо этого он устанавливается автоматически в методе Build(), если требуемое качество обслуживания > 0. 

// QoS > 0 requires packet ID
   if((m_pubflags & 0x06) != 0)
     {
      SetPacketID(pkt, pkt.Size());
     }
При построении окончательного пакета мы проверяем член m_pubflags посредством побитового И с двоичным значением 0110 (0x06). Если результат не равен нулю, мы знаем, что пакет имеет QoS_1 или QoS_2, и устанавливаем идентификатор пакета.
Функция SetPacketID генерирует псевдослучайное целое число, используя TimeLocal() для генерации начального состояния. Чтобы облегчить нам жизнь при тестировании, мы определили логическую переменную TEST. Если переменная равна true, функция устанавливает значение 1 в качестве идентификатора пакета.
//+------------------------------------------------------------------+
//|            SetPacketID                                           |
//+------------------------------------------------------------------+
#define TEST true

void SetPacketID(uchar& buf[], int start_idx)
  {
// MathRand - Before the first call of the function, it's necessary to call
// MathSrand to set the generator of pseudorandom numbers to the initial state.
   MathSrand((int)TimeLocal());
   int packet_id = MathRand();
   if(ArrayResize(buf, buf.Size() + 2) < 0)
     {
      printf("ERROR: failed to resize array at %s", __FUNCTION__);
      return;
     }
   buf[start_idx] = (uchar)packet_id >> 8; // MSB
   buf[start_idx + 1] = (uchar)(packet_id % 256) & 0xff; //LSB
//--- if testing, set packet ID to 1
   if(TEST)
     {
      Print("WARN: SetPacketID TEST true fixed ID = 1");
      buf[start_idx] = 0; // MSB
      buf[start_idx + 1] = 1; //LSB
     }
  }

Как видите, на всякий случай у нас также есть предупреждение (WARN).

Свойства

В четвертой части серии мы подробно рассмотрели свойства и их роль в механизмах расширения MQTT 5.0. Здесь я опишу их реализацию, уделив особое внимание кодировке разных типов данных.

В пакете управления MQTT 5.0 существуют шесть типов представления данных для кодирования значений свойств:

  1. Однобайтовое целое число, представляющее собой 8-битные целые числа без знака
  2. Двухбайтовые целые числа, которые представляют собой 16-битные целые числа без знака в порядке от старшего к младшему (сетевой порядок)
  3. Четырехбайтовые целые числа, которые представляют собой 32-битные целые числа без знака, также в сетевом порядке
  4. Переменные байтовые целые числа, в которых используется минимальное количество до четырех байтов для представления значения от 0 до 268 435 455
  5. Бинарные данные от 0 до 65 535
  6. Строки в кодировке UTF-8, которые также можно использовать для кодирования пары ключ:значение в свойствах пользователя.

В следующей таблице показаны доступные свойства PUBLISH и соответствующее представление данных.

Свойство Представление данных
Индикатор формата полезной нагрузки (Payload Format Indicator) Однобайтовым целым числом
Интервал действия сообщения (Message Expiry Interval) Четырехбайтовым целым числом
Псевдоним темы (Topic Alias) Двухбайтовым целым числом 
Тема ответа (Response Topic) Строка в кодировке UTF-8
Данные корреляции (Correlation Data)  Бинарные данные 
Свойство пользователя (User Property)  Пара строк в кодировке UTF-8
Идентификатор подписки (Subscription Identifier)  Целое число с переменным числом байт
Тип содержимого (Content Type) Строка в кодировке UTF-8

Таблица 01. Свойства PUBLISH и их соответствующее представление данных в MQTT 5.0

Идентификаторы свойств были включены в заголовок Defines.mqh.

//+------------------------------------------------------------------+
//|              PROPERTIES                                          |
//+------------------------------------------------------------------+
/*
The last field in the Variable Header of the CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC,
PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, DISCONNECT, and
AUTH packet is a set of Properties. In the CONNECT packet there is also an optional set of Properties in
the Will Properties field with the Payload
*/
#define MQTT_PROP_IDENTIFIER_PAYLOAD_FORMAT_INDICATOR          0x01 // (1) Byte                  
#define MQTT_PROP_IDENTIFIER_MESSAGE_EXPIRY_INTERVAL           0x02 // (2) Four Byte Integer     
#define MQTT_PROP_IDENTIFIER_CONTENT_TYPE                      0x03 // (3) UTF-8 Encoded String  
#define MQTT_PROP_IDENTIFIER_RESPONSE_TOPIC                    0x08 // (8) UTF-8 Encoded String  
#define MQTT_PROP_IDENTIFIER_CORRELATION_DATA                  0x09 // (9) Binary Data           
#define MQTT_PROP_IDENTIFIER_SUBSCRIPTION_IDENTIFIER           0x0B // (11) Variable Byte Integer
#define MQTT_PROP_IDENTIFIER_SESSION_EXPIRY_INTERVAL           0x11 // (17) Four Byte Integer   
.
.
. 

Индикатор формата полезной нагрузки (Payload Format Indicator)

Индикатор формата полезной нагрузки может иметь значение 0 или 1, что означает необработанные байты или строку в кодировке UTF-8 соответственно. При отсутствии предполагается, что он равен 0 (необработанные байты).

Хотя это поле можно установить непосредственно в массиве элементов m_props, мы решили использовать вспомогательный локальный буфер в качестве посредника, чтобы обеспечить согласованность с большинством свойств, требующих каких-либо манипуляций перед копированием в окончательный массив свойств.

void CPktPublish::SetPayloadFormatIndicator(PAYLOAD_FORMAT_INDICATOR format)
  {
   uchar aux[2];
   aux[0] = MQTT_PROP_IDENTIFIER_PAYLOAD_FORMAT_INDICATOR;
   aux[1] = (uchar)format;
   ArrayCopy(m_props, aux, m_props.Size());
  }

Хотя для этого свойства существует только два возможных значения, мы решили присвоить им условное значение ради удобства чтения.

enum PAYLOAD_FORMAT_INDICATOR
  {
   RAW_BYTES   = 0x00,
   UTF8        = 0x01
  };

Использование условного значения делает вызов метода явным для конечного пользователя библиотеки.

cut.SetPayloadFormatIndicator(RAW_BYTES);
cut.SetPayloadFormatIndicator(UTF8);

Интервал действия сообщения (Message Expiry Interval)

Интервал действия сообщения представлен в виде четырехбайтового целого числа. Стоит помнить, что это представление отличается от представления целочисленного байта переменной. Хотя в последнем случае будет использоваться минимальное количество байтов, необходимое для представления значения, в первом случае всегда будут использоваться четыре байта.

void CPktPublish::SetMessageExpiryInterval(uint msg_expiry_interval)
  {
   uchar aux[4];
   aux[0] = MQTT_PROP_IDENTIFIER_MESSAGE_EXPIRY_INTERVAL;
   ArrayCopy(m_props, aux, m_props.Size(), 0, 1);
   EncodeFourByteInteger(msg_expiry_interval, aux);
   ArrayCopy(m_props, aux, m_props.Size());
  }

Наша функция для кодирования четырехбайтового целого числа следует хорошо известному шаблону двух сдвигов вправо, чтобы обеспечить требуемый сетевой порядок.

void EncodeFourByteInteger(uint val, uchar &dest_buf[])
  {
   ArrayResize(dest_buf, 4);
   dest_buf[0] = (uchar)(val >> 24) & 0xff;
   dest_buf[1] = (uchar)(val >> 16) & 0xff;
   dest_buf[2] = (uchar)(val >> 8) & 0xff;
   dest_buf[3] = (uchar)val & 0xff;
  }

Псевдоним темы (Topic Alias)

Свойство можно использовать для уменьшения размера пакета. Оно ограничено каждым сетевым подключением и является частью состояния сеанса MQTT. Итак, на данный момент нашу функцию для установки псевдонима темы можно рассматривать как заглушку. Ее необходимо заполнить при работе с состоянием сеанса.

void CPktPublish::SetTopicAlias(ushort topic_alias)
  {
   uchar aux[2];
   aux[0] = MQTT_PROP_IDENTIFIER_TOPIC_ALIAS;
   ArrayCopy(m_props, aux, m_props.Size(), 0, 1);
   EncodeTwoByteInteger(topic_alias, aux);
   ArrayCopy(m_props, aux, m_props.Size());
  }

Наша функция для кодирования двухбайтового целого числа следует тому же хорошо известному шаблону двух сдвигов вправо, чтобы обеспечить требуемый сетевой порядок.

void EncodeTwoByteInteger(uint val, uchar &dest_buf[])
  {
   ArrayResize(dest_buf, 2);
   dest_buf[0] = (uchar)(val >> 8) & 0xff;
   dest_buf[1] = (uchar)val & 0xff;
  }

Тема ответа (Response Topic)

Свойство не является частью шаблона публикации/подписки. Это часть взаимодействия запроса/ответа через MQTT. Как видите, наша функция использует два вспомогательных буфера: один для размещения идентификатора свойства, а другой — для размещения строки в кодировке UTF-8. То же самое произойдет и с другими строками в кодировке UTF-8, поскольку наша функция кодирования строк не имеет третьего параметра для указания индекса начала буфера назначения. Решением может служить перегрузка в следующих версиях.

void CPktPublish::SetResponseTopic(const string response_topic)
  {
   uchar aux[1];
   aux[0] = MQTT_PROP_IDENTIFIER_RESPONSE_TOPIC;
   ArrayCopy(m_props, aux, m_props.Size());
   uchar buf[];
   EncodeUTF8String(response_topic, buf);
   ArrayCopy(m_props, buf, m_props.Size());
  }

Данные корреляции (Correlation Data)

Также является частью взаимодействия "запрос/ответ" через MQTT, но не частью взаимодействия "публикация/подписка". Поскольку его значением являются бинарные данные, наша функция просто копирует данные, переданные в качестве аргумента, в байтовый массив m_props после установки идентификатора свойства.

void CPktPublish::SetCorrelationData(uchar &binary_data[])
  {
   uchar aux[1];
   aux[0] = MQTT_PROP_IDENTIFIER_CORRELATION_DATA;
   ArrayCopy(m_props, aux, m_props.Size());
   ArrayCopy(m_props, binary_data, m_props.Size());
  }

Свойство пользователя (User Property)

Это наиболее гибкое свойство MQTT 5.0, поскольку его можно использовать для передачи пар ключ:значение в кодировке UTF-8 с семантикой, зависящей от приложения.

"Ненормативный комментарий

Это свойство предназначено для предоставления средств передачи тегов имени-значения прикладного уровня, значение и интерпретация которых известны только прикладным программам, ответственным за их отправку и получение".

Наша функция использует три вспомогательных буфера для кодирования этого свойства, поскольку в настоящее время наш кодировщик строк UTF-8 не имеет третьего параметра для указания индекса начала буфера назначения. Решением может служить перегрузка в следующих версиях. (см. раздел "Тема ответа" выше.)

void CPktPublish::SetUserProperty(const string key, const string val)
  {
   uchar aux[1];
   aux[0] = MQTT_PROP_IDENTIFIER_USER_PROPERTY;
   ArrayCopy(m_props, aux, m_props.Size());
   uchar key_buf[];
   EncodeUTF8String(key, key_buf);
   ArrayCopy(m_props, key_buf, m_props.Size());
   uchar val_buf[];
   EncodeUTF8String(val, val_buf);
   ArrayCopy(m_props, val_buf, m_props.Size());
  }

Идентификатор подписки (Subscription Identifier)

Функция для установки идентификатора подписки проверяет, находится ли переданный аргумент в диапазоне от 1 до 268 435 455, что является допустимым значением для этого свойства. Если нет, мы печатаем/регистрируем сообщение об ошибке и немедленно возвращаемся.

void CPktPublish::SetSubscriptionIdentifier(uint subscript_id)
  {
   if(subscript_id < 1 || subscript_id > 0xfffffff)
     {
      printf("Error: " + __FUNCTION__ +  "Subscription Identifier must be between 1 and 268,435,455");
      return;
     }
   uchar aux[1];
   aux[0] = MQTT_PROP_IDENTIFIER_SUBSCRIPTION_IDENTIFIER;
   ArrayCopy(m_props, aux, m_props.Size());
   uchar buf[];
   EncodeVariableByteInteger(subscript_id, buf);
   ArrayCopy(m_props, buf, m_props.Size());
  }

Тип содержимого (Content Type)

Значение свойства определяется приложением. "MQTT не выполняет никакой проверки строки, за исключением проверки того, что она является допустимой строкой в кодировке UTF-8".

void CPktPublish::SetContentType(const string content_type)
  {
   uchar aux[1];
   aux[0] = MQTT_PROP_IDENTIFIER_CONTENT_TYPE;
   ArrayCopy(m_props, aux, m_props.Size());
   uchar buf[];
   EncodeUTF8String(content_type, buf);
   ArrayCopy(m_props, buf, m_props.Size());
  };

Полезная нагрузка (Payload)

Последнее поле в заголовке переменной PUBLISH. Полезная нагрузка нулевой длины является допустимой. Наша функция — это не что иное, как оболочка нашего кодировщика строк UTF-8, следующая тому же шаблону использования вспомогательного буфера для дальнейшего копирования в элемент m_payload.

void CPktPublish::SetPayload(const string payload)
  {
   uchar aux[];
   EncodeUTF8String(payload, aux);
   ArrayCopy(m_payload, aux, m_props.Size());
  }

Последний метод Build

Целью метода Build() является объединение фиксированного заголовка, имени темы, идентификатора пакета, свойств и полезной нагрузки в конечном пакете, при этом кодируя как длину свойств, так и оставшуюся длину пакета как переменное байтовое целое число.

Сначала проверяем наличие обязательного названия темы. Если длина равна нулю, печатаем/регистрируем ошибку и немедленно возвращаемся.

void CPktPublish::Build(uchar &pkt[])
  {
   if(m_topname.Size() == 0)
     {
      printf("Error: " + __FUNCTION__ + " topic name is mandatory");
      return;
     }
   ArrayResize(pkt, 2);


Затем устанавливаем первый байт фиксированного заголовка с типом пакета управления и соответствующими флагами PUBLISH.

// pkt type with publish flags
   pkt[0] = (uchar)PUBLISH << 4;
   pkt[0] |= m_pubflags;

Затем копируем массив m_topname в последний пакет и устанавливаем/копируем идентификатор пакета, если QoS > 0.

// topic name
   ArrayCopy(pkt, m_topname, pkt.Size());
// QoS > 0 require packet ID
   if((m_pubflags & 0x06) != 0)
     {
      SetPacketID(pkt, pkt.Size());
     }

Затем мы кодируем длину свойства как переменное байтовое целое число.

// properties length
   uchar buf[];
   EncodeVariableByteInteger(m_props.Size(), buf);
   ArrayCopy(pkt, buf, pkt.Size());

Копируем свойства и полезную нагрузку из членов класса в окончательный массив пакетов.

// properties
   ArrayCopy(pkt, m_props, pkt.Size());
// payload
   ArrayCopy(pkt, m_payload, pkt.Size());

Наконец, устанавливаем оставшуюся длину пакета, закодированную как переменное байтовое целое число.

// remaining length
   m_remlen += pkt.Size() - 2;
   uchar aux[];
   EncodeVariableByteInteger(m_remlen, aux);
   ArrayCopy(pkt, aux, 1);
  }


Пакет управления PUBACK

Как мы видели выше при реализации нашего класса CPublish, любой пакет PUBLISH с QoS 1 требует ненулевого идентификатора пакета. Этот идентификатор пакета будет возвращен в соответствующем пакете PUBACK. Именно этот идентификатор позволяет нашему клиенту узнать, был ли доставлен ранее отправленный пакет PUBLISH или произошла ошибка. Будь то успешная доставка или неудача, PUBACK является триггером, который мы будем использовать для обновления состояния сеанса. Мы будем обновлять состояние сеанса на основе кода(ов) причины.

Пакет PUBACK вернет один из девяти кодов причины.

SUCCESS - с сообщением все в порядке. Оно было принято, идет публикация. Получатель принял право собственности на сообщение. Это единственный код причины, который может быть неявным, то есть его можно опустить. PUBACK имеющий лишь идентификатором пакета ДОЛЖЕН интерпретироваться как успешная доставка QoS 1.

"Клиент или сервер, отправляющий пакет PUBACK, ДОЛЖЕН использовать один из кодов причины PUBACK [MQTT-3.4.2-1]. Код причины и длину свойства можно опустить, если код причины равен 0x00 (Success), а свойства отсутствуют.”

NO MATCHING SUBSCRIBERS - с сообщением все в порядке. Оно было принято, идет публикация, но на название темы никто не подписан. Этот код причины отправляется только брокером и является необязательным, то есть брокер МОЖЕТ отправить этот код причины вместо SUCCESS.

UNSPECIFIED ERROR - сообщение отклонено, но издатель не хочет раскрывать причину, или ни один из других более конкретных кодов причины не подходит для описания причины.

IMPLEMENTATION SPECIFIC ERROR - с сообщением все в порядке, но издатель не хочет его публиковать. Стандарт не содержит дополнительных подробностей о семантике этого кода причины, но мы можем сделать вывод, что причина отсутствия публикации не входит в область действия протокола, то есть она зависит от приложения.

NOT AUTHORIZED - нет прав доступа.

TOPIC NAME INVALID - с сообщением все в порядке, включая имя темы, которое представляет собой правильно сформированную строку в кодировке UTF-8. Но издатель, будь то клиент или брокер, не принимает это название темы. Опять же, мы можем сделать вывод, что причина отсутствия публикации зависит от приложения.

PACKET IDENTIFIER IN USE - с сообщением все в порядке, но возможно несоответствие состояния сеанса между клиентом и брокером, поскольку идентификатор пакета, который мы отправили в PUBLISH, уже используется.

QUOTA EXCEEDED - превышена квота. Причина отклонения, опять же, не в рамках протокола. Она зависит от приложения.

PAYLOAD FORMAT INVALID - с сообщением все в порядке, но свойство индикатора формата полезной нагрузки, которое мы отправили в нашей публикации, отличается от фактического формата полезной нагрузки.

Помимо кода причины, пакет PUBACK может содержать строку причины и свойство пользователя.

Строка причины — это удобочитаемая строка в кодировке UTF-8, предназначенная для помощи в диагностике. Она не предназначен для анализа получателем. Вместо этого ее цель состоит в том, чтобы нести дополнительную информацию, которую можно регистрировать, распечатывать, прикреплять к отчетам и т. д. Стоит отметить, что любой совместимый сервер или клиент не будет отправлять строку причины, если ее включение увеличивает размер пакета сверх максимального размера, указанного во время соединения (пакет CONNECT).

PUBACK также может содержать любое количество пар ключ:значение, закодированных как свойство(а) пользователя. Эти пары можно использовать для предоставления дополнительной информации об ошибке. Также они зависят от приложения. Протокол не определяет их семантику. 

Наш клиент "ДОЛЖЕН рассматривать пакет PUBLISH как "неподтвержденный" (unacknowledged) до тех пор, пока он не получит соответствующий пакет PUBACK от получателя".


Класс CPuback

Наш класс CPuback следует той же схеме, что и класс CPublish. Он также реализует интерфейс IControlPacket, который является корнем-заглушкой для иерархии объектов.

Пакет PUBACK отправляется в ответ на пакеты PUBLISH с QoS 1. Его двухбайтовый фиксированный заголовок содержит только идентификатор управляющего пакета в первом байте и оставшуюся длину пакета во втором байте. В этой версии протокола все битовые флаги установлены в RESERVED.

Структура фиксированного заголовка пакета MQTT-5.0 PUBACK

Рис. 03. Структура фиксированного заголовка пакета MQTT-5.0-PUBACK

"Переменный заголовок пакета PUBACK содержит следующие поля в следующем порядке: идентификатор пакета из подтверждаемого пакета PUBLISH, код причины PUBACK, длина свойств и сами свойства".

Структура переменного заголовка пакета MQTT-5.0 PUBACK

Рис. 04. Структура переменного заголовка пакета MQTT-5.0-PUBACK

До сих пор мы работали с нашим Клиентом только как с отправителем. С этого момента нам необходимо учитывать и его роль получателя.

"Протокол доставки симметричен, [...] Клиент и Сервер могут выступать в роли отправителя или получателя".

Нам нужно написать тест для функции, которая получает идентификатор подтверждающего пакета

  1. из возвращенного пакета, отправленного брокером при получении PUBACK
  2. или из нашей системы сохранения при отправке PUBACK

Пакет PUBLISH с QoS 1 не имеет смысла без соответствующего PUBACK, который, в свою очередь, требует сохранения идентификатора соответствующего пакета PUBLISH. Но, хотя мы уже знаем, что в какой-то момент нам понадобится использовать реальную базу данных в качестве уровня сохранения, на данный момент она нам пока не нужна. Для тестирования и разработки нашей функции нам нужно что-то, что работает как база данных, то есть при запросе возвращает идентификатор пакетов PUBLISH, ожидающих подтверждения. Чтобы избежать сюрпризов, создадим функцию GetPendingPublishIDs(ushort &result[]) и сохраним ее в файле DB.mqh.

void GetPendingPublishIDs(ushort &result[])
  {
   ArrayResize(result, 3);
   result[0] = 1;
   result[1] = 255; // one byte
   result[2] = 65535; // two bytes
  }

С нашим "уровнем сохранения" мы можем сконцентрироваться на поставленной задаче: написать функцию, которая при передаче байтового массива (пакета) PUBACK, отправленного брокером, получает идентификатор подтвержденного PUBLISH и сверяет его с отложенными идентификаторами PUBLISH, которые хранятся на нашем уровне сохранения. При совпадении идентификаторов, возвращается True. Позже, при реализации рабочего поведения (Operational Behavior) протокола, мы выпустим этот идентификатор из реального хранилища.

Учитывая приведенную выше структуру заголовка переменной PUBACK, все, что нам сейчас нужно, — это прочитать первые два байта, чтобы получить идентификатор подтверждаемого пакета.

ushort CPuback::GetPacketID(uchar &pkt[])
  {
   return (pkt[0] * 256) + pkt[1];
  }

Как мы помним, идентификатор пакета кодируется как двухбайтовое целое число в сетевом порядке, причем первым указывается старший байт (most significant byte, MSB). Для кодирования мы использовали побитовую операцию сдвига влево (<<). Для расшифровки мы умножаем значение старшего байта на 256 и добавляем младший байт.

Вышеуказанной функции на данный момент достаточно. Позже, при тестировании реального брокера в открытой сети, нам, возможно, придется столкнуться с проблемами порядка байтов, но на данном этапе мы не будем их проверять. Давайте продолжим выполнять поставленную задачу.

bool CPuback::IsPendingPkt(uchar &pkt[])
{
   ushort pending_ids[];
   GetPendingPublishIDs(pending_ids);
   ushort packet_id = GetPacketID(pkt);
   for(uint i = 0; i < pending_ids.Size(); i++)
     {
      if(pending_ids[i] == packet_id)
        {
         return true;
        }
     }
   return false;
}

Вышеуказанная функция получает в качестве аргумента массив байтов. Этот массив байтов является переменным заголовком пакета PUBACK. Затем он сохраняет в локальной переменной (pending_ids) массив идентификаторов пакетов из нашего хранилища/базы данных, которые еще не были подтверждены. Наконец, он считывает идентификатор пакета из массива байтов, отправленного брокером, и сравнивает его с массивом отложенных идентификаторов. Если пакет находится в массиве, наша функция возвращает True, и мы можем освободить идентификатор.

Та же логика позволит нам освободить идентификаторы пакетов PUBREC, PUBREL и PUBCOMP для PUBLISH с QoS 2. Кроме того, позже мы заменим наш ненастоящий "уровень сохранения" с одной функцией в файле реальной базой данных, но основная логика функции останется. На этом этапе другой разработчик может работать над уровнем сохранения, в то время как мы разрабатываем наши классы пакетов совершенно независимым образом.

Нам также необходимо иметь возможность читать коды причины из заголовка переменной PUBACK. Поскольку это поле имеет фиксированную позицию и размер, всё, что нам нужно, — это прочитать этот конкретный байт.

uchar CPuback::GetReasonCode(uchar &pkt[])
  {
   return pkt[2];
  }


Поскольку мы работаем только на стороне получателя нашего клиента, т. е. пока не отправляем PUBACK, вышеперечисленных функций достаточно для наших следующих функциональных тестов. Теперь нужна проверка на реальном брокере.


Заключение

Непрерывный рефакторинг является частью методологии TDD. Он направлен на достижение не только полностью функционального, но и чистого кода: унифицированные единицы ответственности и функции (в нашем случае классы и методы), читаемые идентификаторы (классы, методы и имена переменных) и избегание избыточности ("не повторяйтесь"). Это процесс, а не одноэтапная задача. Итак, мы уже точно знаем, что будем постоянно проводить рефакторинг, пока не получим полнофункциональный клиент MQTT 5.0.

Теперь мы готовы начать писать наш первый функциональный тест на реальном MQTT-брокере, чтобы проверить, работают ли наши пакеты CONNECT, CONNACK, PUBLISH и PUBACK должным образом. 

Пакеты PUBACK являются аналогом пакетов PUBLISH с QoS 1. Для пакетов PUBLISH с QoS 2 в качестве аналога потребуются пакеты PUBREC, PUBCOMP и PUBREL. Им посвящена наша следующая статья.

Если вы хорошо разбираетесь в MQL5 и можете внести свой вклад в разработку этого MQTT-клиента с открытым исходным кодом, напишите об этом в комментариях ниже или в чате сообщества. 


Перевод с английского произведен MetaQuotes Ltd.
Оригинальная статья: https://www.mql5.com/en/articles/14391

Прикрепленные файлы |
MQTT.zip (20.83 KB)
Tests.zip (16.88 KB)
Теория хаоса в трейдинге (Часть 1): Введение, применение на финансовых рынках и индикатор Ляпунова Теория хаоса в трейдинге (Часть 1): Введение, применение на финансовых рынках и индикатор Ляпунова
Можно ли применять теорию хаоса на финансовых рынках? Чем классическая теория Хаоса и хаотические системы отличаются от концепции, предложенной Биллом Вильямсом, рассмотрим в этой статье.
Разрабатываем мультивалютный советник (Часть 15): Готовим советник к реальной торговле Разрабатываем мультивалютный советник (Часть 15): Готовим советник к реальной торговле
Постепенно приближаясь к получению готового советника, необходимо уделить внимание вопросам, которые являются второстепенными на этапе тестирования торговой стратегии, но становятся важными при переходе к реальной торговле.
Разработка системы репликации (Часть 42): Проект Chart Trade (I) Разработка системы репликации (Часть 42): Проект Chart Trade (I)
Давайте создадим что-нибудь поинтереснее. Не хочу портить сюрприз, поэтому следите за статьей, чтобы лучше понять. С самого начала этой серии о разработке системы репликации/моделирования, я говорил, что идея состоит в том, чтобы использовать платформу MetaTrader 5 одинаково как в разрабатываемой нами системе, так и на реальном рынке. Важно, чтобы это было сделано должным образом. Никто не хочет тренироваться и учиться сражаться, используя одни инструменты, в то время как во время боя ему придется пользоваться другими.
Нейросети в трейдинге: Снижение потребления памяти методом оптимизации Adam (Adam-mini) Нейросети в трейдинге: Снижение потребления памяти методом оптимизации Adam (Adam-mini)
Одним из направлений повышения эффективности процесса обучения и сходимости моделей является улучшение методов оптимизации. Adam-mini представляет собой адаптивный метод оптимизации, разработанный для улучшения базового алгоритма Adam.