Разработка робота на Python и MQL5 (Часть 1): Препроцессинг данных

Yevgeniy Koshtenko | 29 марта, 2024

Введение

Рынок становится все сложнее. Сегодня он превращается в битву алгоритмов. Свыше 95% торгового оборота набирают роботы. 

Новый шаг — машинное обучение. Это не сильный ИИ, но и не простые линейные алгоритмы. Модель машинного обучения способна получать прибыль в сложных условиях. Интересно применить машинное обучение для создания торговых систем. Благодаря нейросетям, торговый робот будет анализировать большие данные, находить закономерности и прогнозировать движение цены.


Мы рассмотрим цикл разработки торгового робота: сбор данных, обработка, расширение выборки, инженерия признаков, выбор и обучение модели, создание торговой системы через Python, мониторинг торговли.

Работа через Python имеет преимущества: быстродействие в области машинного обучения, возможность отбора и генерации признаков. Экспорт моделей в ONNX требует в точности повторять логику генерации признаков, как в Python, что непросто. Поэтому выбран путь с онлайн-торговлей через Python.

Постановка задачи и выбор инструмента

Цель проекта — создать прибыльную и устойчивую модель машинного обучения для торговли через Python. Этапы работы:

Инструменты: Python MQL5, библиотеки ML на Python для быстродействия и функциональности.

Итак, с целями и задачами мы определились. В рамках данной статьи будут осуществлены следующие работы:

Настройка окружения, импортов, сбор данных

Сначала нужно получить исторические данные можно через MetaTrader 5. Код устанавливает соединение с торговой платформой, передавая путь к файлу терминала в метод инициализации.

В цикле получаем данные методом mt5.copy_rates_range() с параметрами: инструмент, таймфрейм, начальная и конечная даты. Есть счетчик неудачных попыток и задержка для стабильного соединения.

Функция завершает работу, отключаясь от платформы методом mt5.shutdown().

Это отдельная функция для дальнейшего вызова в программе. 

Для запуска скрипта вам понадобится установить следующие библиотеки:

  1. NumPy: Библиотека для работы с многомерными массивами и математическими функциями.

  2. Pandas: Инструмент для анализа данных, предоставляет удобные структуры данных для работы с таблицами и временными рядами.

  3. Random: Модуль для генерации случайных чисел и выбора случайных элементов из последовательностей.

  4. Datetime: Предоставляет классы и функции для работы с датами и временем.

  5. MetaTrader5: Библиотека для взаимодействия с торговым терминалом MetaTrader 5.

  6. Time: Модуль для работы со временем и задержками выполнения программы.

  7. Concurrent.futures: Инструмент для выполнения параллельных и асинхронных задач. Это нам будет необходимо в будущем. для параллельной мультивалютной работы.

  8. Tqdm: Библиотека для отображения индикаторов прогресса при выполнении итеративных операций. Это нам будет необходимо в будущем. для параллельной мультивалютной работы.

  9. Train_test_split: Функция для разделения набора данных на обучающую и тестовую выборки при обучении моделей машинного обучения.

Вы можете установить их с помощью pip, выполнив следующие команды: Вы можете установить их с помощью pip, выполнив следующие команды:

pip install numpy pandas MetaTrader5 concurrent-futures tqdm sklearn matplotlib imblearn
import numpy as np
import pandas as pd
import random
from datetime import datetime
import MetaTrader5 as mt5
import time
import concurrent.futures
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from sklearn.utils import class_weight
from imblearn.under_sampling import RandomUnderSampler

# GLOBALS
MARKUP = 0.00001
BACKWARD = datetime(2000, 1, 1)
FORWARD = datetime(2010, 1, 1)
EXAMWARD = datetime(2024, 1, 1)
MAX_OPEN_TRADES = 3
symbol = "EURUSD"

def retrieve_data(symbol, retries_limit=300):
    terminal_path = "C:/Program Files/MetaTrader 5/Arima/terminal64.exe"

    attempt = 0
    raw_data = None

    while attempt < retries_limit:
        if not mt5.initialize(path=terminal_path):
            print("MetaTrader initialization failed")
            return None

        instrument_count = mt5.symbols_total()
        if instrument_count > 0:
            print(f"Number of instruments in the terminal: {instrument_count}")
        else:
            print("No instruments in the terminal")

        rates = mt5.copy_rates_range(symbol, mt5.TIMEFRAME_H1, BACKWARD, EXAMWARD)
        mt5.shutdown()

        if rates is None or len(rates) == 0:
            print(f"Data for {symbol} not available (attempt {attempt+1})")
            attempt += 1
            time.sleep(1)
        else:
            raw_data = pd.DataFrame(rates[:-1], columns=['time', 'open', 'high', 'low', 'close', 'tick_volume'])
            raw_data['time'] = pd.to_datetime(raw_data['time'], unit='s')
            raw_data.set_index('time', inplace=True)
            break

    if raw_data is None:
        print(f"Data retrieval failed after {retries_limit} attempts")
        return None

    # Add simple features
    raw_data['raw_SMA_10'] = raw_data['close'].rolling(window=10).mean()
    raw_data['raw_SMA_20'] = raw_data['close'].rolling(window=20).mean()
    raw_data['Price_Change'] = raw_data['close'].pct_change() * 100

    # Additional features
    raw_data['raw_Std_Dev_Close'] = raw_data['close'].rolling(window=20).std()
    raw_data['raw_Volume_Change'] = raw_data['tick_volume'].pct_change() * 100

    raw_data['raw_Prev_Day_Price_Change'] = raw_data['close'] - raw_data['close'].shift(1)
    raw_data['raw_Prev_Week_Price_Change'] = raw_data['close'] - raw_data['close'].shift(7)
    raw_data['raw_Prev_Month_Price_Change'] = raw_data['close'] - raw_data['close'].shift(30)

    raw_data['Consecutive_Positive_Changes'] = (raw_data['Price_Change'] > 0).astype(int).groupby((raw_data['Price_Change'] > 0).astype(int).diff().ne(0).cumsum()).cumsum()
    raw_data['Consecutive_Negative_Changes'] = (raw_data['Price_Change'] < 0).astype(int).groupby((raw_data['Price_Change'] < 0).astype(int).diff().ne(0).cumsum()).cumsum()
    raw_data['Price_Density'] = raw_data['close'].rolling(window=10).apply(lambda x: len(set(x)))
    raw_data['Fractal_Analysis'] = raw_data['close'].rolling(window=10).apply(lambda x: 1 if x.idxmax() else (-1 if x.idxmin() else 0))
    raw_data['Price_Volume_Ratio'] = raw_data['close'] / raw_data['tick_volume']
    raw_data['Median_Close_7'] = raw_data['close'].rolling(window=7).median()
    raw_data['Median_Close_30'] = raw_data['close'].rolling(window=30).median()
    raw_data['Price_Volatility'] = raw_data['close'].rolling(window=20).std() / raw_data['close'].rolling(window=20).mean()

    print("\nOriginal columns:")
    print(raw_data[['close', 'high', 'low', 'open', 'tick_volume']].tail(100))

    print("\nList of features:")
    print(raw_data.columns.tolist())

    print("\nLast 100 features:")
    print(raw_data.tail(100))

    # Replace NaN values with the mean
    raw_data.fillna(raw_data.mean(), inplace=True)

    return raw_data

retrieve_data(symbol)

Определяются глобальные переменные: издержки на спред и комиссии брокеров, даты для обучающей и тестовой выборок, максимальное число сделок, символ.

Выполняется загрузка котировок из MetaTrader5 в цикле. Данные конвертируются в DataFrame и обогащаются признаками:

Эти признаки помогают учитывать факторы, влияющие на цену.

Выводится информация о столбцах и последних записях DataFrame.

Смотрим, как выполнилась наша первая функция:

Первая функция

Все работает. Код успешно загрузил котировки, выполнил подготовку и загрузку признаков. Переходим к следующей части кода.


Аугментация данных для расширения выборки

Аугментация данных — генерация новых обучающих примеров на основе существующих для увеличения выборки и повышения качества модели. Актуальна для прогнозирования временных рядов с ограниченными данными. Снижает ошибку моделей, повышает робастность.

Методы аугментации финансовых данных:

Реализована функция аугментации входных данных, принимающая DataFrame и количество новых примеров для каждого метода. Генерирует новые данные разными подходами и конкатенирует с оригинальным DataFrame.

def augment_data(raw_data, noise_level=0.01, time_shift=1, scale_range=(0.9, 1.1)):
    print(f"Number of rows before augmentation: {len(raw_data)}")

    # Copy raw_data into augmented_data
    augmented_data = raw_data.copy()

    # Add noise
    noisy_data = raw_data.copy()
    noisy_data += np.random.normal(0, noise_level, noisy_data.shape)

    # Replace NaN values with the mean
    noisy_data.fillna(noisy_data.mean(), inplace=True)

    augmented_data = pd.concat([augmented_data, noisy_data])
    print(f"Added {len(noisy_data)} rows after adding noise")

    # Time shift
    shifted_data = raw_data.copy()
    shifted_data.index += pd.DateOffset(hours=time_shift)

    # Replace NaN values with the mean
    shifted_data.fillna(shifted_data.mean(), inplace=True)

    augmented_data = pd.concat([augmented_data, shifted_data])
    print(f"Added {len(shifted_data)} rows after time shift")

    # Scaling
    scale = np.random.uniform(scale_range[0], scale_range[1])
    scaled_data = raw_data.copy()
    scaled_data *= scale

    # Replace NaN values with the mean
    scaled_data.fillna(scaled_data.mean(), inplace=True)

    augmented_data = pd.concat([augmented_data, scaled_data])
    print(f"Added {len(scaled_data)} rows after scaling")

    # Inversion
    inverted_data = raw_data.copy()
    inverted_data *= -1

    # Replace NaN values with the mean
    inverted_data.fillna(inverted_data.mean(), inplace=True)

    augmented_data = pd.concat([augmented_data, inverted_data])
    print(f"Added {len(inverted_data)} rows after inversion")

    print(f"Number of rows after augmentation: {len(augmented_data)}")

    # Print dates by years
    print("Print dates by years:")
    for year, group in augmented_data.groupby(augmented_data.index.year):
        print(f"Year {year}: {group.index}")

    return augmented_data

Вызываем код, получаем следующие результаты:

2

После применения методов аугментации данных наш исходный набор из 150 000 часовых ценовых баров был расширен до впечатляющих 747 000 строк. Многие авторитетные исследования в области машинного обучения показывают, что такое существенное увеличение объема тренировочных данных за счет генерации синтетических примеров положительно сказывается на метриках качества обученных моделей. Рассчитываем, что и в нашем случае этот прием даст желаемый эффект.


Разметка данных

Разметка данных критически важна для успеха алгоритмов обучения с учителем. Она позволяет снабдить исходные данные метками, которые модель затем изучает. Размеченные данные повышают точность, улучшают обобщение, ускоряют обучение и облегчают оценку моделей. В задаче прогнозирования EURUSD мы добавили бинарный столбец "labels", указывающий, превысило ли следующее изменение цены спред и комиссию. Это позволяет модели выучить паттерны переигрывания спреда и безоткатных трендов.

    Разметка играет ключевую роль, позволяя алгоритмам машинного обучения находить сложные закономерности в данных, недоступные для восприятия в чистом необработанном виде. А теперь перейдем к обзору кода.

    def markup_data(data, target_column, label_column, markup_ratio=0.00002):
        data.loc[:, label_column] = np.where(data.loc[:, target_column].shift(-1) > data.loc[:, target_column] + markup_ratio, 1, 0)
    
        data.loc[data[label_column].isna(), label_column] = 0
    
        print(f"Number of markups on price change greater than markup: {data[label_column].sum()}")
    
        return data

    Выполняем данный код, получаем количество меток в данных. Функция возвращает фрейм. Все работает. Кстати, из свыше 700 000 единиц данных, цена менялась больше чем на величину спреда только в 70 000 случаях.

    3

    Продолжаем работу. Теперь у нас еще одна функция разметки данных, на этот раз более близкая к непосредственному заработку.


    Функция целевых меток

    def label_data(data, symbol, min=2, max=48):
        terminal_path = "C:/Program Files/MetaTrader 5/Arima/terminal64.exe"
    
        if not mt5.initialize(path=terminal_path):
            print("Error connecting to MetaTrader 5 terminal")
            return
    
        symbol_info = mt5.symbol_info(symbol)
        stop_level = 100 * symbol_info.point
        take_level = 800 * symbol_info.point
    
        labels = []
    
        for i in range(data.shape[0] - max):
            rand = random.randint(min, max)
            curr_pr = data['close'].iloc[i]
            future_pr = data['close'].iloc[i + rand]
            min_pr = data['low'].iloc[i:i + rand].min()
            max_pr = data['high'].iloc[i:i + rand].max()
    
            price_change = abs(future_pr - curr_pr)
    
            if price_change > take_level and future_pr > curr_pr and min_pr > curr_pr - stop_level:
                labels.append(1)  # Growth
            elif price_change > take_level and future_pr < curr_pr and max_pr < curr_pr + stop_level:
                labels.append(0)  # Fall
            else:
                labels.append(None)
    
        data = data.iloc[:len(labels)].copy()
        data['labels'] = labels
    
        data.dropna(inplace=True)
    
        X = data.drop('labels', axis=1)
        y = data['labels']
    
        rus = RandomUnderSampler(random_state=2)
        X_balanced, y_balanced = rus.fit_resample(X, y)
    
        data_balanced = pd.concat([X_balanced, y_balanced], axis=1)
    
        return data

    Функция получает целевые метки для обучения моделей машинного обучения торговой прибылью. Она соединяется с MetaTrader 5, извлекает информацию о символе и рассчитывает уровни стопа/тейка. Затем для каждой точки входа определяется будущая цена через случайный период. Если изменение цены превышает тейк и не задевает стоп, и удовлетворяет условиям роста/падения, добавляется метка 1.0/0.0 соответственно. Иначе - None. Создается новый датафрейм только с размеченными данными. None заменяются на средние.

    Выводится количество меток роста/падения.

    Итак, все работает. У нас получены данные и их производные, данные аугументированы, существенно дополнены, размечены двумя разными функциями. Переходим к дальнейшему шагу - балансировке классов.

    Кстати, искушенный в машинном обучении читатель уже давно понял, что мы в итоге разработаем модель классификации, а не регрессии. Мне больше нравятся регрессионные модели, я в них вижу немного больше логики для прогнозирования, нежели в моделях классификации. 

    Итак, следующий наш ход - балансировка классов.


    Балансировка классов. Что такое классификация?

    Классификация — фундаментальный метод анализа, основанный на естественной человеческой способности структурировать информацию по общим признакам. Одним из первых применений классификации был поиск месторождений, выделяющий перспективные участки по геологическим признакам.

    С компьютерами классификация вышла на новый уровень, но суть осталась — обнаруживать закономерности в кажущемся хаосе деталей. Для финансовых рынков важно классифицировать динамику цен на рост/падение. Однако классы бывают разбалансированы — трендов мало, а флэтов много.

    Поэтому используются методы балансировки классов: удаление доминирующих примеров или генерация редких. Это позволяет моделям надежно распознавать важные, но редкие паттерны ценовой динамики.

    pip install imblearn

        data = data.iloc[:len(labels)].copy()
        data['labels'] = labels
    
        data.dropna(inplace=True)
    
        X = data.drop('labels', axis=1)
        y = data['labels']
    
        rus = RandomUnderSampler(random_state=2)
        X_balanced, y_balanced = rus.fit_resample(X, y)
    
        data_balanced = pd.concat([X_balanced, y_balanced], axis=1)

    Итак, классы у нас теперь сбалансированы, у нас равное количество меток каждого класса (падение и рост цен).

    Переходим к самому важному в прогнозировании данных — признакам.


    Что такое признаки в машинном обучении?

    Признаки и характеристики — базовые понятия, знакомые каждому с детства. Когда мы описываем объект, мы перечисляем его свойства — это и есть признаки. Набор таких характеристик позволяет полно охарактеризовать предмет.

    То же самое в анализе данных - каждое наблюдение описывается совокупностью числовых и категориальных признаков. Выбор информативных признаков критичен для успеха.

    На более высоком уровне — инженерия признаков, когда из исходных параметров конструируются новые производные характеристики для лучшего описания объекта исследования.

    Таким образом, человеческий опыт познания мира через описание объектов их признаками переносится в науку на уровне формализации и автоматизации.


    Автоматическое извлечение признаков

    Фича инжиниринг — преобразование исходных данных в набор признаков для обучения моделей машинного обучения. Цель — найти наиболее информативные признаки. Есть ручной подход (человек выбирает признаки) и автоматический (с помощью алгоритмов).

    Мы будем использовать автоматический подход. Применим метод создания новых признаков для автоматического извлечения лучших признаков из наших данных. Затем отберем самые информативные из полученного набора.

    Метод генерации новых признаков: 

    def generate_new_features(data, num_features=200, random_seed=1):
        random.seed(random_seed)
        new_features = {}
    
        for _ in range(num_features):
            feature_name = f'feature_{len(new_features)}'
    
            col1_idx, col2_idx = random.sample(range(len(data.columns)), 2)
            col1, col2 = data.columns[col1_idx], data.columns[col2_idx]
    
            operation = random.choice(['add', 'subtract', 'multiply', 'divide', 'shift', 'rolling_mean', 'rolling_std', 'rolling_max', 'rolling_min', 'rolling_sum'])
    
            if operation == 'add':
                new_features[feature_name] = data[col1] + data[col2]
            elif operation == 'subtract':
                new_features[feature_name] = data[col1] - data[col2]
            elif operation == 'multiply':
                new_features[feature_name] = data[col1] * data[col2]
            elif operation == 'divide':
                new_features[feature_name] = data[col1] / data[col2]
            elif operation == 'shift':
                shift = random.randint(1, 10)
                new_features[feature_name] = data[col1].shift(shift)
            elif operation == 'rolling_mean':
                window = random.randint(2, 20)
                new_features[feature_name] = data[col1].rolling(window).mean()
            elif operation == 'rolling_std':
                window = random.randint(2, 20)
                new_features[feature_name] = data[col1].rolling(window).std()
            elif operation == 'rolling_max':
                window = random.randint(2, 20)
                new_features[feature_name] = data[col1].rolling(window).max()
            elif operation == 'rolling_min':
                window = random.randint(2, 20)
                new_features[feature_name] = data[col1].rolling(window).min()
            elif operation == 'rolling_sum':
                window = random.randint(2, 20)
                new_features[feature_name] = data[col1].rolling(window).sum()
    
        new_data = pd.concat([data, pd.DataFrame(new_features)], axis=1)
    
        print("\nGenerated features:")
        print(new_data[list(new_features.keys())].tail(100))
    
        return data

    Обратите внимание на данный параметр:

    random_seed=42

    Параметр random_seed необходим для воспроизводимости результатов генерации новых признаков. Функция generate_new_features создает новые признаки из исходных данных. На вход: данные, количество признаков, сид.

    Инициализируется рандом с заданным сидом. В цикле: генерируется имя, случайно выбираются 2 существующих признака и операция (сложение, вычитание и т.д.). Вычисляется новый признак по выбранной операции.

    После генерации новые признаки добавляются к исходным данным. Возвращаются обновленные данные с автоматически сгенерированными признаками.

    Код позволяет автоматически создавать новые признаки для улучшения качества машинного обучения.

    Запустили код, смотрим результат:

    5

    Сгенерировано 100 новых признаков. Переходим к следующему этапу — кластеризации признаков.


    Кластеризация признаков

    Кластеризация признаков объединяет похожие признаки в группы, чтобы уменьшить их количество. Это помогает избавиться от избыточных данных, снизить корреляцию и упростить модель, не допуская переобучения.

    Популярные алгоритмы: к-средних (задается число кластеров, признаки группируются вокруг центроидов) и иерархическая кластеризация (многоуровневая древовидная структура).

    Кластеризация признаков позволяет разобраться с ворохом бесполезных признаков и повысить эффективность модели.

    Рассмотрим код кластеризации признаков:

    from sklearn.mixture import GaussianMixture
    
    def cluster_features_by_gmm(data, n_components=4):
        X = data.drop(['label', 'labels'], axis=1)
    
        X = X.replace([np.inf, -np.inf], np.nan)
        X = X.fillna(X.median())
    
        gmm = GaussianMixture(n_components=n_components, random_state=1)
    
        gmm.fit(X)
    
        data['cluster'] = gmm.predict(X)
    
        print("\nFeature clusters:")
        print(data[['cluster']].tail(100))
    
        return data

    Мы используем алгоритм GMM (Gaussian Mixture Model) для кластеризации признаков. Основная идея в том, что данные как бы порождаются смесью нормальных распределений, где каждое распределение — это один кластер.

    Сначала задаем число кластеров. Затем задаем начальные параметры модели: средние, ковариационные матрицы и вероятности кластеров. Алгоритм циклически пересчитывает эти параметры методом максимального правдоподобия, пока они не перестанут меняться.

    В итоге получаем финальные параметры для каждого кластера, по которым можно определять, к какому кластеру относится новый признак.

    GMM крутая штука, ее применяют в разных задачах. Она хороша для данных, у которых кластеры имеют сложную форму и размытые границы.

    Этот код использует GMM для разбиения признаков на кластеры. Берутся исходные данные, убираются метки классов. Применяется GMM для разбиения на заданное число кластеров. Номера кластеров добавляются как новый столбец. В конце печатается таблица полученных кластеров.

    Запускаем код кластеризации, видим результаты:

    6

    Перейдем к функции отбора лучших признаков.


    Отбор лучших признаков

    from sklearn.feature_selection import RFECV
    from sklearn.ensemble import RandomForestClassifier
    import pandas as pd
    
    def feature_engineering(data, n_features_to_select=10):
        # Remove the 'label' column as it is not a feature
        X = data.drop(['label', 'labels'], axis=1)
        y = data['labels']
    
        # Create a RandomForestClassifier model
        clf = RandomForestClassifier(n_estimators=100, random_state=1)
    
        # Use RFECV to select n_features_to_select best features
        rfecv = RFECV(estimator=clf, step=1, cv=5, scoring='accuracy', n_jobs=-1, verbose=1,
                      min_features_to_select=n_features_to_select)
        rfecv.fit(X, y)
    
        # Return a DataFrame with the best features, 'label' column, and 'labels' column
        selected_features = X.columns[rfecv.get_support(indices=True)]
        selected_data = data[selected_features.tolist() + ['label', 'labels']]  # Convert selected_features to a list
    
        # Print the table of best features
        print("\nBest features:")
        print(pd.DataFrame({'Feature': selected_features}))
    
        return selected_data
    
    labeled_data_engineered = feature_engineering(labeled_data_clustered, n_features_to_select=10)

    Эта функция выделяет из наших данных самые крутые и полезные признаки (фичи). На вход идут исходные данные с признаками и метками классов, плюс нужное количество отбираемых фич.

    Сначала метки классов сбрасываются, потому что они не помогут в выборе фич. Затем запускается алгоритм Random Forest — модель, строящая кучу деревьев решений на случайных наборах фич.

    После обучения всех деревьев Random Forest оценивает, насколько каждая фича важна и влияет на классификацию. Основываясь на этих оценках важности, функция отбирает самые топовые фичи.

    В финале отобранные фичи добавляются к данным, а метки классов возвращаются. Функция печатает таблицу с выбранными фичами и возвращает обновленные данные.

    Зачем все это? Ну так мы избавляемся от мусорных фич, которые только тормозят процесс. Оставляем самые крутые рулящие фичи, и модель будет показывать лучшие результаты, обучаясь на таких данных.

    Запустили функцию, смотрим что получилось:

    8

    Лучшим признаком для прогноза цен оказалась сама цена открытия. В топ вошли признаки на основе скользящих средних, приращений цен, стандартного отклонения, дневного и месячного изменения цены. Автоматически сгенерированные признаки оказались неинформативными.

    Код позволяет автоматически отобрать важные признаки, что может улучшить производительность и обобщающую способность модели.


    Заключение

    Итак, мы создали код работы с данными, который предвосхищает разработку нашей будущей модели машинного обучения. Кратко повторим пройденные шаги:


    В следующей статье мы займемся выбором оптимальной модели классификации, ее улучшением, внедрением кросс-валидации, и написанием функции тестера для связки Python/MQL5.