Lab: database-backed data pipeline

Build an end-to-end pipeline — discover files, validate records, store in SQLite, and aggregate results.

Lab · optional · Python · Advanced · 22 min
By the end of this lesson you will be able to:
  • Parse and validate records with boolean logic
  • Bulk-load clean rows into an in-memory SQLite database
  • Query and aggregate stored data with parameterised SQL
  • Chain pathlib, sqlite3, and any/all into a coherent pipeline

Optional lab. The Data Engineering lessons gave you four tools in isolation. This lab puts them in sequence: the output of one stage is the input to the next. Work through the checkpoints in order — each one builds on the last.

A data pipeline has three stages that appear everywhere, from tiny scripts to production ETL jobs: extract (read raw data), transform (validate and clean), and load (store the result). Here, your CSV lives in memory as a list of dicts, your validator is a boolean predicate, your store is SQLite, and your output is an aggregation query. The wiring is yours to write.
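To make the extract stage concrete, here is a minimal sketch of how CSV text could become the list-of-dicts shape this lab starts from. The sample text, the column names name and score, and the helper extract_records are assumptions made for illustration; the lab itself hands you the list of dicts directly.

```python
import csv
import io

# Hypothetical sample input; the lab starts from a ready-made list of dicts.
RAW_CSV = """name,score
Alice,95
Bob,72
,80
"""

def extract_records(text):
    """Parse CSV text into a list of dicts, converting score to float where possible."""
    records = []
    for row in csv.DictReader(io.StringIO(text)):
        try:
            score = float(row["score"])
        except (TypeError, ValueError):
            # Leave unparseable values untouched so the validator can reject them.
            score = row["score"]
        records.append({"name": row["name"], "score": score})
    return records

records = extract_records(RAW_CSV)
print(records)
```

Note that csv.DictReader yields every field as a string, so the numeric conversion has to happen before a validator that checks for int or float can accept the row.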

Checkpoint 1 — validate a record

Before loading anything, decide whether a record is worth keeping. A validator is just a predicate — a function that returns True or False.

Validate a record (Python)

Write validate_record(record) that returns True only if: (1) "name" is a non-empty string, and (2) "score" is a number (int or float) that is >= 0. Use all() over a list of conditions.

validate_record({"name": "Alice", "score": 95}) → True
validate_record({"name": "", "score": 95}) → False

Notice that when the "name" key is missing, record.get("name") returns None, which fails the isinstance(…, str) check cleanly. That is the kind of defensive composition you get from combining .get(), isinstance, and all().
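The edge cases are easy to check for yourself. The sketch below is one possible validator that follows the rules stated in the exercise, run against a handful of records; the test cases beyond the two shown above are illustrative additions.

```python
def validate_record(record):
    """Return True only for a non-empty string name and a non-negative numeric score."""
    name = record.get("name")    # None when the key is missing
    score = record.get("score")  # None when the key is missing
    return all([
        isinstance(name, str) and len(name) > 0,
        isinstance(score, (int, float)) and score >= 0,
    ])

cases = [
    ({"name": "Alice", "score": 95}, True),
    ({"name": "", "score": 95}, False),        # empty name
    ({"score": 95}, False),                    # missing name key
    ({"name": "Bob"}, False),                  # missing score key
    ({"name": "Bob", "score": "72"}, False),   # score is a string
    ({"name": "Bob", "score": -1}, False),     # negative score
]
for record, expected in cases:
    assert validate_record(record) is expected
print("all cases behave as expected")
```

One detail worth knowing: a list passed to all() is fully built before all() runs, so every condition is evaluated. It is the and inside each condition that stops len(None) or None >= 0 from raising. Also, because bool is a subclass of int in Python, a score of True would pass this check; add an explicit exclusion if that matters for your data.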

Checkpoint 2 — load clean records into SQLite

Now connect the validator to the database. Filter out bad rows before inserting, use executemany for the bulk load, and return results as tuples.

Load validated records into SQLite (Python)

Write load_records(records) that: (1) filters records using validate_record, (2) creates an in-memory SQLite database with a "scores" table (name TEXT, score REAL), (3) bulk-inserts the valid rows with executemany and ? placeholders, and (4) returns all rows as a list of (name, score) tuples ordered by score DESC.

load_records([{"name": "A", "score": 90}, {"name": "", "score": 80}]) → [('A', 90.0)]

The separation of concerns here is deliberate: validate_record knows nothing about SQLite; load_records knows nothing about what makes a record valid. Each function does one thing — the architecture lesson's single-responsibility principle, applied to a pipeline.
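As a sketch of that separation, here is one way load_records could be written against the four requirements in the exercise. It is not the only valid answer; closing the connection in a finally block is a choice made here, not something the exercise asks for.

```python
import sqlite3

def validate_record(record):
    """Same rules as Checkpoint 1: non-empty string name, non-negative numeric score."""
    name = record.get("name")
    score = record.get("score")
    return all([
        isinstance(name, str) and len(name) > 0,
        isinstance(score, (int, float)) and score >= 0,
    ])

def load_records(records):
    """Filter, bulk-insert into an in-memory table, and return rows by score descending."""
    valid = [r for r in records if validate_record(r)]
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE scores (name TEXT, score REAL)")
        # One parameterised statement, many rows: values never touch the SQL string.
        conn.executemany(
            "INSERT INTO scores (name, score) VALUES (?, ?)",
            [(r["name"], r["score"]) for r in valid],
        )
        conn.commit()
        return conn.execute(
            "SELECT name, score FROM scores ORDER BY score DESC"
        ).fetchall()
    finally:
        conn.close()

rows = load_records([{"name": "A", "score": 90}, {"name": "", "score": 80}])
print(rows)  # [('A', 90.0)]
```

The integer 90 comes back as 90.0 because the column is declared REAL, so SQLite stores the value as a floating-point number.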

Checkpoint 3 — aggregate with SQL

Your data is in the database. Now answer a question: who are the top scorers? Write a query that uses ORDER BY and LIMIT to pull the top N rows.

Query the top N scores (Python)

Write top_n(conn, n) that queries a "scores" table (name TEXT, score REAL) and returns the top n rows ordered by score descending as a list of (name, score) tuples. Use a ? placeholder for n.

top_n(conn, 2) → [('A', 90.0), ('B', 80.0)]
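A minimal sketch of one possible top_n follows, with a small in-memory table built only so the function has something to query. The three sample rows are made up for the example.

```python
import sqlite3

def top_n(conn, n):
    """Return the n highest-scoring rows as (name, score) tuples."""
    cursor = conn.execute(
        "SELECT name, score FROM scores ORDER BY score DESC LIMIT ?",
        (n,),  # parameters must be a sequence, hence the one-element tuple
    )
    return cursor.fetchall()

# Hypothetical sample data for demonstration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (name TEXT, score REAL)")
conn.executemany(
    "INSERT INTO scores VALUES (?, ?)",
    [("A", 90), ("B", 80), ("C", 70)],
)
result = top_n(conn, 2)
print(result)  # [('A', 90.0), ('B', 80.0)]
conn.close()
```

Passing n through a ? placeholder rather than formatting it into the string keeps the query text constant and lets sqlite3 handle the value safely.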

Putting it all together

Here is the complete pipeline in under thirty lines, with each stage clearly separated and each tool doing its job:

import sqlite3
from pathlib import Path

RAW_RECORDS = [
    {"name": "Alice",  "score": 95},
    {"name": "Bob",    "score": 72},
    {"name": "",       "score": 80},   # will be rejected
    {"name": "Carol",  "score": 88},
    {"name": "David",  "score": -5},   # will be rejected
]

def validate_record(r):
    return all([
        isinstance(r.get("name"), str) and len(r["name"]) > 0,
        isinstance(r.get("score"), (int, float)) and r["score"] >= 0,
    ])

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (name TEXT, score REAL)")
valid = [r for r in RAW_RECORDS if validate_record(r)]
conn.executemany("INSERT INTO scores VALUES (?, ?)",
                 [(r["name"], r["score"]) for r in valid])
conn.commit()

top = conn.execute(
    "SELECT name, score FROM scores ORDER BY score DESC LIMIT ?", (3,)
).fetchall()
print(top)   # [('Alice', 95.0), ('Carol', 88.0), ('Bob', 72.0)]
conn.close()
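The same table can answer aggregate questions as well as ranking ones. As a rough sketch, the query below counts, averages, and finds the maximum of the scores above a threshold; the threshold of 80 and the variable name min_score are assumptions chosen for the example, and the rows match the valid records from the pipeline above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (name TEXT, score REAL)")
conn.executemany(
    "INSERT INTO scores VALUES (?, ?)",
    [("Alice", 95), ("Bob", 72), ("Carol", 88)],
)

min_score = 80  # hypothetical threshold, passed as a parameter
count, average, best = conn.execute(
    "SELECT COUNT(*), AVG(score), MAX(score) FROM scores WHERE score >= ?",
    (min_score,),
).fetchone()
print(count, average, best)  # 2 91.5 95.0
conn.close()
```

An aggregate query without GROUP BY always returns exactly one row, so fetchone() is enough and the result unpacks directly into three names.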

Done?

All three checkpoints green? You've built a complete ETL pipeline — the same shape used in production jobs that process millions of rows, just at a smaller scale. The patterns hold: validate at the boundary, insert in bulk with parameters, answer questions with SQL. That completes the Data Engineering module.
