この記事では、Springboot ProjectでKafkaの送信と受信を統合する方法を紹介します。
Kafkaは、次の特性を備えたハイスループット分散パブリッシュサブスクライブメッセージシステムです。メッセージの保存がテラバイトであっても長い間安定したパフォーマンスを維持できます。ハイスループット:非常に普通のハードウェアKafkaでさえ、毎秒数百万のメッセージをサポートできます。 Kafkaサーバーと消費者クラスターを介したメッセージのパーティション化をサポートします。 Hadoop並列データの読み込みをサポートします。
Kafkaをインストールします
KafkaをインストールするにはZookeeperのサポートが必要なため、Windowsをインストールするときは、最初にZookeeperをインストールしてからKafkaをインストールする必要があります。以下に、Macをインストールするための手順と注意を払うポイントを示します。 Windowsの構成は、さまざまな場所を除いてほとんど同じです。
brewインストールkafka
はい、それはとても簡単です。 Macのコマンドで処理できます。このインストールプロセスにはしばらく必要な場合があり、ネットワークステータスに関連する必要があります。 「エラー:リンクできなかった:/usr/local/share/doc/homebrew」など、インストールプロンプトメッセージにエラーメッセージがある場合があります。これは問題ではなく、自動的に無視されます。最後に、以下のものを見たときに成功しました。
==> summaryðº/usr/local/cellar/kafka/1.1.0:157ファイル、47.8MB
インストール構成ファイルの場所は次のとおりです。ニーズに応じてポート番号を変更するだけです。
インストールされたZoopeeperとKafka Location/USR/Local/Cellar/
構成file /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties
Zookeeperを開始します
次のようにコードをコピーします:./ bin/zookeeper-server-start/usr/local/etc/kafka/zookeeper.properties&
カフカを始めます
./bin/kafka-server-start /usr/local/etc/kafka/server.properties&
Kafkaのトピックを作成します。トピックの名前はテストです。必要な名前に設定できます。戻ってコードで正しく構成します。
コードを次のようにコピーします:./ bin/kafka-topics-create-zookeeper localhost:2181 - replication-factor 1 - パーティション1 - トピックテスト
1.依存関係を最初に解決します
スプリングブートに関連する依存関係については言及しません。 Kafkaに関連する依存関係は、Spring-Kafka統合パッケージにのみ依存しています。
<Dependency> groupId> org.springframework.kafka </groupid> <artifactid> spring-kafka </artifactid> <バージョン> 1.1.1.Release </version> </dependency>
ここでは、最初に構成ファイルを表示します
#================================================================================ kafka.consumer.servers = 10.93.21.21:2181kafka.consumer.enable.auto.commit = truekafka.consumer.session.timeout = 6000kafka.consumer.auto.commit.interval = 100kafka.consumer.auto.Offset .reset = rivatekafka.consumer.topic = testkafka.consumer.group.id = testkafka.consumer.concurrency = 10kafka.producer.servers = 10.9 3.21.21:9092kafka.producer.retries = 0kafka.producer.batch.size = 4096kafka.producer.linger = 1kafka.producer.buffer.memory = 40960
2。構成:Kafkaプロデューサー
1)@Configurationおよび@EnableKafkaを介して、構成を宣言し、kafkatemplate機能を開きます。
2)Application.Properties構成ファイルに@valueを介してKafka構成を挿入します。
3)Bean、@Beanを生成します
パッケージcom.kangaroo.sentinel.collect.configuration; Import java.util.hashmap; import java.util.map; Import org.apache.kafka.clients.producer.producerconfig; Import org.apache.kafka.common.Sirealialization; Stringserializen; org.springframework.beans.factory.annotation.value; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.kafka.annotation.enablekafka; Import org.springframework.kafka.core.defaultkafkaproducerfactory; Import org.springframework.kafka.core.kafkatemplate; Import org.springframework.kafka.core.producerfactory;@configuration @value( "$ {kafka.producer.servers}")プライベート文字列サーバー。 @value( "$ {kafka.producer.retries}")private int retries; @value( "$ {kafka.producer.batch.size}")private int batchsize; @value( "$ {kafka.producer.linger}")private int linger; @value( "$ {kafka.producer.buffer.memory}")private int buffermemory; public Map <string、object> produceconfigs(){map <string、object> props = new hashmap <>(); props.put(producterconfig.bootstrap_servers_config、サーバー); props.put(producterconfig.retries_config、retries); props.put(producterconfig.batch_size_config、batchsize); props.put(producterconfig.linger_ms_config、linger); props.put(producterconfig.buffer_memory_config、buffermemory); props.put(producterconfig.key_serializer_class_config、stringserializer.class); props.put(producterconfig.value_serializer_class_config、stringserializer.class);小道具を返します。 } public ProducerFactory <String、String> producterFactory(){return new DefaultKafkaproducerFactory <>(decroderconfigs()); } @bean public kafkatemplate <string、string> kafkatemplate(){return new kafkatemplate <string、string>(producterfactory()); }}プロデューサーを実験し、コントローラーを書きます。 topic = test、key = key、メッセージを送信したい
パッケージcom.kangaroo.sentinel.collect.controller; Import com.kangaroo.sentinel.common.response.response; Import com.kangaroo.sentinel.common.response.resultcode; Import org.slf4j.logger; Import org.slf4j.loggerfutryory; org.springframework.beans.factory.annotation.autowired; import org.springframework.kafka.kore.kafkatemplate; Import org.springframework.web.annotation javax.servlet.http.httpservletresponse;@restcontroller@requestmapping( "/kafka")パブリッククラスCollectController {Protected final logger logger = loggerFactory.getLogger(this.getClass()); @autowiredプライベートKafkatemplate Kafkatemplate; @RequestMapping(value = "/send"、method = requestmethod.get)public respons sendkafka(httpservletrequest request、httpservletResponse応答){try {string message = request.getParameter( "message"); logger.info( "kafka message = {}"、message); kafkatemplate.send( "test"、 "key"、message); logger.info( "kafkaを正常に送信してください。");新しい応答を返します(resultcode.success、 "kafkaを正常に送信する"、null); } catch(Exception e){logger.error( "kafka failedを送信"、e);新しい応答を返します(resultCode.Exception、「kafka failedを送信」、null); }}}3。構成:Kafka Consumer
1)@Configurationおよび@EnableKafkaを介して、構成を宣言し、kafkatemplate機能を開きます。
2)Application.Properties構成ファイルに@valueを介してKafka構成を挿入します。
3)Bean、@Beanを生成します
パッケージcom.kangaroo.sentinel.collect.configuration; Import org.apache.kafka.clients.consumer.consumerconfig; Import org.apache.kafka.common.Serialization.StringDearializer; Import org.Springframework.beans.Annotation.value; org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.kafkalistenercontainerfactory; Import org.springframework.kafka.core.consumerfactory; import org.springframework.kafka.core.defaultkafkaconsumerfactory; Import org.springframework.kafka.listener.concurrentmessagelistenercontainer; Import java.util.hashmap; Import java.util.map;@configuration@enablekafkapublic class kafkaconsumerconfig {@value( "$ {kafka.consumer.") @value( "$ {kafka.consumer.enable.auto.commit}")private boolean enableautocommit; @value( "$ {kafka.consumer.session.timeout}")private string sessiontimeout; @value( "$ {kafka.consumer.auto.commit.interval}")private string autocommitinterval; @value( "$ {kafka.consumer.group.id}")private string groupid; @value( "$ {kafka.consumer.auto.offset.reset}")private string autooffsetreset; @value( "$ {kafka.consumer.concurrency}")private int concurrency; @bean public kafkalistenercontainerfactory <concurrentmessagelistenercontainer <string、string >> kafkalistenercontainerfactory(){concurrentkafkalistenercontainerfactory <string、string> factory = new concurrentkafkalistenerconerfactory <>(); Factory.setConsumerFactory(ConsumerFactory()); Factory.setConcurrency(並行性); Factory.getContainerProperties()。setPollTimeout(1500);ファクトリーを返します。 } public consumerfactory <string、string> consumerfactory(){return new DefaultKafkaconsumerfactory <>(consumerconfigs()); } public map <string、object> consumerconfigs(){map <string、object> propsmap = new hashmap <>(); propsmap.put(consumerconfig.bootstrap_servers_config、サーバー); propsmap.put(consumerconfig.enable_auto_commit_config、enableautocommit); propsmap.put(consumerconfig.auto_commit_interval_ms_config、autocommitinterval); propsmap.put(consumerconfig.session_timeout_ms_config、sessiontimeout); propsmap.put(consumerconfig.key_deserializer_class_config、stringdeserializer.class); propsmap.put(consumerconfig.value_deserializer_class_config、stringdeserializer.class); propsmap.put(consumerconfig.group_id_config、groupid); propsmap.put(consumerconfig.auto_offset_reset_config、autooffsetreset); PropsMapを返します。 } @bean public liender ristener(){return new ristener(); }}新しいリスナー()は、Kafkaから読み取りデータを処理するためにBeanを生成します。リスナーの簡単な実装デモは次のとおりです。キーとメッセージの値を読んで印刷するだけです
@KafkalistenerのTopics属性は、Kafkaトピック名を指定するために使用されます。トピック名はメッセージプロデューサーによって指定されます。つまり、メッセージを送信するときにKafkatemplateによって指定されます。
パッケージcom.kangaroo.sentinel.collect.configuration; Import org.apache.kafka.clients.consumer.consumerrecord; Import org.slf4j.logger; import org.slf4j.loggerfactory; import org.spramework.kafka.annotation.kafctederirenter; logger = loggerFactory.getLogger(this.getClass()); @kafkalistener(topics = {"test"})public void risten(consumerrecord <?、> record){logger.info( "kafka's key:" + record.key()); logger.info( "kafkaの値:" + record.value()。toString()); }}ヒント:
1)Kafkaのインストールと構成の方法を紹介しませんでした。 LocalHostまたは127.0.0.1ではなく、Kafkaを構成するときに完全にバインドされたネットワークIPを使用するのが最善です
2)Kafka自身の動物園を使用してKafkaを展開しないことをお勧めします。
3)理論的には、消費者はZookeeperを通じてKafkaを読む必要がありますが、ここではKafkaserverのアドレスを使用しています。
4)監視メッセージの構成を定義する場合、Group_id_config構成項目の値を使用して、消費者グループの名前を指定します。同じグループに複数のリスナーオブジェクトがある場合、メッセージを受信できるリスナーオブジェクトは1つだけです。
上記はこの記事のすべての内容です。みんなの学習に役立つことを願っています。誰もがwulin.comをもっとサポートすることを願っています。