# coding: utf-8
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from __future__ import division
# 获取数据函数
def get_stock_data(stock_code, index_code, start_date, end_date):
"""
:param stock_code: 股票代码,例如‘sz000002’
:param index_code: 指数代码,例如‘sh000001’
:param start_date: 回测开始日期,例如‘1991-1-30'
python index函数 :param end_date: 回测结束日期,例如‘2015-12-31’
:return: 函数返回其他函数的各参数序列
"""
# 此处为存放csv文件的本地路径,请自行改正地址.注意windows和mac系统,斜杠的方向不一样
stock_data = pd.read_csv(r'G:\财通实习\历史日线数据_样本%282013 2014年数据%292\all_trading_data\stock data\sh600000.csv', parse_dates=['date'])
benchmark = pd.read_csv(r'G:\财通实习\历史日线数据_样本%282013 2014年数据%292\all_trading_data\index data\sh000001.csv', parse_dates=['date'])
date = pd.date_range(2016-01-01,2016-03-15) # 生成日期序列
# 选取在日期范围内的股票数据序列并按日期排序
stock_data = stock_data.ix[stock_data['date'].isin(date), ['date', 'change', 'adjust_price']]
stock_data = stock_data.ix[stock_data['date'].isin(date), ['date', 'change', 'adjust_price']]
# 选取在日期范围内的指数数据序列并按日期排序
date_list = list(stock_data['date'])
benchmark = benchmark.ix[benchmark['date'].isin(date_list), ['date', 'change', 'close']]
benchmark.sort_values(by='date', inplace=True)
benchmark.set_index('date', inplace=True)
# 将回测要用到的各个数据序列转成list格式
date_line = list(benchmark.index.strftime('%Y-%m-%d')) # 日期序列
capital_line = list(stock_data['adjust_price']) # 账户价值序列
return_line = list(stock_data['change']) # 收益率序列
indexreturn_line = list(benchmark['change']) # 指数的变化率序列
index_line = list(benchmark['close']) # 指数序列
return date_line, capital_line, return_line, index_line, indexreturn_line
# 计算年化收益率函数
def annual_return(date_line, capital_line):
"""
:param date_line: 日期序列
:param capital_line: 账户价值序列
:return: 输出在回测期间的年化收益率
"""
# 将数据序列合并成dataframe并按日期排序
df = pd.DataFrame({'date': date_line, 'capital': capital_line})
df.sort_values(by='date', inplace=True)
df.reset_index(drop=True, inplace=True)
rng = pd.period_range(df['date'].iloc[0], df['date'].iloc[-1], freq='D')
# 计算年化收益率
annual = pow(df.ix[len(df.index) - 1, 'capital'] / df.ix[0, 'capital'], 250 / len(rng)) - 1
print '年化收益率为:%f' % annual
# 计算最大回撤函数
def max_drawdown(date_line, capital_line):
"""
:param date_line: 日期序列
:param capital_line: 账户价值序列
:return: 输出最大回撤及开始日期和结束日期
"""
# 将数据序列合并为一个dataframe并按日期排序
df = pd.DataFrame({'date': date_line, 'capital': capital_line})
df.sort_values(by='date', inplace=True)
df.reset_index(drop=True, inplace=True)
df['max2here'] = pd.expanding_max(df['capital']) # 计算当日之前的账户最大价值
df['dd2here'] = df['capital'] / df['max2here'] - 1 # 计算当日的回撤
# 计算最大回撤和结束时间
temp = df.sort_values(by='dd2here').iloc[0][['date', 'dd2here']]
max_dd = temp['dd2here']
end_date = temp['date']
# 计算开始时间
df = df[df['date'] <= end_date]
start_date = df.sort_values(by='capital', ascending=False).iloc[0]['date']
print '最大回撤为:%f, 开始日期:%s, 结束日期:%s' % (max_dd, start_date, end_date)
# 计算平均涨幅
def average_change(date_line, return_line):
"""
:param date_line: 日期序列
:param return_line: 账户日收益率序列
:return: 输出平均涨幅
"""
df = pd.DataFrame({'date': date_line, 'rtn': return_line})
ave = df['rtn'].mean()
print '平均涨幅为:%f' % ave
# 计算上涨概率
def prob_up(date_line, return_line):
"""
:param date_line: 日期序列
:param return_line: 账户日收益率序列
:return: 输出上涨概率
"""
df = pd.DataFrame({'date': date_line, 'rtn': return_line})
df.ix[df['rtn'] > 0, 'rtn'] = 1 # 收益率大于0的记为1
df.ix[df['rtn'] <= 0, 'rtn'] = 0 # 收益率小于等于0的记为0
# 统计1和0各出现的次数
count = df['rtn'].value_counts()
p_up = count.loc[1] / len(df.index)
print '上涨概率为:%f' % p_up
# 计算最大连续上涨天数和最大连续下跌天数
def max_successive_up(date_line, return_line):
"""
:param date_line: 日期序列
:param return_line: 账户日收益率序列
:return: 输出最大连续上涨天数和最大连续下跌天数
"""
df = pd.DataFrame({'date': date_line, 'rtn': return_line})
# 新建一个全为空值的series,并作为dataframe新的一列
s = pd.Series(np.nan, index=df.index)
s.name = 'up'
df = pd.concat([df, s], axis=1)
# 当收益率大于0时,up取1,小于0时,up取0,等于0时采用前向差值
df.ix[df['rtn'] > 0, 'up'] = 1
df.ix[df['rtn'] < 0, 'up'] = 0
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论