アセットマネージャーのためのファイナンス機械学習:ポートフォリオの構築 練習問題 CLA

minVarPort関数を、上限と下限の制約付きの最適化ができるCLAに置き換えて、NCO有りと無しでモンテカルトシミュレーションを行う。

上記のCLA.pyは細かなバグがあるので、修正した最終版が以下のコードになる。

#!/usr/bin/env python
# 
# I (Martin Dengler) have been given the OK to upload this code to
# github, as long as I indicate:
#
# 1.  That David H. Bailey and Marcos Lopez de Prado are the original
#     authors.
#
# 2.  That this code is provided under a GPL license for
#     non-commercial purposes.
#
# 3.  That the original authors retain the rights as it relates to
#     commercial applications.
#
# The accompanying paper was published in an open-access application:
# http://ssrn.com/abstract=2197616
#
# The original code is here:
#
# http://www.quantresearch.info/CLA.py.txt
# http://www.quantresearch.info/CLA_Main.py.txt
#
# A sample dataset can be found here:
# http://www.quantresearch.info/CLA_Data.csv.txt
#
#
# On 20130210, v0.2
# Critical Line Algorithm
# by MLdP <lopezdeprado@lbl.gov>
#
# From: http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2197616
#
# "All code in this paper is provided 'as is', and contributed to the
# academic community for non-business purposes only, under a GNU-GPL
# license"
#
import numpy as np
#---------------------------------------------------------------
#---------------------------------------------------------------
class CLA:
    def __init__(self,mean,covar,lB=None,uB=None):
        # Initialize the class
        self.mean=mean
        self.covar=covar
        if(lB is None):
             self.lB=np.full_like(mean,-1.)
        else: self.lB=lB
        if(uB is None):
              self.uB=np.ones_like(mean)
        else: self.uB=uB
        self.w=[] # solution
        self.l=[] # lambdas
        self.g=[] # gammas
        self.f=[] # free weights
#---------------------------------------------------------------
    def solve(self):
        # Compute the turning points,free sets and weights
        f,w=self.initAlgo()
        self.w.append(np.copy(w)) # store solution
        self.l.append(None)
        self.g.append(None)
        self.f.append(f[:])
        while True:
            #1) case a): Bound one free weight
            l_in=None
            if len(f)>1:
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                j=0
                for i in f:
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
                    if l_in==None :l_in,i_in,bi_in=l,i,bi
                    elif l>l_in:l_in,i_in,bi_in=l,i,bi
                    j+=1
            #2) case b): Free one bounded weight
            l_out=None
            if len(f)<self.mean.shape[0]:
                b=self.getB(f)
                for i in b:
                    covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
                    covarF_inv=np.linalg.inv(covarF)
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1,self.w[-1][i])
                    if (self.l[-1]==None or l<self.l[-1]):
                        if(l_out==None): l_out,i_out = l,i
                        elif l>l_out: l_out,i_out=l,i
            if (l_in==None or l_in<0) and (l_out==None or l_out<0):
                #3) compute minimum variance solution
                self.l.append(0)
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                meanF=np.zeros(meanF.shape)
            else:
                #4) decide lambda
                if (l_in is not None):
                    if (l_out==None) or (l_in > l_out):
                        self.l.append(l_in)
                        f.remove(i_in)
                        covarF,covarFB,meanF,wB=self.getMatrices(f)
                        w[i_in]=bi_in # set value at the correct boundary
                        b=self.getB(f)
                        wB[b.index(i_in)]=bi_in
                    else:
                        self.l.append(l_out)
                        f.append(i_out)
                        covarF,covarFB,meanF,wB=self.getMatrices(f)
                else:
                    self.l.append(l_out)
                    f.append(i_out)
                    covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
            #5) compute solution vector
            wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
            for i in range(len(f)):w[f[i]]=wF[i]
            self.w.append(np.copy(w)) # store solution
            self.g.append(g)
            self.f.append(f[:])
            if self.l[-1]==0:break
        #6) Purge turning points
        self.purgeNumErr(10e-10)
        self.purgeExcess()
#---------------------------------------------------------------
    def initAlgo(self):
        # Initialize the algo
        #1) Form structured array
        a=np.zeros((self.mean.shape[0]),dtype=[('id',int),('mu',float)])
        b=[self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
        a[:]=list(zip(range(self.mean.shape[0]),b)) # fill structured array
        #2) Sort structured array
        b=np.sort(a,order='mu')
        #3) First free weight
        i,w=b.shape[0],np.copy(self.lB)
        while sum(w)<1:
            i-=1
            w[b[i][0]]=self.uB[b[i][0]]
        w[b[i][0]]+=1-sum(w)
        return [b[i][0]],w
#---------------------------------------------------------------
    def computeBi(self,c,bi):
        if c>0:
            bi=bi[1][0]
        if c<0:
            bi=bi[0][0]
        return bi
#---------------------------------------------------------------
    def computeW(self,covarF_inv,covarFB,meanF,wB):
        #1) compute gamma
        onesF=np.ones(meanF.shape)
        g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        if wB is None:
            g,w1=float(-self.l[-1]*g1/g2+1/g2),0
        else:
            onesB=np.ones(wB.shape)
            g3=np.dot(onesB.T,wB)
            g4=np.dot(covarF_inv,covarFB)
            w1=np.dot(g4,wB)
            g4=np.dot(onesF.T,w1)
            g=float(-self.l[-1]*g1/g2+(1-g3+g4)/g2)
        #2) compute weights
        w2=np.dot(covarF_inv,onesF)
        w3=np.dot(covarF_inv,meanF)
        return -w1+g*w2+self.l[-1]*w3,g
#---------------------------------------------------------------
    def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):
        #1) C
        onesF=np.ones(meanF.shape)
        c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        c2=np.dot(covarF_inv,meanF)
        c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        c4=np.dot(covarF_inv,onesF)
        c=-c1*c2[i]+c3*c4[i]
        if c==0:return
        #2) bi
        if type(bi)==list:bi=self.computeBi(c,bi)
        #3) Lambda
        if wB is None:
            # All free assets
            return float((c4[i]-c1*bi)/c),bi
        else:
            onesB=np.ones(wB.shape)
            l1=np.dot(onesB.T,wB)
            l2=np.dot(covarF_inv,covarFB)
            l3=np.dot(l2,wB)
            l2=np.dot(onesF.T,l3)
            return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi
#---------------------------------------------------------------
    def getMatrices(self,f):
        # Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
        covarF=self.reduceMatrix(self.covar,f,f)
        meanF=self.reduceMatrix(self.mean,f,[0])
        b=self.getB(f)
        covarFB=self.reduceMatrix(self.covar,f,b)
        wB=self.reduceMatrix(self.w[-1],b,[0])
        return covarF,covarFB,meanF,wB
#---------------------------------------------------------------
    def getB(self,f):
        return self.diffLists(range(self.mean.shape[0]),f)
#---------------------------------------------------------------
    def diffLists(self,list1,list2):
        return list(set(list1)-set(list2))
#---------------------------------------------------------------
    def reduceMatrix(self,matrix,listX,listY):
        # Reduce a matrix to the provided list of rows and columns
        if len(listX)==0 or len(listY)==0:return
        matrix_=matrix[:,listY[0]:listY[0]+1]
        for i in listY[1:]:
            a=matrix[:,i:i+1]
            matrix_=np.append(matrix_,a,1)
        matrix__=matrix_[listX[0]:listX[0]+1,:]
        for i in listX[1:]:
            a=matrix_[i:i+1,:]
            matrix__=np.append(matrix__,a,0)
        return matrix__
#---------------------------------------------------------------
    def purgeNumErr(self,tol):
        # Purge violations of inequality constraints (associated with ill-conditioned covar matrix)
        i=0
        while True:
            if i==len(self.w):break
            w=self.w[i]
            for j in range(w.shape[0]):
                if w[j]-self.lB[j]<-tol or w[j]-self.uB[j]>tol:
                    del self.w[i]
                    del self.l[i]
                    del self.g[i]
                    del self.f[i]
                    break
            i+=1
#---------------------------------------------------------------

NCOからこのCLAを呼ぶコードは以下の通りになる。上限は全ての資産で$${+1.}$$、下限を$${-1.}$$とし、空売りを許した。

import numpy as np
import pandas as pd
import MarPat as MP
import ONC as NC
def CLA_nco(cov, mu, lB=None, uB=None, maxNumClusters=None,minVarPortf=True):
    import CLA

    cov = pd.DataFrame(cov)
    mu = pd.Series(mu[:,0])

    corr1 = MP.cov2corr(cov)
    corr1, clstrs, _ = NC.clusterKMeansBase(corr1, maxNumClusters, n_init=10)
    wIntra= pd.DataFrame(0, index=cov.index, columns=clstrs.keys(),dtype='float64')
    lB_=None
    uB_=None
    for i in clstrs:
        cov_ = cov.loc[clstrs[i], clstrs[i]].values
        mu_ = mu.loc[clstrs[i]].values.reshape(-1,1)
        if(lB is not None): lB_ = lB.loc[clstrs[i]].values.reshape(-1,1)
        if(uB is not None): uB_ = uB.loc[clstrs[i]].values.reshape(-1,1)
        cla=CLA.CLA(mu_,cov_,lB_,uB_)
        cla.solve()
        if(minVarPortf==True):
            wIntra.loc[clstrs[i], i] = cla.w[-1].flatten()
        else :
            max_sr, max_w_sr = cla.getMaxSR()
            wIntra.loc[clstrs[i], i] = max_w_sr.flatten()

    cov_= wIntra.T.dot(np.dot(cov, wIntra))
    mu_ = wIntra.T.dot(mu)
    mu_ = mu_.values.reshape(-1,1)
    if(lB is not None):
          lB_ = wIntra.T.dot(lB)
          lB_ = lB_.values.reshape(-1,1)
    if(uB is not None):
          uB_ = wIntra.T.dot(uB)
          uB_ = uB_.values.reshape(-1,1)
    cla=CLA.CLA(mu_,cov_.values,lB_,uB_)
    cla.solve()
    if(minVarPortf==True):
        wInter= pd.Series(cla.w[-1].flatten(), index=cov_.index)
    else:
        max_sr, max_w_sr = cla.getMaxSR()
        wInter= pd.Series(max_w_sr.flatten(), index=cov_.index)

    nco = wIntra.mul(wInter, axis=1).sum(axis=1).values.reshape(-1,1)
    return nco

def CLAExp(mu0,cov0,lB=None, uB=None,
            nObs=1000, nSims=1000 , minVarPortf = True,shrink=False):

    w1 = pd.DataFrame(0, index=range(0, nSims),
                  columns=range(0, cov0.shape[0]),dtype='float64')
    w1_d = pd.DataFrame(0, index=range(0, nSims),
                    columns=range(0, cov0.shape[0]),dtype='float64')
    for i in range(0, nSims):
        mu1, cov1 = MP.simCovMu(mu0, cov0, nObs, shrink=shrink)

        cla=CLA.CLA(mu1,cov1,lB,uB)
        cla.solve()
        if(minVarPortf == True):
            w1.loc[i] = cla.w[-1].flatten()
        else:
            max_sr, max_w_sr = cla.getMaxSR()
            w1.loc[i] = max_w_sr.flatten()
        w1_d.loc[i] = CLA_nco(cov1, mu1, lB, uB,int(cov1.shape[0]/2),minVarPortf).flatten()
    return w1, w1_d

def main():

    nBlocks, bSize, bCorr = 10, 5, .5
    q=10.0

    np.random.seed(0)
    mu0, cov0 = MP.formTrueMatrix(nBlocks, bSize, bCorr)

    minVarPortf=True

    w1, w1_d=CLAExp(mu0=mu0,cov0=cov0,minVarPortf = minVarPortf)
    cla=CLA.CLA(mu0,cov0.values)
    cla.solve()
    w0 = cla.w[-1]
    w0 = np.repeat(w0.T, w1.shape[0], axis=0)

    print('minVar Markowitz RMSE: ',
          round(np.mean((w1-w0).values.flatten()**2)**.5,4))
    print('minVar NCO RMSE:',
          round(np.mean((w1_d-w0).values.flatten()**2)**.5,4))


    minVarPortf=False

    w1, w1_d=CLAExp(mu0=mu0,cov0=cov0,minVarPortf = minVarPortf)

    max_sr, w0 = cla.getMaxSR()
    w0 = np.repeat(w0.T, w1.shape[0], axis=0)

    print('maxSR Markowitz  RMSE: ',
          round(np.mean((w1-w0).values.flatten()**2)**.5,4))
    print('maxSR NCO RMSE:',
          round(np.mean((w1_d-w0).values.flatten()**2)**.5,4))

if __name__ == '__main__':
    main()

このコードの結果は以下のようになった。

CLAを用いたNCOからのポートフォリオ


この記事が気に入ったらサポートをしてみませんか?