Use global concurrency limits to control how many operations run simultaneously, and rate limits to control how frequently operations can start. This guide shows you how to create, manage, and use these limits in your workflows.
For a deeper understanding of how global concurrency limits work and when to use them, see the global concurrency limits concept page.
Manage global concurrency and rate limits
You can create, read, edit, and delete concurrency limits through the Prefect UI, CLI, Python SDK, Terraform, or API.
When creating a concurrency limit, you can specify:
- Name: How you’ll reference the limit in your code (no special characters like /, %, &, >, <)
- Concurrency Limit: Maximum number of slots available
- Slot Decay Per Second: Rate at which slots are released (required for rate limiting)
- Active: Whether the limit is enforced (true) or disabled (false)
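If you generate limit names programmatically, it can help to check them locally before calling the CLI or API. The sketch below is a hypothetical helper, not part of Prefect. `LimitSettings`, `FORBIDDEN_CHARS`, and `supports_rate_limiting` are illustrative names. They mirror the four settings listed above and the naming rule.

```python
from dataclasses import dataclass
from typing import Optional

# Characters this guide lists as not allowed in a limit name
FORBIDDEN_CHARS = frozenset("/%&><")


@dataclass
class LimitSettings:
    """Hypothetical local record of a limit's settings (not a Prefect class)."""

    name: str
    limit: int  # maximum number of slots
    slot_decay_per_second: Optional[float] = None  # required for rate limiting
    active: bool = True  # False disables enforcement

    def __post_init__(self) -> None:
        # Reject empty names and names containing any forbidden character
        if not self.name or FORBIDDEN_CHARS & set(self.name):
            raise ValueError(f"invalid limit name: {self.name!r}")

    @property
    def supports_rate_limiting(self) -> bool:
        # rate_limit only works when occupied slots decay over time
        return bool(self.slot_decay_per_second)


if __name__ == "__main__":
    api = LimitSettings("rate-limited-api", limit=5, slot_decay_per_second=1.0)
    db = LimitSettings("database", limit=10)
    print(api.supports_rate_limiting, db.supports_rate_limiting)  # True False
```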
Using the UI
Navigate to the Concurrency section in the Prefect UI to create, update, and delete concurrency limits.
Using the CLI
You can manage global concurrency with the Prefect CLI.
Create a new concurrency limit with the prefect gcl create command:
```bash
prefect gcl create my-concurrency-limit --limit 5 --slot-decay-per-second 1.0
```
Inspect a concurrency limit:
```bash
prefect gcl inspect my-concurrency-limit
```
Update a concurrency limit:
```bash
prefect gcl update my-concurrency-limit --limit 10
prefect gcl update my-concurrency-limit --disable
```
Delete a concurrency limit:
```bash
prefect gcl delete my-concurrency-limit
Are you sure you want to delete global concurrency limit 'my-concurrency-limit'? [y/N]: y
Deleted global concurrency limit with name 'my-concurrency-limit'.
```
See all available commands and options with prefect gcl --help.
Using Terraform
You can manage global concurrency with the Terraform provider for Prefect.
Using the API
You can manage global concurrency with the Prefect API.
Use the concurrency context manager
Control concurrent operations using the concurrency context manager. Choose the synchronous or asynchronous version based on your code.
By default, if a concurrency limit doesn’t exist or lease renewal fails, a warning is logged but execution continues. Use strict=True to raise an error instead. Strict mode guarantees that the limit is actually enforced. This matters when exceeding it would exhaust a shared resource, such as a database connection pool.
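The difference between the default and strict behavior is a fail-open versus fail-fast choice. The toy model below illustrates it for the missing-limit case only. `check_limit_exists` and its set of known names are hypothetical stand-ins, not Prefect code. Prefect's actual warning text and exception type may differ.

```python
import logging

logger = logging.getLogger("toy_concurrency")


def check_limit_exists(known_limits: set, name: str, strict: bool = False) -> bool:
    """Toy model (not Prefect code) of default vs strict handling of a missing limit.

    Returns True when the limit exists and can be enforced. When it is missing,
    the default path logs a warning and returns False so the caller keeps going
    unthrottled; strict=True raises instead so the caller fails fast.
    """
    if name in known_limits:
        return True
    if strict:
        raise RuntimeError(f"Concurrency limit {name!r} does not exist")
    logger.warning("Concurrency limit %r does not exist; continuing without it", name)
    return False


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    limits = {"database"}
    print(check_limit_exists(limits, "database"))  # True
    print(check_limit_exists(limits, "missing"))  # logs a warning, then False
    try:
        check_limit_exists(limits, "missing", strict=True)
    except RuntimeError as exc:
        print(f"strict mode raised: {exc}")
```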
Synchronous usage
```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency
from prefect.futures import wait


@task
def process_data(x, y):
    with concurrency("database", occupy=1):
        return x + y


@flow
def my_flow():
    futures = []
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        futures.append(process_data.submit(x, y))
    wait(futures)


if __name__ == "__main__":
    my_flow()
```
Asynchronous usage
```python
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect.futures import wait


@task
async def process_data(x, y):
    # Wait asynchronously for a slot on the "database" limit
    async with concurrency("database", occupy=1):
        return x + y


@flow
def my_flow():
    futures = []
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        futures.append(process_data.submit(x, y))
    wait(futures)


if __name__ == "__main__":
    # my_flow is a synchronous flow, so call it directly
    # instead of passing its return value to asyncio.run()
    my_flow()
```
In both examples, the concurrency context manager occupies one slot on the database concurrency limit while the block runs and releases it when the block exits. If no slots are available, execution blocks until one is released.
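For intuition about this blocking behavior, here is a process-local analogy using `asyncio.Semaphore`. It is not how Prefect implements global concurrency limits. A semaphore only coordinates coroutines in a single process. A global concurrency limit, by contrast, is stored on the Prefect server and shared across processes and machines. The `worker` and `main` names are illustrative.

```python
import asyncio


async def worker(sem: asyncio.Semaphore, state: dict, i: int) -> int:
    # Analogous to `async with concurrency("database", occupy=1)`, but the
    # semaphore only exists inside this process
    async with sem:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # simulated work while holding the slot
        state["running"] -= 1
    return i


async def main(slots: int = 2, jobs: int = 6) -> dict:
    sem = asyncio.Semaphore(slots)  # `slots` plays the role of the limit
    state = {"running": 0, "peak": 0}
    results = await asyncio.gather(*(worker(sem, state, i) for i in range(jobs)))
    return {"peak": state["peak"], "results": results}


if __name__ == "__main__":
    print(asyncio.run(main()))  # {'peak': 2, 'results': [0, 1, 2, 3, 4, 5]}
```

Six jobs are started at once, but at most two ever run simultaneously. The rest wait for a slot, just as tasks wait on a full global concurrency limit.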
Using strict mode
Enable strict mode to ensure errors are raised if the limit doesn’t exist or if lease renewal fails:
```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency


@task
def process_critical_data(x, y):
    # strict=True ensures this task fails fast if concurrency can't be enforced
    with concurrency("database", occupy=1, strict=True):
        return x + y


@flow
def critical_flow():
    process_critical_data(1, 2)
```
Use rate_limit
Control the frequency of operations using the rate_limit function.
The concurrency limit must have slot_decay_per_second configured. Without it, rate_limit will fail.
Synchronous usage
```python
from prefect import flow, task
from prefect.concurrency.sync import rate_limit


@task
def make_http_request():
    rate_limit("rate-limited-api")
    print("Making an HTTP request...")


@flow
def my_flow():
    for _ in range(10):
        make_http_request.submit()


if __name__ == "__main__":
    my_flow()
```
Asynchronous usage
```python
from prefect import flow, task
from prefect.concurrency.asyncio import rate_limit


@task
async def make_http_request():
    await rate_limit("rate-limited-api")
    print("Making an HTTP request...")


@flow
def my_flow():
    for _ in range(10):
        make_http_request.submit()


if __name__ == "__main__":
    # my_flow is a synchronous flow, so call it directly
    my_flow()
```
The rate_limit function ensures requests are made at a controlled pace based on the concurrency limit’s slot_decay_per_second setting.
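To see how slot_decay_per_second sets the pace, the sketch below simulates an idealized version of the mechanism on a virtual clock. It makes three assumptions:

- Each rate_limit call occupies `occupy` slots that are never released explicitly.
- Occupied slots drain continuously at `slot_decay_per_second`.
- A caller starts the instant enough slots are free.

`simulate_rate_limit` is a hypothetical helper. Prefect's client and server handle waiting and retries differently in detail.

```python
from typing import List


def simulate_rate_limit(
    n_requests: int, limit: int, slot_decay_per_second: float, occupy: int = 1
) -> List[float]:
    """Return the virtual start time (seconds) of each back-to-back request.

    Idealized model, not Prefect's implementation: slots taken by a request are
    never released explicitly; instead occupied slots decay continuously at
    `slot_decay_per_second`, and a request starts once `occupy` slots are free.
    """
    if slot_decay_per_second <= 0:
        # Without decay, occupied slots would never free up in this model
        raise ValueError("rate limiting needs slot_decay_per_second > 0")
    now = 0.0
    occupied = 0.0
    starts = []
    for _ in range(n_requests):
        free = limit - occupied
        if free < occupy:
            # Advance the clock until enough occupied slots have decayed
            wait = (occupy - free) / slot_decay_per_second
            now += wait
            occupied -= wait * slot_decay_per_second
        occupied += occupy
        starts.append(now)
    return starts


if __name__ == "__main__":
    print(simulate_rate_limit(8, limit=5, slot_decay_per_second=1.0))
    # [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 2.0, 3.0]
```

Take the values from the CLI example: a limit of 5 and slot_decay_per_second of 1.0. The first five calls start immediately, and each later call starts one second after the previous one. In this model, the limit controls the size of the initial burst, and the decay rate controls the sustained pace.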
Use outside of flows
You can use concurrency and rate_limit in any Python code, not just within flows.
```python
import asyncio

from prefect.concurrency.asyncio import rate_limit


async def main():
    for _ in range(10):
        await rate_limit("rate-limited-api")
        print("Making an HTTP request...")


if __name__ == "__main__":
    asyncio.run(main())
```
Common patterns
Throttle task submission
Prevent overwhelming downstream systems by controlling how quickly tasks are submitted:
```python
from prefect import flow, task
from prefect.concurrency.sync import rate_limit


@task
def my_task(i):
    return i


@flow
def my_flow():
    for i in range(100):
        # Wait for the "slow-my-flow" rate limit before each submission
        rate_limit("slow-my-flow", occupy=1)
        my_task.submit(i)


if __name__ == "__main__":
    my_flow()
```
Limit database connections
Prevent exhausting your database connection pool by limiting concurrent queries:
```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency

from myproject import db  # placeholder for your own database module


@task
def database_query(query):
    # Here we request a single slot on the 'database' concurrency limit. This
    # will block in the case that all of the database connections are in use
    # ensuring that we never exceed the maximum number of database connections.
    with concurrency("database", occupy=1):
        result = db.execute(query)
        return result


@flow
def my_flow():
    queries = ["SELECT * FROM table1", "SELECT * FROM table2", "SELECT * FROM table3"]
    for query in queries:
        database_query.submit(query)


if __name__ == "__main__":
    my_flow()
```
Control parallel processing
Process data in controlled batches to avoid resource exhaustion:
```python
import asyncio

from prefect.concurrency.asyncio import concurrency


async def process_data(data):
    print(f"Processing: {data}")
    await asyncio.sleep(1)
    return f"Processed: {data}"


async def main():
    data_items = list(range(100))
    processed_data = []

    while data_items:
        # Use the async context manager inside async code
        async with concurrency("data-processing", occupy=5):
            # Take up to 5 items; the final chunk may be smaller
            chunk = [data_items.pop() for _ in range(min(5, len(data_items)))]
            processed_data += await asyncio.gather(
                *[process_data(item) for item in chunk]
            )

    print(processed_data)


if __name__ == "__main__":
    asyncio.run(main())
```
Next steps
For more information about how global concurrency limits work and when to use them versus other concurrency controls, see the global concurrency limits concept page.