📑 Table of Contents

  1. Problem Understanding
  2. Requirements & Scope
  3. High-Level Design
  4. Deep Dive
  5. Advanced Considerations
  6. Reference Materials

1. Problem Understanding

🧠 What is a Web Crawler?

A web crawler (also known as a spider or robot) is a system that systematically browses the web to discover and download content. It starts from a set of seed URLs and follows the links on each downloaded page to find new pages.

Basic Algorithm:

  1. Start with a set of seed URLs
  2. Download web pages from these URLs
  3. Extract new URLs from downloaded pages
  4. Add new URLs to the crawling queue
  5. Repeat the process
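
The five steps above can be sketched as a breadth-first loop. This is a minimal illustration, not a production crawler: `crawl`, `fetch_links`, and the in-memory `FAKE_WEB` dictionary are hypothetical names introduced here so the example runs without network access.

```python
from collections import deque
from typing import Callable, Iterable


def crawl(seeds: Iterable[str],
          fetch_links: Callable[[str], list[str]],
          max_pages: int = 100) -> list[str]:
    """Breadth-first crawl; returns URLs in the order they were visited."""
    queue = deque(seeds)          # step 1: start from the seed URLs
    seen = set(queue)             # URLs already queued, to avoid loops
    visited = []

    while queue and len(visited) < max_pages:
        url = queue.popleft()
        links = fetch_links(url)  # steps 2-3: download the page, extract URLs
        visited.append(url)
        for link in links:        # step 4: enqueue URLs not seen before
            if link not in seen:
                seen.add(link)
                queue.append(link)
    return visited                # step 5 is the while loop itself


# Hypothetical in-memory "web" standing in for real HTTP fetches.
FAKE_WEB = {
    "a": ["b", "c"],
    "b": ["a", "d"],
    "c": ["d"],
    "d": [],
}

order = crawl(["a"], lambda url: FAKE_WEB.get(url, []))
print(order)  # ['a', 'b', 'c', 'd']
```

The `seen` set is what keeps the loop from revisiting `a` when `b` links back to it; the `max_pages` cap is an assumed safety limit, since a real crawl has no natural end.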

Common Use Cases:

🎯 Key Challenges


2. Requirements & Scope

🎯 Key Clarifying Questions

Purpose and Scale:

Content Handling:

Performance and Behavior:

📋 Functional Requirements

📊 Non-Functional Requirements

📐 Back-of-the-Envelope Estimation

Traffic and Scale:


3. High-Level Design

🏗️ System Architecture

[Seed URLs] → [URL Frontier] → [HTML Downloader] → [DNS Resolver]
                    ↑                ↓
              [URL Storage] ← [URL Seen?] ← [URL Filter] ← [URL Extractor]
                                    ↓                          ↑
                            [Content Storage] ← [Content Seen?] ← [Content Parser]

🔧 Core Components

1. Seed URLs

Starting points for the crawling process:

# Example seed URL strategies
geographic_seeds = [
    "https://en.wikipedia.org",      # Global
    "https://baidu.com",             # China
    "https://yahoo.co.jp"            # Japan
]

topic_seeds = [
    "https://amazon.com",            # E-commerce
    "https://stackoverflow.com",     # Technology
    "https://reuters.com"            # News
]

2. URL Frontier

FIFO queue storing URLs to be downloaded:

from collections import deque

class URLFrontier:
    def __init__(self):
        self.queue = deque()
        self.seen_urls = set()
    
    def add_url(self, url):
        if url not in self.seen_urls:
            self.queue.append(url)
            self.seen_urls.add(url)
    
    def get_url(self):
        return self.queue.popleft() if self.queue else None

3. HTML Downloader

Downloads web pages with proper error handling:

import logging

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class HTMLDownloader:
    def __init__(self):
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
    
    def download(self, url):
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            return response.text
        except Exception as e:
            logging.error(f"Failed to download {url}: {e}")
            return None

4. Content Parser

Validates and parses HTML content:

import logging

from bs4 import BeautifulSoup
import html5lib  # not called directly; must be installed for the 'html5lib' parser

class ContentParser:
    def parse(self, html_content):
        try:
            soup = BeautifulSoup(html_content, 'html5lib')
            return soup
        except Exception as e:
            logging.error(f"Failed to parse HTML: {e}")
            return None
    
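    def is_valid_html(self, html_content):
        # Lenient parsers such as html5lib recover from most malformed markup,
        # so this check rarely fails; it mainly catches unusable input.
        try:
            BeautifulSoup(html_content, 'html5lib')
            return True
        except Exception:
            return False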

5. URL Extractor

Extracts links from HTML and resolves them to absolute URLs:

from urllib.parse import urljoin, urlparse

from bs4 import BeautifulSoup

class URLExtractor:
    def extract_urls(self, html_content, base_url):
        soup = BeautifulSoup(html_content, 'html5lib')
        urls = []
        
        # Extract from anchor tags
        for link in soup.find_all('a', href=True):
            url = urljoin(base_url, link['href'])
            if self.is_valid_url(url):
                urls.append(url)
        
        return urls
    
    def is_valid_url(self, url):
        parsed = urlparse(url)
        # Return a real boolean rather than the netloc string
        return parsed.scheme in ('http', 'https') and bool(parsed.netloc)
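
Resolving relative links with `urljoin` does not by itself make equivalent URLs compare equal, which matters for the later "URL Seen?" check. A minimal normalization sketch follows; `normalize_url` and `DEFAULT_PORTS` are hypothetical names, and the rules shown (lowercase scheme and host, drop default ports, drop fragments, treat an empty path as `/`) are one reasonable assumption rather than a complete canonicalization. Userinfo and IPv6 literals are not handled.

```python
from urllib.parse import urlsplit, urlunsplit

DEFAULT_PORTS = {"http": 80, "https": 443}


def normalize_url(url: str) -> str:
    """Return a canonical form so trivially different URLs compare equal."""
    parts = urlsplit(url)
    scheme = parts.scheme.lower()
    host = (parts.hostname or "").lower()   # hostname excludes port and userinfo

    # Keep the port only when it differs from the scheme's default.
    port = parts.port
    if port is not None and port != DEFAULT_PORTS.get(scheme):
        host = f"{host}:{port}"

    path = parts.path or "/"                # an empty path is equivalent to "/"
    # Drop the fragment: it is never sent to the server.
    return urlunsplit((scheme, host, path, parts.query, ""))


print(normalize_url("HTTP://Example.COM:80/a/b?x=1#top"))
# http://example.com/a/b?x=1
```

Applying this before the deduplication step means `https://example.com` and `https://EXAMPLE.com/#intro` are treated as the same page.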

🔄 Crawling Workflow

Step-by-Step Process:

  1. Initialize: Add seed URLs to URL Frontier
  2. Fetch: HTML Downloader gets URLs from Frontier
  3. Resolve: DNS Resolver translates each URL's hostname into an IP address
  4. Download: Fetch HTML content from web servers
  5. Parse: Content Parser validates and parses HTML
  6. Check Duplicates: Content Seen component checks for duplicates
  7. Store: Save unique content to Content Storage
  8. Extract Links: URL Extractor finds new URLs
  9. Filter: URL Filter removes unwanted URLs
  10. Deduplicate: URL Seen component removes duplicate URLs
  11. Queue: Add new URLs to URL Frontier

4. Deep Dive

🚦 URL Frontier Design

Politeness Requirements:

import time
from collections import defaultdict, deque
from urllib.parse import urlparse

class PoliteFrontier:
    def __init__(self):
        self.host_queues = defaultdict(deque)
        self.host_last_access = defaultdict(float)
        self.min_delay = 1.0  # 1 second between requests
    
    def add_url(self, url):
        host = urlparse(url).netloc
        self.host_queues[host].append(url)
    
    def get_url(self):
        current_time = time.time()
        
        for host, queue in self.host_queues.items():
            if queue and (current_time - self.host_last_access[host]) >= self.min_delay:
                self.host_last_access[host] = current_time
                return queue.popleft()
        
        return None
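
The frontier above scans every host queue on each call, which grows linearly with the number of hosts. One possible alternative, sketched here under stated assumptions, keeps a min-heap of `(next_allowed_time, host)` so the next eligible host is found in logarithmic time. `HeapPoliteFrontier` and its injectable `clock` parameter are hypothetical names added for illustration and testing.

```python
import heapq
import time
from collections import defaultdict, deque
from urllib.parse import urlparse


class HeapPoliteFrontier:
    def __init__(self, min_delay=1.0, clock=time.monotonic):
        self.min_delay = min_delay
        self.clock = clock                    # injectable for testing
        self.host_queues = defaultdict(deque)
        self.ready_heap = []                  # (next_allowed_time, host)
        self.scheduled = set()                # hosts currently in the heap
        self.next_allowed = {}                # host -> earliest next fetch time

    def add_url(self, url):
        host = urlparse(url).netloc
        self.host_queues[host].append(url)
        if host not in self.scheduled:
            # Respect the delay even if the host's queue had drained earlier.
            ready_at = max(self.clock(), self.next_allowed.get(host, 0.0))
            heapq.heappush(self.ready_heap, (ready_at, host))
            self.scheduled.add(host)

    def get_url(self):
        now = self.clock()
        if not self.ready_heap or self.ready_heap[0][0] > now:
            return None                       # no host is eligible yet
        _, host = heapq.heappop(self.ready_heap)
        queue = self.host_queues[host]
        url = queue.popleft()
        self.next_allowed[host] = now + self.min_delay
        if queue:
            # Re-schedule the host for its next allowed fetch time.
            heapq.heappush(self.ready_heap, (self.next_allowed[host], host))
        else:
            self.scheduled.discard(host)
            del self.host_queues[host]
        return url


# Usage with a fake clock so the behaviour is deterministic.
now = [0.0]
frontier = HeapPoliteFrontier(min_delay=1.0, clock=lambda: now[0])
frontier.add_url("https://a.example/1")
frontier.add_url("https://a.example/2")
frontier.add_url("https://b.example/1")
first = frontier.get_url()    # a.example is served first
second = frontier.get_url()   # a.example must wait, so b.example is served
third = frontier.get_url()    # nothing is eligible until the delay passes
```

The trade-off is extra bookkeeping (`scheduled`, `next_allowed`) in exchange for not iterating over idle hosts.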

Priority-Based Crawling:

import heapq

class PriorityFrontier:
    def __init__(self):
        self.priority_queue = []
        self.counter = 0
    
    def add_url(self, url, priority=1):
        # Lower number = higher priority
        heapq.heappush(self.priority_queue, (priority, self.counter, url))
        self.counter += 1
    
    def get_url(self):
        if self.priority_queue:
            _, _, url = heapq.heappop(self.priority_queue)
            return url
        return None

🔍 Duplicate Detection

Content Deduplication:

import hashlib

class ContentDeduplicator:
    def __init__(self):
        self.content_hashes = set()
    
    def is_duplicate(self, content):
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        if content_hash in self.content_hashes:
            return True
        self.content_hashes.add(content_hash)
        return False

URL Deduplication with Bloom Filter:

import mmh3
from bitarray import bitarray

class BloomFilter:
    def __init__(self, size=1000000, hash_count=3):
        self.size = size
        self.hash_count = hash_count
        self.bit_array = bitarray(size)
        self.bit_array.setall(0)
    
    def add(self, item):
        for i in range(self.hash_count):
            index = mmh3.hash(item, i) % self.size
            self.bit_array[index] = 1
    
    def contains(self, item):
        for i in range(self.hash_count):
            index = mmh3.hash(item, i) % self.size
            if self.bit_array[index] == 0:
                return False
        return True
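
The default `size` and `hash_count` above are only placeholders; in practice both are derived from the expected number of URLs and the tolerated false-positive rate. The sketch below uses the standard approximations m = -n ln(p) / (ln 2)^2 and k = (m / n) ln 2. The function names are hypothetical.

```python
import math


def bloom_parameters(expected_items: int, false_positive_rate: float) -> tuple[int, int]:
    """Return (bits, hash_count) for a target false-positive rate."""
    if expected_items <= 0 or not 0.0 < false_positive_rate < 1.0:
        raise ValueError("need expected_items > 0 and 0 < rate < 1")
    n, p = expected_items, false_positive_rate
    bits = math.ceil(-n * math.log(p) / (math.log(2) ** 2))
    hash_count = max(1, round(bits / n * math.log(2)))
    return bits, hash_count


def expected_false_positive_rate(bits: int, hash_count: int, items: int) -> float:
    """Approximate false-positive rate: (1 - e^(-k*n/m))^k."""
    return (1 - math.exp(-hash_count * items / bits)) ** hash_count


bits, hash_count = bloom_parameters(1_000_000, 0.01)
print(bits, hash_count)

# The defaults shown earlier (1,000,000 bits, 3 hashes) loaded with
# 1,000,000 URLs would misreport most unseen URLs as already seen.
print(expected_false_positive_rate(1_000_000, 3, 1_000_000))
```

For one million URLs at a 1% target this works out to roughly 9.6 million bits (about 1.2 MB) and 7 hash functions, whereas the default parameters at the same load give a false-positive rate of roughly 86%.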

🗄️ Storage Systems

Content Storage Architecture:

class ContentStorage:
    def __init__(self):
        self.disk_storage = DiskStorage()
        self.cache = MemoryCache()
    
    def store(self, url, content):
        # Store on disk for persistence
        self.disk_storage.save(url, content)
        
        # Cache popular content
        if self.is_popular(url):
            self.cache.set(url, content)
    
    def retrieve(self, url):
        # Try cache first
        content = self.cache.get(url)
        if content is not None:  # an empty page is still a valid cache hit
            return content
        
        # Fallback to disk
        return self.disk_storage.load(url)

🤖 Robots.txt Handling

import urllib.robotparser
from urllib.parse import urlparse

class RobotsChecker:
    def __init__(self):
        self.robots_cache = {}
    
    def can_crawl(self, url, user_agent="*"):
        parsed_url = urlparse(url)
        robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
        
        if robots_url not in self.robots_cache:
            rp = urllib.robotparser.RobotFileParser()
            rp.set_url(robots_url)
            try:
                rp.read()
                self.robots_cache[robots_url] = rp
            except Exception:
                # Allow if robots.txt cannot be fetched; nothing is cached,
                # so the fetch is retried on the next call for this host
                return True
        
        return self.robots_cache[robots_url].can_fetch(user_agent, url)
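
To see how the standard-library parser interprets rules without touching the network, the rules can be fed in directly with `RobotFileParser.parse()`. The robots.txt body and the `MyCrawler` user agent below are made-up examples.

```python
from urllib.robotparser import RobotFileParser

# Hypothetical robots.txt body; a real crawler would fetch this per host.
ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
Crawl-delay: 5
"""

rp = RobotFileParser()
rp.parse(ROBOTS_TXT.splitlines())

allowed = rp.can_fetch("MyCrawler", "https://example.com/index.html")
blocked = rp.can_fetch("MyCrawler", "https://example.com/private/data.html")
delay = rp.crawl_delay("MyCrawler")   # seconds, or None if not specified

print(allowed, blocked, delay)  # True False 5
```

The value returned by `crawl_delay()` is a natural input for the per-host delay in the politeness logic, in place of a single global constant.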

5. Advanced Considerations

🌐 Distributed Crawling

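Hash-Based URL Distribution:

The code below uses simple modulo hashing rather than a true consistent-hash ring, so most URLs are reassigned whenever the number of crawler nodes changes.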

import hashlib

class DistributedCrawler:
    def __init__(self, crawler_nodes):
        self.nodes = sorted(crawler_nodes)
    
    def get_crawler_for_url(self, url):
        url_hash = int(hashlib.md5(url.encode()).hexdigest(), 16)
        node_index = url_hash % len(self.nodes)
        return self.nodes[node_index]
    
    def should_crawl_url(self, url, current_node):
        assigned_node = self.get_crawler_for_url(url)
        return assigned_node == current_node
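
For comparison, a consistent-hash ring places each node at many points on a circle and assigns a key to the first node point at or after the key's hash, so adding or removing a node only moves the keys adjacent to that node's points. The sketch below is one minimal way to do this; `ConsistentHashRing`, `node_for_url`, the `replicas` default, and the choice to key by host are all assumptions made for illustration.

```python
import bisect
import hashlib
from urllib.parse import urlparse


def _hash(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16)


class ConsistentHashRing:
    def __init__(self, nodes, replicas=100):
        self.replicas = replicas      # virtual points per node, for balance
        self._ring = []               # sorted list of (hash, node)
        for node in nodes:
            self.add_node(node)

    def add_node(self, node):
        for i in range(self.replicas):
            bisect.insort(self._ring, (_hash(f"{node}#{i}"), node))

    def remove_node(self, node):
        self._ring = [(h, n) for h, n in self._ring if n != node]

    def get_node(self, key):
        if not self._ring:
            raise ValueError("ring is empty")
        # First ring point at or after the key's hash, wrapping around.
        idx = bisect.bisect_left(self._ring, (_hash(key),))
        return self._ring[idx % len(self._ring)][1]


def node_for_url(ring, url):
    # Keying by host keeps all of a site's URLs on one node, so per-host
    # politeness can be enforced locally on that node.
    return ring.get_node(urlparse(url).netloc)


ring = ConsistentHashRing(["node-a", "node-b", "node-c"])
hosts = [f"host{i}.example" for i in range(1000)]
before = {h: ring.get_node(h) for h in hosts}

ring.add_node("node-d")
after = {h: ring.get_node(h) for h in hosts}
moved = [h for h in hosts if before[h] != after[h]]
print(f"{len(moved)} of {len(hosts)} hosts moved")
```

Every host that moves ends up on the newly added node; hosts assigned to the other nodes stay where they were, which is the property modulo hashing lacks.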

📊 Performance Optimization

Connection Pooling:

import requests
from requests.adapters import HTTPAdapter

class OptimizedDownloader:
    def __init__(self):
        self.session = requests.Session()
        adapter = HTTPAdapter(pool_connections=100, pool_maxsize=100)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

Asynchronous Crawling:

import asyncio
import logging

import aiohttp

class AsyncCrawler:
    def __init__(self, max_concurrent=100):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def crawl_url(self, session, url):
        async with self.semaphore:
            try:
                async with session.get(url) as response:
                    return await response.text()
            except Exception as e:
                logging.error(f"Failed to crawl {url}: {e}")
                return None
    
    async def crawl_urls(self, urls):
        async with aiohttp.ClientSession() as session:
            tasks = [self.crawl_url(session, url) for url in urls]
            return await asyncio.gather(*tasks)

🔒 Security and Safety

URL Filtering:

import re
from urllib.parse import urlparse

class URLFilter:
    def __init__(self):
        self.blocked_domains = {'malware.com', 'spam.net'}
        self.blocked_extensions = {'.exe', '.dmg', '.zip'}
        self.max_url_length = 2048
    
    def is_valid_url(self, url):
        parsed = urlparse(url)
        
        # Check domain blocklist (hostname excludes any port, unlike netloc)
        if (parsed.hostname or '') in self.blocked_domains:
            return False
        
        # Check file extensions
        path = parsed.path.lower()
        if any(path.endswith(ext) for ext in self.blocked_extensions):
            return False
        
        # Check URL length
        if len(url) > self.max_url_length:
            return False
        
        return True

📈 Monitoring and Analytics

Crawler Metrics:

import time
from collections import defaultdict

class CrawlerMetrics:
    def __init__(self):
        self.start_time = time.time()
        self.pages_crawled = 0
        self.errors = defaultdict(int)
        self.response_times = []
    
    def record_success(self, response_time):
        self.pages_crawled += 1
        self.response_times.append(response_time)
    
    def record_error(self, error_type):
        self.errors[error_type] += 1
    
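    def get_stats(self):
        elapsed = time.time() - self.start_time
        total_errors = sum(self.errors.values())
        total_requests = self.pages_crawled + total_errors
        return {
            # Each ratio is guarded against division by zero,
            # which would otherwise occur before any request completes
            'pages_per_second': self.pages_crawled / elapsed if elapsed > 0 else 0.0,
            'total_pages': self.pages_crawled,
            'error_rate': total_errors / total_requests if total_requests else 0.0,
            'avg_response_time': (sum(self.response_times) / len(self.response_times)
                                  if self.response_times else 0.0),
        }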

🔄 Freshness and Re-crawling

Priority-Based Refresh:

import time
from enum import Enum

class PageType(Enum):
    NEWS = 1        # Re-crawl every hour
    SOCIAL = 6      # Re-crawl every 6 hours  
    GENERAL = 24    # Re-crawl every day
    ARCHIVE = 168   # Re-crawl every week

class FreshnessManager:
    def __init__(self):
        self.last_crawled = {}
        self.page_types = {}
    
    def should_recrawl(self, url):
        if url not in self.last_crawled:
            return True
        
        page_type = self.page_types.get(url, PageType.GENERAL)
        hours_since_crawl = (time.time() - self.last_crawled[url]) / 3600
        
        return hours_since_crawl >= page_type.value
    
    def mark_crawled(self, url, page_type=PageType.GENERAL):
        self.last_crawled[url] = time.time()
        self.page_types[url] = page_type

6. Reference Materials

Foundational Papers

Industry Implementations

Technical Resources

System Design and Architecture