Build data pipelines with Prefect Assets – declarative, dependency-aware, and observable. This example demonstrates how to use Prefect Assets to build a social media analytics pipeline. The full implementation with ATProto integration, dbt transformations, and a Streamlit dashboard is available at: https://github.com/zzstoatzz/atproto-dashboard

Key Prefect Features

  • @materialize decorator – Transform functions into versioned, cacheable data assets
  • Automatic dependency tracking – Prefect infers dependencies from function parameters
  • S3-backed assets – Store assets directly in S3 with built-in versioning
  • Artifact creation – Generate rich UI artifacts for observability
  • Flow orchestration – Coordinate asset materialization with retries and scheduling

The Pattern: Asset-Based Data Pipelines

Instead of manually managing data dependencies and storage:
  1. Define assets with @materialize and unique keys (e.g., S3 paths)
  2. Declare dependencies via function parameters or asset_deps (see the sketch after this list)
  3. Let Prefect handle execution order, caching, and storage
  4. Get automatic lineage tracking and observability
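Dependencies are usually inferred from function parameters, as in the steps below, but they can also be declared explicitly with asset_deps. Here is a minimal sketch (the keys and function bodies are illustrative; asset_deps is useful when the upstream value is read from storage rather than passed in):
from prefect.assets import Asset, materialize

upstream = Asset(key="pipeline://upstream")
downstream = Asset(key="pipeline://downstream")

@materialize(upstream)
def build_upstream() -> list[str]:
    """Materialize the upstream asset."""
    return ["a", "b"]

# asset_deps records the lineage even though no parameter links the two
@materialize(downstream, asset_deps=[upstream])
def build_downstream() -> int:
    """Materialize the downstream asset from data fetched out of band."""
    return 2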

Running This Example

This simplified example demonstrates the core patterns. For the complete implementation:
git clone https://github.com/zzstoatzz/atproto-dashboard
cd atproto-dashboard
# Follow README for setup and configuration

Core Pattern: Define Assets and Dependencies

Assets represent data products in your pipeline. Each asset has:
  • A unique key (often an S3 path or other storage location)
  • A materialization function decorated with @materialize
  • Dependencies (automatically tracked via function parameters)
# Define assets with descriptive keys
import json
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
from typing import Any

from prefect import flow
from prefect.artifacts import create_markdown_artifact
from prefect.assets import Asset, materialize

raw_data_asset = Asset(key="pipeline://raw_data")
processed_data_asset = Asset(key="pipeline://processed_data")
analytics_asset = Asset(key="pipeline://analytics")

Step 1: Fetch Raw Data

The first asset fetches data from an external source. In the full implementation, this connects to the ATProto/Bluesky API to fetch social media data.
@materialize(raw_data_asset)
def fetch_raw_data() -> dict[str, Any]:
    """Fetch raw data from an external source."""
    print("Fetching raw data...")

    data = {
        "items": ["item1", "item2", "item3"],
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "count": 3,
    }

    print(f"✓ Fetched {data['count']} items")
    return data


Step 2: Process the Data

This asset demonstrates automatic dependency tracking. Because process_data accepts raw_data as a parameter, Prefect knows this asset depends on raw_data_asset and ensures it is materialized first. In production, this step would store data to S3 with partitioning; here we use local storage for simplicity.
@materialize(processed_data_asset)
def process_data(raw_data: dict[str, Any]) -> dict[str, Any]:
    """Process raw data into a structured format with automatic dependency tracking."""
    print(f"Processing {raw_data['count']} items...")

    processed = {
        "items": [item.upper() for item in raw_data["items"]],
        "processed_at": datetime.now(timezone.utc).isoformat(),
        "source_count": raw_data["count"],
    }

    storage_dir = Path("./data")
    storage_dir.mkdir(exist_ok=True)
    with open(storage_dir / "processed.json", "w") as f:
        json.dump(processed, f, indent=2)

    print(f"✓ Processed and stored {len(processed['items'])} items")
    return processed


Step 3: Create Analytics

This asset demonstrates chained dependencies (it depends on processed_data, which depends on raw_data) and artifact creation for rich observability in the Prefect UI. In the full implementation, this runs dbt transformations to create analytics models.
@materialize(analytics_asset)
def create_analytics(processed_data: dict[str, Any]) -> dict[str, Any]:
    """Generate analytics with chained dependencies and create UI artifacts."""
    print("Creating analytics...")

    analytics = {
        "total_items": len(processed_data["items"]),
        "source_timestamp": processed_data["processed_at"],
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

    create_markdown_artifact(
        key="analytics-summary",
        markdown=dedent(
            f"""
            # Analytics Summary

            - **Total Items**: {analytics["total_items"]}
            - **Created**: {analytics["created_at"]}
            - **Source**: {analytics["source_timestamp"]}

            This artifact appears in the Prefect UI for observability.
            """
        ),
        description="Analytics summary for this pipeline run",
    )

    print(f"✓ Analytics created for {analytics['total_items']} items")
    return analytics


Flow: Orchestrate Asset Materialization

The flow calls each asset function, and Prefect handles:
  • Dependency resolution (ensuring correct execution order)
  • Automatic caching (skip re-computation if upstream assets haven’t changed)
  • Observability (tracking lineage and execution in the UI)
@flow(name="asset-pipeline-demo", log_prints=True)
def run_asset_pipeline() -> dict[str, Any]:
    """
    Orchestrate the asset pipeline.

    By calling the materialization functions in sequence and passing results,
    Prefect automatically:
    - Tracks dependencies between assets
    - Ensures execution order
    - Provides observability in the UI
    - Enables caching and versioning
    """
    print("🚀 Starting asset pipeline")

    # Materialize assets - Prefect tracks dependencies automatically
    raw = fetch_raw_data()
    processed = process_data(raw)
    analytics = create_analytics(processed)

    print(f"✅ Pipeline complete! Processed {analytics['total_items']} items")
    return analytics


if __name__ == "__main__":
    run_asset_pipeline()
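
To run the pipeline on a schedule instead of once, the flow can be served as a deployment. A minimal sketch, assuming Prefect 3's Flow.serve (the deployment name and cron expression are illustrative):
# Serve the flow as a long-running process with an hourly schedule
# (replaces the one-off run above; name and cron are illustrative)
run_asset_pipeline.serve(
    name="hourly-asset-pipeline",
    cron="0 * * * *",
)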

What Makes Assets Powerful?

  1. Automatic Dependency Tracking
    • Prefect infers dependencies from function parameters
    • Ensures correct execution order without manual DAG definition
    • Tracks asset lineage for observability
  2. Caching and Versioning
    • Assets are versioned based on their inputs
    • Skip re-computation when upstream data hasn’t changed
    • Efficient incremental processing
  3. Storage Integration
    • Asset keys can be S3 paths, database URIs, or any identifier (see the sketch after this list)
    • Built-in support for prefect-aws, prefect-gcp, etc.
    • Automatic data persistence and retrieval
  4. Observability
    • Every materialization tracked in the Prefect UI
    • Artifacts provide rich context (tables, markdown, links)
    • Full lineage and execution history
  5. Production Ready
    • Built-in retry logic and error handling
    • Scheduling and automation via Prefect deployments
    • Scales from local development to cloud production
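
To illustrate storage integration, here is a minimal sketch of an S3-keyed asset (the bucket, object key, and the saved block name "analytics-bucket" are assumptions; it requires prefect-aws and a previously saved S3Bucket block):
import json
from typing import Any

from prefect.assets import Asset, materialize
from prefect_aws import S3Bucket

# The asset key doubles as the storage location
s3_asset = Asset(key="s3://my-bucket/analytics/summary.json")

@materialize(s3_asset)
def store_analytics(analytics: dict[str, Any]) -> str:
    # Load a previously saved S3Bucket block (block name is an assumption)
    bucket = S3Bucket.load("analytics-bucket")
    # write_path persists bytes to the given key within the bucket
    bucket.write_path("analytics/summary.json", json.dumps(analytics).encode())
    return s3_asset.key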

Full Implementation

This example demonstrates the core patterns. The complete implementation includes:
  • Real ATProto API integration
  • S3-backed asset storage with partitioning
  • dbt transformations with DuckDB
  • Streamlit dashboard for visualization
  • Production-ready error handling and logging
See the full project at: https://github.com/zzstoatzz/atproto-dashboard
