머리말
이전 기사에서는 Kafka 클러스터를 구축하는 방법에 대해 이야기 하며이 기사는 Kafka를 단순히 사용하는 방법에 대해 이야기합니다. 그러나 Kafka를 사용할 때는 여전히 Kafka를 간단히 이해해야합니다.
카프카 소개
Kafka는 소비자 규모 웹 사이트에서 모든 작업 흐름 데이터를 처리하는 고 처리량 분산 출판 구독 메시징 시스템입니다.
Kafka에는 다음과 같은 특성이 있습니다.
카프카 용어
Kafka Core API
Kafka에는 4 개의 핵심 API가 있습니다
예제 다이어그램은 다음과 같습니다.
Kafka 응용 프로그램 시나리오
위의 소개는 공식 Kafka 문서를 참조하십시오.
개발 준비
우리가 Kafka 프로그램을 개발하려면 어떻게해야합니까?
우선, Kafka 환경을 구축 한 후 우리는 우리가 생산자인지 소비자인지, 즉 메시지의 발신자인지 수신자인지 고려해야합니다.
그러나이 기사에서는 생산자와 소비자 모두가 개발하고 설명 할 것입니다.
Kafka를 대략적으로 이해 한 후 첫 번째 프로그램을 개발할 것입니다.
여기에 사용 된 개발 언어는 건설 도구 Maven 인 Java입니다.
Maven의 종속성은 다음과 같습니다.
<pectionency> <groupId> org.apache.kafka </groupid> <artifactid> kafka_2.12 </artifactid> <버전> 1.0.0 </version> <copop> 제공 </scope> </fexendency> <groupid> org.apache.kafka </groupid> aritifactid> kafka-clients </artifactid> <버전> 1.0.0 </version> </dependency> <pectionency> <groupId> org.apache.kafka </groupid> <artifactid> Kafka-streams </artifactid> <버전> 1.0.0 </version> </dependency>
카프카 프로듀서
개발 및 생산 중에 Kafka의 다양한 구성 지침을 간략하게 소개하겠습니다.
...
더 많은 구성이 있습니다. 공식 문서를 확인할 수 있으며 여기에는 설명되지 않습니다.
그런 다음 Kafka 프로듀서 구성은 다음과 같습니다.
속성 propss = 새로운 속성 (); props.put ( "bootstrap.servers", "Master : 9092, Slave1 : 9092, Slave2 : 9092"); props.put ( "acks", "all"); props.put ( "재시험", 0); props.put ( "batch.size", 16384); props.put ( "key.serializer", stringserializer.class.getName ()); props.put ( "value.serializer", stringserializer.class.getName ()); Kafkaproducer <String, String> producer = new Kafkaproducer <String, String> (props);
Kafka 구성을 추가 한 후 데이터 생성을 시작합니다. 생산 데이터 코드는 다음과 같습니다.
Producer.Send (New ProduceRrecord <String, String> (주제, 키, 값));
프로듀서 프로그램을 작성한 후 먼저 제작을 시작합시다!
내가 여기서 보낸 메시지는 다음과 같습니다.
String Messagest = "안녕하세요, 이것은"+messageno+"데이터입니다.
그리고 1,000 개의 메시지 만 전송되고 결과는 다음과 같습니다.
정보가 성공적으로 인쇄되었음을 알 수 있습니다.
프로그램을 사용하여 프로그램이 성공적으로 전송되는지 여부와 메시지 전송의 정확도를 확인하지 않으려면 명령을 사용하여 Kafka 서버에서이를 볼 수 있습니다.
카프카 소비자
Kafka 소비는 핵심 요점이되어야합니다. 결국, 우리는 주로 데이터 소비를 사용합니다.
Kafka 소비의 구성은 다음과 같습니다.
그런 다음 Kafka 소비자 구성은 다음과 같습니다.
속성 propss = 새로운 속성 (); props.put ( "bootstrap.servers", "Master : 9092, Slave1 : 9092, Slave2 : 9092"); props.put ( "group.id", GroupId); props.put ( "enable.auto.commit", "true"); props.put ( "auto.commit.interval.ms", "1000"); props.put ( "session.timeout.ms", "30000"); props.put ( "max.poll.records", 1000); props.put ( "auto.offset.reset", "초기"); props.put ( "key.deserializer", StringDeserializer.class.getName ()); props.put ( "value.deserializer", StringDeserializer.class.getName ()); Kafkaconsumer <String, String> Consumer = New Kafkaconsumer <String, String> (props);
자동 제출을 설정하므로 소비 코드는 다음과 같습니다.
우리는 먼저 주제, 즉 소비 할 주제를 지정하기 위해 주제를 구독해야합니다.
소비자 .subscribe (arrays.aslist (주제));
구독 후 Kafka에서 데이터를 가져옵니다.
ConsidERRECORDS <String, String> msglist = consumer.poll (1000);
일반적으로 모니터링은 소비를 수행 할 때 사용됩니다. 여기서 우리는 (;;)를 모니터링하고 1,000 개의 품목을 소비하고 출구로 설정합니다!
결과는 다음과 같습니다.
우리는 여기에서 생산 데이터를 성공적으로 소비 한 것을 알 수 있습니다.
암호
그런 다음 생산자 및 소비자의 코드는 다음과 같습니다.
생산자:
import java.util.properties; import org.apache.kafka.clients.producer.kafkaproduer; import org.apache.kafka.clients.producer.producerecord; import org.apache.kafka.common.serialization.stringserializer;/*** 제목 : Kafkaprestest* demo* kafkafrestest** 제목 : Kafkafresteceations** 제목 : Kafka. 버전 : 1.0.0* @author pancm* @date 2018 년 1 월 26 일*/public class kafkaproductest emplements runnable {private final kafkaproducer <string, string> producer; 개인 최종 문자열 주제; public kafkaproductest (String topicname) {속성 propss = new Properties (); props.put ( "bootstrap.servers", "Master : 9092, Slave1 : 9092, Slave2 : 9092"); props.put ( "acks", "all"); props.put ( "재시험", 0); props.put ( "batch.size", 16384); props.put ( "key.serializer", stringserializer.class.getName ()); props.put ( "value.serializer", stringserializer.class.getName ()); this.producer = new Kafkaproducer <String, String> (props); this.topic = topicname; } @override public void run () {int messageno = 1; try {for (;;) {String Messagest = "hello, 이것은"+messageno+"데이터의 막대"입니다. Producer.Send (New ProduceRrecord <String, String> (Topic, "Message", Messagest); // 100 항목이 생성되는 경우 (Messageno%100 == 0) {System.out.println ( "Send Message :" + Messagest); } // 1000 항목이 생성 된 경우, if (messageno%1000 == 0) {System.out.println ( "성공적으로"+messageno+"bar"); 부서지다; } Messageno ++; }} catch (예외 e) {e.printstacktrace (); } 마침내 {producer.close (); }} public static void main (String args []) {kafkaproductestest test = new Kafkaproductest ( "Kafka_test"); 스레드 스레드 = 새 스레드 (테스트); thread.start (); }}소비자:
import java.util.arrays; import java.util.properties; import org.apache.kafka.clients.consumer.consumerRecord; import org.apache.kafka.clients.consumer.consumerRecord; import org.apache.kafka.clients.consumercord; org.apache.kafka.clients.consumer.kafkaconsumer; import org.apache.kafka.common.serialization.stringdeserializer;/**** 제목 : kafkaconsumertest* 설명 :* Kafka Consumer Demo* 버전 : 1.0.0* @author pancm* @Date 26, 2018*/depk kafk instroms. Runnable {개인 최종 Kafkaconsumer <String, String> Consumer; 개인 소비점 <문자열, 문자열> msglist; 개인 최종 문자열 주제; 개인 정적 최종 문자열 GroupId = "Groupa"; public kafkaconsumertest (String TopicName) {properties propss = new Properties (); props.put ( "bootstrap.servers", "Master : 9092, Slave1 : 9092, Slave2 : 9092"); props.put ( "group.id", GroupId); props.put ( "enable.auto.commit", "true"); props.put ( "auto.commit.interval.ms", "1000"); props.put ( "session.timeout.ms", "30000"); props.put ( "auto.offset.reset", "초기"); props.put ( "key.deserializer", StringDeserializer.class.getName ()); props.put ( "value.deserializer", StringDeserializer.class.getName ()); this.consumer = new kafkaconsumer <string, String> (props); this.topic = topicname; this.consumer.subscribe (arrays.aslist (topic)); } @override public void run () {int messageno = 1; System.out.println ( "-------------------------------------------------------- try {for (;;) {msglist = consumer.poll (1000); if (null! = msglist && msglist.count ()> 0) {for (CopbereCord <String, String> record : msglist) {// 소비 될 때 100 개 항목을 인쇄하지만 인쇄 된 데이터가 반드시 규칙이 아닙니다 (Messageno%100 == 0) {System.out.println (messageno + "= +" + " +" + " +" + " +" + " +" + " +" + " +" + " +" + " +" + " +" + " record.value ()+"오프셋 ==="+record.offset ()); } // 한 번 1000 개 항목이 소비되면 (Messageno%1000 == 0) {break; } Messageno ++; }} else {thread.sleep (1000); }}} catch (InterruptedException e) {e.printstacktrace (); } 마침내 {consumer.close (); }} public static void main (String args []) {kafkaconsumertest test1 = 새로운 kafkaconsumertest ( "kafka_test"); 스레드 스레드 1 = 새 스레드 (test1); thread1.start (); }}참고 : Mas
물론, 나는 프로젝트를 Github에 올려 놓고 관심이 있으시면 살펴볼 수 있습니다. https://github.com/xuwujing/kafka (로컬 다운로드)
요약
Kafka 프로그램의 간단한 개발에는 다음 단계가 필요합니다.
Kafka 소개 공식 문서를 참조하십시오 : http://kafka.apache.org/intro
요약
위는이 기사의 전체 내용입니다. 이 기사의 내용에 모든 사람의 연구 나 작업에 대한 특정 참조 가치가 있기를 바랍니다. 궁금한 점이 있으면 의사 소통을 위해 메시지를 남길 수 있습니다. Wulin.com을 지원 해주셔서 감사합니다.