Prefácio:
Recentemente, ao analisar o RPC do Hadoop (Protocolo de Chamada de Procedimento Remoto), um protocolo que solicita serviços de programas de computador remotos por meio da rede sem entender a tecnologia de rede subjacente. You can refer to: http://baike.baidu.com/view/32726.htm) mechanism, it was found that the implementation of hadoop's RPC mechanism mainly uses two technologies: dynamic proxy (you can refer to the blog: http://weixiaolu.iteye.com/blog/1477774) and java NIO. Para analisar corretamente o código -fonte do RPC do Hadoop, acho que é necessário primeiro estudar os princípios e a implementação específica do Java Nio.
Neste blog, analiso principalmente o Java Nio de duas direções
Índice:
um. A diferença entre Java Nio e E/S bloqueando
1. Bloqueando o modelo de comunicação de E/S
2. Java Nio Princípio e Comunicação Modelo 2. Java Nio Server e Código do cliente Implementação
Análise específica:
um. A diferença entre Java Nio e E/S bloqueando
1. Bloqueando o modelo de comunicação de E/S
Se você tem um certo entendimento do bloqueio de E/S agora, sabemos que o bloqueio de E/S está bloqueado ao chamar o método InputStream.read (), ele esperará até que os dados cheguem (ou tempo limite) antes de retornar; Da mesma forma, ao ligar para o método ServerSocket.accept (), ele será bloqueado até que haja uma conexão com o cliente antes de retornar. Depois que cada cliente se conectar, o servidor iniciará um thread para processar a solicitação do cliente. O diagrama do modelo de comunicação da E/S de bloqueio é o seguinte:
Se você analisá -lo com cuidado, definitivamente descobrirá que existem algumas desvantagens de bloquear a E/S. Com base no modelo de comunicação de E/S de bloqueio, resumi suas duas desvantagens:
1. Quando houver muitos clientes, será criado um grande número de tópicos de processamento. E cada thread ocupa o espaço da pilha e algum tempo da CPU
2. O bloqueio pode levar a uma comutação de contexto frequente, e a maioria das mudanças de contexto pode não ter sentido.
Nesse caso, a E/S não bloqueadora possui suas perspectivas de aplicativos.
2. Java Nio Princípio e Modelo de Comunicação
Java Nio foi iniciado em JDK1.4, e pode-se dizer que são os dois "E/S novos" ou a E/S não bloqueadora. Aqui está como Java Nio funciona:
1. Um encadeamento dedicado lida com todos os eventos de IO e é responsável pela distribuição.
2. Mecanismo orientado a eventos: gatilhos quando um evento chega, em vez de monitorar eventos simultaneamente.
3. Comunicação de threads: os threads se comunicam através da espera, notificar e outros meios. Certifique -se de que cada interruptor de contexto faça sentido. Reduza a troca desnecessária de roscas.
Depois de ler algumas informações, publiquei o diagrama esquemático de Java Nio que entendo:
(Nota: o fluxo de processamento de cada encadeamento provavelmente está lendo dados, decodificação, processamento de computação, codificação e envio de respostas.)
O servidor Java Nio precisa apenas iniciar um tópico especial para lidar com todos os eventos de IO. Como esse modelo de comunicação é implementado? Haha, vamos explorar seu mistério juntos. O Java Nio usa um canal de mão dupla para transmissão de dados, em vez de um fluxo unidirecional, e eventos de interesse podem ser registrados no canal. Existem quatro eventos no total:
| Nome do evento | Valor correspondente |
| O servidor recebe eventos de conexão do cliente | SelectionKey.op_accept (16) |
| Evento do servidor de conexão do cliente | SelectionKey.op_connect (8) |
| Leia eventos | SelectionKey.op_read (1) |
| Escreva eventos | SelectionKey.op_write (4) |
O servidor e o cliente mantêm um objeto que gerencia um canal, que chamamos de seletor, que pode detectar eventos em um ou mais canais. Vamos tomar o servidor como exemplo. Se um evento de leitura estiver registrado no seletor do servidor, o cliente enviará alguns dados para o servidor em algum momento. Ao bloquear a E/S, ele chamará o método read () para bloquear os dados, e o servidor NIO adicionará um evento de leitura ao seletor. O encadeamento de processamento do servidor será pesquisado para acessar o seletor. Se for encontrado um evento de interesse ao acessar o seletor, ele processará esses eventos. Se nenhum evento de interesse chegar, o tópico de processamento será bloqueado até que o evento de interesse chegue. Abaixo está um diagrama esquemático do modelo de comunicação de Java Nio que eu entendo:
dois. Java Nio Server e Código do cliente Implementação
Para entender melhor o Java Nio, a seguir é uma implementação simples de código para o servidor e o cliente.
Servidor:
pacote cn.nio; importar java.io.ioException; importar java.net.inetsocketaddress; importar java.nio.byteBuffer; importar java.nio.channels.selectionKey; importar java.nio.channels.selector; importar java.nio.channels.serversocketchannel; importar java.nio.channels.socketchannel; importar java.util.iterator; /*** Nio Server* @Author Small Path*/public Class Nioserver {// Channel Manager Seletor Private Seletor; / ** * Obtenha um canal do ServerSocket e faça algum trabalho de inicialização no canal * @param porta porta ligada * @throws ioexception */ public void initServer (int porta) lança ioexception {// obtenha um serviderSocket Channel ServerSocketchAnnel ServerChannel = serverSockEnkAnel.open (); // Defina o canal para o serverChannel.configureblocking não-bloqueador (false); // vincula o servidorsocket correspondente a este canal ao port serverChannel.socket (). Bind (new inetSocketAddress (porta)); // Obtenha um gerenciador de canal this.selector = selector.open (); // Ligue o gerenciador de canais ao canal e registre o evento SelectionKey.OP_ACECT para o canal. Depois de registrar o evento, // Quando o evento chegar, selettor.Select () retornará. Se o evento não chegar a seletor.Select () bloqueará. serverChannel.register (seletor, SelectionKey.op_accept); } /*** Use a pesquisa para ouvir se existem eventos no seletor que precisam ser processados. Nesse caso, ele será processado * @THOWSoException */ @suppresswarnings ("desmarcado") public void listen () lança ioexception {System.out.println ("O lado do servidor começa com sucesso!"); // Pesquisa para acessar seletor while (true) {// Quando o evento registrado chegar, o método retorna; Caso contrário, o método continuará bloqueando selettor.Select (); // Obtenha o iterador do item selecionado no seletor e o item selecionado é o iterador de evento registrado item = this.selector.SelectedKeys (). Iterator (); while (ite.hasnext ()) {seleçãokey Key = (SelectionKey) iTe.Next (); // exclua a chave selecionada para evitar o processamento repetido de item.remove (); // O cliente solicita o evento de conexão if (key.isacceptable ()) {serversocketchannel server = (serversocketchannel) key .channel (); // Obtenha o canal para conectar -se ao canal do SocketchAnnel do cliente = server.accept (); // defina como canal não bloqueador.configureblocking (false); // Você pode enviar informações para o cliente aqui canal.write (bytebuffer.wrap (new string ("Envie uma mensagem para o cliente"). GetBytes ())); // Depois que a conexão com o cliente for bem -sucedida, para receber as informações do cliente, você precisa definir permissões de leitura para o canal. Channel.Register (this.selector, SelectionKey.op_read); // Um evento legível foi obtido} else if (key.isReadable ()) {read (key); }}}} / ** * Eventos de processamento que leem as mensagens enviadas pelo cliente * @param key * @throws ioexception * / public void Read (seleção keyKey) lança ioexception {// O servidor pode ler mensagens: Obtenha o canal de soquete onde o evento ocorre. Canal de socketchannel = (Socketchannel) key.Channel (); // crie um buffer de leitura buffer buffer = bytebuffer.allocate (10); canal.read (buffer); byte [] dados = buffer.array (); String msg = new String (dados) .Trim (); System.out.println ("O servidor recebeu a mensagem:"+msg); Bytebuffer outbuffer = bytebuffer.wrap (msg.getBytes ()); Channel.Write (outbuffer); // Envie a mensagem de volta ao cliente}/ *** Teste do servidor START* @THOWSOWSCECCECTION*/ public static void main (string [] args) lança ioexception {nioserver server = new nioserver (); Server.initServer (8000); server.Listen (); }} Cliente:
pacote cn.nio; importar java.io.ioException; importar java.net.inetsocketaddress; importar java.nio.byteBuffer; importar java.nio.channels.selectionKey; importar java.nio.channels.selector; importar java.nio.channels.socketchannel; importar java.util.iterator; /*** cliente nio* @Author Small Path*/Public Class nioclient {// Channel Manager Private Seletor Seletor; /** * Get a Socket channel and do some initialization of the channel * @param ip The ip of the server connected to * @param port The port number of the server connected to * @throws IOException */ public void initClient(String ip,int port) throws IOException { // Get a Socket channel SocketChannel channel = SocketChannel.open(); // defina o canal para canal não bloqueador.configureblocking (false); // Obtenha um gerenciador de canal this.selector = selector.open (); // O cliente se conecta ao servidor. De fato, a execução do método não implementa a conexão. Você precisa ligar // usar canal.finishconnect (); no método LODE () para concluir o canal de conexão.Connect (new inetSocketAddress (IP, porta)); // Ligue o gerenciador de canais ao canal e registre o evento SelectionKey.op_Connect para o canal. Channel.Register (Selector, SelectionKey.OP_Connect); } /*** Use a pesquisa para ouvir se existem eventos no seletor que precisam ser processados. Nesse caso, ele será processado * @THOWSoException */ @suppresswarnings ("desmarcado") public void listen () lança ioexception {// pesquisa para acessar o seletor while (true) {Selector.Select (); // Obtenha o iterador para o item selecionado no iterador seletor item = this.selector.SelectedKeys (). Iterator (); while (ite.hasNext ()) {selectionKey key = (seleçãoKey) item.Next (); // exclua a chave selecionada para evitar o processamento repetido de item.remove (); // O evento de conexão ocorre se (key.isconnectable ()) {canal de socketchannel = (socketchannel) key .channel (); // Se a conexão estiver sendo conectada, preencha a conexão se (canal.isconnectionPending ()) {Channel.finishConnect (); } // Defina como canal não-block.configureblocking (false); // você pode enviar informações para o servidor canal.write (bytebuffer.wrap (new string ("Envie uma mensagem para o servidor"). GetBytes ())); // Depois que a conexão com o servidor for bem -sucedida, para receber as informações do servidor, o canal precisa ser definido para ler permissões. Channel.Register (this.selector, SelectionKey.op_read); // Um evento legível foi obtido} else if (key.isReadable ()) {read (key); }}}} /*** Eventos de processamento que leem as mensagens enviadas pelo servidor* @param key* @throws ioexception* /public void Read (seleção KeyKey) lança IoException {// O mesmo que o método de leitura do servidor} /*** Args Test do cliente* @throws ioException* /public static Void Main ([] string Nioclient (); client.initclient ("localhost", 8000); client.Listen (); }}resumo:
Finalmente, a análise dinâmica de proxy e java nio é concluída. Haha, o seguinte é analisar o código -fonte do mecanismo RPC do Hadoop. O endereço do blog é: http://weixiaolu.iteye.com/blog/1504898. No entanto, se você tiver alguma objeção à sua compreensão do Java Nio, poderá discutir isso juntos.
Se você precisar reimprimir, indique a fonte: http://weixiaolu.iteye.com/blog/1479656