/kafka-study

kafka-study - Apache Kafka Tutorials

Primary LanguageJava

--------------------------------------------------------------------------------

			# KAFKA-STUDY #

--------------------------------------------------------------------------------

########################################################
### KAFKA

https://kafka.apache.org/
https://www.confluent.io/
https://aws.amazon.com/ko/msk/

https://kafka.apache.org/documentation/

https://github.com/apache/kafka

https://spring.io/projects/spring-kafka#overview
https://docs.spring.io/spring-kafka/docs/current/reference/html/

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

https://github.com/linkedin/Burrow

https://www.confluent.io/hub/

https://ksqldb.io/

########################################################
### Reference

https://kafka.apache.org/quickstart

- 아파치 카프카 - book 예제 소스 
https://github.com/bjpublic/apache-kafka-with-java

https://github.com/AndersonChoi/

- 동영상 강좌 
https://www.youtube.com/watch?v=waw0XXNX-uQ&list=PL3Re5Ri5rZmkY46j6WcJXQYRlDRZSUQ1j

https://www.youtube.com/watch?v=VJKZvOASvUA
https://www.youtube.com/watch?v=iUX6d14bvj0
https://www.youtube.com/watch?v=dubFjEXuK6w
https://www.youtube.com/watch?v=oyNjiQ2q2CE
https://www.youtube.com/watch?v=3OPZ7_sHtWo

- blog
https://blog.voidmainvoid.net/
https://blog.voidmainvoid.net/category/빅데이터/Kafka
https://velog.io/@jaehyeong/Apache-Kafka아파치-카프카란-무엇인가
https://soft.plusblog.co.kr/category/정리 예정
https://pearlluck.tistory.com/category/Big%20Data/Kafka


########################################################
### Kafka Guide

* download
https://kafka.apache.org/downloads
다운로드 kafka_2.12-3.3.1.tgz 
tar xvf  kafka_2.12-3.3.1.tgz 

* heap memory 설정   .bashrc 등에 설정 
ex) export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m" 

* zookeeper 실행 
cd  kafka_2.12-3.3.1
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ jps -vm 

# zookeeper.properties
	dataDir=/tmp/zookeeper
	clientPort=2181
	maxClientCnxns=0
	admin.enableServer=false

* server.properties 파일 내용 체크 
$ vi config/server.properties 
하단 내용 설정 
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092

* kafka broker 실행 
$ bin/kafka-server-start.sh -daemon config/server.properties
$ jps -m 
$ tail -f logs/server.log

# server.properties
	# kafka server.properties for localhost broker test
	broker.id=0
	num.network.threads=3
	num.io.threads=8

	# Please modify directory
	log.dirs=/tmp/kafka
	num.partitions=3
	listeners=PLAINTEXT://localhost:9092
	advertised.listeners=PLAINTEXT://localhost:9092
	socket.send.buffer.bytes=102400
	socket.receive.buffer.bytes=102400
	socket.request.max.bytes=104857600
	num.recovery.threads.per.data.dir=1
	offsets.topic.replication.factor=1
	transaction.state.log.replication.factor=1
	transaction.state.log.min.isr=1
	log.retention.hours=168
	log.segment.bytes=1073741824
	log.retention.check.interval.ms=300000
	zookeeper.connect=localhost:2181
	zookeeper.connection.timeout.ms=18000
	group.initial.rebalance.delay.ms=0

*  kafka 통신 테스트
$ bin/kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092

# kafka command line tool
* kafka-topics.sh 
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic hello.kafka
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --config retention.ms=1728000000 --topic hello.kafka2
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic hello.kafka
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic hello.kafka2 
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic hello.kafka --alter --partitions 4
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic hello.kafka --describe
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name hello.kafka --alter --add-config retention.ms=86400000
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name hello.kafka --describe 

* kafka-console-producer.sh 
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello.kafka
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello.kafka --property "parse.key=true" --property "key.separator=:"

* kafka-console-consumer.sh
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka --from-beginning
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka --property print.key=true --property key.separator="-" --group hello-group --from-beginning

* kafka-consumer-groups.sh
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list hello-group
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group hello-group --describe

* kafka-verifiable-producer, consumer.sh 
$ bin/kafka-verifiable-producer.sh --bootstrap-server localhost:9092 --max-messages 10 --topic verify-test 
$ bin/kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 --topic verify-test --group-id test-group

* kafka-delete-records.sh 
$ vi delete-topic.json
{"partitions": [{"topic":"test" ,"partition": 0, "offset": 50}], "version":1}
$ bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file delete-topic.json

--------------------------------------------------------------------------------

### example (아파치 카프카 - book 예제 소스 )
https://github.com/bjpublic/apache-kafka-with-java

* example/kafka-producer/ 
	- SimpleProducer.java
	- ProducerWithKeyValue.java
	- ProducerExactPartition.java
	- ProducerWithCustomPartitioner.java
	  CustomPartitioner.java
	- ProducerWithSyncCallback.java
	- ProducerWithAsyncCallback.java
	  ProducerWithAsyncCallback.java
	- IdempotenceProducer.java
	- TransactionProducer.java
	- ConfluentSimpleProducer.java

* example/kafka-consumer/
	- SimpleConsumer.java
	- ConsumerWithSyncCommit.java
	- ConsumerWithSyncOffsetCommit.java
	- ConsumerWithASyncCommit.java
	- ConsumerWithRebalanceListener.java
	  RebalanceListener.java
	- ConsumerWithExactPartition.java
	- ConsumerWithSyncOffsetCommitShutdownHook.java
	- ConsumerWithAutoCommit.java
	- MultiConsumerThread.java
	  ConsumerWorker.java
	- MultiConsumerThreadByPartition.java
	  ConsumerWorkerByPartition.java
	- ConsumerWithMultiWorkerThread.java
	  ConsumerWorkerMulit.java
	- TransactionConsumer .java
	- ConfluentSimpleConsumer.java

* example/kafka-admin/	  
	- KafkaAdminClient.java

* example/kafka-streams/	  
	- SimpleStreamApplication.java
	- StreamsFilter.java
	- KStreamJoinKTable.java
	- KStreamJoinGlobalKTable.java
	- SimpleKafkaProcessor.java
	  FilterProcessor.java
	- KStreamCountApplication.java
	- QueryableStore.java
	- MetricStreams.java
	  MetricJsonUtils.java

* example/kafka-connector/	  
	simple-source-connector/
	- SingleFileSourceConnector.java
	  SingleFileSourceConnectorConfig.java
	  SingleFileSourceTask.java
	simple-sink-connector/
	- SingleFileSinkConnector.java
	  SingleFileSinkConnectorConfig.java
	  SingleFileSinkTask.java

* example/spring-kafka/	  
	spring-kafka-producer/
	spring-kafka-template-producer/
	spring-kafka-record-listener/
	spring-kafka-batch-listener/
	spring-kafka-commit-listener/
	spring-kafka-listener-container/

--------------------------------------------------------------------------------

# 단일 모드 커넥트 
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

# connect-distributed.properties 
	bootstrap.servers=localhost:9092
	group.id=connect-cluster

	key.converter=org.apache.kafka.connect.storage.StringConverter
	value.converter=org.apache.kafka.connect.storage.StringConverter
	key.converter.schemas.enable=false
	value.converter.schemas.enable=false

	offset.storage.topic=connect-offsets
	offset.storage.replication.factor=1
	config.storage.topic=connect-configs
	config.storage.replication.factor=1
	status.storage.topic=connect-status
	status.storage.replication.factor=1
	offset.flush.interval.ms=10000

# 분산 모드 커넥트 
$ bin/connect-distributed.sh config/connect-distributed.properties 

# 커넥터 플러그인 조회
$ curl -X GET http://localhost:8083/connector-plugins

# FileStreamSinkConnector 생성

$ curl -X POST \
  http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "file-sink-test",
    "config":
    {
	    "topics":"test",
	    "connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector",
	    "tasks.max":1,
	    "file":"/tmp/connect-test.txt"
    }
  }'

# file-sink-test 커넥터 실행 상태 확인
$ curl http://localhost:8083/connectors/file-sink-test/status

# file-sink-test 커넥터의 태스크 확인
$ curl http://localhost:8083/connectors/file-sink-test/tasks

# file-sink-test 커넥터 특정 태스크 상태 확인
$ curl http://localhost:8083/connectors/file-sink-test/tasks/0/status

# file-sink-test 커넥터 특정 태스크 재시작
$ curl -X POST http://localhost:8083/connectors/file-sink-test/tasks/0/restart

# file-sink-test 커넥터 수정
$ curl -X PUT http://localhost:8083/connectors/file-sink-test/config \
  -H 'Content-Type: application/json' \
  -d '{
	    "topics":"test",
	    "connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector",
	    "tasks.max":1,
	    "file":"/tmp/connect-test2.txt"
	}'

# file-sink-test 커넥터 중지
$ curl -X PUT http://localhost:8083/connectors/file-sink-test/pause

# file-sink-test 커넥터 시작
$ curl -X PUT http://localhost:8083/connectors/file-sink-test/resume

# file-sink-test 커넥터 재시작
$ curl -X POST http://localhost:8083/connectors/file-sink-test/restart

# file-sink-test 커넥터 삭제
$ curl -X DELETE http://localhost:8083/connectors/file-sink-test

# mac에서 내 ip 확인
$ ifconfig | grep "inet " | awk '{ print $2}'

# kafka-connect-ui 도커 실행
$ docker run --rm -it -p 8000:8000 \
           -e "CONNECT_URL=http://{{my-ip}}:8083" \
           landoop/kafka-connect-ui

# 카프카 미러케이커2 
 config/ connect-mirror-maker.properties
* connect-mirror-maker.properties
    ---------------------------------
	clusters = A, B

	A.bootstrap.servers = a-kafka:9092
	B.bootstrap.servers = b-kafka:9092

	A->B.enabled = true
	A->B.topics = test

	B->A.enabled = false
	B->A.topics = .*

	replication.factor=1

	checkpoints.topic.replication.factor=1
	heartbeats.topic.replication.factor=1
	offset-syncs.topic.replication.factor=1
	offset.storage.replication.factor=1
	status.storage.replication.factor=1
	config.storage.replication.factor=1
    ---------------------------------

	$ bin/connect-mirror-maker.sh config/connect-mirror-maker.properties 
	$ bin/kafka-console-producer.sh --bootstrap-server a-kafka:9002 --topic test
	> a 
	> b 
	> c 
	$ bin/kafka-console-consumer.sh --bootstrap-server b-kafka:9092 --topic A.test --from-beginning 
	$ bin/kafka-topics.sh --bootstrap-server a-kafka:9092 --topic test --alter --partitions 5
	$ bin/kafka-topics.sh --bootstrap-server b-kafka:9092 --topic A.test --describe 

	* Geo-Replication 
	- Active-standby
	- Active-Active
	- Hub and spoke

# 컨슈머 랙 체크 
	* 카프카 명령어를 사용하여 컨슈머 랙 조회 
	$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe

	* 컨슈머 metrics() 메소드를 사용하여 컨슈머 랙 조회 
		for (Map.Entry<MetricName, ? extends Metric> entry : kafkaConsumer.metrics().
		entrySet()) {
			if("records-lag-max".equals(entry.getKey().name()) |
			"records-lag".equals(entry.getKey().name()) |
			"records-lag-avg".equals(entry.getKey().name())){
				Metric metric = entry.getValue();
				logger.info("{}:{}", entry.getKey().name(), metric.metricValue());
			}
		}
	
	* 외부 모니터링 툴을 사용하여 컨슈머 랙 조회 
	- Datadog
	- Confluent Control Center 
	- Burrow (Open Source)

   * 컨슈머 랙 모니터링 아키텍처 툴
	- 버로우
	- 텔레그래프
	- 엘라스틱서치
	-그라파나 

=================================================================================

### 실전 예제 

## 웹 페이지 이벤트 적재 파이프라인 생성 

# 아키텍처 
* 사용자 이벤트 --- (Rest Api) ---> 프로듀서 APP  ---> Kafka ---> 컨슈머 APP ---> 하둡 
* 사용자 이벤트 --- (Rest Api) ---> 프로듀서 APP  ---> Kafka ---> 분산 커넥트  ---> 엘라스틱서치 

# 작업 리스트 
* 로컬 하둡, 엘라스틱서치 , 키바나 설치
* 토픽 생성
* 이벤트 수집 웹페이지 개발
* REST API 프로듀서 애플리케이션 개발
* 하둡 적재 컨슈머 애플리케이션 개발
* 엘라스틱서치 싱크 커넥터 개발 

1.  로컬 하둡, 엘라스틱서치 , 키바나 설치
brew install hadoop elasticsearch kibana

hadoop - core-site.xml 
<configuration>
 <property>
  <name>fs.defaultFS</name>
  <value>hdfs://localhost:9000</value>
 </property>
</configuration>

2.  토픽 생성
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 3 --topic select-color 

3.  이벤트 수집 웹페이지 개발 
example/practical_example/favorite-color-webpage/index.html 

4. REST API 프로듀서 애플리케이션 개발
example/practical_example/kafka-spring-producer-with-rest-controller

5. 하둡 적재 컨슈머 애플리케이션 개발
example/practical_example/kafka-multi-consumer-thread-hdfs-save

6. 엘라스틱서치 싱크 커넥터 개발 
example/practical_example/elasticsearch-kafka-connector

## 상용 인프라 아키텍처 
* L4 로드 밸런서 : 웹 이벤트를 받아서 프로듀서로 분배 역활 
* 프로듀서 :  2개 이상의 서버 , 각 서버당 1개 프로듀서 
* 카프카 클러스터 : 3개 이상의 브로커로 구성 
* 컨슈머 : 2개 이상의 서버, 각 서버당 1개 컨슈머 
* 커넥트 : 2개 이상의 서버, 분산 모드 커넥트로 구성 

# 기본 
* 사용자 이벤트 - (Rest Api) -(L4)-> 프로듀서 APP (2대) ---> Kafka(클러스터 브로커3개) ---> 컨슈머 APP(2개) ---> 하둡 
* 사용자 이벤트 - (Rest Api) -(L4)-> 프로듀서 APP (2대) ---> Kafka(클러스터 브로커3개) ---> 분산 커넥트 (2개) ---> 엘라스틱서치 

# 요쳥량 증가 
* 사용자 이벤트 - (Rest Api) -(L4)-> 프로듀서 APP (서버 스케일아웃) -> Kafka(클러스터 브로커3개 - 파티션 개수 증가) 
	-> 컨슈머 APP(2개 - 스레드 개수 증가/ 태스크 개수 증가) -> 하둡 
* 사용자 이벤트 - (Rest Api) -(L4)-> 프로듀서 APP (서버 스케일아웃) -> Kafka(클러스터 브로커3개 - 파티션 개수 증가) 
	-> 분산 커넥트 (2개 - 스레드 개수 증가/ 태스크 개수 증가) -> 엘라스틱서치 

--------------------------------------------

## 서버 지표 수집 파이프라인 생성과 카프카 스트림즈 활용

1.  토픽 생성
- 서버 전체 지표들을 저장하는 토픽 생성 
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
--replication-factor 2 --partitions 3 --topic metric.all 

- CPU 지표를  저장하는 토픽 생성 
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
--replication-factor 2 --partitions 3 --topic metric.cpu

- 메모리 지표를  저장하는 토픽 생성 
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
--replication-factor 2 --partitions 3 --topic metric.memory 

- 비정상  지표를  저장하는 토픽 생성 
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
--replication-factor 2 --partitions 3 --topic metric.cpu.alert

상용 아키텍처에 적용하는 것을 기준으로 replication-factor 2 로 설정 싱글 브로커로 구성된 환경이면 1로 설정 

2.  메트릭비트 설치 및 설정
$ brew install metricbeat
$ brew info metricbeat

$ cd 메트릭설치 bin폴더 
$ vi metricbeat.yaml 
metricbeat.modules:
  - module: system
    metricsets:
      - cpu
      - memory
    enabled: true
    period: 10s
    processes: [".*"]

output.kafka:
  hosts: ["localhost:9092"]
  topic: "metric.all"

  3. 카프카 스트림즈 개발 
  * example/kafka-streams/	  
	- MetricStreams.java
	  MetricJsonUtils.java

스트림즈에서 필요한 JSON 구조
* 전체 CPU 사용량 퍼센티지 : system > cpu > total > norm > pct 값
* 메트릭 종류 추출 : metricset > name 값
* 호스트 이름과 timestamp 값 조합
   * 호스트 이름 : host > name 값 
   * timestamp 값 : @timestamp 값 

4. 기능 테스트 
- 메트릭비트 실행
$ ./metricbeat -c metricbeat.yaml 
확인) $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic metric.all --from-beginning

- 스트림 애플리케이션 실행 
 MetricStreams.java 실행 
 확인) $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic metric.cpu --from-beginning
          $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic metric.cpu.alert --from-beginning

## 상용 인프라 아키텍처 
* 카프카 클러스터 : 3개 이상의 브로커로 구성
* 스트림즈 : 2개 이상의 서버, 각 서버당 1개 스트림즈 애플리케이션 (스케일 아웃을 통해 처리량 늘릴 수 있음)
* 커넥트 : 서버 지표 데이터 저장용, 2개 이상의 서버, 분산 모드 커넥트로 구성 

--------------------------------------------

## 미러메이커2를 사용한 토픽 미러링 

1. config/ connect-mirror-maker.properties 파일 설정
* connect-mirror-maker.properties
    ---------------------------------
	clusters = A, B

	A.bootstrap.servers = a-kafka:9092
	B.bootstrap.servers = b-kafka:9092

	A->B.enabled = true
	A->B.topics = weather.seoul

	B->A.enabled = false
	B->A.topics = .*

	replication.factor=1

	checkpoints.topic.replication.factor=1
	heartbeats.topic.replication.factor=1
	offset-syncs.topic.replication.factor=1
	offset.storage.replication.factor=1
	status.storage.replication.factor=1
	config.storage.replication.factor=1
    ---------------------------------

2. 클러스터 A에 weather.seoul 토픽 생성 
$ bin/kafka-topics.sh --create --bootstrap-server a-kafka:9092 --partitions 3 --topic weather.seoul 

3. 미러메이커2 실행 
$ bin/connect-mirror-maker.sh config/connect-mirror-maker.peoperties
확인) $ bin/kafka-topics.sh --bootstrap-server b-kafka:9092 --list A.weather.seoul 
         $ bin/kafka-topics.sh --bootstrap-server b-kafka:9092 --list A.weather.seoul  --describe

$ bin/kafka-console-producer.sh --bootstrap-server a-kafka:9092 --topic weather.seoul
> sunny
> cloudy

$ bin/kafka-console-consumer.sh --bootstrap-server b-kafka:9092 --topic A.weather.seoul --from-beginning

## 상용 인프라 아키텍처 
* 미러메이커2 : 2개 이상의 서버 

=================================================================================

#  ETC 
* 로컬 테스트용 카프카 도커 이미지 실행
$ git clone https://github.com/AndersonChoi/kafka-docker.git
$ cd kafka-docker
$ docker-compose -f docker-compose-single-broker.yml up -d 
$ docker ps 
$ docker-compose down 

* AWS에 카프카 클러스터 설치하기(ec2, 3 brokers) 
https://blog.voidmainvoid.net/325