ستقدم هذه المقالة من الضحلة إلى العمق من السيرة الذاتية التقليدية إلى NIO إلى AIO ، وسوف تكون مصحوبة بتفسير رمز كامل.
سيتم استخدام مثال في الكود التالي: يرسل العميل سلسلة من المعادلة إلى الخادم ، ويعيد الخادم النتيجة إلى العميل بعد الحساب.
يتم استخدام جميع إرشادات الكود مباشرة كتعليقات ومضمنة في الكود ، والتي قد يكون من الأسهل فهمها عند قراءة الكود. سيتم استخدام فئة الأدوات لحساب النتيجة في الكود ، راجع قسم الكود في المقالة.
المقالات الموصى بها للمعرفة الأساسية ذات الصلة:
مقدمة لنموذج Linux Network I/O (الصور والنص)
تزامن جافا (متعدد الخيوط)
1. البرمجة الحيوية
1.1. البرمجة الحيوية التقليدية
النموذج الأساسي لبرمجة الشبكة هو نموذج C/S ، أي التواصل بين عمليتين.
يوفر الخادم منافذ IP والاستماع. يبدأ العميل طلب اتصال من خلال عنوان تشغيل الاتصال الذي يريد الخادم الاستماع إليه. من خلال ثلاثة مصافحة ، إذا تم إنشاء الاتصال بنجاح ، فيمكن كلا الطرفين التواصل من خلال مآخذ.
في تطوير نموذج حظر المزامنة التقليدية ، تكون ServersOcket مسؤولة عن ربط عناوين IP وبدء منافذ الاستماع ؛ المقبس مسؤول عن بدء عمليات الاتصال. بعد نجاح الاتصال ، يقوم كلا الطرفين بإجراء اتصال متزامن من خلال تدفقات الإدخال والمخرجات.
وصف موجز لنموذج اتصال الخادم الحيوي: عادة ما يكون الخادم الذي يستخدم نموذج الاتصال الحيوي مؤشر ترابط مستقل مستقل مسؤول عن الاستماع إلى اتصال العميل. بعد تلقي طلب اتصال العميل ، يقوم بإنشاء مؤشر ترابط جديد لكل عميل لمعالجة الارتباطات ويفشل في معالجته ، ثم يعيد الرد على العميل من خلال دفق الإخراج ، ويتم تدمير مؤشر الترابط. وهذا هو ، نموذج نموذجي واحد من جديد إلى كل ليلة.
مخطط نموذج الاتصال الحيوي التقليدي:
أكبر مشكلة في هذا النموذج هي أنه يفتقر إلى قدرات التحجيم المرنة. عندما يزداد عدد الوصول المتزامن على العميل ، يتناسب عدد مؤشرات الترابط على الخادم مع عدد الوصول المتزامن على العميل. المواضيع في Java هي أيضا موارد النظام قيمة نسبيا. بعد أن يتوسع عدد الخيوط بسرعة ، سينخفض أداء النظام بشكل حاد. مع استمرار زيادة عدد الوصول ، سيموت النظام في النهاية.
رمز مصدر الخادم الذي تم إنشاؤه بواسطة حظر متزامن I/O:
حزمة com.anxpp.io.calculator.bio ؛ استيراد java.io.ioException ؛ استيراد java.net.serversocket ؛ استيراد java.net.socket ؛ /** * رمز مصدر الخادم الحيوي * Author yangtao__anxpp.com * version 1.0 */الفئة النهائية العامة ServerNormal {// رقم المنفذ الافتراضي الخاص ثابت int default_port = 12345 ؛ // Singleton Serversoction Servers Serversocket Server ؛ // قم بتعيين منفذ الاستماع وفقًا للمعلمات الواردة. إذا لم تكن هناك معلمات ، فاتصل بالطريقة التالية واستخدم القيمة الافتراضية الفراغ الثابتة () start () رمي ioException {// استخدم القيمة الافتراضية (Default_Port) ؛ } // لن يتم الوصول إلى هذه الطريقة في عدد كبير من الطرق المتزامنة ، وليس هناك حاجة للنظر في الكفاءة ، مجرد مزامنة الطريقة التي يتم بها إرجاع الفراغ الثابتة المزامنة بشكل مباشر (منفذ int) {if (server! = null) ؛ جرب {// إنشاء ServersOcket من خلال المُنشئ // إذا كان المنفذ قانونيًا وخمولًا ، فسيستمع الخادم بنجاح. Server = ServersOcket (المنفذ) ؛ System.out.println ("تم بدء الخادم ، رقم المنفذ:" + منفذ) ؛ // استمع إلى اتصالات العميل من خلال الحلقة اللاسلكية // إذا لم يكن هناك وصول عميل ، فسيتم حظره في عملية القبول. بينما (صواب) {socket socket = server.accept () ؛ // عندما يكون هناك وصول جديد للعميل ، سيتم تنفيذ الكود التالي // ثم إنشاء مؤشر ترابط جديد للتعامل مع ارتباط المقبس هذا مؤشر ترابط جديد (ServerHandler الجديد (Socket)). Start () ؛ }} أخيرًا {// بعض أعمال التنظيف اللازمة إذا (server! = null) {system.out.println ("الخادم مغلق.") ؛ server.close () ؛ خادم = فارغ ؛ }}}} رسائل العميل معالجة مؤشر ترابط الرمز المصدر ServerHandler:
حزمة com.anxpp.io.calculator.bio ؛ استيراد java.io.bufferedreader ؛ استيراد java.io.ioException ؛ استيراد java.io.inputstreamreader ؛ استيراد java.io.printwriter ؛ استيراد java.net.socket ؛ استيراد com.anxpp.io.utils.calculator ؛ / *** مؤشر ترابط العميل* Author yangtao__anxpp.com* رابط المقبس للعميل*/ public class serverHandler يطرف Runnable {private socket socket ؛ Public ServerHandler (Socket Socket) {this.socket = socket ؛ } Override public void run () {bufferedReader in = null ؛ printWriter Out = null ؛ جرب {في = جديد bufferedReader (new inputStreamReader (socket.getInputStream ())) ؛ out = new printWriter (socket.getOutputStream () ، true) ؛ تعبير سلسلة نتيجة السلسلة بينما (صواب) {// اقرأ سطرًا من خلال BufferedReader // إذا كنت قد قرأت ذيل دفق الإدخال ، فأرجع إلى الخالية والخروج من الحلقة // إذا حصلت على قيمة غير خالية ، فحاول حساب النتيجة والعودة إذا (التعبير = in.readline ()) == null) System.out.println ("تلقى الخادم رسالة:" + تعبير) ؛ حاول {result = calculator.cal (التعبير) .ToString () ؛ } catch (استثناء e) {result = "calculator.cal (expression) .toString () ؛} catch (استثناء e) {e.printStackTrace () ؛} أخيرًا {// بعض عمل التنظيف اللازم (في! = null) {try {in.close () ؛} catch (ioexception e) { null) {out.close () ؛ رمز مصدر العميل الذي تم إنشاؤه بواسطة حظر متزامن I/O:
حزمة com.anxpp.io.calculator.bio ؛ استيراد java.io.bufferedreader ؛ استيراد java.io.ioException ؛ استيراد java.io.inputstreamreader ؛ استيراد java.io.printwriter ؛ استيراد java.net.socket ؛ /** * العميل الذي تم إنشاؤه عن طريق حظر I/O * Author Yangtao__Anxpp.com * version 1.0 */client client client {// رقم المنفذ الافتراضي الخاص static int default_server_port = 12345 ؛ سلسلة ثابتة خاصة Default_server_ip = "127.0.0.1" ؛ إرسال الفراغ الثابت العام (تعبير السلسلة) {send (default_server_port ، expression) ؛ } إرسال الفراغ الثابت العام (منفذ int ، تعبير السلسلة) {system.out.println ("التعبير الحسابي هو:" + التعبير) ؛ مقبس المقبس = فارغ ؛ BufferedReader في = فارغة ؛ printWriter Out = null ؛ Try {Socket = New Socket (default_server_ip ، port) ؛ في = جديد BufferedReader (New InputStreamReader (socket.getInputStream ())) ؛ out = new printWriter (socket.getOutputStream () ، true) ؛ out.println (التعبير) ؛ System.out.println ("___ النتيجة هي:" + in.readline ()) ؛ } catch (استثناء e) {E.PrintStackTrace () ؛ } أخيرًا {// هي عمل التنظيف اللازم إذا (في! = null) {try {in.close () ؛ } catch (ioException e) {E.PrintStackTrace () ؛ } في = null ؛ } if (out! = null) {out.close () ؛ خارج = فارغ ؛ } if (socket! = null) {try {socket.close () ؛ } catch (ioException e) {E.PrintStackTrace () ؛ } socket = null ؛ }}}} اختبر الرمز ، من أجل تسهيل عرض نتائج الإخراج في وحدة التحكم ، ضعه في نفس البرنامج (JVM) لتشغيله:
حزمة com.anxpp.io.calculator.bio ؛ استيراد java.io.ioException ؛ استيراد java.util.random ؛ /** * طريقة الاختبار * Author yangtao__anxpp.com * version 1.0 */اختبار الفئة العامة {// اختبر الطريقة الرئيسية الرئيسية الفراغ الثابتة الرئيسية (String [] args) يلقي interruptedException {// Run the Server New Thread (new Runnable () { @everride public run () {try {serverbet.start () ؛ E.PrintStackTrace () ؛ // تجنب العميل الذي ينفذ الرمز قبل بدء الخادم ؛ // قم بتشغيل مشغلات char العميل [] = {'+' ، '-' ، '*' ، '/'} ؛ عشوائي عشوائي = جديد عشوائي (System.CurrentTimeMillis ()) ؛ مؤشر ترابط جديد (جديد RunNable () {suppressWarnings ("static-access") Override public void run () {بينما (true) {// random يولد سلسلة التعبير الحسابية = عشوائي. Thread.CurrentTher (). Sleep (Random.NextInt (1000)) ؛ }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} } } } } } } } } { خيط. CurrentThread (). Sleep (Random.NextInt (1000)) ؛ } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } { خيط. CurrentThread (). Sleep (Random.NextInt (1000)) ؛ }}}}}}}} نتائج أحد أشواط:
لقد تم بدء تشغيل الخادم ، رقم المنفذ: 12345 تعبير الحساب هو: 4-2 خادم تلقى الرسالة: 4-2 ___ النتيجة هي: 2 التعبير الحسابي هو: 5-10 خادم تلقى الرسالة: 5-10__ النتيجة هي: -5 الحساب التعبير هو: 0+6 ‘الخادم استلم الرسالة: 0-9__ النتيجة: -9 تعبير عن الحساب هو: 0+6 خادم. تلقيت الرسالة: 1/6__ النتيجة هي: 0.16666666666666666666666666666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666666666666666666666666666666666
من الكود أعلاه ، من السهل أن نرى أن المشكلة الرئيسية للسيرة الذاتية هي أنه كلما طلب العميل الجديد وصولًا ، يجب على الخادم إنشاء مؤشر ترابط جديد للتعامل مع هذا الرابط ، والذي لا يمكن تطبيقه في سيناريوهات حيث تكون هناك حاجة إلى أداء عالي وتزامن عالٍ (سيؤثر عدد كبير من المواضيع الجديدة على أداء الخادم بشكل خطير وحتى الإضراب).
1.2. pseudo-ashychronous برمجة I/O.
لتحسين هذا النموذج الذي يتصل به الاتصال الفردي ، يمكننا استخدام تجمع مؤشرات ترابط لإدارة هذه المواضيع (لمزيد من المعلومات ، يرجى الرجوع إلى المقالة المقدمة مسبقًا) ، وتنفيذ نموذج لواحد أو أكثر لمعالجة العملاء (لكن الطبقة الأساسية لا تزال تستخدم الحظر المتزامن I/O) ، والتي تسمى غالبًا "نموذج I/O-O-O-O-O-O-O".
مخطط نموذج I/O المتزامن الزائفة:
التنفيذ بسيط للغاية. نحتاج فقط إلى تسليم مؤشر الترابط الجديد إلى إدارة تجمع مؤشرات الترابط ، وتغيير رمز الخادم الآن فقط:
حزمة com.anxpp.io.calculator.bio ؛ استيراد java.io.ioException ؛ استيراد java.net.serversocket ؛ استيراد java.net.socket ؛ استيراد java.util.concurrent.executorservice ؛ استيراد java.util.concurrent.executors ؛ /** * Bio Server Sourd Code__pseudo-ashynchronous I/O * Author Yangtao__anxpp.com * version 1.0 */Public Class Class Serverbetter {// رقم المنفذ الافتراضي الثابت int default_port = 12345 ؛ // Singleton Serversoction Servers Serversocket Server ؛ . // قم بتعيين منفذ الاستماع وفقًا للمعلمات الواردة. إذا لم يكن هناك معلمة ، فاتصل بالطريقة التالية واستخدم القيمة الافتراضية الفراغ الثابتة () start () remexception {// استخدم القيمة الافتراضية (default_port) ؛ } // لن يتم الوصول إلى هذه الطريقة في عدد كبير من التزامن ، وليس هناك حاجة للنظر في الكفاءة ، مجرد مزامنة الطريقة التي يتم بها إرجاع الفراغ الثابتة المزامنة بشكل مباشر (منفذ int) {if (server! = null) ؛ جرب {// إنشاء ServersOcket من خلال المُنشئ // إذا كان المنفذ قانونيًا وخمولًا ، فسيستمع الخادم بنجاح. Server = ServersOcket (المنفذ) ؛ System.out.println ("تم بدء الخادم ، رقم المنفذ:" + منفذ) ؛ // superce اتصال العميل من خلال حلقة لاسلكية // إذا لم يكن هناك وصول عميل ، فسيتم حظره في عملية القبول. بينما (صواب) {socket socket = server.accept () ؛ // عندما يكون هناك وصول جديد للعميل ، سيتم تنفيذ الكود التالي // ثم إنشاء مؤشر ترابط جديد لمعالجة Socket Link SeviceSorService.execute (ServerHandler (Socket) الجديد) ؛ }} أخيرًا {// بعض أعمال التنظيف اللازمة إذا (server! = null) {system.out.println ("الخادم مغلق.") ؛ server.close () ؛ خادم = فارغ ؛ }}}} نتائج تشغيل الاختبار هي نفسها.
نحن نعلم أننا إذا استخدمنا تجمع مؤشرات ترابط CachedThreadPool (لا يوجد حد لعدد مؤشرات الترابط ، إن لم يكن واضحًا ، فيرجى الرجوع إلى المقالة المقدمة في بداية المقالة) ، في الواقع ، بالإضافة إلى مساعدتنا تلقائيًا على إدارة المواضيع (إعادة الاستخدام) ، يبدو أيضًا وكأنه نموذج عميل 1: 1: عدد المواضيع. باستخدام FlexThreadPool ، نتحكم بشكل فعال في الحد الأقصى لعدد المواضيع ، وضمان التحكم في الموارد المحدودة للنظام ، وتنفيذ نموذج I/O المتزامن n: m.
ومع ذلك ، نظرًا لأن عدد المواضيع محدودة ، في حالة حدوث عدد كبير من الطلبات المتزامنة ، لا يمكن أن تنتظر المواضيع التي تتجاوز الحد الأقصى للرقم إلا حتى يكون هناك مؤشرات ترابط مجانية في تجمع الخيوط الذي يمكن إعادة استخدامه. عند قراءة دفق إدخال المقبس ، سيتم حظره حتى يحدث:
لذلك ، عندما تكون قراءة البيانات بطيئة (مثل كمية كبيرة من البيانات ، ونقل الشبكة البطيء ، وما إلى ذلك) وكميات كبيرة من التزامن ، لا يمكن انتظار رسائل الوصول الأخرى إلا في كل وقت ، وهو أكبر عيب.
يمكن لـ NIO التي سيتم تقديمها لاحقًا حل هذه المشكلة.
2. برمجة NIO
يتم تقديم مكتبة Java I/O الجديدة في حزمة Java.nio.* في JDK 1.4 ، بغرض زيادة السرعة. في الواقع ، تم إعادة تنفيذ حزمة I/O "القديمة" باستخدام NIO ، ويمكننا الاستفادة منها حتى لو لم نستخدم برمجة NIO بشكل صريح. يمكن أن تحدث تحسينات السرعة في كل من ملف الإدخال/الإخراج والشبكة I/O ، لكن هذه المقالة تناقش فقط الأخير.
2.1. مقدمة
إننا نفكر عمومًا في NIO على أنه I/O جديد (الاسم الرسمي أيضًا) ، لأنه جديد على مكتبة I/O القديمة (في الواقع تم تقديمه في JDK 1.4 ، لكن هذا الاسم سيستمر في استخدامه لفترة طويلة ، حتى لو كانوا "قديمة" الآن ، لذلك يذكرنا أيضًا بأننا بحاجة إلى التفكير في ذلك بعناية عند النام) ، ويتم إجراء تغييرات كبيرة. ومع ذلك ، فإنه يطلق عليه I/O من قبل العديد من الأشخاص ، أي I/O غير المحظور ، لأن هذا يسمى ، يمكن أن يعكس خصائصه بشكل أفضل. لا يشير NIO في النص التالي إلى مكتبة I/O الجديدة بالكامل ، ولكنها لا تمنع I/O.
يوفر NIO تطبيقين مختلفين لقناة Socket: Socketchannel و Serversocketchannel المقابل للمقبس والخدم في النموذج الحيوي التقليدي.
تدعم كلتا القنوات المضافة حديثًا أوضاع الحظر وغير الحظر.
يعد استخدام وضع الحظر أمرًا بسيطًا مثل الدعم التقليدي ، لكن الأداء والموثوقية ليسا جيدًا ؛ وضع عدم الحظر هو عكس ذلك بالضبط.
بالنسبة للتطبيقات المنخفضة الحمولة ، يمكن استخدام I/O المتزامن لتحسين معدل التطوير وصيانة أفضل ؛ بالنسبة لتطبيقات التحميل العالي (الشبكة) ، يجب استخدام وضع عدم الحظر لـ NIO للتطوير.
سيتم تقديم المعرفة الأساسية أولاً أدناه.
2.2. المخزن المؤقت العازلة
المخزن المؤقت هو كائن يحتوي على بعض البيانات التي يجب كتابتها أو قراءتها.
في مكتبة NIO ، تتم معالجة جميع البيانات في المخزن المؤقت. عند قراءة البيانات ، تتم قراءتها مباشرة في المخزن المؤقت ؛ عند كتابة البيانات ، تتم كتابتها أيضًا في المخزن المؤقت. في أي وقت يمكنك الوصول إلى البيانات في NIO ، يتم تشغيله من خلال المخزن المؤقت.
المخزن المؤقت هو في الواقع صفيف ويوفر معلومات مثل الوصول المنظم إلى البيانات والحفاظ على مواقع القراءة والكتابة.
مناطق ذاكرة التخزين المؤقت المحددة هي: Bytebuffe و Charbuffer و Shortbuffer و Intbuffer و Longbuffer و Floatbuffer و DoubleBuffer. تنفذ نفس الواجهة: المخزن المؤقت.
2.3. قناة
يجب تمرير قراءتنا وكتابة البيانات عبر القناة ، والتي تشبه أنبوب المياه ، قناة. الفرق بين القناة والتيار هو أن القناة ثنائية الاتجاه ويمكن استخدامها لعمليات القراءة والكتابة والقراءة والكتابة في وقت واحد.
تكون قنوات نظام التشغيل الأساسي بشكل عام كاملة ، لذلك يمكن للقناة المزدوجة الكاملة أن تقوم بتخطيط واجهة برمجة التطبيقات لنظام التشغيل الأساسي بشكل أفضل من الدفق.
تنقسم القنوات بشكل أساسي إلى فئتين:
يعد SoveroCketchAnnel و Socketchannel الذي سيشارك في الكود التالي فئات فرعية من SelectableChannel.
2.4. محدد مضاعف
المحدد هو أساس برمجة Java NIO.
يوفر المحدد القدرة على اختيار مهام جاهزة: سيقوم المحدد باستمرار بالاستطلاع القناة المسجلة عليها. في حالة حدوث حدث قراءة أو كتابة على قناة ، ستكون القناة في الحالة الجاهزة وسيتم استطلاعها من قبل المحدد. بعد ذلك ، يمكن الحصول على مجموعة القنوات الجاهزة من خلال مفتاح التحديد لأداء عمليات الإدخال/الإخراج اللاحقة.
يمكن للمحدد استطلاع قنوات متعددة في نفس الوقت ، لأن JDK يستخدم EPOLL () بدلاً من تطبيق SELECT التقليدي ، لا يوجد حد على مقبض الاتصال القصوى 1024/2048. لذلك ، لا يلزم سوى موضوع واحد ليكون مسؤولاً عن استقصاء المحدد ، ويمكنه الوصول إلى الآلاف من العملاء.
2.5. خادم NIO
يبدو الرمز أكثر تعقيدًا من برمجة المقبس التقليدية.
ما عليك سوى الصق الرمز وإعطاء وصف الرمز في شكل تعليقات.
رمز مصدر الخادم الذي تم إنشاؤه بواسطة NIO:
حزمة com.anxpp.io.calculator.nio ؛ خادم الفئة العامة {private static int default_port = 12345 ؛ static static serverhandle serverHandle ؛ public static void start () {start (default_port) ؛ } start void static static static start (int port) {if (serverHandle! = null) serverHandle.stop () ؛ ServerHandle = New ServerHandle (port) ؛ مؤشر ترابط جديد (ServerHandle ، "Server"). Start () ؛ } public static void main (string [] args) {start () ؛ }} ServerHandle:
حزمة com.anxpp.io.calculator.nio ؛ استيراد java.io.ioException ؛ استيراد java.net.inetsocketaddress ؛ استيراد java.nio.bytebuffer ؛ استيراد java.nio.channels.selectionKey ؛ استيراد java.nio.channels.selector ؛ استيراد java.nio.channels.serversocketchannel ؛ استيراد java.nio.channels.socketchannel ؛ استيراد java.util.iterator ؛ استيراد java.util.set ؛ استيراد com.anxpp.io.utils.calculator ؛ / ** * NIO Server * Author yangtao__anxpp.com * version 1.0 */ public class serverHandle تنفذ Runnable {private selector selector ؛ خوادم serverChannel خوادم serverChannel ؛ بدأت منطقية متقلبة خاصة ؛ /*** Constructor* Param Port حدد رقم المنفذ المراد استمع إليه إلى*/Public ServerHandle (int port) {try {// Create Selector = celector.open () ؛ // افتح قناة الاستماع serverChannel = serversocketchannel.open () ؛ // إذا كان ذلك صحيحًا ، فسيتم وضع هذه القناة في وضع الحظر ؛ إذا كان خطأ ، فسيتم وضع هذه القناة في وضع عدم الحظر ServerChannel.ConfigureBlocking (false) ؛ // تمكين وضع عدم الحظر // يتم تعيين تراكم منفذ ربط على 1024 ServerChannel.socket (). Bind (New InetSocketaddress (Port) ، 1024) ؛ // طلب اتصال العميل Superce ServerChannel.register (Selector ، SelecteKey.op_accept) ؛ // MARK تم تمكين الخادم بدءا = صحيح ؛ System.out.println ("تم بدء الخادم ، رقم المنفذ:" + منفذ) ؛ } catch (ioException e) {E.PrintStackTrace () ؛ System.exit (1) ؛ }} public void stop () {chation = false ؛ } Override public void run () {// loop من خلال المحدد بينما (بدأ) {try {// ما إذا كان هناك حدث قراءة وكتابة ، فإن المحدد يتجاهل كل 1S Selector.select (1000) ؛ // الحظر ، سوف يستمر فقط عند حدوث حدث واحد مسجل على الأقل. // selector.select () ؛ SET <SelecteKey> KEYS = SECECTURCE.SelectedKeys () ؛ iterator <selectekey> it = keys.iterator () ؛ مفتاح SelecteKey = NULL ؛ بينما (it.hasnext ()) {key = it.next () ؛ it.remove () ؛ حاول {handleinput (مفتاح) ؛ } catch (استثناء e) {if (key! = null) {key.cancel () ؛ if (key.channel ()! = null) {key.channel (). close () ؛ }}}}}}}}} catch (throwable t) {t.printStackTrace () ؛ }} // بعد إغلاق المحدد ، سيتم إصدار الموارد المدارة تلقائيًا إذا (Selector! = null) حاول {celector.close () ؛ } catch (استثناء e) {E.PrintStackTrace () ؛ }} private void Generation (مفتاح SelecteKey) يلقي ioException {if (key.isvalid ()) {// معالجة رسالة الطلب للوصول الجديد إذا (key.isacceptable ()) {serversocketchannel ssc = (serversocketchannel) key.channel () ؛ . socketchannel sc = ssc.accept () ؛ // ضبط على SC.ConfigureBlocking غير المحظور (false) ؛ // register as read sc.register (selector ، selectekey.op_read) ؛ } // قراءة الرسالة if (key.isReadable ()) {socketchannel sc = (socketchannel) key.channel () ؛ // إنشاء bytebuffer وافتح 1M Buffer Bytebuffer Buffer = bytebuffer.allocate (1024) ؛ // قراءة دفق الطلب وأرجع عدد البايتات قراءة int readBytes = sc.read (buffer) ؛ // قراءة البايت والرمز البايت إذا (readBytes> 0) {// قم بتعيين الحد الحالي للمخزن المؤقت إلى الموضع = 0 ، لعمليات القراءة اللاحقة لـ buffer.flip () ؛ // قم بإنشاء صفيف بايت يعتمد على عدد بايت بايت قابل للقراءة العازلة = new byte [buffer.remaining ()] ؛ // انسخ صفيف البايت القابل للقراءة في المخزن المؤقت إلى buffer.get تم إنشاؤه حديثًا (بايت) ؛ تعبير السلسلة = سلسلة جديدة (بايت ، "UTF-8") ؛ System.out.println ("تلقى الخادم رسالة:" + تعبير) ؛ // معالجة سلسلة بيانات البيانات = فارغة ؛ حاول {result = calculator.cal (التعبير) .ToString () ؛ } catch (استثناء e) {result = "خطأ في الحساب:" + e.getMessage () ؛ } // إرسال رسالة الرد dowrite (SC ، النتيجة) ؛ } // no bytes قراءة وتجاهل // else if (readBytes == 0) ؛ // تم إغلاق الرابط ، مع تحرير المورد الآخر إذا (readBytes <0) {key.cancel () ؛ Sc.Close () ؛ }}}}} // أرسل رسالة الرد بشكل غير متزامن باطلب dowrite (قناة SocketchAnnel ، استجابة السلسلة) يلقي ioException {// ترميز الرسالة كبايت بايت بايت []] bytes = response.getBytes () ؛ // إنشاء bytebuffer وفقًا لسعة الصفيف bytebuffer writeBuffer = bytebuffer.allocate (bytes.length) ؛ // انسخ صفيف البايت إلى المخزن المؤقت trintbuffer.put (بايت) ؛ // flip operation trustbuffer.flip () ؛ // إرسال مجموعة البايت من channel.write (writebuffer) ؛ // ***** لم يتم تضمين رمز معالجة "كتابة نصف حزمة" هنا}}كما ترون ، فإن الخطوات الرئيسية لإنشاء خادم NIO هي كما يلي:
نظرًا لأن رسالة الاستجابة يتم إرسالها ، فإن Socketchannel غير متزامن وغير محظور ، لذلك لا يمكن ضمان إمكانية إرسال البيانات التي يجب إرسالها في وقت واحد ، وستكون هناك مشكلة في كتابة نصف حزمة في هذا الوقت. نحتاج إلى تسجيل عملية كتابة ، واتخاذ استطلاع المحدد باستمرار لإرسال الرسائل غير المرئية ، ثم استخدام طريقة Hasremain () للمخزن المؤقت لتحديد ما إذا تم إرسال الرسالة.
2.6. عميل NIO
من الأفضل تحميل الرمز. لا تتطلب العملية الكثير من التفسير ، فهي تشبه إلى حد ما رمز الخادم.
عميل:
حزمة com.anxpp.io.calculator.nio ؛ عميل الفئة العامة {private static String Default_Host = "127.0.0.1" ؛ static int int default_port = 12345 ؛ خاص clientHandle ClientHandle ؛ public static void start () {start (default_host ، default_port) ؛ } start void static static static start (سلسلة IP ، منفذ int) {if (clientHandle! = null) clientHandle.stop () ؛ ClientHandle = New ClientHandle (IP ، Port) ؛ مؤشر ترابط جديد (ClientHandle ، "Server"). Start () ؛ }. ClientHandle.sendmsg (msg) ؛ العودة صحيح. } public static void main (string [] args) {start () ؛ }} ClientHandle:
حزمة com.anxpp.io.calculator.nio ؛ استيراد java.io.ioException ؛ استيراد java.net.inetsocketaddress ؛ استيراد java.nio.bytebuffer ؛ استيراد java.nio.channels.selectionKey ؛ استيراد java.nio.channels.selector ؛ استيراد java.nio.channels.socketchannel ؛ استيراد java.util.iterator ؛ استيراد java.util.set ؛ / ** * NIO Client * Author yangtao__anxpp.com * version 1.0 */ public clientHandle تنفذ Runnable {private string host ؛ منفذ الباحث الخاص ؛ محدد المحدد الخاص ؛ خاص Socketchannel Socketchannel ؛ بدأت منطقية متقلبة خاصة ؛ Public ClientHandle (سلسلة IP ، منفذ int) {this.host = ip ؛ this.port = port ؛ حاول {// إنشاء محدد محدد = celector.open () ؛ // افتح قناة الاستماع socketchannel = socketchannel.open () ؛ // إذا كان ذلك صحيحًا ، فسيتم وضع هذه القناة في وضع الحظر ؛ إذا كان خطأ ، فسيتم وضع هذه القناة في وضع عدم الحظر Socketchannel.ConfigureBlocking (false) ؛ // فتح وضع عدم الحظر = صحيح ؛ } catch (ioException e) {E.PrintStackTrace () ؛ System.exit (1) ؛ }} public void stop () {chation = false ؛ } Override public void run () {try {doconnect () ؛ } catch (ioException e) {E.PrintStackTrace () ؛ System.exit (1) ؛ } // حلقة من خلال المحدد بينما (بدأ) {try {// بغض النظر عما إذا كان هناك حدث قراءة وكتابة ، يتم استيقاظ المحدد كل 1S Selector.select (1000) ؛ // الحظر ، وسيستمر فقط عند حدوث حدث واحد على الأقل. // selector.select () ؛ SET <SelecteKey> KEYS = SECECTURCE.SelectedKeys () ؛ iterator <selectekey> it = keys.iterator () ؛ مفتاح SelecteKey = NULL ؛ بينما (it.hasnext ()) {key = it.next () ؛ it.remove () ؛ حاول {handleinput (مفتاح) ؛ } catch (استثناء e) {if (key! = null) {key.cancel () ؛ if (key.channel ()! = null) {key.channel (). close () ؛ }}}}}}}} catch (استثناء E) {E.PrintStackTrace () ؛ System.exit (1) ؛ }} // بعد إغلاق المحدد ، سيتم إصدار الموارد المدارة تلقائيًا إذا (Selector! = null) حاول {celector.close () ؛ } catch (استثناء e) {E.PrintStackTrace () ؛ }} private void handleinput (SelecteKey Key) يلقي ioException {if (key.isvalid ()) {socketchannel sc = (socketchannel) key.channel () ؛ if (key.isconnectable ()) {if (sc.finishConnect ()) ؛ else system.exit (1) ؛ } // اقرأ الرسالة if (key.isReadable ()) {// إنشاء bytebuffer وافتح 1M Buffer Bytebuffer Buffer = bytebuffer.allocate (1024) ؛ // اقرأ دفق رمز الطلب وأرجع عدد البايتينات القراءة int readBytes = sc.read (buffer) ؛ // قراءة البايت والرمز البايت إذا (readBytes> 0) {// قم بتعيين الحد الحالي للمخزن المؤقت إلى الموضع = 0 ، لعمليات القراءة اللاحقة لـ buffer.flip () ؛ // إنشاء صفيف بايت استنادًا إلى عدد البايتات القابلة للقراءة في البايت المخزن المؤقت [] بايت = بايت جديد [buffer.remaining ()] ؛ // انسخ صفيف البايت القابل للقراءة في المخزن المؤقت إلى buffer.get تم إنشاؤه حديثًا (بايت) ؛ سلسلة السلسلة = سلسلة جديدة (بايت ، "UTF-8") ؛ System.out.println ("تلقى العميل رسالة:" + نتيجة) ؛ } // no bytes read يتم تجاهلها // else if (readBytes == 0) ؛ // تم إغلاق الرابط ، مع تحرير المورد الآخر إذا (readBytes <0) {key.cancel () ؛ Sc.Close () ؛ }}}}} // إرسال رسائل غير متزامنة بشكل غير متزامن void dowrite (قناة socketchannel ، طلب السلسلة) يلقي ioException {// ترميز الرسالة كبايت بايت بايت []] bytes = request.getbytes () ؛ // إنشاء bytebuffer بناءً على سعة الصفيف bytebuffer writebuffer = bytebuffer.allocate (bytes.length) ؛ // نسخ صفيف البايت إلى المخزن المؤقت trintbuffer.put (بايت) ؛ // flip operation trustbuffer.flip () ؛ // أرسل قناة Byte Array.Write (WriteBuffer) ؛ // ***** لم يتم تضمين رمز معالجة "كتابة نصف حزمة" هنا} private void doconnect () رمي ioException {if (socketchannel.connect (new inetsocketaddress (مضيف ، منفذ))) ؛ else Socketchannel.register (Selector ، SelecteKey.OP_Connect) ؛ } public void sendmsg (String msg) يلقي استثناء {socketchannel.register (المحدد ، selectekey.op_read) ؛ Dowrite (Socketchannel ، MSG) ؛ }} 2.7. نتائج التوضيح
قم بتشغيل الخادم أولاً وقم بتشغيل عميل بالمناسبة:
حزمة com.anxpp.io.calculator.nio ؛ استيراد java.util.scanner ؛ /** * طريقة الاختبار * Auuthor yangtao__anxpp.com * version 1.0 */اختبار الفئة العامة {// اختبار الطريقة الرئيسية suppressWarnings ("Resource") الفراغ الثابت العام (String [] args) يلقي استثناء {// Run server.start () ؛ // تجنب العميل الذي ينفذ مؤشر ترابط الكود. sleep (100) ؛ // Run Client.start () ؛ بينما (client.sendmsg (ماسح ضوئي جديد (system.in) .nextLine ())) ؛ }} يمكننا أيضًا تشغيل العميل بشكل منفصل ، والآثار هي نفسها.
نتائج الاختبار:
تم بدء تشغيل الخادم ، رقم المنفذ: 123451+2+3+4+5+6 استلم الخادم الرسالة: 1+2+4+4+5+6 استلم العميل الرسالة: 211*2/3-4+5*6/7-8 تلقى الخادم الرسالة: 1*2/3-4+5*6/7-7.
لا توجد مشكلة في تشغيل العديد من العملاء.
3. برمجة AIO
يقدم NIO 2.0 مفهوم القنوات غير المتزامنة الجديدة ويوفر تطبيقات لقنوات الملفات غير المتزامنة وقنوات المقبس غير المتزامن.
قناة المقبس غير المتزامن هي I/O غير متزامنة غير متزامنة حقًا ، والتي تتوافق مع I/O (AIO) التي تعتمد على الحدث في برمجة شبكة UNIX. لا يتطلب الأمر الكثير من المحددات لاستطلاع القنوات المسجلة لتحقيق القراءة والكتابة غير المتزامنة ، وبالتالي تبسيط نموذج البرمجة NIO.
فقط قم بتحميل الكود.
3.1. رمز جانب الخادم
الخادم:
حزمة com.anxpp.io.calculator.aio.server ؛ / ** * AIO Server * Author yangtao__anxpp.com * version 1.0 */ public class server {private static int default_port = 12345 ؛ خاص ثابت asyncserverhandler serverHandle ؛ Publatile Static Static Clientcount = 0 ؛ public static void start () {start (default_port) ؛ } start void static static static start (int port) {if (serverHandle! = null) return ؛ serverHandle = جديد asyncserverHandler (port) ؛ مؤشر ترابط جديد (ServerHandle ، "Server"). Start () ؛ } public static void main (string [] args) {server.start () ؛ }} AsyncserverHandler:
حزمة com.anxpp.io.calculator.aio.server ؛ استيراد java.io.ioException ؛ استيراد java.net.inetsocketaddress ؛ استيراد java.nio.channels.asyncserversocketchannel ؛ استيراد java.util.concurrent.countdownlatch ؛ الطبقة العامة asyncserverhandler تنفذ Runnable {public countdownlatch latch ؛ قناة Asyncserversocketchannel العامة ؛ Public AsyncServerHandler (int port) {try {// إنشاء قناة خادم = asynchronousserversocketchannel.open () ؛ // bind Port Channel.bind (New InetSocketAddress (port)) ؛ System.out.println ("تم بدء الخادم ، رقم المنفذ:" + منفذ) ؛ } catch (ioException e) {E.PrintStackTrace () ؛ }} Override public void run () {// countdownlatch initialization // وظيفته: السماح للحقل الحالي بحظر كل الوقت قبل إكمال مجموعة من العمليات التي يتم تنفيذها // هنا ، دع حظر الحقل هنا لمنع الخادم من الخروج بعد التنفيذ // يمكنك أيضًا استخدام (true)+sleep // بيئة التوليد لا تحتاج إلى القلق بشأن مشكلة هذا الخادم. // connection channel.accept (هذا ، New entcethandler ()) ؛ حاول {latch.await () ؛ } catch (interruptedException e) {E.PrintStackTrace () ؛ }}} مقبله:
حزمة com.anxpp.io.calculator.aio.server ؛ استيراد java.nio.bytebuffer ؛ استيراد java.nio.channels.asynchronoussocketchannel ؛ استيراد java.nio.channels.completionHandler ؛ . System.out.println ("عدد العملاء المتصلين:" + server.clientCount) ؛ serverHandler.channel.accept(serverHandler, this); //Create a new Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the service that receives message callbacks Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); }} ReadHandler:
package com.anxpp.io.calculator.aio.server; استيراد java.io.ioException ؛ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //Used to read semi-packet messages and send answers private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } // Processing after reading the message @Override public void completed(Integer result, ByteBuffer attachment) { //flip operation attachment.flip(); //According to byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("The server received a message: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); } catch(Exception e){ calrResult = "Calculator error: " + e.getMessage(); } //Send a message doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //Send a message private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //Asynchronous write data parameters are the same as the previous read channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //If it is not sent, continue to send until it is completed (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //Create a new Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the business that receives message callbacks. Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); }}} OK,这样就已经完成了,其实说起来也简单,虽然代码感觉很多,但是API比NIO的使用起来真的简单多了,主要就是监听、读、写等各种CompletionHandler。此处本应有一个WriteHandler的,确实,我们在ReadHandler中,以一个匿名内部类实现了它。
下面看客户端代码。
3.2、Client端代码
عميل:
package com.anxpp.io.calculator.aio.client; import java.util.Scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //Send a message to the server public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); return true; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }} AsyncClientHandler:
package com.anxpp.io.calculator.aio.client; استيراد java.io.ioException ؛ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsyncronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; منفذ الباحث الخاص ؛ private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //Create an asynchronous client channel clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //Create CountDownLatch and wait latch = new CountDownLatch(1); //Initiate an asynchronous connection operation, the callback parameter is this class itself. If the connection is successful, the completed method clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //Connect the server successfully// means TCP completes three handshakes @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("Client connection to the server..."); } //Connecting to the server failed @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("Connecting to the server failed..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } //Send a message to the server public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //Asynchronously write clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); }} WriteHandler:
package com.anxpp.io.calculator.aio.client; استيراد java.io.ioException ؛ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //Complete writing of all data if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //Read data ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("Data send failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } ReadHandler:
package com.anxpp.io.calculator.aio.client; استيراد java.io.ioException ؛ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("Client received result:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("Data read failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }这个API使用起来真的是很顺手。
3.3. امتحان
امتحان:
package com.anxpp.io.calculator.aio; import java.util.Scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /** * Test method* @author yangtao__anxpp.com * @version 1.0 */ public class Test { //Test main method @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //Run server Server.start(); //Avoid the client executing the code Thread.sleep(100); //Run client Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }}我们可以在控制台输入我们需要计算的算数字符串,服务器就会返回结果,当然,我们也可以运行大量的客户端,都是没有问题的,以为此处设计为单例客户端,所以也就没有演示大量客户端并发。
读者可以自己修改Client类,然后开辟大量线程,并使用构造方法创建很多的客户端测试。
下面是其中一次参数的输出:
服务器已启动,端口号:12345请输入请求消息:客户端成功连接到服务器...连接的客户端数:1123456+789+456服务器收到消息: 123456+789+456客户端收到结果:1247019526*56服务器收到消息: 9526*56客户端收到结果:533456...
AIO是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手。
下面就比较一下,几种I/O编程的优缺点。
4、各种I/O的对比
先以一张表来直观的对比一下:
具体选择什么样的模型或者NIO框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的NIO做服务端;相反,就应考虑使用NIO或者相关的框架了。
5、附录
上文中服务端使用到的用于计算的工具类:
package com.anxpp.utils;import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;public final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException{ return jse.eval(expression); }}ما سبق هو كل محتوى هذه المقالة. آمل أن يكون ذلك مفيدًا لتعلم الجميع وآمل أن يدعم الجميع wulin.com أكثر.