본문
160812(금) - RxJava
RxJava - 2. Observable
Observer Pattern
- GoF의 디자인 패턴(Design Patterns : Elements of Reusable Object-Oriented Software) 에서 거론
- 행위 패턴
- 1 : 다 의존성으로 묶는 방법 제공
- 한 객체가 변경되면 의존된 모든 객체는 자동으로 알림을 받고 갱신
Subject
- 특별한 객체
- Subject가 변경됐을 때, 알림을 받기를 원하는 객체의 리스트를 가지고 있다.
Observer
- Subject가 자신의 상태가 변경되면 호출하는 알림 메소드를 드러내고 있음
사용 시기
- 서로 의존적인 2개의 개체를 가진 구조에서, 이를 분리하거나 독립적으로 재사용 하고싶은 경우
- 변경사항이 있는 객체가 자신의 변경사항을 다수의 객체에게 알려야 하는 경우
- 변경사항을 불특정 다수의 객체에게 추정없이 알려야하는 경우
RxJava Observer Pattern
- Observable
- 기존 자바의 비동기 작업
Thread, Future, FutureTask, CompletableFuture 등의 고전적인 클래스 제공
복잡도가 증가할수록 지저분해지고 유지하기 어려워 진다.
메소드 체이닝 미지원
- Observable
위의 문제를 해결하기 위해 등장
유연하고 사용하기 쉬움
메소드 체이닝 가능
단일 스칼라, 시퀸스 값 발행, 무한 스트림 발행 등에 사용가능
- 생명주기
데이터 가져오기
onNext(T)
오류 발견
onError(Throwable)
완료
onCompleted()
※ Iterable에서 소비자는 생산자에게서 동기 상태로 값을 풀링하고 값이 도착하기 전까지 스레드가 차단된다.
BUT! 옵저버블의 생산자는 값이 이용가능해지면 옵저버에게 비동기적으로 푸시함.(유연한 접근법)
- 핫 옵저버블
아이템이 생성되자마자 발행 시작
옵저버는 중간쯤에서야 시퀸스를 관찰 가능
- 콜드 옵저버블
옵저버가 구독할 때까지 대기
처음부터 옵저버가 모든 시퀸스 볼 수 있음을 보장
- Observable.create()
create()
개발자가 새로운 옵저버블 생성 가능하도록 기능 제공
Observable.create(new Observable.OnSubscribe<Object>() {
@Overide
public void call(Subscriber<? super Object> subscriber) {
// 옵저버가 옵저버블을 구독하면 call() 함수를 실행
// Observable은 subscriber 변수를 통해 옵저버와 통신하며, 상황에 따라
}
});
(ex)
Observable<Integer> observableString =
Observable.create(new Observable.OnSubscribe<Integer>() {
@Overide
public void call(Subsriber<? super Integer> observer) {
for(int i = 0; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
});
Subscription subscriptionPrint =
observableString.subscribe(new Observer<Integer>() {
@Overide
public void onCompleted() {
System.out.println("Observable completed");
}
@Overide
public void onError(Throwable e) {
System.out.println("Oh no! Something wrong happend!");
}
@Overide
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
- Observable.from()
from()
이미 생성된 리스트로부터 옵저버블 시퀸스를 생성한다.
List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(100);
items.add(200);
Observable<Integer> observableString = Observable.from(items);
Subscription subscriptionPrint =
observableString.subscribe(new Observer<Integer>() {
@Overide
public void onCompleted() {
System.out.println("Observable completed");
}
@Overide
public void onError(Throwable e) {
System.out.println("Oh no! Something wrong happend!");
}
@Overide
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
- Observable.just()
just()
- 이미 갖고 있는 자바 함수를 옵저버블로 변환 가능. 많은 boilerplate code를 줄일 수 있다.
- 10개의 인자를 가질 수 있고, 전달받은 인자 순서와 동일한 순서로 값을 발행한다.
- List나 Array를 가질 수 있지만, 리스트를 순회하면서 모든 값을 반환하지는 않고 리스트 전체를 반환
private String helloWorld() {
return "Hello World";
}
Observable<String> observableString = Observable.just(helloWorld());
Subscription subscriptionPrint =
observableString.subscribe(new Observer<String>() {
@Overide
public void onCompleted() {
System.out.println("Observable completed");
}
@Overide
public void onError(Throwable e) {
System.out.println("Oh no! Something wrong happend!");
}
@Overide
public void onNext(String message) {
System.out.println(message);
}
});
- Observable.empty()
아무것도 발행하지 않으면서 정상적으로 종료되는 옵저버블
- Observable.never()
아무것도 발행하지 않으면서 종료도 안되는 옵저버블
- Observable.throw()
아무것도 발행하지 않으면서 에러와 함께 종료되는 옵저버블
-Subject
= Observable + Observer
- 옵저버블이면서 동시에 옵저버
- 두 세계를 연결하는 다리 역할
- PublishSubject
기본적인 Subject 객체
PublishSubject<String> stringPublishSubject = PublishSubject.create();
Subscription subscriptionPrint =
stringPublishSubject.subscribe(new Oberver<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh no! Something wrong happend!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
stringPublishSubject.onNext("Hello World");
- private Observable
final PublishSubject<Boolean> subject = PublishSubject.create();
subject.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Boolean aBoolean) {
System.out.println("Observable completed!");
}
});
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).doOnCompleted(new Action0() { // 옵저버블이 종료될 때 어떤 일이 일어날 것인지 명시
@Override
public void call() {
subject.onNext(true);
}
}).subscribe(); // 단지 옵저버블이 시작하도록 호출될 뿐, 값이나 완료, 에러는 무시
- BehaviorSubject
가장 최근에 관찰된 아이템과 그 후에 관찰된 나머지 아이템을 구독하는 옵저버에게 발행
옵저버가 관찰하는 순간 가장 최근의 값을 발행해야 하므로 초기값 필요
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);
- ReplaySubject
관찰한 모든 아이템을 버퍼에 저장하고 구독하는 옵저버에게 재생
ReplaySubject<Integer> replaySubject = ReplaySubject.create();
- AsyncSubject
옵저버블이 완료됐을 때 구독하고 있는 각 옵저버에게 관찰한 마지막 아이템만을 발행
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
'Mobile > RxJava2' 카테고리의 다른 글
171211(월) - @Target (0) | 2017.12.11 |
---|---|
171208(금) - @Qualifiers (0) | 2017.12.08 |
171206(수) - @Scope, @Retention (0) | 2017.12.06 |
170904(월) - RxJava2 (Infinite scroll with RxJava2, Kotlin) (0) | 2017.09.04 |
160802(화) - RxJava(1. 닷넷에서 Rx자바까지) (0) | 2016.08.03 |
댓글