Code of the Day

Prefect in practice

Define a two-task Prefect flow, run it locally, and observe task states — all in ordinary Python without any external infrastructure.

Workflow · Advanced · 10 min read
By the end of this lesson you will be able to:
  • Define a Prefect flow with two tasks that pass data between them
  • Run the flow locally and observe task states printed to the terminal
  • Add retry configuration to a task and confirm it fires on simulated failure

The previous lesson explained Prefect's flow/task model. Here you will write one. Because Prefect is not available in the in-browser runtime, the demo simulates its decorator behaviour using a lightweight wrapper. The structure and output closely mirror what you would see when running real Prefect locally.

Run the snippet and observe:

  1. fetch_data fails on its first attempt and retries automatically.
  2. process_data starts only after fetch_data succeeds, because it takes fetch_data's return value as its input.
  3. The flow prints a completion time and the task state log shows the retry.
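The behaviour listed above can be reproduced without Prefect. What follows is a minimal sketch of such a simulation, not Prefect's implementation: the task and flow decorators, the task_log list, the calls counter, the sample records, and the state labels are all hypothetical stand-ins written for illustration using only the standard library.

```python
import functools
import time

task_log = []  # (task name, state label) pairs, in the order they occurred


def task(retries=0, retry_delay_seconds=0.0):
    """Hypothetical stand-in for a retrying task decorator (not Prefect's code)."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            attempts = retries + 1  # one initial attempt plus the retries
            for attempt in range(1, attempts + 1):
                task_log.append((fn.__name__, "Running"))
                try:
                    result = fn(*args, **kwargs)
                except Exception as exc:
                    if attempt == attempts:
                        # No attempts left: record the failure and re-raise
                        task_log.append((fn.__name__, "Failed"))
                        raise
                    task_log.append((fn.__name__, "AwaitingRetry"))
                    print(f"{fn.__name__}: attempt {attempt} failed ({exc}); retrying")
                    time.sleep(retry_delay_seconds)
                else:
                    task_log.append((fn.__name__, "Completed"))
                    return result
        return wrapper
    return decorator


def flow(fn):
    """Hypothetical stand-in for a flow decorator: times the run and reports it."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        print(f"{fn.__name__}: completed in {time.perf_counter() - start:.3f}s")
        return result
    return wrapper


calls = {"fetch_data": 0}  # attempt counter, so that only the first attempt fails


@task(retries=2, retry_delay_seconds=0.01)
def fetch_data():
    calls["fetch_data"] += 1
    if calls["fetch_data"] == 1:
        raise ConnectionError("simulated network failure")
    return [{"id": 1, "value": 10}, {"id": 2, "value": 25}, {"id": 3, "value": 40}]


@task()
def process_data(records):
    return [r for r in records if r["value"] >= 20]


@flow
def pipeline():
    raw = fetch_data()  # blocks until fetch_data has succeeded (or exhausted its retries)
    return process_data(raw)


result = pipeline()
for name, state in task_log:
    print(f"{name}: {state}")
```

Because the wrapper re-raises once the attempts are exhausted, a task that never succeeds still fails the whole flow, which is the behaviour you would want to confirm before relying on retries.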

Real Prefect code

With Prefect and requests installed (pip install prefect requests), the real version of this pipeline is nearly identical:

from prefect import flow, task
import requests

# Retry up to twice, waiting 10 seconds between attempts, if the task raises
@task(retries=2, retry_delay_seconds=10, log_prints=True)
def fetch_data(url: str) -> list:
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # raise on HTTP 4xx/5xx so the retry fires
    records = response.json()
    print(f"Fetched {len(records)} records")
    return records

@task(log_prints=True)
def process_data(records: list) -> list:
    # Keep only the records whose "value" field is at least 20
    filtered = [r for r in records if r["value"] >= 20]
    print(f"Filtered to {len(filtered)} records")
    return filtered

@flow(name="example-pipeline", log_prints=True)
def pipeline(url: str = "https://api.example.com/data") -> list:
    raw = fetch_data(url)      # blocks until fetch_data returns its result
    return process_data(raw)

if __name__ == "__main__":
    pipeline()

log_prints=True routes print() calls to the Prefect logger, making them visible in the run logs and in the Prefect UI. Save the file as pipeline.py and run it with python pipeline.py, or create a deployment with prefect deploy to run it on a schedule.

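Calling a task directly, as the flow above does, blocks until the task finishes, so fetch_data and process_data run one after the other. To run independent tasks concurrently, call .submit() on each instead: it hands the task to the flow's task runner and returns a future immediately. Call .result() on the futures, or pass them as arguments to a downstream task, and Prefect waits for them and infers the dependency from that data flow.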
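The submit-then-collect pattern behind concurrent task runs can be tried with the standard library alone. The sketch below is an analogy built on concurrent.futures, not Prefect's API; fetch_users, fetch_orders, and summarise are hypothetical functions invented for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_users():
    time.sleep(0.2)  # stands in for a slow network call
    return ["ada", "grace"]


def fetch_orders():
    time.sleep(0.2)  # a second, independent slow call
    return [101, 102, 103]


def summarise(users, orders):
    return f"{len(users)} users, {len(orders)} orders"


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns a future immediately, so the two fetches overlap
    users_future = pool.submit(fetch_users)
    orders_future = pool.submit(fetch_orders)
    # result() blocks until each value is ready; summarise needs both
    summary = summarise(users_future.result(), orders_future.result())
elapsed = time.perf_counter() - start

print(summary)
print(f"elapsed: {elapsed:.2f}s")
```

Run sequentially, the two fetches would take at least 0.4 seconds; submitted together they should finish in roughly half that. In a Prefect flow the corresponding calls would be .submit() on each task and .result() on the returned futures.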

Parametrised runs

Because the flow accepts url as a parameter, you can run it against different endpoints without touching the code:

# CLI (requires an existing deployment of the flow; replace <deployment-name> with its name)
prefect deployment run 'example-pipeline/<deployment-name>' --param url=https://staging.api.example.com/data

Or from Python:

pipeline(url="https://staging.api.example.com/data")

This makes testing against staging trivial and production runs auditable — the parameters used for each run are stored in the Prefect run record.

Where to go next

Next: lab — DAG pipeline — express a four-step pipeline as both a Makefile and a Prefect flow, then compare the developer experience of each approach.
