2) Producer & Consumer
Chapter Preview
목표
- Docker Compose 를 이용하여 주키퍼와 브로커를 생성합니다.
- Producer 와 Consumer 를 실행합니다.
- Producer 에서 메시지를 생성하고, Consumer 에서 메시지를 확인합니다.
스펙 명세서
주키퍼와 브로커를 띄우는 Docker Compose 파일을 작성합니다.
Zookeeper
- Service name :
zookeeper
- Image :
confluentinc/cp-zookeeper:7.3.0
- Container name :
zookeeper
- Port :
2181:2181
- Environment
- ZOOKEEPER_SERVER_ID :
1
- ZOOKEEPER_CLIENT_PORT :
2181
- ZOOKEEPER_SERVER_ID :
- Service name :
Broker
- Service name :
broker
- Image :
confluentinc/cp-kafka:7.3.0
- Container name :
broker
- Port :
9092:9092
- Environment
- KAFKA_BROKER_ID :
1
- KAFKA_ZOOKEEPER_CONNECT :
zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS :
PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP :
PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME :
PLAINTEXT
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR :
1
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS :
0
- KAFKA_BROKER_ID :
- Service name :
Docker Compose 파일을 실행합니다.
docker compose exec
명령어를 이용하여 토픽을 생성합니다.- Topic name :
topic-test
- Partition count :
1
- Replication Factor count :
1
- Topic name :
터미널에서
docker compose exec
명령어를 이용하여 브로커에 접속한 뒤,kafka-console-consumer
명령어를 이용하여 Consumer 를 실행합니다.- Topic name :
topic-test
- Bootstrap server :
broker:29092
- Topic name :
다른 터미널에서
docker compose exec
명령어를 이용하여 브로커에 접속한 뒤,kafka-console-producer
명령어를 이용하여 Producer 를 실행합니다.- Topic name :
topic-test
- Broker list :
broker:29092
- Topic name :
Producer 에서 메시지를 생성하고, Consumer 에서 메시지를 확인합니다.
해당 파트의 전체 코드는 mlops-for-mle/part7/ 에서 확인할 수 있습니다.
part7
├── Makefile
├── README.md
├── connect.Dockerfile
├── create_table.py
├── kafka-docker-compose.yaml
├── naive-docker-compose.yaml
├── sink_connector.json
├── source_connector.json
├── target-docker-compose.yaml
└── target.Dockerfile
1. Zookeeper & Broker Setup
주키퍼와 브로커를 띄우는 Docker Compose 파일을 작성해보도록 하겠습니다.
1.1 Zookeeper Service
먼저, 주키퍼 서비스를 띄울 때 쓰이는 요소들에 대해 알아보도록 하겠습니다.
version: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
ports:
- 2181:2181
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
- image :
- 주키퍼의 이미지로
confluentinc/cp-zookeeper:7.3.0
를 이용하도록 하겠습니다.
- 주키퍼의 이미지로
- container_name :
- 컨테이너의 이름은
zookeeper
로 사용하겠습니다.
- 컨테이너의 이름은
- ports :
- 주키퍼의 포트인
2181:2181
로 포트 포워딩합니다.
- 주키퍼의 포트인
주키퍼의 환경 변수는 다음과 같습니다.
- ZOOKEEPER_SERVER_ID
- 주키퍼 클러스터에서 해당 주키퍼를 식별할 ID 를 지정합니다.
- 이번 챕터에서는 ID 를
1
로 지정하겠습니다.
- ZOOKEEPER_CLIENT_PORT
- 주키퍼 client 의 포트를 지정합니다.
- 이번 챕터에서는 기본 주키퍼의 포트인
2181
로 지정하겠습니다.
1.2 Broker Service
다음으로 브로커 서비스를 띄울 때 쓰이는 요소들에 대해 알아보도록 하겠습니다. 이번 챕터에서는 단일 브로커를 가정하고 작성해보겠습니다.
version: "3"
services:
broker:
image: confluentinc/cp-kafka:7.3.0
container_name: broker
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
- image :
- 브로커의 이미지로
confluentinc/cp-kafka:7.3.0
를 이용하도록 하겠습니다.
- 브로커의 이미지로
- container_name :
- 컨테이너의 이름은
broker
로 사용하겠습니다.
- 컨테이너의 이름은
- depends_on :
- 주키퍼가 먼저 실행된 후 다음에 브로커가 실행되어야 합니다.
- ports :
- 브로커의 포트인
9092:9092
로 포트 포워딩합니다.
- 브로커의 포트인
브로커의 환경 변수는 다음과 같습니다.
- KAFKA_SERVER_ID
- 브로커의 ID 를 지정합니다.
- 단일 브로커에서는 없어도 무방하나 이번 챕터에서는
1
로 지정하겠습니다.
- KAFKA_ZOOKEEPER_CONNECT
- 브로커가 주키퍼에 연결하기 위한 주소를 지정합니다.
- 일반적으로
주키퍼 서비스 이름 : 주피커 서비스 포트
형식으로 작성합니다. - 앞서 띄운 주키퍼의 이름과 포트인
zookeeper:2181
를 입력합니다.
- KAFKA_ADVERTISED_LISTENERS
- 내부와 외부에서 접속하기 위한 리스너를 설정합니다.
- 일반적으로 internal 과 external 를 같이 설정하며,
,
로 연결해서 작성합니다. - 이번 챕터에서는 internal 로
PLAINTEXT://broker:29092
로 설정하고, external 로PLAINTEXT_HOST://localhost:9092
으로 설정하겠습니다. - 최종적으로
PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
를 입력합니다.
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
- 보안을 위한 protocol mapping 을 설정합니다.
- 이 설정값은 KAFKA_ADVERTISED_LISTENERS 과 함께 key/value 로 매핑됩니다.
- 이번 챕터에서는
PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
로 설정하겠습니다.
- KAFKA_INTER_BROKER_LISTENER_NAME
- 컨테이너 내부에서 사용할 리스너 이름을 지정합니다.
- 이번 챕터에서는 앞서 internal 로 설정했던
PLAINTEXT
를 입력합니다.
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
- 토픽을 분산하여 저장할 Replication Factor 를 설정합니다.
- 이번 챕터에서는 단일 브로커를 사용하기 때문에
1
로 지정하겠습니다.
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
- 카프카 클러스터에서 초기에 rebalancing 할 때 Consumer 들이 Consumer group 에 조인할 때 대기하는 시간입니다.
- 이번 챕터에서는
0
으로 설정해줍니다.
1.3 전체 코드
최종적인 Docker Compose 구성은 아래와 같습니다.
# naive-docker-compose.yaml
version: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
ports:
- 2181:2181
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
broker:
image: confluentinc/cp-kafka:7.3.0
container_name: broker
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
1.4 실행
아래 명령어를 활용해 정의된 서비스를 백그라운드 프로세스로 띄웁니다.
docker compose -p part7-naive -f naive-docker-compose.yaml up -d
- -p :
- Project name 은
part7-naive
로 사용합니다.
- Project name 은
- -f :
- File name 은 위에서 작성한 파일 이름인
naive-docker-compose.yaml
을 적어줍니다.
- File name 은 위에서 작성한 파일 이름인
docker ps
를 입력하여 잘 띄워졌는지 확인합니다.
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0a0599db4d96 confluentinc/cp-kafka:7.3.0 "/etc/confluent/dock…" 28 seconds ago Up 27 seconds 0.0.0.0:9092->9092/tcp broker
35cdc624ac34 confluentinc/cp-zookeeper:7.3.0 "/etc/confluent/dock…" 28 seconds ago Up 27 seconds 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp zookeeper
2. Producer & Consumer Setup
이번에는 토픽을 생성해보고, 이어서 Producer 와 Consumer 를 생성해보도록 하겠습니다.
2.1 Topic
2.1.1 Topic 생성
이제 토픽을 생성해보겠습니다.
docker compose -p part7-naive exec broker kafka-topics --create --topic topic-test --bootstrap-server broker:29092 --partitions 1 --replication-factor 1
- docker compose exec :
- 컨테이너 내에 명령어를 수행합니다.
- broker :
- 생성된 브로커 서비스의 이름을 적습니다.
- kafka-topics :
- 토픽에 대한 명령을 실행합니다.
- --create :
- 토픽을 생성합니다.
- --topic :
- 생성할 토픽의 이름을 지정합니다.
- 이번 챕터에서는
topic-test
라는 이름으로 생성해보겠습니다.
- --bootstrap-server :
- 브로커 서비스에 대한 호스트 이름과 포트를 지정합니다.
- 이번 챕터에서는 앞서 Docker Compose 로 띄웠던 브로커의 환경 변수를 참고하여
broker:29092
로 설정합니다.
- --partition :
- 토픽 내에 파티션 개수를 설정합니다.
- 이번 챕터에서는
1
로 설정하겠습니다.
- --replication-factor :
- Replication Factor 를 지정합니다.
- 이번 챕터에서는
1
로 설정하겠습니다.
2.1.1 Topic 생성 확인
이어서 토픽이 잘 생성되었는지 확인해보도록 하겠습니다.
docker compose -p part7-naive exec broker kafka-topics --describe --topic topic-test --bootstrap-server broker:29092
- --describe :
- 생성된 토픽에 대한 상세 설명을 보여줍니다.
실행하면 다음과 같이 출력됩니다.
Topic: topic-test TopicId: pcR8cByuSVG1p1guQ3MFRg PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: topic-test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
위에서 입력한 topic-test
토픽이 생성된 것을 확인할 수 있습니다.
2.2 Consumer
이제 토픽을 생성했으니 생성한 토픽을 사용할 Consumer 를 만들어 보겠습니다.
Consumer 를 먼저 실행하는 이유는 일반적으로 Consumer 가 메시지를 subscribe 하려고 대기하는 상태에서 Producer 가 메시지를 생성해서 보내기 때문입니다.
2.2.1 Broker Container 접속
먼저 docker compose exec
명령어를 통해 컨테이너 내부로 접속합니다.
# terminal 1
docker compose -p part7-naive exec broker /bin/bash
컨테이너에 접속하면 다음과 같이 터미널이 열립니다.
[appuser@0a0599db4d96 ~]$
2.2.2 Consumer 실행
이후에 kafka-console-consumer
를 이용하여 topic-test
토픽을 subscribe 합니다.
kafka-console-consumer --topic topic-test --bootstrap-server broker:29092
- 토픽을 생성했던 것처럼 bootstrap-server 를
broker:29092
로 설정합니다.
실행하면 다음과 같이 수신을 대기하고 있는 상태가 됩니다.
[appuser@0a0599db4d96 ~]$ kafka-console-consumer --topic topic-test --bootstrap-server broker:29092
2.3 Producer
마지막으로 Producer 를 만들어서 메시지를 보낼 준비를 하겠습니다.
2.3.1 Broker Container 접속
Consumer 와 똑같이, docker compose exec
명령어를 통해 컨테이너 내부로 접속합니다.
이 때 Consumer 를 실행했던 터미널이 아닌 새로운 터미널로 띄웁니다.
# terminal 2
docker compose -p part7-naive exec broker /bin/bash
접속되면 다음과 같이 터미널이 열립니다.
[appuser@0a0599db4d96 ~]$
2.3.2 Producer 실행
이후에 kafka-console-producer
를 이용하여 topic-test
토픽에 접근하여 publish 할 준비를 합니다.
kafka-console-producer --topic topic-test --broker-list broker:29092
명령어를 실행하면 다음과 같이 publish 할 수 있는 상태가 됩니다.
[appuser@0a0599db4d96 ~]$ kafka-console-producer --topic topic-test --broker-list broker:29092
>
2.4 Communicate
2.4.1 Producer 에 메시지 입력하기
Producer 가 열려 있는 두 번째 터미널에서 아래와 같이 hello
, this is mlops for mle
를 입력해봅니다.
[appuser@0a0599db4d96 ~]$ kafka-console-producer --topic topic-test --broker-list broker:29092
> hello
> this is mlops for mle
2.4.2 Conumer 에서 메시지 결과 보기
Consumer 에서 아래의 메시지가 전달됨을 확인할 수 있습니다.
hello
this is mlops for mle
2.5 종료
모든 과정이 종료되었으면 주키퍼와 브로커를 종료합니다.
docker compose -p part7-naive down -v