序文
前の記事では、Kafkaクラスターの構築方法について説明します。この記事では、単にKafkaを使用する方法について説明します。ただし、Kafkaを使用する場合、Kafkaを簡単に理解する必要があります。
Kafkaの紹介
Kafkaは、消費者規模のWebサイトですべてのアクションフローデータを処理するハイスループット分散パブリッシュサブスクリプションメッセージングシステムです。
Kafkaには次の特性があります。
Kafka用語
Kafka Core API
Kafkaには4つのコアAPIがあります
図の例は次のとおりです。
Kafkaアプリケーションシナリオ
上記の紹介については、公式のKafkaドキュメントを参照してください。
開発準備
Kafkaプログラムを開発する場合、どうすればよいですか?
まず、Kafka環境を構築した後、私たちがプロデューサーであるか消費者であるか、つまりメッセージの送信者または受信者であるかどうかを考慮する必要があります。
ただし、この記事では、生産者と消費者の両方が開発および説明します。
Kafkaの大まかな理解の後、最初のプログラムを開発します。
ここで使用される開発言語は、建設ツールMavenであるJavaです。
Mavenの依存関係は次のとおりです。
<Dependency> groupId> org.apache.kafka </groupid> <artifactid> kafka_2.12 </artifactid> <version> 1.0.0 </version> <scope>提供</scope> <バージョン> 1.0.0 </version> </dependency> <依存関係> <groupId> org.apache.kafka </groupid> <artifactid> kafka-streams </artifactid> <バージョン> 1.0.0 </version> </depence>
Kafkaプロデューサー
開発と生産中に、Kafkaのさまざまな構成手順を簡単に紹介しましょう。
...
より多くの構成があります。公式のドキュメントを確認できますが、ここでは説明しません。
次に、Kafkaプロデューサーの構成は次のとおりです。
Properties Props = new Properties(); props.put( "Bootstrap.Servers"、 "Master:9092、Slave1:9092、Slave2:9092"); props.put( "acks"、 "all"); props.put( "retries"、0); props.put( "batch.size"、16384); props.put( "key.serializer"、stringserializer.class.getName()); props.put( "value.serializer"、stringserializer.class.getName()); kafkaproducer <string、string> producer = new kafkaproducer <string、string>(props);
Kafka構成を追加した後、データの生成を開始します。生産データコードは次のようにする必要があります。
producer.send(new ProduceRecord <String、String>(トピック、キー、値));
プロデューサープログラムを書いた後、最初にプロデュースを始めましょう!
ここで送信したメッセージは次のとおりです。
String Messagestr = "こんにちは、これは"+messageno+"data"です。
そして、1,000のメッセージのみが送信され、結果は次のとおりです。
情報が正常に印刷されていることがわかります。
プログラムを使用して、プログラムが正常に送信され、メッセージ送信の正確性を確認したくない場合は、コマンドを使用してKafkaサーバーで表示できます。
カフカ消費者
Kafkaの消費は重要なポイントでなければなりませんが、結局のところ、ほとんどの場合、主にデータ消費を使用しています。
Kafka消費の構成は次のとおりです。
次に、Kafkaの消費者構成は次のとおりです。
Properties Props = new Properties(); props.put( "Bootstrap.Servers"、 "Master:9092、Slave1:9092、Slave2:9092"); props.put( "group.id"、groupId); props.put( "enable.auto.commit"、 "true"); props.put( "auto.commit.interval.ms"、 "1000"); props.put( "session.timeout.ms"、 "30000"); props.put( "max.poll.records"、1000); props.put( "auto.offset.reset"、 "hireliest"); props.put( "key.deserializer"、stringdeserializer.class.getName()); props.put( "value.deserializer"、stringdeserializer.class.getName()); kafkaconsumer <string、string> consumer = new Kafkaconsumer <String、String>(Props);
自動提出物を設定しているため、消費コードは次のとおりです。
最初にトピックを購読する必要があります。つまり、消費するトピックを指定する必要があります。
consumer.subscribe(arrays.aslist(トピック));
購読後、Kafkaからデータを取得します。
ConsumerRecords <string、string> msglist = consumer.poll(1000);
一般的に言えば、消費が実行されると監視が使用されます。ここでは、(;;)を使用して、1,000個のアイテムの消費を監視して設定して終了します。
結果は次のとおりです。
ここで生産データを正常に消費したことがわかります。
コード
次に、生産者と消費者向けのコードは次のとおりです。
プロデューサー:
java.util.properties; Import org.apache.kafka.clients.producer.kafkaproducer; import org.apache.kafka.clients.producer.producerrecord; Import org.apache.kafka.common.common.serialization.stringseriizer; Kafka Producer demo*バージョン:1.0.0* @author pancm* @date 2018年1月26日*/パブリッククラスKafkaproducertestは実行可能{プライベート最終kafkaproducer <string、string>プロデューサー;プライベート最終文字列トピック。 public kafkaproducertest(string topicname){properties props = new Properties(); props.put( "Bootstrap.Servers"、 "Master:9092、Slave1:9092、Slave2:9092"); props.put( "acks"、 "all"); props.put( "retries"、0); props.put( "batch.size"、16384); props.put( "key.serializer"、stringserializer.class.getName()); props.put( "value.serializer"、stringserializer.class.getName()); this.producer = new kafkaproducer <string、string>(props); this.topic = topicname; } @Override public void run(){int messageno = 1; try {for(;;){string messagestr = "こんにちは、これは"+messageno+"bar of data"; producer.send(new produceerrecord <string、string>(トピック、 "メッセージ"、messagestr)); // 100個のアイテムが生成されている場合、if(messageno%100 == 0){system.out.println( "送信メッセージ:" + messagestr); } // 1000個のアイテムが生成されている場合、if(messageno%1000 == 0){system.out.println( "ressfully send"+messageno+"bar");壊す; } messageno ++; }} catch(例外e){e.printstacktrace(); }最後に{producer.close(); }} public static void main(string args []){kafkaproducertest test = new kafkaproducertest( "kafka_test");スレッドスレッド= newスレッド(テスト); thread.start(); }}消費者:
java.util.arraysのインポート; java.util.propertiesをインポート; import org.apache.kafka.clients.consumer.consumerrecord; Import org.apache.kafka.clients.consumer.consumerrecord; Import org.apache.kafka.clients.consumer.consumerrecs; org.apache.kafka.clients.consumer.kafkaconsumer; Import org.apache.kafka.common.serialization.stringdeserializer;/****タイトル:kafkaconsumertest*説明:*説明:* kafka消費者デモ*バージョン:1.0.0 kafkaconsumertestはrunnable {private final kafkaconsumer <string、string> consumer; Private ConsumerRecords <String、String> msglist;プライベート最終文字列トピック。 private static final string groupid = "Groupa"; public kafkaconsumertest(string topicname){properties props = new Properties(); props.put( "Bootstrap.Servers"、 "Master:9092、Slave1:9092、Slave2:9092"); props.put( "group.id"、groupId); props.put( "enable.auto.commit"、 "true"); props.put( "auto.commit.interval.ms"、 "1000"); props.put( "session.timeout.ms"、 "30000"); props.put( "auto.offset.reset"、 "hireliest"); props.put( "key.deserializer"、stringdeserializer.class.getName()); props.put( "value.deserializer"、stringdeserializer.class.getName()); this.consumer = new Kafkaconsumer <String、String>(props); this.topic = topicname; this.consumer.subscribe(arrays.aslist(topic)); } @Override public void run(){int messageno = 1; System.out.println("----------------------------------------"); try {for(;;){msglist = consumer.poll(1000); if(null!= msglist && msglist.count()> 0){for(consumerrecord <string、string> record:msglist){//消費されたときに100項目を印刷しますが、印刷されたデータは必ずしもルールではありませんrecord.value()+"offset ==="+record.offset()); } // 1000アイテムが消費されると、if(messageno%1000 == 0){break; } messageno ++; }} else {thread.sleep(1000); }}} catch(arturnedexception e){e.printstacktrace(); }最後に{consumer.close(); }} public static void main(string args []){kafkaconsumertest test1 = new kafkaconsumertest( "kafka_test");スレッドスレッド1 =新しいスレッド(test1); thread1.start(); }}注: Master、Slave1、Slave2は、自分の環境で関係マッピングを行ったためです。これは、サーバーのIPに置き換えることができます。
もちろん、私はプロジェクトをGithubに置きます。興味があれば、見てみることができます。 https://github.com/xuwujing/kafka(ローカルダウンロード)
要約します
Kafkaプログラムの簡単な開発には、次の手順が必要です。
Kafkaはじめに公式文書を参照してください:http://kafka.apache.org/intro
要約します
上記は、この記事のコンテンツ全体です。この記事の内容には、すべての人の研究や仕事に特定の参照値があることを願っています。ご質問がある場合は、メッセージを残してコミュニケーションをとることができます。 wulin.comへのご支援ありがとうございます。