4) Data Insertion Loop
https://github.com/mlops-for-mle/mlops-for-mle/tree/main/part1
해당 파트의 전체 코드는 mlops-for-mle/part1/ 에서 확인할 수 있습니다.
part1
├── Dockerfile
├── Makefile
├── data_generator.py
├── data_insertion.py
├── data_insertion_loop.py
├── docker-compose.yaml
└── table_creator.py
1. 데이터 생성
1.1 Loop 추가
3) Data Insertion 챕터에서 작성한data_insertion.py
스크립트를 참고하여 계속해서 데이터를 추가하는 data_insertion_loop.py
를 작성해보겠습니다. 여기서는 추가적으로 while 문을 사용하여 코드를 작성하겠습니다. while True
를 통해 외부의 개입이 없다면 계속해서 while 문을 실행할 수 있습니다.def generate_data(db_connect, df):
while True:
insert_data(db_connect, df.sample(1).squeeze())
다만 위와 같이 작성할 경우 너무 빠른 시간에 데이터가 추가되기 때문에 DB 에 부하가 생길 수 있습니다. 부하를 방지하기 위해서는 데이터를 삽입 후 잠시 대기하는 시간을 추가하면 됩니다.
Python 의 time
패키지의 sleep
함수를 이용하여 데이터를 삽입하고 나서 다음 데이터를 삽입하기 전에 1초동안 대기하도록 다음과 같이 코드를 작성합니다.
import time
def generate_data(db_connect, df):
while True:
insert_data(db_connect, df.sample(1).squeeze())
time.sleep(1)
1.2 Query 실행
1.2.1 data_insertion_loop.py
3) Data Insertion 챕터에서 작성한 내용들과 이번 챕터에서 설명한 내용을 모아서 data_insertion_loop.py
로 작성합니다.data_insertion_loop.py
# data_insertion_loop.py
import time
import pandas as pd
import psycopg2
from sklearn.datasets import load_iris
def get_data():
X, y = load_iris(return_X_y=True, as_frame=True)
df = pd.concat([X, y], axis="columns")
rename_rule = {
"sepal length (cm)": "sepal_length",
"sepal width (cm)": "sepal_width",
"petal length (cm)": "petal_length",
"petal width (cm)": "petal_width",
}
df = df.rename(columns=rename_rule)
return df
def insert_data(db_connect, data):
insert_row_query = f"""
INSERT INTO iris_data
(timestamp, sepal_length, sepal_width, petal_length, petal_width, target)
VALUES (
NOW(),
{data.sepal_length},
{data.sepal_width},
{data.petal_length},
{data.petal_width},
{data.target}
);
"""
print(insert_row_query)
with db_connect.cursor() as cur:
cur.execute(insert_row_query)
db_connect.commit()
def generate_data(db_connect, df):
while True:
insert_data(db_connect, df.sample(1).squeeze())
time.sleep(1)
if __name__ == "__main__":
db_connect = psycopg2.connect(
user="myuser",
password="mypassword",
host="localhost",
port=5432,
database="mydatabase",
)
df = get_data()
generate_data(db_connect, df)
1.2.2 실행
위에서 작성한 스크립트를 실행하면, 다음과 같이 출력되는 것을 확인할 수 있습니다.
python data_insertion_loop.py
INSERT INTO iris_data
(timestamp, sepal_length, sepal_width, petal_length, petal_width, target)
VALUES (
NOW(),
6.5,
2.8,
4.6,
1.5,
1.0
);
INSERT INTO iris_data
(timestamp, sepal_length, sepal_width, petal_length, petal_width, target)
VALUES (
NOW(),
5.5,
4.2,
1.4,
0.2,
0.0
);
INSERT INTO iris_data
(timestamp, sepal_length, sepal_width, petal_length, petal_width, target)
VALUES (
NOW(),
4.4,
2.9,
1.4,
0.2,
0.0
);
2. 데이터 확인
psql
을 이용하여 DB 에 접속하고, 계속해서 데이터가 삽입되고 있는지 확인해보겠습니다.
psql
을 이용하여 DB 에 접속합니다.PGPASSWORD=mypassword psql -h localhost -p 5432 -U myuser -d mydatabase
psql (14.3, server 14.0 (Debian 14.0-1.pgdg110+1))
Type "help" for help.
mydatabase=#- 3) Data Insertion 챕터에서 작성한
iris_data
테이블에 있는 데이터 전체를 확인하기 위한 query 를 실행합니다.mydatabase=# select * from iris_data;
id | sepal_length | sepal_width | petal_length | petal_width | target
----+--------------+-------------+--------------+-------------+--------
1 | 5.6 | 3 | 4.5 | 1.5 | 1
2 | 5.9 | 3 | 5.1 | 1.8 | 2
3 | 5.5 | 2.4 | 3.8 | 1.1 | 1
4 | 5.4 | 3.9 | 1.3 | 0.4 | 0
(4 rows)계속해서 데이터가 추가되고 있는 것을 확인할 수 있습니다.