bzip2を読むタイトル__2_

bzip2を読む ブロックソート11

こんにちは、junkawaです。

本記事では、前回に続きブロックソートアルゴリズムの「巡回行列のソート」についてソースコードを交えて紹介します。

これまでの記事

https://note.mu/junkawashima/m/m3adbdc56f010

前回までのおさらい

前回は、fallbackSort()の紹介を行いました。

本記事では、fallbackSort() でバケツ内の要素をソートするために呼び出すfallbackQSort3()と、そこから呼び出すfallbackSimpleSort()を紹介します。

fallbackQSort3()では、fmap[lo]〜fmap[hi]の要素をソートします。ソート比較にはeclass[fmap[lo]]〜eclass[fmap[hi]]を使用します。

クイックソートfallbackQSort3() を用いて要素を分割していき、要素数が一定以下になったらシェルソートfallbackSimpleSort() に切り替えてソートします。
これはmainSort()のmainQSor3t()と同じ流れです。

ソースコード紹介

fallbackSimpleSort()@blocksort.c
https://github.com/junkawa/bzip2/blob/master/bzip2-1.0.6/blocksort.c#L32

https://github.com/junkawa/bzip2/blob/master/bzip2-1.0.6/blocksort.c#L24

https://github.com/junkawa/bzip2/blob/master/bzip2-1.0.6/blocksort.c#L59

   24: /*---------------------------------------------*/
   25: /*--- Fallback O(N log(N)^2) sorting        ---*/
   26: /*--- algorithm, for repetitive blocks      ---*/
   27: /*---------------------------------------------*/
   28:
   29: /*---------------------------------------------*/
   30: static
   31: __inline__
   32: void fallbackSimpleSort ( UInt32* fmap,
   33:                           UInt32* eclass,
   34:                           Int32   lo,
   35:                           Int32   hi )
   36: {

fmap
入力:ソート対象のブロック番号
出力:ソート結果

eclass
入力:ソート時の比較に使用する値

lo
入力:fmapの先頭インデックス

hi
入力:fmapの末尾インデックス

37: Int32 i, j, tmp;
38: UInt32 ec_tmp;
39:

下で紹介します。

   40:    if (lo == hi) return;
   41:

lo == hi の時、比較する要素が1つだけなので、ソート不要です。

   42:    if (hi - lo > 3) {

ソート対象が4個以上ある場合、4個おきにシェルソート(挿入ソート)を行います。

事前に大雑把にソートを済ませることで、52行目からの挿入ソートの高速化を図っています。

   43:       for ( i = hi-4; i >= lo; i-- ) {
   44:          tmp = fmap[i];
   45:          ec_tmp = eclass[tmp];
   46:          for ( j = i+4; j <= hi && ec_tmp > eclass[fmap[j]]; j += 4 )
   47:             fmap[j-4] = fmap[j];
   48:          fmap[j-4] = tmp;
   49:       }

fmap[i]を、fmap[i+4]、fmap[i+8]、fmap[i+12]、... の中で適切な位置に挿入します。

46行目のループでは、fmap[i] と fmap[i+4]、fmap[i+8]、... を比較して、fmap[i]より小さい場合、左(低位のインデックス)に一つずらしていきます(47行目)。fmap[i]より大きいfmap[i+4*n] が見つかった場合、その一つ前にfmap[i]をセットします(48行目)。

図は i=hi-4の時の比較を説明しています。

この時、fmap[hi-4]のブロック番号を、46〜48行で適切な位置に挿入します。

44行目でfmap[hi-4](=100)をtmpに退避します。
45行目では、ソート比較に用いる eclass[fmap[hi-4]](=650)をec_tmpにセットします。
46行目のループでは、i+4がhi-4 + 4 より、hiなので、j=hiとなり、ec_tmp(=650)とeclass[fmap[hi]](=eclass[301]=10)を比較して、ec_tmpの方が大きいのでループに入り、47行目を実行します。47行目では、fmap[hi-4] = fmap[hi] とブロック番号を入れ替えています。
j+=4として、ループ条件を確認すると、jがhi+4となっており、j<=hi がfalseとなるのでループを抜けます。

48行目で、fmap[hi] = tmp として、退避した100をfmap[hi]にセットします。

i=hi-4の時、fmap[hi-4] と fmap[hi] の eclass を比較してソートを行います。eclassが小さい方が左(低位のインデックス)になります。

図では、eclass[ fmap[hi-4] ] が 650、eclass[ fmap[hi] ] が 10 なので、eclass[ fmap[hi] ] の方が小さく、fmap[hi-4]の100とfmap[hi]の301を入れ替えています。

図は i=hi-5の時の比較を説明しています。

43行目のforループでは、iをデクリメントしていくので、i=hi-4で挿入ソートを行った次は、i=hi-5で挿入ソートを行います。

i=hi-5の時、fmap[hi-5] と fmap[hi-1] の eclass を比較してソートを行います。eclassが小さい方が左(低位のインデックス)になります。

図では、eclass[ fmap[hi-5] ] が 21、eclass[ fmap[hi-1] ] が 350 なので、ソートの必要はありません。

図は i=hi-8の時の比較を説明しています。

i=hi-8の時、fmap[hi-8] と fmap[hi-4] と fmap[hi] の eclass を比較してソートを行います。eclassが小さい方が左(低位のインデックス)になります。

ここで注意が必要なのは、fmap[hi-4]とfmap[hi]間のソートは、i=hi-4の時にすでに終了している、という点です。つまり、fmap[hi-8]がfmap[hi-4]より小さい場合、自動的にfmap[hi-8]はfmap[hi]より小さいことになり、fmap[hi-8]とfmap[hi]を比較する必要がありません。

このソート済みの fmap[hi-4]とfmap[hi]に対して、新規にソート対象として加えるのが fmap[hi-8]です。

まず、fmap[hi-8] と fmap[hi-4] を比較します。図では、eclass[ fmap[hi-8] ] が99、eclass[ fmap[hi-4] ] が 10 なので、fmap[hi-8] の250と fmap[hi-4] の301を入れ替えます。

図では、eclass[ fmap[hi-5] ] が 21、eclass[ fmap[hi-1] ] が 350 なので、ソートの必要はありません。

   50:    }
   51:
   52:    for ( i = hi-1; i >= lo; i-- ) {
   53:       tmp = fmap[i];
   54:       ec_tmp = eclass[tmp];
   55:       for ( j = i+1; j <= hi && ec_tmp > eclass[fmap[j]]; j++ )
   56:          fmap[j-1] = fmap[j];
   57:       fmap[j-1] = tmp;
   58:    }

fmap[i]を、fmap[i+1]、fmap[i+2]、fmap[i+3]、... 、fmap[hi] の中で適切な位置に挿入します。

55行目のループでは、fmap[i] と fmap[i+1]、fmap[i+2]、... を比較して、fmap[i]より小さい場合、左(低位のインデックス)に一つずらしていきます(56行目)。fmap[i]より大きいfmap[i+n] が見つかった場合、その一つ前にfmap[i]をセットします(57行目)。

比較の詳細については、前述の43〜49行のループと同じなので割愛します。

図は i=hi-1、i=hi-2、i=hi-3の時の、比較対象を示しています。

   59: }
   60:
   61:


fallbackQSort3()@blocksort.c
https://github.com/junkawa/bzip2/blob/master/bzip2-1.0.6/blocksort.c#L93

https://github.com/junkawa/bzip2/blob/master/bzip2-1.0.6/blocksort.c#L62

https://github.com/junkawa/bzip2/blob/master/bzip2-1.0.6/blocksort.c#L180

   62: /*---------------------------------------------*/
   63: #define fswap(zz1, zz2) \
   64:    { Int32 zztmp = zz1; zz1 = zz2; zz2 = zztmp; }
   65:

zz1とzz2を入れ替えます。

   66: #define fvswap(zzp1, zzp2, zzn)       \
   67: {                                     \
   68:    Int32 yyp1 = (zzp1);               \
   69:    Int32 yyp2 = (zzp2);               \
   70:    Int32 yyn  = (zzn);                \
   71:    while (yyn > 0) {                  \
   72:       fswap(fmap[yyp1], fmap[yyp2]);  \
   73:       yyp1++; yyp2++; yyn--;          \
   74:    }                                  \
   75: }
   76:
   77:

zzp1〜zzp1+zzn-1と、zzp2〜zzp2+zzn-1の領域を交換する。

   78: #define fmin(a,b) ((a) < (b)) ? (a) : (b)
   79:

a,b の小さい方の値を返します。

   80: #define fpush(lz,hz) { stackLo[sp] = lz; \
   81:                        stackHi[sp] = hz; \
   82:                        sp++; }
   83:

3つのスタック、stackLo、stackHiへデータを積みます。
スタックポインタspをインクリメントします。

   84: #define fpop(lz,hz) { sp--;              \
   85:                       lz = stackLo[sp];  \
   86:                       hz = stackHi[sp]; }
   87:

スタックからデータを取り出します。
スタックポインタspをデクリメントします。

   88: #define FALLBACK_QSORT_SMALL_THRESH 10

ソート対象の、比較する要素数がFALLBACK_QSORT_SMALL_THRESH未満の場合、クイックソートをやめて、シェルソート(fallbackSimpleSort())を行います。

   89: #define FALLBACK_QSORT_STACK_SIZE   100

クイックソートを再帰ではなく、スタックで実装します。
スタックのサイズはFALLBACK_QSORT_STACK_SIZEです。

   90:
   91:
   92: static
   93: void fallbackQSort3 ( UInt32* fmap,
   94:                       UInt32* eclass,
   95:                       Int32   loSt,
   96:                       Int32   hiSt )
   97: {

fmap
入力:ソート対象のブロック番号
出力:ソート結果

eclass
入力:ソート時の比較に使用する値

loSt
入力:fmapの下限インデックス

hiSt
入力:fmapの上限インデックス

   98:    Int32 unLo, unHi, ltLo, gtHi, n, m;
   99:    Int32 sp, lo, hi;
  100:    UInt32 med, r, r3;

ローカル変数です。下記で紹介します。

  101:    Int32 stackLo[FALLBACK_QSORT_STACK_SIZE];
  102:    Int32 stackHi[FALLBACK_QSORT_STACK_SIZE];
  103:

クイックソートを再帰ではなくスタックに積むことでループとして実装しています。
stackLo[]、stackHi[]はソート対象の領域の下限と上限です。fmap[starkLo[i] .. starkHi[i]] の範囲をソートします。

  104:    r = 0;
  105:

乱数の初期値を0としています。

  106:    sp = 0;
  107:    fpush ( loSt, hiSt );
  108:

初期値としてスタックに、loSt、hiStをセットします。

  109:    while (sp > 0) {
  110:

spが0になる時は、全てのソートが終わった時です。

  111:       AssertH ( sp < FALLBACK_QSORT_STACK_SIZE - 1, 1004 );
  112:

sp >= 99 (FALLBACK_QSORT_STACK_SIZE - 1) の時、アサートします。
stackLo[0..99] なので、spが100の時にmpush()するとオーバーフローします。

アサートが発生するspが99の時、本whileループでは、fpop()が1回、その後にfpush()が2回発生するのでspが99でfpush()することになります。まだオーバーフローする段階ではないですが、余裕をもってアサートしているものと思われます。

  113:       fpop ( lo, hi );

スタックからlo、hiを取り出します。

lo、hiはソート対象の領域の下限と上限です。fmap[lo .. hi] の範囲をソートします。

  114:       if (hi - lo < FALLBACK_QSORT_SMALL_THRESH) {

ソート対象の数(hi-lo)が、FALLBACK_QSORT_SMALL_THRESH(10)未満の場合、対象のソート方法fallbackSimpleSort()に切り替えます。

  115:          fallbackSimpleSort ( fmap, eclass, lo, hi );

fallbackSimpleSort () を実行します。

  116:          continue;

fmap[lo..hi]の範囲についてfallbackSimpleSort()でソートが完了したので、スタックに積まれている次の範囲をソートするためにcontinueします。

  117:       }
  118:

以下は、fallbackSimpleSort()ではなく、クイックソートする処理です。

  119:       /* Random partitioning.  Median of 3 sometimes fails to
  120:          avoid bad cases.  Median of 9 seems to help but
  121:          looks rather expensive.  This too seems to work but
  122:          is cheaper.  Guidance for the magic constants
  123:          7621 and 32768 is taken from Sedgewick's algorithms
  124:          book, chapter 35.
  125:       */

通常、クイックソートでは要素の左端、中間、右端の値の中で中央値を基準値(ピボット)として、領域を二分割します。

ここでは、3値の中央値では、問題があると主張しています。
代わりに9値の中央値は良さそうですが、計算コストが高いとしています。

中央値の代わりに乱数を用いて、3値のどれを基準値として用いるかを決めます。
乱数の生成には、Sedgewick's algorithms Chapter 35 に出てくるマジックナンバー7621、32768を使用します。

  126:       r = ((r * 7621) + 1) % 32768;

前回の値を元に乱数を生成します。

  127:       r3 = r % 3;

乱数をもとに 0,1,2 のどれかの値を取得します。

  128:       if (r3 == 0) med = eclass[fmap[lo]]; else
  129:       if (r3 == 1) med = eclass[fmap[(lo+hi)>>1]]; else
  130:                    med = eclass[fmap[hi]];
  131:

乱数に応じて基準値medを決定します。

r3が0の時、左端のeclass[fmap[lo]]。
r3が1の時、中間のeclass[fmap[(lo+hi)>>1]]。
r3が2の時、右端のeclass[fmap[hi]]。

  132:       unLo = ltLo = lo;
  133:       unHi = gtHi = hi;
  134:

3つの領域に分割するための区切りの変数unLo、ltLo、unHi、gtHiの初期値をセットする。

  135:       while (1) {

ここでは、4つの領域(ピボットと一致、ピボットより小さい、ピボットより大きい、ピボットと一致)に分割する。
このループを抜けたところで、最初と最後のピボットと一致する領域を結合し、3つの領域(ピボットより小さい、ピボットと一致、ピボットより大きい)にします。

  136:          while (1) {
  137:             if (unLo > unHi) break;
  138:             n = (Int32)eclass[fmap[unLo]] - (Int32)med;
  139:             if (n == 0) {
  140:                fswap(fmap[unLo], fmap[ltLo]);
  141:                ltLo++; unLo++;
  142:                continue;
  143:             };
  144:             if (n > 0) break;
  145:             unLo++;
  146:          }

fmap[unLo]に対応するeclass[ fmap[unLo] ] がmedより小さい場合、unLoを右に進めます(145行目)。この時、ltLoはmedより小さい文字を指しています。

eclass[ fmap[unLo] ] がmedに一致する場合(139行目)、unLoとltLoを入れ替えます(140行目)。unLo>=ltLoとなっているので、medに一致する値はlo側にセットされ、medより小さい値はunLo側にセットされます。交換した時点では、ltLoはmedに一致する値を指しています。次の141行目で、ltLo++することで、medより小さい文字を指すようにします。また、unLo++することで次に確認する値を指します。

eclass[ fmap[unLo] ] がmedより大きい場合、ループから抜けます(144行目)。

unHiを超えた場合、そこはすでにmedより大きい領域、またはhi(hi=unHiの時)を超えた領域なので、ループを抜けます。

直感的には、medより小さい、medに一致の順に並べ替えますが、ここではmedに一致、medより小さい、の順に並べ替えます。166行目でmedより小さい、medに一致の順に並べ直します。

  147:          while (1) {
  148:             if (unLo > unHi) break;
  149:             n = (Int32)eclass[fmap[unHi]] - (Int32)med;
  150:             if (n == 0) {
  151:                fswap(fmap[unHi], fmap[gtHi]);
  152:                gtHi--; unHi--;
  153:                continue;
  154:             };
  155:             if (n < 0) break;
  156:             unHi--;
  157:          }

fmap[unHi]に対応するeclass[ fmap[unHi] ]がmedより大きい場合、unHiを左に進めます(156行目)。この時、gtHiはmedより大きい文字を指しています。

eclass[ fmap[unHi] ]がmedに一致する場合(150行目)、unHiとgtHiを入れ替えます。unHi<=gtHiとなっているので、medに一致する値はhi側にセットされ、medより大きい値はunHi側にセットされます。交換した時点では、gtHiはmedに一致する値を指しています。次の152行目で、gtHi--することで、medより大きい文字を指すようにします。また、unHi--することで次に確認する値を指します。

eclass[ fmap[unHi] ]がmedより小さい場合、ループから抜けます(155行目)。

unLoを超えた場合、そこはすでにmedより小さい領域、またはlo(lo=unLoの時)を超えた領域なので、ループを抜けます。

直感的にはmedに一致、medより大きい、の順に並べ替えますが、ここではmedより大きい、medに一致、の順に並べ替えます。167行目でmedに一致、medより大きいの順に並べ直します。

  158:          if (unLo > unHi) break;

unLo > unHi が成立する時、領域に振り分ける要素がないので、whileループを抜ける。

  159:          fswap(fmap[unLo], fmap[unHi]); unLo++; unHi--;

159行目の時点で、eclass[ fmap[unLo] ]はmedより大きい、eclass[ fmap[unHi] ]はmedより小さい、となっています(図の上部)。
unLoとunHiを入れ替えると、medより小さい領域とmedより大きい領域をそれぞれ拡張することができます(図の下部)。

  160:       }
  161:
  162:       AssertD ( unHi == unLo-1, "fallbackQSort3(2)" );
  163:

上記のwhileループを抜けた時は、unHiとunLoは隣接し、unLo > unHi となることが期待されます。これを満たさない場合、アサートします。

  164:       if (gtHi < ltLo) continue;
  165:

gtHi < ltLoとなるのは、loからhiまでの全てのソート対象がmedに一致した場合です。
この場合、この領域はこれ以上ソートすることができないので、continueして再度別の領域にクイックソートを行います。

  166:       n = fmin(ltLo-lo, unLo-ltLo); fvswap(lo, unLo-n, n);


medに一致する領域とmedより小さい領域を入れ替えます。
medに一致する領域のサイズ(ltLo-lo)とmedより小さい領域のサイズ(unLo-ltLo)の小さい方のサイズ分だけ入れ替え対象とします。

  167:       m = fmin(hi-gtHi, gtHi-unHi); fvswap(unLo, hi-m+1, m);
  168:

medに一致する領域とmedより大きい領域を入れ替えます。
medに一致する領域のサイズ(hi-gtHi)とmedより大きい領域のサイズ(gtHi-unHi)の小さい方のサイズ分だけ入れ替え対象とします。
大きい方のサイズで入れ替えても、動作に支障は起きませんが、無駄な入れ替え(一致する領域から一致する領域への入れ替え、大きい領域から大きい領域への入れ替え)が発生してしまいます。

  169:       n = lo + unLo - ltLo - 1;
  170:       m = hi - (gtHi - unHi) + 1;
  171:

medより小さい領域のサイズはunLo-ltLoです。medより小さい領域の先頭はlo、最後はlo+unLo-ltLo-1(=n)となります。

medより大きい領域のサイズはgtHi-unHiです。medより大きい領域の最後はhi、先頭はhi-(gtHi-unHi)+1(=m)となります。

  172:       if (n - lo > hi - m) {
  173:          fpush ( lo, n );
  174:          fpush ( m, hi );
  175:       } else {
  176:          fpush ( m, hi );
  177:          fpush ( lo, n );
  178:       }

172行目で n-lo > hi-m がtrueとなるのは、medより小さい領域の要素数が、medより大きい領域の要素数より多い場合です。
この時、173、174行目で、medより小さい領域を先にスタックに積んでいます。

176、177行目は、medより大きい領域の要素数が、medより小さい領域の要素数より多い場合に実行されます。
この時、medより大きい領域を先にスタックに積んでいます。

どちらの場合でも、領域の要素数の多い方を先にスタックに積んでいます。言い換えると、要素数の少ない方を先にスタックから取り出すことになります。

サイズの小さい領域を優先的にソートすることで早めにソートを完了させ、スタックに積む回数が増えないようにしていると思われます。

  179:    }
  180: }

まとめ

本記事では、fallbackSort()でのクイックソートfallbackQSort3()とシェルソートfallbackSimpleSort()を紹介しました。

これで、blocksort.c のすべての処理の紹介が終わりました。

次回は、bzip2 の圧縮処理で、ブロックソートの次に実行する、MTF(Move To Frone)変換について紹介します。


ご覧下さりありがとうございます。いただいたサポートは図書館への交通費などに使わせていただきます。