본문

160812(금) - RxJava

RxJava - 2. Observable


Observer Pattern

- GoF의 디자인 패턴(Design Patterns : Elements of Reusable Object-Oriented Software) 에서 거론

- 행위 패턴

- 1 : 다 의존성으로 묶는 방법 제공

- 한 객체가 변경되면 의존된 모든 객체는 자동으로 알림을 받고 갱신


Subject

- 특별한 객체

- Subject가 변경됐을 때, 알림을 받기를 원하는 객체의 리스트를 가지고 있다.


Observer

- Subject가 자신의 상태가 변경되면 호출하는 알림 메소드를 드러내고 있음


사용 시기

- 서로 의존적인 2개의 개체를 가진 구조에서, 이를 분리하거나 독립적으로 재사용 하고싶은 경우

- 변경사항이 있는 객체가 자신의 변경사항을 다수의 객체에게 알려야 하는 경우

- 변경사항을 불특정 다수의 객체에게 추정없이 알려야하는 경우



RxJava Observer Pattern 

- Observable

- 기존 자바의 비동기 작업

Thread, Future, FutureTask, CompletableFuture 등의 고전적인 클래스 제공

복잡도가 증가할수록 지저분해지고 유지하기 어려워 진다.

메소드 체이닝 미지원


- Observable

위의 문제를 해결하기 위해 등장

유연하고 사용하기 쉬움

메소드 체이닝 가능

단일 스칼라, 시퀸스 값 발행, 무한 스트림 발행 등에 사용가능


- 생명주기

데이터 가져오기

onNext(T) 


오류 발견

onError(Throwable)


완료

onCompleted()


※ Iterable에서 소비자는 생산자에게서 동기 상태로 값을 풀링하고 값이 도착하기 전까지 스레드가 차단된다.

BUT! 옵저버블의 생산자는 값이 이용가능해지면 옵저버에게 비동기적으로 푸시함.(유연한 접근법)


- 핫 옵저버블

아이템이 생성되자마자 발행 시작

옵저버는 중간쯤에서야 시퀸스를 관찰 가능


- 콜드 옵저버블 

옵저버가 구독할 때까지 대기

처음부터 옵저버가 모든 시퀸스 볼 수 있음을 보장


- Observable.create()

create()

개발자가 새로운 옵저버블 생성 가능하도록 기능 제공


Observable.create(new Observable.OnSubscribe<Object>() {

@Overide

public void call(Subscriber<? super Object> subscriber) {

// 옵저버가 옵저버블을 구독하면 call() 함수를 실행

// Observable은 subscriber 변수를 통해 옵저버와 통신하며, 상황에 따라 

}

});



(ex)

Observable<Integer> observableString

Observable.create(new Observable.OnSubscribe<Integer>() {

@Overide

public void call(Subsriber<? super Integer> observer) {

for(int i = 0; i < 5; i++) {

observer.onNext(i);

}


observer.onCompleted();

}

});


Subscription subscriptionPrint = 

observableString.subscribe(new Observer<Integer>() {

@Overide

public void onCompleted() {
    System.out.println("Observable completed");

}


@Overide

public void onError(Throwable e) {

System.out.println("Oh no! Something wrong happend!");

}


@Overide

public void onNext(Integer item) {

System.out.println("Item is " + item);

}

});


- Observable.from()

from()

이미 생성된 리스트로부터 옵저버블 시퀸스를 생성한다.


List<Integer> items  = new ArrayList<Integer>();

items.add(1);

items.add(10);

items.add(100);

items.add(200);


Observable<Integer> observableString = Observable.from(items);

Subscription subscriptionPrint = 

observableString.subscribe(new Observer<Integer>() {

@Overide

public void onCompleted() {    

System.out.println("Observable completed");

}


@Overide

public void onError(Throwable e) {

System.out.println("Oh no! Something wrong happend!");

}


@Overide

public void onNext(Integer item) {

System.out.println("Item is " + item);

}

});


- Observable.just()

just()

- 이미 갖고 있는 자바 함수를 옵저버블로 변환 가능. 많은 boilerplate code를 줄일 수 있다.

- 10개의 인자를 가질 수 있고, 전달받은 인자 순서와 동일한 순서로 값을 발행한다.

- List나 Array를 가질 수 있지만, 리스트를 순회하면서 모든 값을 반환하지는 않고 리스트 전체를 반환



private String helloWorld() {

return "Hello World";

}


Observable<String> observableString = Observable.just(helloWorld());

Subscription subscriptionPrint = 

observableString.subscribe(new Observer<String>() {

@Overide

public void onCompleted() {
    System.out.println("Observable completed");

}


@Overide

public void onError(Throwable e) {

System.out.println("Oh no! Something wrong happend!");

}


@Overide

public void onNext(String message) {

System.out.println(message);

}

});


- Observable.empty()

아무것도 발행하지 않으면서 정상적으로 종료되는 옵저버블


- Observable.never()

아무것도 발행하지 않으면서 종료도 안되는 옵저버블


- Observable.throw()

아무것도 발행하지 않으면서 에러와 함께 종료되는 옵저버블



-Subject

= Observable + Observer


- 옵저버블이면서 동시에 옵저버

- 두 세계를 연결하는 다리 역할


- PublishSubject

기본적인 Subject 객체


PublishSubject<String> stringPublishSubject = PublishSubject.create();


Subscription subscriptionPrint = 

stringPublishSubject.subscribe(new Oberver<String>() {

@Override

public void onCompleted() {
    System.out.println("Observable completed");

}


@Override

public void onError(Throwable e) {

System.out.println("Oh no! Something wrong happend!");

}


@Override

public void onNext(String message) {

System.out.println(message);

}

});


stringPublishSubject.onNext("Hello World");




- private Observable


final PublishSubject<Boolean> subject = PublishSubject.create();


subject.subscribe(new Observer<Boolean>() {

@Override

public void onCompleted() {


}


@Override

public void onError(Throwable e) {


}


@Override

public void onNext(Boolean aBoolean) {

System.out.println("Observable completed!");

}

});



Observable.create(new Observable.OnSubscribe<Integer>() {

@Override

public void call(Subscriber<? super Integer> subscriber) {

for(int i = 0; i < 5; i++) {

subscriber.onNext(i);

}

subscriber.onCompleted();

}

}).doOnCompleted(new Action0() {    // 옵저버블이 종료될 때 어떤 일이 일어날 것인지 명시

@Override

public void call() {

subject.onNext(true);

}

}).subscribe();    // 단지 옵저버블이 시작하도록 호출될 뿐, 값이나 완료, 에러는 무시


- BehaviorSubject

가장 최근에 관찰된 아이템과 그 후에 관찰된 나머지 아이템을 구독하는 옵저버에게 발행

옵저버가 관찰하는 순간 가장 최근의 값을 발행해야 하므로 초기값 필요


BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);


- ReplaySubject

관찰한 모든 아이템을 버퍼에 저장하고 구독하는 옵저버에게 재생


ReplaySubject<Integer> replaySubject = ReplaySubject.create();


- AsyncSubject

옵저버블이 완료됐을 때 구독하고 있는 각 옵저버에게 관찰한 마지막 아이템만을 발행


AsyncSubject<Integer> asyncSubject = AsyncSubject.create();

공유

댓글