우아한형제들의 BUDS(Baemin User Data System) 팀은 배달의민족과 Delivery Hero 고객에게 더 나은 맞춤형 마케팅 경험을 제공하고자 Segmentum이라는 타깃팅 서비스를 개발하여 운영 중입니다.
Segmentum은 그동안 일 단위로 데이터를 갱신해 왔으나, 실시간 마케팅에 대한 요구가 증가함에 따라 실시간으로 데이터를 업데이트해야 할 필요성이 생겼습니다. 이에 따라 실시간으로 데이터 갱신을 위한 데이터 파이프라인 아키텍처를 만들게 되었고, 이를 검증하고자 PoC(Proof of Concept, 개념 증명)를 진행했습니다.
이 글에서는 PoC 과정에서 얻은 경험을 바탕으로 주요 과정과 배운 점을 공유합니다.
다룰 내용은 다음과 같습니다.
- 실시간 마케팅 PoC를 위한 아키텍처 – 전반적인 아키텍처 구조도
- 이벤트 소스 파이프라인 – 실시간 고객 속성 이벤트 인입/처리 방법
- Flink 실행 환경 구성 – Flink를 구성하기 위한 비용과 관리 방안
- 실시간 처리 – Flink 프로젝트 구성
- 부하 테스트 – 전체 파이프라인이 처리할 트래픽 테스트, 최적화
글에 들어가기에 앞서, 현재는 데이터가 어떤 식으로 저장되고 마케팅에 활용되는지 간단하게 설명드리겠습니다.
현재 데이터 처리 시나리오
현재 고객 속성 데이터를 아래와 같이 저장하고 있습니다.
{ "customer_id" : "7935520", # 고객 id "order_days_after_last_order" : 10, # 마지막주문후경과일 "order_count_last_4_weeks" : 1, # 4주이내주문수 ..... }그리고 이러한 고객 속성 데이터를 사용해 “세그먼트”를 만듭니다. 세그먼트란 공통된 속성을 가진 고객들의 집합입니다. 예시로 치킨 카테고리의 주문량을 조사해서 치킨을 자주 시켜 먹는 고객 집단 세그먼트를 만들어둘 수 있습니다. 광고 플랫폼에서는 이 세그먼트의 고객군들을 대상으로 치킨을 더 시켜 먹도록 독려하기 위해 광고를 할 수 있습니다.
실시간 마케팅 상황에서는 고객이 주문을 하거나, 특정 광고를 클릭했을 경우 실시간으로 고객 속성 데이터가 수신되어 업데이트되고, 이 업데이트된 속성 데이터를 활용한 실시간 마케팅을 진행하게 됩니다.
실시간 마케팅 PoC를 위한 아키텍처
PoC를 위해 구성한 파이프라인을 요약하자면 SNS -> Lambda -> Kinesis -> Flink -> DB의 흐름이라고 볼 수 있습니다.
이제 각각의 파이프라인에 대해 설명드리겠습니다.
실시간 데이터는 SNS로 들어옵니다. 여기에 Lambda를 연결해, SNS에 데이터가 들어올 경우 Lambda를 호출하도록 하였습니다. Lambda에서는 이 데이터를 받아서 Kinesis로 전달하고, Kinesis에 있는 데이터를 Flink가 받아서 처리합니다.
Flink 내부에서는 들어온 데이터에서 키값을 추출해 OpenSearch(ElasticSearch)에 이미 데이터가 있는지 확인 요청을 보내고, 데이터가 있으면 갱신합니다. 그리고 갱신한 데이터를 DynamoDB에 씁니다.
이번 PoC의 핵심 목표는 실제 마케팅 요구 사항인 최대 순간 호출량 12k RPS(초당 12,000건 요청) 환경에서, 모든 데이터를 5초 이내 End-to-End Latency로 안정적으로 처리해 파이프라인의 출력을 확인하는 것입니다.
12k RPS 상황을 구현하기 위해 부하 테스트 도구인 Locust를 사용해 SNS에 데이터를 넣으면서 테스트하였습니다.
이벤트 소스 파이프라인
먼저, Flink가 실시간으로 처리할 데이터를 수집하는 소스 파이프라인부터 알아보겠습니다.
이 파트에서는 AWS 자원을 코드로 관리할 수 있는 Terragrunt를 사용했습니다. 가장 먼저 AWS 자원 관리용 GitHub 저장소를 만들고 아래와 같은 프로젝트 구조를 생성했습니다.
envs 폴더에는 AWS의 환경별 변수가 들어가 있고, terraform 폴더에는 관리하고자 하는 자원들이 모듈화되어 나뉘어 들어가 있습니다. 이번에 만들 파이프라인은 segmentum-rt-pipeline 폴더에 정리했습니다.
안에는 메인 로직이 들어간 main.tf가 있습니다. 여기에는 리소스 블록이라고 불리는 자원 정의 코드가 포함되어 있습니다. 아래는 리소스 블록의 예시입니다.
# 변수 선언 locals { lambda_name = "segmentum-lambda" lambda_code_path = "${path.module}/lambda_code.zip" ... } ... # Lambda 생성 resource "aws_lambda_function" "segmentum_lambda" { function_name = local.lambda_name # 생성할 Lambda의 이름 role = aws_iam_role.segmentum_lambda_role.arn # Lambda가 가질 IAM handler = "lambda_code.lambda_handler" # Lambda가 실행시킬 함수 runtime = "python3.10" # Lambda가 사용할 언어 filename = local.lambda_code_path # Lambda가 사용할 코드 경로 source_code_hash = filebase64sha256(local.lambda_code_path) # 소스코드 변환 체크 }위 코드를 환경 변수 파일과 함께 실행했을 때 정상적으로 자원이 생성되면 성공입니다. 실행 시 정의한 자원들이 이미 존재하고 변경 사항이 없을 경우에는 건너뛰므로 계속 리소스 블록을 추가하며 작업했고, AWS 환경들을 코드로 관리하고 자동 생성할 수 있게 되었습니다.
이제 CI를 구성하고자 합니다. CI를 구성하는 이유는 리소스 변경 시 Terragrunt 명령어를 실행시켜 정상 반영 여부를 확인하고 관리하기 위함입니다.
CI는 자동으로 구성된 Drone CI를 사용했습니다. Drone CI를 구성하려면 루트 경로의 drone.yml에 동작들을 명시하면 됩니다.
먼저, 관리 중인 브랜치에 머지될 때 실행되도록 푸시 이벤트를 설정했습니다.
이제 main, staging 브랜치에 머지되면 설정한 명령어가 실행됩니다. Drone에서는 Docker이미지를 사용해 명령어 실행이 가능한데, Terragrunt 이미지를 사용해 명령어를 실행하도록 했습니다.
steps: - name: apply-segmentum-aws-infra-staging image: devopsinfra/docker-terragrunt:aws-tf-1.4.4-tg-0.45.2 environment: <<: *staging_env commands: - rm -rf .terraform ~/.terraform.d - aws configure list - aws sts get-caller-identity - make create-segmentum-rt-pipeline-stg ...이제 관리 중인 브랜치에 머지될 때마다 설정한 리소스 블록들이 실행되며 파이프라인을 구성하게 됩니다.
Flink 실행환경 구성
이제 Flink를 실행할 환경을 만들어야 합니다. 환경 구성으로는 EMR을 만들고, 그 위에 flink를 올리는 방식을 선택하였습니다. Flink의 실행환경 구성에 대해서는 여러 의견이 있었는데 중요도 순서대로 아래의 옵션들이 고려되었습니다.
- 비용이 싼 편이어야 함
- 관리 포인트가 적을수록 좋음
- 팀원들이 관련 기술에 경험이 있으면 좋음
이에 따라 Kubernetes, EMR, AWS KDA가 고려되었으나 Kubernetes 클러스터는 적합한 환경이 존재하지 않아 새로 만들어야 했는데, 오직 Flink만을 위한 Kubernetes 운영 비용이 만만치 않다 생각하여 KDA와 EMR중 하나를 선택하기로 결정하였습니다.
KDA와 EMR에 대해 비용을 조사했는데, KDA는 같은 자원을 사용 시 EMR에 비해 비용이 두 배 이상 비쌌습니다.
아래는 m6g.4xlarge 타입 인스턴스를 기준으로 비교한 표입니다.
타입 | m6g.4xlarge | KPU |
자원 | 1 m6g.4xlarge = 16vCPU, 64GiB | 1 KPU = 1vCPU, 4GiB |
비교 | 1 m6g.4xlarge = 16 KPU | |
가격(시간) | 1 Instance당 USD 0.922 (0.768+0.154, ap-southeast-1) |
1 KPU당 USD 0.138 (ap-southeast-1) |
1달 예상 가격 | USD 663.84 (0.768+0.154) * 24 * 30 |
USD 1589.76 0.138 * 16 * 24 * 30 |
이와 같은 결과로 Flink의 실행환경으로 EMR을 사용하기로 결정하였습니다.
이제 Flink의 코드 업데이트 환경을 만들어야 합니다. 코드를 업데이트할 때 필요한 최소한의 동작은 아래와 같았습니다.
- 현재 상태 저장 후 Flink를 안전하게 종료(graceful shutdown)
- 업데이트된 코드 적용
- Flink가 종료된 지점부터 재시작
- Flink UI 노출
가장 먼저 graceful shutdown은 아래처럼 Flink REST를 사용해 모든 데이터 스트림을 비우고 종료하도록 구현하였습니다.
curl -s -X POST "$FLINK_REST_BASE/jobs/$FLINK_JOB_ID/stop" \ -H "Content-Type: application/json" \ -d "{\"targetDirectory\": \"s3://$BUCKET/cdp/flink/savepoints/\", \"drain\": true}"이제 Flink를 종료된 지점부터 다시 시작하면 업데이트된 코드가 적용되어 실행됩니다.
Flink를 종료된 지점부터 재시작하기 위한 방법론으로는 세이브포인트와 체크포인트를 활용하는 방법이 있습니다. 체크포인트는 일정 기간마다 자동으로 저장되고, 세이브포인트는 사용자가 수동으로 저장 시에 생성됩니다. 이 특성을 고려해 아래처럼 구현합니다.
- 세이브포인트가 존재하는 경우 해당 세이브포인트에서 실행되도록 설정합니다.(정상적으로 종료된 경우)
- 세이브포인트는 존재하지 않지만 체크포인트가 존재하는 경우 가장 최근 체크포인트에서 실행합니다.(비정상적으로 종료된 경우)
- 둘 다 존재하지 않으면 처음 시작되는 것이므로 그냥 시작합니다.
이제 Flink가 잘 실행되었으니 외부에서 Flink UI에 접근 가능하도록 열어줘야 합니다. Flink의 UI를 노출하기 위해서는 Flink가 돌아가는 노드의 주소를 찾아 오픈해 주면 됩니다. 이를 위해 yarn application -list 명령어로 Flink의 REST 주소를 얻은 후, Route53 -> ALB -> Target Group(Flink REST node)와 같은 흐름으로 노출하였습니다.
한 가지 팁으로 Flink의 shutdown후 재시작 시간을 짧게 유지하는 것이 좋았습니다.
예를 들어, Flink 실행에 필요한 jar는 미리 빌드해두는 것이 좋은데, Flink가 중지된 시간이 길수록 처리되지 않은 데이터에서 쌓이는 백로그 이벤트가 많아 재시작 직후 순간 많은 처리량을 요구합니다. 이에 대한 Catch-up시 처리량이 부족한 경우 병목이 생길 가능성이 있습니다.
실시간 처리
실시간 처리를 구현하는 Flink 프로젝트 구성은 아래와 같습니다.
├── src │ ├── main │ │ ├── java │ │ │ └── com.woowahan.ds ... │ │ │ ├── BaseApplication.java │ │ │ ├── buds.segmentum │ │ │ │ └── Application.java │ │ │ │ ├── processing │ │ │ │ │ └── CDPStreamFilter.java │ │ │ │ │ ├── CDPStreamMapper.java ... │ │ └── resources │ │ ├── buds │ │ │ ├── flink_dev.yaml │ │ │ ├── flink_production.yaml │ │ │ ├── flink_staging.yaml ...Flink의 메인 함수 역할을 하는 파일은 Application이며, 환경설정같은 공통 설정을 담은 BaseApplication을 상속한 클래스입니다.
Application은 처리에 필요한 함수들을 별도의 클래스로 작성하여 파이프라인의 비즈니스 로직 구성이 한 눈에 파악되도록 구성하였습니다. 각 함수의 세부로직은 processing, sink 등으로 범주가 나눠진 폴더에서 관리됩니다. 아래는 코드가 어떤 식으로 구성되었는지 예시입니다.
Application.java
... DataStream<Map<String, Object>> transformedStream = deserializedStream .map(new KeyReplacer(delimiter, "__")) // delimiter를 __로 replace .disableChaining() ...Application에서는 전반적인 비즈니스 로직을 파악하고, 각각의 연산을 처리하는 클래스에서는 세부 로직을 파악하도록 구성하였습니다. 개발은 Localstack 등을 활용해 로컬 환경에서 진행했습니다. 아래는 Flink를 개발하면서 당면했던 이슈와 팁입니다.
- 의존성 충돌을 예방하는 shaded jar
- 표준 ObjectMapper
- Flink shaded ObjectMapper
- Flink 옵션 설정 우선순의 주의
자바에는 의존성 충돌을 해결하기 위한 shaded jar가 있습니다. 특정 라이브러리에 맞게 빌드된 jar를 말하는데, Flink에 맞게 빌드된 shaded jar도 존재합니다. Flink를 개발하면서 많은 의존성 에러를 겪을 경우 shaded jar이 있는지 찾아보고 사용하는 게 해결책이 될 수 있습니다.
예로 jackson의 ObjectMapper 예시입니다.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
Flink에서 옵션 설정 시 StreamExecutionEnvironment 객체를 사용할 수 있습니다. 이 객체에는 configure 메서드와 setGlobalJobParameters 메서드가 존재합니다.
configure는 Flink의 체크포인트, state backend등 Flink의 런타임 환경을 설정하고, setGlobalJobParameters는 사용자가 전달한 옵션을 적용할 때 사용합니다. 여기서 주의할 점은, setGlobalJobParameters를 사용해서 Flink의 런타임 환경을 설정하면 옵션이 적용되지 않는다는 점입니다.
예로 체크포인트의 기록별 시간 주기를 설정하는 execution.checkpointing.interval 옵션이 있는데, configure에서 설정하지 않고 setGlobalJobParameters에서 설정하면 제대로 적용되지 않아 많은 혼란이 있었습니다.
부하 테스트
이제 실시간 파이프라인 구성을 완료했으니 처음에 언급한 제약사항인 12kRPS에도 5초 안에 모든 동작이 끝나는지 확인해야 합니다.
부하 테스트에서 사용하는 Locust는 프로세스등의 자원을 인자로 전달해 돌아가는 환경의 자원이 허용하는한 같은 시간내에 요청을 더 많이 보낼 수 있습니다. 간단히 프로그래밍해도 되지만 조금 더 편한 옵션을 제공하기에 Locust를 사용했습니다.
맥북에서 Locust에 최대한의 자원을 할당해 요청을 보내봤습니다. 제 맥북에서는 평균 4~5kRPS가 나왔습니다. 자원이 부족해 사내의 Load Testing툴을 사용해야 하나 고민하다가 3명이 보내면 12k RPS에 도달할 수 있다는 의견에 따라 그렇게 결정하였습니다.
현재 데이터의 흐름은 다음과 같습니다. Locust -> SNS -> Lambda -> Kinesis -> Flink
3명이 요청을 보내며 테스트했는데, 병목구간이 생기기 시작했습니다. SNS로는 데이터가 하나씩 들어옵니다. 그리고 데이터를 받는 Lambda에서도 속도에 맞춰 데이터를 빠르게 넘겨야 하는데 Lambda -> Kinesis 속도가 제대로 나오지 않았습니다.
Lambda에서 Kinesis로의 속도 측정은 CloudWatch로 Kinesis의 IncomingRecords값을 측정하는 방법을 사용하였습니다. CloudWatch가 1분 단위로 집계되므로 60으로 나눠서 평균 RPS를 계산했습니다.
그래프를 보면 최대 9.9kRPS 수준에 머무른 것을 알 수 있습니다.
Lambda의 메모리를 늘리고 호출수를 개선해봤지만 해결되지 않아, Lambda의 제약사항일거라 추측했습니다. 그래서 SNS와 Lambda 사이에 SQS를 넣기로 결정했습니다. 중간에 SQS가 들어갔을 때는 Lambda가 SQS의 데이터를 가져올 때 한 번 호출에 10개씩 묶어서 가져올 수 있는 장점이 있습니다.
그리고 보낼 때도 PutRecords를 사용하면 요청 한번에 10개씩 묶어 보낼 수 있으니 호출수가 줄어 속도가 더 빨라질 것이라 예상했고, 실제로 Lambda의 처리 속도 이슈는 사라졌습니다. 아래 그래프에서 12k 이상 요청도 처리됨을 확인할 수 있습니다.
이후 다시 테스트했을 때, 이번에는 AWS파이프라인이 아닌 Flink에서 이슈가 발생하였습니다.
Flink UI에서는 파이프라인의 부하 상태를 색으로 표시하며, 요청이 많아질수록 빨간색으로 변합니다. 중간 단계에 부하가 집중돼 처리 지연이 발생하는 것을 확인했습니다.
부하가 집중된 부분은 ElasticSearch(ES)에 데이터 존재 여부를 확인하는 단계로, 한 레코드가 한 번씩 요청을 보내니, 조회속도가 요청 속도를 따라가지 못했습니다. 그래서 이 부분도 모아서 처리할 수 있도록 로직을 수정하였습니다.
AS IS
1개의 요청이 들어옴 -> 1개 요청에 대해 ES에 search 요청을 1번 보내 데이터가 존재하는지 확인
TO BE
2초 동안 들어온 요청을 모아둠 -> 들어온 요청들을 ES의 msearch 요청을 1번 날려 각각 데이터가 존재하는지 확인
그리고 스큐(Skew) 비율이 높아 레코드 리밸런싱을 하였으며, ES의 CPU 사용률이 높지 않아 ES의 호출을 늘려도 되겠다고 판단했습니다. 이에 따라 Flink의 병렬성(parallelism)을 높였습니다. Flink의 병렬성을 높인다는 것은 작업을 실행하는 Operator의 subtask 수를 늘리는 개념으로 scale out과 비슷합니다.
이렇게 개선한 뒤 부하가 어느정도 해소되었음을 확인할 수 있습니다.
이제 12kRPS의 기준은 충족했는데, End to End갱신시간이 5초 내가 되는지도 확인해야합니다. 시간 측정은, Locust로 데이터를 보낼 때 현재 시간을 포함하고, 처리 완료 시점에 [현재 시간 – 데이터 내의 시간]으로 소요 시간을 계산했습니다.
Flink에서는 아래처럼 사용자 정의 메트릭을 등록할 수 있습니다.
LatencyGauge.java
public class LatencyGauge extends RichMapFunction<Map<String, Object>, Map<String, Object>> { ... public LatencyGauge(String keyToExtract) { this.keyToExtract = keyToExtract; } @Override public void open(OpenContext openContext) { diffGauge = getRuntimeContext() .getMetricGroup() .gauge("ProcessLatency", this::computeTimeDiff); } private Long computeTimeDiff() { if (lastTimestampValue <= 0) { return 0L; } long diff = System.currentTimeMillis() - lastTimestampValue; log.info("Latency: {}", diff); lastTimestampValue = 0; return diff; } @Override public Map<String, Object> map(Map<String, Object> value) throws Exception { if (value.containsKey(keyToExtract)) { try { Object fieldValue = value.get(keyToExtract); if (fieldValue != null) { if (fieldValue instanceof Number) { lastTimestampValue = ((Number) fieldValue).longValue(); } else if (fieldValue instanceof String) { Instant instant = Instant.parse((String) fieldValue); // Parse ISO-8601 timestamp lastTimestampValue = instant.toEpochMilli(); log.info("Extracted timestamp from `{}`: {}", keyToExtract, lastTimestampValue); } } } catch (Exception e) { log.warn("Failed to extract timestamp from `{}`: {}", keyToExtract, e.getMessage()); } } return value; } } ...LatencyGauge클래스 내의 ProcessLatency라는 이름으로 등록 후, 본 stream인 Application에서 LatencyGauge를 호출하고 Vertex를 따로 분리합니다.
Application.java
... deserializedStream .map(new LatencyGauge("source_at")) // source_at를 추출해서 현재 시간과 비교 ...이후 Flink UI에서 메트릭 정보가 들어있는 Vertex를 클릭한 후 Metrics탭을 열어 등록한 클래스인 ProcessLatency를 검색해 필터링하면 메트릭 정보를 확인할 수 있습니다.
안정적으로 End to End 갱신 시간이 5초를 넘지 않는 것을 확인하였고, PoC는 성공적으로 종료되었습니다.
마치며
지금까지 BUDS팀이 실시간 데이터 PoC를 하면서 했었던 일들과 고민들을 정리해봤습니다. 물론 위의 작업들은 PoC 단계인 만큼 서비스 가능한 레벨의 애플리케이션까지 개발하여 실제로 배포하기까지는 먼 길이 남았습니다.
비슷한 사용 사례가 있거나 어려움을 겪고 고민하시는 분들께 도움이 되었기를 바랍니다.