見出し画像

Now in REALITY Tech #102 Kotlin Flowのcollectを深掘ってみた

こんにちは。
Androidチームの応為です。

今回は、Kotlin Flowにおける重要な概念であるcollectに焦点を当ててみたいと思います。
KotlinのFlowは、非同期ストリームデータを操作するための便利なライブラリですが、その中でもcollectとcollectLatestは特に使われている関数です。

そこで今回は、collectとcollectLatestの違いに焦点を当て、実際の動作を出力してみたいと思います。
具体的には、1から3までの整数をcollectとcollectLatestを使って購読し、その挙動の違いを検証してみます。

Kotlin Flowの中でもよく使われるcollectとcollectLatestの違いについて深堀りをする機会になればと思います。

collect

collectは、流れてきた要素を順次処理していく関数です。
この検証では、処理がすぐに終了しないようにdelayを使って待機しています。
それでは、このcollectの出力結果はどうなるでしょうか?

val flow = (1..3).asFlow()

runBlocking {
    flow.collect {
        println("start $it")
        delay(1000)
        println("end $it")
    }
}

実際に実行してみると、出力結果はこのようになります。

start 1
end 1
start 2
end 2
start 3
end 3


この結果を見てみると、collectで記述したラムダ引数が直列で処理されていることがわかります。
先に1~3のstartが出力され、その後に各数値のendが出力されるように、並列処理がされると予想された方もいらっしゃるのではないでしょうか?

collectLatest

collectLatestもcollectと同様に、emitされた要素を順次処理する関数です。
では、同じ条件でcollectLatestで購読した場合、どのような結果になるでしょうか?

val flow = (1..3).asFlow()

runBlocking {
    flow.collectLatest {
        println("start $it")
        delay(1000)
        println("end $it")
    }
}

実際に実行してみると、出力結果はこのようになります。

start 1
start 2
start 3
end 3

startが1から3まで順次表示され、最後にendが3のみが出力されています。
これは、collectの処理が終了していないときに新しい値がemitされた場合、実行中の処理を中断して新しい値の処理を開始するためです。

このことから、collectとcollectLatestの違いが明確になります。
それぞれの関数は、emitされるたびに実行されている処理の終了を待つか、実行中の処理を中断して新しい値を処理するかの違いがあります。
また、どちらの場合も購読した処理が直列で実行されることに注目してください。
このようにデータの流れを制御する仕組みはバックプレッシャーと呼ばれています。
では、バックプレッシャーについて説明をしていきます。

バックプレッシャー

データの流れを制御するために用いられる概念がバックプレッシャーです。
バックプレッシャーは、メモリの枯渇を防ぐためにデータの流れを適切に制御する手法です。

バックプレッシャーを扱うには、まずbufferについて理解する必要があります。
bufferとは、collectに渡す要素を一時的に保管しておく概念です。
collectでの処理が終了すると、buffer内に待機している要素が順次処理され、新しい購読処理が開始されます。
これにより、非同期でemitされた要素も順次処理することが可能となります。

では、bufferに値が過剰にたまった場合はどうなるでしょうか?
ここで、一度buffer関数を詳しく見てみましょう。

public fun <T> Flow<T>.buffer(
    capacity: Int = BUFFERED,
   onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>

capacityは、bufferに保持できる要素の量を表し、デフォルト値であるBUFFEREDは-2ですが、実際の個数としては64個を表します。
一方、onBufferOverflowは、capacityで指定した量を超えたときの挙動を指定します。
このonBufferOverflowで指定されるBufferOverflowには3種類の値があり、それぞれSUSPEND、DROP_OLDEST、DROP_LATESTの3つです。
また、Flow#bufferに関するKotlinのAPIリンクを本記事の最後に記載しているため合わせて参考にしてみてください。

では、下記のコードを用意して、3つのケースを見ていきましょう。

val flow = (1..4).asFlow()

runBlocking {
    flow
        .buffer(
            capacity = 1,
            onBufferOverflow = BufferOverflow.SUSPEND
        )
        .collect {
            println("start $it")
            delay(1000)
            println("end $it")
        }
}

SUSPEND

SUSPENDはcapacityに空きができるまで待機をします。
つまりemitのコールそのものが待たされる状態がつづくこととなり、
出力結果は下記のようにどの要素も欠けることなく実行されます。

start 1
end 1
start 2
end 2
start 3
end 3
start 4
end 4

DROP_OLDEST

DROP_OLDESTは、capacityに空きがない場合に古いものから削除されます。
最も古いものが削除された場合の出力はこのような結果となります。

start 1
end 1
start 4
end 4

1が最も古いのでは?と一瞬頭を捻りがちですが、1をemitしたタイミングではcollectにそのまま流されるため
bufferで待機する要素が2以降となり、最後にemitされた最新の4が出力されます。

DROP_LATEST

DROP_LATESTは、capacityに空きがない場合に新しいものから削除されます。
最新のものから削除された場合の出力はこのような結果となります。

start 1
end 1
start 2
end 2

DROP_OLDESTと同じく、1はそのままcollectに流れ、最新扱いとなる3、4が削除されることで1、2のみが出力されます。

まとめ

普段なら流れで使いそうなflowのcollectやcollectLatestについて深掘ってみました。
emitを短時間で多数コールすること自体そこまで多くはないため、バックプレッシャーを意識することはそうそうないですが、
順序管理をFlowが勝手に制御してくれる形になっていることはありがたいですね!

Flowのデータ処理でつまづいたときの参考になれれば幸いです。

またFlow#bufferについてはAPIに細かく記載があるためもっと詳しく見たいという方はAPIの方も合わせて読んでみてください。