-
프로듀서에 대한 설정은 아래의 링크 참고
[자바(JAVA)][카프카(Kafka)] 외부의 데이터를 받는 프로듀서(Producer) 구현
자바에서 카프카를 사용하기 위한 방법이다.이전 게시글에 있듯이, 카프카는 프로듀서 -> 카프카 클러스터 -> 컨슈머 의 형태로 메시지가 소비된다.우선 프로듀서가 메시지를 카프카 클러스터에
daramu.tistory.com
컨슈머에 대한 설정은 여러가지가 있지만 이번에 진행할 것은 컨슈머로 OpenSearch를 사용할 때의 예제이다.
OpenSearch는 Docker로 컨테이너로 올리는 환경이며, 그 파일은 아래와 같다.
version: '3.7'services:opensearch:image: opensearchproject/opensearch:1.2.4environment:discovery.type: single-nodeplugins.security.disabled: "true" # disable https and loginscompatibility.override_main_response_version: "true"ports:- 9200:9200- 9600:9600 # required for Performance Analyzer# console at http://localhost:5601/app/dev_tools#/consoleopensearch-dashboards:image: opensearchproject/opensearch-dashboards:1.2.0ports:- 5601:5601environment:OPENSEARCH_HOSTS: '["http://opensearch:9200"]'DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"9200 포트는 OpenSearch고, 일종의 데이터베이스 혹은 수집기라 봐도 무방하다.
그리고 DashBoard는 우리가 UI로 보는 화면이며, 위의 DockerFile을 적용시 localhost:5601 을 통해 접근 가능하다.
아래의 코드는 컨슈머(Consumer)에 대한 코드로,
의존성으로 RestHighLevelClient 이 필요하다.
의존성 링크: https://mvnrepository.com/artifact/org.opensearch.client/opensearch-rest-high-level-client
public static RestHighLevelClient createOpenSearchClient() {String connString = "http://localhost:9200";// we build a URI from the connection stringRestHighLevelClient restHighLevelClient;URI connUri = URI.create(connString);// extract login information if it existsString userInfo = connUri.getUserInfo();if (userInfo == null) {// REST client without securityrestHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(connUri.getHost(), connUri.getPort(), "http")));} else {// REST client with securityString[] auth = userInfo.split(":");CredentialsProvider cp = new BasicCredentialsProvider();cp.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(auth[0], auth[1]));restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(connUri.getHost(), connUri.getPort(), connUri.getScheme())).setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(cp).setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())));}return restHighLevelClient;}private static KafkaConsumer<String, String> createKafkaConsumer() {String groupId = "consumer-opensearch-demo";// create consumer configsProperties properties = new Properties();properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// create consumerreturn new KafkaConsumer<>(properties);}private static String extractId(String json) {// gson libraryreturn JsonParser.parseString(json).getAsJsonObject().get("meta").getAsJsonObject().get("id").getAsString();}public static void main(String[] args) throws IOException {Logger log = LoggerFactory.getLogger(OpenSearchConsumer.class.getSimpleName());// Create an OpenSearch ClientRestHighLevelClient openSearchClient = createOpenSearchClient();// Kafka Client 생성KafkaConsumer<String, String> consumer = createKafkaConsumer();// 인덱스 생성try (openSearchClient; consumer) {boolean indexExists = openSearchClient.indices().exists(new GetIndexRequest("wikimedia"), RequestOptions.DEFAULT);if (!indexExists) {// 인덱스 생성(없을 경우)CreateIndexRequest createIndexRequest = new CreateIndexRequest("wikimedia");openSearchClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);log.info("Index 생성 완료");} else {log.info("Index 이미 존재함");}// 구독consumer.subscribe(Collections.singleton("wikimedia.recentchange"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));int recordCount = records.count();log.info("Received " + recordCount + " record(s)");for (ConsumerRecord<String, String> record : records) {try {String id = extractId(record.value());IndexRequest indexRequest = new IndexRequest("wikimedia").source(record.value(), XContentType.JSON).id(id);bulkRequest.add(indexRequest);} catch (Exception e) {// Error handling if needed}}}}// CloseopenSearchClient.close();}코드를 보면 알겠지만, 가장 상단의 Localhost:9200은 Docker로 띄운 OpenSearch의 주소이다.
그 외에 각 메소드 및 메소드의 내부적인 동작에 대한 정리이다.
우선 createOpenSearchClient 메소드다.
public static RestHighLevelClient createOpenSearchClient() {String connString = "http://localhost:9200";// we build a URI from the connection stringRestHighLevelClient restHighLevelClient;URI connUri = URI.create(connString);// extract login information if it existsString userInfo = connUri.getUserInfo();if (userInfo == null) {// REST client without securityrestHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(connUri.getHost(), connUri.getPort(), "http")));} else {// REST client with securityString[] auth = userInfo.split(":");CredentialsProvider cp = new BasicCredentialsProvider();cp.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(auth[0], auth[1]));restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(connUri.getHost(), connUri.getPort(), connUri.getScheme())).setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(cp).setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())));}return restHighLevelClient;}해당 메소드는 OpenSearch서버와 연결하기 위한 RestHighLevelClient(의존성 필요)의 설정 및 생성에 대한 메소드이다.
URI(최상단 도커파일을 통해 생성)을 URI 객체로 변환하여 연결하고, 인증을 진행한다.
다음은 createKafkaConsumer 메소드이다.
private static KafkaConsumer<String, String> createKafkaConsumer() {String groupId = "consumer-opensearch-demo";// create consumer configsProperties properties = new Properties();properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// create consumerreturn new KafkaConsumer<>(properties);}해당 메소드는 Consumer에 대한 설정으로, Properties 객체를 사용하여 Consumer에 대한 설정을 진행한다.
여러 설정이 있지만 중요한 것은 Kafka 서버의 주소인 BOOTSTRAP_SERVERS의 정확한 주소와, OFFEST 설정이다.
예시는 latest설정으로 최신 메시지를 소비 하곘다는 설정이다.
마지막으로 extractID메소드이다.
private static String extractId(String json) {// gson libraryreturn JsonParser.parseString(json).getAsJsonObject().get("meta").getAsJsonObject().get("id").getAsString();}해당 메소드는 Json에서 ID를 추출한다.
이 설정을 통해 두개의 중복된 데이터가 있을시 ID를 통해 하나만 받을 수 있게 진행한다.
예시 환경은 기본적인 JAVA환경이기에 gson 의존성을 사용했으나, 알고 있듯이 스프링 부트 사용시 서블릿 컨테이너를 통해 자동적으로 json과 java 객체에 대한 변환이 이루어지기에 굳이 의존성이 필요치 않아 보인다.
그리고 위의 메소드 들을 사용하는 main 메소드이다.
public static void main(String[] args) throws IOException {Logger log = LoggerFactory.getLogger(OpenSearchConsumer.class.getSimpleName());// Create an OpenSearch ClientRestHighLevelClient openSearchClient = createOpenSearchClient();// Kafka Client 생성KafkaConsumer<String, String> consumer = createKafkaConsumer();// 인덱스 생성try (openSearchClient; consumer) {boolean indexExists = openSearchClient.indices().exists(new GetIndexRequest("wikimedia"), RequestOptions.DEFAULT);if (!indexExists) {// 인덱스 생성(없을 경우)CreateIndexRequest createIndexRequest = new CreateIndexRequest("wikimedia");openSearchClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);log.info("Index 생성 완료");} else {log.info("Index 이미 존재함");}// 구독consumer.subscribe(Collections.singleton("wikimedia.recentchange"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));int recordCount = records.count();log.info("Received " + recordCount + " record(s)");for (ConsumerRecord<String, String> record : records) {try {String id = extractId(record.value());IndexRequest indexRequest = new IndexRequest("wikimedia").source(record.value(), XContentType.JSON).id(id);bulkRequest.add(indexRequest);} catch (Exception e) {// Error handling if needed}}}}// CloseopenSearchClient.close();}위에서 설정한 메소드를 통해 Opensearch 클라이언트 및 kafka 컨슈머를 생성한다.
그리고 OpenSearch에 Index가 존재하는지 확인하는 "indexExists" 를 통해 인덱스가 없을 시 생성, 있을 시 그냥 넘어가는 if-else 코드가 존재한다.
그리고 생성 후 종료를 막기위해 백그라운드에서 지속적으로 데이터를 처리하기 위한 while문이 존재한다.
댓글