Skip to main content

6) Sink Connector

Chapter Preview


목표

  1. Docker Compose 를 이용하여 데이터를 전달받을 Target DB 서버와 Table Creator 를 구축합니다.
  2. Connect 에 Sink Connector 를 생성합니다.
  3. Target DB 서버에 데이터가 잘 전달되었는지 확인합니다.

스펙 명세서

  1. Docker Compose 를 통해 Sink Connector 로 데이터를 전달하여 저장할 Target PostgreSQL DB 서버와 DB 에 테이블을 생성하는 Table Creator 를 생성합니다.

    • Target DB Server
      • Image : postgres:14.0
      • Container name : target-postgres-server
      • POSTGRES_USER : targetuser
      • POSTGRES_PASSWORD : targetpassword
      • POSTGRES_DB : targetdatabase
      • Port forwarding : 5433:5432
    • Table Creator
      • Dockerfile :
        • psycopg2 패키지를 이용하여 Target DB 서버에 테이블을 생성하는 코드를 만든 후 Dockerfile 에서 해당 코드가 실행되도록 이미지를 만듭니다.
      • Docker Compose :
        • Container name : table-creator
  2. Sink Connector 를 띄우기 위한 설정 파일을 만듭니다.

    {
    "name": "postgres-sink-connector",
    "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://target-postgres-server:5432/targetdatabase",
    "connection.user": "targetuser",
    "connection.password": "targetpassword",
    "table.name.format": "iris_data",
    "topics": "postgres-source-iris_data",
    "auto.create": false,
    "auto.evolve": false,
    "tasks.max": 2,
    "transforms": "TimestampConverter",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.field": "timestamp",
    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.S",
    "transforms.TimestampConverter.target.type": "Timestamp"
    }
    }
  3. curl 명령어를 이용하여 POST method 로 Sink Connector 를 생성합니다.
    • URL : http://localhost:8083/connectors
    • Header : Content-Type: application/json
  4. Target DB 서버에 접속하여 저장된 데이터를 확인합니다.

https://github.com/mlops-for-mle/mlops-for-mle/tree/main/part7

해당 파트의 전체 코드는 mlops-for-mle/part7/ 에서 확인할 수 있습니다.

part7
├── Makefile
├── README.md
├── connect.Dockerfile
├── create_table.py
├── kafka-docker-compose.yaml
├── naive-docker-compose.yaml
├── sink_connector.json
├── source_connector.json
├── target-docker-compose.yaml
└── target.Dockerfile

1. Architecture

[그림 7-15]은 이번 실습에서 다룰 서비스들의 다이어그램입니다.

[그림7-15] Kafka System
caution

📌 해당 파트는 01. Database 파트의 DB 를 이용합니다.
📌 DB 를 띄우지 않은 경우 01. Database 파트를 완료하고 DB 가 띄워진 상태에서 진행해주세요.

2. Target Postgres Server

2.1 Table Creator

아래의 코드는 Target DB 를 띄운 다음에 테이블만 생성하는 코드입니다. 자세한 내용은 01. Database 파트를 참고해주세요.

create_table.py
# create_table.py
import psycopg2


def create_table(db_connect):
create_table_query = """
CREATE TABLE IF NOT EXISTS iris_data (
id SERIAL PRIMARY KEY,
timestamp timestamp,
sepal_length float8,
sepal_width float8,
petal_length float8,
petal_width float8,
target int
);"""
print(create_table_query)
with db_connect.cursor() as cur:
cur.execute(create_table_query)
db_connect.commit()


if __name__ == "__main__":
db_connect = psycopg2.connect(
user="targetuser",
password="targetpassword",
host="target-postgres-server",
port=5432,
database="targetdatabase",
)
create_table(db_connect)

Dockerfile 을 이용하여 위에서 작성한 스크립트를 실행할 수 있는 이미지를 만듭니다.

target.Dockerfile
# target.Dockerfile
FROM amd64/python:3.9-slim

WORKDIR /usr/app

RUN pip install -U pip &&\
pip install psycopg2-binary

COPY create_table.py create_table.py

ENTRYPOINT ["python", "create_table.py"]

2.2 실행

Docker Compose 를 이용하여 Target DB 서버와 Table Creator 를 띄웁니다.

2.2.1 target-docker-compose.yaml

전체 코드는 아래와 같습니다.

target-docker-compose.yaml
# target-docker-compose.yaml
version: "3"

services:
target-postgres-server:
image: postgres:14.0
container_name: target-postgres-server
ports:
- 5433:5432
environment:
POSTGRES_USER: targetuser
POSTGRES_PASSWORD: targetpassword
POSTGRES_DB: targetdatabase
healthcheck:
test: ["CMD", "pg_isready", "-q", "-U", "targetuser", "-d", "targetdatabase"]
interval: 10s
timeout: 5s
retries: 5

table-creator:
build:
context: .
dockerfile: target.Dockerfile
container_name: table-creator
depends_on:
target-postgres-server:
condition: service_healthy

networks:
default:
name: mlops-network
external: true

2.2.2 실행

docker compose 명령어를 이용하여 위에서 작성한 서비스들을 띄웁니다.

docker compose -p part7-target -f target-docker-compose.yaml up -d
  • -p :
    • Project name 은 part7-target 로 사용합니다.
  • -f :
    • File name 은 위에서 작성한 파일 이름인 target-docker-compose.yaml 을 적어줍니다.

3. Sink Connector

다음으로 브로커의 토픽에서 Target DB 로 데이터를 전달하는 Sink Connector 를 생성합니다.

3.1 생성

Sink Connector 는 Source Connector 와 마찬가지로 Connect 에 API 호출을 통해 생성합니다. 아래 명령어를 통해 Sink Connector 를 띄울 수 있는 sink_connector.json 을 생성합니다.

cat <<EOF > sink_connector.json
{
"name": "postgres-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://target-postgres-server:5432/targetdatabase",
"connection.user": "targetuser",
"connection.password": "targetpassword",
"table.name.format": "iris_data",
"topics": "postgres-source-iris_data",
"auto.create": false,
"auto.evolve": false,
"tasks.max": 2,
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "timestamp",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.S",
"transforms.TimestampConverter.target.type": "Timestamp"
}
}

EOF

생성하는 Sink Connector 에 대한 설정 파일은 다음과 같습니다.

sink_connector.json
{
"name": "postgres-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://target-postgres-server:5432/targetdatabase",
"connection.user": "targetuser",
"connection.password": "targetpassword",
"table.name.format": "iris_data",
"topics": "postgres-source-iris_data",
"auto.create": false,
"auto.evolve": false,
"tasks.max": 2,
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "timestamp",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.S",
"transforms.TimestampConverter.target.type": "Timestamp"
}
}

  • name :
    • Connector 의 이름을 정합니다.
  • config :
    • connector.class :
      • Connector 를 생성하기 위한 class 를 설정합니다.
      • 이번 챕터에서는 JDBC Sink Connector 를 사용함으로 io.confluent.connect.jdbc.JdbcSinkConnector 를 기입합니다.
    • connection.url :
      • Target DB 에 접근하기 위한 주소를 설정합니다.
      • 이번 챕터에서는 앞서 띄워두었던 Target DB 서버의 URL 인 jdbc:postgresql://target-postgres-server:5432/targetdatabase 을 기입합니다.
    • connection.user :
      • Target DB 에 접속하기 위한 유저의 이름을 설정합니다.
    • connection.password :
      • Target DB 에 접속하기 위한 유저의 비밀번호를 설정합니다.
    • table.name.format :
      • Target DB 에 전달할 테이블 이름의 format 을 설정합니다.
    • topics :
      • Sink Connector 가 브로커에 있는 토픽들 중에 가져올 토픽을 설정합니다.
    • auto.create :
      • 테이블을 자동으로 생성할 지의 여부를 설정합니다.
      • 이미 테이블을 생성했기 때문에 false 로 설정합니다.
      • 만약 생성된 테이블이 없고 해당 옵션을 true 로 설정한다면, 해당 DB에 자동으로 테이블이 생성되고 Sink Connector로 들어오는 데이터 스키마에 맞춰서 테이블 스키마가 만들어집니다.
    • auto.evolve :
      • Sink Connector 로부터 들어오는 데이터의 스키마와 일치하도록 해당 DB에 있는 테이블의 스키마를 자동으로 변경시킬 지의 여부를 설정합니다.
      • 자동으로 변경하지 않도록 false 로 설정합니다.
    • tasks.max :
      • Connector 에서 task 의 수를 얼마나 가져갈 지를 설정합니다.
    • transforms :
      • 토픽에 있는 string type 의 timestamp 값을 Target DB 로 전달할 때 timestamp type 으로 변경하여 전달해야 합니다. 따라서 Sink Connector 를 생성할 때 transforms 에 있는 Timestamp Converter 를 이용하여 string type 을 timestamp type 으로 변경 후, Target DB 에 데이터를 넣습니다.
      • Transformation 을 적용할 Converter 를 설정합니다.
    • transforms.TimestampConverter.type :
      • Timestamp Converter 의 type 을 설정합니다.
      • Timestamp column 은 key 에 있는 값이 아닌 value 에 있는 값이므로 value 에 대한 Timestamp Converter 를 사용합니다.
    • transforms.TimestampConverter.field :
      • Timestamp Converter 를 적용할 field 를 설정합니다.
      • 토픽에 있는 timestamp column 을 기입합니다.
    • transforms.TimestampConverter.format :
      • Timestamp Converter 의 format 을 설정합니다.
      • Timestamp 의 format 인 yyyy-MM-dd HH:mm:ss.S 를 기입합니다.
    • transforms.TimestampConverter.target.type :
      • Timestamp Converter 를 이용하여 변환한 후에 적용할 type 을 설정합니다.
      • Target DB 에 넣기 위해 timestamp 를 기입합니다.

이제 Sink Connector 생성하는 json 파일을 curl 을 이용하여 Connect 의 REST API 에 POST method 로 보냅니다.

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @sink_connector.json

명령어를 실행하면 아래와 같이 출력됩니다.

{"name":"postgres-sink-connector","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","connection.url":"jdbc:postgresql://target-postgres-server:5432/targetdatabase","connection.user":"targetuser","connection.password":"targetpassword","table.name.format":"iris_data","topics":"postgres-source-iris_data","auto.create":"false","auto.evolve":"false","tasks.max":"2","name":"postgres-sink-connector"},"tasks":[],"type":"sink"}%

2.2 생성 확인

아래의 GET method 로 현재 Connector 목록을 확인할 수 있습니다. 앞서 생성한 Connector 가 잘 있는지 확인합니다.

curl -X GET http://localhost:8083/connectors

생성된 connector 들이 잘 있는지 확인합니다.

["postgres-sink-connector","postgres-source-connector"]%

이어서 postgres-sink-connector 의 정보를 확인합니다.

curl -X GET http://localhost:8083/connectors/postgres-sink-connector

아래와 같이 출력됩니다.

{"name":"postgres-sink-connector","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","table.name.format":"iris_data","connection.password":"targetpassword","auto.evolve":"false","connection.user":"targetuser","topics":"postgres-source-iris_data","tasks.max":"2","name":"postgres-sink-connector","auto.create":"false","connection.url":"jdbc:postgresql://target-postgres-server:5432/targetdatabase"},"tasks":[{"connector":"postgres-sink-connector","task":0},{"connector":"postgres-sink-connector","task":1}],"type":"sink"}%

2.3 데이터 확인

마지막으로 DB 에 데이터가 잘 쌓여있는지 확인해봅니다.

  1. psql 을 이용하여 Target DB 에 접속합니다.

    PGPASSWORD=targetpassword psql -h localhost -p 5433 -U targetuser -d targetdatabase
    psql (14.3, server 14.0 (Debian 14.0-1.pgdg110+1))
    Type "help" for help.

    targetdatabase=#
  2. Select 문을 작성하여 iris_data 테이블에 있는 데이터를 확인합니다.

    targetdatabase=# SELECT * FROM iris_data LIMIT 100;