diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index d5205242912..8d65ee0002c 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -83,70 +83,6 @@ def remove_worker(self, worker, scheduler, *, stimulus_id, **kwargs): assert events == [] -@gen_cluster(nthreads=[]) -async def test_remove_worker_renamed_kwargs_allowed(s): - events = [] - - class MyPlugin(SchedulerPlugin): - name = "MyPlugin" - - def remove_worker(self, worker, scheduler, **kwds): - assert scheduler is s - events.append(("remove_worker", worker)) - - plugin = MyPlugin() - s.add_plugin(plugin) - assert events == [] - - a = Worker(s.address) - await a - await a.close() - - assert events == [ - ("remove_worker", a.address), - ] - - events[:] = [] - s.remove_plugin(plugin.name) - async with Worker(s.address): - pass - assert events == [] - - -@gen_cluster(nthreads=[]) -async def test_remove_worker_without_kwargs_deprecated(s): - events = [] - - class DeprecatedPlugin(SchedulerPlugin): - name = "DeprecatedPlugin" - - def remove_worker(self, worker, scheduler): - assert scheduler is s - events.append(("remove_worker", worker)) - - plugin = DeprecatedPlugin() - with pytest.warns( - FutureWarning, - match="The signature of `SchedulerPlugin.remove_worker` now requires `\\*\\*kwargs`", - ): - s.add_plugin(plugin) - assert events == [] - - a = Worker(s.address) - await a - await a.close() - - assert events == [ - ("remove_worker", a.address), - ] - - events[:] = [] - s.remove_plugin(plugin.name) - async with Worker(s.address): - pass - assert events == [] - - @gen_cluster(nthreads=[]) async def test_async_add_remove_worker(s): events = [] diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b7c5aabacb7..528f6c1c6cf 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -68,7 +68,6 @@ from dask.typing import Key, no_default from dask.utils import ( _deprecated, - _deprecated_kwarg, format_bytes, format_time, key_split, @@ -5596,7 +5595,6 @@ def close_worker(self, worker: str) -> None: self.log_event(worker, {"action": "close-worker"}) self.worker_send(worker, {"op": "close", "reason": "scheduler-close-worker"}) - @_deprecated_kwarg("safe", "expected") @log_errors async def remove_worker( self, @@ -5742,19 +5740,9 @@ async def remove_worker( awaitables = [] for plugin in list(self.plugins.values()): try: - try: - result = plugin.remove_worker( - scheduler=self, worker=address, stimulus_id=stimulus_id - ) - except TypeError: - parameters = inspect.signature(plugin.remove_worker).parameters - if "stimulus_id" not in parameters and not any( - p.kind is p.VAR_KEYWORD for p in parameters.values() - ): - # Deprecated (see add_plugin) - result = plugin.remove_worker(scheduler=self, worker=address) # type: ignore - else: - raise + result = plugin.remove_worker( + scheduler=self, worker=address, stimulus_id=stimulus_id + ) if inspect.isawaitable(result): awaitables.append(result) except Exception as e: @@ -6263,15 +6251,6 @@ def add_plugin( category=UserWarning, ) - parameters = inspect.signature(plugin.remove_worker).parameters - if not any(p.kind is p.VAR_KEYWORD for p in parameters.values()): - warnings.warn( - "The signature of `SchedulerPlugin.remove_worker` now requires `**kwargs` " - "to ensure that plugins remain forward-compatible. Not including " - "`**kwargs` in the signature will no longer be supported in future versions.", - FutureWarning, - ) - self.plugins[name] = plugin def remove_plugin(self, name: str | None = None) -> None: diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index e2f714e1c68..2813f057395 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -828,13 +828,6 @@ async def test_remove_worker_from_scheduler(c, s, a, b): await c.gather(futs) -@gen_cluster(client=True) -async def test_remove_worker_from_scheduler_warns_on_safe(c, s, a, b): - with pytest.warns(FutureWarning, match="expected"): - await s.remove_worker(address=a.address, safe=True, stimulus_id="test") - assert a.address not in s.workers - - @gen_cluster() async def test_remove_worker_by_name_from_scheduler(s, a, b): assert a.address in s.stream_comms