Kafka Connect로 DB 데이터 쉽게 연동하기

1 week ago 4

Kafka Connect와 JDBC 커넥터를 이용해 DB 데이터를 쉽게 Kafka로 전송하는 방법과 발생 가능한 문제를 해결하는 방법을 공유합니다.

김소라 게시 날짜: 2025.03.04.

안녕하세요. 컬리 데이터베이스개발팀 김소라입니다.
현재 컬리는 다양한 서비스와 시스템 운영을 위해 여러 종류의 데이터베이스를 사용하고 있습니다. 하지만 각 데이터베이스의 구조와 처리 방식이 달라 관리가 복잡하고 활용하기 어렵기 때문에, 데이터를 하나의 분석 플랫폼으로 통합하는 다양한 파이프라인을 운영하고 있습니다. 그 중 오늘은 카프카 커넥트(Kafka Connect)를 활용한 데이터 파이프라인에 대해 설명해드리려고 합니다.

카프카 커넥트(Kafka Connect)란?

Kafka Connect는 카프카 메시징 시스템을 기반으로 다양한 데이터 소스 시스템, 예를들어 RDBMS, NoSQL 혹은 csv와 같은 파일이나 로그 등의 데이터 소스에서 발생한 이벤트를 다른 데이터 타겟 시스템으로 별도의 코딩 없이 실시간으로 전달할 수 있는 Kafka Component 중 하나입니다.

카프카 커넥트의 장점

카프카 커넥트의 가장 큰 장점으로는 별도의 코딩없이 파이프라인 구성이 가능하다는 점 입니다. 다양한 데이터 소스 시스템의 이벤트를 별도의 kafka client를 이용한 코딩 없이 json 형태의 config만 정의하면 카프카를 통해 안전하게 다른 시스템에 동기화할 수 있습니다. 그래서 반복적인 데이터 파이프라인을 구성할 때 유리합니다.

또한 카프카 커넥트는 다양한 데이터 소스와 시스템 간의 연동을 지원하는 커넥터를 제공합니다. 최근 오픈 소스의 활성화와 클라우드 서비스의 확산으로 데이터 소스 시스템의 종류가 급격히 다양해졌는데, 이에 맞춰 카프카 커넥트는 여러 종류의 커넥터를 지원하며, 오픈 소스로도 제공되고 있습니다. 이를 통해 비용 절감은 물론 필요에 따라 커스터마이징도 가능합니다.

이러한 장점들은 서로 다른 서비스 간 데이터 연동이나 운영 데이터베이스의 데이터를 스테이징 영역으로 복사하는 등의 다양한 상황에 유용하게 활용될 수 있습니다. 컬리 또한 다양한 소스 데이터베이스의 데이터를 카프카 메시지로 바꿔 타겟 시스템에 적재하는 데이터 파이프라인 구성에 활용하고 있습니다.

카프카 커넥트 아키텍처

  • 카프카 커넥트(Kafka Connect) 는 커넥터로 구성되어있는 프레임워크이며 커넥터를 동작시키는 역할을 합니다.
  • 커넥터(Connector) 는 카프카 커넥트 내부의 실제 메시지 파이프라인이며 데이터를 실질적으로 처리하는 태스크(task)들을 관리합니다.
  • 태스크(Task) 는 실제로 데이터를 가져오거나 넣는 일을 수행하는 단위로, thread 레벨로 수행됩니다.
  • 워커(Worker) 는 카프카 커넥트 프로세스가 실행되는 서버 또는 인스턴스를 의미합니다.

config를 제출해 데이터 처리가 호출되면 워커 내부의 커넥터들에 의해 테스크들이 생성되고 파이프라인이 구동됩니다. 그 후 테스크 내부에서 카프카와의 전송이 가능하도록 메시지 형식과 포맷을 변환하여, 외부 시스템과 카프카 간에 데이터를 전송합니다.

위의 그림은 데이터를 변환하는 구조를 나타낸 그림입니다.
커넥터 인스턴스에서 소스 시스템의 데이터를 메세지 형태로 가져오면 Transform(SMT: Single Message Transform)으로 필요 시 메시지를 변환할 수 있습니다. 그리고 이 변환된 메시지를 Converter에서 json이나 avro등 적절한 포맷으로 변환해 카프카와 주고 받는 역할을 하게 됩니다.
한마디로 Transform은 데이터의 내용을 변환하고, Converter는 데이터의 포맷을 변환한다고 볼 수 있습니다. Transform과 Converter는 커넥터를 실행할 때 json config 안에서 정의합니다.

소스(Source)와 싱크(Sink) 커넥터

실제 메시지 파이프라인을 구성해 데이터를 처리하는 커넥터는 소스(Source)와 싱크(Sink) 커넥터로 나눌 수 있습니다.

그림과 같이 소스 커넥터는 데이터베이스에서 카프카로 메시지를 적재하는 역할을 합니다. 싱크 커넥터는 카프카에서 타겟 데이터베이스 시스템으로 데이터를 적재합니다.

Confluent는 다양한 데이터 시스템과의 통합을 쉽게 할 수 있도록 여러 종류의 소스(Source) 및 싱크(Sink) 커넥터를 기본적으로 제공합니다. 이 커넥터들은 Confluent Hub에서 다운로드하고 관리할 수 있으며 설치, 관리, 업그레이드를 손쉽게 처리할 수 있습니다.

이 중 JDBC Connector는 JDBC 호환 데이터베이스의 테이블에 주기적으로 SQL 쿼리를 보내 변경된 데이터 추출하여 카프카로 적재 하거나, 카프카의 데이터를 테이블에 삽입하는 기능을 제공합니다. 컬리에서는 JDBC 소스(Source) 커넥터를 이용해 MySQL, Oracle, Postgresql 등의 다양한 데이터베이스의 데이터를 카프카를 거쳐 타겟 데이터베이스로 동기화하는 파이프라인을 운영하고 있습니다.

JDBC 소스 커넥터 (JDBC Source Connector)

JDBC 소스 커넥터는 관계형 데이터베이스(RDBMS)에서 주기적으로 SQL 쿼리를 실행해 데이터를 카프카로 전송합니다. 이 커넥터는 매 풀링 주기마다 모든 데이터를 추출하는 Bulk 모드와 사용자 지정 쿼리를 통해 필요한 데이터만 추출하는 Custom Query 모드, 이전 풀링과 비교하여 새로운 데이터만 추출하는 증분 쿼리 모드(Incremental Query Mode) 등의 쿼리 모드를 지원합니다. 이번에는 증분 쿼리 모드에 대해서 설명 드리겠습니다.

증분 쿼리 모드는 이전 poll과 비교하여 새로운 데이터(row)만을 정의하고, 해당 데이터만 카프카에 적재하는 방식으로 효율적인 데이터 처리가 가능합니다. 각 poll마다 마지막으로 처리된 레코드의 특정 컬럼 값을 오프셋으로 저장하여 이전에 처리된 데이터를 건너뛰고 새로운 데이터만 추출할 수 있기 때문입니다. 증분 쿼리 모드에는 아래의 세가지 모드가 포함되며, 이 중 하나를 선택하여 원하는 방식으로 데이터를 적재할 수 있습니다.

  1. Incrementing Mode: 테이블의 Auto Increment(primary key) 컬럼을 기준으로 더 큰 값을 가진 행이 추가될 때 카프카에 적재합니다.
  2. Timestamp Mode: 테이블의 timestamp 컬럼을 기준으로 더 큰 timestamp 값을 가진 행이 추가될 때 카프카에 적재합니다.
  3. Timestamp and Incrementing Mode: Incrementing모드와 Timestamp 모드를 결합하여, 더 큰 PK 값이나 timestamp 값이 추가될 때 카프카에 적재합니다. 이 모드는 동일한 timestamp 값을 가진 여러 레코드가 카프카로 적재되는 과정에서 부분적으로 실패하더라도, PK 값을 이용해 처리되지 않은 레코드를 정확하게 감지하여 재전송할 수 있도록 합니다.

JDBC 소스 커넥터를 사용하는 이유

그럼 컬리에서는 왜 JDBC 소스 커넥터를 사용할까요?

CDC(Change Data Capture)는 데이터베이스나 시스템에서 데이터의 모든 변경점을 추적하여 이를 외부 시스템으로 전달하는 기술입니다. 카프카 커넥트를 활용하면 CDC 방식으로 데이터베이스의 변경 사항을 실시간으로 카프카로 스트리밍하고, 다른 시스템으로 전달하는 데이터 파이프라인을 쉽게 구축할 수 있습니다.

CDC에는 크게 로그 기반(log-based) 방식과 쿼리 기반(query-based) 방식이 존재합니다.

먼저 로그 기반 방식은 데이터베이스에 존재하는 바이너리 파일인 트랜잭션 로그를 이용하는 방식을 의미합니다. 트랜잭션 로그는 작업 유형, 영향을 받는 행 또는 레코드, 트랜잭션 타임스탬프 등 데이터베이스에 대한 모든 변경 사항을 기록합니다. 따라서 삽입, 수정, 삭제와 같은 변경이 발생하면 즉시 트랜잭션 로그에 기록되고, 이를 통해 변경 사항을 추적할 수 있습니다. 이 방식은 데이터베이스를 폴링(polling)하지 않고 트랜잭션 로그를 읽기 때문에 대기 시간이 짧고, 데이터베이스에 가해지는 부하도 줄어듭니다. 또한, 모든 변경 사항이 캡처되므로 데이터의 충실도도 높습니다. 가장 직관적이고 효율적인 CDC 방식이라고 볼 수 있습니다. 카프카 커넥트에서도 이러한 로그 기반 방식을 지원하는 커넥터를 제공하고 있으며, 대표적으로 Debezium 커넥터가 있습니다.

출처: Ingest Data from Databases into Kafka with Change Data Capture (CDC)

그러나 로그 기반 방식은 구현이 복잡합니다. 실시간 트랜잭션 로그와 쓰기 지연에 대한 의존성이 높으며, 로그에 엑세스하기 위해 더 많은 설정 단계와 높은 권한이 필요합니다. DB에 따라 별도의 로그 분석 도구가 필요할 수도 있습니다. 이는 모든 트랜잭션 로그(예: MySQL의 BinLog, PostgreSQL의 WAL, Oracle의 Redo Log, Mongo DB의 Oplog 등)가 개념적으로는 동일하지만 구현 방식이 다르기 때문입니다. 이러한 복잡한 설정 단계는 구현을 더 어렵게 만들고, 분석 도구에 따라 변경량이 많을수록 DB 서버의 I/O 또는 CPU에도 부하를 줄 수 있습니다.

출처: Ingest Data from Databases into Kafka with Change Data Capture (CDC)

반면, 쿼리 기반 방식은 주기적으로 쿼리를 실행하여 데이터를 추출하기 때문에 구현이 간단하며 쿼리를 실행하기 위한 테이블 읽기 권한만 있으면 됩니다. 또한 로그 의존도가 없고 DB별로 별도의 로그 분석 도구도 필요 없으며, 다양한 데이터베이스 시스템에서 동일한 방식으로 구현할 수 있어 이식성이 뛰어나고 구축 비용 또한 낮습니다. 카프카 커넥트에서는 JDBC 커넥터를 통해 쿼리 기반 CDC를 운영할 수 있습니다. 현재 컬리에서는 로그 기반의 CDC를 주로 활용하고 있으나, DB의 설정 변경이 어려운 경우 JDBC 소스 커넥터를 활용하여 쿼리 기반 방식의 CDC를 적용하고 있습니다.

JDBC 소스 커넥터 제약사항

하지만 JDBC 소스 커넥터에는 몇 가지 제약이 있습니다.

첫번째로 증분 쿼리 모드를 사용하려면 데이터의 증분을 추적하기 위해 auto increment 속성을 가진 PK 컬럼이나 timestamp 컬럼이 필요합니다. 다만, 대부분의 데이터베이스 테이블에는 PK나 timestamp 컬럼이 포함되어 있어 큰 문제가 되지 않습니다.

두번째로 JDBC 소스 커넥터는 주기적으로 쿼리를 실행하므로 설정한 주기에 따라 카프카로 데이터가 전송되기까지 지연 시간이 발생할 수 있습니다. 그렇기 때문에 데이터의 지연 시간을 허용할 수 있는 테이블에 커넥터를 적용하는 것이 좋습니다.

세번째로 데이터 누락이 발생할 수 있습니다. 주기적인 쿼리 실행만으로는 데이터베이스의 모든 변경 사항을 완벽하게 추적할 수 없습니다. 이는 쿼리 기반 방식의 한계로 데이터 변경 작업 종류나 쿼리 실행 주기의 타이밍 등 다양한 상황에 따라 누락이 발생할 수 있으며, 이러한 누락은 데이터 분석 결과에 큰 영향을 미칠 수 있습니다.

JDBC 소스 커넥터의 데이터 누락 극복하기

데이터 누락이 분석에 큰 영향을 미치는 만큼 이를 방지하는 것이 매우 중요합니다. JDBC 소스 커넥터를 사용하며 데이터 누락이 발생할 수 있는 다양한 케이스를 상세히 살펴보고, 이를 예방하거나 보완할 수 있는 방법들에 대해 자세히 다뤄보겠습니다.

데이터 누락 케이스 1 - 변경을 감지하지 못하는 케이스

JDBC 소스 커넥터를 사용하면 테이블에서 삭제(delete) 또는 갱신(update) 작업이 발생할 경우 데이터 누락이 발생할 수 있습니다. 이 케이스에 대해 자세히 살펴보겠습니다.

1-1. 데이터의 삭제(delete) 감지 불가

컬리의 유저가 장바구니에 과일을 담는 예시를 들어보겠습니다. timestamp 모드 및 쿼리 주기(poll interval)를 2분으로 세팅하였으며, 저장된 오프셋은 10시라고 가정합니다.

유저가 10시 1분에 배를 장바구니에 담아 배 데이터가 테이블에 삽입됩니다.

그리고 10시 2분에 주기를 맞아 데이터를 카프카로 전송하는 쿼리를 실행합니다. 쿼리의 조건에 이전에 저장된 오프셋 정보는 start time으로, 쿼리 실행 시간은 end time으로 설정되어 10시부터 10시 2분까지의 모든 데이터를 전송하게 됩니다. 따라서 10시 1분에 장바구니에 담은 배 데이터가 카프카로 전송되며, 오프셋은 전송된 데이터의 가장 마지막 타임스탬프 값으로 갱신되어 10시 1분으로 저장됩니다.

그런데 10시 3분에 유저가 장바구니에서 1분에 담았던 배를 다시 꺼내며 테이블에 삽입되었던 배의 데이터도 삭제됩니다. 이후 4분에 주기가 되어 쿼리가 실행되고, 이때의 쿼리 조건은 이전 오프셋인 1분부터 현재 시간인 4분까지로 설정됩니다. 그러나 배 데이터는 이미 테이블에서 삭제되어 남아있지 않기 때문에 카프카로 전송되지 않으며, 배가 제거되었다는 정보를 알 수 없습니다. 이처럼 삭제(delete)된 레코드에 대한 정보는 Incrementing, Timestamp 모드 모두 감지할 수 없습니다.

1-2. 데이터의 갱신(update) 감지 불가

Auto Increment 컬럼을 기준으로 쿼리 조건을 사용하는 Incrementing 모드의 경우, 추가적인 데이터 누락이 발생할 수 있습니다. 마찬가지로 컬리의 유저가 장바구니에 과일을 담는 예시를 들어보겠습니다.

id필드가 Auto Increment 컬럼이라고 가정합니다. id가 1인 배를 담은 데이터와 id가 2인 사과 데이터를 카프카로 전송한 후, 오프셋의 id는 2로 갱신됩니다.

이후 id가 1인 배를 포도로 변경한다고 가정합니다.

쿼리문에는 오프셋으로 저장된 id 값인 2보다 큰 값이 조건으로 포함되기 때문에, id가 1인 데이터를 업데이트하더라도 변경된 이벤트로 감지되지 않아 해당 이벤트를 전송할 수 없습니다.

이와 같이 Incrementing 모드는 Auto Increment 컬럼 값의 갱신이 이루어지지 않으므로 데이터의 삭제(delete)뿐만 아니라 갱신(update)도 감지할 수 없습니다.

데이터 누락 케이스 2 - 쿼리 주기 간 여러번 변경되는 케이스

두번째 케이스는 쿼리 주기 사이에 발생한 여러 번의 이벤트가 누락되는 경우입니다. 그림과 같이 하나의 주기 내에서 장바구니에 배를 담았다가 다시 빼고 사과를 담았다고 가정합니다. 이 상태에서 쿼리를 실행하면 남아 있는 사과 데이터만 전송되고, 배를 담았던 데이터는 누락됩니다.

이처럼 쿼리 기반으로 동작하는 JDBC 소스 커넥터는 모든 데이터의 갱신(update)이나 삭제(delete)와 같은 변경 사항을 감지하기 어렵다는 한계가 있습니다. 따라서 JDBC 소스 커넥터는 모든 데이터의 변경 사항을 수집할 필요성이 없을 때, 즉 쿼리 시점의 최신 상태만 필요한 경우에 사용하기 적합하며 데이터의 갱신/삭제 작업 없이 삽입(insert)만 이루어지는 로그성 테이블에 활용하는 것이 좋습니다.

데이터 누락 케이스 3 - 순서가 일치하지 않는 이벤트

마지막 케이스로 순서가 변경되는 이벤트가 있습니다. 긴 트랜잭션, 혹은 네트워크의 지연 등으로 인해 먼저 적재되어야 할 이벤트가 뒤늦게 적재되어 발생하는 경우입니다.

timestamp 모드로 쿼리 주기를 5분으로 세팅하고, 저장된 오프셋은 10시이며 10시 2분에 배를 담았으나 트랜잭션이 완료되지 않은 상태라고 가정합니다.

4분에 사과를 담고, 5분에 주기가 찾아와 쿼리를 실행합니다. JDBC 소스 커넥터는 기본적으로 커밋이 완료된 데이터만 전송 대상 데이터로 포함합니다. (트랜잭션 격리 수준을 제어하는 모드(transaction.isolation.mode)를 제공하지만, 데이터의 무결성을 위해 기본(default) 모드를 사용한다고 가정합니다.) 따라서 아직 트랜잭션이 끝나지 않은 배 데이터는 조건에 포함되지 않고 사과 데이터만 카프카로 전송되며, 오프셋 정보 또한 사과 데이터의 값인 4분으로 업데이트 됩니다.

이후 6분에 배 데이터의 트랜잭션이 완료되었다고 가정합니다. 그러나 10분에 주기가 찾아와 쿼리를 실행할 때의 조건은 이전에 전송된 사과 데이터의 오프셋 정보인 4분보다 크고, 쿼리 실행시각인 10분보다 작은 데이터를 대상으로 하게 됩니다. 따라서 2분에 삽입된 배 데이터는 조건에 포함되지 않아 카프카로 전송되지 않고 누락됩니다.

timestamp.delay.interval.ms 로 누락 방어하기

timestamp.delay.interval.ms 옵션은 timestamp 모드 사용 시 적용할 수 있는 옵션으로, 데이터를 카프카로 적재하기 전 까지 일정 시간 동안 지연을 둘 수 있습니다. 위와 같이 데이터가 지연되어 적재되는 경우 timestamp.delay.interval.ms를 적용하여 데이터 누락을 줄일 수 있습니다.
다음은 timestamp.delay.interval.ms 의 적용 예시입니다.

이전 예제와 같이 timestamp 모드에서 쿼리 주기는 5분, 저장된 오프셋은 10시이며, 추가로 timestamp.delay.interval.ms를 2분으로 지정했다고 가정합니다. 마찬가지로 2분에 배를 담았으나, 트랜잭션이 완료되지 않은 상태의 데이터가 있습니다. 이후 4분에 사과 데이터의 적재가 일어납니다.

주기인 5분이 되어 쿼리를 실행합니다. 이 때 timestamp.delay.interval.ms 옵션이 2분으로 적용되어 쿼리의 조건에 현재 쿼리 실행 시각인 5분이 아닌, 5분에서 2분만큼의 값을 뺀 3분까지의 데이터만 카프카로 적재하게 됩니다. 따라서 아직 트랜잭션이 완료되지 않은 2분의 사과 데이터도 적재되지 않지만, 트랜잭션이 완료된 4분의 사과 데이터 또한 카프카로 적재되지 않습니다. 이에 따라 커넥터의 오프셋 정보도 업데이트 되지 않습니다.

이후 6분에 배 데이터의 트랜잭션이 완료됩니다. 그리고 쿼리 주기인 10분에서 옵션으로 지정한 2분을 뺀 8분까지의 데이터를 적재하게 됩니다. 이에 따라 4분에 적재된 사과 데이터와 더불어 트랜잭션이 길었던 2분의 배 데이터도 모두 카프카로 적재됩니다. 이렇게 timestamp.delay.interval.ms 옵션은 이벤트 발생 후 적재까지 임의의 지연을 두어 데이터 처리의 순서와 정합성을 유지할 수 있습니다.

timestamp.delay.interval.ms 주의사항

하지만 이 옵션을 적용할 때의 주의사항이 있습니다. 해당 옵션의 값이 크면 클 수록 포함할 수 있는 이벤트는 많아지겠지만 그만큼 적재까지의 딜레이도 증가하게 됩니다.

위 그림과 같이 모든 데이터가 누락없이 카프카로 적재되기는 하나, 5분 실행 쿼리에 포함되어야 할 데이터가 10분 쿼리에 포함되어 적재까지의 딜레이가 커진 것을 확인할 수 있습니다. 따라서 해당 옵션을 무작정 크게 설정하는 것은 바람직하지 않으며, 트랜잭션 타임아웃 시간 등 데이터의 특성에 맞춰 적절한 값을 설정하여 데이터 적재 지연과 누락을 균형 있게 처리해야 합니다.

정리하며

JDBC 소스 커넥터는 데이터베이스와 카프카 간의 데이터를 손쉽게 전송할 수 있는 강력한 도구입니다. 다양한 쿼리 모드, 특히 증분 쿼리 모드를 활용하면 변경된 데이터만 효율적으로 추출할 수 있어 성능과 리소스 관리 측면에서 매우 유리합니다. 물론 위에서 서술한 바와 같이 몇 가지 제약 사항은 존재합니다. 하지만 데이터의 특성에 맞게 적절히 활용한다면 복잡한 권한 설정이나 구성이 없이 간단하게 데이터 파이프라인을 구축할 수 있습니다. 해당 포스팅 내용을 통해 더 효율적이고 안정적인 데이터 파이프라인 구축에 도움이 되기를 바랍니다. 감사합니다.

Read Entire Article