태임쓰의 개발블로그

[RxJava2] #5. 스케줄러 본문

Rx/RxJava

[RxJava2] #5. 스케줄러

태임태임 2018. 9. 9. 11:12

5. 스케줄러

5.1 스케줄러 개념

스케줄러는 RxJava의 핵심 요소로 Observable 만큼 중요하다.

  • 스케줄러는 스레드를 지정할 수 있게 해줌.

  • 지금까지 자바로 비동기 프로그래밍을 할 때 자바 스레드를 만들면서 경쟁 조건이나 synchronized 키워드를 생각했다면 스케줄러의 코드를 보고 놀랄 것!! (간결한 코드로 다시 탄생!!)

  • subscribeOn() 함수와 onserveOn() 함수를 모두 지정하면, [Observable에서 데이터 흐름이 발행하는 스레드]와 [처리된 결과를 구독자에게 발행하는 스레드]를 분리할 수 있다.

  • subscribeOn() 함수만 호출하면 Observable의 모든 흐름이 동일한 스레드에서 실행됨

  • 스케줄러를 별도로 지정하지 않으면 현재(main) 스레드에서 동작을 실행


5.2 스케줄러의 종류

  • RxJava는 다양한 스케줄러를 제공.

  • 상황에 맞게 어떤 스케줄러를 사용해야 하는지

  • RxJava 멋진 점 : 특정 스케줄러를 사용하다가 다른 스케줄러로 변경하기 쉬움

    • 마치 map() 함수를 한번 더 호출하는 것처럼 새롭게 스케줄러를 추가하거나 기존의 스케줄러를 다른 것으로 교체할 수 있다.

5.2.1 뉴 스레드 스케줄러

  • 이름처럼 새로운 스레드 생성

  • 새로운 스레드를 만들어 어떤 동작을 실행하고 싶을 때 Schedulers.newThread() 를 인자로 넣어주면 됨.

    • 그럼 뉴 스레드 스케줄러는 요청을 받을 때마다 새로운 스레드 생성

  • 새로운 스레드를 생성하여 내가 원하는 동작을 처리하는 방법

    • 하지만, 적극적으로 추천하는 방법은 아님.

      • RxJava에는 이거보다 활용도가 높은 계산 스케줄러와 IO 스케줄러와 같은 다른 스케줄러를 제공하기 때문.


RxJava에서 추천하는 스케줄러는 크게 3가지
  • 1.계산(Computation) 스케줄러 / 2. IO 스케줄러 / 3. 트램펄린 스케줄러

  • 뉴 스레드 스케줄러나 다른 스케줄러는 특수한 상황에서 적용하길 권장



5.2.2 계산 스케줄러

  • CPU에 대응하는 계산용 스케줄러

  • '계산' 작업을 할 때는 대기 시간 없이 빠르게 결과를 도출하는 것이 중요.

    • 계산 작업이 어렵게 느껴진다면 => 입출력(I/O) 작업을 하지 않는 스케줄러라고 생각하면 됨.

  • 내부적으로 스레드 풀을 생성

    .subscribeOn (Schedulers.computatioin())


5.2.3 IO 스케줄러

  • 계산 스케줄러와는 다르게 네트워크상의 요청을 처리하거나 각종 입,출력 작업을 실행하기 위한 스케줄러

    • 계산 스케줄러와 다른 점 : 기본으로 생성되는 스레드 개수가 다르다는 것

      • 즉, 계산 스케줄러는 CPU 개수만큼 스레드를 생성하지만,

      • IO 스케줄러는 필요할 때마다 스레드를 계속 생성함.

    • 계산 스케줄러 => 일반적인 계산 작업

    • IO 스케줄러 => 네트워크상의 요청, 파일 입출력, DB 쿼리 등

  • .subscribeOn (Schedulers.io())

String root ="C:\\";
File[] files = new File(root).listFiles();

Observable<String> source = Observable.fromArray(files)
  .filter(f -> !f.isDirectory)
  .map(f -> f.getAbsolutePath())
  .subscribeOn(Schedulers.io());

source.subscribe(Log::i);
  • source변수는 File[] 배열에 있는 파일의 절대 경로를 발행하는 Observable => IO 스케줄러에서 실행됨


5.2.4 트램펄린 스케줄러

  • 새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 대기 행렬(Queue)를 생성하는 스케줄러

    • 새로운 스레드 생성하지 않는 것과 대기 행렬을 자동으로 만들어 준다는 것이 뉴 스레드, 계산, IO 스케줄러와 다르다.

  • 생성방법 : .subscribeOn (Schedulers.trampoline())


5.2.5 싱글 스레드 스케줄러

  • RxJava 내부에서 단일 스레드를 별도로 생성하여 구독(subscribe) 작업을 처리.

  • But, 리액티브 프로그래밍이 비동기 프로그래밍을 지향하기 때문에 싱글 스레드 스케줄러를 활용할 확률은 낮다.

  • 생성방법 : .subscribeOn (Schedulers.single())


5.2.6 Executor 변환 스케줄러

  • 자바에서는 java.util.current 패키지에서 제공하는 실행자(Executor)를 변환하여 스케줄러를 생성할 수 있음.

  • 하지만 Executor 클래스와 스케줄러의 동작 방식이 다르므로 추천방법은 아님

    • 기존에 사용하던 Executor 클래스를 재사용할 때문 한정적으로 활용한다.

  • 생성방법 : .subscribeOn (Schedulers.from(executor))


5.3 onserveOn() 함수의 활용

RxJava 스케줄러의 핵심은 결국 제공되는 스케줄러의 종류를 선택한 후 subscribeOn() 과 observeOn() 함수를 호출하는 것이다.

subscribeOn() : Observable에서 구독자가 subscribe() 함수를 호출했을 때, 데이터 흐름을 발행하는 스레드를 지정

  • 처음 지정한 스레드를 고정시키므로 다시 subscribeOn() 함수를 호출해도 무시. (하지만, observeOn() 함수는 다름)

observeOn() : 처리된 결과를 구독자에게 전달하는 스레드 지정.

  • 여러번 호출할 수 있으며, 호출되면 그 다음부터 동작하는 스레드를 바꿀 수 있다.

  • 전통적인 스레드 프로그래밍에서는 일일이 스레드를 만들어야 하고 스레드가 늘어날 때마다 동기화하는 것이 매우 부담스럽기 때문에 이러한 로직을 구현하는 것이 매우 힘듬.

    • 하지만, observeOn() 함수는 스레드 변경이 쉬우므로 활용할 수 있는 범위가 넓음






Reference



Comments