[이제와서 시작하는 Python 마스터하기 #13] 웹 스크래핑과 API 활용
[이제와서 시작하는 Python 마스터하기 #13] 웹 스크래핑과 API 활용
🌍 실전 예제로 시작하기: 실시간 주식 정보 수집기
웹 스크래핑과 API를 배우는 가장 좋은 방법은 실제로 사용해보는 것입니다. 한국 주식 시장 정보를 수집하는 간단한 예제로 시작해봅시다:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import requests
from bs4 import BeautifulSoup
import json
from datetime import datetime
class KoreaStockTracker:
"""한국 주식 정보 추적기"""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
def get_exchange_rate(self):
"""환율 정보 가져오기 (API 예제)"""
# 공개 API 사용
url = "https://api.exchangerate-api.com/v4/latest/USD"
try:
response = self.session.get(url)
data = response.json()
krw_rate = data['rates'].get('KRW', 0)
print(f"현재 환율: 1 USD = {krw_rate:,.0f} 원")
return krw_rate
except Exception as e:
print(f"환율 정보 가져오기 실패: {e}")
return None
def get_crypto_price(self, symbol='BTC'):
"""암호화폐 가격 가져오기 (웹 스크래핑 예제)"""
url = f"https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=krw"
try:
response = self.session.get(url)
data = response.json()
price = data['bitcoin']['krw']
print(f"비트코인 가격: {price:,.0f} 원")
return price
except Exception as e:
print(f"암호화폐 가격 가져오기 실패: {e}")
return None
def search_korean_news(self, keyword="삼성전자"):
"""한국 뉴스 검색 (웹 스크래핑 예제)"""
# 실제로는 네이버 뉴스 API나 적절한 뉴스 사이트를 사용
# 여기서는 예제로 간단한 구조만 보여줌
news_items = []
print(f"\n'{keyword}' 관련 최신 뉴스:")
print("-" * 50)
# 실제 스크래핑 시에는 robots.txt 확인 필수
# 여기서는 개념 설명을 위한 예제
# [!IMPORTANT]
# **robots.txt를 꼭 확인하세요!**
#
# 웹사이트 주소 뒤에 `/robots.txt`를 붙여보세요 (예: `https://www.naver.com/robots.txt`).
# `User-agent: *`, `Disallow: /` 라고 되어 있다면 스크래핑하면 안 됩니다!
# 법적인 문제가 될 수 있으니 항상 허용된 범위 내에서만 수집하세요.
try:
# 가상의 뉴스 데이터 (실제로는 웹에서 스크래핑)
sample_news = [
{
'title': f'{keyword} 주가 상승, 실적 개선 기대감',
'date': datetime.now().strftime('%Y-%m-%d'),
'summary': '전문가들은 긍정적 전망...'
},
{
'title': f'{keyword} 신제품 출시 임박',
'date': datetime.now().strftime('%Y-%m-%d'),
'summary': '혁신적인 기술 탑재 예정...'
}
]
for idx, news in enumerate(sample_news, 1):
print(f"{idx}. {news['title']}")
print(f" 날짜: {news['date']}")
print(f" 요약: {news['summary']}")
print()
news_items.append(news)
return news_items
except Exception as e:
print(f"뉴스 검색 실패: {e}")
return []
def analyze_market_sentiment(self, keyword="코스피"):
"""시장 심리 분석 (종합 예제)"""
print(f"\n=== {keyword} 시장 분석 리포트 ===")
print(f"생성 시각: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 50)
# 1. 환율 정보
exchange_rate = self.get_exchange_rate()
# 2. 암호화폐 가격
btc_price = self.get_crypto_price()
# 3. 관련 뉴스
news = self.search_korean_news(keyword)
# 간단한 심리 지수 계산 (실제로는 더 복잡한 알고리즘 사용)
sentiment_score = 50 # 기본값
if exchange_rate and exchange_rate < 1300:
sentiment_score += 10 # 원화 강세는 긍정적
if btc_price and btc_price > 50000000:
sentiment_score += 5 # 암호화폐 상승은 위험자산 선호
if len(news) > 0:
sentiment_score += len(news) * 2 # 뉴스 많으면 관심도 상승
print(f"\n📊 종합 심리 지수: {sentiment_score}/100")
if sentiment_score >= 70:
print("💹 시장 전망: 매우 긍정적")
elif sentiment_score >= 50:
print("📈 시장 전망: 긍정적")
else:
print("📉 시장 전망: 보수적 접근 필요")
return sentiment_score
# 실제 사용 예제
def main():
"""메인 실행 함수"""
print("🎯 한국 시장 정보 수집 프로그램 시작\n")
tracker = KoreaStockTracker()
# 시장 분석 실행
sentiment = tracker.analyze_market_sentiment("코스피")
# 추가 분석 가능
if sentiment > 60:
print("\n💡 투자 tip: 시장이 긍정적이니 성장주에 관심을 가져보세요")
else:
print("\n💡 투자 tip: 시장이 불안정하니 안전자산 비중을 늘리세요")
print("\n프로그램 종료")
if __name__ == "__main__":
main()
이제 본격적으로 웹 스크래핑과 API 활용 방법을 자세히 알아봅시다!
🌐 웹 스크래핑과 API 개요
웹 스크래핑은 웹 페이지에서 데이터를 추출하는 기술이고, API는 프로그램 간 통신을 위한 인터페이스입니다.
graph LR
subgraph "데이터 수집 방법"
A[웹 스크래핑] --> B[HTML 파싱<br/>BeautifulSoup]
A --> C[동적 페이지<br/>Selenium]
A --> D[빠른 처리<br/>Scrapy]
E[API 활용] --> F[REST API<br/>requests]
E --> G[GraphQL<br/>gql]
E --> H[WebSocket<br/>websocket-client]
end
📡 Requests 라이브러리
Requests는 Python에서 HTTP 요청을 보내는 가장 인기 있는 라이브러리입니다.
기본 HTTP 요청
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import requests
from requests.exceptions import RequestException, Timeout, ConnectionError
import json
# GET 요청
def get_example():
"""GET 요청 예제"""
try:
response = requests.get('https://api.github.com/users/python')
# 상태 코드 확인
if response.status_code == 200:
data = response.json()
print(f"사용자명: {data['name']}")
print(f"공개 리포지토리 수: {data['public_repos']}")
else:
print(f"오류 발생: {response.status_code}")
except RequestException as e:
print(f"요청 실패: {e}")
> [!TIP]
> **403 에러가 뜬다면? (User-Agent)**
>
> 브라우저가 아닌 프로그램(봇)이라고 판단해서 차단당한 경우입니다.
> `headers={'User-Agent': 'Mozilla/5.0 ...'}` 처럼 헤더를 추가해서 마치 웹 브라우저인 척 속여보세요.
> 대부분의 간단한 차단은 이걸로 해결됩니다!
# POST 요청
def post_example():
"""POST 요청 예제"""
url = 'https://httpbin.org/post'
# JSON 데이터
data = {
'name': '홍길동',
'email': 'hong@example.com',
'age': 30
}
# 헤더 설정
headers = {
'Content-Type': 'application/json',
'User-Agent': 'Python/3.9'
}
try:
response = requests.post(url, json=data, headers=headers)
result = response.json()
print(f"전송된 데이터: {result['json']}")
except RequestException as e:
print(f"요청 실패: {e}")
# 파일 업로드
def upload_file():
"""파일 업로드 예제"""
url = 'https://httpbin.org/post'
files = {
'file': ('test.txt', open('test.txt', 'rb'), 'text/plain'),
'file2': ('image.png', open('image.png', 'rb'), 'image/png')
}
data = {
'description': '파일 업로드 테스트'
}
try:
response = requests.post(url, files=files, data=data)
print(f"응답: {response.status_code}")
except RequestException as e:
print(f"업로드 실패: {e}")
finally:
# 파일 닫기
for file_tuple in files.values():
if hasattr(file_tuple[1], 'close'):
file_tuple[1].close()
세션과 인증
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# 세션 사용
class APIClient:
"""API 클라이언트"""
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
def login(self, username, password):
"""로그인"""
response = self.session.post(
f"{self.base_url}/login",
json={'username': username, 'password': password}
)
if response.status_code == 200:
# 세션 쿠키가 자동으로 저장됨
return True
return False
def get_profile(self):
"""프로필 조회 (인증 필요)"""
response = self.session.get(f"{self.base_url}/profile")
return response.json()
def close(self):
"""세션 종료"""
self.session.close()
# 다양한 인증 방법
def authentication_examples():
"""인증 예제"""
# 1. Basic 인증
response = requests.get(
'https://api.example.com/data',
auth=('username', 'password')
)
# 2. Bearer 토큰
headers = {
'Authorization': 'Bearer YOUR_ACCESS_TOKEN'
}
response = requests.get(
'https://api.example.com/data',
headers=headers
)
# 3. API 키
params = {
'api_key': 'YOUR_API_KEY'
}
response = requests.get(
'https://api.example.com/data',
params=params
)
# 고급 설정
def advanced_requests():
"""고급 요청 설정"""
# 타임아웃 설정
try:
response = requests.get(
'https://httpbin.org/delay/5',
timeout=(3, 10) # (연결 타임아웃, 읽기 타임아웃)
)
except Timeout:
print("요청 시간 초과")
# 재시도 설정
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
session = requests.Session()
retry = Retry(
total=3,
read=3,
connect=3,
backoff_factor=0.3,
status_forcelist=(500, 502, 504)
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
# 프록시 설정
proxies = {
'http': 'http://10.10.1.10:3128',
'https': 'http://10.10.1.11:1080',
}
response = requests.get('https://httpbin.org/ip', proxies=proxies)
🍲 BeautifulSoup으로 HTML 파싱
BeautifulSoup은 HTML과 XML을 파싱하는 강력한 라이브러리입니다.
기본 사용법
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from bs4 import BeautifulSoup
import requests
def basic_parsing():
"""기본 HTML 파싱"""
html = """
<html>
<head>
<title>샘플 페이지</title>
</head>
<body>
<h1 class="title">메인 제목</h1>
<div class="content">
<p>첫 번째 단락입니다.</p>
<p>두 번째 단락입니다.</p>
<ul id="items">
<li>항목 1</li>
<li>항목 2</li>
<li>항목 3</li>
</ul>
</div>
</body>
</html>
"""
soup = BeautifulSoup(html, 'html.parser')
# 태그 찾기
title = soup.find('title').text
print(f"페이지 제목: {title}")
# CSS 선택자
main_title = soup.select_one('h1.title').text
print(f"메인 제목: {main_title}")
> [!TIP]
> **select() vs find()**
>
> - `find()`: 태그 이름으로 찾을 때 편합니다. (`soup.find('div')`)
> - `select()`: CSS 선택자로 찾을 때 강력합니다. (`soup.select('div.content > p')`)
>
> 복잡한 구조를 찾을 땐 크롬 개발자 도구에서 "Copy selector"를 한 뒤 `select()`에 넣으면 편해요!
# 모든 항목 찾기
items = soup.select('#items li')
for item in items:
print(f"- {item.text}")
# 속성 접근
content_div = soup.find('div', class_='content')
print(f"클래스: {content_div.get('class')}")
def scrape_website():
"""실제 웹사이트 스크래핑"""
url = 'https://news.ycombinator.com/'
try:
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
# 뉴스 제목 추출
stories = soup.select('.titleline > a')
for i, story in enumerate(stories[:10], 1):
title = story.text
link = story.get('href')
# 상대 URL 처리
if not link.startswith('http'):
link = f"https://news.ycombinator.com/{link}"
print(f"{i}. {title}")
print(f" 링크: {link}")
except Exception as e:
print(f"스크래핑 실패: {e}")
고급 파싱 기법
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class WebScraper:
"""웹 스크래퍼 클래스"""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
def scrape_table(self, url, table_class):
"""테이블 데이터 추출"""
response = self.session.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
table = soup.find('table', class_=table_class)
if not table:
return []
# 헤더 추출
headers = []
header_row = table.find('thead').find('tr')
for th in header_row.find_all('th'):
headers.append(th.text.strip())
# 데이터 추출
data = []
tbody = table.find('tbody')
for row in tbody.find_all('tr'):
row_data = {}
cells = row.find_all('td')
for i, cell in enumerate(cells):
if i < len(headers):
row_data[headers[i]] = cell.text.strip()
data.append(row_data)
return data
def scrape_with_pagination(self, base_url, max_pages=5):
"""페이지네이션 처리"""
all_data = []
for page in range(1, max_pages + 1):
url = f"{base_url}?page={page}"
try:
response = self.session.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
# 데이터 추출 (예: 상품 목록)
products = soup.select('.product-item')
for product in products:
data = {
'name': product.select_one('.product-name').text.strip(),
'price': product.select_one('.price').text.strip(),
'rating': product.select_one('.rating').get('data-rating'),
'image': product.select_one('img').get('src')
}
all_data.append(data)
# 다음 페이지 확인
next_button = soup.select_one('.pagination .next')
if not next_button or 'disabled' in next_button.get('class', []):
break
except Exception as e:
print(f"페이지 {page} 스크래핑 실패: {e}")
break
return all_data
def extract_structured_data(self, url):
"""구조화된 데이터 추출"""
response = self.session.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
# JSON-LD 데이터 추출
json_ld = soup.find('script', type='application/ld+json')
if json_ld:
import json
structured_data = json.loads(json_ld.string)
return structured_data
# Open Graph 메타 태그
og_data = {}
for meta in soup.find_all('meta', property=lambda x: x and x.startswith('og:')):
property_name = meta.get('property')[3:] # 'og:' 제거
og_data[property_name] = meta.get('content')
return og_data
🤖 Selenium으로 동적 페이지 스크래핑
Selenium은 JavaScript로 렌더링되는 동적 웹 페이지를 스크래핑할 때 사용합니다.
Selenium 기본 사용법
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
import time
class SeleniumScraper:
"""Selenium 스크래퍼"""
def __init__(self, headless=True):
# Chrome 옵션 설정
options = webdriver.ChromeOptions()
if headless:
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-gpu')
options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
self.driver = webdriver.Chrome(options=options)
self.wait = WebDriverWait(self.driver, 10)
def scrape_dynamic_content(self, url):
"""동적 콘텐츠 스크래핑"""
self.driver.get(url)
# JavaScript 실행 대기
time.sleep(2)
# 스크롤하여 더 많은 콘텐츠 로드
last_height = self.driver.execute_script("return document.body.scrollHeight")
while True:
# 맨 아래로 스크롤
self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
# 새 높이 계산
new_height = self.driver.execute_script("return document.body.scrollHeight")
if new_height == last_height:
break
last_height = new_height
# 데이터 추출
elements = self.driver.find_elements(By.CLASS_NAME, 'item')
data = []
for element in elements:
try:
item_data = {
'title': element.find_element(By.CLASS_NAME, 'title').text,
'description': element.find_element(By.CLASS_NAME, 'description').text,
'link': element.find_element(By.TAG_NAME, 'a').get_attribute('href')
}
data.append(item_data)
except:
continue
return data
def login_and_scrape(self, login_url, username, password):
"""로그인 후 스크래핑"""
self.driver.get(login_url)
# 로그인 폼 대기
username_field = self.wait.until(
EC.presence_of_element_located((By.ID, "username"))
)
password_field = self.driver.find_element(By.ID, "password")
# 로그인 정보 입력
username_field.send_keys(username)
password_field.send_keys(password)
password_field.send_keys(Keys.RETURN)
# 로그인 완료 대기
self.wait.until(
EC.presence_of_element_located((By.CLASS_NAME, "dashboard"))
)
# 로그인 후 데이터 스크래핑
return self.scrape_protected_data()
def handle_ajax_loading(self, url):
"""AJAX 로딩 처리"""
self.driver.get(url)
# "더 보기" 버튼 클릭하여 데이터 로드
while True:
try:
load_more_button = self.wait.until(
EC.element_to_be_clickable((By.CLASS_NAME, "load-more"))
)
# JavaScript로 클릭 (일반 클릭이 안 될 때)
self.driver.execute_script("arguments[0].click();", load_more_button)
# 새 데이터 로딩 대기
time.sleep(1)
except:
# 더 이상 버튼이 없으면 종료
break
# 모든 데이터 추출
return self.extract_all_data()
def take_screenshot(self, filename):
"""스크린샷 저장"""
self.driver.save_screenshot(filename)
def close(self):
"""브라우저 종료"""
self.driver.quit()
# 고급 Selenium 기능
def advanced_selenium_features():
"""고급 Selenium 기능"""
driver = webdriver.Chrome()
try:
# 1. 마우스 호버
element = driver.find_element(By.CLASS_NAME, "menu-item")
actions = ActionChains(driver)
actions.move_to_element(element).perform()
# 2. 드래그 앤 드롭
source = driver.find_element(By.ID, "draggable")
target = driver.find_element(By.ID, "droppable")
actions.drag_and_drop(source, target).perform()
# 3. 알림창 처리
alert = driver.switch_to.alert
alert_text = alert.text
alert.accept() # 또는 alert.dismiss()
# 4. 프레임 전환
driver.switch_to.frame("iframe-name")
# 프레임 내에서 작업
driver.switch_to.default_content() # 메인으로 돌아가기
# 5. 새 탭/창 처리
original_window = driver.current_window_handle
driver.find_element(By.LINK_TEXT, "새 창 열기").click()
# 새 창으로 전환
for window_handle in driver.window_handles:
if window_handle != original_window:
driver.switch_to.window(window_handle)
break
# 원래 창으로 돌아가기
driver.switch_to.window(original_window)
finally:
driver.quit()
🔌 REST API 활용
REST API는 웹 서비스와 통신하는 표준화된 방법입니다.
API 클라이언트 구현
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import requests
from urllib.parse import urljoin
import time
from functools import wraps
class APIClient:
"""REST API 클라이언트"""
def __init__(self, base_url, api_key=None):
self.base_url = base_url
self.session = requests.Session()
if api_key:
self.session.headers['Authorization'] = f'Bearer {api_key}'
self.session.headers['Content-Type'] = 'application/json'
self.session.headers['Accept'] = 'application/json'
def _make_request(self, method, endpoint, **kwargs):
"""API 요청 수행"""
url = urljoin(self.base_url, endpoint)
try:
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
# 응답이 JSON인 경우
if 'application/json' in response.headers.get('Content-Type', ''):
return response.json()
return response.text
except requests.exceptions.HTTPError as e:
if response.status_code == 429: # Rate limit
retry_after = int(response.headers.get('Retry-After', 60))
print(f"Rate limit 도달. {retry_after}초 후 재시도")
time.sleep(retry_after)
return self._make_request(method, endpoint, **kwargs)
error_data = response.json() if response.content else {}
raise APIError(response.status_code, error_data.get('message', str(e)))
def get(self, endpoint, params=None):
"""GET 요청"""
return self._make_request('GET', endpoint, params=params)
def post(self, endpoint, data=None):
"""POST 요청"""
return self._make_request('POST', endpoint, json=data)
def put(self, endpoint, data=None):
"""PUT 요청"""
return self._make_request('PUT', endpoint, json=data)
def delete(self, endpoint):
"""DELETE 요청"""
return self._make_request('DELETE', endpoint)
def patch(self, endpoint, data=None):
"""PATCH 요청"""
return self._make_request('PATCH', endpoint, json=data)
class APIError(Exception):
"""API 오류"""
def __init__(self, status_code, message):
self.status_code = status_code
self.message = message
super().__init__(f"API Error {status_code}: {message}")
# 실전 API 활용 예제
class GitHubAPI(APIClient):
"""GitHub API 클라이언트"""
def __init__(self, token=None):
super().__init__('https://api.github.com', token)
def get_user(self, username):
"""사용자 정보 조회"""
return self.get(f'/users/{username}')
def get_user_repos(self, username, per_page=30, page=1):
"""사용자 리포지토리 조회"""
params = {
'per_page': per_page,
'page': page,
'sort': 'updated',
'direction': 'desc'
}
return self.get(f'/users/{username}/repos', params=params)
def create_gist(self, description, files, public=True):
"""Gist 생성"""
data = {
'description': description,
'public': public,
'files': files
}
return self.post('/gists', data)
def search_repositories(self, query, language=None, sort='stars'):
"""리포지토리 검색"""
q = query
if language:
q += f' language:{language}'
params = {
'q': q,
'sort': sort,
'order': 'desc'
}
return self.get('/search/repositories', params=params)
# 페이지네이션 처리
class PaginatedAPI:
"""페이지네이션 지원 API"""
def __init__(self, api_client):
self.api = api_client
def get_all_pages(self, endpoint, params=None, max_pages=None):
"""모든 페이지 데이터 수집"""
if params is None:
params = {}
all_data = []
page = 1
while True:
params['page'] = page
data = self.api.get(endpoint, params)
if not data: # 더 이상 데이터가 없음
break
all_data.extend(data)
if max_pages and page >= max_pages:
break
page += 1
return all_data
def iterate_pages(self, endpoint, params=None):
"""페이지별로 순회 (제너레이터)"""
if params is None:
params = {}
page = 1
while True:
params['page'] = page
data = self.api.get(endpoint, params)
if not data:
break
yield from data
page += 1
비동기 API 클라이언트
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import aiohttp
import asyncio
from typing import List, Dict, Any
class AsyncAPIClient:
"""비동기 API 클라이언트"""
def __init__(self, base_url: str, api_key: str = None):
self.base_url = base_url
self.headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
if api_key:
self.headers['Authorization'] = f'Bearer {api_key}'
async def _make_request(self, session: aiohttp.ClientSession,
method: str, endpoint: str, **kwargs) -> Any:
"""비동기 요청 수행"""
url = urljoin(self.base_url, endpoint)
async with session.request(method, url, headers=self.headers, **kwargs) as response:
if response.status == 200:
return await response.json()
else:
text = await response.text()
raise APIError(response.status, text)
async def get_multiple(self, endpoints: List[str]) -> List[Any]:
"""여러 엔드포인트 동시 요청"""
async with aiohttp.ClientSession() as session:
tasks = [
self._make_request(session, 'GET', endpoint)
for endpoint in endpoints
]
return await asyncio.gather(*tasks)
async def batch_post(self, endpoint: str, data_list: List[Dict]) -> List[Any]:
"""배치 POST 요청"""
async with aiohttp.ClientSession() as session:
tasks = [
self._make_request(session, 'POST', endpoint, json=data)
for data in data_list
]
return await asyncio.gather(*tasks, return_exceptions=True)
# 비동기 웹 스크래핑
class AsyncScraper:
"""비동기 웹 스크래퍼"""
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> str:
"""페이지 가져오기"""
async with self.semaphore:
async with session.get(url) as response:
return await response.text()
async def scrape_urls(self, urls: List[str]) -> List[Dict[str, Any]]:
"""여러 URL 동시 스크래핑"""
async with aiohttp.ClientSession() as session:
tasks = [self.fetch_page(session, url) for url in urls]
pages = await asyncio.gather(*tasks)
results = []
for url, html in zip(urls, pages):
soup = BeautifulSoup(html, 'html.parser')
# 데이터 추출
data = self.extract_data(soup)
data['url'] = url
results.append(data)
return results
def extract_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""데이터 추출 (구현 필요)"""
return {
'title': soup.find('title').text if soup.find('title') else '',
'h1': soup.find('h1').text if soup.find('h1') else ''
}
# 사용 예제
async def main():
"""비동기 실행 예제"""
# API 클라이언트
api = AsyncAPIClient('https://api.example.com')
# 여러 엔드포인트 동시 요청
endpoints = ['/users/1', '/users/2', '/users/3']
results = await api.get_multiple(endpoints)
# 웹 스크래핑
scraper = AsyncScraper()
urls = [
'https://example1.com',
'https://example2.com',
'https://example3.com'
]
scraped_data = await scraper.scrape_urls(urls)
return results, scraped_data
# asyncio.run(main())
💡 실전 예제
1. 뉴스 수집 시스템
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import sqlite3
import schedule
import time
from typing import List, Dict
import hashlib
class NewsCollector:
"""뉴스 수집 시스템"""
def __init__(self, db_path: str = 'news.db'):
self.db_path = db_path
self.init_database()
self.sources = {
'hackernews': {
'url': 'https://news.ycombinator.com',
'parser': self.parse_hackernews
},
'reddit': {
'url': 'https://www.reddit.com/r/programming.json',
'parser': self.parse_reddit
}
}
def init_database(self):
"""데이터베이스 초기화"""
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS articles (
id TEXT PRIMARY KEY,
source TEXT NOT NULL,
title TEXT NOT NULL,
url TEXT NOT NULL,
author TEXT,
score INTEGER,
comments INTEGER,
collected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(source, url)
)
""")
conn.commit()
conn.close()
def generate_id(self, source: str, url: str) -> str:
"""고유 ID 생성"""
content = f"{source}:{url}"
return hashlib.md5(content.encode()).hexdigest()
def parse_hackernews(self, html: str) -> List[Dict]:
"""Hacker News 파싱"""
soup = BeautifulSoup(html, 'html.parser')
articles = []
# 제목 추출
title_rows = soup.select('.athing')
for row in title_rows[:30]: # 상위 30개
try:
title_cell = row.select_one('.titleline > a')
if not title_cell:
continue
article = {
'title': title_cell.text,
'url': title_cell.get('href', ''),
'source': 'hackernews'
}
# 메타 정보 추출
meta_row = row.find_next_sibling('tr')
if meta_row:
score = meta_row.select_one('.score')
article['score'] = int(score.text.split()[0]) if score else 0
comments = meta_row.select_one('.subline > a:last-child')
if comments and 'comment' in comments.text:
article['comments'] = int(comments.text.split()[0])
else:
article['comments'] = 0
author = meta_row.select_one('.hnuser')
article['author'] = author.text if author else 'anonymous'
articles.append(article)
except Exception as e:
print(f"파싱 오류: {e}")
continue
return articles
def parse_reddit(self, json_data: Dict) -> List[Dict]:
"""Reddit 파싱"""
articles = []
for post in json_data.get('data', {}).get('children', []):
data = post.get('data', {})
article = {
'source': 'reddit',
'title': data.get('title', ''),
'url': data.get('url', ''),
'author': data.get('author', 'deleted'),
'score': data.get('score', 0),
'comments': data.get('num_comments', 0)
}
articles.append(article)
return articles
def collect_news(self):
"""뉴스 수집"""
all_articles = []
for source_name, source_info in self.sources.items():
try:
print(f"{source_name} 수집 중...")
if source_name == 'reddit':
# JSON API
response = requests.get(
source_info['url'],
headers={'User-Agent': 'NewsCollector/1.0'}
)
articles = source_info['parser'](response.json())
else:
# HTML 스크래핑
response = requests.get(source_info['url'])
articles = source_info['parser'](response.text)
all_articles.extend(articles)
print(f"{source_name}: {len(articles)}개 수집")
except Exception as e:
print(f"{source_name} 수집 실패: {e}")
# 데이터베이스 저장
self.save_articles(all_articles)
return all_articles
def save_articles(self, articles: List[Dict]):
"""기사 저장"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
saved_count = 0
for article in articles:
try:
article_id = self.generate_id(
article['source'],
article['url']
)
cursor.execute("""
INSERT OR IGNORE INTO articles
(id, source, title, url, author, score, comments)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
article_id,
article['source'],
article['title'],
article['url'],
article.get('author'),
article.get('score', 0),
article.get('comments', 0)
))
if cursor.rowcount > 0:
saved_count += 1
except Exception as e:
print(f"저장 오류: {e}")
conn.commit()
conn.close()
print(f"총 {saved_count}개 기사 저장")
def get_top_articles(self, limit: int = 10) -> List[Dict]:
"""인기 기사 조회"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.execute("""
SELECT * FROM articles
WHERE collected_at > datetime('now', '-1 day')
ORDER BY score DESC
LIMIT ?
""", (limit,))
articles = [dict(row) for row in cursor.fetchall()]
conn.close()
return articles
def run_scheduler(self):
"""스케줄러 실행"""
# 매 시간마다 수집
schedule.every().hour.do(self.collect_news)
# 첫 실행
self.collect_news()
while True:
schedule.run_pending()
time.sleep(60)
# 사용 예제
if __name__ == "__main__":
collector = NewsCollector()
# 단일 수집
collector.collect_news()
# 인기 기사 출력
top_articles = collector.get_top_articles()
for i, article in enumerate(top_articles, 1):
print(f"{i}. {article['title']}")
print(f" 점수: {article['score']}, 댓글: {article['comments']}")
print(f" URL: {article['url']}\n")
2. API 모니터링 시스템
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import requests
import time
from datetime import datetime, timedelta
import json
from typing import List, Dict, Optional
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class APIMonitor:
"""API 모니터링 시스템"""
def __init__(self, config_file: str = 'monitor_config.json'):
with open(config_file, 'r') as f:
self.config = json.load(f)
self.endpoints = self.config['endpoints']
self.check_interval = self.config.get('check_interval', 300) # 5분
self.alert_config = self.config.get('alerts', {})
self.history = []
def check_endpoint(self, endpoint: Dict) -> Dict:
"""엔드포인트 체크"""
result = {
'endpoint': endpoint['url'],
'name': endpoint['name'],
'timestamp': datetime.now().isoformat(),
'success': False,
'response_time': None,
'status_code': None,
'error': None
}
try:
start_time = time.time()
response = requests.request(
method=endpoint.get('method', 'GET'),
url=endpoint['url'],
headers=endpoint.get('headers', {}),
timeout=endpoint.get('timeout', 10)
)
response_time = (time.time() - start_time) * 1000 # ms
result['response_time'] = response_time
result['status_code'] = response.status_code
result['success'] = response.status_code == endpoint.get('expected_status', 200)
# 응답 검증
if endpoint.get('validate_response'):
validator = endpoint['validate_response']
if validator['type'] == 'json_path':
data = response.json()
# JSON 경로 검증 로직
pass
except requests.exceptions.Timeout:
result['error'] = 'Timeout'
except requests.exceptions.ConnectionError:
result['error'] = 'Connection Error'
except Exception as e:
result['error'] = str(e)
return result
def check_all_endpoints(self) -> List[Dict]:
"""모든 엔드포인트 체크"""
results = []
for endpoint in self.endpoints:
result = self.check_endpoint(endpoint)
results.append(result)
# 실패 시 알림
if not result['success']:
self.send_alert(result)
self.history.extend(results)
self.cleanup_history()
return results
def send_alert(self, result: Dict):
"""알림 발송"""
if self.alert_config.get('email'):
self.send_email_alert(result)
if self.alert_config.get('webhook'):
self.send_webhook_alert(result)
def send_email_alert(self, result: Dict):
"""이메일 알림"""
email_config = self.alert_config['email']
subject = f"API 모니터링 알림: {result['name']} 실패"
body = f"""
API 엔드포인트 모니터링 실패 알림
엔드포인트: {result['endpoint']}
이름: {result['name']}
시간: {result['timestamp']}
상태 코드: {result['status_code']}
오류: {result['error']}
응답 시간: {result['response_time']}ms
"""
msg = MIMEMultipart()
msg['From'] = email_config['from']
msg['To'] = email_config['to']
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
try:
server = smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port'])
server.starttls()
server.login(email_config['username'], email_config['password'])
server.send_message(msg)
server.quit()
except Exception as e:
print(f"이메일 발송 실패: {e}")
def send_webhook_alert(self, result: Dict):
"""웹훅 알림"""
webhook_url = self.alert_config['webhook']['url']
payload = {
'text': f"API 모니터링 알림: {result['name']} 실패",
'attachments': [{
'color': 'danger',
'fields': [
{'title': '엔드포인트', 'value': result['endpoint'], 'short': True},
{'title': '상태 코드', 'value': str(result['status_code']), 'short': True},
{'title': '오류', 'value': result['error'] or 'N/A', 'short': True},
{'title': '시간', 'value': result['timestamp'], 'short': True}
]
}]
}
try:
requests.post(webhook_url, json=payload)
except Exception as e:
print(f"웹훅 발송 실패: {e}")
def get_statistics(self, hours: int = 24) -> Dict:
"""통계 조회"""
cutoff = datetime.now() - timedelta(hours=hours)
recent_history = [
h for h in self.history
if datetime.fromisoformat(h['timestamp']) > cutoff
]
stats = {}
for endpoint in self.endpoints:
endpoint_history = [
h for h in recent_history
if h['endpoint'] == endpoint['url']
]
if endpoint_history:
success_count = sum(1 for h in endpoint_history if h['success'])
total_count = len(endpoint_history)
response_times = [
h['response_time'] for h in endpoint_history
if h['response_time'] is not None
]
stats[endpoint['name']] = {
'uptime': (success_count / total_count) * 100,
'avg_response_time': sum(response_times) / len(response_times) if response_times else 0,
'min_response_time': min(response_times) if response_times else 0,
'max_response_time': max(response_times) if response_times else 0,
'total_checks': total_count,
'failures': total_count - success_count
}
return stats
def cleanup_history(self):
"""오래된 기록 정리"""
cutoff = datetime.now() - timedelta(days=7)
self.history = [
h for h in self.history
if datetime.fromisoformat(h['timestamp']) > cutoff
]
def run(self):
"""모니터링 실행"""
print(f"API 모니터링 시작 (체크 간격: {self.check_interval}초)")
while True:
try:
results = self.check_all_endpoints()
# 결과 출력
for result in results:
status = "✓" if result['success'] else "✗"
print(f"{status} {result['name']}: {result['status_code']} ({result['response_time']:.0f}ms)")
# 통계 출력
stats = self.get_statistics(hours=1)
for name, stat in stats.items():
print(f" {name}: {stat['uptime']:.1f}% uptime, {stat['avg_response_time']:.0f}ms avg")
print("-" * 50)
except KeyboardInterrupt:
print("\n모니터링 종료")
break
except Exception as e:
print(f"모니터링 오류: {e}")
time.sleep(self.check_interval)
# 설정 파일 예시 (monitor_config.json)
"""
{
"check_interval": 300,
"endpoints": [
{
"name": "GitHub API",
"url": "https://api.github.com",
"method": "GET",
"expected_status": 200,
"timeout": 10
},
{
"name": "JSONPlaceholder",
"url": "https://jsonplaceholder.typicode.com/posts/1",
"method": "GET",
"expected_status": 200,
"validate_response": {
"type": "json_path",
"path": "$.userId",
"expected": 1
}
}
],
"alerts": {
"email": {
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"from": "monitor@example.com",
"to": "admin@example.com",
"username": "monitor@example.com",
"password": "app-password"
},
"webhook": {
"url": "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
}
}
}
"""
⚠️ 초보자들이 자주 하는 실수
웹 스크래핑과 API 사용 시 초보자들이 자주 하는 실수들과 해결 방법을 알아봅시다:
1. robots.txt 무시하기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# ❌ 잘못된 예: robots.txt 확인 없이 무작정 스크래핑
def bad_scraping():
"""robots.txt 무시하고 스크래핑"""
url = "https://example.com/data"
response = requests.get(url)
# 바로 스크래핑 시작 - 문제!
# ✅ 올바른 예: robots.txt 확인 후 스크래핑
import urllib.robotparser
def good_scraping():
"""robots.txt 확인 후 스크래핑"""
domain = "https://example.com"
# robots.txt 확인
rp = urllib.robotparser.RobotFileParser()
rp.set_url(f"{domain}/robots.txt")
rp.read()
# 접근 가능 여부 확인
if rp.can_fetch("*", f"{domain}/data"):
response = requests.get(f"{domain}/data")
print("스크래핑 허용됨")
else:
print("이 페이지는 스크래핑이 금지되어 있습니다")
2. User-Agent 헤더 누락
1
2
3
4
5
6
7
8
9
10
11
12
13
# ❌ 잘못된 예: User-Agent 없이 요청
def bad_request():
"""User-Agent 없는 요청"""
response = requests.get("https://example.com")
# 많은 사이트에서 차단될 수 있음
# ✅ 올바른 예: 적절한 User-Agent 설정
def good_request():
"""User-Agent 포함한 요청"""
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get("https://example.com", headers=headers)
3. 너무 빠른 요청 속도
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# ❌ 잘못된 예: 딜레이 없이 연속 요청
def bad_rapid_requests():
"""너무 빠른 연속 요청"""
urls = ["https://example.com/page1", "https://example.com/page2"]
for url in urls:
response = requests.get(url) # 즉시 요청 - IP 차단 위험!
process_data(response)
# ✅ 올바른 예: 적절한 딜레이 추가
import time
import random
def good_polite_requests():
"""정중한 요청 간격"""
urls = ["https://example.com/page1", "https://example.com/page2"]
for url in urls:
response = requests.get(url)
process_data(response)
# 1~3초 랜덤 대기
time.sleep(random.uniform(1, 3))
4. 에러 처리 부재
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# ❌ 잘못된 예: 에러 처리 없음
def bad_error_handling():
"""에러 처리 없는 스크래핑"""
response = requests.get("https://example.com")
soup = BeautifulSoup(response.content, 'html.parser')
title = soup.find('h1').text # NoneType 에러 가능!
# ✅ 올바른 예: 철저한 에러 처리
def good_error_handling():
"""에러 처리가 있는 스크래핑"""
try:
response = requests.get("https://example.com", timeout=10)
response.raise_for_status() # HTTP 에러 체크
soup = BeautifulSoup(response.content, 'html.parser')
title_element = soup.find('h1')
if title_element:
title = title_element.text
else:
title = "제목 없음"
except requests.exceptions.Timeout:
print("요청 시간 초과")
except requests.exceptions.RequestException as e:
print(f"요청 실패: {e}")
except Exception as e:
print(f"예상치 못한 에러: {e}")
5. API 키 하드코딩
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# ❌ 잘못된 예: API 키를 코드에 직접 입력
def bad_api_key():
"""API 키 하드코딩"""
API_KEY = "sk-1234567890abcdef" # 절대 이렇게 하지 마세요!
headers = {'Authorization': f'Bearer {API_KEY}'}
# ✅ 올바른 예: 환경 변수 사용
import os
from dotenv import load_dotenv
def good_api_key():
"""환경 변수에서 API 키 로드"""
load_dotenv() # .env 파일 로드
API_KEY = os.getenv('API_KEY')
if not API_KEY:
raise ValueError("API_KEY 환경 변수가 설정되지 않았습니다")
headers = {'Authorization': f'Bearer {API_KEY}'}
6. 세션 재사용하지 않기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# ❌ 잘못된 예: 매번 새 연결 생성
def bad_no_session():
"""세션 재사용 안 함"""
for i in range(10):
response = requests.get("https://api.example.com/data")
# 매번 새로운 TCP 연결 - 비효율적!
# ✅ 올바른 예: 세션 재사용
def good_with_session():
"""세션 재사용으로 성능 향상"""
session = requests.Session()
for i in range(10):
response = session.get("https://api.example.com/data")
# 연결 재사용 - 효율적!
session.close()
7. 동적 콘텐츠를 BeautifulSoup으로 파싱
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# ❌ 잘못된 예: JavaScript 렌더링 페이지를 BeautifulSoup으로 파싱
def bad_dynamic_parsing():
"""동적 콘텐츠 파싱 실패"""
response = requests.get("https://example.com/spa")
soup = BeautifulSoup(response.content, 'html.parser')
# JavaScript로 생성된 콘텐츠는 보이지 않음!
# ✅ 올바른 예: Selenium 사용
from selenium import webdriver
def good_dynamic_parsing():
"""동적 콘텐츠 파싱 성공"""
driver = webdriver.Chrome()
driver.get("https://example.com/spa")
# JavaScript 실행 대기
driver.implicitly_wait(10)
# 이제 동적 콘텐츠도 파싱 가능
content = driver.page_source
soup = BeautifulSoup(content, 'html.parser')
driver.quit()
8. Rate Limiting 무시
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# ❌ 잘못된 예: Rate limit 무시
def bad_ignore_rate_limit():
"""Rate limit 무시"""
while True:
response = requests.get("https://api.example.com/data")
if response.status_code == 429:
print("Rate limit 도달했지만 계속 요청...")
# 계속 요청 - API 차단 위험!
# ✅ 올바른 예: Rate limit 준수
def good_respect_rate_limit():
"""Rate limit 준수"""
while True:
response = requests.get("https://api.example.com/data")
if response.status_code == 429:
# Retry-After 헤더 확인
retry_after = int(response.headers.get('Retry-After', 60))
print(f"{retry_after}초 후 재시도...")
time.sleep(retry_after)
else:
break
🎯 핵심 정리
웹 스크래핑 Best Practices
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 1. robots.txt 확인
def check_robots_txt(domain):
"""robots.txt 확인"""
import urllib.robotparser
rp = urllib.robotparser.RobotFileParser()
rp.set_url(f"{domain}/robots.txt")
rp.read()
return rp.can_fetch("*", domain)
# 2. 요청 간격 조절
import time
import random
def polite_request(url, min_delay=1, max_delay=3):
"""정중한 요청"""
time.sleep(random.uniform(min_delay, max_delay))
return requests.get(url)
# 3. 사용자 에이전트 설정
headers = {
'User-Agent': 'Mozilla/5.0 (compatible; YourBot/1.0; +http://yoursite.com/bot)'
}
# 4. 오류 처리
def safe_scrape(url, retries=3):
"""안전한 스크래핑"""
for i in range(retries):
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response
except Exception as e:
if i == retries - 1:
raise
time.sleep(2 ** i) # 지수 백오프
API 사용 가이드
graph TD
A[API 사용 시작] --> B{인증 필요?}
B -->|예| C[API 키 발급]
B -->|아니오| D[직접 사용]
C --> E[헤더/파라미터 설정]
E --> F[요청 보내기]
D --> F
F --> G{응답 확인}
G -->|성공| H[데이터 처리]
G -->|실패| I[오류 처리]
I --> J{재시도?}
J -->|예| F
J -->|아니오| K[종료]
🎓 파이썬 마스터하기 시리즈
📚 기초편 (1-7)
- Python 소개와 개발 환경 설정
- 변수, 자료형, 연산자 완벽 정리
- 조건문과 반복문 마스터하기
- 함수와 람다 완벽 가이드
- 리스트, 튜플, 딕셔너리 정복하기
- 문자열 처리와 정규표현식
- 파일 입출력과 예외 처리
🚀 중급편 (8-12)
💼 고급편 (13-16)
- 웹 스크래핑과 API 개발 (현재 글)
- 테스트와 디버깅 전략
- 성능 최적화 기법
- 멀티프로세싱과 병렬 처리
이전글: 데이터베이스 연동하기 ⬅️ 현재글: 웹 스크래핑과 API 개발 다음글: 테스트와 디버깅 전략 ➡️
이번 포스트에서는 Python에서 웹 스크래핑과 API를 활용하는 방법을 완벽히 마스터했습니다. 다음 포스트에서는 테스트와 디버깅 전략에 대해 자세히 알아보겠습니다. Happy Coding! 🐍✨
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.