Study/RxKotlin

[RxKotlin] 옵저버블과 옵저버와 구독자

hegunhee 2021. 10. 16. 20:27

위 글은 코틀린 리액티브 프로그래밍책을 보며 내용을 정리한 내용입니다.
책 : 코틀린 리액티브 프로그래밍
리부 차크라보티 지음
조승진 옮김

 

옵저버블


옵저버블과 구독자는 리액티브 프로그래밍의 기반을 이룬다. 이번 글에서는 Observables, Observers, subjects에 대해 알아보자.옵저버블(Observable)은 컨슈머(Observer)가 소비할 수 있는 값을 산출해 내는 기본 작업을 갖고 있다.여기서  가장 중요한 것은 컨슈머(Observer)가 값을 풀(Pull) 방식을 사용해 접근하지 않는다는 점이다.오히려 옵저버블은 컨슈머에게 값을 푸시(Push)하는 역할을 한다.따라서 옵저버블은 일련의 연산자를 거치는 아이템을 최종 옵저버로 내보내는 푸시 기반의 조합 가능한 이터레이터이다.순차적으로 접근해보자.

  • 옵저버는 옵저버블을 구독한다.
  • 옵저버블이 그 내부의 아이템들을 내보내기 시작한다.
  • 옵저버는 옵저버블에서 내보내는 모든 아이템에 반응한다.

옵저버블이 onNext, onComplete, onError같은 이벤트 메서드를 통해 작동하는 방법을 살펴보자.

 

옵저버블이 동작하는 방법


옵저버블은 세 가지 중요한 이벤트 메서드를 갖고 있는데 이를 하나씩 자세히 이야기해보자.

  • onNext : 옵저버블은 모든 아이템을 하나씩 이 메서드에 전달한다.
  • onComplete : 모든 아이템이 onNext 메서드를 통과하면 옵저버블은 onComplete메서드를 호출한다.
  • onError : 옵저버블에서 에러가 발생하면 onError 메서드가 호출되 정의된대로 에러를 처리한다. onError가 호출되었을경우 onComplete가 호출되지 않으며. 반대의 경우도 마찬가지다.

1. 오류가 발생하지 않았을경우 : 모든 아이템이 처리될때까지 onNext()  -> onComplete()

2. 오류가 발생했을 경우 : onNext() -> 에러발생 -> onError()

코드를 통해 더 잘 이해해보자

fun main(args: Array<String>) {
    val observer: Observer<Any> = object : Observer<Any> {
        override fun onComplete() {
            println("All Complete")
        }

        override fun onSubscribe(d: Disposable) {
            println("Subscribed to $d")
        }

        override fun onNext(t: Any) {
            println("Next $t")
        }

        override fun onError(e: Throwable) {
            println("Error Occured $e")
        }
    }

    val observable: Observable<Any> =
        listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f).toObservable()

    observable.subscribe(observer)

    val observableOnList: Observable<List<Any>> = Observable.just(
        listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f),
        listOf("List with Single Item"),
        listOf(1,2,3,4,5,6))
    observableOnList.subscribe(observer)
}

 

첫번째 subscribe가 이뤄졌을때 One부터 6.0까지 모든 아이템이 처리되고 onComplete 함수가 호출된다.

 

Observable.create 메서드 이해


언제든지 Observable.create 메서드로 옵저버블을 직접 생성할 수 있다. 이 메서드는 관찰 대상자를 ObservableEmitter<T> 인터페이스의 인스턴스를 직접 입력받는다. 예제로 알아보자.

fun main(args :Array<String>){
    val observer: Observer<String> = object : Observer<String> {
        override fun onComplete() {
            println("All Complete")
        }

        override fun onSubscribe(d: Disposable) {
            println("Subscribed to $d")
        }

        override fun onNext(t: String) {
            println("Next $t")
        }

        override fun onError(e: Throwable) {
            println("Error Occured ${e.message}")
        }
    }

    val observable : Observable<String> = Observable.create<String>{
        it.onNext("Emit 1")
        it.onNext("Emit 2")
        it.onNext("Emit 3")
        it.onNext("Emit 4")
        it.onComplete()
    }

    observable.subscribe(observer)

    val observable2 : Observable<String> = Observable.create{
        it.onNext("Emit 1")
        it.onNext("Emit 2")
        it.onNext("Emit 3")
        it.onNext("Emit 4")
        it.onError(Exception("My Custom Exception"))
    }

    observable2.subscribe(observer)

}

앞의 예제와 같이 observer 인터페이스의 인스턴스를 생성했다. create메서드로 observable을 만들면 직접 onNext, onComplete, onError와 같은 메서드를 직접 호출할 수 있다.

사용자가 지정한 데이터 구조를 사용하거나 내보내는 값을 제어하려고 할때 유용하다.

Observable.from 메서드 이해


Observable.from 메서드는 Observable.create 메서드에 비해 상대적으로 간단하다. from 메서드의 도움을 받아

거의 모든 코틀린 구조체로부터 Observable 인스턴스를 생성할 수 있다.

fun main(args :Array<String>){
    val observer: Observer<String> = object : Observer<String> {
        override fun onComplete() {
            println("All Complete")
        }

        override fun onSubscribe(d: Disposable) {
            println("Subscribed to $d")
        }

        override fun onNext(t: String) {
            println("Next $t")
        }

        override fun onError(e: Throwable) {
            println("Error Occured ${e.message}")
        }
    }

    val list = listOf("String 1","String 2","String 3","String 4")
    val observableFromIterable : Observable<String> = Observable.fromIterable(list)
    observableFromIterable.subscribe(observer)

    val callable = object : Callable<String>{
        override fun call(): String {
            return "From Callable"
        }
    }
    val observableFromCallable:Observable<String> = Observable.fromCallable(callable)
    observableFromCallable.subscribe(observer)

    val future : Future<String> = object : Future<String>{
        override fun cancel(p0: Boolean): Boolean {
            return false
        }

        override fun isCancelled(): Boolean {
            return false
        }

        override fun isDone(): Boolean {
            return true
        }

        override fun get(): String {
            return "Hello From Future"
        }

        override fun get(p0: Long, p1: TimeUnit?): String {
            return "Hello From Future"
        }

    }
    val observableFromFuture:Observable<String> = Observable.fromFuture(future)
    observableFromFuture.subscribe(observer)
}

callable, future 인스턴스를 observable로 변환하였다.

callable은 runnable처럼 Thread를 실행하는대신 다른 객체로 반환한다.

Future도 callable처럼 Thread환경에서 사용한다.

 

toObservable의 확장 함수 이해


코틀린의 확장함수 덕분에 List와 같이 어떠한 Iterable 인스턴스도 Observable로 큰 어려움 없이 변경 가능하다.

fun main(args :Array<String>) {
    val observer: Observer<String> = object : Observer<String> {
        override fun onComplete() {
            println("All Complete")
        }

        override fun onSubscribe(d: Disposable) {
            println("Subscribed to $d")
        }

        override fun onNext(t: String) {
            println("Next $t")
        }

        override fun onError(e: Throwable) {
            println("Error Occured ${e.message}")
        }
    }

    val list : List<String> = listOf("String 1", "String 2","String 3","String 4")

    val observable : Observable<String> = list.toObservable()

    observable.subscribe(observer)
}

라이브러리를 찾아보면 toObservable함수는 확장함수로 작성하였으며 함수내용을 보면 Observable.from 메소드를 사용한것을 볼 수 있다.

Observable.just 메서드의 이해

이 메서드는 넘겨진 인자만을 배출하는 옵저버블이다.

Iterable 인스턴스를 Observable.just에 단일 인자로 넘기면 전체 목록을 하나의 아이템으로 배출하는데.

Observable.from과는 다르다는 점에 유의하자

Observable.just를 호출하면 다음과 같은 일이 일어난다.

  • 인자와 함께 Observable.just를 호출
  • Observable.just는 옵저버블을 생성
  • onNext 알림을 통해 각각의 아이템을 내보냄
  • 모든 인자의 제출이 완료되면 onComplete 알림을 실행
fun main(args: Array<String>) {
    val observer: Observer<Any> = object : Observer<Any> {
        override fun onComplete() {
            println("All Complete")
        }

        override fun onSubscribe(d: Disposable) {
            println("Subscribed to $d")
        }

        override fun onNext(t: Any) {
            println("Next $t")
        }

        override fun onError(e: Throwable) {
            println("Error Occured ${e.message}")
        }
    }

    Observable.just("A String").subscribe(observer)
    Observable.just(54).subscribe(observer)
    Observable.just(listOf("String 1", "String 2", "String 3", "String 4")).subscribe(observer)
    Observable.just(
        mapOf(
            Pair("Key 1", "Value 1"),
            Pair("Key 2", "Value 2"),
            Pair("Key 3", "Value 3")
        )
    ).subscribe(observer)
    Observable.just(arrayListOf(1,2,3,4,5,6)).subscribe(observer)
    Observable.just("String 1","String 2","String 3").subscribe(observer)
}

결과에서 알 수 있듯

list나 map도 단일 아이템으로 취급한다. next 1, next 2, next 3 이런식이 아닌 next [1,2,3,4,5,6] -> onComplete

Observable의 다른 팩토리 메서드


Observable의 다른 몇 가지 팩토리 메서드를 알아보자.

fun main(args: Array<String>) {
    val observer: Observer<Any> = object : Observer<Any> {
        override fun onComplete() {
            println("All Complete")
        }

        override fun onSubscribe(d: Disposable) {
            println("Subscribed to $d")
        }

        override fun onNext(t: Any) {
            println("Next $t")
        }

        override fun onError(e: Throwable) {
            println("Error Occured ${e.message}")
        }
    }

    Observable.range(1,10).subscribe(observer)
    Observable.empty<String>().subscribe(observer)

    runBlocking {
        Observable.interval(300,TimeUnit.MILLISECONDS).subscribe(observer)
        delay(900)
        Observable.timer(400,TimeUnit.MILLISECONDS).subscribe(observer)
        delay(450)
    }
}
  1. Observable.range(start, count) 팩토리 메소드는 start부터 시작해 count 만큼의 정수를 내보낸다.
  2. Observable,empty() 메서드는 onNext()로 항목을 내보내지 않고 즉시 onComplete()를 발생시킨다.
  3. Observable.interval(interval, TIMESET)은 지정된 간격 만큼의 숫자를 0부터 순차적으로 내보낸다. 구독을 취소하거나 프로그램이 종료될때까지 이어진다. timeset은 interval이 초인지 밀리초인지 지정되있는 열거형이다.
  4. Observable.timer()는 지정된 시간이 경과된 후 한번만 실행된다.

구독자 : Observer 인터페이스


이전 예제에서 볼 수 있듯이 Observer는 onNext(), onError(), onComplete(), onSubscribe(d : Disposable)의 네 가지 메서드를 가지는 인터페이스이다. 앞에서 설명햇듯이 옵저버블을 옵저버에 연결하면 이 네 가지 메서드가 호출된다. 다음은 네 가지 메서드에 대한 설명이다.

  • onNext : 아이템을 하나씩 넘겨주기 위해 옵저버블은 옵저버의 이 메서드를 호출한다.
  • onComplete : 옵저버블이 onNext를 통한 아이템 전달이 종료됐음을 알리고 싶을때 옵저버의 onComplete를 호출한다.
  • onError : 옵저버블에서 에러가 발생했을 때 옵저버에 정의된 로직이 있다면 onError를 호출하고 그렇지 않으면 예외를 발생시킨다.
  • onSubscriber : Observable이 새로운 Observer를 구독할 때마다 호출된다.

구독과 해지


Observable(관찰당하는 대상)과 Observer(관찰해야 하는 주제)가 있다. 어떻게 이 둘을 연결할 수 있을까

이 둘은 키보드와 마우스 같은 입력 장치와 컴퓨터를 연결할 때처럼 매개체가 필요하다.

 

Subscriber 연산자는 Observable을 Observer에 연결하는 매개체의 용도로 사용된다. Subscriber 연산자에 대해 1개에서 3개의 메서드를 전달할 수도 있고 Observer 인터페이스의 인스턴스를 연산자에 전달해 연결할 수도 있다.

fun main(args : Array<String>){
    val observable : Observable<Int> = Observable.range(1,5)

    observable.subscribe({
        println("Next $it")
    },{
        println("Error ${it.message}")
    },{
        println("Done")
    })

    val observer : Observer<Int> = object : Observer<Int>{
        override fun onComplete() {
            println("All Complete")
        }

        override fun onNext(t: Int) {
            println("Next $t")
        }

        override fun onError(e: Throwable) {
            println("Error Occured ${e.message}")
        }

        override fun onSubscribe(d: Disposable) {
            println("New Subscription")
        }
    }

    observable.subscribe(observer)

}

이 예제에서는 Observable 인스턴스를 생성하고 두 번에 걸쳐 오버로드된 subscriber 연산자를 사용했다. 둘다 세 가지 메서드를 전달했다.

구독의 개념을 이해했고 사용할 수 있게 됐다. 하지만 아이템을 사용하는 도중에 구독을 중지하려면 어떻게 해야할까?

옵저버의 onSubscriber 메서드에 해답이 있다.

해당 메서드를 보면 Disposable을 사용할수있는데 Disposable 변수를 만들어 이 disposable을 저장해서 원하는 타이밍에 해제시켜주면 된다.

fun main(args :Array<String>){
    runBlocking {
        val observable : Observable<Long> = Observable.interval(100,TimeUnit.MILLISECONDS)

        val observer : Observer<Long> = object : Observer<Long> {
            lateinit var disposable: Disposable
            override fun onSubscribe(d: Disposable) {
                disposable = d
            }

            override fun onNext(item: Long) {
                println("Received $item")
                if (item >= 10 && !disposable.isDisposed){
                    disposable.dispose()
                    println("Disposed")
                }
            }

            override fun onError(e: Throwable) {
                println("Error ${e.message}")
            }

            override fun onComplete() {
                println("Complete")
            }

        }
        observable.subscribe(observer)
        delay(1500)
    }
}

해당 예제를 보면 100ms마다 onNext() 메서드를 호출해서 프로그래밍이 종료되기 전인 1.5초까지는 계속 아이템을 push 해야되는데 도중에 중지되었다.

disposable 변수를 만들어 10번째 값을 받을때 구독을 해제했기 때문이다.