Use global concurrency limits to control how many operations run simultaneously, and rate limits to control how frequently operations can start. This guide shows you how to create, manage, and use these limits in your workflows. For a deeper understanding of how global concurrency limits work and when to use them, see the global concurrency limits concept page.

Manage global concurrency and rate limits

You can create, read, edit, and delete concurrency limits through the Prefect UI, CLI, Python SDK, Terraform, or API. When creating a concurrency limit, you can specify:
  • Name: How you’ll reference the limit in your code (no special characters like /, %, &, >, <)
  • Concurrency Limit: Maximum number of slots available
  • Slot Decay Per Second: Rate at which slots are released (required for rate limiting)
  • Active: Whether the limit is enforced (true) or disabled (false)
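The settings above map onto the Python SDK roughly as follows. This is a minimal sketch, not an authoritative reference: it assumes your installed Prefect version provides `get_client`, the client method `create_global_concurrency_limit`, and the `GlobalConcurrencyLimitCreate` schema. `FORBIDDEN_NAME_CHARS`, `validate_limit_name`, and `create_limit` are hypothetical helpers introduced here, and the name check covers only the five characters listed above; the server may reject others.

```python
# Assumption: only the five characters named in the list above are checked;
# the Prefect server may reject additional characters.
FORBIDDEN_NAME_CHARS = set("/%&><")


def validate_limit_name(name: str) -> str:
    """Hypothetical helper: reject empty names and names with listed characters."""
    bad = sorted(FORBIDDEN_NAME_CHARS.intersection(name))
    if not name or bad:
        raise ValueError(f"invalid concurrency limit name {name!r}: {bad or 'empty'}")
    return name


async def create_limit(name: str, limit: int, slot_decay_per_second: float = 0.0) -> None:
    """Hypothetical helper: create an active limit through the Prefect client."""
    # Imported lazily so the validation helper works without Prefect installed.
    # Assumption: these names exist in your installed Prefect version.
    from prefect import get_client
    from prefect.client.schemas.actions import GlobalConcurrencyLimitCreate

    async with get_client() as client:
        await client.create_global_concurrency_limit(
            GlobalConcurrencyLimitCreate(
                name=validate_limit_name(name),
                limit=limit,
                slot_decay_per_second=slot_decay_per_second,
                active=True,
            )
        )
```

With a reachable Prefect API, `asyncio.run(create_limit("my-concurrency-limit", limit=5, slot_decay_per_second=1.0))` would create a limit with five slots and a decay rate of one slot per second.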

Using the UI

Navigate to the Concurrency section in the Prefect UI to create, update, and delete concurrency limits.

Using the CLI

You can manage global concurrency with the Prefect CLI. Create a new concurrency limit with the prefect gcl create command:
prefect gcl create my-concurrency-limit --limit 5 --slot-decay-per-second 1.0
Inspect a concurrency limit:
prefect gcl inspect my-concurrency-limit
Update a concurrency limit:
prefect gcl update my-concurrency-limit --limit 10
prefect gcl update my-concurrency-limit --disable
Delete a concurrency limit:
prefect gcl delete my-concurrency-limit
Are you sure you want to delete global concurrency limit 'my-concurrency-limit'? [y/N]: y
Deleted global concurrency limit with name 'my-concurrency-limit'.
See all available commands and options with prefect gcl --help.

Using Terraform

You can manage global concurrency with the Terraform provider for Prefect.

Using the API

You can manage global concurrency with the Prefect API.

Use the concurrency context manager

Control concurrent operations using the concurrency context manager. Choose the synchronous or asynchronous version based on your code.
By default, if a concurrency limit doesn’t exist or lease renewal fails, a warning is logged but execution continues. Use strict=True to raise an error instead. This guarantees that the limit is actually enforced, which is useful for protecting finite resources such as a database connection pool.

Synchronous usage

from prefect import flow, task
from prefect.concurrency.sync import concurrency
from prefect.futures import wait


@task
def process_data(x, y):
    with concurrency("database", occupy=1):
        return x + y


@flow
def my_flow():
    futures = []
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        futures.append(process_data.submit(x, y))

    wait(futures)


if __name__ == "__main__":
    my_flow()

Asynchronous usage

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect.futures import wait


@task
async def process_data(x, y):
    async with concurrency("database", occupy=1):
        return x + y


@flow
def my_flow():
    futures = []
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        futures.append(process_data.submit(x, y))

    wait(futures)


if __name__ == "__main__":
    # my_flow is a synchronous flow, so call it directly. Passing its return
    # value to asyncio.run() would raise, because asyncio.run() expects a coroutine.
    my_flow()
In both examples, the concurrency context manager occupies one slot on the database concurrency limit. If no slots are available, execution blocks until a slot becomes available.
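The blocking behavior described above can be pictured with a local, in-process analogy. The sketch below does not use Prefect and is not how Prefect implements limits (those are enforced by the server across processes); it only uses a standard-library semaphore to show that workers beyond the limit wait until a slot is released. The helper name `peak_concurrency` is hypothetical.

```python
import threading
import time


def peak_concurrency(limit: int, workers: int, hold_seconds: float = 0.05) -> int:
    """Run `workers` threads through `limit` slots; return the highest overlap seen."""
    slots = threading.BoundedSemaphore(limit)  # stands in for the limit's slots
    lock = threading.Lock()
    state = {"active": 0, "peak": 0}

    def worker() -> None:
        with slots:  # blocks while every slot is occupied
            with lock:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            time.sleep(hold_seconds)  # simulated work while holding the slot
            with lock:
                state["active"] -= 1

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return state["peak"]


if __name__ == "__main__":
    print(peak_concurrency(limit=2, workers=6))
```

However many workers are started, the observed peak never exceeds the limit, which is the property the "database" limit provides for the tasks in the examples.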

Using strict mode

Enable strict mode to ensure errors are raised if the limit doesn’t exist or if lease renewal fails:
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def process_critical_data(x, y):
    # strict=True ensures this task fails fast if concurrency can't be enforced
    with concurrency("database", occupy=1, strict=True):
        return x + y

@flow
def critical_flow():
    process_critical_data(1, 2)

Use rate_limit

Control the frequency of operations using the rate_limit function.
The concurrency limit must have slot_decay_per_second configured. Without it, rate_limit will fail.

Synchronous usage

from prefect import flow, task
from prefect.concurrency.sync import rate_limit


@task
def make_http_request():
    rate_limit("rate-limited-api")
    print("Making an HTTP request...")


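@flow
def my_flow():
    futures = [make_http_request.submit() for _ in range(10)]
    # Wait on each future so the flow doesn't finish before its tasks run.
    for future in futures:
        future.wait()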


if __name__ == "__main__":
    my_flow()

Asynchronous usage

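from prefect import flow, task
from prefect.concurrency.asyncio import rate_limit


@task
async def make_http_request():
    await rate_limit("rate-limited-api")
    print("Making an HTTP request...")


@flow
def my_flow():
    futures = [make_http_request.submit() for _ in range(10)]
    # Wait on each future so the flow doesn't finish before its tasks run.
    for future in futures:
        future.wait()


if __name__ == "__main__":
    # my_flow is a synchronous flow, so call it directly. Passing its return
    # value to asyncio.run() would raise, because asyncio.run() expects a coroutine.
    my_flow()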
The rate_limit function ensures requests are made at a controlled pace based on the concurrency limit’s slot_decay_per_second setting.
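To get a feel for the pacing, it can help to do the arithmetic by hand. The sketch below uses a simplified model that is an assumption, not a statement of Prefect's exact scheduling: occupied slots free up at `slot_decay_per_second` per second, so one slot frees roughly every `1 / slot_decay_per_second` seconds, and calls beyond the initial pool of free slots each wait for one such interval. Both function names are hypothetical.

```python
def seconds_per_slot(slot_decay_per_second: float) -> float:
    """Approximate time for one occupied slot to free up under the model above."""
    if slot_decay_per_second <= 0:
        # Mirrors the requirement that rate limiting needs a decay rate configured.
        raise ValueError("rate limiting needs slot_decay_per_second > 0")
    return 1.0 / slot_decay_per_second


def estimated_total_wait(calls: int, limit: int, slot_decay_per_second: float) -> float:
    """Rough estimate of total waiting time for `calls` single-slot acquisitions."""
    queued = max(0, calls - limit)  # calls that cannot use an initially free slot
    return queued * seconds_per_slot(slot_decay_per_second)


if __name__ == "__main__":
    print(seconds_per_slot(0.5))              # 2.0
    print(estimated_total_wait(10, 5, 1.0))   # 5.0
```

Under this model, a limit of 5 with a decay rate of 1.0 lets the first five calls through immediately and then roughly one call per second. Treat the numbers as an estimate only.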

Use outside of flows

You can use concurrency and rate_limit in any Python code, not just within flows.
import asyncio

from prefect.concurrency.asyncio import rate_limit


async def main():
    for _ in range(10):
        await rate_limit("rate-limited-api")
        print("Making an HTTP request...")



if __name__ == "__main__":
    asyncio.run(main())

Common patterns

Throttle task submission

Prevent overwhelming downstream systems by controlling how quickly tasks are submitted:
from prefect import flow, task
from prefect.concurrency.sync import rate_limit


@task
def my_task(i):
    return i


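@flow
def my_flow():
    futures = []
    for i in range(100):
        # Each call occupies one slot, so submissions are paced by the
        # limit's slot decay rate.
        rate_limit("slow-my-flow", occupy=1)
        futures.append(my_task.submit(i))

    # Wait on each future so the flow doesn't finish before its tasks run.
    for future in futures:
        future.wait()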


if __name__ == "__main__":
    my_flow()

Limit database connections

Prevent exhausting your database connection pool by limiting concurrent queries:
from prefect import flow, task
from prefect.concurrency.sync import concurrency
from myproject import db

@task
def database_query(query):
    # Here we request a single slot on the 'database' concurrency limit. This
    # will block in the case that all of the database connections are in use
    # ensuring that we never exceed the maximum number of database connections.
    with concurrency("database", occupy=1):
        result = db.execute(query)
        return result

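@flow
def my_flow():
    queries = ["SELECT * FROM table1", "SELECT * FROM table2", "SELECT * FROM table3"]

    futures = [database_query.submit(query) for query in queries]
    # Wait on each future so the flow doesn't finish before its queries run.
    for future in futures:
        future.wait()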

if __name__ == "__main__":
    my_flow()

Control parallel processing

Process data in controlled batches to avoid resource exhaustion:
import asyncio
from prefect.concurrency.asyncio import concurrency


async def process_data(data):
    print(f"Processing: {data}")
    await asyncio.sleep(1)
    return f"Processed: {data}"


async def main():
    data_items = list(range(100))
    processed_data = []

    while data_items:
        # Use the asyncio context manager inside a coroutine so that waiting
        # for slots doesn't block the event loop.
        async with concurrency("data-processing", occupy=5):
            chunk = [data_items.pop() for _ in range(5)]
            processed_data += await asyncio.gather(
                *[process_data(item) for item in chunk]
            )

    print(processed_data)


if __name__ == "__main__":
    asyncio.run(main())
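The loop above pops exactly five items per batch, which works because the input has 100 items. For inputs whose length is not a multiple of the batch size, the final pop would raise an IndexError. A small chunking helper avoids that; `chunked` is a hypothetical name introduced here, not a Prefect function.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive batches of at most `size` items; the last may be shorter."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


if __name__ == "__main__":
    batches = list(chunked(list(range(12)), 5))
    print([len(batch) for batch in batches])  # [5, 5, 2]
```

Inside the processing loop, each batch could then request `occupy=len(chunk)` slots so that a short final batch does not hold more slots than it uses.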

Next steps

For more information about how global concurrency limits work and when to use them versus other concurrency controls, see the global concurrency limits concept page.