포스트

[이제와서 시작하는 Python 마스터하기 #11] 비동기 프로그래밍 (async/await)

[이제와서 시작하는 Python 마스터하기 #11] 비동기 프로그래밍 (async/await)

🚀 실전 예제로 시작하기

🕷️ 한국 뉴스 사이트 웹 크롤러

실제 한국 웹사이트들을 크롤링하는 비동기 웹 크롤러를 만들어보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import asyncio
import aiohttp
import time
from bs4 import BeautifulSoup
from typing import List, Dict
import json

class KoreanNewsCrawler:
    """한국 뉴스 사이트 비동기 크롤러"""

    def __init__(self):
        self.session = None
        self.results = []

        # 한국 주요 뉴스 사이트 (실제 예제용)
        self.news_sites = [
            {
                "name": "네이버뉴스",
                "url": "https://news.naver.com",
                "selector": ".news_tit"
            },
            {
                "name": "다음뉴스",
                "url": "https://news.daum.net",
                "selector": ".link_txt"
            },
            {
                "name": "조선일보",
                "url": "https://www.chosun.com",
                "selector": ".story-card__headline"
            }
        ]

    async def __aenter__(self):
        """비동기 컨텍스트 관리자 시작"""
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
        timeout = aiohttp.ClientTimeout(total=30)

        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """비동기 컨텍스트 관리자 종료"""
        if self.session:
            await self.session.close()

    async def fetch_page(self, site: Dict[str, str]) -> Dict:
        """개별 페이지 크롤링"""
        try:
            print(f"🌐 크롤링 시작: {site['name']}")

            async with self.session.get(site['url']) as response:
                if response.status == 200:
                    html = await response.text()

                    # BeautifulSoup으로 파싱 (동기 작업)
                    soup = BeautifulSoup(html, 'html.parser')
                    headlines = soup.select(site['selector'])

                    news_list = []
                    for headline in headlines[:5]:  # 상위 5개만
                        title = headline.get_text(strip=True)
                        if title:
                            news_list.append(title)

                    print(f"✅ 완료: {site['name']} ({len(news_list)}개 뉴스)")

                    return {
                        'site': site['name'],
                        'status': 'success',
                        'news_count': len(news_list),
                        'headlines': news_list,
                        'crawl_time': time.time()
                    }
                else:
                    print(f"❌ HTTP 에러: {site['name']} - {response.status}")
                    return {
                        'site': site['name'],
                        'status': 'error',
                        'error': f"HTTP {response.status}"
                    }

        except asyncio.TimeoutError:
            print(f"⏰ 타임아웃: {site['name']}")
            return {
                'site': site['name'],
                'status': 'timeout',
                'error': 'Request timeout'
            }
        except Exception as e:
            print(f"💥 에러: {site['name']} - {str(e)}")
            return {
                'site': site['name'],
                'status': 'error',
                'error': str(e)
            }

    async def crawl_all_sites(self) -> List[Dict]:
        """모든 사이트 동시 크롤링"""
        print("📰 한국 뉴스 사이트 크롤링 시작...")
        start_time = time.time()

        # 모든 사이트를 동시에 크롤링
        tasks = [self.fetch_page(site) for site in self.news_sites]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # 예외 처리
        processed_results = []
        for result in results:
            if isinstance(result, Exception):
                processed_results.append({
                    'status': 'error',
                    'error': str(result)
                })
            else:
                processed_results.append(result)

        end_time = time.time()
        print(f"🏁 크롤링 완료! 총 소요시간: {end_time - start_time:.2f}")

        return processed_results

    def generate_report(self, results: List[Dict]):
        """크롤링 결과 리포트 생성"""
        print("\n📊 크롤링 결과 리포트")
        print("=" * 50)

        success_count = sum(1 for r in results if r.get('status') == 'success')
        total_news = sum(r.get('news_count', 0) for r in results if r.get('status') == 'success')

        print(f"성공한 사이트: {success_count}/{len(results)}")
        print(f"총 수집 뉴스: {total_news}")
        print()

        for result in results:
            if result.get('status') == 'success':
                print(f"🔸 {result['site']}: {result['news_count']}개 뉴스")
                for i, headline in enumerate(result['headlines'][:3], 1):
                    print(f"   {i}. {headline[:50]}...")
                print()

# 사용 예제
async def run_news_crawler():
    """뉴스 크롤러 실행"""
    async with KoreanNewsCrawler() as crawler:
        results = await crawler.crawl_all_sites()
        crawler.generate_report(results)
        return results

# 실행
# results = asyncio.run(run_news_crawler())

🌐 한국 API 서비스 동시 호출 시스템

여러 한국 공공 API를 동시에 호출하는 시스템입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
import asyncio
import aiohttp
import json
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class APIEndpoint:
    """API 엔드포인트 정보"""
    name: str
    url: str
    headers: Dict[str, str]
    timeout: int = 10
    retry_count: int = 3

class KoreanAPIAggregator:
    """한국 공공 API 통합 호출 시스템"""

    def __init__(self):
        self.session = None
        self.results = {}

        # 한국 공공 API 엔드포인트들 (예시)
        self.endpoints = [
            APIEndpoint(
                name="날씨정보",
                url="http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0/getVilageFcst",
                headers={"Content-Type": "application/json"},
                timeout=15
            ),
            APIEndpoint(
                name="지하철정보",
                url="http://swopenapi.seoul.go.kr/api/subway",
                headers={"Content-Type": "application/json"},
                timeout=10
            ),
            APIEndpoint(
                name="부동산정보",
                url="http://openapi.molit.go.kr/OpenAPI_ToolInstallPackage/service/rest/RTMSOBJSvc/getRTMSDataSvcAptTradeDev",
                headers={"Content-Type": "application/json"},
                timeout=20
            ),
            APIEndpoint(
                name="교통정보",
                url="http://t-data.seoul.go.kr/apig/apiman-gateway/tapi/v2xSignalPhaseTimingInformation/1.0",
                headers={"Content-Type": "application/json"},
                timeout=12
            ),
            APIEndpoint(
                name="인구통계",
                url="http://kosis.kr/openapi/Param/statisticsParameterData.do",
                headers={"Content-Type": "application/json"},
                timeout=25
            )
        ]

    async def __aenter__(self):
        """비동기 컨텍스트 관리자"""
        connector = aiohttp.TCPConnector(
            limit=50,  # 전체 연결 수 제한
            limit_per_host=10,  # 호스트당 연결 수 제한
            ttl_dns_cache=300,  # DNS 캐시 TTL
            use_dns_cache=True
        )

        self.session = aiohttp.ClientSession(
            connector=connector,
            headers={'User-Agent': 'Korean-API-Aggregator/1.0'}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def call_api_with_retry(self, endpoint: APIEndpoint) -> Dict:
        """재시도 로직이 있는 API 호출"""
        last_exception = None

        for attempt in range(endpoint.retry_count):
            try:
                print(f"🔄 API 호출 시도 {attempt + 1}/{endpoint.retry_count}: {endpoint.name}")

                timeout = aiohttp.ClientTimeout(total=endpoint.timeout)

                async with self.session.get(
                    endpoint.url,
                    headers=endpoint.headers,
                    timeout=timeout
                ) as response:

                    if response.status == 200:
                        # 실제로는 JSON 응답을 파싱
                        # 여기서는 시뮬레이션
                        data = await self.simulate_api_response(endpoint.name)

                        print(f"✅ 성공: {endpoint.name}")
                        return {
                            'endpoint': endpoint.name,
                            'status': 'success',
                            'data': data,
                            'attempt': attempt + 1,
                            'response_time': response.headers.get('X-Response-Time', 'N/A'),
                            'timestamp': datetime.now().isoformat()
                        }
                    else:
                        print(f"❌ HTTP 에러: {endpoint.name} - {response.status}")
                        if attempt == endpoint.retry_count - 1:  # 마지막 시도
                            return {
                                'endpoint': endpoint.name,
                                'status': 'error',
                                'error': f"HTTP {response.status}",
                                'attempts': endpoint.retry_count
                            }

                        # 재시도 전 대기
                        await asyncio.sleep(2 ** attempt)  # 지수 백오프

            except asyncio.TimeoutError as e:
                last_exception = e
                print(f"⏰ 타임아웃: {endpoint.name} (시도 {attempt + 1})")
                if attempt < endpoint.retry_count - 1:
                    await asyncio.sleep(1)

            except Exception as e:
                last_exception = e
                print(f"💥 에러: {endpoint.name} - {str(e)}")
                if attempt < endpoint.retry_count - 1:
                    await asyncio.sleep(1)

        # 모든 재시도 실패
        return {
            'endpoint': endpoint.name,
            'status': 'failed',
            'error': str(last_exception),
            'attempts': endpoint.retry_count
        }

    async def simulate_api_response(self, api_name: str) -> Dict:
        """API 응답 시뮬레이션"""
        # 실제 API 호출 지연 시뮬레이션
        await asyncio.sleep(0.5 + hash(api_name) % 3)

        responses = {
            "날씨정보": {
                "temperature": "23°C",
                "humidity": "65%",
                "condition": "맑음"
            },
            "지하철정보": {
                "line": "2호선",
                "station": "강남역",
                "arrival": "2분 후"
            },
            "부동산정보": {
                "region": "서울시 강남구",
                "avg_price": "12억원",
                "change": "+2.3%"
            },
            "교통정보": {
                "road": "강남대로",
                "speed": "35km/h",
                "congestion": "보통"
            },
            "인구통계": {
                "total_population": "51,780,000",
                "growth_rate": "0.1%",
                "age_median": "43.7세"
            }
        }

        return responses.get(api_name, {"message": "데이터 없음"})

    async def fetch_all_data(self) -> Dict[str, Dict]:
        """모든 API 동시 호출"""
        print("🚀 한국 공공 API 통합 조회 시작...")
        start_time = time.time()

        # 세마포어로 동시 요청 수 제한
        semaphore = asyncio.Semaphore(5)  # 최대 5개 동시 요청

        async def limited_call(endpoint):
            async with semaphore:
                return await self.call_api_with_retry(endpoint)

        # 모든 API 동시 호출
        tasks = [limited_call(endpoint) for endpoint in self.endpoints]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # 결과 정리
        self.results = {}
        for result in results:
            if isinstance(result, Exception):
                print(f"예외 발생: {result}")
            else:
                self.results[result['endpoint']] = result

        end_time = time.time()
        print(f"🏁 모든 API 호출 완료! 총 소요시간: {end_time - start_time:.2f}")

        return self.results

    def generate_dashboard(self):
        """대시보드 형태로 결과 출력"""
        print("\n📊 한국 공공 API 통합 대시보드")
        print("=" * 60)

        successful = [r for r in self.results.values() if r['status'] == 'success']
        failed = [r for r in self.results.values() if r['status'] in ['error', 'failed']]

        print(f"✅ 성공: {len(successful)}개 API")
        print(f"❌ 실패: {len(failed)}개 API")
        print()

        # 성공한 API 데이터 표시
        for result in successful:
            print(f"🔸 {result['endpoint']}")
            if 'data' in result:
                for key, value in result['data'].items():
                    print(f"   {key}: {value}")
            print(f"   응답시간: {result.get('response_time', 'N/A')}")
            print(f"   시도횟수: {result.get('attempt', 1)}")
            print()

        # 실패한 API 정보
        if failed:
            print("❌ 실패한 API:")
            for result in failed:
                print(f"   {result['endpoint']}: {result.get('error', 'Unknown error')}")

# 사용 예제
async def run_api_aggregator():
    """API 통합 시스템 실행"""
    async with KoreanAPIAggregator() as aggregator:
        results = await aggregator.fetch_all_data()
        aggregator.generate_dashboard()
        return results

# 실행
# api_results = asyncio.run(run_api_aggregator())

🔄 비동기 프로그래밍이란?

비동기 프로그래밍은 여러 작업을 동시에 처리하는 것처럼 보이게 하는 프로그래밍 방식입니다. I/O 작업이 많은 프로그램에서 특히 유용합니다.

graph LR
    subgraph "동기 처리"
        A1[작업 1] --> A2[대기] --> A3[작업 2] --> A4[대기] --> A5[작업 3]
    end
    
    subgraph "비동기 처리"
        B1[작업 1 시작] --> B2[작업 2 시작] --> B3[작업 3 시작]
        B1 -.-> B4[작업 1 완료]
        B2 -.-> B5[작업 2 완료]
        B3 -.-> B6[작업 3 완료]
    end

동기 vs 비동기 비교

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import time
import asyncio
import aiohttp
import requests

# 동기 방식
def sync_download(url):
    """동기적으로 URL 다운로드"""
    response = requests.get(url)
    return len(response.content)

def sync_main():
    """동기 방식으로 여러 URL 다운로드"""
    urls = [
        "https://api.github.com/users/python",
        "https://api.github.com/users/django",
        "https://api.github.com/users/flask"
    ]
    
    start = time.time()
    
    for url in urls:
        size = sync_download(url)
        print(f"Downloaded {url}: {size} bytes")
    
    end = time.time()
    print(f"동기 처리 시간: {end - start:.2f}")

# 비동기 방식
async def async_download(session, url):
    """비동기적으로 URL 다운로드"""
    async with session.get(url) as response:
        content = await response.read()
        return len(content)

async def async_main():
    """비동기 방식으로 여러 URL 다운로드"""
    urls = [
        "https://api.github.com/users/python",
        "https://api.github.com/users/django",
        "https://api.github.com/users/flask"
    ]
    
    start = time.time()
    
    async with aiohttp.ClientSession() as session:
        tasks = [async_download(session, url) for url in urls]
        sizes = await asyncio.gather(*tasks)
        
        for url, size in zip(urls, sizes):
            print(f"Downloaded {url}: {size} bytes")
    
    end = time.time()
    print(f"비동기 처리 시간: {end - start:.2f}")

# 실행
# sync_main()  # 순차적으로 처리
# asyncio.run(async_main())  # 동시에 처리

[!TIP] 비동기는 언제 쓰나요?

“기다리는 시간”이 많은 작업에 딱입니다!

  • 웹 크롤링 (서버 응답 대기)
  • 파일 다운로드 (네트워크 대기)
  • DB 쿼리 (데이터베이스 응답 대기)

반면, 복잡한 수학 계산처럼 CPU를 많이 쓰는 작업은 비동기로 해도 빨라지지 않습니다. (오히려 느려질 수도!)

🎭 코루틴(Coroutine)과 async/await

코루틴 기초

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# 코루틴 정의
async def hello_coroutine():
    """간단한 코루틴"""
    print("코루틴 시작")
    await asyncio.sleep(1)  # 1초 대기 (비동기)
    print("코루틴 종료")
    return "완료!"

# 코루틴 실행
async def main():
    # 방법 1: await 사용
    result = await hello_coroutine()
    print(f"결과: {result}")
    
    # 방법 2: create_task 사용
    task = asyncio.create_task(hello_coroutine())
    result = await task
    print(f"태스크 결과: {result}")

# 이벤트 루프에서 실행
asyncio.run(main())

> [!WARNING]
> **await는 async 함수 안에서만!**
>
> `await` 키워드는 반드시 `async def` 정의된 함수 안에서만   있습니다.
> 일반 함수(`def`) 안에서 `await` 쓰면 에러(SyntaxError) 납니다.
> 그리고 비동기 함수를 실행할 때는  `await` 붙여주거나 `asyncio.run()`으로 실행해야 합니다. 그냥 호출하면 코루틴 객체만 덜렁 나오고 실행되지 않습니다!
# 여러 코루틴 동시 실행
async def say_after(delay, what):
    """지연 후 메시지 출력"""
    await asyncio.sleep(delay)
    print(what)
    return f"{what} 완료"

async def concurrent_execution():
    """동시 실행 예제"""
    # gather를 사용한 동시 실행
    results = await asyncio.gather(
        say_after(1, "첫 번째"),
        say_after(2, "두 번째"),
        say_after(3, "세 번째")
    )
    print(f"모든 결과: {results}")
    
    # create_task를 사용한 동시 실행
    task1 = asyncio.create_task(say_after(1, "태스크 1"))
    task2 = asyncio.create_task(say_after(2, "태스크 2"))
    
    # 모든 태스크 대기
    await task1
    await task2

asyncio.run(concurrent_execution())

비동기 컨텍스트 관리자

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class AsyncResource:
    """비동기 리소스 관리자"""
    
    async def __aenter__(self):
        print("리소스 획득 중...")
        await asyncio.sleep(1)
        print("리소스 획득 완료")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("리소스 정리 중...")
        await asyncio.sleep(0.5)
        print("리소스 정리 완료")
    
    async def do_something(self):
        print("작업 수행 중...")
        await asyncio.sleep(1)
        return "작업 결과"

async def use_async_resource():
    """비동기 컨텍스트 관리자 사용"""
    async with AsyncResource() as resource:
        result = await resource.do_something()
        print(f"결과: {result}")

# 파일 비동기 처리
import aiofiles

async def async_file_operations():
    """비동기 파일 작업"""
    # 비동기 파일 쓰기
    async with aiofiles.open('async_test.txt', 'w') as f:
        await f.write("비동기 파일 쓰기 테스트\n")
        await f.write("두 번째 줄\n")
    
    # 비동기 파일 읽기
    async with aiofiles.open('async_test.txt', 'r') as f:
        content = await f.read()
        print(f"파일 내용:\n{content}")

비동기 반복자

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class AsyncCounter:
    """비동기 카운터"""
    
    def __init__(self, start=0, end=10):
        self.start = start
        self.end = end
    
    def __aiter__(self):
        self.current = self.start
        return self
    
    async def __anext__(self):
        if self.current < self.end:
            await asyncio.sleep(0.1)  # 시뮬레이션
            self.current += 1
            return self.current
        else:
            raise StopAsyncIteration

async def use_async_iterator():
    """비동기 반복자 사용"""
    async for num in AsyncCounter(1, 5):
        print(f"카운트: {num}")

# 비동기 제너레이터
async def async_generator(n):
    """비동기 제너레이터"""
    for i in range(n):
        await asyncio.sleep(0.1)
        yield i ** 2

async def use_async_generator():
    """비동기 제너레이터 사용"""
    async for square in async_generator(5):
        print(f"제곱: {square}")

# 비동기 컴프리헨션
async def async_comprehension():
    """비동기 컴프리헨션"""
    # 비동기 리스트 컴프리헨션
    squares = [x async for x in async_generator(5)]
    print(f"제곱 리스트: {squares}")
    
    # 비동기 딕셔너리 컴프리헨션
    square_dict = {i: i**2 async for i in AsyncCounter(1, 5)}
    print(f"제곱 딕셔너리: {square_dict}")

⚡ asyncio 모듈 활용

태스크와 동시성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
async def fetch_data(name, delay):
    """데이터 가져오기 시뮬레이션"""
    print(f"{name}: 데이터 가져오는 중...")
    await asyncio.sleep(delay)
    print(f"{name}: 데이터 가져오기 완료")
    return f"{name}의 데이터"

async def task_management():
    """태스크 관리 예제"""
    # 태스크 생성
    task1 = asyncio.create_task(fetch_data("API 1", 2))
    task2 = asyncio.create_task(fetch_data("API 2", 3))
    task3 = asyncio.create_task(fetch_data("API 3", 1))
    
    # 모든 태스크 완료 대기
    results = await asyncio.gather(task1, task2, task3)
    print(f"모든 결과: {results}")

> [!TIP]
> **asyncio.gather() 동시에!**
>
> 여러 비동기 작업을 **진짜로 동시에** 실행하고 싶다면 `asyncio.gather()` 쓰세요.
> 하나씩 `await`하면 순서대로 실행되지만, `gather` 묶어서 `await`하면 병렬로 실행되어 시간이 훨씬 단축됩니다.    
    # 첫 번째 완료된 태스크 대기
    tasks = [
        asyncio.create_task(fetch_data("Fast", 1)),
        asyncio.create_task(fetch_data("Medium", 2)),
        asyncio.create_task(fetch_data("Slow", 3))
    ]
    
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    
    for task in done:
        print(f"완료: {task.result()}")
    
    # 나머지 태스크 취소
    for task in pending:
        task.cancel()

# 타임아웃 처리
async def with_timeout():
    """타임아웃 처리 예제"""
    try:
        # 3초 타임아웃
        await asyncio.wait_for(fetch_data("Slow API", 5), timeout=3.0)
    except asyncio.TimeoutError:
        print("타임아웃 발생!")

# 주기적 작업
async def periodic_task():
    """주기적으로 실행되는 작업"""
    count = 0
    while count < 5:
        print(f"주기적 작업 실행: {count}")
        count += 1
        await asyncio.sleep(1)

동기화 프리미티브

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# Lock (뮤텍스)
async def worker_with_lock(name, lock, shared_resource):
    """Lock을 사용하는 워커"""
    async with lock:
        print(f"{name}: 리소스 접근 시작")
        shared_resource['count'] += 1
        await asyncio.sleep(0.1)
        print(f"{name}: 리소스 접근 종료 (count: {shared_resource['count']})")

async def lock_example():
    """Lock 사용 예제"""
    lock = asyncio.Lock()
    shared_resource = {'count': 0}
    
    tasks = [
        worker_with_lock(f"Worker {i}", lock, shared_resource)
        for i in range(5)
    ]
    
    await asyncio.gather(*tasks)
    print(f"최종 카운트: {shared_resource['count']}")

# Semaphore (동시 접근 제한)
async def worker_with_semaphore(name, semaphore):
    """Semaphore를 사용하는 워커"""
    async with semaphore:
        print(f"{name}: 작업 시작")
        await asyncio.sleep(1)
        print(f"{name}: 작업 완료")

async def semaphore_example():
    """Semaphore 사용 예제 (동시에 3개만 실행)"""
    semaphore = asyncio.Semaphore(3)
    
    tasks = [
        worker_with_semaphore(f"Task {i}", semaphore)
        for i in range(10)
    ]
    
    await asyncio.gather(*tasks)

# Event
async def waiter(event, name):
    """이벤트를 기다리는 코루틴"""
    print(f"{name}: 이벤트 대기 중...")
    await event.wait()
    print(f"{name}: 이벤트 발생! 작업 시작")

async def event_example():
    """Event 사용 예제"""
    event = asyncio.Event()
    
    # 대기자들 생성
    waiters = [
        asyncio.create_task(waiter(event, f"Waiter {i}"))
        for i in range(3)
    ]
    
    # 2초 후 이벤트 발생
    await asyncio.sleep(2)
    print("이벤트 발생시킴!")
    event.set()
    
    # 모든 대기자 완료 대기
    await asyncio.gather(*waiters)

큐(Queue)를 사용한 생산자-소비자 패턴

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
async def producer(queue, name, n):
    """데이터를 생산하는 코루틴"""
    for i in range(n):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"생산: {item}")
        await asyncio.sleep(0.5)

async def consumer(queue, name):
    """데이터를 소비하는 코루틴"""
    while True:
        try:
            # 1초 타임아웃으로 대기
            item = await asyncio.wait_for(queue.get(), timeout=1.0)
            print(f"{name} 소비: {item}")
            await asyncio.sleep(0.3)
            queue.task_done()
        except asyncio.TimeoutError:
            print(f"{name}: 더 이상 아이템이 없습니다")
            break

async def producer_consumer_example():
    """생산자-소비자 패턴 예제"""
    queue = asyncio.Queue(maxsize=5)
    
    # 생산자와 소비자 생성
    producers = [
        asyncio.create_task(producer(queue, f"Producer{i}", 3))
        for i in range(2)
    ]
    
    consumers = [
        asyncio.create_task(consumer(queue, f"Consumer{i}"))
        for i in range(3)
    ]
    
    # 생산자 완료 대기
    await asyncio.gather(*producers)
    
    # 큐의 모든 아이템 처리 대기
    await queue.join()
    
    # 소비자 취소
    for c in consumers:
        c.cancel()

🌐 aiohttp를 활용한 실제 웹 애플리케이션

aiohttp 클라이언트 실전 활용

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import aiohttp
import asyncio
import json
from typing import List, Dict

class AsyncAPIClient:
    """비동기 API 클라이언트"""

    def __init__(self, base_url: str, timeout: int = 30):
        self.base_url = base_url
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.session = None

    async def __aenter__(self):
        """컨텍스트 진입"""
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """컨텍스트 종료"""
        await self.session.close()

    async def get(self, endpoint: str, params: dict = None) -> dict:
        """GET 요청"""
        url = f"{self.base_url}{endpoint}"
        async with self.session.get(url, params=params) as response:
            response.raise_for_status()
            return await response.json()

    async def post(self, endpoint: str, data: dict = None) -> dict:
        """POST 요청"""
        url = f"{self.base_url}{endpoint}"
        async with self.session.post(url, json=data) as response:
            response.raise_for_status()
            return await response.json()

    async def fetch_multiple(self, endpoints: List[str]) -> List[dict]:
        """여러 엔드포인트 동시 호출"""
        tasks = [self.get(endpoint) for endpoint in endpoints]
        return await asyncio.gather(*tasks, return_exceptions=True)

# 사용 예제
async def use_api_client():
    """API 클라이언트 사용 예제"""
    async with AsyncAPIClient("https://jsonplaceholder.typicode.com") as client:
        # 단일 요청
        user = await client.get("/users/1")
        print(f"사용자 정보: {user['name']}")

        # 다중 요청 동시 처리
        endpoints = [f"/posts/{i}" for i in range(1, 6)]
        posts = await client.fetch_multiple(endpoints)

        for i, post in enumerate(posts, 1):
            if not isinstance(post, Exception):
                print(f"포스트 {i}: {post['title']}")

# asyncio.run(use_api_client())

aiohttp 서버 구현

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from aiohttp import web
import aiohttp_cors

# 미들웨어
@web.middleware
async def logging_middleware(request, handler):
    """로깅 미들웨어"""
    print(f"요청: {request.method} {request.path}")
    try:
        response = await handler(request)
        print(f"응답: {response.status}")
        return response
    except web.HTTPException as ex:
        print(f"HTTP 예외: {ex.status}")
        raise

# 핸들러
async def hello(request):
    """Hello 엔드포인트"""
    name = request.match_info.get('name', 'World')
    return web.json_response({'message': f'Hello, {name}!'})

async def get_users(request):
    """사용자 목록 조회"""
    # 쿼리 파라미터 처리
    page = int(request.query.get('page', 1))
    limit = int(request.query.get('limit', 10))

    # 비동기 DB 조회 시뮬레이션
    await asyncio.sleep(0.1)

    users = [
        {'id': i, 'name': f'User {i}'}
        for i in range((page-1)*limit + 1, page*limit + 1)
    ]

    return web.json_response({
        'page': page,
        'limit': limit,
        'users': users
    })

async def create_user(request):
    """사용자 생성"""
    data = await request.json()

    # 유효성 검증
    if 'name' not in data:
        raise web.HTTPBadRequest(reason='Name is required')

    # 비동기 DB 저장 시뮬레이션
    await asyncio.sleep(0.1)

    new_user = {
        'id': 123,
        'name': data['name'],
        'email': data.get('email')
    }

    return web.json_response(new_user, status=201)

# WebSocket 핸들러
async def websocket_handler(request):
    """WebSocket 연결 처리"""
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == aiohttp.WSMsgType.TEXT:
            if msg.data == 'close':
                await ws.close()
            else:
                # 에코 서버
                await ws.send_str(f"Echo: {msg.data}")
        elif msg.type == aiohttp.WSMsgType.ERROR:
            print(f'WebSocket error: {ws.exception()}')

    print('WebSocket connection closed')
    return ws

# 애플리케이션 생성
def create_app():
    """애플리케이션 팩토리"""
    app = web.Application(middlewares=[logging_middleware])

    # 라우트 설정
    app.router.add_get('/', hello)
    app.router.add_get('/hello/{name}', hello)
    app.router.add_get('/users', get_users)
    app.router.add_post('/users', create_user)
    app.router.add_get('/ws', websocket_handler)

    # CORS 설정
    cors = aiohttp_cors.setup(app, defaults={
        "*": aiohttp_cors.ResourceOptions(
            allow_credentials=True,
            expose_headers="*",
            allow_headers="*",
        )
    })

    for route in list(app.router.routes()):
        cors.add(route)

    return app

# 서버 실행
# if __name__ == '__main__':
#     app = create_app()
#     web.run_app(app, host='0.0.0.0', port=8080)

실시간 채팅 서버

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from aiohttp import web
import weakref
import json

class ChatRoom:
    """채팅방 관리 클래스"""

    def __init__(self):
        self._websockets = weakref.WeakSet()
        self._messages = []

    def join(self, ws):
        """채팅방 입장"""
        self._websockets.add(ws)

    async def leave(self, ws):
        """채팅방 퇴장"""
        # WeakSet은 자동으로 제거됨
        pass

    async def broadcast(self, message):
        """모든 참가자에게 메시지 전송"""
        self._messages.append(message)

        # 모든 연결된 클라이언트에게 전송
        if self._websockets:
            await asyncio.gather(
                *[ws.send_str(json.dumps(message)) for ws in self._websockets],
                return_exceptions=True
            )

async def chat_handler(request):
    """채팅 WebSocket 핸들러"""
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # 채팅방 가져오기 (애플리케이션에 저장)
    chat_room = request.app['chat_room']
    chat_room.join(ws)

    # 사용자 정보
    user_id = request.headers.get('User-Id', 'Anonymous')

    # 입장 메시지
    await chat_room.broadcast({
        'type': 'join',
        'user': user_id,
        'message': f'{user_id}님이 입장했습니다.'
    })

    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                data = json.loads(msg.data)

                if data['type'] == 'message':
                    # 메시지 브로드캐스트
                    await chat_room.broadcast({
                        'type': 'message',
                        'user': user_id,
                        'message': data['message']
                    })
                elif data['type'] == 'typing':
                    # 타이핑 상태 전송
                    await chat_room.broadcast({
                        'type': 'typing',
                        'user': user_id
                    })
            elif msg.type == aiohttp.WSMsgType.ERROR:
                print(f'WebSocket error: {ws.exception()}')
    finally:
        # 퇴장 메시지
        await chat_room.broadcast({
            'type': 'leave',
            'user': user_id,
            'message': f'{user_id}님이 퇴장했습니다.'
        })
        await chat_room.leave(ws)

    return ws

# 채팅 애플리케이션 생성
def create_chat_app():
    """채팅 애플리케이션 생성"""
    app = web.Application()
    app['chat_room'] = ChatRoom()

    # 정적 파일 (HTML 클라이언트)
    app.router.add_static('/', path='static', name='static')

    # WebSocket 엔드포인트
    app.router.add_get('/ws', chat_handler)

    return app

# HTML 클라이언트 예제
HTML_CLIENT = '''
<!DOCTYPE html>
<html>
<head>
    <title>비동기 채팅</title>
</head>
<body>
    <div id="messages"></div>
    <input type="text" id="messageInput" placeholder="메시지 입력">
    <button onclick="sendMessage()">전송</button>

    <script>
        const ws = new WebSocket('ws://localhost:8080/ws');
        const messagesDiv = document.getElementById('messages');

        ws.onmessage = (event) => {
            const data = JSON.parse(event.data);
            const messageElement = document.createElement('div');

            if (data.type === 'message') {
                messageElement.textContent = `${data.user}: ${data.message}`;
            } else {
                messageElement.textContent = data.message;
                messageElement.style.color = 'gray';
            }

            messagesDiv.appendChild(messageElement);
        };

        function sendMessage() {
            const input = document.getElementById('messageInput');
            if (input.value) {
                ws.send(JSON.stringify({
                    type: 'message',
                    message: input.value
                }));
                input.value = '';
            }
        }

        document.getElementById('messageInput').onkeypress = (e) => {
            if (e.key === 'Enter') sendMessage();
        };
    </script>
</body>
</html>
'''

💡 실전 예제

1. 비동기 웹 크롤러

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import aiohttp
import asyncio
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import time

class AsyncWebCrawler:
    """비동기 웹 크롤러"""
    
    def __init__(self, start_url, max_depth=2, max_concurrent=10):
        self.start_url = start_url
        self.max_depth = max_depth
        self.max_concurrent = max_concurrent
        self.visited = set()
        self.to_visit = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
        self.results = []
    
    async def fetch_page(self, url):
        """페이지 가져오기"""
        async with self.semaphore:
            try:
                async with self.session.get(url, timeout=10) as response:
                    if response.status == 200:
                        text = await response.text()
                        return text
            except Exception as e:
                print(f"오류 발생 {url}: {e}")
                return None
    
    def extract_links(self, html, base_url):
        """HTML에서 링크 추출"""
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        
        for tag in soup.find_all(['a', 'link']):
            href = tag.get('href')
            if href:
                absolute_url = urljoin(base_url, href)
                if self.is_valid_url(absolute_url):
                    links.append(absolute_url)
        
        return links
    
    def is_valid_url(self, url):
        """유효한 URL인지 확인"""
        parsed = urlparse(url)
        return (
            parsed.scheme in ('http', 'https') and
            parsed.netloc == urlparse(self.start_url).netloc
        )
    
    async def crawl_page(self, url, depth):
        """페이지 크롤링"""
        if url in self.visited or depth > self.max_depth:
            return
        
        self.visited.add(url)
        print(f"크롤링: {url} (깊이: {depth})")
        
        html = await self.fetch_page(url)
        if html:
            # 페이지 정보 저장
            self.results.append({
                'url': url,
                'depth': depth,
                'size': len(html),
                'title': self.extract_title(html)
            })
            
            # 링크 추출 및 큐에 추가
            links = self.extract_links(html, url)
            for link in links:
                if link not in self.visited:
                    await self.to_visit.put((link, depth + 1))
    
    def extract_title(self, html):
        """페이지 제목 추출"""
        soup = BeautifulSoup(html, 'html.parser')
        title_tag = soup.find('title')
        return title_tag.text.strip() if title_tag else "No Title"
    
    async def worker(self):
        """워커 코루틴"""
        while True:
            try:
                url, depth = await asyncio.wait_for(
                    self.to_visit.get(), 
                    timeout=5.0
                )
                await self.crawl_page(url, depth)
                self.to_visit.task_done()
            except asyncio.TimeoutError:
                break
    
    async def crawl(self):
        """크롤링 시작"""
        self.session = aiohttp.ClientSession()
        
        try:
            # 시작 URL을 큐에 추가
            await self.to_visit.put((self.start_url, 0))
            
            # 워커 생성
            workers = [
                asyncio.create_task(self.worker())
                for _ in range(self.max_concurrent)
            ]
            
            # 모든 URL 처리 대기
            await self.to_visit.join()
            
            # 워커 취소
            for w in workers:
                w.cancel()
            
        finally:
            await self.session.close()
        
        return self.results

# 사용 예제
async def run_crawler():
    """크롤러 실행"""
    crawler = AsyncWebCrawler(
        start_url="https://example.com",
        max_depth=2,
        max_concurrent=5
    )
    
    start = time.time()
    results = await crawler.crawl()
    end = time.time()
    
    print(f"\n크롤링 완료!")
    print(f"처리 시간: {end - start:.2f}")
    print(f"방문한 페이지 수: {len(results)}")
    
    # 결과 출력
    for result in results[:5]:  # 처음 5개만
        print(f"- {result['title']}: {result['url']} ({result['size']} bytes)")

# asyncio.run(run_crawler())

2. 비동기 API 서버

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from aiohttp import web
import json
import asyncio
from datetime import datetime

class AsyncAPIServer:
    """비동기 API 서버"""
    
    def __init__(self):
        self.app = web.Application()
        self.setup_routes()
        self.data_store = {}
        self.request_count = 0
    
    def setup_routes(self):
        """라우트 설정"""
        self.app.router.add_get('/', self.handle_root)
        self.app.router.add_get('/api/data/{id}', self.handle_get_data)
        self.app.router.add_post('/api/data', self.handle_post_data)
        self.app.router.add_get('/api/slow', self.handle_slow_request)
        self.app.router.add_get('/api/status', self.handle_status)
    
    async def handle_root(self, request):
        """루트 핸들러"""
        self.request_count += 1
        return web.Response(text="비동기 API 서버", content_type='text/plain')
    
    async def handle_get_data(self, request):
        """데이터 조회"""
        self.request_count += 1
        data_id = request.match_info['id']
        
        if data_id in self.data_store:
            return web.json_response(self.data_store[data_id])
        else:
            return web.json_response(
                {'error': 'Data not found'},
                status=404
            )
    
    async def handle_post_data(self, request):
        """데이터 생성"""
        self.request_count += 1
        
        try:
            data = await request.json()
            data_id = str(len(self.data_store) + 1)
            
            self.data_store[data_id] = {
                'id': data_id,
                'data': data,
                'created_at': datetime.now().isoformat()
            }
            
            return web.json_response(
                self.data_store[data_id],
                status=201
            )
        except Exception as e:
            return web.json_response(
                {'error': str(e)},
                status=400
            )
    
    async def handle_slow_request(self, request):
        """느린 요청 시뮬레이션"""
        self.request_count += 1
        
        # 비동기 대기 (다른 요청 차단하지 않음)
        await asyncio.sleep(3)
        
        return web.json_response({
            'message': '느린 작업 완료',
            'duration': '3초'
        })
    
    async def handle_status(self, request):
        """서버 상태"""
        self.request_count += 1
        
        return web.json_response({
            'status': 'running',
            'request_count': self.request_count,
            'data_count': len(self.data_store),
            'uptime': datetime.now().isoformat()
        })
    
    def run(self, host='localhost', port=8080):
        """서버 실행"""
        web.run_app(self.app, host=host, port=port)

# 클라이언트 예제
async def test_api_client():
    """API 클라이언트 테스트"""
    async with aiohttp.ClientSession() as session:
        base_url = "http://localhost:8080"
        
        # 동시에 여러 요청 보내기
        tasks = []
        
        # 데이터 생성
        for i in range(5):
            data = {'name': f'Item {i}', 'value': i * 10}
            task = session.post(f"{base_url}/api/data", json=data)
            tasks.append(task)
        
        # 느린 요청 추가
        tasks.append(session.get(f"{base_url}/api/slow"))
        
        # 모든 요청 동시 실행
        responses = await asyncio.gather(*tasks)
        
        # 결과 출력
        for i, response in enumerate(responses):
            data = await response.json()
            print(f"Response {i}: {data}")
        
        # 상태 확인
        async with session.get(f"{base_url}/api/status") as response:
            status = await response.json()
            print(f"\n서버 상태: {status}")

# 서버 실행
# server = AsyncAPIServer()
# server.run()

# 클라이언트 테스트
# asyncio.run(test_api_client())

3. 비동기 데이터 처리 파이프라인

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import asyncio
import random
from typing import AsyncIterator, List, Dict, Any
import aiofiles
import json

class AsyncDataPipeline:
    """비동기 데이터 처리 파이프라인"""
    
    def __init__(self, workers=5):
        self.workers = workers
        self.input_queue = asyncio.Queue()
        self.output_queue = asyncio.Queue()
        self.stats = {
            'processed': 0,
            'errors': 0,
            'filtered': 0
        }
    
    async def data_generator(self, count: int) -> AsyncIterator[Dict[str, Any]]:
        """데이터 생성기"""
        for i in range(count):
            await asyncio.sleep(0.01)  # 시뮬레이션
            
            data = {
                'id': i,
                'value': random.randint(1, 100),
                'category': random.choice(['A', 'B', 'C']),
                'timestamp': asyncio.get_event_loop().time()
            }
            
            yield data
    
    async def validate_data(self, data: Dict[str, Any]) -> bool:
        """데이터 유효성 검사"""
        # 비동기 검증 시뮬레이션
        await asyncio.sleep(0.001)
        
        # 검증 규칙
        if data['value'] < 10:
            self.stats['filtered'] += 1
            return False
        
        return True
    
    async def transform_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """데이터 변환"""
        # 비동기 변환 시뮬레이션
        await asyncio.sleep(0.01)
        
        # 변환 로직
        transformed = {
            **data,
            'processed': True,
            'value_squared': data['value'] ** 2,
            'category_upper': data['category'].upper()
        }
        
        return transformed
    
    async def enrich_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """데이터 보강 (외부 API 호출 시뮬레이션)"""
        await asyncio.sleep(0.02)
        
        # 추가 정보
        enriched = {
            **data,
            'enriched': True,
            'additional_info': f"Extra data for {data['id']}"
        }
        
        return enriched
    
    async def process_worker(self):
        """데이터 처리 워커"""
        while True:
            try:
                # 입력 큐에서 데이터 가져오기
                data = await asyncio.wait_for(
                    self.input_queue.get(),
                    timeout=1.0
                )
                
                try:
                    # 유효성 검사
                    if not await self.validate_data(data):
                        continue
                    
                    # 변환
                    transformed = await self.transform_data(data)
                    
                    # 보강
                    enriched = await self.enrich_data(transformed)
                    
                    # 출력 큐에 추가
                    await self.output_queue.put(enriched)
                    self.stats['processed'] += 1
                    
                except Exception as e:
                    self.stats['errors'] += 1
                    print(f"처리 오류: {e}")
                
                finally:
                    self.input_queue.task_done()
                    
            except asyncio.TimeoutError:
                break
    
    async def save_results(self, output_file: str):
        """결과 저장"""
        results = []
        
        while True:
            try:
                data = await asyncio.wait_for(
                    self.output_queue.get(),
                    timeout=1.0
                )
                results.append(data)
                self.output_queue.task_done()
                
            except asyncio.TimeoutError:
                break
        
        # 파일에 저장
        async with aiofiles.open(output_file, 'w') as f:
            await f.write(json.dumps(results, indent=2))
        
        return len(results)
    
    async def run_pipeline(self, data_count: int, output_file: str):
        """파이프라인 실행"""
        start = asyncio.get_event_loop().time()
        
        # 워커 시작
        workers = [
            asyncio.create_task(self.process_worker())
            for _ in range(self.workers)
        ]
        
        # 데이터 생성 및 큐에 추가
        async for data in self.data_generator(data_count):
            await self.input_queue.put(data)
        
        # 모든 데이터 처리 대기
        await self.input_queue.join()
        
        # 워커 종료
        for w in workers:
            w.cancel()
        
        # 결과 저장
        saved_count = await self.save_results(output_file)
        
        end = asyncio.get_event_loop().time()
        
        # 통계 출력
        print(f"\n파이프라인 실행 완료:")
        print(f"- 처리 시간: {end - start:.2f}")
        print(f"- 총 데이터: {data_count}")
        print(f"- 처리됨: {self.stats['processed']}")
        print(f"- 필터됨: {self.stats['filtered']}")
        print(f"- 오류: {self.stats['errors']}")
        print(f"- 저장됨: {saved_count}")
        print(f"- 처리 속도: {self.stats['processed']/(end-start):.2f} items/sec")

# 실행 예제
async def run_pipeline_example():
    """파이프라인 실행 예제"""
    pipeline = AsyncDataPipeline(workers=10)
    await pipeline.run_pipeline(
        data_count=1000,
        output_file='processed_data.json'
    )

# asyncio.run(run_pipeline_example())

⚠️ 초보자가 자주 하는 실수들

1. async/await 키워드 빠뜨리기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# ❌ 가장 흔한 실수들
import asyncio
import aiohttp

# 실수 1: async 함수에서 await 빠뜨리기
async def fetch_data_wrong():
    session = aiohttp.ClientSession()  # await 없음!
    response = session.get('https://api.github.com/users/python')  # await 없음!
    data = response.json()  # await 없음!
    session.close()  # await 없음!
    return data

# 실수 2: 일반 함수에서 await 사용하려고 하기
def process_data_wrong():
    # SyntaxError: 'await' outside async function
    # result = await fetch_data_wrong()
    return result

# ✅ 올바른 방법
async def fetch_data_correct():
    async with aiohttp.ClientSession() as session:  # async with 사용
        async with session.get('https://api.github.com/users/python') as response:
            data = await response.json()  # await 사용
            return data

async def process_data_correct():
    result = await fetch_data_correct()  # await 사용
    return result

# 테스트
try:
    # 잘못된 방법 - 코루틴 객체만 반환됨
    result = fetch_data_wrong()
    print(f"잘못된 결과: {result}")  # <coroutine object ...>
except Exception as e:
    print(f"에러: {e}")

# 올바른 방법
async def test_correct():
    result = await fetch_data_correct()
    print(f"올바른 결과: {type(result)}")

# asyncio.run(test_correct())

2. 동기 함수와 비동기 함수 혼용

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# ❌ 동기 함수를 비동기 함수에서 잘못 사용
import time
import requests  # 동기 라이브러리

async def bad_mixed_example():
    print("비동기 함수 시작")

    # 나쁜 예 1: 동기 sleep 사용
    time.sleep(1)  # 전체 이벤트 루프를 블록킹!

    # 나쁜 예 2: 동기 HTTP 라이브러리 사용
    response = requests.get('https://api.github.com/users/python')  # 블록킹!

    print("비동기 함수 완료")
    return response.json()

# ✅ 올바른 방법
import asyncio
import aiohttp

async def good_mixed_example():
    print("비동기 함수 시작")

    # 올바른 예 1: 비동기 sleep 사용
    await asyncio.sleep(1)  # 다른 코루틴이 실행될 수 있음

    # 올바른 예 2: 비동기 HTTP 라이브러리 사용
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.github.com/users/python') as response:
            data = await response.json()

    print("비동기 함수 완료")
    return data

# 성능 비교
async def performance_comparison():
    import time

    # 동기 방식 (블록킹)
    start = time.time()

    # 이렇게 하면 순차적으로 실행됨 (3초)
    tasks_sync = []
    for i in range(3):
        await asyncio.sleep(1)  # 각각 1초씩 기다림

    sync_time = time.time() - start
    print(f"순차 실행 시간: {sync_time:.2f}")

    # 비동기 방식 (논블록킹)
    start = time.time()

    # 동시에 실행됨 (1초)
    tasks_async = [asyncio.sleep(1) for i in range(3)]
    await asyncio.gather(*tasks_async)

    async_time = time.time() - start
    print(f"동시 실행 시간: {async_time:.2f}")

# asyncio.run(performance_comparison())

3. 예외 처리 실수

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# ❌ 비동기 함수의 예외를 제대로 처리하지 않는 경우
async def risky_operation():
    await asyncio.sleep(0.1)
    if True:  # 항상 예외 발생
        raise ValueError("뭔가 잘못됨")
    return "성공"

async def bad_exception_handling():
    # 나쁜 예: gather에서 예외 처리 안함
    tasks = [risky_operation() for _ in range(3)]
    try:
        results = await asyncio.gather(*tasks)  # 하나만 실패해도 전체 실패
    except ValueError as e:
        print(f"모든 작업 실패: {e}")
        return []

# ✅ 올바른 예외 처리
async def good_exception_handling():
    tasks = [risky_operation() for _ in range(3)]

    # 방법 1: return_exceptions=True 사용
    results = await asyncio.gather(*tasks, return_exceptions=True)

    successful_results = []
    for result in results:
        if isinstance(result, Exception):
            print(f"작업 실패: {result}")
        else:
            successful_results.append(result)

    return successful_results

async def better_exception_handling():
    # 방법 2: 개별 작업별 예외 처리
    async def safe_operation():
        try:
            return await risky_operation()
        except Exception as e:
            print(f"개별 작업 실패: {e}")
            return None

    tasks = [safe_operation() for _ in range(3)]
    results = await asyncio.gather(*tasks)

    # None이 아닌 결과만 필터링
    return [r for r in results if r is not None]

# 테스트
# asyncio.run(bad_exception_handling())
# asyncio.run(good_exception_handling())

4. 세션 관리 실수

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# ❌ ClientSession을 잘못 관리하는 경우
import aiohttp

async def bad_session_management():
    # 나쁜 예 1: 세션을 닫지 않음
    session = aiohttp.ClientSession()
    async with session.get('https://api.github.com/users/python') as response:
        data = await response.json()
    # session.close()를 호출하지 않음!
    return data

async def bad_session_creation():
    # 나쁜 예 2: 매번 새로운 세션 생성
    results = []
    for i in range(10):
        session = aiohttp.ClientSession()  # 비효율적!
        async with session.get(f'https://api.github.com/users/user{i}') as response:
            data = await response.json()
            results.append(data)
        await session.close()
    return results

# ✅ 올바른 세션 관리
async def good_session_management():
    # 좋은 예 1: async with 사용
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.github.com/users/python') as response:
            data = await response.json()
            return data
    # 자동으로 세션이 닫힘

async def good_session_reuse():
    # 좋은 예 2: 하나의 세션 재사용
    async with aiohttp.ClientSession() as session:
        results = []
        for i in range(10):
            async with session.get(f'https://api.github.com/users/user{i}') as response:
                data = await response.json()
                results.append(data)
        return results

# 클래스 기반 세션 관리
class APIClient:
    def __init__(self):
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def get_user(self, username):
        async with self.session.get(f'https://api.github.com/users/{username}') as response:
            return await response.json()

# 사용
async def use_api_client():
    async with APIClient() as client:
        user1 = await client.get_user('python')
        user2 = await client.get_user('django')
        return [user1, user2]

5. 동시성 제어 실수

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# ❌ 동시 실행 수를 제어하지 않는 실수
async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def bad_concurrency():
    # 나쁜 예: 1000개 URL을 동시에 모두 요청
    urls = [f'https://httpbin.org/delay/1?id={i}' for i in range(1000)]

    async with aiohttp.ClientSession() as session:
        # 이렇게 하면 시스템 리소스 고갈!
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    return results

# ✅ 올바른 동시성 제어
async def good_concurrency_semaphore():
    urls = [f'https://httpbin.org/delay/1?id={i}' for i in range(100)]

    # 세마포어로 동시 실행 수 제한
    semaphore = asyncio.Semaphore(10)  # 최대 10개까지만 동시 실행

    async def limited_fetch(session, url):
        async with semaphore:
            return await fetch_url(session, url)

    async with aiohttp.ClientSession() as session:
        tasks = [limited_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    return results

async def good_concurrency_batches():
    urls = [f'https://httpbin.org/delay/1?id={i}' for i in range(100)]
    batch_size = 10

    # 배치 단위로 처리
    async with aiohttp.ClientSession() as session:
        all_results = []

        for i in range(0, len(urls), batch_size):
            batch_urls = urls[i:i + batch_size]
            tasks = [fetch_url(session, url) for url in batch_urls]
            batch_results = await asyncio.gather(*tasks)
            all_results.extend(batch_results)

            print(f"배치 {i//batch_size + 1} 완료")

    return all_results

6. 이벤트 루프 실수

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# ❌ 이벤트 루프를 잘못 다루는 경우
import asyncio

# 나쁜 예 1: 이미 실행 중인 루프에서 run() 호출
async def bad_nested_run():
    # 이미 비동기 함수 안에서 asyncio.run() 호출하면 에러!
    try:
        result = asyncio.run(simple_async_task())  # RuntimeError 발생
    except RuntimeError as e:
        print(f"에러: {e}")

async def simple_async_task():
    await asyncio.sleep(0.1)
    return "완료"

# 나쁜 예 2: 블로킹 함수에서 비동기 함수 호출
def bad_sync_calling_async():
    # 동기 함수에서 비동기 함수 직접 호출
    result = simple_async_task()  # 코루틴 객체만 반환
    print(f"잘못된 결과: {result}")

# ✅ 올바른 이벤트 루프 사용
async def good_nested_execution():
    # 이미 비동기 컨텍스트에서는 await 사용
    result = await simple_async_task()
    return result

def good_sync_calling_async():
    # 동기 함수에서는 asyncio.run() 사용
    result = asyncio.run(simple_async_task())
    print(f"올바른 결과: {result}")

# 이벤트 루프 확인
def check_event_loop():
    try:
        loop = asyncio.get_running_loop()
        print("이벤트 루프가 실행 중입니다")
        return True
    except RuntimeError:
        print("이벤트 루프가 실행 중이 아닙니다")
        return False

async def safe_async_execution():
    # 안전한 비동기 실행
    if check_event_loop():
        result = await simple_async_task()
    else:
        result = asyncio.run(simple_async_task())
    return result

7. 메모리 누수와 리소스 관리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# ❌ 리소스를 제대로 정리하지 않는 실수
import weakref

class BadAsyncResource:
    def __init__(self, name):
        self.name = name
        self.session = None
        self.connections = []

    async def initialize(self):
        self.session = aiohttp.ClientSession()
        # 연결들을 저장하지만 정리하지 않음
        for i in range(10):
            conn = await self.session.get('https://httpbin.org/get')
            self.connections.append(conn)

    # __del__ 메서드 없음 - 메모리 누수!

# ✅ 올바른 리소스 관리
class GoodAsyncResource:
    def __init__(self, name):
        self.name = name
        self.session = None
        self.connections = []
        self._closed = False

    async def __aenter__(self):
        await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.cleanup()

    async def initialize(self):
        if not self.session:
            self.session = aiohttp.ClientSession()

    async def add_connection(self, url):
        if self._closed:
            raise RuntimeError("Resource is closed")

        async with self.session.get(url) as response:
            data = await response.text()
            return data

    async def cleanup(self):
        if self._closed:
            return

        self._closed = True

        # 모든 연결 정리
        for conn in self.connections:
            if hasattr(conn, 'close'):
                await conn.close()

        # 세션 정리
        if self.session:
            await self.session.close()

        print(f"리소스 {self.name} 정리 완료")

# 사용 예제
async def test_resource_management():
    # 올바른 사용
    async with GoodAsyncResource("test") as resource:
        data = await resource.add_connection('https://httpbin.org/get')
        print("데이터 받음")
    # 자동으로 정리됨

💡 실수 방지 체크리스트

async/await 사용 시:

  • 비동기 함수에서는 항상 await 키워드 사용하는가?
  • 동기 라이브러리 대신 비동기 라이브러리를 사용하는가?
  • time.sleep() 대신 asyncio.sleep() 사용하는가?

세션 관리 시:

  • async with를 사용해서 세션을 자동으로 닫는가?
  • 여러 요청에 하나의 세션을 재사용하는가?
  • 세션 생성/해제 비용을 고려했는가?

동시성 제어 시:

  • 세마포어나 배치 처리로 동시 실행 수를 제한하는가?
  • 시스템 리소스 한계를 고려했는가?
  • 예외가 발생해도 다른 작업이 계속될 수 있게 했는가?

예외 처리 시:

  • return_exceptions=True를 적절히 사용하는가?
  • 개별 작업의 실패가 전체에 영향을 주지 않게 했는가?
  • 예외 정보를 적절히 로깅하고 있는가?

🎯 핵심 정리

비동기 프로그래밍 Best Practices

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 1. 항상 async with 사용
async def good_practice():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# 2. 동시 실행은 gather 또는 create_task
async def concurrent_execution():
    # gather 사용
    results = await asyncio.gather(
        coroutine1(),
        coroutine2(),
        coroutine3()
    )
    
    # create_task 사용
    tasks = [
        asyncio.create_task(coro)
        for coro in coroutines
    ]
    results = await asyncio.gather(*tasks)

# 3. 타임아웃 설정
async def with_timeout():
    try:
        result = await asyncio.wait_for(
            long_running_task(),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        handle_timeout()

# 4. 적절한 동기화 사용
async def synchronized_access():
    lock = asyncio.Lock()
    async with lock:
        # 동기화된 코드
        pass

언제 비동기를 사용할까?

graph TD
    A[비동기 사용 시기] --> B[I/O 작업이 많을 때<br/>네트워크, 파일, DB]
    A --> C[동시 처리가 필요할 때<br/>여러 API 호출]
    A --> D[대기 시간이 많을 때<br/>웹 크롤링, API 서버]
    
    E[동기 사용 시기] --> F[CPU 집약적 작업<br/>계산, 암호화]
    E --> G[순차 처리 필요<br/>의존성 있는 작업]
    E --> H[간단한 스크립트<br/>복잡도가 낮을 때]

주의사항

  1. 블로킹 호출 피하기: time.sleep() 대신 asyncio.sleep()
  2. CPU 집약적 작업: 별도 프로세스나 스레드 풀 사용
  3. 동기화: 공유 자원 접근 시 Lock 사용
  4. 예외 처리: 각 코루틴에서 예외 처리

🎓 파이썬 마스터하기 시리즈

📚 기초편 (1-7)

  1. Python 소개와 개발 환경 설정
  2. 변수, 자료형, 연산자 완벽 정리
  3. 조건문과 반복문 마스터하기
  4. 함수와 람다 완벽 가이드
  5. 리스트, 튜플, 딕셔너리 정복하기
  6. 문자열 처리와 정규표현식
  7. 파일 입출력과 예외 처리

🚀 중급편 (8-12)

  1. 클래스와 객체지향 프로그래밍
  2. 모듈과 패키지 관리
  3. 데코레이터와 제너레이터
  4. 비동기 프로그래밍 (async/await) (현재 글)
  5. 데이터베이스 연동하기

💼 고급편 (13-16)

  1. 웹 스크래핑과 API 개발
  2. 테스트와 디버깅 전략
  3. 성능 최적화 기법
  4. 멀티프로세싱과 병렬 처리

이전글: 데코레이터와 제너레이터 ⬅️ 현재글: 비동기 프로그래밍 (async/await) 다음글: 데이터베이스 연동하기 ➡️


이번 포스트에서는 Python의 비동기 프로그래밍을 완벽히 마스터했습니다. 다음 포스트에서는 데이터베이스 연동에 대해 자세히 알아보겠습니다. Happy Coding! 🐍✨

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.