6) Sink Connector
Chapter Preview
목표
- Docker Compose 를 이용하여 데이터를 전달받을 Target DB 서버와 Table Creator 를 구축합니다.
- Connect 에 Sink Connector 를 생성합니다.
- Target DB 서버에 데이터가 잘 전달되었는지 확인합니다.
스펙 명세서
Docker Compose 를 통해 Sink Connector 로 데이터를 전달하여 저장할 Target PostgreSQL DB 서버와 DB 에 테이블을 생성하는 Table Creator 를 생성합니다.
- Target DB Server
- Image :
postgres:14.0
- Container name :
target-postgres-server
- POSTGRES_USER :
targetuser
- POSTGRES_PASSWORD :
targetpassword
- POSTGRES_DB :
targetdatabase
- Port forwarding :
5433:5432
- Image :
- Table Creator
- Dockerfile :
- psycopg2 패키지를 이용하여 Target DB 서버에 테이블을 생성하는 코드를 만든 후 Dockerfile 에서 해당 코드가 실행되도록 이미지를 만듭니다.
- Docker Compose :
- Container name :
table-creator
- Container name :
- Dockerfile :
- Target DB Server
Sink Connector 를 띄우기 위한 설정 파일을 만듭니다.
{
"name": "postgres-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://target-postgres-server:5432/targetdatabase",
"connection.user": "targetuser",
"connection.password": "targetpassword",
"table.name.format": "iris_data",
"topics": "postgres-source-iris_data",
"auto.create": false,
"auto.evolve": false,
"tasks.max": 2,
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "timestamp",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.S",
"transforms.TimestampConverter.target.type": "Timestamp"
}
}curl
명령어를 이용하여 POST method 로 Sink Connector 를 생성합니다.- URL :
http://localhost:8083/connectors
- Header :
Content-Type: application/json
- URL :
Target DB 서버에 접속하여 저장된 데이터를 확인합니다.
해당 파트의 전체 코드는 mlops-for-mle/part7/ 에서 확인할 수 있습니다.
part7
├── Makefile
├── README.md
├── connect.Dockerfile
├── create_table.py
├── kafka-docker-compose.yaml
├── naive-docker-compose.yaml
├── sink_connector.json
├── source_connector.json
├── target-docker-compose.yaml
└── target.Dockerfile
1. Architecture
[그림 7-15]은 이번 실습에서 다룰 서비스들의 다이어그램입니다.
📌 해당 파트는 01. Database 파트의 DB 를 이용합니다.
📌 DB 를 띄우지 않은 경우 01. Database 파트를 완료하고 DB 가 띄워진 상태에서 진행해주세요.
2. Target Postgres Server
2.1 Table Creator
아래의 코드는 Target DB 를 띄운 다음에 테이블만 생성하는 코드입니다. 자세한 내용은 01. Database 파트를 참고해주세요.
# create_table.py
import psycopg2
def create_table(db_connect):
create_table_query = """
CREATE TABLE IF NOT EXISTS iris_data (
id SERIAL PRIMARY KEY,
timestamp timestamp,
sepal_length float8,
sepal_width float8,
petal_length float8,
petal_width float8,
target int
);"""
print(create_table_query)
with db_connect.cursor() as cur:
cur.execute(create_table_query)
db_connect.commit()
if __name__ == "__main__":
db_connect = psycopg2.connect(
user="targetuser",
password="targetpassword",
host="target-postgres-server",
port=5432,
database="targetdatabase",
)
create_table(db_connect)
Dockerfile 을 이용하여 위에서 작성한 스크립트를 실행할 수 있는 이미지를 만듭니다.
# target.Dockerfile
FROM amd64/python:3.9-slim
WORKDIR /usr/app
RUN pip install -U pip &&\
pip install psycopg2-binary
COPY create_table.py create_table.py
ENTRYPOINT ["python", "create_table.py"]
2.2 실행
Docker Compose 를 이용하여 Target DB 서버와 Table Creator 를 띄웁니다.
2.2.1 target-docker-compose.yaml
전체 코드는 아래와 같습니다.
# target-docker-compose.yaml
version: "3"
services:
target-postgres-server:
image: postgres:14.0
container_name: target-postgres-server
ports:
- 5433:5432
environment:
POSTGRES_USER: targetuser
POSTGRES_PASSWORD: targetpassword
POSTGRES_DB: targetdatabase
healthcheck:
test: ["CMD", "pg_isready", "-q", "-U", "targetuser", "-d", "targetdatabase"]
interval: 10s
timeout: 5s
retries: 5
table-creator:
build:
context: .
dockerfile: target.Dockerfile
container_name: table-creator
depends_on:
target-postgres-server:
condition: service_healthy
networks:
default:
name: mlops-network
external: true
2.2.2 실행
docker compose
명령어를 이용하여 위에서 작성한 서비스들을 띄웁니다.
docker compose -p part7-target -f target-docker-compose.yaml up -d
- -p :
- Project name 은
part7-target
로 사용합니다.
- Project name 은
- -f :
- File name 은 위에서 작성한 파일 이름인
target-docker-compose.yaml
을 적어줍니다.
- File name 은 위에서 작성한 파일 이름인
3. Sink Connector
다음으로 브로커의 토픽에서 Target DB 로 데이터를 전달하는 Sink Connector 를 생성합니다.
3.1 생성
Sink Connector 는 Source Connector 와 마찬가지로 Connect 에 API 호출을 통해 생성합니다.
아래 명령어를 통해 Sink Connector 를 띄울 수 있는 sink_connector.json
을 생성합니다.
cat <<EOF > sink_connector.json
{
"name": "postgres-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://target-postgres-server:5432/targetdatabase",
"connection.user": "targetuser",
"connection.password": "targetpassword",
"table.name.format": "iris_data",
"topics": "postgres-source-iris_data",
"auto.create": false,
"auto.evolve": false,
"tasks.max": 2,
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "timestamp",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.S",
"transforms.TimestampConverter.target.type": "Timestamp"
}
}
EOF
생성하는 Sink Connector 에 대한 설정 파일은 다음과 같습니다.
{
"name": "postgres-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://target-postgres-server:5432/targetdatabase",
"connection.user": "targetuser",
"connection.password": "targetpassword",
"table.name.format": "iris_data",
"topics": "postgres-source-iris_data",
"auto.create": false,
"auto.evolve": false,
"tasks.max": 2,
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "timestamp",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.S",
"transforms.TimestampConverter.target.type": "Timestamp"
}
}
- name :
- Connector 의 이름을 정합니다.
- config :
- connector.class :
- Connector 를 생성하기 위한 class 를 설정합니다.
- 이번 챕터에서는 JDBC Sink Connector 를 사용함으로
io.confluent.connect.jdbc.JdbcSinkConnector
를 기입합니다.
- connection.url :
- Target DB 에 접근하기 위한 주소를 설정합니다.
- 이번 챕터에서는 앞서 띄워두었던 Target DB 서버의 URL 인
jdbc:postgresql://target-postgres-server:5432/targetdatabase
을 기입합니다.
- connection.user :
- Target DB 에 접속하기 위한 유저의 이름을 설정합니다.
- connection.password :
- Target DB 에 접속하기 위한 유저의 비밀번호를 설정합니다.
- table.name.format :
- Target DB 에 전달할 테이블 이름의 format 을 설정합니다.
- topics :
- Sink Connector 가 브로커에 있는 토픽들 중에 가져올 토픽을 설정합니다.
- auto.create :
- 테이블을 자동으로 생성할 지의 여부를 설정합니다.
- 이미 테이블을 생성했기 때문에
false
로 설정합니다. - 만약 생성된 테이블이 없고 해당 옵션을
true
로 설정한다면, 해당 DB에 자동으로 테이블이 생성되고 Sink Connector로 들어오는 데이터 스키마에 맞춰서 테이블 스키마가 만들어집니다.
- auto.evolve :
- Sink Connector 로부터 들어오는 데이터의 스키마와 일치하도록 해당 DB에 있는 테이블의 스키마를 자동으로 변경시킬 지의 여부를 설정합니다.
- 자동으로 변경하지 않도록
false
로 설정합니다.
- tasks.max :
- Connector 에서 task 의 수를 얼마나 가져갈 지를 설정합니다.
- transforms :
- 토픽에 있는 string type 의 timestamp 값을 Target DB 로 전달할 때 timestamp type 으로 변경하여 전달해야 합니다. 따라서 Sink Connector 를 생성할 때 transforms 에 있는 Timestamp Converter 를 이용하여 string type 을 timestamp type 으로 변경 후, Target DB 에 데이터를 넣습니다.
- Transformation 을 적용할 Converter 를 설정합니다.
- transforms.TimestampConverter.type :
- Timestamp Converter 의 type 을 설정합니다.
- Timestamp column 은 key 에 있는 값이 아닌 value 에 있는 값이므로 value 에 대한 Timestamp Converter 를 사용합니다.
- transforms.TimestampConverter.field :
- Timestamp Converter 를 적용할 field 를 설정합니다.
- 토픽에 있는 timestamp column 을 기입합니다.
- transforms.TimestampConverter.format :
- Timestamp Converter 의 format 을 설정합니다.
- Timestamp 의 format 인
yyyy-MM-dd HH:mm:ss.S
를 기입합니다.
- transforms.TimestampConverter.target.type :
- Timestamp Converter 를 이용하여 변환한 후에 적용할 type 을 설정합니다.
- Target DB 에 넣기 위해
timestamp
를 기입합니다.
- connector.class :
이제 Sink Connector 생성하는 json 파일을 curl 을 이용하여 Connect 의 REST API 에 POST method 로 보냅니다.
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @sink_connector.json
명령어를 실행하면 아래와 같이 출력됩니다.
{"name":"postgres-sink-connector","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","connection.url":"jdbc:postgresql://target-postgres-server:5432/targetdatabase","connection.user":"targetuser","connection.password":"targetpassword","table.name.format":"iris_data","topics":"postgres-source-iris_data","auto.create":"false","auto.evolve":"false","tasks.max":"2","name":"postgres-sink-connector"},"tasks":[],"type":"sink"}%
2.2 생성 확인
아래의 GET method 로 현재 Connector 목록을 확인할 수 있습니다. 앞서 생성한 Connector 가 잘 있는지 확인합니다.
curl -X GET http://localhost:8083/connectors
생성된 connector 들이 잘 있는지 확인합니다.
["postgres-sink-connector","postgres-source-connector"]%
이어서 postgres-sink-connector
의 정보를 확인합니다.
curl -X GET http://localhost:8083/connectors/postgres-sink-connector
아래와 같이 출력됩니다.
{"name":"postgres-sink-connector","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","table.name.format":"iris_data","connection.password":"targetpassword","auto.evolve":"false","connection.user":"targetuser","topics":"postgres-source-iris_data","tasks.max":"2","name":"postgres-sink-connector","auto.create":"false","connection.url":"jdbc:postgresql://target-postgres-server:5432/targetdatabase"},"tasks":[{"connector":"postgres-sink-connector","task":0},{"connector":"postgres-sink-connector","task":1}],"type":"sink"}%
2.3 데이터 확인
마지막으로 DB 에 데이터가 잘 쌓여있는지 확인해봅니다.
psql
을 이용하여 Target DB 에 접속합니다.PGPASSWORD=targetpassword psql -h localhost -p 5433 -U targetuser -d targetdatabase
psql (14.3, server 14.0 (Debian 14.0-1.pgdg110+1))
Type "help" for help.
targetdatabase=#Select 문을 작성하여
iris_data
테이블에 있는 데이터를 확인합니다.targetdatabase=# SELECT * FROM iris_data LIMIT 100;