위 글은 코틀린 리액티브 프로그래밍책을 보며 내용을 정리한 내용입니다.
책 : 코틀린 리액티브 프로그래밍
리부 차크라보티 지음
조승진 옮김
핫, 콜드 옵저버블
Observables는 그 행동에 따라 두 가지 범주로 나눌 수 있다. 제목에서 알 수 있듯이 두 가지 범주는 핫 옵저버블과 콜드 옵저버블이다. 이 두가지에 대해 알아보자.
콜드 옵저버블
모든 예제에서 동일한 옵저버블을 여러 번 구독해도 모든 구독의 새로운 배출을 얻을 수 있다.
fun main(args :Array<String>){
val observable : Observable<String> = listOf("String 1","String 2","String 3","String 4").toObservable()
observable.subscribe({
println("Received $it")
},{
println("Error ${it.message}")
},{
println("Done")
})
observable.subscribe({
println("Received $it")
},{
println("Error ${it.message}")
},{
println("Done")
})
}

이 예제에서는 구독을 두번하였다. 결과는 모두 동일하였다.
옵저버블은 특징적인 기능을 갖고 있는데 각 구독마다 처음부터 아이템을 배출하는 것을 콜드 옵저버블이라고 한다.
더 구체적으로 설명하자면 콜드 옵저버블은 구독 시에 실행을 시작하고 subscriber가 호출되면 아이템을 푸시하기 시작하는데 각 구독에서 아이템의 동일한 순서를 푸시한다.
콜드 옵저버블은 데이터와 유사하다. SQLite나 Room 데이터베이스로 작업하는 동안은 핫 옵저버블 보다는 콜드 옵저버블에 더욱 많이 의존한다.
핫 옵저버블
콜드 옵저버블은 수동적이며 구독이 호출될 때까지 아무것도 내보내지 않는다. 핫 옵저버블은 콜드 옵저버블과 반대로, 배출을 시작하기 위해 구독할 필요가 없다.
콜드 옵저버블을 CD/DVD 레코딩으로 본다면 핫 옵저버블은 TV 채널과 비슷하다.
핫 옵저버블은 시청자가 시청하는지 여부에 관계없이 콘텐츠를 계속 브로드캐스팅한다.
핫 옵저버블은 데이터보다는 이벤트에 유사하다. 이벤트에는 데이터가 포함될 수 있지만 시간에 민감한 특징을 가지는데 최근에 가입한 Observer가 이전에 내보넨 데이터를 놓칠 수 있기 때문이다
이런 특징은 안드로이드/JavaFX/Swing등에서 UI 이벤트를 다룰 때 유용하다.
ConnectableObservable 객체의 소개
ConnectableObservable은 가장 유용한 핫 옵저버블 중 하나로, 핫 옵저버블의 좋은 예시이다.
ConnectableObservable은 옵저버블, 심지어 콜드 옵저버블을 핫 옵저버블로 바꿀 수 있다.
subscribe호출로 배출을 시작하는 대신에 connect 메서드를 호출한 후 활성화된다.
반드시 connect를 호출하기 전에 subscribe를 호출해야 한다.
connect를 호출한 후 구독하는 모든 호출은 이전에 생성된 배출을 놓치게된다.
fun main(args : Array<String>){
val connectableObservable = listOf<String>("String 1","String 2","String 3","String 4").toObservable().publish()
connectableObservable.subscribe({println("Subscription 1 : $it")})
connectableObservable.map(String::reversed).subscribe({println("Subscription 2 $it")})
connectableObservable.connect()
connectableObservable.subscribe({println("Subscription 3: $it")})
}

두번의 구독을 수행한 뒤 connect를 호출한다.
구독에 대한 결과가 Subscription 1이 모두 진행되고 2가 진행될줄 알았지만 순차적으로 진행하였다.
connect 메소드 이후의 구독은 무시된다.
Subjects
Hot Observables을 구현하는 또 다른 좋은 방법은 Subject이다.
그것은 기본적으로 옵저버블과 옵저버의 조합인데 두 가지 모두의 공통된 동작을 갖고 있기 때문이다.
핫 옵저버블과 마찬가지로 내부 Observer 목록을 유지하고 배출 시에 가입한 모든 옵저버에게 단일 푸시를 전달한다.
Subject가 제공하는 것을 살펴보자
- 옵저버블이 가져야 하는 모든 연산자를 갖고있다.
- 옵저버와 마찬가지로 배출된 모든 값에 접근할 수 있다.
- Subject가 완료/오류/구독 해지된 후에는 재사용 할 수없다.
- 가장 흥미로운 점은 그 자체로 가치를 전달한다는 것이다. 자세히 설명하자면 onNext를 사용해 값을 Subject측에 전달하면 Observable에서 접근 가능하게 된다.
fun main(args :Array<String>){
val observable = Observable.interval(100,TimeUnit.MILLISECONDS)
val subject = PublishSubject.create<Long>()
observable.subscribe(subject)
subject.subscribe({
println("Received $it")
})
runBlocking { delay(1100) }
}

Observabale.interval을 사용하였고 observable이 subject를 구독하고 subject에서 구독 방식을 정의하고 배출 하게된다.
fun main(args: Array<String>) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
val subject = PublishSubject.create<Long>()
observable.subscribe(subject)
subject.subscribe({
println("Subscription 1 Received $it")
})
runBlocking { delay(1100) }
subject.subscribe({
println("Subscription 2 Received $it")
})
runBlocking { delay(1100) }
}

Subscription 2는 0부터 시작하는것이 아닌 delay 1100ms가 지난후의 값인 11부터 시작하게 된다.
Subject는 콜드 옵저버블과 같이 행동을 반복하지 않는다.
Subject는 모든 옵저버에게 전달된 배출을 중계하고, 콜드 옵저버블을 핫 옵저버블로 변경시킨다.
다양한 구독자
앞에서 설명햇듯이 여러 종류의 구독자가 있다.
다음은 앞으로 살펴볼 가장 유용하고 중요한 구독자다.
- AsyncSubject
- PublishSubject
- BehaviorSubject
- ReplaySubject
AsyncSubject 이해
AsyncSubject는 수신 대기중인 소스 옵저버블을 마지막 값과 배출만 전달한다.
더 명확히 말하면 AsyncSubject는 마지막 값을 한번만 배출한다.
fun main(args : Array<String>){
val observable = Observable.just(1,2,3,4)
val subject = AsyncSubject.create<Int>()
observable.subscribe(subject)
subject.subscribe({
println("Received $it")
},{
it.printStackTrace()
},{
println("Complete")
})
subject.onComplete()
}

마지막에 호출된 Received 4값만 호출된다.
다음 예제도 보자.
fun main(args :Array<String>{
val subject2 = AsyncSubject.create<Int>()
subject2.onNext(1)
subject2.onNext(2)
subject2.onNext(3)
subject2.onNext(4)
subject2.subscribe({
println("S1 Received $it")
},{
it.printStackTrace()
},{
println("S1 Complete")
})
subject2.onNext(5)
subject2.subscribe({
println("S2 Received $it")
},{
it.printStackTrace()
},{
println("S2 Complete")
})
subject2.onComplete()
}

여기서도 두 Observable에대한 호출값도 모두 마지막 값만 전달하게 된다.
PublishSubject 이해
PublishSubject는 onNext 메서드 또는 다른 구독을 통해 값을 받았는지 여부에 관계없이 구독 시점에 이어지는 모든 값을 배출한다. 이미 PublishSubject를 적용해 봤으며 가장 많이 사용되는 Subject 변형이다.
BehaviorSubject 이해
BehaviorSubject는 멀티 태스킹으로 동작하는데 구독 전의 마지막 아이템과 구독 후 모든 아이템을 배출한다.
즉 내부 옵저버 목록을 유지하는 데 중복 전달 없이 모든 옵저버에게 동일한 배출을 전달한다.
fun main(args :Array<String>){
val subject = BehaviorSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.onNext(4)
subject.subscribe({
println("S1 Received $it")
},{
it.printStackTrace()
},{
println("S1 Complete")
})
subject.onNext(5)
subject.subscribe({
println("S2 Received $it")
},{
it.printStackTrace()
},{
println("S2 Complete")
})
subject.onComplete()
}

첫번째 구독전에 호출된 onNext(4) 값과 그 후에 호출된 onNext(5)가 전달되며 두번째는 구독전에 호출된 onNext(5)만 호출된다.
fun main(args: Array<String>) {
val subject = ReplaySubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.onNext(4)
subject.subscribe({
println("S1 Received $it")
}, {
it.printStackTrace()
}, {
println("S1 Complete")
})
subject.onNext(5)
subject.subscribe({
println("S2 Received $it")
}, {
it.printStackTrace()
}, {
println("S2 Complete")
})
subject.onComplete()
}

마치 콜드 옵저버블처럼 모든 값을 호출한다.
'Study > RxKotlin' 카테고리의 다른 글
| [RxKotlin] 함수형 프로그래밍과 리액티브 프로그래밍 (0) | 2021.10.18 |
|---|---|
| [RxKotlin] 백프레셔와 플로어블 (0) | 2021.10.16 |
| [RxKotlin] 옵저버블과 옵저버와 구독자 (0) | 2021.10.16 |
| [RxKotlin] Coroutine과 모나드 (0) | 2021.10.11 |
| [RxKotlin] 함수형 프로그래밍과 코틀린의 특징 (0) | 2021.10.09 |