태임쓰의 개발블로그

[RxJava2] #2. Observable (Single, Maybe, 뜨거운/차가운 Observable, 팩토리함수) 본문

Rx/RxJava

[RxJava2] #2. Observable (Single, Maybe, 뜨거운/차가운 Observable, 팩토리함수)

태임태임 2018. 9. 8. 22:23

2. Observable

  • RxJava는 Observable에서 시작해 Observable로 끝난다고 해도 과언이 아닐 정도로 중요한 개념.

  • RxJava 1.x 에서는 Observable, Single 클래스

    • 2.x 에서는 Observable, Maybe, Flowable 클래스 (상황에 맞게 세분화해 구분해 사용)


2.1 Observable 클래스

  • Observable은 옵서버(observer) 패턴을 구현

    • 옵서버 패턴은 객체의 상태 변화를 관찰하는 관찰자(옵서버) 목록을 객체에 등록

  • 그리고 상태 변화가 있을 때마다 메서드를 호출하여 객체가 직접 목록의 각 옵서버에게 변화를 알려준다.

    • 라이브사이클은 존재하지 않으며, 보통 단일 함수를 통해 변화만 알린다.

  • Observable은 무슨 뜻?

    • 직관적으로 => 관찰자(Observer)가 관찰하는 대상

  • 조금 부족해! => Observed라는 단어가 관찰을 통해서 얻은 결과를 의미한다면, Observable은 현재는 관찰되지 않았지만 이론을 통해서 앞으로 관찰할 가능성을 의미한다.

  • 옵서버 패턴의 대표적인 예 ) 버튼을 누르면 버튼에 미리 등록해 둔 onClick() 메서드를 호출해 원하는 처리를 하는 것

  • RxJava의 Observable은 세가지의 알림을 구독자에게 전달

    • onNext : Observable이 데이터의 발행을 알린다. 기존의 옵서버 패턴과 같음.

    • onComplete : 모든 데이터의 발행을 완료했음을 알린다. onComplete이벤트는 단 한번만 발생하며, 발생한 후에는 더 이상 onNext 이벤트가 발생해선 안 된다.
    • onError : 에러 발생을 알림. onError 이벤트가 발생하면 이후에 onNext 및 onComplete 이벤트가 발생하지 않는다. 즉, Observable의 실행을 종료.
  • Observable 클래스에 많은 수의 함수 존재 (Observable을 생성하는 팩토리 함수, 중간 결과를 처리하는 함수, 디버그 및 예외 함수가 모두 포함)

  • Observable을 생성할 때는 직접 인스턴스를 만들지 않고 정적 팩토리 함수를 호출.

    • 1.x 기본 팩토리 함수 : create(), just(), from()

  • 2.x 추가 팩토리 함수 : fromArray(), fromIterable(), fromCallable(), fromFuture(), fromPublisher()

    • 기타 팩토리 함수 : interval(), range(), timer(), defer()


2.2 팩토리 함수

Observable을 객체를 생성하는 팩토리 함수.

Observable을 생성할 때는 인스턴스를 직접 만들지 않고 정적 팩토리 함수를 호출한다.

2.2.1 just()

  • 인자로 넣은 데이터를 차례로 발행하고 Observable을 생성

  • 1~10개의 인자를 넣을 수 있음. (단, 타입은 모두 같아야함)


2.2.2 subscribe() 함수 / Disposable 객체

1. subscibe() 함수
  • 내가 동작시키기 원하는 것을 사전에 정의해둔 다음 실제 그것이 실행되는 시점을 조절 => subscribe()

  • just() 등의 팩토리 함수로 데이터 흐름을 정의한 후, subscribe()를 호출해야 실제로 데이터를 발행함.

  • 주요 원형(4가지)

    • 특징 : 모두 Disposable 인터페이스의 객체를 리턴함.

    원형특징
    Disposable subscribe()인자없음, onNext와 onComplete 이벤트를 무시하고 onError 이벤트가 발생했을 때만 exception throw함. ==> 테스트하거나 디버깅할 때 활용
    Disposable subscribe(Consumer<? super T> onNext)인자가 1개 있는 오버로딩은 onNext 이벤트를 처리 / 이때도 onError 발생하면 exception을 던짐
    Disposable subscribe (Consumer<? super T> onNext, Consumer<? super java.lang.Throwable> onError)onNext와 onError 이벤트 처리
    Disposable subscibe(Consumer<? super T> onNext, Consumer<? super java.lang.Throwable> onError, Action OnComplete)onNext, onError, onComplete 이벤트 모두 처리

2. Disposable 객체
  • void dispose() / boolean isDisposed() => 2개 함수만 존재

  • dispose()

    • Observable에게 더 이상 데이터를 발행하지 않도록 구독을 해지하는 함수

    • Observable이 onComplete 알림을 보냈을 때, 자동으로 dispose()를 호출해

      Observable과 구독자의 관계를 끊는다. => 따로 dispose() 호출 X

  • isDisposed()

    • Observable이 데이터를 발행하지 않는지 (구독을 해지했는지) 확인하는 함수

    • 구독을 해지했으면 -> true


2.2.3 create() 함수

  • onNext, onComplete, onError 같은 알림을 개발자가 직접 호출해야함.

    • 반면에) just() 함수는 데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생

  • (RxJava의 javadoc에 따르면) create()는 RxJava에 익숙한 사용자만 활용하도록 권고

    • 다른 팩토리 함수를 활용하면 같은 효과를 낼 수 있음


2.2.4 fromXXX() 계열 함수

  • 단일 데이터가 아닐 때 fromXXX() 계열 함수 사용

    • just(), create() 는 단일 데이터를 다뤘음.

  • RxJava2에서 from() 함수 세분화 됨.

    • RxJava 1.x에서는 from(), fromCallable() 만 사용

    • from() 을 배열, 반복자, 비동기 계산 등에 모두 사용하다 보니 모호함이 있어 세분화함.


2.2.4.1 fromArray() 함수

  • 배열이 들어있는 데이터를 처리할 때 사용

    • Observable.fromArray(arr)

  • RxJava에서 int 배열을 인식시키려면 Integer[] 로 변환해야 함.


2.2.4.2 fromIterable() 함수

  • Observable을 만드는 다른 방법은 Iterable 인터페이스를 구현한 클래스에서 Observable 객체를 생성하는 것.

  • Iterable< E > 인터페이스 구현하는 대표적인 클래스

    • ArrayList(List), ArrayBlockingQueue(BlockingQueue), HashSet(Set), LinkedList, Stack, TreeSet, Vector 등..

List<String> foods = new ArrayList<>();
foods.add("pizza");
foods.add("chicken");
foods.add("hamburger");

Observable<String> source = Observable.fromIterable(foods);
source.subscribe(System.out::println);

/* 실행결과
pizza
chicken
hamburger
*/



2.2 Single 클래스

  • Observable의 특수한 형태 (1.x 버전부터 존재)

  • Observable 클래스는 데이터를 무한하게 발행할 수 있지만, Single 클래스는 오직 1개의 데이터만 발행하도록 한정

    • 보통 결과과 유일한 서버 API를 호출할 때 유용하게 사용
  • 데이터 하나가 발행과 동시에 종료

    • 라이프사이클 관점에서 onNext() / onComplete() 함수가 onSuccess() 로 통합

    • onSuccess(T value) / onError() 함수로 구성


2.2.1 Single 객체 생성 가장 쉬운 방법 - just()

Single 객체를 생성할 때, 가장 간단한 방법은 just() 함수를 사용하는 것이다.

Single.just(“Hello Single”);


2.2.2 Observable에서 Single 클래스 사용

  • Single은 Observable의 특수한 형태이므로 Observable에서 변환할 수 있다.

  • Observable 클래스에서 Single 클래스 사용


2.2.3 Single 클래스의 올바른 사용방법

just() 에 여러개의 값을 넣으면 어떻게 될까~?

Observable.just("one","error").single("default item");
  • 에러 발생!!

  • 두번째 값을 발행하면서 onNext 이벤트가 발생할 때 에러가 발생됨

    • (IllegalArgumentException: Sequence contains more than one element!)



2.3 Maybe 클래스

  • RxJava2에 처음 도입된 Observable의 또 다른 특수형태

    • RxJava1.X 에는 없었음.

  • Single 클래스와 마찬가지로 최대 데이터 하나를 가질 수 있지만 데이터 발행 없이 바로 데이터 발생을 완료할 수 있다.

    • Single : 1개 완료 / Maybe : 0 or 1 완료

  • 생성방법

    • Maybe 클래스를 통해 생성할 수 있지만,

    • 보통 Observable의 특정 연산자를 통해 생성할 때가 많음

      • elementAt(), firstElement(), flatMapMaybe(), lastElement(), reduce(), singleElement() 함수 등


2.4 뜨거운 VS. 차가운 Observable

  • Observable 2가지로 나뉨 ==> 뜨거운 Observable / 차가운 Observable

  • 차가운 -> 뜨거운 변환 가능 (Subject 객체를 만들거나 ConnectableObservable 클래스 활용)


2.4.1 차가운 Observable

  • Observable을 선언하고 just() 함수를 호출해도 옵서버가 subscribe() 함수를 호출하여 구독하지 않으면 데이터를 발행하지 않는다. ==> 게으른 접근법

    • 예시) 웹 요청, 데이터베이스 쿼리와 읽기 등 / 내가 원하는 URL이나 데이터를 지정하면 그때부터 서버나 데이터베이스에 요청을 보내고 결과를 받아옴

  • 구독자가 구독하면 준비된 데이터를 처음부터 발행.

  • 지금까지 우리가 다룬거, 앞으로도 별도의 언급이 없으면 차가운 Observable


2.4.2 뜨거운 Observable

  • 구독자가 존재 여부와 관계없이 데이터를 발행함. => 여러 구독자를 고려할 수 있다.

  • 단, 구독자로서는 Observable에서 발행하는 데이터를 모두 수신할 것으로 보장할 수 없다.

  • 구독한 시점부터 Observable에서 발행한 값을 받는다.

    • 예시) 마우스 이벤트, 키보드 이벤트, 시스템 이벤트, 센서 데이터, 주식 가격 등

  • 주의할점 ) 배압을 고려해야 함.

    • 배압은 Observable에서 데이터를 발행하는 속도와 구독자가 처리하는 속도의 차이가 클 때 발생

    • Flowable이라는 특화 클래스에서 배압을 처리


2.5 Subject 클래스

Subject 클래스는 차가운 Observable을 뜨거운 Observable로 변환해준다.

2.5.1 Subject 클래스의 특성

  • Observable의 속성과 구독자의 속성이 모두 있다.

    • Observable 처럼) 데이터 발행 가능

    • 구독자 처럼 ) 발행된 데이터를 바로 처리 가능

2.5.2 주요 Subject 클래스

주요 Subject 클래스에는 AsyncSubject / BehaviorSubject / PublishSubject / ReplaySubject 등이 있다.



Comments