기본 구조
먼저 가장 기본적인 코드를 살펴 보고이 코드가 Rxjava에서 구현되는 방식을 분석합시다.
Observable.onSubscribe <string> onsubscribe1 = new Observable.onSubscribe <string> () {@override public void call (구독자 <? super string> 가입자) {Subscriber.onnext ( "1"); 가입자.oncompleted (); }}; 가입자 <stringer1 = 새 가입자 <string> () {@override public void oncompleted () {} @override public void onerror (trashable e) {} @override public void onnext (string s) {}}; persoundable.create (onsubscriber1).먼저 관찰 가능한 코드를 살펴 보겠습니다.
공개 최종 정적 <T> 관찰 가능 <T> Create (OnSubScribe <T> f) {return new Observable <T> (hook.oncreate (f));} 보호 관측 가능 (onsubscribe <t> f) {this.onsubscribe = f;} 새로운 관찰 가능한 객체를 생성하기 위해 Observable의 생성자를 직접 호출합니다. 우리는이 객체를 일시적으로 나중에 추적 성을 위해 관찰 가능한 것으로 표시합니다.
동시에, 우리가 통과 한 onsubscribe 객체는 Observable1의 OnSubscribe 속성에 저장됩니다 1. 이 속성은 후속 상황에서 매우 중요합니다.주의를 기울이십시오.
다음으로 구독 방법을 살펴 보겠습니다.
공개 최종 구독 구독 (가입자 <? Super T> 가입자) {return Observable.Subscribe (구독자, this);} 개인 static <t> 가입 구독 (구독자 <? super t> 가입자, 관찰 가능한 <t> 관찰 가능) {... 가입자. hook.onsubscribeStart (관찰 가능, 관찰 가능.onsubscribe) .call (가입자); return hook.onsubscribereturn (구독자);} 보시다시피, 구독 후 Observable1.onsubscribe.call 메소드는 직접 호출되며, 이는 코드에서 OnSubscribe1 객체의 호출 메소드입니다. 전달 된 매개 변수는 코드에 정의 된 가입자 1 객체입니다. 통화 방법에서 수행되는 것은 들어오는 가입자 객체의 onext 및 oncompleter 메소드를 호출하는 것입니다.
이를 통해 관찰자와 관찰자 간의 의사 소통이 가능합니다. 매우 간단하지 않습니까?
public void call (가입자 <? super string> 가입자) {가입자 .onnext ( "1"); 가입자.oncompleted ();}rxjava 사용 시나리오 요약
1. 캐시 된 시나리오를 먼저 확인하십시오
데이터를 가져오고 먼저 메모리에 캐시가 있는지 확인한 다음 파일 캐시에 이전 조건이 있는지 확인한 다음 마지막으로 네트워크에서 다음 조건이 충족됩니다.
최종 관찰 가능한 <string> memory = verciable.create (new Observable.onSubscribe <string> () {@override public void call (subscriber <? super string> 가입자) {if (memoryCache! = null) {subscriper.onnext (memoryCache);} else {elsecce.oncompleted ();}}; Observable.create (new Observable.onSubscribe <string> () {@override public void call (구독자 <? super string> subscriber) {String cachepref = rxpreferences.getString ( "cache"). get (! textutils.isempty (cachepref)) {cachepref.onxt (cachepref); 가입자 .oncompleted (); Observable <string> network = personsable.just ( "Network"); // 가장 중요한 것은 Observable.concat (Memory, Disk, Network) .first (). 가입자 (schedulers.newthread ())를 구현하기 위해 Concat 연산자에 의존하는 것입니다 (s-> {MemoryCache = "Memory"; System.)2. 인터페이스는 여러 인터페이스가 업데이트하기 전에 동시에 데이터를 수집 할 때까지 기다려야합니다.
// 두 개의 관측 가능성의 출력을 분할하면 순서가 보장되지 않습니다. 이벤트 순서대로 가입자에게 보내기 개인 무효 testmerge () {Observable <String> Observable1 = demoutils.createObservable1 (). bubscribeon (schedulers.newthread ()); Observable <string> Observable2 = Demoutils.createObservable2 (). 가입자 (Schedulers.newthread ()); Observable.Merge (Observable1, Observable2) .Subscribeon (schedulers.newthread ()) .subscribe (System.out :: println);}3. 한 인터페이스의 요청은 다른 API 요청에 의해 반환 된 데이터에 따라 다릅니다.
예를 들어, 로그인 해야하는 후에는 종종 우리가 얻는 토큰을 기반으로 메시지 목록을 얻습니다.
여기서 rxjava는 중첩 된 콜백 문제를 해결하는 데 사용됩니다. 콜백 지옥이라는 적절한 명사가 있습니다
NetworkService.getToken ( "username", "password") .flatmap (s-> networkService.getMessage (s)) .SubScribe (s-> {system.out.println ( "message :" + s);});4. 인터페이스 버튼은 연속 클릭을 방지해야합니다
rxview.clicks (findViewById (r.id.btn_throttle)) .throttleFirst (1, timeUnit.seconds) .SubScribe (피하기 -> {system.out.println ( "Click");});5. 반응 형 인터페이스
예를 들어, 확인란이 확인되고 해당 환경 설정이 자동으로 업데이트됩니다.
sharedpreferences preference = preferenceManager.getDefaultSharedPreferences (this); rxsharedpreferences rxpreferences = rxsharedpreferences.create (preference); preference <boolean> 확인 = rxpreferences.getboole ( "Checked", true); 확인란). findViewById (r.id.cb_test); rxcompoundButton.checkedChanges (checkbox) .subscribe (checked.asaction ());
6. 복잡한 데이터 변환
Observable.just ( "1", "2", "2", "3", "4", "5") .map (Integer :: parseint) .filter (s-> s> 1) .distinct () .take (3) .reduce ((integer, integer2) -> intger.intvalue () + Integer2.intvalue2.intvalue2.intvalue2 .Subscribe (System.out :: println); // 9