はじめに
前回はRxJavaにおける合成オペレータについてサンプルを用いて解説しました。
その記事は以下の通りです。
今回は、RxJavaにおけるSchedulerについてでsubscribeOnメソッドとobserveOnメソッドの違いについてお話します。
Schedulerとは?
RxJavaで用意されているスレッドを管理するクラスのことです。
スレッドとは
threadとは、糸とかいう意味でプログラミング的には処理の始めから終わりまでの実行の流れのことをいいます。詳しくは下記参照。入門てきな技術書にも載ってます。
僕もそういえば初めてオリジナルアプリ作ったときにやったなぁとふと思い出した。(小並感)
スレッドはプログラミングを勉強し始めたばかりだとほぼほぼ使うことが無かったりするのですが、実際に開発をやる際には気を付けることが多々あります。
例えば、通信を行うスレッドとUIの描画を行うスレッドなどを一緒にしてはいけません。描画が遅くなってパフォーマンスが悪くなったりするからです。僕は一度やって失敗した経験があります。(笑)
話を元に戻します
RxJavaでは時間を扱う処理をしなければ、デフォルトのスレッドですべての処理が行われます。
また扱うメソッドによってはデフォルトでメインのスレッド上とは異なるスレッドで処理を行ってくれる生産者もあります。
前回の記事の中で使用していたintervalメソッドなどです。
スレッドを受け取ったデータに対してどのように処理をするのかについてわかっていないと僕のような失敗を引き起こすことがあります。
なのでまずは、subscribeOnとobserveOnメソッドの違いについて見ていきましょう。
subscribeOnメソッド
subscribeOnではObservableの処理をどのScheduler上で行うか設定できます。
実際にサンプルプログラムを見ていきましょう。
fun main(args: Array<String>) {
val array: Array<String> = arrayOf("allow", "bow", "", "dead")
Observable.from(array)
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.immediate())
.subscribeOn(Schedulers.trampoline())
.subscribe{data ->
val thread = Thread.currentThread().name
println(thread + ": " + data)
}
Thread.sleep(1000L)
}
こんかいはいつも通りに文字列のObservableにたいしていくつかのSchedulerを設定してみます。スレッドの名前を取得して表示するようにしてあります。
実際の実行結果が以下になります。
subscribeOnメソッドでは一度設定するとそれ以降Schedulerを設定することはできません。またデフォルトの設定があるintervalメソッドでも設定することはできません。
実際にサンプルを見ていきましょう。
fun main(args: Array<String>) {
Observable.interval(500L, TimeUnit.MILLISECONDS)
.take(10)
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.immediate())
.subscribeOn(Schedulers.trampoline())
.subscribe{data ->
val thread = Thread.currentThread().name
println(thread + ": " + data)
}
Thread.sleep(1000L)
}
intervalメソッドでObservableを生成し、10件だけ取ってきてSchedulerを他に設定してみます。
この実行結果が以下になります。
やはりデフォルトで指定されている場合もSchedulerの変更ができないことが分かります。
では、処理の途中でSchedulerの変更を行いたい場合はどうすればよいでしょうか?
それが次に紹介するobserveOnメソッドになります。
observeOnメソッド
observeOnメソッドでは、受け取ったデータの処理をどのSchedulerで行うのかを設定することが出来ます。subscribeOnとの違いはSchedulerの設定を変更することができる点です。
実際のサンプルプログラムを見ていきましょう。
fun main(args: Array<String>) {
val array: Array<String> = arrayOf("allow", "bow", "", "dead")
Observable.from(array)
.observeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.observeOn(Schedulers.immediate())
.observeOn(Schedulers.trampoline())
.subscribe{data ->
val thread = Thread.currentThread().name
println(thread + ": " + data)
}
Thread.sleep(1000L)
}
さっきsubscribeOnメソッドで説明していたサンプルでsubscribeOnをobserveOnに変更してやってみました。
実行結果が以下になります。
実行結果を見てみると、確かに最初に設定したSchedulerと異なるSchedulerを設定できていることが分かります。
では、デフォルトで設定してあるSchedulerをもつintervalメソッドでのObservableについてはどうなのでしょうか?
実際にサンプルを見ていきましょう。
fun main(args: Array<String>) {
Observable.interval(500L, TimeUnit.MILLISECONDS)
.take(10)
.observeOn(Schedulers.io())
.observeOn(Schedulers.immediate())
.observeOn(Schedulers.trampoline())
.subscribe{data ->
val thread = Thread.currentThread().name
println(thread + ": " + data)
}
Thread.sleep(1000L)
}
これもさきほどのsubscribeOnの時と同じでsubscribeOnをobserveOnに変更してやってみました。
実際の実行結果が以下になります。
defaultで指定されている場合でもobserveOnメソッドではSchedulerを変更することが可能です。
おわりに
今回はRxJavaにおけるSchedulerとsubscribeOn, observeOnの違いについてお話しました。
まだまだ知ることがあるのですが今日のところはここまでにします。
また次回にSchedulerについて学んだことを書き残していきたいと思います。