• [자바(JAVA)][카프카(Kafka)] 외부의 데이터를 받는 프로듀서(Producer) 구현

    2025. 2. 3.

    by. Daramu

    자바에서 카프카를 사용하기 위한 방법이다.

    이전 게시글에 있듯이, 카프카는 프로듀서 -> 카프카 클러스터 -> 컨슈머 의 형태로 메시지가 소비된다.

    우선 프로듀서가 메시지를 카프카 클러스터에 넣어야 하는데, 순서대로 진행할 것이다.

     

    아래의 코드는  "https://stream.wikimedia.org/v2/stream/recentchange" 의 데이터를 가져오는 것이다.

    들어가 보면 알겠지만 위키의 새롭게 올라온 내용에 대한 주소로, json 형식으로 우르르 올라오는 것을 알 수 있다.

     

    우선 Producer를 생성한다.

    이전 게시물과 동일하게 부트스트랩 IP와 Port를 지정하고, 시리얼라이저 등록과 topic의 생성등을 진행한다.

    하나 다른 것은 이벤트 핸들러인데, 이벤트 핸들러의 "WikimediaChangeHandler"로 producer와 topic을 보낸다.

    topic은 정의되어있고, producer와 함께 " WikimediaChangeHandler" 로 전달되며 생성자를 통해 초기화가 진행된다.

     

    public static void main(String[] args) throw InterruptedException{
    String bootstrapServers = "127.0.0.1:9092";
    //Create Producer Properties
    Properties properties = new Properties();
    //서버와 메시지를 보낼때 Byte 변환 형식 지정
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerialzer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerialzer.class.getName());
    //설정한 properties를 통해 Producer 생성
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    //topic 생성
    String topic = "wikimedia.recentchage";
    //이벤트 핸들러
    EventHandler eventHandler = new WikimediaChangeHandler(produer, topic);
    String url = "https://stream.wikimedia.org/v2/stream/recentchange";
    EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));
    EventSource eventSource = builder.build();
    //프로듀서 시작
    eventSource.start();
    //해당 코드는 일반적인 JAVA 어플리케이션 환경에서의 예시를 위해 작성되었다.
    //스프링 부트등의 환경이라면 서버 어플리케이션이므로 서버가 실행되는 동안 메인 스레드는 살아있다.
    //다만 일반적인(바닐라) JAVA라면 EventSource는 별도의 백그라운드 스레드에서 실행되고, 메인 스레드는 직후 종료될 수 있다.
    //그렇기에 sleep을 통해 메인 스레드를 10분동안 잠시 멈춰서 종료되지 않게 막는 것이다.
    //즉 아래의 sleep은 스프링 부트 환경이라면 필요치 않다.
    TimeUnit.MINUTES.sleep(10);
    }

     

     

    그리고 아래는 " WikimediaChangeHandler " 의 코드로 여기서 eventHandler 부분이 중요하다.

    onmessage는 event와 message를 매개변수로 받는다. 이 매개변수는 EventSource가 데이터를 받을때 자동으로 채워지며, 일반적으로 각각 이벤드의 타입과 데이터가 들어간다.

     

    즉, Main의 EventSource는 이벤트가 발생할때마다 message와 event를 아래의 핸들러의 onMessage 메서드로 자동으로 전달된다는 것이다. 그리고 onMessage는 작성되어 있는대로 send를 통해 카프카 클러스터로 메시지를 보내게 된다.

    public class WikimediaChangeHandler implements EventHandler{
    KafkaProducer<String,String> kafkaProducer;
    String topic;
    private final Logger log = LoggerFactory.getLogger(WikimediaChangeHandler.class.getSimpleName());
    public WikimediaChangeHandler(KafkaProducer<String,String> kafkaProducer, String topic){
    this.kafkaProducer = kafkaProducer;
    this.topic = topic;
    }
    @Override
    public void onOpen() Throws Exception{
    //main 에서 실행하기에 필요치 않음
    }
    @Override
    public void onClosed() Throws Exception{
    //종료시 프로듀서 또한 종료
    kafkaProducer.close();
    }
    @Override
    public void onMessage(String event, MessageEvent messageEvent) Throws Exception{
    log.info(messageEvent.getData());
    //비동기 처리
    kafkaProducer.send(new ProducerRecord<>(topic, messageEvent.getData()));
    }
    @Override
    public void onCommit(String commit) Throws Exception{
    }
    @Override
    public void onError(Throkwable t){
    log.error("Error in Stream Reading", t);
    }
    }

     

     

    요약:

    main부분에는 단지 producer와 topic만 핸들러의 생성자로 전달하여 핸들러 객체 초기화.

    EventSource가 onMessage를 호출할 때, messageEvent와 event를 자동으로 전달.

    이벤트 발생시 onMessage 메서드가 실행되고, 이 메서드는 받은 데이터를 Kafka로 전달.

     

    main부분에 topic과 producer 만 넘겨주는 이유는 EventSource와 EventHandler가 이벤트를 발생시키고 처리시키는 구조이기 때문.

    실제로 데이터를 보내는 것은 onMessage 메서드.

     

    댓글