1. Atraso a ação de execução
Ele pode ser implementado usando o método do timer+mapa. O código é o seguinte:
Obsertable.timer (5, timeUnit.millisEconds) .Map (valor-> {return Dosomething ();}). Subscribe (System.out :: println); } 2. Atraso o resultado de envio
Esse cenário exige que a ação da geração de dados seja executada imediatamente, mas o resultado está atrasado no envio. Isso é diferente do cenário acima.
Esse cenário pode ser implementado usando Observable.zip .
O operador ZIP combina os dados transmitidos por vários observáveis em ordem, cada dados só pode ser combinado uma vez e todos são ordenados. O número de dados combinados finais é determinado pelo observável, que transmite o mínimo de dados.
Para dados no mesmo local de cada observável, você precisa esperar um pelo outro. Ou seja, depois que os dados no primeiro local do primeiro observável forem gerados, você deve aguardar os dados no primeiro local do segundo observável a ser gerado e, depois que os dados no mesmo local de cada observável forem gerados, você pode combinar de acordo com as regras especificadas. Isso é realmente o que queremos usar.
Existem muitos tipos de declarações no ZIP, mas é aproximadamente o mesmo, que deve passar em vários observáveis e, em seguida, especificar uma regra para processar os dados no local correspondente de cada observável e gerar novos dados. Aqui está um dos mais simples:
public static <t1, t2, r> observável <r> zip (observável <? estende t1> o1, observável <? estende T2> o2, func2 final <? Super T1 ,? Super T2 ,? estende r> zipfunction);
Os resultados da execução de push e envio usando zip são os seguintes:
Obsertable.zip (obsertable.timer (5, timeunit.millisEconds), obsertable.just (Dosomething ()), (x, y)-> y) .Subscribe (System.out :: println));
3. Use o adiamento para executar determinadas ações no thread especificado
Como no código a seguir, embora especifiquem o método de execução do thread, a função doSomething() ainda é executada no encadeamento chamado pelo código atual.
Obsertable.Just (Dosomething ()) .SubScribeon (schedulers.io ()) .observeon (schedulers.computation ()) .subscribe (v-> utils.printlnwithThread (v.toString ()););
Normalmente, usamos os seguintes métodos para atingir nosso objetivo:
Obsertable.create (s-> {S.OnnExt (Dosomething ());}) .SubScribe (schedulers.io ()) .Observeon (schedulers.computation ()) .subscribe (v-> {ults.prIntlnwithThread (v.ToString ());});Mas, de fato, podemos atingir o mesmo objetivo usando o adiamento.
Sobre adiar
O operador de adiamento é o mesmo que Create, Just, de e outros operadores. Ele cria operadores de classe, mas todos os dados relacionados a esse operador entram em vigor apenas se você se inscrever.
declaração:
public static <t> observável <t> adiar (func0 <observável <t>> observablefactory);
O observável no Func0 de adier é criado apenas quando inscrito.
efeito:
Não crie o observável até que um observador assine; Crie um novo observável em cada assinatura.
Em outras palavras, o observável é criado ao assinar.
O problema acima é implementado com o adiamento:
Obsertable.defer (()-> obsertable.Just (Dosomething ())) .SubScribe (schedulers.io ()) .observeon (schedulers.computation ()) .subscribe (v-> {utils.printlnWithThread (v.ToString (););}); 4. Não quebre a estrutura da corrente usando composição
Muitas vezes vemos o seguinte código:
Obsertable.Just (Dosomething ()) .SubScribeon (schedulers.io ()) .observeon (schedulers.computation ()) .SubScribe (v-> {utils.printlnwithThread (v.tostring ()); No código acima, subscribeOn(xxx).observeOn(xxx) pode ser o mesmo em muitos lugares. Se planejarmos implementá -lo em um determinado lugar, podemos escrever assim:
estático privado <t> observable <T> Aplicações (observáveis <T> observable) {return observable.subScribe (schedulers.io ()) .Observeon (schedulers.computation ()); }Mas toda vez que precisamos chamar o método acima, será aproximadamente como o seguinte, e o mais externo é uma função, que é equivalente a quebrar a estrutura do link:
APLICSCHEDULERS (observável.From (SOMESOURCE) .MAP (novo Func1 <Data, Data> () {@Override Public Data Call (Data Data) {return manipulate (data);}})) .subscribe (new Action1 <data> () @override void den);O operador de composição pode ser usado para alcançar o objetivo de não quebrar a estrutura do link.
A declaração de composição é a seguinte:
public observável compõe (transformador <? super t ,? estende r> transformador);
Seu parâmetro recebido é uma interface do transformador e a saída é um observável. O transformador é na verdade um Func1<Observable<T> , Observable<R>> , em outras palavras: Um tipo de observável pode ser convertido em outro tipo de observável.
Simplificando, a composição pode converter o observável original em outro observável através do método de conversão especificado (transformador de parâmetros de entrada).
Através da compor, use o método a seguir para especificar o método do thread:
estático privado <t> transformador <t, t> ApplSchedulers () {retorna novo transformador <t, t> () {@Override public observável <t> Call (observável <t> observável) {return observable.subScribeon (schedulers.io ()) .Observeon (schedulers.computation (); }}; } Obsertable.Just (Dosomething ()). Compose (ApplSchedulers ()) .Subscribe (v-> {utils.printlnWithThRead (v.toString ());});A função ApplyScheduler pode ser simplificada usando expressões Lambda para o seguinte:
estático privado <t> transformador <t, t> ApplSchedulers () {return observable-> observable.subScribe (schedulers.io ()) .observeon (schedulers.computation ()); } 5. Use diferentes resultados de execução de acordo com a prioridade
O título acima provavelmente não expressou o cenário que eu queria expressar claramente. De fato, o cenário que eu quero expressar é semelhante ao cenário usual de obter dados de rede: se houver um cache, ele será obtido no cache e, se não houver, será obtido na rede.
É necessário aqui que, se houver um cache, a ação de obter dados da rede não será executada.
Isso pode ser implementado usando concat+primeiro.
A Concat mescla vários observáveis em um observável e retorna o observável final. E esses dados são como ser enviados de um observável. Os parâmetros podem ser múltiplos observáveis ou iterador que contém observalbe.
Os dados do novo observável são organizados na ordem do observável no concat original, ou seja, os dados no novo resultado são classificados na ordem original.
A seguir, a implementação dos requisitos acima:
Obsertable.Concat (getDataFromCache (), getDataFromNetwork ()). Primeiro () .SubScribe (v-> System.out.println ("Resultado:"+V)); // Obtenha dados do cache Observável estático privado <String> getDataFromCache () {return observable.create (s -> {// doSomething para obter dados Int Value = new Random (). NextInt (); Value = Value 2; Create Data! S.OnCompleted (); } // Obtenha dados da rede Observável estática privada <String> getDataFromNetwork () {return observable.create (s -> {for (int i = 0; i <10; i ++) {utils.println ("obs2 gene"+i); s.onnExt ("Dados da rede:"+i); }Na implementação acima, se o getDataFromCache tiver dados, o código aqui no getDataFromNetwork não será executado, que é exatamente o que queremos.
Existem várias implementações acima que precisam de atenção:
1. É possível que os dados não possam ser obtidos em ambos os locais. Nesse cenário, o uso de primeiro lançará uma exceção NosuchElementException. Se esse cenário for o caso, você precisará substituir o primeiro acima pelo FirstorDefault.
2. Em getDataFromCache() acima, se não houver dados, chamamos o concorrido diretamente. Se não ligarmos para o Oncompleted, mas ligar para o OnError, o uso do Concat acima mencionado não obterá nenhum resultado. Porque quando a Concat recebe qualquer erro, a mesclagem será interrompida. Portanto, se você deseja usar o OnError, precisa usar o concatdelayerror em vez de concat.concatDelayError O concatdelayerror ignorará o erro primeiro e adiará o erro até o processamento final.
Resumir
O acima é o conteúdo inteiro deste artigo. Espero que o conteúdo deste artigo seja de ajuda para o seu estudo ou trabalho. Se você tiver alguma dúvida, pode deixar uma mensagem para se comunicar.