Artikel ini akan memperkenalkan dari dangkal ke jauh dari bio tradisional ke nio ke aio, dan akan disertai dengan penjelasan kode lengkap.
Contoh akan digunakan dalam kode berikut: Klien mengirim string persamaan ke server, dan server mengembalikan hasilnya ke klien setelah perhitungan.
Semua instruksi untuk kode secara langsung digunakan sebagai komentar dan tertanam dalam kode, yang bisa lebih mudah dipahami saat membaca kode. Kelas alat untuk menghitung hasilnya akan digunakan dalam kode, lihat bagian kode artikel.
Artikel yang Disarankan untuk Pengetahuan Dasar Terkait:
Pengantar model I/O Network Linux (gambar dan teks)
Java Concurrency (multi-threading)
1. Pemrograman Bio
1.1. Pemrograman bio tradisional
Model dasar pemrograman jaringan adalah model C/S, yaitu komunikasi antara dua proses.
Server menyediakan IP dan port mendengarkan. Klien memulai permintaan koneksi melalui alamat operasi koneksi yang ingin didengarkan server. Melalui tiga jabat tangan, jika koneksi berhasil dibuat, kedua belah pihak dapat berkomunikasi melalui soket.
Dalam pengembangan model pemblokiran sinkronisasi tradisional, ServerCocket bertanggung jawab untuk mengikat alamat IP dan memulai port mendengarkan; Socket bertanggung jawab untuk memulai operasi koneksi. Setelah koneksi berhasil, kedua belah pihak melakukan komunikasi pemblokiran sinkron melalui aliran input dan output.
Deskripsi singkat dari model komunikasi bio server: server menggunakan model komunikasi bio biasanya merupakan utas akseptor independen yang bertanggung jawab untuk mendengarkan koneksi klien. Setelah menerima permintaan koneksi klien, itu membuat utas baru untuk setiap klien untuk pemrosesan tautan dan gagal memprosesnya, dan kemudian mengembalikan balasan ke klien melalui aliran output, dan utasnya dihancurkan. Artinya, model sepanjang malam yang khas.
Diagram Model Komunikasi Bio Tradisional:
Masalah terbesar dengan model ini adalah bahwa ia tidak memiliki kemampuan penskalaan elastis. Ketika jumlah akses bersamaan pada klien meningkat, jumlah utas di server sebanding dengan jumlah akses bersamaan pada klien. Utas di Java juga merupakan sumber daya sistem yang relatif berharga. Setelah jumlah utas meluas dengan cepat, kinerja sistem akan turun tajam. Ketika jumlah akses terus meningkat, sistem pada akhirnya akan mati.
Kode Sumber Server Dibuat oleh Blocking Sinkron I/O:
paket com.anxpp.io.calculator.bio; impor java.io.ioException; impor java.net.serversocket; impor java.net.socket; /** * Kode Sumber Bio Server * @Author Yangtao__anxpp.com * @Version 1.0 */Public Final Class Servernormal {// Nomor Port Default Private Static Int Default_port = 12345; // server singleton server statis private server; // Atur port mendengarkan sesuai dengan parameter yang masuk. Jika tidak ada parameter, hubungi metode berikut dan gunakan nilai default statis statis statis start () melempar ioException {// Gunakan nilai start default (default_port); } // Metode ini tidak akan diakses dengan sejumlah besar cara bersamaan, dan tidak perlu mempertimbangkan efisiensi, cukup menyinkronkan metode yang langsung disinkronkan statis statis statis (int port) melempar ioException {if (server! = Null) kembali; Coba {// Buat serversocket melalui konstruktor // Jika port legal dan idle, server akan berhasil mendengarkan. Server = server new (port); System.out.println ("Server telah dimulai, nomor port:" + port); // Dengarkan koneksi klien melalui nirkabel loop // Jika tidak ada akses klien, itu akan diblokir pada operasi penerimaan. while (true) {socket socket = server.accept (); // Ketika ada akses klien baru, kode berikut akan dieksekusi // Kemudian buat utas baru untuk menangani soket ini tautan utas baru (serverhandler baru (soket)). Start (); }} akhirnya {// Beberapa pekerjaan pembersihan yang diperlukan jika (server! = null) {System.out.println ("Server ditutup."); server.close (); server = null; }}}} Kode Sumber Sumber Pesan Pesan Klien:
paket com.anxpp.io.calculator.bio; impor java.io.bufferedReader; impor java.io.ioException; impor java.io.inputStreamReader; impor java.io.printwriter; impor java.net.socket; impor com.anxpp.io.utils.calculator; / *** utas klien* @author Yangtao__anxpp.com* tautan soket untuk klien*/ kelas publik ServerHandler mengimplementasikan runnable {soket soket pribadi; Public ServerHandler (soket soket) {this.socket = socket; } @Override public void run () {bufferedReader di = null; Printwriter out = null; coba {in = new bufferedReader (inputStreamReader baru (socket.getInputStream ())); out = printwriter baru (socket.getoutputStream (), true); Ekspresi string; Hasil string; while (true) {// Baca baris melalui bufferedReader // Jika Anda telah membaca ekor aliran input, kembalikan nol dan keluar dari loop // jika Anda mendapatkan nilai non-nol, cobalah untuk menghitung hasilnya dan mengembalikan if ((ekspresi = in.readline ()) == null) break; System.out.println ("Server menerima pesan:" + ekspresi); coba {result = calculator.cal (ekspresi) .toString (); } catch (Exception e) {result = "calculator.cal (ekspresi) .toString ();} catch (exception e) {e.printstacktrace ();} akhirnya {// beberapa pekerjaan pembersihan yang diperlukan jika (in! = null) {coba {in.close ();} catch (ioException e) {null (in); null) {out.close (); Kode Sumber Klien Dibuat oleh Blocking Sinkron I/O:
paket com.anxpp.io.calculator.bio; impor java.io.bufferedReader; impor java.io.ioException; impor java.io.inputStreamReader; impor java.io.printwriter; impor java.net.socket; /** * Klien dibuat dengan memblokir I/O * @Author Yangtao__anxpp.com * @Version 1.0 */Klien Kelas Publik {// Nomor Port Default Private Static Int Default_Server_port = 12345; string statis private default_server_ip = "127.0.0.1"; public static void kirim (ekspresi string) {send (default_server_port, ekspresi); } public static void Send (int port, string ekspresi) {System.out.println ("Ekspresi aritmatika adalah:" + ekspresi); Soket soket = null; BufferedReader di = null; Printwriter out = null; coba {socket = soket baru (default_server_ip, port); di = BufferedReader baru (inputStreamReader baru (socket.getInputStream ())); out = printwriter baru (socket.getoutputStream (), true); out.println (ekspresi); System.out.println ("___ Hasilnya adalah:" + in.readline ()); } catch (Exception e) {E.PrintStackTrace (); } akhirnya {// adalah pekerjaan pembersihan yang diperlukan jika (di! = null) {coba {in.close (); } catch (ioException e) {e.printstacktrace (); } di = null; } if (out! = null) {out.close (); keluar = null; } if (socket! = null) {coba {socket.close (); } catch (ioException e) {e.printstacktrace (); } socket = null; }}}} Uji kode, untuk memfasilitasi melihat hasil output di konsol, letakkan di program yang sama (JVM) untuk menjalankan:
paket com.anxpp.io.calculator.bio; impor java.io.ioException; impor java.util.random; /** * Metode Tes * @Author Yangtao__Anxpp.com * @Version 1.0 */tes kelas publik {// uji metode utama public static void main (string [] args) melempar interruptedException {// run server {i iRo) { @@slide public void run () {try {store. E.PrintStackTrace ();}}}). Start (); // hindari klien yang menjalankan kode sebelum server dimulai; // Jalankan operator char klien [] = {'+', '-', '*', '/'}; Acak acak = acak baru (System.currentTimemillis ()); utas baru (runnable baru () {@suppressWarnings ("static-access") @Override public void run () {while (true) {// Random menghasilkan ekspresi ekspresi aritmatika = random.nextInt (10)+"+Operator [random.nextInt (4)]+(random.nextInt (10)+"+Operator [random.nextInt (4)]+(random.nextInt (10) +1); Thread.currentThread (). Sleep (random.nextint (1000)); }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} } } } } } } } } { Benang. currentThread (). sleep (acak.nextint (1000)); } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } { Benang. currentThread (). sleep (acak.nextint (1000)); }}}}}}}} Hasil dari salah satu run:
Server telah dimulai, Nomor Port: 12345 Ekspresi aritmatika adalah: 4-2 Server Menerima Pesan: 4-2 ___ Hasilnya adalah: 2 Ekspresi aritmatika adalah: 5-10 Server menerima pesan: 5-10__ Hasilnya adalah: -5 Ekspresi Aritmatika IS: 0-9 Server yang diterima Pesan: 0-9 Hasil Hasil adalah: -9 Ekspresi Arithmetic adalah: 0+6 Server Pesan 0-9 Menerima pesan: 1/6__ Hasilnya adalah: 0.1666666666666666666666666666666666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666666666666
Dari kode di atas, mudah untuk melihat bahwa masalah utama BIO adalah bahwa setiap kali klien baru meminta akses, server harus membuat utas baru untuk menangani tautan ini, yang tidak dapat diterapkan dalam skenario di mana kinerja tinggi dan konkurensi tinggi diperlukan (sejumlah besar utas baru akan secara serius mempengaruhi kinerja server dan bahkan mogok).
1.2. Pemrograman I/O Pseudo-Asynchronous
Untuk meningkatkan model satu-koneksi-satu-utara ini, kami dapat menggunakan kumpulan utas untuk mengelola utas ini (untuk informasi lebih lanjut, silakan merujuk ke artikel yang disediakan sebelumnya), menerapkan model untuk satu atau lebih utas untuk memproses N klien (tetapi lapisan yang mendasarinya masih menggunakan Model I/O Sinkron.
Diagram model I/O pseudo-asinkron:
Implementasinya sangat sederhana. Kami hanya perlu menyerahkan utas baru ke manajemen kumpulan utas, dan hanya mengubah kode server sekarang:
paket com.anxpp.io.calculator.bio; impor java.io.ioException; impor java.net.serversocket; impor java.net.socket; impor java.util.concurrent.executorservice; impor java.util.concurrent.Executors; /** * Kode Sumber Server Bio__pseudo-Asynchronous I/O * @Author Yangtao__Anxpp.com * @Version 1.0 */Public Final Class ServerBetter {// Nomor port default Private static int default_port = 12345; // server singleton server statis private server; // Singleton Private Static ExecutorService ExecutorService = Executors.newfixedThreadPool (60); // Atur port mendengarkan sesuai dengan parameter yang masuk. Jika tidak ada parameter, hubungi metode berikut dan gunakan nilai default statis statis statis start () melempar ioException {// gunakan nilai start default (default_port); } // Metode ini tidak akan diakses dalam sejumlah besar secara bersamaan, dan tidak perlu mempertimbangkan efisiensi, cukup menyinkronkan metode yang langsung disinkronkan statis statis statis (port int) melempar ioException {if (server! = Null) pengembalian; Coba {// Buat serversocket melalui konstruktor // Jika port legal dan idle, server akan berhasil mendengarkan. Server = server new (port); System.out.println ("Server telah dimulai, nomor port:" + port); // Superce Koneksi klien melalui nirkabel loop // Jika tidak ada akses klien, itu akan diblokir pada operasi penerimaan. while (true) {socket socket = server.accept (); // Ketika ada akses klien baru, kode berikut akan dieksekusi // Kemudian buat utas baru untuk memproses socket link executorservice.execute (serverhandler baru (soket)); }} akhirnya {// Beberapa pekerjaan pembersihan yang diperlukan jika (server! = null) {System.out.println ("Server ditutup."); server.close (); server = null; }}}} Hasil uji coba adalah sama.
Kita tahu bahwa jika kita menggunakan kumpulan utas CachedThreadPool (tidak membatasi jumlah utas, jika tidak jelas, silakan merujuk ke artikel yang disediakan di awal artikel), pada kenyataannya, selain secara otomatis membantu kami mengelola utas (penggunaan kembali), itu juga terlihat seperti model hitungan utas 1: 1. Menggunakan FixedThreadPool, kami secara efektif mengontrol jumlah maksimum utas, memastikan kontrol sumber daya sistem yang terbatas, dan mengimplementasikan model I/O pseudo-asinkron.
Namun, karena jumlah utas terbatas, jika sejumlah besar permintaan bersamaan terjadi, utas melebihi jumlah maksimum hanya dapat menunggu sampai ada utas gratis di kumpulan utas yang dapat digunakan kembali. Ketika aliran input soket dibaca, itu akan diblokir sampai terjadi:
Oleh karena itu, ketika data membaca lambat (seperti sejumlah besar data, transmisi jaringan lambat, dll.) Dan sejumlah besar konkurensi, pesan akses lainnya hanya dapat ditunggu sepanjang waktu, yang merupakan kerugian terbesar.
NIO yang akan diperkenalkan nanti dapat menyelesaikan masalah ini.
2. Pemrograman NIO
Perpustakaan I/O Java baru diperkenalkan ke dalam Java.nio.* Paket di JDK 1.4, dengan tujuan meningkatkan kecepatan. Bahkan, paket I/O "lama" telah diimplementasikan menggunakan NIO, dan kita dapat memperoleh manfaat darinya bahkan jika kita tidak secara eksplisit menggunakan pemrograman NIO. Peningkatan kecepatan dapat terjadi di kedua file I/O dan jaringan I/O, tetapi artikel ini hanya membahas yang terakhir.
2.1. Perkenalan
Kami umumnya menganggap NIO sebagai I/O baru (juga nama resmi), karena itu baru di perpustakaan I/O lama (sebenarnya telah diperkenalkan dalam JDK 1.4, tetapi kata benda ini akan terus digunakan untuk waktu yang lama, bahkan jika mereka sudah "lama", jadi juga mengingatkan kita bahwa kita perlu mempertimbangkannya dengan cermat ketika menyebutkannya), dan telah membuat perubahan besar. Namun, itu disebut non-blok I/O oleh banyak orang, yaitu, non-blocking I/O, karena ini disebut, ia dapat lebih mencerminkan karakteristiknya. NIO dalam teks berikut tidak merujuk ke seluruh perpustakaan I/O yang baru, tetapi tidak memblokir I/O.
NIO menyediakan dua implementasi saluran soket yang berbeda: Socketchannel dan ServerSocketchannel yang sesuai dengan soket dan server dalam model bio tradisional.
Baik saluran penyangga saluran yang baru ditambahkan dan mode non-blocking.
Penggunaan mode pemblokiran sesederhana dukungan tradisional, tetapi kinerja dan keandalannya tidak baik; Mode non-blocking justru sebaliknya.
Untuk aplikasi rendah, konkurensi rendah, I/O blokir sinkron dapat digunakan untuk meningkatkan laju pengembangan dan pemeliharaan yang lebih baik; Untuk aplikasi tinggi, konkurensi tinggi (jaringan), mode Non-blocking NIO harus digunakan untuk dikembangkan.
Pengetahuan dasar akan diperkenalkan pertama di bawah ini.
2.2. Buffer buffer
Buffer adalah objek yang berisi beberapa data yang akan ditulis atau dibacakan.
Di pustaka NIO, semua data diproses dalam buffer. Saat membaca data, itu dibaca langsung ke buffer; Saat menulis data, itu juga ditulis ke dalam buffer. Kapan saja Anda mengakses data di NIO, dioperasikan melalui buffer.
Buffer sebenarnya adalah array dan memberikan informasi seperti akses terstruktur ke data dan memelihara lokasi baca dan tulis.
Area cache spesifik adalah: bytebuffe, charbuffer, shortbuffer, intbuffer, longbuffer, floatbuffer, doublebuffer. Mereka menerapkan antarmuka yang sama: buffer.
2.3. Saluran
Bacaan dan penulisan data kami harus dilewatkan melalui saluran, yang seperti pipa air, saluran. Perbedaan antara saluran dan aliran adalah bahwa saluran tersebut adalah dua arah dan dapat digunakan untuk operasi baca, tulis dan baca dan tulis secara simultan.
Saluran sistem operasi yang mendasarinya umumnya dupleks penuh, sehingga saluran dupleks penuh dapat memetakan API dari sistem operasi yang mendasarinya daripada aliran.
Saluran terutama dibagi menjadi dua kategori:
ServerSocketchannel dan Socketchannel yang akan terlibat dalam kode berikut adalah keduanya subkelas selectableChannel.
2.4. Pemilih multiplexer
Pemilih adalah dasar pemrograman Java Nio.
Selector menyediakan kemampuan untuk memilih tugas siap: Pemilih akan terus -menerus polling saluran yang terdaftar di atasnya. Jika acara baca atau tulis terjadi pada saluran, saluran akan berada dalam keadaan siap dan akan disurvei oleh pemilih. Kemudian, himpunan saluran siap dapat diperoleh melalui seleksi untuk melakukan operasi I/O berikutnya.
Seorang pemilih dapat melakukan polling beberapa saluran secara bersamaan, karena JDK menggunakan Epoll () alih -alih implementasi pilih tradisional, tidak ada batasan pada pegangan koneksi maksimum 1024/2048. Jadi, hanya satu utas yang harus bertanggung jawab atas pemungutan suara pemilih, dan dapat mengakses ribuan klien.
2.5. Server nio
Kode ini tampaknya jauh lebih rumit daripada pemrograman soket tradisional.
Cukup tempel kode dan berikan deskripsi kode dalam bentuk komentar.
Kode Sumber Server Dibuat oleh NIO:
paket com.anxpp.io.calculator.nio; server kelas publik {private static int default_port = 12345; ServerHandle sverhandle private static; public static void start () {start (default_port); } public static static void start (int port) {if (serverHandle! = null) serverHandle.stop (); serverHandle = serverHandle baru (port); utas baru (serverHandle, "server"). start (); } public static void main (string [] args) {start (); }} ServerHandle:
paket com.anxpp.io.calculator.nio; impor java.io.ioException; impor java.net.inetsocketaddress; impor java.nio.bytebuffer; impor java.nio.channels.selectionKey; impor java.nio.channels.elector; impor java.nio.channels.serversocketchannel; impor java.nio.channels.socketchannel; impor java.util.iterator; impor java.util.set; impor com.anxpp.io.utils.calculator; / ** * NIO Server * @Author Yangtao__Anxpp.com * @Version 1.0 */ kelas publik ServerHandle mengimplementasikan runnable {pemilih pemilih swasta; Serverchetchannel Private ServerChannel; Boolean yang mudah menguap pribadi dimulai; /*** port konstruktor* @param Tentukan nomor port yang akan didengarkan*/serverHandle publik (int port) {coba {// buat selector = selector.open (); // buka saluran mendengarkan serverchannel = serversocketchannel.open (); // Jika benar, saluran ini akan ditempatkan dalam mode pemblokiran; Jika false, saluran ini akan ditempatkan dalam mode non-blocking serverchannel.configureblocking (false); // Aktifkan mode non-blocking // Bind port Backlog diatur ke 1024 serverchannel.socket (). Bind (inetsocketAddress (port) baru, 1024); // permintaan koneksi klien superce serverchannel.register (selector, selectionkey.op_accept); // Tandai server diaktifkan dimulai = true; System.out.println ("Server telah dimulai, nomor port:" + port); } catch (ioException e) {e.printstacktrace (); System.exit (1); }} public void stop () {start = false; } @Override public void run () {// loop melalui pemilih sementara (mulai) {coba {// apakah ada acara baca dan tulis, pemilih sedang membangunkan setiap 1s selector.select (1000); // Pemblokiran, itu akan berlanjut hanya ketika setidaknya satu peristiwa terdaftar terjadi. // selector.select (); Set <dectionKey> keys = selector.selectedKeys (); Iterator <dectionKey> it = keys.iterator (); Kunci seleksi = null; while (it.hasnext ()) {key = it.next (); it.remove (); coba {handleInput (key); } catch (exception e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). close (); }}}}}}}}} catch (Throwable t) {t.printStackTrace (); }} // Setelah pemilih ditutup, sumber daya yang dikelola akan secara otomatis dirilis if (selector! = Null) coba {selector.close (); } catch (Exception e) {E.PrintStackTrace (); }} private void handleInput (kunci seleksi) melempar ioException {if (key.isvalid ()) {// Memproses pesan permintaan untuk akses baru jika (key.isacceptable ()) {serversocketchannel ssc = (serversocketchannel) key.channel (); // Membuat instance Socketchannel melalui Serversocketchannel's Accept // Lengkapi operasi ini berarti menyelesaikan jabat tangan tiga arah TCP, dan tautan fisik TCP secara resmi ditetapkan. Socketchannel sc = ssc.accept (); // diatur ke sc.configureblocking non-blocking (false); // Daftarkan sebagai baca sc.register (selector, selectionkey.op_read); } // Baca pesan if (key.isreadable ()) {socketchannel sc = (socketchannel) key.channel (); // Buat bytebuffer dan buka buffer bytebuffer 1m buffer = byteBuffer.allocate (1024); // Baca aliran permintaan dan kembalikan jumlah byte baca int readbytes = sc.read (buffer); // Baca byte dan kode byte if (readbytes> 0) {// atur batas saat ini dari buffer ke position = 0, untuk operasi baca berikutnya dari buffer.flip (); // Buat array byte berdasarkan jumlah buffer yang dapat dibaca byte bytes = byte baru [buffer.remaining ()]; // Salin array byte buffer yang dapat dibaca ke dalam array buffer.get (bytes) yang baru dibuat; String ekspresi = string baru (bytes, "UTF-8"); System.out.println ("Server menerima pesan:" + ekspresi); // Memproses Hasil String Data = NULL; coba {result = calculator.cal (ekspresi) .toString (); } catch (Exception e) {result = "Kesalahan komputasi:" + e.getMessage (); } // Kirim pesan balasan dowrite (sc, hasil); } // Tidak ada byte membaca dan mengabaikan // else if (readbytes == 0); // tautan telah ditutup, membebaskan sumber daya lain jika (readbytes <0) {key.cancel (); sc.close (); }}}}} // Kirim pesan balasan secara tidak sinkron void dowrite (saluran socketchannel, respons string) melempar ioException {// mengkode pesan sebagai byte array byte [] bytes = response.getbytes (); // buat bytebuffer sesuai dengan kapasitas array bytebuffer writeBuffer = byteBuffer.allocate (bytes.length); // Salin array byte ke buffer writeBuffer.put (bytes); // Operasi flip writeBuffer.flip (); // Kirim array byte dari buffer channel.write (writeBuffer); // ***** Kode untuk memproses "Tulis Half-Packet" tidak termasuk di sini}}Seperti yang Anda lihat, langkah -langkah utama untuk membuat server NIO adalah sebagai berikut:
Karena pesan respons dikirim, Socketchannel juga tidak sinkron dan tidak blokir, sehingga tidak dapat dijamin bahwa data yang perlu dikirim dapat dikirim pada satu waktu, dan akan ada masalah penulisan setengah paket saat ini. Kita perlu mendaftarkan operasi penulisan, terus -menerus polling pemilih untuk mengirim pesan yang tidak ada, dan kemudian menggunakan metode HasRemain () dari buffer untuk menentukan apakah pesan dikirim.
2.6. Klien nio
Lebih baik hanya mengunggah kode. Prosesnya tidak memerlukan terlalu banyak penjelasan, ini sedikit mirip dengan kode server.
Klien:
paket com.anxpp.io.calculator.nio; klien kelas publik {private static string default_host = "127.0.0.1"; private static int default_port = 12345; client clienthandle statis pribadi; public static void start () {start (default_host, default_port); } public static static void start (string ip, int port) {if (clientHandle! = null) clientHandle.stop (); clientHandle = clientHandle baru (ip, port); utas baru (clientHandle, "server"). start (); } // Kirim pesan ke server public static boolean sendmsg (string msg) melempar pengecualian {if (msg.equals ("q")) return false; clientHandle.sendmsg (msg); Kembali Benar; } public static void main (string [] args) {start (); }} ClientHandle:
paket com.anxpp.io.calculator.nio; impor java.io.ioException; impor java.net.inetsocketaddress; impor java.nio.bytebuffer; impor java.nio.channels.selectionKey; impor java.nio.channels.elector; impor java.nio.channels.socketchannel; impor java.util.iterator; impor java.util.set; / ** * klien nio * @author Yangtao__anxpp.com * @Version 1.0 */ kelas publik ClientHandle mengimplementasikan runnable {private string host; port int pribadi; pemilih pemilih swasta; socketchannel socketchannel pribadi; Boolean yang mudah menguap pribadi dimulai; klien publik (string ip, port int) {this.host = ip; this.port = port; coba {// buat pemilih selector = selector.open (); // buka saluran mendengarkan socketchannel = socketchannel.open (); // Jika benar, saluran ini akan ditempatkan dalam mode pemblokiran; Jika false, saluran ini akan ditempatkan dalam mode non-blocking socketchannel.configureblocking (false); // buka mode non-blocking dimulai = true; } catch (ioException e) {e.printstacktrace (); System.exit (1); }} public void stop () {start = false; } @Override public void run () {coba {doconnect (); } catch (ioException e) {e.printstacktrace (); System.exit (1); } // Loop melalui pemilih sementara (dimulai) {coba {// terlepas dari apakah ada acara baca dan tulis, pemilih dibangunkan setiap selektor 1s.select (1000); // memblokir, dan itu akan berlanjut hanya ketika setidaknya satu peristiwa terdaftar terjadi. // selector.select (); Set <dectionKey> keys = selector.selectedKeys (); Iterator <dectionKey> it = keys.iterator (); Kunci seleksi = null; while (it.hasnext ()) {key = it.next (); it.remove (); coba {handleInput (key); } catch (exception e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). close (); }}}}}}}} catch (Exception e) {e.printstacktrace (); System.exit (1); }} // Setelah pemilih ditutup, sumber daya yang dikelola akan secara otomatis dirilis if (selector! = Null) coba {selector.close (); } catch (Exception e) {E.PrintStackTrace (); }} private void handleInput (kunci selection) melempar ioException {if (key.isvalid ()) {socketchannel sc = (socketchannel) key.channel (); if (key.isconnectable ()) {if (sc.finishConnect ()); lain System.exit (1); } // Baca pesan if (key.isReadable ()) {// Buat BytEbuffer dan buka buffer buffer 1m buffer Buffer = byteBuffer.allocate (1024); // Baca aliran kode permintaan dan kembalikan jumlah byte baca int readbytes = sc.read (buffer); // Baca byte dan kode byte if (readbytes> 0) {// atur batas saat ini dari buffer ke position = 0, untuk operasi baca berikutnya dari buffer.flip (); // Buat array byte berdasarkan jumlah byte yang dapat dibaca dalam buffer byte [] bytes = byte baru [buffer.remaining ()]; // Salin array byte buffer yang dapat dibaca ke dalam array buffer.get (bytes) yang baru dibuat; String result = string baru (bytes, "UTF-8"); System.out.println ("Klien menerima pesan:" + hasil); } // Tidak ada byte yang dibaca diabaikan // else if (readbytes == 0); // tautan telah ditutup, membebaskan sumber daya lain jika (readbytes <0) {key.cancel (); sc.close (); }}}}} // Kirim pesan secara pribadi void dowrite (saluran socketchannel, permintaan string) melempar ioException {// mengkode pesan sebagai byte array byte [] bytes = request.getbytes (); // membuat bytebuffer berdasarkan array kapasitas bytebuffer writeBuffer = byteBuffer.allocate (bytes.length); // Menyalin array byte ke buffer writeBuffer.put (bytes); // Operasi flip writeBuffer.flip (); // Kirim saluran array byte.write (writeBuffer); // ***** Kode untuk memproses "Tulis Half-Packet" tidak termasuk di sini} private void doconnect () melempar ioException {if (socketchannel.connect (inetsocketAddress baru (host, port))); lain socketchannel.register (selector, selectionkey.op_connect); } public void sendMSG (string msg) melempar Exception {socketchannel.register (selector, selectionkey.op_read); Dowrite (Socketchannel, MSG); }} 2.7. Hasil demonstrasi
Pertama jalankan server dan jalankan klien:
paket com.anxpp.io.calculator.nio; impor java.util.scanner; /** * Metode tes * @Author Yangtao__anxpp.com * @Version 1.0 */tes kelas publik {// uji metode utama @suppresswarnings ("Resource") public static void main (string [] args) melempar pengecualian {// run server server.start (); // hindari klien yang menjalankan code thread.sleep (100); // Jalankan klien klien.start (); while (client.sendmsg (pemindai baru (System.in) .nextline ())); }} Kami juga dapat menjalankan klien secara terpisah, dan efeknya sama.
Hasil tes:
Server telah dimulai, nomor port: 123451+2+3+4+5+6 Server menerima pesan: 1+2+3+4+5+6 Klien menerima pesan: 211*2/3-4+5- 7- 7-7 Klien menerima pesan: 1*2/3-4+5*6/78 Klien menerima pesan: -7.04
Tidak ada masalah menjalankan banyak klien.
3. Pemrograman AIO
NIO 2.0 memperkenalkan konsep saluran asinkron baru dan menyediakan implementasi saluran file asinkron dan saluran soket asinkron.
Saluran soket asinkron benar-benar I/O non-blocking asinkron, sesuai dengan I/O (AIO) yang digerakkan oleh acara dalam pemrograman jaringan UNIX. Tidak memerlukan terlalu banyak pemilih untuk polling saluran terdaftar untuk mencapai baca dan menulis asinkron, sehingga menyederhanakan model pemrograman NIO.
Cukup unggah kode.
3.1. Kode sisi server
Server:
paket com.anxpp.io.calculator.aio.server; / ** * aio server * @author Yangtao__anxpp.com * @Version 1.0 */ server kelas publik {private static int default_port = 12345; Private Static AsyncServerHandler ServerHandle; Publik statis long clientcount = 0; public static void start () {start (default_port); } public static static void start (int port) {if (serverHandle! = null) return; serverHandle = asyncserverhandler baru (port); utas baru (serverHandle, "server"). start (); } public static void main (string [] args) {server.start (); }} AsyncServerHandler:
paket com.anxpp.io.calculator.aio.server; impor java.io.ioException; impor java.net.inetsocketaddress; impor java.nio.channels.asyncserversocketchannel; impor java.util.concurrent.countdownlatch; kelas publik AsyncServerHandler mengimplementasikan runnable {public countdownlatch latch; Saluran AsyncserverSocketchannel Publik; public asyncserverhandler (int port) {coba {// buat server channel = asynchronServersocketchannel.open (); // bind channel.bind (inetsocketAddress (port) baru); System.out.println ("Server telah dimulai, nomor port:" + port); } catch (ioException e) {e.printstacktrace (); }} @Override public void run () {// Countdownlatch Inisialisasi // Fungsinya: Biarkan bidang saat ini untuk memblokir sepanjang waktu sebelum menyelesaikan serangkaian operasi yang dijalankan // di sini, biarkan lapangan memblokir di sini untuk mencegah server keluar (Eksekusi // Anda juga dapat menggunakan (true)+Sleep // lingkungan generasi tidak perlu khawatir tentang masalah ini, pemikiran ini, ExIt, EXIT, EXIT SLEEP // LINGKURAN GENERASI TIDAK PERLU KEMBALI THE SUKU INI) THE SUPPLOCT, THE SUBLOCT, EXIT SELAMA DOMPLOCK/SLEY+SLEEP // LINGKUNGAN GENERATION TIDAK PERLU PERLU KEMBALI THE SUPSI INI) // Connection Channel.accept (ini, AccepThandler baru ()); coba {latch.aWait (); } catch (InterruptedException e) {E.PrintStackTrace (); }}} AccepThandler:
paket com.anxpp.io.calculator.aio.server; impor java.nio.bytebuffer; impor java.nio.channels.asynchronoussocketchannel; impor java.nio.channels.CompletionHandler; // Sambungkan sebagai penangan kelas publik AccepThandler mengimplementasikan penyelesaian <synchronoussocketchannel, asyncserverHandler> {@Override public void selesai (asynchronoussocketchannel saluran, asyncerverhandler serverHandler) {// terus menerima permintaan dari klien lain. System.out.println ("Jumlah klien yang terhubung:" + server.clientCount); serverHandler.channel.accept (ServerHandler, This); // Buat buffer bytebuffer buffer baru = byteBuffer.allocate (1024); //Asynchronously read the third parameter for the service that receives message callbacks Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); }} ReadHandler:
package com.anxpp.io.calculator.aio.server; impor java.io.ioException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //Used to read semi-packet messages and send answers private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } // Processing after reading the message @Override public void completed(Integer result, ByteBuffer attachment) { //flip operation attachment.flip(); //According to byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("The server received a message: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); } catch(Exception e){ calrResult = "Calculator error: " + e.getMessage(); } //Send a message doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //Send a message private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //Asynchronous write data parameters are the same as the previous read channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //If it is not sent, continue to send until it is completed (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //Create a new Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the business that receives message callbacks. Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (ioException e) {e.printstacktrace (); }}} OK,这样就已经完成了,其实说起来也简单,虽然代码感觉很多,但是API比NIO的使用起来真的简单多了,主要就是监听、读、写等各种CompletionHandler。此处本应有一个WriteHandler的,确实,我们在ReadHandler中,以一个匿名内部类实现了它。
下面看客户端代码。
3.2、Client端代码
Klien:
package com.anxpp.io.calculator.aio.client; import java.util.Scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //Send a message to the server public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); Kembali Benar; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }} AsyncClientHandler:
package com.anxpp.io.calculator.aio.client; impor java.io.ioException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsyncronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; port int pribadi; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //Create an asynchronous client channel clientChannel = AsynchronousSocketChannel.open(); } catch (ioException e) {e.printstacktrace (); } } @Override public void run() { //Create CountDownLatch and wait latch = new CountDownLatch(1); //Initiate an asynchronous connection operation, the callback parameter is this class itself. If the connection is successful, the completed method clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (ioException e) {e.printstacktrace (); } } //Connect the server successfully// means TCP completes three handshakes @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("Client connection to the server..."); } //Connecting to the server failed @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("Connecting to the server failed..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (ioException e) {e.printstacktrace (); } } //Send a message to the server public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //Asynchronously write clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); }} WriteHandler:
package com.anxpp.io.calculator.aio.client; impor java.io.ioException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //Complete writing of all data if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //Read data ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("Data send failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } ReadHandler:
package com.anxpp.io.calculator.aio.client; impor java.io.ioException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("Client received result:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("Data read failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }这个API使用起来真的是很顺手。
3.3. Tes
Tes:
package com.anxpp.io.calculator.aio; import java.util.Scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /** * Test method* @author yangtao__anxpp.com * @version 1.0 */ public class Test { //Test main method @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //Run server Server.start(); //Avoid the client executing the code Thread.sleep(100); //Run client Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }}我们可以在控制台输入我们需要计算的算数字符串,服务器就会返回结果,当然,我们也可以运行大量的客户端,都是没有问题的,以为此处设计为单例客户端,所以也就没有演示大量客户端并发。
读者可以自己修改Client类,然后开辟大量线程,并使用构造方法创建很多的客户端测试。
下面是其中一次参数的输出:
服务器已启动,端口号:12345请输入请求消息:客户端成功连接到服务器...连接的客户端数:1123456+789+456服务器收到消息: 123456+789+456客户端收到结果:1247019526*56服务器收到消息: 9526*56客户端收到结果:533456...
AIO是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手。
下面就比较一下,几种I/O编程的优缺点。
4、各种I/O的对比
先以一张表来直观的对比一下:
具体选择什么样的模型或者NIO框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的NIO做服务端;相反,就应考虑使用NIO或者相关的框架了。
5、附录
上文中服务端使用到的用于计算的工具类:
package com.anxpp.utils;import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;public final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException{ return jse.eval(expression); }}Di atas adalah semua konten artikel ini. I hope it will be helpful to everyone's learning and I hope everyone will support Wulin.com more.