-
카프카에서 프로듀서는 카프카 클러스터에 메시지를 전달하는 역할을 한다.
크게 프로듀서 속성 정의, 프로듀서 생성, 데이터 전송, 프로듀서 종료 순으로 진행한다.
중간에 시리얼라이저(Serializer)가 있는데, 이건 카프카로 메시지를 보낼때, Key:Value 쌍으로 메시지를 보낸다.
이때 메시지는 입력값으로 보내는 것이 아닌, Byte값으로 변하여 전송이 되는데 그때 변환할 형식을 의미한다.
가령 key값이 1,2,3이라면 Int도 상관없지만, 문자열(test_1, Log....etc)라면 String을 사용한다.
그렇게 정의된 각각의 시리얼라이저를 Producer에 담아 보낸다.
public class ProducerDemo{private Static final Logger log = LoggerFactory.getLogger(ProducerDemo.class.getSimpleName());public static void main(String[] args){Properties properties = new Properties();properties.setProperty("bootstrap.servers","127.0.0.1:9092");properties.setProperty("key.serializer",StringSerializer.call.getName());properties.setProperty("value.serializer", StringSerializer.call.getName());KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//create a Producer Record//아래는 demo_java 토픽으로 send_value 메시지 전달.ProducerRecord<String,String> producerRecord = new ProducerRecord<>("topic_name", "send_value");//sendproducer.send(producerRecord);//flush & closeproducer.flush();producer.close();}}Producer CallBack
메시지를 보낼때 콜백을 정의함으로써 Log로 보내는 메시지의 메타데이터를 Log로 찍어볼 수 있다.
public class ProducerDemo{private Static final Logger log = LoggerFactory.getLogger(ProducerDemo.class.getSimpleName());public static void main(String[] args){Properties properties = new Properties();properties.setProperty("bootstrap.servers","127.0.0.1:9092");properties.setProperty("key.serializer",StringSerializer.call.getName());properties.setProperty("value.serializer", StringSerializer.call.getName());KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//create a Producer Record//아래는 demo_java 토픽으로 send_value 메시지 전달.ProducerRecord<String,String> producerRecord = new ProducerRecord<>("topic_name", "send_value");//sendproducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception e){//executes every time a record successfully sent for exceptionif(e==null}{//the record was successfullty sentlog.info("Received new metadata \n" +"Topic: " + metadata.topic() + "\n" +"Partition: " + metadata.partition() + "\n" +"Offest: " + metadata.offset() + "\n" +"timestamp: " + metadata.timestamp());}else {log.error("Error while Production: ", e);}}});//flush & closeproducer.flush();producer.close();}}그리고 메시지를 보낼때 가장 위에 있듯이 key값을 정할 수 있는데,
key값을 정한다면 Kafka에서 내부적으로 테이블을 만들고 해당하는 key는 언제나 동일한 파티션으로 보내진다.
가령 key값으로 for문을 돌려 id_1, id_2, id_3... 이렇게 만들고 보냈다면,
id_1 이 1번 파티션, id_2가 3번 파티션, id_3이 2번 파티션으로 보내진다면 동일한 id에 관해 동일한 파티션으로 보내진 다는 의미이다.
아래가 아니라 위처럼 단순 topic, value 로 보내 id값이 null이라면 자동으로 스티키 세션처럼 일정량, 일정시간 동안 메시지를 묶어 파티션으로 전송한다.
public class ProducerDemo{private Static final Logger log = LoggerFactory.getLogger(ProducerDemo.class.getSimpleName());public static void main(String[] args){Properties properties = new Properties();properties.setProperty("bootstrap.servers","127.0.0.1:9092");properties.setProperty("key.serializer",StringSerializer.call.getName());properties.setProperty("value.serializer", StringSerializer.call.getName());KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//create a Producer Record//아래는 demo_java 토픽으로 send_value 메시지 전달.//3개가 된다면 topic,key,value 순이다.//(topic_name,key,value)ProducerRecord<String,String> producerRecord = new ProducerRecord<>("topic_name", "send_value");//sendproducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception e){//executes every time a record successfully sent for exceptionif(e==null}{//the record was successfullty sentlog.info("Received new metadata \n" +"Topic: " + metadata.topic() + "\n" +"Partition: " + metadata.partition() + "\n" +"Offest: " + metadata.offset() + "\n" +"timestamp: " + metadata.timestamp());}else {log.error("Error while Production: ", e);}}});//flush & closeproducer.flush();producer.close();}}'DevOps > Kafka' 카테고리의 다른 글
[자바(JAVA)][카프카(Kafka)] 프로듀서(Producer) 설정(Acks, Retries) (0) 2025.02.03 [자바(JAVA)][카프카(Kafka)] 외부의 데이터를 받는 프로듀서(Producer) 구현 (0) 2025.02.03 Kafka - 기초 설정 (0) 2025.01.17