概要
そもそも ReactiveX とは何かという話ですが簡単に言えば Ruby においては Enumerable の拡張という感じです
簡単に言い過ぎているので詳細は公式などを見てほしいのですが、例えば普通の Ruby の配列 Array
を Rx::Observable
な配列として扱うことで使いやすくすることができます
今回は RxRuby サンプルを動かしつつ ReactiveX を理解してみました
環境
- macOS 10.14
- Ruby 2.5.1p57
- rx 0.0.3
ライブラリインストール
RxRuby という gem が公開されているのでこれを使います
- bundle init
- vim Gemfile
gem "rx"
bundle install --path vendor
とりあえず動かしてみる
サンプルにあるものをとりあえず動かしてみましょう
Rx::Observable
な配列を作成して zip を使って各要素を結合します
vim rx1.rb
require 'rx'
a1 = Rx::Observable.from_array [1, 2, 3]
a2 = Rx::Observable.from_array [4, 5, 6]
sub = a1.zip(a2).subscribe(
lambda { |x|
p x
},
lambda { |e|
p e
},
lambda {
p "end"
}
)
これで bundle exec ruby rx1.rb
として実行すると
[1, 4]
[2, 5]
[3, 6]
"end"
という感じで表示されると思います
普通に zip してみる
もし Rx::Observable
な配列を使わない場合は以下のようなるかと思います
- vim nrx1.rb
a1 = [1, 2, 3]
a2 = [4, 5, 6]
r = a1.zip(a2)
r.each { |rt| p rt }
これでも意図した結果を得ることができます
何が違うか
この 2 つのサンプルで大きく違うのは各要素を結合した際の経過を監視 (observe) することがでている部分です
Rx::Observable
を使った場合 zip
メソッドを subscribe
し各要素が結合された際にコールされる lambda
を定義することができます
またこの 3 つの lambda は Ruby だけではなく ReactiveX の Observable として仕様が決まっており onNext
, onError
, onCompleted
の 3 つ流れを受け取ることができます
それぞれ結合が「成功した場合」「失敗した場合」「終了した場合」にコールされます
このように結合の処理ごとにその状況を監視することが Rx::Observable
を使う大きなメリットかなと思います
ちなみにここでとりあげた zip
オペレーションは ReactiveX のページでこういう風に実装しろというのがしっかりと定義されています
各言語の ReactiveX 用のライブラリは基本的にこれらを参考に書くオペレーションが実装されています
また Rx::Observable
の処理は非同期処理になります
なので subscribe
の処理の完了を待たずに次の処理に進むので注意が必要です
その他のサンプルを動かしてみる
ReactiveX には zip 以外にも様々なオペレータが用意されています
すべて紹介するのは厳しいのでよく使われそうなものを紹介します
timer
vim rx_timer.rb
require 'rx'
t = Rx::Observable.timer(3, 1)
s = t.time_interval().pluck('interval').take(10)
sub = s.subscribe(
lambda { |x|
p x
},
lambda { |e|
p e
},
lambda {
p "end"
}
)
while Thread.list.size > 1
(Thread.list - [Thread.current]).each &:join
end
3 秒待ってから 1 秒ごとに 10 回カウントアップするタイマー処理になります
Rx::Observable.timer(3, 1)
の部分がスタートしてから 3 秒待つ処理とインターバルの秒数を 1 秒ごとに設定している部分です
ここを変更すればスタートまでのウェイトとインターバルの秒数を変更できます
take(10)
で 10 回カウントアップします
カウントアップはインターバルで指定した秒数待ってからカウントされます
同じようなオペレーションで interval というものもあります
flatMap
flatMap は配列内の各要素順番に処理し最終的に 1 つの配列としてまとめる処理です
vim rx_flat_map.rb
require 'rx'
times = [
{ value: 0, time: 0.1 },
{ value: 1, time: 0.6 },
{ value: 2, time: 0.4 },
{ value: 3, time: 0.7 },
{ value: 4, time: 0.2 }
]
s = Rx::Observable.from(times).flat_map { |item|
Rx::Observable.of(item[:value]).delay(item[:time])
}
sub = s.subscribe(
lambda {|x|
p x
},
lambda {|e|
p e
},
lambda {|r
p "end"
})
while Thread.list.size > 1
(Thread.list - [Thread.current]).each &:join
end
上記のサンプルはいわゆる遅延インサート的なことを実現しています
Rx::Observable.of(item[:value])
は値をそのまま onNext
に流す処理なのですが .delay(item[:time])
と組み合わせことで指定した時間遅延させてから処理を実行することができます
なので出力される順番が 0 から 4 の順番ではなく time で指定した値が小さい方から出力されることになります
0
4
2
1
3
"end"
こんな感じで複数の Observable
と組み合わせて配列の中の値を処理したい場合には flatMap を使うのが便利です
reduce
reduce は各要素の前後を順番に取り出し処理するためのオペレータです
例えば普通の Ruby で配列の各要素を足し合わせる場合には sum を使います
a = [1, 2, 3, 4, 5]
p a.sum
# => 15
これを RxRuby で書き換えると以下のように書けます
vim rx_reduce.rb
require 'rx'
ra = Rx::Observable.from_array [1, 2, 3, 4, 5]
s = ra.reduce(0) { |x, y|
x + y
}
s.subscribe(
lambda {|x|
p x
},
lambda {|e|
p e
},
lambda {
p "end"
}
)
reduce(0)
とすることで初期値を 0 に設定します
そして取り出す 2 つの要素を x, y で受け取り加算します
加算した値が次の x に代入されます
なので今回の加算の流れは以下のようになります
- x=1, y=2 => 3
- x=3, y=3 => 6
- x=6, y=4 => 10
- x=10, y=5 => 15
最後に
Ruby の RxRuby を使って ReactiveX に入門してみました
実際に比較して動かしてみることで使い方やメリットが理解できるかなと思います
リファレンスが Web 上になさそうなのでメソッドなどの詳細はコードを直接見るしかなさそうです
今回紹介したオペレータはほんの一部です
ReactiveX はこのオペレータたちを如何に使いこなすかがポイントなので使いこなすにはオペレータの使い方と挙動を覚える必要があるかなと思います
また Subject と呼ばれる Observer と Observable の 2 つの性能を持った機能も存在します
基本的に ReactiveX は「使うべきケース」「使えるケース」が決まっているかなと思っています
何でもかんでも ReactiveX で書くのは良くないと思います
既存のコードで配列処理を扱う部分がありその処理の内容を逐次監視したい場合などに使ったり、タイマー処理を時系列で眺めたい場合などには適しているかなと思います
また基本は非同期なので要素それぞれに対して処理したい場合には適していますが、結合結果をメイン側で受け取って何かしたい場合には適していないと思います
その辺りのユースケースも先駆者の経験からいろいろと Web を調べると出てくるので参考にしてみるといいかもしれません
ただ RxRuby のユースケースは少ない感じがするので、その場合は他の言語の適用ケースを見ると良いとかと思います
0 件のコメント:
コメントを投稿