[AWS] 배치 처리 에러 모니터링 전략: CloudWatch부터 Slack까지 완벽 가이드
[AWS] 배치 처리 에러 모니터링 전략: CloudWatch부터 Slack까지 완벽 가이드
들어가며
최근 야간 배치 처리 중 발생한 에러를 즉시 인지하지 못해 큰 문제가 발생할 뻔한 경험이 있었습니다. CloudWatch에서 일부 지표는 Amazon Q Developer(구 AWS Chatbot)로 알림을 받고 있었지만, 정작 중요한 배치 처리 에러는 알림이 없었죠.
이번 포스트에서는 AWS 환경에서 배치 처리 에러를 효과적으로 모니터링하는 다양한 방법을 비교 분석하고, 특히 Python Django + Celery 환경에서 Slack 통합을 구현하는 방법을 자세히 다루겠습니다.
1. 배치 처리 모니터링의 중요성
왜 배치 처리 모니터링이 중요한가?
graph TD
A[배치 작업 실행] --> B{정상 처리?}
B -->|Yes| C[완료]
B -->|No| D[에러 발생]
D --> E{모니터링<br/>시스템?}
E -->|없음| F[문제 발견까지<br/>긴 시간 소요]
E -->|있음| G[즉시 알림]
F --> H[비즈니스 손실]
G --> I[빠른 대응]
style D fill:#f96,stroke:#333,stroke-width:2px
style G fill:#9f9,stroke:#333,stroke-width:2px
style H fill:#f66,stroke:#333,stroke-width:2px
배치 처리 실패의 영향
| 배치 유형 | 실패 시 영향 | 대응 시간 요구사항 |
|---|---|---|
| 데이터 집계 | 대시보드 정보 부정확 | 1-2시간 내 |
| 결제 처리 | 매출 손실, 고객 불만 | 즉시 |
| 리포트 생성 | 의사결정 지연 | 업무 시작 전 |
| 백업 작업 | 데이터 손실 위험 | 30분 내 |
| ETL 파이프라인 | 다운스트림 작업 실패 | 15분 내 |
2. AWS 배치 모니터링 솔루션 비교
주요 모니터링 방법 개요
mindmap
root((배치 모니터링<br/>솔루션))
AWS 네이티브
CloudWatch Alarms
Amazon Q Developer
EventBridge + Lambda
Systems Manager
직접 통합
Slack Webhook
Email/SMS
서드파티
Datadog
New Relic
PagerDuty
솔루션별 상세 비교
| 솔루션 | 구현 난이도 | 비용 | 응답 속도 | 커스터마이징 | 권장 사용 시나리오 |
|---|---|---|---|---|---|
| CloudWatch + SNS | ⭐⭐ | $ | 1-2분 | 중간 | 기본적인 임계값 모니터링 |
| Amazon Q Developer | ⭐⭐⭐ | $$ | 실시간 | 낮음 | CloudWatch 중심 환경 |
| Slack Webhook 직접 통합 | ⭐ | 무료 | 즉시 | 높음 | 개발팀 즉시 알림 |
| EventBridge + Lambda | ⭐⭐⭐⭐ | $$ | 실시간 | 매우 높음 | 복잡한 워크플로우 |
| Systems Manager | ⭐⭐⭐⭐⭐ | $$$ | 실시간 | 높음 | 엔터프라이즈 환경 |
| Datadog/New Relic | ⭐⭐⭐ | \(\) | 실시간 | 높음 | 종합 모니터링 필요 |
3. Django + Celery 환경에서의 구현
아키텍처 구성
graph LR
subgraph Django Application
A[Django App] --> B[Celery Task]
end
subgraph Message Broker
B --> C[Redis/RabbitMQ]
end
subgraph Worker
C --> D[Celery Worker]
D --> E{Task 실행}
end
subgraph Error Handling
E -->|성공| F[완료]
E -->|실패| G[Error Handler]
G --> H[Slack 알림]
G --> I[CloudWatch 로그]
G --> J[DB 저장]
end
style E fill:#bbf,stroke:#333,stroke-width:2px
style G fill:#f96,stroke:#333,stroke-width:2px
style H fill:#9f9,stroke:#333,stroke-width:2px
Celery 에러 핸들러 구현
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# tasks/error_handler.py
import json
import requests
from celery import Task
from celery.utils.log import get_task_logger
from django.conf import settings
import boto3
logger = get_task_logger(__name__)
class ErrorHandlingTask(Task):
"""에러 처리를 위한 기본 Task 클래스"""
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""작업 실패 시 호출되는 메서드"""
# 1. 로그 기록
logger.error(f'Task {self.name}[{task_id}] failed: {exc}')
# 2. CloudWatch 메트릭 전송
self._send_cloudwatch_metric(task_id, exc)
# 3. Slack 알림 전송
self._send_slack_notification(task_id, exc, args, kwargs, einfo)
# 4. DB에 에러 기록
self._save_error_to_db(task_id, exc, args, kwargs)
def _send_slack_notification(self, task_id, exc, args, kwargs, einfo):
"""Slack으로 에러 알림 전송"""
# 환경별 처리
env = settings.ENVIRONMENT # 'development', 'staging', 'production'
# Production만 알림을 보내고 싶다면
if env != 'production' and not settings.DEBUG_SLACK_NOTIFICATIONS:
return
# 환경별 Webhook URL 가져오기
webhook_url = self._get_webhook_url_for_env(env)
if not webhook_url:
logger.warning(f"No Slack webhook configured for {env}")
return
# 메시지 구성
message = self._build_slack_message(task_id, exc, args, kwargs, einfo, env)
try:
response = requests.post(webhook_url, json=message)
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(f"Failed to send Slack notification: {e}")
def _get_webhook_url_for_env(self, env):
"""AWS Secrets Manager에서 환경별 Webhook URL 가져오기"""
# Secrets Manager 클라이언트
client = boto3.client('secretsmanager', region_name='ap-northeast-1')
try:
# 환경별 시크릿 이름
secret_name = f"slack-webhook-{env}"
response = client.get_secret_value(SecretId=secret_name)
secret = json.loads(response['SecretString'])
return secret.get('webhook_url')
except Exception as e:
logger.error(f"Failed to retrieve webhook URL: {e}")
return None
def _build_slack_message(self, task_id, exc, args, kwargs, einfo, env):
"""Slack 메시지 포맷 구성"""
# 환경별 색상
color_map = {
'production': '#FF0000', # 빨강
'staging': '#FFA500', # 주황
'development': '#0000FF' # 파랑
}
return {
"attachments": [{
"color": color_map.get(env, '#808080'),
"title": f"🚨 Celery Task Failed - {env.upper()}",
"fields": [
{
"title": "Task Name",
"value": self.name,
"short": True
},
{
"title": "Task ID",
"value": task_id,
"short": True
},
{
"title": "Error Type",
"value": type(exc).__name__,
"short": True
},
{
"title": "Environment",
"value": env,
"short": True
},
{
"title": "Error Message",
"value": str(exc),
"short": False
},
{
"title": "Traceback",
"value": f"```{einfo}```",
"short": False
}
],
"footer": "Celery Error Handler",
"ts": int(time.time())
}]
}
환경별 설정 관리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# settings/base.py
import os
# 환경 설정
ENVIRONMENT = os.environ.get('DJANGO_ENV', 'development')
# Slack 설정
SLACK_NOTIFICATIONS = {
'development': {
'enabled': os.environ.get('SLACK_DEV_ENABLED', 'false').lower() == 'true',
'channel': '#42b_talentquest_notice_dev',
'secret_name': 'slack-webhook-development'
},
'staging': {
'enabled': os.environ.get('SLACK_STAGING_ENABLED', 'false').lower() == 'true',
'channel': '#42b_talentquest_notice_dev',
'secret_name': 'slack-webhook-staging'
},
'production': {
'enabled': True, # Production은 항상 활성화
'channel': '#42b_talentquest_notice_prod',
'secret_name': 'slack-webhook-production'
}
}
# 디버그용 플래그 (개발 환경에서도 알림을 받고 싶을 때)
DEBUG_SLACK_NOTIFICATIONS = os.environ.get('DEBUG_SLACK_NOTIFICATIONS', 'false').lower() == 'true'
4. AWS Secrets Manager를 활용한 Webhook URL 관리
시크릿 저장 구조
1
2
3
4
5
6
7
{
"webhook_url": "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
"channel": "#42b_talentquest_notice_prod",
"created_by": "admin@company.com",
"created_at": "2025-01-18T10:00:00Z",
"description": "Production Slack webhook for batch error notifications"
}
Secrets Manager 설정 스크립트
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/bin/bash
# scripts/setup_slack_secrets.sh
# Production webhook
aws secretsmanager create-secret \
--name "slack-webhook-production" \
--description "Slack webhook URL for production environment" \
--secret-string '{
"webhook_url": "https://hooks.slack.com/services/PROD_WEBHOOK_URL",
"channel": "#42b_talentquest_notice_prod"
}' \
--region ap-northeast-1
# Staging webhook (선택적)
aws secretsmanager create-secret \
--name "slack-webhook-staging" \
--description "Slack webhook URL for staging environment" \
--secret-string '{
"webhook_url": "https://hooks.slack.com/services/STAGING_WEBHOOK_URL",
"channel": "#42b_talentquest_notice_dev"
}' \
--region ap-northeast-1
# IAM 정책 생성
aws iam create-policy \
--policy-name CelerySecretsAccess \
--policy-document '{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"secretsmanager:GetSecretValue"
],
"Resource": [
"arn:aws:secretsmanager:ap-northeast-1:*:secret:slack-webhook-*"
]
}
]
}'
5. 실전 구현: 배치 작업 모니터링
배치 작업 정의
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# tasks/batch_tasks.py
from celery import shared_task
from .error_handler import ErrorHandlingTask
import time
import random
@shared_task(base=ErrorHandlingTask, bind=True, max_retries=3)
def process_daily_aggregation(self, date_str):
"""일일 데이터 집계 작업"""
try:
# 시작 알림 (선택적)
self._notify_task_started(date_str)
# 실제 작업 수행
logger.info(f"Starting daily aggregation for {date_str}")
# 데이터 처리 로직
result = perform_aggregation(date_str)
# 성공 메트릭 전송
self._send_success_metric()
return result
except Exception as exc:
# 재시도 로직
logger.error(f"Task failed, retrying... ({self.request.retries}/{self.max_retries})")
# 지수 백오프
countdown = 60 * (2 ** self.request.retries)
raise self.retry(exc=exc, countdown=countdown)
@shared_task(base=ErrorHandlingTask, bind=True)
def cleanup_old_data(self, days=30):
"""오래된 데이터 정리 작업"""
try:
deleted_count = 0
# 배치로 처리
for batch in get_old_data_batches(days):
deleted_count += delete_batch(batch)
# 진행 상황 로깅
if deleted_count % 1000 == 0:
logger.info(f"Deleted {deleted_count} records so far...")
# 완료 알림
self._notify_cleanup_completed(deleted_count)
return {"deleted": deleted_count}
except Exception as exc:
# ErrorHandlingTask가 자동으로 Slack 알림 전송
raise
모니터링 대시보드 구성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# monitoring/batch_monitor.py
import boto3
from datetime import datetime, timedelta
from django.core.management.base import BaseCommand
class BatchMonitor:
def __init__(self):
self.cloudwatch = boto3.client('cloudwatch', region_name='ap-northeast-1')
def check_batch_health(self):
"""배치 작업 상태 확인"""
metrics = {
'failed_tasks': self._get_failed_task_count(),
'pending_tasks': self._get_pending_task_count(),
'average_duration': self._get_average_duration(),
'last_success_time': self._get_last_success_time()
}
# 임계값 확인
alerts = []
if metrics['failed_tasks'] > 5:
alerts.append({
'severity': 'critical',
'message': f"Failed tasks: {metrics['failed_tasks']}"
})
if metrics['pending_tasks'] > 100:
alerts.append({
'severity': 'warning',
'message': f"Pending tasks: {metrics['pending_tasks']}"
})
if metrics['average_duration'] > 300: # 5분
alerts.append({
'severity': 'warning',
'message': f"Slow performance: {metrics['average_duration']}s average"
})
return metrics, alerts
def send_health_report(self):
"""일일 헬스 리포트 전송"""
metrics, alerts = self.check_batch_health()
# Slack으로 일일 리포트 전송
if alerts or datetime.now().hour == 9: # 오전 9시 또는 알림 발생 시
self._send_slack_health_report(metrics, alerts)
6. 환경별 알림 전략
알림 정책 매트릭스
| 환경 | 에러 레벨 | 알림 채널 | 알림 빈도 | 담당자 |
|---|---|---|---|---|
| Development | ERROR 이상 | 개발자 로컬 로그 | 즉시 | 개발자 본인 |
| Staging | WARNING 이상 | #notice_dev | 5분 단위 집계 | 개발팀 |
| Production | ERROR | #notice_prod | 즉시 | 운영팀 + 개발팀 |
| Production | CRITICAL | #notice_prod + 전화 | 즉시 | 책임자 |
환경별 구현 예시
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# celery_config.py
from kombu import Queue, Exchange
import os
# 환경별 Celery 설정
CELERY_CONFIG = {
'development': {
'broker_url': 'redis://localhost:6379/0',
'result_backend': 'redis://localhost:6379/0',
'task_always_eager': True, # 동기 실행 (디버깅용)
'task_eager_propagates': True,
'worker_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
'worker_task_log_format': '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'
},
'staging': {
'broker_url': os.environ.get('CELERY_BROKER_URL'),
'result_backend': os.environ.get('CELERY_RESULT_BACKEND'),
'task_acks_late': True,
'worker_prefetch_multiplier': 4,
'task_soft_time_limit': 600, # 10분
'task_time_limit': 900, # 15분
},
'production': {
'broker_url': os.environ.get('CELERY_BROKER_URL'),
'result_backend': os.environ.get('CELERY_RESULT_BACKEND'),
'task_acks_late': True,
'worker_prefetch_multiplier': 1, # 안정성 우선
'task_soft_time_limit': 1800, # 30분
'task_time_limit': 3600, # 1시간
'task_reject_on_worker_lost': True,
'task_ignore_result': False,
# 데드레터 큐 설정
'task_queues': (
Queue('default', Exchange('default'), routing_key='default'),
Queue('critical', Exchange('critical'), routing_key='critical'),
Queue('dead_letter', Exchange('dead_letter'), routing_key='dead_letter'),
),
}
}
# 현재 환경 설정 적용
current_env = os.environ.get('DJANGO_ENV', 'development')
celery_app.conf.update(CELERY_CONFIG[current_env])
7. 모니터링 자동화 및 개선
CI/CD 파이프라인 통합
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# .github/workflows/deploy.yml
name: Deploy with Monitoring Setup
on:
push:
branches: [main, staging]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Setup monitoring
run: |
# 환경 확인
if [[ "$" == "refs/heads/main" ]]; then
ENV="production"
else
ENV="staging"
fi
# Slack Webhook 검증
aws secretsmanager describe-secret \
--secret-id "slack-webhook-${ENV}" \
--region ap-northeast-1 || {
echo "Warning: Slack webhook not configured for ${ENV}"
exit 1
}
# CloudWatch 알람 생성/업데이트
python scripts/setup_cloudwatch_alarms.py --env ${ENV}
- name: Deploy application
run: |
# 배포 스크립트 실행
./deploy.sh
- name: Post-deployment test
run: |
# 모니터링 테스트
python -m pytest tests/test_monitoring.py -v
모니터링 테스트
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# tests/test_monitoring.py
import pytest
from unittest.mock import patch, MagicMock
from tasks.error_handler import ErrorHandlingTask
class TestErrorHandling:
@patch('requests.post')
@patch('boto3.client')
def test_slack_notification_production(self, mock_boto, mock_requests):
"""Production 환경에서 Slack 알림 테스트"""
# Mock 설정
mock_boto.return_value.get_secret_value.return_value = {
'SecretString': '{"webhook_url": "https://hooks.slack.com/test"}'
}
mock_requests.return_value.status_code = 200
# 테스트 실행
with patch.dict('os.environ', {'DJANGO_ENV': 'production'}):
task = ErrorHandlingTask()
task._send_slack_notification(
task_id='test-123',
exc=ValueError('Test error'),
args=(),
kwargs={},
einfo='Traceback...'
)
# 검증
assert mock_requests.called
assert 'production' in str(mock_requests.call_args)
@patch('requests.post')
def test_no_notification_development(self, mock_requests):
"""Development 환경에서는 알림이 가지 않는지 테스트"""
with patch.dict('os.environ', {'DJANGO_ENV': 'development'}):
task = ErrorHandlingTask()
task._send_slack_notification(
task_id='test-123',
exc=ValueError('Test error'),
args=(),
kwargs={},
einfo='Traceback...'
)
# Development 환경에서는 호출되지 않아야 함
assert not mock_requests.called
8. 실전 운영 팁
알림 피로도 관리
graph LR
A[에러 발생] --> B{중복 확인}
B -->|신규| C[즉시 알림]
B -->|중복| D{발생 빈도}
D -->|5분 내 3회| E[집계 알림]
D -->|1시간 내 10회| F[에스컬레이션]
D -->|지속 발생| G[임시 무시]
C --> H[Slack 전송]
E --> I[요약 전송]
F --> J[관리자 알림]
style C fill:#9f9,stroke:#333,stroke-width:2px
style F fill:#f96,stroke:#333,stroke-width:2px
알림 집계 구현
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# utils/notification_aggregator.py
from collections import defaultdict
from datetime import datetime, timedelta
import hashlib
class NotificationAggregator:
def __init__(self, window_minutes=5, threshold=3):
self.window_minutes = window_minutes
self.threshold = threshold
self.error_cache = defaultdict(list)
def should_notify(self, error_key, error_details):
"""알림을 보낼지 결정"""
now = datetime.now()
error_hash = self._get_error_hash(error_key)
# 기존 에러 기록 정리
self.error_cache[error_hash] = [
timestamp for timestamp in self.error_cache[error_hash]
if now - timestamp < timedelta(minutes=self.window_minutes)
]
# 새 에러 추가
self.error_cache[error_hash].append(now)
# 임계값 확인
count = len(self.error_cache[error_hash])
if count == 1:
return True, "immediate" # 첫 발생은 즉시 알림
elif count == self.threshold:
return True, "aggregated" # 임계값 도달 시 집계 알림
elif count % 10 == 0:
return True, "escalation" # 지속 발생 시 에스컬레이션
else:
return False, None
def _get_error_hash(self, error_key):
"""에러 식별을 위한 해시 생성"""
return hashlib.md5(error_key.encode()).hexdigest()
9. 모니터링 솔루션 선택 가이드
의사결정 플로우차트
graph TD
A[모니터링 요구사항] --> B{팀 규모?}
B -->|1-10명| C{기술 스택}
B -->|10-50명| D{예산}
B -->|50명+| E[엔터프라이즈<br/>솔루션 필요]
C -->|AWS 중심| F[CloudWatch +<br/>Slack Webhook]
C -->|멀티 클라우드| G[Datadog/New Relic]
D -->|제한적| H[Amazon Q Developer +<br/>EventBridge]
D -->|충분| I[Datadog + PagerDuty]
E --> J[Systems Manager +<br/>ServiceNow]
style F fill:#9f9,stroke:#333,stroke-width:2px
style H fill:#9f9,stroke:#333,stroke-width:2px
style I fill:#bbf,stroke:#333,stroke-width:2px
최종 권장사항
| 상황 | 권장 솔루션 | 구현 복잡도 | 월 비용 (예상) |
|---|---|---|---|
| 스타트업 (< 10명) | Slack Webhook + CloudWatch 기본 | ⭐⭐ | $0-50 |
| 성장기 (10-50명) | Amazon Q Developer + Lambda | ⭐⭐⭐ | $50-200 |
| 대기업 (50명+) | Datadog/New Relic + Incident Manager | ⭐⭐⭐⭐ | $500+ |
마무리
배치 처리 에러 모니터링은 단순히 알림을 받는 것 이상의 의미가 있습니다. 적절한 모니터링 전략은 시스템의 안정성을 높이고, 문제 해결 시간을 단축시키며, 궁극적으로 비즈니스 손실을 방지합니다.
핵심 포인트
✅ 환경별 알림 전략 수립: Development/Staging/Production 구분 ✅ Webhook URL 보안 관리: AWS Secrets Manager 활용 ✅ 알림 피로도 관리: 집계 및 에스컬레이션 정책 ✅ 테스트 자동화: 모니터링 시스템도 테스트 필요 ✅ 지속적 개선: 메트릭 분석을 통한 임계값 조정
작은 것부터 시작하되, 확장 가능한 구조로 설계하는 것이 중요합니다. Slack Webhook으로 시작해서 필요에 따라 더 정교한 솔루션으로 발전시켜 나가세요.
참고 자료
- Celery Best Practices
- AWS Secrets Manager Python SDK
- Slack Incoming Webhooks
- Amazon Q Developer Documentation
배치 처리 모니터링 구축 경험이나 더 나은 방법이 있다면 댓글로 공유해주세요!
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.