# 基础库导入
from __future__ import print_function
from __future__ import division
import warnings
# warnings.filterwarnings('ignore')
# warnings.simplefilter('ignore')
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import ipywidgets
%matplotlib inline
import os
import sys
# 使用insert 0即只使用github,避免交叉使用了pip安装的abupy,导致的版本不一致问题
sys.path.insert(0, os.path.abspath('../'))
import abupy
from abupy import AbuFactorAtrNStop, AbuFactorPreAtrNStop, AbuFactorCloseAtrNStop, AbuFactorBuyBreak
from abupy import AbuFactorSellBreak
from abupy import abu, EMarketTargetType, AbuMetricsBase, ABuMarketDrawing, ABuProgress, ABuSymbolPd
from abupy import EMarketTargetType, EDataCacheType, EMarketSourceType, EMarketDataFetchMode, EStoreAbu, AbuUmpMainMul
from abupy import AbuUmpMainDeg, AbuUmpMainJump, AbuUmpMainPrice, AbuUmpMainWave, feature, AbuFeatureDegExtend
from abupy import AbuUmpEdgeDeg, AbuUmpEdgePrice, AbuUmpEdgeWave, AbuUmpEdgeFull, AbuUmpEdgeMul, AbuUmpEegeDegExtend
from abupy import AbuUmpMainDegExtend, ump, Parallel, delayed, AbuMulPidProgress
pd.options.display.max_rows = 1000
from IPython.display import display
from IPython.core.display import HTML
from pprint import pprint, pformat
# 关闭沙盒数据
abupy.env.disable_example_env_ipython()
#os.path.join(ABuEnv.g_project_cache_dir, 'abu_socket_progress')
股池为上证50 + 创业50 + 沪深300 + 中证500 几大指数的成分股
import random
from stock_pool import stock_pool
def custom_symbols():
symbols = []
symbols.extend( random.sample(stock_pool.get('zh500'), 500))
symbols.extend( random.sample(stock_pool.get('hs300'), 300))
symbols.extend( random.sample(stock_pool.get('sh50'), 49))
symbols.extend( random.sample(stock_pool.get('chuang50'), 50))
return symbols
symbols = custom_symbols()
print(symbols)
一个经典的均线多头排列策略,策略本身优秀与否不是重点,本次主要是研究 弱人工智能如何帮我们提高收益,是观察没有使用AI和使用AI后 的对比。
from picker.WzPickNonNew import WzPickNonNew
from picker.WzPickMaAng import WzPickMaAng
from abupy import AbuPtPosition
from factor.WzFactorBuyRsiV8 import WzFactorBuyRsiV8
from factor.WzFactorSellRsiV8 import WzFactorSellRsiV8
from factor.WzFactorBuyMaV6 import WzFactorBuyMaV6
from factor.WzFactorSellMaV5 import WzFactorSellMaV5
from abupy import slippage
# 开启针对非集合竞价阶段的涨停,滑点买入价格以高概率在接近涨停的价格买入
slippage.sbb.g_enable_limit_up = True
# 将集合竞价阶段的涨停买入成功概率设置为0,如果设置为0.2即20%概率成功买入
slippage.sbb.g_pre_limit_up_rate = 0
# 开启针对非集合竞价阶段的跌停,滑点卖出价格以高概率在接近跌停的价格卖出
slippage.ssb.g_enable_limit_down = True
# 将集合竞价阶段的跌停卖出成功概率设置为0, 如果设置为0.2即20%概率成功卖出
slippage.ssb.g_pre_limit_down_rate = 0
abupy.env.g_data_cache_type = EDataCacheType.E_DATA_CACHE_CSV
abupy.env.g_market_target = EMarketTargetType.E_MARKET_TARGET_CN
abupy.env.g_data_fetch_mode = EMarketDataFetchMode.E_DATA_FETCH_FORCE_LOCAL
buy_factors=[{'buy_default': 'post=36.88',
'class': WzFactorBuyRsiV8,
# 'position': {'class': AbuPtPosition,
# 'past_day_cnt': 68,
# 'pos_base': pt_pos_base },
'rsi_timeperiod': 6,
'stock_pickers': [{'class': WzPickNonNew,
'pick_period': 'week',
'weeks': 24},
# {'class':WzPickMaAng,
# 'ma_timeperiod': 60,
# 'xdd': 20,
# 'min_ang':-1.2, # 2.7 , 5.0 , 7.1
# 'max_ang':90,
# 'pick_period': 'week',
# },
]
}]
# 卖出因子继续使用上一节使用的因子
sell_factors = [
# {'stop_loss_n': 1.0, 'stop_win_n': 3.0,
# 'class': AbuFactorAtrNStop},
# {'class': AbuFactorPreAtrNStop, 'pre_atr_n': 1.5},
{'class': WzFactorSellMaV5 ,
'ma_type': 1,
'sell_x': 'rvv=5,6,0.22,yc,1.0'},
{'class': WzFactorSellRsiV8,
'rsi_timeperiod': 6,
'sell_default':'rvv=10,-,78,16,65,1.0' }, #
{'class': AbuFactorCloseAtrNStop, 'close_atr_n': 1.75},
{'class': AbuFactorAtrNStop, 'stop_loss_n': 2.1},
{'class': WzFactorSellRsiV8,
'rsi_timeperiod': 6,
'sell_default': 'lt=2,29,1.0|lt=3,39,1.0'},
]
print("随便输出点什么表示已经执行过: ")
将股池所有股票分切成4块,拿3/4用于机器人裁判训练, 剩下1/4是测试集用途。
# import psutil
# from warnings import simplefilter
# simplefilter(action='ignore', category=FutureWarning)
# simplefilter(action='ignore', category=ResourceWarning)
# g_cpu_cnt = psutil.cpu_count(logical=True) * 1
# print ("g_cpu_cnt:", g_cpu_cnt)
# 回测生成买入时刻特征
abupy.env.g_enable_ml_feature = True
# 回测开始时将symbols切割分为训练集数据和测试集两份,使用训练集进行回测
abupy.env.g_enable_train_test_split = True
# 训练:测试 = 3:1
abupy.env.g_split_tt_n_folds = 4
feature.clear_user_feature()
feature.append_user_feature(AbuFeatureDegExtend)
read_cash = 5000000 #500w资金
o_pos_base =0.1 #abupy.beta.atr.g_atr_pos_base
print('o_pos_base:', o_pos_base)
abupy.beta.atr.g_atr_pos_base = o_pos_base / 25
print('abupy.beta.atr.g_atr_pos_base:' , abupy.beta.atr.g_atr_pos_base)
abu_result_tuple = None
def run_loop_back():
global abu_result_tuple, symbols
print(symbols)
abu_result_tuple, _ = abu.run_loop_back(read_cash,
buy_factors,
sell_factors,
choice_symbols = symbols,
# choice_symbols=None,
start='2016-02-02', end='2019-09-12',
# stock_picks= stock_pickers,
# n_process_kl = g_cpu_cnt,
# n_process_pick= g_cpu_cnt,
)
# 把运行的结果保存在本地,以便之后分析回测使用,保存回测结果数据代码如下所示
abu.store_abu_result_tuple(abu_result_tuple, n_folds=3, store_type=EStoreAbu.E_STORE_CUSTOM_NAME,
custom_name='train_cn')
ABuProgress.clear_output()
AbuMetricsBase.show_general(*abu_result_tuple, only_show_returns=True).plot_max_draw_down()
def run_load_train():
global abu_result_tuple
abu_result_tuple = abu.load_abu_result_tuple(n_folds=3, store_type=EStoreAbu.E_STORE_CUSTOM_NAME,
custom_name='train_cn')
AbuMetricsBase.show_general(*abu_result_tuple, only_show_returns=True).plot_max_draw_down()
def select(select):
if select == 'run loop back':
run_loop_back()
else:
run_load_train()
# _ = ipywidgets.interact_manual(select, select=[ 'load train data', 'run loop back'])
run_load_train()
策略效果基本和大盘同步
上面的abupy.env.g_enable_train_test_split = True
和abupy.env.g_split_tt_n_folds = 4
配置自动把训练集和测试集分切好了,打印出来看看。
from abupy import ABuFileUtil
a = ['1', '2', '3', 'a', 'b', 'c']
b = ['0','2','b','f**k']
print('a&b交集为:', set(a).intersection(b) )
train = ABuFileUtil.load_pickle("/root/abu/data/cache/market_train_symbols_hs")
test = ABuFileUtil.load_pickle("/root/abu/data/cache/market_test_symbols_hs")
print('训练集和测试集交集为:', set(train).intersection(test) )
print( len(train), "个训练集股票:\r\n", train, "\r\n")
print( len(test), "个测试集股票:\r\n", test)
# import warnings
# warnings.simplefilter("ignore", ResourceWarning)
# 需要全局设置为A股市场,在ump会根据市场类型保存读取对应的ump
abupy.env.g_market_target = EMarketTargetType.E_MARKET_TARGET_CN
ump_deg=None
ump_jump=None
ump_mul=None
ump_price=None
ump_main_deg_extend=None
ump_wave=None
# 使用训练集交易数据训练主裁
orders_pd_train_cn = abu_result_tuple.orders_pd
def train_main_ump():
print('AbuUmpMainDeg begin...') #brust_min=False,
AbuUmpMainDeg.ump_main_clf_dump(orders_pd_train_cn, save_order=False, show_order=False)
print('AbuUmpMainWave begin...')
AbuUmpMainWave.ump_main_clf_dump(orders_pd_train_cn, save_order=False, show_order=False)
print('AbuUmpMainPrice begin...')
AbuUmpMainPrice.ump_main_clf_dump(orders_pd_train_cn, save_order=False, show_order=False)
print('AbuUmpMainJump begin...')
AbuUmpMainJump.ump_main_clf_dump(orders_pd_train_cn, save_order=False, show_order=False)
print('AbuUmpMainMul begin...')
AbuUmpMainMul.ump_main_clf_dump(orders_pd_train_cn, save_order=False, show_order=False)
print('AbuUmpMainDegExtend begin...')
AbuUmpMainDegExtend.ump_main_clf_dump(orders_pd_train_cn, save_order=False, show_order=False)
# 依然使用load_main_ump,避免下面多进程内存拷贝过大
load_main_ump()
def load_main_ump():
global ump_deg, ump_jump, ump_mul, ump_price, ump_main_deg_extend, ump_wave
ump_deg = AbuUmpMainDeg(predict=True)
ump_jump = AbuUmpMainJump(predict=True)
ump_mul = AbuUmpMainMul(predict=True)
ump_price = AbuUmpMainPrice(predict=True)
ump_main_deg_extend = AbuUmpMainDegExtend(predict=True)
ump_wave = AbuUmpMainWave(predict=True )
print('load main ump complete!')
def select(select):
if select == 'train main ump':
train_main_ump()
else:
load_main_ump()
# _ = ipywidgets.interact_manual(select, select=['load main ump', 'train main ump'])
load_main_ump()
# from abupy import AbuUmpEdgeDeg, AbuUmpEdgePrice, AbuUmpEdgeWave, AbuUmpEdgeFull, AbuUmpEdgeMul, AbuUmpEegeDegExtend
# AbuUmpEdgeWave, AbuUmpEdgeFull, AbuUmpEdgeMul
# 需要全局设置为A股市场,在ump会根据市场类型保存读取对应的ump
abupy.env.g_market_target = EMarketTargetType.E_MARKET_TARGET_CN
# 使用训练集交易数据训练
orders_pd_train_cn = abu_result_tuple.orders_pd
print('AbuUmpEdgeDeg begin...')
AbuUmpEdgeDeg.ump_edge_clf_dump(orders_pd_train_cn)
edge_deg = AbuUmpEdgeDeg(predict=True)
print('AbuUmpEdgePrice begin...')
AbuUmpEdgePrice.ump_edge_clf_dump(orders_pd_train_cn)
edge_price = AbuUmpEdgePrice(predict=True)
print('AbuUmpEdgeMul begin...')
AbuUmpEdgeMul.ump_edge_clf_dump(orders_pd_train_cn)
edge_mul = AbuUmpEdgeMul(predict=True)
print('AbuUmpEegeDegExtend begin...')
AbuUmpEegeDegExtend.ump_edge_clf_dump(orders_pd_train_cn)
edge_deg_extend = AbuUmpEegeDegExtend(predict=True)
print('AbuUmpEdgeWave begin...')
AbuUmpEdgeWave.ump_edge_clf_dump(orders_pd_train_cn)
edge_wave = AbuUmpEdgeWave(predict=True)
print('AbuUmpEdgeFull begin...')
AbuUmpEdgeFull.ump_edge_clf_dump(orders_pd_train_cn)
edge_full = AbuUmpEdgeFull(predict=True)
print('fit edge complete!')
# 测试集回测时依然生成买入时刻特征
abupy.env.g_enable_ml_feature = True
# 回测时不重新切割训练集数据和测试集
abupy.env.g_enable_train_test_split = False
# 回测时使用切割好的测试数据
abupy.env.g_enable_last_split_test = True
# 测试集依然使用10,30,50,90,120日走势拟合角度特征AbuFeatureDegExtend,做为回测时的新的视角来录制比赛(记录回测特征)
feature.clear_user_feature()
feature.append_user_feature(AbuFeatureDegExtend)
read_cash = 5000000 #500w资金
o_pos_base =0.1 #abupy.beta.atr.g_atr_pos_base
print('o_pos_base:', o_pos_base)
abupy.beta.atr.g_atr_pos_base = o_pos_base / 8
print('abupy.beta.atr.g_atr_pos_base:' , abupy.beta.atr.g_atr_pos_base)
abu_result_tuple_test = None
order_has_result = None
def run_loop_back_test():
global abu_result_tuple_test, order_has_result
abu_result_tuple_test, _ = abu.run_loop_back(read_cash,
buy_factors,
sell_factors,
choice_symbols=None,
start='2016-02-02', end='2019-09-12',
# n_process_kl = 6,
# n_process_pick= 6,
)
# 把运行的结果保存在本地,以便之后分析回测使用,保存回测结果数据代码如下所示
abu.store_abu_result_tuple(abu_result_tuple_test, n_folds=3, store_type=EStoreAbu.E_STORE_CUSTOM_NAME,
custom_name='test_cn')
ABuProgress.clear_output()
AbuMetricsBase.show_general(*abu_result_tuple_test, only_show_returns=True).plot_max_draw_down()
# 验证A股主裁是否称职
# 选取有交易结果的数据order_has_result
order_has_result = abu_result_tuple_test.orders_pd[abu_result_tuple_test.orders_pd.result != 0]
order_has_result.filter(regex='^buy(_deg_|_price_|_wave_|_jump)').head()
def run_load_test():
global abu_result_tuple_test, order_has_result
abu_result_tuple_test = abu.load_abu_result_tuple(n_folds=3, store_type=EStoreAbu.E_STORE_CUSTOM_NAME,
custom_name='test_cn')
AbuMetricsBase.show_general(*abu_result_tuple_test, only_show_returns=True).plot_max_draw_down()
# 验证A股主裁是否称职
# 选取有交易结果的数据order_has_result
order_has_result = abu_result_tuple_test.orders_pd[abu_result_tuple_test.orders_pd.result != 0]
order_has_result.filter(regex='^buy(_deg_|_price_|_wave_|_jump)').head()
def select_test(select):
if select == 'run loop back':
run_loop_back_test()
else:
run_load_test()
# _ = ipywidgets.interact_manual(select_test, select=['load test data', 'run loop back',])
run_load_test()
上面是没有启用机器人裁判的成绩。
# from warnings import simplefilter
# simplefilter(action='ignore', category=FutureWarning)
# simplefilter(action='ignore', category=ResourceWarning)
# abupy.env.g_data_cache_type = EDataCacheType.E_DATA_CACHE_CSV
# abupy.env.g_market_target = EMarketTargetType.E_MARKET_TARGET_CN
# abupy.env.g_data_fetch_mode = EMarketDataFetchMode.E_DATA_FETCH_FORCE_LOCAL
# 开启内置主裁
abupy.env.g_enable_ump_main_deg_block = True
# abupy.env.g_enable_ump_main_price_block = True
# 开启内置边裁
abupy.env.g_enable_ump_edge_deg_block = True
# abupy.env.g_enable_ump_edge_price_block = True
# 回测时需要开启特征生成,因为裁判开启需要生成特征做为输入
abupy.env.g_enable_ml_feature = True
# 回测时使用上一次切割好的测试集数据
abupy.env.g_enable_last_split_test = True
""" """
feature.clear_user_feature()
# 10,30,50,90,120日走势拟合角度特征的AbuFeatureDegExtend,做为回测时的新的视角来录制比赛
feature.append_user_feature(AbuFeatureDegExtend)
# 打开使用用户自定义裁判开关
ump.manager.g_enable_user_ump = True
# 先clear一下
ump.manager.clear_user_ump()
# 把新的裁判AbuUmpMainDegExtend类名称使用append_user_ump添加到系统中
ump.manager.append_user_ump(AbuUmpEegeDegExtend)
# 把新的裁判AbuUmpMainDegExtend类名称使用append_user_ump添加到系统中
ump.manager.append_user_ump(AbuUmpMainDegExtend)
o_pos_base =0.1 #abupy.beta.atr.g_atr_pos_base
print('o_pos_base:', o_pos_base)
abupy.beta.atr.g_atr_pos_base = o_pos_base / 8
print('abupy.beta.atr.g_atr_pos_base:' , abupy.beta.atr.g_atr_pos_base)
abu_result_tuple_test_ump = None
read_cash = 5000000 #500w资金
def run_loop_back_ump():
global abu_result_tuple_test_ump, read_cash
abu_result_tuple_test_ump, _ = abu.run_loop_back(read_cash,
buy_factors,
sell_factors,
# choice_symbols = symbols,
choice_symbols=None,
# stock_picks= stock_pickers,
start='2016-02-02', end='2019-09-12',
# n_process_kl = 1,
# n_process_pick= 1,
)
# 把运行的结果保存在本地,以便之后分析回测使用,保存回测结果数据代码如下所示
abu.store_abu_result_tuple(abu_result_tuple_test_ump, n_folds=3, store_type=EStoreAbu.E_STORE_CUSTOM_NAME,
custom_name='test_ump_cn')
ABuProgress.clear_output()
AbuMetricsBase.show_general(*abu_result_tuple_test_ump).plot_max_draw_down() #, returns_cmp=True
def run_load_ump():
global abu_result_tuple_test_ump
abu_result_tuple_test_ump = abu.load_abu_result_tuple(n_folds=3, store_type=EStoreAbu.E_STORE_CUSTOM_NAME,
custom_name='test_ump_cn')
AbuMetricsBase.show_general(*abu_result_tuple_test_ump).plot_max_draw_down() #, returns_cmp=True
def select_ump(select):
if select == 'run loop back ump':
run_loop_back_ump()
else:
run_load_ump()
# _ = ipywidgets.interact_manual(select_ump, select=['load test ump data','run loop back ump'])
# print("随便输出点什么表示已经执行过: ")
run_load_ump()
盈亏比从 0.9930 提高到 1.6230,已经扭亏为盈了。
但是由于机器人裁判拦截了不少胜率不大的交易,交易机会少了,所以策略资金利用率比例:27.2689%, 下面强制提高单次买入的仓位试试,看能不能提高整体收益。
abupy.beta.atr.g_atr_pos_base = o_pos_base /8
上面的单次买入仓位
abupy.beta.atr.g_atr_pos_base = o_pos_base /5
下面的单次买入仓位
# from warnings import simplefilter
# simplefilter(action='ignore', category=FutureWarning)
# simplefilter(action='ignore', category=ResourceWarning)
# 开启内置主裁
abupy.env.g_enable_ump_main_deg_block = True
# abupy.env.g_enable_ump_main_price_block = True
# 开启内置边裁
abupy.env.g_enable_ump_edge_deg_block = True
# abupy.env.g_enable_ump_edge_price_block = True
# 回测时需要开启特征生成,因为裁判开启需要生成特征做为输入
abupy.env.g_enable_ml_feature = True
# 回测时使用上一次切割好的测试集数据
abupy.env.g_enable_last_split_test = True
""" """
feature.clear_user_feature()
# 10,30,50,90,120日走势拟合角度特征的AbuFeatureDegExtend,做为回测时的新的视角来录制比赛
feature.append_user_feature(AbuFeatureDegExtend)
# 打开使用用户自定义裁判开关
ump.manager.g_enable_user_ump = True
# 先clear一下
ump.manager.clear_user_ump()
# 把新的裁判AbuUmpMainDegExtend类名称使用append_user_ump添加到系统中
ump.manager.append_user_ump(AbuUmpEegeDegExtend)
# 把新的裁判AbuUmpMainDegExtend类名称使用append_user_ump添加到系统中
ump.manager.append_user_ump(AbuUmpMainDegExtend)
o_pos_base =0.1 #abupy.beta.atr.g_atr_pos_base
print('o_pos_base:', o_pos_base)
abupy.beta.atr.g_atr_pos_base = o_pos_base /5
print('abupy.beta.atr.g_atr_pos_base:' , abupy.beta.atr.g_atr_pos_base)
abu_result_tuple_test_ump = None
read_cash = 5000000 #500w资金
def run_loop_back_ump():
global abu_result_tuple_test_ump, read_cash
abu_result_tuple_test_ump, _ = abu.run_loop_back(read_cash,
buy_factors,
sell_factors,
# choice_symbols = symbols,
choice_symbols=None,
# stock_picks= stock_pickers,
start='2016-02-02', end='2019-09-12',
n_process_kl = 1,
n_process_pick= 1,
)
# 把运行的结果保存在本地,以便之后分析回测使用,保存回测结果数据代码如下所示
abu.store_abu_result_tuple(abu_result_tuple_test_ump, n_folds=3, store_type=EStoreAbu.E_STORE_CUSTOM_NAME,
custom_name='test_ump_cn_x25')
ABuProgress.clear_output()
AbuMetricsBase.show_general(*abu_result_tuple_test_ump).plot_max_draw_down() #, returns_cmp=True
def run_load_ump():
global abu_result_tuple_test_ump
abu_result_tuple_test_ump = abu.load_abu_result_tuple(n_folds=3, store_type=EStoreAbu.E_STORE_CUSTOM_NAME,
custom_name='test_ump_cn_x25')
AbuMetricsBase.show_general(*abu_result_tuple_test_ump).plot_max_draw_down() #, returns_cmp=True
def select_ump(select):
if select == 'run loop back ump':
run_loop_back_ump()
else:
run_load_ump()
# _ = ipywidgets.interact_manual(select_ump, select=['load test ump data','run loop back ump'])
# print("随便输出点什么表示已经执行过: ")
run_load_ump()
额,资金利用率比例提高到了39.1519%, 年化收益也提高了不止2给点,但是最大回撤从0.11 升到 0.170398。。 可怕
from warnings import simplefilter
simplefilter(action='ignore', category=FutureWarning)
simplefilter(action='ignore', category=ResourceWarning)
"""
通过一个一个迭代交易单,将交易单中的买入时刻特征传递给ump主裁决策器,让每一个主裁来决策是否进行拦截,
这样可以统计每一个主裁的拦截成功率,以及整体拦截率等
"""
order_has_result = abu_result_tuple_test.orders_pd[abu_result_tuple_test.orders_pd.result != 0]
#print(order_has_result)
# global ump_deg, ump_mul, ump_price, ump_main_deg_extend,ump_jump, ump_wave
# ump_deg = AbuUmpMainDeg(predict=True)
# ump_jump = AbuUmpMainJump(predict=True)
def apply_ml_features_ump(order, predicter, progress, need_hit_cnt):
if not isinstance(order.ml_features, dict):
import ast
# 低版本pandas dict对象取出来会成为str
ml_features = ast.literal_eval(order.ml_features)
else:
ml_features = order.ml_features
progress.show()
# 将交易单中的买入时刻特征传递给ump主裁决策器,让每一个主裁来决策是否进行拦截
return predicter.predict_kwargs(need_hit_cnt=need_hit_cnt, **ml_features)
def pararllel_func(ump_object, ump_name):
with AbuMulPidProgress(len(order_has_result), '{} complete'.format(ump_name)) as progress:
# 启动多进程进度条,对order_has_result进行apply
ump_result = order_has_result.apply(apply_ml_features_ump, axis=1, args=(ump_object, progress, 2,))
return ump_name, ump_result
# if sys.version_info > (3, 4, 0):
if False:
# python3.4以上并行处理4个主裁,每一个主裁启动一个进程进行拦截决策
parallel = Parallel( n_jobs=6, verbose=0, pre_dispatch='2*n_jobs')
out = parallel(delayed(pararllel_func)(ump_object, ump_name)
for ump_object, ump_name in zip(
[ump_deg, ump_mul, ump_price, ump_main_deg_extend, ump_jump, ump_wave],
['ump_deg', 'ump_mul', 'ump_price', 'ump_main_deg_extend','ump_jump','ump_wave']))
else:
# 3.4下由于子进程中pickle ump的内部类会找不到,所以暂时只使用一个进程一个一个的处理
out = [pararllel_func(ump_object, ump_name) for ump_object, ump_name in zip(
[ump_deg, ump_mul, ump_price, ump_main_deg_extend,ump_jump, ump_wave],
['ump_deg', 'ump_mul', 'ump_price', 'ump_main_deg_extend','ump_jump','ump_wave'])]
# 将每一个进程中的裁判的拦截决策进行汇总
for sub_out in out:
order_has_result[sub_out[0]] = sub_out[1]
print("随便输出点什么表示已经执行过: ")
""" 通过把所有主裁的决策进行相加, 如果有投票1的即会进行拦截,四个裁判整体拦截正确率统计 """
block_pd = order_has_result.filter(regex='^ump_*')
# 把所有主裁的决策进行相加
block_pd['sum_bk'] = block_pd.sum(axis=1)
block_pd['result'] = order_has_result['result']
# 有投票1的即会进行拦截
block_pd = block_pd[block_pd.sum_bk > 0]
print('6个裁判整体拦截正确率{:.2f}%'.format(block_pd[block_pd.result == -1].result.count() / block_pd.result.count() * 100))
block_pd.tail()
""" 下面统计每一个主裁的拦截正确率"""
display(HTML(' <b> 下面统计每一个主裁的拦截正确率 </b> '))
from sklearn import metrics
def sub_ump_show(block_name):
sub_block_pd = block_pd[(block_pd[block_name] == 1)]
# 如果失败就正确 -1->1 1->0
sub_block_pd.result = np.where(sub_block_pd.result == -1, 1, 0)
return metrics.accuracy_score(sub_block_pd[block_name], sub_block_pd.result) * 100, sub_block_pd.result.count()
print('角度裁判拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_deg')))
print('角度扩展裁判拦拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_main_deg_extend')))
print('价格裁判拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_price')))
print('单混裁判拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_mul')))
print('跳空缺口裁判拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_jump')))
print('波动裁判拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_wave')))
上面这个因为已经执行了超过12小时还没完成,就不那么好奇,不验证了。
但是下面边裁是真实的执行结果(边裁的称职验证要比主裁快很多)
""" 验证边裁是否称职 """
# 选取有交易结果的数据order_has_result
# order_has_result = abu_result_tuple_test.orders_pd[abu_result_tuple_test.orders_pd.result != 0]
def apply_ml_features_edge(order, predicter, progress):
if not isinstance(order.ml_features, dict):
import ast
# 低版本pandas dict对象取出来会成为str
ml_features = ast.literal_eval(order.ml_features)
else:
ml_features = order.ml_features
# 边裁进行裁决
progress.show()
# 将交易单中的买入时刻特征传递给ump边裁决策器,让每一个边裁来决策是否进行拦截
edge = predicter.predict(**ml_features)
return edge.value
def edge_pararllel_func(edge, edge_name):
with AbuMulPidProgress(len(order_has_result), '{} complete'.format(edge_name)) as progress:
# # 启动多进程进度条,对order_has_result进行apply
edge_result = order_has_result.apply(apply_ml_features_edge, axis=1, args=(edge, progress,))
return edge_name, edge_result
# if sys.version_info > (3, 4, 0): # 这里经常出问题,干脆单线程好了
if False:
# python3.4以上并行处理4个边裁的决策,每一个边裁启动一个进程进行拦截决策
parallel = Parallel( n_jobs=6, verbose=0, pre_dispatch='2*n_jobs')
out = parallel(delayed(edge_pararllel_func)(edge, edge_name)
for edge, edge_name in zip(
[edge_deg, edge_price, edge_mul, edge_deg_extend, edge_wave, edge_full],
['edge_deg', 'edge_price', 'edge_mul', 'edge_deg_extend','edge_wave','edge_full']))
else:
# 3.4下由于子进程中pickle ump的内部类会找不到,所以暂时只使用一个进程一个一个的处理
out = [edge_pararllel_func(edge, edge_name) for edge, edge_name in zip(
[edge_deg, edge_price, edge_mul, edge_deg_extend, edge_wave, edge_full],
['edge_deg', 'edge_price', 'edge_mul', 'edge_deg_extend','edge_wave','edge_full'])]
# 将每一个进程中的裁判的拦截决策进行汇总
for sub_out in out:
order_has_result[sub_out[0]] = sub_out[1]
print("随便输出点什么表示已经执行过: ")
block_pd = order_has_result.filter(regex='^edge_*')
"""
由于predict返回的结果中1代表win top
但是我们只需要知道loss_top,所以只保留-1, 其他1转换为0。
"""
block_pd['edge_block'] = \
np.where(np.min(block_pd, axis=1) == -1, -1, 0)
# 拿出真实的交易结果
block_pd['result'] = order_has_result['result']
# 拿出-1的结果,即判定loss_top的
block_pd = block_pd[block_pd.edge_block == -1]
print('x个裁判整体拦截正确率{:.2f}%'.format(block_pd[block_pd.result == -1].result.count() /
block_pd.result.count() * 100))
print('x个边裁拦截交易总数{}, 拦截率{:.2f}%'.format(
block_pd.shape[0],
block_pd.shape[0] / order_has_result.shape[0] * 100))
block_pd.head()
""" 单个 :"""
from sklearn import metrics
def sub_edge_show(edge_name):
sub_edge_block_pd = order_has_result[(order_has_result[edge_name] == -1)]
return metrics.accuracy_score(sub_edge_block_pd[edge_name], sub_edge_block_pd.result) * 100, sub_edge_block_pd.shape[0]
print('\r\n角度边裁拦截正确率{0:.2f}%, 拦截交易数量{1:}'.format(*sub_edge_show('edge_deg')))
print('角度扩展边裁拦截正确率{0:.2f}%, 拦截交易数量{1:}'.format(*sub_edge_show('edge_deg_extend')))
print('单混边裁拦截正确率{0:.2f}%, 拦截交易数量{1:}'.format(*sub_edge_show('edge_mul')))
print('价格边裁拦截正确率{0:.2f}%, 拦截交易数量{1:}'.format(*sub_edge_show('edge_price')))
print('Full边裁拦截正确率{0:.2f}%, 拦截交易数量{1:}'.format(*sub_edge_show('edge_full')))
print('波动边裁拦截正确率{0:.2f}%, 拦截交易数量{1:}'.format(*sub_edge_show('edge_wave')))
由于训练用的数据过少(2016-2019 3年行情 * 大概 1000只股票), 裁判还不算很厉害,有些正确率还不到60%。
# 好奇 看看裁判从哪些角度分析交易的(有哪些特征)
orders_pd_train = orders_pd_train_cn
print(orders_pd_train.columns)
orders_pd_train[0:6]