본문 바로가기

Apache Kafka

Producer에서 Topic 생성, Partition 개수 늘리기, Consumer 데이터 확인 및 기본 명령어 정리

# Producer에서 새로운 Topic 생성하기 및 기본 명령어 정리

1. 현재의 토픽 리스트를 확인하기

bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --exclude-internal

- 결과 확인

2. 테스트용 데이터를 발생하는 SpingBoot에서 새로운 Topic을 생성 후 Kafka에 전송

1) 아래와 같이 새로운 클래스를 생성한다.

2) 위에서 만든 클래스의 내용중 아래와 같이 토픽 이름을 지정 한다.

여기서는 "NewTopic-01"로 지정을 하였다.

3) 테스트 프로그램에서 데이터를 흘린다.

- 대략 한번 데이터를 보내면 1000건정도 쌓인다.

4) Kafka 서버에서 Topic이 생성 되었는지 확인

- 명령어 : bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --exclude-internal

- 결과 : NewTopic-01 토픽이 생성된것을 확인 할 수 있다.

5) 신규 생성된 Topic의 상세 정보 확인하기

- 명령어 : bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic NewTopic-01 --describe

- 결과 : 토픽 아이디와 파티션 카운터 그리고 리플리케이션팩터 등 정보를 확인 할 수 있다.

3. Kafka 설정 변경 테스트

1) 신규로 생성된 토픽의 파티션 갯수를 늘리기

- 파티션 갯수는 한번 늘리면 줄일수 없으므로 주의!

<partition 개수를 3개로 늘리기>
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic NewTopic-01 \
--alter \
--partitions 3

- 파티션 갯수 확인 (3개로 증가 확인)

bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic NewTopic-01 \
--describe

Topic: NewTopic-01      TopicId: ZH95sNAxSDSyW2TXTP1DhQ PartitionCount: 3       ReplicationFactor: 1    Configs:
        Topic: NewTopic-01      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: NewTopic-01      Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: NewTopic-01      Partition: 2    Leader: 0       Replicas: 0     Isr: 0

- 오류 날거 같은데 테스트... 늘린 파티션 수를 2개로 줄여보기 → 에러 발생

3개로 늘렸으니, 3개 이하로는 적용 불가 3개 이상만 가능 (파티션 수많큼 컨슈머 역시 추가 해줘야함.)

Error while executing topic command : Topic currently has 3 partitions, which is higher than the requested 2.

2) 카프카 설정 값 조회

- 일단 retention.ms를 1일로 변경하기

kafka retention은 로그 유지 기간을 의미한다.

<retention.ms를 1일로 변경하는 설정>
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name NewTopic-01 \
--alter \
--add-config retention.ms=86400000

- 적용

Completed updating config for topic NewTopic-01.

- 정상적으로 변경 되었는지 확인하는 명령어
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name NewTopic-01 \
--describe

- 결과
Dynamic configs for topic NewTopic-01 are:
  retention.ms=86400000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=86400000}

4. 신규로 생성한 NewTopic-01의 내용을 들여다 보기

- from-beginning 옵션으로 토픽이 생성된 시점부터 모든 데이터를 실시간으로 확인 할 수 있다. (Consumer)

<Topic을 tail 방식으로 확인, 옵션: >
./bin/windows/kafka-console-consumer.bat \
--bootstrap-server localhost:9092 \
--topic NewTopic-01 \
--from-beginning

- 테스트 화면은 아래와 같다. tail옵션을 준것과 같이 실행되는 실시간 데이터를 확인 할 수 있다.

control + c 를 통해 빠져 나올수 있다.

5. Kafka Consumer 관련 계속 추가 정리...

1) Kafka Topic에 저장된 데이터를 Key:Value 형태로 확인하기

<확인 명령어>
./bin/windows/kafka-console-consumer.bat \
--bootstrap-server localhost:9092 \
--topic NewTopic-01 \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.timestamp=true \
--property print.key=true \
--property print.value=true \
--from-beginning

- 결과 확인

key를 부여하지 않았기 때문에 null로 표시 된다.

6. 자주 사용하는 kafka cli 명령어 정리

1) Topic

명령어 설명 비고
./bin/windows/kafka-topics.bat \
--create --bootstrap-server localhost:9092 \
--partition 4 \
--replication-factor 1 \
--topic {topic_name}
토픽 생성  
./bin/windows/kafka-topics. bat \
--bootstrap-server localhost:9092 \
--topic {topic_name}  \
--alter\
--partitions 5
파티션 수 변경 실행중에 파티션 수 감소는 불가
./bin/windows/kafka-topics.bat \
--bootstrap-server localhost:9092 \
--topic {topic_name}  \
--alter \
--add-config {retention.ms=86400000}
토픽 옵션 추가 기존 옵션 존재시 변경하지 않음
./bin/windows/kafka-topics.bat \
--bootstrap-server localhost:9092 \
--list
토픽 리스트 조회  
./bin/windows/kafka-topics.bat \
--bootstrap-server localhost:9092 \
--describe \
--topic {topic_name}
토픽 상세 조회  
./bin/windows/kafka-topics.bat \
--bootstrap-server localhost:9092 \
--topic {topic_name} \
--delete
토픽 삭제 # 토픽 함부로 삭제하면 안된다...
해당 토픽을 구독 하고 있는 애들 때문에 장애 발생함...

 

2) Producer

명령어 설명 비고
./bin/windows/kafka-console-producer.sh \
--bootstrap-serverlocalhost:9092 \
--topic {topic_name}  {메세지 내용}
토픽에 메세지 전송 ByteArraySerializer로만 직렬화
./bin/windows/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic {topic_name}  \
--property "parse.key=true" \
--property "key.separator=:" {키값}:{메세지내용}
토픽에 키값을 포함한
메세지 전송
separator의 값으로 키와 메세지 구분, separator를 지정하지 않으면 \t으로 구분

 

3) Consumer

명령어 설명 비고
./bin/windows/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic {topic_name}  \
--from-beginning
메세지 소비 --from-beginng 으로 처음부터 소비
./bin/windows/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic {topic_name} \
--property print.key=true \
--property key.separator=":" \
--from-beginning
메세지를 키와 함께 소비  
./bin/windows/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic {topic_name} \
--group {그룹네임} \
--from-beginning
컨슈머 그룹을
명시해서 소비
 
./bin/windows/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic {topic_name} \
--max-messages 100 \
--from-beginning
소비할 메세지수를
정해서 소비
 
./bin/windows/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic {topic_name} \
--partition 1 \
--from-beginnging
특정 파티션만 소비  
./bin/windows/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic {topic_name} \
--propery {옵션} \
--from-beginning

옵션을 추가해서 소비 옵션 종류

print.timestamp=true|false
print.key=true|false
print.offset=true|false
print.partition=true|false
print.headers=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
headers.separator=<line.separator>
null.literal=<null.literal>
key.deserializer=<key.deserializer>
value.deserializer=<value.
deserializer>
header.deserializer=<header.
deserializer>
Users can also pass in customized
properties for their formatter; more
specifically, users can pass in
properties keyed with 'key.
deserializer.', 'value.
deserializer.' and 'headers.
deserializer.' prefixes to configure
their deserializers.

 

4) Consumer Group

명령어 설명 비고
./bin/windows/kafka-consumer-groups.sh \
--bootstrap-serverlocalhost:9092 \
--list
컨슈머 그룹 리스트 조회  
./bin/windows/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group {그룹이름} \
--describe
컨슈머 그룹 상세 조회  
./bin/windows/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group {그룹이름} \
--delete
컨슈머 그룹 삭제  

 

일단 여기까지...

 

- 끝 -