레이블이 RxAndroid인 게시물을 표시합니다. 모든 게시물 표시
레이블이 RxAndroid인 게시물을 표시합니다. 모든 게시물 표시

2017년 6월 4일 일요일

[STUDY] RxAndroid Part 9 (Connectable Observable )

[목차]==================================================
1. Connectable Observable 정의 
2. Connectable Observable 사용 예제
======================================================


1. Connectable Observable 정의
 Connectable Observable은 구독을 하더라도 이템 방출을 시작하지 않는다는 점을 제외하면 일반적인 Observable과 비슷합니다. connect()을 호출했을 때에만 방출합니다. 이 방법으로 Subscriber들에게 Observable가 방출을 시작하기전에 Observable구독하도록 기다릴 수 있습니다.
  • ConnectableObservable.connect( ) — Connectable Observable에게 아이템 방출을 시작하라고 지시한다.
  • Observable.publish( ) — Observable을 Connectable Observable으로 변형시킨다.
  • Observable.replay( ) — 모든 Observer들에게 방출이 시작된 후에 구독을 했을 경우라도 같은 순서의 방출된 아이템을 볼 수 있도록 보장합니다.
  • ConnectableObservable.refCount( ) — Connectable Observable을 일반적인 Observable처럼 작동하도록 만듭니다.

 
2. Connectable Observable 사용 예제
아래의 예제코드는 같은 Observable을 구독하는 두개의 subscriber를 보여주는 코드입니다. 첫번째 케이스에서는 일반적인 Observable이고 두번째 케이스에서는 Connectable Observable으로 subscriber가 모두 구독한 이후 연결하였습니다.
 Observable firstMillion  = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);

firstMillion.subscribe(
   { println("Subscriber #1:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #1 complete"); }       // onCompleted
);

firstMillion.subscribe(
    { println("Subscriber #2:" + it); },       // onNext
    { println("Error: " + it.getMessage()); }, // onError
    { println("Sequence #2 complete"); }       // onCompleted
);
Subscriber #1:211128
Subscriber #1:411633
Subscriber #1:629605
Subscriber #1:841903
Sequence #1 complete
Subscriber #2:244776
Subscriber #2:431416
Subscriber #2:621647
Subscriber #2:826996
Sequence #2 complete

 Observable firstMillion  = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();

firstMillion.subscribe(
   { println("Subscriber #1:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #1 complete"); }       // onCompleted
);
firstMillion.subscribe(
   { println("Subscriber #2:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #2 complete"); }       // onCompleted
);
firstMillion.connect();
Subscriber #2:208683
Subscriber #1:208683
Subscriber #2:432509
Subscriber #1:432509
Subscriber #2:644270
Subscriber #1:644270
Subscriber #2:887885
Subscriber #1:887885
Sequence #2 complete
Sequence #1 complete



출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다.

[STUDY] RxAndroid Part 8 (Error 핸들링 방법)


[목차]==================================================
1. onError 해주지 않으면 Crash가 발생 주의
2. subscribe 에서 에러 처리
3. Error 캐치 - onErrorReturn
4. Error 캐치 - OnErrorResumeNext
5. Retry
6. Retry 횟수 제한
7. RetryWhen
======================================================


RxJava & RxAndroid 사용 시 항상 에러 핸들러를 구독하고 제공하는지 항상 확인해야 합니다. 그렇지 않으면 특히 Scheduler를 적용할 때 스택 트레이스에 아무것도 없을 수 있습니다. 물론 RxJava & RxAndroid 에서 뭔가 잘못됐다고 알려주긴 하지만 어디서 발생했는지 찾을 방법이 없습니다. 항상 에러 콜백을 사용하고, 에러가 발생한다면 에러를 로그로 남겨서 예상치 못한 오류를 기록해야 합니다.

1. onError 해주지 않으면 Crash가 발생 주의
 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> subscriber) throws Exception {
                log("subscribe");
                subscriber.onNext("emit 1");
                subscriber.onNext("emit 2");
                subscriber.onError(new Throwable());
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                log("on next: " + s);
            }
        });

2. subscribe 에서 에러 처리

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> subscriber) throws Exception {
                log("subscribe");
                subscriber.onNext("emit 1");
                subscriber.onNext("emit 2");
                subscriber.onError(new Throwable());
            }
        }).subscribe(new DefaultObserver<String>() {
            @Override
            public void onNext(String value) {
                log("on next: " + value);
            }
            @Override
            public void onError(Throwable e) {

                // 에러시 처리를 여기로 받음
                log("error:" + e);
            }
            @Override
            public void onComplete() {
                log("completed");
            }
        });
[출력결과]
subscribe
on next: emit 1
on next: emit 2
error:java.lang.Throwable

3. Error 캐치 - onErrorReturn
Observable 체인 안에서 발생한 Error 를 캐치해서, 대체할 Object로 변환하는 것으로 subscriber에 Error가 전달되는 것을 막을 수 있다.
 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> subscriber) throws Exception {
                log("subscribe");
                subscriber.onNext("emit 1");
                subscriber.onNext("emit 2");
                subscriber.onError(new Throwable());            }
        }).onErrorReturn(new Function<Throwable, String>() {
            @Override
            public String apply(Throwable throwable) throws Exception {
                return "return";
            }
        }).subscribe(new DefaultObserver<String>() {
            @Override
            public void onNext(String value) {
                log("on next: " + value);
            }
            @Override
            public void onError(Throwable e) {
                // 에러시 처리를 여기로 받음
                log("error:" + e);
            }
            @Override
            public void onComplete() {
                log("completed");
            }
        });
[출력결과]
subscribe
on next: emit 1
on next: emit 2
on next: return
completed

4. Error 캐치 - OnErrorResumeNext
Observable 체인에서 발생한 Error를 캐치해서, 그 안에서 다시 한 번 Observable를 호출하면 에러시 대체 Stream을 반환할 수 있다.
 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> subscriber) throws Exception {
                log("subscribe");
                subscriber.onNext("emit 1");
                subscriber.onNext("emit 2");
                subscriber.onError(new Throwable());            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends String>>() {            @Override
            public ObservableSource<? extends String> apply(Throwable throwable) throws Exception {
                return Observable.fromArray(new String[]{"resume 1", "resume 2"});
            }
        }).subscribe(new DefaultObserver<String>() {
            @Override
            public void onNext(String value) {
                log("on next: " + value);
            }
            @Override
            public void onError(Throwable e) {
                // 에러시 처리를 여기로 받음
                log("error:" + e);
            }
            @Override
            public void onComplete() {
                log("completed");
            }
        });
[출력결과]
subscribe
on next: emit 1
on next: emit 2
on next: resume 1
on next: resume 2
completed

5. Retry
Error가 일어났을 때, 자동으로 subscribe를 다시 해준다.
성공할때까지 계속... 무한루프 될 가능성이 있으므로 유의해야 한다
 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());
        })
        .retry()
        .subscribe()
[출력결과]
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
.
.
.
반복

6. Retry 횟수 제한
 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());
        })
        .retry(3)
        .subscribe()
[출력결과]
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
error:java.lang.Throwable

Retry 좀 더 구체적인 설정
 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());        })
        .retry(new BiPredicate<Integer, Throwable>() {
            @Override
            public boolean test(Integer integer, Throwable throwable) throws Exception {
                if (integer < 3) {
                    return true;
                }
                return throwable instanceof IllegalStateException;
            }
        })

        .subscribe(s -> log("on next: " + s)
            , e -> log("error:" + e)
            , () -> log("completed"));
[출력결과]
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
error:java.lang.Throwable

7. RetryWhen
보다 세밀하게 retry 처리를 제어하기 위한 함수.
 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Throwable throwable) throws Exception {
                        return Observable.timer(3, TimeUnit.SECONDS);
                    }
                });
            }
        }).subscribe(s -> log("on next: " + s)
                        , e -> log("error:" + e)
                        , () -> log("completed"));
[출력결과]
subscribe
on next: emit 1
on next: emit 2
// 3초 후
subscribe
on next: emit 1
on next: emit 2
// 3초 후
subscribe
on next: emit 1
on next: emit 2
.
.
.
반복

그냥 Error인채로 종료
 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());
        }).subscribeOn(AndroidSchedulers.mainThread())
                .retryWhen(throwableObservable -> throwableObservable.flatMap(
                        throwable -> Observable.error(throwable)
                ))
.subscribe(s -> log("on next: " + s)
                , e -> log("error:" + e)
                , () -> log("completed"));

Error에 대한 처리를 하지 않고 Complete하기
 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());
        }).onErrorResumeNext(throwable -> {Observable.empty();})                .subscribe(s -> log("on next: " + s)
                , e -> log("error:" + e)
                , () -> log("completed"));
[출력결과]
subscribe
on next: emit 1
on next: emit 2
completed

3번 retry하고 종료
이 경우, 앞의 retry(count) 함수와의 차이는 retry(count)에서는 retry 횟수가 제한에 도달한 후에 error로 종료합니다만, 이 케이스는 completed 에서 종료한다는 점이다.
 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());        }).retryWhen(throwableObservable -> throwableObservable.take(3))                .subscribe(s -> log("on next: " + s)
                , e -> log("error:" + e)
                , () -> log("completed"));
[출력결과]
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
completed

3초 retry를 3번 하고 종료하기
 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());
        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
                return throwableObservable.take(3).flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
                        return Observable.timer(3, TimeUnit.SECONDS);
                    }
                });
        
    }
        }).subscribe(s -> log("on next: " + s)
                        , e -> log("error:" + e)
                        , () -> log("completed"));
[출력결과]
subscribe
on next: emit 1
on next: emit 2
// 3초 후
subscribe
on next: emit 1
on next: emit 2
// 3초 후
subscribe
on next: emit 1
on next: emit 2
completed

 


 출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다. 

[STUDY] RxAndroid Part 7 (Scheduler 이해하기)


[목차]==================================================
1. RxJava와 RxAndroid에서 제공하는 Scheduler 종류
1) RxJava가 제공하는 Scheduler
2) RxAndroid는 제공하는 Scheduler
2. 독립적으로 사용 가능한 Scheduler
3. 사용법 예제
1) Scheduler 안한 경우 기본 동작
2) subscribeOn() API만 사용한 경우 동작
3) observerOn() API만 사용한 경우 동작
4) subscribeOn() & observerOn() 1번씩 API 사용한 경우 동작
5) subscribeOn() & observerOn() 다수 API 사용한 경우 동작
======================================================


RxAndroid에서는 Scheduler를 통해 어느 쓰레드에서 실행이 될지 결정 할 수 있습니다.Scheduler는 subsctibeOn(), observeOn() 에서 각각 지정할 수 있는데
subsctibeOn()은 observable의 작업을 시작하는 쓰레드를 선택 할 수 있습니다.( 중복해서 적을 경우 가장 마지막에 적힌 스레드에서 시작합니다.)
observeOn()은 이후에 나오는 Operator, subscribe의 Scheduler를 변경 할 수 있습니다.

1. RxJava와 RxAndroid에서 제공하는 Scheduler 종류
1) RxJava가 제공하는 Scheduler
  • Schedulers.computation()
이벤트 룹에서 간단한 연산이나 콜백 처리를 위해서 쓰는 것입니다. I/O 처리를 여기에서 해서는 안됩니다.
RxComputationThreadPool라는 별도의 스레드 풀에서 돌아갑니다. 최대 cpu갯수 ?개의 스레드 풀이 순환하면서 실행됩니다.
  • Schedulers.from(executor)
특정 executor를 스케쥴러로 사용합니다
  • Schedulers.immediate
현재 스레드에서 즉시 수행합니다.
observeOn()이 여러번 쓰였을 경우 immediate()를 선언한 바로 윗쪽의 스레드를 따라갑니다.
  • Schedulers.io()
동기 I/O를 별도로 처리시켜 비동기 효율을 얻기 위한 스케줄러입니다. 자체적인 스레드 풀에 의존합니다.
자체적인 스레드 풀 CachedThreadPool을 사용합니다. API 호출 등 네트워크를 사용한 호출 시 사용됩니다.
  • Schedulers.newThread()
항상 새로운 스레드를 만드는 스케쥴러입니다.
  • Schedulers.trampoline()
큐에 있는 일이 끝나면 이어서 현재 스레드에서 수행하는 스케쥴러

※ 일부 오퍼레이터들은 자체적으로 어떤 스케쥴러를 사용할지 지정합니다. 예를 들어 buffer 오퍼레이터는 Schedulers.computation()에 의존하며 repeat은 Schedulers.trampoline()를 사용합니다.


2) RxAndroid는 제공하는 Scheduler
  • AndroidSchedulers.mainThread()
안드로이드의 UI 스레드에서 동작
  • HandlerScheduler.from(handler)
특정 핸들러 handler에 의존하여 동작

※ 안드로이드에 특화된 스케쥴러입니다. 보통은 RxAndroid가 제공하는 AndroidSchedulers.mainThread()와 RxJava가 제공하는 Schedulers.io()를 조합해서 Schedulers.io()에서 수행한 결과를 AndroidSchedulers.mainThread()에서 받아 UI에 반영하는 패턴등이 일반적으로 쓰입니다.


2. 독립적으로 사용 가능한 Scheduler
Scheduler는 Observable, Operator, Subscriber 모델 밖에서 별도로 사용할 수 있습니다.
 worker = Schedulers.newThread().createWorker();

worker.schedule(new Action0() {
    @Override
    public void call() {
        realmJob();
    }
});

 Scheduler.Worker worker = Schedulers.newThread().createWorker();
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                log("worker: " + Thread.currentThread().getName());
            }
        });
[출력결과]
worker: RxNewThreadScheduler-1


3. 사용법 예제
1) Scheduler 안한 경우 기본 동작
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).map(s1 -> {
            log("map: " + Thread.currentThread().getName());
            return s1;
        }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));
[출력결과]
subscribe:main
map: main
on next: main

2) subscribeOn() API만 사용한 경우 동작
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).subscribeOn(Schedulers.computation()).map(s1 -> {
            log("map: " + Thread.currentThread().getName());
            return s1;
        }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));
[출력결과]
subscribe:RxComputationThreadPool-1
map: RxComputationThreadPool-1
on next: RxComputationThreadPool-1

3) observerOn() API만 사용한 경우 동작
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).observeOn(Schedulers.computation()).map(s1 -> {
            log("map: " + Thread.currentThread().getName());
            return s1;
        }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));
[출력결과]
subscribe:main
map: RxComputationThreadPool-1
on next: RxComputationThreadPool-1

4) subscribeOn() & observerOn() 1번씩 API 사용한 경우 동작
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
                .map(s1 -> {
                        log("map: " + Thread.currentThread().getName());
                        return s1;
         }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));
[출력결과]
subscribe:RxCachedThreadScheduler-1
map: main
on next: main

5) subscribeOn() & observerOn() 다수 API 사용한 경우 동작
 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).subscribeOn(Schedulers.io())
          .subscribeOn(Schedulers.computation())
          .observeOn(AndroidSchedulers.mainThread()).map(s1 -> {
                        log("map1: " + Thread.currentThread().getName());
                        return s1;
         }).observeOn(Schedulers.newThread()).map(s2 -> {
                        log("map2: " + Thread.currentThread().getName());
                        return s2;
         }).observeOn(Schedulers.single()).map(s3 -> {
                        log("map3: " + Thread.currentThread().getName());
                        return s3;
         }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));
[출력결과]
subscribe:RxCachedThreadScheduler-1
map1: main
map2: RxNewThreadScheduler-1
map3: RxSingleScheduler-1
on next: RxSingleScheduler-1



 
출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다. 

[STUDY] RxAndroid Part 6 (Subject 사용 - PublishSubject/BehaviorSubject)

[목차]==================================================
1. PublishSubject
2. BehaviorSubject
3. ReplaySubject
4. AsyncSubject
======================================================


Subject 는 Observable + Subscriber 로 종종 표현되고는 합니다만, 정확한 표현은 아니라고 생각합니다. (왜냐하면 일반적인 Subscriber 처럼 subscribe()에 직접 사용되는 경우는 거의 없습니다.)
물론 Subject 는 Observable 과 Subscriber 를 모두 implementation 하고 있으니 틀린 이야기는 아닙니다만,
저는 Subject 는 이벤트를 전달받아 구독자들에게 이벤트를 전파하는 중간다리라고 하는게 좀 더 정확한 표현이라고 생각합니다.
onNext()로 전달받은 이벤트를 구독자들에게 전파하며, onCompleted()나 onError()를 받으면 이것 역시 구독자들에게 전파하고 Observable로의 역할을 종료하게 됩니다.
Android 에서는 EventBus 와 같은 형태로도 사용이 가능합니다. 즉 RxJava 를 사용하면 다른 EventBus 라이브러리가 불필요해집니다.
EventBus 언급을 한것에서 살짝 힌트를 받으셨겠지만 Subject 들은 보통 onCompleted 와 같이 종료하는 과정이 없이, 액티비티 라이프사이클(또는 앱 라이프사이클)과 동일하게 살아서 이벤트를 전파하는 역할로 자주 사용됩니다.
여기서는 PublishSubject 와 BehaviorSubject 만 언급하고 넘어가겠습니다만, 다른 Subject 들도 무척 유용하니 기회가 된다면 한번씩 사용해보시는걸 권장해드립니다.

1. PublishSubject
PublishSubject 를 구독한 시점으로부터 이후에 발생하는 이벤트들을 전달받음
     public void publishSubject() {
        PublishSubject<String> subject = PublishSubject.create();
        subject.onNext("첫번째 호출");
        subject.onNext("두번째 호출");

        subject.subscribe(text -> {
            System.out.println("onNext : " + text);
        });

        subject.onNext("세번째 호출");
        subject.onNext("네번째 호출");
    }
     // 결과
    onNext : 세번째 호출
    onNext : 네번째 호출


2. BehaviorSubject
BehaviorSubject 는 PublishSubject 와 비슷합니다만, 구독전에 한건이라도 이벤트가 발생했다면 구독시점에 해당 이벤트도 같이 전달받음
가장 최근에 관찰된 아이템과 그 후에 관찰된 나머지 아이템을 구독하는 옵저버에게 발행

     public void behaviorSubject() {
        BehaviorSubject<String> subject = BehaviorSubject.create();
        subject.onNext("첫번째 호출");
        subject.onNext("두번째 호출");
        subject.subscribe(text -> {
            System.out.println("onNext : " + text);
        });
        subject.onNext("세번째 호출");
        subject.onNext("네번째 호출");
    }
     // 결과
    onNext : 두번째 호출
    onNext : 세번째 호출
    onNext : 네번째 호출


3. ReplaySubject
관찰한 모든 아이템을 버퍼에 저장하고 구독하는 옵저버에게 재생
(보완필요)

4. AsyncSubject
옵저버블이 완료됐을 때 구독하고 있는 각 옵저버에게 관찰한 마지막 아이템만을 발행
(보완필요)



출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다

[STUDY] RxAndroid Part 5 (Observable 변형/가공/합성 - map/flatMap/zip)

[목차]==================================================
1. Observable 변형 및 가공 
1) map()
2) flatMap()
3) Filter()
4) 기타
2. Observable 합성
1) zip()
2) 기타
======================================================


1. Observable 변형 및 가공
발생되는 이벤트를 다른 형태로 변형하기를 원하실 수도 있습니다.
가장 많이 사용되는건 map 과 flatMap 입니다.

1) map()
map() 함수를 사용하여 전달받은 이벤트를 다른값으로 변경합니다.
     public void map() {
        Observable.from(new String[] { "개미", "매", "마루" })
            .map(text -> "** " + text + " **")
            .subscribe(
                text -> System.out.println("onNext : " + text),
                e -> System.out.println("onError"),
                () -> System.out.println("onCompleted"));
    }
     // 결과
    onNext : ** 개미 **
    onNext : ** 매 **
    onNext : ** 마루 **
    onCompleted

Observable.just("hello world")
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return s + "RxJava";
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG, ">>> onNextAction  : " + s);
            }
        })

2) flatMap()
flatMap()은 전달받은 이벤트로부터 다른 Observable 들을 생성하고, 그 Observable 들에서 발생한 이벤트들을 쭉 펼쳐서 전파합니다.
    public void flatMap() {
        Observable.from(new String[] { "개미", "매", "마루" })
            .flatMap(
                text -> Observable.from(new String[] { text + " 사랑합니다.", text + " 고맙습니다." })

            )
            .subscribe(
                text -> System.out.println("onNext : " + text),
                e -> System.out.println("onError"),
                () -> System.out.println("onCompleted"));
    }
    // 결과
    onNext : 개미 사랑합니다.
    onNext : 개미 고맙습니다.
    onNext : 매 사랑합니다.
    onNext : 매 고맙습니다.
    onNext : 마루 사랑합니다.
    onNext : 마루 고맙습니다.
    onCompleted

3) Filter()
데이터 filter 역할
  Observable.just("hello world")
        .filter(new Func1<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                if (s.contains("hello")) {
                    return true;
                } else {
                    return false;
                }
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG, ">>> onNextAction  : " + s);
            }
        });
위와 같이 filter 함수를 보면 observable 을 통해서 넘어온 데이터에 hello 가 포함되어있는지를 확인 하고 있다.
포함되어 있지 않으면 false 가 넘어가게 되어서 subscribe 가 호출되지 않는다.

4) 기타
map() / 특정 Func 객체를 받아서 옵저버블이 발행하는 모든 값에 Func 객체를 적용
flatMap() / 모든 발행한 값을 하나의 최종 옵저버블로 병합하는 방식으로 시퀀스를 플랫하게 만드는 방법을 제공
concatMap() / 발행한 값을 병합하는 대신, 발행한 값을 연쇄적으로 연결할 수 있는 플랫한 함수를 제공해 flatMap()의 인터리빙 문제를 해결
flatMapIterable() / 소스 아이템과 생성된 옵저버블 간에 쌍이 되는 것이 아니라 소스 아이템과 생성된 이터러블이 한 쌍이 됨
switchMap() / 소스 옵저버블에서 새로운 아이템을 발행할 때마다 먼저 발행한 아이템이 생성한 옵저버블의 구독을 해지하고 미러링을 중지한 다음 새로 발행한 아이템을 미러링
scan() / 옵저버블이 발행하는 각 아이템에 함수를 적용해 함수 결과를 계산한 다음, 결과를 옵저버블 시퀀스에 다시 주입해 다음번에 발행되는 값과 함께 사용하기 위해 대기
groupBy() / 특정 기준에 따라 리스트의 요소를 분류하는 함수
buffer() / 소스 옵저버블을 새로운 옵저버블로 변환하며, 새로운 옵저버블은 하나의 아이템이 아닌 리스트 형태로 값을 발행
window() / buffer()와 유사하지만, 리스트가 아닌 옵저버블을 발행
cast() / map() 연산자의 특수 버전으로 소스 옵저버블의 각 아이템을 다른 Class로 캐스팅해 새로운 타입으로 변환

filter(return E) / return E값이 true일 때만 값이 발행되고 옵저버에게 전달
take(n) / 옵저버블을 1번째부터 n번째까지만 발행 -> 쉽게 알 수 있는 아이템의 소규모 그룹을 얻기 위해 사용
takeLast(n) / 옵저버블의 마지막 요소 n개만큼 발행
distinct() / 특정 값을 단 한번만 처리
distinctUntilChanged() / 모든 중복 값은 무시하고 새로운 값만 발행
first() / 옵저버블이 발행한 요소에서 첫번째 요소만을 발행
last() / 옵저버블이 발행한 요소에서 마지막 요소만을 발행
skip(n) / 옵저버블이 발행한 요소에서 처음 n개의 요소를 숨김
skipLast(n) / 옵저버블이 발행한 요소에서 마지막 n개의 요소를 숨김
elementAt(n) / 시퀀스에서 n번째 요소만을 발행한 후 시퀀스를 완료 (elementAtOrDefault() 는 해당 시퀀스가 없을 경우도 포함)
sample(n, time) / n(time)마다 마지막으로 발행된 값을 발행 / 첫 번째 아이템을 발행시키기 위해서는 throttleFirst() 사용
timeout(n, time) / 옵저버블 시퀀스 소스에 반영하며 지정한 시간 내에 아무런 값을 받지 못할 경우 에러를 발행
debounce(n, time) / 옵저버블에서 아이템이 발행된 다음 바로 뒤따라서 발행된 아이템을 필터링하고, 옵저버블에서 일정 시간 동안 다른 아이템이 발행되지 않으면 아이템을 발행



2. Observable 합성
두개 이상의 Observable 을 합성해야 하는 경우도 있다.
data-flow 에 기반한 개발에서 매우 자주 언급되고 사용되기는 하지만, 일반적인 비동기작업에서는 자주 사용하는 개념은 아니다.
하지만 알고 있으면 종종 사용하게되는 유용한 도구들이니 한번쯤 살펴보고 넘어가시는걸 추천

1) zip()
네트워크 작업으로 사용자의 프로필과 프로필 이미지를 동시에 요청하고, 그 결과를 합성해서 화면에 표현해준다거나 하는 형태의 작업이 필요한 경우 zip() 유용하게 사용
     public void zip() {
        Observable.zip(
            Observable.just("개미"),
            Observable.just("gaemi.jpg"),
            (profile, image) -> "프로필 : " + profile + ", 이미지 : " + image
        ).subscribe(
            print -> System.out.println("onNext : " + print),
            e -> System.out.println("onError"),
            () -> System.out.println("onCompleted")
        );
    }
     // 결과
    onNext : 프로필 : 개미, 이미지 : gaemi.jpg
    onCompleted

2) 기타
merge() / 옵저버블이 발행한 아이템을 병합하는 방식으로 2개 이상의 옵저버블을 결합
zip() / 여러 옵저버블에서 발행한 아이템을 결합하고, 특정한 함수인 Func에 따라 아이템을 변환한 다음 새로운 값을 발행
join() / merge()와 zip()은 발행한 아이템의 도메인에서 동작하기 때문에 값을 처리하는 방법을 결정하기 전에 시간을 고려해야 하는 시나리오가 있을 수 있는데, 해당 함수를 사용함으로 타임 윈도와 함께 동작해 두 옵저버블의 아이템을 결합 / 간단한 상황에서는 문자열과 동작하며, 단순히 발행한 문자열열을 하나의 최종 문자열로 조인하는 연산자도 있다.
combineLatest() / zip()은 두 옵저버블의 가장 최근 언집된 아이템에서 동작하는 대신, 해당 함수는 가장 최근에 발행한 아이템에서 동작
and(), then(), when()  /해당 함수들을 사용함으로써 패턴과 플랜 같은 구조체를 사용해 발행한 아이템을 결합
switch() / 옵저버블을 발행하는 옵저버블을 가장 최근 발행한 옵저버블을 발행하는 옵저버블로 변환
startWith() / 옵저버블이 아이템 발행을 시작하기 전에 인자로 전달받은 아이템의 시퀀스로 발행
 


출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다.

2017년 6월 3일 토요일

[STUDY] RxAndroid Part 4 (Observable - Create/Defer/Empty/Never/Throw/Never/Throw/From/Interval 등)


[목차]==================================================
1. Observable을 만드는 Operator 목록
2. Create 
3. Defer
4. Empty/Never/Throw
5. From
6. Interval
7. Just
8. Range
9. Repeat
10. Start
11. Timer
======================================================


Observable을 좀 더 쉽게 만들 수 있는 방법이 있습니다.
바로 미리  생성된 Operator를 사용하는 것 입니다


1. Observable을 만드는 Operator 목록

just( ) — convert an object or several objects into an Observable that emits that object or those objects
from( ) — convert an Iterable, a Future, or an Array into an Observable
repeat( ) — create an Observable that emits a particular item or sequence of items repeatedly
repeatWhen( ) — create an Observable that emits a particular item or sequence of items repeatedly, depending on the emissions of a second Observable
create( ) — create an Observable from scratch by means of a function
defer( ) — do not create the Observable until a Subscriber subscribes; create a fresh Observable on each subscription
range( ) — create an Observable that emits a range of sequential integers
interval( ) — create an Observable that emits a sequence of integers spaced by a given time interval
timer( ) — create an Observable that emits a single item after a given delay
empty( ) — create an Observable that emits nothing and then completes
error( ) — create an Observable that emits nothing and then signals an error
never( ) — create an Observable that emits nothing at all
 package tiii.com.rxandroid;
import android.os.Bundle;
import android.widget.TextView;
import com.trello.rxlifecycle.components.support.RxAppCompatActivity;
import rx.Observable;
import rx.functions.Action1;

public class MainActivity extends RxAppCompatActivity {
    public static final String TAG = MainActivity.class.getSimpleName();
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Observable
                .just("Hello RxAndroid !!")
                .compose(this.<String>bindToLifecycle())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        ((TextView) findViewById(R.id.textview)).setText(s);
                    }
                });
    }
}


2. Create 
  • 수동으로 옵져버 메소드 호출하여 새로운 Observable을 생성
  • 언뜻본다면 큰 문제는 없어 보인다.
    물론 Observable.from() 를 사용한다면 subscriber.onComplete() 와 같은 코드를 신경쓰지 않고 더 편하게 사용이 가능하다
    하지만 Observable.just() 나 Observable.from() 와 같은 경우 발행되는 Item 들이 observable 생성시점에 이미 정해져있어야 한다.
    즉 Database 상에서 데이터를 읽어 오는 작업과 같이 비용이 큰 작업들을 비동기로 처리하고자 할 때에는 적절하지 않다.
  • 이처럼 Observable.create() 만으로는 간단한 비동기 처리 흐름을 만들기는 어려운 작업이며 실수할 여지가 매우 많다.
    그래서 많은 개발자들은 Observable.create() 대신 Observable.defer() 를 사용하는걸 추천한다.

Create

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<super String> subscriber) {
        try {
            subscriber.onNext("Hello_Create");
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }
})
.compose(mMainView.ActivityLifecycleProvider().bindToLifecycle())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
            Log.d(TAG, s);
            mMainView.TextChange(s);
        },
        throwable -> throwable.printStackTrace(),
        () -> {
            Log.d(TAG, "onComplete");
            LogTextView();
        }
);


3. Defer 
  • 구독하기 전까지 Observable을 생성하지 않습니다. 그리고 각각의 옵져버에게 매번 새로운 Observable을 생성합니다.
  • 다른 생성 오퍼레이터와 다른 점이 뭔지 애매했었는데 데이타스트림이 메모리에 할당되는 타이밍이 다른 것이 였습니다. 다른 오퍼레이터들은 오퍼레이터를 선언하는 순간 메모리에 할당 되지만 defer는 subscribe가 호출 될 때에 할당 된다고 합니다.
Defer

 Observable.defer(() -> {
    return SomethingLongTask(); //return Observable<String>
})
.compose(mMainView.ActivityLifecycleProvider().bindToLifecycle());
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
            Log.d(TAG, s);
            mMainView.TextChange(s);
        },
        throwable -> throwable.printStackTrace(),
        () -> {
            Log.d(TAG, "onComplete");
            LogTextView();
        }
);

  • Observable.defer() 비동기로 observable 을 생성하고 하위 스트림에서 사용할 수 있도록 해준다.
 Observable.defer(() -> {
    List<Book> books = dao.findAll();
    return Observable.just(books);
  })

  .subscribeOn(Schedulers.io())
  .subscribe(books -> {
    // Next Step
  }, throwable -> {
    // Error handling
  });
 여기서는 dao.findAll() 을 통해 반환된 결과를 Observable.just() 를 사용하여 새로운 작업흐름을 만들고 반환하였다.
observable 을 만들기 위한 observable 이 필요하다는 점에서 추가되는 비용이 있지만 Observable.create() 보다는 훨씬 간결한 코드가 만들어졌다.
명시적으로 onComplete() 나 onError() 를 처리해줄 필요가 없으며, subscriber 의 구독상태를 확인할 필요도 없다.

4. Empty/Never/Throw
매우 정확하고 제한적인 행동의 Observable을 생성합니다.
1) Empty
  • 방출하는 아이템이 없고 정상적으로 종료되는 옵저버블을 생성합니다.
    Empty
    2) Never
  • 방출하는 아이템이 없고 종료되지 않는 옵저버블을 생성합니다.
    Never
    3) Throw
  • 방출하는 아이템이 없고 에러를 발생하여 종료되는 옵저버블을 생성합니다.
Throw

5. From
  • 배열이나 Iterable의 요소를 순차적으로 방출 시키는 Observable으로 변환
에서

6. Interval

  • 특정한 시간 간격으로 아이템을 방출하는 Observable을 생성합니다.
  • 일정시간 마다 반복적인 작업이 필요할 때 사용
Interval


  • 7. Just
    • 오브젝트나 오브젝트셋을 바로 방출하는 Oservable으로 변환
    • 만약에 아무것도 하지 않는 옵저버블을 만들기 위해 null 을 넣는다면 null을 방출하는 옵저버블이 만들어 집니다.
    • 아무것도 하지 않은 옵저버블을 원하신다면 empty를 사용하시면 됩니다.
    Just


    8. Range
    • 정수의 순차적인 범위를 가지고 있는 Observable을 생성
    • Interval과 비슷하지만 반복횟수의 제한이 있습니다. m개 만큼의 반복
    Range


    9. Repeat
    • 일정 횟수를 반복하는 Observable을 생성
    • 이 오퍼레이터는 단독으로 사용되지 않고 다른 오퍼레이터 뒤에 붙여서 사용되며 .Repeat(n) 바로 앞 오퍼레이터를 일정횟수 만큼 반복
    Repeat

    10. Start
    Start
    11. Timer
    • 일정 시간의 딜레이 이후에 단일 항목을 방출하는 Observable을 생성
    Timer




  • 출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다.