📑 Table of Contents

  1. Problem Understanding
  2. Requirements & Scope
  3. High-Level Design
  4. Deep Dive
  5. Advanced Considerations
  6. Reference Materials

1. Problem Understanding

🧠 What is a Notification System?

A notification system sends alerts to users through multiple channels including mobile push notifications, SMS messages, and emails. It’s a critical component for user engagement and communication.

Notification Types: mobile push notifications (Android via FCM, iOS via APNs), SMS messages, and emails.

Common Use Cases:


2. Requirements & Scope

🎯 Key Clarifying Questions

Notification Types and Channels:

Scale and Performance:

Features and Behavior:

📋 Functional Requirements

📊 Non-Functional Requirements


3. High-Level Design

🏗️ System Architecture

[Services] → [Notification Service] → [Message Queue] → [Workers] → [3rd Party APIs]
                     ↓                     ↑                ↓
                [Templates]          [Rate Limiter]    [Analytics]
                     ↓                     ↓                ↓
               [User Preferences] → [Notification DB] ← [Delivery Status]

🔧 Core Components

1. Notification API

import asyncio
import functools
import logging
import uuid
from datetime import datetime, timedelta, timezone
from typing import List, Optional

from fastapi import FastAPI
from pydantic import BaseModel
from sqlalchemy import DateTime
from sqlalchemy import Enum as SAEnum

class NotificationRequest(BaseModel):
    """Request body accepted by the ``send_notification`` endpoint.

    The endpoint issues one send call for every combination of an entry in
    ``user_ids`` and an entry in ``channels``.
    """

    # Recipients; the endpoint iterates them in the order given.
    user_ids: List[str]
    # Delivery channels to use for every recipient.
    channels: List[str]  # ['push', 'sms', 'email']
    # Template identifier, forwarded unchanged to the notification service.
    template_id: str
    # Forwarded unchanged alongside template_id.
    # NOTE(review): presumably the template's render context -- confirm.
    data: dict
    # Optional send time, forwarded unchanged; defaults to None.
    # NOTE(review): None presumably means "send immediately" -- confirm.
    schedule_time: Optional[datetime] = None

@app.post("/api/v1/notifications/send")
async def send_notification(request: NotificationRequest):
    """Issue one ``notification_service.send`` call per (user, channel) pair.

    Calls are awaited one at a time: users in request order, and for each
    user the channels in request order.

    Args:
        request: Validated request body.

    Returns:
        ``{"status": "queued"}`` after every send call has returned.
    """
    # NOTE(review): `app` and `notification_service` are not defined in this
    # snippet; they must be provided by the surrounding module.
    deliveries = [
        (recipient, via)
        for recipient in request.user_ids
        for via in request.channels
    ]
    for recipient, via in deliveries:
        await notification_service.send(
            user_id=recipient,
            channel=via,
            template_id=request.template_id,
            data=request.data,
            schedule_time=request.schedule_time,
        )
    return {"status": "queued"}

2. Message Queue System

import asyncio
from celery import Celery

app = Celery('notification_worker')


@app.task(bind=True, max_retries=3)
def send_notification_task(self, notification_data):
    """Deliver one notification through the channel named in the payload.

    Args:
        notification_data: Mapping describing the notification. Its
            ``'channel'`` entry selects the sender (``'push'``, ``'sms'`` or
            ``'email'``); the whole mapping is passed to that sender.

    Returns:
        Whatever the selected channel sender returns.

    Raises:
        KeyError: If the payload has no ``'channel'`` entry.
        ValueError: If the channel is not one of the supported values.
        celery.exceptions.Retry: When the sender raised and the task was
            rescheduled (up to ``max_retries`` times, 60 seconds apart).
    """
    # Payload problems are permanent: fail immediately instead of burning
    # retries on something that can never succeed.
    channel = notification_data['channel']
    senders = {
        'push': send_push_notification,
        'sms': send_sms_notification,
        'email': send_email_notification,
    }
    sender = senders.get(channel)
    if sender is None:
        raise ValueError(f"Unsupported notification channel: {channel!r}")

    try:
        return sender(notification_data)
    except Exception as exc:
        # retry() raises internally; the explicit `raise` makes the control
        # flow obvious to readers and linters.
        raise self.retry(countdown=60, exc=exc)

3. Channel-Specific Services

Push Notification Service:

import requests

class PushNotificationService:
    """Routes push notifications to a platform backend by device-token prefix.

    Tokens starting with ``android:`` go to Firebase Cloud Messaging, tokens
    starting with ``ios:`` go to APNs.
    """

    # NOTE(review): this is FCM's legacy HTTP endpoint; Google has deprecated
    # it in favour of the HTTP v1 API -- confirm and migrate.
    FCM_ENDPOINT = "https://fcm.googleapis.com/fcm/send"

    def __init__(self, request_timeout=10.0):
        """Set up credentials.

        Args:
            request_timeout: Seconds to wait for FCM before giving up.
        """
        # NOTE(review): placeholder credentials; load real values from
        # configuration or a secret store rather than hard-coding them.
        self.fcm_key = "YOUR_FCM_SERVER_KEY"
        self.apns_cert = "path/to/apns_cert.pem"
        self.request_timeout = request_timeout

    async def send_push(self, device_token, message):
        """Send ``message`` to the device identified by ``device_token``.

        Args:
            device_token: Token prefixed with ``android:`` or ``ios:``.
            message: Object exposing ``title`` and ``body``.

        Returns:
            The backend's result, or ``False`` when the token prefix is not
            recognised (nothing is sent in that case).
        """
        # Firebase Cloud Messaging for Android
        if device_token.startswith('android:'):
            return await self.send_fcm(device_token, message)
        # Apple Push Notification Service for iOS
        if device_token.startswith('ios:'):
            return await self.send_apns(device_token, message)
        # The token itself is deliberately not logged.
        logging.warning("Unsupported device token prefix; push notification not sent")
        return False

    async def send_fcm(self, token, message):
        """POST the notification to FCM.

        Args:
            token: Device token, sent as the ``to`` field exactly as given.
            message: Object exposing ``title`` and ``body``.

        Returns:
            ``True`` if FCM answered with HTTP 200, otherwise ``False``.

        Raises:
            requests.RequestException: On connection failure or timeout.
        """
        # NOTE(review): the 'android:' routing prefix is forwarded to FCM as
        # part of the token -- confirm whether it should be stripped first.
        payload = {
            "to": token,
            "notification": {
                "title": message.title,
                "body": message.body
            }
        }
        post = functools.partial(
            requests.post,
            self.FCM_ENDPOINT,
            json=payload,
            headers={"Authorization": f"key={self.fcm_key}"},
            timeout=self.request_timeout,
        )
        # requests is blocking; run it on a worker thread so the event loop
        # keeps serving other coroutines while waiting on FCM.
        response = await asyncio.get_running_loop().run_in_executor(None, post)
        return response.status_code == 200

    async def send_apns(self, token, message):
        """Placeholder for APNs delivery, which this class does not implement.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("APNs delivery is not implemented")

SMS Service:

import boto3

class SMSService:
    """Sends SMS messages through an SNS client created with boto3."""

    def __init__(self):
        self.sns_client = boto3.client('sns')

    async def send_sms(self, phone_number, message):
        """Publish ``message.body`` as an SMS to ``phone_number``.

        Args:
            phone_number: Destination number, passed as ``PhoneNumber``.
            message: Object exposing ``body``.

        Returns:
            ``True`` if the publish response reports HTTP 200; ``False`` on
            any other status or on any exception (which is logged).
        """
        try:
            publish = functools.partial(
                self.sns_client.publish,
                PhoneNumber=phone_number,
                Message=message.body,
            )
            # The boto3 call is blocking; run it on a worker thread so the
            # event loop is not stalled for the duration of the request.
            response = await asyncio.get_running_loop().run_in_executor(None, publish)
            return response['ResponseMetadata']['HTTPStatusCode'] == 200
        except Exception as e:
            # Best-effort boundary: callers get False instead of an exception.
            logging.error("Failed to send SMS: %s", e)
            return False

Email Service:

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class EmailService:
    """Sends HTML e-mail over SMTP with STARTTLS."""

    def __init__(self):
        # NOTE(review): placeholder server and credentials; load real values
        # from configuration or a secret store rather than hard-coding them.
        self.smtp_server = "smtp.gmail.com"
        self.smtp_port = 587
        self.username = "notifications@yourapp.com"
        self.password = "your_app_password"
        # Seconds to wait on the SMTP socket before giving up.
        self.smtp_timeout = 10.0

    async def send_email(self, to_email, message):
        """Send ``message`` to ``to_email`` as an HTML e-mail.

        Args:
            to_email: Recipient address.
            message: Object exposing ``title`` (subject) and ``body`` (HTML).

        Returns:
            ``True`` if the message was handed to the SMTP server; ``False``
            if connecting, authenticating or sending failed (logged).
        """
        msg = MIMEMultipart()
        msg['From'] = self.username
        msg['To'] = to_email
        msg['Subject'] = message.title

        msg.attach(MIMEText(message.body, 'html'))

        try:
            # smtplib is blocking; run the exchange on a worker thread so the
            # event loop is not stalled while talking to the server.
            await asyncio.get_running_loop().run_in_executor(
                None, self._deliver, to_email, msg.as_string()
            )
            return True
        except Exception as e:
            # Best-effort boundary: callers get False instead of an exception.
            logging.error("Failed to send email: %s", e)
            return False

    def _deliver(self, to_email, raw_message):
        """Open an SMTP session, authenticate and send one raw message."""
        # The context manager closes the connection even when starttls(),
        # login() or sendmail() raises, so a failed send cannot leak a socket.
        with smtplib.SMTP(self.smtp_server, self.smtp_port, timeout=self.smtp_timeout) as server:
            server.starttls()
            server.login(self.username, self.password)
            server.sendmail(self.username, to_email, raw_message)

4. Deep Dive

📄 Template System

from jinja2 import Template

class NotificationTemplate:
    """A notification's title and body, each held as a compiled template."""

    def __init__(self, template_id, title_template, body_template):
        """Compile both template sources.

        Args:
            template_id: Identifier stored on ``id``.
            title_template: Template source for the title.
            body_template: Template source for the body.
        """
        self.id = template_id
        self.title, self.body = Template(title_template), Template(body_template)

    def render(self, data):
        """Render title and body, passing ``data``'s items as keyword arguments.

        Returns:
            Dict with the rendered ``'title'`` and ``'body'`` strings.
        """
        parts = (('title', self.title), ('body', self.body))
        return {field: compiled.render(**data) for field, compiled in parts}

# Example templates, keyed by template id.
# Each spec is (template id, title source, body source).
templates = {
    template_id: NotificationTemplate(template_id, title_source, body_source)
    for template_id, title_source, body_source in (
        (
            'welcome',
            'Welcome to {{app_name}}!',
            'Hi {{user_name}}, welcome to our amazing app!',
        ),
        (
            'order_update',
            'Order #{{order_id}} Update',
            'Your order #{{order_id}} is now {{status}}',
        ),
    )
}

👤 User Preference Management

from sqlalchemy import Column, String, Boolean, JSON

class UserNotificationPreference(Base):
    """ORM row holding one user's notification preferences.

    Keyed by ``user_id``; stores one on/off flag per channel plus two JSON
    columns for per-category switches and quiet hours.
    """
    __tablename__ = 'user_notification_preferences'
    
    user_id = Column(String, primary_key=True)
    # Per-channel switches; each defaults to True.
    push_enabled = Column(Boolean, default=True)
    sms_enabled = Column(Boolean, default=True)
    email_enabled = Column(Boolean, default=True)
    # Category name -> enabled flag. No default is declared, so the stored
    # value may be NULL.
    categories = Column(JSON)  # {"marketing": false, "updates": true}
    # Quiet-hours window. No default is declared, so the stored value may be NULL.
    # NOTE(review): timezone of the "HH:MM" strings is not shown here -- confirm.
    quiet_hours = Column(JSON)  # {"start": "22:00", "end": "08:00"}

class PreferenceService:
    """Decides whether a notification may be sent, based on user preferences."""

    # Channel name -> preference attribute that switches that channel on/off.
    _CHANNEL_FLAGS = {
        'push': 'push_enabled',
        'sms': 'sms_enabled',
        'email': 'email_enabled',
    }

    def should_send(self, user_id, channel, category):
        """Return whether ``user_id`` accepts ``category`` on ``channel``.

        Args:
            user_id: User whose preferences are looked up.
            channel: ``'push'``, ``'sms'`` or ``'email'``. Any other value
                skips the channel check.
            category: Notification category; categories the user has no
                entry for are treated as enabled.

        Returns:
            ``False`` if the channel is disabled, the category is disabled,
            or it is a push notification during quiet hours; otherwise
            ``True``.
        """
        prefs = self.get_user_preferences(user_id)

        # Check channel preference
        flag = self._CHANNEL_FLAGS.get(channel)
        if flag is not None and not getattr(prefs, flag):
            return False

        # Check category preference. The categories column has no default,
        # so it can be None for users who never set any category switch;
        # treat that the same as "no overrides".
        category_settings = prefs.categories or {}
        if not category_settings.get(category, True):
            return False

        # Check quiet hours for push notifications
        if channel == 'push' and self.is_quiet_hours(prefs.quiet_hours):
            return False

        return True

🚦 Rate Limiting

import redis
import time

class RateLimiter:
    """Per-user, per-channel sliding-window rate limiter backed by Redis.

    Each allowed notification is stored as one member of a sorted set scored
    by its timestamp; the set's size is the number of sends in the window.
    """

    def __init__(self, redis_client=None):
        """Create the limiter.

        Args:
            redis_client: Redis client to use. Defaults to ``redis.Redis()``.
        """
        self.redis_client = redis_client if redis_client is not None else redis.Redis()

    def is_allowed(self, user_id, channel, limit_per_hour=10, window_seconds=3600):
        """Record an attempt and report whether it is within the limit.

        Args:
            user_id: User the notification is addressed to.
            channel: Delivery channel; limits are tracked per channel.
            limit_per_hour: Maximum sends allowed inside one window.
            window_seconds: Window length in whole seconds (one hour by
                default).

        Returns:
            ``True`` if fewer than ``limit_per_hour`` sends were recorded in
            the window before this one, otherwise ``False``.
        """
        key = f"rate_limit:{user_id}:{channel}"
        now = time.time()
        # Members must be unique: keying on the timestamp alone would merge
        # every attempt made within the same second into a single entry and
        # under-count bursts.
        member = f"{now}:{uuid.uuid4().hex}"

        # Use sliding window counter
        pipe = self.redis_client.pipeline()
        pipe.zremrangebyscore(key, 0, now - window_seconds)
        pipe.zcard(key)
        pipe.zadd(key, {member: now})
        pipe.expire(key, window_seconds)
        _, count_before, *_ = pipe.execute()

        if count_before < limit_per_hour:
            return True

        # The attempt was added optimistically inside the pipeline; take it
        # back out so rejected notifications do not consume quota.
        self.redis_client.zrem(key, member)
        return False

📊 Delivery Tracking

from enum import Enum

class DeliveryStatus(Enum):
    """Delivery states a notification log entry can be in.

    Used as the value set of ``NotificationLog.status``. Each member's value
    is its lowercase name.
    """
    PENDING = "pending"
    SENT = "sent"
    DELIVERED = "delivered"
    FAILED = "failed"
    CLICKED = "clicked"

class NotificationLog(Base):
    """ORM row recording one notification and its delivery progress."""
    __tablename__ = 'notification_logs'

    id = Column(String, primary_key=True)
    # Indexed so a user's notification history can be looked up by user_id.
    user_id = Column(String, index=True)
    channel = Column(String)
    template_id = Column(String)
    # SAEnum is sqlalchemy.Enum. The bare name `Enum` in this file is the
    # standard-library enum.Enum (imported to define DeliveryStatus), which
    # is not a column type and cannot be called like this.
    status = Column(SAEnum(DeliveryStatus))
    # One timestamp column per tracked delivery stage.
    sent_at = Column(DateTime)
    delivered_at = Column(DateTime)
    clicked_at = Column(DateTime)
    error_message = Column(String)

class DeliveryTracker:
    """Records delivery-status transitions for notifications.

    Every method delegates to ``self.update_status(notification_id, status)``.
    NOTE(review): ``update_status`` is not defined in this class as shown; it
    must be supplied elsewhere.
    """

    def track_sent(self, notification_id):
        """Mark the notification as sent."""
        self.update_status(notification_id, DeliveryStatus.SENT)

    def track_delivered(self, notification_id):
        """Mark the notification as delivered."""
        self.update_status(notification_id, DeliveryStatus.DELIVERED)

    def track_clicked(self, notification_id):
        """Mark the notification as clicked."""
        self.update_status(notification_id, DeliveryStatus.CLICKED)

    def track_failed(self, notification_id):
        """Mark the notification as failed.

        Added so the FAILED status has a tracking method like SENT,
        DELIVERED and CLICKED do.
        """
        self.update_status(notification_id, DeliveryStatus.FAILED)

5. Advanced Considerations

🔄 Retry Mechanism

import asyncio
from dataclasses import dataclass
from typing import Optional

@dataclass
class RetryConfig:
    """Retry settings read by ``RetryableNotificationSender``."""

    # Total number of send attempts made by the retry loop.
    max_attempts: int = 3
    # Delay used as-is when exponential_backoff is off, or doubled per
    # attempt when it is on. The sender passes the delay to asyncio.sleep,
    # so the unit is seconds.
    base_delay: float = 1.0
    # Upper bound applied to every computed delay.
    max_delay: float = 60.0
    # True: delay = base_delay * 2 ** attempt; False: delay = base_delay.
    exponential_backoff: bool = True

class RetryableNotificationSender:
    """Sends a notification, retrying with a delay between failed attempts."""

    async def send_with_retry(self, notification, config: "RetryConfig"):
        """Call ``self.send_notification`` until it succeeds or attempts run out.

        An attempt fails when it raises or when its result's ``success`` is
        falsy. Both kinds of failure wait ``calculate_delay(attempt, config)``
        seconds before the next attempt; no wait follows the final attempt.

        Args:
            notification: Passed unchanged to ``self.send_notification``.
            config: Retry settings (``max_attempts``, ``base_delay``,
                ``max_delay``, ``exponential_backoff``).

        Returns:
            The first result whose ``success`` is truthy.

        Raises:
            Exception: The error raised by the final attempt, or
                ``Exception("Max retry attempts exceeded")`` when the final
                attempt returned an unsuccessful result.
        """
        last_attempt = config.max_attempts - 1
        for attempt in range(config.max_attempts):
            try:
                result = await self.send_notification(notification)
                if result.success:
                    return result
            except Exception:
                if attempt == last_attempt:
                    # Bare raise keeps the original traceback intact.
                    raise

            # Back off after *any* failed attempt. Previously only exceptions
            # were delayed, so unsuccessful results were retried immediately.
            if attempt < last_attempt:
                await asyncio.sleep(self.calculate_delay(attempt, config))

        raise Exception("Max retry attempts exceeded")

    def calculate_delay(self, attempt, config):
        """Return the wait in seconds after the zero-based ``attempt``.

        With exponential backoff the delay is ``base_delay * 2 ** attempt``,
        otherwise ``base_delay``; either way it is capped at ``max_delay``.
        """
        if config.exponential_backoff:
            delay = config.base_delay * (2 ** attempt)
        else:
            delay = config.base_delay

        return min(delay, config.max_delay)

📈 Analytics and Monitoring

import prometheus_client
from datetime import datetime, timedelta

class NotificationMetrics:
    """Prometheus metrics for notifications: sent count, delivery rate, latency."""

    def __init__(self):
        """Create the counter, gauge and histogram."""
        self.sent_counter = prometheus_client.Counter(
            'notifications_sent_total',
            'Total notifications sent',
            labelnames=['channel', 'template'],
        )
        self.delivery_rate = prometheus_client.Gauge(
            'notifications_delivery_rate',
            'Notification delivery rate',
            labelnames=['channel'],
        )
        self.latency_histogram = prometheus_client.Histogram(
            'notification_delivery_latency_seconds',
            'Notification delivery latency',
            labelnames=['channel'],
        )

    def record_sent(self, channel, template):
        """Increment the sent counter for this channel/template pair."""
        series = self.sent_counter.labels(channel=channel, template=template)
        series.inc()

    def record_delivery_time(self, channel, latency_seconds):
        """Add one latency observation for ``channel``."""
        series = self.latency_histogram.labels(channel=channel)
        series.observe(latency_seconds)

    def update_delivery_rates(self):
        """Refresh the delivery-rate gauge for push, sms and email.

        The rate for each channel comes from ``self.calculate_delivery_rate``.
        NOTE(review): that method is not defined in this class as shown.
        """
        for name in ('push', 'sms', 'email'):
            current_rate = self.calculate_delivery_rate(name)
            series = self.delivery_rate.labels(channel=name)
            series.set(current_rate)

🌍 Global Distribution

class GlobalNotificationService:
    """Routes each notification to the service instance for the user's region."""

    # Region used when the user's region is unknown or has no service.
    DEFAULT_REGION = 'us-east'

    def __init__(self, user_location_service=None):
        """Create one notification service per region.

        Args:
            user_location_service: Object exposing ``get_region(user_id)``.
                When omitted, every user is routed to ``DEFAULT_REGION``.
        """
        self.regional_services = {
            'us-east': NotificationService('us-east'),
            'eu-west': NotificationService('eu-west'),
            'asia-pacific': NotificationService('asia-pacific')
        }
        # Previously this attribute was read but never assigned, so every
        # lookup failed with AttributeError.
        self.user_location_service = user_location_service

    def get_service_for_user(self, user_id):
        """Return the regional service for ``user_id``.

        Falls back to the ``DEFAULT_REGION`` service when no location service
        is configured or the reported region has no service.
        """
        fallback = self.regional_services[self.DEFAULT_REGION]
        if self.user_location_service is None:
            return fallback
        user_region = self.user_location_service.get_region(user_id)
        return self.regional_services.get(user_region, fallback)

    async def send_notification(self, user_id, notification):
        """Send ``notification`` through the user's regional service.

        Returns:
            Whatever the regional service's ``send`` returns.
        """
        service = self.get_service_for_user(user_id)
        return await service.send(notification)

🔐 Security Considerations

import jwt
from cryptography.fernet import Fernet

class SecureNotificationService:
    """Encrypts sensitive notification data and signs unsubscribe tokens."""

    def __init__(self, encryption_key=None, secret_key=None):
        """Set up the cipher and the token-signing key.

        Args:
            encryption_key: Fernet key to use. When omitted a fresh key is
                generated; data encrypted with such a key can only be
                decrypted by this same instance, so pass a persisted key
                whenever ciphertext must outlive the instance.
            secret_key: Key used to sign unsubscribe tokens. Required by
                ``generate_unsubscribe_token``.
        """
        self.encryption_key = (
            encryption_key if encryption_key is not None else Fernet.generate_key()
        )
        self.cipher = Fernet(self.encryption_key)
        # Previously this attribute was read but never assigned, so token
        # generation always failed with AttributeError.
        self.secret_key = secret_key

    def encrypt_sensitive_data(self, data):
        """Encrypt the string ``data`` and return the ciphertext as a string."""
        return self.cipher.encrypt(data.encode()).decode()

    def decrypt_sensitive_data(self, encrypted_data):
        """Decrypt a string produced by ``encrypt_sensitive_data``."""
        return self.cipher.decrypt(encrypted_data.encode()).decode()

    def generate_unsubscribe_token(self, user_id, notification_type):
        """Return an HS256-signed JWT that expires in 30 days.

        Args:
            user_id: Stored in the token's ``user_id`` claim.
            notification_type: Stored in the ``notification_type`` claim.

        Raises:
            ValueError: If no ``secret_key`` was configured.
        """
        if not self.secret_key:
            raise ValueError("secret_key must be configured to sign unsubscribe tokens")
        payload = {
            'user_id': user_id,
            'notification_type': notification_type,
            # Timezone-aware "now"; datetime.utcnow() is deprecated.
            'exp': datetime.now(timezone.utc) + timedelta(days=30)
        }
        return jwt.encode(payload, self.secret_key, algorithm='HS256')

6. Reference Materials

Real-World Examples

System Design Resources

Technical Deep Dives