• [자바(JAVA)][카프카(Kafka)] 소비자(컨슈머/Consumer) OpenSearch

    2025. 2. 4.

    by. Daramu

    프로듀서에 대한 설정은 아래의 링크 참고

    https://daramu.tistory.com/63

     

    [자바(JAVA)][카프카(Kafka)] 외부의 데이터를 받는 프로듀서(Producer) 구현

    자바에서 카프카를 사용하기 위한 방법이다.이전 게시글에 있듯이, 카프카는 프로듀서 -> 카프카 클러스터 -> 컨슈머 의 형태로 메시지가 소비된다.우선 프로듀서가 메시지를 카프카 클러스터에

    daramu.tistory.com

     

    컨슈머에 대한 설정은 여러가지가 있지만 이번에 진행할 것은 컨슈머로 OpenSearch를 사용할 때의 예제이다.

    OpenSearch는 Docker로 컨테이너로 올리는 환경이며, 그 파일은 아래와 같다.

    version: '3.7'
    services:
    opensearch:
    image: opensearchproject/opensearch:1.2.4
    environment:
    discovery.type: single-node
    plugins.security.disabled: "true" # disable https and logins
    compatibility.override_main_response_version: "true"
    ports:
    - 9200:9200
    - 9600:9600 # required for Performance Analyzer
    # console at http://localhost:5601/app/dev_tools#/console
    opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:1.2.0
    ports:
    - 5601:5601
    environment:
    OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
    DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"

     

    9200 포트는 OpenSearch고, 일종의 데이터베이스 혹은 수집기라 봐도 무방하다.

    그리고 DashBoard는 우리가 UI로 보는 화면이며, 위의 DockerFile을 적용시 localhost:5601 을 통해 접근 가능하다.

     

    아래의 코드는 컨슈머(Consumer)에 대한 코드로,

    의존성으로 RestHighLevelClient 이 필요하다.

    의존성 링크: https://mvnrepository.com/artifact/org.opensearch.client/opensearch-rest-high-level-client

    public static RestHighLevelClient createOpenSearchClient() {
    String connString = "http://localhost:9200";
    // we build a URI from the connection string
    RestHighLevelClient restHighLevelClient;
    URI connUri = URI.create(connString);
    // extract login information if it exists
    String userInfo = connUri.getUserInfo();
    if (userInfo == null) {
    // REST client without security
    restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(connUri.getHost(), connUri.getPort(), "http")));
    } else {
    // REST client with security
    String[] auth = userInfo.split(":");
    CredentialsProvider cp = new BasicCredentialsProvider();
    cp.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(auth[0], auth[1]));
    restHighLevelClient = new RestHighLevelClient(
    RestClient.builder(new HttpHost(connUri.getHost(), connUri.getPort(), connUri.getScheme()))
    .setHttpClientConfigCallback(
    httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(cp)
    .setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())
    )
    );
    }
    return restHighLevelClient;
    }
    private static KafkaConsumer<String, String> createKafkaConsumer() {
    String groupId = "consumer-opensearch-demo";
    // create consumer configs
    Properties properties = new Properties();
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // create consumer
    return new KafkaConsumer<>(properties);
    }
    private static String extractId(String json) {
    // gson library
    return JsonParser.parseString(json)
    .getAsJsonObject()
    .get("meta")
    .getAsJsonObject()
    .get("id")
    .getAsString();
    }
    public static void main(String[] args) throws IOException {
    Logger log = LoggerFactory.getLogger(OpenSearchConsumer.class.getSimpleName());
    // Create an OpenSearch Client
    RestHighLevelClient openSearchClient = createOpenSearchClient();
    // Kafka Client 생성
    KafkaConsumer<String, String> consumer = createKafkaConsumer();
    // 인덱스 생성
    try (openSearchClient; consumer) {
    boolean indexExists = openSearchClient.indices().exists(new GetIndexRequest("wikimedia"), RequestOptions.DEFAULT);
    if (!indexExists) {
    // 인덱스 생성(없을 경우)
    CreateIndexRequest createIndexRequest = new CreateIndexRequest("wikimedia");
    openSearchClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
    log.info("Index 생성 완료");
    } else {
    log.info("Index 이미 존재함");
    }
    // 구독
    consumer.subscribe(Collections.singleton("wikimedia.recentchange"));
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
    int recordCount = records.count();
    log.info("Received " + recordCount + " record(s)");
    for (ConsumerRecord<String, String> record : records) {
    try {
    String id = extractId(record.value());
    IndexRequest indexRequest = new IndexRequest("wikimedia")
    .source(record.value(), XContentType.JSON)
    .id(id);
    bulkRequest.add(indexRequest);
    } catch (Exception e) {
    // Error handling if needed
    }
    }
    }
    }
    // Close
    openSearchClient.close();
    }

     

    코드를 보면 알겠지만, 가장 상단의 Localhost:9200은 Docker로 띄운 OpenSearch의 주소이다.

     

    그 외에 각 메소드 및 메소드의 내부적인 동작에 대한 정리이다.

     

    우선 createOpenSearchClient 메소드다.

    public static RestHighLevelClient createOpenSearchClient() {
    String connString = "http://localhost:9200";
    // we build a URI from the connection string
    RestHighLevelClient restHighLevelClient;
    URI connUri = URI.create(connString);
    // extract login information if it exists
    String userInfo = connUri.getUserInfo();
    if (userInfo == null) {
    // REST client without security
    restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(connUri.getHost(), connUri.getPort(), "http")));
    } else {
    // REST client with security
    String[] auth = userInfo.split(":");
    CredentialsProvider cp = new BasicCredentialsProvider();
    cp.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(auth[0], auth[1]));
    restHighLevelClient = new RestHighLevelClient(
    RestClient.builder(new HttpHost(connUri.getHost(), connUri.getPort(), connUri.getScheme()))
    .setHttpClientConfigCallback(
    httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(cp)
    .setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())
    )
    );
    }
    return restHighLevelClient;
    }

     

    해당 메소드는 OpenSearch서버와 연결하기 위한 RestHighLevelClient(의존성 필요)의 설정 및 생성에 대한 메소드이다.

    URI(최상단 도커파일을 통해 생성)을 URI 객체로 변환하여 연결하고, 인증을 진행한다.

     

    다음은 createKafkaConsumer 메소드이다.

    private static KafkaConsumer<String, String> createKafkaConsumer() {
    String groupId = "consumer-opensearch-demo";
    // create consumer configs
    Properties properties = new Properties();
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // create consumer
    return new KafkaConsumer<>(properties);
    }

     

    해당 메소드는 Consumer에 대한 설정으로, Properties 객체를 사용하여 Consumer에 대한 설정을 진행한다.

    여러 설정이 있지만 중요한 것은 Kafka 서버의 주소인 BOOTSTRAP_SERVERS의 정확한 주소와, OFFEST 설정이다.

    예시는 latest설정으로 최신 메시지를 소비 하곘다는 설정이다.

     

    마지막으로 extractID메소드이다.

    private static String extractId(String json) {
    // gson library
    return JsonParser.parseString(json)
    .getAsJsonObject()
    .get("meta")
    .getAsJsonObject()
    .get("id")
    .getAsString();
    }

    해당 메소드는 Json에서 ID를 추출한다.

    이 설정을 통해 두개의 중복된 데이터가 있을시 ID를 통해 하나만 받을 수 있게 진행한다.

    예시 환경은 기본적인 JAVA환경이기에 gson 의존성을 사용했으나, 알고 있듯이 스프링 부트 사용시 서블릿 컨테이너를 통해 자동적으로 json과 java 객체에 대한 변환이 이루어지기에 굳이 의존성이 필요치 않아 보인다.

     

    그리고 위의 메소드 들을 사용하는 main 메소드이다.

    public static void main(String[] args) throws IOException {
    Logger log = LoggerFactory.getLogger(OpenSearchConsumer.class.getSimpleName());
    // Create an OpenSearch Client
    RestHighLevelClient openSearchClient = createOpenSearchClient();
    // Kafka Client 생성
    KafkaConsumer<String, String> consumer = createKafkaConsumer();
    // 인덱스 생성
    try (openSearchClient; consumer) {
    boolean indexExists = openSearchClient.indices().exists(new GetIndexRequest("wikimedia"), RequestOptions.DEFAULT);
    if (!indexExists) {
    // 인덱스 생성(없을 경우)
    CreateIndexRequest createIndexRequest = new CreateIndexRequest("wikimedia");
    openSearchClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
    log.info("Index 생성 완료");
    } else {
    log.info("Index 이미 존재함");
    }
    // 구독
    consumer.subscribe(Collections.singleton("wikimedia.recentchange"));
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
    int recordCount = records.count();
    log.info("Received " + recordCount + " record(s)");
    for (ConsumerRecord<String, String> record : records) {
    try {
    String id = extractId(record.value());
    IndexRequest indexRequest = new IndexRequest("wikimedia")
    .source(record.value(), XContentType.JSON)
    .id(id);
    bulkRequest.add(indexRequest);
    } catch (Exception e) {
    // Error handling if needed
    }
    }
    }
    }
    // Close
    openSearchClient.close();
    }

     

    위에서 설정한 메소드를 통해 Opensearch 클라이언트 및 kafka 컨슈머를 생성한다.

    그리고 OpenSearch에 Index가 존재하는지 확인하는 "indexExists" 를 통해 인덱스가 없을 시 생성, 있을 시 그냥 넘어가는 if-else 코드가 존재한다.

     

    그리고 생성 후 종료를 막기위해 백그라운드에서 지속적으로 데이터를 처리하기 위한 while문이 존재한다.

     

    댓글