Skip to main content

4) Kafka System

Chapter Preview


목표

  1. Docker Compose 를 이용하여 Zookeeper, Broker, Schema Registry, Connect 를 생성합니다.

스펙 명세서

  1. Zookeeper, Broker, Schema Registry, Connect 를 생성하는 Docker Compose 파일을 작성합니다.

    • Zookeeper

      • Service name : zookeeper
      • Image : confluentinc/cp-zookeeper:7.3.0
      • Container name : zookeeper
      • Port : 2181:2181
      • Environment
        • ZOOKEEPER_SERVER_ID : 1
        • ZOOKEEPER_CLIENT_PORT : 2181
    • Broker

      • Service name : broker
      • Image : confluentinc/cp-kafka:7.3.0
      • Container name : broker
      • Port : 9092:9092
      • Environment
        • KAFKA_BROKER_ID : 1
        • KAFKA_ZOOKEEPER_CONNECT : zookeeper:2181
        • KAFKA_ADVERTISED_LISTENERS : PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
        • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP : PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
        • KAFKA_INTER_BROKER_LISTENER_NAME : PLAINTEXT
        • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR : 1
        • KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS : 0
    • Schema-registry

      • Service name : schema-registry
      • Image : confluentinc/cp-schema-registry:7.3.0
      • Container name : schema-registry
      • Port : 8081:8081
      • Environment
        • SCHEMA_REGISTRY_HOST_NAME : schema-registry
        • SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS : broker:29092
        • SCHEMA_REGISTRY_LISTENERS : http://schema-registry:8081
    • Connect

      • Service name : connect
      • Image : confluentinc/cp-connect:7.3.0 를 이용한 Dockerfile
        • Environment- CONNECT_PLUGIN_PATH : “/usr/share/java,/usr/share/confluent-hub-components”
        • Run
          • confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:1.5.5
          • confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.2
          • confluent-hub install --no-prompt confluentinc/kafka-connect-json-schema-converter:7.3.0
      • Container name : connect
      • Port : 8083:8083
      • Environment
        • CONNECT_BOOTSTRAP_SERVERS : schema-registry
        • CONNECT_BOOTSTRAP_SERVERS : broker:29092
        • CONNECT_REST_ADVERTISED_HOST_NAME : connect
        • CONNECT_GROUP_ID : docker-connect-group
        • CONNECT_CONFIG_STORAGE_TOPIC : docker-connect-configs
        • CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR : 1
        • CONNECT_OFFSET_STORAGE_TOPIC : docker-connect-offsets
        • CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR : 1
        • CONNECT_STATUS_STORAGE_TOPIC : docker-connect-status
        • CONNECT_STATUS_STORAGE_REPLICATION_FACTOR : 1
        • CONNECT_KEY_CONVERTER : org.apache.kafka.connect.storage.StringConverter
        • CONNECT_VALUE_CONVERTER : org.apache.kafka.connect.json.JsonConverter
        • CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL : http://schema-registry:8081
  2. Docker Compose 파일을 실행합니다.

  3. docker ps 명령어를 이용하여 잘 띄워졌는지 확인합니다.

https://github.com/mlops-for-mle/mlops-for-mle/tree/main/part7

해당 파트의 전체 코드는 mlops-for-mle/part7/ 에서 확인할 수 있습니다.

part7
├── Makefile
├── README.md
├── connect.Dockerfile
├── create_table.py
├── kafka-docker-compose.yaml
├── naive-docker-compose.yaml
├── sink_connector.json
├── source_connector.json
├── target-docker-compose.yaml
└── target.Dockerfile

0. 환경 설정

caution

📌 이번 챕터를 진행하기 위해서는 위해서 앞서 2) Producer & Consumer 챕터에서 실행된 Docker Compose 를 종료해야 합니다.

아래 명령어를 통해 종료합니다.

docker compose -p part7-naive down -v

1. Architecture

[그림 7-13]은 이번 실습에서 다룰 서비스들의 다이어그램입니다.

[그림7-13] Kafka System

각각의 서비스에 대해 알아보겠습니다.

  • Zookeeper : 브로커 서버의 상태 감지를 위해 사용되는 주키퍼 서버입니다.
  • Broker : Source Connector 에서 데이터를 받아 토픽에 저장하고, Sink Connector 로 데이터를 넘겨줄 브로커 서버입니다. 이번 챕터에서는 다중 브로커가 아닌 단일 브로커를 사용합니다.
  • Schema Registry : 메시지의 schema 를 저장하기 위한 Schema Registry 서버입니다.
  • Connect : Connector 를 띄우기 위한 Connect 서버입니다.

이제 서비스를 하나씩 구체적으로 살펴보겠습니다.

2. Kafka System

2.1 Zookeeper & Broker

주키퍼와 브로커를 띄우는 코드는 2) Producer & Consumer 챕터에서 작성한 내용을 사용합니다.

kafka-docker-compose.yaml
version: "3"

services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
ports:
- 2181:2181
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
broker:
image: confluentinc/cp-kafka:7.3.0
container_name: broker
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

2.2 Schema Registry

다음으로 Schema Registry 를 띄우는 코드에 대해 알아보겠습니다.

kafka-docker-compose.yaml
version: "3"

services:
schema-registry:
image: confluentinc/cp-schema-registry:7.3.0
container_name: schema-registry
depends_on:
- broker
ports:
- 8081:8081
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092
SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081
  • image :
    • Schema Registry 의 이미지로 confluentinc/cp-schema-registry:7.3.0 를 이용합니다.
  • container_name :
    • 컨테이너의 이름은 schema-registry 로 사용하겠습니다.
  • depends_on :
    • broker 가 먼저 실행된 후 Schema Registry 가 실행되어야 합니다.
  • ports :
    • Schema Registry 의 포트인 8081:8081 을 포트 포워딩합니다.

Schema Registry 의 환경 변수는 다음과 같습니다.

  • SCHEMA_REGISTRY_HOST_NAME
    • Schema Registry 의 호스트 이름을 지정합니다.
    • 이번 챕터에서는 schema-registry 로 지정하겠습니다.
  • SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
    • Bootstrap 으로 띄워진 브로커 서버를 지정합니다.
    • 일반적으로 브로커 서비스 이름 : 브로커 서비스 내부 포트 형식으로 작성합니다.
    • 이번 챕터에서는 broker:29092 을 사용하겠습니다.
  • SCHEMA_REGISTRY_LISTENERS
    • 외부에서 접속할 리스너를 설정합니다.
    • 이번 챕터에서는 http://schema-registry:8081 으로 설정하겠습니다.

2.3 Connect

다음으로 Connect 를 생성하는 코드에 대해 알아보겠습니다.

Connect 의 경우 이미지를 build 하기 위한 Dockerfile 이 필요합니다. Dockerfile 은 아래와 같이 작성할 수 있습니다.

connect.Dockerfile
# connect.Dockerfile
FROM confluentinc/cp-kafka-connect:7.3.0

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

RUN confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:1.5.5 &&\
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.2 &&\
confluent-hub install --no-prompt confluentinc/kafka-connect-json-schema-converter:7.3.0
  • FROM :
    • Base 이미지로 confluentinc/cp-kafka-connect:7.3.0 을 사용하겠습니다.
  • ENV :
    • Connect 에서는 플러그인의 path 를 설정해줍니다.
    • 이번 챕터에서는 base 이미지에 있는 /usr/share/java path 와 /usr/share/confluent-hub-components path 를 플러그인 path 로 설정합니다.
    • 합쳐서 /usr/share/java,/usr/share/confluent-hub-components 플러그인 path 로 사용하겠습니다.
  • RUN :
    • 이번 챕터에서는 JDBC Connector 를 사용할 것이며, PostgreSQL DB 에 접근이 가능한 Connector 를 설치해야합니다.
      • confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:1.5.5
      • confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.2
    • 또한 value schema 의 Converter 는 Json Schema Converter 를 사용하겠습니다.
      • confluent-hub install --no-prompt confluentinc/kafka-connect-json-schema-converter:7.3.0

앞서 작성한 Dockerfile 을 이용한 Connect 의 코드는 아래와 같습니다.

kafka-docker-compose.yaml
version: "3"

services:
connect:
build:
context: .
dockerfile: connect.Dockerfile
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: broker:29092
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: docker-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
  • build :
    • Dockerfile 을 build 하기 위한 경로로 . 을 입력하고, Dockerfile 파일명인 connect.Dockerfile 을 입력합니다.
  • container_name :
    • 컨테이너 이름은 connect 로 사용하겠습니다.
  • depends_on :
    • 브로커와 Schema Registry 가 먼저 실행된 다음에 Connect 가 실행되야 합니다.
  • ports :
    • Connect 의 포트인 8083:8083 를 포트 포워딩합니다.

Connect 의 환경 변수는 다음과 같습니다.

  • CONNECT_BOOTSTRAP_SERVERS
    • Bootstrap 으로 띄워진 브로커 서버를 지정합니다.
    • 일반적으로 브로커 서비스 이름 : 브로커 서비스 내부 포트 형식을 사용합니다.
  • CONNECT_REST_ADVERTISED_HOST_NAME
    • Connect 에서는 REST API 요청에 대한 처리와 Connector 의 등록, 설정, 시작, 종료 등의 처리를 담당하는 Worker 가 존재합니다.
    • Worker 간의 연결이 가능하도록 호스트 이름을 지정합니다.
  • CONNECT_GROUP_ID
    • Connect 의 Worker 프로세스 그룹 (또는 클러스터)를 구성하는 데 사용하는 고유한 ID 를 지정합니다.
    • 단, Consumer 그룹 ID 와 충돌하면 안됩니다.
  • CONNECT_CONFIG_STORAGE_TOPIC
    • Connector 의 환경 설정을 저장할 브로커의 토픽 이름을 설정합니다.
  • CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
    • 환경 설정을 저장하는 토픽을 생성할 때 사용할 Replication Factor 의 수를 설정합니다.
  • CONNECT_OFFSET_STORAGE_TOPIC
    • Connector 의 offset 을 저장할 브로커의 토픽 이름을 설정합니다.
  • CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
    • Offset 을 저장하는 토픽을 생성할 때 사용할 Replication Factor 의 수를 설정합니다.
  • CONNECT_STATUS_STORAGE_TOPIC
    • Connector 와 task 의 상태를 저장할 브로커의 토픽 이름을 설정합니다.
  • CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
    • 상태를 저장하는 토픽을 생성할 때 사용할 Replication Factor 의 수를 설정합니다.
  • CONNECT_KEY_CONVERTER
    • Key 에 대한 Converter 를 설정합니다.
    • 이번 챕터에서는 String Converter 를 사용하겠습니다.
  • CONNECT_VALUE_CONVERTER
    • Value 에 대한 Converter 를 설정합니다.
    • 이번 챕터에서는 Json Converter 를 사용하겠습니다.
  • CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
    • Value Converter 에 대한 Schema Registry URL 을 설정합니다.
    • 이번 챕터에서는 Schema Registry 의 서비스 이름과 포트인 http://schema-registry:8081 을 기입해줍니다.

2.4 kafka-docker-compose.yaml

모든 서비스를 띄우는 코드는 아래와 같습니다.

# kafka-docker-compose.yaml
version: "3"

services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
ports:
- 2181:2181
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181

broker:
image: confluentinc/cp-kafka:7.3.0
container_name: broker
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

schema-registry:
image: confluentinc/cp-schema-registry:7.3.0
container_name: schema-registry
depends_on:
- broker
ports:
- 8081:8081
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092
SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081

connect:
build:
context: .
dockerfile: connect.Dockerfile
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: broker:29092
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: docker-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081

networks:
default:
name: mlops-network
external: true

3. 실행 및 확인

3.1 실행

docker compose 명령어를 이용하여 위에서 작성한 서비스들을 생성합니다.

docker compose -p part7-kafka -f kafka-docker-compose.yaml up -d
  • -p :
    • Project name 은 part7-kafka 로 사용합니다.
  • -f :
    • File name 은 위에서 작성한 파일 이름인 kafka-docker-compose.yaml 을 적어줍니다.

3.2 서비스 확인

docker ps 명령어를 이용하여 서비스들이 잘 띄워졌는지 확인합니다.

docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
2fe161195b13 part7-kafka-connect "/etc/confluent/dock…" 41 seconds ago Up 39 seconds (health: starting) 0.0.0.0:8083->8083/tcp, 9092/tcp connect
4b98228b7e77 confluentinc/cp-schema-registry:7.3.0 "/etc/confluent/dock…" 41 seconds ago Up 39 seconds 0.0.0.0:8081->8081/tcp schema-registry
505d3a4d6fdb confluentinc/cp-kafka:7.3.0 "/etc/confluent/dock…" 41 seconds ago Up 40 seconds 0.0.0.0:9092->9092/tcp broker
7f4a6f51837c confluentinc/cp-zookeeper:7.3.0 "/etc/confluent/dock…" 42 seconds ago Up 40 seconds 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp zookeeper