Basic structure
Let’s first look at the most basic code and analyze how this code is implemented in RxJava.
Observable.OnSubscribe<String> onSubscribe1 = new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("1"); subscriber.onCompleted(); }}; Subscriber<String> subscriber1 = new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { }};Observable.create(onSubscriber1) .subscribe(subscriber1);First, let's take a look at the code of Observable.create
public final static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f));} protected Observable(OnSubscribe<T> f) { this.onSubscribe = f;} It directly calls the constructor of Observable to create a new Observable object. We temporarily mark this object as observable1 for later traceability.
At the same time, the OnSubscribe object we passed in will be saved in the onSubscribe property of observable1. This property is very important in the subsequent context, please pay attention.
Next, let’s take a look at the subscribe method.
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this);}private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { ... subscriber.onStart(); hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber);} As you can see, after subscribe, the observable1.onSubscribe.call method is called directly, which is the call method of the onSubscribe1 object in our code. The passed parameters are the subscriber1 object defined in our code. What is done in the call method is to call the onNext and onComplete methods of the incoming subscriber1 object.
This enables communication between the observer and the observer. Isn’t it very simple?
public void call(Subscriber<? super String> subscriber) { subscriber.onNext("1"); subscriber.onCompleted();}RxJava usage scenario summary
1. Check the cached scenario first
Get data, first check whether there is a cache in the memory and then check whether there is any previous condition in the file cache, and finally, the following condition will be satisfied from the network.
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { if (memoryCache != null) { subscriber.onNext(memoryCache); } else { subscriber.onCompleted(); } }});Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { String cachePref = rxPreferences.getString("cache").get(); if (!TextUtils.isEmpty(cachePref)) { subscriber.onNext(cachePref); } else { subscriber.onCompleted(); } }}); Observable<String> network = Observable.just("network");//The main thing is to rely on the concat operator to implement Observable.concat(memory, disk, network).first().subscribeOn(Schedulers.newThread()).subscribe(s -> { memoryCache = "memory"; System.out.println("-------------------------subscribe: " + s);});2. The interface needs to wait until multiple interfaces have collected data concurrently before updating
//Split the output of two Observables, and the order is not guaranteed. Send to the subscriber in the order of events private void testMerge() { Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread()); Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread()); Observable.merge(observable1, observable2) .subscribeOn(Schedulers.newThread()) .subscribe(System.out::println);}3. The request of one interface depends on the data returned by another API request.
For example, after we need to log in, we often get the message list based on the token we get.
Here, RxJava is used to solve the problem of nested callbacks. There is a proper noun called Callback hell
NetworkService.getToken("username", "password") .flatMap(s -> NetworkService.getMessage(s)) .subscribe(s -> { System.out.println("message: " + s); });4. The interface button needs to prevent continuous clicks
RxView.clicks(findViewById(R.id.btn_throttle)) .throttleFirst(1, TimeUnit.SECONDS) .subscribe(aVoid -> { System.out.println("click"); });5. Responsive interface
For example, checkbox is checked and the corresponding preference is automatically updated.
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);RxCompoundButton.checkedChanges(checkBox) .subscribe(checked.asAction());6. Complex data transformation
Observable.just("1", "2", "2", "3", "4", "5") .map(Integer::parseInt) .filter(s -> s > 1) .distinct() .take(3) .reduce((integer, integer2) -> integer.intValue() + integer2.intValue()) .subscribe(System.out::println);//9