📑 Table of Contents
- Problem Understanding
- Requirements & Scope
- High-Level Design
- Deep Dive
- Advanced Considerations
- Reference Materials
1. Problem Understanding
🧠 What is a Notification System?
A notification system sends alerts to users through multiple channels including mobile push notifications, SMS messages, and emails. It’s a critical component for user engagement and communication.
Notification Types:
- Push Notifications: Mobile app notifications (iOS/Android)
- SMS Messages: Text messages to phone numbers
- Email: Traditional email notifications
Common Use Cases:
- Breaking news alerts
- Product updates and offers
- Social media interactions
- System alerts and warnings
- Marketing campaigns
2. Requirements & Scope
🎯 Key Clarifying Questions
Notification Types and Channels:
- What notification types should we support (push, SMS, email)?
- Should we support rich notifications with images/actions?
- Do we need real-time delivery or is eventual delivery acceptable?
- Should we support notification scheduling?
Scale and Performance:
- How many notifications per day?
- What’s the expected user base?
- Do we need global distribution?
- What’s the acceptable delivery latency?
Features and Behavior:
- Should users be able to opt out of notifications?
- Do we need notification templates?
- Should we track delivery status and analytics?
- Do we need rate limiting per user?
📋 Functional Requirements
- Multi-channel Support: Send via push, SMS, and email
- Template System: Customizable notification templates
- Scheduling: Support for delayed and scheduled notifications
- User Preferences: Opt-in/opt-out management
- Delivery Tracking: Monitor notification delivery status
📊 Non-Functional Requirements
- Scale: 10 million mobile push notifications daily
- Performance: Soft real-time delivery (< 30 seconds)
- Reliability: High availability with fault tolerance
- Security: Secure handling of user data and preferences
- Extensibility: Easy to add new notification channels
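To make the scale target concrete, here is a rough back-of-envelope calculation. The 10 million figure comes from the requirement above; the peak factor of 10 is purely an assumption for illustration, since the requirements do not state how bursty traffic is.

```python
DAILY_PUSH = 10_000_000          # from the scale requirement
SECONDS_PER_DAY = 24 * 60 * 60   # 86,400
PEAK_FACTOR = 10                 # assumption: peak load is 10x the daily average


def average_rate(daily_total: int) -> float:
    """Average notifications per second if traffic were perfectly even."""
    return daily_total / SECONDS_PER_DAY


def peak_rate(daily_total: int, peak_factor: float) -> float:
    """Rate the queue and workers should absorb under the assumed burst."""
    return average_rate(daily_total) * peak_factor


avg = average_rate(DAILY_PUSH)
peak = peak_rate(DAILY_PUSH, PEAK_FACTOR)
print(f"average: {avg:.0f}/s, assumed peak: {peak:.0f}/s")
# average: 116/s, assumed peak: 1157/s
```

An average near 116 notifications per second is modest, so the queue and worker pool are sized mainly for bursts such as breaking news or campaigns.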
3. High-Level Design
🏗️ System Architecture
[Services] → [Notification Service] → [Message Queue] → [Workers] → [3rd Party APIs]
↓ ↑ ↓
[Templates] [Rate Limiter] [Analytics]
↓ ↓ ↓
[User Preferences] → [Notification DB] ← [Delivery Status]
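The main path in the diagram (service, queue, workers, third-party API) can be sketched with an in-memory `asyncio.Queue`. This is only an illustration of the flow; `fake_provider`, `run_pipeline`, and the job fields are hypothetical names, and a production system would use a durable broker instead of an in-process queue.

```python
import asyncio


async def notification_service(queue, user_ids, channel, body):
    """Accept a request and enqueue one job per recipient."""
    for user_id in user_ids:
        await queue.put({"user_id": user_id, "channel": channel, "body": body})


async def fake_provider(job):
    # Stand-in for FCM, APNs, SNS, or SMTP; a real call would hit the network.
    return f"{job['channel']}->{job['user_id']}"


async def worker(queue, provider, delivered):
    """Pull jobs off the queue and hand them to the provider."""
    while True:
        job = await queue.get()
        try:
            delivered.append(await provider(job))
        finally:
            queue.task_done()


async def run_pipeline(user_ids):
    queue = asyncio.Queue()
    delivered = []
    workers = [
        asyncio.create_task(worker(queue, fake_provider, delivered))
        for _ in range(2)
    ]
    await notification_service(queue, user_ids, "push", "hello")
    await queue.join()  # wait until every queued job has been processed
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return delivered


result = asyncio.run(run_pipeline(["u1", "u2", "u3"]))
print(sorted(result))
```

The queue decouples request handling from delivery, so a slow provider delays the workers rather than the calling service.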
🔧 Core Components
1. Notification API
from datetime import datetime
from typing import List, Optional

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class NotificationRequest(BaseModel):
    user_ids: List[str]
    channels: List[str]  # ['push', 'sms', 'email']
    template_id: str
    data: dict
    schedule_time: Optional[datetime] = None  # None means "send now"

@app.post("/api/v1/notifications/send")
async def send_notification(request: NotificationRequest):
    # notification_service is defined elsewhere; it is assumed to enqueue
    # the work rather than deliver inline, so this endpoint returns quickly.
    for user_id in request.user_ids:
        for channel in request.channels:
            await notification_service.send(
                user_id=user_id,
                channel=channel,
                template_id=request.template_id,
                data=request.data,
                schedule_time=request.schedule_time,
            )
    return {"status": "queued"}
2. Message Queue System
from celery import Celery

app = Celery('notification_worker')

@app.task(bind=True, max_retries=3)
def send_notification_task(self, notification_data):
    channel = notification_data['channel']
    try:
        if channel == 'push':
            return send_push_notification(notification_data)
        elif channel == 'sms':
            return send_sms_notification(notification_data)
        elif channel == 'email':
            return send_email_notification(notification_data)
    except Exception as e:
        # Re-queue the task after 60 seconds; Celery gives up after max_retries.
        raise self.retry(countdown=60, exc=e)
    # An unknown channel is a caller bug, so fail fast instead of retrying.
    raise ValueError(f"Unsupported channel: {channel}")
3. Channel-Specific Services
Push Notification Service:
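import asyncio

import requests

class PushNotificationService:
    def __init__(self):
        self.fcm_key = "YOUR_FCM_SERVER_KEY"
        self.apns_cert = "path/to/apns_cert.pem"

    async def send_push(self, device_token, message):
        # Tokens are assumed to be stored with a platform prefix,
        # which is stripped before calling the provider.
        # Firebase Cloud Messaging for Android
        if device_token.startswith('android:'):
            return await self.send_fcm(device_token.removeprefix('android:'), message)
        # Apple Push Notification Service for iOS (send_apns is not shown here)
        elif device_token.startswith('ios:'):
            return await self.send_apns(device_token.removeprefix('ios:'), message)
        return False

    async def send_fcm(self, token, message):
        payload = {
            "to": token,
            "notification": {
                "title": message.title,
                "body": message.body
            }
        }
        # Note: this is FCM's legacy HTTP endpoint, which Google has deprecated
        # in favor of the HTTP v1 API; check the current FCM docs before use.
        # requests is blocking, so the call runs in a worker thread.
        response = await asyncio.to_thread(
            requests.post,
            "https://fcm.googleapis.com/fcm/send",
            json=payload,
            headers={"Authorization": f"key={self.fcm_key}"},
            timeout=10,
        )
        return response.status_code == 200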
SMS Service:
import logging

import boto3

class SMSService:
    def __init__(self):
        self.sns_client = boto3.client('sns')

    async def send_sms(self, phone_number, message):
        # phone_number is expected in E.164 format, e.g. +14155550100.
        # boto3 is blocking, so this call holds the event loop while it runs.
        try:
            response = self.sns_client.publish(
                PhoneNumber=phone_number,
                Message=message.body
            )
            return response['ResponseMetadata']['HTTPStatusCode'] == 200
        except Exception as e:
            logging.error(f"Failed to send SMS: {e}")
            return False
Email Service:
import logging
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

class EmailService:
    def __init__(self):
        self.smtp_server = "smtp.gmail.com"
        self.smtp_port = 587
        self.username = "notifications@yourapp.com"
        # Placeholder only; load real credentials from a secret store.
        self.password = "your_app_password"

    async def send_email(self, to_email, message):
        msg = MIMEMultipart()
        msg['From'] = self.username
        msg['To'] = to_email
        msg['Subject'] = message.title
        msg.attach(MIMEText(message.body, 'html'))
        try:
            # The context manager closes the connection even if sending fails.
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.sendmail(self.username, to_email, msg.as_string())
            return True
        except Exception as e:
            logging.error(f"Failed to send email: {e}")
            return False
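The three channel services share the same shape: take a recipient and a message, return whether the send succeeded. One way to support the extensibility requirement is a small registry that maps channel names to sender functions. The sketch below is an assumption about how that could look; `register_channel`, `SENDERS`, and `dispatch` are hypothetical names, and the senders are stubs rather than real provider calls.

```python
from typing import Callable, Dict

# Maps a channel name to a sender callable: (recipient, body) -> bool.
SENDERS: Dict[str, Callable[[str, str], bool]] = {}


def register_channel(name: str):
    """Decorator that adds a sender function to the registry."""
    def decorator(func):
        SENDERS[name] = func
        return func
    return decorator


@register_channel("push")
def send_push(recipient: str, body: str) -> bool:
    print(f"push to {recipient}: {body}")  # stub for the real push service
    return True


@register_channel("sms")
def send_sms(recipient: str, body: str) -> bool:
    print(f"sms to {recipient}: {body}")  # stub for the real SMS service
    return True


@register_channel("email")
def send_email(recipient: str, body: str) -> bool:
    print(f"email to {recipient}: {body}")  # stub for the real email service
    return True


def dispatch(channel: str, recipient: str, body: str) -> bool:
    """Look up the sender for a channel and call it."""
    sender = SENDERS.get(channel)
    if sender is None:
        raise ValueError(f"Unsupported channel: {channel}")
    return sender(recipient, body)
```

With this layout, adding a new channel means registering one more function; the dispatch code does not change.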
4. Deep Dive
📄 Template System
from jinja2 import Template

class NotificationTemplate:
    def __init__(self, template_id, title_template, body_template):
        self.id = template_id
        self.title = Template(title_template)
        self.body = Template(body_template)

    def render(self, data):
        return {
            'title': self.title.render(**data),
            'body': self.body.render(**data)
        }

# Example templates
templates = {
    'welcome': NotificationTemplate(
        'welcome',
        'Welcome to {{app_name}}!',
        'Hi {{user_name}}, welcome to our amazing app!'
    ),
    'order_update': NotificationTemplate(
        'order_update',
        'Order #{{order_id}} Update',
        'Your order #{{order_id}} is now {{status}}'
    )
}
👤 User Preference Management
from sqlalchemy import Column, String, Boolean, JSON
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class UserNotificationPreference(Base):
    __tablename__ = 'user_notification_preferences'
    user_id = Column(String, primary_key=True)
    push_enabled = Column(Boolean, default=True)
    sms_enabled = Column(Boolean, default=True)
    email_enabled = Column(Boolean, default=True)
    categories = Column(JSON)  # {"marketing": false, "updates": true}
    quiet_hours = Column(JSON)  # {"start": "22:00", "end": "08:00"}

class PreferenceService:
    def should_send(self, user_id, channel, category):
        # get_user_preferences and is_quiet_hours are helpers not shown here.
        prefs = self.get_user_preferences(user_id)
        # Check channel preference
        if channel == 'push' and not prefs.push_enabled:
            return False
        elif channel == 'sms' and not prefs.sms_enabled:
            return False
        elif channel == 'email' and not prefs.email_enabled:
            return False
        # Check category preference (categories may be NULL for new users)
        if not (prefs.categories or {}).get(category, True):
            return False
        # Check quiet hours for push notifications
        if channel == 'push' and self.is_quiet_hours(prefs.quiet_hours):
            return False
        return True
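The `is_quiet_hours` helper is not shown in the source. A minimal sketch follows, assuming the `{"start": "HH:MM", "end": "HH:MM"}` shape from the column comment and that the caller passes the current time in the user's local time zone. The explicit `now` parameter is an addition for testability, so the signature differs from the call above.

```python
from datetime import time


def is_quiet_hours(quiet_hours, now):
    """Return True if `now` (a datetime.time) falls inside the quiet window."""
    if not quiet_hours:
        return False  # no window configured
    start = time.fromisoformat(quiet_hours["start"])
    end = time.fromisoformat(quiet_hours["end"])
    if start <= end:
        # Same-day window, e.g. 13:00 to 14:00
        return start <= now < end
    # Window wraps past midnight, e.g. 22:00 to 08:00
    return now >= start or now < end


overnight = {"start": "22:00", "end": "08:00"}
print(is_quiet_hours(overnight, time(23, 30)))  # True
print(is_quiet_hours(overnight, time(12, 0)))   # False
```

The wrap-around branch matters because the example window in the schema crosses midnight; a plain `start <= now < end` check would never match it.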
🚦 Rate Limiting
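import time
import uuid

import redis

class RateLimiter:
    def __init__(self):
        self.redis_client = redis.Redis()

    def is_allowed(self, user_id, channel, limit_per_hour=10):
        key = f"rate_limit:{user_id}:{channel}"
        current_time = time.time()
        # Sliding window log: one sorted-set member per attempt, scored by time.
        # The uuid keeps attempts made in the same instant from overwriting each other.
        member = f"{current_time}:{uuid.uuid4().hex}"
        pipe = self.redis_client.pipeline()
        pipe.zremrangebyscore(key, 0, current_time - 3600)  # drop entries older than 1 hour
        pipe.zcard(key)  # count before this attempt is added
        pipe.zadd(key, {member: current_time})
        pipe.expire(key, 3600)
        results = pipe.execute()
        current_count = results[1]
        # Denied attempts are recorded too, so a caller that keeps retrying stays limited.
        return current_count < limit_per_hour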
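The same sliding-window idea can be tried without Redis. The sketch below is a hypothetical in-memory variant (`InMemoryRateLimiter` is not part of the source) that takes the current time as an argument so the behavior is easy to follow. Unlike the Redis version, it records only accepted attempts.

```python
from collections import defaultdict, deque


class InMemoryRateLimiter:
    def __init__(self, limit, window_seconds=3600):
        self.limit = limit
        self.window = window_seconds
        self.events = defaultdict(deque)  # key -> timestamps of accepted sends

    def is_allowed(self, key, now):
        timestamps = self.events[key]
        # Evict timestamps that have slid out of the window.
        while timestamps and timestamps[0] <= now - self.window:
            timestamps.popleft()
        if len(timestamps) >= self.limit:
            return False
        timestamps.append(now)
        return True


limiter = InMemoryRateLimiter(limit=2, window_seconds=10)
decisions = [limiter.is_allowed("u1:push", t) for t in (0, 1, 2, 10.5, 10.6)]
print(decisions)  # [True, True, False, True, False]
```

At `t=10.5` the send from `t=0` has left the 10-second window, so one slot frees up; the send from `t=1` is still inside it, which is why the next attempt is rejected.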
📊 Delivery Tracking
from enum import Enum

from sqlalchemy import Column, DateTime, String
from sqlalchemy import Enum as SAEnum  # aliased to avoid clashing with enum.Enum

class DeliveryStatus(Enum):
    PENDING = "pending"
    SENT = "sent"
    DELIVERED = "delivered"
    FAILED = "failed"
    CLICKED = "clicked"

# Base is the SQLAlchemy declarative base shared with the other models.
class NotificationLog(Base):
    __tablename__ = 'notification_logs'
    id = Column(String, primary_key=True)
    user_id = Column(String, index=True)
    channel = Column(String)
    template_id = Column(String)
    status = Column(SAEnum(DeliveryStatus))
    sent_at = Column(DateTime)
    delivered_at = Column(DateTime)
    clicked_at = Column(DateTime)
    error_message = Column(String)

class DeliveryTracker:
    # update_status (not shown) persists the new status on the log row.
    def track_sent(self, notification_id):
        self.update_status(notification_id, DeliveryStatus.SENT)

    def track_delivered(self, notification_id):
        self.update_status(notification_id, DeliveryStatus.DELIVERED)

    def track_clicked(self, notification_id):
        self.update_status(notification_id, DeliveryStatus.CLICKED)
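Provider callbacks can arrive late or out of order, so it may help to reject status changes that move backwards. The transition table below is an assumption for illustration, not something the source specifies; `ALLOWED_TRANSITIONS` and `can_transition` are hypothetical names.

```python
from enum import Enum


class DeliveryStatus(Enum):
    PENDING = "pending"
    SENT = "sent"
    DELIVERED = "delivered"
    FAILED = "failed"
    CLICKED = "clicked"


# Assumed lifecycle: pending -> sent -> delivered -> clicked,
# with failure possible before delivery is confirmed.
ALLOWED_TRANSITIONS = {
    DeliveryStatus.PENDING: {DeliveryStatus.SENT, DeliveryStatus.FAILED},
    DeliveryStatus.SENT: {DeliveryStatus.DELIVERED, DeliveryStatus.FAILED},
    DeliveryStatus.DELIVERED: {DeliveryStatus.CLICKED},
    DeliveryStatus.FAILED: set(),
    DeliveryStatus.CLICKED: set(),
}


def can_transition(current: DeliveryStatus, new: DeliveryStatus) -> bool:
    """Return True if moving from `current` to `new` follows the lifecycle."""
    return new in ALLOWED_TRANSITIONS[current]


print(can_transition(DeliveryStatus.SENT, DeliveryStatus.DELIVERED))  # True
print(can_transition(DeliveryStatus.CLICKED, DeliveryStatus.SENT))    # False
```

An `update_status` implementation could call `can_transition` first and ignore or log updates that fail the check.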
5. Advanced Considerations
🔄 Retry Mechanism
import asyncio
from dataclasses import dataclass

@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_backoff: bool = True

class RetryableNotificationSender:
    async def send_with_retry(self, notification, config: RetryConfig):
        for attempt in range(config.max_attempts):
            is_last_attempt = attempt == config.max_attempts - 1
            try:
                result = await self.send_notification(notification)
                if result.success:
                    return result
            except Exception:
                if is_last_attempt:
                    raise  # bare raise preserves the original traceback
            if not is_last_attempt:
                # Wait only when another attempt will follow.
                await asyncio.sleep(self.calculate_delay(attempt, config))
        raise Exception("Max retry attempts exceeded")

    def calculate_delay(self, attempt, config):
        if config.exponential_backoff:
            delay = config.base_delay * (2 ** attempt)
        else:
            delay = config.base_delay
        return min(delay, config.max_delay)
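To see what `calculate_delay` produces, here is the same formula as a standalone function, evaluated with the default `base_delay` of 1 second and `max_delay` of 60 seconds. The name `backoff_delay` and the choice of eight attempts are only for illustration.

```python
def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
    """Exponential backoff: the delay doubles each attempt until it hits the cap."""
    return min(base_delay * (2 ** attempt), max_delay)


schedule = [backoff_delay(attempt) for attempt in range(8)]
print(schedule)
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The cap takes effect at attempt 6, where the uncapped delay would be 64 seconds. With the default `max_attempts` of 3, only the first two delays (1 s and 2 s) are ever used.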
📈 Analytics and Monitoring
import prometheus_client

class NotificationMetrics:
    def __init__(self):
        self.sent_counter = prometheus_client.Counter(
            'notifications_sent_total',
            'Total notifications sent',
            ['channel', 'template']
        )
        self.delivery_rate = prometheus_client.Gauge(
            'notifications_delivery_rate',
            'Notification delivery rate',
            ['channel']
        )
        self.latency_histogram = prometheus_client.Histogram(
            'notification_delivery_latency_seconds',
            'Notification delivery latency',
            ['channel']
        )

    def record_sent(self, channel, template):
        self.sent_counter.labels(channel=channel, template=template).inc()

    def record_delivery_time(self, channel, latency_seconds):
        self.latency_histogram.labels(channel=channel).observe(latency_seconds)

    def update_delivery_rates(self):
        # Calculate delivery rates for the last hour
        # (calculate_delivery_rate is a helper not shown here)
        for channel in ['push', 'sms', 'email']:
            rate = self.calculate_delivery_rate(channel)
            self.delivery_rate.labels(channel=channel).set(rate)
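The `calculate_delivery_rate` helper is left undefined above. One possible definition, sketched here as an assumption, is delivered notifications divided by attempted ones, computed from `(channel, status)` pairs read from the notification log. It assumes `clicked` implies delivered and that `pending` rows have not been attempted yet; `delivery_rates` is a hypothetical name.

```python
from collections import Counter


def delivery_rates(logs):
    """Compute per-channel delivery rate from (channel, status) pairs."""
    attempted = Counter()
    delivered = Counter()
    for channel, status in logs:
        if status == "pending":
            continue  # not handed to a provider yet
        attempted[channel] += 1
        if status in ("delivered", "clicked"):
            delivered[channel] += 1
    return {channel: delivered[channel] / attempted[channel] for channel in attempted}


sample_logs = [
    ("push", "delivered"),
    ("push", "failed"),
    ("push", "clicked"),
    ("push", "sent"),
    ("sms", "delivered"),
    ("email", "pending"),
]
rates = delivery_rates(sample_logs)
print(rates)  # {'push': 0.5, 'sms': 1.0}
```

Channels with no attempted sends are omitted rather than reported as zero, which avoids a misleading dip on the gauge during quiet periods.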
🌍 Global Distribution
class GlobalNotificationService:
    def __init__(self, user_location_service):
        # user_location_service must expose get_region(user_id) -> region name.
        self.user_location_service = user_location_service
        self.regional_services = {
            'us-east': NotificationService('us-east'),
            'eu-west': NotificationService('eu-west'),
            'asia-pacific': NotificationService('asia-pacific')
        }

    def get_service_for_user(self, user_id):
        user_region = self.user_location_service.get_region(user_id)
        # Fall back to us-east when the user's region has no dedicated service.
        return self.regional_services.get(user_region, self.regional_services['us-east'])

    async def send_notification(self, user_id, notification):
        service = self.get_service_for_user(user_id)
        return await service.send(notification)
🔐 Security Considerations
from datetime import datetime, timedelta, timezone

import jwt
from cryptography.fernet import Fernet

class SecureNotificationService:
    def __init__(self, encryption_key, secret_key):
        # Both keys must come from persistent secret storage. Generating a
        # Fernet key here would make data encrypted before a restart unreadable.
        self.cipher = Fernet(encryption_key)
        self.secret_key = secret_key

    def encrypt_sensitive_data(self, data):
        return self.cipher.encrypt(data.encode()).decode()

    def decrypt_sensitive_data(self, encrypted_data):
        return self.cipher.decrypt(encrypted_data.encode()).decode()

    def generate_unsubscribe_token(self, user_id, notification_type):
        payload = {
            'user_id': user_id,
            'notification_type': notification_type,
            'exp': datetime.now(timezone.utc) + timedelta(days=30)
        }
        return jwt.encode(payload, self.secret_key, algorithm='HS256')
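An unsubscribe link is only useful if the server checks the token when the user clicks it. The sketch below shows the sign-and-verify round trip using only the standard library. It is a simplified HMAC-signed token, not a JWT, chosen so the example runs without extra packages; `make_token` and `verify_token` are hypothetical names. In the JWT version above, verification would go through the JWT library's decode function instead.

```python
import base64
import hashlib
import hmac
import json
import time


def make_token(payload: dict, secret: bytes) -> str:
    """Serialize the payload and append an HMAC-SHA256 signature."""
    body = base64.urlsafe_b64encode(json.dumps(payload, sort_keys=True).encode())
    signature = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return f"{body.decode()}.{signature}"


def verify_token(token: str, secret: bytes, now=None):
    """Return the payload if the signature is valid and unexpired, else None."""
    try:
        body, signature = token.rsplit(".", 1)
    except ValueError:
        return None  # no separator, so not one of our tokens
    expected = hmac.new(secret, body.encode(), hashlib.sha256).hexdigest()
    # Constant-time comparison avoids leaking how many characters matched.
    if not hmac.compare_digest(signature.encode(), expected.encode()):
        return None
    payload = json.loads(base64.urlsafe_b64decode(body))
    current = time.time() if now is None else now
    if payload["exp"] < current:
        return None  # token has expired
    return payload


secret = b"example-secret"  # placeholder; load from secret storage in practice
token = make_token(
    {"user_id": "u1", "notification_type": "marketing", "exp": 1000}, secret
)
print(verify_token(token, secret, now=999))
print(verify_token(token, secret, now=1001))  # None
```

The signature is checked before the payload is decoded, so tampered tokens are rejected without parsing attacker-controlled data.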
6. Reference Materials
Real-World Examples
System Design Resources
- High Scalability: Notification Systems
- Push Notification Best Practices
- Email Delivery Best Practices