بيئة النظام: CentOS 7.2 بموجب VM12
الإصدار المثبت الحالي: Elasticsearch -2.4.0.tar.gz
Java Operation ES Cluster الخطوات 1: تكوين معلومات كائن الكتلة ؛ 2: إنشاء عميل ؛ 3: عرض معلومات الكتلة
1: اسم الكتلة
اسم الكتلة الافتراضي هو Elasticsearch. إذا كان اسم الكتلة غير متسق مع الحجم المحدد ، فسيتم الإبلاغ عن خطأ عند استخدام موارد العقدة.
2: وظيفة استنشاق
ابدأ وظيفة شم من خلال client.transport.sniff ، بحيث تحتاج فقط إلى تحديد عقدة في الكتلة (وليس بالضرورة عقدة رئيسية) ، ثم تحميل العقد الأخرى في الكتلة. وبهذه الطريقة ، طالما استمر البرنامج في الإغلاق ، فلا يزال بإمكانك الاتصال بعقد أخرى حتى لو سقطت هذه العقدة.
3: نوع الاستعلام searchtype.query_then_fetch
هناك 4 أنواع من أنواع الاستعلام في استعلام ES
query_and_fetch:
تقوم العقدة الرئيسية بتوزيع طلب الاستعلام على جميع القطع. يتم فرز كل قشرة وتصنيفها وفقًا لقواعد الاستعلام الخاصة بها ، وهي تردد وثيقة الترددات ، ثم يعيد النتيجة إلى العقدة الرئيسية. تقوم العقدة الرئيسية بتلخيص جميع البيانات وفرزها وإعادتها إلى العميل. هذه الطريقة تتطلب التفاعل مع ES مرة واحدة.
طريقة الاستعلام هذه لديها مشاكل في حجم البيانات والفرز. ستلخص العقدة الرئيسية البيانات التي يتم إرجاعها بواسطة جميع القطع بحيث يكون حجم البيانات كبيرًا نسبيًا. ثانياً ، قد تكون القواعد على كل قطة غير متناسقة.
query_then_fetch:
تقوم العقدة الرئيسية بتوزيع الطلب على جميع القطع. بعد فرز كل قطة ، يتم إرجاع معرف البيانات ودرجها إلى العقدة الرئيسية. بعد استلامه ، تلخص العقدة الرئيسية وفرزها ، ثم تقرأ البيانات المقابلة إلى العقدة المقابلة وفقًا للمعرف المصنف ويعيدها إلى العميل. هذه الطريقة تتطلب التفاعل مع ES مرتين.
هذه الطريقة تحل مشكلة حجم البيانات ، لكن مشكلة الفرز لا تزال موجودة وهي طريقة الاستعلام الافتراضية لـ ES
def_query_and_fetch: و dfs_query_then_fetch:
توحيد قواعد كل قشرة للتسجيل. حل مشكلة الفرز ، لكن DFS_QUERY_AND_FETT لا يزال لديه مشاكل في حجم البيانات ، DFS_QUERY_THEN_FETT على حد سواء لديهم أفضل مشكلة ، ولكن الكفاءة هي الأسوأ.
1. الحصول على عميل ، طريقتان للحصول على
before public void قبل () يلقي الاستثناء {map <string ، string> map = new hashmap <string ، string> () ؛ map.put ("cluster.name" ، "elasticsearch_wenbronk") ؛ الإعدادات. إعدادات builder = الإعدادات. builder (). put (map) ؛ client = transporclient.builder (). الإعدادات (الإعدادات) .build () .addTransportAddress (inetsockettransportaddress (inetaddress.getByName ("www.wenbronk.com") ، integer.parseint ("9300"))) ؛ } before public void قبل 11 () يلقي الاستثناء {// إنشاء العميل ، استخدم اسم الكتلة الافتراضي ، "elasticsearch" // client = transportclient.builder (). build () // .addtransportdress (new inetsockettransportaddress (inetaddress.getbyname ("www // حدد معلومات تكوين الكتلة من خلال كائن الإعداد ، وإعدادات اسم الكتلة المكوّنة = الإعدادات. .PUT ("network.host" ، "192.168.50.37") .put ("client.transport.ignore_cluster_name" ، true) // تجاهل التحقق من اسم الكتلة ، ويمكنه الاتصال باسم الكتلة إذا كان اسم المخطط غير صحيح بعد فتحه //. .pt ("client.transport.ping_timeout" ، 5) // الإبلاغ عن خطأ ، وقت انتظار ping ، .build () ؛ client = transporclient.builder (). الإعدادات (الإعدادات) .build () .addTransportAddress (inetsocketTransportAddress جديد (New InetSocketaddress ("192.168.50.37" ، 9300))) ؛ // الافتراضي 5S // كم من الوقت يستغرق فتح الاتصال ، الافتراضي System.out.println ("Success Connect") ؛ }ملاحظة: لا يمكن استخدام الطريقتين الذي قدمه الموقع الرسمي ، ويجب أن يتم دمجهما لاستخدامهما ، وأنا أضيع بعد الظهر ...
معنى المعلمات الأخرى:
شفرة:
package com.wenbronk.javaes ؛ import java.net.inetaddress ؛ import java.net.inetsocketaddress ؛ import java.util.date ؛ import java.util.hashmap ؛ import java.util.list ؛ import java.util.map ؛ import java.util.timeunit ؛ org.elasticsearch.action.bulk.backoffpolicy ؛ استيراد org.elasticsearch.action.bulk.bulkprocessor ؛ استيراد org.elasticsearch.action.bulk.bulkprocessor org.elasticsearch.action.bulk.bulkresponse ؛ import org.elasticsearch.action.delete.deleterequest ؛ import org.elasticsearch.action.delete.deleterespons org.elasticsearch.action.gultigetresponse ؛ استيراد org.elasticsearch.action.index.indexrequest ؛ استيراد org.elasticsearch.action.index.IndexResponse ؛ استيراد org.elasticsearch.action.update.updateresponse ؛ import org.elasticsearch.action.update.update.update. org.elasticsearch.client.transport.transportclient ؛ استيراد org.elasticsearch.cluster.node.discoverynode ؛ استيراد org.elasticsearch.common.settings.settings org.elasticsearch.common.unit.bytesizevalue ؛ استيراد org.elasticsearch.common.unit.timevalue ؛ استيراد org.elasticsearch.common.xcontent.xcontentbuilder org.junit.before ؛ import org.junit.test ؛ import com.alibaba.fastjson.jsonobject ؛/** * استخدم java API لتشغيل elasticsearch * * Author 231 * */public class javaest {private TransportClient Client ؛ مصدر IndexRequest الخاص ؛ /*** احصل على الاتصال ، The First Way* throos استثناء* ////before public void قبل () رمي الاستثناء {map <string ، string> map = new hashmap <string ، string> () ؛ map.put ("cluster.name" ، "elasticsearch_wenbronk") ؛ الإعدادات. إعدادات builder = الإعدادات. builder (). put (map) ؛ client = transporclient.builder (). الإعدادات (الإعدادات) .build () .addTransportAddress (inetsockettransportaddress (inetaddress.getByName ("www.wenbronk.com") ، integer.parseint ("9300"))) ؛ }/ *** عرض معلومات الكتلة*/ test public void testInfo () {list <DuccessyNode> nodes = client.connectedNodes () ؛ لـ (discoverynode node: noles) {system.out.println (node.gethostaddress ()) ؛ }}/*** تنظيم سلاسل JSON ، الطريقة 1 ، الربط مباشرة*/السلسلة العامة createjson1 () {String json = "{" + "/" user/":/" kimchy/"،" + "/" postdate/":/" 2013-01-30/"،" + "/ إرجاع JSON ؛ } / ** * إنشاء JSON مع MAP * / MAP Public <String ، Object> createjson2 () {map <string ، Object> json = new hashmap <string ، Object> () ؛ json.put ("المستخدم" ، "kimchy") ؛ json.put ("postdate" ، Date ()) ؛ json.put ("Message" ، "Trying Out Elasticsearch") ؛ إرجاع JSON ؛ } / *** إنشاء باستخدام fastjson* / public jsonobject createJson3 () {jsonobject json = new JsonObject () ؛ json.put ("المستخدم" ، "kimchy") ؛ json.put ("postdate" ، Date ()) ؛ json.put ("Message" ، "Trying Out Elasticsearch") ؛ إرجاع JSON ؛ } / *** استخدم ES Class* / Public XcontentBuilder CreateJson4 () يلقي الاستثناء {// إنشاء كائن JSON ، إحدى الطرق لإنشاء JSON xContentBuilder Source = xContentFactory.jsonbuilder () .StartObject () .field ("user" ، "kimchy". Out Elasticsearch ") .endObject () ؛ مصدر الإرجاع ؛ } / *** حفظ في الفهرس* athrows استثناء* / test public void test1 () يلقي الاستثناء {xContentBuilder Source = createJson4 () ؛ // store json في index indexresponse strax = client.prepareIndex ("Twitter" ، "Tweet" ، "1"). SetSource (Source) .get () ؛ // // يتم الحصول على النتيجة index = review.getIndex () ؛ نوع السلسلة = استجابة. getType () ؛ معرف السلسلة = response.getId () ؛ الإصدار الطويل = استجابة. getVersion () ؛ Boolean Created = response.iscreated () ؛ System.out.println (index + ":" + type + ":" + id + ":" + version + ":" + created) ؛ }/ *** احصل على API احصل على معلومات المستند المحددة*/ Test Public void testget () {// getResponse Response = client.prepareget ("Twitter" ، "Tweet" ، "1") // .get () ؛ getResponse Response = client.prepareget ("Twitter" ، "Tweet" ، "1") .SetOperationThered (false) // thread safe.get () ؛ system.out.println (response.getSourCeasString ()) ؛ } / ** * اختبار حذف API * / Test public void testDelete () {deleteresponse repressing = client.prepedelete ("Twitter" ، "Tweet" ، "1") .get () ؛ سلسلة فهرس = استجابة. getIndex () ؛ نوع السلسلة = استجابة. getType () ؛ معرف السلسلة = response.getId () ؛ الإصدار الطويل = استجابة. getVersion () ؛ System.out.println (index + ":" + type + ":" + id + ":" + الإصدار) ؛ } / ** * اختبار تحديث تحديث API * استخدم updaterequest كائن * Throws استثناء * / test public void testupdate () رمي الاستثناء {updaterequest updateRequest = جديد updaterequest () ؛ updateRequest.index ("Twitter") ؛ updateRequest.type ("Tweet") ؛ updateRequest.id ("1") ؛ updaterequest.doc (xcontentfactory.jsonbuilder () .StartObject () // إضافة حقول غير موجودة ، استبدال الحقول الحالية ("الجنس" ، "ذكر") .field ("الرسالة" ، "Hello") .endobject ()) ؛ استجابة updateSponse = client.update (updaterequest) .get () ؛ // print string index = response.getIndex () ؛ نوع السلسلة = استجابة. getType () ؛ معرف السلسلة = response.getId () ؛ الإصدار الطويل = استجابة. getVersion () ؛ System.out.println (index + ":" + type + ":" + id + ":" + الإصدار) ؛ }/** * اختبر تحديث واجهة برمجة تطبيقات ، استخدم Client * Trows استثناء */test public void testupdate2 () يلقي الاستثناء {// تم تحديثه باستخدام كائن البرنامج النصي // updateresponse repressing = client.prepareupdate ("Twitter" ، "Tweet" ، "1") // .SetSetScript ( // updateSponse Response = client.prepareupdate ("Twitter" ، "Tweet" ، "1") // .setdoc (xContentFactory.jsonbuilder () // .StartObject () //. // استخدم كائن updateRequest و script // updaterequest updaterequest = new updateRequest ("Twitter" ، "Tweet" ، "1") // .Script (نص جديد ("ctx._source.gender =/" male/"")) ؛ // updateresponse respress = client.update (updateRequest) .get () ؛ // updateresponse repressing = client.update (updateRequest جديد ("Twitter" ، "Twitter" ، "1") .DOC (xContentFactory.JsonBuilder () .StartObject () .field ("الجنس" ، "male") .endobject ())). System.out.println (reponse.getIndex ()) ؛ }/** * اختبار التحديث * استخدم updateRequest * athrows استثناء * throws interruptedException */test public void testupdate3 () رمي interruptedException ، استثناء {updaterequest updatequest = new addaterequest ("twitter" ، "1"). استجابة updateSponse = client.update (updaterequest) .get () ؛ } / ** * اختبار upsert طريقة * athrows استثناء * * / test public void testupsert () يلقي الاستثناء {// اضبط ظروف الاستعلام ، أضف indexRequest الفعالة = new indexRequest ("Twitter" ، "Twitter" ، "2") .Source (xcontentfactory.jsonbuilder (). .field ("الجنس" ، "gfrerq") .endObject ()) ؛ // تعيين تحديث ، ابحث عن إعدادات التحديث أدناه updateRequest upsert = new UpdatErequest ("Twitter" ، "Tweet" ، "2") .DOC (xContentFactory.jsonbuilder () .StartObject () .field ("user" ، "wenbronk") .endobject ()). client.update (upsert) .get () ؛ } / ** * اختبار multi get get api * احصل على فهرس مختلف ، ومعرف * / test public void testmultiget () {multigetResponse multigetResponse = client.preparemultiget () .add ("twitter" ، "Tweet" ، "1) .add (" Twitter "،" Tweet "،" 2 "،" ، "،" ، "،" ، "،" ، "،" ، "،" "type" ، "foo") .get () ؛ لـ (multigetItEmResponse itemresponse: multigetResponse) {getResponse reponse = itemresponse.getResponse () ؛ if (response.isexists ()) {String sourceasString = response.getSourCeasString () ؛ system.out.println (sourceasString) ؛ }}} / *** تنفيذ الدُفعات بالجملة* يمكن تحديث استعلام أو حذف مستندات متعددة* / test public void testbulk () يلقي استثناء {bulkRequestBuilder bulkRequest = client.prepareBulk () ؛ BulkRequest.add (client.prepareIndex ("Twitter" ، "Tweet" ، "1") .SetSource (xContentFactory.jsonbuilder () .StartObject () .Field ("user" ، "kimchy"). BulkRequest.add (client.prepareindex ("Twitter" ، "Tweet" ، "2") .SetSource (xContentFactory.jsonbuilder () .StartObject () .Field ("user" ، "kimchy") .field ("postdate" ، new Date ()). استجابة bulkresponse = bulkRequest.get () ؛ System.out.println (reponse.getheaders ()) ؛ } / ** * استخدم معالجًا كبيرًا * athrows استثناء * / test public void testbulkprocessor () يلقي استثناء {// إنشاء bulkporcessor ObjectBulkProcessor bulkprocessor = long paramlong ، bulkrequest) Cub} // تنفيذ public void afterbulk (paramlong طويل ، bulkrequest parambulkrequest ، قابلة للتسمية قابلة للترمي لتنفيذ Bulk .SetBulkactions (10000) // 1GB تحديث البيانات بشكل كبير .SetBulksize (BytesizeValue الجديد (1 ، bytesizeUnit.gb)) // يجب أن يتم تحديث 5s ثابتة 5 من طلبات التوافق ، لا يسمح بالتنسيق ، بشكل متزامن. // إضافة طلب واحد bulkprocessor.add (فهرس جديد ("Twitter" ، "Tweet" ، "1")) ؛ BulkProcessor.add (New DeletereQuest ("Twitter" ، "Tweet" ، "2")) ؛ // Close Bulkprocessor.awaitclose (10 ، timeunit.minutes) ؛ // أو bulkprocessor.close () ؛ }} رمز TES2:
package com.wenbronk.javaes ؛ import java.net.inetsocketaddress ؛ import org.apache.lucene.queryparser.xml.filterbuilderfactory org.elasticsearch.action.search.searchResponse ؛ import org.elasticsearch.action.search.searchtype ؛ استيراد org.elasticsearch.client.transport.transportclient org.elasticsearch.common.transport.inetsocketTransportAddress ؛ import org.elasticsearch.common.unit.timevalue ؛ import org.elasticsearch.index.query.querybuilder ؛ import org.elasticsearch.index.query.query. org.elasticsearch.search.aggregations.aggregation ؛ import org.elasticsearch.search.aggregations.aggregations.aggregationsbuilders ؛ import org.elasticsearch.search.aggregations.bucket.histogram.datehistogramintrval ؛ misord org.elasticesearch.sorting.sortordor ؛ org.elasticsearch.sork.sort.sortparseElement ؛ استيراد org.junit.before ؛ import org.junit.test ؛/** * استخدم Java API لتشغيل ElasticSearch * api * @author 231 * */public class javaest2 {private spectclient client ؛ / *** احصل على كائن العميل*/ before public void testbefore () {builder builder = settings.settingsbuilder () ؛ Builder.put ("cluster.name" ، "wenbronk_escluster") ؛ // .put ("client.transport.ignore_cluster_name" ، true) ؛ إعدادات الإعدادات = builder.build () ؛ org.elasticsearch.client.transport.transportClient.Builder TransportBuild = transportClient.Build () ؛ ClientClient Client1 = TransportBuild.settings (الإعدادات) .Build () ؛ client = client1.addTransportAddress ((New InetSocketTransportAddress (inetsocketaddress جديد ("192.168.50.37" ، 9300))))) ؛ System.out.println ("Success Connect to Escluster") ؛ }/*** اختبار الاستعلام*/test public void testSearch () {// searchRequestBuilder SearchRequestBuilder = client.preparesearch ("Twitter" ، "Tweet" ، "1") ؛ // searchResponse Response = searchRequestBuilder.setTypes ("type1" ، "type2") .SetSearchType (SearchType.dfs_query_then_fetch) // .setquery (QueryBuilders.TermQuery ("user" ، "test") // .setpostfilter (querybuilders.rangequery ("Age"). from (0) .to (1)) //. .execute (). Actionget () ؛ SearchResponse Response = client.preparesearch () .Execute (). ActionGet () ؛ // searchHits Hits = response. system.out.println (next.getValues ()) ؛ //} //} system.out.println (response) ؛ } / *** اختبار SCROLL API* معالجة أكثر كفاءة لكميات كبيرة من البيانات* / test public void testScrolls () {QueryBuilder QueryBuilder = QueryBuilders.termquery ("Twitter" ، "Tweet") ؛ SearchResponse Response = client.preparesearch ("Twitter") .addsort (sortparseElement.doc_field_name ، sortorder.asc) .SetScroll (timevalue new (60000)) .setquery (querybuilder) .SetSize (100) .execute (). Actionget () ؛ بينما (صحيح) {for (searchHit hit: desponse.gethits (). gethits ()) {system.out.println ("أنا قادم") ؛ } searchResponse Response2 = client.preparesearchScroll (response.getScrollid ()) .SetScroll (TimeValue (60000)). execute (). Actionget () ؛ if (response2.gethits (). استراحة؛ }}} / ** * اختبار multisearch * / test public void testMultIsearch () {QueryBuilder QB1 = QueryBuilders.querystringQuery ("Elasticsearch") ؛ SearchRequestBuilder requestBuilder1 = client.preparesearch (). setQuery (qb1) .SetSize (1) ؛ QueryBuilder qb2 = querybuilders.matchquery ("المستخدم" ، "kimchy") ؛ SearchRequestBuilder requestBuilder2 = client.preparesearch (). setQuery (qb2) .SetSize (1) ؛ multisearchResponse MultiResponse = client.preparemultisearch (). add (requestBuilder1) .add (requestBuilder2) .execute (). Actionget () ؛ nbhits طويلة = 0 ؛ لـ (multiSearchResponse.item العنصر: multiResponse.getResponse ()) {searchResponse Response = item.getResponse () ؛ nbhits = response.gethits (). getTotalHits () ؛ SearchHit [] its = response.gethits (). gethits () ؛ System.out.println (nbhits) ؛ }} / *** اختبار التجميع Query* / Test public void testaggregation () {searchResponse Response = client.preparesearch () .SetQuery (QueryBuilders.MatchAllQuery () // استخدم أولاً Query لتصفية جزء من it.addaggeration (aggregationbuilders .edaggregation (AggregationBuilders.DateHistogram ("Agg2"). الحقل ("الولادة"). التجميع التجميع 2 = استجابة. التجميع التجميع = استجابة. getAggRegations (). GET ("Agg2") ؛ // searchResponse Response2 = client.search (New SearchRequest (). SearchType (SearchType.query_and_fetch)). ActionGet () ؛ } / ** * test إنهاء * / test public void testRephInateEfter () {searchResponse Response = client.preparesearch ("twitter"). if (response.isterMinnerly ()) {system.out.println ("ternimate") ؛ }} /** * استعلام التصفية: أكبر من GT ، أقل من LT ، أقل من أو يساوي LTE ، أكبر من أو يساوي GTE * /Test public void testfilter () {searchResponse Response = client.preparesearch ("Twitter"). .setsearchtype (SearchType.query_then_fetch) //. .rangefilter ("العمر"). GTE (18) .lte (22)) .SetExplain (صواب) // شرح صحيح للإشارة إلى أن الترتيب يتم فرزه وفقًا لأهمية البيانات ، وأعلى الكلمة الرئيسية مطابقة هي في المقدمة. } / *** استعلام المجموعة* / test public void testgroupby () {client.preparesearch ("Twitter"). setTypes ("Tweet") .SetQuery (QueryBuilders.MatchAllQuery () .field ("المستخدم"). الحجم (0) // Group وفقًا للمستخدم / / حجم (0) هو أيضًا 10) .get () ؛ }}ما سبق هو كل محتوى هذه المقالة. آمل أن يكون ذلك مفيدًا لتعلم الجميع وآمل أن يدعم الجميع wulin.com أكثر.