본문 바로가기

Apache Kafka

Kafka 설정 정리

# Kafka 설정 옵션

Zookeeper Config (zookeeper.properties)

옵션 설명
dataDir=/tmp/zookeeper 주키퍼의 상태, 스냅션, 트랜잭션 로그들을 저장 및 업데이트하는 디렉토리의 경로.
dataLogDir=/data/zookeeper/logs 트랜잭션 로그를 저장하는 디렉토리.
특별한 설정을 하지 않으면 dataDir에 저장된다.
성능상 다른 디스크로 분리하는것이 좋다.
clientPort=2181 클라이언트로 요청을 받기 위한 포트.
initLimit=5 초기에 팔로워가 리더에 접속하거나 데이터를 동기화 시키기 위한 시간이며,
단위는 tickTime 이며 initLimit * tickTime으로 계산된다.
syncLimit=2 follower가 leader로부터 얼마나 sync에 뒤쳐질 수 있는지를 제한하는 timeout 단위.
이 시간동안 sync가 안되면 해당 팔로워는 클러스터에서 제외된다.
maxClientCnxns=0 클라이언트로 부터 동시에 접속할 수 있는 연결 수를 지정.
연결수는 클라이언트 IP당 개수이며, 기본값은 10이며 0은 무제한이다.
admin.enableServer=false 주키퍼는 default로 admin server를 기동하는데에 8080 port를 사용,
주키퍼 기동 시 port 충돌을 방지하기 위해 false 로 사용.
admin.serverPort=8080 admin 서버 기동시 포트 정보이다.
server.1=xxx.xx.xx.xxx:2888:3888
server.2=xxx.xx.xx.xxx:2888:3888
server.3=xxx.xx.xx.xxx:2888:3888
주키퍼 클러스터(앙상블) 구성을 위한 서버 설정이며,
server.myid(서버를 식별하는 ID) 형식으로 사용.
IP는 각 서버의 IP를 입력하고, 2888 포트는 follower가 leader에 접속하기 위한 포트이며,
3888 포트는 leader를 선출하는데 사용되는 포트
tickTime=2000 주키퍼가 사용하는 기본 시간 단위(milliseconds)로, heartbeats를 보내는데에 사용된다. 
default는 2000. (2초)

initLimit, syncLimit 두 가지 timeout들은 모두 앞서 설정한 tickTime의 시간 단위를 사용한다. 즉, 위 설정에서 initLimit의 timeout은 5 ticks이며 tickTime이 2000 milliseconds이기 때문에, '10초'로 설정된 것이다.

Kafka Config (server.properties)

옵션 설명
broker.id=0
The id of the broker. This must be set to a unique integer for each broker.
broker.id.generation.enable=true 해당 서버에서 사용할 브로커의 ID이며, 각 브로커의 고유 정수로 설정.
broker.id.generation.enable 옵션을 사용하여 서버에서 브로커 id 자동 생성 활성화.
listeners=PLAINTEXT://xxx.xx.xx.xxx:9092,EXTERNAL://xxx.xx.xx.xxx:9093 Broker가 사용하는 호스트와 포트를 지정.
내부 브로커들 간의 통신을 위한 엔드포인트와 외부 클라이언트를 위한 엔드포인트를 구분하려면 advertised.listeners 옵션을 사용.
advertised.listeners=PLAINTEXT://your.host.name:9092
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT 위 listeners 옵션에서 사용된 PLAINTEXT 는 보안 프로토콜이 아니기 때문에 보안을 위해서 해당 옵션을 설정해야한다.
num.network.threads=3 서버에서 네트워크로부터 요청을 받고 네트워크로 응답을 전송할 때 사용할 스레드 수.
(네트워크 요청을 처리하는 Thread의 개수)
num.io.threads=8 서버에서 request를 처리할 때 사용하는 스레드 수.
(
I/O가 생길때 마다 생성되는 Thread의 개수)
socket.send.buffer.bytes=102400 네트워크 요청을 처리할 소켓 서버가 사용하는 수신 메모리 사이즈.
socket.receive.buffer.bytes=102400 네트워크 요청을 처리할 소켓 서버가 사용하는 송신 메모리 사이즈.
socket.request.max.bytes=104857600
서버가 받을 수 있는 최대 요청 사이즈이며,
JAVA의 Heap 보다 작게 설정해야 함.
log.dirs=/tmp/kafka-logs 로그 파일을 저장할 쉼표로 구분된 디렉터리 목록.
- 로그 파일 종류
1) Log Segment File(.log) : 전송되는 topic 메시지와 같이 실제 메타데이터가 저장되는 파일.
2) Index File(.index) : offset 번호를 가지고 해당 데이터를 빨리 찾기위한 인덱스 파일.
3) Time-based Index File(.timeindex) : timestamp를 기반으로 데이터를 빨리 찾기위한 인덱스 파일.
4) leader-epoch-checkpoint : 리더가 변경이 되었을 때 offset 정보를 저장하는 파일.
num.partitions=1 토픽당 파티션의 수를 의미하며, 
입력한 수만큼 병렬처리를 할 수 있지만 데이터 파일도 그만큼 늘어난다.
num.recovery.threads.per.data.dir=1 기동 시 로그 복구와, 셧 다운 시 플러시에 사용할 데이터 디렉토리 당 스레드 수.
장애에 빠르게 대응하고자 한다면 높은 값을 설정해야한다.
offsets.topic.replication.factor=1

오프셋 토픽(__consumer_offsets)의 replication factor를 지정.
클러스터 크기가 이 값보다 작으면 internal topic 생성에 실패한다.
__consumer_offsets : 각 topic에 대한 커밋된 오프셋에 대한 정보를 저장하는 데 사용된다.
default.replication.factor=1 자동으로 생성된 토픽에 대한 기본 복제 인수(factor)다.
기본값은 1이다.
transaction.state.log.replication.factor=1 트랜잭션 토픽(__transaction_state)의 replication factor를 지정.
클러스터 크기가 이 값보다 작으면 internal topic 생성에 실패한다.

카프카 트랜잭션은 프로듀서와 컨슈머에서 옵션을 설정해야 한다.
__transaction_state : 이 topic은 프로듀서가 보내는 데이터의 트랜젝션을 기록하는데 사용되며, 기록된 데이터는 트랜젝션이 활성화된 컨슈머가 데이터를 가져가는데 사용한다.
transaction.state.log.min.isr=1 트랜잭션 토픽에 대한 min.insync.replicas 설정을 재정의함.
min.insync.replicas=2 프로듀서가 acks=all(-1)로 설정하여 메시지를 보낼 때,
write를 성공하기 위한 최소 복제본의 수.

ex) N개의 Replica가 있고 해당 옵션이 2일 경우, N-2개의 장애를 허용할 수 있음.
(보통 replica가 3일때, 2로 많이 사용)
acks=all 프로듀서가 kafka에 데이터를 전달하고 데이터를 잘 받았는지 확인하는 옵션.

1) acks=0 : 프로듀서가 데이터를 전달한 후 확인하지 않는 것.
리더 파티션은 데이터가 저장된 이후 몇번째 오프셋에 저장되는지 리턴하게 되는데, 0으로 설정하면 이를 받지 않고 전달만 하게 됨.
이렇게 지정하는 경우 retries 옵션도 무의미하게 되며 데이터 유실이 발생하는 경우가 발생할 수 있지만 속도는 훨씬 빨라진다. (데이터보다 속도에 중점을 두는 경우 사용할 수 있는 옵션)

2) acks=1 : 데이터를 전달하고 리더 파티션에 정상적으로 적재되었는지를 확인하는 옵션. 리더 파티션에 적재가 되었다는 응답을 받지 못하면 정상적으로 전달되었다고 판단하지 않으며, 데이터를 재전송하게 됨.
그렇다고 데이터 유실이 발생하지 않는 것은 아닌데, 리더 파티션으로 데이터를 전달한 직후 팔로워 파티션과 데이터가 동기화되기 전에 리더 파티션에 문제가 발생하면 데이터의 유실이 발생할 수 있음.

3) acks=all(-1) : 리더 파티션과 팔로워 파티션에 데이터가 모두 적재되었는지를 확인하는 옵션으로 당연히 0, 1 옵션을 사용하는 경우보다 속도가 느리지만 그만큼의 안정성을 보장하게 됨.
delete.topic.enable=true --delete --topic 명령을 사용하기 위해 해당 옵션을 true로 설정해야 함.
log.retention.hours=168 로그 파일(메시지)의 수명. 기본값은 168(7일).
hours 단위로 지정하는 것이 아닌 분단위 minutes, 밀리초단위 ms 설정도 지원.
일반적으로 3일 정도로 설정.
log.segment.bytes=1073741824 로그 파일의 최대 크기. 이 크기에 도달하면 새 로그 세그먼트가 생성됨.
log.retention.check.interval.ms=300000 로그를 삭제하기 위한 check 간격으로 ms단위임.
zookeeper.connect=zookeeper-01:2181,zookeeper-02:2181,zookeeper-03:2181 zookeeper.connect=localhost:2181
연결된 zookeeper 서버 정보
zookeeper.connection.timeout.ms=18000 주키퍼 연결 최대 대기 시간
group.initial.rebalance.delay.ms=0
그룹 코디네이터가 새 그룹에서 처음 리밸런스를 수행하기 전에, 더 많은 컨슈머가 그룹에 들어올 수 있도록 기다리는 시간. 
더 오래 기다리면 앞으로 리밸런스를 덜 할 순 있지만, 리밸런스를 시작하기까지 시간이 더 오래 걸린다.
auto.create.topics.enable=false 브로커 서버의 토픽 자동 생성을 활성화 함.
Connect의 topic.creation.enable 옵션과 독립적이며, Connect의 옵션이 우선 적용됨.
confluent.topic.replication.factor=1 Confluent Control Center 모니터링을 위한 옵션.

Connect (connect-distributed.properties)

옵션 설명
bootstrap.servers=kafka-broker-01:9092,kafka-broker-02:9092,kafka-broker-03:9092,kafka-broker-04:9092,kafka-broker-05:9092 bootstrap.servers=localhost:9092
카프카 클러스터에 대한 초기 커넥션을 구축하는 데 사용할 호스트/포트 쌍으로 생성.

kafka broker 서버 = bootstrap 서버.
group.id=connect-cluster connect를 같은 클러스터로 묶기 위한 그룹 설정.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
카프카 커넥트 포맷과 카프카에 기록한 직렬화된 포맷 간을 변환할 때 사용할 컨버터 클래스.
카프카에서 쓰거나 읽은 메세지 키 포맷은 이 클래스가 제어하며, 컨버터는 커넥터와는 독립적이기 때문에, 사용하는 커넥터와는 무관하게 어떤 직렬화 포맷으로도 작업가능.
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key/value값이 내부 schema와 data를 모두 포함하는 복합 객체로 처리되도록 하는 설정.
offset.storage.topic=connect-offsets connect offset을 저장할 kafka topic명.
offset.storage.replication.factor=1 offset을 저장하는 토픽을 생성할때 사용할 replication factor. default는 1
config.storage.topic=connect-configs connector 설정을 저장할 kafka topic명
config.storage.replication.factor=1 설정을 저장하는 topic을 생성할 때 사용할 replication factor. default는 1
status.storage.topic=connect-status connector와 task 상태를 저장할 topic명.
status.storage.replication.factor=1 상태를 저장하는 topic을 생성할 때 사용할 replication factor. default는 1
# status.storage.partitions=5
상태를 저장하는 topic 파티션 수
offset.flush.interval.ms=10000 task에서 offset 커밋을 시도하는 간격
plugin.path=/home/plugins plugin이 들어있는 path 리스트.
connector.client.config.override.policy=All 커넥터가 broker의 모든 설정을 바꿀 수 있도록 All로 정의.
topic.creation.enable=true source connector에서 사용하는 topic의 자동 생성을 허용할지 여부.

- 엄청 많은데...

- 일단 중요한거만 체크해서 정리하자.