Tutorial 2: Data Pipeline Automation
Build a production-ready data pipeline that extracts customer data from an API, transforms it, loads it into PostgreSQL, and generates automated reports.
Duration: 90 minutes · Difficulty: Intermediate · Prerequisites: Tutorial 1 completed, basic SQL knowledge
Table of Contents
- What You'll Build
- Prerequisites
- Part 1: Setup Database Environment
- Part 2: Create Pipeline Workflow
- Part 3: Extract Data from API
- Part 4: Batch Processing
- Part 5: Data Transformation
- Part 6: Load to Database
- Part 7: Generate Reports
- Part 8: Error Handling & Monitoring
- Part 9: Production Deployment
- Advanced Topics
- Troubleshooting
What You'll Build
A complete ETL (Extract, Transform, Load) pipeline that:
- ✅ Extracts customer data from REST API (hourly)
- ✅ Processes large datasets with batching
- ✅ Transforms and validates data
- ✅ Loads data to PostgreSQL with upsert logic
- ✅ Tracks sync status and handles errors
- ✅ Generates daily summary reports
- ✅ Sends alerts on failures
- ✅ Production-ready with monitoring
Architecture: Schedule Trigger → Log Start → Fetch from API → Split In Batches → Transform & Validate → Upsert to PostgreSQL → Generate Report → Send Alert/Report
What You'll Learn:
- Database connections and operations
- Batch processing for scalability
- Data validation and cleansing
- Upsert (insert or update) patterns
- Error handling strategies
- Incremental sync logic
- Performance optimization
- Production monitoring
- Report generation
Prerequisites
Required Knowledge
- ✅ Tutorial 1 completed (n8n basics)
- ✅ Basic SQL (SELECT, INSERT, UPDATE)
- ✅ Understanding of JSON
- ✅ Familiarity with REST APIs
Required Tools
1. n8n Running
2. PostgreSQL Database
We'll set up PostgreSQL via Docker in Part 1.
3. Mock API (We'll use JSONPlaceholder)
Free test API: https://jsonplaceholder.typicode.com/
- No API key required
- Provides sample user data
- Perfect for learning
Optional Tools
- Database Client: DBeaver, pgAdmin, or TablePlus
- API Client: Postman or Insomnia (for testing)
- Slack: For notifications (or use Email)
Part 1: Setup Database Environment
Step 1.1: Start PostgreSQL
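The startup command is missing from this copy; a minimal sketch using the official Postgres image. The container name and image tag are illustrative, but the database, user, and password match the credential configured in Step 1.4:

```bash
# Start PostgreSQL with the credentials used later in the n8n credential
docker run -d --name pg-customers \
  -e POSTGRES_DB=customers_db \
  -e POSTGRES_USER=n8n_user \
  -e POSTGRES_PASSWORD=n8n_password \
  -p 5432:5432 \
  postgres:16
```

Verify it is up with `docker ps` before moving on.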
Step 1.2: Create Database Schema
Connect to database and create tables:
Execute this SQL:
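The SQL block is missing from this copy; a minimal schema sketch consistent with the rest of the tutorial. Column names such as `city`, `company`, and `records_synced` are illustrative, not prescribed:

```sql
-- Customers table: target of the upsert in Part 6
CREATE TABLE IF NOT EXISTS customers (
    id         INTEGER PRIMARY KEY,        -- unique key used for ON CONFLICT
    name       TEXT NOT NULL,
    email      TEXT NOT NULL,
    city       TEXT,
    company    TEXT,
    phone      TEXT,
    synced_at  TIMESTAMPTZ DEFAULT now()
);

-- Audit trail written at the start and end of each run (Parts 3 and 9)
CREATE TABLE IF NOT EXISTS sync_logs (
    id             SERIAL PRIMARY KEY,
    started_at     TIMESTAMPTZ DEFAULT now(),
    finished_at    TIMESTAMPTZ,
    status         TEXT DEFAULT 'running',  -- running | success | error
    records_synced INTEGER,
    error_message  TEXT
);

-- Index to speed up email lookups (see Key Concepts: indexes)
CREATE INDEX IF NOT EXISTS idx_customers_email ON customers (email);
```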
Step 1.3: Insert Sample Data (Optional)
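The sample rows are missing here; a hedged example assuming a `customers` table keyed on `id` with name/email/city/company columns (adjust to your schema):

```sql
INSERT INTO customers (id, name, email, city, company)
VALUES
    (9001, 'Test User One', 'test1@example.com', 'Springfield', 'Acme'),
    (9002, 'Test User Two', 'test2@example.com', 'Shelbyville', 'Globex')
ON CONFLICT (id) DO NOTHING;
```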
Step 1.4: Configure n8n PostgreSQL Credential
- In n8n, go to Credentials (left sidebar)
- Click "Create New"
- Search for "PostgreSQL"
- Fill in connection details:
  - Host: `localhost` (or the Docker container/service name if n8n also runs in Docker)
  - Database: `customers_db`
  - User: `n8n_user`
  - Password: `n8n_password`
  - Port: `5432`
  - SSL Mode: `Disable` (for local testing)
- Test Connection
- Save as "PostgreSQL - Customers DB"
Docker Network Note: If n8n and PostgreSQL are in different containers:
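One way to handle this, assuming container names `pg-customers` and `n8n` (substitute your own), is a shared network; the PostgreSQL container name then becomes the Host in the n8n credential:

```bash
# Put both containers on one network so n8n can resolve the DB by name
docker network create n8n-net
docker network connect n8n-net pg-customers
docker network connect n8n-net n8n
# In the n8n credential, set Host to "pg-customers" instead of "localhost"
```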
Part 2: Create Pipeline Workflow
Step 2.1: Create New Workflow
- Click "New Workflow"
- Rename to: `Customer Data Pipeline`
- Add a sticky note summarizing what the pipeline does
Step 2.2: Add Schedule Trigger
- Add "Schedule Trigger" node
- Configure:
- Trigger Interval: Hours
- Hours Between Triggers: 1 (hourly)
- Trigger at Minute: 0 (top of hour)
For testing: Set to "Every 5 minutes" initially
Part 3: Extract Data from API
Step 3.1: Log Pipeline Start
Add PostgreSQL node after trigger:
- Name it: "Log Pipeline Start"
- Operation: Insert
- Table: `sync_logs`
- Columns: set the run's start timestamp and a status of `running`
This creates an audit trail for each run!
Step 3.2: Fetch Customer Data
Add HTTP Request node:
- Name: "Fetch Customers from API"
- Method: GET
- URL: `https://jsonplaceholder.typicode.com/users`
- Response Format: JSON
For Production APIs:
- Add authentication (API key, OAuth)
- Add pagination if needed
- Set timeouts appropriately
Step 3.3: Test API Call
Execute the node - you should get 10 users:
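The sample output is missing from this copy; an abridged version of the first JSONPlaceholder user record, showing the nested `address` and `company` objects the transform step will flatten:

```json
[
  {
    "id": 1,
    "name": "Leanne Graham",
    "username": "Bret",
    "email": "Sincere@april.biz",
    "address": { "city": "Gwenborough", "zipcode": "92998-3874" },
    "phone": "1-770-736-8031 x56442",
    "company": { "name": "Romaguera-Crona" }
  }
]
```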
Part 4: Batch Processing
For large datasets (1000s+ records), process in batches to avoid memory issues.
Step 4.1: Add Split In Batches Node
After HTTP Request, add "Split In Batches":
- Name: "Process in Batches"
- Batch Size: 100
- Options:
- Reset: ✓ (important for loops)
This splits data into chunks of 100 records.
Step 4.2: Understanding Batching
Why batch?
- Prevents memory overflow
- Better error recovery (partial success)
- Faster database operations (bulk inserts)
- More predictable performance
How it works:
- First execution: Processes first 100 records
- Loops back until all records processed
- Each batch is independent
Part 5: Data Transformation
Step 5.1: Add Code Node
After "Split In Batches", add Code node:
Name: "Transform & Validate Data"
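The node's code is missing from this copy; a sketch of one possible transformation, assuming JSONPlaceholder's user shape (nested `address` and `company` objects). In n8n you would read `$input.all()` and `return` the mapped items; the inline sample data here is only for illustration:

```javascript
// Sketch of the "Transform & Validate Data" Code node.
// In n8n, replace the inline `items` sample with $input.all()
// and finish with `return transformed;`.
function transformUser(user) {
  // Validation: drop records missing the required email field
  if (!user.email) return null;

  return {
    id: user.id,
    // Data cleaning: trim whitespace, normalize email casing
    name: (user.name || '').trim(),
    email: user.email.trim().toLowerCase(),
    // Flattening nested JSON: lift city and company to top level
    city: user.address ? user.address.city : null,
    company: user.company ? user.company.name : null,
    // Null handling: explicit null instead of undefined
    phone: user.phone || null,
  };
}

// Inline sample mimicking two JSONPlaceholder users
const items = [
  { json: { id: 1, name: ' Leanne Graham ', email: 'Sincere@april.biz',
            address: { city: 'Gwenborough' }, company: { name: 'Romaguera-Crona' } } },
  { json: { id: 2, name: 'No Email Provided' } }, // fails validation
];

const transformed = items
  .map((item) => transformUser(item.json))
  .filter(Boolean)                 // discard invalid records
  .map((json) => ({ json }));      // n8n item shape
```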
Step 5.2: Understanding the Transformation
Key operations:
- Validation: drop records that are missing required fields (e.g. email)
- Data Cleaning: trim whitespace and normalize casing
- Flattening Nested JSON: lift nested fields such as `address.city` into top-level columns
- Null Handling: substitute safe defaults for missing optional fields
Step 5.3: Add Data Quality Checks
Enhance validation:
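The enhanced validation code is missing here; one possible set of extra checks. The email regex and the minimum-length threshold are illustrative choices, not requirements:

```javascript
// Extra data-quality rules layered on top of the basic transform.
const EMAIL_RE = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;

function checkQuality(record) {
  const issues = [];
  if (!record.email || !EMAIL_RE.test(record.email)) issues.push('invalid_email');
  if (!record.name || record.name.length < 2) issues.push('name_too_short');
  if (!Number.isInteger(record.id)) issues.push('bad_id');
  return { ...record, valid: issues.length === 0, issues };
}

// Illustrative input: one clean record, one with two problems
const results = [
  { id: 1, name: 'Leanne Graham', email: 'sincere@april.biz' },
  { id: 2, name: 'X', email: 'not-an-email' },
].map(checkQuality);

const validCount = results.filter((r) => r.valid).length;
const invalid = results.filter((r) => !r.valid);
```

Routing `invalid` records to a separate branch (instead of silently dropping them) makes data-quality problems visible in the report.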
Part 6: Load to Database
Step 6.1: Add PostgreSQL Upsert Node
After transformation, add PostgreSQL node:
Name: "Upsert Customers to Database"
Operation: Execute Query
Query:
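The query is missing from this copy; a sketch of a PostgreSQL upsert. Column names are assumptions, and in production prefer query parameters over interpolating `{{ }}` expressions directly into SQL, since interpolation is vulnerable to injection:

```sql
INSERT INTO customers (id, name, email, city, company, phone, synced_at)
VALUES (
    {{ $json.id }}, '{{ $json.name }}', '{{ $json.email }}',
    '{{ $json.city }}', '{{ $json.company }}', '{{ $json.phone }}', now()
)
ON CONFLICT (id) DO UPDATE SET
    name      = EXCLUDED.name,
    email     = EXCLUDED.email,
    city      = EXCLUDED.city,
    company   = EXCLUDED.company,
    phone     = EXCLUDED.phone,
    synced_at = now();
```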
Important: Use PostgreSQL Insert operation instead for easier mapping.
Step 6.2: Configure Insert with Conflict Resolution
Better approach - use Insert operation:
- Operation: Insert
- Table: `customers`
- Columns: map all fields:
  - `id` → `{{ $json.id }}`
  - `name` → `{{ $json.name }}`
  - `email` → `{{ $json.email }}`
  - etc.
- Options:
  - Insert Mode: Upsert
  - Conflict Columns: `id`
  - Update Columns: all fields
This automatically handles insert or update!
Step 6.3: Loop Back for Next Batch
After PostgreSQL node:
- Connect it back to "Split In Batches" node
- Creates a loop that processes all batches
Flow: Split In Batches → Transform & Validate → Upsert to Database → back to Split In Batches, until every batch is processed.
Part 7: Generate Reports
After all batches complete, generate a summary report.
Step 7.1: Calculate Statistics
Add Code node after loop completes:
Name: "Generate Summary Report"
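The node's code is missing from this copy; a minimal sketch. In n8n, `items` would come from `$input.all()` after the batch loop ends; the inline sample is only for illustration:

```javascript
// Sketch of the "Generate Summary Report" Code node.
const items = [{ json: { id: 1 } }, { json: { id: 2 } }, { json: { id: 3 } }];

const summary = {
  run_at: new Date().toISOString(),       // when this run finished
  records_processed: items.length,        // how many records made it through
  status: items.length > 0 ? 'success' : 'empty',
};

// In n8n: return [{ json: summary }];
```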
Step 7.2: Query Database Statistics
Add PostgreSQL node:
Name: "Get Database Stats"
Operation: Execute Query
Query:
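The query itself is missing here; one possible statistics query, assuming a `customers` table with a `synced_at` timestamp column:

```sql
SELECT
    COUNT(*) AS total_customers,
    COUNT(*) FILTER (WHERE synced_at::date = CURRENT_DATE) AS synced_today
FROM customers;
```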
Step 7.3: Format Report Message
Add Code node:
Name: "Format Report"
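The node's code is missing from this copy; a sketch. The stats field names (`total_customers`, `synced_today`) are assumptions, so match them to whatever your statistics query actually returns:

```javascript
// Sketch of the "Format Report" Code node.
// Inline sample stands in for the previous node's output.
const stats = { records_processed: 10, total_customers: 10, synced_today: 2 };

const message = [
  '📊 Customer Sync Report',
  `Records processed: ${stats.records_processed}`,
  `Total customers in DB: ${stats.total_customers}`,
  `Synced today: ${stats.synced_today}`,
].join('\n');

// In n8n: return [{ json: { message } }];
```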
Step 7.4: Send Report
Add Slack or Email node:
For Slack:
- Channel: #data-pipelines
- Message: `={{ $json.message }}`
For Email:
- To: data-team@company.com
- Subject: `Customer Sync Report - {{ $now.format('YYYY-MM-DD HH:mm') }}`
- Text: `={{ $json.message }}`
Part 8: Error Handling & Monitoring
Step 8.1: Create Error Workflow
- Create new workflow: "Pipeline Error Handler"
- Add Error Trigger
- Add Code to format error:
- Send to Slack/Email with high priority
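The formatting code from step 3 above is missing in this copy; a sketch. The payload shape loosely follows n8n's Error Trigger output, but treat the exact field paths as assumptions and inspect a real error execution to confirm them:

```javascript
// Sketch of the error-formatting Code node.
// Inline sample stands in for the Error Trigger's output.
const errorData = {
  workflow: { name: 'Customer Data Pipeline' },
  execution: { id: '1234', error: { message: 'connect ECONNREFUSED 127.0.0.1:5432' } },
};

const alert = {
  severity: 'high',
  text: [
    `🚨 Pipeline failed: ${errorData.workflow.name}`,
    `Execution: ${errorData.execution.id}`,
    `Error: ${errorData.execution.error.message}`,
  ].join('\n'),
};

// In n8n: return [{ json: alert }];
```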
Step 8.2: Log Errors to Database
In error workflow, add PostgreSQL node:
Operation: Update
Table: sync_logs
Query:
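The query is missing here; a sketch that marks the most recent in-flight run as failed. Column names (`status`, `finished_at`, `error_message`) are assumptions, so match them to your `sync_logs` schema:

```sql
UPDATE sync_logs
SET status        = 'error',
    finished_at   = now(),
    error_message = '{{ $json.error.message }}'
WHERE id = (SELECT MAX(id) FROM sync_logs WHERE status = 'running');
```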
Step 8.3: Set Error Workflow
In main pipeline:
- Workflow Settings (⚙️)
- Error Workflow: Select "Pipeline Error Handler"
- Save
Now errors are automatically caught and logged!
Step 8.4: Add Continue On Fail
For non-critical nodes:
- Select node (e.g., "Send Report")
- Settings tab
- Continue On Fail: ✓
Pipeline won't stop if Slack is down.
Part 9: Production Deployment
Step 9.1: Update Sync Log on Success
Before sending report, add PostgreSQL node:
Name: "Log Sync Success"
Operation: Update
Query:
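The query is missing here; a sketch that closes out the most recent in-flight run. Column names are assumptions matching your `sync_logs` schema:

```sql
UPDATE sync_logs
SET status         = 'success',
    finished_at    = now(),
    records_synced = {{ $json.records_processed }}
WHERE id = (SELECT MAX(id) FROM sync_logs WHERE status = 'running');
```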
Step 9.2: Add Monitoring Dashboard Query
Create view for monitoring:
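The view definition is missing from this copy; one possible version, assuming `sync_logs` has `started_at`, `finished_at`, and `status` columns:

```sql
CREATE OR REPLACE VIEW pipeline_health AS
SELECT
    date_trunc('day', started_at)              AS day,
    COUNT(*)                                   AS runs,
    COUNT(*) FILTER (WHERE status = 'success') AS successes,
    COUNT(*) FILTER (WHERE status = 'error')   AS failures,
    AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) AS avg_seconds
FROM sync_logs
GROUP BY 1
ORDER BY 1 DESC;
```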
Query this for daily health check!
Step 9.3: Set Up Alerting Thresholds
Add IF node before sending report:
Condition: check the failure indicators you care about, e.g. records processed equals 0, or any batch reported an error
- True: send a high-priority alert (something's wrong)
- False: send the normal report
Step 9.4: Implement Incremental Sync (Optional)
For APIs that support it:
- Store the last sync timestamp in the database
- Query it before the API call
- Use it in the API request
- Update it after a successful sync
This fetches only changed records!
Advanced Topics
Incremental Sync Pattern
Full implementation:
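The implementation is missing from this copy; a SQL sketch of the watermark pattern. The `sync_state` table name and columns are assumptions:

```sql
-- One-row-per-source state table holding the watermark
CREATE TABLE IF NOT EXISTS sync_state (
    source       TEXT PRIMARY KEY,
    last_sync_at TIMESTAMPTZ
);

-- Before the API call: read the watermark (epoch default on first run)
SELECT COALESCE(last_sync_at, '1970-01-01'::timestamptz) AS last_sync_at
FROM sync_state
WHERE source = 'customers_api';

-- After a successful run: advance the watermark
INSERT INTO sync_state (source, last_sync_at)
VALUES ('customers_api', now())
ON CONFLICT (source) DO UPDATE SET last_sync_at = EXCLUDED.last_sync_at;
```

Pass the selected timestamp to the HTTP Request node as a query parameter if the API supports filtering by modification time.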
Data Deduplication
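No example survives in this copy; one way to deduplicate in PostgreSQL, assuming duplicates share an `email` and `synced_at` marks recency:

```sql
-- Keep the newest row per email, delete older duplicates
DELETE FROM customers c
USING customers d
WHERE c.email = d.email
  AND c.synced_at < d.synced_at;
```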
Parallel Processing
For multiple data sources:
Data Retention Policy
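The SQL is missing here; a minimal retention job against `sync_logs` (90 days is an arbitrary choice, so tune it to your audit requirements):

```sql
DELETE FROM sync_logs
WHERE started_at < now() - INTERVAL '90 days';
```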
Add as scheduled workflow.
Troubleshooting
Issue: Database Connection Fails
Error: Connection refused or Timeout
Solutions:
- Verify the PostgreSQL container is running (`docker ps`)
- If n8n runs in Docker, use the PostgreSQL container/service name as Host, not `localhost`
- Confirm port `5432` is exposed and the credentials match those the container was started with
- For local testing, set SSL Mode to `Disable`
Issue: Batch Loop Never Completes
Cause: "Reset" not enabled on Split In Batches
Solution:
- Select "Split In Batches" node
- Options → Reset: ✓
- Save and re-run
Issue: Duplicate Records Despite Upsert
Cause: Conflict column mismatch
Solution: ensure the Conflict Columns value names the table's primary key or a uniquely constrained column (here `id`); if the named column has no unique constraint, PostgreSQL cannot detect conflicts and duplicates appear.
Issue: Out of Memory on Large Datasets
Solutions:
- Reduce the batch size (e.g. from 100 down to 25)
- Limit saved workflow execution data in n8n's settings
- Stream large responses instead of loading them into memory at once
Issue: Slow Database Inserts
Optimize:
- Insert whole batches at once instead of one row per query
- Index the conflict column (`id`) so upsert lookups are fast, but drop unused indexes, since every index slows writes
- Keep transactions short to avoid long-held locks
Testing the Complete Workflow
Test Checklist
✅ Smoke Test:
- Execute workflow manually
- Verify all nodes execute successfully
- Check database has data
- Confirm report received
✅ Data Validation:
- Spot-check rows in `customers` against the API response
- Confirm the row count matches the records-processed figure in the report
- Re-run the workflow and verify no duplicates appear (upsert working)
✅ Error Handling:
- Temporarily break API URL
- Verify error workflow triggers
- Check error logged to database
- Confirm alert received
✅ Performance:
- Test with large dataset (100+ records)
- Verify batching works
- Check execution time
- Monitor memory usage
Production Checklist
Before deploying to production:
- Database credentials use environment variables
- Error workflow configured
- Monitoring and alerting set up
- Workflow tested with production-size data
- Execution data retention configured
- Schedule set appropriately
- Documentation updated
- Team trained on workflow
- Backup and recovery plan in place
- Performance benchmarked
Key Concepts Learned
✅ ETL Patterns
- Extract: API data fetching
- Transform: Validation, cleaning, flattening
- Load: Database upserts
✅ Batch Processing
- Split large datasets
- Process in chunks
- Loop until complete
✅ Data Quality
- Validation rules
- Error handling
- Data cleansing
✅ Production Practices
- Logging and auditing
- Error alerting
- Performance monitoring
- Incremental sync
✅ Database Operations
- INSERT, UPDATE, UPSERT
- Indexes for performance
- Transactions and consistency
Next Steps
Congratulations! 🎉
You've built a production-ready data pipeline! You now understand:
- End-to-end ETL workflows
- Database integration
- Batch processing
- Error handling
- Production monitoring
Continue Learning
Apply These Skills:
- Modify for your data:
  - Connect to your actual API
  - Adjust transformation logic
  - Add your business rules
- Expand the pipeline:
  - Add data enrichment (API lookups)
  - Implement data quality scoring
  - Create data lineage tracking
- Build related workflows:
  - Data validation monitors
  - Scheduled reports
  - Alert workflows
- Explore advanced topics:
  - Best Practices
  - Use Cases
  - dbt Integration (SQL transformations)
Recommended Projects
Build these to practice:
Data Engineering:
- Multi-source data aggregation
- Real-time sync with webhooks
- Change data capture (CDC)
- Data quality monitoring
Business Intelligence:
- Automated reporting dashboards
- KPI tracking workflows
- Anomaly detection alerts
Additional Resources
Tools & Extensions
- dbt: SQL transformations in warehouse
- Great Expectations: Data validation framework
- Airflow: Complex orchestration
- Metabase/Superset: BI dashboards
Further Reading
Get Help
Questions? Issues?
- Community: community.n8n.io
- Documentation: docs.n8n.io
- Discord: Real-time help
- Stack Overflow: Technical Q&A
Need production assistance? Contact me for consulting, architecture review, or team training.
→ What's Next?
- Explore Use Cases for your industry
- Review Best Practices for optimization
- Check out dbt for warehouse transformations
Happy building! 🚀