Building a Serverless Data Pipeline with AWS Lambda and DynamoDB

July 10, 2024

In today's data-driven world, the ability to efficiently collect, process, and store information is crucial for businesses looking to gain insights and create value. At Zero One Technologies, I developed a serverless data pipeline to capture classified listings from ikman.lk (Sri Lanka's largest marketplace), process them into vector embeddings, and store them in Pinecone for semantic search capabilities.

Project Overview

The goal of this project was to create a fully automated, scalable system that could:

  • Capture new and updated listings from ikman.lk in near real time
  • Process the textual data into vector embeddings for semantic search
  • Store these embeddings in a specialized vector database
  • Provide a foundation for advanced search and recommendation features

Architecture Design

We chose a serverless architecture for its scalability, cost-efficiency, and minimal operational overhead. The system consists of several key components:

1. Data Collection

  • Web Crawler: A separate service that scrapes ikman.lk and stores listings in DynamoDB
  • DynamoDB Table: Primary storage for raw listing data with attributes like title, description, price, category, etc.
  • DynamoDB Streams: Captures changes (new items, updates, deletions) to the table

2. Data Processing

  • Lambda Trigger: Function triggered by DynamoDB Streams when new data arrives
  • Text Processing: Cleans and prepares text data for embedding generation
  • Embedding Generation: Converts text to vector embeddings using AI models

3. Data Storage

  • Pinecone Vector Database: Specialized database for storing and querying vector embeddings
  • Metadata Storage: Additional information stored alongside vectors for retrieval

4. Monitoring and Error Handling

  • CloudWatch: Monitoring Lambda execution and setting up alerts
  • Dead Letter Queue: Capturing failed processing attempts for later analysis
  • Error Logging: Comprehensive logging for debugging and optimization

Implementation Details

DynamoDB Configuration

The DynamoDB table was configured with:

  • Partition Key: Unique listing ID
  • Sort Key: Timestamp for versioning
  • Streams: Enabled with NEW_AND_OLD_IMAGES view type
  • TTL: Set for data retention policies
  • Capacity Mode: On-demand for variable workloads
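
The settings above map onto two DynamoDB API calls. The following is a minimal sketch rather than the project's actual provisioning code: the table name `listings`, the `timestamp` sort-key attribute, and the `expires_at` TTL attribute are hypothetical, and `client` is assumed to be a `boto3.client("dynamodb")` instance passed in by the caller.

```python
# Hypothetical names: "listings", "timestamp", and "expires_at" are illustrative only.
TABLE_NAME = "listings"


def build_table_definition(table_name=TABLE_NAME):
    """Return keyword arguments for the DynamoDB create_table call."""
    return {
        "TableName": table_name,
        "AttributeDefinitions": [
            {"AttributeName": "id", "AttributeType": "S"},
            {"AttributeName": "timestamp", "AttributeType": "N"},
        ],
        "KeySchema": [
            {"AttributeName": "id", "KeyType": "HASH"},          # partition key
            {"AttributeName": "timestamp", "KeyType": "RANGE"},  # sort key
        ],
        "BillingMode": "PAY_PER_REQUEST",  # on-demand capacity
        "StreamSpecification": {
            "StreamEnabled": True,
            "StreamViewType": "NEW_AND_OLD_IMAGES",
        },
    }


def build_ttl_settings(table_name=TABLE_NAME, attribute="expires_at"):
    """Return keyword arguments for the update_time_to_live call."""
    return {
        "TableName": table_name,
        "TimeToLiveSpecification": {"Enabled": True, "AttributeName": attribute},
    }


def create_listing_table(client, table_name=TABLE_NAME):
    """Create the table, wait until it exists, then enable TTL on it."""
    client.create_table(**build_table_definition(table_name))
    client.get_waiter("table_exists").wait(TableName=table_name)
    client.update_time_to_live(**build_ttl_settings(table_name))
```

TTL is configured in a separate call because `create_table` does not accept a TTL setting, which is why the sketch waits for the table to exist first.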

Lambda Function Implementation

The core Lambda function was implemented in Python with several key components:

import json
import boto3
import os
import requests  # third-party: bundle it in the deployment package or a Lambda layer
from decimal import Decimal

# Initialize clients
dynamodb = boto3.resource('dynamodb')
pinecone_api_key = os.environ['PINECONE_API_KEY']
pinecone_environment = os.environ['PINECONE_ENVIRONMENT']
pinecone_index = os.environ['PINECONE_INDEX']
embedding_api_url = os.environ['EMBEDDING_API_URL']

def lambda_handler(event, context):
    for record in event['Records']:
        # Process only INSERT and MODIFY events
        if record['eventName'] in ['INSERT', 'MODIFY']:
            # Extract the new data
            new_image = record['dynamodb']['NewImage']
            
            # Convert DynamoDB format to regular JSON
            listing_data = convert_dynamodb_to_json(new_image)
            
            # Prepare text for embedding
            text_to_embed = prepare_text(listing_data)
            
            # Generate embedding vector
            embedding_vector = generate_embedding(text_to_embed)
            
            if embedding_vector:
                # Store in Pinecone
                store_in_pinecone(listing_data['id'], embedding_vector, listing_data)
                
    return {
        'statusCode': 200,
        'body': json.dumps('Processing complete')
    }

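from boto3.dynamodb.types import TypeDeserializer

_deserializer = TypeDeserializer()

def convert_dynamodb_to_json(dynamodb_data):
    # One possible implementation: convert DynamoDB's typed attribute format
    # ({'S': ...}, {'N': ...}) into plain Python values
    def normalize(value):
        # boto3 returns numbers as Decimal, which json and the metadata filter below reject
        if isinstance(value, Decimal):
            return int(value) if value % 1 == 0 else float(value)
        if isinstance(value, dict):
            return {k: normalize(v) for k, v in value.items()}
        if isinstance(value, (list, set)):
            return [normalize(v) for v in value]
        return value

    return {key: normalize(_deserializer.deserialize(attr))
            for key, attr in dynamodb_data.items()}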

def prepare_text(listing_data):
    # Combine relevant fields for embedding
    text = f"{listing_data.get('title', '')} {listing_data.get('description', '')}"
    text += f" {listing_data.get('category', '')} {listing_data.get('location', '')}"
    
    # Clean and normalize text
    # ...
    
    return text

def generate_embedding(text):
    try:
        # Call embedding API
        response = requests.post(
            embedding_api_url,
            json={'text': text},
            headers={'Content-Type': 'application/json'},
            timeout=10  # without a timeout, a stalled request blocks until Lambda kills the invocation
        )
        
        if response.status_code == 200:
            return response.json()['embedding']
        else:
            print(f"Error generating embedding: {response.text}")
            return None
    except Exception as e:
        print(f"Exception in generate_embedding: {str(e)}")
        return None

def store_in_pinecone(id, vector, metadata):
    try:
        # Prepare request to Pinecone
        pinecone_url = f"https://{pinecone_index}-{pinecone_environment}.svc.{pinecone_environment}.pinecone.io/vectors/upsert"
        
        # Prepare metadata (exclude large fields)
        filtered_metadata = {k: v for k, v in metadata.items() 
                            if k not in ['description'] and isinstance(v, (str, int, float, bool))}

        payload = {
            "vectors": [{
                "id": id,
                "values": vector,
                "metadata": filtered_metadata
            }]
        }

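        # Send to Pinecone; the timeout keeps a stalled request from consuming the whole invocation
        response = requests.post(
            pinecone_url,
            json=payload,
            headers={
                'Api-Key': pinecone_api_key,
                'Content-Type': 'application/json'
            },
            timeout=10
        )

        if response.status_code != 200:
            print(f"Error storing in Pinecone: {response.text}")

    except Exception as e:
        # Swallowing the exception marks the record as processed; re-raise instead
        # if the failure should reach Lambda's retry and dead-letter handling
        print(f"Exception in store_in_pinecone: {str(e)}")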
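
The "clean and normalize text" step inside prepare_text is elided in the listing above. As a rough illustration only, the sketch below assumes cleaning means decoding HTML entities, dropping markup tags, collapsing whitespace, and truncating long text; the function name `clean_text` and the `max_chars` limit are hypothetical and not taken from the project.

```python
import html
import re

_TAG_RE = re.compile(r"<[^>]+>")
_WHITESPACE_RE = re.compile(r"\s+")


def clean_text(text, max_chars=2000):
    """Normalize scraped listing text before it is sent for embedding."""
    text = html.unescape(text)                    # "&amp;" -> "&"
    text = _TAG_RE.sub(" ", text)                 # drop leftover HTML tags
    text = _WHITESPACE_RE.sub(" ", text).strip()  # collapse runs of whitespace
    return text[:max_chars]                       # cap the length (assumed limit)
```

In this sketch, prepare_text would call `clean_text(text)` just before returning.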

Embedding Generation

For generating vector embeddings, we used:

  • Model Selection: Sentence transformers for high-quality text embeddings
  • API Approach: Separate microservice for embedding generation to keep Lambda functions lightweight
  • Dimensionality: 768-dimensional vectors; the Pinecone index dimension has to match the embedding model's output size
  • Batching: Processing multiple items in batches when possible
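
The batching idea can be sketched as follows. The Lambda listing above sends one text per request, so a batch-capable embedding endpoint is an assumption here: `embed_batch` is a hypothetical callable that takes a list of texts and returns one vector per text, and the default `batch_size` of 32 is arbitrary.

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def embed_in_batches(texts, embed_batch, batch_size=32):
    """Embed texts batch by batch, preserving input order.

    embed_batch is a hypothetical callable: list[str] -> list[list[float]].
    """
    vectors = []
    for batch in chunked(texts, batch_size):
        result = embed_batch(batch)
        if len(result) != len(batch):
            raise ValueError("embedding service returned a mismatched batch")
        vectors.extend(result)
    return vectors
```

Keeping the batch size as a parameter makes it easy to tune against the embedding service's request limits and the Lambda timeout.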

Pinecone Integration

The Pinecone vector database was configured with:

  • Index Type: Optimized for similarity search
  • Metadata Fields: Key listing attributes for filtering
  • Namespace Organization: Separate namespaces for different listing categories
  • Upsert Strategy: Replace existing vectors when listings are updated
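
To illustrate how namespaces and the upsert strategy fit together, here is a small sketch that builds an upsert request body with a per-category namespace. Deriving the namespace from the listing's `category` field, the `uncategorized` fallback, and the function name are assumptions made for this example; the 768 default mirrors the dimensionality mentioned earlier.

```python
def build_upsert_payload(listing_id, vector, metadata, expected_dim=768):
    """Build a Pinecone upsert body that targets a per-category namespace.

    Upserting with an existing ID overwrites the stored vector, which is how
    an updated listing replaces its previous embedding.
    """
    if len(vector) != expected_dim:
        raise ValueError(f"expected {expected_dim} dimensions, got {len(vector)}")

    # Assumed convention: lower-cased, hyphenated category name as the namespace
    category = str(metadata.get("category", "")).strip().lower()
    namespace = category.replace(" ", "-") or "uncategorized"

    return {
        "vectors": [{
            "id": str(listing_id),
            "values": list(vector),
            "metadata": metadata,
        }],
        "namespace": namespace,
    }
```

Queries then have to name the same namespace, so a cross-category search would need one query per namespace under this scheme.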

Challenges and Solutions

1. Lambda Execution Time Limits

Challenge: The default 3-second timeout was insufficient for processing large batches of records.

Solution: Increased timeout to 30 seconds and implemented batch processing with checkpointing to handle larger workloads.
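
One way to get checkpoint-like behavior with DynamoDB Streams is Lambda's partial batch response. The sketch below is not necessarily what the project used: it assumes the event source mapping has `ReportBatchItemFailures` enabled, and `process_record` is a hypothetical callable standing in for the per-record work.

```python
def process_batch(records, process_record):
    """Process stream records in order and report where the batch failed.

    Returning the sequence number of the first failed record tells Lambda to
    retry from that record instead of replaying the whole batch.
    """
    for record in records:
        try:
            process_record(record)
        except Exception as exc:
            sequence_number = record["dynamodb"]["SequenceNumber"]
            print(f"Failed at sequence number {sequence_number}: {exc}")
            # Stop here: later records must not be processed before the failed one
            return {"batchItemFailures": [{"itemIdentifier": sequence_number}]}
    return {"batchItemFailures": []}


def lambda_handler(event, context, process_record=print):
    # process_record defaults to print purely for illustration
    return process_batch(event["Records"], process_record)
```

An empty `batchItemFailures` list signals that the whole batch succeeded.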

2. DynamoDB Stream Processing

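Challenge: Duplicate events and out-of-order processing when consuming DynamoDB Streams. Lambda delivers stream records at least once, so a retried batch replays records that were already handled.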

Solution: Implemented idempotent processing and used timestamps to handle out-of-order events.
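
A timestamp guard of this kind can be sketched as below. It assumes the sort key is a numeric attribute named `timestamp` holding an integer version, and it keeps the last applied version in a plain dict; in a real deployment that state would have to live in a durable store, since Lambda memory does not persist reliably between invocations. The `apply` callable is hypothetical.

```python
def apply_if_newer(record, last_applied, apply):
    """Apply a stream record only if it is newer than what was already applied.

    last_applied maps listing ID -> timestamp of the newest applied version.
    Duplicates and stale (out-of-order) records are skipped, which makes
    reprocessing the same record harmless.
    """
    keys = record["dynamodb"]["Keys"]
    listing_id = keys["id"]["S"]
    version = int(keys["timestamp"]["N"])  # assumes an integer timestamp

    if version <= last_applied.get(listing_id, -1):
        return False  # duplicate or older than the applied version

    apply(record)
    last_applied[listing_id] = version
    return True
```

Because upserts into Pinecone already overwrite by ID, the guard mainly prevents an older version from overwriting a newer one.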

3. Cost Optimization

Challenge: Controlling costs for embedding generation and vector storage.

Solution: Implemented filtering to process only relevant listings and compressed metadata to reduce storage costs.

4. Error Handling

Challenge: Ensuring robustness against API failures and malformed data.

Solution: Implemented comprehensive try-except blocks, dead-letter queues, and retry mechanisms with exponential backoff.
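
A retry helper with exponential backoff might look like the sketch below. The attempt count, delays, and the use of full jitter are assumptions chosen for illustration, not the project's actual settings; `operation` is any zero-argument callable, such as a wrapped call to the embedding API.

```python
import random
import time


def retry_with_backoff(operation, max_attempts=4, base_delay=0.5,
                       max_delay=8.0, sleep=time.sleep):
    """Call operation(), retrying on any exception with exponential backoff.

    The delay ceiling doubles after each failure (0.5s, 1s, 2s, ...) up to
    max_delay, and the actual wait is a random value below that ceiling
    ("full jitter"). The last exception is re-raised once attempts run out.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                raise
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, ceiling))
```

Letting the final exception propagate matters here: Lambda only routes a record to its failure handling if the invocation reports an error. The total retry time also has to fit inside the function timeout.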

Performance Optimization

Several optimizations were implemented to improve performance:

  • Connection Reuse: Maintaining persistent connections to external services
  • Memory Management: Careful handling of large objects to avoid Lambda memory issues
  • Concurrent Processing: Processing multiple records in parallel where appropriate
  • Selective Processing: Only generating new embeddings when text content changed significantly
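
Selective processing can be approximated by fingerprinting the text that feeds the embedding and comparing the old and new versions of a listing, both of which are available because the stream uses the NEW_AND_OLD_IMAGES view type. This sketch treats any change other than case or whitespace as significant, which is a simplifying assumption; the field list mirrors the fields combined in prepare_text.

```python
import hashlib

# Fields that feed the embedding text (same ones prepare_text combines)
TEXT_FIELDS = ("title", "description", "category", "location")


def text_fingerprint(listing):
    """Hash the embedding-relevant text, ignoring case and extra whitespace."""
    parts = [" ".join(str(listing.get(field, "")).lower().split())
             for field in TEXT_FIELDS]
    # A separator that cannot appear in normalized text keeps fields distinct
    joined = "\x1f".join(parts)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def needs_new_embedding(old_listing, new_listing):
    """Return True when the listing is new or its embedded text changed."""
    if old_listing is None:  # INSERT events carry no old image
        return True
    return text_fingerprint(old_listing) != text_fingerprint(new_listing)
```

A price-only update therefore skips the embedding call, although the metadata stored in Pinecone would still need refreshing.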

Monitoring and Maintenance

To ensure reliable operation, we implemented:

  • CloudWatch Dashboards: Visualizing key metrics like invocation count, duration, and error rates
  • Alerts: Notifications for error thresholds and processing delays
  • Regular Audits: Comparing DynamoDB and Pinecone counts to ensure data consistency
  • Automated Testing: Regular end-to-end tests of the entire pipeline
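
A consistency audit can go one step beyond comparing counts by comparing ID sets, which also shows which listings are out of sync. This is a sketch under that assumption; how the two ID collections are fetched (a table scan on one side, an index listing on the other) is left out, and the function name is hypothetical.

```python
def audit_ids(dynamodb_ids, pinecone_ids):
    """Compare listing IDs from both stores and report any differences."""
    source = set(dynamodb_ids)
    indexed = set(pinecone_ids)
    return {
        "dynamodb_count": len(source),
        "pinecone_count": len(indexed),
        "missing_in_pinecone": sorted(source - indexed),   # never embedded
        "orphaned_in_pinecone": sorted(indexed - source),  # deleted upstream
        "consistent": source == indexed,
    }
```

Equal counts alone can hide a missing vector that happens to be offset by an orphaned one, which is the case the ID comparison catches.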

Results and Benefits

The serverless data pipeline delivered several key benefits:

  • Real-time Processing: New listings available for semantic search within seconds
  • Cost Efficiency: Pay-per-use model significantly reduced infrastructure costs
  • Scalability: Automatic scaling to handle variable load without manual intervention
  • Maintainability: Simplified operations with minimal infrastructure management
  • Enhanced Search Capabilities: Enabled semantic search across thousands of listings

Conclusion

Building a serverless data pipeline with AWS Lambda, DynamoDB, and Pinecone provided an efficient, scalable solution for capturing and processing classified listings data. The architecture's event-driven nature allowed for real-time processing while maintaining cost efficiency and operational simplicity.

This project demonstrates how modern serverless technologies can be combined to create powerful data processing systems without the operational overhead of traditional infrastructure. The resulting vector database enables advanced search and recommendation features that would be difficult to implement with conventional text-based search approaches.