AWS

[AWS] Amazon Kinesis Data Firehose (3) - JAVA 연동

zamezzz 2023. 1. 17. 09:51

지난 AWS 콘솔에서 Firehose 설정을 마치고, 이번에는 실제 Java에서 연동하는 방법에 대해 간단히 예제를 정리해보겠습니다.

 

먼저 가장 처음 해야 할 일은 aws sdk를 설정해야 합니다.

implementation 'software.amazon.awssdk:firehose:2.18.16'

build.gradle에 추가해주고, 별도의 라이브러리 설정은 필요하지 않습니다.

 

그리고 다음으로는 kinesis 연결을 하고 데이터를 추가하는 함수를 구현해야 합니다.

 

여기서는 awsCredentials을 위한 key가 필요하니 먼저 준비해주시면 됩니다.

 

연결 예제 코드는 아래와 같습니다.

 

public String putRecordRequest(String recordValue, String streamName) {
    // ACCESS_KEY, SECRET_KEY 별도 입력
    AwsCredentials awsCredentials = AwsBasicCredentials.create(ACCESS_KEY, SECRET_KEY);
    StaticCredentialsProvider staticCredentialsProvider = StaticCredentialsProvider.create(awsCredentials);

    FirehoseClient firehoseClient = software.amazon.awssdk.services.firehose.FirehoseClient.builder()
        .region("Region.AP_NORTHEAST_2")
        .credentialsProvider(staticCredentialsProvider)
        .build();

    String recordId = null;

    try {
        SdkBytes sdkBytes = SdkBytes.fromByteArray((recordValue + "\n").getBytes());
        Record record = Record.builder()
            .data(sdkBytes)
            .build();

        PutRecordRequest putRecordRequest = PutRecordRequest.builder()
            .deliveryStreamName(streamName)
            .record(record)
            .build();

        PutRecordResponse putRecordResponse = firehoseClient.putRecord(putRecordRequest);
        recordId = putRecordResponse.recordId();
    } catch (FirehoseException e) {
        // Exception Logging
    }

    firehoseClient.close();

    if (recordId != null) {
        return "Success";
    } else {
        return "Fail";
    }
}

 

- AWSCredentials 을 생성

- FirehoseClient 를 통해 연결 설정

- 입력받은 recordValue 로 putRecordRequest를 위한 Record 객체 생성

- Record 값과 StreamName을 함께 사용하여 요청

- 응답값 확인 (recordId)

 

위 함수를 호출하는 코드는 다음과 같습니다.

 

간단하게 json 객체를 만들고 미리 콘솔에서 만들어 둔 StreamName과 함께 호출하시면 됩니다.

JSONObject jsonObject = new JSONObject();

jsonObject.put("recordId", 1);
jsonObject.put("text", "ABCD TEST");

String jsonValue = jsonObject.toString();

String putRecordResult = this.putRecordRequest(jsonValue, "FIREHOSE_STREAM_NAME");

 

이후 1분이 지나면 지정된 S3에 데이터가 쌓임을 확인하실 수 있습니다.

 

간단하게 데이터를 수집해서 S3로 저장까지의 과정을 정리해봤습니다.

 

감사합니다.

반응형