• Kafka - 프로듀서(Prosucer)

    2025. 1. 17.

    by. Daramu

    카프카에서 프로듀서는 카프카 클러스터에 메시지를 전달하는 역할을 한다.

    크게 프로듀서 속성 정의, 프로듀서 생성, 데이터 전송, 프로듀서 종료 순으로 진행한다.

     

    중간에 시리얼라이저(Serializer)가 있는데, 이건 카프카로 메시지를 보낼때, Key:Value 쌍으로 메시지를 보낸다.

    이때 메시지는 입력값으로 보내는 것이 아닌, Byte값으로 변하여 전송이 되는데 그때 변환할 형식을 의미한다.

    가령 key값이 1,2,3이라면 Int도 상관없지만, 문자열(test_1, Log....etc)라면 String을 사용한다.

     

    그렇게 정의된 각각의 시리얼라이저를 Producer에 담아 보낸다.

    public class ProducerDemo{
    private Static final Logger log = LoggerFactory.getLogger(ProducerDemo.class.getSimpleName());
    public static void main(String[] args){
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers","127.0.0.1:9092");
    properties.setProperty("key.serializer",StringSerializer.call.getName());
    properties.setProperty("value.serializer", StringSerializer.call.getName());
    KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
    //create a Producer Record
    //아래는 demo_java 토픽으로 send_value 메시지 전달.
    ProducerRecord<String,String> producerRecord = new ProducerRecord<>("topic_name", "send_value");
    //send
    producer.send(producerRecord);
    //flush & close
    producer.flush();
    producer.close();
    }
    }

     

     

    Producer CallBack

    메시지를 보낼때 콜백을 정의함으로써 Log로 보내는 메시지의 메타데이터를 Log로 찍어볼 수 있다.

    public class ProducerDemo{
    private Static final Logger log = LoggerFactory.getLogger(ProducerDemo.class.getSimpleName());
    public static void main(String[] args){
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers","127.0.0.1:9092");
    properties.setProperty("key.serializer",StringSerializer.call.getName());
    properties.setProperty("value.serializer", StringSerializer.call.getName());
    KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
    //create a Producer Record
    //아래는 demo_java 토픽으로 send_value 메시지 전달.
    ProducerRecord<String,String> producerRecord = new ProducerRecord<>("topic_name", "send_value");
    //send
    producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e){
    //executes every time a record successfully sent for exception
    if(e==null}{
    //the record was successfullty sent
    log.info("Received new metadata \n" +
    "Topic: " + metadata.topic() + "\n" +
    "Partition: " + metadata.partition() + "\n" +
    "Offest: " + metadata.offset() + "\n" +
    "timestamp: " + metadata.timestamp());
    }else {
    log.error("Error while Production: ", e);
    }
    }
    });
    //flush & close
    producer.flush();
    producer.close();
    }
    }

     

     

    그리고 메시지를 보낼때 가장 위에 있듯이 key값을 정할 수 있는데,

    key값을 정한다면 Kafka에서 내부적으로 테이블을 만들고 해당하는 key는 언제나 동일한 파티션으로 보내진다.

     

    가령 key값으로 for문을 돌려 id_1, id_2, id_3... 이렇게 만들고 보냈다면,

    id_1 이 1번 파티션, id_2가 3번 파티션, id_3이 2번 파티션으로 보내진다면 동일한 id에 관해 동일한 파티션으로 보내진 다는 의미이다.

     

    아래가 아니라 위처럼 단순 topic, value 로 보내 id값이 null이라면 자동으로 스티키 세션처럼 일정량, 일정시간 동안 메시지를 묶어 파티션으로 전송한다.

    public class ProducerDemo{
    private Static final Logger log = LoggerFactory.getLogger(ProducerDemo.class.getSimpleName());
    public static void main(String[] args){
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers","127.0.0.1:9092");
    properties.setProperty("key.serializer",StringSerializer.call.getName());
    properties.setProperty("value.serializer", StringSerializer.call.getName());
    KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
    //create a Producer Record
    //아래는 demo_java 토픽으로 send_value 메시지 전달.
    //3개가 된다면 topic,key,value 순이다.
    //(topic_name,key,value)
    ProducerRecord<String,String> producerRecord = new ProducerRecord<>("topic_name", "send_value");
    //send
    producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e){
    //executes every time a record successfully sent for exception
    if(e==null}{
    //the record was successfullty sent
    log.info("Received new metadata \n" +
    "Topic: " + metadata.topic() + "\n" +
    "Partition: " + metadata.partition() + "\n" +
    "Offest: " + metadata.offset() + "\n" +
    "timestamp: " + metadata.timestamp());
    }else {
    log.error("Error while Production: ", e);
    }
    }
    });
    //flush & close
    producer.flush();
    producer.close();
    }
    }

    댓글