171213 (Wed) - RxJava2 (create)

RxJava1

- create

create

RxJava implements this operator as create.

It is good practice to check the observer’s isUnsubscribed state from within the function you pass to create so that your Observable can stop emitting items or doing expensive calculations when there is no longer an interested observer.

Sample Code

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> observer) {
        try {
            if (!observer.isUnsubscribed()) {
                for (int i = 1; i < 5; i++) {
                    observer.onNext(i);
                }
                observer.onCompleted();
            }
        } catch (Exception e) {
            observer.onError(e);
        }
    }
}).subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
        System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
        System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
        System.out.println("Sequence complete.");
    }
});

Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.

create does not by default operate on any particular Scheduler.



RxJava2

- create

@CheckReturnValue
@BackpressureSupport(value=SPECIAL)
@SchedulerSupport(value="none")
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source,
                                     BackpressureStrategy mode)
Provides an API (via a cold Flowable) that bridges the reactive world with the callback-style, generally non-backpressured world.

Example:


 Flowable.<Event>create(emitter -> {
     Callback listener = new Callback() {
         @Override
         public void onEvent(Event e) {
             emitter.onNext(e);
             if (e.isLast()) {
                 emitter.onComplete();
             }
         }

         @Override
         public void onFailure(Exception e) {
             emitter.onError(e);
         }
     };

     AutoCloseable c = api.someMethod(listener);

     emitter.setCancellable(c::close);

 }, BackpressureStrategy.BUFFER);
 

You should call the FlowableEmitter onNext, onError and onComplete methods in a serialized fashion. The rest of its methods are thread-safe.

Backpressure:
The backpressure behavior is determined by the mode parameter.
Scheduler:
create does not operate by default on a particular Scheduler.
Type Parameters:
T - the element type
Parameters:
source - the emitter that is called when a Subscriber subscribes to the returned Flowable
mode - the backpressure mode to apply if the downstream Subscriber doesn't request (fast) enough
Returns:
the new Flowable instance
See Also:
FlowableOnSubscribe, BackpressureStrategy, Cancellable
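
To compare with the RxJava1 sample above, here is a minimal sketch of the same 1 to 4 sequence written with Flowable.create. It assumes RxJava 2.x (the io.reactivex packages) is on the classpath; the class name CreateSketch and the method oneToFour are made up for illustration. The isCancelled() check plays the role that isUnsubscribed() played in 1.x, and it is done before every emission rather than once.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;

import java.util.List;

public class CreateSketch {

    // Emits 1..4 and completes, checking for cancellation before each item.
    static Flowable<Integer> oneToFour() {
        return Flowable.<Integer>create(emitter -> {
            for (int i = 1; i < 5; i++) {
                if (emitter.isCancelled()) {
                    return; // nobody is listening any more, so stop working
                }
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER);
    }

    public static void main(String[] args) {
        // Prints "Next: 1" .. "Next: 4" followed by "Sequence complete."
        oneToFour().subscribe(
                item -> System.out.println("Next: " + item),
                error -> System.err.println("Error: " + error.getMessage()),
                () -> System.out.println("Sequence complete."));

        // take(2) cancels the upstream after two items, so the loop exits early.
        List<Integer> firstTwo = oneToFour().take(2).toList().blockingGet();
        System.out.println(firstTwo); // [1, 2]
    }
}
```

BackpressureStrategy.BUFFER is used here only because the sequence is tiny; for a source that can outrun its consumer, the choice of mode deserves more thought.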



- generate

http://www.nurkiewicz.com/2017/08/generating-backpressure-aware-streams.html

@CheckReturnValue
@BackpressureSupport(value=FULL)
@SchedulerSupport(value="none")
public static <T,S> Flowable<T> generate(java.util.concurrent.Callable<S> initialState,
                                         BiConsumer<S,Emitter<T>> generator)
Returns a cold, synchronous, stateful and backpressure-aware generator of values.
Backpressure:
The operator honors downstream backpressure.
Scheduler:
generate does not operate by default on a particular Scheduler.
Type Parameters:
S - the type of the per-Subscriber state
T - the generated value type
Parameters:
initialState - the Callable to generate the initial state for each Subscriber
generator - the Consumer called with the current state whenever a particular downstream Subscriber has requested a value. The callback then should call onNext, onError or onComplete to signal a value or a terminal event. Signalling multiple onNext in a call will make the operator signal IllegalStateException.
Returns:
the new Flowable instance
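
The Javadoc for generate comes without an example, so here is a small sketch of the BiConsumer overload shown above. It assumes RxJava 2.x on the classpath; GenerateSketch and countTo are hypothetical names. The state is a mutable AtomicInteger created once per Subscriber, and each generator call signals at most one onNext.

```java
import io.reactivex.Flowable;

import java.util.concurrent.atomic.AtomicInteger;

public class GenerateSketch {

    // Counts from 1 up to max (inclusive), then completes.
    static Flowable<Integer> countTo(int max) {
        // Explicit type arguments <T, S> keep overload resolution unambiguous.
        return Flowable.<Integer, AtomicInteger>generate(
                () -> new AtomicInteger(1),           // fresh state for every Subscriber
                (state, emitter) -> {
                    int next = state.getAndIncrement();
                    if (next > max) {
                        emitter.onComplete();         // terminal event, no value
                    } else {
                        emitter.onNext(next);         // exactly one value per call
                    }
                });
    }

    public static void main(String[] args) {
        System.out.println(countTo(3).toList().blockingGet());          // [1, 2, 3]

        // Each subscription gets its own state, so a second run starts at 1 again.
        System.out.println(countTo(5).take(2).toList().blockingGet()); // [1, 2]
    }
}
```

Unlike create, the generator is driven by downstream demand, so there is no BackpressureStrategy to choose.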



- unsafeCreate

@CheckReturnValue
@BackpressureSupport(value=NONE)
@SchedulerSupport(value="none")
public static <T> Flowable<T> unsafeCreate(Publisher<T> onSubscribe)
Create a Flowable by wrapping a Publisher which has to be implemented according to the Reactive-Streams specification by handling backpressure and cancellation correctly; no safeguards are provided by the Flowable itself.
Backpressure:
This operator is a pass-through for backpressure and the behavior is determined by the provided Publisher implementation.
Scheduler:
unsafeCreate by default doesn't operate on any particular Scheduler.
Type Parameters:
T - the value type emitted
Parameters:
onSubscribe - the Publisher instance to wrap
Returns:
the new Flowable instance
Throws:
java.lang.IllegalArgumentException - if onSubscribe is a subclass of Flowable; such instances don't need conversion and are possibly a port remnant from 1.x, or one should use hide() instead.
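
To give a feel for what "handling backpressure and cancellation correctly" means for the wrapped Publisher, here is a simplified sketch of a hand-written range Publisher passed to unsafeCreate. It assumes RxJava 2.x and the org.reactivestreams interfaces on the classpath and a range with start <= end; RangePublisher, RangeSubscription and range are hypothetical names. It has not been checked against the Reactive-Streams TCK, so treat it as an illustration only.

```java
import io.reactivex.Flowable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicLong;

public class UnsafeCreateSketch {

    /** Hypothetical Publisher emitting start, start + 1, ..., end - 1. */
    static final class RangePublisher implements Publisher<Integer> {
        private final int start;
        private final int end;

        RangePublisher(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public void subscribe(Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new RangeSubscription(subscriber, start, end));
        }
    }

    /** The AtomicLong value holds the outstanding downstream demand. */
    static final class RangeSubscription extends AtomicLong implements Subscription {
        private final Subscriber<? super Integer> downstream;
        private final int end;
        private int index;
        private volatile boolean cancelled;

        RangeSubscription(Subscriber<? super Integer> downstream, int start, int end) {
            this.downstream = downstream;
            this.index = start;
            this.end = end;
        }

        @Override
        public void request(long n) {
            if (n <= 0) { // non-positive requests are a protocol violation
                cancelled = true;
                downstream.onError(new IllegalArgumentException("request must be positive"));
                return;
            }
            long previous;
            long updated;
            do { // add n to the demand, capping at Long.MAX_VALUE
                previous = get();
                updated = previous + n;
                if (updated < 0) {
                    updated = Long.MAX_VALUE;
                }
            } while (!compareAndSet(previous, updated));
            if (previous == 0) {
                drain(updated); // only the caller that raised demand from 0 emits
            }
        }

        private void drain(long requested) {
            long emitted = 0;
            for (;;) {
                while (emitted != requested && index < end) {
                    if (cancelled) {
                        return;
                    }
                    downstream.onNext(index++);
                    emitted++;
                }
                if (cancelled) {
                    return;
                }
                if (index >= end) {
                    cancelled = true;
                    downstream.onComplete();
                    return;
                }
                requested = get();
                if (requested == emitted) { // demand used up; subtract and maybe stop
                    requested = addAndGet(-emitted);
                    if (requested == 0) {
                        return;
                    }
                    emitted = 0;
                }
            }
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }

    static Flowable<Integer> range(int start, int end) {
        return Flowable.unsafeCreate(new RangePublisher(start, end));
    }

    public static void main(String[] args) {
        System.out.println(range(1, 5).toList().blockingGet());         // [1, 2, 3, 4]
        System.out.println(range(1, 5).take(2).toList().blockingGet()); // [1, 2]
    }
}
```

The amount of bookkeeping needed even for this trivial source is a good argument for preferring create or generate whenever they fit.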

