4) Kafka System
Chapter Preview
목표
- Docker Compose 를 이용하여 Zookeeper, Broker, Schema Registry, Connect 를 생성합니다.
스펙 명세서
Zookeeper, Broker, Schema Registry, Connect 를 생성하는 Docker Compose 파일을 작성합니다.
Zookeeper
- Service name :
zookeeper
- Image :
confluentinc/cp-zookeeper:7.3.0
- Container name :
zookeeper
- Port :
2181:2181
- Environment
- ZOOKEEPER_SERVER_ID :
1
- ZOOKEEPER_CLIENT_PORT :
2181
- ZOOKEEPER_SERVER_ID :
- Service name :
Broker
- Service name :
broker
- Image :
confluentinc/cp-kafka:7.3.0
- Container name :
broker
- Port :
9092:9092
- Environment
- KAFKA_BROKER_ID :
1
- KAFKA_ZOOKEEPER_CONNECT :
zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS :
PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP :
PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME :
PLAINTEXT
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR :
1
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS :
0
- KAFKA_BROKER_ID :
- Service name :
Schema-registry
- Service name :
schema-registry
- Image :
confluentinc/cp-schema-registry:7.3.0
- Container name :
schema-registry
- Port :
8081:8081
- Environment
- SCHEMA_REGISTRY_HOST_NAME :
schema-registry
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS :
broker:29092
- SCHEMA_REGISTRY_LISTENERS :
http://schema-registry:8081
- SCHEMA_REGISTRY_HOST_NAME :
- Service name :
Connect
- Service name :
connect
- Image :
confluentinc/cp-connect:7.3.0
를 이용한 Dockerfile- Environment- CONNECT_PLUGIN_PATH :
“/usr/share/java,/usr/share/confluent-hub-components”
- Run
confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:1.5.5
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.2
confluent-hub install --no-prompt confluentinc/kafka-connect-json-schema-converter:7.3.0
- Environment- CONNECT_PLUGIN_PATH :
- Container name :
connect
- Port :
8083:8083
- Environment
- CONNECT_BOOTSTRAP_SERVERS :
schema-registry
- CONNECT_BOOTSTRAP_SERVERS :
broker:29092
- CONNECT_REST_ADVERTISED_HOST_NAME :
connect
- CONNECT_GROUP_ID :
docker-connect-group
- CONNECT_CONFIG_STORAGE_TOPIC :
docker-connect-configs
- CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR :
1
- CONNECT_OFFSET_STORAGE_TOPIC :
docker-connect-offsets
- CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR :
1
- CONNECT_STATUS_STORAGE_TOPIC :
docker-connect-status
- CONNECT_STATUS_STORAGE_REPLICATION_FACTOR :
1
- CONNECT_KEY_CONVERTER :
org.apache.kafka.connect.storage.StringConverter
- CONNECT_VALUE_CONVERTER :
org.apache.kafka.connect.json.JsonConverter
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL :
http://schema-registry:8081
- CONNECT_BOOTSTRAP_SERVERS :
- Service name :
Docker Compose 파일을 실행합니다.
docker ps
명령어를 이용하여 잘 띄워졌는지 확인합니다.
해당 파트의 전체 코드는 mlops-for-mle/part7/ 에서 확인할 수 있습니다.
part7
├── Makefile
├── README.md
├── connect.Dockerfile
├── create_table.py
├── kafka-docker-compose.yaml
├── naive-docker-compose.yaml
├── sink_connector.json
├── source_connector.json
├── target-docker-compose.yaml
└── target.Dockerfile
0. 환경 설정
📌 이번 챕터를 진행하기 위해서는 위해서 앞서 2) Producer & Consumer 챕터에서 실행된 Docker Compose 를 종료해야 합니다.
아래 명령어를 통해 종료합니다.
docker compose -p part7-naive down -v
1. Architecture
[그림 7-13]은 이번 실습에서 다룰 서비스들의 다이어그램입니다.
각각의 서비스에 대해 알아보겠습니다.
- Zookeeper : 브로커 서버의 상태 감지를 위해 사용되는 주키퍼 서버입니다.
- Broker : Source Connector 에서 데이터를 받아 토픽에 저장하고, Sink Connector 로 데이터를 넘겨줄 브로커 서버입니다. 이번 챕터에서는 다중 브로커가 아닌 단일 브로커를 사용합니다.
- Schema Registry : 메시지의 schema 를 저장하기 위한 Schema Registry 서버입니다.
- Connect : Connector 를 띄우기 위한 Connect 서버입니다.
이제 서비스를 하나씩 구체적으로 살펴보겠습니다.
2. Kafka System
2.1 Zookeeper & Broker
주키퍼와 브로커를 띄우는 코드는 2) Producer & Consumer 챕터에서 작성한 내용을 사용합니다.
version: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
ports:
- 2181:2181
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
broker:
image: confluentinc/cp-kafka:7.3.0
container_name: broker
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
2.2 Schema Registry
다음으로 Schema Registry 를 띄우는 코드에 대해 알아보겠습니다.
version: "3"
services:
schema-registry:
image: confluentinc/cp-schema-registry:7.3.0
container_name: schema-registry
depends_on:
- broker
ports:
- 8081:8081
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092
SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081
- image :
- Schema Registry 의 이미지로
confluentinc/cp-schema-registry:7.3.0
를 이용합니다.
- Schema Registry 의 이미지로
- container_name :
- 컨테이너의 이름은
schema-registry
로 사용하겠습니다.
- 컨테이너의 이름은
- depends_on :
broker
가 먼저 실행된 후 Schema Registry 가 실행되어야 합니다.
- ports :
- Schema Registry 의 포트인
8081:8081
을 포트 포워딩합니다.
- Schema Registry 의 포트인
Schema Registry 의 환경 변수는 다음과 같습니다.
- SCHEMA_REGISTRY_HOST_NAME
- Schema Registry 의 호스트 이름을 지정합니다.
- 이번 챕터에서는
schema-registry
로 지정하겠습니다.
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
- Bootstrap 으로 띄워진 브로커 서버를 지정합니다.
- 일반적으로
브로커 서비스 이름 : 브로커 서비스 내부 포트
형식으로 작성합니다. - 이번 챕터에서는
broker:29092
을 사용하겠습니다.
- SCHEMA_REGISTRY_LISTENERS
- 외부에서 접속할 리스너를 설정합니다.
- 이번 챕터에서는
http://schema-registry:8081
으로 설정하겠습니다.
2.3 Connect
다음으로 Connect 를 생성하는 코드에 대해 알아보겠습니다.
Connect 의 경우 이미지를 build 하기 위한 Dockerfile 이 필요합니다. Dockerfile 은 아래와 같이 작성할 수 있습니다.
# connect.Dockerfile
FROM confluentinc/cp-kafka-connect:7.3.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:1.5.5 &&\
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.2 &&\
confluent-hub install --no-prompt confluentinc/kafka-connect-json-schema-converter:7.3.0
- FROM :
- Base 이미지로
confluentinc/cp-kafka-connect:7.3.0
을 사용하겠습니다.
- Base 이미지로
- ENV :
- Connect 에서는 플러그인의 path 를 설정해줍니다.
- 이번 챕터에서는 base 이미지에 있는
/usr/share/java
path 와/usr/share/confluent-hub-components
path 를 플러그인 path 로 설정합니다. - 합쳐서
/usr/share/java,/usr/share/confluent-hub-components
플러그인 path 로 사용하겠습니다.
- RUN :
- 이번 챕터에서는 JDBC Connector 를 사용할 것이며, PostgreSQL DB 에 접근이 가능한 Connector 를 설치해야합니다.
confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:1.5.5
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.2
- 또한 value schema 의 Converter 는 Json Schema Converter 를 사용하겠습니다.
confluent-hub install --no-prompt confluentinc/kafka-connect-json-schema-converter:7.3.0
- 이번 챕터에서는 JDBC Connector 를 사용할 것이며, PostgreSQL DB 에 접근이 가능한 Connector 를 설치해야합니다.
앞서 작성한 Dockerfile 을 이용한 Connect 의 코드는 아래와 같습니다.
version: "3"
services:
connect:
build:
context: .
dockerfile: connect.Dockerfile
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: broker:29092
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: docker-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
- build :
- Dockerfile 을 build 하기 위한 경로로
.
을 입력하고, Dockerfile 파일명인connect.Dockerfile
을 입력합니다.
- Dockerfile 을 build 하기 위한 경로로
- container_name :
- 컨테이너 이름은
connect
로 사용하겠습니다.
- 컨테이너 이름은
- depends_on :
- 브로커와 Schema Registry 가 먼저 실행된 다음에 Connect 가 실행되야 합니다.
- ports :
- Connect 의 포트인
8083:8083
를 포트 포워딩합니다.
- Connect 의 포트인
Connect 의 환경 변수는 다음과 같습니다.
- CONNECT_BOOTSTRAP_SERVERS
- Bootstrap 으로 띄워진 브로커 서버를 지정합니다.
- 일반적으로
브로커 서비스 이름 : 브로커 서비스 내부 포트
형식을 사용합니다.
- CONNECT_REST_ADVERTISED_HOST_NAME
- Connect 에서는 REST API 요청에 대한 처리와 Connector 의 등록, 설정, 시작, 종료 등의 처리를 담당하는 Worker 가 존재합니다.
- Worker 간의 연결이 가능하도록 호스트 이름을 지정합니다.
- CONNECT_GROUP_ID
- Connect 의 Worker 프로세스 그룹 (또는 클러스터)를 구성하는 데 사용하는 고유한 ID 를 지정합니다.
- 단, Consumer 그룹 ID 와 충돌하면 안됩니다.
- CONNECT_CONFIG_STORAGE_TOPIC
- Connector 의 환경 설정을 저장할 브로커의 토픽 이름을 설정합니다.
- CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
- 환경 설정을 저장하는 토픽을 생성할 때 사용할 Replication Factor 의 수를 설정합니다.
- CONNECT_OFFSET_STORAGE_TOPIC
- Connector 의 offset 을 저장할 브로커의 토픽 이름을 설정합니다.
- CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
- Offset 을 저장하는 토픽을 생성할 때 사용할 Replication Factor 의 수를 설정합니다.
- CONNECT_STATUS_STORAGE_TOPIC
- Connector 와 task 의 상태를 저장할 브로커의 토픽 이름을 설정합니다.
- CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
- 상태를 저장하는 토픽을 생성할 때 사용할 Replication Factor 의 수를 설정합니다.
- CONNECT_KEY_CONVERTER
- Key 에 대한 Converter 를 설정합니다.
- 이번 챕터에서는 String Converter 를 사용하겠습니다.
- CONNECT_VALUE_CONVERTER
- Value 에 대한 Converter 를 설정합니다.
- 이번 챕터에서는 Json Converter 를 사용하겠습니다.
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
- Value Converter 에 대한 Schema Registry URL 을 설정합니다.
- 이번 챕터에서는 Schema Registry 의 서비스 이름과 포트인
http://schema-registry:8081
을 기입해줍니다.
2.4 kafka-docker-compose.yaml
모든 서비스를 띄우는 코드는 아래와 같습니다.
# kafka-docker-compose.yaml
version: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
ports:
- 2181:2181
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
broker:
image: confluentinc/cp-kafka:7.3.0
container_name: broker
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
schema-registry:
image: confluentinc/cp-schema-registry:7.3.0
container_name: schema-registry
depends_on:
- broker
ports:
- 8081:8081
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092
SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081
connect:
build:
context: .
dockerfile: connect.Dockerfile
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: broker:29092
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: docker-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
networks:
default:
name: mlops-network
external: true
3. 실행 및 확인
3.1 실행
docker compose
명령어를 이용하여 위에서 작성한 서비스들을 생성합니다.
docker compose -p part7-kafka -f kafka-docker-compose.yaml up -d
- -p :
- Project name 은
part7-kafka
로 사용합니다.
- Project name 은
- -f :
- File name 은 위에서 작성한 파일 이름인
kafka-docker-compose.yaml
을 적어줍니다.
- File name 은 위에서 작성한 파일 이름인
3.2 서비스 확인
docker ps
명령어를 이용하여 서비스들이 잘 띄워졌는지 확인합니다.
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
2fe161195b13 part7-kafka-connect "/etc/confluent/dock…" 41 seconds ago Up 39 seconds (health: starting) 0.0.0.0:8083->8083/tcp, 9092/tcp connect
4b98228b7e77 confluentinc/cp-schema-registry:7.3.0 "/etc/confluent/dock…" 41 seconds ago Up 39 seconds 0.0.0.0:8081->8081/tcp schema-registry
505d3a4d6fdb confluentinc/cp-kafka:7.3.0 "/etc/confluent/dock…" 41 seconds ago Up 40 seconds 0.0.0.0:9092->9092/tcp broker
7f4a6f51837c confluentinc/cp-zookeeper:7.3.0 "/etc/confluent/dock…" 42 seconds ago Up 40 seconds 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp zookeeper