diff --git a/distributed/actor.py b/distributed/actor.py index 6ea45ce102..d68a80dd4c 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -151,7 +151,7 @@ def __getattr__(self, key): and getattr(thread_state, "actor", False) ): # actor calls actor on same worker - actor = self._worker.actors[self.key] + actor = self._worker.state.actors[self.key] attr = getattr(actor, key) if iscoroutinefunction(attr): diff --git a/distributed/nanny.py b/distributed/nanny.py index 4d54666b63..b3161605eb 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -57,11 +57,7 @@ wait_for, ) from distributed.worker import Worker, run -from distributed.worker_memory import ( - DeprecatedMemoryManagerAttribute, - DeprecatedMemoryMonitor, - NannyMemoryManager, -) +from distributed.worker_memory import NannyMemoryManager logger = logging.getLogger(__name__) @@ -286,11 +282,6 @@ def __init__( # type: ignore[no-untyped-def] self._listen_address = listen_address Nanny._instances.add(self) - # Deprecated attributes; use Nanny.memory_manager. instead - memory_limit = DeprecatedMemoryManagerAttribute() - memory_terminate_fraction = DeprecatedMemoryManagerAttribute() - memory_monitor = DeprecatedMemoryMonitor() - def __repr__(self): return "" % (self.worker_address, self.nthreads) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b7c5aabacb..d7b8d5738a 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -67,7 +67,6 @@ from dask.core import istask, validate_key from dask.typing import Key, no_default from dask.utils import ( - _deprecated, _deprecated_kwarg, format_bytes, format_time, @@ -376,16 +375,6 @@ def unmanaged_recent(self) -> int: def optimistic(self) -> int: return self.managed + self.unmanaged_old - @property - def managed_in_memory(self) -> int: - warnings.warn("managed_in_memory has been renamed to managed", FutureWarning) - return self.managed - - @property - def managed_spilled(self) -> int: - warnings.warn("managed_spilled has been renamed to spilled", FutureWarning) - return self.spilled - def __repr__(self) -> str: return ( f"Process memory (RSS) : {format_bytes(self.process)}\n" @@ -1069,21 +1058,11 @@ def remove_group(self, tg: TaskGroup) -> None: if self._types[typename] == 0: del self._types[typename] - @property - @_deprecated(use_instead="groups") # type: ignore[untyped-decorator] - def active(self) -> Set[TaskGroup]: - return self.groups - @property def groups(self) -> Set[TaskGroup]: """Insertion-sorted set-like of groups associated to this prefix""" return self._groups.keys() - @property - @_deprecated(use_instead="states") # type: ignore[untyped-decorator] - def active_states(self) -> dict[TaskStateState, int]: - return self.states - def __repr__(self) -> str: return ( "<" diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 2224295855..a26ace8734 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -546,7 +546,7 @@ def check(counter, blanks): assert result == 0 + 1 + 2 + 3 + 4 def check(dask_worker): - return len(dask_worker.data) + len(dask_worker.actors) + return len(dask_worker.data) + len(dask_worker.state.actors) start = time() while any(client.run(check).values()): @@ -630,7 +630,7 @@ def test_worker_actor_handle_is_weakref_sync(client): del counter def check(dask_worker): - return len(dask_worker.data) + len(dask_worker.actors) + return len(dask_worker.data) + len(dask_worker.state.actors) start = time() while any(client.run(check).values()): @@ -651,7 +651,7 @@ def test_worker_actor_handle_is_weakref_from_compute_sync(client): final.compute(actors=counter, optimize_graph=False) def worker_tasks_running(dask_worker): - return len(dask_worker.data) + len(dask_worker.actors) + return len(dask_worker.data) + len(dask_worker.state.actors) start = time() while any(client.run(worker_tasks_running).values()): diff --git a/distributed/tests/test_resources.py b/distributed/tests/test_resources.py index 2bf7ca1868..42bc377eef 100644 --- a/distributed/tests/test_resources.py +++ b/distributed/tests/test_resources.py @@ -463,40 +463,6 @@ async def test_full_collections(c, s, a, b): assert not b.state.log -@pytest.mark.parametrize( - "optimize_graph", - [ - pytest.param( - True, - marks=pytest.mark.xfail( - reason="don't track resources through optimization" - ), - ), - False, - ], -) -def test_collections_get(client, optimize_graph, s, a, b): - pytest.importorskip("numpy") - da = pytest.importorskip("dask.array") - - async def f(dask_worker): - await dask_worker.set_resources(**{"A": 1}) - - client.run(f, workers=[a["address"]]) - - with dask.annotate(resources={"A": 1}): - x = da.random.random(100, chunks=(10,)) + 1 - - x.compute(optimize_graph=optimize_graph) - - def g(dask_worker): - return len(dask_worker.log) - - logs = client.run(g) - assert logs[a["address"]] - assert not logs[b["address"]] - - @gen_cluster(config={"distributed.worker.resources.my_resources": 1}, client=True) async def test_resources_from_config(c, s, a, b): info = c.scheduler_info() diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index e2f714e1c6..95e1ba90f1 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -2930,13 +2930,9 @@ async def test_task_group_and_prefix_statistics(c, s, a, b, no_time_resync): assert tg.prefix is tp assert tp.groups == {tg} - with pytest.warns(FutureWarning, match="active"): - assert tp.groups == tp.active # these must be true since in this simple case there is a 1to1 mapping # between prefix and group assert tg.states == tp.states - with pytest.warns(FutureWarning, match="active_states"): - assert tp.states == tp.active_states assert tg.duration == tp.duration assert tg.all_durations == tp.all_durations assert tg.nbytes_total == tp.nbytes_total @@ -2952,13 +2948,9 @@ async def test_task_group_and_prefix_statistics(c, s, a, b, no_time_resync): tp = s.task_prefixes["add"] assert tg.prefix is tp assert tp.groups == {tg} - with pytest.warns(FutureWarning, match="active"): - assert tp.groups == tp.active # these must be true since in this simple case there is a 1to1 mapping # between prefix and group assert tg.states == tp.states - with pytest.warns(FutureWarning, match="active_states"): - assert tp.states == tp.active_states assert tg.duration == tp.duration assert tg.all_durations == tp.all_durations assert tg.nbytes_total == tp.nbytes_total @@ -2980,8 +2972,6 @@ async def test_task_group_and_prefix_statistics(c, s, a, b, no_time_resync): assert tg.prefix is tp assert tp.groups == {tg} - with pytest.warns(FutureWarning, match="active"): - assert tp.groups == tp.active assert tg.states["forgotten"] == 4 assert tg.states["released"] == 1 assert sum(tg.states.values()) == 5 @@ -2989,8 +2979,6 @@ async def test_task_group_and_prefix_statistics(c, s, a, b, no_time_resync): assert len(tp) == 5 assert tg.states == tp.states - with pytest.warns(FutureWarning, match="active_states"): - assert tp.states == tp.active_states assert tg.duration == tp.duration assert tg.all_durations == tp.all_durations assert tg.nbytes_total == tp.nbytes_total @@ -3016,11 +3004,7 @@ async def test_task_group_and_prefix_statistics(c, s, a, b, no_time_resync): # these must be zero because we remove fully-forgotten task groups # from the prefixes assert tp.groups == set() - with pytest.warns(FutureWarning, match="active"): - assert tp.groups == tp.active assert all(count == 0 for count in tp.states.values()) - with pytest.warns(FutureWarning, match="active_states"): - assert tp.states == tp.active_states assert len(tp) == 0 assert tp.duration == 0 assert tp.nbytes_total == 0 @@ -3457,11 +3441,6 @@ def test_memorystate(): assert m.unmanaged_recent == 17 assert m.optimistic == 83 - with pytest.warns(FutureWarning): - assert m.managed_spilled == m.spilled - with pytest.warns(FutureWarning): - assert m.managed_in_memory == m.managed - assert repr(m) == dedent(""" Process memory (RSS) : 100 B - managed by Dask : 68 B diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index acca442341..9a253a9e6a 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -11,7 +11,6 @@ import tempfile import threading import traceback -import warnings import weakref from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from concurrent.futures.process import BrokenProcessPool @@ -3483,22 +3482,6 @@ async def close(): await asyncio.gather(block(), close(), set_future()) -@gen_cluster(nthreads=[]) -async def test_reconnect_argument_deprecated(s): - with pytest.deprecated_call(match="`reconnect` argument"): - async with Worker(s.address, reconnect=False): - pass - with pytest.raises(ValueError, match="reconnect=True"): - async with Worker(s.address, reconnect=True): - pass - - with warnings.catch_warnings(): - # No argument should not warn or raise - warnings.simplefilter("error") - async with Worker(s.address): - pass - - @gen_cluster(client=True, nthreads=[]) async def test_worker_running_before_running_plugins(c, s, caplog): class InitWorkerNewThread(WorkerPlugin): @@ -3570,23 +3553,6 @@ async def test_execute_preamble_abort_retirement(c, s): assert await y == 2 -@gen_cluster() -async def test_deprecation_of_renamed_worker_attributes(s, a, b): - msg = ( - "The `Worker.outgoing_count` attribute has been renamed to " - "`Worker.transfer_outgoing_count_total`" - ) - with pytest.warns(DeprecationWarning, match=msg): - assert a.outgoing_count == a.transfer_outgoing_count_total - - msg = ( - "The `Worker.outgoing_current_count` attribute has been renamed to " - "`Worker.transfer_outgoing_count`" - ) - with pytest.warns(DeprecationWarning, match=msg): - assert a.outgoing_current_count == a.transfer_outgoing_count - - @gen_cluster(client=True, Worker=Nanny) async def test_forward_output(c, s, a, b, capsys): def print_stdout(*args, **kwargs): diff --git a/distributed/tests/test_worker_memory.py b/distributed/tests/test_worker_memory.py index 96589a168a..44e19b142d 100644 --- a/distributed/tests/test_worker_memory.py +++ b/distributed/tests/test_worker_memory.py @@ -1047,50 +1047,6 @@ def __reduce__(self): assert not any(v for k, v in c.items() if k >= 2.0), dict(c) -@pytest.mark.parametrize( - "cls,name,value", - [ - (Worker, "memory_limit", 123e9), - (Worker, "memory_target_fraction", 0.789), - (Worker, "memory_spill_fraction", 0.789), - (Worker, "memory_pause_fraction", 0.789), - (Nanny, "memory_limit", 123e9), - (Nanny, "memory_terminate_fraction", 0.789), - ], -) -@gen_cluster(nthreads=[]) -async def test_deprecated_attributes(s, cls, name, value): - async with cls(s.address) as a: - with pytest.warns(FutureWarning, match=name): - setattr(a, name, value) - with pytest.warns(FutureWarning, match=name): - assert getattr(a, name) == value - assert getattr(a.memory_manager, name) == value - - -@gen_cluster(nthreads=[("", 1)]) -async def test_deprecated_memory_monitor_method_worker(s, a): - with pytest.warns(FutureWarning, match="memory_monitor"): - await a.memory_monitor() - - -@gen_cluster(nthreads=[("", 1)], Worker=Nanny) -async def test_deprecated_memory_monitor_method_nanny(s, a): - with pytest.warns(FutureWarning, match="memory_monitor"): - a.memory_monitor() - - -@pytest.mark.parametrize( - "name", - ["memory_target_fraction", "memory_spill_fraction", "memory_pause_fraction"], -) -@gen_cluster(nthreads=[]) -async def test_deprecated_params(s, name): - with pytest.warns(FutureWarning, match=name): - async with Worker(s.address, **{name: 0.789}) as a: - assert getattr(a.memory_manager, name) == 0.789 - - @gen_cluster(config={"distributed.worker.memory.monitor-interval": "10ms"}) async def test_pause_while_idle(s, a, b): sa = s.workers[a.address] diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index 145c211fc0..e426055a41 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -1018,34 +1018,6 @@ async def get_data(self, comm, *args, **kwargs): await wait_for_state("y", "missing", a) -@gen_cluster() -async def test_deprecated_worker_attributes(s, a, b): - n = a.state.generation - msg = ( - "The `Worker.generation` attribute has been moved to " - "`Worker.state.generation`" - ) - with pytest.warns(FutureWarning, match=msg): - assert a.generation == n - with pytest.warns(FutureWarning, match=msg): - a.generation -= 1 - assert a.generation == n - 1 - assert a.state.generation == n - 1 - - # Old and new names differ - msg = ( - "The `Worker.in_flight_tasks` attribute has been moved to " - "`Worker.state.in_flight_tasks_count`" - ) - with pytest.warns(FutureWarning, match=msg): - assert a.in_flight_tasks == 0 - - with pytest.warns(FutureWarning, match="attribute has been removed"): - assert a.data_needed == set() - with pytest.warns(FutureWarning, match="attribute has been removed"): - assert a.waiting_for_data_count == 0 - - @pytest.mark.parametrize("n_remote_workers", [1, 2]) @pytest.mark.parametrize( "nbytes,n_in_flight_per_worker", diff --git a/distributed/worker.py b/distributed/worker.py index 79f24b22da..a460bf3fc3 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -113,18 +113,12 @@ ) from distributed.utils_comm import gather_from_workers, retry_operation from distributed.versions import get_versions -from distributed.worker_memory import ( - DeprecatedMemoryManagerAttribute, - DeprecatedMemoryMonitor, - WorkerDataParameter, - WorkerMemoryManager, -) +from distributed.worker_memory import WorkerDataParameter, WorkerMemoryManager from distributed.worker_state_machine import ( AcquireReplicasEvent, BaseWorker, CancelComputeEvent, ComputeTaskEvent, - DeprecatedWorkerStateAttribute, ExecuteFailureEvent, ExecuteSuccessEvent, FindMissingEvent, @@ -496,7 +490,6 @@ def __init__( local_directory: str | None = None, services: dict | None = None, name: Any | None = None, - reconnect: bool | None = None, executor: Executor | dict[str, Executor] | Literal["offload"] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, @@ -533,31 +526,12 @@ def __init__( # Allow overriding the dict-like that stores the task outputs. # This is meant for power users only. See WorkerMemoryManager for details. data: WorkerDataParameter = None, - # Deprecated parameters; please use dask config instead. - memory_target_fraction: float | Literal[False] | None = None, - memory_spill_fraction: float | Literal[False] | None = None, - memory_pause_fraction: float | Literal[False] | None = None, ################################### # Parameters to Server scheduler_sni: str | None = None, WorkerStateClass: type = WorkerState, **kwargs, ): - if reconnect is not None: - if reconnect: - raise ValueError( - "The `reconnect=True` option for `Worker` has been removed. " - "To improve cluster stability, workers now always shut down in the face of network disconnects. " - "For details, or if this is an issue for you, see https://github.com/dask/distributed/issues/6350." - ) - else: - warnings.warn( - "The `reconnect` argument to `Worker` is deprecated, and will be removed in a future release. " - "Worker reconnection is now always disabled, so passing `reconnect=False` is unnecessary. " - "See https://github.com/dask/distributed/issues/6350 for details.", - DeprecationWarning, - stacklevel=2, - ) if loop is not None: warnings.warn( "The `loop` argument to `Worker` is ignored, and will be removed in a future release. " @@ -773,13 +747,7 @@ def __init__( self._protocol = protocol self.memory_manager = WorkerMemoryManager( - self, - data=data, - nthreads=nthreads, - memory_limit=memory_limit, - memory_target_fraction=memory_target_fraction, - memory_spill_fraction=memory_spill_fraction, - memory_pause_fraction=memory_pause_fraction, + self, data=data, nthreads=nthreads, memory_limit=memory_limit ) transfer_incoming_bytes_limit = math.inf @@ -889,74 +857,6 @@ def data(self) -> MutableMapping[Key, object]: """ return self.memory_manager.data - # Deprecated attributes moved to self.memory_manager. - memory_limit = DeprecatedMemoryManagerAttribute() - memory_target_fraction = DeprecatedMemoryManagerAttribute() - memory_spill_fraction = DeprecatedMemoryManagerAttribute() - memory_pause_fraction = DeprecatedMemoryManagerAttribute() - memory_monitor = DeprecatedMemoryMonitor() - - ########################### - # State machine accessors # - ########################### - - # Deprecated attributes moved to self.state. - actors = DeprecatedWorkerStateAttribute() - available_resources = DeprecatedWorkerStateAttribute() - busy_workers = DeprecatedWorkerStateAttribute() - comm_nbytes = DeprecatedWorkerStateAttribute(target="transfer_incoming_bytes") - comm_threshold_bytes = DeprecatedWorkerStateAttribute( - target="transfer_incoming_bytes_throttle_threshold" - ) - constrained = DeprecatedWorkerStateAttribute() - data_needed_per_worker = DeprecatedWorkerStateAttribute(target="data_needed") - executed_count = DeprecatedWorkerStateAttribute() - executing_count = DeprecatedWorkerStateAttribute() - generation = DeprecatedWorkerStateAttribute() - has_what = DeprecatedWorkerStateAttribute() - incoming_count = DeprecatedWorkerStateAttribute( - target="transfer_incoming_count_total" - ) - in_flight_tasks = DeprecatedWorkerStateAttribute(target="in_flight_tasks_count") - in_flight_workers = DeprecatedWorkerStateAttribute() - log = DeprecatedWorkerStateAttribute() - long_running = DeprecatedWorkerStateAttribute() - nthreads = DeprecatedWorkerStateAttribute() - stimulus_log = DeprecatedWorkerStateAttribute() - stimulus_story = DeprecatedWorkerStateAttribute() - story = DeprecatedWorkerStateAttribute() - ready = DeprecatedWorkerStateAttribute() - tasks = DeprecatedWorkerStateAttribute() - target_message_size = DeprecatedWorkerStateAttribute( - target="transfer_message_bytes_limit" - ) - total_out_connections = DeprecatedWorkerStateAttribute( - target="transfer_incoming_count_limit" - ) - total_resources = DeprecatedWorkerStateAttribute() - transition_counter = DeprecatedWorkerStateAttribute() - transition_counter_max = DeprecatedWorkerStateAttribute() - validate = DeprecatedWorkerStateAttribute() - validate_task = DeprecatedWorkerStateAttribute() - - @property - def data_needed(self) -> set[TaskState]: - warnings.warn( - "The `Worker.data_needed` attribute has been removed; " - "use `Worker.state.data_needed[address]`", - FutureWarning, - ) - return {ts for tss in self.state.data_needed.values() for ts in tss} - - @property - def waiting_for_data_count(self) -> int: - warnings.warn( - "The `Worker.waiting_for_data_count` attribute has been removed; " - "use `len(Worker.state.waiting)`", - FutureWarning, - ) - return len(self.state.waiting) - ################## # Administrative # ################## @@ -2674,56 +2574,6 @@ def validate_state(self) -> None: raise - @property - def incoming_transfer_log(self): - warnings.warn( - "The `Worker.incoming_transfer_log` attribute has been renamed to " - "`Worker.transfer_incoming_log`", - DeprecationWarning, - stacklevel=2, - ) - return self.transfer_incoming_log - - @property - def outgoing_count(self): - warnings.warn( - "The `Worker.outgoing_count` attribute has been renamed to " - "`Worker.transfer_outgoing_count_total`", - DeprecationWarning, - stacklevel=2, - ) - return self.transfer_outgoing_count_total - - @property - def outgoing_current_count(self): - warnings.warn( - "The `Worker.outgoing_current_count` attribute has been renamed to " - "`Worker.transfer_outgoing_count`", - DeprecationWarning, - stacklevel=2, - ) - return self.transfer_outgoing_count - - @property - def outgoing_transfer_log(self): - warnings.warn( - "The `Worker.outgoing_transfer_log` attribute has been renamed to " - "`Worker.transfer_outgoing_log`", - DeprecationWarning, - stacklevel=2, - ) - return self.transfer_outgoing_log - - @property - def total_in_connections(self): - warnings.warn( - "The `Worker.total_in_connections` attribute has been renamed to " - "`Worker.transfer_outgoing_count_limit`", - DeprecationWarning, - stacklevel=2, - ) - return self.transfer_outgoing_count_limit - _worker_cvar: contextvars.ContextVar[Worker] = contextvars.ContextVar("_worker_cvar") diff --git a/distributed/worker_memory.py b/distributed/worker_memory.py index 4f0c75d499..f004c82e1c 100644 --- a/distributed/worker_memory.py +++ b/distributed/worker_memory.py @@ -25,7 +25,6 @@ import logging import os import sys -import warnings from collections.abc import Callable, Container, Hashable, MutableMapping from contextlib import suppress from functools import partial @@ -108,30 +107,15 @@ def __init__( # This should be None most of the times, short of a power user replacing the # SpillBuffer with their own custom dict-like data: WorkerDataParameter = None, - # Deprecated parameters; use dask.config instead - memory_target_fraction: float | Literal[False] | None = None, - memory_spill_fraction: float | Literal[False] | None = None, - memory_pause_fraction: float | Literal[False] | None = None, ): self.memory_limit = parse_memory_limit( memory_limit, nthreads, logger=worker_logger ) - self.memory_target_fraction = _parse_threshold( - "distributed.worker.memory.target", - "memory_target_fraction", - memory_target_fraction, + self.memory_target_fraction = dask.config.get( + "distributed.worker.memory.target" ) - self.memory_spill_fraction = _parse_threshold( - "distributed.worker.memory.spill", - "memory_spill_fraction", - memory_spill_fraction, - ) - self.memory_pause_fraction = _parse_threshold( - "distributed.worker.memory.pause", - "memory_pause_fraction", - memory_pause_fraction, - ) - + self.memory_spill_fraction = dask.config.get("distributed.worker.memory.spill") + self.memory_pause_fraction = dask.config.get("distributed.worker.memory.pause") max_spill = dask.config.get("distributed.worker.memory.max-spill") self.max_spill = False if max_spill is False else parse_bytes(max_spill) @@ -497,54 +481,3 @@ def parse_memory_limit( return system.MEMORY_LIMIT else: return memory_limit - - -def _parse_threshold( - config_key: str, - deprecated_param_name: str, - deprecated_param_value: float | Literal[False] | None, -) -> float | Literal[False]: - if deprecated_param_value is not None: - warnings.warn( - f"Parameter {deprecated_param_name} has been deprecated and will be " - f"removed in a future version; please use dask config key {config_key} " - "instead", - FutureWarning, - ) - return deprecated_param_value - return dask.config.get(config_key) - - -def _warn_deprecated(w: Nanny | Worker, name: str) -> None: - warnings.warn( - f"The `{type(w).__name__}.{name}` attribute has been moved to " - f"`{type(w).__name__}.memory_manager.{name}", - FutureWarning, - ) - - -class DeprecatedMemoryManagerAttribute: - name: str - - def __set_name__(self, owner: type, name: str) -> None: - self.name = name - - def __get__(self, instance: Nanny | Worker | None, owner: type) -> Any: - if instance is None: - # This is triggered by Sphinx - return None # pragma: nocover - _warn_deprecated(instance, self.name) - return getattr(instance.memory_manager, self.name) - - def __set__(self, instance: Nanny | Worker, value: Any) -> None: - _warn_deprecated(instance, self.name) - setattr(instance.memory_manager, self.name, value) - - -class DeprecatedMemoryMonitor: - def __get__(self, instance: Nanny | Worker | None, owner: type) -> Any: - if instance is None: - # This is triggered by Sphinx - return None # pragma: nocover - _warn_deprecated(instance, "memory_monitor") - return partial(instance.memory_manager.memory_monitor, instance) # type: ignore diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index f0264ec368..84dbb8a0df 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -7,7 +7,6 @@ import math import operator import random -import warnings import weakref from collections import Counter, defaultdict, deque from collections.abc import ( @@ -57,7 +56,6 @@ # Circular imports from distributed.diagnostics.plugin import WorkerPlugin from distributed.scheduler import T_runspec - from distributed.worker import Worker # Not to be confused with distributed.scheduler.TaskStateState TaskStateState: TypeAlias = Literal[ @@ -3922,32 +3920,3 @@ def transitions(self, prev_states: dict[TaskState, TaskStateState]) -> None: # will remain in released state and never transition to anything else. self._current_count[ts.prefix, ts.state] += 1 self._new_tasks.clear() - - -class DeprecatedWorkerStateAttribute: - name: str - target: str | None - - def __init__(self, target: str | None = None): - self.target = target - - def __set_name__(self, owner: type, name: str) -> None: - self.name = name - - def _warn_deprecated(self) -> None: - warnings.warn( - f"The `Worker.{self.name}` attribute has been moved to " - f"`Worker.state.{self.target or self.name}`", - FutureWarning, - ) - - def __get__(self, instance: Worker | None, owner: type[Worker]) -> Any: - if instance is None: - # This is triggered by Sphinx - return None # pragma: nocover - self._warn_deprecated() - return getattr(instance.state, self.target or self.name) - - def __set__(self, instance: Worker, value: Any) -> None: - self._warn_deprecated() - setattr(instance.state, self.target or self.name, value)