Both `@materialize` and `asset_deps` accept either a string referencing an asset key or a full `Asset` class instance. Use the `Asset` class when you want to attach metadata beyond the key itself, such as a name, description, and ownership information.
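Both forms below target the same asset key, as a minimal sketch of the two call styles (the key and function names are illustrative):

```python
from prefect.assets import Asset, materialize

# Reference the asset by its key string alone...
@materialize("s3://my-bucket/data.csv")
def materialize_by_key():
    pass

# ...or by an Asset instance, which can carry additional metadata
@materialize(Asset(key="s3://my-bucket/data.csv"))
def materialize_by_instance():
    pass
```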
## The `Asset` class
While you can use simple string keys with the `@materialize` decorator, the `Asset` class provides more control over asset properties and metadata. Create `Asset` instances to add organizational context and improve discoverability.
### Asset initialization fields

The `Asset` class accepts the following parameters:

- `key` (required): a URI that uniquely identifies the asset, such as `s3://my-bucket/data.csv`
- `properties` (optional): an `AssetProperties` instance holding organizational metadata: `name`, `description`, `owners`, and `url`

```python
from prefect.assets import Asset, AssetProperties

# Simple asset with just a key
basic_asset = Asset(key="s3://my-bucket/data.csv")

# Asset with full properties
detailed_asset = Asset(
    key="s3://my-bucket/processed-data.csv",
    properties=AssetProperties(
        name="Processed Customer Data",
        description="Clean customer data with PII removed",
        owners=["data-team@company.com", "alice@company.com"],
        url="https://dashboard.company.com/datasets/customer-data"
    )
)
```
Each of these fields will be displayed alongside the asset in the UI.
### Interactive fields
The `owners` field can reference plain email addresses, as well as user and team handles within your Prefect Cloud workspace. The `url` field becomes a clickable link wherever the asset is displayed.
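As a sketch, owners can mix the two styles (the team handle, addresses, and URL below are hypothetical):

```python
from prefect.assets import Asset, AssetProperties

dashboard_data = Asset(
    key="s3://my-bucket/kpi-dashboard-data.csv",
    properties=AssetProperties(
        # A Prefect Cloud team handle alongside a plain email address
        owners=["analytics-team", "jane@company.com"],
        # Rendered as a clickable link in the asset's UI page
        url="https://dashboard.company.com/kpis",
    ),
)
```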
**Updates occur at runtime**

Asset metadata updates happen at workflow runtime: an asset's metadata is updated whenever a materializing task that references it executes.
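As a minimal sketch (the key and names are illustrative), the updated name below is applied only when `refresh()` actually runs, not when the asset is defined or deployed:

```python
from prefect import flow
from prefect.assets import Asset, AssetProperties, materialize

renamed_asset = Asset(
    key="s3://my-bucket/data.csv",
    properties=AssetProperties(name="Renamed Dataset"),
)

@materialize(renamed_asset)
def touch_asset():
    pass

@flow
def refresh():
    touch_asset()

if __name__ == "__main__":
    # The new name reaches the asset only now, at runtime
    refresh()
```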
## Using assets in materializations
Once you’ve defined your `Asset` instances, use them directly with the `@materialize` decorator:
```python
from prefect import flow
from prefect.assets import Asset, AssetProperties, materialize

detailed_asset = Asset(
    key="s3://my-bucket/processed-data.csv",
    properties=AssetProperties(
        name="Processed Customer Data",
        description="Clean customer data with PII removed",
        owners=["data-team@company.com", "alice@company.com"],
        url="https://dashboard.company.com/datasets/customer-data"
    )
)

properties = AssetProperties(
    name="Sales Analytics Dataset",
    description="This dataset contains daily sales figures by region along with customer segmentation data",
    owners=["analytics-team", "john.doe@company.com"],
    url="https://analytics.company.com/sales-dashboard"
)

sales_asset = Asset(
    key="snowflake://warehouse/analytics/sales_summary",
    properties=properties
)

@materialize(detailed_asset)
def process_customer_data():
    # Your processing logic here
    pass

@materialize(sales_asset)
def generate_sales_report():
    # Your reporting logic here
    pass

@flow
def analytics_pipeline():
    process_customer_data()
    generate_sales_report()
```
Beyond static properties, you can add dynamic metadata during task execution. This is useful for tracking runtime information like row counts, processing times, or data quality metrics.
The preferred approach is to use the `add_metadata()` method on your `Asset` instances. This prevents typos in asset keys and provides better IDE support:
```python
from prefect.assets import Asset, AssetProperties, materialize

customer_data = Asset(
    key="s3://my-bucket/customer-data.csv",
    properties=AssetProperties(
        name="Customer Data",
        owners=["data-team@company.com"]
    )
)

@materialize(customer_data)
def process_customers():
    # Your processing logic
    result = perform_customer_processing()

    # Add runtime metadata
    customer_data.add_metadata({
        "record_count": len(result),
        "processing_duration_seconds": 45.2,
        "data_quality_score": 0.95,
        "last_updated": "2024-01-15T10:30:00Z"
    })
    return result
```
Alternatively, you can use the `add_asset_metadata()` function, which requires specifying the asset key:
```python
from prefect.assets import materialize, add_asset_metadata

@materialize("s3://my-bucket/processed-data.csv")
def process_data():
    result = perform_processing()

    add_asset_metadata(
        "s3://my-bucket/processed-data.csv",
        {"rows_processed": len(result), "processing_time": "2.5s"}
    )
    return result
```
You can call metadata methods multiple times to accumulate information:
```python
@materialize(customer_data)
def comprehensive_processing():
    # First processing step
    raw_data = extract_data()
    customer_data.add_metadata({"raw_records": len(raw_data)})

    # Second processing step
    cleaned_data = clean_data(raw_data)
    customer_data.add_metadata({
        "cleaned_records": len(cleaned_data),
        "records_removed": len(raw_data) - len(cleaned_data)
    })

    # Final step
    final_data = enrich_data(cleaned_data)
    customer_data.add_metadata({
        "final_records": len(final_data),
        "enrichment_success_rate": 0.92
    })
    return final_data
```
## Explicit asset dependencies
While Prefect automatically infers dependencies from your task graph, you can explicitly declare asset relationships using the `asset_deps` parameter. This is useful when:
- The task graph doesn’t fully capture your data dependencies due to dynamic execution rules
- You need to reference assets that aren’t directly passed between tasks
- You want to be explicit about critical dependencies for documentation purposes
### Hard-coding dependencies
Use `asset_deps` to explicitly declare which assets your materialization depends on. You can reference assets by key string or by full `Asset` instance:
```python
from prefect import flow
from prefect.assets import Asset, materialize

# Define your assets
raw_data_asset = Asset(key="s3://my-bucket/raw-data.csv")
config_asset = Asset(key="s3://my-bucket/processing-config.json")
processed_asset = Asset(key="s3://my-bucket/processed-data.csv")

@materialize(raw_data_asset)
def extract_raw_data():
    pass

@materialize(
    processed_asset,
    asset_deps=[raw_data_asset, config_asset]  # Explicit dependencies
)
def process_data():
    # This function depends on both raw data and config,
    # even if they're not directly passed as parameters
    pass

@flow
def explicit_dependencies_flow():
    extract_raw_data()
    process_data()  # Explicitly depends on raw_data_asset and config_asset
```
### Mixing inferred and explicit dependencies
You can combine task graph inference with explicit dependencies:
```python
from prefect import flow
from prefect.assets import Asset, materialize

upstream_asset = Asset(key="s3://my-bucket/upstream.csv")
config_asset = Asset(key="s3://my-bucket/config.json")
downstream_asset = Asset(key="s3://my-bucket/downstream.csv")

@materialize(upstream_asset)
def create_upstream():
    return "upstream_data"

@materialize(
    downstream_asset,
    asset_deps=[config_asset]  # Explicit dependency on config
)
def create_downstream(upstream_data):  # Inferred dependency on upstream_asset
    # This asset depends on:
    # 1. upstream_asset (inferred from task graph)
    # 2. config_asset (explicit via asset_deps)
    pass

@flow
def mixed_dependencies():
    data = create_upstream()
    create_downstream(data)
```
### Best practices for `asset_deps` references

Use string keys when referencing assets that are materialized by other Prefect workflows. This avoids duplicate metadata definitions and lets the materializing workflow be the source of truth:
```python
from prefect.assets import materialize

# Good: reference by key when another workflow materializes the asset
@materialize(
    "s3://my-bucket/final-report.csv",
    asset_deps=["s3://my-bucket/data-from-other-workflow.csv"]  # String key
)
def create_report():
    pass
```
Use full `Asset` instances when referencing assets that are completely external to Prefect. This provides metadata about external systems that Prefect wouldn’t otherwise know about:
```python
from prefect.assets import Asset, AssetProperties, materialize

# Good: full Asset for external systems
external_database = Asset(
    key="postgres://prod-db/public/users",
    properties=AssetProperties(
        name="Production Users Table",
        description="Main user database maintained by the platform team",
        owners=["platform-team@company.com"],
        url="https://internal-db-dashboard.com/users"
    )
)

@materialize(
    "s3://my-bucket/user-analytics.csv",
    asset_deps=[external_database]  # Full Asset for external system
)
def analyze_users():
    pass
```
## Updating asset properties

Asset properties should have one source of truth to avoid conflicts. When you materialize an asset with properties, those properties completely overwrite all existing metadata fields for that asset.

**Important**

Any `Asset` instance with properties will completely replace all existing metadata. Partial updates are not supported: you must provide all the metadata you want to preserve.
```python
from prefect.assets import Asset, AssetProperties, materialize

# Initial materialization with full properties
initial_asset = Asset(
    key="s3://my-bucket/evolving-data.csv",
    properties=AssetProperties(
        name="Evolving Dataset",
        description="Initial description",
        owners=["team-a@company.com"],
        url="https://dashboard.company.com/evolving-data"
    )
)

@materialize(initial_asset)
def initial_creation():
    pass

# Later materialization - OVERWRITES all properties
updated_asset = Asset(
    key="s3://my-bucket/evolving-data.csv",
    properties=AssetProperties(
        name="Evolving Dataset",  # Must include to preserve
        description="Updated description with new insights",  # Updated
        owners=["team-a@company.com"]  # Must include to preserve
        # url is now None because it wasn't included
    )
)

@materialize(updated_asset)
def update_dataset():
    pass

# The final asset will have:
# - name: "Evolving Dataset" (preserved)
# - description: "Updated description with new insights" (updated)
# - owners: ["team-a@company.com"] (preserved)
# - url: None (lost because not included in update)
```
**Best practice**

Designate one workflow as the authoritative source for each asset’s metadata. Other workflows that reference the asset should use string keys only, to avoid conflicting metadata definitions.
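A sketch of this pattern, with illustrative keys and workflow names:

```python
from prefect.assets import Asset, AssetProperties, materialize

# Authoritative workflow: the single source of truth for this asset's metadata
curated_asset = Asset(
    key="s3://my-bucket/curated-data.csv",
    properties=AssetProperties(
        name="Curated Data",
        owners=["data-team@company.com"],
    ),
)

@materialize(curated_asset)
def curate_data():
    pass

# Any other workflow references the asset by string key only,
# so it never competes to define that metadata
@materialize(
    "s3://my-bucket/summary-report.csv",
    asset_deps=["s3://my-bucket/curated-data.csv"],
)
def build_report():
    pass
```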
## Further Reading