|
大家好,我是Lucy@FinTech社区,今天的文章将和大家分享基于Python的交易策略优化遗传算法。欢迎添加以下微信:fintech78,加入tech社群,提认知,攒人脉,求职招聘!
如果你听说过系统交易或算法交易,那么你一定知道策略的优化是决定该策略能否实现盈亏平衡的最重要因素之一。最糟糕的是:优化的计算量很大。想象一个简单的 MACD 交叉策略,将至少有 3 个参数:fast、slow和 signal moving average period,每个参数都有数百个可能的值,使其超过一百万种可能的组合。
遗传算法 (GA)
一种概率和启发式搜索算法,灵感来自达尔文的自然选择理论,即适者生存。在本文,我们将使用 GA 作为一种优化算法来识别最佳参数集。我们将以Nvidia的股价为例实现简单的 MACD 来说明它。请记住,这只是应用 GA 优化交易策略的一个demo,不应简单复制或盲目效仿。
什么是遗传算法?
这是受达尔文进化论的启发,遗传算法是一个迭代过程,用于搜索问题的全局最优解,从而获得在严酷世界中生存的最佳基因到识别最佳参数,再到本文中的交易策略。为了更好地理解遗传算法的工作原理,先介绍一些关键的概念:
基因
这是指解决方案的参数/变量。将基因表示为位(即 0 或 1)是很常见的,但可以根据潜在的问题陈述进行更改。
个体/染色体
代表解决方案的一串基因。
种群
个体的下一代。
适应度函数
用于计算个体适应度得分的函数。根据潜在的问题陈述,我们可以搜索适应度得分最高或最低的个体。为了迎合适者生存的概念,适应度得分越高,个体能够生存和繁衍形成下一代的机会就越大。
选择策略
这定义了我们如何将个体相互比较以选择下一代的种子。一些常见的例子是锦标赛(随机样本的个人面对面 1 对 1,获胜者将赢得下一代的一席之地)、轮盘赌(选择的概率与适应度得分成正比),或者针对更复杂场景的双人锦标赛。
交叉策略
这本质上是在繁衍时将亲本基因传递给后代的方式。交叉应该模仿有性繁殖,其中需要两个父母进行繁殖。然后父母的基因将重新组合形成后代。虽然不同的策略取决于场景和数据类型,但最常见的两种是 k 点交叉和统一交叉。在 k 点交叉中,将随机选择 k 个交叉点,其中一个点右侧的基因被交换。均匀交叉相对更直接:不是交换基因部分,而是每个基因都有相同的机会被交换。
1-point crossover
2-point crossover
突变策略
作为保持基因多样性和防止过早收敛的一种方式,子代的基因将有随机发生突变的机会,这意味着实际值将偏离父母的值。突变通常以位翻转、索引混洗或有界和无界统计分布的形式出现。然后将这些关键概念组合起来形成一个迭代算法:
1. 参数化问题描述。
2. 定义适应度函数。
3. 定义交叉、突变和选择策略。
4. 生成初始种群。
5. 评估群体中个体的适应度。
6. 选择个体,交叉变异形成下一代种群。
7. 重复步骤 5 和 6,直到收敛或满足结束条件。
现在我们已经完成了速成课程,让我们看看如何将其应用于交易策略优化。
准备
在开始之前,让我们确保我们已安装并准备好所有库。除了通常的 Pandas、Numpy 等,我们还将使用以下内容:
Alpha Vantage:免费增值数据提供商。放心。免费许可证就足够了。
Backtrader:很棒的开源 python 框架,让你可以专注于编写可重用的交易策略、指标和分析器,而不必花时间构建基础设施。它支持回测,以便您评估您提出的策略!
DEAP:Python 中的分布式进化算法,一种新颖的进化计算框架,用于快速原型设计和想法测试。
执行下面的命令,全部安装它们:
conda create -n myenv
conda activate myenv && conda install -y python=3.8.5
pip install alpha_vantage backtrader[plotting] deap
数据采集
使用注册Alpha Vantage 的密钥,我们可以使用下面的代码片段获取 Nvidia 在股票代码“NVDA”下的历史股价。处理金融股票数据时要记住的一件事是价格调整。例如,特斯拉在 8 月 31 日经历了五对一的拆分。所以拆分前的一股应该是拆分后的五倍左右。这就是为什么我们需要根据拆分和股息事件来调整价格。
幸运的是,Alpha Vantage 的每日调整端点,我们将根据证券价格研究中心的公式调整收盘价。然后,我们可以使用从未调整的开盘价到未调整的收盘价的百分比变化来估计调整后的开盘价、最高价和最低价:如果当天价格的百分比变化是 +10%,那么收盘价应该是开盘价的 110%,无论是调整后还是不是调整后的收盘价。
import pandas as pd
import numpy as np
import datetime
from pathlib import Path
from alpha_vantage.timeseries import TimeSeries
import sys
import os
ALPHA_VANTAGE_DIR_PATH = Path("Path/to/folder/where/you/store/your/data")
SECRET = "demo"
def get_alpha_vantage(key, ticker):
"""Given a key to Alpha Vantage and a valid ticker, this function will
query alpha vantage and save the dataset into a csv in a predefined
directory using ticker as the filename.
"""
ts = TimeSeries(key=key, output_format="pandas", indexing_type="date")
try:
data, meta_data = ts.get_daily_adjusted(symbol=i, outputsize="full")
data.to_csv(ALPHA_VANTAGE_DIR_PATH / f"{ticker}.csv")
print(f"{ticker} has been downloaded to {ALPHA_VANTAGE_DIR_PATH}/{ticker}.csv")
except:
print(f"{ticker} Not found.")
def read_alpha_vantage(ticker):
"""If the ticker's csv has been downloaded with `get_alpha_vantage`,
this function will return a pandas dataframe of adjusted open, adjusted
high, adjusted low, adjusted close and volume rounded to 4 decimal places
"""
if not (ALPHA_VANTAGE_DIR_PATH / f"{ticker}.csv").exists():
return None
df = pd.read_csv(
ALPHA_VANTAGE_DIR_PATH / f"{ticker}.csv", index_col=0, parse_dates=True
).sort_index()
df = df.rename(
columns={
"1. open": "Open",
"2. high": "High",
"3. low": "Low",
"4. close": "Close",
"5. adjusted close": "Adjusted Close",
"6. volume": "Volume",
"7. dividend amount": "Dividend",
"8. split coefficient": "Split Coefficient",
}
)
df["Unadjusted Open"] = df["Open"]
df["Open"] = df["Close"] * df["Adjusted Close"] / df["Open"]
df["High"] = df["High"] * df["Open"] / df["Unadjusted Open"]
df["Low"] = df["Low"] * df["Open"] / df["Unadjusted Open"]
df["Close"] = df["Adjusted Close"]
return df[["Open", "High", "Low", "Close", "Volume"]].round(4)
get_alpha_vantage(key=SECRET, ticker="NVDA")
df = read_alpha_vantage(ticker="NVDA")
print(df.head())运行完脚本后,read_alpha_vantage 应该返回一个数据框,其中包含类似于以下内容的第一行和最后 5 行,具体取决于您查询 Alpha Vantage 的时间:date,Open,High,Low,Close,Volume
1999-11-01,1.9456,2.1808,1.9456,1.8007,1630300.0
1999-11-02,2.0005,2.0999,1.9846,1.9156,1744800.0
1999-11-03,2.2191,2.4008,2.2028,2.1026,4191000.0
1999-11-04,2.3528,2.5384,2.3528,2.2367,2625700.0
1999-11-05,2.0384,2.0554,1.8264,2.1647,1284100.0date,Open,High,Low,Close,Volume
2020-12-24,518.0158,521.8004,514.1021,519.7500,244708.0
2020-12-28,509.5809,509.5809,497.9039,516.0000,5286655.0
2020-12-29,518.4610,524.3777,515.1417,517.7300,4259304.0
2020-12-30,532.1220,541.2748,528.2895,525.8300,5634929.0
2020-12-31,518.9602,519.5380,510.2098,522.2000,4810610.0
Backtrader框架
在 Backtrader 中,策略需要遵循 backtrader.strategy 的接口。界面中最重要的组件是:
params:策略使用的参数
__init__:我们在其中进行数据准备和定义指标
next():决定下一步策略应该做什么import backtrader as bt
class CrossoverStrategy(bt.Strategy):
params = dict()
def __init__(self):
# initialise strategy
# do data prep
# define indicators
pass
def next(self):
# trading logic
pass一个简单的 MACD 交叉策略由四行组成:
Fast(12 天)指数移动平均线。
Slow(26 天)指数移动平均线。
MACD线,快速移动平均线和慢速移动平均线之间的差异。
Singal line,MACD线的9日均线。
我们还需要在 next() 下放入交易逻辑。为简单起见,此策略将是多头的:
多头入场:当 MACD 线穿过信号线时。
多头退出:当 MACD 线穿过信号线下方时。import backtrader as bt
class CrossoverStrategy(bt.Strategy):
# list of parameters which are configurable for the strategy
params = dict(fast_period=12, slow_period=26, signal_period=9)
def __init__(self):
self.fast_ma = bt.indicators.EMA(self.data.close, period=self.p.fast_period)
self.slow_ma = bt.indicators.EMA(self.data.close, period=self.p.slow_period)
self.macd_line = self.fast_ma - self.slow_ma
self.signal_line = bt.indicators.EMA (self.macd_line, period=self.p.signal_period)
self.macd_crossover = bt.indicators.CrossOver (self.macd_line, self.signal_line)
def next(self):
if self.macd_crossover > 0:
self.buy() # enter long position
elif self.macd_crossover < 0:
self.close() # close long position
该策略现在需要馈送到 Backtrader 的主引擎 — backtrader.cerebro,以及我们的数据、策略跟踪器 (bt.observers) 和分析器 (bt.analyzers),以及其他经纪级别设置,如佣金和账户余额。TICKER = &#34;NVDA&#34;
STRATEGY_PARAMS = dict(fast_period=12, slow_period=26, signal_period=9)
def run_backtest(plot=True, **strategy_params):
cerebro = bt.Cerebro()
cerebro.adddata(
bt.feeds.PandasData(dataname=read_alpha_vantage(ticker=TICKER), name=TICKER)
)
# Remember to set it high enough or the strategy may not
# be able to trade because of short of cash
cerebro.broker.setcash(10000.0)
# Print out the starting conditions
print(f&#34;Starting Portfolio Value: {cerebro.broker.getvalue():,.2f}&#34;)
# Although we have defined some default params in the strategy,
# we can override it by passing in keyword arguments here.
cerebro.addstrategy(CrossoverStrategy, **strategy_params)
cerebro.addobserver(bt.observers.Trades)
cerebro.addobserver(bt.observers.DrawDown)
cerebro.addanalyzer(bt.analyzers.SharpeRatio)
# Let&#39;s say that we have 0.25% slippage and commission per trade,
# that is 0.5% in total for a round trip.
cerebro.broker.setcommission(commission=0.0025, margin=False)
# Run over everything
strats = cerebro.run()
print(f&#34;Final Portfolio Value:{cerebro.broker.getvalue():,.2f}&#34;)
if plot:
cerebro.plot()
当您执行 run_backtest(plot=True, **STRATEGY_PARAMS) 时,你应该得到如下内容:
Backtrader的结果
遗传算法参数优化
使用默认参数从 Nvidia 的火箭中只赚 73.16 美元,这看起来一点也不乐观。让我们尝试优化参数,看看我们是否可以让它更可行。
基于之前的速成课程,让我们定义我们的算法如下:
Gene:这应该是我们 MACD 策略的参数。不幸的是,DEAP 不适用于关键字参数,因为交叉需要索引切片。相反,我们将使用一个列表来存储参数。
初始种群:我们的初始种群将有 100 个个体,每个个体的随机整数 fast_period 在 [1, 151) 范围内,slow_period 在 [10, 251) 范围内,signal_period 在 [1, 301) 范围内。这意味着总共有 150 x 240 x 300 = 1080 万种可能的组合。
适应度函数:为了平衡回报和风险,我们将使用总利润/最大回撤作为适应度函数的唯一目标。由于 DEAP 是一个通用框架,它支持具有多个目标的适应度函数。因此,该函数需要返回一个适应度得分列表。
选择策略:我们将采用经典的锦标赛方法,每轮锦标赛的获胜者将被选为下一代的种子。
交叉策略:我们将使用统一交叉,每个基因有 50% 的机会交叉。
突变策略:我们将使用 [1, 101) 范围内的整数的均匀分布,每个基因的突变概率为 30%。
结束条件:算法将在完成 20 次迭代后停止。
下面的代码片段是我们的遗传算法在上述设置下的实现,它将在算法的迭代中跟踪最优秀的个体。import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from tqdm import trange
import time
import random
from deap import base
from deap import creator
from deap import tools
# fix the seed so that we will get the same results
# feel free to change it or comment out the line
random.seed(1)
# GA parameters
PARAM_NAMES = [&#34;fast_period&#34;, &#34;slow_period&#34;, &#34;signal_period&#34;]
NGEN = 20
NPOP = 100
CXPB = 0.5
MUTPB = 0.3
data = bt.feeds.PandasData(dataname=read_alpha_vantage(ticker=TICKER), name=TICKER)
def evaluate(individual, plot=False, log=False):
# convert list of parameter values into dictionary of kwargs
strategy_params = {k: v for k, v in zip(PARAM_NAMES, individual)}
# fast moving average by definition cannot be slower than the slow one
if strategy_params[&#34;fast_period&#34;] >= strategy_params[&#34;slow_period&#34;]:
return [-np.inf]
# by setting stdstats to False, backtrader will not store the changes in
# statistics like number of trades, buys & sells, etc.
cerebro = bt.Cerebro(stdstats=False)
cerebro.adddata(data)
# Remember to set it high enough or the strategy may not
# be able to trade because of short of cash
initial_capital = 10_000.0
cerebro.broker.setcash(initial_capital)
# Pass in the genes of the individual as kwargs
cerebro.addstrategy(CrossoverStrategy, **strategy_params)
# This is needed for calculating our fitness score
cerebro.addanalyzer(bt.analyzers.DrawDown)
# Let&#39;s say that we have 0.25% slippage and commission per trade,
# that is 0.5% in total for a roud trip.
cerebro.broker.setcommission(commission=0.0025, margin=False)
# Run over everything
strats = cerebro.run()
profit = cerebro.broker.getvalue() - initial_capital
max_dd = strats[0].analyzers.drawdown.get_analysis()[&#34;max&#34;][&#34;moneydown&#34;]
fitness = profit / (max_dd if max_dd > 0 else 1)
if log:
print(f&#34;Starting Portfolio Value: {initial_capital:,.2f}&#34;)
print(f&#34;Final Portfolio Value:{cerebro.broker.getvalue():,.2f}&#34;)
print(f&#34;Total Profit: {profit:,.2f}&#34;)
print(f&#34;Maximum Drawdown: {max_dd:,.2f}&#34;)
print(f&#34;Profit / Max DD: {fitness}&#34;)
if plot:
cerebro.plot()
return [fitness]
# our fitness score is supposed to be maximised and there is only 1 objective
creator.create(&#34;FitnessMax&#34;, base.Fitness, weights=(1.0,))
# our individual is a list of genes, with the fitness score the higher the better
creator.create(&#34;Individual&#34;, list, fitness=creator.FitnessMax)
# register some handy functions for calling
toolbox = base.Toolbox()
toolbox.register(&#34;indices&#34;, random.sample, range(NPOP), NPOP)
# crossover strategy
toolbox.register(&#34;mate&#34;, tools.cxUniform, indpb=CXPB)
# mutation strategy
toolbox.register(&#34;mutate&#34;, tools.mutUniformInt, low=1, up=151, indpb=0.2)
# selection strategy
toolbox.register(&#34;select&#34;, tools.selTournament, tournsize=3)
# fitness function
toolbox.register(&#34;evaluate&#34;, evaluate)
# definition of an individual & a population
toolbox.register(&#34;attr_fast_period&#34;, random.randint, 1, 51)
toolbox.register(&#34;attr_slow_period&#34;, random.randint, 10, 151)
toolbox.register(&#34;attr_signal_period&#34;, random.randint, 1, 101)
toolbox.register(
&#34;individual&#34;,
tools.initCycle,
creator.Individual,
(
toolbox.attr_fast_period,
toolbox.attr_slow_period,
toolbox.attr_signal_period,
),
)
toolbox.register(&#34;population&#34;, tools.initRepeat, list, toolbox.individual)
mean = np.ndarray(NGEN)
best = np.ndarray(NGEN)
hall_of_fame = tools.HallOfFame(maxsize=3)
t = time.perf_counter()
pop = toolbox.population(n=NPOP)
for g in trange(NGEN):
# Select the next generation individuals
offspring = toolbox.select(pop, len(pop))
# Clone the selected individuals
offspring = list(map(toolbox.clone, offspring))
# Apply crossover on the offspring
for child1, child2 in zip(offspring[::2], offspring[1::2]):
if random.random() < CXPB:
toolbox.mate(child1, child2)
del child1.fitness.values
del child2.fitness.values
# Apply mutation on the offspring
for mutant in offspring:
if random.random() < MUTPB:
toolbox.mutate(mutant)
del mutant.fitness.values
# Evaluate the individuals with an invalid fitness
invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
for ind, fit in zip(invalid_ind, fitnesses):
ind.fitness.values = fit
# The population is entirely replaced by the offspring
pop[:] = offspring
hall_of_fame.update(pop)
print(
&#34;HALL OF FAME:\n&#34;
+ &#34;\n&#34;.join(
[
f&#34; {_}: {ind}, Fitness: {ind.fitness.values[0]}&#34;
for _, ind in enumerate(hall_of_fame)
]
)
)
fitnesses = [
ind.fitness.values[0] for ind in pop if not np.isinf(ind.fitness.values[0])
]
mean[g] = np.mean(fitnesses)
best[g] = np.max(fitnesses)
end_t = time.perf_counter()
print(f&#34;Time Elapsed: {end_t - t:,.2f}&#34;)
fig, ax = plt.subplots(sharex=True, figsize=(16, 9))
sns.lineplot(x=range(NGEN), y=mean, ax=ax, label=&#34;Average Fitness Score&#34;)
sns.lineplot(x=range(NGEN), y=best, ax=ax, label=&#34;Best Fitness Score&#34;)
ax.set_title(&#34;Fitness Score&#34;)
ax.set_xticks(range(NGEN))
ax.set_xlabel(&#34;Iteration&#34;)
plt.tight_layout()
plt.show()
运行代码段后,我们得到以下结果
HALL OF FAME:
0: [36, 84, 5], Fitness: 5.331913978803928
1: [36, 87, 5], Fitness: 5.329609884401389
2: [5, 143, 44], Fitness: 5.309777394151804
适应分数在 20 次迭代内收敛
从上图中,我们可以看到性能如何在大约 10 次迭代中快速收敛到解决方案。这意味着,该算法没有尝试全部 1080 万个组合,而是通过一千次回测运行为我们提供了最优解决方案的候选者,不到问题空间的 0.1%。假设每一代个体中的所有个体都不同,这已经是我们案例的最坏情况估计。
如果我们将 dict (fast_period=5, slow_period=82, signal_period=38) 的最优解插回前面用以下几行介绍的回测函数,我们可以看到现在利润是默认 MACD 的 7 倍多一点 交叉策略:
OPTIMISED_STRATEGY_PARAMS = {
k: v for k, v in zip(PARAM_NAMES, hall_of_fame[0])}
run_backtest(**OPTIMISED_STRATEGY_PARAMS)
遗传算法优化策略的性能
结论在本文中,我们介绍了遗传算法的关键概念,并展示了我们如何使用它们来优化交易策略。
从历史上看,参数优化对计算能力的要求很高。在我们的示例中,只有三个参数的简单 MACD 交叉策略已经生成了数百万种可能的组合。即使每个回测可以在 0.01 秒内完成,那总共已经是 30 小时的计算时间。随着参数数量的增加,这只会呈指数增长。
使用遗传算法,我们可以在大约 10 次迭代后收敛到全局最优的候选者,这不到问题空间的 0.1%。优化后的 MACD 策略也比使用默认参数的策略表现好很多,超过原始利润的 7 倍。
话虽如此,请记住,本文是展示我们如何利用遗传算法作为优化工具,因此一直专注于底层概念和简单的代码结构。 |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
×
|