bzip2を読むタイトル__2_

bzip2を読む ブロックソート6

こんにちは、junkawaです。

本記事では、前回に続きブロックソートアルゴリズムの「巡回行列のソート」についてソースコードを交えて紹介します。

前回の記事

bzip2を読む はじめに
bzip2を読む ブロックソート1
bzip2を読む ブロックソート2
bzip2を読む ブロックソート3
bzip2を読む ブロックソート4
bzip2を読む ブロックソート5

前回までのおさらい

巡回行列のソートでは、先頭2バイトが異なる行について、クイックソートを行います。

クイックソートはある程度ソートが完了した後で、シェルソートに切り替えます。

本記事では、シェルソートの実装について紹介します。

シェルソート

シェルソートは mainSimpleSort() で行います。ソート中の比較関数 は mainGtU()です。

mainGtU()では、2つの行の全てのシンボルを比較して大小の比較を行います。

ソースコード紹介

mainGtU()@blocksort.c
https://github.com/junkawa/bzip2/blob/master/bzip2-1.0.6/blocksort.c#L338

https://github.com/junkawa/bzip2/blob/master/bzip2-1.0.6/blocksort.c#L469

mainSimpleSort()@blocksort.c
https://github.com/junkawa/bzip2/blob/master/bzip2-1.0.6/blocksort.c#L472

https://github.com/junkawa/bzip2/blob/master/bzip2-1.0.6/blocksort.c#L555

mainGtU()

  338: /*---------------------------------------------*/
  339: /*--- The main, O(N^2 log(N)) sorting       ---*/
  340: /*--- algorithm.  Faster for "normal"       ---*/
  341: /*--- non-repetitive blocks.                ---*/
  342: /*---------------------------------------------*/
  343: 

同じ値やパターンが繰り返されない場合(一般的な場合)、早いそうです。
同じ値やパターンが繰り返される場合は、fallbackSort()を行います。

計算量は N^2 log(N)とのことですが、詳しく検討できていません。

  344: /*---------------------------------------------*/
  345: static
  346: __inline__
  347: Bool mainGtU ( UInt32  i1, 
  348:                UInt32  i2,
  349:                UChar*  block, 
  350:                UInt16* quadrant,
  351:                UInt32  nblock,
  352:                Int32*  budget )
  353: {

i1
入力:比較対象の先頭インデックス。block[i1]から比較を始めます。

i2
入力:較対象の先頭インデックス。block[i2]から比較を始めます。

block
入力:入力データブロック

quadrant
入力:同じ入力文字、同じ入力パターンが連続する場合、比較に時間がかかるが、これを避けて高速に比較できるようにする情報。

nblock
入力:blockのサイズ。block[0..nblock]。nblockは最大で900,000。

budget
入力、出力:同じ入力文字、同じ入力パターンが連続する場合、クイックソートだと時間がかかるので、別のソート方法(fallbackSort())に切り替える必要がある。そのためのカウンタ。8バイト一致したら1減ります。

戻り値
True i1 の行がi2の行より大きい
Flase i1 の行がi2の行より小さい
   i1 の行とi2の行が全て一致

  354:    Int32  k;
  355:    UChar  c1, c2;
  356:    UInt16 s1, s2;
  357: 

後で紹介します。

  358:    AssertD ( i1 != i2, "mainGtU" );

i1とi2が同じ場合、アサートします。

比較対象のi1とi2は違う必要があります。仮に同じ場合、全てのシンボルが一致し、比較に時間がかかってしまいます。

  359:    /* 1 */
  360:    c1 = block[i1]; c2 = block[i2];
  361:    if (c1 != c2) return (c1 > c2);
  362:    i1++; i2++;

block[i1]とblock[i2]を比較し、値が異なる場合、block[i1] が block[i2]より大きい場合Trueを返し、小さい場合Flaseを返します。

値が同じ場合、次の文字を比較します。

以下、/* 1 */ の処理を12回繰り返します。

12という回数は、歴史的経緯で決定された値だと思われます。
経験的にこれが一番高速だったのでしょうか。

bzip2の最初のバージョン0.9.0では、1バイト比較の6回となっています。
0.9.5dでは、2バイト比較の6回。
1.0.0以降では、1バイト比較の12回になっています。

0.9.0のソースコードftp://ftp.gefoekom.de/pub/really_old_stuff/unix/tools/archiver/bzip2-0.9.0.tar.gz
0.9.5dのソースコード
ftp://ftp2.piotrkosoft.net/pub/mirrors/squid-www/contrib/seafood/SW/bzip2-0.9.5d.tar.gz
1.0.0のソースコード
https://mirrors.slackware.com/slackware/slackware-7.1/source/a/bzip2/bzip2-1.0.0.tar.gz

  363:    /* 2 */
  364:    c1 = block[i1]; c2 = block[i2];
  365:    if (c1 != c2) return (c1 > c2);
  366:    i1++; i2++;
  367:    /* 3 */
  368:    c1 = block[i1]; c2 = block[i2];
  369:    if (c1 != c2) return (c1 > c2);
  370:    i1++; i2++;
  371:    /* 4 */
  372:    c1 = block[i1]; c2 = block[i2];
  373:    if (c1 != c2) return (c1 > c2);
  374:    i1++; i2++;
  375:    /* 5 */
  376:    c1 = block[i1]; c2 = block[i2];
  377:    if (c1 != c2) return (c1 > c2);
  378:    i1++; i2++;
  379:    /* 6 */
  380:    c1 = block[i1]; c2 = block[i2];
  381:    if (c1 != c2) return (c1 > c2);
  382:    i1++; i2++;
  383:    /* 7 */
  384:    c1 = block[i1]; c2 = block[i2];
  385:    if (c1 != c2) return (c1 > c2);
  386:    i1++; i2++;
  387:    /* 8 */
  388:    c1 = block[i1]; c2 = block[i2];
  389:    if (c1 != c2) return (c1 > c2);
  390:    i1++; i2++;
  391:    /* 9 */
  392:    c1 = block[i1]; c2 = block[i2];
  393:    if (c1 != c2) return (c1 > c2);
  394:    i1++; i2++;
  395:    /* 10 */
  396:    c1 = block[i1]; c2 = block[i2];
  397:    if (c1 != c2) return (c1 > c2);
  398:    i1++; i2++;
  399:    /* 11 */
  400:    c1 = block[i1]; c2 = block[i2];
  401:    if (c1 != c2) return (c1 > c2);
  402:    i1++; i2++;
  403:    /* 12 */
  404:    c1 = block[i1]; c2 = block[i2];
  405:    if (c1 != c2) return (c1 > c2);
  406:    i1++; i2++;
  407: 

/* 1 */ の処理を12回繰り返します。

  408:    k = nblock + 8;
  409: 

ここで、nblockに8を加算している理由は不明です。

  410:    do {
  411:       /* 1 */
  412:       c1 = block[i1]; c2 = block[i2];
  413:       if (c1 != c2) return (c1 > c2);

まず、c1とc2で比較を行い、一致した場合、

  414:       s1 = quadrant[i1]; s2 = quadrant[i2];
  415:       if (s1 != s2) return (s1 > s2);

quadrantを使って比較を行います。
quadrantは、mainSort()のStep 3でセットされます。

ここでは、quadrantの簡単な紹介にとどめます。
(quadrantについては、mainSort()のStep 3で詳しく紹介します。)

c1(=c2)のシンボルから始まる行がすでにソート済みの場合

ブロック番号i1、i2はptr上でソート完了しています。
つまりptr上での i1の位置と、i2の位置を比べれば、以降のシンボルを比較しないでも、結果が分かることになります。

図では、説明を簡単にするためにシンボルをアルファベットにしています。実際は0〜255のシンボルです。

mainGtU()にて、block[88]以降とblock[198]以降を比較します。

先頭12バイトが一致したので、13バイト目を比較しています。
13バイト目(block[100]とblock[2100])が ‘b’ で一致しました。ここで、’b’から始まる行がすでにソート済みであると仮定します。'b'から始まる行がソートされた時、mainSort()のStep3にて、quadrantに ‘b’から始まる行についてptr上でのインデックスがセットされます。

block[100]から始まる行はソートの結果、ptrのインデックス70、block[2100]から始まる行はptrのインデックス40にソートされたとします。つまり、block[2100]から始まる行の方がblock[100]から始まる行より小さい、です。

quadrant[100]には70、quadrant[2100]には40がセットされます。
quadrantを使うことで、ブロック番号(100,2100)から、ptr上での位置(=比較結果)を得ることができます(quadrantを使わずに、ptr上を探索しても同様のことはできますが、時間がかかり本末転倒となるので、事前にセットしています)。

ソースに合わせて説明すると、i1が100、i2が2100。
c1が’b’、c2が’b’。s1が70、s2が40。100と2100以降を比較しなくても、この時点で以降の比較結果をquadrantから知ることができます。

c1(=c2)のシンボルから始まる行がソート済みでない場合

quadrantは初期値の0のままです。

したがって、quardant[i1]とquadrant[i2]はどちらも0となり、次の文字を比較することになります。

   416:       i1++; i2++;

先頭12バイトの比較にquadrantを使わない理由は不明です。
先頭12バイトで同じシンボルが続く確率が低いため、quadrantを使わずに処理を高速化しているのでしょうか。

  417:       /* 2 */
  418:       c1 = block[i1]; c2 = block[i2];
  419:       if (c1 != c2) return (c1 > c2);
  420:       s1 = quadrant[i1]; s2 = quadrant[i2];
  421:       if (s1 != s2) return (s1 > s2);
  422:       i1++; i2++;
  423:       /* 3 */
  424:       c1 = block[i1]; c2 = block[i2];
  425:       if (c1 != c2) return (c1 > c2);
  426:       s1 = quadrant[i1]; s2 = quadrant[i2];
  427:       if (s1 != s2) return (s1 > s2);
  428:       i1++; i2++;
  429:       /* 4 */
  430:       c1 = block[i1]; c2 = block[i2];
  431:       if (c1 != c2) return (c1 > c2);
  432:       s1 = quadrant[i1]; s2 = quadrant[i2];
  433:       if (s1 != s2) return (s1 > s2);
  434:       i1++; i2++;
  435:       /* 5 */
  436:       c1 = block[i1]; c2 = block[i2];
  437:       if (c1 != c2) return (c1 > c2);
  438:       s1 = quadrant[i1]; s2 = quadrant[i2];
  439:       if (s1 != s2) return (s1 > s2);
  440:       i1++; i2++;
  441:       /* 6 */
  442:       c1 = block[i1]; c2 = block[i2];
  443:       if (c1 != c2) return (c1 > c2);
  444:       s1 = quadrant[i1]; s2 = quadrant[i2];
  445:       if (s1 != s2) return (s1 > s2);
  446:       i1++; i2++;
  447:       /* 7 */
  448:       c1 = block[i1]; c2 = block[i2];
  449:       if (c1 != c2) return (c1 > c2);
  450:       s1 = quadrant[i1]; s2 = quadrant[i2];
  451:       if (s1 != s2) return (s1 > s2);
  452:       i1++; i2++;
  453:       /* 8 */
  454:       c1 = block[i1]; c2 = block[i2];
  455:       if (c1 != c2) return (c1 > c2);
  456:       s1 = quadrant[i1]; s2 = quadrant[i2];
  457:       if (s1 != s2) return (s1 > s2);
  458:       i1++; i2++;
  459: 

/* 1 */ を8回繰り返します。

  460:       if (i1 >= nblock) i1 -= nblock;
  461:       if (i2 >= nblock) i2 -= nblock;
  462: 

ここでやっと、blokck[nblock-1]を超えた場所を参照した場合の対応をしていいます。

ここまで nblock を超えて参照したかをチェックしなかったのは、処理の高速化が原因だと思われます。

どれだけblock[nblock-1]を超えるか、図示します。

呼び出し元のdは最大15となっています(先頭2バイトの計数ソート分とクイックソートの13バイト分を超えた時)。

呼び出し元が、比較対象の要素数hi-loが20未満となって本関数を呼び出す時には、dは14以下かもしれません。呼び出し元が、先頭からの一致シンボル数が14より多い場合に本関数を呼び出す場合、dは15となっています。

呼び出し元のptr[i]の値は、最大nblock-1となります。また、本関数の先頭12バイトの比較と、ループ初回時の8バイトの比較で、計35バイト分、block[nblock-1]からオーバーしています。

これを考慮して、mainSort() 796行目では、事前に34バイト(BZ_N_OVERSHOOT)分、block[nblokc]〜block[nblock+BZ_N_OVERSHOOT-1]まで、block[0]〜block[BZ_N_OVERSHOOT-1]の値をコピーしています。

  463:       k -= 8;
  464:       (*budget)--;

8バイト一致したら、budgetを1減らします。

呼び出し元で、budgetが負数になっていることが確認できたら、一致するシンボルが多すぎるということで、fallbackSort()に切り替えます。

  465:    }
  466:       while (k >= 0);
  467: 

kの初期値がnblock+8なので、nblock+8 -> nblock -> nblock-8 … となり、kが負数になると、全てのシンボルが一致したとして比較終了します。

なぜnblock+8から始まるのか不明です。k=nblockとしても、全てのブロックについて比較できます。データキャッシュに関連する高速化のためでしょうか。

  468:    return False;

比較対象の行の全てのシンボルが一致した場合、Falseを返します。


  469: }
  470: 
  471: 

mainSimpleSort()

  472: /*---------------------------------------------*/
  473: /*--
  474:    Knuth's increments seem to work better
  475:    than Incerpi-Sedgewick here.  Possibly
  476:    because the number of elems to sort is
  477:    usually small, typically <= 20.
  478: --*/
  479: static
  480: Int32 incs[14] = { 1, 4, 13, 40, 121, 364, 1093, 3280,
  481:                    9841, 29524, 88573, 265720,
  482:                    797161, 2391484 };
  483: 

incs配列はシェルソートの間隔です。

シェルソートの間隔はいくつか提案されているのですが、ここでは、Knuth's increments を採用しています。

mainSimpleSort()を呼び出す時は、ソート対象の数がMAIN_QSORT_SMALL_THRESH(20)未満としています。この場合Incerpi-Sedgewickの間隔 (1,3,7,21,48,112,...)より、Knuthの間隔がよいそうです。

  484: static
  485: void mainSimpleSort ( UInt32* ptr,
  486:                       UChar*  block,
  487:                       UInt16* quadrant,
  488:                       Int32   nblock,
  489:                       Int32   lo, 
  490:                       Int32   hi, 
  491:                       Int32   d,
  492:                       Int32*  budget )
  493: {

ptr
入力:ptr[lo..hi]の範囲で、ブロック番号 ptr[i] (block[ptr[i]]) から始まる(巡回行列の)行をソートする。
出力:ソート結果をptrに格納する。

block
入力:入力データブロック

quadrant
入力:同じ入力文字、同じ入力パターンが連続する場合、比較に時間がかかるが、これを避けて高速に比較できるようにする情報。

nblock
入力:blockのサイズ。block[0..nblock]。nblockは最大で900,000。

lo
入力:ソート対象の、ptr の下限インデックス。

hi
入力:ソート対象の、ptr の上限インデックス。

d
入力:文字列中の比較文字の位置。

budget
入力、出力:同じ入力文字、同じ入力パターンが連続する場合、クイックソートだと時間がかかるので、別のソート方法(fallbackSort())に切り替える必要がある。そのためのカウンタ。

  494:    Int32 i, j, h, bigN, hp;
  495:    UInt32 v;
  496: 

以下で説明します。

  497:    bigN = hi - lo + 1;
  498:    if (bigN < 2) return;
  499: 

bigNが1となるのは、hi と lo が同じ時です。この時、ソートする必要はないので、すぐに関数を抜けます。

  500:    hp = 0;
  501:    while (incs[hp] < bigN) hp++;
  502:    hp--;
  503: 

シェルソートの間隔の初期値を決定します。
incs配列中の値で、bigNより小さい中で、一番大きな値を間隔とします。

bigN = 4000 の時、incs[hp] は3280 を指します。

  504:    for (; hp >= 0; hp--) {

hp=0、つまり間隔が1(incs[0]は1)でソートするまで、ループを回します。

  505:       h = incs[hp];
  506: 

シェルソートでは、間隔hを徐々に狭めていきます。hpが小さくなるにつれ、間隔incs[hp]も小さくなります。

図では4色しか表示していませんが、h=3280の場合3280色、h=1093の場合1093色あります。同じ色の要素間でソートを行います。

このように、間隔をだんだん狭めてソートすることで、最後のh=1でのソートを高速に終えることができます。

  507:       i = lo + h;

比較の基準値 block[ptr[i]+d] i の初期値を lo+h とします。

  508:       while (True) {
  509: 

lo+h〜hi の範囲で、間隔hで要素間をソートします。

図は、h=1093、lo=300の場合です。507行目より、i = 1393。

図中の上段は、i = 1393 の時です。
510〜523行の copy 1 では、512行目で v = ptr[1393]。514行目でmainGtU(ptr[300]+d, v+d,...)を比較し、517行目で大きい方をptr[1393]に入れ、521行目で小さい方をptr[300]に入れています。

図中の中段は、522行目で i = 1394とし、同様にcopy 2 を行った場合です。

図中の下段は、508のwhileループを繰り返し、i = 2486 ( lo+2*h)になった場合です。
この時、ptr[300]+d、ptr[1393]+d、ptr[2486]+d間でソートを行います。300、1393間はすでにソート済み(i=1393の時)です。
まず v = ptr[2486]で、ptr[2486]とptr[1393]をmainGtU(ptr[1393]+d, v+d,...) で比較します。ptr[1393]+dの方が小さい場合、mainGtU()はFalseを返すので、514行目のwhileループには入りません。この時、ptr[300]+d、ptr[1393]+d、ptr[2486]+d間はソート済みとなるため、2489の値を入れ替える必要はありません。
ptr[1393]+dの方が大きい場合、514行目のwhileループに入り、517行目でptr[1393]の値をptr[2486]に入れます。また、次のwhileループでptr[300]+dとv+dを比較します。ptr[300]+dの方が大きい場合、517行目でptr[300]の値をptr[1393]に入れます。そして521行目でptr[300]にvを入れます。ptr[300]+dの方が小さい場合、whileループには入らず、521行目でptr[1393]にvを入れます。結果として、ptr[300]+d、ptr[1393]+d、ptr[2486]+d間はソート済みとなります。

  510:          /*-- copy 1 --*/
  511:          if (i > hi) break;

比較対象の要素がなくなったら(hiを超えたら)、whileループを抜けます。

  512:          v = ptr[i];
  513:          j = i;

下記では、v を ptr[j]、ptr[j-h]、ptr[j-h*2]、ptr[j-h*3] … の中で適当な場所に挿入します。

  514:          while ( mainGtU ( 
  515:                     ptr[j-h]+d, v+d, block, quadrant, nblock, budget 
  516:                  ) ) {

ptr[j-h]とvの0からd-1までのシンボルは同じ値です。

mainGtU()では、ptr[j-h]+d 、v+d 以降のシンボルを行の最後まで比較していきます。

ソートの比較では、-1、0、1を返すことが多いですが、ここでは、True、Falseが返ってきます。

ptr[j-h]+d以降のシンボルの方が大きい場合、Trueが返ります。v+d以降のシンボルの方が大きい場合、またはptr[j-h]+d以降とv+d以降のシンボルが全て一致した場合、Falseが返ります。

以下の処理は、ptr[j-h]+d以降のシンボルの方が大きい場合となります。
ソートでは、シンボルが大きい方を ptr の後方に置きます。

  517:             ptr[j] = ptr[j-h];

ptr[j-h] を後方(ptr[j])に移動します。

  518:             j = j - h;

間隔h離れた要素を比較対象にします。

  519:             if (j <= (lo + h - 1)) break;

間隔h離れた要素がない場合、ループを抜けます。

  520:          }
  521:          ptr[j] = v;

v を、適切な場所(ptr[j])に挿入します。

  522:          i++;
  523: 

次の要素(ptr[i])を、比較対象とします。

  524:          /*-- copy 2 --*/
  525:          if (i > hi) break;
  526:          v = ptr[i];
  527:          j = i;
  528:          while ( mainGtU ( 
  529:                     ptr[j-h]+d, v+d, block, quadrant, nblock, budget 
  530:                  ) ) {
  531:             ptr[j] = ptr[j-h];
  532:             j = j - h;
  533:             if (j <= (lo + h - 1)) break;
  534:          }
  535:          ptr[j] = v;
  536:          i++;
  537: 

copy 1と同じです。

  538:          /*-- copy 3 --*/
  539:          if (i > hi) break;
  540:          v = ptr[i];
  541:          j = i;
  542:          while ( mainGtU ( 
  543:                     ptr[j-h]+d, v+d, block, quadrant, nblock, budget 
  544:                  ) ) {
  545:             ptr[j] = ptr[j-h];
  546:             j = j - h;
  547:             if (j <= (lo + h - 1)) break;
  548:          }
  549:          ptr[j] = v;
  550:          i++;
  551: 

copy 1と同じです。

  552:          if (*budget < 0) return;

budgetがなくなったら、呼び出し元に戻ります。
(fallbackSort()に切り替えます)

  553:       }
  554:    }
  555: }
  556: 
  557: 

まとめ

クイックソートから呼び出されるシェルソートについて紹介しました。これで先頭2バイトの値が異なる行についてのソートが完了しました。

次回は、先頭2バイトが同じ値となる行のソートについて紹介します。

ご覧下さりありがとうございます。いただいたサポートは図書館への交通費などに使わせていただきます。