#import matplotlib
#matplotlib.use("TkAgg")
import matplotlib.pyplot as plt
from scipy.optimize import minimize
import numpy as np

### Following is the optimisation code:

### First the main function.
def optimise_with_sigma(sigma, mean_list):
    """
    Given mean and covariance matrix, find portfolio weights that maximise SR

    We assume we can have as much leverage as we like to hit a given risk target, hence we can
    constrain weights to sum to 1, i.e. no cash option

    :param sigma: np.array NxN, covariance matrix
    :param mean_list: list of floats N in length, means
    :return: list of weights
    """
    mus = np.array(mean_list, ndmin=2).transpose()
    number_assets = sigma.shape[1]

    # Starting weights, equal weighting
    start_weights = [1.0 / number_assets] * number_assets

    # Set up constraints - positive weights, adding to 1.0
    bounds = [(0.0, 1.0)] * number_assets
    cdict = [{'type': 'eq', 'fun': addem}]

    ans = minimize(
        neg_sharpe_ratio,
        start_weights, (sigma, mus),
        method='SLSQP',
        bounds=bounds,
        constraints=cdict,
        tol=0.00001)

    weights = ans['x']

    return weights
## This is the function we optimise over
def neg_sharpe_ratio(weights, sigma, mus):
    """
    Returns minus return / risk (the negative Sharpe Ratio) for a portfolio

    :param weights: list of floats N in length
    :param sigma: np.array NxN, covariance matrix
    :param mus: np.array Nx1, means
    :return: float
    """
    weights = np.array(weights, ndmin=2)  # 1xN row vector
    estreturn = (weights @ mus)[0, 0]
    std_dev = variance(weights, sigma) ** .5

    # Returns minus the portfolio Sharpe Ratio (as we're minimising)
    return -estreturn / std_dev
## Portfolio standard deviation and variance given weights and covariance matrix sigma
def portfolio_stdev(weights, sigma):
    std_dev = variance(weights, sigma) ** .5
    return std_dev


def variance(weights, sigma):
    # returns the variance (NOT standard deviation) given 1xN weights and NxN sigma
    return (weights @ sigma @ weights.transpose())[0, 0]


def addem(weights):
    # Used for constraints, weights must sum to 1
    return 1.0 - sum(weights)
# Useful for working in standard deviation and correlation space
# rather than covariance space


def optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix):
    """
    Given mean, standard deviation and correlation matrix, find portfolio weights that maximise SR

    :param mean_list: list of N floats
    :param stdev_list: list of N floats
    :param corrmatrix: NxN np.array
    :return: list of floats
    """
    sigma = sigma_from_corr_and_std(stdev_list, corrmatrix)
    weights = optimise_with_sigma(sigma, mean_list)

    return weights


def sigma_from_corr_and_std(stdev_list, corrmatrix):
    """
    Translates standard deviation and correlation matrix into covariance matrix

    :param stdev_list: list of float
    :param corrmatrix: NxN np.array
    :return: covariance matrix, NxN np.array
    """
    stdev = np.array(stdev_list, ndmin=2).transpose()
    # sigma_ij = stdev_i * corr_ij * stdev_j
    sigma = stdev * corrmatrix * stdev.transpose()

    return sigma
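
# A quick worked check of the covariance construction above, with illustrative
# numbers (not from the slides): two assets with 10% and 20% vol and 0.5 correlation.
# Since sigma_ij = stdev_i * corr_ij * stdev_j we expect [[0.01, 0.01], [0.01, 0.04]].
print(sigma_from_corr_and_std([0.1, 0.2], np.array([[1, 0.5], [0.5, 1]])))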
### Do all the examples in the slides
optimise_with_corr_and_std([0.04, 0.04, 0.04], [.08, .08, 0.08], np.array([[1, .9, .9], [.9, 1, .9], [.9, .9, 1]]))
optimise_with_corr_and_std([0.04, 0.04, 0.04], [.08, .08, 0.08], np.array([[1, 0.0, 0.0], [0.0, 1, 0.0], [0.0, 0.0, 1]]))
optimise_with_corr_and_std([0.04, 0.04, 0.04], [.08, .08, 0.08], np.array([[1, 0.7, 0.0], [0.7, 1, 0.0], [0.0, 0.0, 1]]))
optimise_with_corr_and_std([.04, .04, 0.04], [0.08, 0.08, 0.12], np.array([[1, 0.0, 0.0], [0.0, 1, 0.0], [0.0, 0.0, 1]]))
optimise_with_corr_and_std([.04, .04, 0.04], [0.08, 0.08, 0.085], np.array([[1, .9, .9], [.9, 1, .9], [.9, .9, 1]]))
optimise_with_corr_and_std([.04, .04, 0.045], [0.08, 0.08, 0.08], np.array([[1, .9, .9], [.9, 1, .9], [.9, .9, 1]]))
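
# Run as a script the calls above discard their results (in an interactive session
# the weights display automatically). A minimal sketch for inspecting the first one:
print(optimise_with_corr_and_std(
    [0.04, 0.04, 0.04], [0.08, 0.08, 0.08],
    np.array([[1, .9, .9], [.9, 1, .9], [.9, .9, 1]])))
# With identical means, vols and correlations the problem is symmetric, so the
# weights should come out close to [1/3, 1/3, 1/3].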
""" This code is used by me to get the data from my trading system. You can ignore it | |
from systems.provided.futures_chapter15.basesystem import futures_system | |
system = futures_system() | |
data=dict([(code,system.rawdata.get_percentage_returns(code)) for code in ["SP500", "US10", "US5"]]) | |
import pandas as pd | |
data = pd.concat(data, axis=1) | |
data = data.resample("7D").sum() | |
data = data[pd.datetime(1998,1,1):] | |
""" | |
import pandas as pd | |
data = pd.read_csv("returns.csv") | |
WEEKS_IN_YEAR = 365.25/7 | |
## Some functions to analyse data and return various statistics
## Assumes weekly returns


def annual_returns_from_weekly_data(data):
    return data.mean() * WEEKS_IN_YEAR


def annual_stdev_from_weekly_data(data):
    return data.std() * (WEEKS_IN_YEAR ** .5)


def sharpe_ratio(data):
    SR = annual_returns_from_weekly_data(data) / annual_stdev_from_weekly_data(data)
    return SR
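
# As a rough sanity check on the annualisation (assumed numbers, not from the data):
# a weekly mean of 0.001 annualises to about 0.001 * 52.18 = 5.2% a year, and a
# weekly standard deviation of 0.02 to about 0.02 * sqrt(52.18) = 14.4% a year.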
def optimise_with_data(data):
    """
    Given some data, find the optimal SR portfolio

    :param data: pd.DataFrame containing N columns
    :return: list of N floats
    """
    mean_list = annual_returns_from_weekly_data(data).values
    stdev_list = annual_stdev_from_weekly_data(data).values
    corrmatrix = data.corr().values

    weights = optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix)

    return weights
## Bootstrapping code
import random


## Do optimisation with some random data
def optimisation_with_random_bootstrap(data):
    bootstrapped_data = get_bootstrap_series(data)
    weights = optimise_with_data(bootstrapped_data)

    return weights


## Resample the rows of the data, with replacement
def get_bootstrap_series(data):
    length_of_series = len(data.index)
    random_indices = [int(random.uniform(0, length_of_series)) for _unused in range(length_of_series)]
    bootstrap_data = data.iloc[random_indices]

    return bootstrap_data
def bootstrap_optimisation_distributions(data, monte_count=1000):
    """
    Bootstrap over data, returning the distribution of portfolio weights

    :param data: pd.DataFrame containing N columns
    :param monte_count: int, how many bootstraps to do
    :return: pd.DataFrame of weights, monte_count rows and the same N columns as data
    """
    dist_of_weights = []
    for i in range(monte_count):
        single_bootstrap_weights = optimisation_with_random_bootstrap(data)
        dist_of_weights.append(single_bootstrap_weights)

    dist_of_weights = pd.DataFrame(dist_of_weights)
    dist_of_weights.columns = data.columns

    return dist_of_weights
## Generate data for the slides
weight_distr = bootstrap_optimisation_distributions(data, monte_count=1000)

plt.hist(weight_distr.SP500, bins=100)
plt.hist(weight_distr.US10, bins=100)
plt.hist(weight_distr.US5, bins=100)
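# Note: run as a plain script these histograms all land on the same axes and nothing
# is displayed until plt.show() is called; the original appears intended to be run
# interactively, one plot at a time.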
## Now for the plots showing the sensitivity of weights to a given factor, eg Sharpe Ratio
## First we need a function that allows us to estimate correlations for a given 'code' eg SP500/US10


def correlation_for_code(data):
    """
    Calculate correlation matrix and return as dict

    :param data: pd.DataFrame with column names eg x, y, z ...
    :return: dict with keys eg x/y, y/z, x/z, y/x, z/y, z/x ... containing correlations
    """
    corr = data.corr()
    instruments = data.columns

    results_dict = {}
    for instr1 in instruments:
        for instr2 in instruments:
            code = join_corr_code(instr1, instr2)
            results_dict[code] = corr.loc[instr1, instr2]

    return results_dict


def split_corr_code(code):
    split_code = code.split("/")
    instr1 = split_code[0]
    instr2 = split_code[1]

    return instr1, instr2


def join_corr_code(instr1, instr2):
    return '%s/%s' % (instr1, instr2)
## Now we have functions that allow us to find the factor distribution for any factor
func_dict = dict(sharpe=sharpe_ratio, stdev=annual_stdev_from_weekly_data,
                 corr=correlation_for_code)
factor_list = list(func_dict.keys())


def plot_changeling(code, factor, data):
    """
    Top level function which gets the data, plots and analyses it

    :param code: str. Must be a column name in data or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :return: None
    """
    assert factor in factor_list

    # Get the data
    factor_distribution, weights_data = changeling_graph_data(code, factor, data)

    # Now make two plots
    fig, (ax1, ax2) = plt.subplots(2)
    fig.suptitle("Distribution of %s for %s, average %.2f" % (factor, code, np.mean(factor_distribution)))
    ax1.hist(factor_distribution, bins=50)
    weights_data.plot(ax=ax2)

    # Now analyse
    analyse_changeling_results(code, factor, factor_distribution, data)
def changeling_graph_data(code, factor, data):
    """
    Get the data required for the graphing; a factor distribution and the matched weights

    :param code: str. Must be a column name in data or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :return: tuple: factor_distribution and weights
    """
    factor_distribution = get_factor_distribution(code, factor, data)
    weights_data = get_weights_data(code, factor, data, factor_distribution)

    return factor_distribution, weights_data
def get_factor_distribution(code, factor, data, monte_length=10000):
    """
    Get the bootstrapped distribution of a given factor value

    :param code: str. Must be a column name in data or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :param monte_length: int, how many monte carlo runs to do
    :return: list of float
    """
    factor_func = func_dict[factor]

    factor_distr = []
    for _not_used in range(monte_length):
        bootstrap_data = get_bootstrap_series(data)
        factor_estimate = factor_func(bootstrap_data)
        factor_estimate_for_code = factor_estimate[code]
        factor_distr.append(factor_estimate_for_code)

    # note we drop the top and bottom 1% of values, which are likely to be extreme outliers
    factor_distr.sort()
    drop_values = int(monte_length * .01)
    factor_distr = factor_distr[drop_values:-drop_values]

    return factor_distr
def get_weights_data(code, factor, data, factor_distribution, points_to_use=100):
    """
    Optimise weights, using the statistics in data, but replacing the estimate of factor for code
    by various points on the factor distribution.

    :param code: str. Must be a column name in data or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :param factor_distribution: list of float
    :param points_to_use: int, how many points on the factor distribution
    :return: pd.DataFrame of weights, same columns as data, length is points_to_use
    """
    factor_range = [np.min(factor_distribution), np.max(factor_distribution)]
    factor_step = (factor_range[1] - factor_range[0]) / points_to_use
    factor_values_to_test = np.arange(start=factor_range[0], stop=factor_range[1], step=factor_step)

    weight_results = []
    for factor_value_to_use in factor_values_to_test:
        weights = optimise_with_replaced_factor_value(data, code, factor, factor_value_to_use)
        weight_results.append(weights)

    weight_results = pd.DataFrame(weight_results)
    weight_results.columns = data.columns

    return weight_results
def optimise_with_replaced_factor_value(data, code, factor, factor_value_to_use):
    """
    Optimise weights, using the statistics in data, but replacing the estimate of factor for code
    by factor_value_to_use

    :param data: pd.DataFrame
    :param code: str. Must be a column name in data or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param factor_value_to_use: float
    :return: list of floats, weights
    """
    mean_list, stdev_list, corrmatrix = replace_factor_value(data, code, factor, factor_value_to_use)
    weights = optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix)

    return weights
def replace_factor_value(data, code, factor, factor_value_to_use):
    """
    Estimate statistics from data, but replacing the estimate of factor for code
    by factor_value_to_use

    :param data: pd.DataFrame
    :param code: str. Must be a column name in data or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param factor_value_to_use: float
    :return: 3 tuple mean_list, stdev_list, corrmatrix
    """
    # First standard deviation
    stdev_list = annual_stdev_from_weekly_data(data)
    if factor == "stdev":
        stdev_list[code] = factor_value_to_use
    stdev_list = stdev_list.values

    # Next Sharpe Ratio
    sharpe_list = sharpe_ratio(data)
    if factor == "sharpe":
        sharpe_list[code] = factor_value_to_use
    sharpe_list = sharpe_list.values

    # We don't estimate the mean directly; instead we derive it from the Sharpe Ratio
    # and the standard deviation
    mean_list = stdev_list * sharpe_list

    # Now correlations. Needs a fancy function
    corrmatrix = data.corr()
    if factor == "corr":
        corrmatrix = replace_corr_value(corrmatrix, code, factor_value_to_use)
    corrmatrix = corrmatrix.values

    return mean_list, stdev_list, corrmatrix
def replace_corr_value(corrmatrix, code, factor_value_to_use):
    """
    Given a correlation matrix, replace one value (actually two because of symmetry)

    :param corrmatrix: NxN matrix with labels
    :param code: str 'A/B' where A and B are labels of the correlation matrix
    :param factor_value_to_use: float, to replace the A/B and B/A correlations with
    :return: new NxN correlation matrix
    """
    instr1, instr2 = split_corr_code(code)
    corrmatrix.loc[instr1, instr2] = factor_value_to_use
    corrmatrix.loc[instr2, instr1] = factor_value_to_use

    return corrmatrix
def analyse_changeling_results(code, factor, factor_distribution, data):
    """
    Prints some analysis of factor_distribution

    :param code: str. Must be a column name in data or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param factor_distribution: list of float
    :param data: pd.DataFrame
    :return: None, just prints
    """
    factor5 = np.percentile(factor_distribution, 5)
    factor95 = np.percentile(factor_distribution, 95)

    print("There is a 90%% chance that %s for %s was between %.2f and %.2f" % (factor, code, factor5, factor95))

    weights5 = optimise_with_replaced_factor_value(data, code, factor, factor5)
    weights95 = optimise_with_replaced_factor_value(data, code, factor, factor95)

    ## Get the weights we need for code
    ## For correlations return the weights of the first item in the pair 'A/B'
    if factor == "corr":
        code = split_corr_code(code)[0]

    # Weights are a list not a dict, so we need to know the position of the right weight
    instruments = list(data.columns)
    code_index = instruments.index(code)

    weight5_code = weights5[code_index]
    weight95_code = weights95[code_index]

    print("Giving weights for %s between %.3f and %.3f" % (code, weight5_code, weight95_code))
# Now do the plots in the slides
plot_changeling("SP500", "sharpe", data)
plot_changeling("US10", "sharpe", data)
plot_changeling("US5", "sharpe", data)

plot_changeling("SP500", "stdev", data)
plot_changeling("US10", "stdev", data)
plot_changeling("US5", "stdev", data)

plot_changeling("SP500/US10", "corr", data)
plot_changeling("SP500/US5", "corr", data)
plot_changeling("US5/US10", "corr", data)

## Let's do optimisation better!

## 1) Bootstrapped weights
weight_distr = bootstrap_optimisation_distributions(data, monte_count=1000)
weight_distr.mean()
## 2) Set some values to be identical


def optimise_with_identical_values(data, identical_SR=False, identical_stdev=False, identical_corr=False):
    if identical_stdev:
        stdev_list = get_identical_stdev(data)
    else:
        stdev_list = annual_stdev_from_weekly_data(data).values

    if identical_SR:
        ## We want means, but derive them from SR based on the standard deviations we have
        mean_list = get_means_assuming_identical_SR(data, stdev_list)
    else:
        mean_list = annual_returns_from_weekly_data(data).values

    if identical_corr:
        corr_matrix = get_identical_corr(data)
    else:
        corr_matrix = data.corr().values

    weights = optimise_with_corr_and_std(mean_list, stdev_list, corr_matrix)

    return weights
def get_identical_corr(data):
    ## Returns a boring correlation matrix whose off-diagonal elements are equal to the average correlation
    instrument_count = len(data.columns)
    estimated_corr = data.corr().values
    avg_corr = get_avg_corr(estimated_corr)

    corrmatrix = boring_corr_matrix(instrument_count, offdiag=avg_corr)

    return corrmatrix


def get_identical_stdev(data):
    ## Returns a vector of standard deviations whose elements are the average standard deviation in the data
    estimated_stdev = annual_stdev_from_weekly_data(data)
    instrument_count = len(data.columns)
    average_stdev = estimated_stdev.mean()

    stdev_list = [average_stdev] * instrument_count

    return stdev_list


def get_means_assuming_identical_SR(data, using_stdev):
    ## Returns a vector of means assuming an identical SR, equal to the average in the data,
    ## and using the standard deviations provided
    average_SR = sharpe_ratio(data).mean()
    mean_list = [stdev * average_SR for stdev in using_stdev]

    return mean_list
from copy import copy


def boring_corr_matrix(size, offdiag=0.99, diag=1.0):
    """
    Create a boring correlation matrix

    :param size: dimensions
    :param offdiag: value to put in the off diagonal
    :param diag: value to put in the diagonal
    :return: np.array, 2 dimensions, size x size
    """
    size_index = range(size)

    def _od(i, j, offdiag, diag):
        if i == j:
            return diag
        else:
            return offdiag

    m = [[_od(i, j, offdiag, diag) for i in size_index] for j in size_index]
    m = np.array(m)

    return m
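
# For example (values chosen just to illustrate the shape of the output),
# boring_corr_matrix(3, offdiag=0.5) returns
# [[1.0, 0.5, 0.5],
#  [0.5, 1.0, 0.5],
#  [0.5, 0.5, 1.0]]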
def get_avg_corr(sigma):
    ## Get the average correlation value in a correlation matrix, ignoring the diagonal
    new_sigma = copy(sigma)
    np.fill_diagonal(new_sigma, np.nan)
    if np.all(np.isnan(new_sigma)):
        return np.nan

    avg_corr = np.nanmean(new_sigma)

    return avg_corr
## 3) Bayesian optimisation


def optimise_with_shrinkage_parameters(data, SR_shrinkage=0.0, corr_shrinkage=0.0, stdev_shrinkage=0.0):
    """
    Optimise using a mixture of estimates from the data and a prior of identical, average values

    :param data: pd.DataFrame
    :param SR_shrinkage: float, 0 = no shrinkage, just use the data; 1 = just use the prior (identical, average values)
    :param corr_shrinkage: float
    :param stdev_shrinkage: float
    :return: list of floats, weights
    """
    prior_stdev_list = np.array(get_identical_stdev(data))
    estimated_stdev_list = annual_stdev_from_weekly_data(data).values
    stdev_list = prior_stdev_list * stdev_shrinkage + estimated_stdev_list * (1 - stdev_shrinkage)

    prior_mean_list = np.array(get_means_assuming_identical_SR(data, stdev_list))
    estimated_mean_list = annual_returns_from_weekly_data(data).values
    mean_list = prior_mean_list * SR_shrinkage + estimated_mean_list * (1 - SR_shrinkage)

    prior_corr_matrix = get_identical_corr(data)
    estimated_corr_matrix = data.corr().values
    corr_matrix = prior_corr_matrix * corr_shrinkage + estimated_corr_matrix * (1 - corr_shrinkage)

    weights = optimise_with_corr_and_std(mean_list, stdev_list, corr_matrix)

    return weights
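
# Neither of the two alternatives above is actually run in this script. A minimal
# usage sketch (the 0.5 shrinkage levels are purely illustrative, not a recommendation):
print(optimise_with_identical_values(data, identical_SR=True, identical_corr=True))
print(optimise_with_shrinkage_parameters(data, SR_shrinkage=0.5,
                                         corr_shrinkage=0.5, stdev_shrinkage=0.5))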