UnityC#で並列ソートをした話し

こんにちは。DSPSEでプログラムを担当しているk.suzukiです。

Unity便利ですね。C#でゲームのみならず色々作ってしまいたくなる位色々な機能がありますね!

これだけ色々な機能が詰まっていると基本的な機能は当然の如く、お早いんでしょ?って疑うこともなく使ってしまいますね!

所で、Unityでソートって皆さん使いますか?私は結構使います。

ソートと言うとなじみが薄いかもしれませんが、ゲームのAIで大事なのは認知、判断、行動で、範囲内の値がどれくらいあるか?と言う、認知や判断部分で、判断材料の検索のしやすさが重要な要素になってきます。判断材料が多いとより多い判断が出来ますね

と言うことでまず、現状のUnityのソート機能はどれ位なの?使えるの?と言うところから見てみます。

//ランダムが詰まった配列用意
_originalArray = new NativeArray<int>(_dataNum,Allocator.Persistent);
for (var i = 0; i < _dataNum; i++){
   _originalArray[i] = Random.Range(0, int.MaxValue);
}
var test = new NativeArray<int>(_dataNum,Allocator.TempJob);
test.CopyFrom(_originalArray);

//arrayのソート 計測
var array = test.ToArray();
System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
sw.Start();
Array.Sort(array);
sw.Stop();
UnityEngine.Debug.Log(sw.Elapsed + "ms");

//listのソート計測
var list = test.ToArray().ToList();
sw.Restart();
list.Sort();
sw.Stop();
UnityEngine.Debug.Log(sw.Elapsed + "ms");
test.Dispose();

結果(単位はms)

画像4

こうして見ると、母数が1000件位なら早いですね、フレーム毎に実行しても良さそうですね、それ以上になるとどうでしょう?フレーム毎に実行するとなると結構時間を逼迫しますね。

まあ、ソートなんで、出来ればフレーム中に何回も実行するような仕組みは避けたい所です。

1000件とは言え、意図しないオブジェクトでソートを使っていたら、知らない間に総ソート件数が増えFPSを落としてしまいそうです。

開発しているゲームの必要に応じて作ったのですが(Nativeコンテナのソートが無い)、以下は暗黒車輪再開発した物です。

遠く及びませんでした、C++ std::sort (MSVC内蔵のParallelism TS)を目指します。

後、条件としては、UnityのC#で実行できるようなポータブルな物を目指します。又並列化はジョブシステム、burstを使ったものを目指します。

まずは、シングルスレッドで動く物で、クイックソートの非再帰版。再帰版はスタックメモリが逝っちゃう可能性が多いので止めました。

        //非再帰バージョン
       public void  NonRecursiveQuickSort(NativeArray<int> array){
           var len = array.Length;
           if (len <= 0) return;
           var ptr = (int*)array.GetUnsafePtr();
           var size = (int)(math.LOG2E * math.log((len))) + 1;
           var leftStack = new NativeArray<int>(size,Allocator.TempJob);
           var rightStack = new NativeArray<int>(size,Allocator.TempJob);

           var ls =  (int*)leftStack.GetUnsafePtr();
           var rs =  (int*)rightStack.GetUnsafePtr();

           ls[0] = 0;
           rs[0] = len - 1;
           var p = 1;

           while (p > 0){
               p--;
               var left = ls[p];
               var right = rs[p];
               var keyPtr = (int*)ptr;
               while (left < right){
                   var m = (int)((left + right) / 2);
                   var pivot = ((int*)ptr)[m];
                   var l = left;
                   var r = right;
                   while (true){
                       while ((keyPtr + l)->CompareTo(pivot) < 0){l++;}
                       while (pivot.CompareTo(keyPtr[r]) < 0){r--;}
                       if (r <= l) break;
                       SwapKeyValue((int*) ptr, r, l);
                       l++;r--;
                   }
                   if (l - left < right - r) {
                       if (left < l) {
                           ls[p] = l;
                           rs[p] = right;
                           p++;
                       }
                       right = l - 1;
                   }
                   else {
                       if (r < right) {
                           ls[p] = left;
                           rs[p] = r;
                           p++;
                       }
                       left = r + 1;
                   } 
               }
           }
           rightStack.Dispose();
           leftStack.Dispose();
           void SwapKeyValue(int* kp,int x,int y){
               *(kp + x) ^= *(kp + y);
               *(kp + y) ^= *(kp + x);
               *(kp + x) ^= *(kp + y);
           }
       }

次に並列可能なソートを作成

こちらはクイックソートとマージソートを混ぜたものです。

マージソートの概念はいかにあるように、最初分割数を増やして行き、それをマージしながらソートする感じです。

自分の作ったクイックソートとマージソートの合成ソートは以下の概念です。

画像2

普通、マージソートは前半で分割していきます。

そこを端折ってクイックソートの複数稼働で前半部を一気に省略します。後半部もピンクなら4分割、朱色なら2分割の並列マージが可能なのでこの部分も並列実装します。

最後の整列がどうしても分割できませんが、それ以外の部分に速度向上の期待値が高まります。

        public void QuickMergeSort(NativeArray<int> key,int div = 128){
           var inputDeps = new JobHandle();
           inputDeps = new QmSort(){
               div = div,
               Key = key,
           }.Schedule(div,0);
           inputDeps.Complete();
           div = div >> 1;
           while (div > 0){
               inputDeps = new MargeSort(){
                   div = div,
                   Key = key,
               }.Schedule(div,0);
               inputDeps.Complete();
               div = div >> 1;
           }
       }
       [Unity.Burst.BurstCompile]
       public unsafe struct QmSort : IJobParallelFor{
           public int div;
           public NativeArray<int> Key;
           // Executeを定義
           public void Execute(int i){
               var divv = ((float)Key.Length / (float)div);
               var start = (int)(i * divv);
               var end = (int)((i + 1) * divv);
               var len = end - start;
               if (len <= 0) return;
               var size = (int)(math.LOG2E * math.log(len)) + 1;
               var leftStack = new NativeArray<int>(size,Allocator.Temp);
               var rightStack = new NativeArray<int>(size,Allocator.Temp);
       
               var ls =  (int*)leftStack.GetUnsafePtr();
               var rs =  (int*)rightStack.GetUnsafePtr();

               ls[0] = start;
               rs[0] = end - 1;
               var p = 1;
       
               var keyPtr = (int*)Key.GetUnsafePtr();
               while (p > 0){
                   p--;
                   var left = ls[p];
                   var right = rs[p];
       
                   while (left < right){
                       var m = (int)((left + right) >> 1);
                       var l = left;
                       var r = right;
                       var pivot = Median( keyPtr + l,keyPtr + m,keyPtr + r);
                       while (true){
                           while (keyPtr[l].CompareTo(pivot) < 0){l++;}
                           while (pivot.CompareTo(keyPtr[r]) < 0){r--;}
                           if (r <= l) break;
                           SwapV(keyPtr + l, keyPtr + r);
                           l++;
                           r--;
                       }
                       
                       if (l - left < right - r) {
                           if (left < l) {
                               ls[p] = l;
                               rs[p] = right;
                               p++;
                           }
                           right = l - 1;
                       }
                       else {
                           if (r < right) {
                               ls[p] = left;
                               rs[p] = r;
                               p++;
                           }
                           left = r + 1;
                       }
                   }
               }
               rightStack.Dispose();
               leftStack.Dispose();
           }
           void SwapV(int* x,int* y){
               *x ^= *y;
               *y ^= *x;
               *x ^= *y;
           }
       }
       [Unity.Burst.BurstCompile]
       public unsafe struct MargeSort : IJobParallelFor{
           public int div;
           public NativeArray<int> Key;
           // Executeを定義
           public void Execute(int idx){
               var len = Key.Length;
               var divv = ((float)len / (float)div);
               var start = (int)(idx * divv);
               var left = start;
               var right = (int)((idx + 1) * divv);
               var mid = (int)((idx + 0.5f) * divv);
               var keyPtr = (int*)Key.GetUnsafePtr();

               var i = left;
               var j = mid;
               var k = 0;
               
               var tlen = right - left;               
               var tempk = new NativeArray<int>((int) tlen , Allocator.Temp);
               var tempkPtr = (int*)tempk.GetUnsafePtr();

               while (i < mid && j < right) {
                   if (*(keyPtr + i) <= *(keyPtr + j)) {
                       tempkPtr[k++] = keyPtr[i++];
                   } else {
                       tempkPtr[k++] = keyPtr[j++];
                   }
               }
               if (i == mid) { /* 移動し尽くしたので、j側も順番に入れていく */
                   while (j < right) {
                       tempkPtr[k++] = keyPtr[j++];
                   }
               } else {
                   while (i < mid) { /* 移動し尽くしたので、i側も順番に入れていく */
                       tempkPtr[k++] = keyPtr[i++];
                   }
               }
               UnsafeUtility.MemCpy((keyPtr + start),tempkPtr,sizeof(int) * k);
               
               tempk.Dispose();
           }
       }



次に純粋にクイックソートの並列化を目指します。

こちらは普通のクイックソートのマルチスレッドと概念は同じですが若干実装が違います。

普通のクイックソートのマルチスレッド版が、左右分割した再帰分をStackしていくのに対して、下のバージョンはQSortStackでクイックソートの分割が終わるまでソートしながら分割数を増やして行き、全てNativeArray(下の例だstackOut)に、最初のインデックスと最後のインデックスを記録していきます。

最後に分割数分だけ一気に並列で非再帰版のクイックソート(NrqSort )を並列動作させます。

画像3

        public void QuickSort(NativeArray<int> key,int threshold = 64,int sizeSq = 10){
           var size = 2 << sizeSq;
           var stackZero = new NativeArray<int2>(size * 2 ,Allocator.TempJob);
           var stackIn = new NativeArray<int2>(size * 2,Allocator.TempJob);
           var stackOut = new NativeArray<int2>(size * 2,Allocator.TempJob);
           var inputDeps = new JobHandle();
           
           stackIn[0] = new int2(0,key.Length - 1);
           var d = 0;var d2 = 1;
           while (true){
               inputDeps = new QSortStack(){
                   threshold = threshold,
                   Key = key,
                   stackIn = stackIn,
                   stackOut = stackOut,
               }.Schedule(d2,0);
               inputDeps.Complete();

               if (UnsafeUtility.MemCmp(stackOut.GetUnsafePtr(), stackZero.GetUnsafePtr(),
                   sizeof(int2) * stackOut.Length).Equals(0)){
                   stackIn.Dispose();
                   stackOut.Dispose();
                   stackZero.Dispose();
                   return;
               };
               stackIn.CopyFrom(stackOut);
               d2 = 2 << d;
               d++;
               if (d2 > size){break;}
           }

           inputDeps = new NrqSort(){
               Key = key,
               stackIn = stackOut,
           }.Schedule(d2,0);
           inputDeps.Complete();

           stackIn.Dispose();
           stackOut.Dispose();
           stackZero.Dispose();
       }
       [Unity.Burst.BurstCompile]
       public unsafe struct QSortStack : IJobParallelFor{
           public int threshold; 
           public NativeArray<int> Key;
           public NativeArray<int2> stackIn;
           public NativeArray<int2> stackOut;
           // Executeを定義
           public void Execute(int i){
               // 要素数が少なくなってきたら挿入ソートに切り替え
               var bufferPtr = (int*) Key.GetUnsafePtr();
               var idx = i * 2;
               var idx2 = idx + 1;
               var stIp = (int2*)stackIn.GetUnsafePtr();
               var stOp = (int2*)stackOut.GetUnsafePtr();
               if (stIp[i].Equals(int2.zero)){
                   stOp[idx] = int2.zero;
                   stOp[idx2] = int2.zero;
                   return;
               }

               if ((stIp + i)->y - (stIp + i)->x < threshold){
                   InsertSort(bufferPtr,(stIp + i)->x, (stIp + i)->y);
                   stOp[idx] = int2.zero;
                   stOp[idx2] = int2.zero;
                   return;
               }
               
               var m = (int)(((stIp + i)->x + (stIp + i)->y) >> 1);
               var pivot = Median(bufferPtr + (stIp + i)->x, bufferPtr + m, bufferPtr +(stIp + i)->y);
               //var pivot = bufferPtr[m];
               // 左右分割
               var l = (stIp + i)->x;
               var r = (stIp + i)->y;

               while(l <= r){
                   while (l < (stIp + i)->y && (bufferPtr + l)->CompareTo(pivot) < 0) l++;
                   while (r > (stIp + i)->x && (bufferPtr + r)->CompareTo(pivot) >= 0) r--;
                   if (l > r) break;
                   SwapV(bufferPtr + l,bufferPtr + r);
                   l++; r--;
               }

               stOp[idx] = new int2((stIp + i)->x, l-1);
               stOp[idx2] = new int2(l, (stIp + i)->y);
           }
           void InsertSort(int* ptr, int start, int end){
               for (var insert = start + 1; insert <= end; insert++){
                   for (var j = insert; j > start &&  ptr[j - 1].CompareTo(ptr[j]) > 0; --j){
                       SwapV(ptr + j,ptr + (j - 1));
                   }
               }
           }
           private void SwapV(int* x,int* y){
               *x ^= *y;
               *y ^= *x;
               *x ^= *y;
           }
       }
       [Unity.Burst.BurstCompile]
       public unsafe struct NrqSort : IJobParallelFor{//NonRecursiveQuickSort
           public NativeArray<int> Key;
           public NativeArray<int2> stackIn;
           // Executeを定義
           public void Execute(int i){
               var stIp = (int2*)stackIn.GetUnsafePtr() + i;
               var len = stIp->y - stIp->x;
               if (len <= 0) return;
               var size = (int)(math.LOG2E * math.log(len)) + 1;
               var leftStack = new NativeArray<int>(size,Allocator.Temp);
               var rightStack = new NativeArray<int>(size,Allocator.Temp);
       
               var ls =  (int*)leftStack.GetUnsafePtr();
               var rs =  (int*)rightStack.GetUnsafePtr();
               *ls = stIp->x;
               *rs = stIp->y;
               var p = 1;
       
               var keyPtr = (int*)Key.GetUnsafePtr();
               while (p > 0){
                   p--;
                   var left = ls[p];
                   var right = rs[p];
       
                   while (left < right){
                       var m = (int)((left + right) >> 1);
                       var l = left;
                       var r = right;
                       //var pivot = keyPtr[m];
                       var pivot = Median( keyPtr + l,keyPtr + m,keyPtr + r);
                       while (true){
                           while ((keyPtr + l)->CompareTo(pivot) < 0){l++;}
                           while (pivot.CompareTo(keyPtr[r]) < 0){r--;}
                           if (r <= l) break;
                           SwapV(keyPtr + l,keyPtr + r);
                           l++;r--;
                       }
                       
                       if (l - left < right - r) {
                           if (left < l) {
                               ls[p] = l;
                               rs[p] = right;
                               p++;
                           }
                           right = l - 1;
                       }
                       else {
                           if (r < right) {
                               ls[p] = left;
                               rs[p] = r;
                               p++;
                           }
                           left = r + 1;
                       }
                   }
               }
               rightStack.Dispose();
               leftStack.Dispose();
           }
           private void SwapV(int* x,int* y){
               *x ^= *y;
               *y ^= *x;
               *x ^= *y;
           }
       }

実行結果

画像4

マルチスレッド版については、1000件あたりが遅く感じますが、一億想定の分割数で計測しています。ジョブの用意が結構大変なので、分割数をもっと低くすればまだまだ上がりそうです。

こうみると1000件位のソートだと普通にListやArrayの物でも大丈夫そうですが、1万を超えるあたりから考えた方がよさそうですね。

10万までマージソートが勝ち、それ以降はクイックソートに軍配が上がりました。






この記事が気に入ったらサポートをしてみませんか?