Skip to main content

1) Stream Serving

Chapter Preview


목표

  1. Stream Serving 을 위한 Data Subscriber 코드를 작성합니다.
  2. Docker Compose 를 이용하여 Data Subcriber 를 생성합니다.
  3. Target DB 에 접속하여 예측값이 잘 쌓이고 있는지 확인합니다.

스펙 명세서

  1. Stream Serving 을 위한 Data Subscriber 코드를 작성합니다.

      1. psycopg2 패키지를 이용하여 예측값을 저장할 iris_prediction 테이블을 Target DB 서버에 생성합니다.
      1. kafka-python 패키지를 이용하여 07. Kafka 파트에서 생성한 토픽에서 데이터를 가져올 수 있는 Consumer 를 구현합니다.
        • Topic name : postgres-source-iris_data
        • bootstrap_servers : broker:29092
        • auto_offset_reset : earliest
        • group_id : iris-data-consumer-group
        • value_deserializer : lambda x: loads(x)
      1. requests 패키지를 이용하여 API 에 모델의 예측을 요청하고 예측값을 받습니다.
      1. 결과값을 iris_prediction 테이블에 삽입합니다.
  2. Docker Compose 를 이용하여 Data Subscriber 를 생성합니다.

  3. Target DB 에 접속하여 실제로 예측값이 잘 쌓이고 있는지 확인합니다.

https://github.com/mlops-for-mle/mlops-for-mle/tree/main/part8

해당 파트의 전체 코드는 mlops-for-mle/part8/ 에서 확인할 수 있습니다.

part8
├── Dockerfile
├── Makefile
├── README.md
├── data_subscriber.py
├── grafana-docker-compose.yaml
└── stream-docker-compose.yaml

0. 패키지 설치

이번 챕터에서 사용할 패키지들입니다.

pip install kafka-python requests psycopg2-binary
  • kafka-python :
    • Python 에서 Kafka 를 SDK 형태로 사용하도록 도와주는 Kafka Python Client 패키지입니다.
    • 해당 챕터에서는 Consumer 를 구현할 때 사용합니다.
  • requests :
    • Python 으로 HTTP 통신이 필요한 프로그램을 작성할 때 가장 많이 사용되는 패키지입니다.
    • API 를 호출할 때 사용합니다.

1. Architecture

이번 챕터에서 구현할 서비스들은 [그림 8-2]와 같습니다.

Stream serving flow [그림 8-2] Stream Serving Workflow

07. Kafka 파트에서는 Source Connector 와 Sink Connector 를 생성하여 Source DB 에서 Target DB 로 데이터를 전달하는 과정을 살펴봤습니다.08. Stream 파트에서는 다시 Model Deployment 관점으로 돌아와서 Kafka 를 어떻게 쓸 지 살펴보겠습니다.

이번 파트에서는 07. Kafka 파트와 동일한 Zookeeper, Broker, Connect, Schema Registry 를 사용합니다. 또한 Source Connector 를 사용하여 데이터를 가져오는 과정도 동일합니다. 달라지는 점은 Sink Connector 를 대신해서 직접 Kafka Python SDK 를 이용하여 Consumer 를 구현하는 것입니다.

왜 직접 구현해서 사용해야 할까요?
07. Kafka 파트에서 사용한 Sink Connector 를 살펴 보면, 설정 파일을 통해 생성한 후 자동으로 Sink Connector 가 해당 토픽에 있는 데이터를 읽어서 Target DB 에 전달했습니다. 하지만 08. Stream 파트에서는 06. API serving 파트에서 생성한 API 서버에 request 를 보내고 response 를 받아야합니다. 이 과정에서 Sink Connector 를 쓰려면 토픽에서 데이터를 읽어서 전달할 endpoint 가 필요하지만, API Serving 에서는 수동으로 request 를 보내고 response 를 받기 때문에 Sink Connector 를 사용할 수가 없습니다.

따라서 Source DB 에서 데이터를 받아 API 서버로 요청을 보내고, 모델의 예측 값을 받고, 받은 결과를 Target DB 에 삽입하는 과정을 담당하는 코드가 필요합니다.

이번 챕터에서는 Sink Connector 없이 kafka-python, requests, psycopg2 패키지들을 이용하여 Data Subscriber 를 구축해보겠습니다.

2. Data Subscriber

이번에 구현할 Data Subscriber 의 절차는 다음과 같습니다.

  1. psycopg2 패키지를 이용하여 Target DB 에 접근하여 테이블을 생성합니다.
  2. kafka-python 패키지를 이용하여 브로커의 토픽에 있는 데이터를 읽는 Consumer 를 생성합니다.
  3. requests 패키지를 이용하여 Consumer 를 통해 받은 데이터를 06. API serving 파트에서 띄운 API 서버에 데이터를 보내고 예측값을 받습니다.
  4. psycopg2 패키지를 이용하여 받은 response 를 Target DB 에 삽입합니다.

2.1 Prediction 테이블 생성

먼저, 예측값을 저장할 테이블을 생성합니다. 전반적인 코드는 01. Database 파트와 동일하며 다른 내용은 아래와 같습니다.

data_subscriber.py
import psycopg2

def create_table(db_connect):
create_table_query = """
CREATE TABLE IF NOT EXISTS iris_prediction (
id SERIAL PRIMARY KEY,
timestamp timestamp,
iris_class int
);"""
print(create_table_query)
with db_connect.cursor() as cur:
cur.execute(create_table_query)
db_connect.commit()

if __name__ == "__main__":
db_connect = psycopg2.connect(
user="targetuser",
password="targetpassword",
host="target-postgres-server",
port=5432,
database="targetdatabase",
)
create_table(db_connect)
  • Connection :
    • user : targetuser
    • password : targetpassword
    • host : target-postgres-server
    • port : 5432
    • database : targetdatabase
  • Table name : iris_prediction
  • Schema :
    • id (PK), timestamp (timestamp), iris_class (int)

2.2 Consumer 생성

다음으로, Consumer 를 생성하겠습니다.
kafka-python 패키지를 이용하여 KafkaConsumer 의 인스턴스를 만들겠습니다.

data_subscriber.py
from json import loads
from kafka import KafkaConsumer

consumer = KafkaConsumer(
"postgres-source-iris_data",
bootstrap_servers="broker:29092",
auto_offset_reset="earliest",
group_id="iris-data-consumer-group",
value_deserializer=lambda x: loads(x),
)
  • topics:
    • 데이터를 읽어들이고 싶은 토픽을 설정합니다.
  • bootstrap_servers :
    • Bootstrap 서버로 띄워져있는 브로커의 브로커 서비스 이름 : 브로커 서비스 내부 포트 을 넣습니다.
  • auto_offset_reset :
    • 토픽에 있는 데이터를 어떤 offset 값부터 가져올 지 설정합니다.
    • 2가지 설정이 있으며, earliest 는 가장 초기 offset 값, latest 는 가장 마지막 offset 값입니다.
    • 이번 챕터에서는 첫번째 데이터부터 가져오고 싶기 때문에 earliest 를 작성합니다.
  • group_id :
    • Consumer 그룹을 식별하기 위해 그룹 ID 를 설정합니다.
  • value_deserializer :
    • Source Connector (또는 Produceer) 에서 serialization 된 value 값을 deserialization 할 때 사용할 deserializer 를 설정합니다.
    • 07. Kafka 파트에서는 Connect 를 띄울 때 value converter 로써 Json Converter 를 사용하였습니다. 따라서 데이터는 json 으로 serialization 이 되어있습니다.
    • 이번 챕터에서는 데이터를 읽어서 Json Deserializer 를 이용하여 deserialization 을 해야 하기 때문에 lambda function 과 json 의 loads 를 이용하여 lambda x: loads(x) 를 작성합니다.

이렇게 만들어진 Consumer 인스턴스는 for 문을 이용하여 토픽에 있는 데이터를 실시간으로 계속해서 가져올 수 있습니다.

data_subscriber.py
for msg in consumer:
print(
f"Topic : {msg.topic}\n"
f"Partition : {msg.partition}\n"
f"Offset : {msg.offset}\n"
f"Key : {msg.key}\n"
f"Value : {msg.value}\n",
)
# Topic : postgres-source-iris_data
# Partition : 0
# Offset : 133
# Key : None
# Value : {'schema': {'type': 'struct', 'fields': [{'type': 'int32', 'optional': False, 'field': 'id'}, {'type': 'string', 'optional': True, 'field': 'timestamp'}, {'type': 'double', 'optional': True, 'field': 'sepal_length'}, {'type': 'double', 'optional': True, 'field': 'sepal_width'}, {'type': 'double', 'optional': True, 'field': 'petal_length'}, {'type': 'double', 'optional': True, 'field': 'petal_width'}, {'type': 'int32', 'optional': True, 'field': 'target'}], 'optional': False, 'name': 'iris_data'}, 'payload': {'id': 134, 'timestamp': '2022-12-15 04:49:41.21', 'sepal_length': 6.1, 'sepal_width': 2.8, 'petal_length': 4.0, 'petal_width': 1.3, 'target': 1}}
#
# Topic : postgres-source-iris_data
# Partition : 0
# Offset : 134
# Key : None
# Value : {'schema': {'type': 'struct', 'fields': [{'type': 'int32', 'optional': False, 'field': 'id'}, {'type': 'string', 'optional': True, 'field': 'timestamp'}, {'type': 'double', 'optional': True, 'field': 'sepal_length'}, {'type': 'double', 'optional': True, 'field': 'sepal_width'}, {'type': 'double', 'optional': True, 'field': 'petal_length'}, {'type': 'double', 'optional': True, 'field': 'petal_width'}, {'type': 'int32', 'optional': True, 'field': 'target'}], 'optional': False, 'name': 'iris_data'}, 'payload': {'id': 135, 'timestamp': '2022-12-15 04:49:42.27', 'sepal_length': 6.2, 'sepal_width': 2.9, 'petal_length': 4.3, 'petal_width': 1.3, 'target': 1}}
#
# Topic : postgres-source-iris_data
# Partition : 0
# Offset : 135
# Key : None
# Value : {'schema': {'type': 'struct', 'fields': [{'type': 'int32', 'optional': False, 'field': 'id'}, {'type': 'string', 'optional': True, 'field': 'timestamp'}, {'type': 'double', 'optional': True, 'field': 'sepal_length'}, {'type': 'double', 'optional': True, 'field': 'sepal_width'}, {'type': 'double', 'optional': True, 'field': 'petal_length'}, {'type': 'double', 'optional': True, 'field': 'petal_width'}, {'type': 'int32', 'optional': True, 'field': 'target'}], 'optional': False, 'name': 'iris_data'}, 'payload': {'id': 225, 'timestamp': '2022-12-15 04:51:14.238', 'sepal_length': 6.7, 'sepal_width': 3.1, 'petal_length': 4.4, 'petal_width': 1.4, 'target': 1}}

Print 문과 출력된 형태를 통해 메시지에 있는 topic, partition, offset, key, value 을 볼 수 있습니다. 앞으로 사용할 데이터는 value 에 있는 payload 값입니다.

payload 값의 형태를 보면 아래와 같이 나오는 것을 볼 수 있습니다.

'payload': {'id': 134, 'timestamp': '2022-12-15 04:49:41.21', 'sepal_length': 6.1, 'sepal_width': 2.8, 'petal_length': 4.0, 'petal_width': 1.3, 'target': 1}

2.3 API 호출

다음 과정은 읽어드린 데이터를 06. API Serving 파트에서 생성한 API 서버에 전달하고, 예측값을 받는 것입니다.

2.3.1 Schema 확인

먼저 06. API Serving 파트에서 띄워둔 API 서버의 schema 를 살펴보겠습니다.

schemas.py
from pydantic import BaseModel

class PredictIn(BaseModel):
sepal_length: float
sepal_width: float
petal_length: float
petal_width: float

class PredictOut(BaseModel):
iris_class: int

API 서버에 request 로 보낼 값들에는 sepal_length, sepal_width, petal_length, petal_width column 이 있고, response 로 받는 값들에는 iris_class column 이 있습니다.

2.3.2 API 요청 및 응답

API 서버에 request 로 보낼 값들 중 payload 에서 필요없는 column 들을 아래와 같이 삭제합니다.

data_subscriber.py
msg.value["payload"].pop("id")
msg.value["payload"].pop("target")
ts = msg.value["payload"].pop("timestamp")

timestamp 의 경우, Source DB 에서 나온 timestamp 를 Target DB 에 넣어줄 것이기 때문에 똑같이 삭제는 하되, ts 변수로 할당해줍니다.

이제 requests 패키지에 있는 POST method 를 이용하여 payload 값들을 보내고 response 를 받습니다.

data_subscriber.py
response = requests.post(
url="http://api-with-model:8000/predict",
json=msg.value["payload"],
headers={"Content-Type": "application/json"},
).json()
response["timestamp"] = ts
  • url :
    • API 서버의 endpoint 를 설정합니다.
    • 이번 챕터에서는 API 서버의 호스트 이름과 포트, 그리고 POST method 인 predict 를 합하여 http://api-with-model:8000/predict 로 넣어줍니다.
  • json :
    • request 로 보낼 인자값들을 명시합니다.
    • 이번 챕터에서는 payload 값인 msg.value["payload"] 를 넣어줍니다.
  • headers :
    • Client 에서 Server 로 request 를 보낼 때 부가적인 정보를 전송할 수 있도록 설정합니다.
    • 이번 챕터에서는 보낼 때 json 형식으로 보낼 것이기 때문에 {"Content-Type": "application/json"} header 로 적어줍니다.
  • Response 를 받고 난 뒤에 아까 남겨두었던 ts 변수를 response 에 넣어줍니다.

2.4 Prediction 테이블에 예측값 삽입

마지막으로 01. Database 파트에서 사용했던 insert_data 함수를 이용하여 response 에 담긴 데이터를 Target DB 에 삽입합니다.

data_subscriber.py
def insert_data(db_connect, data):
insert_row_query = f"""
INSERT INTO iris_prediction
(timestamp, iris_class)
VALUES (
'{data["timestamp"]}',
{data["iris_class"]}
);"""
print(insert_row_query)
with db_connect.cursor() as cur:
cur.execute(insert_row_query)
db_connect.commit()

insert_data(db_connect, response)

2.5 data_subscriber.py

앞서 살펴봤던 모든 코드들은 다음과 같습니다.

data_subscriber.py
# data_subscriber.py
from json import loads

import psycopg2
import requests
from kafka import KafkaConsumer


def create_table(db_connect):
create_table_query = """
CREATE TABLE IF NOT EXISTS iris_prediction (
id SERIAL PRIMARY KEY,
timestamp timestamp,
iris_class int
);"""
print(create_table_query)
with db_connect.cursor() as cur:
cur.execute(create_table_query)
db_connect.commit()


def insert_data(db_connect, data):
insert_row_query = f"""
INSERT INTO iris_prediction
(timestamp, iris_class)
VALUES (
'{data["timestamp"]}',
{data["iris_class"]}
);"""
print(insert_row_query)
with db_connect.cursor() as cur:
cur.execute(insert_row_query)
db_connect.commit()


def subscribe_data(db_connect, consumer):
for msg in consumer:
print(
f"Topic : {msg.topic}\n"
f"Partition : {msg.partition}\n"
f"Offset : {msg.offset}\n"
f"Key : {msg.key}\n"
f"Value : {msg.value}\n",
)

msg.value["payload"].pop("id")
msg.value["payload"].pop("target")
ts = msg.value["payload"].pop("timestamp")

response = requests.post(
url="http://api-with-model:8000/predict",
json=msg.value["payload"],
headers={"Content-Type": "application/json"},
).json()
response["timestamp"] = ts
insert_data(db_connect, response)


if __name__ == "__main__":
db_connect = psycopg2.connect(
user="targetuser",
password="targetpassword",
host="target-postgres-server",
port=5432,
database="targetdatabase",
)
create_table(db_connect)

consumer = KafkaConsumer(
"postgres-source-iris_data",
bootstrap_servers="broker:29092",
auto_offset_reset="earliest",
group_id="iris-data-consumer-group",
value_deserializer=lambda x: loads(x),
)
subscribe_data(db_connect, consumer)

3. Docker Compose

3.1 Dockerfile

Data Subscriber 코드를 Docker 에서 실행할 Dockerfile 을 만듭니다.

Dockerfile
FROM amd64/python:3.9-slim

WORKDIR /usr/app

RUN pip install -U pip &&\
pip install psycopg2-binary kafka-python requests

COPY data_subscriber.py data_subscriber.py

ENTRYPOINT ["python", "data_subscriber.py"]

3.2 Docker Compose

Dockerfile 을 이용하여 Docker Compose 파일을 구성하면 아래와 같습니다.

stream-docker-compose.yaml
# stream-docker-compose.yaml
version: "3"

services:
data-subscriber:
build:
context: .
dockerfile: Dockerfile
container_name: data-subscriber

networks:
default:
name: mlops-network
external: true
  • 서비스들을 연결할 Docker Network 는 01. Database 파트에서 생성한 mlops-network 네트워크를 사용합니다.

3.3 실행

docker compose 명령어를 이용하여 Data Subscriber 서비스를 생성합니다.

docker compose -p part8-stream -f stream-docker-compose.yaml up -d
  • -p :
    • Project name 은 part8-stream 로 사용합니다.
  • -f :
    • File name 은 위에서 작성한 파일 이름인 stream-docker-compose.yaml 을 적어줍니다.

3.4 데이터 확인

  1. psql 로 Target DB 에 접속합니다.

    $ PGPASSWORD=targetpassword psql -h localhost -p 5433 -U targetuser -d targetdatabase
    psql (14.6, server 14.0 (Debian 14.0-1.pgdg110+1))
    Type "help" for help.

    targetdatabase=#
  2. Select 문을 작성하여 iris_prediction 테이블에 있는 데이터를 확인합니다.

    targetdatabase=# SELECT * FROM iris_prediction LIMIT 100;
    id | timestamp | iris_class
    -----+-------------------------+------------
    1 | 2022-12-21 23:31:12.705 | 1
    2 | 2022-12-21 23:31:13.804 | 2
    3 | 2022-12-21 23:31:14.815 | 2
    4 | 2022-12-21 23:31:15.828 | 2
    5 | 2022-12-21 23:31:16.835 | 1
    6 | 2022-12-21 23:31:17.848 | 1
    7 | 2022-12-21 23:31:18.854 | 1
    8 | 2022-12-21 23:31:19.863 | 0
    9 | 2022-12-21 23:31:20.875 | 2