diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index 19cc4f191dff8d..c2eb1390f64898 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -1436,3 +1436,146 @@ is equivalent to:: Currently, :class:`Lock`, :class:`RLock`, :class:`Condition`, :class:`Semaphore`, and :class:`BoundedSemaphore` objects may be used as :keyword:`with` statement context managers. + + +Iterator synchronization +------------------------ + +By default, Python iterators do not support concurrent access. Most iterators make +no guarantees when accessed simultaneously from multiple threads. Generator +iterators, for example, raise :exc:`ValueError` if one of their iterator methods +is called while the generator is already executing. The tools in this section +allow reliable concurrency support to be added to ordinary iterators and +iterator-producing callables. + +The :class:`serialize` wrapper lets multiple threads share a single iterator and +take turns consuming from it. While one thread is running ``__next__()``, the +others block until the iterator becomes available. Each value produced by the +underlying iterator is delivered to exactly one caller. + +The :func:`concurrent_tee` function lets multiple threads each receive the full +stream of values from one underlying iterator. It creates independent iterators +that all draw from the same source. Values are buffered until consumed by all +of the derived iterators. + +.. class:: serialize(iterable) + + Return an iterator wrapper that serializes concurrent calls to + :meth:`~iterator.__next__` using a lock. + + If the wrapped iterator also defines :meth:`~generator.send`, + :meth:`~generator.throw`, or :meth:`~generator.close`, those calls + are serialized as well. + + This makes it possible to share a single iterator, including a generator + iterator, between multiple threads. A lock assures that calls are handled + one at a time. No values are duplicated or skipped by the wrapper itself. + Each item from the underlying iterator is given to exactly one caller. + + This wrapper does not copy or buffer values. Threads that call + :func:`next` while another thread is already advancing the iterator will + block until the active call completes. + + Example: + + .. doctest:: + + import threading + + def count(): + for i in range(5): + yield i + + it = threading.serialize(count()) + + def worker(): + for item in it: + print(threading.current_thread().name, item) + + threads = [threading.Thread(target=worker) for _ in range(2)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + In this example, each number is printed exactly once, but the work is shared + between the two threads. + +.. function:: synchronized(func) + + Wrap an iterator-producing callable so that each iterator it returns is + automatically passed through :class:`serialize`. + + This is especially useful as a :term:`decorator` for generator functions, + allowing their generator-iterators to be consumed from multiple threads. + + Example: + + .. doctest:: + + import threading + + @threading.synchronized + def counter(): + i = 0 + while True: + yield i + i += 1 + + it = counter() + + def worker(): + for _ in range(5): + print(next(it)) + + threads = [threading.Thread(target=worker) for _ in range(2)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + The returned wrapper preserves the metadata of *func*, such as its name and + wrapped function reference. + +.. function:: concurrent_tee(iterable, n=2) + + Return *n* independent iterators from a single input *iterable*, with + guaranteed behavior when the derived iterators are consumed concurrently. + + This function is similar to :func:`itertools.tee`, but is intended for cases + where the source iterator may feed consumers running in different threads. + Each returned iterator yields every value from the underlying iterable, in + the same order. + + Internally, values are buffered until every derived iterator has consumed + them. + + The returned iterators share the same underlying synchronization lock. Each + individual derived iterator is intended to be consumed by one thread at a + time. If a single derived iterator must itself be shared by multiple + threads, wrap it with :class:`serialize`. + + If *n* is ``0``, return an empty tuple. If *n* is negative, raise + :exc:`ValueError`. + + Example: + + .. doctest:: + + import threading + + source = range(5) + left, right = threading.concurrent_tee(source) + + def consume(name, iterable): + for item in iterable: + print(name, item) + + t1 = threading.Thread(target=consume, args=("left", left)) + t2 = threading.Thread(target=consume, args=("right", right)) + t1.start() + t2.start() + t1.join() + t2.join() + + In this example, both consumer threads see the full sequence ``0`` through ``4``. diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 0ca91ce0d7899d..84c9e7bf0625c5 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -2368,6 +2368,216 @@ class BarrierTests(lock_tests.BarrierTests): barriertype = staticmethod(threading.Barrier) +## Test Synchronization tools for iterators ################ + +class ThreadingIteratorToolsTests(BaseTestCase): + def test_serialize_serializes_concurrent_iteration(self): + limit = 10_000 + workers_count = 10 + result = 0 + result_lock = threading.Lock() + start = threading.Event() + + def producer(limit): + for x in range(limit): + yield x + + def consumer(iterator): + nonlocal result + start.wait() + total = 0 + for x in iterator: + total += x + with result_lock: + result += total + + iterator = threading.serialize(producer(limit)) + workers = [ + threading.Thread(target=consumer, args=(iterator,)) + for _ in range(workers_count) + ] + with threading_helper.wait_threads_exit(): + for worker in workers: + worker.start() + start.set() + for worker in workers: + worker.join() + + self.assertEqual(result, limit * (limit - 1) // 2) + + def test_serialize_generator_methods(self): + # A generator that yields and receives + def echo(): + try: + while True: + val = yield "ready" + yield f"received {val}" + except ValueError: + yield "caught" + + it = threading.serialize(echo()) + + # Test __next__ + self.assertEqual(next(it), "ready") + + # Test send() + self.assertEqual(it.send("hello"), "received hello") + self.assertEqual(next(it), "ready") + + # Test throw() + self.assertEqual(it.throw(ValueError), "caught") + + # Test close() + it.close() + with self.assertRaises(StopIteration): + next(it) + + def test_serialize_methods_attribute_error(self): + # A standard iterator that does not have send/throw/close + # should raise AttributeError when called. + standard_it = threading.serialize([1, 2, 3]) + + with self.assertRaises(AttributeError): + standard_it.send("foo") + + with self.assertRaises(AttributeError): + standard_it.throw(ValueError) + + with self.assertRaises(AttributeError): + standard_it.close() + + def test_serialize_generator_methods_locking(self): + # Verifies that generator methods also acquire the lock. + # We can test this by checking if the lock is held during the call. + + class LockCheckingGenerator: + def __init__(self, lock): + self.lock = lock + def __iter__(self): + return self + def send(self, value): + if not self.lock.locked(): + raise RuntimeError("Lock not held during send()") + return value + def throw(self, *args): + if not self.lock.locked(): + raise RuntimeError("Lock not held during throw()") + def close(self): + if not self.lock.locked(): + raise RuntimeError("Lock not held during close()") + + # Manually create the serialize object to inspect the lock + it = threading.serialize([]) + mock_gen = LockCheckingGenerator(it.lock) + it.iterator = mock_gen + + # These should not raise RuntimeError + it.send(1) + it.throw(ValueError) + it.close() + + def test_synchronized_serializes_generator_instances(self): + unique = 10 + repetitions = 5 + limit = 100 + start = threading.Event() + + @threading.synchronized + def atomic_counter(): + # The sleep widens the race window that would exist without + # synchronization between yielding a value and advancing state. + i = 0 + while True: + yield i + time.sleep(0.0005) + i += 1 + + def consumer(counter): + start.wait() + for _ in range(limit): + next(counter) + + unique_counters = [atomic_counter() for _ in range(unique)] + counters = unique_counters * repetitions + workers = [ + threading.Thread(target=consumer, args=(counter,)) + for counter in counters + ] + with threading_helper.wait_threads_exit(): + for worker in workers: + worker.start() + start.set() + for worker in workers: + worker.join() + + self.assertEqual( + {next(counter) for counter in unique_counters}, + {limit * repetitions}, + ) + + def test_synchronized_preserves_wrapped_metadata(self): + def gen(): + yield 1 + + wrapped = threading.synchronized(gen) + + self.assertEqual(wrapped.__name__, gen.__name__) + self.assertIs(wrapped.__wrapped__, gen) + self.assertEqual(list(wrapped()), [1]) + + def test_concurrent_tee_supports_concurrent_consumers(self): + limit = 5_000 + num_threads = 25 + successes = 0 + failures = [] + result_lock = threading.Lock() + start = threading.Event() + expected = list(range(limit)) + + def producer(limit): + for x in range(limit): + yield x + + def consumer(iterator): + nonlocal successes + start.wait() + items = list(iterator) + with result_lock: + if items == expected: + successes += 1 + else: + failures.append(items[:20]) + + tees = threading.concurrent_tee(producer(limit), n=num_threads) + workers = [ + threading.Thread(target=consumer, args=(iterator,)) + for iterator in tees + ] + with threading_helper.wait_threads_exit(): + for worker in workers: + worker.start() + start.set() + for worker in workers: + worker.join() + + self.assertEqual(failures, []) + self.assertEqual(successes, len(tees)) + + # Verify that locks are shared + self.assertEqual(len({id(t_obj.lock) for t_obj in tees}), 1) + + def test_concurrent_tee_zero_iterators(self): + self.assertEqual(threading.concurrent_tee(range(10), n=0), ()) + + def test_concurrent_tee_negative_n(self): + with self.assertRaises(ValueError): + threading.concurrent_tee(range(10), n=-1) + + +################# + + + class MiscTestCase(unittest.TestCase): def test__all__(self): restore_default_excepthook(self) diff --git a/Lib/threading.py b/Lib/threading.py index 4ebceae7029870..c36a5c7befed8f 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -7,6 +7,7 @@ from time import monotonic as _time from _weakrefset import WeakSet +lazy from functools import wraps from itertools import count as _count try: from _collections import deque as _deque @@ -29,6 +30,7 @@ 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size', 'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile', + 'serialize', 'synchronized', 'concurrent_tee', 'setprofile_all_threads','settrace_all_threads'] # Rename some stuff so "from threading import *" is safe @@ -842,6 +844,140 @@ class BrokenBarrierError(RuntimeError): pass +## Synchronization tools for iterators ##################### + +class serialize: + """Wrap a non-concurrent iterator with a lock to enforce sequential access. + + Applies a non-reentrant lock around calls to __next__. If the + wrapped iterator also defines send(), throw(), or close(), those + calls are serialized as well. + + Allows iterator and generator instances to be shared by multiple consumer + threads. + """ + + __slots__ = ('iterator', 'lock') + + def __init__(self, iterable): + self.iterator = iter(iterable) + self.lock = Lock() + + def __iter__(self): + return self + + def __next__(self): + with self.lock: + return next(self.iterator) + + def send(self, value, /): + """Send a value to a generator. + + Raises AttributeError if not a generator. + """ + with self.lock: + return self.iterator.send(value) + + def throw(self, *args): + """Call throw() on a generator. + + Raises AttributeError if not a generator. + """ + with self.lock: + return self.iterator.throw(*args) + + def close(self): + """Call close() on a generator. + + Raises AttributeError if not a generator. + """ + with self.lock: + return self.iterator.close() + + +def synchronized(func): + """Wrap an iterator-returning callable to make its iterators thread-safe. + + Existing itertools and more-itertools can be wrapped so that their + iterator instances are serialized. + + For example, itertools.count does not make thread-safe instances, + but that is easily fixed with: + + atomic_counter = synchronized(itertools.count) + + Can also be used as a decorator for generator functions definitions + so that the generator instances are serialized:: + + import time + + @synchronized + def enumerate_and_timestamp(iterable): + for count, value in enumerate(iterable): + yield count, time.time_ns(), value + + """ + + @wraps(func) + def inner(*args, **kwargs): + iterator = func(*args, **kwargs) + return serialize(iterator) + + return inner + + +def concurrent_tee(iterable, n=2): + """Variant of itertools.tee() but with guaranteed threading semantics. + + Takes a non-threadsafe iterator as an input and creates concurrent + tee objects for other threads to have reliable independent copies of + the data stream. + + The new iterators are only thread-safe if consumed within a single thread. + To share just one of the new iterators across multiple threads, wrap it + with threading.serialize(). + """ + + if n < 0: + raise ValueError("n must be a non-negative integer") + if n == 0: + return () + iterator = _concurrent_tee(iterable) + result = [iterator] + for _ in range(n - 1): + result.append(_concurrent_tee(iterator)) + return tuple(result) + + +class _concurrent_tee: + __slots__ = ('iterator', 'link', 'lock') + + def __init__(self, iterable): + if isinstance(iterable, _concurrent_tee): + self.iterator = iterable.iterator + self.link = iterable.link + self.lock = iterable.lock + else: + self.iterator = iter(iterable) + self.link = [None, None] + self.lock = Lock() + + def __iter__(self): + return self + + def __next__(self): + link = self.link + if link[1] is None: + with self.lock: + if link[1] is None: + link[0] = next(self.iterator) + link[1] = [None, None] + value, self.link = link + return value + +############################################################ + + # Helper to generate new thread names _counter = _count(1).__next__ def _newname(name_template): diff --git a/Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst b/Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst new file mode 100644 index 00000000000000..ef917bcb450e19 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst @@ -0,0 +1,3 @@ +The threading module added tooling to support concurrent iterator access: +:class:`threading.serialize`, :func:`threading.synchronized`, +and :func:`threading.concurrent_tee`.