📑 Table of Contents

  1. Problem Understanding
  2. Requirements & Scope
  3. High-Level Design
  4. Deep Dive
  5. Advanced Considerations
  6. Reference Materials

1. Problem Understanding

🧠 What is a News Feed System?

A news feed system displays a personalized stream of content from friends, pages, and groups that a user follows. It’s the core feature of social media platforms like Facebook, Twitter, and Instagram.

Key Components:

Common Use Cases:


2. Requirements & Scope

🎯 Key Clarifying Questions

Core Functionality:

User Interaction:

Scale and Performance:

📋 Functional Requirements

📊 Non-Functional Requirements

📐 Back-of-the-Envelope Estimation

User and Content Metrics:


3. High-Level Design

🏗️ System Architecture

[Mobile/Web Clients] → [Load Balancer] → [API Gateway]
                                            ↓
[Feed Service] ← [Post Service] ← [User Service] ← [Notification Service]
     ↓              ↓                ↓                    ↓
[Feed Cache] ← [Post Storage] ← [User Database] ← [Message Queue]
     ↓              ↓                ↓
[Feed Database] ← [Media Storage] ← [Analytics Service]

🔧 Core Components

1. Feed Generation Strategies

Pull Model (Fan-out on Read):

class PullFeedService:
    """Fan-out-on-read feed builder: the feed is assembled at request time."""

    def generate_feed(self, user_id, limit=20):
        """Return the first ``limit`` ranked posts from accounts ``user_id`` follows.

        Up to 100 recent posts are requested per followed account; the combined
        candidates are handed to the ranking service and the ranked result is
        cut to ``limit`` entries.
        """
        followed_accounts = self.user_service.get_following(user_id)

        # Collect candidates account by account, keeping fetch order.
        candidates = [
            post
            for account in followed_accounts
            for post in self.post_service.get_recent_posts(account, limit=100)
        ]

        ordered = self.ranking_service.rank_posts(candidates, user_id)
        return ordered[:limit]

Push Model (Fan-out on Write):

class PushFeedService:
    """Fan-out-on-write publisher: follower feeds are updated at post time."""

    def publish_post(self, user_id, post):
        """Store ``post``, push its id to every follower's feed, then notify them.

        The steps run in this order: create the post, load the author's
        followers, add the new post id to each follower's cached feed, and
        finally ask the notification service to notify those followers.
        """
        # NOTE(review): PostService.create_post in this document takes
        # (user_id, content, media_urls) and itself calls publish_post;
        # confirm which post-service API this call is meant to target.
        new_post_id = self.post_service.create_post(post)

        audience = self.user_service.get_followers(user_id)

        # One cache write per follower.
        for recipient_id in audience:
            self.feed_cache.add_to_feed(recipient_id, new_post_id)

        self.notification_service.notify_followers(user_id, new_post_id, audience)

Hybrid Model:

class HybridFeedService:
    """Hybrid fan-out: push for regular authors, pull for high-follower authors.

    Authors whose follower count exceeds ``celebrity_threshold`` have their
    posts stored in a separate celebrity cache and merged in at read time;
    everyone else's posts are pushed into follower feeds at write time.
    """

    def __init__(self, celebrity_threshold=1000000):
        """Create the service.

        Args:
            celebrity_threshold: Follower count above which an author is
                served with the pull model. Defaults to 1M followers.
        """
        self.celebrity_threshold = celebrity_threshold

    def publish_post(self, user_id, post):
        """Store ``post`` and distribute it according to the author's reach."""
        # NOTE(review): PostService.create_post in this document takes
        # (user_id, content, media_urls); confirm the intended API here.
        post_id = self.post_service.create_post(post)
        followers = self.user_service.get_followers(user_id)

        if len(followers) > self.celebrity_threshold:
            # Too many followers to fan out: readers pull this post instead.
            self.celebrity_post_cache.add(user_id, post_id)
            return

        # Regular author: push the post id into every follower's feed.
        for follower_id in followers:
            self.feed_cache.add_to_feed(follower_id, post_id)

    def generate_feed(self, user_id):
        """Return the ranked merge of the cached feed and followed celebrities' posts."""
        # Copy so the list handed out by the cache is never mutated in place.
        candidates = list(self.feed_cache.get_feed(user_id))

        # Pull-side merge: posts from celebrities this user follows.
        for celebrity_id in self.user_service.get_celebrity_following(user_id):
            candidates.extend(
                self.celebrity_post_cache.get_recent_posts(celebrity_id)
            )

        return self.ranking_service.rank_posts(candidates, user_id)

2. Post Service

from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

@dataclass
class Post:
    """Record describing one post."""

    id: str  # post identifier
    user_id: str  # id of the user who created the post
    content: str  # text body
    media_urls: List[str]  # URLs of attached media; may be empty
    created_at: datetime  # creation timestamp
    privacy_level: str  # 'public', 'friends', 'private'
    
class PostService:
    """Creates posts and serves a user's recent posts."""

    # Visibility values accepted by create_post.
    VALID_PRIVACY_LEVELS = ('public', 'friends', 'private')

    def create_post(self, user_id: str, content: str,
                    media_urls: Optional[List[str]] = None,
                    privacy_level: str = 'friends'):
        """Create, persist and distribute a new post.

        Args:
            user_id: Id of the author.
            content: Text body of the post.
            media_urls: Optional media URLs; ``None`` is stored as an empty list.
            privacy_level: One of ``VALID_PRIVACY_LEVELS``. Defaults to
                ``'friends'``.

        Returns:
            The id of the newly created post.

        Raises:
            ValueError: If ``privacy_level`` is not a supported value.
        """
        # Reject bad input before anything is written.
        if privacy_level not in self.VALID_PRIVACY_LEVELS:
            raise ValueError(
                f"privacy_level must be one of {self.VALID_PRIVACY_LEVELS}, "
                f"got {privacy_level!r}"
            )

        post = Post(
            id=self.generate_id(),
            user_id=user_id,
            content=content,
            media_urls=media_urls or [],
            created_at=datetime.now(),
            privacy_level=privacy_level,
        )

        # Store in database
        self.post_repository.save(post)

        # Update user's timeline
        self.timeline_service.add_to_timeline(user_id, post.id)

        # Trigger feed generation
        self.feed_service.publish_post(user_id, post)

        return post.id

    def get_recent_posts(self, user_id: str, limit: int = 50):
        """Return up to ``limit`` posts for ``user_id`` from the post repository."""
        return self.post_repository.get_user_posts(user_id, limit)

3. Feed Ranking Service

import numpy as np
from datetime import datetime, timedelta

class FeedRankingService:
    """Scores posts for a viewer with a weighted sum of four signals."""

    # Default weight of each signal in the final score.
    DEFAULT_WEIGHTS = {
        'recency': 0.3,
        'engagement': 0.4,
        'affinity': 0.2,
        'content_type': 0.1
    }

    def __init__(self, weights=None):
        """Create the service.

        Args:
            weights: Optional mapping overriding any of the default signal
                weights; keys that are not supplied keep their defaults.
        """
        self.weights = {**self.DEFAULT_WEIGHTS, **(weights or {})}

    def rank_posts(self, posts: List["Post"], user_id: str) -> List["Post"]:
        """Return ``posts`` ordered by descending score for ``user_id``.

        Posts with equal scores keep their input order (the sort is stable).
        """
        scored = [(self.calculate_score(post, user_id), post) for post in posts]

        # Sort on the score only so posts themselves are never compared.
        scored.sort(key=lambda pair: pair[0], reverse=True)

        return [post for _, post in scored]

    def calculate_score(self, post: "Post", user_id: str) -> float:
        """Return the weighted score of ``post`` for viewer ``user_id``."""
        # Recency: linear decay from 1 to 0 over 24 hours. The upper clamp
        # keeps posts with a future timestamp (e.g. clock skew) from scoring
        # above a brand-new post.
        hours_ago = (datetime.now() - post.created_at).total_seconds() / 3600
        recency_score = min(1.0, max(0.0, 1 - hours_ago / 24))

        # Engagement score (likes, comments, shares)
        engagement_score = self.get_engagement_score(post.id)

        # User affinity score (interaction history)
        affinity_score = self.get_user_affinity(user_id, post.user_id)

        # Content type preference
        content_score = self.get_content_type_score(post, user_id)

        return (
            self.weights['recency'] * recency_score +
            self.weights['engagement'] * engagement_score +
            self.weights['affinity'] * affinity_score +
            self.weights['content_type'] * content_score
        )

4. Deep Dive

🗄️ Database Design

Posts Table:

-- One row per post. The column set mirrors the Post dataclass, plus updated_at.
CREATE TABLE posts (
    id VARCHAR(20) PRIMARY KEY,
    -- Author of the post.
    user_id VARCHAR(20) NOT NULL,
    content TEXT,
    -- Presumably a JSON array of media URLs; confirm against the writer.
    media_urls JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Refreshed automatically whenever the row is updated.
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    privacy_level ENUM('public', 'friends', 'private') DEFAULT 'friends',
    -- Covers "posts of one user ordered by creation time".
    INDEX idx_user_created (user_id, created_at),
    -- Covers creation-time range scans across all users.
    INDEX idx_created_at (created_at)
);

User Relationships:

-- Directed follow edges: follower_id follows followed_id.
CREATE TABLE user_relationships (
    follower_id VARCHAR(20),
    followed_id VARCHAR(20),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- At most one row per (follower, followed) pair.
    PRIMARY KEY (follower_id, followed_id),
    -- NOTE(review): follower_id is already the leading column of the primary
    -- key, so this index may be redundant; confirm before dropping it.
    INDEX idx_follower (follower_id),
    -- Supports looking up the followers of a given user.
    INDEX idx_followed (followed_id)
);

Feed Cache:

-- Per-user feed entries, each carrying a ranking score.
CREATE TABLE feed_cache (
    user_id VARCHAR(20),
    post_id VARCHAR(20),
    -- Up to 10 digits in total, 6 of them after the decimal point.
    score DECIMAL(10,6),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- A post appears at most once in a given user's feed.
    PRIMARY KEY (user_id, post_id),
    -- Supports reading one user's feed highest score first.
    INDEX idx_user_score (user_id, score DESC)
);

Caching Strategy

import redis
import json
from typing import List

class FeedCacheService:
    """Redis-backed feed cache.

    Chronological feeds live in a list at ``feed:{user_id}``; ranked feeds
    live in a sorted set at ``ranked_feed:{user_id}``.
    """

    def __init__(self, redis_client=None, cache_ttl=3600, max_feed_size=1000):
        """Create the cache service.

        Args:
            redis_client: Redis client to use; a default ``redis.Redis()`` is
                created when omitted.
            cache_ttl: Expiry applied to feed keys, in seconds (default 1 hour).
            max_feed_size: Maximum number of post ids kept per list feed.
        """
        self.redis_client = redis.Redis() if redis_client is None else redis_client
        self.cache_ttl = cache_ttl
        self.max_feed_size = max_feed_size

    @staticmethod
    def _as_text(value):
        """Decode ``bytes`` replies; pass already-decoded strings through."""
        return value.decode() if isinstance(value, bytes) else value

    def get_feed(self, user_id: str) -> List[str]:
        """Return every post id in the user's list feed, newest first."""
        key = f"feed:{user_id}"
        return [self._as_text(item) for item in self.redis_client.lrange(key, 0, -1)]

    def add_to_feed(self, user_id: str, post_id: str):
        """Prepend ``post_id`` to the user's feed, trim it, and refresh its TTL."""
        key = f"feed:{user_id}"
        pipe = self.redis_client.pipeline()

        # Add to beginning of list
        pipe.lpush(key, post_id)

        # Keep only recent items
        pipe.ltrim(key, 0, self.max_feed_size - 1)

        # Set expiration
        pipe.expire(key, self.cache_ttl)

        pipe.execute()

    def update_feed_score(self, user_id: str, post_id: str, score: float):
        """Set the score of ``post_id`` in the user's ranked feed and refresh its TTL."""
        # Use sorted set for ranked feeds
        key = f"ranked_feed:{user_id}"

        # Send both commands together, as add_to_feed does.
        pipe = self.redis_client.pipeline()
        pipe.zadd(key, {post_id: score})
        pipe.expire(key, self.cache_ttl)
        pipe.execute()

    def get_ranked_feed(self, user_id: str, limit: int = 20) -> List[str]:
        """Return up to ``limit`` post ids from the ranked feed, highest score first."""
        # A stop index of -1 means "through the last element" in Redis, so a
        # limit of 0 would otherwise return the whole set.
        if limit <= 0:
            return []

        key = f"ranked_feed:{user_id}"
        top_items = self.redis_client.zrevrange(key, 0, limit - 1)

        # Decode here too, so both read paths return str as annotated.
        return [self._as_text(item) for item in top_items]

🔄 Real-time Updates

WebSocket Integration:

import asyncio
import websockets
import json

class FeedWebSocketService:
    """Tracks one live WebSocket per user and pushes new-post events to followers."""

    def __init__(self):
        self.connections = {}  # user_id -> websocket

    async def handle_connection(self, websocket, user_id):
        """Register ``websocket`` for ``user_id`` until the socket closes.

        A later connection for the same user replaces the earlier one. When
        a socket closes, it is unregistered only if it is still the one on
        record, so an old socket closing cannot evict its replacement.
        """
        self.connections[user_id] = websocket
        try:
            await websocket.wait_closed()
        finally:
            if self.connections.get(user_id) is websocket:
                del self.connections[user_id]

    async def broadcast_new_post(self, author_id: str, post_id: str):
        """Send a ``new_post`` event to every connected follower of ``author_id``."""
        # Get author's followers
        followers = await self.user_service.get_followers(author_id)

        # The payload is the same for every recipient: serialize it once.
        payload = json.dumps({
            "type": "new_post",
            "post_id": post_id,
            "author_id": author_id
        })

        # Send update to online followers
        for follower_id in followers:
            connection = self.connections.get(follower_id)
            if connection is None:
                continue
            try:
                await connection.send(payload)
            except websockets.exceptions.ConnectionClosed:
                # The entry may have been removed or replaced while the send
                # was awaited; only drop it if it is still this socket.
                if self.connections.get(follower_id) is connection:
                    del self.connections[follower_id]

Server-Sent Events (SSE):

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

@app.get("/api/v1/feed/stream")
async def feed_stream(user_id: str):
    """Stream new feed posts for ``user_id`` as Server-Sent Events.

    Polls the feed service once per second and emits each new post as one
    ``data:`` event carrying the post's JSON representation.
    """
    async def event_generator():
        while True:
            # Check for new posts in user's feed
            new_posts = await feed_service.get_new_posts_since_last_check(user_id)

            if new_posts:
                for post in new_posts:
                    # One SSE event per post; the blank line ends the event.
                    yield f"data: {json.dumps(post.to_dict())}\n\n"

            await asyncio.sleep(1)  # Check every second

    # SSE clients (EventSource) require the text/event-stream content type;
    # with text/plain the browser refuses to open the stream.
    return StreamingResponse(event_generator(), media_type="text/event-stream")

📱 Mobile Optimization

class MobileFeedService:
    """Converts posts into a compact payload for mobile clients."""

    def __init__(self):
        self.image_cdn = "https://cdn.example.com/"
        self.video_cdn = "https://video-cdn.example.com/"

    @staticmethod
    def _cdn_url(base: str, variant: str, url: str) -> str:
        """Join a CDN base, a variant segment and a media path with single slashes."""
        return f"{base.rstrip('/')}/{variant}/{url.lstrip('/')}"

    def optimize_feed_for_mobile(self, feed_posts: List["Post"]) -> List[dict]:
        """Return one dict per post with truncated text and CDN media links.

        Content is truncated via ``truncate_content(content, 200)`` and
        ``created_at`` is serialized with ``isoformat()``.
        """
        # NOTE(review): truncate_content, is_image and is_video are not defined
        # in this class as shown; confirm where they are provided.
        return [
            {
                "id": post.id,
                "user_id": post.user_id,
                "content": self.truncate_content(post.content, 200),
                "created_at": post.created_at.isoformat(),
                "media": self.optimize_media_urls(post.media_urls)
            }
            for post in feed_posts
        ]

    def optimize_media_urls(self, media_urls: List[str]) -> List[dict]:
        """Map media URLs to CDN variants.

        Images get ``thumbnail`` and ``full`` links, videos get ``poster``
        and ``stream`` links; URLs that are neither are dropped.
        """
        optimized_media = []

        for url in media_urls:
            if self.is_image(url):
                optimized_media.append({
                    "type": "image",
                    "thumbnail": self._cdn_url(self.image_cdn, "thumb", url),
                    "full": self._cdn_url(self.image_cdn, "full", url)
                })
            elif self.is_video(url):
                optimized_media.append({
                    "type": "video",
                    "poster": self._cdn_url(self.video_cdn, "poster", url),
                    "stream": self._cdn_url(self.video_cdn, "stream", url)
                })

        return optimized_media

5. Advanced Considerations

🤖 Machine Learning Integration

import numpy as np
from sklearn.linear_model import LogisticRegression

class MLFeedRanking:
    """Predicts engagement probability for (user, post) pairs with a classifier."""

    def __init__(self):
        self.model = LogisticRegression()
        # NOTE(review): extract_post_features and extract_interaction_features
        # are not defined in this class as shown; confirm where they live.
        self.feature_extractors = [
            self.extract_user_features,
            self.extract_post_features,
            self.extract_interaction_features
        ]

    def extract_features(self, user_id: str, post: "Post") -> np.ndarray:
        """Concatenate the output of every feature extractor into one 1-D array."""
        features = []

        for extractor in self.feature_extractors:
            features.extend(extractor(user_id, post))

        return np.array(features)

    def extract_user_features(self, user_id: str, post: "Post") -> List[float]:
        """Return features derived from the viewer's profile."""
        user_profile = self.user_service.get_user_profile(user_id)

        return [
            user_profile.age / 100.0,  # Normalized age
            1.0 if user_profile.location == post.author_location else 0.0,
            user_profile.activity_score,
            user_profile.content_preference_score
        ]

    def predict_engagement(self, user_id: str, posts: List["Post"]) -> List[float]:
        """Return the positive-class probability for each post, in input order.

        All posts are scored with a single ``predict_proba`` call instead of
        one call per post. An empty ``posts`` list yields an empty result.
        """
        if not posts:
            return []

        # One row of features per post.
        feature_matrix = np.vstack(
            [self.extract_features(user_id, post) for post in posts]
        )

        probabilities = np.asarray(self.model.predict_proba(feature_matrix))

        # Column 1 is the probability of the positive (engagement) class.
        return probabilities[:, 1].tolist()

🔐 Privacy and Security

class PrivacyService:
    """Applies per-post visibility rules and basic content sanitization."""

    def filter_posts_by_privacy(self, viewer_id: str, posts: List["Post"]) -> List["Post"]:
        """Return the posts from ``posts`` that ``viewer_id`` may see, in order."""
        return [post for post in posts if self.can_view_post(viewer_id, post)]

    def can_view_post(self, viewer_id: str, post: "Post") -> bool:
        """Return whether ``viewer_id`` is allowed to see ``post``.

        Public posts are visible to everyone, authors always see their own
        posts, friends-only posts are visible to the author's friends, and
        anything else (private posts, unknown levels) is hidden.
        """
        if post.privacy_level == 'public':
            return True

        # Authors can always see their own posts, whatever the level; without
        # this check an author could not view their own private post.
        if viewer_id == post.user_id:
            return True

        if post.privacy_level == 'friends':
            return self.user_service.are_friends(viewer_id, post.user_id)

        return False

    def sanitize_content(self, content: str) -> str:
        """Strip ``<script>`` blocks and links to the blocked domain.

        This is a coarse text filter, not a full HTML sanitizer.
        """
        import re

        # Remove script tags (any letter case, allowing "</script >").
        content = re.sub(
            r'<script.*?</script\s*>', '', content,
            flags=re.DOTALL | re.IGNORECASE,
        )

        # Remove suspicious links, including a bare URL with no path.
        content = re.sub(r'https?://suspicious-domain\.com\S*', '[LINK REMOVED]', content)

        return content

📊 Analytics and Monitoring

import time
from collections import defaultdict

class FeedAnalytics:
    """In-memory counters and timings for feed generation and engagement."""

    # Durations above this value are counted as slow feed generations.
    SLOW_GENERATION_THRESHOLD = 0.1

    def __init__(self):
        self.metrics = defaultdict(int)
        self.timing_metrics = defaultdict(list)

    def track_feed_generation_time(self, user_id: str, duration: float):
        """Record one feed-generation duration and count it if it was slow."""
        self.timing_metrics['feed_generation'].append(duration)

        if duration > self.SLOW_GENERATION_THRESHOLD:  # Slow feed generation
            self.metrics['slow_feed_generation'] += 1

    def track_engagement(self, post_id: str, action: str):
        """Increment the global and the per-post counter for ``action``."""
        self.metrics[f'engagement_{action}'] += 1
        self.metrics[f'post_{post_id}_{action}'] += 1

    def get_feed_performance_stats(self) -> dict:
        """Return average and p95 generation time plus slow and engagement totals.

        When no generation time has been recorded yet, the average and p95
        are reported as ``0.0`` instead of raising.
        """
        generation_times = self.timing_metrics['feed_generation']

        if generation_times:
            avg_time = sum(generation_times) / len(generation_times)
            p95_time = float(np.percentile(generation_times, 95))
        else:
            # Nothing recorded yet: avoid division by zero and an empty percentile.
            avg_time = 0.0
            p95_time = 0.0

        return {
            'avg_generation_time': avg_time,
            'p95_generation_time': p95_time,
            'slow_generation_count': self.metrics['slow_feed_generation'],
            'total_engagements': sum(v for k, v in self.metrics.items() if k.startswith('engagement_'))
        }

🌍 Global Scale Considerations

class GlobalFeedService:
    """Routes feed generation to a regional service and mixes in global content."""

    # Region used when a user has no preference or an unknown one.
    DEFAULT_REGION = 'us-east'

    def __init__(self):
        self.regional_feeds = {
            'us-east': FeedService('us-east'),
            'us-west': FeedService('us-west'),
            'eu-west': FeedService('eu-west'),
            'asia-pacific': FeedService('asia-pacific')
        }
        self.cross_region_replication = CrossRegionReplicator()

    def get_user_region(self, user_id: str) -> str:
        """Return the user's preferred region, or the default when none is set."""
        user_profile = self.user_service.get_user_profile(user_id)
        return user_profile.preferred_region or self.DEFAULT_REGION

    def generate_feed(self, user_id: str) -> List["Post"]:
        """Return the ranked merge of the user's regional feed and trending global posts."""
        region = self.get_user_region(user_id)

        # A profile may name a region with no deployed feed service; fall
        # back to the default region instead of raising KeyError.
        regional_service = self.regional_feeds.get(region)
        if regional_service is None:
            regional_service = self.regional_feeds[self.DEFAULT_REGION]

        # Copy so the list owned by the regional service is not mutated.
        feed = list(regional_service.generate_feed(user_id))

        # Add popular global content
        feed.extend(self.get_trending_global_content())

        return self.ranking_service.rank_posts(feed, user_id)

6. Reference Materials

Real-World Examples

System Design Resources

Technical Papers