# Producer에서 새로운 Topic 생성하기 및 기본 명령어 정리
1. 현재의 토픽 리스트를 확인하기
bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --exclude-internal
- 결과 확인
2. 테스트용 데이터를 발생하는 SpingBoot에서 새로운 Topic을 생성 후 Kafka에 전송
1) 아래와 같이 새로운 클래스를 생성한다.
2) 위에서 만든 클래스의 내용중 아래와 같이 토픽 이름을 지정 한다.
여기서는 "NewTopic-01"로 지정을 하였다.
3) 테스트 프로그램에서 데이터를 흘린다.
- 대략 한번 데이터를 보내면 1000건정도 쌓인다.
4) Kafka 서버에서 Topic이 생성 되었는지 확인
- 명령어 : bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --exclude-internal
- 결과 : NewTopic-01 토픽이 생성된것을 확인 할 수 있다.
5) 신규 생성된 Topic의 상세 정보 확인하기
- 명령어 : bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic NewTopic-01 --describe
- 결과 : 토픽 아이디와 파티션 카운터 그리고 리플리케이션팩터 등 정보를 확인 할 수 있다.
3. Kafka 설정 변경 테스트
1) 신규로 생성된 토픽의 파티션 갯수를 늘리기
- 파티션 갯수는 한번 늘리면 줄일수 없으므로 주의!
<partition 개수를 3개로 늘리기>
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic NewTopic-01 \
--alter \
--partitions 3
- 파티션 갯수 확인 (3개로 증가 확인)
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic NewTopic-01 \
--describe
Topic: NewTopic-01 TopicId: ZH95sNAxSDSyW2TXTP1DhQ PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: NewTopic-01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: NewTopic-01 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: NewTopic-01 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
- 오류 날거 같은데 테스트... 늘린 파티션 수를 2개로 줄여보기 → 에러 발생
3개로 늘렸으니, 3개 이하로는 적용 불가 3개 이상만 가능 (파티션 수많큼 컨슈머 역시 추가 해줘야함.)
Error while executing topic command : Topic currently has 3 partitions, which is higher than the requested 2.
2) 카프카 설정 값 조회
- 일단 retention.ms를 1일로 변경하기
kafka retention은 로그 유지 기간을 의미한다.
<retention.ms를 1일로 변경하는 설정>
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name NewTopic-01 \
--alter \
--add-config retention.ms=86400000
- 적용
Completed updating config for topic NewTopic-01.
- 정상적으로 변경 되었는지 확인하는 명령어
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name NewTopic-01 \
--describe
- 결과
Dynamic configs for topic NewTopic-01 are:
retention.ms=86400000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=86400000}
4. 신규로 생성한 NewTopic-01의 내용을 들여다 보기
- from-beginning 옵션으로 토픽이 생성된 시점부터 모든 데이터를 실시간으로 확인 할 수 있다. (Consumer)
<Topic을 tail 방식으로 확인, 옵션: >
./bin/windows/kafka-console-consumer.bat \
--bootstrap-server localhost:9092 \
--topic NewTopic-01 \
--from-beginning
- 테스트 화면은 아래와 같다. tail옵션을 준것과 같이 실행되는 실시간 데이터를 확인 할 수 있다.
control + c 를 통해 빠져 나올수 있다.
5. Kafka Consumer 관련 계속 추가 정리...
1) Kafka Topic에 저장된 데이터를 Key:Value 형태로 확인하기
<확인 명령어>
./bin/windows/kafka-console-consumer.bat \
--bootstrap-server localhost:9092 \
--topic NewTopic-01 \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.timestamp=true \
--property print.key=true \
--property print.value=true \
--from-beginning
- 결과 확인
key를 부여하지 않았기 때문에 null로 표시 된다.
6. 자주 사용하는 kafka cli 명령어 정리
1) Topic
명령어 | 설명 | 비고 |
./bin/windows/kafka-topics.bat \ --create --bootstrap-server localhost:9092 \ --partition 4 \ --replication-factor 1 \ --topic {topic_name} |
토픽 생성 | |
./bin/windows/kafka-topics. bat \ --bootstrap-server localhost:9092 \ --topic {topic_name} \ --alter\ --partitions 5 |
파티션 수 변경 | 실행중에 파티션 수 감소는 불가 |
./bin/windows/kafka-topics.bat \ --bootstrap-server localhost:9092 \ --topic {topic_name} \ --alter \ --add-config {retention.ms=86400000} |
토픽 옵션 추가 | 기존 옵션 존재시 변경하지 않음 |
./bin/windows/kafka-topics.bat \ --bootstrap-server localhost:9092 \ --list |
토픽 리스트 조회 | |
./bin/windows/kafka-topics.bat \ --bootstrap-server localhost:9092 \ --describe \ --topic {topic_name} |
토픽 상세 조회 | |
./bin/windows/kafka-topics.bat \ --bootstrap-server localhost:9092 \ --topic {topic_name} \ --delete |
토픽 삭제 | # 토픽 함부로 삭제하면 안된다... 해당 토픽을 구독 하고 있는 애들 때문에 장애 발생함... |
2) Producer
명령어 | 설명 | 비고 |
./bin/windows/kafka-console-producer.sh \ --bootstrap-serverlocalhost:9092 \ --topic {topic_name} {메세지 내용} |
토픽에 메세지 전송 | ByteArraySerializer로만 직렬화 |
./bin/windows/kafka-console-producer.sh \ --bootstrap-server localhost:9092 \ --topic {topic_name} \ --property "parse.key=true" \ --property "key.separator=:" {키값}:{메세지내용} |
토픽에 키값을 포함한 메세지 전송 |
separator의 값으로 키와 메세지 구분, separator를 지정하지 않으면 \t으로 구분 |
3) Consumer
명령어 | 설명 | 비고 |
./bin/windows/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic {topic_name} \ --from-beginning |
메세지 소비 | --from-beginng 으로 처음부터 소비 |
./bin/windows/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic {topic_name} \ --property print.key=true \ --property key.separator=":" \ --from-beginning |
메세지를 키와 함께 소비 | |
./bin/windows/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic {topic_name} \ --group {그룹네임} \ --from-beginning |
컨슈머 그룹을 명시해서 소비 |
|
./bin/windows/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic {topic_name} \ --max-messages 100 \ --from-beginning |
소비할 메세지수를 정해서 소비 |
|
./bin/windows/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic {topic_name} \ --partition 1 \ --from-beginnging |
특정 파티션만 소비 | |
./bin/windows/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic {topic_name} \ --propery {옵션} \ --from-beginning |
옵션을 추가해서 소비 | 옵션 종류 print.timestamp=true|false print.key=true|false print.offset=true|false print.partition=true|false print.headers=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> headers.separator=<line.separator> null.literal=<null.literal> key.deserializer=<key.deserializer> value.deserializer=<value. deserializer> header.deserializer=<header. deserializer> Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key. deserializer.', 'value. deserializer.' and 'headers. deserializer.' prefixes to configure their deserializers. |
4) Consumer Group
명령어 | 설명 | 비고 |
./bin/windows/kafka-consumer-groups.sh \ --bootstrap-serverlocalhost:9092 \ --list |
컨슈머 그룹 리스트 조회 | |
./bin/windows/kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --group {그룹이름} \ --describe |
컨슈머 그룹 상세 조회 | |
./bin/windows/kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --group {그룹이름} \ --delete |
컨슈머 그룹 삭제 |
일단 여기까지...
- 끝 -
'Apache Kafka' 카테고리의 다른 글
Apache Kafka 실행 명령어 정리 (windows) (0) | 2024.04.23 |
---|---|
Kafka 설정 정리 (0) | 2023.12.14 |
Apache Kafka 주요 개념 정리 (0) | 2023.12.14 |
Windows에서 Apache kafka 실행하기 (0) | 2023.04.11 |
EKS에 Kafka를 설치 후 테스트 하기 (0) | 2022.09.26 |