태임쓰의 개발블로그

[RxJava2] #4. 리액티브 연산자 - 활용 (생성, 변환, 결합, 조건 연산자) 본문

Rx/RxJava

[RxJava2] #4. 리액티브 연산자 - 활용 (생성, 변환, 결합, 조건 연산자)

태임태임 2018. 9. 9. 10:13

4. 리액티브 연산자 - 활용

4.1 연산자 분류

연산자 종류가 많아 카테고리 별로 나눔 (ReactiveX 홈페이지 기준)

연산자 종류연산자 함수
생성 연산자just(), fromXXX(), create(), interval(), range(), timer(), intervalRange(), defer(), repeat()
변환 연산자map(), flatMap(), concatMap(), switchMap), groupBy(), scan(), buffer(), window()
필터 연산자filter(), take(), skip(), distinct()
결합 연산자zip(), combineLatest(), Merge(), concat()
조건 연산자amb(), takeUntil(), skipUntil(), all()
에러 처리 연산자onErrorReturn(), onErrorResumeNext(), retry(), retryUntil()
기타 연산자subscribe(), subscribeOn(), observeOn(), reduce(), count()


4.2 생성 연산자

생성 연산자의 역할은 데이터의 흐름을 만드는 것이다.

간단하게 Observable (Observable, Single, Maybe 객체 등)을 만드는 것

4.2.1 interval()


  • 일정시간 간격으로 데이터 흐름 생성

  • 주어진 시간 간격으로 0부터 1씩 증가하는 Long 객체(기본형X, 래퍼클래스O) 발행

  • 함수 원형 두가지

    함수 원형설명비고
    interval(long period, TimeUnit unit)일정 시간(period) 쉬었다가 데이터 발행
    interval(long initialDelay, long period, TimeUnit unit)동작은 같고 최초 지연 시간(initialDelay)을 조절보통 초기 지연시간 없이(initialDelay를 0으로) 바로 데이터를 발행하기 위해 사용
참고 ) SchedulerSupport 어노테이션

@SchedulerSupport(SchedulerSupport.COMPUTATION)
  • interval() 함수의 동작이 계산 스케줄러에서 실행된다는 의미

  • 현재 스레드가 아니라 계산을 위한 별도의 스레드(RxJava에서 스케줄러)에서 동작


4.2.2 timer()

  • interval()과 유사하지만 한번만 실행되는 함수

  • 일정 시간이 지난 후, 한 개의 데이터를 발행하고 onComplete() 이벤트 발행



4.2.3 range()

  • 주어진 값 (n) 부터 m개의 Integer 객체를 발행

    • interval() / timer() 는 Long 객체를 발행했지만 range()는 Integer 객체를 발행하는 것이 다름

  • 스케줄러에서 실행되지 않고 현재 스레드에서 실행

    • (interval(), timer()와 다르게)



4.2.4 intervalRange()



  • interval() + range() 를 혼합해놓은 함수

    • interval() 처럼 일정시간 간격으로 값을 출력하지만

    • range() 처럼 시작 숫자(n)로 부터 m개만큼의 값만 생성하고 onComplete이벤트가 발생

      • 즉, interval() 처럼 무한히 데이터 흐름을 발행하지 않음

  • 리턴 타입은 interval()함수와 동일하게 Long 타입

public static Observable<Long> intervalRange 
  (long start, long count, long initialDelay, long period, TimeUnit unit) { }  

인자의 개수가 너무 많아서 직관적이지 않다. 차라리 다른 함수들을 조합해 intervalRange() 를 만들 수 있다.

Observable<Long> souce = Observable.interval(100L, TimeUnit.MILLISECONDS)
  .map (val -> val+1)
  .take (5);
source.subscribe(Log::i);
CommonUtils.sleep(1000);



4.2.5 defer()

  • timer() 와 비슷하지만 데이터 흐름생성을 구독자가 subscribe() 함수를 호출할 때까지 미룰 수 있음. 이때 새로운 Observable이 생성됨



4.2.6 repeat()

  • 단순히 반복 실행을 함.

  • 해당 서버가 잘 살아있는지 확인(ping, heart beat..) 하는 코드에 유용



4.3 변환 연산자

변환 연산자는 만들어진 데이터 흐름원하는 대로 변형할 수 있다.

기본이 되는 함수(map(), flatMap())와 비교하여 어떻게 다른지 그 차이점을 기억하는게 좋음.


4.3.1 concatMap()

  • flatMap() 과 비슷

    • flatMap() 은 먼저 들어온 데이터를 처리하는 중에 새로운 데이터가 들어오면 나중에 들어온 데이터의 처리결과가 먼저 출력 될 수도 있다. ==> 인터리빙(끼어들기) 라고 함.

    • But, concatMap()은 먼저 들어온 데이터 순서대로 처리해서 결과를 냄

  • 계산 속도는 flatMap()이 훨씬 바르다

    • 이유) 인터리빙을 허용하기 때문

    • concatMap() 함수의 순서를 보장해주려면 추가 시간이 필요함.



4.3.2 switchMap()

  • concatMap() 함수가 인터리빙이 발생할 수 있는 상황에서 동작의 순서를 보장해준다면,

    switchMap() 함수는 순서를 보장하기 위해 기존에 진행 중이던 작업을 바로 중단

  • 여러 개의 값이 발행되었을때, 마지막에 들어온 값만 처리하고 싶을 때 사용

    • 중간에 끊기더라도 마지막 데이터의 처리는 보장하기 때문.



4.3.3 groupBy()

  • 어떤 기준(keySelector 인자)으로 단일 Observable을 여러 개로 이루어진 Observable 그룹(GroupedObservable)으로 만듬.

    • 어떤 기준으로 Observable 각각을 여러 개 Observable의 그룹으로 구분한다고 생각하면 됨


참고) map() / flatMap() / groupBy() 함수의 동작 비교
  • map() 함수 - 1개의 데이터를 다른 값이나 다른 타입으로 변환

  • flatMap() 함수 - 1개의 값을 받아서 여러 개의 데이터(Observable)로 확장해줌.

  • groupBy() - 값들을 받아서 어떤 기준에 맞는 새로운 Observable 다수 생성


4.3.4 scan()

  • 실행할 때마다 입력값에 맞는 중간 결과 및 최종 결과를 구독자에게 발행

  • reduce() 함수와 비슷

    • reduce() 는 Observable에서 모든 데이터가 입력된 후 그것을 종합하여 마지막 1개의 데이터만을 구독자에게 발행함.

    • 즉, 다른 점은 마지막 1개 or 중간결과까지 계속 발행하냐



4.4 결합 연산자

결합 연산자는 여러개의 Observable을 조합하여 활용하는 연산자이다.

다수의 Observable을 하나로 합하는 방법을 제공한다.


4.4.1 zip()

  • 각각의 Observable을 모두 활용해 2개 혹은 그 이상의 Observable을 결합

    • 예) A,B 두개의 Observable을 결합한다면 2개의 Observable에서 모두 데이터를 발행해야 결합 가능. 그전까지는 발행을 기다림



4.4.2 combineLatest()

  • 2개 이상의 Observable을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 함수.

  • 두 Observable 모두 값을 발행하면 그때는 결괏값이 나옴

    • 그 다음부터는 둘 중에 어떤 것이 갱신되던지 최신 결괏값을 보여줌(이 부분이 zip()함수와 다름)



4.4.3 merge()

  • 가장 단순한 결합함수 (zip, combineLatest에 비교하면)

  • 입력 Observable의 순서와 모든 Observable이 데이터를 발행하는지 등에 관여하지 않고 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그대로 발행함.



4.4.4 concat()

  • 2개 이상의 Observable을 이어 붙여주는 함수

  • 첫번째 Observable에 onComplete 이벤트가 발생해야 두 번째 Observable을 구독함. (스레드를 활용한 일반적인 코드로 이와 같은 내용을 구현하려면 만만치 않을 것)

    • 첫번째 Observabl에 onComplete 이벤트가 발생하지 않으면, 두번 째 Observable은 영원히 대기

      • 이는 잠재적인 메모리 누수(memory leak)의 위험을 내포함.

      • 따라서, 입력 Observable이 반드시 완료(onComplete 이벤트)될 수 있게 해야함.

  • 결합할 수 있는 Observable은 최대 4개

  • concat() 함수를 활용할 때는 onComplete 이벤트의 발생 여부 확인이 중요.



4.5 조건 연산자

조건 연산자는 Observable의 흐름을 제어하는 역할을 한다.

필터 연산자는 발행된 값을 채택하느냐 기각하느냐 여부에 초점을 맞춘다면, 조건 연산자는 지금까지의 흐름을 어떻게 제어할 것인지에 초점을 맞춘다.


4.5.1 amb()

  • 둘 중 어느 것이든 먼저 나오는 Observable을 채택함.

    • ambiguous(모호한) 라는 영어 단어의 줄임말

  • 여러 개의 Observable 중에서 1개의 Observable을 선택하는데,

    • 선택 기준은 가장 먼저 데이터를 발행하는 Observable.

    • 이후 나머지 Observable에서 발행하는 데이터는 모두 무시한다.



4.5.2 takeUntil()

  • take() 함수에 조건을 설정

  • 인자로 받은 Observable에서 어떤 값을 발행하면, 현재 Observable의 데이터 발행을 중단하고 즉시 완료(onComplete 이벤트 발생)

    • 즉, take() 처럼 일정 개수만 값을 발행하되, 완료 기준을 다른 Observable에서 값을 발행하는지로 판단하는 것.

takeUntil (ObservableSource<U> other)
  • 인자로 값을 발행할 수 있는 other Observable이 필요하다.



4.5.3 skipUntil()

  • takeUntil() 과 정반대 함수

  • other Observable 에서 데이터를 발행할 때까지 값을 건너 뜀.

    • other Observable 에서 값을 발행하는 순간부터 원래 Observable에서 값을 정상적으로 발행하기 시작



4.5.4 all()

  • 주어진 조건에 100% 맞을 때만 true 값을 발행

    • 조건에 맞지 않으면 바로 false 값을 발행

Single<Boolean> all (Predicate<? super T> predicate)
  • predicate 인자는 filter() 함수의 인자와 동일

  • 주어진 람다가 true인지 false 인지 판정해줌.



4.6 수학 및 기타 연산자

4.6.1 수학 함수

  • RxJava에는 여러가지 확장 모듈이 존재한다. (RxAndroid, RxNetty, RxApacheHttp등)

    • RxJava1 => 수학함수 모은 RxJavaMath 가 있음.

    • RxJava2 => RxJavaMath 지원안됨. —> 다른 라이브러리 사용해야 함.

  • RxJava2의 핵심 커미터인 데이빗 카르녹이 만든 RxJava2Extensions 라이브러리 사용

4.6.1.1 count()

  • Single< Long> count()

  • Observable에서 발행한 데이터의 개수를 발행

  • 결과가 1개 값이므로 Single< Long>을 발행

Integer[] data = {1,2,3,4,7};

Single<Long> source = Observable.fromArray(data)
  .count();
source.subscribe(count -> Log.i("count >> " + count));

// [출력] count >> 5

4.6.1.2 max() / min()

  • Flowable<T> max(Publisher<T> source)

  • Flowable<T> min(Publisher<T> source)

Integer[] data = {1,2,3,4,7};

Floawable.fromArray(data)
  .to(MathFlowable::max)
  .subscribe(max -> Log.i("max >> " + max));

// [출력] max >> 7

4.6.1.3 sum() / average()

  • sum()과 average()는 각각 아래처럼 sumInt(), averageDouble() 처럼 인자 타입이 함수 이름에 그대로 반영되어 있음. (원하는 타입 골라 사용)

  • Flowable<Integer> sumInt(Publisher<Integer> source)

  • Flowable<Integer> averageDouble(Publisher<? extends Number> source)

Integer[] data = {1,2,3,4,7};

Flowable<Integer> flowable = Flowable.fromArray(data)
  .to(MathFlowable::sumInt);
flowable.subscribe(sum -> Lob.i("sum >> " + sum));

// [출력] sum >> 17


4.6.2 delay()

  • 단순하게 인자로 전달받는 time과 시간단위(TimeUnit)만큼 입력받은 Observable의 데이터 발행을 지연시켜주는 역할

    • Observable<T> delay (long delay, TimeUnit unit)

  • Interval()과 마찬가지로 계산 스케줄러에서 실행.



4.6.3 timeInterval()

  • 어떤 값을 발행했을 때 이전 값을 발행한 이후 얼마나 시간이 흘렀는지 알려준다.






Reference



Comments