[이제와서 시작하는 Python 마스터하기 #15] 성능 최적화 기법
[이제와서 시작하는 Python 마스터하기 #15] 성능 최적화 기법
🏢 실전 한국 비즈니스 예제: 쿠팡 물류센터 재고 관리 시스템
쿠팡의 물류센터에서 실시간으로 수십만 개의 상품 재고를 관리하는 시스템을 개발한다고 가정해봅시다. 이 시스템은 다음과 같은 요구사항이 있습니다:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class InventoryManager:
"""재고 관리 시스템"""
def __init__(self):
self.products = {} # 상품ID: 재고정보
self.warehouse_locations = {} # 창고 위치 정보
self.pending_orders = [] # 대기 중인 주문
def check_stock(self, product_id: str, quantity: int) -> bool:
"""재고 확인 (1초 내 응답 필요)"""
return self.products.get(product_id, 0) >= quantity
def process_order_batch(self, orders: list) -> list:
"""주문 일괄 처리 (10만 건/분 처리 목표)"""
results = []
for order in orders:
# 재고 확인, 할당, 예약 처리
result = self.allocate_inventory(order)
results.append(result)
return results
def calculate_restock_needs(self) -> dict:
"""재입고 필요량 계산 (실시간 분석)"""
# 판매 패턴 분석, 계절성 고려, 예측 알고리즘
# 대용량 데이터 처리 최적화 필요
pass
이런 시스템에서는:
- 응답 시간: 재고 조회 1초 이내
- 처리량: 분당 10만 건 주문 처리
- 메모리 효율성: 수백만 개 상품 정보 관리
- 동시성: 수천 명의 동시 접속 처리
성능 최적화가 비즈니스 성공을 직접적으로 좌우하는 상황입니다.
⚡ 성능 최적화의 중요성
성능 최적화는 프로그램의 실행 속도를 향상시키고 자원 사용을 효율화하는 과정입니다. 하지만 섣부른 최적화는 피해야 합니다.
[!WARNING] 섣부른 최적화는 만악의 근원!
도널드 커누스(Donald Knuth)가 한 유명한 말입니다. 코드를 짜자마자 “이거 느리면 어떡하지?” 하고 최적화부터 하려고 하지 마세요.
- 일단 가독성 좋고 정확하게 작동하는 코드를 만듭니다.
- 실제로 느리다면, 프로파일링을 통해 어디가 느린지 찾습니다.
- 그 병목 지점만 최적화합니다. 대부분의 경우, 우리가 걱정하는 부분은 전체 실행 시간의 1%도 차지하지 않습니다!
graph TB
subgraph "최적화 프로세스"
A[성능 측정] --> B[병목 지점 파악]
B --> C[최적화 적용]
C --> D[결과 검증]
D --> E{목표 달성?}
E -->|아니오| B
E -->|예| F[완료]
end
subgraph "최적화 기법"
G[알고리즘 개선] --> H[O(n²) → O(n log n)]
I[자료구조 최적화] --> J[list → set/dict]
K[병렬 처리] --> L[multiprocessing]
M[컴파일 최적화] --> N[Cython/Numba]
end
🔍 성능 프로파일링
최적화의 첫 단계는 성능을 측정하고 병목 지점을 찾는 것입니다.
시간 측정 기법
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import time
import timeit
from contextlib import contextmanager
from functools import wraps
import cProfile
import pstats
from memory_profiler import profile
import tracemalloc
# 간단한 시간 측정
def simple_timer():
"""기본 시간 측정"""
start = time.time()
# 측정할 코드
result = sum(i ** 2 for i in range(1000000))
end = time.time()
print(f"실행 시간: {end - start:.4f}초")
return result
# 컨텍스트 관리자를 사용한 시간 측정
@contextmanager
def timer(name="작업"):
"""시간 측정 컨텍스트 관리자"""
print(f"{name} 시작...")
start = time.perf_counter()
try:
yield
finally:
end = time.perf_counter()
print(f"{name} 완료: {end - start:.4f}초")
# 사용 예제
with timer("데이터 처리"):
data = [i ** 2 for i in range(1000000)]
# 데코레이터를 사용한 시간 측정
def measure_time(func):
"""실행 시간 측정 데코레이터"""
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
end = time.perf_counter()
print(f"{func.__name__} 실행 시간: {end - start:.4f}초")
return result
return wrapper
@measure_time
def slow_function(n):
"""느린 함수 예제"""
return sum(i ** 2 for i in range(n))
# timeit을 사용한 정확한 측정
def compare_methods():
"""여러 방법의 성능 비교"""
# 리스트 컴프리헨션
list_comp_time = timeit.timeit(
'[i ** 2 for i in range(1000)]',
number=10000
)
# map 함수
map_time = timeit.timeit(
'list(map(lambda x: x ** 2, range(1000)))',
number=10000
)
# 제너레이터 표현식
gen_exp_time = timeit.timeit(
'list(i ** 2 for i in range(1000))',
number=10000
)
print(f"리스트 컴프리헨션: {list_comp_time:.4f}초")
print(f"map 함수: {map_time:.4f}초")
print(f"제너레이터 표현식: {gen_exp_time:.4f}초")
[!TIP] time.time() vs timeit
time.time()은 간단하지만, 운영체제의 다른 작업에 영향을 받아 부정확할 수 있습니다. 아주 짧은 코드를 측정할 때는timeit모듈을 사용하세요. 수천 번 반복 실행해서 평균을 내주기 때문에 훨씬 정확합니다!
cProfile을 사용한 상세 프로파일링
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import cProfile
import pstats
import io
def profile_code():
"""프로파일링할 코드"""
def factorial(n):
if n <= 1:
return 1
return n * factorial(n - 1)
def fibonacci(n):
if n <= 1:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
# 여러 함수 호출
results = []
for i in range(20):
results.append(factorial(i))
for i in range(30):
results.append(fibonacci(i))
return results
# 프로파일링 실행
def run_profiling():
"""프로파일링 실행 및 결과 분석"""
# 프로파일러 생성
profiler = cProfile.Profile()
# 프로파일링 시작
profiler.enable()
# 코드 실행
profile_code()
# 프로파일링 종료
profiler.disable()
# 결과 분석
s = io.StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
ps.print_stats(10) # 상위 10개 함수 출력
print(s.getvalue())
# 데코레이터를 사용한 프로파일링
def profile_function(func):
"""함수 프로파일링 데코레이터"""
@wraps(func)
def wrapper(*args, **kwargs):
profiler = cProfile.Profile()
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
# 결과 출력
s = io.StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
ps.print_stats()
print(f"\n{func.__name__} 프로파일링 결과:")
print(s.getvalue())
return result
return wrapper
# line_profiler를 사용한 라인별 프로파일링
# @profile 데코레이터 사용 (line_profiler 설치 필요)
def line_by_line_example():
"""라인별 프로파일링 예제"""
total = 0
# 느린 부분
for i in range(1000000):
total += i ** 2
# 빠른 부분
result = sum(range(1000))
return total + result
# 메모리 프로파일링
@profile
def memory_intensive_function():
"""메모리 집약적 함수"""
# 큰 리스트 생성
big_list = [i for i in range(1000000)]
# 딕셔너리 생성
big_dict = {i: i ** 2 for i in range(100000)}
# 문자열 연결
result = ""
for i in range(10000):
result += str(i)
return len(big_list) + len(big_dict)
메모리 프로파일링
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import tracemalloc
import gc
from memory_profiler import profile
import sys
def memory_usage_tracking():
"""메모리 사용량 추적"""
# tracemalloc 시작
tracemalloc.start()
# 메모리를 사용하는 코드
data = []
for i in range(1000000):
data.append(i ** 2)
# 현재 메모리 사용량
current, peak = tracemalloc.get_traced_memory()
print(f"현재 메모리: {current / 10**6:.1f} MB")
print(f"최대 메모리: {peak / 10**6:.1f} MB")
# 메모리 사용량 상위 10개
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("\n메모리 사용량 상위 10개:")
for stat in top_stats[:10]:
print(stat)
tracemalloc.stop()
return data
# 객체 크기 측정
def measure_object_size():
"""객체 크기 측정"""
# 다양한 객체의 크기
objects = {
'int': 42,
'float': 3.14,
'str': "Hello, World!",
'list': list(range(1000)),
'dict': {i: i**2 for i in range(100)},
'set': set(range(1000))
}
for name, obj in objects.items():
size = sys.getsizeof(obj)
print(f"{name}: {size} bytes")
# 재귀적 크기 계산
def get_deep_size(obj, seen=None):
"""객체의 재귀적 크기 계산"""
size = sys.getsizeof(obj)
if seen is None:
seen = set()
obj_id = id(obj)
if obj_id in seen:
return 0
seen.add(obj_id)
if isinstance(obj, dict):
size += sum(get_deep_size(v, seen) for v in obj.values())
size += sum(get_deep_size(k, seen) for k in obj.keys())
elif hasattr(obj, '__dict__'):
size += get_deep_size(obj.__dict__, seen)
elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
size += sum(get_deep_size(i, seen) for i in obj)
return size
# 중첩된 구조의 크기
nested_data = {
'list': [[i] * 10 for i in range(100)],
'dict': {i: {j: j**2 for j in range(10)} for i in range(10)}
}
for name, obj in nested_data.items():
shallow_size = sys.getsizeof(obj)
deep_size = get_deep_size(obj)
print(f"\n{name}:")
print(f" 얕은 크기: {shallow_size} bytes")
print(f" 깊은 크기: {deep_size} bytes")
# 메모리 누수 감지
class MemoryLeakDetector:
"""메모리 누수 감지기"""
def __init__(self):
self.snapshots = []
def take_snapshot(self, label=""):
"""메모리 스냅샷 촬영"""
gc.collect()
snapshot = {
'label': label,
'objects': len(gc.get_objects()),
'memory': tracemalloc.get_traced_memory()[0] if tracemalloc.is_tracing() else 0
}
self.snapshots.append(snapshot)
return snapshot
def compare_snapshots(self, idx1=-2, idx2=-1):
"""스냅샷 비교"""
if len(self.snapshots) < 2:
print("비교할 스냅샷이 부족합니다")
return
snap1 = self.snapshots[idx1]
snap2 = self.snapshots[idx2]
print(f"\n스냅샷 비교: '{snap1['label']}' → '{snap2['label']}'")
print(f"객체 수 변화: {snap2['objects'] - snap1['objects']:+d}")
if snap1['memory'] and snap2['memory']:
memory_diff = (snap2['memory'] - snap1['memory']) / 10**6
print(f"메모리 변화: {memory_diff:+.2f} MB")
🚀 알고리즘 최적화
알고리즘 개선은 가장 효과적인 최적화 방법입니다.
시간 복잡도 개선
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
from typing import List, Dict, Set, Tuple
import bisect
from collections import defaultdict, Counter, deque
import heapq
# O(n²) → O(n) 최적화 예제
def find_pair_with_sum_naive(arr: List[int], target: int) -> Tuple[int, int]:
"""두 수의 합이 target인 쌍 찾기 (나이브)"""
n = len(arr)
for i in range(n):
for j in range(i + 1, n):
if arr[i] + arr[j] == target:
return (arr[i], arr[j])
return None
def find_pair_with_sum_optimized(arr: List[int], target: int) -> Tuple[int, int]:
"""두 수의 합이 target인 쌍 찾기 (최적화)"""
seen = set()
for num in arr:
complement = target - num
if complement in seen:
return (complement, num)
seen.add(num)
return None
# 성능 비교
def compare_pair_finding():
"""성능 비교"""
import random
import time
arr = [random.randint(1, 1000) for _ in range(10000)]
target = 1500
# 나이브 방법
start = time.time()
result1 = find_pair_with_sum_naive(arr, target)
time1 = time.time() - start
# 최적화 방법
start = time.time()
result2 = find_pair_with_sum_optimized(arr, target)
time2 = time.time() - start
print(f"나이브 방법: {time1:.4f}초")
print(f"최적화 방법: {time2:.4f}초")
print(f"성능 향상: {time1/time2:.1f}배")
# 정렬 알고리즘 최적화
def optimized_sorting_examples():
"""정렬 최적화 예제"""
# 1. 부분 정렬이 필요한 경우
def get_top_k_elements(arr: List[int], k: int) -> List[int]:
"""상위 k개 요소 추출"""
# 전체 정렬: O(n log n)
# return sorted(arr, reverse=True)[:k]
# 힙 사용: O(n log k)
return heapq.nlargest(k, arr)
# 2. 특정 범위의 정수 정렬
def counting_sort(arr: List[int], max_val: int) -> List[int]:
"""계수 정렬 (O(n + k))"""
count = [0] * (max_val + 1)
for num in arr:
count[num] += 1
result = []
for i, c in enumerate(count):
result.extend([i] * c)
return result
# 3. 거의 정렬된 배열
def insertion_sort_optimized(arr: List[int]) -> List[int]:
"""삽입 정렬 (거의 정렬된 경우 효율적)"""
for i in range(1, len(arr)):
key = arr[i]
j = i - 1
while j >= 0 and arr[j] > key:
arr[j + 1] = arr[j]
j -= 1
arr[j + 1] = key
return arr
# 검색 알고리즘 최적화
class OptimizedSearch:
"""최적화된 검색 알고리즘"""
@staticmethod
def binary_search_variants():
"""이진 검색 변형"""
# 1. 첫 번째 발생 위치 찾기
def find_first_occurrence(arr: List[int], target: int) -> int:
left, right = 0, len(arr) - 1
result = -1
while left <= right:
mid = (left + right) // 2
if arr[mid] == target:
result = mid
right = mid - 1 # 왼쪽 계속 검색
elif arr[mid] < target:
left = mid + 1
else:
right = mid - 1
return result
# 2. 범위 검색
def find_range(arr: List[int], low: int, high: int) -> List[int]:
"""범위 내의 모든 요소 찾기"""
left = bisect.bisect_left(arr, low)
right = bisect.bisect_right(arr, high)
return arr[left:right]
# 3. 회전된 정렬 배열에서 검색
def search_rotated_array(arr: List[int], target: int) -> int:
left, right = 0, len(arr) - 1
while left <= right:
mid = (left + right) // 2
if arr[mid] == target:
return mid
# 왼쪽 부분이 정렬됨
if arr[left] <= arr[mid]:
if arr[left] <= target < arr[mid]:
right = mid - 1
else:
left = mid + 1
# 오른쪽 부분이 정렬됨
else:
if arr[mid] < target <= arr[right]:
left = mid + 1
else:
right = mid - 1
return -1
return {
'first_occurrence': find_first_occurrence,
'range_search': find_range,
'rotated_search': search_rotated_array
}
# 동적 계획법 최적화
class DynamicProgrammingOptimization:
"""동적 계획법 최적화"""
@staticmethod
def fibonacci_optimizations():
"""피보나치 수열 최적화"""
# 1. 재귀 (매우 느림)
def fib_recursive(n: int) -> int:
if n <= 1:
return n
return fib_recursive(n-1) + fib_recursive(n-2)
# 2. 메모이제이션
def fib_memoized(n: int, memo: Dict[int, int] = None) -> int:
if memo is None:
memo = {}
if n in memo:
return memo[n]
if n <= 1:
return n
memo[n] = fib_memoized(n-1, memo) + fib_memoized(n-2, memo)
return memo[n]
# 3. 동적 계획법 (상향식)
def fib_dp(n: int) -> int:
if n <= 1:
return n
dp = [0] * (n + 1)
dp[1] = 1
for i in range(2, n + 1):
dp[i] = dp[i-1] + dp[i-2]
return dp[n]
# 4. 공간 최적화
def fib_optimized(n: int) -> int:
if n <= 1:
return n
a, b = 0, 1
for _ in range(2, n + 1):
a, b = b, a + b
return b
# 5. 행렬 거듭제곱 (O(log n))
def fib_matrix(n: int) -> int:
def matrix_mult(A, B):
return [
[A[0][0]*B[0][0] + A[0][1]*B[1][0], A[0][0]*B[0][1] + A[0][1]*B[1][1]],
[A[1][0]*B[0][0] + A[1][1]*B[1][0], A[1][0]*B[0][1] + A[1][1]*B[1][1]]
]
def matrix_power(M, n):
if n == 1:
return M
if n % 2 == 0:
half = matrix_power(M, n // 2)
return matrix_mult(half, half)
else:
return matrix_mult(M, matrix_power(M, n - 1))
if n <= 1:
return n
base = [[1, 1], [1, 0]]
result = matrix_power(base, n)
return result[0][1]
return {
'recursive': fib_recursive,
'memoized': fib_memoized,
'dp': fib_dp,
'optimized': fib_optimized,
'matrix': fib_matrix
}
💾 자료구조 최적화
적절한 자료구조 선택은 성능에 큰 영향을 미칩니다.
자료구조 선택 가이드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
from collections import defaultdict, Counter, deque, OrderedDict
from typing import List, Dict, Set
import array
import bisect
import heapq
class DataStructureOptimization:
"""자료구조 최적화"""
@staticmethod
def list_vs_deque():
"""리스트 vs 덱 성능 비교"""
import time
n = 100000
# 리스트 - 앞에 삽입 (O(n))
start = time.time()
lst = []
for i in range(n):
lst.insert(0, i)
list_time = time.time() - start
# 덱 - 앞에 삽입 (O(1))
start = time.time()
dq = deque()
for i in range(n):
dq.appendleft(i)
deque_time = time.time() - start
print(f"리스트 (앞 삽입): {list_time:.4f}초")
print(f"덱 (앞 삽입): {deque_time:.4f}초")
print(f"성능 향상: {list_time/deque_time:.1f}배")
@staticmethod
def set_operations():
"""집합 연산 최적화"""
# 중복 제거
def remove_duplicates_list(lst: List[int]) -> List[int]:
"""리스트로 중복 제거 (O(n²))"""
result = []
for item in lst:
if item not in result:
result.append(item)
return result
def remove_duplicates_set(lst: List[int]) -> List[int]:
"""집합으로 중복 제거 (O(n))"""
return list(set(lst))
# 교집합 찾기
def intersection_list(lst1: List[int], lst2: List[int]) -> List[int]:
"""리스트로 교집합 (O(n*m))"""
return [x for x in lst1 if x in lst2]
def intersection_set(lst1: List[int], lst2: List[int]) -> List[int]:
"""집합으로 교집합 (O(n+m))"""
return list(set(lst1) & set(lst2))
return {
'remove_duplicates_list': remove_duplicates_list,
'remove_duplicates_set': remove_duplicates_set,
'intersection_list': intersection_list,
'intersection_set': intersection_set
}
@staticmethod
def dictionary_optimizations():
"""딕셔너리 최적화"""
# 1. defaultdict 사용
def count_words_naive(words: List[str]) -> Dict[str, int]:
"""일반 딕셔너리로 단어 카운트"""
counts = {}
for word in words:
if word in counts:
counts[word] += 1
else:
counts[word] = 1
return counts
def count_words_defaultdict(words: List[str]) -> Dict[str, int]:
"""defaultdict로 단어 카운트"""
counts = defaultdict(int)
for word in words:
counts[word] += 1
return dict(counts)
def count_words_counter(words: List[str]) -> Dict[str, int]:
"""Counter로 단어 카운트"""
return dict(Counter(words))
# 2. 딕셔너리 컴프리헨션
def create_index_mapping(items: List[str]) -> Dict[str, int]:
"""인덱스 매핑 생성"""
# 느린 방법
# mapping = {}
# for i, item in enumerate(items):
# mapping[item] = i
# 빠른 방법
return {item: i for i, item in enumerate(items)}
# 3. setdefault 사용
def group_by_key(items: List[Tuple[str, int]]) -> Dict[str, List[int]]:
"""키로 그룹화"""
groups = {}
for key, value in items:
groups.setdefault(key, []).append(value)
return groups
return {
'count_naive': count_words_naive,
'count_defaultdict': count_words_defaultdict,
'count_counter': count_words_counter,
'index_mapping': create_index_mapping,
'group_by_key': group_by_key
}
# 메모리 효율적인 자료구조
class MemoryEfficientStructures:
"""메모리 효율적인 자료구조"""
@staticmethod
def array_vs_list():
"""array vs list 메모리 비교"""
import sys
n = 1000000
# 리스트
lst = list(range(n))
list_size = sys.getsizeof(lst)
# array
arr = array.array('i', range(n))
array_size = sys.getsizeof(arr)
print(f"리스트 크기: {list_size / 10**6:.2f} MB")
print(f"array 크기: {array_size / 10**6:.2f} MB")
print(f"메모리 절약: {(1 - array_size/list_size) * 100:.1f}%")
@staticmethod
def slots_optimization():
"""__slots__를 사용한 메모리 최적화"""
> [!TIP]
> **__slots__가 뭔가요?**
>
> 파이썬 객체는 기본적으로 `__dict__`라는 딕셔너리에 속성을 저장합니다. 이게 메모리를 꽤 많이 먹습니다.
> 클래스에 `__slots__ = ['name', 'age']` 처럼 속성을 미리 지정해주면, 딕셔너리 대신 고정된 공간만 사용해서 **메모리를 40~50%나 절약**할 수 있습니다!
> 수백만 개의 객체를 생성해야 할 때 아주 유용합니다.
class RegularClass:
def __init__(self, x, y):
self.x = x
self.y = y
class SlottedClass:
__slots__ = ['x', 'y']
def __init__(self, x, y):
self.x = x
self.y = y
import sys
# 인스턴스 생성
regular = RegularClass(10, 20)
slotted = SlottedClass(10, 20)
# 크기 비교
regular_size = sys.getsizeof(regular) + sys.getsizeof(regular.__dict__)
slotted_size = sys.getsizeof(slotted)
print(f"일반 클래스: {regular_size} bytes")
print(f"__slots__ 클래스: {slotted_size} bytes")
print(f"메모리 절약: {(1 - slotted_size/regular_size) * 100:.1f}%")
@staticmethod
def generator_vs_list():
"""제너레이터 vs 리스트 메모리 비교"""
import sys
# 리스트 (모든 값을 메모리에 저장)
def create_list(n):
return [i ** 2 for i in range(n)]
# 제너레이터 (필요할 때 생성)
def create_generator(n):
return (i ** 2 for i in range(n))
n = 1000000
lst = create_list(n)
gen = create_generator(n)
print(f"리스트 크기: {sys.getsizeof(lst) / 10**6:.2f} MB")
print(f"제너레이터 크기: {sys.getsizeof(gen)} bytes")
# 캐싱 전략
class CachingStrategies:
"""캐싱 전략"""
@staticmethod
def lru_cache_example():
"""LRU 캐시 예제"""
from functools import lru_cache
# 캐시 없는 버전
def fibonacci_no_cache(n):
if n < 2:
return n
return fibonacci_no_cache(n-1) + fibonacci_no_cache(n-2)
# LRU 캐시 버전
@lru_cache(maxsize=128)
def fibonacci_cached(n):
if n < 2:
return n
return fibonacci_cached(n-1) + fibonacci_cached(n-2)
import time
# 성능 비교
n = 35
start = time.time()
result1 = fibonacci_no_cache(n)
time1 = time.time() - start
start = time.time()
result2 = fibonacci_cached(n)
time2 = time.time() - start
print(f"캐시 없음: {time1:.4f}초")
print(f"LRU 캐시: {time2:.4f}초")
print(f"성능 향상: {time1/time2:.1f}배")
# 캐시 정보
print(f"\n캐시 정보: {fibonacci_cached.cache_info()}")
@staticmethod
def custom_cache():
"""커스텀 캐시 구현"""
class LRUCache:
def __init__(self, capacity: int):
self.capacity = capacity
self.cache = OrderedDict()
def get(self, key: int) -> int:
if key not in self.cache:
return -1
# 최근 사용으로 이동
self.cache.move_to_end(key)
return self.cache[key]
def put(self, key: int, value: int) -> None:
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
if len(self.cache) > self.capacity:
# 가장 오래된 항목 제거
self.cache.popitem(last=False)
# 사용 예제
cache = LRUCache(3)
cache.put(1, 1)
cache.put(2, 2)
cache.put(3, 3)
print(cache.get(1)) # 1
cache.put(4, 4) # 2 제거
print(cache.get(2)) # -1 (제거됨)
🔧 NumPy와 Cython 최적화
NumPy와 Cython을 사용하면 Python 코드의 성능을 크게 향상시킬 수 있습니다.
NumPy 최적화
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import numpy as np
import time
class NumpyOptimization:
"""NumPy 최적화"""
@staticmethod
def vectorization_example():
"""벡터화 예제"""
n = 10000000
# 순수 Python
def pure_python_sum_squares(n):
result = 0
for i in range(n):
result += i ** 2
return result
# NumPy 벡터화
def numpy_sum_squares(n):
arr = np.arange(n)
return np.sum(arr ** 2)
# 성능 비교
start = time.time()
result1 = pure_python_sum_squares(n)
time1 = time.time() - start
start = time.time()
result2 = numpy_sum_squares(n)
time2 = time.time() - start
print(f"순수 Python: {time1:.4f}초")
print(f"NumPy: {time2:.4f}초")
print(f"성능 향상: {time1/time2:.1f}배")
@staticmethod
def broadcasting_example():
"""브로드캐스팅 예제"""
# 행렬 연산
def matrix_operations():
# 1000x1000 행렬
A = np.random.rand(1000, 1000)
B = np.random.rand(1000, 1000)
# 행렬 곱셈
start = time.time()
C = np.dot(A, B)
dot_time = time.time() - start
# 요소별 연산
start = time.time()
D = A * B + 2 * A - B / 2
element_time = time.time() - start
print(f"행렬 곱셈: {dot_time:.4f}초")
print(f"요소별 연산: {element_time:.4f}초")
# 브로드캐스팅
def broadcasting_operations():
# 다른 차원의 배열
A = np.random.rand(1000, 1000)
b = np.random.rand(1000)
c = np.random.rand(1, 1000)
# 브로드캐스팅 연산
result1 = A + b # b가 각 행에 더해짐
result2 = A * c # c가 각 행에 곱해짐
return result1, result2
matrix_operations()
@staticmethod
def memory_layout_optimization():
"""메모리 레이아웃 최적화"""
n = 5000
# C-order (행 우선)
A = np.ones((n, n), order='C')
# F-order (열 우선)
B = np.ones((n, n), order='F')
# 행 우선 접근
start = time.time()
for i in range(n):
A[i, :] = i
c_row_time = time.time() - start
start = time.time()
for i in range(n):
B[i, :] = i
f_row_time = time.time() - start
# 열 우선 접근
start = time.time()
for i in range(n):
A[:, i] = i
c_col_time = time.time() - start
start = time.time()
for i in range(n):
B[:, i] = i
f_col_time = time.time() - start
print("메모리 레이아웃 성능:")
print(f"C-order 행 접근: {c_row_time:.4f}초")
print(f"F-order 행 접근: {f_row_time:.4f}초")
print(f"C-order 열 접근: {c_col_time:.4f}초")
print(f"F-order 열 접근: {f_col_time:.4f}초")
# Numba를 사용한 JIT 컴파일
try:
from numba import jit, njit, prange
class NumbaOptimization:
"""Numba 최적화"""
@staticmethod
def jit_compilation_example():
"""JIT 컴파일 예제"""
# 일반 Python 함수
def monte_carlo_pi_python(n):
count = 0
for i in range(n):
x = np.random.random()
y = np.random.random()
if x*x + y*y <= 1:
count += 1
return 4 * count / n
# Numba JIT 컴파일
@jit(nopython=True)
def monte_carlo_pi_numba(n):
count = 0
for i in range(n):
x = np.random.random()
y = np.random.random()
if x*x + y*y <= 1:
count += 1
return 4 * count / n
# 성능 비교
n = 10000000
start = time.time()
pi1 = monte_carlo_pi_python(n)
time1 = time.time() - start
# 첫 실행 (컴파일 포함)
monte_carlo_pi_numba(100)
start = time.time()
pi2 = monte_carlo_pi_numba(n)
time2 = time.time() - start
print(f"Python: {time1:.4f}초 (π ≈ {pi1:.6f})")
print(f"Numba: {time2:.4f}초 (π ≈ {pi2:.6f})")
print(f"성능 향상: {time1/time2:.1f}배")
@staticmethod
def parallel_computation():
"""병렬 계산 예제"""
@njit(parallel=True)
def parallel_sum(A):
total = 0.0
for i in prange(len(A)):
total += A[i]
return total
# 큰 배열 생성
arr = np.random.rand(100000000)
# 성능 측정
start = time.time()
result = parallel_sum(arr)
end = time.time()
print(f"병렬 합계: {end - start:.4f}초")
except ImportError:
print("Numba가 설치되지 않았습니다")
💡 실전 예제
1. 웹 애플리케이션 성능 최적화
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
from functools import lru_cache
import time
import hashlib
from typing import List, Dict, Any
import json
class WebAppOptimization:
"""웹 애플리케이션 성능 최적화"""
def __init__(self):
self.cache = {}
self.db_cache = {}
# 데이터베이스 쿼리 캐싱
@lru_cache(maxsize=1000)
def get_user_by_id(self, user_id: int) -> Dict[str, Any]:
"""사용자 정보 조회 (캐싱)"""
# 실제로는 데이터베이스 쿼리
print(f"데이터베이스에서 사용자 {user_id} 조회")
# 시뮬레이션
time.sleep(0.1) # DB 지연
return {
'id': user_id,
'name': f'User {user_id}',
'email': f'user{user_id}@example.com'
}
# 응답 캐싱
def response_cache(self, ttl=300):
"""응답 캐싱 데코레이터"""
def decorator(func):
cache = {}
def wrapper(*args, **kwargs):
# 캐시 키 생성
key = hashlib.md5(
f"{func.__name__}:{args}:{kwargs}".encode()
).hexdigest()
# 캐시 확인
if key in cache:
cached_result, cached_time = cache[key]
if time.time() - cached_time < ttl:
print(f"캐시 히트: {key[:8]}...")
return cached_result
# 함수 실행
result = func(*args, **kwargs)
# 캐시 저장
cache[key] = (result, time.time())
return result
wrapper.clear_cache = lambda: cache.clear()
return wrapper
return decorator
@response_cache(ttl=60)
def get_product_list(self, category: str, page: int = 1) -> List[Dict]:
"""상품 목록 조회"""
print(f"상품 목록 생성: {category}, 페이지 {page}")
# 시뮬레이션
time.sleep(0.5)
return [
{
'id': i,
'name': f'{category} Product {i}',
'price': 10000 + i * 1000
}
for i in range(page * 10, (page + 1) * 10)
]
# 데이터베이스 연결 풀링
class ConnectionPool:
"""데이터베이스 연결 풀"""
def __init__(self, max_connections=10):
self.max_connections = max_connections
self.connections = []
self.in_use = set()
def get_connection(self):
"""연결 가져오기"""
if self.connections:
conn = self.connections.pop()
elif len(self.in_use) < self.max_connections:
conn = self._create_connection()
else:
raise Exception("연결 풀이 가득 찼습니다")
self.in_use.add(conn)
return conn
def return_connection(self, conn):
"""연결 반환"""
self.in_use.remove(conn)
self.connections.append(conn)
def _create_connection(self):
"""새 연결 생성"""
print("새 데이터베이스 연결 생성")
return f"Connection-{len(self.in_use) + 1}"
# 배치 처리 최적화
def batch_process_users(self, user_ids: List[int], batch_size: int = 100):
"""사용자 일괄 처리"""
results = []
for i in range(0, len(user_ids), batch_size):
batch = user_ids[i:i + batch_size]
# 배치 쿼리 (시뮬레이션)
print(f"배치 처리: {len(batch)}명의 사용자")
batch_results = [
{'id': uid, 'processed': True}
for uid in batch
]
results.extend(batch_results)
return results
# 지연 로딩
class LazyLoader:
"""지연 로딩 구현"""
def __init__(self, loader_func):
self.loader_func = loader_func
self._data = None
@property
def data(self):
if self._data is None:
print("데이터 로딩...")
self._data = self.loader_func()
return self._data
# 비동기 처리 시뮬레이션
def optimize_io_operations(self):
"""I/O 작업 최적화"""
import concurrent.futures
def fetch_data(url):
"""데이터 가져오기 (시뮬레이션)"""
print(f"데이터 가져오기: {url}")
time.sleep(0.5)
return f"Data from {url}"
urls = [f"http://api.example.com/data/{i}" for i in range(10)]
# 순차 처리
start = time.time()
sequential_results = []
for url in urls:
sequential_results.append(fetch_data(url))
sequential_time = time.time() - start
# 병렬 처리
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
parallel_results = list(executor.map(fetch_data, urls))
parallel_time = time.time() - start
print(f"\n순차 처리: {sequential_time:.2f}초")
print(f"병렬 처리: {parallel_time:.2f}초")
print(f"성능 향상: {sequential_time/parallel_time:.1f}배")
# 사용 예제
def test_web_optimization():
"""웹 최적화 테스트"""
app = WebAppOptimization()
# 사용자 정보 캐싱 테스트
print("=== 사용자 정보 캐싱 ===")
for _ in range(3):
user = app.get_user_by_id(1)
print(f"사용자: {user['name']}")
print(f"\n캐시 정보: {app.get_user_by_id.cache_info()}")
# 응답 캐싱 테스트
print("\n=== 응답 캐싱 ===")
for _ in range(3):
products = app.get_product_list('electronics', 1)
print(f"상품 수: {len(products)}")
# 배치 처리
print("\n=== 배치 처리 ===")
user_ids = list(range(1, 1001))
results = app.batch_process_users(user_ids)
print(f"처리된 사용자 수: {len(results)}")
# I/O 최적화
print("\n=== I/O 최적화 ===")
app.optimize_io_operations()
2. 데이터 처리 파이프라인 최적화
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
import numpy as np
from typing import List, Dict, Any, Callable
import multiprocessing as mp
from functools import partial
import pandas as pd
class DataPipelineOptimization:
"""데이터 처리 파이프라인 최적화"""
def __init__(self):
self.data = None
# 청크 단위 처리
def process_large_file_chunked(self, filename: str, chunk_size: int = 10000):
"""대용량 파일 청크 단위 처리"""
def process_chunk(chunk):
"""청크 처리 함수"""
# 데이터 정제
chunk = chunk.strip().split(',')
# 변환
try:
return {
'id': int(chunk[0]),
'value': float(chunk[1]),
'category': chunk[2]
}
except:
return None
results = []
with open(filename, 'r') as f:
chunk = []
for i, line in enumerate(f):
chunk.append(line)
if len(chunk) >= chunk_size:
# 청크 처리
processed = [
process_chunk(item)
for item in chunk
if item.strip()
]
results.extend([p for p in processed if p])
chunk = []
# 마지막 청크 처리
if chunk:
processed = [
process_chunk(item)
for item in chunk
if item.strip()
]
results.extend([p for p in processed if p])
return results
# 병렬 처리
def parallel_data_processing(self, data: List[Any],
process_func: Callable,
n_workers: int = None):
"""병렬 데이터 처리"""
if n_workers is None:
n_workers = mp.cpu_count()
# 데이터 분할
chunk_size = len(data) // n_workers
chunks = [
data[i:i + chunk_size]
for i in range(0, len(data), chunk_size)
]
# 병렬 처리
with mp.Pool(n_workers) as pool:
results = pool.map(process_func, chunks)
# 결과 병합
combined_results = []
for result in results:
combined_results.extend(result)
return combined_results
# NumPy 벡터화
def vectorized_operations(self, n: int = 10000000):
"""벡터화된 연산"""
# 데이터 생성
data = np.random.randn(n)
# 1. 루프 방식
def loop_normalize(arr):
result = np.empty_like(arr)
mean = np.mean(arr)
std = np.std(arr)
for i in range(len(arr)):
result[i] = (arr[i] - mean) / std
return result
# 2. 벡터화 방식
def vectorized_normalize(arr):
return (arr - np.mean(arr)) / np.std(arr)
# 성능 비교
import time
start = time.time()
result1 = loop_normalize(data[:100000]) # 작은 샘플
loop_time = time.time() - start
start = time.time()
result2 = vectorized_normalize(data)
vector_time = time.time() - start
print(f"루프 방식 (100K): {loop_time:.4f}초")
print(f"벡터화 방식 (10M): {vector_time:.4f}초")
print(f"예상 성능 향상: {loop_time * 100 / vector_time:.1f}배")
# 메모리 효율적 처리
def memory_efficient_aggregation(self, filename: str):
"""메모리 효율적인 집계"""
# 스트리밍 집계
class StreamingStats:
def __init__(self):
self.count = 0
self.sum = 0
self.sum_sq = 0
self.min_val = float('inf')
self.max_val = float('-inf')
def update(self, value):
self.count += 1
self.sum += value
self.sum_sq += value ** 2
self.min_val = min(self.min_val, value)
self.max_val = max(self.max_val, value)
@property
def mean(self):
return self.sum / self.count if self.count > 0 else 0
@property
def std(self):
if self.count < 2:
return 0
variance = (self.sum_sq / self.count) - (self.mean ** 2)
return variance ** 0.5
stats = StreamingStats()
# 한 줄씩 처리
with open(filename, 'r') as f:
for line in f:
try:
value = float(line.strip())
stats.update(value)
except ValueError:
continue
return {
'count': stats.count,
'mean': stats.mean,
'std': stats.std,
'min': stats.min_val,
'max': stats.max_val
}
# 인덱싱 최적화
def optimize_data_access(self):
"""데이터 접근 최적화"""
# 데이터 준비
n = 1000000
data = pd.DataFrame({
'id': range(n),
'category': np.random.choice(['A', 'B', 'C', 'D'], n),
'value': np.random.randn(n)
})
# 1. 느린 방법: iterrows
def slow_filter(df):
results = []
for index, row in df.iterrows():
if row['category'] == 'A' and row['value'] > 0:
results.append(row['id'])
return results
# 2. 빠른 방법: 벡터화
def fast_filter(df):
mask = (df['category'] == 'A') & (df['value'] > 0)
return df.loc[mask, 'id'].tolist()
# 3. 인덱싱 사용
data_indexed = data.set_index('category')
def indexed_filter(df):
try:
a_data = df.loc['A']
return a_data[a_data['value'] > 0]['id'].tolist()
except KeyError:
return []
# 성능 비교
import time
# 작은 샘플로 테스트
small_data = data.head(10000)
start = time.time()
result1 = slow_filter(small_data)
time1 = time.time() - start
start = time.time()
result2 = fast_filter(data)
time2 = time.time() - start
start = time.time()
result3 = indexed_filter(data_indexed)
time3 = time.time() - start
print(f"iterrows (10K): {time1:.4f}초")
print(f"벡터화 (1M): {time2:.4f}초")
print(f"인덱싱 (1M): {time3:.4f}초")
# 사용 예제
def test_pipeline_optimization():
"""파이프라인 최적화 테스트"""
pipeline = DataPipelineOptimization()
print("=== 벡터화 연산 ===")
pipeline.vectorized_operations()
print("\n=== 데이터 접근 최적화 ===")
pipeline.optimize_data_access()
⚠️ 초보자들이 자주 하는 실수
1. 측정하지 않고 추측으로 최적화하는 실수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# ❌ 잘못된 방법: 추측 기반 최적화
def process_data(data):
# "for 루프가 느릴 것 같으니 복잡한 알고리즘 사용"
return complex_algorithm(data) # 실제로는 더 느림!
# ✅ 올바른 방법: 측정 후 최적화
import time
def benchmark_methods(data):
# 방법 1 측정
start = time.time()
result1 = simple_loop(data)
time1 = time.time() - start
# 방법 2 측정
start = time.time()
result2 = complex_algorithm(data)
time2 = time.time() - start
print(f"간단한 방법: {time1:.4f}초")
print(f"복잡한 방법: {time2:.4f}초")
return result1 if time1 < time2 else result2
2. 잘못된 자료구조 선택하는 실수
1
2
3
4
5
6
7
8
9
10
11
# ❌ 잘못된 방법: 검색에 리스트 사용
user_ids = [1, 2, 3, 4, 5, ...] # 10만 개
def find_user(user_id):
return user_id in user_ids # O(n) - 매우 느림!
# ✅ 올바른 방법: 검색에 set 사용
user_ids = {1, 2, 3, 4, 5, ...} # 10만 개
def find_user(user_id):
return user_id in user_ids # O(1) - 매우 빠름!
3. 불필요한 반복 계산하는 실수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# ❌ 잘못된 방법: 매번 계산
def calculate_total_price(items):
total = 0
for item in items:
# 매번 세율을 계산!
tax_rate = get_tax_rate_from_db(item.category)
total += item.price * (1 + tax_rate)
return total
# ✅ 올바른 방법: 캐싱 활용
from functools import lru_cache
@lru_cache(maxsize=100)
def get_tax_rate_cached(category):
return get_tax_rate_from_db(category)
def calculate_total_price(items):
total = 0
for item in items:
tax_rate = get_tax_rate_cached(item.category)
total += item.price * (1 + tax_rate)
return total
4. 조기 최적화의 함정
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# ❌ 잘못된 방법: 읽기 어려운 최적화
def calculate_sum(numbers):
# 마이크로 최적화에 집착
return sum(n for n in numbers if n > 0)
# vs
s = 0
for n in numbers:
if n > 0:
s += n
return s
# ✅ 올바른 방법: 먼저 알고리즘 최적화
def calculate_sum_optimized(numbers):
# 알고리즘 개선이 더 중요!
if not numbers:
return 0
# NumPy 활용으로 진짜 성능 향상
import numpy as np
arr = np.array(numbers)
return np.sum(arr[arr > 0])
5. 메모리 누수를 일으키는 실수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# ❌ 잘못된 방법: 메모리 누수
class DataProcessor:
def __init__(self):
self.cache = {} # 계속 커짐!
def process(self, data_id, data):
if data_id not in self.cache:
self.cache[data_id] = expensive_operation(data)
return self.cache[data_id]
# ✅ 올바른 방법: LRU 캐시 사용
from functools import lru_cache
class DataProcessor:
@lru_cache(maxsize=1000) # 크기 제한
def process(self, data_id, data):
return expensive_operation(data)
6. I/O 작업을 순차 처리하는 실수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# ❌ 잘못된 방법: 순차 API 호출
def get_user_data(user_ids):
results = []
for user_id in user_ids:
data = requests.get(f'/api/users/{user_id}').json()
results.append(data)
return results # 100명이면 100초 소요!
# ✅ 올바른 방법: 병렬 처리
from concurrent.futures import ThreadPoolExecutor
def get_user_data_parallel(user_ids):
def fetch_user(user_id):
return requests.get(f'/api/users/{user_id}').json()
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(fetch_user, user_ids))
return results # 100명을 10초에 처리!
7. 큰 파일을 메모리에 모두 로드하는 실수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# ❌ 잘못된 방법: 전체 파일 로드
def process_large_file(filename):
with open(filename, 'r') as f:
content = f.read() # 10GB 파일이면 메모리 부족!
return process_text(content)
# ✅ 올바른 방법: 스트리밍 처리
def process_large_file_streaming(filename):
results = []
with open(filename, 'r') as f:
for line in f: # 한 줄씩 처리
result = process_line(line)
results.append(result)
return results
8. 문자열 연결을 비효율적으로 하는 실수
1
2
3
4
5
6
7
8
9
10
11
# ❌ 잘못된 방법: 문자열 반복 연결
def create_report(data):
report = ""
for item in data:
report += f"Item: {item}\n" # 매번 새 문자열 생성!
return report
# ✅ 올바른 방법: join 사용
def create_report_efficient(data):
lines = [f"Item: {item}" for item in data]
return "\n".join(lines) # 한 번에 연결
🎯 핵심 정리
최적화 체크리스트
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 최적화 체크리스트
optimization_checklist = """
1. 측정 먼저
- 프로파일링으로 병목 지점 파악
- 추측하지 말고 측정하기
2. 알고리즘 최적화
- 시간 복잡도 개선
- 적절한 자료구조 선택
3. 메모리 최적화
- 제너레이터 사용
- 메모리 효율적인 자료구조
4. 병렬 처리
- multiprocessing 활용
- 비동기 프로그래밍
5. 캐싱
- 계산 결과 캐싱
- 데이터베이스 쿼리 캐싱
6. 외부 도구 활용
- NumPy 벡터화
- Cython/Numba 컴파일
"""
print(optimization_checklist)
최적화 우선순위
graph TD
A[성능 문제 발견] --> B{측정 가능?}
B -->|예| C[프로파일링]
B -->|아니오| D[측정 방법 구현]
C --> E[병목 지점 파악]
E --> F{알고리즘 문제?}
F -->|예| G[알고리즘 개선]
F -->|아니오| H{I/O 바운드?}
H -->|예| I[비동기/병렬 처리]
H -->|아니오| J[코드 최적화]
🎓 파이썬 마스터하기 시리즈
📚 기초편 (1-7)
- Python 소개와 개발 환경 설정
- 변수, 자료형, 연산자 완벽 정리
- 조건문과 반복문 마스터하기
- 함수와 람다 완벽 가이드
- 리스트, 튜플, 딕셔너리 정복하기
- 문자열 처리와 정규표현식
- 파일 입출력과 예외 처리
🚀 중급편 (8-12)
💼 고급편 (13-16)
- 웹 스크래핑과 API 개발
- 테스트와 디버깅 전략
- 성능 최적화 기법 (현재 글)
- 멀티프로세싱과 병렬 처리
이전글: 테스트와 디버깅 전략 ⬅️ 현재글: 성능 최적화 기법 다음글: 멀티프로세싱과 병렬 처리 ➡️
이번 포스트에서는 Python의 성능 최적화 기법을 완벽히 마스터했습니다. 다음 포스트에서는 멀티프로세싱과 병렬 처리에 대해 자세히 알아보겠습니다. Happy Coding! 🐍✨
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.