Este artigo será introduzido de superficial ao profundo da biografia tradicional a NIO a AIO e será acompanhada por uma explicação completa de código.
Um exemplo será usado no seguinte código: o cliente envia uma sequência da equação para o servidor e o servidor retorna o resultado ao cliente após o cálculo.
Todas as instruções para o código são usadas diretamente como comentários e são incorporadas no código, o que pode ser mais fácil de entender ao ler o código. Uma classe de ferramentas para calcular o resultado será usada no código, consulte a seção Código do artigo.
Artigos recomendados para conhecimento básico relacionado:
Introdução ao modelo de E/S da rede Linux (imagens e texto)
Java Concurrency (multi-threading)
1. Programação biológica
1.1. Programação de biografia tradicional
O modelo básico de programação de rede é o modelo C/S, ou seja, a comunicação entre dois processos.
O servidor fornece portas IP e de escuta. O cliente inicia uma solicitação de conexão através do endereço de operação de conexão que o servidor deseja ouvir. Através de três apertos de mão, se a conexão for estabelecida com sucesso, ambas as partes poderão se comunicar através dos soquetes.
No desenvolvimento do modelo de bloqueio de sincronização tradicional, o ServerSocket é responsável por vincular endereços IP e portas de escuta inicial; O soquete é responsável por iniciar operações de conexão. Depois que a conexão é bem -sucedida, ambas as partes conduzem a comunicação síncrona de bloqueio através dos fluxos de entrada e saída.
Uma breve descrição do modelo de comunicação do servidor bio: o servidor usando o modelo de comunicação biológica é geralmente um thread aceitador independente responsável por ouvir a conexão do cliente. Depois de receber a solicitação de conexão do cliente, ele cria um novo thread para cada cliente para processamento de links e falha ao processá -lo e, em seguida, retorna a resposta ao cliente através do fluxo de saída e o thread é destruído. Isto é, um modelo típico de uma requisição a recém-noite toda.
Diagrama de Modelo de Comunicação Biológico Tradicional:
O maior problema com esse modelo é que ele não possui recursos de escala elástica. Quando o número de acessos simultâneos no cliente aumenta, o número de threads no servidor é proporcional ao número de acessos simultâneos no cliente. Os tópicos em Java também são recursos relativamente valiosos do sistema. Depois que o número de threads se expande rapidamente, o desempenho do sistema cairá acentuadamente. À medida que o número de acessos continua aumentando, o sistema acabará por morrer.
Código -fonte do servidor criado por E/S de bloqueio síncrono:
pacote com.anxpp.io.calculator.bio; importar java.io.ioException; importar java.net.serversocket; importar java.net.socket; /** * Código -fonte do servidor bio * @Author Yangtao__anxpp.com * @version 1.0 */public Final Class ServerNormal {// Número da porta padrão Private static int default_port = 12345; // Singleton ServerSocket Private Static ServerSocket Server; // Defina a porta de escuta de acordo com os parâmetros recebidos. Se não houver parâmetros, chame o seguinte método e use o valor padrão public static void start () lança ioexception {// use o valor padrão inicial (default_port); } // Este método não será acessado de um grande número de maneiras simultâneas, e não há necessidade de considerar a eficiência, apenas sincronize o método diretamente público sincronizado estático void START (int) lança IoException {if (server! = Null) retornar; tente {// Crie o ServerSocket através do construtor // Se a porta estiver legal e ociosa, o servidor ouvirá com sucesso. Server = new ServerSocket (porta); System.out.println ("O servidor foi iniciado, número da porta:" + porta); // Ouça as conexões do cliente através do loop sem fio // Se não houver acesso ao cliente, ele será bloqueado na operação de aceitação. while (true) {soquete soquete = server.accept (); // Quando houver um novo acesso ao cliente, o código a seguir será executado //, em seguida, crie um novo thread para lidar com este link de soquete novo thread (novo ServerHandler (Socket)). START (); }} finalmente {// algum trabalho de limpeza necessário se (servidor! = null) {System.out.println ("o servidor está fechado."); server.close (); servidor = nulo; }}}} Mensagem de mensagens do cliente Thread ServerHandler Código fonte:
pacote com.anxpp.io.calculator.bio; importar java.io.bufferedReader; importar java.io.ioException; importar java.io.inputStreamReader; importar java.io.printwriter; importar java.net.socket; importação com.anxpp.io.utils.calculator; / *** Tópico do cliente* @author yangtao__anxpp.com* link de soquete para um cliente*/ public class ServerHandler implementa Runnable {Setorgrafia privada; public ServerHandler (soquete) {this.socket = soket; } @Override public void run () {BufferErader in = null; PrintWriter Out = NULL; tente {in = new BufferredReader (new InputStreamReader (soket.getInputStream ())); out = new PrintWriter (Socket.getOutputStream (), true); Expressão de string; Resultado da string; Enquanto (true) {// Leia uma linha através do BufferErader // Se você já leu a cauda do fluxo de entrada, retorne nulo e saia do loop // se você receber um valor não nulo, tente calcular o resultado e retornar se ((expressão = in.readline ()) == NULL) quebrar; System.out.println ("O servidor recebeu uma mensagem:" + expressão); tente {resultado = calculator.cal (expressão) .toString (); } Catch (Exceção e) {result = "calculator.cal (expressão) .toString ();} catch (Exceção e) {e.printStackTrace ();} finalmente {// Algum trabalho de limpeza necessário se (null); null) {out.close (); Código -fonte do cliente criado por E/S de bloqueio síncrono:
pacote com.anxpp.io.calculator.bio; importar java.io.bufferedReader; importar java.io.ioException; importar java.io.inputStreamReader; importar java.io.printwriter; importar java.net.socket; /** * Cliente criado bloqueando E/O * @Author Yangtao__anxpp.com * @version 1.0 */public class Client {// Número da porta padrão Private static int default_server_port = 12345; String estática privada default_server_ip = "127.0.0.1"; public static void send (String Expression) {send (default_server_port, expressão); } public static void send (int porta, string expressão) {System.out.println ("A expressão aritmética é:" + expressão); Soquete soquete = nulo; BufferredReader in = null; PrintWriter Out = NULL; tente {socket = new Socket (default_server_ip, porta); in = new bufferredreader (new inputStreamReader (soket.getInputStream ())); out = new PrintWriter (Socket.getOutputStream (), true); out.println (expressão); System.out.println ("___ o resultado é:" + in.readline ()); } catch (Exceção e) {e.printStackTrace (); } finalmente {// são o trabalho de limpeza necessário se (in! = null) {tente {in.close (); } catch (ioexception e) {e.printStackTrace (); } in = null; } if (out! = null) {out.close (); out = null; } if (soquete! = null) {tente {socket.close (); } catch (ioexception e) {e.printStackTrace (); } soquete = null; }}}} Teste o código, para facilitar a visualização dos resultados da saída no console, coloque -o no mesmo programa (JVM) para executar:
pacote com.anxpp.io.calculator.bio; importar java.io.ioException; importar java.util.random; /** * Método de teste * @author yangtao__anxpp.com * @version 1.0 */public class Test {// Teste o método principal public estático void main (string [] args) lança interruptException {// execute o novo thread (novo runnable () {@override public void () {Try {TryBetter.Better.StErt.stert.sterbet (new Runnable () {@override public run () {Try {{) E.PrintStackTrace (); // Evite o cliente que executa o código antes do início do servidor; // Execute os operadores de char do cliente [] = {'+', '-', '*', '/'}; Aleatória aleatória = novo aleatório (system.currenttimemillis ()); novo Thread (novo runnable () {@suppresswarnings ("estático-access") @Override public void run () {while (true) {// aleatório gera expressão aritmética String expressão = random.nextInt (10)+""+operadores [random.nextInt (4)]+(Random.NexT (10) +1); Thread.CurrentThread (). Sleep (Random.NextInt (1000)); } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } { Fio. currentThread (). Sleep (Random.NextInt (1000)); } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } { Fio. currentThread (). Sleep (Random.NextInt (1000)); }}}}}}}} Os resultados de uma das corridas:
O servidor foi iniciado, o número da porta: 12345 A expressão aritmética é: 4-2 Servidor recebeu a mensagem: 4-2 ___ O resultado é: 2 Expressão aritmética é: 5-10 servidor recebeu a mensagem: 5-10__ resultado é: -5 aritmética Expressão é: 0-9 Servidor Recebido A mensagem: 0-9 Resultado é: -9 ARITHMETS IS: 0 ANDA I é: 0-9 Servidor Recebido A mensagem 0-9 __ IS: -9 ARITHMET IS: Recebi a mensagem: 1/6__ O resultado é: 0.166666666666666666666666666666666666666666666666666666666666666666666666666666666666 66666666666666666666666666666666666666666666666666666666666666666666666666666666666666 66666666666666666666666666666666666666666666666666666666666666666666666666666666666666 66666666666666666666666666666666666666666666666666666666666666666666666666666666666666
A partir do código acima, é fácil ver que o principal problema da biografia é que sempre que um novo cliente solicita acesso, o servidor deve criar um novo thread para lidar com esse link, que não pode ser aplicado em cenários onde são necessários alto desempenho e alta simultaneidade (um grande número de novos threads afetará seriamente o desempenho do servidor e até a greve).
1.2. Programação de E/S pseudo-síncrona
Para melhorar esse modelo de thread de um conexão, podemos usar um pool de threads para gerenciar esses tópicos (para obter mais informações, consulte o artigo fornecido anteriormente), implementando um modelo para um ou mais threads para processar n clientes (mas a camada subjacente ainda usa o Modelo de Bloqueio Síncrono ", que é chamado de" Pseudo-asinconsonse I/O Model ".
Diagrama de modelos de E/S pseudo-asincronos:
A implementação é muito simples. Só precisamos entregar o novo thread para encadear o gerenciamento do pool e apenas alterar o código do servidor agora: agora:
pacote com.anxpp.io.calculator.bio; importar java.io.ioException; importar java.net.serversocket; importar java.net.socket; importar java.util.concurrent.executorService; importar java.util.concurrent.executores; /** * Code__PSOUDO-ASYNCHRONOUS do servidor Bio * @Author Yangtao__anxpp.com * @version 1.0 */public Final Class ServerBetter {// Número da porta padrão privado estático int default_port = 12345; // Singleton ServerSocket Private Static ServerSocket Server; // Singleton Private Static ExecorsService ExecorService = Executores.NewfixedThreadpool (60); // Defina a porta de escuta de acordo com os parâmetros recebidos. Se não houver parâmetro, chame o seguinte método e use o valor padrão public static void start () lança ioexception {// use o valor padrão inicial (default_port); } // Este método não será acessado em um grande número de simultaneamente, e não há necessidade de considerar a eficiência, apenas sincronize o método diretamente público sincronizado estático void inicial (int) lança ioexception {if (server! = Null) retornar; tente {// Crie o ServerSocket através do construtor // Se a porta estiver legal e ociosa, o servidor ouvirá com sucesso. Server = new ServerSocket (porta); System.out.println ("O servidor foi iniciado, número da porta:" + porta); // Superce a conexão do cliente através do loop sem fio // Se não houver acesso ao cliente, ela será bloqueada na operação de aceitação. while (true) {soquete soquete = server.accept (); // Quando houver um novo acesso ao cliente, o código a seguir será executado // Crie um novo thread para processar o link de soquete ExecutorService.execute (new ServerHandler (Socket)); }} finalmente {// algum trabalho de limpeza necessário se (servidor! = null) {System.out.println ("o servidor está fechado."); server.close (); servidor = nulo; }}}} Os resultados da execução do teste são os mesmos.
Sabemos que, se usarmos o Pool de threads do CachedThreadpool (sem limitar o número de threads, se não estiver claro, consulte o artigo fornecido no início do artigo), de fato, além de nos ajudar a gerenciar automaticamente os threads (reutilização), também se parece com um modelo de contagem de threads: 1: 1. Usando o FILLTHREADPOOL, controlamos efetivamente o número máximo de encadeamentos, garantimos o controle de recursos limitados do sistema e implementamos o modelo de E/S pseudo-síncrono n: m.
No entanto, como o número de threads é limitado, se ocorrer um grande número de solicitações simultâneas, os encadeamentos que excedem o número máximo só podem esperar até que haja threads gratuitos no pool de encadeamentos que possam ser reutilizados. Quando o fluxo de entrada do soquete for lido, ele será bloqueado até ocorrer:
Portanto, ao ler os dados são lentos (como grande quantidade de dados, transmissão de rede lenta etc.) e grandes quantidades de simultaneidade, outras mensagens de acesso só podem ser esperadas o tempo todo, o que é a maior desvantagem.
O NIO que será introduzido posteriormente pode resolver esse problema.
2. Programação de Nio
A biblioteca de E/S do New Java é introduzida no pacote java.nio.* No JDK 1.4, com o objetivo de aumentar a velocidade. De fato, o pacote de E/S "antigo" foi reimplementado usando o NIO, e podemos nos beneficiar dele, mesmo que não usemos explicitamente a programação do NIO. As melhorias de velocidade podem ocorrer na E/S de arquivo e à E/S de rede, mas este artigo discute apenas o último.
2.1. Introdução
Geralmente, pensamos no NIO como uma nova E/S (também o nome oficial), porque é novo na antiga biblioteca de E/S (na verdade, ela foi introduzida no JDK 1.4, mas esse substantivo continuará sendo usado por um longo tempo, mesmo que eles sejam "velhos" agora, por isso também nos lembra que precisamos considerá -lo cuidadosamente quando a nomear) e, agora, é importante. No entanto, é chamado de E/S não-block por muitas pessoas, ou seja, E/S não bloqueando, porque isso é chamado, pode refletir melhor suas características. O NIO no texto a seguir não se refere a toda a nova biblioteca de E/S, mas não está bloqueando a E/S.
O NIO fornece duas implementações diferentes de canal de soquete: o SocketchAnnel e o ServerSocketchannel correspondentes ao soquete e ao servidorsocket no modelo biológico tradicional.
Ambos os canais recém-adicionados suportam modos de bloqueio e não bloqueio.
O uso do modo de bloqueio é tão simples quanto o suporte tradicional, mas o desempenho e a confiabilidade não são bons; O modo não bloqueador é exatamente o oposto.
Para aplicações de baixa carga e baixa concorrência, a E/S de bloqueio síncrono pode ser usado para melhorar a taxa de desenvolvimento e melhorar a manutenção; Para aplicativos de alta carga e alta concorrência (rede), o modo de NIO não bloqueador deve ser usado para se desenvolver.
O conhecimento básico será introduzido primeiro abaixo.
2.2. Buffer buffer
Um buffer é um objeto que contém alguns dados a serem escritos ou lidos.
Na biblioteca do NIO, todos os dados são processados em um buffer. Ao ler dados, eles são lidos diretamente no buffer; Ao escrever dados, eles também são gravados no buffer. A qualquer momento, você acessa dados no NIO, eles são operados por meio de um buffer.
Um buffer é na verdade uma matriz e fornece informações como acesso estruturado a dados e manutenção de locais de leitura e gravação.
As áreas de cache específicas são: ByteBuffe, Charbuffer, ShortBuffer, Intbuffer, LongBuffer, Floatbuffer, Buffer DoubleBuffer. Eles implementam a mesma interface: buffer.
2.3. Canal
Nossa leitura e escrita de dados devem ser passadas pelo canal, que é como um tubo de água, um canal. A diferença entre um canal e um fluxo é que o canal é bidirecional e pode ser usado para leitura, gravação e operações simultâneas de leitura e gravação.
Os canais do sistema operacional subjacente geralmente são duplex completos, portanto, um canal duplex completo pode mapear melhor a API do sistema operacional subjacente do que um fluxo.
Os canais são divididos principalmente em duas categorias:
O ServerSocketchAnnel e o Socketchannel que estarão envolvidos no código a seguir são subclasses do SelectableChannel.
2.4. Seletor de multiplexador
Seletor é a base da programação Java Nio.
O seletor fornece a capacidade de selecionar tarefas prontas: o seletor pesquisará constantemente o canal registrado nele. Se ocorrer um evento de leitura ou gravação em um canal, o canal estará no estado pronto e será pesquisado pelo seletor. Em seguida, o conjunto de canais prontos pode ser obtido através do SelectionKey para executar operações subsequentes de E/S.
Um seletor pode pesquisar vários canais ao mesmo tempo, porque o JDK usa o epoll () em vez da implementação tradicional de seleção, não há limite no identificador máximo de conexão 1024/2048. Portanto, apenas um thread deve ser responsável pela pesquisa do seletor e pode acessar milhares de clientes.
2.5. Nio Server
O código parece muito mais complicado que a programação tradicional do soquete.
Basta colar o código e dar a descrição do código na forma de comentários.
Código -fonte do servidor criado por Nio:
pacote com.anxpp.io.calculator.nio; Public Class Server {private static int default_port = 12345; Servidor estático privado Manguarda de servidor; public static void start () {start (default_port); } public static sincronizado void inicial (int porta) {if (serverHandle! = null) serverHandle.stop (); ServerHandle = new ServerHandle (porta); novo thread (serverhandle, "servidor"). start (); } public static void main (string [] args) {start (); }} ServerHandle:
pacote com.anxpp.io.calculator.nio; importar java.io.ioException; importar java.net.inetsocketaddress; importar java.nio.byteBuffer; importar java.nio.channels.selectionKey; importar java.nio.channels.selector; importar java.nio.channels.serversocketchannel; importar java.nio.channels.socketchannel; importar java.util.iterator; importar java.util.set; importação com.anxpp.io.utils.calculator; / ** * NIO Server * @Author Yangtao__anxpp.com * @version 1.0 */ public class ServerHandle implementa Runnable {seletor de seletor privado; Serversocketchannel ServerChannel privado; O booleano volátil privado começou; /*** Construtor* @param porta Especifique o número da porta a ser ouvido*/public serverHandle (int porta) {try {// crie selector = Selector.open (); // Abra o canal de audição serverChannel = serversocketchannel.open (); // Se verdadeiro, este canal será colocado no modo de bloqueio; Se false, esse canal será colocado em modo não bloqueador serverChannel.configureblocking (false); // Ativar modo de backlog não de bloqueio // é definido como 1024 ServerChannel.socket (). Bind (New InetSocketAddress (porta), 1024); // Superce Conexão do cliente Request ServerChannel.register (Selector, SelectionKey.OP_ACECT); // Mark O servidor está ativado iniciado = true; System.out.println ("O servidor foi iniciado, número da porta:" + porta); } catch (ioexception e) {e.printStackTrace (); System.Exit (1); }} public void stop () {iniciado = false; } @Override public void run () {// Faça um loop através do seletor while (iniciado) {tente {// Se houver um evento de leitura e gravação, o seletor é acordado a cada seletor 1S selector (1000); // bloqueando, continuará apenas quando ocorrer pelo menos um evento registrado. // SELECTOR.SELECT (); Set <sectionKey> keys = Selector.SelectedKeys (); Iterator <SelectionKey> it = keys.iterator (); Seleção keyKey Key = NULL; while (it.hasnext ()) {key = it.next (); it.remove (); tente {handleInput (chave); } catch (Exceção e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). close (); }}}}}}}}} catch (throwable t) {t.printStackTrace (); }} // Depois que o seletor estiver fechado, os recursos gerenciados serão lançados automaticamente se (seletor! = Null) tente {Selector.close (); } catch (Exceção e) {e.printStackTrace (); }} private void handleInput (chave de seleção) lança ioexception {if (key.isvalid ()) {// processando a mensagem de solicitação para um novo acesso if (key.isacceptable ()) {serverSocketchannel ssc = (serverSocketchAnnel) key.Channel ();; // Criando uma instância do Socketchannel através do ServerSocketchAnnel Aceitar // Complete esta operação significa concluir o aperto de mão de três vias do TCP, e o link físico do TCP é oficialmente estabelecido. Socketchannel sc = sc.accept (); // defina como sc.configureblockbockbockbock (false); // Registre -se como leia SC.Register (Selector, SelectionKey.op_read); } // Leia a mensagem if (key.isReadelable ()) {socketchannel sc = (socketchannel) key.channel (); // Crie bytebuffer e abra um buffer buffer de 1M de buffer = bytebuffer.allocate (1024); // Leia o fluxo de solicitação e retorne o número de bytes read int readbytes = sc.read (buffer); // leia bytes e codifique os bytes if (readbytes> 0) {// Defina o limite atual do buffer como posicionar = 0, para as operações de leitura subsequentes do buffer.flip (); // Crie uma matriz de bytes com base no número de bytes legíveis de buffer bytes = novo byte [buffer.remaining ()]; // copie a matriz de bytes legível do buffer no recém -criado Buffer.get.get (bytes); String Expression = new String (bytes, "UTF-8"); System.out.println ("Servidor recebeu uma mensagem:" + expressão); // Processando o resultado da string de dados = null; tente {resultado = calculator.cal (expressão) .toString (); } catch (Exceção e) {resultado = "Erro de computação:" + e.getMessage (); } // Envie a mensagem de resposta dowrite (sc, resultado); } // sem bytes lê e ignoram // else if (readbytes == 0); // O link foi fechado, liberando o recurso else if (readbytes <0) {key.cancel (); sc.close (); }}}}} // Envie a mensagem de resposta de maneira assíncrona privada dowrite (canal de Socketchannel, resposta de string) lança ioexception {// codificando a mensagem como uma matriz de bytes byte [] bytes = Response.getBytes (); // Crie bytebuffer de acordo com a capacidade de matriz bytebuffer writeBuffer = bytebuffer.allocate (bytes.length); // copie a matriz de bytes para o buffer writebuffer.put (bytes); // Operação flip writebuffer.flip (); // Envie a matriz de bytes do buffer canal.write (writebuffer); // ***** O código para processamento "Write Half-Packet" não está incluído aqui}}Como você pode ver, as principais etapas para criar um servidor Nio são as seguintes:
Como a mensagem de resposta é enviada, o SocketchAnnel também é assíncrono e não bloqueando, portanto, não é garantido que os dados que precisam ser enviados possam ser enviados ao mesmo tempo, e haverá um problema de escrever meio pacote neste momento. Precisamos registrar uma operação de gravação, pesquisar constantemente o seletor para enviar as mensagens não destacadas e, em seguida, usar o método Hasremain () do buffer para determinar se a mensagem foi enviada.
2.6. Cliente nio
É melhor fazer upload do código. O processo não requer muita explicação, é um pouco semelhante ao código do servidor.
Cliente:
pacote com.anxpp.io.calculator.nio; public class Client {private Static String default_host = "127.0.0.1"; private estático int default_port = 12345; O cliente estático privado e o cliente do cliente; public static void start () {start (default_host, default_port); } public static sincronizado void inicial (string ip, int porta) {if (clientHandle! = null) clientHandle.stop (); clientHandle = new ClientHandle (IP, porta); novo thread (clienthandle, "servidor"). start (); } // Envie uma mensagem para o servidor público estático boolean sendmsg (string msg) lança exceção {if (msg.equals ("q")) retorna false; ClientHandle.Sendmsg (MSG); retornar true; } public static void main (string [] args) {start (); }} ClientHandle:
pacote com.anxpp.io.calculator.nio; importar java.io.ioException; importar java.net.inetsocketaddress; importar java.nio.byteBuffer; importar java.nio.channels.selectionKey; importar java.nio.channels.selector; importar java.nio.channels.socketchannel; importar java.util.iterator; importar java.util.set; / ** * cliente nio * @author yangtao__anxpp.com * @version 1.0 */ public class ClientHandle implementa runnable {private string host; private int porta; seletor de seletor privado; Socketchannel privado Socketchannel; O booleano volátil privado começou; public clientHandle (string ip, int porta) {this.host = ip; this.port = porta; tente {// crie seletor Selector = Selector.open (); // Abra o canal de audição Socketchannel = socketchannel.open (); // Se verdadeiro, este canal será colocado no modo de bloqueio; Se false, este canal será colocado no modo de bloqueio não bloqueio SocketchAnel.configureblocking (false); // Abra o modo não de bloco iniciado = true; } catch (ioexception e) {e.printStackTrace (); System.Exit (1); }} public void stop () {iniciado = false; } @Override public void run () {try {doconnect (); } catch (ioexception e) {e.printStackTrace (); System.Exit (1); } // Faça um loop através do seletor enquanto (iniciado) {tente {//, independentemente de haver um evento de leitura e gravação, o seletor é despertado a cada 1s selettor.select (1000); // bloqueando, e continuará apenas quando ocorrer pelo menos um evento registrado. // SELECTOR.SELECT (); Set <sectionKey> keys = Selector.SelectedKeys (); Iterator <SelectionKey> it = keys.iterator (); Seleção keyKey Key = NULL; while (it.hasnext ()) {key = it.next (); it.remove (); tente {handleInput (chave); } catch (Exceção e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). close (); }}}}}}}} catch (Exceção e) {e.printStackTrace (); System.Exit (1); }} // Depois que o seletor estiver fechado, os recursos gerenciados serão lançados automaticamente se (seletor! = Null) tente {Selector.close (); } catch (Exceção e) {e.printStackTrace (); }} private void handleInput (chave de seleção) lança ioexception {if (key.isValid ()) {socketchannel sc = (socketchannel) key.channel (); if (key.isconnectable ()) {if (sc.finishconnect ()); else System.Exit (1); } // Leia a mensagem if (key.isReadelable ()) {// Crie bytebuffer e abra um buffer de buffer de 1M = buffer bytebuffer.allocate (1024); // Leia o fluxo de código de solicitação e retorne o número de bytes read int readbytes = sc.read (buffer); // leia bytes e codifique os bytes if (readbytes> 0) {// Defina o limite atual do buffer como posicionar = 0, para as operações de leitura subsequentes do buffer.flip (); // Crie uma matriz de bytes com base no número de bytes legíveis no buffer byte [] bytes = new Byte [buffer.remaining ()]; // copie a matriz de bytes legível do buffer no recém -criado Buffer.get.get (bytes); String result = new String (bytes, "UTF-8"); System.out.println ("Cliente recebeu uma mensagem:" + resultado); } // nenhum bytes lido é ignorado // else if (readbytes == 0); // O link foi fechado, liberando o recurso else if (readbytes <0) {key.cancel (); sc.close (); }}}}} // Envie mensagens de forma assíncrona privada dowrite (canal de Socketchannel, String request) lança ioexception {// codificando a mensagem como uma matriz de byte byte [] bytes = request.getBytes (); // criando bytebuffer com base na capacidade de matriz bytebuffer writebuffer = bytebuffer.allocate (bytes.length); // copiando a matriz de bytes para o buffer writebuffer.put (bytes); // Operação flip writebuffer.flip (); // Envie o canal de matriz de bytes.write (writebuffer); // ***** O Código para Processamento "Write Half-Packet" não está incluído aqui} private void doconnect () lança ioexception {if (socketchannel.connect (novo inetSocketAddress (host, porta))); else socketchannel.register (seletor, SelectionKey.op_Connect); } public void sendmsg (string msg) lança a exceção {socketchannel.register (seletor, seleçãokey.op_read); Dowrite (Socketchannel, MSG); }} 2.7. Resultados da demonstração
Primeiro execute o servidor e execute um cliente a propósito:
pacote com.anxpp.io.calculator.nio; importar java.util.scanner; /** * Método de teste * @author yangtao__anxpp.com * @version 1.0 */public class Test {// Teste o método principal @suppresswarnings ("Resource") public static void main (string [] args) lança exceção {// Run Server.start (); // Evite o cliente que executa o encadeamento do código.sleep (100); // Execute client client.start (); while (client.sendmsg (new scanner (System.in) .NextLine ())); }} Também podemos executar o cliente separadamente, e os efeitos são os mesmos.
Resultados de um teste:
The server has been started, port number: 123451+2+3+4+5+6 The server received the message: 1+2+3+4+5+6 The client received the message: 211*2/3-4+5*6/7-8 The server received the message: 1*2/3-4+5*6/7-8 The client received the message: -7.0476190476190474
Não há problema em executar vários clientes.
3. Programação AIO
O NIO 2.0 apresenta o conceito de novos canais assíncronos e fornece implementações de canais de arquivos assíncronos e canais de soquete assíncronos.
O canal de soquete assíncrono é realmente assíncrono de E/S não bloqueadora, correspondente à E/S acionada por eventos (AIO) na programação da rede UNIX. Não requer muitos seletores para pesquisar os canais registrados para obter leitura e gravação assíncronas, simplificando assim o modelo de programação do NIO.
Basta fazer upload do código.
3.1. Código lateral do servidor
Servidor:
pacote com.anxpp.io.calculator.aio.server; / ** * servidor AIO * @author yangtao__anxpp.com * @version 1.0 */ public class Server {private static int default_port = 12345; ASYNCSERVERVERHANDLER STÁTICO PRIVADO Public Volátil estático LONG CLIENTCOUNT = 0; public static void start () {start (default_port); } public static sincronizado void inicial (int porta) {if (serverHandle! = null) return; serverHandle = new AsyncServerHandler (porta); novo thread (serverhandle, "servidor"). start (); } public static void main (string [] args) {server.start (); }} AsyncserverVHandler:
pacote com.anxpp.io.calculator.aio.server; importar java.io.ioException; importar java.net.inetsocketaddress; importar java.nio.channels.asyncserversocketchannel; importar java.util.concurrent.countdownlatch; classe pública AsyncServerHandler implementa Runnable {public CountDownLatch Latch; canal público assíncrono -serversocketchannel; public AsyncServerHandler (int porta) {try {// Criar canal do servidor = Asynchronserversocketchannel.open (); // Bind Port Channel.bind (new inetSocketAddress (porta)); System.out.println ("O servidor foi iniciado, número da porta:" + porta); } catch (ioexception e) {e.printStackTrace (); }} @Override public void run () {// CountDownLatch Inicialização // sua função: Permita que o campo atual bloqueie o tempo todo antes de concluir um conjunto de operações sendo executadas // aqui, deixe o campo de campo aqui para impedir que o servidor não seja necessário para se preocupar com a execução // também pode usar (o ambiente de sono), que não se preocupa, não se preocupa com a necessidade de que o servidor não se preocupe; // conexão canal.accept (isto, new AceptHandler ()); tente {latch.await (); } catch (interruptedException e) {e.printStackTrace (); }}} Aceptandler:
pacote com.anxpp.io.calculator.aio.server; importar java.nio.byteBuffer; importar java.nio.channels.asynchronsocketchannel; importar java.nio.channels.completionHandler; // Conecte -se como um manipulador de classe pública AceptHandler implementa a conclusão de mão -de -obra <SynchronsocketchAnnel, AsyncServerHandler> {@Override public void concluído (AsynchronsocketchAnnel Channel, AsyncServerHandler ServerHandHler) {// continua a aceitar de solicitações de outros clientes servidor.clientCount+; System.out.println ("Número de clientes conectados:" + server.clientCount); ServerHandler.Channel.Acept (ServerHandler, este); // crie um novo buffer buffer buffer = bytebuffer.allocate (1024); //Asynchronously read the third parameter for the service that receives message callbacks Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); }} ReadHandler:
package com.anxpp.io.calculator.aio.server; importar java.io.ioException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //Used to read semi-packet messages and send answers private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } // Processing after reading the message @Override public void completed(Integer result, ByteBuffer attachment) { //flip operation attachment.flip(); //According to byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("The server received a message: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); } catch(Exception e){ calrResult = "Calculator error: " + e.getMessage(); } //Send a message doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //Send a message private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //Asynchronous write data parameters are the same as the previous read channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //If it is not sent, continue to send until it is completed (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //Create a new Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the business that receives message callbacks. Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (ioexception e) {e.printStackTrace (); } } } OK,这样就已经完成了,其实说起来也简单,虽然代码感觉很多,但是API比NIO的使用起来真的简单多了,主要就是监听、读、写等各种CompletionHandler。此处本应有一个WriteHandler的,确实,我们在ReadHandler中,以一个匿名内部类实现了它。
下面看客户端代码。
3.2. Client side code
Cliente:
package com.anxpp.io.calculator.aio.client; importar java.util.scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //Send a message to the server public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); return true; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }} AsyncClientHandler:
package com.anxpp.io.calculator.aio.client; importar java.io.ioException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsyncronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; private int porta; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //Create an asynchronous client channel clientChannel = AsynchronousSocketChannel.open(); } catch (ioexception e) {e.printStackTrace (); } } @Override public void run() { //Create CountDownLatch and wait latch = new CountDownLatch(1); //Initiate an asynchronous connection operation, the callback parameter is this class itself. If the connection is successful, the completed method clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (ioexception e) {e.printStackTrace (); } } //Connect the server successfully// means TCP completes three handshakes @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("Client connection to the server..."); } //Connecting to the server failed @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("Connecting to the server failed..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (ioexception e) {e.printStackTrace (); } } //Send a message to the server public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //Asynchronously write clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); }} WriteHandler:
package com.anxpp.io.calculator.aio.client; importar java.io.ioException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //Complete writing of all data if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //Read data ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("Data send failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } ReadHandler:
package com.anxpp.io.calculator.aio.client; importar java.io.ioException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("Client received result:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("Data read failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }这个API使用起来真的是很顺手。
3.3. Teste
Teste:
package com.anxpp.io.calculator.aio; importar java.util.scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /** * Test method* @author yangtao__anxpp.com * @version 1.0 */ public class Test { //Test main method @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //Run server Server.start(); //Avoid the client executing the code Thread.sleep(100); //Run client Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }}我们可以在控制台输入我们需要计算的算数字符串,服务器就会返回结果,当然,我们也可以运行大量的客户端,都是没有问题的,以为此处设计为单例客户端,所以也就没有演示大量客户端并发。
读者可以自己修改Client类,然后开辟大量线程,并使用构造方法创建很多的客户端测试。
下面是其中一次参数的输出:
服务器已启动,端口号:12345请输入请求消息:客户端成功连接到服务器...连接的客户端数:1123456+789+456服务器收到消息: 123456+789+456客户端收到结果:1247019526*56服务器收到消息: 9526*56客户端收到结果:533456...
AIO是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手。
下面就比较一下,几种I/O编程的优缺点。
4、各种I/O的对比
先以一张表来直观的对比一下:
具体选择什么样的模型或者NIO框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的NIO做服务端;相反,就应考虑使用NIO或者相关的框架了。
5、附录
上文中服务端使用到的用于计算的工具类:
package com.anxpp.utils;import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;public final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException{ return jse.eval(expression); }}O exposto acima é todo o conteúdo deste artigo. Espero que seja útil para o aprendizado de todos e espero que todos apoiem mais o wulin.com.