6) Sink Connector
Chapter Preview
목표
- Docker Compose 를 이용하여 데이터를 전달받을 Target DB 서버와 Table Creator 를 구축합니다.
- Connect 에 Sink Connector 를 생성합니다.
- Target DB 서버에 데이터가 잘 전달되었는지 확인합니다.
스펙 명세서
- Docker Compose 를 통해 Sink Connector 로 데이터를 전달하여 저장할 Target PostgreSQL DB 서버와 DB 에 테이블을 생성하는 Table Creator 를 생성합니다. - Target DB Server- Image : postgres:14.0
- Container name : target-postgres-server
- POSTGRES_USER : targetuser
- POSTGRES_PASSWORD : targetpassword
- POSTGRES_DB : targetdatabase
- Port forwarding : 5433:5432
 
- Image : 
- Table Creator- Dockerfile : - psycopg2 패키지를 이용하여 Target DB 서버에 테이블을 생성하는 코드를 만든 후 Dockerfile 에서 해당 코드가 실행되도록 이미지를 만듭니다.
 
- Docker Compose :- Container name : table-creator
 
- Container name : 
 
- Dockerfile : 
 
- Target DB Server
- Sink Connector 를 띄우기 위한 설정 파일을 만듭니다. - {
 "name": "postgres-sink-connector",
 "config": {
 "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
 "connection.url": "jdbc:postgresql://target-postgres-server:5432/targetdatabase",
 "connection.user": "targetuser",
 "connection.password": "targetpassword",
 "table.name.format": "iris_data",
 "topics": "postgres-source-iris_data",
 "auto.create": false,
 "auto.evolve": false,
 "tasks.max": 2,
 "transforms": "TimestampConverter",
 "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
 "transforms.TimestampConverter.field": "timestamp",
 "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.S",
 "transforms.TimestampConverter.target.type": "Timestamp"
 }
 }
- curl명령어를 이용하여 POST method 로 Sink Connector 를 생성합니다.- URL : http://localhost:8083/connectors
- Header : Content-Type: application/json
 
- URL : 
- Target DB 서버에 접속하여 저장된 데이터를 확인합니다. 
해당 파트의 전체 코드는 mlops-for-mle/part7/ 에서 확인할 수 있습니다.
part7
├── Makefile
├── README.md
├── connect.Dockerfile
├── create_table.py
├── kafka-docker-compose.yaml
├── naive-docker-compose.yaml
├── sink_connector.json
├── source_connector.json
├── target-docker-compose.yaml
└── target.Dockerfile
1. Architecture
[그림 7-15]은 이번 실습에서 다룰 서비스들의 다이어그램입니다.
 [그림7-15] Kafka System
[그림7-15] Kafka System📌  해당 파트는 01. Database 파트의 DB 를 이용합니다.
📌  DB 를 띄우지 않은 경우 01. Database 파트를 완료하고 DB 가 띄워진 상태에서 진행해주세요.  
2. Target Postgres Server
2.1 Table Creator
아래의 코드는 Target DB 를 띄운 다음에 테이블만 생성하는 코드입니다. 자세한 내용은 01. Database 파트를 참고해주세요.
# create_table.py
import psycopg2
def create_table(db_connect):
    create_table_query = """
    CREATE TABLE IF NOT EXISTS iris_data (
        id SERIAL PRIMARY KEY,
        timestamp timestamp,
        sepal_length float8,
        sepal_width float8,
        petal_length float8,
        petal_width float8,
        target int
    );"""
    print(create_table_query)
    with db_connect.cursor() as cur:
        cur.execute(create_table_query)
        db_connect.commit()
if __name__ == "__main__":
    db_connect = psycopg2.connect(
        user="targetuser",
        password="targetpassword",
        host="target-postgres-server",
        port=5432,
        database="targetdatabase",
    )
    create_table(db_connect)
Dockerfile 을 이용하여 위에서 작성한 스크립트를 실행할 수 있는 이미지를 만듭니다.
# target.Dockerfile
FROM amd64/python:3.9-slim
WORKDIR /usr/app
RUN pip install -U pip &&\
    pip install psycopg2-binary
COPY create_table.py create_table.py
ENTRYPOINT ["python", "create_table.py"]
2.2 실행
Docker Compose 를 이용하여 Target DB 서버와 Table Creator 를 띄웁니다.
2.2.1 target-docker-compose.yaml
전체 코드는 아래와 같습니다.
# target-docker-compose.yaml
version: "3"
services:
  target-postgres-server:
    image: postgres:14.0
    container_name: target-postgres-server
    ports:
      - 5433:5432
    environment:
      POSTGRES_USER: targetuser
      POSTGRES_PASSWORD: targetpassword
      POSTGRES_DB: targetdatabase
    healthcheck:
      test: ["CMD", "pg_isready", "-q", "-U", "targetuser", "-d", "targetdatabase"]
      interval: 10s
      timeout: 5s
      retries: 5
  table-creator:
    build:
      context: .
      dockerfile: target.Dockerfile
    container_name: table-creator
    depends_on:
      target-postgres-server:
        condition: service_healthy
networks:
  default:
    name: mlops-network
    external: true
2.2.2 실행
docker compose 명령어를 이용하여 위에서 작성한 서비스들을 띄웁니다.
docker compose -p part7-target -f target-docker-compose.yaml up -d
- -p :- Project name 은 part7-target로 사용합니다.
 
- Project name 은 
- -f :- File name 은 위에서 작성한 파일 이름인 target-docker-compose.yaml을 적어줍니다.
 
- File name 은 위에서 작성한 파일 이름인 
3. Sink Connector
다음으로 브로커의 토픽에서 Target DB 로 데이터를 전달하는 Sink Connector 를 생성합니다.
3.1 생성
Sink Connector 는 Source Connector 와 마찬가지로 Connect 에 API 호출을 통해 생성합니다.
아래 명령어를 통해 Sink Connector 를 띄울 수 있는 sink_connector.json 을 생성합니다.
cat <<EOF > sink_connector.json
{
    "name": "postgres-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://target-postgres-server:5432/targetdatabase",
        "connection.user": "targetuser",
        "connection.password": "targetpassword",
        "table.name.format": "iris_data",
        "topics": "postgres-source-iris_data",
        "auto.create": false,
        "auto.evolve": false,
        "tasks.max": 2,
        "transforms": "TimestampConverter",
        "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.TimestampConverter.field": "timestamp",
        "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.S",
        "transforms.TimestampConverter.target.type": "Timestamp"
    }
}
EOF
생성하는 Sink Connector 에 대한 설정 파일은 다음과 같습니다.
{
    "name": "postgres-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://target-postgres-server:5432/targetdatabase",
        "connection.user": "targetuser",
        "connection.password": "targetpassword",
        "table.name.format": "iris_data",
        "topics": "postgres-source-iris_data",
        "auto.create": false,
        "auto.evolve": false,
        "tasks.max": 2,
        "transforms": "TimestampConverter",
        "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.TimestampConverter.field": "timestamp",
        "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.S",
        "transforms.TimestampConverter.target.type": "Timestamp"
    }
}
- name :- Connector 의 이름을 정합니다.
 
- config :- connector.class :- Connector 를 생성하기 위한 class 를 설정합니다.
- 이번 챕터에서는 JDBC Sink Connector 를 사용함으로 io.confluent.connect.jdbc.JdbcSinkConnector를 기입합니다.
 
- connection.url :- Target DB 에 접근하기 위한 주소를 설정합니다.
- 이번 챕터에서는 앞서 띄워두었던 Target DB 서버의 URL 인 jdbc:postgresql://target-postgres-server:5432/targetdatabase을 기입합니다.
 
- connection.user :- Target DB 에 접속하기 위한 유저의 이름을 설정합니다.
 
- connection.password :- Target DB 에 접속하기 위한 유저의 비밀번호를 설정합니다.
 
- table.name.format :- Target DB 에 전달할 테이블 이름의 format 을 설정합니다.
 
- topics :- Sink Connector 가 브로커에 있는 토픽들 중에 가져올 토픽을 설정합니다.
 
- auto.create :- 테이블을 자동으로 생성할 지의 여부를 설정합니다.
- 이미 테이블을 생성했기 때문에 false로 설정합니다.
- 만약 생성된 테이블이 없고 해당 옵션을 true로 설정한다면, 해당 DB에 자동으로 테이블이 생성되고 Sink Connector로 들어오는 데이터 스키마에 맞춰서 테이블 스키마가 만들어집니다.
 
- auto.evolve :- Sink Connector 로부터 들어오는 데이터의 스키마와 일치하도록 해당 DB에 있는 테이블의 스키마를 자동으로 변경시킬 지의 여부를 설정합니다.
- 자동으로 변경하지 않도록 false로 설정합니다.
 
- tasks.max :- Connector 에서 task 의 수를 얼마나 가져갈 지를 설정합니다.
 
- transforms :- 토픽에 있는 string type 의 timestamp 값을 Target DB 로 전달할 때 timestamp type 으로 변경하여 전달해야 합니다. 따라서 Sink Connector 를 생성할 때 transforms 에 있는 Timestamp Converter 를 이용하여 string type 을 timestamp type 으로 변경 후, Target DB 에 데이터를 넣습니다.
- Transformation 을 적용할 Converter 를 설정합니다.
 
- transforms.TimestampConverter.type :- Timestamp Converter 의 type 을 설정합니다.
- Timestamp column 은 key 에 있는 값이 아닌 value 에 있는 값이므로 value 에 대한 Timestamp Converter 를 사용합니다.
 
- transforms.TimestampConverter.field :- Timestamp Converter 를 적용할 field 를 설정합니다.
- 토픽에 있는 timestamp column 을 기입합니다.
 
- transforms.TimestampConverter.format :- Timestamp Converter 의 format 을 설정합니다.
- Timestamp 의 format 인 yyyy-MM-dd HH:mm:ss.S를 기입합니다.
 
- transforms.TimestampConverter.target.type :- Timestamp Converter 를 이용하여 변환한 후에 적용할 type 을 설정합니다.
- Target DB 에 넣기 위해 timestamp를 기입합니다.
 
 
- connector.class :
이제 Sink Connector 생성하는 json 파일을 curl 을 이용하여 Connect 의 REST API 에 POST method 로 보냅니다.
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @sink_connector.json
명령어를 실행하면 아래와 같이 출력됩니다.
{"name":"postgres-sink-connector","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","connection.url":"jdbc:postgresql://target-postgres-server:5432/targetdatabase","connection.user":"targetuser","connection.password":"targetpassword","table.name.format":"iris_data","topics":"postgres-source-iris_data","auto.create":"false","auto.evolve":"false","tasks.max":"2","name":"postgres-sink-connector"},"tasks":[],"type":"sink"}%
2.2 생성 확인
아래의 GET method 로 현재 Connector 목록을 확인할 수 있습니다. 앞서 생성한 Connector 가 잘 있는지 확인합니다.
curl -X GET http://localhost:8083/connectors
생성된 connector 들이 잘 있는지 확인합니다.
["postgres-sink-connector","postgres-source-connector"]%
이어서 postgres-sink-connector 의 정보를 확인합니다.
curl -X GET http://localhost:8083/connectors/postgres-sink-connector
아래와 같이 출력됩니다.
{"name":"postgres-sink-connector","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","table.name.format":"iris_data","connection.password":"targetpassword","auto.evolve":"false","connection.user":"targetuser","topics":"postgres-source-iris_data","tasks.max":"2","name":"postgres-sink-connector","auto.create":"false","connection.url":"jdbc:postgresql://target-postgres-server:5432/targetdatabase"},"tasks":[{"connector":"postgres-sink-connector","task":0},{"connector":"postgres-sink-connector","task":1}],"type":"sink"}%
2.3 데이터 확인
마지막으로 DB 에 데이터가 잘 쌓여있는지 확인해봅니다.
- psql을 이용하여 Target DB 에 접속합니다.- PGPASSWORD=targetpassword psql -h localhost -p 5433 -U targetuser -d targetdatabase
 psql (14.3, server 14.0 (Debian 14.0-1.pgdg110+1))
 Type "help" for help.
 targetdatabase=#
- Select 문을 작성하여 - iris_data테이블에 있는 데이터를 확인합니다.- targetdatabase=# SELECT * FROM iris_data LIMIT 100;