Source code for distributed.lock

from __future__ import annotations

import logging
import uuid

from distributed.semaphore import Semaphore

logger = logging.getLogger(__name__)

_no_value = object()


[docs]class Lock(Semaphore): """Distributed Centralized Lock .. warning:: This is using the ``distributed.Semaphore`` as a backend, which is susceptible to lease overbooking. For the Lock this means that if a lease is timing out, two or more instances could acquire the lock at the same time. To disable lease timeouts, set ``distributed.scheduler.locks.lease-timeout`` to `inf`, e.g. .. code-block:: python with dask.config.set({"distributed.scheduler.locks.lease-timeout": "inf"}): lock = Lock("x") ... Note, that without lease timeouts, the Lock may deadlock in case of cluster downscaling or worker failures. Parameters ---------- name: string (optional) Name of the lock to acquire. Choosing the same name allows two disconnected processes to coordinate a lock. If not given, a random name will be generated. client: Client (optional) Client to use for communication with the scheduler. If not given, the default global client will be used. Examples -------- >>> lock = Lock('x') # doctest: +SKIP >>> lock.acquire(timeout=1) # doctest: +SKIP >>> # do things with protected resource >>> lock.release() # doctest: +SKIP """ def __init__( self, name=None, client=_no_value, scheduler_rpc=None, loop=None, ): if client is not _no_value: import warnings warnings.warn( "The `client` parameter is deprecated. It is no longer necessary to pass a client to Lock.", DeprecationWarning, stacklevel=2, ) self.name = name or "lock-" + uuid.uuid4().hex super().__init__( max_leases=1, name=name, scheduler_rpc=scheduler_rpc, loop=loop, )
[docs] def acquire(self, blocking=True, timeout=None): """Acquire the lock Parameters ---------- blocking : bool, optional If false, don't wait on the lock in the scheduler at all. timeout : string or number or timedelta, optional Seconds to wait on the lock in the scheduler. This does not include local coroutine time, network transfer time, etc.. It is forbidden to specify a timeout when blocking is false. Instead of number of seconds, it is also possible to specify a timedelta in string format, e.g. "200ms". Examples -------- >>> lock = Lock('x') # doctest: +SKIP >>> lock.acquire(timeout="1s") # doctest: +SKIP Returns ------- True or False whether or not it successfully acquired the lock """ if not blocking: if timeout is not None: raise ValueError("can't specify a timeout for a non-blocking call") timeout = 0 return super().acquire(timeout=timeout)
async def _locked(self): val = await self.scheduler.semaphore_value(name=self.name) return val == 1 def locked(self): return self.sync(self._locked) def __getstate__(self): return self.name def __setstate__(self, state): self.__init__(name=state)