이 내용은 기존에 쓰던 책에서 적은내용이 아닌
함수형 코틀린 이라는 책에서 설명한 RxKotlin을 정리한 내용입니다.
FP와 OOP의 결합
FP와 OOP는 오래된 프로그래밍 패러다임이며, 각자만의 이점과 단점을 가진다.
OOP 시스템에서는 부수 효과를 피하기 어렵다. 또한 OOP 시스템은 종종 동시 프로그래밍의 악몽으로 불린다.
FP는 상태를 인식하지 않지만, 실생활에서 상태를 인식하지 않을 수 없다.
이런 모든 번거로움은 OOP를 FP와 결합/사용함으로써 피할 수 있다.
OOP와 FP를 결합하는 가장 일반적인 스타일은 작은 곳에서는 함수적이고, 큰 곳에서는 객체지향적이라고 할 수 있다.
OOP는 클래스와 인터페이스로 사용할 수 있고, FP는 메소드/함수작성과 같은 낮은 레벨에서 사용할 수 있다.
OOP와 FP가 혼합된다면 코틀린이 최고의 언어라고 믿는다.
함수형 리액티브 프로그래밍
함수형 리액티브 프로그래밍의 개념은 FP 패러다임과 리액티브 프로그래밍을 결합해나타냈다.
리액티브 프로그래밍은 데이터 스트림과 변화의 전파를 중심으로 하는 비동기 프로그래밍 패러다임이다.
간단히 말하자면 데이터/데이터 스트림에 영향을 주는 모든 변경점을 관련된 당사자에게 전파하는 프로그램을 리액티브 프로그램으로 부른다.
리액티브 매니페스토
리액티브 매니페스토는 다음과 같은 네가지 리액티브 원리를 정의하는 문서다.
- 반응(Responsive) : 시스템은 적시에 응답한다. 반응 시스템은 신속하고 일관된 응답 시간을 제공하는 데 주력하므로 일관된 서비스 품질을 제공한다.
- 복원(Resilient) : 시스템이 장애와 마주쳐도 응답을 유지한다. 실패는 각 컴포넌트 내에 포함되 각자로부터 컴포넌트를 분리하므로 컴포넌트에서 실패가 발생할 때 다른 컴포넌트나 전체 시스템에 영향을 미치지 않는다.
- 탄력(Elastic) : 리액티브 시스템은 변화에 반응하고 다양한 작업 부하에서 반응성을 유지한다.
- 메시지 중심(Message-driven) : 탄력성 원칙을 수렵하기 위해서는 리액티브 시스템이 비동기 메시지 전달에 의존해 컴포넌트 간의 경계를 설정해야 한다.
앞의 네 가지 원칙을 구현하는 것으로 시스템은 좀 더 신뢰성 있고 응답성이 높아지므로 리액티브가 된다.
RxJava 푸시 메커니즘과 풀 메커니즘
RxKotlin은 푸시 메커니즘을 위한 실제 이벤트 시스템을 표시하는 Observable 타입을 중심으로 하므로, 지연적이며 동기 및 비동기 모두에 사용할 수 있다.
fun main(args : Array<String>){
var list: List<Any> = listOf(1,"둘",3,"넷","다섯",5.5f)
var iterator = list.iterator()
while(iterator.hasNext()){
println(iterator.next())
}
}

결과는 다음과 같다.
리스트를 만들고 반복자를 통해 하나씩 출력하였다.
여기서 주의해야 할 점은 데이터가 수신되고 준비될 때까지 현재 스레드가 차단되는 동안 리스트에서 데이터를 가져온다는 것이다.
단순히 목록 대신 네트워크 콜/데이터베이스 쿼리로부터 데이터를 가져온다고 생각해보자. 이 경우 스레드는 얼마나 블록될까? 이런 작업을 위한 별도의 스레드를 만들 수 있지만 복잡성이 증가한다.
Rx프레임워크의 빌딩 블록은 observable이다. Observable클래스는 Iterator와 반대다. 이것은 소비자가 소비할 수 잇는 값을 생성하는 컬렉션 혹은 계산이 있다. 그러나 차이점은 Iterator 패턴과 같이 소비자가 생산자로부터 값을 가져오지 않는다는 것이다. 대신 생산자가 소비자에게 값을 알림으로 푸시한다.
이번에는 observbale예제이다.
fun main(args : Array<String>){
var list = listOf(1,"둘",3,"넷","다섯",5.5f)
var observable = list.toObservable()
observable.subscribe({
//onNext ->
println(it)
},{
//onError ->
it.printStackTrace()
},{
//onComplete ->
println("완료")
})
}

이 프로그램의 출력은 이전과 같다. 리스트의 모든 아이템을 출력한다. 차이점은 접근 방식이다. 실제로 어떻게 작성하는지 살펴보자
- 리스트를 만든다.
- Observable 인스턴스가 리스트에 의해 생성된다.
- 옵저버를 구독한다.
observable 변수가 구독할 경우 각 데이터는 준비가 되면 onNext로 푸시된다. 모든 데이터가 푸시되면 onComplete를 호출하고, 에러가 발생하면 onError를 호출한다.
이 Observable 인스턴스를 사용해 비동기 스트림을 구축하고 구독자에게 데이터 업데이트를 푸시한다. 이는 리액티브 프로그래밍 패러다임의 간단한 구현이다. 데이터는 모든 관심 있는 곳에 전파된다.
Observbables
앞에서 언급했듯이 리액티브 프로그래밍에서 Observable은 소비자(Observer)가 소비할 수 있는 값을 생산하는 계산을 가진다. 여기서 가장 중요한 것은 소비자(Observer)가 여기서 값을 가져오지 않는다는 점이다. Observable은 값을 소비자에게 푸시한다. 따라서 Observble 인터페이스는 푸시 기반의 구성 가능한 반복자로 일련의 연산을 통해 아이템을 최종 Observer로 보내 최종적으로 아이템을 소비한다고 할 수 있다. 순차적으로 끊어보자
- Observer가 Observable을 구독한다.
- Observable이 그 안에 있는 아이템을 방출하기 시작한다.
- Observer가 Observable이 방출하는 모든 아이템에 반응한다.
Observable 의 작동 방식
Observable 값은 다음과 같이 가장 중요한 이벤트/메소드 세가지를 가진다.
- onNext : Observable 인터페이스는 모든 아이템을 이 메소드로 전달한다.
- onComplete : 모든 아이템이 onNext 메소드를 통과하면 Observable은 onComplete을 호출한다.
- onError : Observable이 에러를 만나면 에러를 다루기 위해 onError 메소드를 호출한다.
여기서 주목해야 할 점은 Observable의 아이템은 무엇이든지 될 수 있다는 점이다.
fun main(args: Array<String>) {
val observer = object : Observer<Any> {
override fun onComplete() {
println("모두 완료됨")
}
override fun onNext(t: Any) {
println("다음 $t")
}
override fun onError(e: Throwable) {
println("에러 발생 $e")
}
override fun onSubscribe(d: Disposable) {
println("$d 구독됨")
}
}
val observable = listOf(1, "둘", 3, "넷", "다섯", 5.5f).toObservable()
observable.subscribe(observer)
val observableOnList =
Observable.just(
listOf(1, "둘", 3, "넷", "다섯", 6.0f), listOf("아이템이 1개인 리스트"),listOf(1,2,3)
)
observableOnList.subscribe(observer)
}

Observer 인터페이스는 내부에 4개의 메소드가 선언돼 있다.
onComplete()메소드는 Observable이 에러 없이 모든 아이템을 완료하면 방출한다.
onNext()메소드는 방출해야 하는 각 아이템의 observable 값에 의해 호출된다.
onError() 메소드는 에러를 만났을때 호출된다.
onSubscribe() 메소드는 Observer가 Observable을 구독할때 마다 호출된다.
Observable.create 메소드
언제든지 Observable.create 메소드를 사용해 Observable의 커스텀 구현을 만들 수 있다. 이 메소드는 관찰할 소스로 ObservableEmitter<T> 인터페이스의 인스턴스를 가진다. 다음 코드 예제를 살펴보자.
fun main(args: Array<String>) {
val observer = object : Observer<String> {
override fun onComplete() {
println("모두 완료됨")
}
override fun onNext(t: String) {
println("다음 $t")
}
override fun onError(e: Throwable) {
println("에러 발생 ${e.message}")
}
override fun onSubscribe(d: Disposable) {
println("새로운 구독")
}
}
val observable: Observable<String> = Observable.create<String> {
it.onNext("방출됨 1")
it.onNext("방출됨 2")
it.onNext("방출됨 3")
it.onNext("방출됨 4")
it.onComplete()
}
observable.subscribe(observer)
val observable2: Observable<String> = Observable.create<String> {
it.onNext("방출됨 1")
it.onNext("방출됨 2")
it.onError(Exception("My Exception"))
}
observable2.subscribe(observer)
}
여기서 특이한점은 Observable의 인스턴스를 만들때 람다 안에 observer의 메소드를 호출한다는 점이다.
Observable.create메소드는 특히 커스텀 데이터 구조로 작업하고 어떤 값이 방출되는지 제어하려는 경우에 유용하다.
다른 스레드에서 observer로 값을 방출할 수도 있다.
Observable.from 메소드
Observable.from 메소드는 Observable.create 메소드보다 비교적 간단하다. 메소드의 도움으로 거의 코틀린 구조에서 Observable 인스턴스를 만들 수 있다.
fun main(args : Array<String>){
val observer: Observer<String> = object : Observer<String> {
override fun onComplete() {
println("완료됨")
}
override fun onSubscribe(d: Disposable) {
println("구독")
}
override fun onNext(t: String) {
println("받음 -> $t")
}
override fun onError(e: Throwable) {
println("에러 발생 ${e.message}")
}
}
val list = listOf("Str 1","Str 2","Str 3","Str 4")
val observableFromIterable : Observable<String> = Observable.fromIterable(list)
observableFromIterable.subscribe(observer)
val callable = object : Callable<String> {
override fun call(): String {
return "Callable에서 왔다."
}
}
val observableFromCallable: Observable<String> = Observable.fromCallable(callable)
observableFromCallable.subscribe(observer)
val future : Future<String> = object : Future<String> {
override fun cancel(p0: Boolean): Boolean {
return false
}
override fun isCancelled(): Boolean {
return false
}
override fun isDone(): Boolean {
return true
}
override fun get(): String {
return "Hello From Future"
}
override fun get(p0: Long, p1: TimeUnit?): String {
return "Hello From Future"
}
}
val observableFromFuture: Observable<String> = Observable.fromFuture(future)
observableFromFuture.subscribe(observer)
}

fromXXX() 메소드로 특정 인스턴스를 옵저버블로 변환 가능하다.
Iterator<T>.toObservable()
코틀린의 확장 함수 덕분에 list와 같은 반복 가능한 인스턴스를 많은 노력 없이 Observable로 전환할 수 있다.
fun main(args : Array<String>){
val observer: Observer<String> = object : Observer<String> {
override fun onComplete() {
println("완료됨")
}
override fun onSubscribe(d: Disposable) {
println("구독")
}
override fun onNext(t: String) {
println("받음 -> $t")
}
override fun onError(e: Throwable) {
println("에러발생 -> ${e.message}")
}
}
val list : List<String> = listOf("Str 1", "Str 2","Str 3","Str 4")
val observable : Observable<String> = list.toObservable()
observable.subscribe(observer)
}

toObservable 메소드는 내부적으로 들어가보면 Observable.from 메소드를 사용한다. 코틀린의 확장 함수 덕분이다.
Subscriber : Observer 인터페이스
Observer 타입은 onNext, onError, onComplete, onSubscribe의 네 가지 메소드가 있는 인터페이스다. 앞서 언급했듯이 Observable을 Observer로 연결할 때 Observer 타입의 네 메소드를 찾아 호출한다. 다음은 네 메소드에 대한 간단한 설명이다.
- onNext : Observable은 아이템을 하나씩 전달하기 위해 Observer의 이 메소드를 호출한다.
- onComplete : onNext 메소드로 아이템을 넘기는 것이 끝났다고 Observable이 알리고 싶을 때 Observer의 onComplete 메소드를 호출한다. 다만 에러가 발생하면 이 메소드는 호출되지않는다.
- onError : Observer가 에러를 만나면 Observer 타입에 정의된 에러를 다루기 위해 onError를 호출한다. 정의되지 않은 에러는 예외를 발생시킨다.
- onSubscribe : 이 메소드는 새로운 Observable이 Observer를 구독할 때마다 호출된다.
구독 및 폐기
Observable과 Observer 타입이 있다. 이 두 타입을 어떻게 연결해야 할까?
Observable과 Observer는 입력 장치및 컴퓨터와 같다. 둘을 연결할 무언가가 필요하다.
subscriber 연산자는 Observable 인터페이스를 Observer에 연결해 미디어 용도로 사용된다.
Observable 인터페이스를 Observer와 연결하기 위해서 세 메소드를 subscriber 연산자와 연결하거나 Observer 인터페이스의 인스턴스를 subscribe 연산자와 연결할 수 있다.
fun main(args : Array<String>){
val observable : Observable<Int> = Observable.range(1,5)
observable.subscribe({
println("다음 -> $it")
},{
println("에러 -> ${it.message}")
},{
println("완료")
})
val observer : Observer<Int> = object : Observer<Int> {
override fun onComplete() {
println("모두 완료됨")
}
override fun onNext(t: Int) {
println("다음 -> $t")
}
override fun onError(e: Throwable) {
println("에러 발생-> ${e.message}")
}
override fun onSubscribe(d: Disposable) {
println("새로운 구독")
}
}
observable.subscribe(observer)
}

이제 Observer에 Observable을 연결(구독)하는 방법은 알았다. 그러면 구독 기간 후에 방출을 멈추고 싶다면 어떻게 해야할까?
onSubscribe 메소드의 파리미터에 있는 Disposable의 인스턴스를 얻어 해지시켜주면 된다.
해지
fun main(args: Array<String>) {
val observable: Observable<Long> = Observable.interval(
100,
TimeUnit.MILLISECONDS
)
val observer: Observer<Long> = object : Observer<Long> {
lateinit var disposable: Disposable
override fun onSubscribe(d: Disposable) {
disposable = d
}
override fun onNext(item: Long) {
println("받음 $item")
if (item >= 10 && !disposable.isDisposed) {
disposable.dispose()
println("정리됨")
}
}
override fun onError(e: Throwable) {
println("에러 ${e.message}")
}
override fun onComplete() {
println("완료")
}
}
runBlocking {
observable.subscribe(observer)
delay(1500)
}
}

여기서 observable을 정의할때 interval(time, format)을 설정하였는데 이것은 시간마다 0부터 순차적으로 정수를 내보내는 observable이다.
여기서 10까지 받은경우에 disposable을 dispose해준다. 이렇게 구독을 해지할 수 있다.
'Study > RxKotlin' 카테고리의 다른 글
| [RxKotlin] 백프레셔와 플로어블 (0) | 2021.10.16 |
|---|---|
| [RxKotlin] 핫, 콜드 옵저버블과 Subjects (0) | 2021.10.16 |
| [RxKotlin] 옵저버블과 옵저버와 구독자 (0) | 2021.10.16 |
| [RxKotlin] Coroutine과 모나드 (0) | 2021.10.11 |
| [RxKotlin] 함수형 프로그래밍과 코틀린의 특징 (0) | 2021.10.09 |