Skip to main content

2) Producer & Consumer

Chapter Preview


목표

  1. Docker Compose 를 이용하여 주키퍼와 브로커를 생성합니다.
  2. Producer 와 Consumer 를 실행합니다.
  3. Producer 에서 메시지를 생성하고, Consumer 에서 메시지를 확인합니다.

스펙 명세서

  1. 주키퍼와 브로커를 띄우는 Docker Compose 파일을 작성합니다.

    • Zookeeper

      • Service name : zookeeper
      • Image : confluentinc/cp-zookeeper:7.3.0
      • Container name : zookeeper
      • Port : 2181:2181
      • Environment
        • ZOOKEEPER_SERVER_ID : 1
        • ZOOKEEPER_CLIENT_PORT : 2181
    • Broker

      • Service name : broker
      • Image : confluentinc/cp-kafka:7.3.0
      • Container name : broker
      • Port : 9092:9092
      • Environment
        • KAFKA_BROKER_ID : 1
        • KAFKA_ZOOKEEPER_CONNECT : zookeeper:2181
        • KAFKA_ADVERTISED_LISTENERS : PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
        • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP : PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
        • KAFKA_INTER_BROKER_LISTENER_NAME : PLAINTEXT
        • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR : 1
        • KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS : 0
  2. Docker Compose 파일을 실행합니다.

  3. docker compose exec 명령어를 이용하여 토픽을 생성합니다.

    • Topic name : topic-test
    • Partition count : 1
    • Replication Factor count : 1
  4. 터미널에서 docker compose exec 명령어를 이용하여 브로커에 접속한 뒤, kafka-console-consumer 명령어를 이용하여 Consumer 를 실행합니다.

    • Topic name : topic-test
    • Bootstrap server : broker:29092
  5. 다른 터미널에서 docker compose exec 명령어를 이용하여 브로커에 접속한 뒤, kafka-console-producer 명령어를 이용하여 Producer 를 실행합니다.

    • Topic name : topic-test
    • Broker list : broker:29092
  6. Producer 에서 메시지를 생성하고, Consumer 에서 메시지를 확인합니다.

https://github.com/mlops-for-mle/mlops-for-mle/tree/main/part7

해당 파트의 전체 코드는 mlops-for-mle/part7/ 에서 확인할 수 있습니다.

part7
├── Makefile
├── README.md
├── connect.Dockerfile
├── create_table.py
├── kafka-docker-compose.yaml
├── naive-docker-compose.yaml
├── sink_connector.json
├── source_connector.json
├── target-docker-compose.yaml
└── target.Dockerfile

1. Zookeeper & Broker Setup

주키퍼와 브로커를 띄우는 Docker Compose 파일을 작성해보도록 하겠습니다.

1.1 Zookeeper Service

먼저, 주키퍼 서비스를 띄울 때 쓰이는 요소들에 대해 알아보도록 하겠습니다.

naive-docker-compose.yaml
version: "3"

services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
ports:
- 2181:2181
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
  • image :
    • 주키퍼의 이미지로 confluentinc/cp-zookeeper:7.3.0 를 이용하도록 하겠습니다.
  • container_name :
    • 컨테이너의 이름은 zookeeper 로 사용하겠습니다.
  • ports :
    • 주키퍼의 포트인 2181:2181 로 포트 포워딩합니다.

주키퍼의 환경 변수는 다음과 같습니다.

  • ZOOKEEPER_SERVER_ID
    • 주키퍼 클러스터에서 해당 주키퍼를 식별할 ID 를 지정합니다.
    • 이번 챕터에서는 ID 를 1로 지정하겠습니다.
  • ZOOKEEPER_CLIENT_PORT
    • 주키퍼 client 의 포트를 지정합니다.
    • 이번 챕터에서는 기본 주키퍼의 포트인 2181 로 지정하겠습니다.

1.2 Broker Service

다음으로 브로커 서비스를 띄울 때 쓰이는 요소들에 대해 알아보도록 하겠습니다. 이번 챕터에서는 단일 브로커를 가정하고 작성해보겠습니다.

naive-docker-compose.yaml
version: "3"

services:
broker:
image: confluentinc/cp-kafka:7.3.0
container_name: broker
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  • image :
    • 브로커의 이미지로 confluentinc/cp-kafka:7.3.0 를 이용하도록 하겠습니다.
  • container_name :
    • 컨테이너의 이름은 broker 로 사용하겠습니다.
  • depends_on :
    • 주키퍼가 먼저 실행된 후 다음에 브로커가 실행되어야 합니다.
  • ports :
    • 브로커의 포트인 9092:9092 로 포트 포워딩합니다.

브로커의 환경 변수는 다음과 같습니다.

  • KAFKA_SERVER_ID
    • 브로커의 ID 를 지정합니다.
    • 단일 브로커에서는 없어도 무방하나 이번 챕터에서는 1 로 지정하겠습니다.
  • KAFKA_ZOOKEEPER_CONNECT
    • 브로커가 주키퍼에 연결하기 위한 주소를 지정합니다.
    • 일반적으로 주키퍼 서비스 이름 : 주피커 서비스 포트 형식으로 작성합니다.
    • 앞서 띄운 주키퍼의 이름과 포트인 zookeeper:2181 를 입력합니다.
  • KAFKA_ADVERTISED_LISTENERS
    • 내부와 외부에서 접속하기 위한 리스너를 설정합니다.
    • 일반적으로 internal 과 external 를 같이 설정하며, , 로 연결해서 작성합니다.
    • 이번 챕터에서는 internal 로 PLAINTEXT://broker:29092 로 설정하고, external 로 PLAINTEXT_HOST://localhost:9092 으로 설정하겠습니다.
    • 최종적으로 PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 를 입력합니다.
  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
    • 보안을 위한 protocol mapping 을 설정합니다.
    • 이 설정값은 KAFKA_ADVERTISED_LISTENERS 과 함께 key/value 로 매핑됩니다.
    • 이번 챕터에서는 PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT 로 설정하겠습니다.
  • KAFKA_INTER_BROKER_LISTENER_NAME
    • 컨테이너 내부에서 사용할 리스너 이름을 지정합니다.
    • 이번 챕터에서는 앞서 internal 로 설정했던 PLAINTEXT 를 입력합니다.
  • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
    • 토픽을 분산하여 저장할 Replication Factor 를 설정합니다.
    • 이번 챕터에서는 단일 브로커를 사용하기 때문에 1 로 지정하겠습니다.
  • KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
    • 카프카 클러스터에서 초기에 rebalancing 할 때 Consumer 들이 Consumer group 에 조인할 때 대기하는 시간입니다.
    • 이번 챕터에서는 0 으로 설정해줍니다.

1.3 전체 코드

최종적인 Docker Compose 구성은 아래와 같습니다.

naive-docker-compose.yaml
# naive-docker-compose.yaml
version: "3"

services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
ports:
- 2181:2181
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
broker:
image: confluentinc/cp-kafka:7.3.0
container_name: broker
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

1.4 실행

아래 명령어를 활용해 정의된 서비스를 백그라운드 프로세스로 띄웁니다.

docker compose -p part7-naive -f naive-docker-compose.yaml up -d
  • -p :
    • Project name 은 part7-naive 로 사용합니다.
  • -f :
    • File name 은 위에서 작성한 파일 이름인 naive-docker-compose.yaml 을 적어줍니다.

docker ps 를 입력하여 잘 띄워졌는지 확인합니다.

docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0a0599db4d96 confluentinc/cp-kafka:7.3.0 "/etc/confluent/dock…" 28 seconds ago Up 27 seconds 0.0.0.0:9092->9092/tcp broker
35cdc624ac34 confluentinc/cp-zookeeper:7.3.0 "/etc/confluent/dock…" 28 seconds ago Up 27 seconds 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp zookeeper

2. Producer & Consumer Setup

이번에는 토픽을 생성해보고, 이어서 Producer 와 Consumer 를 생성해보도록 하겠습니다.

2.1 Topic

2.1.1 Topic 생성

이제 토픽을 생성해보겠습니다.

docker compose -p part7-naive exec broker kafka-topics --create --topic topic-test --bootstrap-server broker:29092 --partitions 1 --replication-factor 1
  • docker compose exec :
    • 컨테이너 내에 명령어를 수행합니다.
  • broker :
    • 생성된 브로커 서비스의 이름을 적습니다.
  • kafka-topics :
    • 토픽에 대한 명령을 실행합니다.
  • --create :
    • 토픽을 생성합니다.
  • --topic :
    • 생성할 토픽의 이름을 지정합니다.
    • 이번 챕터에서는 topic-test 라는 이름으로 생성해보겠습니다.
  • --bootstrap-server :
    • 브로커 서비스에 대한 호스트 이름과 포트를 지정합니다.
    • 이번 챕터에서는 앞서 Docker Compose 로 띄웠던 브로커의 환경 변수를 참고하여 broker:29092 로 설정합니다.
  • --partition :
    • 토픽 내에 파티션 개수를 설정합니다.
    • 이번 챕터에서는 1 로 설정하겠습니다.
  • --replication-factor :
    • Replication Factor 를 지정합니다.
    • 이번 챕터에서는 1 로 설정하겠습니다.

2.1.1 Topic 생성 확인

이어서 토픽이 잘 생성되었는지 확인해보도록 하겠습니다.

docker compose -p part7-naive exec broker kafka-topics --describe --topic topic-test --bootstrap-server broker:29092
  • --describe :
    • 생성된 토픽에 대한 상세 설명을 보여줍니다.

실행하면 다음과 같이 출력됩니다.

Topic: topic-test       TopicId: pcR8cByuSVG1p1guQ3MFRg PartitionCount: 1       ReplicationFactor: 1   Configs: 
Topic: topic-test Partition: 0 Leader: 1 Replicas: 1 Isr: 1

위에서 입력한 topic-test 토픽이 생성된 것을 확인할 수 있습니다.

2.2 Consumer

이제 토픽을 생성했으니 생성한 토픽을 사용할 Consumer 를 만들어 보겠습니다.
Consumer 를 먼저 실행하는 이유는 일반적으로 Consumer 가 메시지를 subscribe 하려고 대기하는 상태에서 Producer 가 메시지를 생성해서 보내기 때문입니다.

2.2.1 Broker Container 접속

먼저 docker compose exec 명령어를 통해 컨테이너 내부로 접속합니다.

# terminal 1
docker compose -p part7-naive exec broker /bin/bash

컨테이너에 접속하면 다음과 같이 터미널이 열립니다.

[appuser@0a0599db4d96 ~]$

2.2.2 Consumer 실행

이후에 kafka-console-consumer 를 이용하여 topic-test 토픽을 subscribe 합니다.

[appuser@0a0599db4d96 ~]$
kafka-console-consumer --topic topic-test --bootstrap-server broker:29092
  • 토픽을 생성했던 것처럼 bootstrap-server 를 broker:29092 로 설정합니다.

실행하면 다음과 같이 수신을 대기하고 있는 상태가 됩니다.

[appuser@0a0599db4d96 ~]$ kafka-console-consumer --topic topic-test --bootstrap-server broker:29092

2.3 Producer

마지막으로 Producer 를 만들어서 메시지를 보낼 준비를 하겠습니다.

2.3.1 Broker Container 접속

Consumer 와 똑같이, docker compose exec 명령어를 통해 컨테이너 내부로 접속합니다.

caution

이 때 Consumer 를 실행했던 터미널이 아닌 새로운 터미널로 띄웁니다.

# terminal 2
docker compose -p part7-naive exec broker /bin/bash

접속되면 다음과 같이 터미널이 열립니다.

[appuser@0a0599db4d96 ~]$ 

2.3.2 Producer 실행

이후에 kafka-console-producer 를 이용하여 topic-test 토픽에 접근하여 publish 할 준비를 합니다.

[appuser@0a0599db4d96 ~]$
kafka-console-producer --topic topic-test --broker-list broker:29092

명령어를 실행하면 다음과 같이 publish 할 수 있는 상태가 됩니다.

[appuser@0a0599db4d96 ~]$ kafka-console-producer --topic topic-test --broker-list broker:29092
>

2.4 Communicate

communicate

2.4.1 Producer 에 메시지 입력하기

Producer 가 열려 있는 두 번째 터미널에서 아래와 같이 hello , this is mlops for mle 를 입력해봅니다.

[appuser@0a0599db4d96 ~]$ kafka-console-producer --topic topic-test --broker-list broker:29092
> hello
> this is mlops for mle

2.4.2 Conumer 에서 메시지 결과 보기

Consumer 에서 아래의 메시지가 전달됨을 확인할 수 있습니다.

hello
this is mlops for mle

2.5 종료

모든 과정이 종료되었으면 주키퍼와 브로커를 종료합니다.

docker compose -p part7-naive down -v