Эта статья будет представлена от мелкого до глубокого от традиционной биографии до NIO до AIO, и будет сопровождаться полным объяснением кодекса.
Пример будет использоваться в следующем коде: клиент отправляет строку уравнения на сервер, а сервер возвращает результат клиенту после расчета.
Все инструкции для кода используются непосредственно в качестве комментариев и встроены в код, что может быть легче понять при чтении кода. Класс инструментов для расчета результата будет использоваться в коде, см. Раздел кода статьи.
Рекомендуемые статьи для связанных базовых знаний:
Введение в модель ввода/вывода Linux Network (изображения и текст)
Параллелизм Java (многопоточное)
1. Био программирование
1.1. Традиционное биопрограммирование
Основная модель сетевого программирования - это модель C/S, то есть связь между двумя процессами.
Сервер обеспечивает IP и прослушивание портов. Клиент инициирует запрос подключения через адрес операции подключения, который сервер хочет прослушать. Через три рукопожатия, если соединение успешно установлено, обе стороны могут общаться через гнезда.
При разработке традиционной модели блокировки синхронизации Serversocket отвечает за привязывание IP -адресов и запуска портов прослушивания; Сокет отвечает за инициирование операций подключения. После успешного соединения обе стороны проводят синхронную блокирующую связь через входные и выходные потоки.
Краткое описание модели связи био сервера: сервер, использующий модель биологической связи, обычно является независимым акцепторным потоком, ответственным за прослушивание подключения клиента. После получения запроса на подключение клиента он создает новый поток для каждого клиента для обработки ссылок и не обрабатывает его, а затем возвращает ответ клиенту через выходной поток, и поток уничтожен. То есть типичная модель All-Reply Allply.
Традиционная диаграмма модели биологической связи:
Самая большая проблема с этой моделью заключается в том, что ей не хватает эластичных возможностей масштабирования. Когда количество одновременных доступа к клиенту увеличивается, количество потоков на сервере пропорционально количеству одновременных доступа к клиенту. Поток в Java также являются относительно ценными системными ресурсами. После того, как количество потоков быстро расширяется, производительность системы резко упадет. Поскольку количество доступа продолжает расти, система в конечном итоге умрет.
Исходный код сервера, созданный с помощью синхронного блокирования ввода -вывода:
пакет com.anxpp.io.calculator.bio; импортировать java.io.ioexception; импортировать java.net.serversocket; импортировать java.net.socket; /** * Исходный код био сервера * @author yangtao__anxpp.com * @version 1.0 */public final Class Servernormal {// Номер порта по умолчанию Private Static int default_port = 12345; // Singleton Serversocket Private Static Serversocket Server; // Установить порт прослушивания в соответствии с входящими параметрами. Если нет параметров, вызовите следующий метод и используйте значение по умолчанию public static void start () бросает ioException {// Использовать значение значения по умолчанию (default_port); } // Этот метод не будет доступен большим количеством одновременных способов, и нет необходимости учитывать эффективность, просто синхронизируйте метод непосредственно общедоступной синхронизированной статической void start (int port) бросает ioException {if (server! = Null) return; Попробуйте {// Создать Serversocket через конструктор // Если порт является законным и простоя, сервер будет успешно прослушиваться. Server = new Serversocket (порт); System.out.println («Сервер был запущен, номер порта:« + порт); // Слушайте клиентские подключения через беспроводной цикл // Если нет клиентского доступа, оно будет заблокировано при операции принятия. while (true) {socket socket = server.accept (); // Когда есть новый клиент -доступ, будет выполнен следующий код //, затем создайте новый поток для обработки этой новой поток сокета (New ServerHandler (Socket)). Start (); }} Наконец {// Некоторая необходимая очистка, if (server! = null) {System.out.println («Сервер закрыт.»); server.close (); сервер = null; }}}} Обработка клиентских сообщений Сервер Сервер и исходный код:
пакет com.anxpp.io.calculator.bio; Импорт java.io.bufferedReader; импортировать java.io.ioexception; Импорт java.io.inputStreamReader; Импорт java.io.printwriter; импортировать java.net.socket; импорт com.anxpp.io.utils.calculator; / *** Клиентский поток* @author yangtao__anxpp.com* Ссылка для клиента*/ public class serverhandler реализует runnable {private Socket Socket; public serverhandler (сокет сокета) {this.socket = socket; } @Override public void run () {BufferedReader in = null; Printwriter Out = null; try {in = new BufferedReader (new InputStreamReader (socket.getInputStream ())); out = new PrintWriter (socket.getOutputStream (), true); Строка выражения; Строка результата; while (true) {// Прочитайте линию через BufferedReader // Если вы прочитали хвост входного потока, верните нуль и выйдите из цикла // Если вы получите не-нулевое значение, попробуйте вычислить результат и вернуть if (Express = in.Readline ()) == NULL) разрыв; System.out.println («Сервер получил сообщение:» + выражение); try {result = calculator.cal (выражение) .toString (); } catch (Exception e) {result = "calculator.cal (Expression) .toString ();} Catch (Exception e) {e.printstacktrace ();} наконец {// Некоторая необходимая работа по очистке if (in! = null) {try {in.close ();} catch (ioException e) {e.printStacktrace (); null) {out.close (); Клиентский исходный код, созданный синхронным блокирующим вводом/выводом:
пакет com.anxpp.io.calculator.bio; Импорт java.io.bufferedReader; импортировать java.io.ioexception; Импорт java.io.inputStreamReader; Импорт java.io.printwriter; импортировать java.net.socket; /** * Клиент, созданный путем блокировки ввода/o * @author yangtao__anxpp.com * @version 1.0 */public client client {// Номер порта по умолчанию Private Static int default_server_port = 12345; частная статическая строка default_server_ip = "127.0.0.1"; public static void send (string выражение) {send (default_server_port, Expression); } public static void send (int port, string выражение) {System.out.println ("Арифметическое выражение:" + выражение); Сокет сокет = null; BufferedReader in = null; Printwriter Out = null; try {socket = new Socket (default_server_ip, порт); in = new BufferedReader (новый inputStreamReader (socket.getInputStream ())); out = new PrintWriter (socket.getOutputStream (), true); out.println (выражение); System.out.println ("___ Результат:" + in.readline ()); } catch (Exception e) {e.printstackTrace (); } Наконец {// - это необходимая очистка, если (in! = null) {try {in.close (); } catch (ioException e) {e.printstackTrace (); } in = null; } if (out! = null) {out.close (); out = null; } if (socket! = null) {try {socket.close (); } catch (ioException e) {e.printstackTrace (); } socket = null; }}}} Проверьте код, чтобы облегчить просмотр результатов вывода в консоли, поместите его в ту же программу (JVM) для запуска:
пакет com.anxpp.io.calculator.bio; импортировать java.io.ioexception; импортировать java.util.random; /** * Метод тестирования * @author yangtao__anxpp.com * @version 1.0 */public class test {// Проверка основного метода public static void main (string [] args) Throws urruptEdException {// Запуск сервера New Thread (new Runnable () {@Override public void Run () {trest {server.start () atexcept exception (); e.printstacktrace ();}}}). start (); // Избегайте клиента, выполняющего код до начала сервера; // запустить клиентские операторы char [] = {'+', '-', '*', '/'}; Случайный случайный = new Random (System.currentTimeMillis ()); Новый поток (new Runnable () {@suppresswarnings ("static-access") @Override public void run () {while (true) {// случайный генерирует арифметическую экспрессию строки = случайный. Thread.currentThread (). Sleep (random.nextint (1000)); }}}}}}}}}}}}}}}}}}}}}}}} / }}}}}}}}}}}}}}}}}}}}}}}} / } } } } } } } } { Нить. currentThread (). Sleep (random.nextint (1000)); } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } { Нить. currentThread (). Sleep (random.nextint (1000)); }}}}}}}} Результаты одного из пробежек:
Сервер был запущен, Номер порта: 12345 Арифметическое выражение: 4-2 Сервер получил сообщение: 4-2 ___ Результат: 2 Арифметическое выражение: 5-10 Сервер получил сообщение: 5-10__ Результат: -5 Арифметическое выражение: 0-9 Сервер Полученный сообщение 0-9 Получил сообщение: 1/6__ Результат: 0.166666666666666666666666666666666666666666666666666666666666666666666666666666666666 66666666666666666666666666666666666666666666666666666666666666666666666666666666666666 66666666666666666666666666666666666666666666666666666666666666666666666666666666666666 66666666666666666666666666666666666666666666666666666666666666666666666666666666666666
Из приведенного выше кода легко увидеть, что основная проблема Bio заключается в том, что всякий раз, когда новый клиент запрашивает доступ, сервер должен создать новый поток для обработки этой ссылки, который не может быть применен в сценариях, где требуется высокая производительность и высокая параллель (большое количество новых потоков будет серьезно повлиять на производительность сервера и даже ударить).
1.2. Псевдо-асинхронное программирование ввода/вывода
Чтобы улучшить эту модель с одним соединением-один-нагрузкам, мы можем использовать пул потоков для управления этими потоками (для получения дополнительной информации, пожалуйста, обратитесь к статье, представленной ранее), внедрив модель для одного или нескольких потоков для обработки N клиентов (но основной слой все еще использует синхронное ввод-в/вывод), который часто называют «моделью Pseudo-Asynchronous I/O».
Псевдо-асинхронная модель ввода/вывода диаграмма:
Реализация очень проста. Нам просто нужно передать новую ветку управлению пулами потоков и просто изменить код сервера только сейчас:
пакет com.anxpp.io.calculator.bio; импортировать java.io.ioexception; импортировать java.net.serversocket; импортировать java.net.socket; импортировать java.util.concurrent.executorservice; импортировать java.util.concurrent.executors; /** * Исходный код био сервера __pseudo-asynchronous ввода/o * @author yangtao__anxpp.com * @version 1.0 */public final Class Serverbetter {// Номер порта по умолчанию private int int default_port = 12345; // Singleton Serversocket Private Static Serversocket Server; // singleton private static executorservice executorservice = experators.newfixedthreadpool (60); // Установить порт прослушивания в соответствии с входящими параметрами. Если нет параметра, вызовите следующий метод и используйте значение по умолчанию public static void start () бросает ioException {// Использование значения по умолчанию (default_port); } // Этот метод не будет доступен в большом количестве одновременно, и нет необходимости учитывать эффективность, просто синхронизируйте метод непосредственно общедоступной синхронизированной статической void start (int port), бросает ioException {if (server! = Null) return; Попробуйте {// Создать Serversocket через конструктор // Если порт является законным и простоя, сервер будет успешно прослушиваться. Server = new Serversocket (порт); System.out.println («Сервер был запущен, номер порта:« + порт); // Superce Client Connection через беспроводной цикл // Если нет клиентского доступа, оно будет заблокировано при операции принятия. while (true) {socket socket = server.accept (); // Когда есть новый клиент -доступ, будет выполнен следующий код //, затем создайте новый поток для обработки executorserservice. }} Наконец {// Некоторая необходимая очистка, if (server! = null) {System.out.println («Сервер закрыт.»); server.close (); сервер = null; }}}} Результаты тестового прогона одинаковы.
Мы знаем, что если мы используем пул потоков CachedThreadpool (без ограничения количества потоков, если не очиститься, пожалуйста, обратитесь к статье, представленной в начале статьи), фактически, в дополнение к автоматическому помогает нам управлять потоками (повторное использование), она также выглядит как клиент 1: 1: модель количества потоков. Используя FixedThreadpool, мы эффективно контролируем максимальное количество потоков, обеспечиваем управление ограниченными ресурсами системы и реализуем псевдо-асинхронную модель ввода-вывода N: M.
Однако, поскольку количество потоков ограничено, если происходит большое количество одновременных запросов, потоки, превышающие максимальное число, могут подождать только до тех пор, пока в пуле потоков не появятся бесплатные потоки, которые можно использовать повторно. Когда входной поток сокета будет считан, он будет заблокирован до тех пор, пока не произойдет:
Следовательно, при чтении данных является медленным (например, большое количество данных, медленная сетевая передача и т. Д.) И большие объемы параллелизма, другие сообщения доступа можно ждать только все время, что является самым большим недостатком.
NIO, который будет представлен позже, может решить эту проблему.
2. Программирование NIO
Новая библиотека ввода/вывода Java вводится в упаковку java.nio.* В JDK 1.4 с целью увеличения скорости. На самом деле, «старый» пакет ввода -вывода был переосмыслен с использованием NIO, и мы можем извлечь из него выгоду, даже если мы явно не используем программирование NIO. Улучшения скорости могут происходить в вводе/выводе файлов, так и в сетевом вводе/выводе, но в этой статье обсуждается только последняя.
2.1. Введение
Мы, как правило, думаем о NIO как о новом вводе/выводе (также официальном названии), потому что он является новым в старой библиотеке ввода -вывода (на самом деле он был представлен в JDK 1.4, но это существительное будет продолжать использоваться в течение долгого времени, даже если они «старые» сейчас, поэтому оно также напоминает нам о том, что мы должны тщательно рассмотреть его, когда он называет его), и вносят большие изменения. Тем не менее, многие люди/вывод не называют не блоками, то есть не блокирующим вводом-выводом, поскольку это называется, он может лучше отражать его характеристики. NIO в следующем тексте не относится ко всей новой библиотеке ввода/вывода, но не блокирует ввод -вывод.
Nio предоставляет две различные реализации канала сокета: Socketchannel и Serversocketchannel, соответствующие розетку и Serversocket в традиционной био -модели.
Оба недавно добавленных канала поддерживают блокировку и неблокирующие режимы.
Использование режима блокировки так же просто, как и традиционная поддержка, но производительность и надежность не очень хороши; Неблокирующий режим-это точно противоположное.
Для приложений с низкой частотой, низкоконкурентными приложениями, синхронное блокирование ввода-вывода может использоваться для улучшения скорости развития и лучшего обслуживания; Для приложений с высокой нагрузкой (сеть) с высокой нагрузкой (сетевой) режим неблокирующего NIO должен использоваться для разработки.
Основные знания будут представлены первыми ниже.
2.2. Буферный буфер
Буфер - это объект, который содержит некоторые данные, которые будут записаны или прочитаны.
В библиотеке NIO все данные обрабатываются в буфере. При чтении данных он читается непосредственно в буфер; При написании данных это также записывается в буфер. В любое время вы получаете доступ к данным в NIO, он работает через буфер.
Буфер на самом деле является массивом и предоставляет такую информацию, как структурированный доступ к данным и поддержание местоположений чтения и записи.
Конкретные области кэша: Bytebuffe, Charbuffer, Shortbuffer, Intbuffer, Longbuffer, Floatbuffer, Double Buffer. Они реализуют тот же интерфейс: буфер.
2.3. Канал
Наше чтение и написание данных должно передаваться через канал, который похож на водопроводную трубу, канал. Разница между каналом и потоком заключается в том, что канал является двунаправленным и может использоваться для операций чтения, записи и одновременного чтения и записи.
Каналы базовой операционной системы, как правило, полнодуплекс, поэтому полнодуплексный канал может лучше отобразить API базовой операционной системы, чем поток.
Каналы в основном разделены на две категории:
Serversocketchannel и Socketchannel, которые будут участвовать в следующем коде, являются подклассами SelectableChannel.
2.4. Мультиплексный селектор
Селектор является основой программирования Java Nio.
Селектор предоставляет возможность выбирать готовые задачи: селектор будет постоянно опрашивать канал, зарегистрированный на нем. Если на канале происходит событие чтения или записи, канал будет в состоянии готового состояния и будет опробован селектором. Затем набор готовых каналов может быть получен через SelectionKey для выполнения последующих операций ввода -вывода.
Селектор может опросить несколько каналов одновременно, поскольку JDK использует epoll () вместо традиционной реализации Select, нет ограничения на максимальную рукоятку соединения 1024/2048. Таким образом, только один поток должен отвечать за опрос селектора, и он может получить доступ к тысячам клиентов.
2.5. Nio Server
Код кажется гораздо более сложным, чем традиционное программирование сокетов.
Просто вставьте код и дайте описание кода в форме комментариев.
Исходный код сервера, созданный NIO:
пакет com.anxpp.io.calculator.nio; Общедоступный сервер класса {private static int default_port = 12345; Private Static ServerHandle ServerHandle; public static void start () {start (default_port); } public static synchronized void start (int port) {if (serverhandle! = null) serverhandle.stop (); ServerHandle = new ServerHandle (порт); Новый поток (ServerHandle, "Server"). start (); } public static void main (string [] args) {start (); }} ServerHandle:
пакет com.anxpp.io.calculator.nio; импортировать java.io.ioexception; Импорт java.net.inetsocketAddress; Импорт java.nio.bytebuffer; Импорт java.nio.channels.selectionKey; импортировать java.nio.channels.selector; импортировать java.nio.channels.serversocketchannel; импортировать java.nio.channels.socketchannel; импортировать java.util.iterator; импортировать java.util.set; импорт com.anxpp.io.utils.calculator; / ** * Nio Server * @author yangtao__anxpp.com * @version 1.0 */ public Class Serverhandle реализует runnable {private Selector Selector; Private Serversocketchannel ServerChannel; Частный летучий логический начинается; /*** Constructor* @param port Укажите номер порта, который будет прослушать*/public serverhandle (int port) {try {// create selector = selector.open (); // Откройте канал прослушивания ServerChannel = serversocketchannel.open (); // Если это правда, этот канал будет размещен в режиме блокировки; Если false, этот канал будет размещен в не блокирующем режиме ServerChannel.configureblocking (false); // Включение режима не блокировки // Бэкли с портом привязки устанавливается на 1024 ServerChannel.socket (). Bind (New InetSocketDadress (порт), 1024); // Superce Client Connection Request ServerChannel.register (Selector, SelectionKey.op_accep); // отмечать сервер включен запускается = true; System.out.println ("Сервер был запущен, номер порта:" + port); } catch (ioException e) {e.printstackTrace (); System.Exit (1); }} public void stop () {Starts = false; } @Override public void run () {// цикл через селектор в то время как (запустил) {try {// Будь то событие чтения и записи, селектор задумывается каждым селектором 1s.select (1000); // Блокировка, он будет продолжаться только тогда, когда произойдет хотя бы одно зарегистрированное событие. // selector.select (); SET <SelectionKey> keys = selector.selectedKeys (); Итератор <selectionKey> it = keys.iterator (); SelectionKey Key = null; while (it.hasnext ()) {key = it.next (); it.remove (); try {handleinput (key); } catch (Exception e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). close (); }}}}}}}}} catch (throwable t) {t.printstacktrace (); }} // После закрытия селектора управляемые ресурсы будут автоматически выпущены, если (selector! = Null) try {selector.close (); } catch (Exception e) {e.printstackTrace (); }} private void handleinput (key key keeke) throws ioexception {if (key.isvalid ()) {// Обработка сообщения запроса для нового доступа if (key.iscecpeable ()) {serversocketchannel ssc = (serversocketchannel) key.channel ();); // Создание экземпляра Socketchannel через Accept Serversocketchannel // Завершение этой операции означает завершение трехстороннего рукопожатия TCP, и физическая связь TCP официально установлена. Socketchannel sc = ssc.accept (); // установить на неблокирующее sc.configureblocking (false); // зарегистрироваться как чтение sc.register (selector, selectionkey.op_read); } // Читать сообщение if (key.isReadable ()) {socketchannel sc = (socketchannel) key.channel (); // Создать bytebuffer и открыть 1 -метровый буфер bytebuffer = bytebuffer.allocate (1024); // Чтение потока запроса и вернуть количество байтов, чтение int readbytes = sc.read (buffer); // Читать байты и код байтов if (readbytes> 0) {// Установить текущий предел буфера в положение = 0, для последующих операций чтения Buffer.flip (); // Создание байтового массива на основе количества чтения буфера Bytes Bytes = new Byte [buffer.reming ()]; // Скопируйте буферный массив байтов в недавно созданный массивовый буфер.get (bytes); String Express = new String (байты, "UTF-8"); System.out.println ("Сервер получил сообщение:" + выражение); // Обработка строки данных Result = NULL; try {result = calculator.cal (выражение) .toString (); } catch (Exception e) {result = "Ошибка вычисления:" + e.getMessage (); } // Отправить ответное сообщение Dowrite (sc, result); } // Нет байтов читают и игнорируют // else if (readbytes == 0); // Ссылка была закрыта, освобождая ресурс, иначе if (readbytes <0) {key.cancel (); sc.close (); }}}}} // Отправить ответное сообщение асинхронно приватное void doowrite (канал Socketchannel, String response) Throws ioException {// Кодирование сообщения в виде байтового массива [] bytes = response.getbytes (); // Создать ByteBuffer в соответствии с емкостью массива Bytebuffer writebuffer = bytebuffer.allocate (bytes.length); // Скопировать байт массив в буфер writebuffer.put (bytes); // Операция переворачивания writebuffer.flip (); // Отправить байт массив буферного канала.write (writebuffer); // ***** Код для обработки «Записать половину пакета» здесь не включен}}}Как видите, основные шаги для создания Nio Server заключаются в следующем:
Поскольку ответное сообщение отправляется, Socketchannel также является асинхронным и не блокирующим, поэтому нельзя гарантировать, что данные, которые необходимо отправить, могут быть отправлены за один раз, и в настоящее время возникнет проблема с написанием половины пакета. Нам нужно зарегистрировать операцию записи, постоянно опросить селектор, чтобы отправить невыполненные сообщения, а затем использовать метод буфера hasremain (), чтобы определить, отправляется ли сообщение.
2.6 Nio Client
Лучше просто загрузить код. Процесс не требует слишком большого объяснения, он немного похож на код сервера.
Клиент:
пакет com.anxpp.io.calculator.nio; открытый класс клиент {частная статическая строка default_host = "127.0.0.1"; private static int default_port = 12345; частное статическое клиент -ручки ClientHandle; public static void start () {start (default_host, default_port); } public static synchronized void start (String ip, int port) {if (clientHandle! = null) clientHandle.stop (); clientHandle = new ClientHandle (ip, порт); новая поток (ClientHandle, "Server"). start (); } // Отправить сообщение на сервер Public Static Boolean SendMSG (String MSG) Throws Exception {if (msg.equals ("q")) return false; clientHandle.sendmsg (MSG); вернуть истину; } public static void main (string [] args) {start (); }} ClientHandle:
пакет com.anxpp.io.calculator.nio; импортировать java.io.ioexception; Импорт java.net.inetsocketAddress; Импорт java.nio.bytebuffer; Импорт java.nio.channels.selectionKey; импортировать java.nio.channels.selector; импортировать java.nio.channels.socketchannel; импортировать java.util.iterator; импортировать java.util.set; / ** * nio client * @author yangtao__anxpp.com * @version 1.0 */ public class clienthandle реализует runnable {private String Host; частный порт int; Частный селектор; Приватный Socketchannel Socketchannel; Частный летучий логический начинается; public clientHandle (String ip, int port) {this.host = ip; this.port = порт; try {// создать селектор selector = selector.open (); // Откройте канал прослушивания socketchannel = socketchannel.open (); // Если это правда, этот канал будет размещен в режиме блокировки; Если false, этот канал будет размещен в неблокирующем режиме socketchannel.configureblocking (false); // Открыть режим не блокировки запускается = true; } catch (ioException e) {e.printstackTrace (); System.Exit (1); }} public void stop () {Starts = false; } @Override public void run () {try {doConnect (); } catch (ioException e) {e.printstackTrace (); System.Exit (1); } // цикл через селектор В то время как (запустите) {try {// Независимо от того, есть ли событие чтения и записи, селектор пробуждается каждый селектор 1S. Select (1000); // блокировка, и это будет продолжаться только тогда, когда произойдет хотя бы одно зарегистрированное событие. // selector.select (); SET <SelectionKey> keys = selector.selectedKeys (); Итератор <selectionKey> it = keys.iterator (); SelectionKey Key = null; while (it.hasnext ()) {key = it.next (); it.remove (); try {handleinput (key); } catch (Exception e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). close (); }}}}}}}} catch (Exception e) {e.printstackTrace (); System.Exit (1); }} // После закрытия селектора управляемые ресурсы будут автоматически выпущены, если (selector! = Null) try {selector.close (); } catch (Exception e) {e.printstackTrace (); }} private void handleinput (key selectionKey) бросает ioException {if (key.isvalid ()) {socketchannel sc = (socketchannel) key.channel (); if (key.isconnectable ()) {if (sc.finishConnect ()); else System.Exit (1); } // Читать сообщение if (key.isReadable ()) {// Создать bytebuffer и откройте 1 -метровый буфер bytebuffer = bytebuffer.allocate (1024); // Читать поток кода запроса и вернуть номер байта, чтение int readbytes = sc.read (buffer); // Читать байты и код байтов if (readbytes> 0) {// Установить текущий предел буфера в положение = 0, для последующих операций чтения Buffer.flip (); // Создание байтового массива на основе количества читаемых байтов в буферном байте [] bytes = new Byte [buffer.reming ()]; // Скопируйте буферный массив байтов в недавно созданный массивовый буфер.get (bytes); String result = new String (байты, "UTF-8"); System.out.println («Клиент получил сообщение:» + Результат); } // NO BAYTES Reade игнорируется // else if (readbytes == 0); // Ссылка была закрыта, освобождая ресурс, иначе if (readbytes <0) {key.cancel (); sc.close (); }}}}} // Отправлять сообщения асинхронно приватный void dowrite (канал Socketchannel, строка запроса) Throws ioException {// кодирование сообщения в виде байта байта [] bytes = request.getbytes (); // Создание ByteBuffer на основе массивы емкости ByteBuffer writebuffer = bytebuffer.allocate (bytes.length); // копирование байтового массива в буфер writebuffer.put (байты); // Операция переворачивания writebuffer.flip (); // Отправить байт массив канала.write (writebuffer); // ***** Код для обработки «Записать половину пакета» не включен здесь} private void Doconnect () Throws IOException {if (socketchannel.connect (new InetSocketAddress (host, port))); else socketchannel.register (selector, selectionkey.op_connect); } public void SendMSG (String MSG) Throws Exception {socketchannel.register (selector, selectionKey.op_read); Dowrite (Socketchannel, MSG); }} 2.7 Демонстрационные результаты
Сначала запустите сервер и запустите клиент, кстати:
пакет com.anxpp.io.calculator.nio; импортировать java.util.scanner; /** * Метод испытаний * @author yangtao__anxpp.com * @version 1.0 */public class test {// test основной метод @suppresswarnings ("resource") public void main (string [] args) Throws Exception {// run server.start (); // Избегайте клиента, выполняющего поток кода.sleep (100); // запустить client client.start (); while (client.sendmsg (новый сканер (system.in) .nextline ())); }} Мы также можем запустить клиент отдельно, и эффекты одинаковы.
Результаты теста:
Сервер был запущен, номер порта: 123451+2+3+4+5+6 Сервер получил сообщение: 1+2+3+4+5+6. Клиент получил сообщение: 211*2/3-4+5*6/7-8. Сервер получил сообщение: 1*2/3-4+5*6/7-8 Получил сообщение: -7.04761904761904747474747474.
Нет проблем с запуском нескольких клиентов.
3. Программирование AIO
Nio 2.0 представляет концепцию новых асинхронных каналов и предоставляет реализации асинхронных файловых каналов и асинхронных каналов сокета.
Асинхронный канал сокета-это действительно асинхронный неблокирующий ввод-вывод, соответствующий событию, управляемую событиями (AIO) в сетевом программировании Unix. Не требуется слишком много селекторов для опроса зарегистрированных каналов для достижения асинхронного чтения и записи, что упрощает модель программирования NIO.
Просто загрузите код.
3.1. Код на стороне сервера
Сервер:
пакет com.anxpp.io.calculator.aio.server; / ** * AIO Server * @author yangtao__anxpp.com * @version 1.0 */ public Class Server {private static int default_port = 12345; частный статический AsyncServerHandler ServerHandle; Общественный нестабильный статический длинный клиентский клиент = 0; public static void start () {start (default_port); } public static synchronized void start (int port) {if (serverhandle! = null) return; ServerHandle = новый AsyncServerHandler (порт); Новый поток (ServerHandle, "Server"). start (); } public static void main (string [] args) {server.start (); }} AsyncServerHandler:
пакет com.anxpp.io.calculator.aio.server; импортировать java.io.ioexception; Импорт java.net.inetsocketAddress; Импорт java.nio.channels.asyncserversocketchannel; Импорт java.util.concurrent.countdownlatch; открытый класс AsyncServerHandler реализует Runnable {public countdownlatch Latch; Public Asynceserversocketchannel Channel; public asyncserverHandler (int port) {try {// create server channel = asynchronousserversocketchannel.open (); // связывать порт канал. Bind (новый INESOCKETECTEDDRESS (PORT)); System.out.println ("Сервер был запущен, номер порта:" + port); } catch (ioException e) {e.printstackTrace (); }} @Override public void run () {// инициализация countdownlatch // его функция: позвольте текущему полю заблокировать все время, прежде чем завершить набор выполняемых операций // здесь, пусть полевой блок здесь не должен выходить из -за этой проблемы после выполнения //, в то время как (истинно)+среда поколения не нужно беспокоиться об этой проблеме, думая, что сервер не будет extow // Connection Channel.accep (это, новое согласие ()); try {latch.await (); } catch (прерванное искусство e) {e.printstacktrace (); }}} Принять
пакет com.anxpp.io.calculator.aio.server; Импорт java.nio.bytebuffer; Импорт java.nio.channels.asynchronoussocketchannel; Импорт java.nio.channels.completionhandler; // Подключиться в качестве обработчика открытого класса. System.out.println ("Количество подключенных клиентов:" + server.clientcount); serverHandler.channel.accept(serverHandler, this); //Create a new Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the service that receives message callbacks Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); }} ReadHandler:
package com.anxpp.io.calculator.aio.server; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //Used to read semi-packet messages and send answers private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } // Processing after reading the message @Override public void completed(Integer result, ByteBuffer attachment) { //flip operation attachment.flip(); //According to byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("The server received a message: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); } catch(Exception e){ calrResult = "Calculator error: " + e.getMessage(); } //Send a message doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //Send a message private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //Asynchronous write data parameters are the same as the previous read channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //If it is not sent, continue to send until it is completed (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //Create a new Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the business that receives message callbacks. Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (ioException e) {e.printstackTrace (); }}} OK,这样就已经完成了,其实说起来也简单,虽然代码感觉很多,但是API比NIO的使用起来真的简单多了,主要就是监听、读、写等各种CompletionHandler。此处本应有一个WriteHandler的,确实,我们在ReadHandler中,以一个匿名内部类实现了它。
下面看客户端代码。
3.2. Client side code
Client:
package com.anxpp.io.calculator.aio.client; импортировать java.util.scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //Send a message to the server public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); вернуть истину; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }} AsyncClientHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsyncronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; частный порт int; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //Create an asynchronous client channel clientChannel = AsynchronousSocketChannel.open(); } catch (ioException e) {e.printstackTrace (); } } @Override public void run() { //Create CountDownLatch and wait latch = new CountDownLatch(1); //Initiate an asynchronous connection operation, the callback parameter is this class itself. If the connection is successful, the completed method clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (ioException e) {e.printstackTrace (); } } //Connect the server successfully// means TCP completes three handshakes @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("Client connection to the server..."); } //Connecting to the server failed @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("Connecting to the server failed..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (ioException e) {e.printstackTrace (); } } //Send a message to the server public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //Asynchronously write clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); }} WriteHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //完成全部数据的写入if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //读取数据ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("数据发送失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } ReadHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("客户端收到结果:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("数据读取失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }这个API使用起来真的是很顺手。
3.3. Тест
Тест:
package com.anxpp.io.calculator.aio; импортировать java.util.scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /** * Test method* @author yangtao__anxpp.com * @version 1.0 */ public class Test { //Test main method @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //Run server Server.start(); //Avoid the client executing the code Thread.sleep(100); //Run client Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }}我们可以在控制台输入我们需要计算的算数字符串,服务器就会返回结果,当然,我们也可以运行大量的客户端,都是没有问题的,以为此处设计为单例客户端,所以也就没有演示大量客户端并发。
读者可以自己修改Client类,然后开辟大量线程,并使用构造方法创建很多的客户端测试。
下面是其中一次参数的输出:
服务器已启动,端口号:12345请输入请求消息:客户端成功连接到服务器...连接的客户端数:1123456+789+456服务器收到消息: 123456+789+456客户端收到结果:1247019526*56服务器收到消息: 9526*56客户端收到结果:533456...
AIO是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手。
下面就比较一下,几种I/O编程的优缺点。
4、各种I/O的对比
先以一张表来直观的对比一下:
具体选择什么样的模型或者NIO框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的NIO做服务端;相反,就应考虑使用NIO或者相关的框架了。
5、附录
上文中服务端使用到的用于计算的工具类:
package com.anxpp.utils;import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;public final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException{ return jse.eval(expression); }}Выше всего содержание этой статьи. Я надеюсь, что это будет полезно для каждого обучения, и я надеюсь, что все будут поддерживать Wulin.com больше.