Clitical-Line Algorithm for Portfolio Optimization: An Open-Source Code 1

CLAのpythonコードは、上記二番目のgitHubからダウンロードできる。
CLAのインスタンスは以下のように、各資産の期待リターンベクトル、共分散行列、下限値ベクトル、上限値ベクトルを引数として渡して行う。

class CLA:
    def __init__(self,mean,covar,lB,uB):
        # Initialize the class
        self.mean=mean
        self.covar=covar
        self.lB=lB
        self.uB=uB
        self.w=[] # solution
        self.l=[] # lambdas
        self.g=[] # gammas
        self.f=[] # free weights

$${{\bf F}\cup{\bf B}}$$から、$${{\bf F}}$$のみリスト化しておけば良い。
CLAは最も高いポートフォリオの期待リターン$${{\bf \omega}^T{\bf \mu}}$$を持つ転換点を見つけ、それから次に低い期待リターンの転換点へと移っていくアルゴリズムである。
 最初の$${{\bf \omega}}$$は、期待リターンの順でポートフォリオの構成資産を並べ、最初に全て下限に設定した$${{\bf \oemga}}$$を、$${\mu_i}$$が高い順から上限へ変更し、$${\sum_i\u_i}$$が1を超えた資産を、1を超えないように、$${l_i < \omega_i < u_i}$$に設定し、$${{\bf F}}$$の要素となる。以下はそのままの下限値に据え置く。上限値、下限値の資産は$${{\bf B}={\bf N- F}}$$の要素となる。これが、最初の転換点となる。このコードは以下で実装される。

    def initAlgo(self):
        # Initialize the algo
        #1) Form structured array
        a=np.zeros((self.mean.shape[0]),dtype=[('id',int),('mu',float)])
        b=[self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
        a[:]=zip(range(self.mean.shape[0]),b) # fill structured array
        #2) Sort structured array
        b=np.sort(a,order='mu')
        #3) First free weight
        i,w=b.shape[0],np.copy(self.lB)
        while sum(w)<1:
            i-=1
            w[b[i][0]]=self.uB[b[i][0]]
        w[b[i][0]]+=1-sum(w)
        return [b[i][0]],w

上記の記事で示したように、$${\displaystyle{\frac{\partial {\bf \omega}^T{\bf \mu}}{\partial \lambda} >0}}$$であるから、$${\lambda}$$を減少させて、次の転換点を探す。
 次の転換点では、$${{\bf F}}$$から資産が一つ削除されるか、加わるかの2択のシナリオがある。二番目の転換点では、最初の転換点で$${{\bf F}}$$の要素を一つのみに決めたため、$${{\bf F}}$$からの削除はなく、資産が加わるシナリオしかあり得ない。
 $${{\bf F}}$$に加わる資産を$${{\bf B}}$$の要素から選ぶコードは以下で実装される。

#2) case b): Free one bounded weight
            l_out=None
            if len(f)<self.mean.shape[0]:
                b=self.getB(f)
                for i in b:
                    covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
                    covarF_inv=np.linalg.inv(covarF)
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1, \
                        self.w[-1][i])
                    if (self.l[-1]==None or l<self.l[-1]) and l>l_out:
               l_out,i_out=l,i

ここで、getB(f)で$${{\bf B}}$$の要素を、$${{\bf N}-{\bf F}}$$で呼んでいる。

    def getB(self,f):
        return self.diffLists(range(self.mean.shape[0]),f)
#---------------------------------------------------------------
    def diffLists(self,list1,list2):
        return list(set(list1)-set(list2))

$${{\bf B}}$$の要素全てのiに対し、getMatrix(f+[i])で、$${{\bf F}_i={\bf F} \cup \{i\}}$$の$${{\bf V_F}_i, \ {\bf V_FB}_i, \ {\bf \mu_F}_i \ {\bf \omega_F}_i}$$を計算する。

    def getMatrices(self,f):
        # Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
        covarF=self.reduceMatrix(self.covar,f,f)
        meanF=self.reduceMatrix(self.mean,f,[0])
        b=self.getB(f)
        covarFB=self.reduceMatrix(self.covar,f,b)
        wB=self.reduceMatrix(self.w[-1],b,[0])
        return covarF,covarFB,meanF,wB

#---------------------------------------------------------------
    def reduceMatrix(self,matrix,listX,listY):
        # Reduce a matrix to the provided list of rows and columns
        if len(listX)==0 or len(listY)==0:return
        matrix_=matrix[:,listY[0]:listY[0]+1]
        for i in listY[1:]:
            a=matrix[:,i:i+1]
            matrix_=np.append(matrix_,a,1)
        matrix__=matrix_[listX[0]:listX[0]+1,:]
        for i in listX[1:]:
            a=matrix_[i:i+1,:]
            matrix__=np.append(matrix__,a,0)
        return matrix__

getMartixで返ってきた行列を使い、$${\lambda^{(i)}}$$と$${C_i}$$を計算し、重み$${b_i}$$と$${\lambda^{(i)}}$$を返す。

    def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):
        #1) C
        onesF=np.ones(meanF.shape)
        c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        c2=np.dot(covarF_inv,meanF)
        c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        c4=np.dot(covarF_inv,onesF)
        c=-c1*c2[i]+c3*c4[i]
        if c==0:return
        #2) bi
        if type(bi)==list:bi=self.computeBi(c,bi)
        #3) Lambda
        if wB is None:
            # All free assets
            return float((c4[i]-c1*bi)/c),bi
        else:
            onesB=np.ones(wB.shape)
            l1=np.dot(onesB.T,wB)
            l2=np.dot(covarF_inv,covarFB)
            l3=np.dot(l2,wB)
            l2=np.dot(onesF.T,l3)
            return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi
#---------------------------------------------------------------
    def computeBi(self,c,bi):
        if c>0:
            bi=bi[1][0]
        if c<0:
            bi=bi[0][0]
        return bi

Case b)のコードの最終行で、$${\max_{i \in {\bf B}} \lambda^{(i)}}$$より、$${{\bf F}}$$に移る資産iを選んでいる。
 最初の二つの転換点を見つけた後は、$${{\bf F}}$$から追加、削除のどちらも起こり得る。
 $${{\bf F}}$$から削除され$${{\bf B}}$$に移るシナリオのコードは以下で実装される。

#1) case a): Bound one free weight
            l_in=None
            if len(f)>1:
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                j=0
                for i in f:
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
                    if l>l_in:l_in,i_in,bi_in=l,i,bi
                    j+=1

 $${{\bf F}}$$の要素の全てについて、削除後の$${\lambda}$$と$${b_i}$$を計算し、その中で、$${\Delta \lambda}$$の最も小さい資産が$${{\bf B}}$$へ移る候補となる。
 case a)とcase b)の両方の転換点が見つかった場合、$${\Delta \lambda}$$の小さい方が次の転換点となる。この決定は以下のコードで行われる。

#4) decide lambda
                if l_in>l_out:
                    self.l.append(l_in)
                    f.remove(i_in)
                    covarF,covarFB,meanF,wB=self.getMatrices(f)
                    w[i_in]=bi_in # set value at the correct boundary
                    wB[i_in]=bi_in
                else:
                    self.l.append(l_out)
                    f.append(i_out)
                    covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)

次に移る転換点が決まった時点で、この転換点での重みを計算する。

#5) compute solution vector
            wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
            for i in range(len(f)):w[f[i]]=wF[i]
            self.w.append(np.copy(w)) # store solution
            self.g.append(g)
            self.f.append(f[:])
            if self.l[-1]==0:break
   def computeW(self,covarF_inv,covarFB,meanF,wB):
        #1) compute gamma
        onesF=np.ones(meanF.shape)
        g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        if wB is None:
            g,w1=float(-self.l[-1]*g1/g2+1/g2),0
        else:
            onesB=np.ones(wB.shape)
            g3=np.dot(onesB.T,wB)
            g4=np.dot(covarF_inv,covarFB)
            w1=np.dot(g4,wB)
            g4=np.dot(onesF.T,w1)
            g=float(-self.l[-1]*g1/g2+(1-g3+g4)/g2)
        #2) compute weights
        w2=np.dot(covarF_inv,onesF)
        w3=np.dot(covarF_inv,meanF)
        return -w1+g*w2+self.l[-1]*w3,g

重みと一緒に計算されているのは、$${\lambda}$$の関数である$${\gamma}$$である。
 この繰り返しは、次の転換点の解が見つからない、または$${\lambda}$$が負になった時点で終了する。
 これまでで得られた一連の$${{\bf \omega}}$$は、各転換点で与えられた$${{\bf \omega}^T{\bf \mu}}$$の期待リターンを返す最少分散ポートフォリオで、効率フロンティア曲線をなす。しかし、効率フロンティアの左端の点は計算できていない。
 よって、最後にこの左端の最小分散ポートフォリオを計算する。

if (l_in==None or l_in<0) and (l_out==None or l_out<0):
                #3) compute minimum variance solution
                self.l.append(0)
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                meanF=np.zeros(meanF.shape)

以上のコードは、CLAクラスの関数solve()でまとめられている。

    def solve(self):
        # Compute the turning points,free sets and weights
        f,w=self.initAlgo()
        self.w.append(np.copy(w)) # store solution
        self.l.append(None)
        self.g.append(None)
        self.f.append(f[:])
        while True:
            #1) case a): Bound one free weight
            l_in=None
            if len(f)>1:
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                j=0
                for i in f:
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
                    if l_in==None :l_in,i_in,bi_in=l,i,bi
                    elif l>l_in:l_in,i_in,bi_in=l,i,bi
                    j+=1
            #2) case b): Free one bounded weight
            l_out=None
            if len(f)<self.mean.shape[0]:
                b=self.getB(f)
                for i in b:
                    covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
                    covarF_inv=np.linalg.inv(covarF)
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1, \
                        self.w[-1][i])
                    if (self.l[-1]==None or l<self.l[-1]):
                        if(l_out==None): l_out,i_out = l,i
                        elif l>l_out: l_out,i_out=l,i
            if (l_in==None or l_in<0) and (l_out==None or l_out<0):
                #3) compute minimum variance solution
                self.l.append(0)
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                meanF=np.zeros(meanF.shape)
            else:
                #4) decide lambda
                if l_in>l_out:
                    self.l.append(l_in)
                    f.remove(i_in)
                    covarF,covarFB,meanF,wB=self.getMatrices(f)
                    w[i_in]=bi_in # set value at the correct boundary
                    wB[i_in]=bi_in
                else:
                    self.l.append(l_out)
                    f.append(i_out)
                    covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
            #5) compute solution vector
            wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
            for i in range(len(f)):w[f[i]]=wF[i]
            self.w.append(np.copy(w)) # store solution
            self.g.append(g)
            self.f.append(f[:])
            if self.l[-1]==0:break
        #6) Purge turning points
        self.purgeNumErr(10e-10)
        self.purgeExcess()

この記事が気に入ったらサポートをしてみませんか?