Creating Observables
This page shows methods that create reactive sources, such as Observables.
just
Available in: Flowable, Observable, Maybe, Single
ReactiveX documentation: http://reactivex.io/documentation/operators/just.html
Constructs a reactive type by taking a pre-existing object and emitting that specific object to the downstream consumer upon subscription.
String greeting = "Hello world!";
Observable<String> observable = Observable.just(greeting);
observable.subscribe(item -> System.out.println(item));
For convenience, there are overloads taking 2 to 10 arguments; the objects (which share a common type) are emitted in the order they are specified.
Observable<Object> observable = Observable.just("1", "A", "3.2", "def");
observable.subscribe(item -> System.out.print(item), error -> error.printStackTrace(),
() -> System.out.println());
from
Constructs a sequence from a pre-existing source or generator type.
Note: These static methods use the postfix naming convention (i.e., the argument type is repeated in the method name) to avoid overload resolution ambiguities.
ReactiveX documentation: http://reactivex.io/documentation/operators/from.html
fromIterable
Available in: Flowable, Observable
Signals the items from a java.lang.Iterable source (such as Lists, Sets or Collections or custom Iterables) and then completes the sequence.
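The Iterable is traversed anew for every subscription, which the following minimal sketch tries to make visible. It assumes RxJava 2.x (the io.reactivex package) on the classpath; the class name FromIterableSketch and the helper collect are hypothetical names used only for illustration.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FromIterableSketch {
    // Hypothetical helper: subscribes once and collects everything that is emitted.
    static List<Integer> collect(Observable<Integer> source) {
        return source.toList().blockingGet();
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3));
        Observable<Integer> observable = Observable.fromIterable(list);

        System.out.println(collect(observable)); // first subscription
        list.add(4);                             // mutate the backing list
        System.out.println(collect(observable)); // second subscription iterates again
    }
}
```

The second line printed includes the added element, because the source asks the Iterable for a fresh Iterator each time a consumer subscribes.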
List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
Observable<Integer> observable = Observable.fromIterable(list);
observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(),
() -> System.out.println("Done"));
fromArray
Available in: Flowable, Observable
Signals the elements of the given array and then completes the sequence.
Integer[] array = new Integer[10];
for (int i = 0; i < array.length; i++) {
array[i] = i;
}
Observable<Integer> observable = Observable.fromArray(array);
observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(),
() -> System.out.println("Done"));
Note: RxJava does not support primitive arrays, only (generic) reference arrays.
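One possible way to work around the primitive-array restriction is to box the array first. The sketch below assumes RxJava 2.x and uses java.util.stream.IntStream for the boxing; the class PrimitiveArraySketch and the method fromInts are hypothetical helpers, not part of RxJava.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.stream.IntStream;

public class PrimitiveArraySketch {
    // Hypothetical helper: boxes an int[] into an Integer[] so that
    // fromArray emits one item per element.
    static Observable<Integer> fromInts(int[] values) {
        Integer[] boxed = IntStream.of(values).boxed().toArray(Integer[]::new);
        return Observable.fromArray(boxed);
    }

    public static void main(String[] args) {
        List<Integer> items = fromInts(new int[] {1, 2, 3}).toList().blockingGet();
        System.out.println(items); // [1, 2, 3]
    }
}
```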
fromCallable
Available in: Flowable, Observable, Maybe, Single, Completable
When a consumer subscribes, the given java.util.concurrent.Callable is invoked and its returned value (or thrown exception) is relayed to that consumer.
Callable<String> callable = () -> {
    System.out.println("Hello World!");
    return "Hello World!";
};
Observable<String> observable = Observable.fromCallable(callable);
observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(),
() -> System.out.println("Done"));
Remark: In Completable, the actual returned value is ignored and the Completable simply completes.
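The remark about Completable can be illustrated with a small sketch, assuming RxJava 2.x. The class name CompletableFromCallableSketch and the counter calls are hypothetical; the counter only exists to show when the Callable runs.

```java
import io.reactivex.Completable;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletableFromCallableSketch {
    // Counts how often the Callable has been invoked (illustration only).
    static final AtomicInteger calls = new AtomicInteger();

    static Completable source() {
        return Completable.fromCallable(() -> {
            calls.incrementAndGet();
            return "this value is dropped"; // Completable has no onSuccess signal
        });
    }

    public static void main(String[] args) {
        Completable completable = source(); // nothing runs yet
        System.out.println("Calls before subscribing: " + calls.get());
        completable.blockingAwait();        // subscribes and waits for completion
        System.out.println("Calls after subscribing: " + calls.get());
    }
}
```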
fromAction
Available in: Maybe, Completable
When a consumer subscribes, the given io.reactivex.functions.Action is invoked and the consumer completes or receives the exception the Action threw.
Action action = () -> System.out.println("Hello World!");
Completable completable = Completable.fromAction(action);
completable.subscribe(() -> System.out.println("Done"), error -> error.printStackTrace());
Note: the difference between fromAction and fromRunnable is that the Action interface allows throwing a checked exception while java.lang.Runnable does not.
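The difference described in the note can be sketched as follows, assuming RxJava 2.x. FromActionSketch and errorOf are hypothetical names; errorOf relies on both sources running synchronously on the subscribing thread, which holds here because no scheduler is involved.

```java
import io.reactivex.Completable;
import io.reactivex.functions.Action;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

public class FromActionSketch {
    // Hypothetical helper: returns the error the Completable signalled,
    // or null if it completed normally.
    static Throwable errorOf(Completable completable) {
        AtomicReference<Throwable> error = new AtomicReference<>();
        completable.subscribe(() -> { }, error::set);
        return error.get();
    }

    public static void main(String[] args) {
        // Action.run() declares "throws Exception", so no try/catch is needed.
        Action failing = () -> { throw new IOException("disk unavailable"); };
        System.out.println(errorOf(Completable.fromAction(failing)));

        // Runnable.run() declares no checked exceptions, so they are wrapped by hand.
        Runnable wrapped = () -> {
            try {
                throw new IOException("disk unavailable");
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
        System.out.println(errorOf(Completable.fromRunnable(wrapped)));
    }
}
```

With fromAction the consumer receives the IOException itself; with fromRunnable it receives whatever unchecked wrapper the Runnable chose.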
fromRunnable
Available in: Maybe, Completable
When a consumer subscribes, the given java.lang.Runnable is invoked and the consumer completes or receives the exception the Runnable threw.
Runnable runnable = () -> System.out.println("Hello World!");
Completable completable = Completable.fromRunnable(runnable);
completable.subscribe(() -> System.out.println("Done"), error -> error.printStackTrace());
Note: the difference between fromAction and fromRunnable is that the Action interface allows throwing a checked exception while java.lang.Runnable does not.
fromFuture
Available in: Flowable, Observable, Maybe, Single, Completable
Waits, in a blocking fashion, for a pre-existing (already running or already completed) java.util.concurrent.Future to complete normally or with an exception, and relays the produced value or exception to the consumers.
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
Future<String> future = executor.schedule(() -> "Hello world!", 1, TimeUnit.SECONDS);
Observable<String> observable = Observable.fromFuture(future);
observable.subscribe(
item -> System.out.println(item),
error -> error.printStackTrace(),
() -> System.out.println("Done"));
executor.shutdown();
from{reactive type}
Wraps or converts another reactive type to the target reactive type.
The following combinations are available in the various reactive types with the following signature pattern: targetType.from{sourceType}()
Available in:
| targetType \ sourceType | Publisher | Observable | Maybe | Single | Completable |
|---|---|---|---|---|---|
| Flowable | ✓ | | | | |
| Observable | ✓ | | | | |
| Maybe | | | | ✓ | ✓ |
| Single | ✓ | ✓ | | | |
| Completable | ✓ | ✓ | ✓ | ✓ | |
Note: not all possible conversions are implemented via the from{reactive type} method families. Check out the to{reactive type} method families for further conversion possibilities.
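To make the two method families concrete, here is a minimal sketch assuming RxJava 2.x. ConversionSketch, firstAsSingle and asFlowable are hypothetical names; the choice of BackpressureStrategy.BUFFER is only one of several possible strategies.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;
import java.util.List;

public class ConversionSketch {
    // from{sourceType}: the target type wraps the source.
    static Single<Integer> firstAsSingle(Observable<Integer> source) {
        // take(1) because Single.fromObservable expects exactly one item.
        return Single.fromObservable(source.take(1));
    }

    // to{targetType}: the source converts itself.
    static Flowable<Integer> asFlowable(Observable<Integer> source) {
        return source.toFlowable(BackpressureStrategy.BUFFER);
    }

    public static void main(String[] args) {
        Observable<Integer> source = Observable.just(1, 2, 3);

        System.out.println(firstAsSingle(source).blockingGet());
        List<Integer> viaFlowable = asFlowable(source).toList().blockingGet();
        System.out.println(viaFlowable);

        Maybe<Integer> maybe = firstAsSingle(source).toMaybe();
        System.out.println(maybe.blockingGet());

        Completable completable = Completable.fromObservable(source);
        completable.blockingAwait(); // the items are ignored, only completion is relayed
        System.out.println("Completed");
    }
}
```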
Mono<Integer> reactorMono = Mono.fromCompletionStage(CompletableFuture.<Integer>completedFuture(1));
Observable<Integer> observable = Observable.fromPublisher(reactorMono);
observable.subscribe(
item -> System.out.println(item),
error -> error.printStackTrace(),
() -> System.out.println("Done"));
generate
Available in: Flowable, Observable
ReactiveX documentation: http://reactivex.io/documentation/operators/create.html
Creates a cold, synchronous and stateful generator of values.
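A bounded generator may make the three properties easier to see. The sketch below assumes RxJava 2.x; GenerateSketch and countTo are hypothetical names, and the generator assumes limit is at least 1.

```java
import io.reactivex.Flowable;
import java.util.List;

public class GenerateSketch {
    // Emits 1..limit (assumes limit >= 1); the state is the next value to emit.
    static Flowable<Integer> countTo(int limit) {
        return Flowable.<Integer, Integer>generate(
                () -> 1,                      // initial state, created per subscriber
                (next, emitter) -> {
                    emitter.onNext(next);     // at most one onNext per invocation
                    if (next == limit) {
                        emitter.onComplete();
                    }
                    return next + 1;          // state for the following invocation
                });
    }

    public static void main(String[] args) {
        Flowable<Integer> flowable = countTo(5);
        List<Integer> first = flowable.toList().blockingGet();
        List<Integer> second = flowable.toList().blockingGet();
        System.out.println(first);  // [1, 2, 3, 4, 5]
        System.out.println(second); // [1, 2, 3, 4, 5], state starts over (cold)
    }
}
```

The generator function is only called when the downstream has requested an item, which is what makes this form backpressure-friendly.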
int startValue = 1;
int incrementValue = 1;
Flowable<Integer> flowable = Flowable.generate(() -> startValue, (s, emitter) -> {
int nextValue = s + incrementValue;
emitter.onNext(nextValue);
return nextValue;
});
flowable.subscribe(value -> System.out.println(value));
create
Available in: Flowable, Observable, Maybe, Single, Completable
ReactiveX documentation: http://reactivex.io/documentation/operators/create.html
Constructs a safe reactive type instance which, when subscribed to by a consumer, runs a user-provided function and provides a type-specific Emitter for this function to generate the signal(s) the designated business logic requires. This method allows bridging the non-reactive, usually listener/callback-style world with the reactive world.
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ObservableOnSubscribe<String> handler = emitter -> {
Future<Object> future = executor.schedule(() -> {
emitter.onNext("Hello");
emitter.onNext("World");
emitter.onComplete();
return null;
}, 1, TimeUnit.SECONDS);
emitter.setCancellable(() -> future.cancel(false));
};
Observable<String> observable = Observable.create(handler);
observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(),
() -> System.out.println("Done"));
Thread.sleep(2000);
executor.shutdown();
Note: Flowable.create() must also specify the backpressure behavior to be applied when the user-provided function generates more items than the downstream consumer has requested.
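As a sketch of what the note means in practice, the following example passes a BackpressureStrategy to Flowable.create. It assumes RxJava 2.x; FlowableCreateSketch and burst are hypothetical names, and only two of the available strategies are shown.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableOnSubscribe;
import java.util.List;

public class FlowableCreateSketch {
    // Hypothetical source that emits 1..count as fast as it can,
    // regardless of how much the consumer has requested.
    static Flowable<Integer> burst(int count, BackpressureStrategy strategy) {
        FlowableOnSubscribe<Integer> handler = emitter -> {
            for (int i = 1; i <= count && !emitter.isCancelled(); i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        };
        return Flowable.create(handler, strategy);
    }

    public static void main(String[] args) {
        // BUFFER keeps every item until the consumer asks for it.
        List<Integer> buffered = burst(5, BackpressureStrategy.BUFFER).toList().blockingGet();
        System.out.println(buffered);

        // DROP discards items emitted while there is no outstanding request;
        // test(0) subscribes without requesting anything.
        List<Integer> dropped = burst(5, BackpressureStrategy.DROP).test(0).values();
        System.out.println(dropped);
    }
}
```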
defer
Available in: Flowable, Observable, Maybe, Single, Completable
ReactiveX documentation: http://reactivex.io/documentation/operators/defer.html
Calls a user-provided java.util.concurrent.Callable when a consumer subscribes to the reactive type so that the Callable can generate the actual reactive instance whose signals are relayed to the consumer. defer allows:
- associating per-consumer state with such generated reactive instances,
- executing side effects before the actual/generated reactive instance gets subscribed to,
- turning hot sources (i.e., Subjects and Processors) into cold sources by making those hot sources not exist until a consumer subscribes.
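The first two points can be sketched by comparing defer with just, assuming RxJava 2.x. DeferSketch, eager and lazy are hypothetical names; the shared counter stands in for any side effect.

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicInteger;

public class DeferSketch {
    // just() captures its value once, when the Observable is assembled.
    static Observable<Integer> eager(AtomicInteger counter) {
        return Observable.just(counter.incrementAndGet());
    }

    // defer() runs the Callable again for every subscription.
    static Observable<Integer> lazy(AtomicInteger counter) {
        return Observable.defer(() -> Observable.just(counter.incrementAndGet()));
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        Observable<Integer> eager = eager(counter); // counter is now 1
        Observable<Integer> lazy = lazy(counter);   // counter is still 1

        System.out.println("eager: " + eager.blockingFirst() + ", " + eager.blockingFirst());
        System.out.println("lazy: " + lazy.blockingFirst() + ", " + lazy.blockingFirst());
    }
}
```

The eager source prints the same number twice, while the deferred source increments the counter on each subscription.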
Observable<Long> observable = Observable.defer(() -> {
long time = System.currentTimeMillis();
return Observable.just(time);
});
observable.subscribe(time -> System.out.println(time));
Thread.sleep(1000);
observable.subscribe(time -> System.out.println(time));
range
Available in: Flowable, Observable
ReactiveX documentation: http://reactivex.io/documentation/operators/range.html
Generates a sequence of consecutive values for each individual consumer. The range() method generates Integers; rangeLong() generates Longs.
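Note that the second argument of both methods is a count, not an end value. A minimal sketch, assuming RxJava 2.x (RangeSketch, ints and longs are hypothetical names):

```java
import io.reactivex.Observable;
import java.util.List;

public class RangeSketch {
    static List<Integer> ints(int start, int count) {
        return Observable.range(start, count).toList().blockingGet();
    }

    static List<Long> longs(long start, long count) {
        return Observable.rangeLong(start, count).toList().blockingGet();
    }

    public static void main(String[] args) {
        // Three values starting at 5, not the values from 5 to 3.
        System.out.println(ints(5, 3)); // [5, 6, 7]

        // rangeLong can continue past Integer.MAX_VALUE.
        System.out.println(longs(Integer.MAX_VALUE, 2)); // [2147483647, 2147483648]
    }
}
```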
String greeting = "Hello World!";
Observable<Integer> indexes = Observable.range(0, greeting.length());
Observable<Character> characters = indexes
.map(index -> greeting.charAt(index));
characters.subscribe(character -> System.out.print(character), error -> error.printStackTrace(),
() -> System.out.println());
interval
Available in: Flowable, Observable
ReactiveX documentation: http://reactivex.io/documentation/operators/interval.html
Periodically generates an infinite sequence of ever-increasing numbers (of type Long). The intervalRange variant generates a limited number of such values.
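The intervalRange variant could be used as in the sketch below, which assumes RxJava 2.x; IntervalRangeSketch and ticks are hypothetical names. The sketch blocks the calling thread on purpose: the timed operators emit on a background scheduler, so a plain subscribe() at the end of main may let the program exit before any value arrives.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class IntervalRangeSketch {
    // Emits count values starting at start, one every periodMillis, with no initial delay.
    static List<Long> ticks(long start, long count, long periodMillis) {
        return Observable.intervalRange(start, count, 0, periodMillis, TimeUnit.MILLISECONDS)
                .toList()
                .blockingGet(); // block the caller until the last tick has arrived
    }

    public static void main(String[] args) {
        System.out.println(ticks(1, 3, 100)); // [1, 2, 3]
    }
}
```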
Observable<Long> clock = Observable.interval(1, TimeUnit.SECONDS);
clock.subscribe(time -> {
if (time % 2 == 0) {
System.out.println("Tick");
} else {
System.out.println("Tock");
}
});
timer
Available in: Flowable, Observable, Maybe, Single, Completable
ReactiveX documentation: http://reactivex.io/documentation/operators/timer.html
After the specified time, this reactive source signals a single 0L (then completes for Flowable and Observable).
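A short sketch of the same timer across several reactive types, assuming RxJava 2.x. TimerSketch and its helper methods are hypothetical names, and the 50 ms delay is arbitrary.

```java
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.Single;
import java.util.concurrent.TimeUnit;

public class TimerSketch {
    // Observable: one 0L via onNext, followed by onComplete.
    static long observableSignal(long delayMillis) {
        return Observable.timer(delayMillis, TimeUnit.MILLISECONDS).blockingSingle();
    }

    // Single: the 0L arrives as the success value.
    static long singleSignal(long delayMillis) {
        return Single.timer(delayMillis, TimeUnit.MILLISECONDS).blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(observableSignal(50)); // 0
        System.out.println(singleSignal(50));     // 0

        // Completable carries no value; it just completes after the delay.
        Completable.timer(50, TimeUnit.MILLISECONDS).blockingAwait();
        System.out.println("Completed");
    }
}
```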
Observable<Long> eggTimer = Observable.timer(5, TimeUnit.MINUTES);
eggTimer.blockingSubscribe(v -> System.out.println("Egg is ready!"));
empty
Available in: Flowable, Observable, Maybe, Completable
ReactiveX documentation: http://reactivex.io/documentation/operators/empty-never-throw.html
This type of source signals completion immediately upon subscription.
Observable<String> empty = Observable.empty();
empty.subscribe(
v -> System.out.println("This should never be printed!"),
error -> System.out.println("Or this!"),
() -> System.out.println("Done will be printed."));
never
Available in: Flowable, Observable, Maybe, Single, Completable
ReactiveX documentation: http://reactivex.io/documentation/operators/empty-never-throw.html
This type of source does not signal any onNext, onSuccess, onError or onComplete. This type of reactive source is useful in testing or "disabling" certain sources in combinator operators.
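One way to picture "disabling" a source in a combinator is a race between two sources, sketched below under the assumption of RxJava 2.x. NeverSketch and race are hypothetical names; amb is used only as an example combinator.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;

public class NeverSketch {
    // amb relays whichever source signals first and cancels the other.
    static Observable<Integer> race(Observable<Integer> a, Observable<Integer> b) {
        return Observable.amb(Arrays.asList(a, b));
    }

    public static void main(String[] args) {
        Observable<Integer> disabled = Observable.never(); // can never win the race
        Observable<Integer> active = Observable.just(1, 2, 3);

        List<Integer> winner = race(disabled, active).toList().blockingGet();
        System.out.println(winner); // [1, 2, 3]
    }
}
```

Because never() produces no signal at all, the other participant always wins, so one input of the combinator is effectively switched off without changing the surrounding code.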
Observable<String> never = Observable.never();
never.subscribe(
v -> System.out.println("This should never be printed!"),
error -> System.out.println("Or this!"),
() -> System.out.println("This neither!"));
error
Available in: Flowable, Observable, Maybe, Single, Completable
ReactiveX documentation: http://reactivex.io/documentation/operators/empty-never-throw.html
Signals an error, either pre-existing or generated via a java.util.concurrent.Callable, to the consumer.
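The Callable-based variant creates a fresh exception for every consumer, which the sketch below illustrates. It assumes RxJava 2.x; ErrorSketch, failing and messageOf are hypothetical names, and messageOf relies on the source signalling synchronously.

```java
import io.reactivex.Observable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class ErrorSketch {
    // The Callable runs once per subscription, so each consumer gets its own exception.
    static Observable<String> failing(AtomicInteger attempts) {
        return Observable.<String>error(
                () -> new IOException("attempt " + attempts.incrementAndGet()));
    }

    // Hypothetical helper: subscribes and returns the message of the signalled error.
    static String messageOf(Observable<String> source) {
        AtomicReference<String> message = new AtomicReference<>();
        source.subscribe(v -> { }, e -> message.set(e.getMessage()));
        return message.get();
    }

    public static void main(String[] args) {
        Observable<String> source = failing(new AtomicInteger());
        System.out.println(messageOf(source)); // attempt 1
        System.out.println(messageOf(source)); // attempt 2
    }
}
```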
Observable<String> error = Observable.error(new IOException());
error.subscribe(
v -> System.out.println("This should never be printed!"),
e -> e.printStackTrace(),
() -> System.out.println("This neither!"));
A typical use case is to conditionally map or suppress an exception in a chain utilizing onErrorResumeNext:
Observable<String> observable = Observable.fromCallable(() -> {
if (Math.random() < 0.5) {
throw new IOException();
}
throw new IllegalArgumentException();
});
Observable<String> result = observable.onErrorResumeNext(error -> {
if (error instanceof IllegalArgumentException) {
return Observable.empty();
}
return Observable.error(error);
});
for (int i = 0; i < 10; i++) {
result.subscribe(
v -> System.out.println("This should never be printed!"),
error -> error.printStackTrace(),
() -> System.out.println("Done"));
}
Copyright (c) 2016-present, RxJava Contributors.