Preface
In the previous article we covered how to build a Kafka cluster; this article covers basic Kafka usage. Before using Kafka, though, it helps to have a brief understanding of it.
Introduction to Kafka
Kafka is a high-throughput, distributed publish-subscribe messaging system that can handle all the activity stream data of a consumer-scale website.
Kafka has the following characteristics:
- High throughput with low latency, even on modest hardware
- Messages are persisted to disk and replicated across brokers for fault tolerance
- Topics are partitioned, so reads and writes scale horizontally across the cluster
- Supports both offline (batch) and real-time consumption
Kafka terms
- Broker: a Kafka server node; a cluster consists of one or more brokers
- Topic: a named category to which messages are published
- Partition: an ordered, append-only log; each topic is split into one or more partitions
- Producer: a client that publishes messages to topics
- Consumer: a client that subscribes to topics and processes the messages
- Consumer group: a set of consumers that share the work of consuming a topic; each partition is read by only one consumer in the group
- Offset: the position of a message within a partition
Kafka core APIs
Kafka has four core APIs:
- Producer API: lets applications publish streams of records to topics
- Consumer API: lets applications subscribe to topics and process the streams of records
- Streams API: lets applications act as stream processors, transforming input streams into output streams
- Connector API: lets you build reusable connectors that link Kafka topics to existing systems such as databases
(The example diagram from the official Kafka documentation is omitted here.)
Kafka application scenarios
- Messaging: a replacement for traditional message brokers
- Website activity tracking: publishing page views, searches, and other user actions
- Metrics: aggregating operational monitoring data
- Log aggregation: collecting logs from many services into one place
- Stream processing: transforming data in multi-stage pipelines
The introduction above is based on the official Kafka documentation.
Development preparation
If we want to develop a Kafka program, what should we do?
First, after setting up a Kafka environment, we need to decide whether we are writing a producer or a consumer, that is, the sender or the receiver of messages. In this article, both the producer and the consumer are developed and explained.
With a rough understanding of Kafka in place, let's develop our first program.
The development language here is Java, and the build tool is Maven.
Maven's dependencies are as follows:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.0.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
</dependency>
```
Kafka Producer
Before writing the producer, let's briefly go over the main Kafka producer configuration options used below:
- bootstrap.servers: the list of broker addresses used to bootstrap the connection to the cluster
- acks: how many acknowledgments the producer requires before considering a request complete ("all" waits for the full set of in-sync replicas)
- retries: how many times to retry a failed send
- batch.size: the batch size in bytes used when grouping records headed for the same partition
- key.serializer / value.serializer: the classes used to serialize record keys and values to bytes
There are many more options; see the official documentation for the full list, which will not be covered here.
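As an aside, the StringSerializer configured for keys and values in this article does nothing more than encode the string as UTF-8 bytes. A stdlib-only sketch of the same idea (this is a simplified stand-in, not the actual Kafka class):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified stand-in for what Kafka's StringSerializer does:
// turn a String value into the byte[] that goes over the wire.
class StringSerializerSketch {

    // Kafka serializers also receive the topic name; it is unused here.
    static byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(serialize("KAFKA_TEST", "Hi")));
        // prints: [72, 105]
    }
}
```

The consumer side mirrors this with a deserializer that decodes the bytes back into a String.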
Then our Kafka producer configuration is as follows:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
```

After setting up the Kafka configuration, we can start producing data. Sending a record only takes one line:
```java
producer.send(new ProducerRecord<String, String>(topic, key, value));
```
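Note that the record's key is not just metadata: by default, Kafka hashes the key to choose a partition, so records with the same key always land in the same partition. The real client uses a murmur2 hash; a simplified stdlib-only sketch of the idea (using hashCode, not the actual algorithm):

```java
// Simplified sketch of key-based partitioning. Kafka's default
// partitioner actually uses a murmur2 hash, not hashCode().
class PartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition.
        System.out.println(partitionFor("Message", 3) == partitionFor("Message", 3));
        // prints: true
    }
}
```

This is why choosing a good key matters: it controls both ordering (per partition) and load distribution across partitions.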
With the producer program written, let's start producing!
The message sent here is:

```java
String messageStr = "Hello, this is the " + messageNo + "th message";
```
Only 1,000 messages are sent. The results (console output not reproduced here) show that the messages were printed successfully.
If you don't want to verify delivery and message correctness programmatically, you can check on the Kafka server with the command-line tools.
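For example, using the scripts shipped in the Kafka distribution's bin directory, a console consumer can print the topic from the beginning (the broker address and topic name below match this article's setup; adjust them for your environment):

```shell
# Read the KAFKA_TEST topic from the beginning using the bundled CLI.
bin/kafka-console-consumer.sh \
  --bootstrap-server master:9092 \
  --topic KAFKA_TEST \
  --from-beginning
```

This requires a running cluster, so it is a verification step on the server rather than something you run locally.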
Kafka Consumer
Kafka consumption deserves the most attention; after all, most of the time we are mainly consuming data.
Our Kafka consumer configuration is as follows:
```java
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
props.put("group.id", GROUPID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.records", 1000);
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
```

Since auto-commit is enabled, the consumption code is as follows.
We first need to subscribe to a topic, that is, specify which topic to consume:

```java
consumer.subscribe(Arrays.asList(topic));
```
After subscribing, we pull data from Kafka:

```java
ConsumerRecords<String, String> msgList = consumer.poll(1000);
```

The argument to poll is a timeout in milliseconds: if no data is available, the call blocks for up to one second before returning an empty batch. Consumption is generally done in a long-running loop; here we use for(;;) to keep polling, and exit after consuming 1,000 records.
The results (console output not reproduced here) show that we have successfully consumed the data we produced.
Code
The full code for the producer and the consumer is as follows:
Producer:
```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Title: KafkaProducerTest
 * Description: kafka producer demo
 * Version: 1.0.0
 * @author pancm
 * @date January 26, 2018
 */
public class KafkaProducerTest implements Runnable {

    private final KafkaProducer<String, String> producer;
    private final String topic;

    public KafkaProducerTest(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer<String, String>(props);
        this.topic = topicName;
    }

    @Override
    public void run() {
        int messageNo = 1;
        try {
            for (;;) {
                String messageStr = "Hello, this is the " + messageNo + "th message";
                producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
                // Print a progress line every 100 messages
                if (messageNo % 100 == 0) {
                    System.out.println("Sent message: " + messageStr);
                }
                // Exit after 1,000 messages have been produced
                if (messageNo % 1000 == 0) {
                    System.out.println("Successfully sent " + messageNo + " messages");
                    break;
                }
                messageNo++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

    public static void main(String args[]) {
        KafkaProducerTest test = new KafkaProducerTest("KAFKA_TEST");
        Thread thread = new Thread(test);
        thread.start();
    }
}
```

Consumer:
```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Title: KafkaConsumerTest
 * Description: kafka consumer demo
 * Version: 1.0.0
 * @author pancm
 * @date January 26, 2018
 */
public class KafkaConsumerTest implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private final String topic;
    private static final String GROUPID = "groupA";

    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("----------------------------------------");
        try {
            outer:
            for (;;) {
                msgList = consumer.poll(1000);
                if (null != msgList && msgList.count() > 0) {
                    for (ConsumerRecord<String, String> record : msgList) {
                        // Print every 100th record (the printed records are not
                        // necessarily evenly spaced, since batch sizes vary)
                        if (messageNo % 100 == 0) {
                            System.out.println(messageNo + "========receive: key = " + record.key()
                                    + ", value = " + record.value() + " offset===" + record.offset());
                        }
                        // Exit the whole polling loop once 1,000 records are consumed
                        // (a labeled break is needed; a plain break would only
                        // leave the inner for-each and keep polling forever)
                        if (messageNo % 1000 == 0) {
                            break outer;
                        }
                        messageNo++;
                    }
                } else {
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String args[]) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("KAFKA_TEST");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }
}
```

Note: master, slave1, and slave2 work here because of host mappings in my own environment; replace them with your servers' IP addresses.
Of course, I have put the project on GitHub; take a look if you are interested: https://github.com/xuwujing/kafka
Summary
Developing a simple Kafka program requires the following steps:
1. Set up a Kafka environment and add the Maven dependencies.
2. Configure and create a producer, then send records with producer.send.
3. Configure and create a consumer, subscribe to the topic, and pull records with consumer.poll.
For an introduction to Kafka, refer to the official documentation: http://kafka.apache.org/intro
The above is the entire content of this article. I hope it has some reference value for your study or work. If you have any questions, feel free to leave a comment. Thank you for your support of Wulin.com.