0. 시작하기 전에
RxJava의 연산자를 제대로 이해하기 위해서는 마블 다이어그램에 대해 알아야 한다.
아래 마블 다이어그램을 한 번 보자.
위 마블 다이어그램에 대해 완벽하게 이해하고 있다면 이 포스팅을 읽지 않아도 된다.
만약 그렇지 않다면, 이 포스팅을 읽고 난 뒤에는 완벽하게 이해할 수 있을 것이다.
1. 연산자 소개
(1) subscribeOn()
subscribeOn 연산자는 observableSource가 subscribe 되었을 때,
데이터를 다음 스트림으로 전달하는 액션을 수행하는 스케줄러를 지정한다.
간단하게 말해서, subscribeOn 연산자는 "Observable이 동작을 시작할 스레드"를 지정한다.
추가적으로 subscribeOn 연산자를 사용할 때 알아둬야 할 사항 몇가지에 대해 설명하자면,
- 어느 위치에 작성하던지 Observable은 해당 스레드에서 시작하고 업/다운스트림 모두에 적용되며,
observeOn을 만나면 새로 지정된 스레드에서 연산을 수행한다. - 최초 1번 호출된 이후부터는 다시 호출해도 ignore된다.
시작스레드를 또다시 설정할 필요가 없으며, 다운스트림 처리에는 observeOn이라는 좋은 연산자가 있기 때문이다.
간단한 예시 코드를 통해 해당 연산자가 적용되는 방식을 배워보자.
아래 코드는 subscribeOn이 업/다운스트림 모두에 적용된다는 간단한 예시이다.
getUserAge부터 subscribe까지 모두 io 스레드에서 실행된다.
getUserAge부터 io 스레드에서 실행되는것 = Observable이 시작할 스레드를 subscribeOn으로 정했기 때문이다!
subscribeOn 예제 코드 (1)
private void subscribeOnExample() {
Observable<Integer> source = getUserAge() // (2) io 스레드 | getUserAge
.map(Integer::parseInt) // (3) io 스레드 | map
.subscribeOn(Schedulers.io()) // (1) Observable이 io 스레드에서 생성되고 작동하도록 지정
.map(age -> age - 1); // (4) io 스레드 | map
source.subscribe(LogUtil::it); // (5) io 스레드 | subscribe
}
LogUtil 클래스
public class LogUtil {
private static final String TAG = LogUtil.class.getSimpleName();
public static void it(Object object) {
long time = System.currentTimeMillis() - CommonUtils.sStartTime;
System.out.println(Thread.currentThread().getName() + " | " + time + " | " + object);
}
}
(1-1) subscribeOn()을 호출하는 위치는 어디가 좋을까?
앞서 말했듯, subscribeOn 연산자는 업/다운스트림 모두에게 적용된다.(이후에 observeOn이 호출되지 않는다면)
시작 스레드를 지정하는 것이라면 어디에서 호출해도 상관없지 않나요? 라고 생각할 수 있다.
하지만 코드에도 개연성과 직관성이 있어야 좋은 코드라고 할 수 있는 법이다.
아래는 아무 곳에서나 subscribeOn()을 호출한 코드이다.
subscribeOn 예제 코드 (2)
private void rxjavaExample() {
Observable<Integer> source = getUserData() // (2) main | getUserData
.doOnNext(d -> LogUtil.it("String value : " + d))
.observeOn(Schedulers.computation()) // (3) 다운스트림 연산을 computation 스레드에서 수행하도록 지정
.map(Integer::parseInt) // (4) computation | map
.doOnNext(d -> LogUtil.it("int value : " + d))
.subscribeOn(Schedulers.io()) // (1) Observable이 io 스레드에서 작동을 시작하도록 지정
.observeOn(AndroidSchedulers.mainThread()); // (5) 다운스트림 연산을 main 스레드에서 수행하도록 지정
source.subscribe(LogUtil::it); // (6) main | subscribe
}
Log
I/System.out: RxCachedThreadScheduler-1 | 134 | String value : 1
I/System.out: RxCachedThreadScheduler-1 | 135 | String value : 3
I/System.out: RxCachedThreadScheduler-1 | 138 | String value : 5
I/System.out: RxComputationThreadPool-1 | 136 | int value : 1
I/System.out: RxComputationThreadPool-1 | 144 | int value : 3
I/System.out: RxComputationThreadPool-1 | 144 | int value : 5
위 코드를 보면, getUserData와 map 연산을 마친 후에 subscribeOn을 호출했지만,
그 효과에 대한 적용은 첫 줄인 getUserData()에서 적용되었다.
아래 그림은 https://reactivex.io/documentation/operators/subscribeon.html 사이트의 subscribeOn 마블 다이어 그램이다.
위 그림 또한 map 함수 호출 이후에 subscribeOn이 호출된걸로 그려져있지만,
Observable의 시작은 subscribeOn이 지정한 스레드에서 시작했다. 그럼 제일 처음 호출되는 함수임에도
제일 마지막줄에 코딩해도 괜찮을까?
subscribeOn 함수는 "처음, 중간, 끝 어디에서 호출해도 시작 스레드를 지정하는 것이기 때문에 영향을 받지 않는다"
하지만 이 말이 "해당 함수를 아무곳에서나 호출해도 된다"는 것을 의미하지는 않는다. (실행하는데에는 문제 없다 ㅎ;)
하지만, 개발자가 "io 스레드에서 userData를 가져와서, computation 스레드에서 데이터를 가공한 뒤에, 메인 스레드에서 UI를 갱신해" 라는 의도로 작성한 코드라면
그 코드 또한 저 생각의 흐름대로 작성하는 것이 직관적이고, 옳다고 생각한다.
따라서,
- observeOn 호출 O -> 제일 첫 줄에 데이터를 불러오는 부분 직후에서 호출
- observeOn 호출 X -> 제일 첫/마지막 줄에서 호출
위와 같이 작성하는 것이 최선이라고 생각한다.
(2) observeOn()
observeOn 연산자는 Observable이 관찰자에게 알림을 보내는데 사용할 다른 스레드를 지정한다.
- observeOn 호출 이후 수행되는 스트림(다운스트림)의 액션을 수행하는 스케줄러를 지정한다.
- 한 번 호출하면 이후 호출을 무시하는 subscribeOn과 다르게, observeOn은 여러번 호출해서 새로운 스레드에 작업을 할당할 수 있다.
private void observeOnExample() {
Observable<Integer> source = getUserAge() // (1) main 스레드 | getUserAge
.map(Integer::parseInt) // (2) main 스레드 | map
.observeOn(Schedulers.io()) // (3) 다운스트림 연산을 io 스레드에서 수행하도록 지정
.map(age -> age - 1) // (4) io 스레드 | map
.observeOn(AndroidSchedulers.mainThread()); // (5) 다운스트림 연산을 main 스레드에서 수행하도록 지정
source.subscribe(LogUtil::it); // (6) main 스레드 | subscribe
}
observeOn은 설명보다, 예제를 통해 여러 위치에서 호출해보고 로그를 찍어서 의도한 스레드에서 동작하는지 확인해본다면 훨씬 효과적으로 이해할 수 있다.
2. 예제를 통한 이해
(1) subscribeOn() 의 중복 호출
private void rxjavaExample() {
String[] effectiveAndroid = {"RxJava", "Dagger", "Retrofit", "Jetpack"};
Flowable<String> blog = Flowable.fromArray(effectiveAndroid)
.doOnNext(LogUtil::it)
.subscribeOn(Schedulers.io()) // io 스레드 지정
.map(skill -> skill + " let's go")
.subscribeOn(Schedulers.computation()) // computation 스레드 지정, ignored
.zipWith(Flowable.interval(100L, TimeUnit.MILLISECONDS),
(s, t) -> s)
.subscribeOn(Schedulers.trampoline()); // trampoline 스레드 지정, ignored
blog.subscribe(LogUtil::it);
}
subscribeOn은 Observable이 활동을 시작할 스레드를 지정하는 연산자이기 때문에 여러 번 호출해도 최초 1번의 호출만 적용되며, 이후 모든 호출은 ignore 된다.
위 예제 코드에서는 io 스레드를 지정한 subscribeOn 연산자만 적용된다.
아래는 위 코드에 대한 로그이다.
I/System.out: RxCachedThreadScheduler-1 | 90 | RxJava
I/System.out: RxCachedThreadScheduler-1 | 91 | Dagger
I/System.out: RxCachedThreadScheduler-1 | 91 | Retrofit
I/System.out: RxCachedThreadScheduler-1 | 92 | Jetpack
I/System.out: RxComputationThreadPool-1 | 183 | RxJava let's go
I/System.out: RxComputationThreadPool-1 | 284 | Dagger let's go
I/System.out: RxComputationThreadPool-1 | 383 | Retrofit let's go
I/System.out: RxComputationThreadPool-1 | 485 | Jetpack let's go
fromArray는 지정된 io 스레드에서 실행되었으며, doOnNext로 지정된 스레드에서 실행되었는지 로그를 출력하도록 했다.
이후 computation, trampoline 스레드의 호출은 ignore된 것을 볼 수 있다.
만약 해당 스레드에서 동작하도록 수정하려면, subscribeOn 대신 observeOn을 호출하면 된다.
edit : zipWith 함수는 기본적으로 computation 스레드에서 동작하기 때문에 Log에 ComputationThread가 출력된다.
(2) 올바른 사용
private void rxjavaExample() {
Observable<String> study = getSkills()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.filter(val -> val.startsWith("android"))
.map(skill -> skill + " study!")
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(s, t) -> s);
study.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::updateUi);
}
private Observable<String> getSkills() {
String[] effectiveAndroid =
{"android-RxJava", "android-Dagger", "android-Retrofit", "android-Jetpack",
"RxSwift", "JavaScript", "Python"};
return Observable.fromArray(effectiveAndroid);
}
private void updateUi(String str) {
mTextView.setText(str);
}
(0) subscribeOn 함수로 Observable을 io 스레드에서 시작하도록 지정
(1) (io 스레드) getSkills() 메서드로 데이터 가져옴
(2) observeOn 함수로 다운스트림 연산을 computation 스레드에서 수행하도록 지정
(3) (computation 스레드) filter-map-zipWith 연산 수행
(4) observeOn 함수로 다운스트림 연산을 Android 메인 스레드에서 수행하도록 지정
(5) (main 스레드) subscribe 함수로 updateUi 수행
edit: 안드로이드에서 UI 관련 작업은 무조건 MainThread에서 수행되어야 한다.
오늘은 RxJava의 최대 장점인 비동기처리에 사용되는 subscribeOn과 observeOn에 대해 알아보았다.
기존 Java에서 하던 비동기 처리를 생각해보면, RxJava라는 라이브러리가 얼마나 효율적이고 개발자 친화적인지 느끼게 된다.
하지만 사용하기 쉽고, 높은 퍼포먼스를 낼 수 있는만큼 모든 연산자에 대해서 제대로 이해하고
최선의 효율을 낼 수 있도록 코딩하는 것이 무엇보다 중요하다.
edit: 본 포스팅은 "RxJava 프로그래밍" 책을 2회독한 후, 책의 내용을 그대로 옮겨적는 것이 아닌 필자가 직접 공부하며 체득한 내용을 적은 주관적인 포스팅이다. 다른 어떤 포스팅보다 객관적이고 형식적이지는 않지만, 읽는 사람 입장에서 이해하기 쉽도록 최대한 풀어 썼으나 중간중간 잘못된 정보가 있으니 만약 있다면 댓글로 남겨주시면 감사하겠습니다.
'개발 > Android' 카테고리의 다른 글
[안드로이드] ViewModel에서 context를 사용하는 방법 (1) | 2023.05.10 |
---|---|
[안드로이드] AAC ViewModel과 MVVM ViewModel (0) | 2023.03.26 |
[안드로이드] Glide를 알아보자 (0) | 2023.01.12 |
Repository 패턴을 사용하는 이유 (0) | 2022.09.07 |
[안드로이드] START_STICKY & START_NOT_STICKY (0) | 2022.05.24 |