Task run concurrency limits help prevent too many tasks from running simultaneously. For example, they are useful when many tasks across multiple flows interact with a database that only allows 10 connections.

Task run concurrency limits use task tags. You can specify an optional concurrency limit as the maximum number of concurrent task runs in a Running state for tasks with a given tag.

Tag-based task concurrency is different from Global Concurrency Limits, though they can be used to achieve similar outcomes. Global Concurrency Limits are a more general way to control concurrency for any Python-based operation, whereas tag-based concurrency limits are specific to Prefect tasks.

If a task has multiple tags, it will run only if all tags have available concurrency.

Tags without specified concurrency limits are treated as unlimited. Setting a tag’s concurrency limit to 0 causes any task runs with that tag to be aborted immediately, rather than delayed.
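The rules above can be modeled in a few lines of plain Python. This is an illustrative sketch only, not Prefect's implementation: the function admission_decision, the limits and active dictionaries, and the tag names "database" and "paused" are all hypothetical.

```python
from typing import Dict, Iterable


def admission_decision(
    task_tags: Iterable[str],
    limits: Dict[str, int],
    active: Dict[str, int],
) -> str:
    """Return "abort", "wait", or "run" for a task run with the given tags."""
    tag_list = list(task_tags)
    # A limit of 0 aborts the run outright instead of delaying it.
    if any(limits.get(tag) == 0 for tag in tag_list):
        return "abort"
    for tag in tag_list:
        limit = limits.get(tag)  # None: no limit was set, so unlimited
        if limit is not None and active.get(tag, 0) >= limit:
            return "wait"  # at least one tag has no free slot
    return "run"  # every limited tag has a free slot


# Hypothetical state: configured limits and currently running counts per tag
limits = {"small_instance": 10, "database": 2, "paused": 0}
active = {"small_instance": 3, "database": 2}

print(admission_decision(["small_instance"], limits, active))
print(admission_decision(["small_instance", "database"], limits, active))
print(admission_decision(["untracked"], limits, active))
print(admission_decision(["small_instance", "paused"], limits, active))
```

In this model the second call has to wait because "database" is full even though "small_instance" has room, which is the "all tags must have available concurrency" rule.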

Execution behavior

Task tag limits are checked whenever a task run attempts to enter a Running state.

If no concurrency slots are available for any one of your task’s tags, Prefect delays the transition to a Running state and instructs the client to try entering a Running state again in 30 seconds (or the value specified by the PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS setting).
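The delay-and-retry cycle can be pictured with the simplified loop below. It is a sketch under stated assumptions, not Prefect's client code: wait_for_slot, try_acquire, and max_attempts are hypothetical names, and the wait is shortened from 30 seconds so the example finishes quickly.

```python
import asyncio
from typing import Callable


async def wait_for_slot(
    try_acquire: Callable[[], bool],
    wait_seconds: float = 30.0,
    max_attempts: int = 5,
) -> int:
    """Retry until try_acquire() succeeds; return the attempt that succeeded."""
    for attempt in range(1, max_attempts + 1):
        if try_acquire():
            return attempt
        # No slot free: wait before asking to enter Running again.
        await asyncio.sleep(wait_seconds)
    raise TimeoutError("no concurrency slot became available")


# Simulated answers: no slot on the first two tries, then one frees up
answers = iter([False, False, True])
attempts_needed = asyncio.run(
    wait_for_slot(lambda: next(answers), wait_seconds=0.01)
)
print(attempts_needed)  # 3
```

The max_attempts cap exists only to keep the sketch finite; the text above does not describe any such cap in Prefect itself.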

Configure concurrency limits

Flow run concurrency limits are set at a work pool and/or work queue level

While task run concurrency limits are configured through tags (as shown below), flow run concurrency limits are configured through work pools and/or work queues.

You can set concurrency limits on as few or as many tags as you wish. You can set limits through:

  • Prefect CLI
  • Prefect API, using the PrefectClient Python client
  • Prefect server UI or Prefect Cloud

CLI

You can create, list, and remove concurrency limits with Prefect CLI concurrency-limit commands.

prefect concurrency-limit [command] [arguments]

Command   Description
create    Create a concurrency limit by specifying a tag and limit.
delete    Delete the concurrency limit set on the specified tag.
inspect   View details about a concurrency limit set on the specified tag.
ls        View all defined concurrency limits.

For example, to set a concurrency limit of 10 on the ‘small_instance’ tag:

prefect concurrency-limit create small_instance 10

To delete the concurrency limit on the ‘small_instance’ tag:

prefect concurrency-limit delete small_instance

To view details about the concurrency limit on the ‘small_instance’ tag:

prefect concurrency-limit inspect small_instance

Python client

To update your tag concurrency limits programmatically, use PrefectClient.create_concurrency_limit.

create_concurrency_limit takes two arguments:

  • tag specifies the task tag on which you’re setting a limit.
  • concurrency_limit specifies the maximum number of concurrent task runs for that tag.

For example, to set a concurrency limit of 10 on the ‘small_instance’ tag:

import asyncio

from prefect import get_client


async def main():
    async with get_client() as client:
        # set a concurrency limit of 10 on the 'small_instance' tag
        limit_id = await client.create_concurrency_limit(
            tag="small_instance",
            concurrency_limit=10,
        )


asyncio.run(main())

To remove all concurrency limits on a tag, use PrefectClient.delete_concurrency_limit_by_tag, passing the tag:

# run inside an async function; async with is not valid at module level
async with get_client() as client:
    # remove a concurrency limit on the 'small_instance' tag
    await client.delete_concurrency_limit_by_tag(tag="small_instance")

To query the current limit on a tag, use PrefectClient.read_concurrency_limit_by_tag, passing the tag:

# run inside an async function; async with is not valid at module level
async with get_client() as client:
    # query the concurrency limit on the 'small_instance' tag
    limit = await client.read_concurrency_limit_by_tag(tag="small_instance")

To see all of your limits across all of your tags, use PrefectClient.read_concurrency_limits.