1GB 로그 파일에서 re.findall(r'\w+@\w+\.\w+', line) → 이메일만 쏙! 😊
서버 로그에서 에러만 찾기, 100만 줄 파일도 메모리 걱정 없이 처리, 폴더 안 1000개 파일 검색… 실무 파일 처리 기술을 마스터합니다!
(35-45분 완독 ⭐⭐⭐)
🎯 오늘의 학습 목표
⭐⭐⭐ (35-45분 완독)
📚 사전 지식
🎯 학습 목표 1: 정규표현식으로 파일 처리하기
정규표현식(Regular Expression)이란?
정규표현식은 문자열에서 특정 패턴을 찾는 강력한 도구입니다. 처음에는 복잡해 보이지만, 기본 문법만 알면 매우 유용합니다!
기본 패턴 문자:
| 패턴 | 의미 | 예시 |
\d | 숫자 한 개 (0-9) | \d\d\d → “123” |
\s | 공백 문자 | hello\sworld → “hello world” |
+ | 앞 패턴이 1개 이상 | \d+ → “123”, “4567” |
* | 앞 패턴이 0개 이상 | ab* → “a”, “ab”, “abbb” |
[] | 문자 집합 | [a-z] → 소문자 하나 |
{n} | 정확히 n개 반복 | \d{3} → “010” |
{n,m} | n개 이상 m개 이하 | \d{2,4} → “12”, “123”, “1234” |
r'' (Raw String) 문법:
1
2
3
| # r''은 백슬래시(\)를 그대로 사용합니다
pattern = r'\d+' # ✅ 권장: Raw String
pattern = '\\d+' # 같은 의미지만 복잡함
|
💡 팁: 정규표현식 패턴은 항상 r'' 형태로 작성하세요!
re 모듈 기초
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| import re
# 정규표현식 패턴
email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
phone_pattern = r'\d{3}-\d{4}-\d{4}'
url_pattern = r'https?://[^\s]+'
text = """
연락처: alice@example.com, 010-1234-5678
웹사이트: https://example.com
"""
# 이메일 찾기
emails = re.findall(email_pattern, text)
print(f"이메일: {emails}")
# 전화번호 찾기
phones = re.findall(phone_pattern, text)
print(f"전화번호: {phones}")
# URL 찾기
urls = re.findall(url_pattern, text)
print(f"URL: {urls}")
|
파일에서 패턴 추출
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| def extract_emails(filename):
"""파일에서 모든 이메일 주소 추출"""
email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
emails = []
with open(filename, 'r', encoding='utf-8') as f:
for line in f:
found = re.findall(email_pattern, line)
emails.extend(found)
# 중복 제거
return list(set(emails))
# 사용 예제
# emails = extract_emails("contacts.txt")
# for email in emails:
# print(email)
|
로그 파일 파싱
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| import re
from datetime import datetime
def parse_log_file(filename):
"""Apache 스타일 로그 파싱"""
# 로그 패턴: IP - - [날짜] "요청" 상태코드 크기
pattern = r'(\d+\.\d+\.\d+\.\d+).*\[(.*?)\].*"(.*?)".*(\d{3})'
logs = []
with open(filename, 'r') as f:
for line in f:
match = re.search(pattern, line)
if match:
ip, timestamp, request, status = match.groups()
logs.append({
'ip': ip,
'timestamp': timestamp,
'request': request,
'status': int(status)
})
return logs
# 사용 예제
# logs = parse_log_file("access.log")
#
# # 404 에러만 필터링
# errors_404 = [log for log in logs if log['status'] == 404]
# print(f"404 에러: {len(errors_404)}건")
|
텍스트 치환
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| def clean_text_file(input_file, output_file):
"""텍스트 파일 정리 (공백, 특수문자 등)"""
with open(input_file, 'r', encoding='utf-8') as fin, \
open(output_file, 'w', encoding='utf-8') as fout:
for line in fin:
# 여러 공백을 하나로
line = re.sub(r'\s+', ' ', line)
# 앞뒤 공백 제거
line = line.strip()
# 빈 줄 제외
if line:
fout.write(line + '\n')
# 사용
# clean_text_file("messy.txt", "clean.txt")
|
🎯 학습 목표 2: 대용량 파일 효율적으로 처리하기
메모리 효율적인 읽기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| def count_lines_efficient(filename):
"""대용량 파일의 줄 수 세기 (메모리 효율적)"""
count = 0
with open(filename, 'r', encoding='utf-8') as f:
for line in f:
count += 1
return count
# 비교: 비효율적인 방법
def count_lines_inefficient(filename):
"""❌ 메모리 비효율적 (전체 파일 로드)"""
with open(filename, 'r') as f:
lines = f.readlines() # 전체를 메모리에!
return len(lines)
# 1GB 파일도 효율적으로 처리
# print(f"줄 수: {count_lines_efficient('huge_file.txt'):,}")
|
청크 단위 처리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| def process_large_file(filename, chunk_size=1024*1024):
"""대용량 파일을 청크 단위로 처리"""
with open(filename, 'r', encoding='utf-8') as f:
while True:
chunk = f.read(chunk_size) # 1MB씩 읽기
if not chunk:
break
# 청크 처리
process_chunk(chunk)
def process_chunk(chunk):
"""청크 데이터 처리 예제"""
# 예: 특정 단어 개수 세기
word_count = chunk.lower().count('python')
print(f"이번 청크에서 'python': {word_count}개")
|
스트리밍 방식 처리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| def stream_process(input_file, output_file, transform_func):
"""스트리밍 방식으로 파일 변환"""
with open(input_file, 'r', encoding='utf-8') as fin, \
open(output_file, 'w', encoding='utf-8') as fout:
for line in fin:
# 각 줄을 변환
transformed = transform_func(line)
fout.write(transformed)
# 사용 예제: 모든 텍스트를 대문자로
def to_upper(line):
return line.upper()
# stream_process("input.txt", "output.txt", to_upper)
|
실전: 대용량 로그 분석
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| from collections import defaultdict # 기본값이 있는 딕셔너리
def analyze_large_log(filename):
"""대용량 로그 파일 분석 (스트리밍)"""
# defaultdict(int)는 존재하지 않는 키에 접근하면 자동으로 0을 생성
# 일반 딕셔너리와 달리 KeyError가 발생하지 않아 편리합니다
stats = {
'total_lines': 0,
'status_codes': defaultdict(int), # {'200': 0, '404': 0, ...} 자동 생성
'top_ips': defaultdict(int) # {'192.168.1.1': 0, ...} 자동 생성
}
# 정규표현식 패턴
pattern = r'(\d+\.\d+\.\d+\.\d+).*".*"\s+(\d{3})'
with open(filename, 'r') as f:
for line in f:
stats['total_lines'] += 1
match = re.search(pattern, line)
if match:
ip, status = match.groups()
stats['status_codes'][status] += 1
stats['top_ips'][ip] += 1
# 진행 상황 출력 (10만 줄마다)
if stats['total_lines'] % 100000 == 0:
print(f"처리 중... {stats['total_lines']:,}줄")
# 상위 10개 IP
top_10_ips = sorted(stats['top_ips'].items(),
key=lambda x: x[1],
reverse=True)[:10]
print(f"\n총 {stats['total_lines']:,}줄 분석 완료")
print("\n상태 코드 분포:")
for code, count in sorted(stats['status_codes'].items()):
print(f" {code}: {count:,}회")
print("\n상위 10개 IP:")
for ip, count in top_10_ips:
print(f" {ip}: {count:,}회")
# 사용
# analyze_large_log("access.log")
|
🎯 학습 목표 3: 파일 검색과 필터링하기
grep 구현 (텍스트 검색)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| def grep(pattern, filename, ignore_case=False, line_numbers=False):
"""Unix grep 명령어 구현"""
flags = re.IGNORECASE if ignore_case else 0
compiled_pattern = re.compile(pattern, flags)
matches = []
with open(filename, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
if compiled_pattern.search(line):
if line_numbers:
matches.append((line_num, line.strip()))
else:
matches.append(line.strip())
return matches
# 사용 예제
pattern = r'error'
matches = grep(pattern, 'app.log', ignore_case=True, line_numbers=True)
print(f"'{pattern}' 발견: {len(matches)}건")
for line_num, line in matches[:10]: # 상위 10개만
print(f"{line_num:4d}: {line}")
|
여러 파일에서 검색
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| import os
def search_in_files(pattern, directory, extension='.txt'):
"""디렉토리의 모든 파일에서 패턴 검색"""
results = {}
for filename in os.listdir(directory):
if filename.endswith(extension):
filepath = os.path.join(directory, filename)
matches = grep(pattern, filepath, line_numbers=True)
if matches:
results[filename] = matches
return results
# 사용
# results = search_in_files(r'TODO', './src', extension='.py')
#
# for filename, matches in results.items():
# print(f"\n📄 {filename}: {len(matches)}건")
# for line_num, line in matches[:3]:
# print(f" {line_num}: {line}")
|
조건부 필터링
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| def filter_file(input_file, output_file, condition):
"""조건에 맞는 줄만 추출"""
with open(input_file, 'r', encoding='utf-8') as fin, \
open(output_file, 'w', encoding='utf-8') as fout:
for line in fin:
if condition(line):
fout.write(line)
# 사용 예제 1: ERROR 포함 줄만
# filter_file('app.log', 'errors.log', lambda line: 'ERROR' in line)
# 사용 예제 2: 빈 줄 제외
# filter_file('messy.txt', 'clean.txt', lambda line: line.strip())
# 사용 예제 3: 특정 날짜만
# filter_file('access.log', '2025-04-11.log',
# lambda line: '2025-04-11' in line)
|
🎯 학습 목표 4: 실전 텍스트 변환 패턴 익히기
대소문자 변환
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| def convert_case(input_file, output_file, mode='upper'):
"""대소문자 변환"""
with open(input_file, 'r', encoding='utf-8') as fin, \
open(output_file, 'w', encoding='utf-8') as fout:
for line in fin:
if mode == 'upper':
fout.write(line.upper())
elif mode == 'lower':
fout.write(line.lower())
elif mode == 'title':
fout.write(line.title())
elif mode == 'capitalize':
fout.write(line.capitalize())
# convert_case('input.txt', 'output.txt', mode='upper')
|
문자 치환
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| def replace_in_file(input_file, output_file, replacements):
"""파일에서 여러 문자열 동시 치환"""
with open(input_file, 'r', encoding='utf-8') as fin, \
open(output_file, 'w', encoding='utf-8') as fout:
for line in fin:
# 모든 치환 규칙 적용
for old, new in replacements.items():
line = line.replace(old, new)
fout.write(line)
# 사용 예제
replacements = {
'colour': 'color',
'favourite': 'favorite',
'centre': 'center'
}
# replace_in_file('british.txt', 'american.txt', replacements)
|
정규표현식 치환
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| def regex_replace(input_file, output_file, pattern, replacement):
"""정규표현식 기반 치환"""
compiled = re.compile(pattern)
with open(input_file, 'r', encoding='utf-8') as fin, \
open(output_file, 'w', encoding='utf-8') as fout:
for line in fin:
new_line = compiled.sub(replacement, line)
fout.write(new_line)
# 사용 예제: 이메일 마스킹
# pattern = r'([a-zA-Z0-9._%+-]+)@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})'
# replacement = r'\1@***'
# regex_replace('contacts.txt', 'masked.txt', pattern, replacement)
|
줄 정렬
1
2
3
4
5
6
7
8
9
10
11
12
| def sort_file_lines(input_file, output_file, reverse=False):
"""파일의 줄을 정렬"""
with open(input_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
# 정렬
sorted_lines = sorted(lines, reverse=reverse)
with open(output_file, 'w', encoding='utf-8') as f:
f.writelines(sorted_lines)
# sort_file_lines('unsorted.txt', 'sorted.txt')
|
🎯 학습 목표 5: 실전 예제와 활용
예제 1: 코드 통계 분석기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
| import os
def analyze_code_directory(directory, extensions=('.py', '.js', '.java')):
"""코드 디렉토리 통계 분석"""
stats = {
'total_files': 0,
'total_lines': 0,
'total_code_lines': 0,
'total_comment_lines': 0,
'total_blank_lines': 0,
'files_by_ext': {}
}
for root, dirs, files in os.walk(directory):
for filename in files:
if filename.endswith(extensions):
filepath = os.path.join(root, filename)
ext = os.path.splitext(filename)[1]
# 파일 분석
file_stats = analyze_code_file(filepath)
# 통계 업데이트
stats['total_files'] += 1
stats['total_lines'] += file_stats['total']
stats['total_code_lines'] += file_stats['code']
stats['total_comment_lines'] += file_stats['comment']
stats['total_blank_lines'] += file_stats['blank']
if ext not in stats['files_by_ext']:
stats['files_by_ext'][ext] = 0
stats['files_by_ext'][ext] += 1
# 결과 출력
print("\n📊 코드 통계")
print("=" * 60)
print(f"총 파일: {stats['total_files']}개")
print(f"총 줄 수: {stats['total_lines']:,}줄")
print(f" • 코드: {stats['total_code_lines']:,}줄")
print(f" • 주석: {stats['total_comment_lines']:,}줄")
print(f" • 빈 줄: {stats['total_blank_lines']:,}줄")
print("\n확장자별:")
for ext, count in stats['files_by_ext'].items():
print(f" {ext}: {count}개")
return stats
def analyze_code_file(filepath):
"""단일 코드 파일 분석"""
stats = {'total': 0, 'code': 0, 'comment': 0, 'blank': 0}
try:
with open(filepath, 'r', encoding='utf-8') as f:
for line in f:
stats['total'] += 1
stripped = line.strip()
if not stripped:
stats['blank'] += 1
elif stripped.startswith('#') or stripped.startswith('//'):
stats['comment'] += 1
else:
stats['code'] += 1
except Exception as e:
print(f"⚠️ {filepath} 분석 실패: {e}")
return stats
# 사용
# analyze_code_directory('./my_project')
|
예제 2: 마크다운 목차 생성기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| def generate_toc(markdown_file):
"""마크다운 파일에서 목차(TOC) 생성"""
toc = []
heading_pattern = r'^(#{1,6})\s+(.+)$'
with open(markdown_file, 'r', encoding='utf-8') as f:
for line in f:
match = re.match(heading_pattern, line)
if match:
level = len(match.group(1)) # # 개수
title = match.group(2).strip()
# 앵커 생성 (공백 -> 하이픈, 소문자)
anchor = title.lower().replace(' ', '-')
anchor = re.sub(r'[^\w-]', '', anchor)
# 들여쓰기
indent = ' ' * (level - 1)
toc.append(f"{indent}- [{title}](#{anchor})")
return '\n'.join(toc)
# 사용 예제
# toc = generate_toc('README.md')
# print("## 목차\n")
# print(toc)
|
예제 3: CSV to Markdown 테이블 변환
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| def csv_to_markdown(csv_file, output_file):
"""CSV 파일을 마크다운 테이블로 변환"""
with open(csv_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
if not lines:
return
# 헤더
header = lines[0].strip().split(',')
# 마크다운 테이블 생성
with open(output_file, 'w', encoding='utf-8') as f:
# 헤더 행
f.write('| ' + ' | '.join(header) + ' |\n')
# 구분선
f.write('|' + '---|' * len(header) + '\n')
# 데이터 행
for line in lines[1:]:
row = line.strip().split(',')
f.write('| ' + ' | '.join(row) + ' |\n')
# 사용
# csv_to_markdown('data.csv', 'table.md')
|
💡 실전 팁 & 주의사항
Tip 1: 대용량 파일은 스트리밍으로 처리하세요
1
2
3
4
5
6
7
8
9
10
| # ❌ 나쁜 예: 전체를 메모리에 로드
with open("huge.log", "r") as f:
content = f.read() # 메모리 부족 위험!
for line in content.splitlines():
process(line)
# ✅ 좋은 예: 한 줄씩 스트리밍
with open("huge.log", "r") as f:
for line in f: # 메모리 효율적!
process(line.strip())
|
이유: 대용량 파일을 한 번에 읽으면 메모리가 부족할 수 있습니다.
Tip 2: 정규표현식은 컴파일해서 재사용하세요
1
2
3
4
5
6
7
8
9
10
| # ❌ 나쁜 예: 매번 컴파일
for line in file:
if re.search(r'\d{3}-\d{4}', line): # 매번 컴파일!
process(line)
# ✅ 좋은 예: 한 번만 컴파일
pattern = re.compile(r'\d{3}-\d{4}')
for line in file:
if pattern.search(line): # 빠름!
process(line)
|
이유: 정규표현식 컴파일은 비용이 크므로 재사용하면 성능이 향상됩니다.
Tip 3: 파일 포인터 사용 시 바이너리 모드 활용하세요
1
2
3
4
5
6
7
8
| # ❌ 텍스트 모드에서 seek은 제한적
with open("data.txt", "r") as f:
f.seek(100) # 정확하지 않을 수 있음 (인코딩 문제)
# ✅ 바이너리 모드에서 정확한 seek
with open("data.txt", "rb") as f:
f.seek(100) # 정확한 바이트 위치
data = f.read()
|
이유: 텍스트 모드에서는 인코딩 때문에 바이트 위치가 정확하지 않을 수 있습니다.
🧪 연습 문제
문제 1: 중복 줄 제거
파일에서 중복된 줄을 제거하는 함수를 작성하세요. (순서 유지)
해답 보기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| def remove_duplicates(input_file, output_file):
"""중복 줄 제거 (순서 유지)"""
seen = set()
with open(input_file, 'r', encoding='utf-8') as fin, \
open(output_file, 'w', encoding='utf-8') as fout:
for line in fin:
if line not in seen:
seen.add(line)
fout.write(line)
# 테스트
remove_duplicates('with_duplicates.txt', 'unique.txt')
|
문제 2: 단어 빈도 Top 10
파일에서 가장 많이 나온 단어 10개를 찾으세요.
해답 보기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| from collections import Counter
import re
def top_words(filename, n=10):
"""가장 많이 나온 단어 N개"""
word_count = Counter()
with open(filename, 'r', encoding='utf-8') as f:
for line in f:
# 단어 추출 (알파벳만)
words = re.findall(r'\b[a-zA-Z]+\b', line.lower())
word_count.update(words)
print(f"상위 {n}개 단어:")
for word, count in word_count.most_common(n):
print(f" {word:15s}: {count:,}회")
# 테스트
top_words('article.txt')
|
문제 3: 파일 역순 출력
파일의 내용을 역순으로 출력하는 함수를 작성하세요. (대용량 파일 고려)
해답 보기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| def reverse_file(input_file, output_file):
"""파일 줄 순서 역순"""
# 작은 파일
with open(input_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
with open(output_file, 'w', encoding='utf-8') as f:
f.writelines(reversed(lines))
# 대용량 파일용 (메모리 효율적)
def reverse_large_file(input_file, output_file):
"""대용량 파일 역순"""
import tempfile
# 1. 임시 파일에 위치 인덱스 저장
positions = []
with open(input_file, 'rb') as f:
positions.append(f.tell())
for line in f:
positions.append(f.tell())
# 2. 역순으로 읽어서 쓰기
with open(input_file, 'rb') as fin, \
open(output_file, 'wb') as fout:
for i in range(len(positions) - 2, -1, -1):
fin.seek(positions[i])
line = fin.readline()
fout.write(line)
|
📝 오늘 배운 내용 정리
| 개념 | 설명 | 예시 |
| 정규표현식 | 패턴 매칭과 치환 | re.findall(pattern, text) |
| 스트리밍 처리 | 대용량 파일 메모리 효율 | for line in f: |
| 파일 포인터 | 파일 내 위치 이동 | f.seek(position) |
| 청크 처리 | 파일을 부분별로 읽기 | f.read(chunk_size) |
| 파일 비교 | 두 파일의 차이 확인 | difflib.unified_diff() |
핵심 코드 패턴
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| # 대용량 파일 스트리밍 처리
with open("large.txt", "r", encoding="utf-8") as f:
for line in f:
process(line.strip())
# 정규표현식 활용
import re
pattern = re.compile(r'\d{3}-\d{4}')
matches = pattern.findall(text)
# 청크 단위 읽기
with open("file.bin", "rb") as f:
while True:
chunk = f.read(8192)
if not chunk:
break
process(chunk)
|
🔗 관련 자료
📚 이전 학습
Day 41: 파일 입출력 기초 ⭐⭐⭐
어제는 파일을 열고, 읽고, 쓰는 기본 방법과 with문을 배웠습니다!
📚 다음 학습
Day 43: JSON 데이터 처리 ⭐⭐⭐
내일은 JSON 파일 읽기/쓰기, 데이터 변환, 중첩 JSON 처리를 배웁니다!
“늦었다고 생각할 때가 가장 빠른 시기입니다!” 🚀
| Day 42/100 | Phase 5: 파일 처리와 예외 처리 | #100DaysOfPython |