คำนำ:
เมื่อเร็ว ๆ นี้เมื่อวิเคราะห์ RPC ของ Hadoop (โปรโตคอลการโทรขั้นตอนระยะไกล) โปรโตคอลที่ร้องขอบริการจากโปรแกรมคอมพิวเตอร์ระยะไกลผ่านเครือข่ายโดยไม่เข้าใจเทคโนโลยีเครือข่ายพื้นฐาน คุณสามารถอ้างถึง: http://baike.baidu.com/view/32726.htm) กลไกพบว่าการใช้งานกลไก RPC ของ Hadoop ส่วนใหญ่ใช้เทคโนโลยีสองอย่าง: พร็อกซีแบบไดนามิก (คุณสามารถอ้างถึงบล็อก: http://weixiaolu.iteye เพื่อวิเคราะห์ซอร์สโค้ด RPC ของ Hadoop อย่างถูกต้องฉันคิดว่าจำเป็นต้องศึกษาหลักการและการใช้งานเฉพาะของ Java Nio เป็นครั้งแรก
ในบล็อกนี้ส่วนใหญ่ฉันวิเคราะห์ Java Nio จากสองทิศทาง
สารบัญ:
หนึ่ง. ความแตกต่างระหว่าง Java Nio และการปิดกั้น I/O
1. การปิดกั้นรูปแบบการสื่อสาร I/O
2. Java Nio หลักการและการสื่อสารรุ่น 2. Java Nio Server และการใช้งานรหัสไคลเอนต์
การวิเคราะห์เฉพาะ:
หนึ่ง. ความแตกต่างระหว่าง Java Nio และการปิดกั้น I/O
1. การปิดกั้นรูปแบบการสื่อสาร I/O
หากคุณมีความเข้าใจบางอย่างเกี่ยวกับการปิดกั้น I/O ตอนนี้เรารู้ว่าการปิดกั้น I/O ถูกบล็อกเมื่อเรียกใช้เมธอด InputStream.read () มันจะรอจนกว่าข้อมูลจะมาถึง (หรือหมดเวลา) ก่อนที่จะกลับมา; ในทำนองเดียวกันเมื่อเรียกเมธอด serversocket.accept () มันจะบล็อกจนกว่าจะมีการเชื่อมต่อไคลเอนต์ก่อนที่จะกลับมา หลังจากไคลเอนต์แต่ละตัวเชื่อมต่อเซิร์ฟเวอร์จะเริ่มเธรดเพื่อประมวลผลคำขอของลูกค้า ไดอะแกรมโมเดลการสื่อสารของการปิดกั้น I/O มีดังนี้:
หากคุณวิเคราะห์อย่างระมัดระวังคุณจะพบว่ามีข้อเสียบางประการของการปิดกั้น I/O จากรูปแบบการสื่อสาร I/O ที่ปิดกั้นฉันสรุปข้อเสียสองประการ:
1. เมื่อมีลูกค้าจำนวนมากจะมีการสร้างเธรดการประมวลผลจำนวนมาก และแต่ละเธรดจะใช้พื้นที่สแต็กและเวลา CPU บางส่วน
2. การปิดกั้นสามารถนำไปสู่การสลับบริบทบ่อยครั้งและการสลับบริบทส่วนใหญ่อาจไม่มีความหมาย
ในกรณีนี้ I/O ที่ไม่ปิดกั้นมีโอกาสในการใช้งาน
2. รูปแบบหลักการและการสื่อสาร Java Nio
Java Nio เริ่มต้นใน JDK1.4 และอาจกล่าวได้ว่าเป็นทั้ง "I/O ใหม่" หรือ I/O ที่ไม่ปิดกั้น นี่คือวิธีการทำงานของ Java Nio:
1. เธรดเฉพาะจัดการกิจกรรม IO ทั้งหมดและรับผิดชอบการแจกจ่าย
2. กลไกที่ขับเคลื่อนด้วยเหตุการณ์: ทริกเกอร์เมื่อเหตุการณ์มาถึงแทนที่จะตรวจสอบเหตุการณ์พร้อมกัน
3. การสื่อสารเธรด: เธรดสื่อสารผ่านการรอการแจ้งเตือนและวิธีการอื่น ๆ ตรวจสอบให้แน่ใจว่าสวิตช์บริบททุกอย่างเหมาะสม ลดการสลับเธรดที่ไม่จำเป็น
หลังจากอ่านข้อมูลบางอย่างฉันโพสต์แผนผังแผนผังของ Java Nio ที่ฉันเข้าใจ:
(หมายเหตุ: โฟลว์การประมวลผลของแต่ละเธรดอาจอ่านข้อมูลการถอดรหัสการประมวลผลการคำนวณการเข้ารหัสและการส่งการตอบกลับ)
เซิร์ฟเวอร์ Java Nio ต้องเริ่มต้นเธรดพิเศษเพื่อจัดการเหตุการณ์ IO ทั้งหมด รูปแบบการสื่อสารนี้ใช้งานได้อย่างไร? ฮ่าฮ่าเรามาสำรวจความลึกลับด้วยกัน Java Nio ใช้ช่องสองทางสำหรับการส่งข้อมูลแทนที่จะเป็นสตรีมแบบทางเดียวและสามารถลงทะเบียนเหตุการณ์ที่น่าสนใจในช่อง มีสี่เหตุการณ์ทั้งหมด:
| ชื่อเหตุการณ์ | ค่าที่สอดคล้องกัน |
| เซิร์ฟเวอร์ได้รับเหตุการณ์การเชื่อมต่อไคลเอนต์ | SelectionKey.op_accept (16) |
| เหตุการณ์เซิร์ฟเวอร์การเชื่อมต่อไคลเอนต์ | SelectionKey.op_connect (8) |
| อ่านกิจกรรม | selectionKey.op_read (1) |
| เขียนกิจกรรม | selectionKey.op_write (4) |
เซิร์ฟเวอร์และไคลเอนต์แต่ละคนจะรักษาวัตถุที่จัดการช่องทางซึ่งเราเรียกว่าตัวเลือกซึ่งสามารถตรวจจับเหตุการณ์ในหนึ่งช่องหรือมากกว่า ลองใช้เซิร์ฟเวอร์เป็นตัวอย่าง หากเหตุการณ์การอ่านลงทะเบียนบนตัวเลือกของเซิร์ฟเวอร์ไคลเอนต์จะส่งข้อมูลบางอย่างไปยังเซิร์ฟเวอร์ในบางจุด เมื่อปิดกั้น I/O จะเรียกวิธีการอ่าน () เพื่อบล็อกข้อมูลและเซิร์ฟเวอร์ NIO จะเพิ่มเหตุการณ์การอ่านลงในตัวเลือก เธรดการประมวลผลของเซิร์ฟเวอร์จะสำรวจความคิดเห็นเพื่อเข้าถึงตัวเลือก หากพบเหตุการณ์ที่น่าสนใจเมื่อเข้าถึงตัวเลือกจะดำเนินการกับเหตุการณ์เหล่านี้ หากไม่มีเหตุการณ์ที่น่าสนใจมาถึงเธรดการประมวลผลจะบล็อกจนกว่าเหตุการณ์ที่น่าสนใจจะมาถึง ด้านล่างนี้เป็นแผนผังแผนผังของรูปแบบการสื่อสารของ Java Nio ที่ฉันเข้าใจ:
สอง. Java Nio Server และการใช้งานรหัสไคลเอนต์
เพื่อให้เข้าใจ Java Nio ได้ดีขึ้นต่อไปนี้คือการใช้รหัสง่าย ๆ สำหรับเซิร์ฟเวอร์และไคลเอนต์
เซิร์ฟเวอร์:
แพ็คเกจ cn.nio; นำเข้า java.io.ioException; นำเข้า java.net.inetsocketaddress; นำเข้า java.nio.bytebuffer; นำเข้า java.nio.channels.SelectionKey; นำเข้า java.nio.channels.Selector; นำเข้า java.nio.channels.serversocketChannel; นำเข้า java.nio.channels.socketChannel; นำเข้า java.util.iterator; /*** NIO Server* @author เส้นทางขนาดเล็ก*/คลาสสาธารณะ Nioserver {// Channel Manager ตัวเลือกตัวเลือกส่วนตัว; / ** * รับช่อง Serversocket และทำงานเริ่มต้นบางอย่างบนช่อง * @param พอร์ตพอร์ตที่ถูกผูกไว้ * @throws ioexception */ โมฆะสาธารณะ initserver (พอร์ต int) พ่น IOException {// รับ Serversocket Channel ServersocketChannel ServerChannel = ServersocketChannel.Open (); // ตั้งค่าช่องเป็น ServerChannel.ConfigureBlocking (FALSE); // ผูก serversocket ที่สอดคล้องกับช่องนี้กับพอร์ตพอร์ต ServelChannel.Socket (). bind (ใหม่ inetSocketAddress (พอร์ต)); // รับตัวจัดการช่องทางนี้ selector = selector.open (); // ผูกตัวจัดการช่องไปยังช่องและลงทะเบียนเหตุการณ์ selectionKey.op_accept สำหรับช่อง หลังจากลงทะเบียนเหตุการณ์ // เมื่อเหตุการณ์มาถึง Selector.select () จะกลับมา หากเหตุการณ์ไม่ถึง selector.select () จะบล็อก ServerChannel.Register (ตัวเลือก, selectionKey.op_accept); } /*** ใช้การสำรวจเพื่อฟังว่ามีเหตุการณ์ในตัวเลือกที่ต้องดำเนินการหรือไม่ ถ้าเป็นเช่นนั้นมันจะถูกประมวลผล * @throws ioexception */ @suppresswarnings ("ไม่ได้ตรวจสอบ") โมฆะสาธารณะฟัง () พ่น IOException {system.out.println ("เซิร์ฟเวอร์เริ่มสำเร็จ!"); // การสำรวจเพื่อเข้าถึงตัวเลือกในขณะที่ (จริง) {// เมื่อเหตุการณ์ที่ลงทะเบียนมาถึงวิธีการกลับมา; มิฉะนั้นวิธีการจะปิดกั้น selector.select (); // รับตัววนซ้ำของรายการที่เลือกในตัวเลือกและรายการที่เลือกคือรายการ iterator เหตุการณ์ที่ลงทะเบียน = this.Selector.SelectedKeys () Iterator (); ในขณะที่ (ite.hasnext ()) {selectionKey key = (selectionKey) ite.next (); // ลบคีย์ที่เลือกเพื่อป้องกันการประมวลผลซ้ำของ item.remove (); // ไคลเอนต์ร้องขอเหตุการณ์การเชื่อมต่อถ้า (key.iscceptable ()) {serversocketChannel Server = (serversocketChannel) คีย์. channel (); // รับแชนเนลเพื่อเชื่อมต่อกับไคลเอนต์ SocketChannel channel = server.accept (); // ตั้งค่าเป็น channel ที่ไม่ปิดกั้น configureblocking (false); // คุณสามารถส่งข้อมูลไปยังไคลเอนต์ที่นี่ channel.write (byteBuffer.wrap (สตริงใหม่ ("ส่งข้อความไปยังไคลเอนต์"). getBytes ())); // หลังจากการเชื่อมต่อกับลูกค้าสำเร็จเพื่อรับข้อมูลของลูกค้าคุณต้องตั้งค่าการอ่านสิทธิ์สำหรับช่อง channel.register (this.Selector, selectionKey.op_read); // ได้รับเหตุการณ์ที่อ่านได้} อื่นถ้า (key.isreadable ()) {อ่าน (คีย์); }}}} / ** * เหตุการณ์การประมวลผลที่อ่านข้อความที่ส่งโดยไคลเอนต์ * @param คีย์ * @throws ioexception * / โมฆะสาธารณะอ่าน (คีย์ตัวเลือกคีย์) โยน ioexception {// เซิร์ฟเวอร์สามารถอ่านข้อความ: รับช่องซ็อกเก็ตที่เหตุการณ์เกิดขึ้น socketchannel channel = (Socketchannel) key.channel (); // สร้างบัฟเฟอร์การอ่านบัฟเฟอร์บัฟเฟอร์ = byteBuffer.Allocate (10); channel.read (บัฟเฟอร์); ไบต์ [] data = buffer.array (); String msg = สตริงใหม่ (data) .trim (); System.out.println ("เซิร์ฟเวอร์ได้รับข้อความ:"+msg); ByteBuffer Outbuffer = byteBuffer.wrap (msg.getBytes ()); channel.write (outbuffer); // ส่งข้อความกลับไปยังไคลเอนต์}/ *** เริ่มต้นการทดสอบเซิร์ฟเวอร์* @throws ioexception*/ โมฆะคงที่สาธารณะหลัก (สตริง [] args) โยน ioexception {nioserver server = neioserver ใหม่ (); Server.InitServer (8000); Server.Listen (); - ลูกค้า:
แพ็คเกจ cn.nio; นำเข้า java.io.ioException; นำเข้า java.net.inetsocketaddress; นำเข้า java.nio.bytebuffer; นำเข้า java.nio.channels.SelectionKey; นำเข้า java.nio.channels.Selector; นำเข้า java.nio.channels.socketChannel; นำเข้า java.util.iterator; /*** ไคลเอนต์ NIO* @Author เส้นทางขนาดเล็ก*/คลาสสาธารณะ nioclient {// ช่องสัญญาณตัวเลือกตัวเลือกส่วนตัวตัวเลือก; / ** * รับช่องซ็อกเก็ตและทำการเริ่มต้นบางช่อง * @param ip ip ของเซิร์ฟเวอร์ที่เชื่อมต่อกับ * @param พอร์ตหมายเลขพอร์ตของเซิร์ฟเวอร์ที่เชื่อมต่อกับ * @throws ioexception */ public void initclient (สตริง ip, พอร์ต int) โยน ioexception {// // ตั้งค่าช่องเป็นช่องที่ไม่ใช่การบล็อกการกำหนดค่าการบดบัง (เท็จ); // รับตัวจัดการช่องทางนี้ selector = selector.open (); // ไคลเอนต์เชื่อมต่อกับเซิร์ฟเวอร์ ในความเป็นจริงการดำเนินการวิธีการไม่ได้ใช้การเชื่อมต่อ คุณต้องโทร // ใช้ channel.finishconnect (); ในวิธีการฟัง () เพื่อให้ช่องเชื่อมต่อเสร็จสมบูรณ์การเชื่อมต่อ (ใหม่ inetSocketAddress (IP, พอร์ต)); // ผูกตัวจัดการช่องไปยังช่องและลงทะเบียนเหตุการณ์ selectionKey.op_connect สำหรับช่อง channel.register (ตัวเลือก, selectionKey.op_connect); } /*** ใช้การสำรวจเพื่อฟังว่ามีเหตุการณ์ในตัวเลือกที่ต้องดำเนินการหรือไม่ ถ้าเป็นเช่นนั้นมันจะถูกประมวลผล * @throws ioexception */ @suppresswarnings ("ไม่ได้ตรวจสอบ") โมฆะสาธารณะฟัง () พ่น IOException {// โพลเพื่อเข้าถึงตัวเลือกในขณะที่ (จริง) {selector.select (); // รับตัววนซ้ำสำหรับรายการที่เลือกในรายการตัวเลือกตัวเลือก = this.Selector.SelectedKeys (). iterator (); ในขณะที่ (ite.hasnext ()) {selectionKey key = (selectionKey) item.next (); // ลบคีย์ที่เลือกเพื่อป้องกันการประมวลผลซ้ำของ item.remove (); // เหตุการณ์การเชื่อมต่อเกิดขึ้นถ้า (key.isconnectable ()) {socketchannel channel = (socketchannel) คีย์. channel (); // หากเชื่อมต่อเชื่อมต่อให้เสร็จสิ้นการเชื่อมต่อถ้า (channel.isconnectionPending ()) {channel.finishconnect (); } // ตั้งค่าเป็น channel ที่ไม่ใช่การปิดกั้น configureblocking (false); // คุณสามารถส่งข้อมูลไปยัง server channel.write (byteBuffer.wrap (สตริงใหม่ ("ส่งข้อความไปยังเซิร์ฟเวอร์"). getBytes ())); // หลังจากการเชื่อมต่อกับเซิร์ฟเวอร์สำเร็จเพื่อรับข้อมูลเซิร์ฟเวอร์ช่องจะต้องตั้งค่าให้อ่านสิทธิ์ channel.register (this.Selector, selectionKey.op_read); // ได้รับเหตุการณ์ที่อ่านได้} อื่นถ้า (key.isreadable ()) {อ่าน (คีย์); }}}} /*** เหตุการณ์การประมวลผลที่อ่านข้อความที่ส่งโดยเซิร์ฟเวอร์* @param คีย์* @throws ioexception* /โมฆะสาธารณะอ่าน (คีย์ตัวเลือกคีย์) พ่น IOException {// เหมือนกับวิธีการอ่านของเซิร์ฟเวอร์} /*** เริ่มต้นการทดสอบไคลเอนต์* @throws Nioclient (); client.initClient ("localhost", 8000); client.listen (); -สรุป:
ในที่สุดการวิเคราะห์พร็อกซีแบบไดนามิกและ Java Nio เสร็จสมบูรณ์ ฮ่าฮ่าต่อไปนี้คือการวิเคราะห์ซอร์สโค้ดของกลไก RPC ของ Hadoop ที่อยู่บล็อกคือ: http://weixiaolu.iteye.com/blog/1504898 อย่างไรก็ตามหากคุณมีการคัดค้านความเข้าใจของคุณเกี่ยวกับ Java Nio คุณสามารถพูดคุยกันได้
หากคุณต้องการพิมพ์ซ้ำโปรดระบุแหล่งที่มา: http://weixiaolu.iteye.com/blog/1479656