DevOps/Kafka

[Kafka] 카프카 개요(카프카 토픽/프로듀서의 역할과 개념)

didue 2023. 11. 20. 13:09
반응형

Apache Kafka

  • 개요
    소스/타겟 애플리케이션 증가로 데이터 전송라인 복잡 및 파편화 이슈 발생
    이에 따른 유지보수 어려움과 복잡함을 해결하기위해 나옴

[이미지출처 데브원영]

kafka에서 queue와 같은 역할을 하는 topic에 데이터를 넣는 역할은 producer가, 가져가는 역할은 consumer가 한다.(producerconsumer는 라이브러리 형태로 제공)

카프카는 낮은 지연과 높은 처리량을 지원하기때문에 빅(Big) 데이터 처리에는 kafka를 도입하는게 유리함

 

Kafka Topic

카프카에는 다양한 데이터가 들어갈 수 있는데, 그 공간을 topic이라고 함 (AMQP와는 동작 방식이 상이함)

 

kafka에서는 topic을 여러개 생성할 수 있는데, table과 파일시스템의 폴더와 유사한 성질을 가짐
topicproducer가 넣고 consumer가 가져가는 형태로 이용됨.
목적에 따라 이름이 부여되는데 클릭로그, send_sms, location_log 등 이용하는 곳을 정확히 명시하는게 좋음

 

 

[이미지출처 데브원영]

하나의 topic은 여러개의 partition으로 구성될 수 있으며, 파티션의 번호는 0번부터 시작하여 queue와 같이 데이터가 partition 끝에서부터 저장됨.

 

 

consumer는 파티션의 데이터가 오래된 순서부터 가져가게 되는데, 이때 consumerrecord를 가져가도 데이터가 삭제되지 않음.

[이미지출처 데브원영]

partition이 늘어나면 추가 데이터를 저장하는 방식은 producer가 데이터를 보낼때 지정하는 key로 구분된다.

  • key가 null이고, 기본 파티셔너 사용할 경우 =>라운드로빈(Round robin)으로 할당
  • key가 있고, 기본 파티셔너 사용할 경우 => 키의 Hash 값을 구하고 특정 파티션에 할당

 

이때 partition은 늘릴 수 있지만 줄일 수 없다. 단 partitionrecord는 옵션에 따라 일정 기간 혹은 용량동안 저장된 뒤 데이터가 삭제될 수 있도록 설정할 수 있다.

 

 

Kafka Producer

 

producer 역할

1. 직렬화(serialization) : 직렬화는 지정된 설정을 통해 Message key와 value를 어떤  byte array로 변환할지 결정

2. 파티셔닝(Partioning) : 어떤 partition으로 record를 보낼지 결정

3. 압축(Compression) : record 압축설정이 되어있을 경우 설정된 포맷으로 압축을 수행하는데, 예를 들어 snappy나 iz4와 같은 포맷으로 진행

4. 메시지 배치(Message Accumulator) : 설정값에 의거하여 record를 queue에 저장했다가 한번에 broker로 전달

5. 전달(Sender) : 실제 broker에게 보내야하는 record들은 record batch에 의해 전송

 

  • topic에 해당하는 메세지를 생성
  • 특정 topic으로 데이터를 publish
  • kafka로 처리 실패/재시도
  • kafka-clients와 kafka broker의 하위호환성이 완벽하지 않으니 호환성 확인 필요

 

카프카 프로듀서로 데이터 전송하기

public class Producer {
    public static void main(String[] args) throws IOException {

        //1) Producer 설정 정의
        Properties configs = new Properties();
        configs.put("bootstrap.servers" , "localhost:9092"); //2개 이상의 브로커 ip, host 설정하기를 권장
        configs.put("key.serializer"    , "org.apache.kafka.common.serialization.StringSerializer"); //key, value를 직렬화 하기 위한 설정
        configs.put("value.serializer"  , "org.apache.kafka.common.serialization.StringSerializer");

        //2) Producer Instance 생성(전송할 객체)
        KafkaProducer<String, String> producer = new KafkaProducer<String, String(configs);

        //3) Record 생성
        //topic, key, value  또는 topic, value로 선택 정의할 수 있다.
        ProducerRecord record = new ProducerRecord<String, String>("topic", "key", "value");

        producer.send(record);
        producer.close();
    }
}

 

 

반응형