مقدمة:
في الآونة الأخيرة ، عند تحليل بروتوكول استدعاء Hadoop (بروتوكول استدعاء الإجراءات عن بُعد) ، وهو بروتوكول يطلب خدمات من برامج الكمبيوتر عن بُعد من خلال الشبكة دون فهم تكنولوجيا الشبكة الأساسية. يمكنك الرجوع إلى: http://baike.baidu.com/view/32726.htm) ، وقد وجد أن تنفيذ آلية RPC في Hadoop يستخدم بشكل أساسي تقنيتين: الوكيل الديناميكي (يمكنك الرجوع إلى المدونة: http://weixiaolu.itey.com/blog/14774) من أجل تحليل رمز مصدر RPC الخاص بـ RPC بشكل صحيح ، أعتقد أنه من الضروري أولاً دراسة المبادئ والتنفيذ المحدد لـ Java NIO.
في هذه المدونة ، أقوم بتحليل Java Nio بشكل أساسي من اتجاهين
جدول المحتويات:
واحد. الفرق بين جافا نيو وحظر الإدخال/س
1. حظر نموذج الاتصال I/O
2. مبدأ Java NIO ونموذج الاتصال 2. خادم Java NIO وتنفيذ رمز العميل
تحليل محدد:
واحد. الفرق بين جافا نيو وحظر الإدخال/س
1. حظر نموذج الاتصال I/O
إذا كان لديك فهم معين لحظر I/O الآن ، فإننا نعلم أن حظر الإدخال/الإخراج يتم حظره عند استدعاء طريقة inputStream.Read () ، فسوف تنتظر حتى تصل البيانات (أو المهلة) قبل العودة ؛ وبالمثل ، عند استدعاء طريقة ServersOcket.accept () ، سيتم حظره حتى يكون هناك اتصال عميل قبل العودة. بعد اتصال كل عميل ، سيبدأ الخادم مؤشر ترابط لمعالجة طلب العميل. مخطط نموذج الاتصال لحظر الإدخال/الإخراج هو كما يلي:
إذا قمت بتحليلها بعناية ، فستجد بالتأكيد أن هناك بعض عيوب منع I/O. استنادًا إلى نموذج الاتصال I/O الحظر ، لخصت عيوبه:
1. عندما يكون هناك العديد من العملاء ، سيتم إنشاء عدد كبير من مؤشرات الترابط المعالجة. ويأخذ كل موضوع مساحة مكدس وبعض وقت وحدة المعالجة المركزية
2. يمكن أن يؤدي الحظر إلى تبديل سياق متكرر ، وقد يكون تبديل السياق لا معنى له.
في هذه الحالة ، فإن I/O غير المحظور لديه آفاق التطبيق الخاصة به.
2. مبدأ جافا نيو ونموذج الاتصال
بدأت Java Nio في JDK1.4 ، ويمكن قول أن كلاهما "I/O جديد" أو I/O غير المحظور. إليكم كيف تعمل جافا نيو:
1. الخيط المخصص يتعامل مع جميع أحداث IO وهو مسؤول عن التوزيع.
2. آلية تعتمد على الحدث: المشغلات عند وصول الحدث ، بدلاً من مراقبة الأحداث في وقت واحد.
3. اتصال الخيط: المواضيع التواصل من خلال الانتظار ، والإخطار والوسائل الأخرى. تأكد من أن كل مفتاح سياق منطقي. تقليل تبديل الخيط غير الضروري.
بعد قراءة بعض المعلومات ، نشرت مخططًا تخطيطيًا لـ Java Nio الذي أفهمه:
(ملاحظة: من المحتمل أن يكون تدفق المعالجة لكل مؤشر ترابط قراءة البيانات ، وفك التشفير ، ومعالجة الحوسبة ، وترميز ، وإرسال الردود.)
يحتاج خادم Java Nio فقط إلى بدء مؤشر ترابط خاص للتعامل مع جميع أحداث IO. كيف يتم تنفيذ نموذج الاتصال هذا؟ هاها ، دعونا نستكشف لغزها معًا. تستخدم Java Nio قناة ثنائية الاتجاه لنقل البيانات ، بدلاً من دفق في اتجاه واحد ، ويمكن تسجيل الأحداث ذات الاهتمام على القناة. هناك أربعة أحداث في المجموع:
| اسم الحدث | القيمة المقابلة |
| يتلقى الخادم أحداث اتصال العميل | selectekey.op_accept (16) |
| حدث خادم اتصال العميل | selectekey.op_connect (8) |
| قراءة الأحداث | selectekey.op_read (1) |
| اكتب الأحداث | selectekey.op_write (4) |
يحافظ كل من الخادم والعميل على كائن يدير قناة ، والتي نسميها المحدد ، والتي يمكنها اكتشاف الأحداث على واحدة أو أكثر من القناة. لنأخذ الخادم كمثال. إذا تم تسجيل حدث قراءة على محدد الخادم ، فإن العميل يرسل بعض البيانات إلى الخادم في مرحلة ما. عند حظر الإدخال/الإخراج ، سيتصل طريقة Read () لحظر البيانات ، وسيقوم خادم NIO بإضافة حدث قراءة إلى المحدد. سيقوم مؤشر ترابط معالجة الخادم بالاستطلاع للوصول إلى المحدد. إذا تم العثور على حدث ذو أهمية عند الوصول إلى المحدد ، فسيقوم بمعالجة هذه الأحداث. في حالة عدم وصول أي حدث اهتمام ، سيتم حظر مؤشر ترابط المعالجة حتى وصول الحدث المهم. فيما يلي مخطط تخطيطي لنموذج الاتصال لجافا نيو الذي أفهمه:
اثنين. خادم Java Nio وتطبيق رمز العميل
من أجل فهم جافا نيو بشكل أفضل ، فإن ما يلي هو تطبيق رمز بسيط للخادم والعميل.
الخادم:
حزمة cn.nio ؛ استيراد java.io.ioException ؛ استيراد java.net.inetsocketaddress ؛ استيراد java.nio.bytebuffer ؛ استيراد java.nio.channels.selectionKey ؛ استيراد java.nio.channels.selector ؛ استيراد java.nio.channels.serversocketchannel ؛ استيراد java.nio.channels.socketchannel ؛ استيراد java.util.iterator ؛ /*** NIO Server* Author Small Path*/Public Class Nioserver {// Channel Manager Selector Selector ؛ / ** * احصل على قناة ServersOcket وقم ببعض أعمال التهيئة على المنفذ المنفذ * param * throws ioException */ public void initserver (int port) يلقي ioException {// الحصول على serversocket cannel serversocketchannel serverChannel = serversocketchannel.open () ؛ // قم بتعيين القناة على ServerChannel.ConfigureBlocking (False) ؛ // ربط serversocket المقابلة لهذه القناة إلى منفذ المنفذ serverChannel.socket (). bind (inetsocketaddress (المنفذ) الجديد) ؛ // احصل على مدير قناة this.selector = selector.open () ؛ // ربط مدير القناة بالقناة وتسجيل حدث SelecteKey.op_accept للقناة. بعد تسجيل الحدث ، // عند وصول الحدث ، سيعود Selector.Select (). إذا لم يصل الحدث إلى Selector.Select () سيحظر. ServerChannel.register (Selector ، SelecteKey.op_accept) ؛ } /*** استخدم الاقتراع للاستماع إلى ما إذا كانت هناك أحداث على المحدد يجب معالجتها. إذا كان الأمر كذلك ، فسيتم معالجته * throws ioException */ suppressWarnings ("غير محدد") public void inchist () remows ioException {system.out.println ("ابدأ من جانب الخادم بنجاح!") ؛ // استطلاع للوصول إلى المحدد بينما (صحيح) {// عند وصول الحدث المسجل ، تعود الطريقة ؛ خلاف ذلك ، فإن الطريقة ستواصل حظر Selector.select () ؛ // احصل على Iterator للعنصر المحدد في المحدد ، والعنصر المحدد هو عنصر Iterator المسجل = this.selector.selectedKeys (). iterator () ؛ بينما (ite.hasnext ()) {selectekey key = (selectekey) ite.next () ؛ // حذف المفتاح المحدد لمنع المعالجة المتكررة لـ item.remove () ؛ // يطلب العميل حدث الاتصال إذا كان (key.isacceptable ()) {serversoCketchAnl Server = (serversocketchannel) مفتاح .channel () ؛ // الحصول على القناة للاتصال بقناة SocketchAnnel العميل = server.accept () ؛ // ضبط على قناة غير حظر. // يمكنك إرسال معلومات إلى العميل هنا channel.write (bytebuffer.wrap (سلسلة جديدة ("أرسل رسالة إلى العميل"). getBytes ())) ؛ // بعد نجاح الاتصال مع العميل ، من أجل تلقي معلومات العميل ، تحتاج إلى تعيين أذونات القراءة للقناة. channel.register (this.selector ، selectekey.op_read) ؛ // تم الحصول على حدث قابل للقراءة} آخر if (key.isReadable ()) {read (key) ؛ }}}} / ** * أحداث المعالجة التي تقرأ الرسائل المرسلة من قبل العميل * مفتاح param * throws ioException * / public void read (مفتاح SelecteKey) يلقي ioException {// يمكن للخادم قراءة الرسائل: الحصول على قناة المقبس حيث يحدث الحدث. socketchannel قناة = (socketchannel) key.channel () ؛ // قم بإنشاء مخزن مؤقت للمخزن المؤقت لـ read bytebuffer = bytebuffer.allocate (10) ؛ قناة. القراءة (المخزن المؤقت) ؛ byte [] data = buffer.array () ؛ String msg = new string (data) .trim () ؛ System.out.println ("تلقى الخادم الرسالة:"+msg) ؛ bytebuffer outbuffer = bytebuffer.wrap (msg.getBytes ()) ؛ channel.write (outBuffer) ؛ // أرسل الرسالة مرة أخرى إلى العميل}/ *** ابدأ اختبار الخادم* throws ioException*/ public static void main (string [] args) يلقي ioException {nioserver server = new nioserver () ؛ server.initserver (8000) ؛ server.listen () ؛ }} عميل:
حزمة cn.nio ؛ استيراد java.io.ioException ؛ استيراد java.net.inetsocketaddress ؛ استيراد java.nio.bytebuffer ؛ استيراد java.nio.channels.selectionKey ؛ استيراد java.nio.channels.selector ؛ استيراد java.nio.channels.socketchannel ؛ استيراد java.util.iterator ؛ /*** NIO Client* Author Small Path*/Public Class Nioclient {// Channel Manager Selector Selector ؛ / ** * احصل على قناة مقبس وقم ببعض تهيئة القناة * param IP IP من الخادم المتصل بـ * param رقم المنفذ من الخادم المتصل بـ * throws ioException */ public void initClient (سلسلة IP ، منفذ int) يلقي ioException {// الحصول على قناة socketchannel socketchannel socketnel.open.open () ؛ // قم بتعيين القناة على قناة غير محظورة. // احصل على مدير قناة this.selector = selector.open () ؛ // يتصل العميل بالخادم. في الواقع ، فإن تنفيذ الطريقة لا ينفذ الاتصال. تحتاج إلى الاتصال // استخدام channel.finishConnect () ؛ في طريقة الاستماع () لإكمال قناة الاتصال. // ربط مدير القناة بالقناة وتسجيل حدث SelecteKey.OP_Connect للقناة. channel.register (selector ، selectekey.op_connect) ؛ } /*** استخدم الاقتراع للاستماع إلى ما إذا كانت هناك أحداث على المحدد يجب معالجتها. إذا كان الأمر كذلك ، فسيتم معالجته * throws ioException */ suppressWarnings ("غير محدد") public void stive () يلقي ioException {// الاقتراع للوصول إلى المحدد بينما (صحيح) {celector.select () ؛ // احصل على ITerator للعنصر المحدد في عنصر ITerator المحدد = this.selector.selectedKeys (). iterator () ؛ بينما (ite.hasnext ()) {selectekey key = (selectekey) item.next () ؛ // حذف المفتاح المحدد لمنع المعالجة المتكررة لـ item.remove () ؛ // يحدث حدث الاتصال إذا (key.isconnectable ()) {socketchannel Channel = (socketchannel) key .Channel () ؛ // إذا كان الاتصال متصلًا ، أكمل الاتصال if (channel.isconnectionPending ()) {channel.finishConnect () ؛ } // ضبط على قناة غير حظر. // يمكنك إرسال معلومات إلى قناة Server.write (bytebuffer.wrap (سلسلة جديدة ("أرسل رسالة إلى الخادم"). getBytes ())) ؛ // بعد أن أصبح الاتصال بالخادم ناجحًا ، من أجل تلقي معلومات الخادم ، يجب تعيين القناة لقراءة الأذونات. channel.register (this.selector ، selectekey.op_read) ؛ // تم الحصول على حدث قابل للقراءة} آخر if (key.isReadable ()) {read (key) ؛ }}}} /*** أحداث المعالجة التي تقرأ الرسائل المرسلة بواسطة الخادم* مفتاح param* throws ioException* /public void read (selectekey key) يلقي ioException {// نفس طريقة قراءة الخادم} /** nioclient () ؛ Client.InitClient ("LocalHost" ، 8000) ؛ client.listen () ؛ }}ملخص:
أخيرًا ، تم الانتهاء من تحليل الوكيل الديناميكي وجافا نيو. هاها ، ما يلي هو تحليل الكود المصدر لآلية RPC في Hadoop. عنوان المدونة هو: http://weixiaolu.iteye.com/blog/1504898. ومع ذلك ، إذا كان لديك أي اعتراضات على فهمك لجافا نيو ، فأنت مرحب بك لمناقشتها معًا.
إذا كنت بحاجة إلى إعادة الطباعة ، فيرجى الإشارة إلى المصدر: http://weixiaolu.iteye.com/blog/1479656