Building a Serverless Data Pipeline with AWS Lambda and DynamoDB
In today's data-driven world, the ability to efficiently collect, process, and store information is crucial for businesses looking to gain insights and create value. At Zero One Technologies, I developed a serverless data pipeline to capture classified listings from ikman.lk (Sri Lanka's largest marketplace), process them into vector embeddings, and store them in Pinecone for semantic search capabilities.
Project Overview
The goal of this project was to create a fully automated, scalable system that could:
- Capture new and updated listings from ikman.lk in real time
- Process the textual data into vector embeddings for semantic search
- Store these embeddings in a specialized vector database
- Provide a foundation for advanced search and recommendation features
Architecture Design
We chose a serverless architecture for its scalability, cost-efficiency, and minimal operational overhead. The system consists of several key components:
1. Data Collection
- Web Crawler: A separate service that scrapes ikman.lk and stores listings in DynamoDB
- DynamoDB Table: Primary storage for raw listing data with attributes like title, description, price, category, etc.
- DynamoDB Streams: Captures changes (new items, updates, deletions) to the table
2. Data Processing
- Lambda Trigger: Function triggered by DynamoDB Streams when new data arrives
- Text Processing: Cleans and prepares text data for embedding generation
- Embedding Generation: Converts text to vector embeddings using AI models
3. Data Storage
- Pinecone Vector Database: Specialized database for storing and querying vector embeddings
- Metadata Storage: Additional information stored alongside vectors for retrieval
4. Monitoring and Error Handling
- CloudWatch: Monitoring Lambda execution and setting up alerts
- Dead Letter Queue: Capturing failed processing attempts for later analysis (see the configuration sketch after this list)
- Error Logging: Comprehensive logging for debugging and optimization
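Much of this wiring lives on the stream event source mapping itself. The sketch below shows one way to configure retries, batch bisection, and an on-failure SQS destination with boto3; the ARNs, function name, and specific settings are placeholders rather than our exact configuration.

import boto3

lambda_client = boto3.client('lambda')

# Placeholder ARNs and function name, for illustration only
STREAM_ARN = 'arn:aws:dynamodb:REGION:ACCOUNT_ID:table/listings/stream/STREAM_LABEL'
DLQ_ARN = 'arn:aws:sqs:REGION:ACCOUNT_ID:listing-embedding-dlq'

lambda_client.create_event_source_mapping(
    EventSourceArn=STREAM_ARN,
    FunctionName='listing-embedding-processor',
    StartingPosition='LATEST',
    BatchSize=100,
    MaximumRetryAttempts=3,                 # retry a failed batch a few times before giving up
    BisectBatchOnFunctionError=True,        # split failing batches to isolate bad records
    DestinationConfig={'OnFailure': {'Destination': DLQ_ARN}},  # dead-letter failed batches to SQS
)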
Implementation Details
DynamoDB Configuration
The DynamoDB table was configured with the following settings (a provisioning sketch follows the list):
- Partition Key: Unique listing ID
- Sort Key: Timestamp for versioning
- Streams: Enabled with NEW_AND_OLD_IMAGES view type
- TTL: Set for data retention policies
- Capacity Mode: On-demand for variable workloads
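As a rough illustration of that configuration, here is a provisioning sketch with boto3; the table name, attribute names, and TTL attribute are illustrative rather than the exact ones used in production.

import boto3

ddb = boto3.client('dynamodb')

ddb.create_table(
    TableName='listings',                                    # illustrative table name
    AttributeDefinitions=[
        {'AttributeName': 'id', 'AttributeType': 'S'},
        {'AttributeName': 'updated_at', 'AttributeType': 'S'},
    ],
    KeySchema=[
        {'AttributeName': 'id', 'KeyType': 'HASH'},           # partition key: listing ID
        {'AttributeName': 'updated_at', 'KeyType': 'RANGE'},  # sort key: timestamp for versioning
    ],
    BillingMode='PAY_PER_REQUEST',                            # on-demand capacity
    StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_AND_OLD_IMAGES',
    },
)

# TTL is enabled in a separate call once the table is active
ddb.get_waiter('table_exists').wait(TableName='listings')
ddb.update_time_to_live(
    TableName='listings',
    TimeToLiveSpecification={'Enabled': True, 'AttributeName': 'expires_at'},
)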
Lambda Function Implementation
The core Lambda function was implemented in Python with several key components:
import json
import os
from decimal import Decimal

import boto3
import requests  # not bundled with recent Lambda runtimes; ship it in the deployment package or a layer
from boto3.dynamodb.types import TypeDeserializer

# Clients and configuration are initialized once per container and reused across invocations
dynamodb = boto3.resource('dynamodb')
deserializer = TypeDeserializer()
pinecone_api_key = os.environ['PINECONE_API_KEY']
pinecone_environment = os.environ['PINECONE_ENVIRONMENT']
pinecone_index = os.environ['PINECONE_INDEX']
pinecone_project_id = os.environ['PINECONE_PROJECT_ID']  # part of the legacy Pinecone data-plane hostname
embedding_api_url = os.environ['EMBEDDING_API_URL']


def lambda_handler(event, context):
    for record in event['Records']:
        # Process only INSERT and MODIFY events; REMOVE events are ignored
        if record['eventName'] in ['INSERT', 'MODIFY']:
            # Extract the new item image from the stream record
            new_image = record['dynamodb']['NewImage']

            # Convert DynamoDB attribute-value format to regular JSON
            listing_data = convert_dynamodb_to_json(new_image)

            # Prepare text for embedding
            text_to_embed = prepare_text(listing_data)

            # Generate embedding vector
            embedding_vector = generate_embedding(text_to_embed)

            if embedding_vector:
                # Store in Pinecone
                store_in_pinecone(listing_data['id'], embedding_vector, listing_data)

    return {
        'statusCode': 200,
        'body': json.dumps('Processing complete')
    }


def convert_dynamodb_to_json(dynamodb_data):
    # Convert DynamoDB attribute-value format to regular JSON using boto3's TypeDeserializer,
    # then turn Decimal numbers into native ints/floats so they serialize cleanly
    item = {key: deserializer.deserialize(value) for key, value in dynamodb_data.items()}
    return {key: _from_decimal(value) for key, value in item.items()}


def _from_decimal(value):
    # DynamoDB numbers deserialize to Decimal; convert them to int or float
    if isinstance(value, Decimal):
        return int(value) if value % 1 == 0 else float(value)
    return value


def prepare_text(listing_data):
    # Combine relevant fields for embedding
    text = f"{listing_data.get('title', '')} {listing_data.get('description', '')}"
    text += f" {listing_data.get('category', '')} {listing_data.get('location', '')}"
    # Clean and normalize text by collapsing whitespace
    return ' '.join(text.split())


def generate_embedding(text):
    try:
        # Call the embedding microservice
        response = requests.post(
            embedding_api_url,
            json={'text': text},
            headers={'Content-Type': 'application/json'},
            timeout=10
        )
        if response.status_code == 200:
            return response.json()['embedding']
        print(f"Error generating embedding: {response.text}")
        return None
    except Exception as e:
        print(f"Exception in generate_embedding: {str(e)}")
        return None


def store_in_pinecone(listing_id, vector, metadata):
    try:
        # Legacy Pinecone data-plane host: <index>-<project_id>.svc.<environment>.pinecone.io
        pinecone_url = (
            f"https://{pinecone_index}-{pinecone_project_id}"
            f".svc.{pinecone_environment}.pinecone.io/vectors/upsert"
        )

        # Prepare metadata (exclude large fields and non-scalar values)
        filtered_metadata = {k: v for k, v in metadata.items()
                             if k not in ['description'] and isinstance(v, (str, int, float, bool))}

        payload = {
            "vectors": [{
                "id": listing_id,
                "values": vector,
                "metadata": filtered_metadata
            }]
        }

        # Send the upsert request to Pinecone
        response = requests.post(
            pinecone_url,
            json=payload,
            headers={
                'Api-Key': pinecone_api_key,
                'Content-Type': 'application/json'
            },
            timeout=10
        )
        if response.status_code != 200:
            print(f"Error storing in Pinecone: {response.text}")
    except Exception as e:
        print(f"Exception in store_in_pinecone: {str(e)}")
Embedding Generation
For generating vector embeddings, we used the following approach (a sketch of the embedding service follows the list):
- Model Selection: Sentence transformers for high-quality text embeddings
- API Approach: Separate microservice for embedding generation to keep Lambda functions lightweight
- Dimensionality: 768-dimensional vectors for optimal semantic representation
- Batching: Processing multiple items in batches when possible
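The embedding microservice itself is outside the scope of this post, but a minimal sketch of one might look like the following, assuming Flask and a 768-dimensional sentence-transformers model such as all-mpnet-base-v2; the actual service may use a different framework and model.

from flask import Flask, jsonify, request
from sentence_transformers import SentenceTransformer

app = Flask(__name__)
# all-mpnet-base-v2 outputs 768-dimensional embeddings
model = SentenceTransformer('all-mpnet-base-v2')

@app.route('/embed', methods=['POST'])
def embed():
    payload = request.get_json()
    # Accept a single text or a batch of texts
    texts = payload.get('texts') or [payload['text']]
    vectors = model.encode(texts, normalize_embeddings=True)
    embeddings = [vector.tolist() for vector in vectors]
    return jsonify({'embedding': embeddings[0], 'embeddings': embeddings})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)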
Pinecone Integration
The Pinecone vector database was configured with the following settings (a query sketch follows the list):
- Index Type: Optimized for similarity search
- Metadata Fields: Key listing attributes for filtering
- Namespace Organization: Separate namespaces for different listing categories
- Upsert Strategy: Replace existing vectors when listings are updated
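To show how the namespaces and metadata fields pay off at query time, here is a sketch of a semantic search against the index over the legacy Pinecone REST API; it reuses the same environment variables as the Lambda code, and the namespace and filter values are examples only.

import os
import requests

def semantic_search(query_vector, namespace='cars', top_k=10):
    """Query Pinecone with an embedding vector; namespace and filter values are examples."""
    query_url = (
        f"https://{os.environ['PINECONE_INDEX']}-{os.environ['PINECONE_PROJECT_ID']}"
        f".svc.{os.environ['PINECONE_ENVIRONMENT']}.pinecone.io/query"
    )
    response = requests.post(
        query_url,
        json={
            'vector': query_vector,                      # 768-dim vector from the embedding service
            'topK': top_k,
            'namespace': namespace,                      # example category namespace
            'filter': {'location': {'$eq': 'Colombo'}},  # example metadata filter
            'includeMetadata': True,
        },
        headers={'Api-Key': os.environ['PINECONE_API_KEY']},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()['matches']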
Challenges and Solutions
1. Lambda Execution Time Limits
Challenge: The default 3-second timeout was insufficient for processing large batches of records.
Solution: Increased timeout to 30 seconds and implemented batch processing with checkpointing to handle larger workloads.
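One way to implement that checkpointing (not necessarily the exact mechanism we used) is Lambda's partial batch response for stream sources: the handler reports the sequence number of the first failed record, and Lambda retries the batch from that point. The process_record helper below is a placeholder for the INSERT/MODIFY logic shown earlier.

def process_record(record):
    # Placeholder for the INSERT/MODIFY processing shown in the Lambda function above
    pass

def lambda_handler(event, context):
    # Requires FunctionResponseTypes=['ReportBatchItemFailures'] on the event source mapping
    failures = []
    for record in event['Records']:
        try:
            process_record(record)
        except Exception as exc:
            print(f"Failed at sequence {record['dynamodb']['SequenceNumber']}: {exc}")
            # Report the failing record so Lambda retries the batch from this point
            failures.append({'itemIdentifier': record['dynamodb']['SequenceNumber']})
            break  # stream records must be retried in order, so stop at the first failure
    return {'batchItemFailures': failures}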
2. DynamoDB Stream Processing
Challenge: Duplicate events and out-of-order processing in DynamoDB Streams.
Solution: Implemented idempotent processing and used timestamps to handle out-of-order events.
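One common pattern for this, sketched below with a hypothetical tracking table and field names, is a conditional write that records the latest processed timestamp per listing so that duplicate or older events can be skipped; the Pinecone upsert itself is already idempotent because vectors are keyed by listing ID.

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
# Hypothetical tracking table keyed by listing ID
checkpoints = dynamodb.Table('listing-embedding-checkpoints')

def should_process(listing_id, updated_at):
    """Record the newest processed timestamp; skip duplicate or out-of-order events."""
    try:
        checkpoints.put_item(
            Item={'id': listing_id, 'updated_at': updated_at},
            # Succeed only if this event is newer than the last one we processed
            ConditionExpression='attribute_not_exists(id) OR updated_at < :ts',
            ExpressionAttributeValues={':ts': updated_at},
        )
        return True
    except ClientError as err:
        if err.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return False  # already processed this version (or a newer one); skip it
        raise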
3. Cost Optimization
Challenge: Controlling costs for embedding generation and vector storage.
Solution: Implemented filtering to process only relevant listings and compressed metadata to reduce storage costs.
4. Error Handling
Challenge: Ensuring robustness against API failures and malformed data.
Solution: Implemented comprehensive try-except blocks, dead-letter queues, and retry mechanisms with exponential backoff.
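A minimal sketch of the retry wrapper, assuming HTTP calls to the embedding service and Pinecone; the attempt count and backoff delays are illustrative.

import time
import requests

def post_with_retry(url, payload, headers=None, max_attempts=4):
    """POST with exponential backoff; raise once every attempt has failed."""
    for attempt in range(max_attempts):
        try:
            response = requests.post(url, json=payload, headers=headers, timeout=10)
            if response.status_code < 500:
                return response       # success, or a client error that retrying will not fix
        except requests.RequestException as exc:
            print(f"Attempt {attempt + 1} failed: {exc}")
        time.sleep(2 ** attempt)      # back off: 1s, 2s, 4s, ...
    raise RuntimeError(f"Giving up on {url} after {max_attempts} attempts")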
Performance Optimization
Several optimizations were implemented to improve performance (see the sketch after this list):
- Connection Reuse: Maintaining persistent connections to external services
- Memory Management: Careful handling of large objects to avoid Lambda memory issues
- Concurrent Processing: Processing multiple records in parallel where appropriate
- Selective Processing: Only generating new embeddings when text content changed significantly
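A sketch of the connection-reuse and selective-processing pieces: the module-level session is created once per Lambda container, and the field comparison is a simplified stand-in for the "changed significantly" check we applied.

import requests

# Created once per container, so TCP/TLS connections are reused across invocations
session = requests.Session()

def needs_new_embedding(record):
    """Re-embed only when the fields that feed the embedding actually changed."""
    old_image = record['dynamodb'].get('OldImage')
    new_image = record['dynamodb']['NewImage']
    if old_image is None:
        return True  # brand-new listing
    fields = ('title', 'description', 'category', 'location')
    return any(old_image.get(field) != new_image.get(field) for field in fields)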
Monitoring and Maintenance
To ensure reliable operation, we implemented the following (a count-audit sketch follows the list):
- CloudWatch Dashboards: Visualizing key metrics like invocation count, duration, and error rates
- Alerts: Notifications for error thresholds and processing delays
- Regular Audits: Comparing DynamoDB and Pinecone counts to ensure data consistency
- Automated Testing: Regular end-to-end tests of the entire pipeline
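A sketch of the consistency audit, assuming a table named listings and the same Pinecone environment variables as before; DynamoDB's ItemCount is only refreshed every few hours, so the comparison is approximate.

import os
import boto3
import requests

def audit_counts(table_name='listings'):
    """Compare DynamoDB's approximate item count with Pinecone's total vector count."""
    ddb = boto3.client('dynamodb')
    # ItemCount is refreshed by DynamoDB only every few hours, so this is an approximate check
    item_count = ddb.describe_table(TableName=table_name)['Table']['ItemCount']

    stats_url = (
        f"https://{os.environ['PINECONE_INDEX']}-{os.environ['PINECONE_PROJECT_ID']}"
        f".svc.{os.environ['PINECONE_ENVIRONMENT']}.pinecone.io/describe_index_stats"
    )
    stats = requests.post(
        stats_url, json={}, headers={'Api-Key': os.environ['PINECONE_API_KEY']}, timeout=10
    ).json()
    vector_count = stats['totalVectorCount']

    print(f"DynamoDB items: {item_count}, Pinecone vectors: {vector_count}")
    return abs(item_count - vector_count)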
Results and Benefits
The serverless data pipeline delivered several key benefits:
- Real-time Processing: New listings available for semantic search within seconds
- Cost Efficiency: Pay-per-use model significantly reduced infrastructure costs
- Scalability: Automatic scaling to handle variable load without manual intervention
- Maintainability: Simplified operations with minimal infrastructure management
- Enhanced Search Capabilities: Enabled semantic search across thousands of listings
Conclusion
Building a serverless data pipeline with AWS Lambda, DynamoDB, and Pinecone provided an efficient, scalable solution for capturing and processing classified listings data. The architecture's event-driven nature allowed for real-time processing while maintaining cost efficiency and operational simplicity.
This project demonstrates how modern serverless technologies can be combined to create powerful data processing systems without the operational overhead of traditional infrastructure. The resulting vector database enables advanced search and recommendation features that would be difficult to implement with conventional text-based search approaches.