카테고리 없음

[MS] Kafka 의 기본 구조

yooverd 2024. 2. 22. 23:57
728x90
반응형
SMALL

Kafka 란

 아파치 카프카(Apache kafka) 는 아파치 소프트웨어 재단이 스칼라로 개발한 오픈 소스 메시지 브로커 프로젝트로, 고성능 데이터 파이프라인, 스트리밍 분석, 데이터 통합 ​​및 미션 크리티컬 애플리케이션을 위해 사용하는 오픈 소스 분산 이벤트 스트리밍 플랫폼이다.

 

 위의 정의를 읽었을 때, 카프카가 어떤 역할을 하는지 자세히 이해가 가질 않았는데, 대충 '대용량-대규모 데이터를 효율적으로 처리할 수 있는 프레임워크 혹은 오픈소스 도구' 라고 보면 되는 것 같다. 

 

Kafka 의 등장 배경

2011년, 링크드인(Linked)에서 카프카를 개발하였다. 기존에는 단방향 통신으로 데이터를 생성하는 소스 애플리케이션에서 데이터가 최종 적재되는 타깃 애플리케이션으로 연동하는 방식으로 운영을 하였으나 점차 아키텍쳐가 크고 복잡해지면 문제를 겪게 되었다. 대용량 데이터를 안전하고 빠르게 처리하기 위해  데이터 처리를 위해 각각의 앱이 연결되는 방식이 아닌, 한 곳에 모아 처리가 가능한 중앙집중화된 형태의 구조를 고안해내었다. 

Kafka의 장점

높은 처리량

- 데이터를 묶음을 처리하므로 네트워크 통신 횟수를 줄여 비용을 낮출 수 있다.

- 배치를 이용하여 많은 양의 데이터를 묶음 단위로 처리할 수 있다.

- 파티션을 이용하여 데이터를 병렬처리할 수 있다.

확장성

- 가변적인 환경에서 안정적으로 확장 가능하도록 설계되어 데이터의 양에 따라 브로커의 수를 유동적으로 운영할 수 있다

      예시1)    1개 장비의 용량 한계 도달 -> 브로커 추가, 파티션 추가로 해결가능

      예시2)    컨슈머가 느림 -> 컨슈머 추가(+파티션 추가) 로 해결 가능

영속성

- 카프카는 전송받은 데이터를 메모리에 저장하지 않고 파일 시스템에 저장한다.

- 이 때, 운영체제의 페이지 캐시 메모리영역을 사용하여 파일 시스템에 데이터를 저장 전송하므로 처리량을 높은 것이다.

- 디스크 기반의 파일 시스템으로 인해 브로커 앱에 장애가 발생하더라도 안전하게 데이터를 재처리할 수 있다.

고가용성

-  레플리케이션을 이용하여 데이터를 복제 저장하므로 브로커에 장애가 발생했을 시, 복제된 데이터를 이용할 수 있다.


카프카의 기본 구조

Kafka의 기본 구조

 

주키퍼 : 카프카 클러스터를 관리하는 역할. 분산 메시지큐의 정보를 관리해주는 역할로, 카프카를 띄우기 위해서 반드시 주키퍼가 실행되어야 한다.

 

카프카 클러스터 : 메시지를 저장하는 저장소

브로커 : 카프카의 서버. 메시지를 나누어 저장, 이중화 처리, 장애 발생 시 서로 대체하는 역할을 한다

토픽 : 카프카에 저장되는 메시지의 단위. 하나의 토픽은 하나 이상의 파티션으로 구성된다

 

프로듀서 : 카프카 클러스터에 메시지를 보내는 역할

 

컨슈머 : 카프카에서 메시지를 읽어오는 역할

 

 

토픽과 파티션 그리고 오프셋

프로듀서와 컨슈머는 토픽을 기준으로 카프카에 메시지를 주고받는다. 즉, 토픽은 각 메시지를 알맞게 구분하기 위한 목적으로 사용된다. 한 개의 토픽은 한 개 이상의 파티션으로 구성된다. 

 

파티션은 메시지를 저장하는 물리적인 파일을 의미하며 appended-only 파일이다.

 

파티션에서 각 메시지의 저장 위치를 offset 이라고 부르는데, 각 프로듀서가 넣은 메시지는 파티션의 맨 뒤에 추가되며 컨슈머는 오프셋 기준으로 메시지를 순서대로 읽는다. 예를 들어 어떤 컨슈머가 특정 파티션의 3번 데이터 오프셋부터 읽는다면 이전의 0, 1, 2번 메시지는 읽을 수 없고, 3, 4, 5 번 메시지를 순차적으로 읽어온다. 이러한 순서 보장은 각 파티션 내에서 보장된다. 그리고 컨슈머가 메시지를 읽어도 메시지는 삭제되지 않으며 지정해놓은 설정에 따라 일정 시간이 지난 뒤 삭제된다.

파티션이 여러 개인 경우, 프로듀서는 어떤 파티션에 메시지를 저장할까? 또, 컨슈머는 어떤 파티션에서 메시지를 읽어올까?

 

프로듀서는 라운드로빈으로 돌아가면서 저장하거나  키로 파티션을 선택할 수 있는데, 키를 통해 파티션을 선택하는 경우, 지정된 키에 따라 메시지가 해당 파티션에 저장되므로 메시지의 순서가 유지된다.

 

하나의 컨슈머 그룹에 속한 컨슈머들은 같은 파티션을 공유할 수 없다는 특징이 있다. 위 사진의 경우, 컨슈머 그룹2의 컨슈머1과 컨슈머2는 파티션 1을 공유할 수 없다. 또한 파티션의 메시지는 한 컨슈머그룹을 기준으로 순차적으로 처리된다.

 

리플리카

리플리카는 파티션의 복제본으로, 복제수(replication factor)만큼 파티션의 복제본이 각 브로커에 생성된다. 예를 들어, 토픽 생성 시, 복제 수를 3으로 지정하면 동일한 데이터를 갖고있는 파티션이 서로 다른 브로커에 세 개가 생성된다. 리플리카는 하나의 리더와 나머지 팔로워로 구성되는데, 프로듀서와 컨슈머는 리더를 통해서만 메시지를 처리하고 팔로워는 데이터를 복제 저장하는 역할을 수행한다. 리플리카는 장애 대응 시 사용될 수 있는데, 리더가 속한 브로커에서 장애가 발생하면 다른 팔로워가 리더가 되어 데이터 처리를 수행할 수 있다.

 

Topic 1 : replication-facior = 1 / Topic 2 : replication-factor = 2 / Topic 3 : replication-factor = 3
출처 :  https://team-platform.tistory.com/11?category=829378

 

 


프로듀서

다음은 프로듀서로 메시지를 보내는 코드의 예시이다.

// 프로듀서 설정 정보 (브로커 목록, key-value 직렬화를 위한 정serializer, 배치 사이즈 등을 Properites 를 이용하여 지정)
Properties prop = new Properties();
props.put("bootstrap.servers", "kafka01:9092, kafka01:9092, kafka01:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 위의 Properties를 이용하여 KafkaProducer 객체 생성
KafkaProducer<Integer, String> producer = new KafkaProducer <> (prop);

// 프로듀서 객체의 send메소드의 ProducerRecord(카프카 브로커에 전송할 메시지)를 전달하여 메시지를 보냄
// ProducerRecord의 두가지 생성방법
producer.send(new ProducerRecord<>("topicname", "key", "value"));
producer.send(new ProducerRecord<> ("topicname", "value"));

// 프로듀서 사용 종료
producer.close();

 

 

프로듀서의 기본 흐름

0. KafkaProducer 객체의 send() 메서드를 통해 레코드 전송

1. 프로듀서가 Serializer를 이용하여 전송된 메시지를 byte 배열로 변환

2. 프로듀서가 Patitioner를 이용하여 메시지를 어느 토픽의 파티션으로 보낼지 결정

3. 버퍼에 변환된 byte 배열을 배치로 묶어 버퍼에 저장

4. Sender가 배치를 순차적으로 가져와 카프카 브로커로 전송함

출처: kafka 조금 아는 척하기 2

 

 

위의 과정에서  Sender는 배치가 꽉 차지 않아도 보낼 수 있는 조건만 만족한다면 순차적으로 배치를 브로커에 전송한다.  참고로 Sender는 별도 스레드로 동작한다. 따라서 Sender 가 브로커에 배치를 전송하는 동안 send() 메서드를 통해 들어온 레코드는 계속해서 배치에 누적된다. 

 

처리량과 밀접한 관련을 맺고 있는 Sender가 배치를 전송하는 최소한의 조건을 설정할 수 있는 주요 속성으로 batchsize 와 linger.ms 를 꼽아볼 수 있겠다.

 batchsize는 배치의 크기를 의미하는데, 만일 지정한 배치 크기만큼 꽉 차게 되면 배치를 전송하게 된다. 배치의 사이즈가 작다면 한 번에 보낼 수 있는 메시지의 양이 감소하므로 전송 횟수가 증가하고 처리량이 떨어지게 된다.

 linger.ms 는 전송시간을 의미하고 기본값은 0이다. 따라서 대기시간을 설정하지 않으면 배치가 덜 차도 브로커로 배치를 전송한다. 대기시간을 주게 되면 그 시간만큼 배치에 메시지를 누적할 수 있으므로 한 번의 처리에 더 많은 메시지를 보낼 수 있고 이를 통해 전반적인 처리량이 증가하는 효과를 기대할 수 있겠다.

 


프로듀서의 전송 결과 확인

단순히 프로듀서의 send() 메서드만 활용할 경우 전송 실패 여부를 알 수 없다. 따라서 프로듀서의 전송 결과를 확인하고 싶은 경우 send 메시지가 반환하는 Future 객체를 활용하거나 send() 메서드에 콜백 객체를 전달하는 등의 방법을 사용해야 한다.

 

 Future 객체를 활용하는 경우, get 메서드를 활용하여 성공 결과를 할 수 있다.

Future<RecordMetadata> f = producer.send(new ProducerRecord<> ("topic", "value"));
try {
    RecordMetadata meta = f.get(); // 블로킹
    
} catch (ExecutionException ex) {
}

 

 

한 가지 주의할 점이 있다면 get() 메서드가 사용되는 시점에 스레드가 블로킹 처리가 된다는 점이다. 따라서 배치에 메시지 지가 하나씩 들어가게 되어 배치 효과가 떨어져 처리량 저하를 일으킬 수 있다. 그렇지만 각 전송 건마다 확실하게 전송결과를 확인할 수 있다는 장점 또한 존재한다. 따라서 이 방식은 처리량이 낮아도 되는 경우에만 사용한다고 한다.

 

 

Callback 객체를 전달하여 전송 결과를 확인하는 경우 코드의 작성은 다음과 같다.

producer.send(new ProducerRecord<>("simple", "value"),
    new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception ex) {
        }
    });

 

send() 메서드에 전달한 콜백 객체는 전송이 완료되면 그 결과를 onCompletion() 메서드로 받는데, 이때 Exception  객체를 받는다면 전송이 실패되었음을 알 수 있다. 따라서 전송 실패 여부에 따라 알맞은 후처리를 할 수 있다. 이 방식은 Future의 get() 메서드와 달리 스레드가 블로킹되지 않으므로 batch에 메시지가 정상적으로 누적되므로 처리량 저하를 발생시키지 않는다.


전송 보장과 acks 

메시지 전송 실패 여부를 확인하는 데 있어 중요한 프로듀서의 설정 중 하나가 바로 acks 이다.

acks의 값은 0, 1 그리고 all (혹은 -1) 으로 지정할 수 있는데, 각 값에 따라 메시지 전송 보장도가 달라진다.

 

 만일 acks 의 값을 0 으로 지정한다면 프로듀서는 전송 후 서버의 응답을 기다리지 않는다. 이러한 설정은 처리량은 증대시킬 수 있으나 전송을 보장하지 않으므로 메시지가 유실되더라도 실패 여부를 알 수 없다.

 만일 acks 의 값을 1 로 지정한다면 프로듀서는 메시지가 파티션의 리더에 저장될 때 성공 응답을 받는다. 따라서 리더에 메시지를 저장했으나 아직 나머지 팔로워에는 복제되지 않은 상태에서 리더에 장애 발생 시, 메시지 유실 가능성이 생긴다. 

 acks의 값을 all 또는 -1 로 지정하는 경우 프로듀서는 모든 리플리카에 메시지가 저장될 때 성공 응답을 받는다. 이때, ' 모든 리플리카 ' 의 정의는 브로커의 min.insync.replicas 설정에 따라 달라진다. 예를 들자면, 리플리카의 개수가 3이고 브로커의 min.insync.replcias 의 값이 2 일 때, acks = all 이라면 리더와 하나의 팔로워가 복제를 성공했을 시 성공 응답을 받게 된다. 따라서 acks = all 설정 시, 리플리카의 개수와 min.insync.replicas 의 개수가 동일하다면 팔로워 한 개라도 장애가 발생했을 시 저장 실패를 응답받으므로 주의가 필요하다.


프로듀서 전송 실패 에러 유형과 대응

그렇다면 주로 어떤 상황에서 프로듀서에 에러가 발생하게 될까. 프로듀서의 에러를 전송 과정을 기준으로 크게  전송 전에 발생한 실패, 전송 중에 발생한 실패로 나눠볼 수 있겠다.

 전송 전에 실패한 경우는 보통 직렬화를 실패하거나 프로듀서 자체 요청크기 제한이 초과된 경우, 프로듀서 버퍼가 꽉 차서 기다린 시간이 최대 대기시간을 초과한 경우를 생각해 볼 수 있다. 전송 중에는 일시적인 네트워크 오류 등으로 전송 타임 아웃, 리더가 다운되어 새 리더를 선출하는 도중 전송을 받는 경우, 브로커에서 설정한 메시지 크기 한도가 초과되는 경우들이 있겠다. 이렇게 전송이 실패되는 경우 다음과 같은 방법으로 전송 실패에 대응해 볼 수 있을 것이다.

 

  첫 번째 대응 방법은 기록이다. 에러 발생 즉시 대응하는 것이 아닌 추후 처리를 위해 기록해 두는 개념이다. 별도 파일이나 DB를 이용해서 실패한 메시지를 기록해 두고 추후 보정 작업을 진행하면 된다. send() 메서드에서 익셉션 발생 시, send()  메서드에서 전달한 콜백 객체가  익셉션을 받는 경우, send() 메서드가 리턴한 Future의 get() 메서드에서 익셉션 발생 시에 기록을 해두는 것이 권장된다.   

 

 두 번째 대응 방법은 재시도이다. 참고로 프로듀서는 기본적으로 브로커 전송 과정에서 에러가 발생하면 지속적으로 재시도를 수행한다. send() 메서드에서 익셉션 발생 시, 콜백 메서드에서 익셉션을 받을 경우 send()를 재호출 하는데, 특별한 이유가 없다면 재시도가 끝없이 발생하는 것을 막아주어야 한다. 재시도를 함으로써 새롭게 보내야 할 메시지가 밀리게 되므로 재시도를 일정 시간이나 횟수로 제한할 것을 권장한다고 한다.

 

 한 가지 더 주의해야 할 점이 있다면, 메시지 저장은 잘 되었으나 브로커 응답이 늦게 와서 재시도가 발생하는 경우엔 메시지 중복 발송 가능성이 있다는 점이다. 이 경우 enable.idemptotence 속성을 지정하면 중복 전송될 가능성을 줄여줄 수 있다고 한다.

 

또한, 재시도로 인해 전송하는 메시지의 순서가 바뀔 수도 있다. 만일 프로듀서에서 배치A 배치B 배치C 를 한꺼번에 전송하고 있다고 가정해보자. 배치A 의 전송이 실패하고, 배치B 와 배치C 는 전송이 성공했다면 이후 전송이 재시도된 배치A 가 가장 마지막 순서로 브로커에 저장될 것이다. 따라서 브로커 저장 순서가 BCA 가 되는 것이다. 프로듀서의 max.in.flight.requests.per.connection 는 블로킹 없이 한 커넥션에서 전송할 수 있는 최대 전송적인 요청 개수를 의미한다. 이 값이 1보다 크면 재시도 시점에 따라 메시지 순서가 바뀔 수 있으므로 만일 전송 순서가 중요하다면 이 값을 1로 지정해야 한다.


컨슈머

다음은 컨슈머로 토픽 파티션에서 레코드를 조회하는 코드의 예시이다.

// Properties 설정지정(서버, 그룹id, 역직렬화 등 지정)
Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost9092");
prop.put("group.id", "group1");
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.comon.serialztion.StringDeserializer");

// 설정을 이용하여 KafkaConsumer  객체 생성
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, Strng>(prop);
// KafkaConsumer의 subscribe 메서드 호출 (구독할 토픽목록을 전달)
consumer.subscribe(Collections.singleton("simple")); // 토픽 구독

// 특정 조건을 충족하는 동안 consumer의 poll메소드 호출 : 일정시간동안 대기하다가
// 브로커로부터 컨슈머레코드 목록을 읽어옴
// 읽어온 목록을 루프를 돌며 사용함
while(조건) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMilis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value() + "." + record.topic() + "."  + record.partition() + "." + record.offset());
    }
}

// 사용 완료. 종료
consumer.close();

 

 

파티션의 개수와 과 컨슈머의 개수

컨슈머는 컨슈머 그룹 단위로 파티션이 할당된다. 앞서 언급했던 것과 같이 한 그룹에서 하나의 파티션은 두 개 이상의 컨슈머가 공유할 수 없다. 따라서 만일 처리량이 떨어져 컨슈머의 개수를 증가하게 된다면 파티션의 개수도 함께 늘려야 한다. 그렇지 않으면 일하지 않는 컨슈머가 생기게 된다.

 

커밋과 오프셋

카프카의 파티션으로부터 메시지를 가져오는 데 사용되는 poll 메서드는 레코드에 저장된 offset 이후의 레코드를 읽어오고 마지막으로 읽은 레코드의 offset을 커밋한다. 

 

 오프셋이란 특정 파티션 내에서 할당된 고유한 순서 번호이다. 컨슈머는 각 파티션에서 오프셋을 이용하여 마지막으로 읽은 메시지를 확인하여 순차적으로 메시지를 읽어올 수 있도록 도우며 어디까지 메시지를 처리했는지 기록할 수 있다.

 

 

따라서 카프카의 파티션으로부터 메시지를 가져오는데 사용되는 poll 메소드는 이전에 커밋한 오프셋이 있다면 해당 오프셋 이후의 메시지를 읽어오고 마지막으로 읽어온 레코드의 offset을 커밋한다. 만일 최초 접근이거나 커밋된 오프셋이 없는 경우에는 설정에 따라 어떤 메시지를 읽어올지 결정한다. 가장 처음 오프셋을 사용하고 싶다면 earliest, 가장 마지막 오프셋을 사용하고 싶다면 latest (기본값)을 설정하고 만약 컨슈머 그룹에 대한 이전 커밋이 없을 때 익셉션을 발생시키고 싶다면 none을 주면 된다. 참고로 none 은 자주 사용되지 않는다고 한다.

// 오프셋이 없는 경우 가장 최근 메시지를 읽도록 latest 설정
prop.put("auto.offset.reset", "latest");

 

 

컨슈머 주요 조회 설정

그 밖의 컨슈머의 조회방식에 비교적 크게 영향을 미치는 설정들은 다음과 같다.

 

 fetch.min.bytes는 조회 시 브로커가 전송할 최소 데이터의 크기를 지정하는 데 사용되는 설정이다. 이 설정값 이상의 데이터가 쌓일 때까지 브로커는 대기하고 있다가 데이터를 준다. 따라서 해당 값이 크다면 대기시간은 늘어나겠지만 처리량은 증가하는 효과를 볼 수 있다.

 

 fetch.max.wait ms는  조회 시 브로커가 전송할 데이터가 최소 크기가 될 때까지 기다릴 시간을 지정하는데 사용되는 설정다. 기본값은 500이며 poll() 메소드의 대기시간과는 다르다. 해당 옵션은 브로커 리턴할 때까지의 대기 시간을 의미한다.

 

max.partition.fetch.bytes는 파티션 당 서버가 리턴할 수 있는 최대 크기를 지정하는 옵션으로 기본값은 104857(1MB)이며 최대 크기가 넘어가면 바로 리턴한다.

 

enable.auto.commit은 커밋의 자동여부를 설정하는 옵션이다. false를 주면 수동으로 커밋을 실행할 수 있고 true(기본값)를 주면 일정 주기로 컨슈머가 읽은 오프셋을 커밋한다. 이 자동 커밋 주기는 auto.commit.intervals.ms 옵션으로 지정할 수 있다. 기본값은 5000(5초)이며 poll(), close() 메서드 호출 시에도 자동으로 커밋이 실행된다.


수동커밋

그렇다면 수동 커밋을 설정한 경우에는 어떻게 커밋을 해야 할까? 

수동 커밋의 방식은 크기 동기 커밋, 비동기 커밋으로 나누어 볼 수 있다.

 

다음은 commitSync()를 통한 수동 커밋이다. 참고로 이 경우엔 커밋에 실패한 경우에만 Exception이 발생한다.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
    // 처리
}
try {
    consumer.commitSync();
} catch (Exception ex) {
    // 커밋 실패시 에러 발생
}

 

다음은 commitAsync()를 통한 비동기 커밋이다. 비동기이므로 코드 자체에서 실패 여부를 알 수 없으며, 성공 실패 여부를 알고 싶다면 콜백 객체를 받아서 처리해야 한다.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
    // 처리
}
consumer.commitAsync(); // commitAsync(OffsetCommitCallback callback)

 

 

 

재처리와 순서 

만일 컨슈머가 메시지를 정상적으로 메시지를 읽어오지 못했다면 재처리를 해야 한다. 메시지를 재처리해야 할 때는 몇 가지 유의해야 할 사항이 있다. 

 

 컨슈머가 동일한 메시지를 조회해 올 가능성이 존재한다. 일시적으로 커밋이 실패하거나, 새로운 컨슈머를 추가 혹은 삭제하하여 리밸런스가 발생하는 경우, 컨슈머가 동일한 메시지를 조회해 올 가능성이 있다. 따라서 컨슈머는 멱등성(idempotence)을 고려해야 한다. 멱등성이란 어떤 작업이 한 번 이상 적용되더라도 결과가 변하지 않는 특성을 의미하는데 말하자면 동일한 작업을 여러 번 수행하더라도 처음 한 번만 수행한 것과 동일한 결과를 얻어야 한다는 말이다. 예를 들어,  '조회수 1 증가 -> 좋아요 1 증가 -> 조회수 1 증가' 의 메시지를 재처리할 때, 멱등성을 고려하지 않는다면 재처리 후 조회수가 2가 되는 것이 아닌 4 라는 결과를 받을 위험이 존재한다는 것이다.

 


컨슈머-브로커  연결 유지 관리

 카프카는 컨슈머와 브로커 간의 연결을 유지하고 관리하기 위해 세션 타임아웃과 하트비트를 이용한다.

 

  세션 타임아웃은 컨슈머와 브로커 간의 세션을 유지하는 시간을 의미한다. 세션 타임아웃으로 지정한 시간 동안 컨슈머가 브로커에게 하트비트를 보내지 않으면 브로커는 컨슈머에 이상이 있다고 판단하여 컨슈머를 세션에서 제거하고 재할당한다. 하트비트는 컨슈머가 브로커에게 활성 상태임을 알리는 신호 메시지로 주기적으로 브로커에게 하트비트 메시지를 보내 연결이 활성 상태임을 알린다. 브로커는 이 하트비트를 수신하면 해당 컨슈머의 세션을 유지하고 세션 타임아웃을 재설정한다. 

 

 이러한 매커니즘은 컨슈머가 동작하지 않거나 브로커와의 연결이 끊어진 경우를 대비하는데 효과적으로 작용하며 카프카에서 컨슈머의 신뢰성과 안정성을 유지할 수 있다.

 

세션 타임아웃 시간을 설정하는 옵션은 session.timemout.ms (기본값 10초) 이며, 하트비트의 전송 주기를 설정하는 옵션은 heartbeat.intervals.ms (기본값 3초) 이다. 하트비트의 전송 주기는 세션 타임아웃 시간의 1/3 이하로 설정하는 것이 권장된다고 한다. 또한 max.poll.intervals.ms 옵션을 통해 최대 호출 간격을 지정할 수 있는데, 여기서 지정한 시간이 초과될 때까지 poll() 메서드를 사용하지 않으면 해당 컨슈머를 컨슈머그룹에서 삭제하고 리밸런스를 진행한다.


종료처리

 

컨슈머는 주로 무한루프 내부에서 poll() 메서드로 레코드를 읽어오는 형식으로 코드가 작성된다. 이 루프를 벗어나는 것이 관건인데, 다른 쓰레드에서  wakeup() 메서드를 호출해서 안정적으로 컨슈머 종료 처리가 가능하다. 즉, 다른 쓰레드에서 wakeup() 메서드가 호출되면 루프를 돌고 있던 컨슈머의 poll() 메서드가 WakeupException을 발생시키고 예외처리를 통해 close()  메서드로 컨슈머를 종료할 수 있다. 다음은 앞에서 소개한 방법의 코드 예시이다.

KafkaConsumer <String, String> consumer = new KafkaConsumer<String, String> (prop);
consumer.subscribe(Collections.singletone("simple"));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSecons(1)); // wakeup() 호출 시 익셉션 발생
        // ... records 처리
        try {
            consumer.commitAsync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
} catch (Exception ex) {
    // ...
} finally {
    consumer.close();
}

 

 

컨슈머는 쓰레드에 안전하지 않다. 따라서 종료를 위한 wakeup() 메서드를 제외하고 여러 쓰레드에서 동시에 사용하지 않는 것이 권장된다.

 

 

참고 자료

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

 

아파치 카프카 - 위키백과, 우리 모두의 백과사전

위키백과, 우리 모두의 백과사전. 아파치 카프카(Apache Kafka)는 아파치 소프트웨어 재단이 스칼라로 개발한 오픈 소스 메시지 브로커 프로젝트이다. 이 프로젝트는 실시간 데이터 피드를 관리하

ko.wikipedia.org

 

 

 

아파치 카프카란 무엇일까?

백엔드 아키텍처에서 자주 보였던 플랫폼인 아파치 카프카가 무엇인지 궁금했다. 포춘 100대 기업에서 80개 기업 이상이 도입할 정도이고, 국내 서비스 기업들도 많이 사용하고 있다. 카프카가

ssdragon.tistory.com

 

 

카프카(Kafka)의 이해

대용량 게임로그 수집을 위해 Elastic Stack을 도입하게 되었는데, 중간에 버퍼역할(메시지큐)을 하는 Kafka에 대서 알아보려고 한다. 메시지큐? 메시지 지향 미들웨어(Message Oriented Middleware: MOM)은 비

team-platform.tistory.com

 

 

 

아직 아무것도 모른 주니어 개린이가 정리한 글입니다. ( •́ ̯•。̀ )

만일 잘못 이해했거나 서술한 부분은 댓글로 한 수 가르쳐 주신다면 정말 감사드리겠습니다.

728x90
반응형
LIST