[이제와서 시작하는 Python 마스터하기 #16] 멀티프로세싱과 병렬 처리
[이제와서 시작하는 Python 마스터하기 #16] 멀티프로세싱과 병렬 처리
🏢 실전 한국 비즈니스 예제: 대한민국 공공데이터 분석 시스템
한국의 공공데이터 포털에서 제공되는 수백만 건의 데이터를 실시간으로 분석하는 시스템을 개발한다고 가정해봅시다:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class PublicDataAnalyzer:
"""공공데이터 분석 시스템"""
def analyze_population_data(self, regions: list) -> dict:
"""전국 17개 시도 인구 데이터 분석"""
# 서울, 부산, 대구, 인천, 광주, 대전, 울산...
results = {}
for region in regions:
# 지역별 데이터 처리 (각 1시간 소요)
results[region] = self.process_region_data(region)
return results # 순차 처리: 17시간 소요!
def analyze_business_licenses(self, data_chunks: list) -> list:
"""사업자등록증 데이터 분석 (500만 건)"""
results = []
for chunk in data_chunks:
# CPU 집약적 분석 (상호명 정규화, 업종 분류 등)
processed = self.classify_business_type(chunk)
results.extend(processed)
return results
def crawl_real_estate_data(self, urls: list) -> list:
"""부동산 가격 데이터 수집 (I/O 바운드)"""
data = []
for url in urls:
# 네트워크 요청 (1-2초 소요)
response = self.fetch_url(url)
data.append(response)
return data # 1000개 URL이면 30분 소요!
이런 시스템에서는:
- CPU 집약적 작업: 데이터 분석, 머신러닝 모델 실행
- I/O 집약적 작업: API 호출, 데이터베이스 연결, 파일 처리
- 실시간 처리: 새로운 데이터 업데이트 처리
병렬 처리를 통해 17시간 작업을 1-2시간내로 단축할 수 있습니다.
🚀 병렬 처리의 이해
Python에서 병렬 처리는 여러 작업을 동시에 실행하여 성능을 향상시키는 기술입니다. GIL(Global Interpreter Lock)의 제약을 극복하는 다양한 방법을 알아봅니다.
graph TB
subgraph "병렬 처리 방식"
A[멀티스레딩<br/>Threading] --> B[I/O 바운드 작업<br/>GIL 영향 적음]
C[멀티프로세싱<br/>Multiprocessing] --> D[CPU 바운드 작업<br/>GIL 우회]
E[비동기<br/>Asyncio] --> F[동시성 처리<br/>단일 스레드]
G[동시 실행<br/>Concurrent] --> H[추상화 레이어<br/>통합 인터페이스]
end
[!NOTE] GIL이 뭔가요?
GIL(Global Interpreter Lock)은 파이썬 인터프리터가 한 번에 하나의 스레드만 실행하도록 막는 잠금 장치입니다. 그래서 파이썬에서는 스레드를 여러 개 만들어도 CPU를 하나밖에 못 씁니다. CPU를 100% 활용하려면 멀티프로세싱을 써야 하는 이유가 바로 이 GIL 때문입니다!
🧵 Threading 모듈
Threading은 I/O 바운드 작업에 효과적입니다.
기본 스레드 사용법
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
import threading
import time
import queue
from typing import List, Any, Callable
import requests
# 기본 스레드 생성
def basic_threading():
"""기본 스레드 예제"""
def worker(name: str, delay: float):
"""작업자 함수"""
print(f"{name} 시작")
time.sleep(delay)
print(f"{name} 완료")
# 스레드 생성
threads = []
for i in range(5):
t = threading.Thread(
target=worker,
args=(f"Thread-{i}", 2)
)
threads.append(t)
t.start()
# 모든 스레드 완료 대기
for t in threads:
t.join()
print("모든 스레드 완료")
# 스레드 클래스 상속
class WorkerThread(threading.Thread):
"""작업자 스레드 클래스"""
def __init__(self, name: str, task_queue: queue.Queue):
super().__init__(name=name)
self.task_queue = task_queue
self.daemon = True # 데몬 스레드로 설정
self.results = []
def run(self):
"""스레드 실행"""
while True:
try:
task = self.task_queue.get(timeout=1)
print(f"{self.name}: {task} 처리 중...")
# 작업 시뮬레이션
result = self.process_task(task)
self.results.append(result)
self.task_queue.task_done()
except queue.Empty:
break
def process_task(self, task: Any) -> Any:
"""작업 처리"""
time.sleep(0.1) # 작업 시뮬레이션
return f"Processed: {task}"
# 스레드 풀 구현
class ThreadPool:
"""스레드 풀"""
def __init__(self, num_threads: int = 4):
self.num_threads = num_threads
self.task_queue = queue.Queue()
self.threads = []
self._create_threads()
def _create_threads(self):
"""스레드 생성"""
for i in range(self.num_threads):
thread = WorkerThread(f"Worker-{i}", self.task_queue)
thread.start()
self.threads.append(thread)
def submit(self, task: Any):
"""작업 제출"""
self.task_queue.put(task)
def submit_many(self, tasks: List[Any]):
"""여러 작업 제출"""
for task in tasks:
self.task_queue.put(task)
def wait_completion(self):
"""모든 작업 완료 대기"""
self.task_queue.join()
def shutdown(self):
"""스레드 풀 종료"""
self.wait_completion()
# 결과 수집
all_results = []
for thread in self.threads:
all_results.extend(thread.results)
return all_results
# 스레드 동기화
class ThreadSynchronization:
"""스레드 동기화 예제"""
def __init__(self):
self.counter = 0
self.lock = threading.Lock()
self.rlock = threading.RLock()
self.semaphore = threading.Semaphore(3)
self.event = threading.Event()
self.condition = threading.Condition()
def unsafe_increment(self):
"""안전하지 않은 증가 (경쟁 조건)"""
temp = self.counter
time.sleep(0.0001) # 경쟁 조건 유발
self.counter = temp + 1
def safe_increment(self):
"""안전한 증가 (Lock 사용)"""
with self.lock:
temp = self.counter
time.sleep(0.0001)
self.counter = temp + 1
def demonstrate_race_condition(self):
"""경쟁 조건 시연"""
# 안전하지 않은 방법
self.counter = 0
threads = []
for _ in range(100):
t = threading.Thread(target=self.unsafe_increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"안전하지 않은 결과: {self.counter} (예상: 100)")
# 안전한 방법
self.counter = 0
threads = []
for _ in range(100):
t = threading.Thread(target=self.safe_increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"안전한 결과: {self.counter} (예상: 100)")
def semaphore_example(self):
"""세마포어 예제 (동시 접근 제한)"""
def access_resource(id: int):
print(f"스레드 {id}: 리소스 접근 시도")
with self.semaphore:
print(f"스레드 {id}: 리소스 접근 성공")
time.sleep(1)
print(f"스레드 {id}: 리소스 해제")
threads = []
for i in range(10):
t = threading.Thread(target=access_resource, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
def event_example(self):
"""이벤트 예제"""
def waiter(name: str):
print(f"{name}: 이벤트 대기 중...")
self.event.wait()
print(f"{name}: 이벤트 발생! 작업 시작")
# 대기 스레드 생성
threads = []
for i in range(3):
t = threading.Thread(target=waiter, args=(f"Waiter-{i}",))
threads.append(t)
t.start()
# 2초 후 이벤트 발생
time.sleep(2)
print("이벤트 발생시킴!")
self.event.set()
for t in threads:
t.join()
# 실전 예제: 웹 스크래핑
class ThreadedWebScraper:
"""멀티스레드 웹 스크래퍼"""
def __init__(self, num_threads: int = 10):
self.num_threads = num_threads
self.url_queue = queue.Queue()
self.results = queue.Queue()
def fetch_url(self, url: str) -> dict:
"""URL 가져오기"""
try:
response = requests.get(url, timeout=5)
return {
'url': url,
'status': response.status_code,
'size': len(response.content),
'title': self.extract_title(response.text)
}
except Exception as e:
return {
'url': url,
'error': str(e)
}
def extract_title(self, html: str) -> str:
"""제목 추출"""
import re
match = re.search(r'<title>(.*?)</title>', html, re.IGNORECASE)
return match.group(1) if match else 'No title'
def worker(self):
"""작업자 스레드"""
while True:
try:
url = self.url_queue.get(timeout=1)
result = self.fetch_url(url)
self.results.put(result)
self.url_queue.task_done()
except queue.Empty:
break
def scrape(self, urls: List[str]) -> List[dict]:
"""URL 목록 스크래핑"""
# URL 큐에 추가
for url in urls:
self.url_queue.put(url)
# 작업자 스레드 생성
threads = []
for _ in range(self.num_threads):
t = threading.Thread(target=self.worker)
t.start()
threads.append(t)
# 완료 대기
self.url_queue.join()
# 스레드 종료 대기
for t in threads:
t.join()
# 결과 수집
results = []
while not self.results.empty():
results.append(self.results.get())
return results
🔄 Multiprocessing 모듈
Multiprocessing은 CPU 바운드 작업에 적합하며 GIL을 우회합니다.
기본 프로세스 사용법
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
import multiprocessing as mp
from multiprocessing import Process, Queue, Pool, Lock, Value, Array
import time
import os
from typing import List, Any, Callable
import numpy as np
# 기본 프로세스 생성
def basic_multiprocessing():
"""기본 멀티프로세싱 예제"""
def worker(name: str, delay: float):
"""작업자 함수"""
print(f"{name} (PID: {os.getpid()}) 시작")
time.sleep(delay)
print(f"{name} 완료")
# 프로세스 생성
processes = []
for i in range(4):
p = Process(
target=worker,
args=(f"Process-{i}", 2)
)
processes.append(p)
p.start()
# 모든 프로세스 완료 대기
for p in processes:
p.join()
print("모든 프로세스 완료")
> [!IMPORTANT]
> **if \_\_name\_\_ == '\_\_main\_\_': 필수!**
>
> 윈도우(Windows)나 맥(macOS)에서 멀티프로세싱을 쓸 때는 반드시 코드를 `if __name__ == '__main__':` 블록 안에 넣어야 합니다.
> 안 그러면 프로세스가 무한히 생성되면서 컴퓨터가 멈출 수 있습니다. (재귀적으로 자기가 자기를 계속 실행하거든요!)
# 프로세스 간 통신
class InterProcessCommunication:
"""프로세스 간 통신"""
@staticmethod
def queue_example():
"""큐를 사용한 통신"""
def producer(q: Queue, n: int):
"""생산자 프로세스"""
for i in range(n):
item = f"item-{i}"
q.put(item)
print(f"생산: {item}")
time.sleep(0.1)
q.put(None) # 종료 신호
def consumer(q: Queue):
"""소비자 프로세스"""
while True:
item = q.get()
if item is None:
break
print(f"소비: {item}")
time.sleep(0.2)
# 큐 생성
q = mp.Queue()
# 프로세스 생성
p1 = Process(target=producer, args=(q, 10))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
@staticmethod
def pipe_example():
"""파이프를 사용한 통신"""
def sender(conn):
"""송신 프로세스"""
for i in range(5):
msg = f"Message {i}"
conn.send(msg)
print(f"송신: {msg}")
time.sleep(0.5)
conn.close()
def receiver(conn):
"""수신 프로세스"""
while True:
try:
msg = conn.recv()
print(f"수신: {msg}")
except EOFError:
break
# 파이프 생성
parent_conn, child_conn = mp.Pipe()
# 프로세스 생성
p1 = Process(target=sender, args=(child_conn,))
p2 = Process(target=receiver, args=(parent_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
# 공유 메모리
class SharedMemoryExample:
"""공유 메모리 예제"""
@staticmethod
def shared_value_example():
"""공유 값 예제"""
def increment(shared_val: Value, lock: Lock, n: int):
"""값 증가"""
for _ in range(n):
with lock:
shared_val.value += 1
# 공유 값과 락 생성
shared_val = Value('i', 0) # 'i'는 integer
lock = Lock()
# 프로세스 생성
processes = []
for _ in range(4):
p = Process(target=increment, args=(shared_val, lock, 1000))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"최종 값: {shared_val.value} (예상: 4000)")
@staticmethod
def shared_array_example():
"""공유 배열 예제"""
def fill_array(shared_arr: Array, start: int, end: int):
"""배열 채우기"""
for i in range(start, end):
shared_arr[i] = i ** 2
# 공유 배열 생성
size = 100
shared_arr = Array('i', size) # 'i'는 integer
# 프로세스 생성
processes = []
chunk_size = size // 4
for i in range(4):
start = i * chunk_size
end = start + chunk_size if i < 3 else size
p = Process(target=fill_array, args=(shared_arr, start, end))
processes.append(p)
p.start()
for p in processes:
p.join()
# 결과 확인
result = list(shared_arr)
print(f"배열 처음 10개: {result[:10]}")
print(f"배열 마지막 10개: {result[-10:]}")
# 프로세스 풀
class ProcessPoolExample:
"""프로세스 풀 예제"""
@staticmethod
def pool_map_example():
"""map을 사용한 병렬 처리"""
def compute_square(n: int) -> int:
"""제곱 계산"""
return n ** 2
# 데이터
numbers = list(range(1000000))
# 순차 처리
start = time.time()
sequential_result = [compute_square(n) for n in numbers]
sequential_time = time.time() - start
# 병렬 처리
start = time.time()
with Pool() as pool:
parallel_result = pool.map(compute_square, numbers)
parallel_time = time.time() - start
print(f"순차 처리: {sequential_time:.2f}초")
print(f"병렬 처리: {parallel_time:.2f}초")
print(f"성능 향상: {sequential_time/parallel_time:.2f}배")
@staticmethod
def pool_apply_async_example():
"""비동기 작업 제출"""
def long_task(n: int) -> dict:
"""시간이 걸리는 작업"""
time.sleep(1)
return {
'input': n,
'result': n ** 2,
'pid': os.getpid()
}
with Pool(processes=4) as pool:
# 비동기 작업 제출
async_results = []
for i in range(10):
result = pool.apply_async(long_task, (i,))
async_results.append(result)
# 결과 수집
results = []
for async_result in async_results:
result = async_result.get() # 블로킹
results.append(result)
print(f"완료: {result}")
return results
@staticmethod
def pool_starmap_example():
"""여러 인자를 가진 함수 병렬 처리"""
def compute_power(base: int, exponent: int) -> int:
"""거듭제곱 계산"""
return base ** exponent
# 인자 목록
args_list = [(i, 2) for i in range(100)]
with Pool() as pool:
results = pool.starmap(compute_power, args_list)
print(f"처음 10개 결과: {results[:10]}")
# 실전 예제: 이미지 처리
class ParallelImageProcessor:
"""병렬 이미지 처리기"""
def __init__(self, num_processes: int = None):
self.num_processes = num_processes or mp.cpu_count()
def process_image(self, image_data: np.ndarray) -> np.ndarray:
"""이미지 처리 (시뮬레이션)"""
# 간단한 필터 적용
result = np.zeros_like(image_data)
# 가우시안 블러 시뮬레이션
for i in range(1, image_data.shape[0] - 1):
for j in range(1, image_data.shape[1] - 1):
# 3x3 커널 평균
result[i, j] = np.mean(
image_data[i-1:i+2, j-1:j+2]
)
return result
def process_image_parallel(self, image_data: np.ndarray) -> np.ndarray:
"""병렬 이미지 처리"""
height = image_data.shape[0]
chunk_size = height // self.num_processes
# 이미지를 청크로 분할
chunks = []
for i in range(self.num_processes):
start = i * chunk_size
end = start + chunk_size if i < self.num_processes - 1 else height
# 경계 처리를 위해 위아래 1픽셀 추가
chunk_start = max(0, start - 1)
chunk_end = min(height, end + 1)
chunks.append({
'data': image_data[chunk_start:chunk_end],
'start': start,
'end': end,
'chunk_start': chunk_start
})
# 병렬 처리
with Pool(self.num_processes) as pool:
results = pool.map(self._process_chunk, chunks)
# 결과 병합
final_image = np.zeros_like(image_data)
for result in results:
start = result['start']
end = result['end']
final_image[start:end] = result['data']
return final_image
def _process_chunk(self, chunk_info: dict) -> dict:
"""청크 처리"""
chunk_data = chunk_info['data']
start = chunk_info['start']
end = chunk_info['end']
chunk_start = chunk_info['chunk_start']
# 처리된 데이터
processed = self.process_image(chunk_data)
# 실제 범위만 반환
offset = start - chunk_start
return {
'data': processed[offset:offset + (end - start)],
'start': start,
'end': end
}
# CPU 집약적 작업 예제
class CPUIntensiveExample:
"""CPU 집약적 작업 예제"""
@staticmethod
def calculate_pi_sequential(n: int) -> float:
"""순차적 파이 계산 (몬테카를로)"""
count = 0
for _ in range(n):
x = np.random.random()
y = np.random.random()
if x*x + y*y <= 1:
count += 1
return 4 * count / n
@staticmethod
def calculate_pi_chunk(n: int) -> int:
"""파이 계산 청크"""
count = 0
for _ in range(n):
x = np.random.random()
y = np.random.random()
if x*x + y*y <= 1:
count += 1
return count
@staticmethod
def calculate_pi_parallel(total_points: int, num_processes: int = None) -> float:
"""병렬 파이 계산"""
if num_processes is None:
num_processes = mp.cpu_count()
points_per_process = total_points // num_processes
with Pool(num_processes) as pool:
counts = pool.map(
CPUIntensiveExample.calculate_pi_chunk,
[points_per_process] * num_processes
)
total_count = sum(counts)
return 4 * total_count / total_points
@staticmethod
def benchmark_pi_calculation():
"""파이 계산 벤치마크"""
n = 10000000
# 순차 처리
start = time.time()
pi_seq = CPUIntensiveExample.calculate_pi_sequential(n)
seq_time = time.time() - start
# 병렬 처리
start = time.time()
pi_par = CPUIntensiveExample.calculate_pi_parallel(n)
par_time = time.time() - start
print(f"순차 처리: {seq_time:.2f}초 (π ≈ {pi_seq:.6f})")
print(f"병렬 처리: {par_time:.2f}초 (π ≈ {pi_par:.6f})")
print(f"성능 향상: {seq_time/par_time:.2f}배")
[!TIP] 언제 멀티프로세싱을 쓰나요?
- 이미지 처리, 동영상 변환, 복잡한 수학 계산처럼 CPU가 쉴 새 없이 돌아가는 작업에 딱입니다.
- 반면, 단순히 웹사이트 내용을 긁어오거나 파일을 다운로드하는 작업은 멀티스레딩이나 비동기(asyncio)가 훨씬 가볍고 빠릅니다.
🎯 Concurrent.futures 모듈
concurrent.futures는 스레드와 프로세스를 통합된 인터페이스로 제공합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed, wait
import concurrent.futures
import time
import requests
from typing import List, Dict, Any, Callable
class ConcurrentFuturesExample:
"""concurrent.futures 예제"""
@staticmethod
def thread_pool_example():
"""ThreadPoolExecutor 예제"""
def fetch_url(url: str) -> dict:
"""URL 가져오기"""
try:
response = requests.get(url, timeout=5)
return {
'url': url,
'status': response.status_code,
'size': len(response.content)
}
except Exception as e:
return {
'url': url,
'error': str(e)
}
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/2',
'https://httpbin.org/delay/1',
'https://httpbin.org/status/200',
'https://httpbin.org/status/404'
]
# ThreadPoolExecutor 사용
with ThreadPoolExecutor(max_workers=5) as executor:
# 방법 1: submit 사용
futures = []
for url in urls:
future = executor.submit(fetch_url, url)
futures.append(future)
# 결과 수집
for future in as_completed(futures):
result = future.result()
print(f"완료: {result}")
# 방법 2: map 사용
results = list(executor.map(fetch_url, urls))
print(f"\nmap 결과: {len(results)}개 완료")
@staticmethod
def process_pool_example():
"""ProcessPoolExecutor 예제"""
def cpu_bound_task(n: int) -> int:
"""CPU 집약적 작업"""
total = 0
for i in range(n):
total += i ** 2
return total
# 작업 목록
tasks = [10000000] * 10
# 순차 처리
start = time.time()
sequential_results = [cpu_bound_task(n) for n in tasks]
seq_time = time.time() - start
# 병렬 처리
start = time.time()
with ProcessPoolExecutor() as executor:
parallel_results = list(executor.map(cpu_bound_task, tasks))
par_time = time.time() - start
print(f"순차 처리: {seq_time:.2f}초")
print(f"병렬 처리: {par_time:.2f}초")
print(f"성능 향상: {seq_time/par_time:.2f}배")
@staticmethod
def future_handling():
"""Future 객체 처리"""
def task_with_delay(name: str, delay: float) -> str:
"""지연이 있는 작업"""
time.sleep(delay)
return f"{name} 완료 (지연: {delay}초)"
with ThreadPoolExecutor(max_workers=3) as executor:
# Future 객체 생성
future1 = executor.submit(task_with_delay, "Task1", 2)
future2 = executor.submit(task_with_delay, "Task2", 1)
future3 = executor.submit(task_with_delay, "Task3", 3)
# Future 상태 확인
print(f"Future1 실행 중: {future1.running()}")
print(f"Future2 완료: {future2.done()}")
# 첫 번째 완료된 작업 대기
done, not_done = wait(
[future1, future2, future3],
return_when=concurrent.futures.FIRST_COMPLETED
)
print(f"\n첫 번째 완료: {len(done)}개")
for future in done:
print(f"결과: {future.result()}")
# 모든 작업 완료 대기
done, not_done = wait(not_done)
print(f"\n나머지 완료: {len(done)}개")
@staticmethod
def exception_handling():
"""예외 처리"""
def risky_task(n: int) -> int:
"""위험한 작업"""
if n < 0:
raise ValueError("음수는 처리할 수 없습니다")
time.sleep(0.5)
return n * 2
tasks = [1, 2, -1, 3, 4]
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {
executor.submit(risky_task, n): n
for n in tasks
}
for future in as_completed(futures):
n = futures[future]
try:
result = future.result()
print(f"성공: {n} -> {result}")
except Exception as e:
print(f"실패: {n} -> {e}")
# 실전 예제: 병렬 데이터 처리 파이프라인
class ParallelDataPipeline:
"""병렬 데이터 처리 파이프라인"""
def __init__(self, num_workers: int = 4):
self.num_workers = num_workers
def extract_data(self, source: str) -> List[Dict]:
"""데이터 추출 (시뮬레이션)"""
print(f"데이터 추출: {source}")
time.sleep(0.5)
# 시뮬레이션 데이터
return [
{'id': i, 'value': i * 10, 'source': source}
for i in range(100)
]
def transform_data(self, data: Dict) -> Dict:
"""데이터 변환"""
# CPU 집약적 변환
transformed = {
'id': data['id'],
'original': data['value'],
'squared': data['value'] ** 2,
'sqrt': data['value'] ** 0.5,
'processed': True
}
# 시뮬레이션 지연
time.sleep(0.01)
return transformed
def load_data(self, data_batch: List[Dict]) -> int:
"""데이터 로드 (시뮬레이션)"""
# 데이터베이스 저장 시뮬레이션
time.sleep(0.1)
return len(data_batch)
def run_pipeline(self, sources: List[str]):
"""파이프라인 실행"""
start_time = time.time()
# 1. 추출 단계 (I/O 바운드 - 스레드 사용)
print("=== 추출 단계 ===")
extracted_data = []
with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
futures = [
executor.submit(self.extract_data, source)
for source in sources
]
for future in as_completed(futures):
data = future.result()
extracted_data.extend(data)
print(f"추출 완료: {len(extracted_data)}개 레코드")
# 2. 변환 단계 (CPU 바운드 - 프로세스 사용)
print("\n=== 변환 단계 ===")
transformed_data = []
with ProcessPoolExecutor(max_workers=self.num_workers) as executor:
# 배치 처리
batch_size = 100
for i in range(0, len(extracted_data), batch_size):
batch = extracted_data[i:i + batch_size]
# 병렬 변환
futures = [
executor.submit(self.transform_data, item)
for item in batch
]
for future in as_completed(futures):
transformed_data.append(future.result())
print(f"변환 완료: {len(transformed_data)}개 레코드")
# 3. 로드 단계 (I/O 바운드 - 스레드 사용)
print("\n=== 로드 단계 ===")
total_loaded = 0
with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
# 배치로 로드
batch_size = 50
futures = []
for i in range(0, len(transformed_data), batch_size):
batch = transformed_data[i:i + batch_size]
future = executor.submit(self.load_data, batch)
futures.append(future)
for future in as_completed(futures):
loaded = future.result()
total_loaded += loaded
print(f"로드 완료: {total_loaded}개 레코드")
# 완료
end_time = time.time()
print(f"\n총 처리 시간: {end_time - start_time:.2f}초")
🔀 병렬 처리 패턴과 최적화
병렬 처리를 효과적으로 사용하기 위한 다양한 패턴과 최적화 기법입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
import multiprocessing as mp
from multiprocessing import Manager
import threading
import time
from typing import List, Any, Callable
import numpy as np
class ParallelPatterns:
"""병렬 처리 패턴"""
@staticmethod
def map_reduce_pattern():
"""Map-Reduce 패턴"""
def mapper(chunk: List[str]) -> List[tuple]:
"""맵 함수: 단어 카운트"""
word_counts = {}
for text in chunk:
words = text.lower().split()
for word in words:
word_counts[word] = word_counts.get(word, 0) + 1
return list(word_counts.items())
def reducer(mapped_results: List[List[tuple]]) -> dict:
"""리듀스 함수: 결과 병합"""
total_counts = {}
for result in mapped_results:
for word, count in result:
total_counts[word] = total_counts.get(word, 0) + count
return total_counts
# 데이터
texts = [
"Hello world from Python",
"Python is great for parallel processing",
"World of multiprocessing in Python",
"Great performance with parallel computing"
] * 1000
# 청크로 분할
num_processes = mp.cpu_count()
chunk_size = len(texts) // num_processes
chunks = [
texts[i:i + chunk_size]
for i in range(0, len(texts), chunk_size)
]
# Map 단계
with Pool(num_processes) as pool:
mapped_results = pool.map(mapper, chunks)
# Reduce 단계
final_result = reducer(mapped_results)
# 상위 10개 단어
top_words = sorted(
final_result.items(),
key=lambda x: x[1],
reverse=True
)[:10]
print("상위 10개 단어:")
for word, count in top_words:
print(f" {word}: {count}")
@staticmethod
def pipeline_pattern():
"""파이프라인 패턴"""
def stage1(item: int) -> int:
"""1단계: 제곱"""
return item ** 2
def stage2(item: int) -> int:
"""2단계: 2배"""
return item * 2
def stage3(item: int) -> str:
"""3단계: 문자열 변환"""
return f"Result: {item}"
# 파이프라인 실행
def pipeline_worker(input_queue: Queue, output_queue: Queue,
stage_func: Callable):
"""파이프라인 워커"""
while True:
item = input_queue.get()
if item is None:
output_queue.put(None)
break
result = stage_func(item)
output_queue.put(result)
# 큐 생성
queue1 = mp.Queue()
queue2 = mp.Queue()
queue3 = mp.Queue()
queue4 = mp.Queue()
# 프로세스 생성
p1 = Process(target=pipeline_worker, args=(queue1, queue2, stage1))
p2 = Process(target=pipeline_worker, args=(queue2, queue3, stage2))
p3 = Process(target=pipeline_worker, args=(queue3, queue4, stage3))
p1.start()
p2.start()
p3.start()
# 데이터 입력
for i in range(10):
queue1.put(i)
queue1.put(None) # 종료 신호
# 결과 수집
results = []
while True:
result = queue4.get()
if result is None:
break
results.append(result)
# 프로세스 종료
p1.join()
p2.join()
p3.join()
print("파이프라인 결과:")
for result in results[:5]:
print(f" {result}")
@staticmethod
def worker_pool_pattern():
"""워커 풀 패턴"""
class WorkerPool:
def __init__(self, num_workers: int = 4):
self.num_workers = num_workers
self.task_queue = mp.Queue()
self.result_queue = mp.Queue()
self.workers = []
def worker(self):
"""워커 프로세스"""
while True:
task = self.task_queue.get()
if task is None:
break
task_id, func, args = task
try:
result = func(*args)
self.result_queue.put((task_id, 'success', result))
except Exception as e:
self.result_queue.put((task_id, 'error', str(e)))
def start(self):
"""워커 시작"""
for _ in range(self.num_workers):
p = Process(target=self.worker)
p.start()
self.workers.append(p)
def submit(self, task_id: int, func: Callable, *args):
"""작업 제출"""
self.task_queue.put((task_id, func, args))
def shutdown(self):
"""워커 종료"""
for _ in range(self.num_workers):
self.task_queue.put(None)
for worker in self.workers:
worker.join()
def get_results(self) -> List[tuple]:
"""결과 수집"""
results = []
while not self.result_queue.empty():
results.append(self.result_queue.get())
return results
# 사용 예제
def heavy_computation(x: int) -> int:
"""무거운 계산"""
time.sleep(0.1)
return x ** 3
pool = WorkerPool(num_workers=4)
pool.start()
# 작업 제출
for i in range(10):
pool.submit(i, heavy_computation, i)
# 잠시 대기
time.sleep(1)
# 결과 수집
results = pool.get_results()
# 종료
pool.shutdown()
print("워커 풀 결과:")
for task_id, status, result in sorted(results):
print(f" Task {task_id}: {status} - {result}")
# 성능 최적화 전략
class PerformanceOptimization:
"""성능 최적화 전략"""
@staticmethod
def chunking_strategy():
"""청킹 전략"""
def process_chunk(chunk: np.ndarray) -> float:
"""청크 처리"""
return np.mean(chunk ** 2)
# 큰 데이터
data = np.random.rand(100000000)
# 다양한 청크 크기 테스트
chunk_sizes = [1000, 10000, 100000, 1000000]
for chunk_size in chunk_sizes:
num_chunks = len(data) // chunk_size
chunks = [
data[i*chunk_size:(i+1)*chunk_size]
for i in range(num_chunks)
]
start = time.time()
with Pool() as pool:
results = pool.map(process_chunk, chunks)
end = time.time()
print(f"청크 크기 {chunk_size}: {end - start:.2f}초")
@staticmethod
def load_balancing():
"""부하 분산"""
def variable_workload(task_info: tuple) -> dict:
"""가변 작업량"""
task_id, complexity = task_info
# 복잡도에 따른 작업 시간
work_time = complexity * 0.1
time.sleep(work_time)
return {
'task_id': task_id,
'complexity': complexity,
'result': complexity ** 2
}
# 다양한 복잡도의 작업
tasks = [
(i, np.random.randint(1, 10))
for i in range(20)
]
# 정적 할당
start = time.time()
with Pool(4) as pool:
results = pool.map(variable_workload, tasks)
static_time = time.time() - start
# 동적 할당 (chunksize=1)
start = time.time()
with Pool(4) as pool:
results = pool.map(variable_workload, tasks, chunksize=1)
dynamic_time = time.time() - start
print(f"정적 할당: {static_time:.2f}초")
print(f"동적 할당: {dynamic_time:.2f}초")
@staticmethod
def memory_efficiency():
"""메모리 효율성"""
# 공유 메모리 사용
def process_shared_array(shared_array_name: str, start: int, end: int):
"""공유 배열 처리"""
# 공유 메모리 연결
existing_shm = mp.shared_memory.SharedMemory(name=shared_array_name)
array = np.ndarray((10000000,), dtype=np.float64, buffer=existing_shm.buf)
# 처리
result = np.sum(array[start:end] ** 2)
# 정리
existing_shm.close()
return result
# 공유 메모리 생성
data = np.random.rand(10000000)
shm = mp.shared_memory.SharedMemory(create=True, size=data.nbytes)
shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
shared_array[:] = data[:]
# 병렬 처리
chunk_size = len(data) // 4
tasks = [
(shm.name, i*chunk_size, (i+1)*chunk_size)
for i in range(4)
]
with Pool(4) as pool:
results = pool.starmap(process_shared_array, tasks)
total = sum(results)
print(f"공유 메모리 결과: {total:.2f}")
# 정리
shm.close()
shm.unlink()
💡 실전 예제
1. 웹 크롤러 시스템
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
from bs4 import BeautifulSoup
import time
from typing import List, Dict, Set
from urllib.parse import urljoin, urlparse
import queue
class ParallelWebCrawler:
"""병렬 웹 크롤러"""
def __init__(self, max_workers: int = 10, max_depth: int = 3):
self.max_workers = max_workers
self.max_depth = max_depth
self.visited_urls = set()
self.url_queue = queue.Queue()
self.results = []
def fetch_url(self, url: str) -> Dict:
"""URL 가져오기"""
try:
response = requests.get(url, timeout=5)
response.raise_for_status()
return {
'url': url,
'status': response.status_code,
'content': response.text,
'size': len(response.content)
}
except Exception as e:
return {
'url': url,
'error': str(e)
}
def extract_links(self, html: str, base_url: str) -> List[str]:
"""링크 추출"""
soup = BeautifulSoup(html, 'html.parser')
links = []
for tag in soup.find_all(['a', 'link']):
href = tag.get('href')
if href:
absolute_url = urljoin(base_url, href)
if self.is_valid_url(absolute_url):
links.append(absolute_url)
return links
def is_valid_url(self, url: str) -> bool:
"""유효한 URL 확인"""
parsed = urlparse(url)
return parsed.scheme in ('http', 'https')
def process_page(self, page_data: Dict) -> Dict:
"""페이지 처리"""
if 'error' in page_data:
return page_data
# 텍스트 추출
soup = BeautifulSoup(page_data['content'], 'html.parser')
# 제목 추출
title = soup.find('title')
page_data['title'] = title.text if title else 'No title'
# 메타 정보 추출
meta_desc = soup.find('meta', attrs={'name': 'description'})
page_data['description'] = meta_desc.get('content', '') if meta_desc else ''
# 텍스트 길이
text = soup.get_text()
page_data['text_length'] = len(text)
# 이미지 수
page_data['image_count'] = len(soup.find_all('img'))
return page_data
def crawl(self, start_url: str):
"""크롤링 시작"""
self.url_queue.put((start_url, 0))
with ThreadPoolExecutor(max_workers=self.max_workers) as fetch_executor:
with ProcessPoolExecutor(max_workers=mp.cpu_count()) as process_executor:
while not self.url_queue.empty():
try:
url, depth = self.url_queue.get(timeout=5)
if url in self.visited_urls or depth > self.max_depth:
continue
self.visited_urls.add(url)
# URL 가져오기 (I/O 바운드 - 스레드)
future = fetch_executor.submit(self.fetch_url, url)
page_data = future.result()
if 'error' not in page_data:
# 페이지 처리 (CPU 바운드 - 프로세스)
future = process_executor.submit(self.process_page, page_data)
processed_data = future.result()
self.results.append(processed_data)
# 새 링크 추가
if depth < self.max_depth:
links = self.extract_links(page_data['content'], url)
for link in links:
if link not in self.visited_urls:
self.url_queue.put((link, depth + 1))
print(f"크롤링 완료: {url} (깊이: {depth})")
except queue.Empty:
break
return self.results
def get_statistics(self) -> Dict:
"""통계 정보"""
total_size = sum(r.get('size', 0) for r in self.results)
total_images = sum(r.get('image_count', 0) for r in self.results)
return {
'total_pages': len(self.results),
'total_size': total_size,
'average_size': total_size / len(self.results) if self.results else 0,
'total_images': total_images,
'errors': sum(1 for r in self.results if 'error' in r)
}
# 사용 예제
def test_crawler():
"""크롤러 테스트"""
crawler = ParallelWebCrawler(max_workers=5, max_depth=2)
start_time = time.time()
results = crawler.crawl('https://example.com')
end_time = time.time()
stats = crawler.get_statistics()
print(f"\n크롤링 완료!")
print(f"소요 시간: {end_time - start_time:.2f}초")
print(f"크롤링한 페이지: {stats['total_pages']}개")
print(f"총 크기: {stats['total_size'] / 1024 / 1024:.2f} MB")
print(f"평균 크기: {stats['average_size'] / 1024:.2f} KB")
print(f"총 이미지: {stats['total_images']}개")
print(f"오류: {stats['errors']}개")
2. 대용량 데이터 분석 시스템
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
import numpy as np
import pandas as pd
from multiprocessing import Pool, Manager
import concurrent.futures
import time
from typing import List, Dict, Any
import h5py
class ParallelDataAnalyzer:
"""병렬 데이터 분석기"""
def __init__(self, num_processes: int = None):
self.num_processes = num_processes or mp.cpu_count()
def analyze_chunk(self, chunk_data: np.ndarray) -> Dict[str, Any]:
"""청크 분석"""
return {
'mean': np.mean(chunk_data),
'std': np.std(chunk_data),
'min': np.min(chunk_data),
'max': np.max(chunk_data),
'median': np.median(chunk_data),
'count': len(chunk_data),
'sum': np.sum(chunk_data),
'sum_sq': np.sum(chunk_data ** 2)
}
def merge_statistics(self, stats_list: List[Dict]) -> Dict[str, Any]:
"""통계 병합"""
total_count = sum(s['count'] for s in stats_list)
total_sum = sum(s['sum'] for s in stats_list)
total_sum_sq = sum(s['sum_sq'] for s in stats_list)
# 전체 평균
overall_mean = total_sum / total_count
# 전체 표준편차
variance = (total_sum_sq / total_count) - (overall_mean ** 2)
overall_std = np.sqrt(variance)
return {
'mean': overall_mean,
'std': overall_std,
'min': min(s['min'] for s in stats_list),
'max': max(s['max'] for s in stats_list),
'count': total_count
}
def analyze_large_dataset(self, data: np.ndarray) -> Dict[str, Any]:
"""대용량 데이터셋 분석"""
chunk_size = len(data) // self.num_processes
chunks = []
# 데이터 분할
for i in range(self.num_processes):
start = i * chunk_size
end = start + chunk_size if i < self.num_processes - 1 else len(data)
chunks.append(data[start:end])
# 병렬 분석
with Pool(self.num_processes) as pool:
chunk_stats = pool.map(self.analyze_chunk, chunks)
# 결과 병합
overall_stats = self.merge_statistics(chunk_stats)
return overall_stats
def correlation_analysis(self, data: pd.DataFrame) -> pd.DataFrame:
"""상관관계 분석"""
def compute_correlation_chunk(args):
"""청크별 상관관계 계산"""
chunk, columns = args
return chunk[columns].corr()
# 데이터 분할
chunk_size = len(data) // self.num_processes
chunks = []
for i in range(self.num_processes):
start = i * chunk_size
end = start + chunk_size if i < self.num_processes - 1 else len(data)
chunks.append((data.iloc[start:end], data.columns))
# 병렬 계산
with Pool(self.num_processes) as pool:
correlations = pool.map(compute_correlation_chunk, chunks)
# 가중 평균으로 병합
weights = [len(chunk[0]) for chunk in chunks]
total_weight = sum(weights)
final_corr = correlations[0] * (weights[0] / total_weight)
for i in range(1, len(correlations)):
final_corr += correlations[i] * (weights[i] / total_weight)
return final_corr
def parallel_groupby_aggregation(self, data: pd.DataFrame,
group_column: str,
agg_functions: Dict[str, str]) -> pd.DataFrame:
"""병렬 그룹별 집계"""
def process_group(group_data):
"""그룹 처리"""
group_name, group_df = group_data
result = {'group': group_name}
for column, func in agg_functions.items():
if func == 'mean':
result[f"{column}_{func}"] = group_df[column].mean()
elif func == 'sum':
result[f"{column}_{func}"] = group_df[column].sum()
elif func == 'count':
result[f"{column}_{func}"] = group_df[column].count()
elif func == 'std':
result[f"{column}_{func}"] = group_df[column].std()
return result
# 그룹 분할
groups = list(data.groupby(group_column))
# 병렬 처리
with Pool(self.num_processes) as pool:
results = pool.map(process_group, groups)
# 결과를 DataFrame으로 변환
return pd.DataFrame(results)
# 실전 활용 예제
class RealTimeDataProcessor:
"""실시간 데이터 처리기"""
def __init__(self, num_workers: int = 4):
self.num_workers = num_workers
self.manager = Manager()
self.data_queue = self.manager.Queue()
self.result_queue = self.manager.Queue()
self.stats = self.manager.dict()
self.running = self.manager.Value('b', True)
def data_generator(self):
"""데이터 생성기 (시뮬레이션)"""
while self.running.value:
# 실시간 데이터 시뮬레이션
data_batch = {
'timestamp': time.time(),
'values': np.random.randn(1000),
'category': np.random.choice(['A', 'B', 'C'])
}
self.data_queue.put(data_batch)
time.sleep(0.1) # 100ms마다 새 데이터
def process_worker(self, worker_id: int):
"""처리 워커"""
local_stats = {
'processed': 0,
'total_sum': 0,
'total_count': 0
}
while self.running.value or not self.data_queue.empty():
try:
data = self.data_queue.get(timeout=1)
# 데이터 처리
processed = {
'timestamp': data['timestamp'],
'mean': np.mean(data['values']),
'std': np.std(data['values']),
'category': data['category'],
'worker_id': worker_id
}
# 통계 업데이트
local_stats['processed'] += 1
local_stats['total_sum'] += np.sum(data['values'])
local_stats['total_count'] += len(data['values'])
self.result_queue.put(processed)
except queue.Empty:
continue
# 최종 통계 저장
self.stats[worker_id] = local_stats
def aggregator(self):
"""집계기"""
category_stats = {}
while self.running.value or not self.result_queue.empty():
try:
result = self.result_queue.get(timeout=1)
# 카테고리별 집계
category = result['category']
if category not in category_stats:
category_stats[category] = {
'count': 0,
'sum_mean': 0,
'values': []
}
category_stats[category]['count'] += 1
category_stats[category]['sum_mean'] += result['mean']
category_stats[category]['values'].append(result['mean'])
# 주기적 리포트
if sum(s['count'] for s in category_stats.values()) % 100 == 0:
self.print_report(category_stats)
except queue.Empty:
continue
# 최종 리포트
self.print_report(category_stats)
def print_report(self, category_stats: Dict):
"""리포트 출력"""
print("\n=== 실시간 처리 리포트 ===")
for category, stats in category_stats.items():
if stats['count'] > 0:
avg_mean = stats['sum_mean'] / stats['count']
print(f"{category}: {stats['count']}개 처리, 평균: {avg_mean:.4f}")
def run(self, duration: int = 10):
"""실행"""
processes = []
# 데이터 생성기
p = Process(target=self.data_generator)
p.start()
processes.append(p)
# 처리 워커
for i in range(self.num_workers):
p = Process(target=self.process_worker, args=(i,))
p.start()
processes.append(p)
# 집계기
p = Process(target=self.aggregator)
p.start()
processes.append(p)
# 지정된 시간 동안 실행
time.sleep(duration)
# 종료
self.running.value = False
# 모든 프로세스 종료 대기
for p in processes:
p.join()
# 최종 통계
print("\n=== 최종 통계 ===")
total_processed = sum(
self.stats[i]['processed']
for i in range(self.num_workers)
)
print(f"총 처리된 배치: {total_processed}")
# 테스트
def test_data_analysis():
"""데이터 분석 테스트"""
# 대용량 데이터 생성
print("대용량 데이터 분석 테스트")
data = np.random.randn(100000000)
analyzer = ParallelDataAnalyzer()
# 순차 처리
start = time.time()
seq_stats = analyzer.analyze_chunk(data)
seq_time = time.time() - start
# 병렬 처리
start = time.time()
par_stats = analyzer.analyze_large_dataset(data)
par_time = time.time() - start
print(f"\n순차 처리: {seq_time:.2f}초")
print(f"병렬 처리: {par_time:.2f}초")
print(f"성능 향상: {seq_time/par_time:.2f}배")
# 실시간 처리 테스트
print("\n\n실시간 데이터 처리 테스트")
processor = RealTimeDataProcessor(num_workers=4)
processor.run(duration=5)
⚠️ 초보자들이 자주 하는 실수
1. GIL을 이해하지 못하고 잘못된 방법 선택
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# ❌ 잘못된 방법: CPU 집약적 작업에 Threading 사용
import threading
def cpu_bound_task(n):
total = 0
for i in range(n):
total += i ** 2 # CPU 집약적
return total
# Threading으로는 성능 향상 없음!
threads = []
for i in range(4):
t = threading.Thread(target=cpu_bound_task, args=(1000000,))
threads.append(t)
t.start()
# ✅ 올바른 방법: CPU 집약적 작업에 Multiprocessing
from multiprocessing import Pool
def cpu_bound_task(n):
total = 0
for i in range(n):
total += i ** 2
return total
with Pool(4) as pool:
results = pool.map(cpu_bound_task, [1000000] * 4)
2. 프로세스 간 통신을 비효율적으로 하는 실수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# ❌ 잘못된 방법: 큰 데이터를 매번 전송
from multiprocessing import Process, Queue
def worker(data_queue, result_queue):
while True:
big_data = data_queue.get() # 매번 큰 데이터 전송!
if big_data is None:
break
result = process_big_data(big_data)
result_queue.put(result)
# ✅ 올바른 방법: 인덱스만 전송
from multiprocessing import shared_memory
def worker_optimized(index_queue, result_queue, shared_array_name):
# 공유 메모리 연결
shm = shared_memory.SharedMemory(name=shared_array_name)
while True:
index = index_queue.get()
if index is None:
break
# 인덱스로 데이터 접근
result = process_data_at_index(shm, index)
result_queue.put(result)
shm.close()
3. 매우 작은 작업에 과도한 병렬화
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# ❌ 잘못된 방법: 작은 작업에 과도한 병렬화
from multiprocessing import Pool
def small_task(x):
return x * 2 # 매우 간단한 작업
# 프로세스 생성 비용이 더 큼!
with Pool(8) as pool:
results = pool.map(small_task, range(10))
# ✅ 올바른 방법: 간단한 작업은 순차 처리
results = [x * 2 for x in range(10)]
# 또는 chunksize 사용
with Pool(4) as pool:
results = pool.map(small_task, range(1000), chunksize=100)
4. 데드락을 일으키는 실수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# ❌ 잘못된 방법: 데드락 위험
import threading
lock1 = threading.Lock()
lock2 = threading.Lock()
def thread1():
with lock1:
time.sleep(0.1)
with lock2: # 대기
pass
def thread2():
with lock2:
time.sleep(0.1)
with lock1: # 대기 -> 데드락!
pass
# ✅ 올바른 방법: 일관된 락 순서
def thread1_safe():
with lock1: # 항상 같은 순서
with lock2:
pass
def thread2_safe():
with lock1: # 항상 같은 순서
with lock2:
pass
5. 공유 상태를 동기화하지 않는 실수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# ❌ 잘못된 방법: 경쟁 조건 발생
import threading
counter = 0
def increment():
global counter
for _ in range(100000):
counter += 1 # 비원자적 연산!
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # 1000000이 아니라 더 작은 값!
# ✅ 올바른 방법: Lock 사용
import threading
counter = 0
lock = threading.Lock()
def increment_safe():
global counter
for _ in range(100000):
with lock:
counter += 1
6. 예외 처리를 제대로 하지 않는 실수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# ❌ 잘못된 방법: 예외 무시
from concurrent.futures import ThreadPoolExecutor
def risky_task(x):
if x < 0:
raise ValueError("음수는 처리할 수 없습니다")
return x * 2
with ThreadPoolExecutor() as executor:
futures = [executor.submit(risky_task, x) for x in [-1, 1, 2]]
results = [f.result() for f in futures] # 예외로 프로그램 종료!
# ✅ 올바른 방법: 예외 처리
from concurrent.futures import as_completed
with ThreadPoolExecutor() as executor:
futures = [executor.submit(risky_task, x) for x in [-1, 1, 2]]
for future in as_completed(futures):
try:
result = future.result()
print(f"성공: {result}")
except Exception as e:
print(f"오류: {e}")
7. 잘못된 워커 수 설정
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# ❌ 잘못된 방법: 과도한 워커
from concurrent.futures import ThreadPoolExecutor
# I/O 바운드 작업에 과도한 스레드
with ThreadPoolExecutor(max_workers=1000) as executor:
# 시스템 리소스 고갈!
pass
# ✅ 올바른 방법: 적절한 워커 수
import multiprocessing as mp
# CPU 바운드: CPU 코어 수
with mp.Pool(processes=mp.cpu_count()) as pool:
pass
# I/O 바운드: 10-50개 정도
with ThreadPoolExecutor(max_workers=20) as executor:
pass
8. 비동기와 동기 방식을 혼동하는 실수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# ❌ 잘못된 방법: 비동기 함수를 동기식으로 호출
import asyncio
async def async_task(x):
await asyncio.sleep(1)
return x * 2
# 비동기 함수를 잘못 호출
result = async_task(5) # 코루틴 객체 반환!
# ✅ 올바른 방법: 비동기 함수 올바른 호출
async def main():
result = await async_task(5)
print(result)
# 또는
result = asyncio.run(async_task(5))
🎯 핵심 정리
병렬 처리 선택 가이드
graph TD
A[작업 유형] --> B{I/O 바운드?}
B -->|예| C[Threading 또는<br/>AsyncIO]
B -->|아니오| D{CPU 바운드?}
D -->|예| E[Multiprocessing]
D -->|아니오| F[순차 처리]
C --> G{동시 연결 많음?}
G -->|예| H[AsyncIO]
G -->|아니오| I[Threading]
E --> J{작업 독립적?}
J -->|예| K[Process Pool]
J -->|아니오| L[Queue/Pipe]
Best Practices
- GIL 이해: CPU 바운드 작업은 multiprocessing 사용
- 적절한 워커 수: CPU 코어 수 고려
- 통신 오버헤드: 프로세스 간 통신 최소화
- 메모리 공유: 큰 데이터는 공유 메모리 사용
- 예외 처리: 각 워커에서 예외 처리
🎓 파이썬 마스터하기 시리즈
📚 기초편 (1-7)
- Python 소개와 개발 환경 설정
- 변수, 자료형, 연산자 완벽 정리
- 조건문과 반복문 마스터하기
- 함수와 람다 완벽 가이드
- 리스트, 튜플, 딕셔너리 정복하기
- 문자열 처리와 정규표현식
- 파일 입출력과 예외 처리
🚀 중급편 (8-12)
💼 고급편 (13-16)
- 웹 스크래핑과 API 개발
- 테스트와 디버깅 전략
- 성능 최적화 기법
- 멀티프로세싱과 병렬 처리 (현재 글)
이전글: 성능 최적화 기법 ⬅️ 현재글: 멀티프로세싱과 병렬 처리 다음글: 시리즈 완료! ✨
이번 포스트에서는 Python의 멀티프로세싱과 병렬 처리를 완벽히 마스터했습니다. 파이썬 마스터하기 시리즈 16편이 모두 완료되었습니다! Happy Coding! 🐍✨
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.