บทความนี้จะแนะนำจากตื้นไปจนถึงลึกจากชีวภาพดั้งเดิมไปยัง Nio ถึง AIO และจะมาพร้อมกับคำอธิบายรหัสที่สมบูรณ์
ตัวอย่างจะถูกใช้ในรหัสต่อไปนี้: ไคลเอนต์ส่งสตริงของสมการไปยังเซิร์ฟเวอร์และเซิร์ฟเวอร์จะส่งคืนผลลัพธ์ไปยังไคลเอนต์หลังจากการคำนวณ
คำแนะนำทั้งหมดสำหรับรหัสจะใช้โดยตรงเป็นความคิดเห็นและฝังอยู่ในรหัสซึ่งสามารถเข้าใจได้ง่ายขึ้นเมื่ออ่านรหัส คลาสเครื่องมือสำหรับการคำนวณผลลัพธ์จะใช้ในรหัสดูส่วนรหัสของบทความ
บทความที่แนะนำสำหรับความรู้พื้นฐานที่เกี่ยวข้อง:
บทนำสู่ Linux Network I/O รุ่น (รูปภาพและข้อความ)
Java Concurrency (มัลติเธรด)
1. การเขียนโปรแกรมชีวภาพ
1.1. การเขียนโปรแกรมชีวภาพแบบดั้งเดิม
รูปแบบพื้นฐานของการเขียนโปรแกรมเครือข่ายคือโมเดล C/S นั่นคือการสื่อสารระหว่างสองกระบวนการ
เซิร์ฟเวอร์จัดเตรียมพอร์ต IP และการฟัง ไคลเอนต์เริ่มต้นคำขอการเชื่อมต่อผ่านที่อยู่การเชื่อมต่อการเชื่อมต่อที่เซิร์ฟเวอร์ต้องการฟัง ผ่านการจับมือกันสามครั้งหากมีการสร้างการเชื่อมต่อสำเร็จทั้งสองฝ่ายสามารถสื่อสารผ่านซ็อกเก็ตได้
ในการพัฒนารูปแบบการปิดกั้นการซิงโครไนซ์แบบดั้งเดิม Serversocket มีหน้าที่รับผิดชอบในการเชื่อมโยงที่อยู่ IP และเริ่มการฟังพอร์ต ซ็อกเก็ตมีหน้าที่รับผิดชอบในการเริ่มต้นการเชื่อมต่อ หลังจากการเชื่อมต่อสำเร็จทั้งสองฝ่ายจะทำการสื่อสารการปิดกั้นแบบซิงโครนัสผ่านกระแสอินพุตและเอาต์พุต
คำอธิบายสั้น ๆ ของรูปแบบการสื่อสารเซิร์ฟเวอร์ Bio: เซิร์ฟเวอร์ที่ใช้รูปแบบการสื่อสารทางชีวภาพมักจะเป็นเธรดตัวรับอิสระที่รับผิดชอบในการฟังการเชื่อมต่อของลูกค้า หลังจากได้รับคำขอการเชื่อมต่อไคลเอนต์จะสร้างเธรดใหม่สำหรับแต่ละไคลเอนต์สำหรับการประมวลผลลิงก์และไม่สามารถประมวลผลได้จากนั้นส่งคืนการตอบกลับไปยังไคลเอนต์ผ่านสตรีมเอาต์พุตและเธรดจะถูกทำลาย นั่นคือโมเดลหนึ่งคืนที่ตอบกลับตลอดทั้งคืน
ไดอะแกรมรูปแบบการสื่อสารทางชีวภาพแบบดั้งเดิม:
ปัญหาที่ใหญ่ที่สุดของรุ่นนี้คือมันขาดความสามารถในการปรับสเกลที่ยืดหยุ่น เมื่อจำนวนการเข้าถึงที่เกิดขึ้นพร้อมกันของไคลเอนต์เพิ่มขึ้นจำนวนเธรดบนเซิร์ฟเวอร์จะเป็นสัดส่วนกับจำนวนการเข้าถึงพร้อมกันบนไคลเอนต์ เธรดใน Java ยังเป็นทรัพยากรระบบที่มีค่าเช่นกัน หลังจากจำนวนเธรดขยายอย่างรวดเร็วประสิทธิภาพของระบบจะลดลงอย่างรวดเร็ว เมื่อจำนวนการเข้าถึงยังคงเพิ่มขึ้นในที่สุดระบบจะตายในที่สุด
ซอร์สโค้ดเซิร์ฟเวอร์ที่สร้างโดยการปิดกั้นแบบซิงโครนัส I/O:
แพ็คเกจ com.anxpp.io.calculator.bio; นำเข้า java.io.ioException; นำเข้า java.net.serversocket; นำเข้า java.net.socket; /** * ซอร์สโค้ดเซิร์ฟเวอร์ Bio * @author yangtao__anxpp.com * @version 1.0 */คลาสสุดท้ายของคลาสสุดท้าย servernormal {// หมายเลขพอร์ตเริ่มต้นหมายเลขส่วนตัวคงที่ int default_port = 12345; // Singleton Serversocket เซิร์ฟเวอร์เซิร์ฟเวอร์เซิร์ฟเวอร์สแตติกส่วนตัว // ตั้งค่าพอร์ตการฟังตามพารามิเตอร์ที่เข้ามา หากไม่มีพารามิเตอร์ให้เรียกใช้วิธีการต่อไปนี้และใช้ค่าเริ่มต้นเป็นโมฆะค่าเริ่มต้นสาธารณะเริ่มต้น () พ่น IOException {// ใช้ค่าเริ่มต้นเริ่มต้น (default_port); } // วิธีนี้จะไม่สามารถเข้าถึงได้ในรูปแบบที่เกิดขึ้นพร้อมกันจำนวนมากและไม่จำเป็นต้องพิจารณาประสิทธิภาพเพียงซิงโครไนซ์วิธีการเรียกใช้โมฆะแบบคงที่แบบคงที่สาธารณะโดยตรง ลอง {// สร้าง Serversocket ผ่านตัวสร้าง // หากพอร์ตนั้นถูกกฎหมายและไม่ได้ใช้งานเซิร์ฟเวอร์จะฟังสำเร็จ Server = ใหม่ Serversocket (พอร์ต); System.out.println ("เซิร์ฟเวอร์เริ่มต้นแล้วหมายเลขพอร์ต:" + พอร์ต); // ฟังการเชื่อมต่อไคลเอนต์ผ่าน Loop ไร้สาย // หากไม่มีการเข้าถึงไคลเอนต์จะถูกบล็อกในการดำเนินการที่ยอมรับ ในขณะที่ (จริง) {ซ็อกเก็ตซ็อกเก็ต = server.accept (); // เมื่อมีการเข้าถึงไคลเอนต์ใหม่โค้ดต่อไปนี้จะถูกเรียกใช้งาน // จากนั้นสร้างเธรดใหม่เพื่อจัดการลิงค์ซ็อกเก็ตลิงก์นี้ใหม่ (ServerHandler ใหม่ (ซ็อกเก็ต)) เริ่มต้น (); }} ในที่สุด {// การทำความสะอาดที่จำเป็นบางอย่างถ้า (เซิร์ฟเวอร์! = null) {system.out.println ("เซิร์ฟเวอร์ปิด"); Server.close (); เซิร์ฟเวอร์ = null; - การประมวลผลข้อความไคลเอนต์เธรดเซิร์ฟเวอร์ซอร์สโค้ด:
แพ็คเกจ com.anxpp.io.calculator.bio; นำเข้า java.io.bufferedreader; นำเข้า java.io.ioException; นำเข้า Java.io.InputStreamReader; นำเข้า java.io.printwriter; นำเข้า java.net.socket; นำเข้า com.anxpp.io.utils.calculator; / *** เธรดไคลเอนต์* @author Yangtao__anxpp.com* ลิงก์ซ็อกเก็ตสำหรับไคลเอนต์*/ คลาสสาธารณะ ServerHandler ใช้งาน Runnable {ซ็อกเก็ตส่วนตัวซ็อกเก็ต; Public ServerHandler (ซ็อกเก็ตซ็อกเก็ต) {this.socket = ซ็อกเก็ต; } @Override โมฆะสาธารณะเรียกใช้ () {bufferedReader ใน = null; printwriter out = null; ลอง {in = ใหม่ bufferedReader (ใหม่ inputStreamReader (socket.getInputStream ())); out = new printWriter (socket.getOutputStream (), true); การแสดงออกของสตริง; สตริงผลลัพธ์; ในขณะที่ (จริง) {// อ่านบรรทัดผ่าน bufferedReader // หากคุณได้อ่านหางของสตรีมอินพุตให้ส่งคืนค่า null และออกจากลูป // หากคุณได้รับค่าที่ไม่ใช่ NULL ลองคำนวณผลลัพธ์และส่งคืนถ้า ((นิพจน์ = in.readline ()) == null) System.out.println ("เซิร์ฟเวอร์ได้รับข้อความ:" + นิพจน์); ลอง {result = calculator.cal (นิพจน์) .toString (); } catch (exception e) {result = "calculator.cal (นิพจน์) .toString ();} catch (Exception e) {e.printstacktrace ();} ในที่สุด {// การทำความสะอาดที่จำเป็นบางอย่างถ้า (ใน! = null) {ลอง {in.close (); null) {out.close () out = null; ซอร์สโค้ดไคลเอนต์ที่สร้างโดยการปิดกั้นแบบซิงโครนัส I/O:
แพ็คเกจ com.anxpp.io.calculator.bio; นำเข้า java.io.bufferedreader; นำเข้า java.io.ioException; นำเข้า Java.io.InputStreamReader; นำเข้า java.io.printwriter; นำเข้า java.net.socket; /** * ไคลเอนต์ที่สร้างขึ้นโดยการปิดกั้น I/O * @author Yangtao__anxpp.com * @Version 1.0 */ไคลเอนต์คลาสสาธารณะ {// หมายเลขพอร์ตเริ่มต้นหมายเลขส่วนตัวคงที่ int default_server_port = 12345; สตริงคงที่ส่วนตัว default_server_ip = "127.0.0.1"; โมฆะคงที่สาธารณะส่ง (นิพจน์สตริง) {ส่ง (default_server_port, นิพจน์); } โมฆะคงที่สาธารณะส่ง (int พอร์ต, นิพจน์สตริง) {system.out.println ("นิพจน์ทางคณิตศาสตร์คือ:" + นิพจน์); ซ็อกเก็ตซ็อกเก็ต = null; bufferedReader ใน = null; printwriter out = null; ลอง {socket = ซ็อกเก็ตใหม่ (default_server_ip, พอร์ต); ใน = ใหม่ bufferedReader (ใหม่ inputStreamReader (socket.getInputStream ())); out = new printWriter (socket.getOutputStream (), true); out.println (นิพจน์); System.out.println ("___ ผลลัพธ์คือ:" + in.readline ()); } catch (exception e) {e.printstacktrace (); } ในที่สุด {// เป็นงานทำความสะอาดที่จำเป็นถ้า (ใน! = null) {ลอง {in.close (); } catch (ioexception e) {e.printstacktrace (); } in = null; } if (out! = null) {out.close (); ออก = null; } if (ซ็อกเก็ต! = null) {ลอง {socket.close (); } catch (ioexception e) {e.printstacktrace (); } ซ็อกเก็ต = null; - ทดสอบรหัสเพื่ออำนวยความสะดวกในการดูผลลัพธ์ผลลัพธ์ในคอนโซลให้ใส่ในโปรแกรมเดียวกัน (JVM) เพื่อเรียกใช้:
แพ็คเกจ com.anxpp.io.calculator.bio; นำเข้า java.io.ioException; นำเข้า java.util.random; /** * วิธีทดสอบ * @author Yangtao__anxpp.com * @version 1.0 */การทดสอบคลาสสาธารณะ {// ทดสอบวิธีหลักโมฆะสาธารณะคงที่หลัก (สตริง [] args) พ่น InterruptedException {// เรียกใช้เซิร์ฟเวอร์ใหม่หัวข้อ (ใหม่ runnable () E.PrintStackTrace ();}}}) เริ่มต้น (); // หลีกเลี่ยงไคลเอนต์ที่เรียกใช้รหัสก่อนที่เซิร์ฟเวอร์จะเริ่ม // เรียกใช้ตัวดำเนินการถ่านหินไคลเอนต์ [] = {'+', '-', '*', '/'}; สุ่มสุ่ม = ใหม่สุ่ม (System.currentTimeMillis ()); เธรดใหม่ (ใหม่ runnable () {@suppresswarnings ("Static-Access") @Override public void run () {ในขณะที่ (จริง) {// สุ่มสร้างการแสดงออกทางคณิตศาสตร์นิพจน์ = random.nextint (10)+"" ตัวดำเนินการ [สุ่ม thread.currentthread (). sleep (random.nextint (1000)); - - }}}}}}}} {เธรด currentthread (). sleep (random.nextint (1,000)); }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} {เธรด currentthread (). sleep (random.nextint (1,000)); - ผลลัพธ์ของหนึ่งในการวิ่ง:
เซิร์ฟเวอร์เริ่มต้นแล้วหมายเลขพอร์ต: 12345 นิพจน์เลขคณิตคือ: 4-2 เซิร์ฟเวอร์ได้รับข้อความ: 4-2 ___ ผลลัพธ์คือ: 2 นิพจน์เลขคณิตคือ: 5-10 เซิร์ฟเวอร์ได้รับข้อความ: 5-10__ ผลลัพธ์คือ: -5 นิพจน์คณิตศาสตร์คือ: 0-9 เซิร์ฟเวอร์ที่ได้รับ: 0-9__ ได้รับข้อความ: 1/6__ ผลลัพธ์คือ
จากรหัสข้างต้นเป็นเรื่องง่ายที่จะเห็นว่าปัญหาหลักของชีวภาพคือเมื่อใดก็ตามที่ไคลเอนต์ขอการเข้าถึงการเข้าถึงเซิร์ฟเวอร์จะต้องสร้างเธรดใหม่เพื่อจัดการลิงค์นี้ซึ่งไม่สามารถนำไปใช้ในสถานการณ์ที่มีประสิทธิภาพสูงและจำเป็นต้องมีการพร้อมกันสูง
1.2. การเขียนโปรแกรม I/O Pseudo-asynchronous
เพื่อปรับปรุงรูปแบบการเชื่อมต่อแบบหนึ่งเดียวนี้เราสามารถใช้พูลเธรดเพื่อจัดการเธรดเหล่านี้ (สำหรับข้อมูลเพิ่มเติมโปรดดูบทความที่ให้ไว้ก่อนหน้านี้) การใช้แบบจำลองสำหรับหนึ่งหรือมากกว่านั้นเพื่อประมวลผลไคลเอนต์ N (แต่ชั้นพื้นฐานยังคงใช้การปิดกั้นแบบซิงโครนัส I/O) ซึ่งมักเรียกว่า
Pseudo-asynchronous I/O Model Model Diagram:
การใช้งานนั้นง่ายมาก เราเพียงแค่ต้องมอบเธรดใหม่ให้กับการจัดการพูลเธรดและเพียงแค่เปลี่ยนรหัสเซิร์ฟเวอร์ตอนนี้:
แพ็คเกจ com.anxpp.io.calculator.bio; นำเข้า java.io.ioException; นำเข้า java.net.serversocket; นำเข้า java.net.socket; นำเข้า java.util.concurrent.executorservice; นำเข้า java.util.concurrent.executors; /** * ซอร์สโค้ดเซิร์ฟเวอร์ Bio __pseudo-asynchronous i/o * @author yangtao__anxpp.com * @version 1.0 */คลาสสุดท้ายของคลาสสุดท้ายเซิร์ฟเวอร์เบ็ตเตอร์ {// หมายเลขพอร์ตเริ่มต้น // Singleton Serversocket เซิร์ฟเวอร์เซิร์ฟเวอร์เซิร์ฟเวอร์สแตติกส่วนตัว // Singleton Private ExecutorService ExecutorService = Executors.NewFixedThreadPool (60); // ตั้งค่าพอร์ตการฟังตามพารามิเตอร์ที่เข้ามา หากไม่มีพารามิเตอร์ให้เรียกใช้วิธีการต่อไปนี้และใช้ค่าเริ่มต้นโมฆะค่าเริ่มต้นสาธารณะเริ่มต้น () พ่น IOException {// ใช้ค่าเริ่มต้นเริ่มต้น (default_port); } // วิธีนี้จะไม่สามารถเข้าถึงได้ในจำนวนมากพร้อมกันและไม่จำเป็นต้องพิจารณาประสิทธิภาพเพียงซิงโครไนซ์วิธีการเรียกใช้โมฆะคงที่สาธารณะแบบคงที่โดยตรง ลอง {// สร้าง Serversocket ผ่านตัวสร้าง // หากพอร์ตนั้นถูกกฎหมายและไม่ได้ใช้งานเซิร์ฟเวอร์จะฟังสำเร็จ Server = ใหม่ Serversocket (พอร์ต); System.out.println ("เซิร์ฟเวอร์เริ่มต้นแล้วหมายเลขพอร์ต:" + พอร์ต); // superce การเชื่อมต่อไคลเอนต์ผ่านลูปไร้สาย // หากไม่มีการเข้าถึงไคลเอนต์มันจะถูกบล็อกในการดำเนินการยอมรับ ในขณะที่ (จริง) {ซ็อกเก็ตซ็อกเก็ต = server.accept (); // เมื่อมีการเข้าถึงไคลเอนต์ใหม่โค้ดต่อไปนี้จะถูกเรียกใช้งาน // จากนั้นสร้างเธรดใหม่เพื่อประมวลผลซ็อกเก็ตลิงค์ executorService.execute (ServerHandler ใหม่ (ซ็อกเก็ต)); }} ในที่สุด {// การทำความสะอาดที่จำเป็นบางอย่างถ้า (เซิร์ฟเวอร์! = null) {system.out.println ("เซิร์ฟเวอร์ปิด"); Server.close (); เซิร์ฟเวอร์ = null; - ผลการทดสอบการรันเหมือนกัน
เรารู้ว่าถ้าเราใช้พูล cachedthreadpool (ไม่มีการ จำกัด จำนวนเธรดหากไม่ชัดเจนโปรดดูบทความที่ให้ไว้ในตอนต้นของบทความ) อันที่จริงนอกเหนือจากการช่วยเราจัดการเธรด (นำกลับมาใช้ใหม่โดยอัตโนมัติ การใช้ recidethreadpool เราควบคุมจำนวนเธรดสูงสุดได้อย่างมีประสิทธิภาพตรวจสอบให้แน่ใจว่าการควบคุมทรัพยากรที่ จำกัด ของระบบและใช้โมเดล I/O pseudo-asynchronous N: M
อย่างไรก็ตามเนื่องจากจำนวนเธรดมี จำกัด หากมีการร้องขอพร้อมกันจำนวนมากเธรดที่เกินจำนวนสูงสุดสามารถรอได้จนกว่าจะมีเธรดฟรีในพูลเธรดที่สามารถนำกลับมาใช้ใหม่ได้ เมื่ออ่านกระแสอินพุตซ็อกเก็ตมันจะถูกบล็อกจนกว่าจะเกิดขึ้น:
ดังนั้นเมื่อการอ่านข้อมูลช้า (เช่นข้อมูลจำนวนมากการส่งเครือข่ายช้า ฯลฯ ) และการเกิดพร้อมกันจำนวนมากข้อความการเข้าถึงอื่น ๆ สามารถรอได้ตลอดเวลาซึ่งเป็นข้อเสียที่ใหญ่ที่สุด
NIO ที่จะแนะนำในภายหลังสามารถแก้ปัญหานี้ได้
2. การเขียนโปรแกรม NIO
ห้องสมุด Java I/O ใหม่ได้รับการแนะนำในแพ็คเกจ java.nio.* ใน JDK 1.4 โดยมีวัตถุประสงค์เพื่อเพิ่มความเร็ว ในความเป็นจริงแพ็คเกจ "เก่า" I/O ได้รับการปรับปรุงใหม่โดยใช้ NIO และเราจะได้รับประโยชน์จากมันแม้ว่าเราจะไม่ได้ใช้การเขียนโปรแกรม NIO อย่างชัดเจน การปรับปรุงความเร็วสามารถเกิดขึ้นได้ทั้งในไฟล์ I/O และเครือข่าย I/O แต่บทความนี้กล่าวถึงหลังเท่านั้น
2.1. การแนะนำ
โดยทั่วไปเราคิดว่า NIO เป็น I/O ใหม่ (ชื่ออย่างเป็นทางการ) เพราะมันเป็นใหม่สำหรับห้องสมุด I/O เก่า (จริง ๆ แล้วมันได้รับการแนะนำใน JDK 1.4 แต่คำนามนี้จะยังคงใช้เป็นเวลานานแม้ว่าพวกเขาจะ "เก่า" ตอนนี้ดังนั้นมันยังเตือนเราว่าเราต้องพิจารณาอย่างระมัดระวัง อย่างไรก็ตามมันเรียกว่าไม่ใช่บล็อก I/O ของหลาย ๆ คนนั่นคือ I/O ที่ไม่ปิดกั้นเพราะสิ่งนี้เรียกว่ามันสามารถสะท้อนลักษณะของมันได้ดีขึ้น NIO ในข้อความต่อไปนี้ไม่ได้อ้างถึงไลบรารี I/O ใหม่ทั้งหมด แต่ไม่ได้ปิดกั้น I/O
NIO ให้การใช้ช่องซ็อกเก็ตสองช่องที่แตกต่างกัน: Socketchannel และ ServersocketChannel ที่สอดคล้องกับซ็อกเก็ตและ Serversocket ในรุ่นชีวภาพแบบดั้งเดิม
ทั้งสองช่องที่เพิ่มเข้ามารองรับการบล็อกและโหมดไม่ปิดกั้น
การใช้โหมดการบล็อกนั้นง่ายเหมือนการสนับสนุนแบบดั้งเดิม แต่ประสิทธิภาพและความน่าเชื่อถือไม่ดี โหมดที่ไม่ปิดกั้นนั้นตรงกันข้าม
สำหรับแอพพลิเคชั่นที่มีการโหลดต่ำ, ความขัดแย้งต่ำ, การปิดกั้นแบบซิงโครนัส I/O สามารถใช้เพื่อปรับปรุงอัตราการพัฒนาและการบำรุงรักษาที่ดีขึ้น สำหรับแอพพลิเคชั่นที่มีการโหลดสูง (เครือข่าย) สูงควรใช้โหมดที่ไม่ปิดกั้นของ NIO เพื่อพัฒนา
ความรู้พื้นฐานจะได้รับการแนะนำก่อนด้านล่าง
2.2. บัฟเฟอร์บัฟเฟอร์
บัฟเฟอร์เป็นวัตถุที่มีข้อมูลบางอย่างที่จะเขียนหรืออ่านออก
ในไลบรารี NIO ข้อมูลทั้งหมดจะถูกประมวลผลในบัฟเฟอร์ เมื่ออ่านข้อมูลจะถูกอ่านลงในบัฟเฟอร์โดยตรง เมื่อเขียนข้อมูลมันจะถูกเขียนลงในบัฟเฟอร์ เมื่อใดก็ตามที่คุณเข้าถึงข้อมูลใน NIO จะดำเนินการผ่านบัฟเฟอร์
บัฟเฟอร์เป็นอาร์เรย์และให้ข้อมูลเช่นการเข้าถึงข้อมูลที่มีโครงสร้างและการบำรุงรักษาสถานที่อ่านและเขียน
พื้นที่แคชเฉพาะคือ: bytebuffe, charbuffer, shortbuffer, intbuffer, longbuffer, floatbuffer, doublebuffer พวกเขาใช้อินเทอร์เฟซเดียวกัน: บัฟเฟอร์
2.3. ช่อง
การอ่านและการเขียนข้อมูลของเราจะต้องผ่านช่องทางซึ่งเป็นเหมือนท่อน้ำช่องทาง ความแตกต่างระหว่างช่องและสตรีมคือช่องเป็นสองทิศทางและสามารถใช้สำหรับการอ่านการอ่านเขียนและการอ่านพร้อมกัน
ช่องทางของระบบปฏิบัติการพื้นฐานโดยทั่วไปจะมีความซับซ้อนเต็มรูปแบบดังนั้นช่องทางฟูลเพล็กซ์สามารถแมป API ของระบบปฏิบัติการพื้นฐานได้ดีกว่าสตรีม
ช่องส่วนใหญ่แบ่งออกเป็นสองประเภท:
ServersocketChannel และ Socketchannel ที่จะมีส่วนร่วมในรหัสต่อไปนี้เป็นทั้งคลาสย่อยของ SelectableChannel
2.4. ตัวเลือกมัลติเพล็กเซอร์
ตัวเลือกเป็นพื้นฐานของการเขียนโปรแกรม Java Nio
ตัวเลือกให้ความสามารถในการเลือกงานพร้อม: ตัวเลือกจะสำรวจช่องสัญญาณที่ลงทะเบียนอย่างต่อเนื่อง หากเหตุการณ์การอ่านหรือเขียนเกิดขึ้นในช่องช่องจะอยู่ในสถานะพร้อมและจะถูกสำรวจโดยตัวเลือก จากนั้นชุดของช่องพร้อมรับสามารถรับได้ผ่านการเลือกคีย์เพื่อดำเนินการ I/O ที่ตามมา
ตัวเลือกสามารถสำรวจความคิดเห็นหลายช่องทางในเวลาเดียวกันเนื่องจาก JDK ใช้ epoll () แทนการใช้งานแบบเลือกแบบดั้งเดิมไม่มีการ จำกัด การเชื่อมต่อสูงสุดที่จับ 1024/2048 ดังนั้นจึงจำเป็นต้องมีเธรดเพียงเธรดเดียวที่ต้องรับผิดชอบในการสำรวจตัวเลือกและสามารถเข้าถึงลูกค้าหลายพันรายได้
2.5. เซิร์ฟเวอร์ NIO
รหัสดูเหมือนซับซ้อนกว่าการเขียนโปรแกรมซ็อกเก็ตแบบดั้งเดิม
เพียงวางรหัสและให้คำอธิบายรหัสในรูปแบบของความคิดเห็น
ซอร์สโค้ดเซิร์ฟเวอร์ที่สร้างโดย NIO:
แพ็คเกจ com.anxpp.io.calculator.nio; เซิร์ฟเวอร์คลาสสาธารณะ {ส่วนตัวคงที่ int default_port = 12345; เซิร์ฟเวอร์เซิร์ฟเวอร์ส่วนตัวแบบคงที่ส่วนตัว; public static void start () {start (default_port); } public Static synchronized void start (พอร์ต int) {ถ้า (serverhandle! = null) serverhandle.stop (); ServerHandle = ใหม่ ServerHandle (พอร์ต); เธรดใหม่ (ServerHandle, "เซิร์ฟเวอร์"). start (); } โมฆะคงที่สาธารณะหลัก (สตริง [] args) {start (); - Serverhandle:
แพ็คเกจ com.anxpp.io.calculator.nio; นำเข้า java.io.ioException; นำเข้า java.net.inetsocketaddress; นำเข้า java.nio.bytebuffer; นำเข้า java.nio.channels.SelectionKey; นำเข้า java.nio.channels.Selector; นำเข้า java.nio.channels.serversocketChannel; นำเข้า java.nio.channels.socketChannel; นำเข้า java.util.iterator; นำเข้า java.util.set; นำเข้า com.anxpp.io.utils.calculator; / ** * เซิร์ฟเวอร์ NIO * @author Yangtao__anxpp.com * @version 1.0 */ คลาสสาธารณะ ServerHandle ใช้งาน Runnable {ตัวเลือกส่วนตัว; ServersocketChannel ส่วนตัว ServerChannel; บูลีนผันผวนส่วนตัวเริ่มต้น; /*** constructor* @param พอร์ตระบุหมายเลขพอร์ตที่จะฟัง*/public serverhandle (พอร์ต int) {ลอง {// สร้าง selector = selector.open (); // เปิดช่องฟัง ServerChannel = ServersocketChannel.Open (); // ถ้าเป็นจริงช่องนี้จะถูกวางไว้ในโหมดการปิดกั้น หากเป็นเท็จช่องนี้จะถูกวางไว้ในโหมดที่ไม่ใช่การปิดกั้น ServerChannel.ConfigureBlocking (เท็จ); // เปิดใช้งานโหมดที่ไม่ปิดกั้น // bind backlog พอร์ตถูกตั้งค่าเป็น 1024 ServerChannel.socket (). bind (ใหม่ inetSocketAddress (พอร์ต), 1024); // คำขอการเชื่อมต่อไคลเอนต์ Superce ServerChannel.Register (ตัวเลือก, selectionKey.op_accept); // ทำเครื่องหมายเซิร์ฟเวอร์ถูกเปิดใช้งานเริ่มต้น = true; System.out.println ("เซิร์ฟเวอร์เริ่มต้นแล้วหมายเลขพอร์ต:" + พอร์ต); } catch (ioexception e) {e.printstacktrace (); System.Exit (1); }} โมฆะสาธารณะหยุด () {เริ่มต้น = false; } @Override โมฆะสาธารณะ Run () {// วนผ่านตัวเลือกในขณะที่ (เริ่มต้น) {ลอง {// ไม่ว่าจะมีเหตุการณ์การอ่านและเขียนตัวเลือกจะตื่นขึ้นมาทุก 1S selector.select (1,000); // การบล็อกมันจะดำเนินต่อไปเมื่อมีเหตุการณ์ที่ลงทะเบียนอย่างน้อยหนึ่งเหตุการณ์ // selector.select (); SET <SelectionKey> ปุ่ม = selector.selectedKeys (); Iterator <SelectionKey> it = keys.iterator (); selectionKey key = null; ในขณะที่ (it.hasnext ()) {key = it.next (); it.remove (); ลอง {handleinput (คีย์); } catch (exception e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). close (); }}}}}}}}} catch (throwable t) {t.printstacktrace (); }} // หลังจากตัวเลือกถูกปิดทรัพยากรที่มีการจัดการจะถูกปล่อยออกมาโดยอัตโนมัติหาก (ตัวเลือก! = null) ลอง {selector.close (); } catch (exception e) {e.printstacktrace (); }} โมฆะส่วนตัวที่จับริทิพ (คีย์ SelectionKey) พ่น IOException {ถ้า (key.isvalid ()) {// การประมวลผลข้อความคำขอสำหรับการเข้าถึงใหม่ถ้า (key.iscceptable ()) {ServersocketChannel ssc = (serversocketChannel) key.channel (); // การสร้างอินสแตนซ์ SocketChannel ผ่านการยอมรับของ ServersocketChannel // เสร็จสิ้นการดำเนินการนี้หมายถึงการทำ Handshake สามทาง TCP และการเชื่อมโยงทางกายภาพ TCP ได้รับการจัดตั้งขึ้นอย่างเป็นทางการ Socketchannel SC = ssc.accept (); // ตั้งค่าเป็น sc.configureblocking ที่ไม่ปิดกั้น (เท็จ); // ลงทะเบียนเป็นอ่าน Sc.Register (ตัวเลือก, selectionKey.op_read); } // อ่านข้อความถ้า (key.isreadable ()) {socketchannel sc = (socketchannel) key.channel (); // สร้าง bytebuffer และเปิดบัฟเฟอร์บัฟเฟอร์ 1M bytebuffer บัฟเฟอร์ = bytebuffer.allocate (1024); // อ่านสตรีมคำขอและส่งคืนจำนวนไบต์อ่าน int readBytes = sc.read (บัฟเฟอร์); // อ่านไบต์และรหัสไบต์ถ้า (readbytes> 0) {// ตั้งค่าขีด จำกัด ปัจจุบันของบัฟเฟอร์เป็นตำแหน่ง = 0 สำหรับการดำเนินการอ่านที่ตามมาของ buffer.flip (); // สร้างอาร์เรย์ไบต์ตามจำนวนบัฟเฟอร์ที่อ่านได้ไบต์ไบต์ = ไบต์ใหม่ [buffer.remaining ()]; // คัดลอกอาร์เรย์บัฟเฟอร์ที่อ่านได้ลงในอาร์เรย์อาร์เรย์ที่สร้างขึ้นใหม่ (ไบต์); String expression = สตริงใหม่ (ไบต์, "UTF-8"); System.out.println ("เซิร์ฟเวอร์ได้รับข้อความ:" + นิพจน์); // การประมวลผลสตริงข้อมูลผลลัพธ์ = null; ลอง {result = calculator.cal (นิพจน์) .toString (); } catch (Exception e) {result = "ข้อผิดพลาดในการคำนวณ:" + e.getMessage (); } // ส่งข้อความตอบกลับ dowrite (sc, ผลลัพธ์); } // ไม่มีไบต์อ่านและละเว้น // อื่น ๆ ถ้า (readbytes == 0); // ลิงค์ถูกปิดการปลดปล่อยทรัพยากรอื่นถ้า (readbytes <0) {key.cancel (); sc.close (); }}}}} // ส่งข้อความตอบกลับแบบอะซิงโครนัสส่วนตัว dowrite (ช่อง Socketchannel, การตอบสนองของสตริง) พ่น IOException {// การเข้ารหัสข้อความเป็นไบต์อาร์เรย์ไบต์ [] bytes = response.getBytes (); // สร้าง bytebuffer ตามความสามารถของอาร์เรย์ bytebuffer writebuffer = bytebuffer.allocate (bytes.length); // คัดลอกอาร์เรย์ไบต์ไปยังบัฟเฟอร์ writebuffer.put (ไบต์); // การดำเนินการพลิก wringbuffer.flip (); // ส่งอาร์เรย์ไบต์ของช่องบัฟเฟอร์เขียน (writebuffer); // ***** รหัสสำหรับการประมวลผล "เขียนครึ่งแพ็คเก็ต" ไม่รวมอยู่ที่นี่}}อย่างที่คุณเห็นขั้นตอนหลักสำหรับการสร้างเซิร์ฟเวอร์ NIO มีดังนี้:
เนื่องจากมีการส่งข้อความตอบกลับ Socketchannel จึงเป็นแบบอะซิงโครนัสและไม่ปิดกั้นดังนั้นจึงไม่สามารถรับประกันได้ว่าข้อมูลที่ต้องส่งสามารถส่งได้ในครั้งเดียวและจะมีปัญหาในการเขียนครึ่งแพ็กเก็ตในเวลานี้ เราจำเป็นต้องลงทะเบียนการดำเนินการเขียนสำรวจตัวเลือกอย่างต่อเนื่องเพื่อส่งข้อความที่ไม่ได้รับและใช้วิธี Hasremain () ของบัฟเฟอร์เพื่อตรวจสอบว่ามีการส่งข้อความหรือไม่
2.6. ไคลเอนต์ NIO
เป็นการดีกว่าที่จะอัปโหลดรหัส กระบวนการไม่ต้องการคำอธิบายมากเกินไป แต่ก็คล้ายกับรหัสเซิร์ฟเวอร์
ลูกค้า:
แพ็คเกจ com.anxpp.io.calculator.nio; ไคลเอนต์คลาสสาธารณะ {สตริงคงที่ส่วนตัว default_host = "127.0.0.1"; ส่วนตัวคงที่ int default_port = 12345; clienthandle clienthandle ส่วนตัว; โมฆะคงที่สาธารณะเริ่มต้น () {start (default_host, default_port); } public Static synchronized void start (string ip, พอร์ต int) {ถ้า (clienthandle! = null) clienthandle.stop (); clientHandle = ใหม่ clientHandle (IP, พอร์ต); เธรดใหม่ (clienthandle, "เซิร์ฟเวอร์"). start (); } // ส่งข้อความไปยังเซิร์ฟเวอร์สาธารณะบูลีนคงที่ sendmsg (สตริงผงชูรส) โยนข้อยกเว้น {ถ้า (msg.equals ("q")) ส่งคืน false; clienthandle.sendmsg (msg); กลับมาจริง; } โมฆะคงที่สาธารณะหลัก (สตริง [] args) {start (); - clienthandle:
แพ็คเกจ com.anxpp.io.calculator.nio; นำเข้า java.io.ioException; นำเข้า java.net.inetsocketaddress; นำเข้า java.nio.bytebuffer; นำเข้า java.nio.channels.SelectionKey; นำเข้า java.nio.channels.Selector; นำเข้า java.nio.channels.socketChannel; นำเข้า java.util.iterator; นำเข้า java.util.set; / ** * ไคลเอนต์ NIO * @author Yangtao__anxpp.com * @version 1.0 */ Class Public Class Clienthandle ใช้งาน runnable {โฮสต์สตริงส่วนตัว; พอร์ต int ส่วนตัว; ตัวเลือกส่วนตัว Socketchannel Socketchannel ส่วนตัว; บูลีนผันผวนส่วนตัวเริ่มต้น; Public ClientHandle (String IP, int พอร์ต) {this.host = ip; this.port = พอร์ต; ลอง {// สร้างตัวเลือกตัวเลือก = selector.open (); // เปิดช่องทางการฟัง SocketChannel = SocketChannel.Open (); // ถ้าเป็นจริงช่องนี้จะถูกวางไว้ในโหมดการปิดกั้น หากเป็นเท็จช่องนี้จะถูกวางไว้ในโหมดที่ไม่ปิดกั้น socketchannel.ConfigureBlocking (false); // เปิดโหมดที่ไม่ปิดกั้นเริ่มต้น = true; } catch (ioexception e) {e.printstacktrace (); System.Exit (1); }} โมฆะสาธารณะหยุด () {เริ่มต้น = false; } @Override โมฆะสาธารณะเรียกใช้ () {ลอง {doconnect (); } catch (ioexception e) {e.printstacktrace (); System.Exit (1); } // วนซ้ำผ่านตัวเลือกในขณะที่ (เริ่มต้น) {ลอง {// โดยไม่คำนึงว่ามีเหตุการณ์การอ่านและเขียนตัวเลือกจะถูกปลุกทุก 1s selector.select (1,000); // การบล็อกและมันจะดำเนินต่อไปเฉพาะเมื่อมีเหตุการณ์ที่ลงทะเบียนอย่างน้อยหนึ่งเหตุการณ์ // selector.select (); SET <SelectionKey> ปุ่ม = selector.selectedKeys (); Iterator <SelectionKey> it = keys.iterator (); selectionKey key = null; ในขณะที่ (it.hasnext ()) {key = it.next (); it.remove (); ลอง {handleinput (คีย์); } catch (exception e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). close (); }}}}}}}} catch (Exception E) {E.printStackTrace (); System.Exit (1); }} // หลังจากตัวเลือกถูกปิดทรัพยากรที่มีการจัดการจะถูกปล่อยออกมาโดยอัตโนมัติหาก (ตัวเลือก! = null) ลอง {selector.close (); } catch (exception e) {e.printstacktrace (); }} โมฆะส่วนตัวที่จับริทิพ (คีย์ SelectionKey) พ่น IOException {if (key.isvalid ()) {Socketchannel sc = (socketchannel) key.channel (); if (key.isconnectable ()) {ถ้า (sc.finishconnect ()); ELSE SYSTEM.EXIT (1); } // อ่านข้อความถ้า (key.isreadable ()) {// สร้าง byteBuffer และเปิดบัฟเฟอร์บัฟเฟอร์ 1M bytebuffer บัฟเฟอร์ = byteBuffer.Allocate (1024); // อ่านสตรีมรหัสคำขอและส่งคืนจำนวนไบต์อ่าน int readBytes = sc.read (บัฟเฟอร์); // อ่านไบต์และรหัสไบต์ถ้า (readbytes> 0) {// ตั้งค่าขีด จำกัด ปัจจุบันของบัฟเฟอร์เป็นตำแหน่ง = 0 สำหรับการดำเนินการอ่านที่ตามมาของ buffer.flip (); // สร้างอาร์เรย์ไบต์ขึ้นอยู่กับจำนวนไบต์ที่อ่านได้ในบัฟเฟอร์ไบต์ [] ไบต์ = ไบต์ใหม่ [บัฟเฟอร์. remaining ()]; // คัดลอกอาร์เรย์บัฟเฟอร์ที่อ่านได้ลงในอาร์เรย์อาร์เรย์ที่สร้างขึ้นใหม่ (ไบต์); สตริงผลลัพธ์ = สตริงใหม่ (ไบต์, "UTF-8"); System.out.println ("ไคลเอนต์ได้รับข้อความ:" + ผลลัพธ์); } // ไม่มีการอ่านไบต์จะถูกละเว้น // else ถ้า (readBytes == 0); // ลิงค์ถูกปิดการปลดปล่อยทรัพยากรอื่นถ้า (readbytes <0) {key.cancel (); sc.close (); }}}}} // ส่งข้อความเป็นโมฆะส่วนตัวแบบอะซิงโครนัส Dowrite (ช่อง SocketChannel, คำขอสตริง) พ่น IOException {// การเข้ารหัสข้อความเป็นไบต์อาร์เรย์ไบต์ [] bytes = request.getByTes (); // การสร้าง bytebuffer ขึ้นอยู่กับความสามารถของอาร์เรย์ bytebuffer writebuffer = bytebuffer.allocate (bytes.length); // การคัดลอกอาร์เรย์ไบต์ไปยังบัฟเฟอร์ writebuffer.put (ไบต์); // การดำเนินการพลิก wringbuffer.flip (); // ส่ง BYTE Array Channel.write (writeBuffer); // ***** รหัสสำหรับการประมวลผล "เขียนครึ่งแพ็คเก็ต" ไม่รวมอยู่ที่นี่} void doconnect ส่วนตัว () พ่น IOException {ถ้า (socketchannel.connect (ใหม่ inetSocketAddress (โฮสต์พอร์ต))); else Socketchannel.register (ตัวเลือก, selectionKey.op_connect); } โมฆะสาธารณะ sendmsg (สตริงผงชูรส) โยนข้อยกเว้น {socketchannel.register (ตัวเลือก, selectionKey.op_read); Dowrite (Socketchannel, MSG); - 2.7. ผลการสาธิต
ก่อนอื่นให้เรียกใช้เซิร์ฟเวอร์และเรียกใช้ไคลเอนต์โดยวิธีการ:
แพ็คเกจ com.anxpp.io.calculator.nio; นำเข้า java.util.scanner; /** * วิธีทดสอบ * @author Yangtao__anxpp.com * @version 1.0 */การทดสอบคลาสสาธารณะ {// ทดสอบวิธีหลัก @suppresswarnings ("ทรัพยากร") โมฆะคงที่สาธารณะ (สตริง [] args) โยนข้อยกเว้น {// เรียกใช้เซิร์ฟเวอร์เซิร์ฟเวอร์ // หลีกเลี่ยงไคลเอ็นต์ที่ดำเนินการค้นหารหัส.sleep (100); // เรียกใช้ไคลเอนต์ client.start (); ในขณะที่ (client.sendmsg (สแกนเนอร์ใหม่ (system.in) .nextline ())); - นอกจากนี้เรายังสามารถเรียกใช้ไคลเอนต์แยกต่างหากและเอฟเฟกต์ก็เหมือนกัน
ผลการทดสอบ:
เซิร์ฟเวอร์เริ่มต้นแล้วหมายเลขพอร์ต: 123451+2+3+4+5+6 เซิร์ฟเวอร์ได้รับข้อความ: 1+2+3+4+5+6 ไคลเอ็นต์ได้รับข้อความ: 211*2/3-4+5*6/7-8 เซิร์ฟเวอร์ได้รับข้อความ: 1*2/3-4+5*6/7-8
ไม่มีปัญหาในการเรียกใช้ลูกค้าหลายราย
3. การเขียนโปรแกรม AIO
NIO 2.0 แนะนำแนวคิดของช่องทางอะซิงโครนัสใหม่และให้การใช้งานช่องสัญญาณแบบอะซิงโครนัสและช่องซ็อกเก็ตแบบอะซิงโครนัส
ช่องซ็อกเก็ตแบบอะซิงโครนัสเป็น I/O ที่ไม่ปิดกั้นแบบอะซิงโครนัสอย่างแท้จริงซึ่งสอดคล้องกับ I/O (AIO) ที่ขับเคลื่อนด้วยเหตุการณ์ในการเขียนโปรแกรมเครือข่าย UNIX ไม่จำเป็นต้องมีตัวเลือกมากเกินไปในการสำรวจช่องทางที่ลงทะเบียนเพื่อให้ได้การอ่านและเขียนแบบอะซิงโครนัสซึ่งทำให้โมเดลการเขียนโปรแกรม NIO ง่ายขึ้น
เพียงอัปโหลดรหัส
3.1. รหัสด้านเซิร์ฟเวอร์
เซิร์ฟเวอร์:
แพ็คเกจ com.anxpp.io.calculator.aio.server; / ** * AIO Server * @author Yangtao__anxpp.com * @version 1.0 */ เซิร์ฟเวอร์คลาสสาธารณะ {ส่วนตัวคงที่ int default_port = 12345; Private Static AsyncServerHandler ServerHandle; Public ระเหยได้คงที่ clientCount ยาว = 0; public static void start () {start (default_port); } public Static synchronized void start (พอร์ต int) {if (serverhandle! = null) return; ServerHandle = ใหม่ asyncServerHandler (พอร์ต); เธรดใหม่ (ServerHandle, "เซิร์ฟเวอร์"). start (); } โมฆะคงที่สาธารณะหลัก (สตริง [] args) {server.start (); - AsyncServerHandler:
แพ็คเกจ com.anxpp.io.calculator.aio.server; นำเข้า java.io.ioException; นำเข้า java.net.inetsocketaddress; นำเข้า java.nio.channels.asyncServersocketChannel; นำเข้า java.util.concurrent.countdownlatch; คลาสสาธารณะ AsyncServerHandler ใช้งาน Runnable {Public Countdownlatch Latch; Public AsyncServersocketChannel Channel; Public AsyncServerHandler (พอร์ต int) {ลอง {// สร้าง server channel = asynchronoussersocketChannel.open (); // ผูกพอร์ต channel.bind (ใหม่ inetSocketAddress (พอร์ต)); System.out.println ("เซิร์ฟเวอร์เริ่มต้นแล้วหมายเลขพอร์ต:" + พอร์ต); } catch (ioexception e) {e.printstacktrace (); }} @Override โมฆะสาธารณะ Run () {// CountdownLatch Initialization // ฟังก์ชั่นของมัน: อนุญาตให้ฟิลด์ปัจจุบันบล็อกตลอดเวลาก่อนที่จะดำเนินการชุดปฏิบัติการที่ดำเนินการ // ที่นี่ให้บล็อกฟิลด์ที่นี่เพื่อป้องกันไม่ให้เซิร์ฟเวอร์ออกจากการดำเนินการ // // การเชื่อมต่อ channel.accept (นี่, ใหม่ accepthandler ()); ลอง {latch.await (); } catch (interruptedException e) {e.printStackTrace (); - accepthandler:
แพ็คเกจ com.anxpp.io.calculator.aio.server; นำเข้า java.nio.bytebuffer; นำเข้า java.nio.channels.asynchronoussocketChannel; นำเข้า java.nio.channels.completionhandler; // เชื่อมต่อเป็นตัวจัดการคลาสสาธารณะ Accepthandler ใช้ PrectiveHandler <AsynchronoussocketChannel, asyncServerHandler> {@Override โมฆะสาธารณะเสร็จสมบูรณ์ System.out.println ("จำนวนไคลเอนต์ที่เชื่อมต่อ:" + Server.ClientCount); serverHandler.channel.accept(serverHandler, this); //Create a new Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the service that receives message callbacks Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); - ReadHandler:
package com.anxpp.io.calculator.aio.server; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //Used to read semi-packet messages and send answers private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } // Processing after reading the message @Override public void completed(Integer result, ByteBuffer attachment) { //flip operation attachment.flip(); //According to byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("The server received a message: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); } catch(Exception e){ calrResult = "Calculator error: " + e.getMessage(); } //Send a message doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //Send a message private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //Asynchronous write data parameters are the same as the previous read channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //If it is not sent, continue to send until it is completed (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //Create a new Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the business that receives message callbacks. Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); - OK,这样就已经完成了,其实说起来也简单,虽然代码感觉很多,但是API比NIO的使用起来真的简单多了,主要就是监听、读、写等各种CompletionHandler。此处本应有一个WriteHandler的,确实,我们在ReadHandler中,以一个匿名内部类实现了它。
下面看客户端代码。
3.2. Client side code
ลูกค้า:
package com.anxpp.io.calculator.aio.client; นำเข้า java.util.scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //Send a message to the server public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); return true; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); - AsyncClientHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsyncronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; พอร์ต int ส่วนตัว; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //Create an asynchronous client channel clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //Create CountDownLatch and wait latch = new CountDownLatch(1); //Initiate an asynchronous connection operation, the callback parameter is this class itself. If the connection is successful, the completed method clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //Connect the server successfully// means TCP completes three handshakes @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("Client connection to the server..."); } //Connecting to the server failed @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("Connecting to the server failed..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } //Send a message to the server public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //Asynchronously write clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); - WriteHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //完成全部数据的写入if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //读取数据ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("数据发送失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } ReadHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("客户端收到结果:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("数据读取失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }这个API使用起来真的是很顺手。
3.3. ทดสอบ
ทดสอบ:
package com.anxpp.io.calculator.aio; นำเข้า java.util.scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /** * Test method* @author yangtao__anxpp.com * @version 1.0 */ public class Test { //Test main method @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //Run server Server.start(); //Avoid the client executing the code Thread.sleep(100); //Run client Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); -我们可以在控制台输入我们需要计算的算数字符串,服务器就会返回结果,当然,我们也可以运行大量的客户端,都是没有问题的,以为此处设计为单例客户端,所以也就没有演示大量客户端并发。
读者可以自己修改Client类,然后开辟大量线程,并使用构造方法创建很多的客户端测试。
下面是其中一次参数的输出:
服务器已启动,端口号:12345请输入请求消息:客户端成功连接到服务器...连接的客户端数:1123456+789+456服务器收到消息: 123456+789+456客户端收到结果:1247019526*56服务器收到消息: 9526*56客户端收到结果:533456...
AIO是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手。
下面就比较一下,几种I/O编程的优缺点。
4、各种I/O的对比
先以一张表来直观的对比一下:
具体选择什么样的模型或者NIO框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的NIO做服务端;相反,就应考虑使用NIO或者相关的框架了。
5、附录
上文中服务端使用到的用于计算的工具类:
package com.anxpp.utils;import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;public final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException{ return jse.eval(expression); -ข้างต้นเป็นเนื้อหาทั้งหมดของบทความนี้ ฉันหวังว่ามันจะเป็นประโยชน์ต่อการเรียนรู้ของทุกคนและฉันหวังว่าทุกคนจะสนับสนุน wulin.com มากขึ้น