A context manager that acquires and releases concurrency slots from the
given concurrency limits.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `names` | `Union[str, List[str]]` | The names of the concurrency limits to acquire slots from. | required |
| `occupy` | `int` | The number of slots to acquire and hold from each limit. | `1` |
| `timeout_seconds` | `Optional[float]` | The number of seconds to wait for the slots to be acquired before raising a `TimeoutError`. A timeout of `None` will wait indefinitely. | `None` |
Raises:

| Type | Description |
| --- | --- |
| `TimeoutError` | If the slots are not acquired within the given timeout. |
A simple example of using the async concurrency context manager:

    from prefect.concurrency.asyncio import concurrency

    async def resource_heavy():
        async with concurrency("test", occupy=1):
            print("Resource heavy task")

    async def main():
        await resource_heavy()

    @asynccontextmanager
    async def concurrency(
        names: Union[str, List[str]],
        occupy: int = 1,
        timeout_seconds: Optional[float] = None,
    ):
        """A context manager that acquires and releases concurrency slots from the
        given concurrency limits.

        Args:
            names: The names of the concurrency limits to acquire slots from.
            occupy: The number of slots to acquire and hold from each limit.
            timeout_seconds: The number of seconds to wait for the slots to be acquired before
                raising a `TimeoutError`. A timeout of `None` will wait indefinitely.

        Raises:
            TimeoutError: If the slots are not acquired within the given timeout.

        Example:
        A simple example of using the async `concurrency` context manager:
        ```python
        from prefect.concurrency.asyncio import concurrency

        async def resource_heavy():
            async with concurrency("test", occupy=1):
                print("Resource heavy task")

        async def main():
            await resource_heavy()
        ```
        """
        names = names if isinstance(names, list) else [names]
        with timeout_async(seconds=timeout_seconds):
            limits = await _acquire_concurrency_slots(names, occupy)
        acquisition_time = pendulum.now("UTC")
        emitted_events = _emit_concurrency_acquisition_events(limits, occupy)

        try:
            yield
        finally:
            occupancy_period = cast(Interval, (pendulum.now("UTC") - acquisition_time))
            await _release_concurrency_slots(
                names, occupy, occupancy_period.total_seconds()
            )
            _emit_concurrency_release_events(limits, occupy, emitted_events)

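The acquire, hold, and release behaviour of the `concurrency` context manager can be illustrated without a Prefect server. The following is a minimal sketch, not Prefect's implementation: `LocalLimits`, its `free` mapping, and `demo` are hypothetical names introduced here, and the limits live in process memory instead of being tracked remotely. It assumes that slots are taken from every named limit on entry, returned on exit, and that a timeout error is raised when the slots cannot be acquired in time.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, List, Optional, Tuple, Union


class LocalLimits:
    """Hypothetical in-memory stand-in for named concurrency limits."""

    def __init__(self, slots: Dict[str, int]) -> None:
        self.free = dict(slots)  # limit name -> number of free slots
        self._changed = asyncio.Condition()

    async def _acquire(self, names: List[str], occupy: int) -> None:
        async with self._changed:
            # Wait until every named limit has enough free slots,
            # then take `occupy` slots from each of them.
            await self._changed.wait_for(
                lambda: all(self.free[name] >= occupy for name in names)
            )
            for name in names:
                self.free[name] -= occupy

    async def _release(self, names: List[str], occupy: int) -> None:
        async with self._changed:
            for name in names:
                self.free[name] += occupy
            self._changed.notify_all()

    @asynccontextmanager
    async def concurrency(
        self,
        names: Union[str, List[str]],
        occupy: int = 1,
        timeout_seconds: Optional[float] = None,
    ) -> AsyncIterator[None]:
        names = names if isinstance(names, list) else [names]
        # wait_for raises asyncio.TimeoutError on expiry (the built-in
        # TimeoutError on Python 3.11+); a timeout of None waits indefinitely.
        await asyncio.wait_for(self._acquire(names, occupy), timeout_seconds)
        try:
            yield
        finally:
            # Slots are returned even if the body raised.
            await self._release(names, occupy)


async def demo() -> Tuple[List[str], int]:
    limits = LocalLimits({"test": 1})
    log: List[str] = []

    async def worker(tag: str) -> None:
        async with limits.concurrency("test", occupy=1):
            log.append(f"{tag} start")
            await asyncio.sleep(0.01)
            log.append(f"{tag} end")

    # With a single slot, the two workers cannot overlap.
    await asyncio.gather(worker("a"), worker("b"))

    # While the only slot is held, a second acquisition times out.
    async with limits.concurrency("test"):
        try:
            async with limits.concurrency("test", timeout_seconds=0.05):
                log.append("unreachable")
        except asyncio.TimeoutError:
            log.append("timed out")
    return log, limits.free["test"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Releasing in a `finally` clause mirrors the structure of the source listing above: the slots go back to the limit whether the guarded body finishes normally or raises.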
Block execution until `occupy` slots have been acquired from each of the
concurrency limits given in `names`. All given concurrency limits must
have a slot decay configured.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `names` | `Union[str, List[str]]` | The names of the concurrency limits to acquire slots from. | required |
| `occupy` | `int` | The number of slots to acquire and hold from each limit. | `1` |
Source code in src/prefect/concurrency/asyncio.py

    async def rate_limit(names: Union[str, List[str]], occupy: int = 1):
        """Block execution until an `occupy` number of slots of the concurrency
        limits given in `names` are acquired. Requires that all given concurrency
        limits have a slot decay.

        Args:
            names: The names of the concurrency limits to acquire slots from.
            occupy: The number of slots to acquire and hold from each limit.
        """
        names = names if isinstance(names, list) else [names]
        limits = await _acquire_concurrency_slots(names, occupy, mode="rate_limit")
        _emit_concurrency_acquisition_events(limits, occupy)
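Unlike the `concurrency` context manager, the `rate_limit` listing above acquires slots and never releases them, which is presumably why a slot decay is required. The sketch below illustrates that idea locally under a stated assumption: occupied slots free up at a constant rate over time. `DecayingLimit`, `decay_per_second`, and `demo` are hypothetical names introduced for this illustration, the sketch handles a single limit only, and it does not reflect how Prefect computes decay.

```python
import asyncio
import time


class DecayingLimit:
    """Hypothetical local limit whose occupied slots free up over time."""

    def __init__(self, limit: int, decay_per_second: float) -> None:
        self.limit = limit
        self.decay_per_second = decay_per_second  # assumed constant decay rate
        self.occupied = 0.0
        self._last = time.monotonic()

    def _decay(self) -> None:
        # Free the capacity that has decayed since the last check.
        now = time.monotonic()
        freed = (now - self._last) * self.decay_per_second
        self.occupied = max(0.0, self.occupied - freed)
        self._last = now

    async def rate_limit(self, occupy: int = 1) -> None:
        if occupy > self.limit:
            raise ValueError("occupy exceeds the size of the limit")
        while True:
            self._decay()
            if self.occupied + occupy <= self.limit:
                # Slots are taken and never released explicitly;
                # only decay returns them.
                self.occupied += occupy
                return
            # Sleep roughly long enough for the missing capacity to decay,
            # then check again.
            shortfall = self.occupied + occupy - self.limit
            await asyncio.sleep(shortfall / self.decay_per_second)


async def demo() -> float:
    limit = DecayingLimit(limit=2, decay_per_second=20.0)
    started = time.monotonic()
    for _ in range(4):
        # The first two calls pass immediately; later calls wait for decay.
        await limit.rate_limit()
    return time.monotonic() - started


if __name__ == "__main__":
    print(f"elapsed: {asyncio.run(demo()):.3f}s")
```

In this sketch a caller is throttled by waiting at the call site, so no context manager or release step is needed.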