레이블이 From인 게시물을 표시합니다. 모든 게시물 표시
레이블이 From인 게시물을 표시합니다. 모든 게시물 표시

2017년 6월 4일 일요일

[STUDY] RxAndroid Part 7 (Scheduler 이해하기)


[목차]==================================================
1. RxJava와 RxAndroid에서 제공하는 Scheduler 종류
1) RxJava가 제공하는 Scheduler
2) RxAndroid는 제공하는 Scheduler
2. 독립적으로 사용 가능한 Scheduler
3. 사용법 예제
1) Scheduler 안한 경우 기본 동작
2) subscribeOn() API만 사용한 경우 동작
3) observerOn() API만 사용한 경우 동작
4) subscribeOn() & observerOn() 1번씩 API 사용한 경우 동작
5) subscribeOn() & observerOn() 다수 API 사용한 경우 동작
======================================================


RxAndroid에서는 Scheduler를 통해 어느 쓰레드에서 실행이 될지 결정 할 수 있습니다.Scheduler는 subsctibeOn(), observeOn() 에서 각각 지정할 수 있는데
subsctibeOn()은 observable의 작업을 시작하는 쓰레드를 선택 할 수 있습니다.( 중복해서 적을 경우 가장 마지막에 적힌 스레드에서 시작합니다.)
observeOn()은 이후에 나오는 Operator, subscribe의 Scheduler를 변경 할 수 있습니다.

1. RxJava와 RxAndroid에서 제공하는 Scheduler 종류
1) RxJava가 제공하는 Scheduler
  • Schedulers.computation()
이벤트 룹에서 간단한 연산이나 콜백 처리를 위해서 쓰는 것입니다. I/O 처리를 여기에서 해서는 안됩니다.
RxComputationThreadPool라는 별도의 스레드 풀에서 돌아갑니다. 최대 cpu갯수 ?개의 스레드 풀이 순환하면서 실행됩니다.
  • Schedulers.from(executor)
특정 executor를 스케쥴러로 사용합니다
  • Schedulers.immediate
현재 스레드에서 즉시 수행합니다.
observeOn()이 여러번 쓰였을 경우 immediate()를 선언한 바로 윗쪽의 스레드를 따라갑니다.
  • Schedulers.io()
동기 I/O를 별도로 처리시켜 비동기 효율을 얻기 위한 스케줄러입니다. 자체적인 스레드 풀에 의존합니다.
자체적인 스레드 풀 CachedThreadPool을 사용합니다. API 호출 등 네트워크를 사용한 호출 시 사용됩니다.
  • Schedulers.newThread()
항상 새로운 스레드를 만드는 스케쥴러입니다.
  • Schedulers.trampoline()
큐에 있는 일이 끝나면 이어서 현재 스레드에서 수행하는 스케쥴러

※ 일부 오퍼레이터들은 자체적으로 어떤 스케쥴러를 사용할지 지정합니다. 예를 들어 buffer 오퍼레이터는 Schedulers.computation()에 의존하며 repeat은 Schedulers.trampoline()를 사용합니다.


2) RxAndroid는 제공하는 Scheduler
  • AndroidSchedulers.mainThread()
안드로이드의 UI 스레드에서 동작
  • HandlerScheduler.from(handler)
특정 핸들러 handler에 의존하여 동작

※ 안드로이드에 특화된 스케쥴러입니다. 보통은 RxAndroid가 제공하는 AndroidSchedulers.mainThread()와 RxJava가 제공하는 Schedulers.io()를 조합해서 Schedulers.io()에서 수행한 결과를 AndroidSchedulers.mainThread()에서 받아 UI에 반영하는 패턴등이 일반적으로 쓰입니다.


2. 독립적으로 사용 가능한 Scheduler
Scheduler는 Observable, Operator, Subscriber 모델 밖에서 별도로 사용할 수 있습니다.
 worker = Schedulers.newThread().createWorker();

worker.schedule(new Action0() {
    @Override
    public void call() {
        realmJob();
    }
});

 Scheduler.Worker worker = Schedulers.newThread().createWorker();
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                log("worker: " + Thread.currentThread().getName());
            }
        });
[출력결과]
worker: RxNewThreadScheduler-1


3. 사용법 예제
1) Scheduler 안한 경우 기본 동작
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).map(s1 -> {
            log("map: " + Thread.currentThread().getName());
            return s1;
        }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));
[출력결과]
subscribe:main
map: main
on next: main

2) subscribeOn() API만 사용한 경우 동작
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).subscribeOn(Schedulers.computation()).map(s1 -> {
            log("map: " + Thread.currentThread().getName());
            return s1;
        }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));
[출력결과]
subscribe:RxComputationThreadPool-1
map: RxComputationThreadPool-1
on next: RxComputationThreadPool-1

3) observerOn() API만 사용한 경우 동작
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).observeOn(Schedulers.computation()).map(s1 -> {
            log("map: " + Thread.currentThread().getName());
            return s1;
        }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));
[출력결과]
subscribe:main
map: RxComputationThreadPool-1
on next: RxComputationThreadPool-1

4) subscribeOn() & observerOn() 1번씩 API 사용한 경우 동작
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
                .map(s1 -> {
                        log("map: " + Thread.currentThread().getName());
                        return s1;
         }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));
[출력결과]
subscribe:RxCachedThreadScheduler-1
map: main
on next: main

5) subscribeOn() & observerOn() 다수 API 사용한 경우 동작
 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).subscribeOn(Schedulers.io())
          .subscribeOn(Schedulers.computation())
          .observeOn(AndroidSchedulers.mainThread()).map(s1 -> {
                        log("map1: " + Thread.currentThread().getName());
                        return s1;
         }).observeOn(Schedulers.newThread()).map(s2 -> {
                        log("map2: " + Thread.currentThread().getName());
                        return s2;
         }).observeOn(Schedulers.single()).map(s3 -> {
                        log("map3: " + Thread.currentThread().getName());
                        return s3;
         }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));
[출력결과]
subscribe:RxCachedThreadScheduler-1
map1: main
map2: RxNewThreadScheduler-1
map3: RxSingleScheduler-1
on next: RxSingleScheduler-1



 
출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다. 

2017년 6월 3일 토요일

[STUDY] RxAndroid Part 4 (Observable - Create/Defer/Empty/Never/Throw/Never/Throw/From/Interval 등)


[목차]==================================================
1. Observable을 만드는 Operator 목록
2. Create 
3. Defer
4. Empty/Never/Throw
5. From
6. Interval
7. Just
8. Range
9. Repeat
10. Start
11. Timer
======================================================


Observable을 좀 더 쉽게 만들 수 있는 방법이 있습니다.
바로 미리  생성된 Operator를 사용하는 것 입니다


1. Observable을 만드는 Operator 목록

just( ) — convert an object or several objects into an Observable that emits that object or those objects
from( ) — convert an Iterable, a Future, or an Array into an Observable
repeat( ) — create an Observable that emits a particular item or sequence of items repeatedly
repeatWhen( ) — create an Observable that emits a particular item or sequence of items repeatedly, depending on the emissions of a second Observable
create( ) — create an Observable from scratch by means of a function
defer( ) — do not create the Observable until a Subscriber subscribes; create a fresh Observable on each subscription
range( ) — create an Observable that emits a range of sequential integers
interval( ) — create an Observable that emits a sequence of integers spaced by a given time interval
timer( ) — create an Observable that emits a single item after a given delay
empty( ) — create an Observable that emits nothing and then completes
error( ) — create an Observable that emits nothing and then signals an error
never( ) — create an Observable that emits nothing at all
 package tiii.com.rxandroid;
import android.os.Bundle;
import android.widget.TextView;
import com.trello.rxlifecycle.components.support.RxAppCompatActivity;
import rx.Observable;
import rx.functions.Action1;

public class MainActivity extends RxAppCompatActivity {
    public static final String TAG = MainActivity.class.getSimpleName();
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Observable
                .just("Hello RxAndroid !!")
                .compose(this.<String>bindToLifecycle())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        ((TextView) findViewById(R.id.textview)).setText(s);
                    }
                });
    }
}


2. Create 
  • 수동으로 옵져버 메소드 호출하여 새로운 Observable을 생성
  • 언뜻본다면 큰 문제는 없어 보인다.
    물론 Observable.from() 를 사용한다면 subscriber.onComplete() 와 같은 코드를 신경쓰지 않고 더 편하게 사용이 가능하다
    하지만 Observable.just() 나 Observable.from() 와 같은 경우 발행되는 Item 들이 observable 생성시점에 이미 정해져있어야 한다.
    즉 Database 상에서 데이터를 읽어 오는 작업과 같이 비용이 큰 작업들을 비동기로 처리하고자 할 때에는 적절하지 않다.
  • 이처럼 Observable.create() 만으로는 간단한 비동기 처리 흐름을 만들기는 어려운 작업이며 실수할 여지가 매우 많다.
    그래서 많은 개발자들은 Observable.create() 대신 Observable.defer() 를 사용하는걸 추천한다.

Create

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<super String> subscriber) {
        try {
            subscriber.onNext("Hello_Create");
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }
})
.compose(mMainView.ActivityLifecycleProvider().bindToLifecycle())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
            Log.d(TAG, s);
            mMainView.TextChange(s);
        },
        throwable -> throwable.printStackTrace(),
        () -> {
            Log.d(TAG, "onComplete");
            LogTextView();
        }
);


3. Defer 
  • 구독하기 전까지 Observable을 생성하지 않습니다. 그리고 각각의 옵져버에게 매번 새로운 Observable을 생성합니다.
  • 다른 생성 오퍼레이터와 다른 점이 뭔지 애매했었는데 데이타스트림이 메모리에 할당되는 타이밍이 다른 것이 였습니다. 다른 오퍼레이터들은 오퍼레이터를 선언하는 순간 메모리에 할당 되지만 defer는 subscribe가 호출 될 때에 할당 된다고 합니다.
Defer

 Observable.defer(() -> {
    return SomethingLongTask(); //return Observable<String>
})
.compose(mMainView.ActivityLifecycleProvider().bindToLifecycle());
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
            Log.d(TAG, s);
            mMainView.TextChange(s);
        },
        throwable -> throwable.printStackTrace(),
        () -> {
            Log.d(TAG, "onComplete");
            LogTextView();
        }
);

  • Observable.defer() 비동기로 observable 을 생성하고 하위 스트림에서 사용할 수 있도록 해준다.
 Observable.defer(() -> {
    List<Book> books = dao.findAll();
    return Observable.just(books);
  })

  .subscribeOn(Schedulers.io())
  .subscribe(books -> {
    // Next Step
  }, throwable -> {
    // Error handling
  });
 여기서는 dao.findAll() 을 통해 반환된 결과를 Observable.just() 를 사용하여 새로운 작업흐름을 만들고 반환하였다.
observable 을 만들기 위한 observable 이 필요하다는 점에서 추가되는 비용이 있지만 Observable.create() 보다는 훨씬 간결한 코드가 만들어졌다.
명시적으로 onComplete() 나 onError() 를 처리해줄 필요가 없으며, subscriber 의 구독상태를 확인할 필요도 없다.

4. Empty/Never/Throw
매우 정확하고 제한적인 행동의 Observable을 생성합니다.
1) Empty
  • 방출하는 아이템이 없고 정상적으로 종료되는 옵저버블을 생성합니다.
    Empty
    2) Never
  • 방출하는 아이템이 없고 종료되지 않는 옵저버블을 생성합니다.
    Never
    3) Throw
  • 방출하는 아이템이 없고 에러를 발생하여 종료되는 옵저버블을 생성합니다.
Throw

5. From
  • 배열이나 Iterable의 요소를 순차적으로 방출 시키는 Observable으로 변환
에서

6. Interval

  • 특정한 시간 간격으로 아이템을 방출하는 Observable을 생성합니다.
  • 일정시간 마다 반복적인 작업이 필요할 때 사용
Interval


  • 7. Just
    • 오브젝트나 오브젝트셋을 바로 방출하는 Oservable으로 변환
    • 만약에 아무것도 하지 않는 옵저버블을 만들기 위해 null 을 넣는다면 null을 방출하는 옵저버블이 만들어 집니다.
    • 아무것도 하지 않은 옵저버블을 원하신다면 empty를 사용하시면 됩니다.
    Just


    8. Range
    • 정수의 순차적인 범위를 가지고 있는 Observable을 생성
    • Interval과 비슷하지만 반복횟수의 제한이 있습니다. m개 만큼의 반복
    Range


    9. Repeat
    • 일정 횟수를 반복하는 Observable을 생성
    • 이 오퍼레이터는 단독으로 사용되지 않고 다른 오퍼레이터 뒤에 붙여서 사용되며 .Repeat(n) 바로 앞 오퍼레이터를 일정횟수 만큼 반복
    Repeat

    10. Start
    Start
    11. Timer
    • 일정 시간의 딜레이 이후에 단일 항목을 방출하는 Observable을 생성
    Timer




  • 출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다.