포스트

[AWS] 배치 처리 에러 모니터링 전략: CloudWatch부터 Slack까지 완벽 가이드

[AWS] 배치 처리 에러 모니터링 전략: CloudWatch부터 Slack까지 완벽 가이드

들어가며

최근 야간 배치 처리 중 발생한 에러를 즉시 인지하지 못해 큰 문제가 발생할 뻔한 경험이 있었습니다. CloudWatch에서 일부 지표는 Amazon Q Developer(구 AWS Chatbot)로 알림을 받고 있었지만, 정작 중요한 배치 처리 에러는 알림이 없었죠.

이번 포스트에서는 AWS 환경에서 배치 처리 에러를 효과적으로 모니터링하는 다양한 방법을 비교 분석하고, 특히 Python Django + Celery 환경에서 Slack 통합을 구현하는 방법을 자세히 다루겠습니다.

1. 배치 처리 모니터링의 중요성

왜 배치 처리 모니터링이 중요한가?

graph TD
    A[배치 작업 실행] --> B{정상 처리?}
    B -->|Yes| C[완료]
    B -->|No| D[에러 발생]
    
    D --> E{모니터링<br/>시스템?}
    E -->|없음| F[문제 발견까지<br/>긴 시간 소요]
    E -->|있음| G[즉시 알림]
    
    F --> H[비즈니스 손실]
    G --> I[빠른 대응]
    
    style D fill:#f96,stroke:#333,stroke-width:2px
    style G fill:#9f9,stroke:#333,stroke-width:2px
    style H fill:#f66,stroke:#333,stroke-width:2px

배치 처리 실패의 영향

배치 유형 실패 시 영향 대응 시간 요구사항
데이터 집계 대시보드 정보 부정확 1-2시간 내
결제 처리 매출 손실, 고객 불만 즉시
리포트 생성 의사결정 지연 업무 시작 전
백업 작업 데이터 손실 위험 30분 내
ETL 파이프라인 다운스트림 작업 실패 15분 내

2. AWS 배치 모니터링 솔루션 비교

주요 모니터링 방법 개요

mindmap
  root((배치 모니터링<br/>솔루션))
    AWS 네이티브
      CloudWatch Alarms
      Amazon Q Developer
      EventBridge + Lambda
      Systems Manager
    직접 통합
      Slack Webhook
      Email/SMS
    서드파티
      Datadog
      New Relic
      PagerDuty

솔루션별 상세 비교

솔루션 구현 난이도 비용 응답 속도 커스터마이징 권장 사용 시나리오
CloudWatch + SNS ⭐⭐ $ 1-2분 중간 기본적인 임계값 모니터링
Amazon Q Developer ⭐⭐⭐ $$ 실시간 낮음 CloudWatch 중심 환경
Slack Webhook 직접 통합 무료 즉시 높음 개발팀 즉시 알림
EventBridge + Lambda ⭐⭐⭐⭐ $$ 실시간 매우 높음 복잡한 워크플로우
Systems Manager ⭐⭐⭐⭐⭐ $$$ 실시간 높음 엔터프라이즈 환경
Datadog/New Relic ⭐⭐⭐ \(\) 실시간 높음 종합 모니터링 필요

3. Django + Celery 환경에서의 구현

아키텍처 구성

graph LR
    subgraph Django Application
        A[Django App] --> B[Celery Task]
    end
    
    subgraph Message Broker
        B --> C[Redis/RabbitMQ]
    end
    
    subgraph Worker
        C --> D[Celery Worker]
        D --> E{Task 실행}
    end
    
    subgraph Error Handling
        E -->|성공| F[완료]
        E -->|실패| G[Error Handler]
        G --> H[Slack 알림]
        G --> I[CloudWatch 로그]
        G --> J[DB 저장]
    end
    
    style E fill:#bbf,stroke:#333,stroke-width:2px
    style G fill:#f96,stroke:#333,stroke-width:2px
    style H fill:#9f9,stroke:#333,stroke-width:2px

Celery 에러 핸들러 구현

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# tasks/error_handler.py
import json
import requests
from celery import Task
from celery.utils.log import get_task_logger
from django.conf import settings
import boto3

logger = get_task_logger(__name__)

class ErrorHandlingTask(Task):
    """에러 처리를 위한 기본 Task 클래스"""
    
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """작업 실패 시 호출되는 메서드"""
        
        # 1. 로그 기록
        logger.error(f'Task {self.name}[{task_id}] failed: {exc}')
        
        # 2. CloudWatch 메트릭 전송
        self._send_cloudwatch_metric(task_id, exc)
        
        # 3. Slack 알림 전송
        self._send_slack_notification(task_id, exc, args, kwargs, einfo)
        
        # 4. DB에 에러 기록
        self._save_error_to_db(task_id, exc, args, kwargs)
    
    def _send_slack_notification(self, task_id, exc, args, kwargs, einfo):
        """Slack으로 에러 알림 전송"""
        
        # 환경별 처리
        env = settings.ENVIRONMENT  # 'development', 'staging', 'production'
        
        # Production만 알림을 보내고 싶다면
        if env != 'production' and not settings.DEBUG_SLACK_NOTIFICATIONS:
            return
        
        # 환경별 Webhook URL 가져오기
        webhook_url = self._get_webhook_url_for_env(env)
        
        if not webhook_url:
            logger.warning(f"No Slack webhook configured for {env}")
            return
        
        # 메시지 구성
        message = self._build_slack_message(task_id, exc, args, kwargs, einfo, env)
        
        try:
            response = requests.post(webhook_url, json=message)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to send Slack notification: {e}")
    
    def _get_webhook_url_for_env(self, env):
        """AWS Secrets Manager에서 환경별 Webhook URL 가져오기"""
        
        # Secrets Manager 클라이언트
        client = boto3.client('secretsmanager', region_name='ap-northeast-1')
        
        try:
            # 환경별 시크릿 이름
            secret_name = f"slack-webhook-{env}"
            
            response = client.get_secret_value(SecretId=secret_name)
            secret = json.loads(response['SecretString'])
            
            return secret.get('webhook_url')
        except Exception as e:
            logger.error(f"Failed to retrieve webhook URL: {e}")
            return None
    
    def _build_slack_message(self, task_id, exc, args, kwargs, einfo, env):
        """Slack 메시지 포맷 구성"""
        
        # 환경별 색상
        color_map = {
            'production': '#FF0000',  # 빨강
            'staging': '#FFA500',     # 주황
            'development': '#0000FF'  # 파랑
        }
        
        return {
            "attachments": [{
                "color": color_map.get(env, '#808080'),
                "title": f"🚨 Celery Task Failed - {env.upper()}",
                "fields": [
                    {
                        "title": "Task Name",
                        "value": self.name,
                        "short": True
                    },
                    {
                        "title": "Task ID",
                        "value": task_id,
                        "short": True
                    },
                    {
                        "title": "Error Type",
                        "value": type(exc).__name__,
                        "short": True
                    },
                    {
                        "title": "Environment",
                        "value": env,
                        "short": True
                    },
                    {
                        "title": "Error Message",
                        "value": str(exc),
                        "short": False
                    },
                    {
                        "title": "Traceback",
                        "value": f"```{einfo}```",
                        "short": False
                    }
                ],
                "footer": "Celery Error Handler",
                "ts": int(time.time())
            }]
        }

환경별 설정 관리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# settings/base.py
import os

# 환경 설정
ENVIRONMENT = os.environ.get('DJANGO_ENV', 'development')

# Slack 설정
SLACK_NOTIFICATIONS = {
    'development': {
        'enabled': os.environ.get('SLACK_DEV_ENABLED', 'false').lower() == 'true',
        'channel': '#42b_talentquest_notice_dev',
        'secret_name': 'slack-webhook-development'
    },
    'staging': {
        'enabled': os.environ.get('SLACK_STAGING_ENABLED', 'false').lower() == 'true',
        'channel': '#42b_talentquest_notice_dev',
        'secret_name': 'slack-webhook-staging'
    },
    'production': {
        'enabled': True,  # Production은 항상 활성화
        'channel': '#42b_talentquest_notice_prod',
        'secret_name': 'slack-webhook-production'
    }
}

# 디버그용 플래그 (개발 환경에서도 알림을 받고 싶을 때)
DEBUG_SLACK_NOTIFICATIONS = os.environ.get('DEBUG_SLACK_NOTIFICATIONS', 'false').lower() == 'true'

4. AWS Secrets Manager를 활용한 Webhook URL 관리

시크릿 저장 구조

1
2
3
4
5
6
7
{
  "webhook_url": "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
  "channel": "#42b_talentquest_notice_prod",
  "created_by": "admin@company.com",
  "created_at": "2025-01-18T10:00:00Z",
  "description": "Production Slack webhook for batch error notifications"
}

Secrets Manager 설정 스크립트

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/bin/bash
# scripts/setup_slack_secrets.sh

# Production webhook
aws secretsmanager create-secret \
    --name "slack-webhook-production" \
    --description "Slack webhook URL for production environment" \
    --secret-string '{
        "webhook_url": "https://hooks.slack.com/services/PROD_WEBHOOK_URL",
        "channel": "#42b_talentquest_notice_prod"
    }' \
    --region ap-northeast-1

# Staging webhook (선택적)
aws secretsmanager create-secret \
    --name "slack-webhook-staging" \
    --description "Slack webhook URL for staging environment" \
    --secret-string '{
        "webhook_url": "https://hooks.slack.com/services/STAGING_WEBHOOK_URL",
        "channel": "#42b_talentquest_notice_dev"
    }' \
    --region ap-northeast-1

# IAM 정책 생성
aws iam create-policy \
    --policy-name CelerySecretsAccess \
    --policy-document '{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetSecretValue"
                ],
                "Resource": [
                    "arn:aws:secretsmanager:ap-northeast-1:*:secret:slack-webhook-*"
                ]
            }
        ]
    }'

5. 실전 구현: 배치 작업 모니터링

배치 작업 정의

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# tasks/batch_tasks.py
from celery import shared_task
from .error_handler import ErrorHandlingTask
import time
import random

@shared_task(base=ErrorHandlingTask, bind=True, max_retries=3)
def process_daily_aggregation(self, date_str):
    """일일 데이터 집계 작업"""
    
    try:
        # 시작 알림 (선택적)
        self._notify_task_started(date_str)
        
        # 실제 작업 수행
        logger.info(f"Starting daily aggregation for {date_str}")
        
        # 데이터 처리 로직
        result = perform_aggregation(date_str)
        
        # 성공 메트릭 전송
        self._send_success_metric()
        
        return result
        
    except Exception as exc:
        # 재시도 로직
        logger.error(f"Task failed, retrying... ({self.request.retries}/{self.max_retries})")
        
        # 지수 백오프
        countdown = 60 * (2 ** self.request.retries)
        
        raise self.retry(exc=exc, countdown=countdown)

@shared_task(base=ErrorHandlingTask, bind=True)
def cleanup_old_data(self, days=30):
    """오래된 데이터 정리 작업"""
    
    try:
        deleted_count = 0
        
        # 배치로 처리
        for batch in get_old_data_batches(days):
            deleted_count += delete_batch(batch)
            
            # 진행 상황 로깅
            if deleted_count % 1000 == 0:
                logger.info(f"Deleted {deleted_count} records so far...")
        
        # 완료 알림
        self._notify_cleanup_completed(deleted_count)
        
        return {"deleted": deleted_count}
        
    except Exception as exc:
        # ErrorHandlingTask가 자동으로 Slack 알림 전송
        raise

모니터링 대시보드 구성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# monitoring/batch_monitor.py
import boto3
from datetime import datetime, timedelta
from django.core.management.base import BaseCommand

class BatchMonitor:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch', region_name='ap-northeast-1')
    
    def check_batch_health(self):
        """배치 작업 상태 확인"""
        
        metrics = {
            'failed_tasks': self._get_failed_task_count(),
            'pending_tasks': self._get_pending_task_count(),
            'average_duration': self._get_average_duration(),
            'last_success_time': self._get_last_success_time()
        }
        
        # 임계값 확인
        alerts = []
        
        if metrics['failed_tasks'] > 5:
            alerts.append({
                'severity': 'critical',
                'message': f"Failed tasks: {metrics['failed_tasks']}"
            })
        
        if metrics['pending_tasks'] > 100:
            alerts.append({
                'severity': 'warning',
                'message': f"Pending tasks: {metrics['pending_tasks']}"
            })
        
        if metrics['average_duration'] > 300:  # 5분
            alerts.append({
                'severity': 'warning',
                'message': f"Slow performance: {metrics['average_duration']}s average"
            })
        
        return metrics, alerts
    
    def send_health_report(self):
        """일일 헬스 리포트 전송"""
        
        metrics, alerts = self.check_batch_health()
        
        # Slack으로 일일 리포트 전송
        if alerts or datetime.now().hour == 9:  # 오전 9시 또는 알림 발생 시
            self._send_slack_health_report(metrics, alerts)

6. 환경별 알림 전략

알림 정책 매트릭스

환경 에러 레벨 알림 채널 알림 빈도 담당자
Development ERROR 이상 개발자 로컬 로그 즉시 개발자 본인
Staging WARNING 이상 #notice_dev 5분 단위 집계 개발팀
Production ERROR #notice_prod 즉시 운영팀 + 개발팀
Production CRITICAL #notice_prod + 전화 즉시 책임자

환경별 구현 예시

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# celery_config.py
from kombu import Queue, Exchange
import os

# 환경별 Celery 설정
CELERY_CONFIG = {
    'development': {
        'broker_url': 'redis://localhost:6379/0',
        'result_backend': 'redis://localhost:6379/0',
        'task_always_eager': True,  # 동기 실행 (디버깅용)
        'task_eager_propagates': True,
        'worker_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
        'worker_task_log_format': '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'
    },
    'staging': {
        'broker_url': os.environ.get('CELERY_BROKER_URL'),
        'result_backend': os.environ.get('CELERY_RESULT_BACKEND'),
        'task_acks_late': True,
        'worker_prefetch_multiplier': 4,
        'task_soft_time_limit': 600,  # 10분
        'task_time_limit': 900,  # 15분
    },
    'production': {
        'broker_url': os.environ.get('CELERY_BROKER_URL'),
        'result_backend': os.environ.get('CELERY_RESULT_BACKEND'),
        'task_acks_late': True,
        'worker_prefetch_multiplier': 1,  # 안정성 우선
        'task_soft_time_limit': 1800,  # 30분
        'task_time_limit': 3600,  # 1시간
        'task_reject_on_worker_lost': True,
        'task_ignore_result': False,
        # 데드레터 큐 설정
        'task_queues': (
            Queue('default', Exchange('default'), routing_key='default'),
            Queue('critical', Exchange('critical'), routing_key='critical'),
            Queue('dead_letter', Exchange('dead_letter'), routing_key='dead_letter'),
        ),
    }
}

# 현재 환경 설정 적용
current_env = os.environ.get('DJANGO_ENV', 'development')
celery_app.conf.update(CELERY_CONFIG[current_env])

7. 모니터링 자동화 및 개선

CI/CD 파이프라인 통합

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# .github/workflows/deploy.yml
name: Deploy with Monitoring Setup

on:
  push:
    branches: [main, staging]

jobs:
  deploy:
    runs-on: ubuntu-latest
    
    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      
      - name: Setup monitoring
        run: |
          # 환경 확인
          if [[ "$" == "refs/heads/main" ]]; then
            ENV="production"
          else
            ENV="staging"
          fi
          
          # Slack Webhook 검증
          aws secretsmanager describe-secret \
            --secret-id "slack-webhook-${ENV}" \
            --region ap-northeast-1 || {
              echo "Warning: Slack webhook not configured for ${ENV}"
              exit 1
            }
          
          # CloudWatch 알람 생성/업데이트
          python scripts/setup_cloudwatch_alarms.py --env ${ENV}
      
      - name: Deploy application
        run: |
          # 배포 스크립트 실행
          ./deploy.sh
      
      - name: Post-deployment test
        run: |
          # 모니터링 테스트
          python -m pytest tests/test_monitoring.py -v

모니터링 테스트

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# tests/test_monitoring.py
import pytest
from unittest.mock import patch, MagicMock
from tasks.error_handler import ErrorHandlingTask

class TestErrorHandling:
    
    @patch('requests.post')
    @patch('boto3.client')
    def test_slack_notification_production(self, mock_boto, mock_requests):
        """Production 환경에서 Slack 알림 테스트"""
        
        # Mock 설정
        mock_boto.return_value.get_secret_value.return_value = {
            'SecretString': '{"webhook_url": "https://hooks.slack.com/test"}'
        }
        mock_requests.return_value.status_code = 200
        
        # 테스트 실행
        with patch.dict('os.environ', {'DJANGO_ENV': 'production'}):
            task = ErrorHandlingTask()
            task._send_slack_notification(
                task_id='test-123',
                exc=ValueError('Test error'),
                args=(),
                kwargs={},
                einfo='Traceback...'
            )
        
        # 검증
        assert mock_requests.called
        assert 'production' in str(mock_requests.call_args)
    
    @patch('requests.post')
    def test_no_notification_development(self, mock_requests):
        """Development 환경에서는 알림이 가지 않는지 테스트"""
        
        with patch.dict('os.environ', {'DJANGO_ENV': 'development'}):
            task = ErrorHandlingTask()
            task._send_slack_notification(
                task_id='test-123',
                exc=ValueError('Test error'),
                args=(),
                kwargs={},
                einfo='Traceback...'
            )
        
        # Development 환경에서는 호출되지 않아야 함
        assert not mock_requests.called

8. 실전 운영 팁

알림 피로도 관리

graph LR
    A[에러 발생] --> B{중복 확인}
    B -->|신규| C[즉시 알림]
    B -->|중복| D{발생 빈도}
    
    D -->|5분 내 3회| E[집계 알림]
    D -->|1시간 내 10회| F[에스컬레이션]
    D -->|지속 발생| G[임시 무시]
    
    C --> H[Slack 전송]
    E --> I[요약 전송]
    F --> J[관리자 알림]
    
    style C fill:#9f9,stroke:#333,stroke-width:2px
    style F fill:#f96,stroke:#333,stroke-width:2px

알림 집계 구현

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# utils/notification_aggregator.py
from collections import defaultdict
from datetime import datetime, timedelta
import hashlib

class NotificationAggregator:
    def __init__(self, window_minutes=5, threshold=3):
        self.window_minutes = window_minutes
        self.threshold = threshold
        self.error_cache = defaultdict(list)
    
    def should_notify(self, error_key, error_details):
        """알림을 보낼지 결정"""
        
        now = datetime.now()
        error_hash = self._get_error_hash(error_key)
        
        # 기존 에러 기록 정리
        self.error_cache[error_hash] = [
            timestamp for timestamp in self.error_cache[error_hash]
            if now - timestamp < timedelta(minutes=self.window_minutes)
        ]
        
        # 새 에러 추가
        self.error_cache[error_hash].append(now)
        
        # 임계값 확인
        count = len(self.error_cache[error_hash])
        
        if count == 1:
            return True, "immediate"  # 첫 발생은 즉시 알림
        elif count == self.threshold:
            return True, "aggregated"  # 임계값 도달 시 집계 알림
        elif count % 10 == 0:
            return True, "escalation"  # 지속 발생 시 에스컬레이션
        else:
            return False, None
    
    def _get_error_hash(self, error_key):
        """에러 식별을 위한 해시 생성"""
        return hashlib.md5(error_key.encode()).hexdigest()

9. 모니터링 솔루션 선택 가이드

의사결정 플로우차트

graph TD
    A[모니터링 요구사항] --> B{팀 규모?}
    
    B -->|1-10명| C{기술 스택}
    B -->|10-50명| D{예산}
    B -->|50명+| E[엔터프라이즈<br/>솔루션 필요]
    
    C -->|AWS 중심| F[CloudWatch +<br/>Slack Webhook]
    C -->|멀티 클라우드| G[Datadog/New Relic]
    
    D -->|제한적| H[Amazon Q Developer +<br/>EventBridge]
    D -->|충분| I[Datadog + PagerDuty]
    
    E --> J[Systems Manager +<br/>ServiceNow]
    
    style F fill:#9f9,stroke:#333,stroke-width:2px
    style H fill:#9f9,stroke:#333,stroke-width:2px
    style I fill:#bbf,stroke:#333,stroke-width:2px

최종 권장사항

상황 권장 솔루션 구현 복잡도 월 비용 (예상)
스타트업 (< 10명) Slack Webhook + CloudWatch 기본 ⭐⭐ $0-50
성장기 (10-50명) Amazon Q Developer + Lambda ⭐⭐⭐ $50-200
대기업 (50명+) Datadog/New Relic + Incident Manager ⭐⭐⭐⭐ $500+

마무리

배치 처리 에러 모니터링은 단순히 알림을 받는 것 이상의 의미가 있습니다. 적절한 모니터링 전략은 시스템의 안정성을 높이고, 문제 해결 시간을 단축시키며, 궁극적으로 비즈니스 손실을 방지합니다.

핵심 포인트

환경별 알림 전략 수립: Development/Staging/Production 구분 ✅ Webhook URL 보안 관리: AWS Secrets Manager 활용 ✅ 알림 피로도 관리: 집계 및 에스컬레이션 정책 ✅ 테스트 자동화: 모니터링 시스템도 테스트 필요 ✅ 지속적 개선: 메트릭 분석을 통한 임계값 조정

작은 것부터 시작하되, 확장 가능한 구조로 설계하는 것이 중요합니다. Slack Webhook으로 시작해서 필요에 따라 더 정교한 솔루션으로 발전시켜 나가세요.

참고 자료


배치 처리 모니터링 구축 경험이나 더 나은 방법이 있다면 댓글로 공유해주세요!

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.