[이제와서 시작하는 Python 마스터하기 #12] 데이터베이스 연동하기
🚀 실전 예제로 시작하기
👥 한국 회사 사용자 관리 시스템
실제 한국 회사에서 사용할 수 있는 사용자 관리 시스템을 SQLite로 구현해보겠습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
import sqlite3
import hashlib
import secrets
from datetime import datetime, timedelta
from contextlib import contextmanager
from typing import Optional, Dict, List
import re
class KoreanUserManagement:
"""한국 회사 사용자 관리 시스템"""
def __init__(self, db_path: str = "company_users.db"):
self.db_path = db_path
self.init_database()
def init_database(self):
"""데이터베이스 및 테이블 초기화"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# 사용자 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
employee_id TEXT UNIQUE NOT NULL,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
phone TEXT,
department TEXT NOT NULL,
position TEXT NOT NULL,
password_hash TEXT NOT NULL,
salt TEXT NOT NULL,
join_date DATE NOT NULL,
status TEXT DEFAULT 'active' CHECK(status IN ('active', 'inactive', 'suspended')),
last_login TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 부서 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS departments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
description TEXT,
manager_id INTEGER,
budget INTEGER DEFAULT 0,
FOREIGN KEY (manager_id) REFERENCES users (id)
)
""")
# 로그인 이력 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS login_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
login_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ip_address TEXT,
user_agent TEXT,
success BOOLEAN NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
# 기본 부서 데이터 삽입
departments = [
('개발팀', 'IT 시스템 개발 및 유지보수'),
('인사팀', '직원 관리 및 채용'),
('마케팅팀', '제품 홍보 및 마케팅 전략'),
('영업팀', '고객 관리 및 영업'),
('경영지원팀', '회계 및 행정 업무')
]
cursor.executemany("""
INSERT OR IGNORE INTO departments (name, description)
VALUES (?, ?)
""", departments)
# 인덱스 생성
cursor.execute("CREATE INDEX IF NOT EXISTS idx_employee_id ON users(employee_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_department ON users(department)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_login_time ON login_history(login_time)")
conn.commit()
@contextmanager
def get_db_connection(self):
"""데이터베이스 연결 컨텍스트 관리자"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row # 딕셔너리 형태로 반환
try:
yield conn
finally:
conn.close()
def _hash_password(self, password: str, salt: str = None) -> tuple:
"""비밀번호 해시화"""
if salt is None:
salt = secrets.token_hex(32)
pwd_hash = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt.encode('utf-8'),
100000 # iterations
).hex()
return pwd_hash, salt
def _validate_korean_data(self, name: str, phone: str = None) -> bool:
"""한국 데이터 유효성 검증"""
# 한글 이름 검증
korean_name_pattern = r'^[가-힣]{2,5}$'
if not re.match(korean_name_pattern, name):
raise ValueError("이름은 2-5자의 한글이어야 합니다")
# 한국 전화번호 검증
if phone:
phone_pattern = r'^(010|011|016|017|018|019)-?\d{3,4}-?\d{4}$'
if not re.match(phone_pattern, phone.replace('-', '')):
raise ValueError("올바른 한국 전화번호 형식이 아닙니다")
return True
def create_user(self, employee_id: str, name: str, email: str,
department: str, position: str, password: str,
phone: str = None) -> int:
"""사용자 생성"""
# 유효성 검증
self._validate_korean_data(name, phone)
if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
raise ValueError("올바른 이메일 형식이 아닙니다")
# 비밀번호 해시화
password_hash, salt = self._hash_password(password)
with self.get_db_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute("""
INSERT INTO users (
employee_id, name, email, phone, department,
position, password_hash, salt, join_date
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
employee_id, name, email, phone, department,
position, password_hash, salt, datetime.now().date()
))
user_id = cursor.lastrowid
conn.commit()
# [!CAUTION]
# **SQL 인젝션을 조심하세요!**
#
# 절대 문자열 포맷팅(`f"SELECT ... WHERE name='{name}'"`)으로 SQL을 만들지 마세요!
# 해커가 `name`에 `' OR '1'='1` 같은 값을 넣으면 모든 데이터가 털릴 수 있습니다.
# 반드시 `?` (placeholder)를 사용해서 데이터베이스가 값을 안전하게 처리하도록 해야 합니다.
print(f"✅ 사용자 생성 완료: {name} ({employee_id})")
return user_id
except sqlite3.IntegrityError as e:
raise ValueError(f"사용자 생성 실패: {str(e)}")
def authenticate_user(self, employee_id: str, password: str,
ip_address: str = "127.0.0.1") -> Optional[Dict]:
"""사용자 인증"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# 사용자 정보 조회
cursor.execute("""
SELECT id, employee_id, name, email, department, position,
password_hash, salt, status
FROM users WHERE employee_id = ?
""", (employee_id,))
user = cursor.fetchone()
success = False
if user and user['status'] == 'active':
# 비밀번호 검증
password_hash, _ = self._hash_password(password, user['salt'])
if password_hash == user['password_hash']:
success = True
# 마지막 로그인 시간 업데이트
cursor.execute("""
UPDATE users SET last_login = CURRENT_TIMESTAMP
WHERE id = ?
""", (user['id'],))
# 로그인 이력 기록
cursor.execute("""
INSERT INTO login_history (user_id, ip_address, success)
VALUES (?, ?, ?)
""", (user['id'] if user else None, ip_address, success))
conn.commit()
if success:
return dict(user)
else:
print("❌ 로그인 실패: 잘못된 사용자 정보")
return None
def get_department_members(self, department: str) -> List[Dict]:
"""부서별 직원 조회"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT employee_id, name, email, position, join_date, last_login
FROM users
WHERE department = ? AND status = 'active'
ORDER BY position, name
""", (department,))
return [dict(row) for row in cursor.fetchall()]
def get_company_statistics(self) -> Dict:
"""회사 통계 정보"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# 부서별 직원 수
cursor.execute("""
SELECT department, COUNT(*) as count
FROM users WHERE status = 'active'
GROUP BY department
ORDER BY count DESC
""")
dept_stats = dict(cursor.fetchall())
# 직급별 통계
cursor.execute("""
SELECT position, COUNT(*) as count
FROM users WHERE status = 'active'
GROUP BY position
ORDER BY count DESC
""")
position_stats = dict(cursor.fetchall())
# 최근 로그인 활동
cursor.execute("""
SELECT COUNT(*) as active_users
FROM users
WHERE last_login >= datetime('now', '-7 days')
AND status = 'active'
""")
recent_active = cursor.fetchone()['active_users']
return {
'departments': dept_stats,
'positions': position_stats,
'recent_active_users': recent_active,
'total_users': sum(dept_stats.values())
}
# 사용 예제
def demo_user_management():
"""사용자 관리 시스템 데모"""
um = KoreanUserManagement()
# 테스트 사용자 생성
test_users = [
{
'employee_id': 'DEV001',
'name': '김개발',
'email': 'kim.dev@company.co.kr',
'department': '개발팀',
'position': '시니어 개발자',
'password': 'secure123!',
'phone': '010-1234-5678'
},
{
'employee_id': 'HR001',
'name': '이인사',
'email': 'lee.hr@company.co.kr',
'department': '인사팀',
'position': '팀장',
'password': 'hr2024@',
'phone': '010-2345-6789'
},
{
'employee_id': 'MKT001',
'name': '박마케팅',
'email': 'park.mkt@company.co.kr',
'department': '마케팅팀',
'position': '대리',
'password': 'marketing#1',
'phone': '010-3456-7890'
}
]
# 사용자 생성
for user_data in test_users:
try:
um.create_user(**user_data)
except ValueError as e:
print(f"사용자 생성 실패: {e}")
# 로그인 테스트
print("\n🔐 로그인 테스트:")
user = um.authenticate_user('DEV001', 'secure123!')
if user:
print(f"환영합니다, {user['name']}님! ({user['department']})")
# 부서별 조회
print(f"\n👥 개발팀 멤버:")
dev_members = um.get_department_members('개발팀')
for member in dev_members:
print(f"- {member['name']} ({member['position']})")
# 통계 정보
print(f"\n📊 회사 통계:")
stats = um.get_company_statistics()
print(f"전체 직원 수: {stats['total_users']}명")
print(f"최근 활성 사용자: {stats['recent_active_users']}명")
print("부서별 인원:")
for dept, count in stats['departments'].items():
print(f" {dept}: {count}명")
# 실행
# demo_user_management()
📦 한국 이커머스 재고 관리 시스템
실제 한국 온라인 쇼핑몰에서 사용할 수 있는 재고 관리 시스템입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
import sqlite3
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Dict, List, Optional, Tuple
from contextlib import contextmanager
import json
class KoreanInventorySystem:
"""한국 이커머스 재고 관리 시스템"""
def __init__(self, db_path: str = "inventory.db"):
self.db_path = db_path
self.init_database()
def init_database(self):
"""데이터베이스 초기화"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# 상품 카테고리 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS categories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
description TEXT,
parent_id INTEGER,
FOREIGN KEY (parent_id) REFERENCES categories (id)
)
""")
# 공급업체 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS suppliers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
contact_person TEXT,
phone TEXT,
email TEXT,
address TEXT,
business_number TEXT UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 상품 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sku TEXT UNIQUE NOT NULL,
name TEXT NOT NULL,
category_id INTEGER NOT NULL,
supplier_id INTEGER NOT NULL,
cost_price DECIMAL(10,2) NOT NULL,
selling_price DECIMAL(10,2) NOT NULL,
description TEXT,
brand TEXT,
weight DECIMAL(5,2),
dimensions TEXT,
status TEXT DEFAULT 'active' CHECK(status IN ('active', 'discontinued')),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (category_id) REFERENCES categories (id),
FOREIGN KEY (supplier_id) REFERENCES suppliers (id)
)
""")
# 재고 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS inventory (
product_id INTEGER PRIMARY KEY,
current_stock INTEGER NOT NULL DEFAULT 0,
reserved_stock INTEGER NOT NULL DEFAULT 0,
available_stock INTEGER GENERATED ALWAYS AS (current_stock - reserved_stock) VIRTUAL,
min_stock_level INTEGER DEFAULT 0,
max_stock_level INTEGER DEFAULT 1000,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_id) REFERENCES products (id)
)
""")
# 재고 변동 이력 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS stock_movements (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER NOT NULL,
movement_type TEXT NOT NULL CHECK(movement_type IN ('in', 'out', 'adjustment')),
quantity INTEGER NOT NULL,
previous_stock INTEGER NOT NULL,
new_stock INTEGER NOT NULL,
reason TEXT NOT NULL,
reference_number TEXT,
cost_per_unit DECIMAL(10,2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_by TEXT,
FOREIGN KEY (product_id) REFERENCES products (id)
)
""")
# 주문 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_number TEXT UNIQUE NOT NULL,
customer_name TEXT NOT NULL,
customer_phone TEXT,
customer_address TEXT,
order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status TEXT DEFAULT 'pending' CHECK(status IN ('pending', 'confirmed', 'shipped', 'delivered', 'cancelled')),
total_amount DECIMAL(10,2),
shipping_cost DECIMAL(10,2) DEFAULT 3000
)
""")
# 주문 상품 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS order_items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
quantity INTEGER NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
total_price DECIMAL(10,2) NOT NULL,
FOREIGN KEY (order_id) REFERENCES orders (id),
FOREIGN KEY (product_id) REFERENCES products (id)
)
""")
# 기본 데이터 삽입
self._insert_sample_data(cursor)
conn.commit()
def _insert_sample_data(self, cursor):
"""샘플 데이터 삽입"""
# 카테고리
categories = [
('전자제품', '컴퓨터, 스마트폰 등'),
('의류', '남성복, 여성복, 아동복'),
('생활용품', '주방용품, 청소용품 등'),
('화장품', '스킨케어, 메이크업'),
('식품', '간식, 음료, 건강식품')
]
cursor.executemany("""
INSERT OR IGNORE INTO categories (name, description) VALUES (?, ?)
""", categories)
# 공급업체
suppliers = [
('삼성전자', '김전자', '02-123-4567', 'contact@samsung.co.kr', '서울시 강남구', '123-45-67890'),
('LG전자', '이전자', '02-234-5678', 'info@lg.co.kr', '서울시 서초구', '234-56-78901'),
('네이버쇼핑', '박쇼핑', '031-345-6789', 'shop@naver.com', '경기도 성남시', '345-67-89012')
]
cursor.executemany("""
INSERT OR IGNORE INTO suppliers (name, contact_person, phone, email, address, business_number)
VALUES (?, ?, ?, ?, ?, ?)
""", suppliers)
@contextmanager
def get_db_connection(self):
"""데이터베이스 연결 컨텍스트 관리자"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
def add_product(self, sku: str, name: str, category_name: str,
supplier_name: str, cost_price: float, selling_price: float,
initial_stock: int = 0, min_stock: int = 0) -> int:
"""상품 추가"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
try:
# 카테고리 ID 조회
cursor.execute("SELECT id FROM categories WHERE name = ?", (category_name,))
category = cursor.fetchone()
if not category:
raise ValueError(f"카테고리 '{category_name}'를 찾을 수 없습니다")
# 공급업체 ID 조회
cursor.execute("SELECT id FROM suppliers WHERE name = ?", (supplier_name,))
supplier = cursor.fetchone()
if not supplier:
raise ValueError(f"공급업체 '{supplier_name}'를 찾을 수 없습니다")
# 상품 추가
cursor.execute("""
INSERT INTO products (sku, name, category_id, supplier_id, cost_price, selling_price)
VALUES (?, ?, ?, ?, ?, ?)
""", (sku, name, category['id'], supplier['id'], cost_price, selling_price))
product_id = cursor.lastrowid
# 재고 초기화
cursor.execute("""
INSERT INTO inventory (product_id, current_stock, min_stock_level)
VALUES (?, ?, ?)
""", (product_id, initial_stock, min_stock))
# 재고 변동 이력 기록 (초기 재고)
if initial_stock > 0:
cursor.execute("""
INSERT INTO stock_movements
(product_id, movement_type, quantity, previous_stock, new_stock, reason, cost_per_unit)
VALUES (?, 'in', ?, 0, ?, '초기 재고', ?)
""", (product_id, initial_stock, initial_stock, cost_price))
conn.commit()
print(f"✅ 상품 추가 완료: {name} (SKU: {sku})")
return product_id
except sqlite3.IntegrityError as e:
raise ValueError(f"상품 추가 실패: {str(e)}")
def update_stock(self, sku: str, quantity: int, movement_type: str,
reason: str, cost_per_unit: float = None) -> bool:
"""재고 업데이트"""
if movement_type not in ['in', 'out', 'adjustment']:
raise ValueError("movement_type은 'in', 'out', 'adjustment' 중 하나여야 합니다")
with self.get_db_connection() as conn:
cursor = conn.cursor()
# 상품 조회
cursor.execute("""
SELECT p.id, p.name, i.current_stock
FROM products p
JOIN inventory i ON p.id = i.product_id
WHERE p.sku = ?
""", (sku,))
product = cursor.fetchone()
if not product:
raise ValueError(f"SKU '{sku}'에 해당하는 상품을 찾을 수 없습니다")
previous_stock = product['current_stock']
# 새로운 재고 계산
if movement_type == 'in':
new_stock = previous_stock + quantity
elif movement_type == 'out':
if previous_stock < quantity:
raise ValueError(f"재고 부족: 현재 {previous_stock}개, 요청 {quantity}개")
new_stock = previous_stock - quantity
else: # adjustment
new_stock = quantity
try:
# 재고 업데이트
cursor.execute("""
UPDATE inventory
SET current_stock = ?, last_updated = CURRENT_TIMESTAMP
WHERE product_id = ?
""", (new_stock, product['id']))
# 재고 변동 이력 기록
cursor.execute("""
INSERT INTO stock_movements
(product_id, movement_type, quantity, previous_stock, new_stock, reason, cost_per_unit)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (product['id'], movement_type, quantity, previous_stock, new_stock, reason, cost_per_unit))
conn.commit()
# 재고 부족 알림
cursor.execute("""
SELECT min_stock_level FROM inventory WHERE product_id = ?
""", (product['id'],))
min_stock = cursor.fetchone()['min_stock_level']
if new_stock <= min_stock:
print(f"⚠️ 재고 부족 알림: {product['name']} (현재: {new_stock}, 최소: {min_stock})")
print(f"✅ 재고 업데이트 완료: {product['name']} ({previous_stock} → {new_stock})")
return True
except Exception as e:
conn.rollback()
raise e
def create_order(self, customer_name: str, customer_phone: str,
customer_address: str, items: List[Dict]) -> str:
"""주문 생성"""
order_number = f"ORD{datetime.now().strftime('%Y%m%d%H%M%S')}"
with self.get_db_connection() as conn:
cursor = conn.cursor()
try:
# 재고 확인
for item in items:
cursor.execute("""
SELECT p.id, p.name, p.selling_price, i.current_stock, i.reserved_stock
FROM products p
JOIN inventory i ON p.id = i.product_id
WHERE p.sku = ?
""", (item['sku'],))
product = cursor.fetchone()
if not product:
raise ValueError(f"상품 '{item['sku']}'를 찾을 수 없습니다")
available = product['current_stock'] - product['reserved_stock']
if available < item['quantity']:
raise ValueError(f"재고 부족: {product['name']} (가용: {available}, 요청: {item['quantity']})")
# 주문 생성
total_amount = sum(
cursor.execute("SELECT selling_price FROM products WHERE sku = ?", (item['sku'],)).fetchone()[0] * item['quantity']
for item in items
) + 3000 # 배송비
cursor.execute("""
INSERT INTO orders (order_number, customer_name, customer_phone, customer_address, total_amount)
VALUES (?, ?, ?, ?, ?)
""", (order_number, customer_name, customer_phone, customer_address, total_amount))
order_id = cursor.lastrowid
# 주문 상품 추가 및 재고 예약
for item in items:
cursor.execute("""
SELECT id, selling_price FROM products WHERE sku = ?
""", (item['sku'],))
product = cursor.fetchone()
# 주문 상품 추가
unit_price = product['selling_price']
total_price = unit_price * item['quantity']
cursor.execute("""
INSERT INTO order_items (order_id, product_id, quantity, unit_price, total_price)
VALUES (?, ?, ?, ?, ?)
""", (order_id, product['id'], item['quantity'], unit_price, total_price))
# 재고 예약
cursor.execute("""
UPDATE inventory
SET reserved_stock = reserved_stock + ?
WHERE product_id = ?
""", (item['quantity'], product['id']))
conn.commit()
print(f"✅ 주문 생성 완료: {order_number}")
return order_number
except Exception as e:
conn.rollback()
raise e
def get_low_stock_products(self) -> List[Dict]:
"""재고 부족 상품 조회"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT p.sku, p.name, i.current_stock, i.reserved_stock,
(i.current_stock - i.reserved_stock) as available_stock,
i.min_stock_level, s.name as supplier_name, s.phone as supplier_phone
FROM products p
JOIN inventory i ON p.id = i.product_id
JOIN suppliers s ON p.supplier_id = s.id
WHERE (i.current_stock - i.reserved_stock) <= i.min_stock_level
AND p.status = 'active'
ORDER BY available_stock ASC
""")
return [dict(row) for row in cursor.fetchall()]
def get_sales_report(self, days: int = 30) -> Dict:
"""매출 리포트"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
start_date = datetime.now() - timedelta(days=days)
# 기간별 매출
cursor.execute("""
SELECT
COUNT(*) as total_orders,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_value
FROM orders
WHERE order_date >= ? AND status != 'cancelled'
""", (start_date,))
summary = dict(cursor.fetchone())
# 인기 상품
cursor.execute("""
SELECT p.name, SUM(oi.quantity) as total_sold, SUM(oi.total_price) as revenue
FROM order_items oi
JOIN orders o ON oi.order_id = o.id
JOIN products p ON oi.product_id = p.id
WHERE o.order_date >= ? AND o.status != 'cancelled'
GROUP BY p.id, p.name
ORDER BY total_sold DESC
LIMIT 10
""", (start_date,))
popular_products = [dict(row) for row in cursor.fetchall()]
return {
'period_days': days,
'summary': summary,
'popular_products': popular_products
}
# 사용 예제
def demo_inventory_system():
"""재고 관리 시스템 데모"""
inventory = KoreanInventorySystem()
# 상품 추가
products = [
{
'sku': 'PHONE001',
'name': '갤럭시 S24 Ultra',
'category_name': '전자제품',
'supplier_name': '삼성전자',
'cost_price': 1200000,
'selling_price': 1500000,
'initial_stock': 50,
'min_stock': 10
},
{
'sku': 'LAPTOP001',
'name': 'LG 그램 17인치',
'category_name': '전자제품',
'supplier_name': 'LG전자',
'cost_price': 1800000,
'selling_price': 2200000,
'initial_stock': 20,
'min_stock': 5
}
]
for product in products:
try:
inventory.add_product(**product)
except ValueError as e:
print(f"상품 추가 실패: {e}")
# 주문 생성
order_items = [
{'sku': 'PHONE001', 'quantity': 2},
{'sku': 'LAPTOP001', 'quantity': 1}
]
try:
order_num = inventory.create_order(
"김고객", "010-1234-5678", "서울시 강남구 테헤란로 123",
order_items
)
print(f"주문 번호: {order_num}")
except ValueError as e:
print(f"주문 생성 실패: {e}")
# 재고 부족 상품 확인
low_stock = inventory.get_low_stock_products()
if low_stock:
print(f"\n⚠️ 재고 부족 상품:")
for product in low_stock:
print(f"- {product['name']}: {product['available_stock']}개 (최소: {product['min_stock_level']})")
# 매출 리포트
report = inventory.get_sales_report(30)
print(f"\n📊 30일 매출 리포트:")
print(f"총 주문: {report['summary']['total_orders']}건")
print(f"총 매출: {report['summary']['total_revenue']:,}원")
# 실행
# demo_inventory_system()
🗄️ 데이터베이스 기초
데이터베이스는 구조화된 데이터를 효율적으로 저장하고 관리하는 시스템입니다. Python에서는 다양한 데이터베이스와 연동할 수 있습니다.
graph TB
subgraph "데이터베이스 종류"
A[관계형 DB<br/>RDBMS] --> B[SQLite]
A --> C[MySQL]
A --> D[PostgreSQL]
E[NoSQL DB] --> F[MongoDB]
E --> G[Redis]
E --> H[Cassandra]
end
subgraph "Python 인터페이스"
I[DB-API 2.0] --> J[sqlite3]
I --> K[pymysql]
I --> L[psycopg2]
M[ORM] --> N[SQLAlchemy]
M --> O[Django ORM]
M --> P[Peewee]
end
💾 SQLite 연동
SQLite는 서버 설치 없이 사용할 수 있는 경량 데이터베이스입니다.
SQLite 기본 사용법
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import sqlite3
from datetime import datetime
from contextlib import closing
# 데이터베이스 연결
def create_connection(db_file):
"""데이터베이스 연결 생성"""
try:
conn = sqlite3.connect(db_file)
return conn
except sqlite3.Error as e:
print(f"연결 오류: {e}")
return None
# 테이블 생성
def create_tables(conn):
"""테이블 생성"""
cursor = conn.cursor()
# 사용자 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 게시글 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
title TEXT NOT NULL,
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
conn.commit()
# CRUD 작업
class UserDB:
"""사용자 데이터베이스 작업"""
def __init__(self, db_file):
self.db_file = db_file
def create_user(self, username, email):
"""사용자 생성"""
with closing(sqlite3.connect(self.db_file)) as conn:
cursor = conn.cursor()
try:
cursor.execute(
"INSERT INTO users (username, email) VALUES (?, ?)",
(username, email)
)
conn.commit()
return cursor.lastrowid
except sqlite3.IntegrityError:
print(f"사용자 '{username}'는 이미 존재합니다")
return None
def get_user(self, user_id):
"""사용자 조회"""
with closing(sqlite3.connect(self.db_file)) as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM users WHERE id = ?", (user_id,)
)
row = cursor.fetchone()
if row:
return {
'id': row[0],
'username': row[1],
'email': row[2],
'created_at': row[3]
}
return None
def update_user(self, user_id, email):
"""사용자 정보 수정"""
with closing(sqlite3.connect(self.db_file)) as conn:
cursor = conn.cursor()
cursor.execute(
"UPDATE users SET email = ? WHERE id = ?",
(email, user_id)
)
conn.commit()
return cursor.rowcount > 0
def delete_user(self, user_id):
"""사용자 삭제"""
with closing(sqlite3.connect(self.db_file)) as conn:
cursor = conn.cursor()
cursor.execute(
"DELETE FROM users WHERE id = ?", (user_id,)
)
conn.commit()
return cursor.rowcount > 0
def list_users(self, limit=10, offset=0):
"""사용자 목록 조회"""
with closing(sqlite3.connect(self.db_file)) as conn:
# Row Factory 설정으로 딕셔너리 형태로 반환
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM users LIMIT ? OFFSET ?",
(limit, offset)
)
return [dict(row) for row in cursor.fetchall()]
# 트랜잭션 처리
def transfer_post_ownership(db_file, post_id, new_user_id):
"""게시글 소유권 이전 (트랜잭션 예제)"""
conn = sqlite3.connect(db_file)
try:
cursor = conn.cursor()
# 트랜잭션 시작
conn.execute("BEGIN")
# 게시글 확인
cursor.execute("SELECT user_id FROM posts WHERE id = ?", (post_id,))
result = cursor.fetchone()
if not result:
raise ValueError("게시글을 찾을 수 없습니다")
old_user_id = result[0]
# 새 사용자 확인
cursor.execute("SELECT id FROM users WHERE id = ?", (new_user_id,))
if not cursor.fetchone():
raise ValueError("새 사용자를 찾을 수 없습니다")
# 소유권 이전
cursor.execute(
"UPDATE posts SET user_id = ? WHERE id = ?",
(new_user_id, post_id)
)
# 로그 기록 (예시)
cursor.execute("""
INSERT INTO transfer_logs (post_id, old_user_id, new_user_id)
VALUES (?, ?, ?)
""", (post_id, old_user_id, new_user_id))
# 커밋
conn.commit()
print(f"게시글 {post_id} 소유권 이전 완료")
except Exception as e:
# 롤백
conn.rollback()
print(f"트랜잭션 실패: {e}")
finally:
conn.close()
SQLite 고급 기능
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import json
from typing import List, Dict, Any
class AdvancedSQLite:
"""SQLite 고급 기능"""
def __init__(self, db_file):
self.db_file = db_file
self._init_db()
def _init_db(self):
"""데이터베이스 초기화"""
conn = sqlite3.connect(self.db_file)
# JSON 지원 활성화
conn.execute("PRAGMA journal_mode=WAL") # Write-Ahead Logging
conn.execute("PRAGMA foreign_keys=ON") # 외래키 제약 활성화
# 인덱스 생성
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_users_email
ON users(email)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_posts_user_id
ON posts(user_id)
""")
conn.close()
def bulk_insert_users(self, users: List[Dict[str, str]]):
"""대량 삽입"""
with closing(sqlite3.connect(self.db_file)) as conn:
cursor = conn.cursor()
cursor.executemany(
"INSERT INTO users (username, email) VALUES (?, ?)",
[(u['username'], u['email']) for u in users]
)
conn.commit()
print(f"{len(users)}명의 사용자 추가됨")
def search_posts(self, keyword: str):
"""전문 검색"""
with closing(sqlite3.connect(self.db_file)) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# LIKE 검색
cursor.execute("""
SELECT p.*, u.username
FROM posts p
JOIN users u ON p.user_id = u.id
WHERE p.title LIKE ? OR p.content LIKE ?
ORDER BY p.created_at DESC
""", (f"%{keyword}%", f"%{keyword}%"))
return [dict(row) for row in cursor.fetchall()]
def get_user_statistics(self):
"""사용자 통계"""
with closing(sqlite3.connect(self.db_file)) as conn:
cursor = conn.cursor()
# 집계 함수 사용
cursor.execute("""
SELECT
COUNT(DISTINCT u.id) as total_users,
COUNT(p.id) as total_posts,
AVG(post_count) as avg_posts_per_user
FROM users u
LEFT JOIN (
SELECT user_id, COUNT(*) as post_count
FROM posts
GROUP BY user_id
) p ON u.id = p.user_id
""")
return dict(zip(
['total_users', 'total_posts', 'avg_posts_per_user'],
cursor.fetchone()
))
def backup_database(self, backup_file):
"""데이터베이스 백업"""
with closing(sqlite3.connect(self.db_file)) as conn:
with closing(sqlite3.connect(backup_file)) as backup:
conn.backup(backup)
print(f"백업 완료: {backup_file}")
# 사용자 정의 함수
def register_custom_functions(conn):
"""사용자 정의 SQL 함수 등록"""
# 문자열 길이 함수
def str_len(s):
return len(s) if s else 0
# JSON 파싱 함수
def json_extract(json_str, key):
try:
data = json.loads(json_str)
return data.get(key)
except:
return None
conn.create_function("STR_LEN", 1, str_len)
conn.create_function("JSON_EXTRACT", 2, json_extract)
# 컨텍스트 관리자로 데이터베이스 사용
class DatabaseConnection:
"""데이터베이스 연결 컨텍스트 관리자"""
def __init__(self, db_file):
self.db_file = db_file
self.conn = None
def __enter__(self):
self.conn = sqlite3.connect(self.db_file)
self.conn.row_factory = sqlite3.Row
return self.conn
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type:
self.conn.rollback()
else:
self.conn.commit()
self.conn.close()
# 사용 예
with DatabaseConnection("myapp.db") as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()
🐬 MySQL 연동
MySQL은 가장 인기 있는 오픈소스 관계형 데이터베이스입니다.
MySQL 기본 연동
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import pymysql
from pymysql.cursors import DictCursor
import configparser
class MySQLDatabase:
"""MySQL 데이터베이스 클래스"""
def __init__(self, config_file='db_config.ini'):
self.config = self._load_config(config_file)
self.connection = None
def _load_config(self, config_file):
"""설정 파일 로드"""
config = configparser.ConfigParser()
config.read(config_file)
return {
'host': config.get('mysql', 'host'),
'user': config.get('mysql', 'user'),
'password': config.get('mysql', 'password'),
'database': config.get('mysql', 'database'),
'charset': config.get('mysql', 'charset', fallback='utf8mb4')
}
def connect(self):
"""데이터베이스 연결"""
try:
self.connection = pymysql.connect(
**self.config,
cursorclass=DictCursor
)
return True
except pymysql.Error as e:
print(f"연결 오류: {e}")
return False
def disconnect(self):
"""연결 종료"""
if self.connection:
self.connection.close()
def execute_query(self, query, params=None):
"""쿼리 실행"""
with self.connection.cursor() as cursor:
cursor.execute(query, params or ())
return cursor.fetchall()
def execute_insert(self, query, params=None):
"""INSERT 실행"""
with self.connection.cursor() as cursor:
cursor.execute(query, params or ())
self.connection.commit()
return cursor.lastrowid
def execute_many(self, query, params_list):
"""대량 실행"""
with self.connection.cursor() as cursor:
cursor.executemany(query, params_list)
self.connection.commit()
return cursor.rowcount
# 커넥션 풀 사용
from dbutils.pooled_db import PooledDB
class MySQLPool:
"""MySQL 커넥션 풀"""
def __init__(self, **config):
self.pool = PooledDB(
creator=pymysql,
maxconnections=10,
mincached=2,
maxcached=5,
blocking=True,
**config
)
def get_connection(self):
"""풀에서 연결 가져오기"""
return self.pool.connection()
# 실전 예제: 블로그 시스템
class BlogDB:
"""블로그 데이터베이스"""
def __init__(self, pool):
self.pool = pool
def create_post(self, author_id, title, content, tags=None):
"""게시글 생성"""
conn = self.pool.get_connection()
try:
with conn.cursor() as cursor:
# 게시글 삽입
cursor.execute("""
INSERT INTO posts (author_id, title, content)
VALUES (%s, %s, %s)
""", (author_id, title, content))
post_id = cursor.lastrowid
# 태그 처리
if tags:
for tag in tags:
# 태그 확인/생성
cursor.execute(
"INSERT IGNORE INTO tags (name) VALUES (%s)",
(tag,)
)
# 태그 ID 가져오기
cursor.execute(
"SELECT id FROM tags WHERE name = %s",
(tag,)
)
tag_id = cursor.fetchone()['id']
# 관계 테이블에 추가
cursor.execute("""
INSERT INTO post_tags (post_id, tag_id)
VALUES (%s, %s)
""", (post_id, tag_id))
conn.commit()
return post_id
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
def get_posts_by_tag(self, tag_name, limit=10, offset=0):
"""태그별 게시글 조회"""
conn = self.pool.get_connection()
try:
with conn.cursor() as cursor:
cursor.execute("""
SELECT p.*, GROUP_CONCAT(t.name) as tags
FROM posts p
JOIN post_tags pt ON p.id = pt.post_id
JOIN tags t ON pt.tag_id = t.id
WHERE p.id IN (
SELECT DISTINCT p2.id
FROM posts p2
JOIN post_tags pt2 ON p2.id = pt2.post_id
JOIN tags t2 ON pt2.tag_id = t2.id
WHERE t2.name = %s
)
GROUP BY p.id
ORDER BY p.created_at DESC
LIMIT %s OFFSET %s
""", (tag_name, limit, offset))
return cursor.fetchall()
finally:
conn.close()
[!TIP] with 문을 쓰면 close()가 필요 없어요!
데이터베이스 연결은 꼭 닫아줘야 하는데, 깜빡하기 쉽습니다.
with sqlite3.connect(...) as conn:처럼 사용하면, 블록이 끝날 때 자동으로 연결을 닫아줍니다. 에러가 나도 안전하게 닫히니 꼭with문을 사용하세요!
🐘 PostgreSQL 연동
PostgreSQL은 고급 기능을 제공하는 강력한 오픈소스 데이터베이스입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import psycopg2
from psycopg2.extras import RealDictCursor, execute_values
import psycopg2.pool
from contextlib import contextmanager
class PostgreSQLDatabase:
"""PostgreSQL 데이터베이스 클래스"""
def __init__(self, dsn):
self.dsn = dsn
self.pool = psycopg2.pool.SimpleConnectionPool(1, 20, dsn)
@contextmanager
def get_cursor(self):
"""커서 컨텍스트 관리자"""
conn = self.pool.getconn()
try:
with conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
yield cursor
finally:
self.pool.putconn(conn)
def create_schema(self):
"""스키마 생성"""
with self.get_cursor() as cursor:
# ENUM 타입 생성
cursor.execute("""
DO $$ BEGIN
CREATE TYPE user_role AS ENUM ('admin', 'user', 'guest');
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
""")
# 테이블 생성
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) NOT NULL,
role user_role DEFAULT 'user',
metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
)
""")
# 인덱스 생성
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_users_email
ON users(email);
CREATE INDEX IF NOT EXISTS idx_users_metadata
ON users USING GIN (metadata);
""")
# JSONB 활용
class UserRepository:
"""사용자 저장소"""
def __init__(self, db):
self.db = db
def create_user(self, username, email, metadata=None):
"""사용자 생성"""
with self.db.get_cursor() as cursor:
cursor.execute("""
INSERT INTO users (username, email, metadata)
VALUES (%(username)s, %(email)s, %(metadata)s)
RETURNING id, created_at
""", {
'username': username,
'email': email,
'metadata': json.dumps(metadata or {})
})
return cursor.fetchone()
def find_by_metadata(self, key, value):
"""메타데이터로 검색"""
with self.db.get_cursor() as cursor:
cursor.execute("""
SELECT * FROM users
WHERE metadata @> %s
""", (json.dumps({key: value}),))
return cursor.fetchall()
def update_metadata(self, user_id, updates):
"""메타데이터 업데이트"""
with self.db.get_cursor() as cursor:
cursor.execute("""
UPDATE users
SET metadata = metadata || %s
WHERE id = %s
RETURNING metadata
""", (json.dumps(updates), user_id))
return cursor.fetchone()
# 고급 쿼리
class AdvancedPostgreSQL:
"""PostgreSQL 고급 기능"""
def __init__(self, db):
self.db = db
def full_text_search(self, query):
"""전문 검색"""
with self.db.get_cursor() as cursor:
cursor.execute("""
SELECT id, title, content,
ts_rank(search_vector, plainto_tsquery('english', %s)) as rank
FROM posts
WHERE search_vector @@ plainto_tsquery('english', %s)
ORDER BY rank DESC
LIMIT 10
""", (query, query))
return cursor.fetchall()
def window_function_example(self):
"""윈도우 함수 예제"""
with self.db.get_cursor() as cursor:
cursor.execute("""
SELECT
user_id,
created_at,
COUNT(*) OVER (PARTITION BY user_id) as total_posts,
ROW_NUMBER() OVER (
PARTITION BY user_id
ORDER BY created_at DESC
) as post_rank
FROM posts
""")
return cursor.fetchall()
def cte_example(self):
"""CTE (Common Table Expression) 예제"""
with self.db.get_cursor() as cursor:
cursor.execute("""
WITH user_stats AS (
SELECT
u.id,
u.username,
COUNT(p.id) as post_count,
MAX(p.created_at) as last_post_date
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
GROUP BY u.id, u.username
)
SELECT *
FROM user_stats
WHERE post_count > 5
ORDER BY post_count DESC
""")
return cursor.fetchall()
🍃 MongoDB 연동
MongoDB는 대표적인 NoSQL 문서 데이터베이스입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from pymongo import MongoClient, ASCENDING, DESCENDING
from bson.objectid import ObjectId
from datetime import datetime
import motor.motor_asyncio # 비동기 지원
class MongoDatabase:
"""MongoDB 클래스"""
def __init__(self, connection_string, database_name):
self.client = MongoClient(connection_string)
self.db = self.client[database_name]
def create_indexes(self):
"""인덱스 생성"""
# 사용자 컬렉션
users = self.db.users
users.create_index([("username", ASCENDING)], unique=True)
users.create_index([("email", ASCENDING)])
# 게시글 컬렉션
posts = self.db.posts
posts.create_index([("author_id", ASCENDING)])
posts.create_index([("created_at", DESCENDING)])
posts.create_index([("tags", ASCENDING)])
# 텍스트 인덱스
posts.create_index([("title", "text"), ("content", "text")])
# 문서 작업
class BlogMongoDB:
"""블로그 MongoDB"""
def __init__(self, db):
self.db = db
self.users = db.users
self.posts = db.posts
def create_user(self, username, email, profile=None):
"""사용자 생성"""
user = {
"username": username,
"email": email,
"profile": profile or {},
"created_at": datetime.utcnow(),
"post_count": 0
}
result = self.users.insert_one(user)
return str(result.inserted_id)
def create_post(self, author_id, title, content, tags=None):
"""게시글 생성"""
post = {
"author_id": ObjectId(author_id),
"title": title,
"content": content,
"tags": tags or [],
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
"views": 0,
"likes": [],
"comments": []
}
result = self.posts.insert_one(post)
# 사용자 게시글 수 증가
self.users.update_one(
{"_id": ObjectId(author_id)},
{"$inc": {"post_count": 1}}
)
return str(result.inserted_id)
def add_comment(self, post_id, user_id, content):
"""댓글 추가"""
comment = {
"_id": ObjectId(),
"user_id": ObjectId(user_id),
"content": content,
"created_at": datetime.utcnow()
}
self.posts.update_one(
{"_id": ObjectId(post_id)},
{"$push": {"comments": comment}}
)
return str(comment["_id"])
def like_post(self, post_id, user_id):
"""게시글 좋아요"""
self.posts.update_one(
{"_id": ObjectId(post_id)},
{"$addToSet": {"likes": ObjectId(user_id)}}
)
def search_posts(self, query, limit=10):
"""텍스트 검색"""
return list(self.posts.find(
{"$text": {"$search": query}},
{"score": {"$meta": "textScore"}}
).sort([("score", {"$meta": "textScore"})]).limit(limit))
def aggregate_stats(self):
"""집계 통계"""
pipeline = [
{
"$lookup": {
"from": "users",
"localField": "author_id",
"foreignField": "_id",
"as": "author"
}
},
{
"$unwind": "$author"
},
{
"$group": {
"_id": "$author.username",
"total_posts": {"$sum": 1},
"total_views": {"$sum": "$views"},
"total_likes": {"$sum": {"$size": "$likes"}},
"avg_comments": {"$avg": {"$size": "$comments"}}
}
},
{
"$sort": {"total_posts": -1}
}
]
return list(self.posts.aggregate(pipeline))
# 비동기 MongoDB
class AsyncMongoDB:
"""비동기 MongoDB"""
def __init__(self, connection_string, database_name):
self.client = motor.motor_asyncio.AsyncIOMotorClient(connection_string)
self.db = self.client[database_name]
async def find_posts_by_author(self, author_id, skip=0, limit=10):
"""작성자별 게시글 조회"""
cursor = self.db.posts.find(
{"author_id": ObjectId(author_id)}
).skip(skip).limit(limit).sort("created_at", -1)
posts = []
async for post in cursor:
posts.append(post)
return posts
async def bulk_update_views(self, post_views):
"""대량 조회수 업데이트"""
operations = []
for post_id, views in post_views.items():
operations.append({
"filter": {"_id": ObjectId(post_id)},
"update": {"$inc": {"views": views}}
})
if operations:
result = await self.db.posts.bulk_write(operations)
return result.modified_count
🔗 SQLAlchemy ORM
SQLAlchemy는 Python의 가장 강력한 ORM(Object-Relational Mapping) 라이브러리입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text, ForeignKey, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, scoped_session
from sqlalchemy.sql import func
from datetime import datetime
Base = declarative_base()
# 다대다 관계 테이블
post_tags = Table('post_tags', Base.metadata,
Column('post_id', Integer, ForeignKey('posts.id')),
Column('tag_id', Integer, ForeignKey('tags.id'))
)
# 모델 정의
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), nullable=False)
created_at = Column(DateTime, default=func.now())
# 관계
posts = relationship('Post', back_populates='author', cascade='all, delete-orphan')
def __repr__(self):
return f"<User(username='{self.username}', email='{self.email}')>"
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
content = Column(Text)
author_id = Column(Integer, ForeignKey('users.id'), nullable=False)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
# 관계
author = relationship('User', back_populates='posts')
tags = relationship('Tag', secondary=post_tags, back_populates='posts')
comments = relationship('Comment', back_populates='post', cascade='all, delete-orphan')
def __repr__(self):
return f"<Post(title='{self.title}', author='{self.author.username}')>"
class Tag(Base):
__tablename__ = 'tags'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False)
# 관계
posts = relationship('Post', secondary=post_tags, back_populates='tags')
def __repr__(self):
return f"<Tag(name='{self.name}')>"
class Comment(Base):
__tablename__ = 'comments'
id = Column(Integer, primary_key=True)
content = Column(Text, nullable=False)
post_id = Column(Integer, ForeignKey('posts.id'), nullable=False)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
created_at = Column(DateTime, default=func.now())
# 관계
post = relationship('Post', back_populates='comments')
user = relationship('User')
# 데이터베이스 설정
class Database:
"""데이터베이스 관리 클래스"""
def __init__(self, database_url):
self.engine = create_engine(database_url, echo=False)
self.SessionLocal = sessionmaker(bind=self.engine)
self.Session = scoped_session(self.SessionLocal)
def create_tables(self):
"""테이블 생성"""
Base.metadata.create_all(self.engine)
def get_session(self):
"""세션 반환"""
return self.Session()
def close_session(self):
"""세션 종료"""
self.Session.remove()
# Repository 패턴
class UserRepository:
"""사용자 Repository"""
def __init__(self, session):
self.session = session
def create(self, username, email):
"""사용자 생성"""
user = User(username=username, email=email)
self.session.add(user)
self.session.commit()
return user
def find_by_id(self, user_id):
"""ID로 조회"""
return self.session.query(User).filter(User.id == user_id).first()
def find_by_username(self, username):
"""사용자명으로 조회"""
return self.session.query(User).filter(User.username == username).first()
def update(self, user_id, **kwargs):
"""사용자 정보 수정"""
user = self.find_by_id(user_id)
if user:
for key, value in kwargs.items():
setattr(user, key, value)
self.session.commit()
return user
def delete(self, user_id):
"""사용자 삭제"""
user = self.find_by_id(user_id)
if user:
self.session.delete(user)
self.session.commit()
return True
return False
class PostRepository:
"""게시글 Repository"""
def __init__(self, session):
self.session = session
def create(self, author_id, title, content, tags=None):
"""게시글 생성"""
post = Post(
author_id=author_id,
title=title,
content=content
)
# 태그 처리
if tags:
for tag_name in tags:
tag = self.session.query(Tag).filter(Tag.name == tag_name).first()
if not tag:
tag = Tag(name=tag_name)
post.tags.append(tag)
self.session.add(post)
self.session.commit()
return post
def find_with_author(self, post_id):
"""작성자 정보와 함께 조회"""
return self.session.query(Post).join(User).filter(Post.id == post_id).first()
def search(self, keyword, limit=10, offset=0):
"""검색"""
return self.session.query(Post).filter(
(Post.title.contains(keyword)) |
(Post.content.contains(keyword))
).limit(limit).offset(offset).all()
def find_by_tag(self, tag_name):
"""태그로 조회"""
return self.session.query(Post).join(Post.tags).filter(
Tag.name == tag_name
).all()
# 고급 쿼리
class AdvancedQueries:
"""고급 쿼리 예제"""
def __init__(self, session):
self.session = session
def popular_posts(self, days=7):
"""인기 게시글 (댓글 수 기준)"""
from sqlalchemy import and_
from datetime import datetime, timedelta
since = datetime.utcnow() - timedelta(days=days)
return self.session.query(
Post,
func.count(Comment.id).label('comment_count')
).join(Comment).filter(
Post.created_at >= since
).group_by(Post.id).order_by(
func.count(Comment.id).desc()
).limit(10).all()
def user_statistics(self):
"""사용자 통계"""
return self.session.query(
User.username,
func.count(Post.id).label('post_count'),
func.count(Comment.id).label('comment_count')
).outerjoin(Post).outerjoin(Comment, User.id == Comment.user_id).group_by(
User.id
).all()
💡 실전 예제
1. 데이터베이스 마이그레이션 도구
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import sqlite3
import json
from datetime import datetime
import hashlib
class DatabaseMigration:
"""데이터베이스 마이그레이션"""
def __init__(self, db_path):
self.db_path = db_path
self.conn = sqlite3.connect(db_path)
self._create_migration_table()
def _create_migration_table(self):
"""마이그레이션 테이블 생성"""
self.conn.execute("""
CREATE TABLE IF NOT EXISTS migrations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
version TEXT UNIQUE NOT NULL,
description TEXT,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
self.conn.commit()
def get_current_version(self):
"""현재 버전 조회"""
cursor = self.conn.execute(
"SELECT version FROM migrations ORDER BY id DESC LIMIT 1"
)
result = cursor.fetchone()
return result[0] if result else "0.0.0"
def apply_migration(self, version, description, up_sql, down_sql=None):
"""마이그레이션 적용"""
current = self.get_current_version()
if version <= current:
print(f"버전 {version}은 이미 적용되었습니다")
return False
try:
# UP 마이그레이션 실행
self.conn.executescript(up_sql)
# 마이그레이션 기록
self.conn.execute("""
INSERT INTO migrations (version, description)
VALUES (?, ?)
""", (version, description))
self.conn.commit()
print(f"마이그레이션 {version} 적용 완료: {description}")
return True
except Exception as e:
self.conn.rollback()
print(f"마이그레이션 실패: {e}")
# DOWN 마이그레이션 시도
if down_sql:
try:
self.conn.executescript(down_sql)
self.conn.commit()
print("롤백 완료")
except:
print("롤백 실패")
return False
def rollback_to_version(self, target_version):
"""특정 버전으로 롤백"""
# 구현 생략
pass
# 마이그레이션 정의
migrations = [
{
"version": "1.0.0",
"description": "초기 스키마 생성",
"up": """
CREATE TABLE users (
id INTEGER PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
email TEXT NOT NULL
);
CREATE TABLE posts (
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
title TEXT NOT NULL,
content TEXT,
FOREIGN KEY (user_id) REFERENCES users(id)
);
""",
"down": """
DROP TABLE posts;
DROP TABLE users;
"""
},
{
"version": "1.1.0",
"description": "타임스탬프 추가",
"up": """
ALTER TABLE users ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE posts ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE posts ADD COLUMN updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
""",
"down": None # SQLite는 DROP COLUMN을 지원하지 않음
}
]
# 마이그레이션 실행
migration = DatabaseMigration("app.db")
for m in migrations:
migration.apply_migration(
m["version"],
m["description"],
m["up"],
m.get("down")
)
2. 데이터베이스 백업 및 복구
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import os
import gzip
import shutil
from datetime import datetime
class DatabaseBackup:
"""데이터베이스 백업 관리"""
def __init__(self, db_type="sqlite"):
self.db_type = db_type
self.backup_dir = "backups"
os.makedirs(self.backup_dir, exist_ok=True)
def backup_sqlite(self, db_path):
"""SQLite 백업"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"backup_{timestamp}.db"
backup_path = os.path.join(self.backup_dir, backup_name)
# 데이터베이스 복사
shutil.copy2(db_path, backup_path)
# 압축
compressed_path = f"{backup_path}.gz"
with open(backup_path, 'rb') as f_in:
with gzip.open(compressed_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
# 원본 백업 파일 삭제
os.remove(backup_path)
print(f"백업 완료: {compressed_path}")
return compressed_path
def backup_mysql(self, config):
"""MySQL 백업"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"mysql_backup_{timestamp}.sql"
backup_path = os.path.join(self.backup_dir, backup_name)
# mysqldump 실행
cmd = (
f"mysqldump -h {config['host']} "
f"-u {config['user']} -p{config['password']} "
f"{config['database']} > {backup_path}"
)
os.system(cmd)
# 압축
os.system(f"gzip {backup_path}")
compressed_path = f"{backup_path}.gz"
print(f"백업 완료: {compressed_path}")
return compressed_path
def restore_sqlite(self, backup_path, target_path):
"""SQLite 복구"""
# 압축 해제
if backup_path.endswith('.gz'):
with gzip.open(backup_path, 'rb') as f_in:
with open(target_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
else:
shutil.copy2(backup_path, target_path)
print(f"복구 완료: {target_path}")
def cleanup_old_backups(self, days=7):
"""오래된 백업 삭제"""
from datetime import timedelta
cutoff = datetime.now() - timedelta(days=days)
for filename in os.listdir(self.backup_dir):
filepath = os.path.join(self.backup_dir, filename)
if os.path.getctime(filepath) < cutoff.timestamp():
os.remove(filepath)
print(f"삭제됨: {filename}")
3. 캐싱 레이어
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import redis
import pickle
import functools
from datetime import timedelta
class DatabaseCache:
"""데이터베이스 캐싱"""
def __init__(self, redis_url="redis://localhost:6379"):
self.redis = redis.from_url(redis_url)
self.default_ttl = 3600 # 1시간
def cache_key(self, prefix, *args, **kwargs):
"""캐시 키 생성"""
key_parts = [prefix]
key_parts.extend(str(arg) for arg in args)
key_parts.extend(f"{k}:{v}" for k, v in sorted(kwargs.items()))
return ":".join(key_parts)
def get(self, key):
"""캐시 조회"""
data = self.redis.get(key)
if data:
return pickle.loads(data)
return None
def set(self, key, value, ttl=None):
"""캐시 저장"""
ttl = ttl or self.default_ttl
self.redis.setex(key, ttl, pickle.dumps(value))
def delete(self, key):
"""캐시 삭제"""
self.redis.delete(key)
def invalidate_pattern(self, pattern):
"""패턴 기반 캐시 무효화"""
for key in self.redis.scan_iter(match=pattern):
self.redis.delete(key)
# 캐싱 데코레이터
def cached(cache, prefix, ttl=3600):
"""캐싱 데코레이터"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 캐시 키 생성
cache_key = cache.cache_key(prefix, *args, **kwargs)
# 캐시 조회
result = cache.get(cache_key)
if result is not None:
return result
# 함수 실행
result = func(*args, **kwargs)
# 캐시 저장
cache.set(cache_key, result, ttl)
return result
wrapper.invalidate = lambda: cache.invalidate_pattern(f"{prefix}:*")
return wrapper
return decorator
# 사용 예제
cache = DatabaseCache()
class CachedUserRepository:
"""캐싱이 적용된 사용자 Repository"""
def __init__(self, db):
self.db = db
@cached(cache, "user", ttl=1800)
def find_by_id(self, user_id):
"""사용자 조회 (캐싱)"""
with self.db.get_cursor() as cursor:
cursor.execute(
"SELECT * FROM users WHERE id = %s",
(user_id,)
)
return cursor.fetchone()
def update_user(self, user_id, **updates):
"""사용자 수정"""
# 데이터베이스 업데이트
with self.db.get_cursor() as cursor:
set_clause = ", ".join(f"{k} = %s" for k in updates.keys())
values = list(updates.values()) + [user_id]
cursor.execute(
f"UPDATE users SET {set_clause} WHERE id = %s",
values
)
# 캐시 무효화
cache_key = cache.cache_key("user", user_id)
cache.delete(cache_key)
⚠️ 초보자가 자주 하는 실수들
1. SQL 인젝션 취약점
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# ❌ 위험한 SQL 인젝션 취약점
import sqlite3
def bad_user_login(username, password):
"""SQL 인젝션에 취약한 로그인 함수"""
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
# 절대 이렇게 하지 마세요! SQL 인젝션 위험
query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
cursor.execute(query)
user = cursor.fetchone()
conn.close()
return user
# 공격자가 이런 입력을 하면...
# username: admin' OR '1'='1' --
# password: anything
# 실제 실행되는 쿼리:
# SELECT * FROM users WHERE username = 'admin' OR '1'='1' --' AND password = 'anything'
# 모든 사용자 정보가 노출됨!
# ✅ 안전한 Prepared Statement 사용
def safe_user_login(username, password):
"""SQL 인젝션으로부터 안전한 로그인 함수"""
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
# Prepared Statement 사용 (? 플레이스홀더)
query = "SELECT * FROM users WHERE username = ? AND password = ?"
cursor.execute(query, (username, password))
user = cursor.fetchone()
conn.close()
return user
# 다른 데이터베이스에서도 안전한 방법
import pymysql
def safe_mysql_query(user_id):
"""MySQL에서 안전한 쿼리"""
conn = pymysql.connect(host='localhost', user='root', password='password', database='mydb')
cursor = conn.cursor()
# %s 플레이스홀더 사용 (MySQL)
cursor.execute("SELECT * FROM products WHERE user_id = %s", (user_id,))
products = cursor.fetchall()
cursor.close()
conn.close()
return products
# 동적 쿼리가 필요한 경우 화이트리스트 사용
def safe_dynamic_query(table_name, column_name, value):
"""동적 쿼리 안전하게 만들기"""
# 테이블명과 컬럼명은 화이트리스트로 검증
allowed_tables = ['users', 'products', 'orders']
allowed_columns = ['id', 'name', 'email', 'status']
if table_name not in allowed_tables:
raise ValueError(f"허용되지 않은 테이블: {table_name}")
if column_name not in allowed_columns:
raise ValueError(f"허용되지 않은 컬럼: {column_name}")
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
# 테이블명과 컬럼명은 문자열 포맷팅, 값은 플레이스홀더
query = f"SELECT * FROM {table_name} WHERE {column_name} = ?"
cursor.execute(query, (value,))
results = cursor.fetchall()
cursor.close()
conn.close()
return results
2. 연결 누수 (Connection Leak)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# ❌ 연결을 제대로 닫지 않는 실수
import sqlite3
import time
def bad_connection_management():
"""연결 누수가 발생하는 나쁜 예"""
for i in range(100):
conn = sqlite3.connect('test.db') # 새 연결 생성
cursor = conn.cursor()
cursor.execute("SELECT * FROM users LIMIT 1")
result = cursor.fetchone()
# conn.close() 호출 안함! 메모리 누수 발생
print(f"처리 {i+1}: {result}")
def bad_exception_handling():
"""예외 발생 시 연결이 닫히지 않는 문제"""
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
try:
cursor.execute("SELECT * FROM non_existent_table") # 에러 발생
result = cursor.fetchall()
except Exception as e:
print(f"에러 발생: {e}")
# 여기서 return하면 conn.close()가 실행되지 않음!
return
conn.close() # 예외 시 실행되지 않음
# ✅ 올바른 연결 관리 방법
# 방법 1: try-finally 사용
def good_connection_finally():
"""try-finally로 안전한 연결 관리"""
conn = None
try:
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM users LIMIT 5")
results = cursor.fetchall()
return results
except Exception as e:
print(f"데이터베이스 에러: {e}")
return []
finally:
# 예외가 발생해도 항상 실행됨
if conn:
conn.close()
# 방법 2: with 문 사용 (권장)
def good_connection_with():
"""with 문으로 자동 연결 관리"""
try:
with sqlite3.connect('test.db') as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users LIMIT 5")
results = cursor.fetchall()
return results
# with 블록을 벗어나면 자동으로 연결 종료
except Exception as e:
print(f"데이터베이스 에러: {e}")
return []
# 방법 3: 컨텍스트 관리자 클래스
class DatabaseManager:
"""데이터베이스 연결 관리 클래스"""
def __init__(self, db_path):
self.db_path = db_path
self.conn = None
def __enter__(self):
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
return self.conn
def __exit__(self, exc_type, exc_val, exc_tb):
if self.conn:
if exc_type is None:
self.conn.commit() # 성공 시 커밋
else:
self.conn.rollback() # 실패 시 롤백
self.conn.close()
# 사용
def use_database_manager():
"""데이터베이스 매니저 사용"""
with DatabaseManager('test.db') as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)",
("김테스트", "test@example.com"))
cursor.execute("SELECT * FROM users WHERE name = ?", ("김테스트",))
result = cursor.fetchone()
return dict(result) if result else None
3. 트랜잭션 관리 실수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# ❌ 트랜잭션을 제대로 관리하지 않는 실수
import sqlite3
def bad_money_transfer(from_account, to_account, amount):
"""트랜잭션 관리가 없는 송금 (위험!)"""
conn = sqlite3.connect('bank.db')
cursor = conn.cursor()
# 출금 (이 단계에서 에러가 나면?)
cursor.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?",
(amount, from_account))
# 시뮬레이션: 여기서 오류 발생 가능
import random
if random.random() < 0.3: # 30% 확률로 에러
raise Exception("시스템 오류 발생!")
# 입금 (위의 에러로 인해 실행되지 않을 수 있음)
cursor.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?",
(amount, to_account))
conn.commit() # 이것도 실행되지 않을 수 있음
conn.close()
# ✅ 올바른 트랜잭션 관리
def good_money_transfer(from_account, to_account, amount):
"""안전한 트랜잭션 관리가 있는 송금"""
conn = sqlite3.connect('bank.db')
try:
cursor = conn.cursor()
# 트랜잭션 시작 (기본적으로 자동 시작됨)
conn.execute("BEGIN")
# 잔액 확인
cursor.execute("SELECT balance FROM accounts WHERE id = ?", (from_account,))
balance = cursor.fetchone()
if not balance or balance[0] < amount:
raise ValueError("잔액이 부족합니다")
# 출금
cursor.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?",
(amount, from_account))
# 입금
cursor.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?",
(amount, to_account))
# 모든 작업이 성공하면 커밋
conn.commit()
print(f"송금 완료: {from_account} → {to_account}, {amount}원")
except Exception as e:
# 에러 발생 시 롤백
conn.rollback()
print(f"송금 실패: {e}")
raise
finally:
conn.close()
# 더 나은 방법: with 문과 함께 사용
def better_money_transfer(from_account, to_account, amount):
"""with 문을 사용한 트랜잭션 관리"""
with sqlite3.connect('bank.db') as conn:
cursor = conn.cursor()
# 잔액 확인
cursor.execute("SELECT balance FROM accounts WHERE id = ?", (from_account,))
balance = cursor.fetchone()
if not balance or balance[0] < amount:
raise ValueError("잔액이 부족합니다")
# 출금
cursor.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?",
(amount, from_account))
# 입금
cursor.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?",
(amount, to_account))
# with 블록을 성공적으로 빠져나가면 자동 커밋
# 예외 발생 시 자동 롤백
print(f"송금 완료: {from_account} → {to_account}, {amount}원")
4. N+1 쿼리 문제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# ❌ N+1 쿼리 문제 (성능 저하)
import sqlite3
def bad_get_users_with_posts():
"""N+1 쿼리 문제가 있는 코드"""
conn = sqlite3.connect('blog.db')
cursor = conn.cursor()
# 1번의 쿼리로 모든 사용자 조회
cursor.execute("SELECT id, name, email FROM users")
users = cursor.fetchall()
result = []
for user in users: # N번 반복
user_data = {
'id': user[0],
'name': user[1],
'email': user[2]
}
# 각 사용자마다 별도 쿼리 실행 (N번의 추가 쿼리!)
cursor.execute("SELECT title, content FROM posts WHERE user_id = ?", (user[0],))
posts = cursor.fetchall()
user_data['posts'] = [{'title': p[0], 'content': p[1]} for p in posts]
result.append(user_data)
conn.close()
return result
# 총 1 + N번의 쿼리 실행 (사용자가 100명이면 101번 쿼리!)
# ✅ JOIN을 사용한 효율적인 쿼리
def good_get_users_with_posts():
"""JOIN을 사용한 효율적인 쿼리"""
conn = sqlite3.connect('blog.db')
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 한 번의 쿼리로 모든 데이터 조회
cursor.execute("""
SELECT u.id, u.name, u.email, p.title, p.content
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
ORDER BY u.id, p.id
""")
rows = cursor.fetchall()
conn.close()
# Python에서 그룹화
users_dict = {}
for row in rows:
user_id = row['id']
if user_id not in users_dict:
users_dict[user_id] = {
'id': user_id,
'name': row['name'],
'email': row['email'],
'posts': []
}
# 포스트가 있는 경우에만 추가
if row['title']:
users_dict[user_id]['posts'].append({
'title': row['title'],
'content': row['content']
})
return list(users_dict.values())
# 단 1번의 쿼리로 모든 데이터 조회!
# 대안: 별도 쿼리로 IN 절 사용
def alternative_get_users_with_posts():
"""IN 절을 사용한 방법"""
conn = sqlite3.connect('blog.db')
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 사용자 조회
cursor.execute("SELECT id, name, email FROM users")
users = cursor.fetchall()
if not users:
return []
# 모든 사용자 ID 수집
user_ids = [user['id'] for user in users]
# 한 번의 쿼리로 모든 포스트 조회
placeholders = ','.join('?' * len(user_ids))
cursor.execute(f"""
SELECT user_id, title, content
FROM posts
WHERE user_id IN ({placeholders})
""", user_ids)
posts = cursor.fetchall()
conn.close()
# 포스트를 사용자별로 그룹화
posts_by_user = {}
for post in posts:
user_id = post['user_id']
if user_id not in posts_by_user:
posts_by_user[user_id] = []
posts_by_user[user_id].append({
'title': post['title'],
'content': post['content']
})
# 최종 결과 조합
result = []
for user in users:
user_data = dict(user)
user_data['posts'] = posts_by_user.get(user['id'], [])
result.append(user_data)
return result
# 단 2번의 쿼리로 모든 데이터 조회!
5. 데이터 타입 변환 실수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# ❌ 데이터 타입을 제대로 처리하지 않는 실수
import sqlite3
from datetime import datetime, date
import json
def bad_data_handling():
"""데이터 타입 처리가 잘못된 예"""
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
# 날짜를 문자열로 저장 (검색과 정렬이 어려움)
cursor.execute("""
INSERT INTO events (name, event_date, is_active, metadata)
VALUES (?, ?, ?, ?)
""", (
"회의",
"2024-12-25", # 문자열로 저장
"true", # 불린을 문자열로 저장
"{'type': 'meeting', 'room': 'A101'}" # JSON을 문자열로 저장
))
# 잘못된 날짜 비교 (문자열 비교가 됨)
cursor.execute("SELECT * FROM events WHERE event_date > '2024-12-01'")
events = cursor.fetchall()
conn.commit()
conn.close()
return events
# ✅ 올바른 데이터 타입 처리
def good_data_handling():
"""데이터 타입을 올바르게 처리하는 예"""
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
# 테이블 생성 시 적절한 타입 지정
cursor.execute("""
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
event_date DATE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN NOT NULL DEFAULT 1,
metadata TEXT
)
""")
# 올바른 데이터 타입으로 저장
event_date = date(2024, 12, 25)
is_active = True
metadata = {'type': 'meeting', 'room': 'A101'}
cursor.execute("""
INSERT INTO events (name, event_date, is_active, metadata)
VALUES (?, ?, ?, ?)
""", (
"회의",
event_date.isoformat(), # 날짜를 ISO 형식으로
is_active, # 불린 값 그대로
json.dumps(metadata) # JSON으로 직렬화
))
# 날짜 범위 검색 (제대로 작동)
start_date = date(2024, 12, 1)
cursor.execute("SELECT * FROM events WHERE event_date > ?", (start_date.isoformat(),))
events = cursor.fetchall()
conn.commit()
conn.close()
return events
# SQLAlchemy를 사용한 더 나은 방법
from sqlalchemy import create_engine, Column, Integer, String, Date, Boolean, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import func
Base = declarative_base()
class Event(Base):
"""이벤트 모델 (적절한 타입 정의)"""
__tablename__ = 'events'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
event_date = Column(Date, nullable=False)
created_at = Column(DateTime, default=func.now())
is_active = Column(Boolean, nullable=False, default=True)
metadata = Column(Text) # JSON 문자열 저장
def sqlalchemy_data_handling():
"""SQLAlchemy를 사용한 타입 안전한 방법"""
engine = create_engine('sqlite:///test.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# 타입 안전한 데이터 삽입
event = Event(
name="회의",
event_date=date(2024, 12, 25),
is_active=True,
metadata=json.dumps({'type': 'meeting', 'room': 'A101'})
)
session.add(event)
session.commit()
# 타입 안전한 쿼리
start_date = date(2024, 12, 1)
events = session.query(Event).filter(Event.event_date > start_date).all()
session.close()
return events
6. 대용량 데이터 처리 실수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# ❌ 메모리 부족을 일으키는 대용량 데이터 처리
import sqlite3
def bad_large_data_processing():
"""메모리 부족을 일으키는 잘못된 방법"""
conn = sqlite3.connect('large_data.db')
cursor = conn.cursor()
# 모든 데이터를 한 번에 메모리로 로드 (위험!)
cursor.execute("SELECT * FROM large_table") # 100만 행이라면?
all_rows = cursor.fetchall() # 메모리 부족 발생 가능!
# 모든 데이터를 처리
processed_data = []
for row in all_rows:
# 복잡한 처리...
processed = process_row(row)
processed_data.append(processed)
conn.close()
return processed_data
def process_row(row):
"""행 처리 함수"""
# 복잡한 처리 시뮬레이션
return {'processed': row[0], 'length': len(str(row))}
# ✅ 메모리 효율적인 대용량 데이터 처리
def good_large_data_processing():
"""메모리 효율적인 배치 처리"""
conn = sqlite3.connect('large_data.db')
cursor = conn.cursor()
batch_size = 1000 # 배치 크기
offset = 0
processed_count = 0
while True:
# 배치 단위로 데이터 조회
cursor.execute("""
SELECT * FROM large_table
LIMIT ? OFFSET ?
""", (batch_size, offset))
batch = cursor.fetchall()
if not batch: # 더 이상 데이터가 없으면 종료
break
# 배치 처리
for row in batch:
processed = process_row(row)
# 처리된 데이터를 즉시 저장하거나 출력
save_processed_data(processed)
processed_count += 1
print(f"처리 완료: {processed_count}건")
offset += batch_size
conn.close()
return processed_count
def save_processed_data(data):
"""처리된 데이터 저장"""
# 파일에 저장하거나 다른 DB에 저장
pass
# 제너레이터를 사용한 메모리 효율적인 방법
def generator_large_data_processing():
"""제너레이터를 사용한 스트리밍 처리"""
def data_generator():
"""데이터를 스트리밍으로 생성"""
conn = sqlite3.connect('large_data.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM large_table")
while True:
row = cursor.fetchone()
if row is None:
break
yield row
conn.close()
# 스트리밍 처리
processed_count = 0
for row in data_generator():
processed = process_row(row)
save_processed_data(processed)
processed_count += 1
if processed_count % 1000 == 0:
print(f"처리 중: {processed_count}건")
return processed_count
💡 실수 방지 체크리스트
✅ 보안 관련:
- SQL 쿼리에 사용자 입력을 직접 문자열 포매팅으로 넣지 않았는가?
- 항상 Prepared Statement나 ORM을 사용하는가?
- 데이터베이스 연결 정보를 코드에 하드코딩하지 않았는가?
✅ 연결 관리:
- 데이터베이스 연결을 사용 후 반드시 닫는가?
with문이나try-finally를 사용해서 예외 시에도 연결이 닫히도록 했는가?- 커넥션 풀을 적절히 설정했는가?
✅ 트랜잭션 관리:
- 여러 테이블을 수정하는 작업에서 트랜잭션을 사용하는가?
- 예외 발생 시 롤백이 제대로 되는가?
- 트랜잭션의 범위를 적절히 설정했는가?
✅ 성능 최적화:
- N+1 쿼리 문제를 피하기 위해 JOIN이나 IN 절을 사용하는가?
- 인덱스를 적절히 설정했는가?
- 대용량 데이터 처리 시 배치나 스트리밍을 사용하는가?
✅ 데이터 타입:
- 날짜, 불린, JSON 등의 데이터를 적절한 타입으로 저장하는가?
- 데이터 검증과 변환을 적절히 수행하는가?
- NULL 값 처리를 고려했는가?
🎯 핵심 정리
데이터베이스 선택 가이드
graph TD
A[프로젝트 요구사항] --> B{데이터 구조}
B -->|정형화된 데이터| C[관계형 DB]
B -->|유연한 구조| D[NoSQL]
C --> E{규모}
E -->|소규모/프로토타입| F[SQLite]
E -->|중규모| G[MySQL]
E -->|대규모/복잡한 쿼리| H[PostgreSQL]
D --> I{용도}
I -->|문서 저장| J[MongoDB]
I -->|캐싱| K[Redis]
I -->|시계열 데이터| L[InfluxDB]
Best Practices
- 연결 관리: 커넥션 풀 사용
- 보안: Prepared Statement 사용
- 트랜잭션: 적절한 격리 수준 설정
- 인덱스: 쿼리 패턴에 맞는 인덱스 생성
- 백업: 정기적인 백업 및 복구 테스트
🎓 파이썬 마스터하기 시리즈
📚 기초편 (1-7)
- Python 소개와 개발 환경 설정
- 변수, 자료형, 연산자 완벽 정리
- 조건문과 반복문 마스터하기
- 함수와 람다 완벽 가이드
- 리스트, 튜플, 딕셔너리 정복하기
- 문자열 처리와 정규표현식
- 파일 입출력과 예외 처리
🚀 중급편 (8-12)
- 클래스와 객체지향 프로그래밍
- 모듈과 패키지 관리
- 데코레이터와 제너레이터
- 비동기 프로그래밍 (async/await)
- 데이터베이스 연동하기 (현재 글)
💼 고급편 (13-16)
이전글: 비동기 프로그래밍 (async/await) ⬅️ 현재글: 데이터베이스 연동하기 다음글: 웹 스크래핑과 API 개발 ➡️
이번 포스트에서는 Python에서 다양한 데이터베이스를 연동하는 방법을 완벽히 마스터했습니다. 다음 포스트에서는 웹 스크래핑과 API 활용에 대해 자세히 알아보겠습니다. Happy Coding! 🐍✨