포스트

[이제와서 시작하는 Python 마스터하기 #7] 파일 입출력과 예외 처리

[이제와서 시작하는 Python 마스터하기 #7] 파일 입출력과 예외 처리

📁 파일 입출력 (File I/O)

파일 입출력은 프로그램이 데이터를 영구적으로 저장하고 읽을 수 있게 해주는 중요한 기능입니다.

파일 열기와 닫기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 기본적인 파일 열기
file = open("example.txt", "r")  # 읽기 모드
content = file.read()
file.close()  # 반드시 닫아야 함!

# with 문 사용 (권장) - 자동으로 파일을 닫음
with open("example.txt", "r") as file:
    content = file.read()
# 여기서 파일이 자동으로 닫힘

> [!TIP]
> **초보자를 위한 : `with` 문을  쓰세요!**
>
> 파일을 열고 나서 `close()` 까먹는 실수는 프로그래머라면 누구나  번쯤 합니다.
> `with` 문을 사용하면 블록이 끝날  **자동으로 파일을 닫아줍니다**.
> 코드가  깔끔하고 안전해지니, 파일 다룰  무조건 `with` 쓰세요!
# 파일 모드
# "r" : 읽기 (기본값)
# "w" : 쓰기 (파일이 있으면 덮어씀)
# "a" : 추가 (파일 끝에 추가)
# "x" : 배타적 생성 (파일이 있으면 에러)
# "b" : 바이너리 모드 (예: "rb", "wb")
# "t" : 텍스트 모드 (기본값)
# "+" : 읽기와 쓰기 (예: "r+", "w+")

# 인코딩 지정 (한글 파일 처리 시 중요!)
with open("korean.txt", "r", encoding="utf-8") as file:
    content = file.read()

[!WARNING] 한글이 깨진다면? 인코딩을 확인하세요!

Windows에서는 기본 인코딩이 cp949인 경우가 많고, Mac/Linux는 utf-8입니다. 한글이 포함된 텍스트 파일을 다룰 때는 encoding="utf-8"을 명시해주는 것이 정신건강에 좋습니다.

텍스트 파일 읽기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 전체 내용 읽기
with open("data.txt", "r", encoding="utf-8") as file:
    content = file.read()  # 전체 내용을 문자열로
    print(content)

# 한 줄씩 읽기
with open("data.txt", "r", encoding="utf-8") as file:
    line = file.readline()  # 한 줄 읽기
    while line:
        print(line.strip())  # 줄바꿈 제거
        line = file.readline()

# 모든 줄을 리스트로 읽기
with open("data.txt", "r", encoding="utf-8") as file:
    lines = file.readlines()  # ['line1\n', 'line2\n', ...]
    for line in lines:
        print(line.strip())

# 반복자로 읽기 (메모리 효율적)
with open("large_file.txt", "r", encoding="utf-8") as file:
    for line in file:  # 파일 객체는 반복 가능
        process_line(line.strip())

# 특정 바이트 수만 읽기
with open("data.txt", "r", encoding="utf-8") as file:
    chunk = file.read(100)  # 100자 읽기
    print(chunk)

텍스트 파일 쓰기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 파일에 쓰기 (덮어쓰기)
with open("output.txt", "w", encoding="utf-8") as file:
    file.write("Hello, Python!\n")
    file.write("파일 입출력 테스트\n")

# 여러 줄 한 번에 쓰기
lines = ["첫 번째 줄\n", "두 번째 줄\n", "세 번째 줄\n"]
with open("output.txt", "w", encoding="utf-8") as file:
    file.writelines(lines)

# 파일에 추가하기
with open("log.txt", "a", encoding="utf-8") as file:
    file.write(f"[{datetime.now()}] 로그 메시지\n")

# print() 함수로 파일에 쓰기
with open("output.txt", "w", encoding="utf-8") as file:
    print("Hello, World!", file=file)
    print("Python", "File", "I/O", sep=", ", file=file)

바이너리 파일 처리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 바이너리 파일 읽기
with open("image.jpg", "rb") as file:
    binary_data = file.read()
    print(f"파일 크기: {len(binary_data)} bytes")

# 바이너리 파일 쓰기
with open("copy.jpg", "wb") as file:
    file.write(binary_data)

# 파일 복사 함수
def copy_file(source, destination):
    """파일을 복사하는 함수"""
    with open(source, "rb") as src:
        with open(destination, "wb") as dst:
            # 청크 단위로 복사 (메모리 효율적)
            chunk_size = 4096
            while True:
                chunk = src.read(chunk_size)
                if not chunk:
                    break
                dst.write(chunk)

# 사용 예
copy_file("original.pdf", "backup.pdf")

파일 위치 제어

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
with open("data.txt", "r") as file:
    # 현재 위치 확인
    position = file.tell()
    print(f"현재 위치: {position}")
    
    # 처음 10바이트 읽기
    data = file.read(10)
    print(f"읽은 데이터: {data}")
    print(f"현재 위치: {file.tell()}")
    
    # 파일 처음으로 이동
    file.seek(0)
    print(f"이동 후 위치: {file.tell()}")
    
    # 파일 끝으로 이동
    file.seek(0, 2)  # 2는 파일 끝 기준
    file_size = file.tell()
    print(f"파일 크기: {file_size} bytes")
    
    # 상대 위치로 이동
    file.seek(-10, 2)  # 파일 끝에서 10바이트 전

🚨 예외 처리 (Exception Handling)

예외 처리는 프로그램 실행 중 발생할 수 있는 오류를 우아하게 처리하는 방법입니다.

기본 예외 처리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# try-except 기본 구조
try:
    # 예외가 발생할 수 있는 코드
    result = 10 / 0
except ZeroDivisionError:
    # 예외 발생 시 실행되는 코드
    print("0으로 나눌 수 없습니다!")

# 여러 예외 처리
try:
    number = int(input("숫자 입력: "))
    result = 10 / number
    print(f"결과: {result}")
except ValueError:
    print("올바른 숫자를 입력하세요!")
except ZeroDivisionError:
    print("0으로 나눌 수 없습니다!")

# 여러 예외를 한 번에 처리
try:
    # 위험한 작업
    pass
except (ValueError, TypeError) as e:
    print(f"값 또는 타입 오류: {e}")

# 모든 예외 처리 (권장하지 않음)
try:
    # 위험한 작업
    pass
except Exception as e:
    print(f"예외 발생: {e}")

[!CAUTION] 모든 에러를 퉁치지 마세요!

except Exception: 처럼 모든 예외를 한 번에 잡는 것은 좋지 않습니다. 정말 예상치 못한 심각한 에러까지 숨겨버려서, 나중에 버그를 찾기 엄청 힘들어질 수 있습니다. 가능한 한 ValueError, FileNotFoundError 처럼 구체적인 예외를 명시해서 처리하세요.

예외 처리 고급 기능

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# else 절: 예외가 발생하지 않았을 때 실행
try:
    file = open("data.txt", "r")
except FileNotFoundError:
    print("파일을 찾을 수 없습니다!")
else:
    content = file.read()
    file.close()
    print("파일을 성공적으로 읽었습니다!")

# finally 절: 항상 실행 (정리 작업용)
file = None
try:
    file = open("data.txt", "r")
    content = file.read()
except FileNotFoundError:
    print("파일을 찾을 수 없습니다!")
finally:
    if file:
        file.close()
        print("파일을 닫았습니다.")

# 예외 정보 얻기
import sys
import traceback

try:
    1 / 0
except Exception as e:
    print(f"예외 타입: {type(e).__name__}")
    print(f"예외 메시지: {str(e)}")
    print(f"예외 인자: {e.args}")
    
    # 상세한 traceback 정보
    exc_type, exc_value, exc_traceback = sys.exc_info()
    traceback.print_exception(exc_type, exc_value, exc_traceback)

예외 발생시키기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# raise로 예외 발생
def validate_age(age):
    if age < 0:
        raise ValueError("나이는 음수일 수 없습니다!")
    if age > 150:
        raise ValueError("나이가 너무 많습니다!")
    return age

try:
    age = validate_age(-5)
except ValueError as e:
    print(f"유효성 검사 실패: {e}")

# 예외 재발생
def process_data(data):
    try:
        # 데이터 처리
        result = risky_operation(data)
    except Exception as e:
        # 로깅 후 예외 재발생
        print(f"처리 중 오류 발생: {e}")
        raise  # 원래 예외를 그대로 재발생

# 예외 연쇄 (Python 3)
try:
    open("nonexistent.txt")
except FileNotFoundError as e:
    raise RuntimeError("설정 파일을 로드할 수 없습니다") from e

사용자 정의 예외

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 사용자 정의 예외 클래스
class CustomError(Exception):
    """기본 사용자 정의 예외"""
    pass

class ValidationError(Exception):
    """유효성 검사 예외"""
    def __init__(self, field, value, message):
        self.field = field
        self.value = value
        self.message = message
        super().__init__(f"{field}: {message}")

class InsufficientFundsError(Exception):
    """잔액 부족 예외"""
    def __init__(self, balance, amount):
        self.balance = balance
        self.amount = amount
        super().__init__(
            f"잔액 부족: 현재 잔액 {balance}원, 요청 금액 {amount}"
        )

# 사용 예제
class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
    
    def withdraw(self, amount):
        if amount <= 0:
            raise ValidationError("amount", amount, "출금액은 양수여야 합니다")
        if amount > self.balance:
            raise InsufficientFundsError(self.balance, amount)
        
        self.balance -= amount
        return self.balance

# 사용
account = BankAccount(10000)
try:
    account.withdraw(15000)
except InsufficientFundsError as e:
    print(e)
    print(f"부족한 금액: {e.amount - e.balance}")

💡 실전 예제

1. 로그 파일 관리 시스템

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import os
import json
from datetime import datetime
from enum import Enum

class LogLevel(Enum):
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"

class LogManager:
    """로그 파일 관리 시스템"""
    
    def __init__(self, log_dir="logs", max_file_size=1024*1024):  # 1MB
        self.log_dir = log_dir
        self.max_file_size = max_file_size
        self.current_date = None
        self.current_file = None
        self._ensure_log_dir()
    
    def _ensure_log_dir(self):
        """로그 디렉토리 생성"""
        if not os.path.exists(self.log_dir):
            os.makedirs(self.log_dir)
    
    def _get_log_filename(self):
        """현재 날짜의 로그 파일명 반환"""
        today = datetime.now().strftime("%Y-%m-%d")
        return os.path.join(self.log_dir, f"app_{today}.log")
    
    def _rotate_if_needed(self):
        """파일 크기 확인 및 로테이션"""
        if self.current_file and os.path.exists(self.current_file):
            size = os.path.getsize(self.current_file)
            if size > self.max_file_size:
                # 파일 로테이션
                timestamp = datetime.now().strftime("%H%M%S")
                base, ext = os.path.splitext(self.current_file)
                new_name = f"{base}_{timestamp}{ext}"
                os.rename(self.current_file, new_name)
    
    def log(self, level, message, extra_data=None):
        """로그 메시지 기록"""
        try:
            # 날짜 변경 확인
            today = datetime.now().date()
            if self.current_date != today:
                self.current_date = today
                self.current_file = self._get_log_filename()
            
            # 파일 크기 확인
            self._rotate_if_needed()
            
            # 로그 엔트리 생성
            log_entry = {
                "timestamp": datetime.now().isoformat(),
                "level": level.value,
                "message": message
            }
            
            if extra_data:
                log_entry["data"] = extra_data
            
            # 파일에 기록
            with open(self.current_file, "a", encoding="utf-8") as f:
                f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
        
        except Exception as e:
            # 로깅 실패 시 콘솔 출력
            print(f"로깅 실패: {e}")
    
    def debug(self, message, **kwargs):
        self.log(LogLevel.DEBUG, message, kwargs)
    
    def info(self, message, **kwargs):
        self.log(LogLevel.INFO, message, kwargs)
    
    def warning(self, message, **kwargs):
        self.log(LogLevel.WARNING, message, kwargs)
    
    def error(self, message, **kwargs):
        self.log(LogLevel.ERROR, message, kwargs)
    
    def critical(self, message, **kwargs):
        self.log(LogLevel.CRITICAL, message, kwargs)
    
    def read_logs(self, date=None, level=None):
        """로그 파일 읽기"""
        if date is None:
            date = datetime.now().strftime("%Y-%m-%d")
        
        log_file = os.path.join(self.log_dir, f"app_{date}.log")
        
        if not os.path.exists(log_file):
            return []
        
        logs = []
        try:
            with open(log_file, "r", encoding="utf-8") as f:
                for line in f:
                    try:
                        log_entry = json.loads(line.strip())
                        if level is None or log_entry.get("level") == level.value:
                            logs.append(log_entry)
                    except json.JSONDecodeError:
                        continue
        except Exception as e:
            self.error(f"로그 읽기 실패: {e}")
        
        return logs
    
    def get_statistics(self, date=None):
        """로그 통계 반환"""
        logs = self.read_logs(date)
        stats = {level.value: 0 for level in LogLevel}
        
        for log in logs:
            level = log.get("level", "UNKNOWN")
            if level in stats:
                stats[level] += 1
        
        return stats

# 사용 예제
logger = LogManager()

# 로그 기록
logger.info("애플리케이션 시작")
logger.debug("디버그 정보", user_id=123, action="login")

try:
    result = 10 / 0
except ZeroDivisionError as e:
    logger.error("계산 오류 발생", error=str(e), operation="division")

# 로그 읽기
logs = logger.read_logs(level=LogLevel.ERROR)
for log in logs:
    print(f"[{log['timestamp']}] {log['level']}: {log['message']}")

# 통계 확인
stats = logger.get_statistics()
print(f"로그 통계: {stats}")

2. 설정 파일 관리자

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import json
import yaml
import configparser
from pathlib import Path

class ConfigManager:
    """다양한 형식의 설정 파일 관리자"""
    
    def __init__(self, config_dir="config"):
        self.config_dir = Path(config_dir)
        self.config_dir.mkdir(exist_ok=True)
        self.configs = {}
    
    def load_json(self, filename):
        """JSON 설정 파일 로드"""
        filepath = self.config_dir / filename
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            raise FileNotFoundError(f"설정 파일을 찾을 수 없습니다: {filepath}")
        except json.JSONDecodeError as e:
            raise ValueError(f"잘못된 JSON 형식: {e}")
    
    def save_json(self, filename, data, indent=4):
        """JSON 설정 파일 저장"""
        filepath = self.config_dir / filename
        try:
            with open(filepath, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=indent)
        except Exception as e:
            raise IOError(f"설정 파일 저장 실패: {e}")
    
    def load_yaml(self, filename):
        """YAML 설정 파일 로드"""
        filepath = self.config_dir / filename
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                return yaml.safe_load(f)
        except FileNotFoundError:
            raise FileNotFoundError(f"설정 파일을 찾을 수 없습니다: {filepath}")
        except yaml.YAMLError as e:
            raise ValueError(f"잘못된 YAML 형식: {e}")
    
    def save_yaml(self, filename, data):
        """YAML 설정 파일 저장"""
        filepath = self.config_dir / filename
        try:
            with open(filepath, "w", encoding="utf-8") as f:
                yaml.dump(data, f, allow_unicode=True, default_flow_style=False)
        except Exception as e:
            raise IOError(f"설정 파일 저장 실패: {e}")
    
    def load_ini(self, filename):
        """INI 설정 파일 로드"""
        filepath = self.config_dir / filename
        config = configparser.ConfigParser()
        
        try:
            config.read(filepath, encoding="utf-8")
            # ConfigParser를 딕셔너리로 변환
            result = {}
            for section in config.sections():
                result[section] = dict(config[section])
            return result
        except Exception as e:
            raise ValueError(f"INI 파일 로드 실패: {e}")
    
    def save_ini(self, filename, data):
        """INI 설정 파일 저장"""
        filepath = self.config_dir / filename
        config = configparser.ConfigParser()
        
        try:
            # 딕셔너리를 ConfigParser로 변환
            for section, options in data.items():
                config[section] = options
            
            with open(filepath, "w", encoding="utf-8") as f:
                config.write(f)
        except Exception as e:
            raise IOError(f"INI 파일 저장 실패: {e}")
    
    def load_config(self, name, format="json"):
        """설정 로드 (캐싱 포함)"""
        if name in self.configs:
            return self.configs[name]
        
        filename = f"{name}.{format}"
        
        if format == "json":
            config = self.load_json(filename)
        elif format == "yaml" or format == "yml":
            config = self.load_yaml(filename)
        elif format == "ini":
            config = self.load_ini(filename)
        else:
            raise ValueError(f"지원하지 않는 형식: {format}")
        
        self.configs[name] = config
        return config
    
    def save_config(self, name, data, format="json"):
        """설정 저장"""
        filename = f"{name}.{format}"
        
        if format == "json":
            self.save_json(filename, data)
        elif format == "yaml" or format == "yml":
            self.save_yaml(filename, data)
        elif format == "ini":
            self.save_ini(filename, data)
        else:
            raise ValueError(f"지원하지 않는 형식: {format}")
        
        # 캐시 업데이트
        self.configs[name] = data
    
    def get(self, name, key, default=None):
        """설정 값 가져오기 (점 표기법 지원)"""
        config = self.configs.get(name, {})
        
        # 점 표기법 처리 (예: "database.host")
        keys = key.split(".")
        value = config
        
        for k in keys:
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default
        
        return value
    
    def set(self, name, key, value):
        """설정 값 설정 (점 표기법 지원)"""
        if name not in self.configs:
            self.configs[name] = {}
        
        config = self.configs[name]
        keys = key.split(".")
        
        # 중첩된 딕셔너리 생성
        current = config
        for k in keys[:-1]:
            if k not in current:
                current[k] = {}
            current = current[k]
        
        current[keys[-1]] = value

# 사용 예제
config_manager = ConfigManager()

# JSON 설정
app_config = {
    "app": {
        "name": "MyApp",
        "version": "1.0.0"
    },
    "database": {
        "host": "localhost",
        "port": 5432,
        "name": "mydb"
    }
}

config_manager.save_config("app", app_config, "json")

# 설정 로드 및 사용
config = config_manager.load_config("app", "json")
db_host = config_manager.get("app", "database.host")
print(f"데이터베이스 호스트: {db_host}")

# 설정 수정
config_manager.set("app", "database.port", 3306)
config_manager.save_config("app", config_manager.configs["app"], "json")

3. CSV 데이터 처리기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import csv
from collections import defaultdict
from datetime import datetime

class CSVProcessor:
    """CSV 파일 처리 및 분석 도구"""
    
    def __init__(self, encoding="utf-8"):
        self.encoding = encoding
    
    def read_csv(self, filename, delimiter=",", has_header=True):
        """CSV 파일 읽기"""
        data = []
        headers = None
        
        try:
            with open(filename, "r", encoding=self.encoding) as file:
                reader = csv.reader(file, delimiter=delimiter)
                
                if has_header:
                    headers = next(reader)
                
                for row in reader:
                    if headers:
                        # 딕셔너리로 변환
                        data.append(dict(zip(headers, row)))
                    else:
                        data.append(row)
            
            return data, headers
        
        except FileNotFoundError:
            raise FileNotFoundError(f"CSV 파일을 찾을 수 없습니다: {filename}")
        except Exception as e:
            raise IOError(f"CSV 읽기 오류: {e}")
    
    def write_csv(self, filename, data, headers=None, delimiter=","):
        """CSV 파일 쓰기"""
        try:
            with open(filename, "w", encoding=self.encoding, newline="") as file:
                if headers:
                    # DictWriter 사용
                    writer = csv.DictWriter(file, fieldnames=headers, delimiter=delimiter)
                    writer.writeheader()
                    writer.writerows(data)
                else:
                    # 일반 Writer 사용
                    writer = csv.writer(file, delimiter=delimiter)
                    writer.writerows(data)
        
        except Exception as e:
            raise IOError(f"CSV 쓰기 오류: {e}")
    
    def filter_data(self, data, **conditions):
        """조건에 맞는 데이터 필터링"""
        filtered = []
        
        for row in data:
            match = True
            for key, value in conditions.items():
                if key not in row or str(row[key]) != str(value):
                    match = False
                    break
            
            if match:
                filtered.append(row)
        
        return filtered
    
    def aggregate_data(self, data, group_by, aggregate_field, operation="sum"):
        """데이터 집계"""
        groups = defaultdict(list)
        
        # 그룹화
        for row in data:
            key = row.get(group_by)
            value = row.get(aggregate_field)
            
            try:
                value = float(value)
                groups[key].append(value)
            except (ValueError, TypeError):
                continue
        
        # 집계
        results = {}
        for key, values in groups.items():
            if operation == "sum":
                results[key] = sum(values)
            elif operation == "avg":
                results[key] = sum(values) / len(values) if values else 0
            elif operation == "count":
                results[key] = len(values)
            elif operation == "max":
                results[key] = max(values) if values else None
            elif operation == "min":
                results[key] = min(values) if values else None
        
        return results
    
    def convert_types(self, data, type_map):
        """데이터 타입 변환"""
        converted = []
        
        for row in data:
            new_row = row.copy()
            
            for field, type_func in type_map.items():
                if field in new_row:
                    try:
                        new_row[field] = type_func(new_row[field])
                    except (ValueError, TypeError):
                        # 변환 실패 시 원본 유지
                        pass
            
            converted.append(new_row)
        
        return converted
    
    def validate_data(self, data, rules):
        """데이터 유효성 검사"""
        errors = []
        valid_data = []
        
        for i, row in enumerate(data):
            row_errors = []
            
            for field, rule in rules.items():
                if field not in row:
                    row_errors.append(f"필드 누락: {field}")
                    continue
                
                value = row[field]
                
                # 필수 필드 검사
                if rule.get("required") and not value:
                    row_errors.append(f"{field}: 필수 값")
                
                # 타입 검사
                if "type" in rule and value:
                    try:
                        rule["type"](value)
                    except ValueError:
                        row_errors.append(f"{field}: 잘못된 타입")
                
                # 범위 검사
                if "min" in rule:
                    try:
                        if float(value) < rule["min"]:
                            row_errors.append(f"{field}: 최소값 미달")
                    except ValueError:
                        pass
                
                if "max" in rule:
                    try:
                        if float(value) > rule["max"]:
                            row_errors.append(f"{field}: 최대값 초과")
                    except ValueError:
                        pass
            
            if row_errors:
                errors.append({"row": i + 1, "errors": row_errors})
            else:
                valid_data.append(row)
        
        return valid_data, errors

# 사용 예제
processor = CSVProcessor()

# 샘플 데이터 생성
sample_data = [
    {"name": "김철수", "age": "25", "score": "85.5", "date": "2024-03-01"},
    {"name": "이영희", "age": "30", "score": "92.0", "date": "2024-03-01"},
    {"name": "박민수", "age": "28", "score": "78.5", "date": "2024-03-02"},
    {"name": "최지원", "age": "22", "score": "88.0", "date": "2024-03-02"},
]

# CSV 파일 쓰기
headers = ["name", "age", "score", "date"]
processor.write_csv("students.csv", sample_data, headers)

# CSV 파일 읽기
data, headers = processor.read_csv("students.csv")
print(f"읽은 데이터 수: {len(data)}")

# 데이터 타입 변환
type_map = {
    "age": int,
    "score": float,
    "date": lambda x: datetime.strptime(x, "%Y-%m-%d").date()
}
converted_data = processor.convert_types(data, type_map)

# 데이터 필터링
young_students = processor.filter_data(data, date="2024-03-01")
print(f"3월 1일 데이터: {len(young_students)}")

# 데이터 집계
score_by_date = processor.aggregate_data(data, "date", "score", "avg")
print(f"날짜별 평균 점수: {score_by_date}")

# 데이터 유효성 검사
validation_rules = {
    "name": {"required": True},
    "age": {"required": True, "type": int, "min": 0, "max": 100},
    "score": {"required": True, "type": float, "min": 0, "max": 100}
}

valid_data, errors = processor.validate_data(data, validation_rules)
if errors:
    print("유효성 검사 오류:")
    for error in errors:
        print(f"{error['row']}: {', '.join(error['errors'])}")

⚠️ 초보자가 자주 하는 실수

1. 파일 닫기 실수

1
2
3
4
5
6
7
8
# ❌ 파일을 닫지 않음
file = open("data.txt", "r")
content = file.read()
# file.close() 를 잊음!

# ✅ with 문 사용 (자동으로 닫힘)
with open("data.txt", "r") as file:
    content = file.read()

2. 인코딩 문제

1
2
3
4
5
6
7
# ❌ 한글 파일에서 인코딩 미지정
with open("korean.txt", "r") as f:  # UnicodeDecodeError 가능
    content = f.read()

# ✅ UTF-8 인코딩 명시
with open("korean.txt", "r", encoding="utf-8") as f:
    content = f.read()

3. 너무 광범위한 예외 처리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# ❌ 모든 예외를 잡음
try:
    value = int(input())
    result = 10 / value
except:  # 너무 광범위!
    print("에러 발생")

# ✅ 구체적인 예외 처리
try:
    value = int(input())
    result = 10 / value
except ValueError:
    print("숫자를 입력하세요")
except ZeroDivisionError:
    print("0으로 나눌 수 없습니다")

4. 예외 무시

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# ❌ 예외를 조용히 무시
try:
    risky_operation()
except:
    pass  # 위험한 패턴!

# ✅ 최소한 로깅
try:
    risky_operation()
except Exception as e:
    print(f"오류 발생: {e}")
    # 또는 로깅
    import logging
    logging.error(f"Operation failed: {e}")

5. finally 미사용

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# ❌ 리소스 정리를 잊음
file = None
try:
    file = open("data.txt", "r")
    process_data(file)
except Exception as e:
    print(f"Error: {e}")
# 파일이 닫히지 않을 수 있음

# ✅ finally로 확실한 정리
file = None
try:
    file = open("data.txt", "r")
    process_data(file)
except Exception as e:
    print(f"Error: {e}")
finally:
    if file:
        file.close()

🎯 핵심 정리

파일 처리 Best Practices

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 1. 항상 with 문 사용
# 좋음
with open("file.txt", "r") as f:
    content = f.read()

# 나쁨
f = open("file.txt", "r")
content = f.read()
f.close()  # 예외 발생 시 실행되지 않을 수 있음

# 2. 인코딩 명시 (특히 한글 파일)
with open("korean.txt", "r", encoding="utf-8") as f:
    content = f.read()

# 3. 대용량 파일은 청크 단위로 처리
def process_large_file(filename, chunk_size=1024*1024):  # 1MB
    with open(filename, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            process_chunk(chunk)

# 4. 경로 처리는 pathlib 사용
from pathlib import Path

file_path = Path("data") / "subfolder" / "file.txt"
if file_path.exists():
    content = file_path.read_text(encoding="utf-8")

예외 처리 Best Practices

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 1. 구체적인 예외 처리
# 좋음
try:
    value = int(user_input)
except ValueError:
    print("숫자를 입력하세요")

# 나쁨
try:
    value = int(user_input)
except:  # 모든 예외 포착
    print("오류 발생")

# 2. 예외 체이닝 사용
try:
    config = load_config()
except FileNotFoundError as e:
    raise ConfigError("설정 파일 로드 실패") from e

# 3. finally로 정리 작업
resource = acquire_resource()
try:
    use_resource(resource)
except Exception as e:
    log_error(e)
    raise
finally:
    release_resource(resource)

# 4. 컨텍스트 관리자로 리소스 관리
class ManagedResource:
    def __enter__(self):
        self.resource = acquire_resource()
        return self.resource
    
    def __exit__(self, exc_type, exc_value, traceback):
        release_resource(self.resource)

with ManagedResource() as resource:
    use_resource(resource)

파일 작업 체크리스트

graph TD
    A[파일 작업 시작] --> B{파일 존재?}
    B -->|Yes| C[권한 확인]
    B -->|No| D[FileNotFoundError 처리]
    C --> E[인코딩 지정]
    E --> F[with 문으로 열기]
    F --> G[작업 수행]
    G --> H[예외 처리]
    H --> I[자동으로 파일 닫힘]
    
    J[예외 처리 체크리스트] --> K[구체적인 예외 타입]
    K --> L[의미 있는 에러 메시지]
    L --> M[로깅/기록]
    M --> N[복구 전략]
    N --> O[리소스 정리]

🎓 파이썬 마스터하기 시리즈

📚 기초편 (1-7)

  1. Python 소개와 개발 환경 설정 완벽 가이드
  2. 변수, 자료형, 연산자 완벽 정리
  3. 조건문과 반복문 마스터하기
  4. 함수와 람다 완벽 가이드
  5. 리스트, 튜플, 딕셔너리 정복하기
  6. 문자열 처리와 정규표현식
  7. 파일 입출력과 예외 처리

🚀 중급편 (8-12)

  1. 클래스와 객체지향 프로그래밍
  2. 모듈과 패키지 관리
  3. 데코레이터와 제너레이터
  4. 비동기 프로그래밍 (async/await)
  5. 데이터베이스 연동하기

💼 고급편 (13-16)

  1. 웹 스크래핑과 API 활용
  2. 테스트와 디버깅 전략
  3. 성능 최적화 기법
  4. 멀티프로세싱과 병렬 처리

이전글: 문자열 처리와 정규표현식 ⬅️ 현재글: 파일 입출력과 예외 처리 다음글: 클래스와 객체지향 프로그래밍 ➡️


이번 포스트에서는 파일 입출력과 예외 처리를 학습했습니다. 다음 포스트에서는 Python의 객체지향 프로그래밍을 완벽히 마스터해보겠습니다. Happy Coding! 🐍✨

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.