From 19ddd2e9410a9b9644863692b12c332df60c6868 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 22 Apr 2026 17:37:24 -0500 Subject: [PATCH 1/7] Issue-124397: Add free-threading support for iterators. --- Doc/library/threading.rst | 135 +++++++++++++++++++++++++++++++++++ Lib/test/test_threading.py | 139 +++++++++++++++++++++++++++++++++++++ Lib/threading.py | 107 ++++++++++++++++++++++++++++ 3 files changed, 381 insertions(+) diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index 19cc4f191dff8d..8c5031924c43a5 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -1436,3 +1436,138 @@ is equivalent to:: Currently, :class:`Lock`, :class:`RLock`, :class:`Condition`, :class:`Semaphore`, and :class:`BoundedSemaphore` objects may be used as :keyword:`with` statement context managers. + + +Iterator synchronization +------------------------ + +By default, Python iterators do not support concurrent use. Most iterators make +no guarantees when accessed simultaneously from multiple threads. Generator +iterators, for example, raise :exc:`ValueError` if one of their iterator methods +is called while the generator is already executing. The tools in this section +allow reliable concurrency support to be added to ordinary iterators and +iterator-producing callables. + +Use :class:`serialize` when multiple threads should share a single iterator and +take turns consuming from it. While one thread is running ``__next__()``, the +others block until the iterator becomes available. Each value produced by the +underlying iterator is delivered to exactly one caller. + +Use :func:`concurrent_tee` when multiple threads should each receive the full +stream of values from one underlying iterator. It creates independent iterators +that all draw from the same source. Values are buffered until every derived +iterator has received them. + +.. 
class:: serialize(iterable) + + Return an iterator wrapper that serializes concurrent calls to + :meth:`~iterator.__next__` using a lock. + + This makes it possible to share a single iterator, including a generator + iterator, between multiple threads. Calls are handled one at a time in + arrival order determined by lock acquisition. No values are duplicated or + skipped by the wrapper itself; each item produced by the underlying iterator + is returned to exactly one caller. + + This wrapper does not copy or buffer values. Threads that call + :func:`next` while another thread is already advancing the iterator will + block until the active call completes. + + Example:: + + import threading + + def count(): + for i in range(5): + yield i + + it = threading.serialize(count()) + + def worker(): + for item in it: + print(threading.current_thread().name, item) + + threads = [threading.Thread(target=worker) for _ in range(2)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + In this example, each number is printed exactly once, but the work is shared + between the two threads. + +.. function:: synchronized(func) + + Wrap an iterator-producing callable so that each iterator it returns is + automatically passed through :class:`serialize`. + + This is especially useful as a decorator for generator functions that may be + consumed from multiple threads. + + Example:: + + import threading + + @threading.synchronized + def counter(): + i = 0 + while True: + yield i + i += 1 + + it = counter() + + def worker(): + for _ in range(3): + print(next(it)) + + threads = [threading.Thread(target=worker) for _ in range(2)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + The returned wrapper preserves the metadata of *func*, such as its name and + wrapped function reference. + +.. 
function:: concurrent_tee(iterable, n=2) + + Return *n* independent iterators from a single input *iterable*, with + guaranteed behavior when the derived iterators are consumed concurrently. + + This function is similar to :func:`itertools.tee`, but is intended for cases + where the source iterator may feed consumers running in different threads. + Each returned iterator yields every value from the underlying iterable, in + the same order. + + Internally, values are buffered until every derived iterator has consumed + them. As a result, if one consumer falls far behind the others, the buffer + may grow without bound. + + The returned iterators share the same underlying synchronization lock. Each + individual derived iterator is intended to be consumed by one thread at a + time. If a single derived iterator must itself be shared by multiple + threads, wrap it with :class:`serialize`. + + If *n* is ``0``, return an empty tuple. If *n* is negative, raise + :exc:`ValueError`. + + Example:: + + import threading + + source = range(5) + left, right = threading.concurrent_tee(source) + + def consume(name, iterable): + for item in iterable: + print(name, item) + + t1 = threading.Thread(target=consume, args=("left", left)) + t2 = threading.Thread(target=consume, args=("right", right)) + t1.start() + t2.start() + t1.join() + t2.join() + + Here, both threads see the full sequence ``0`` through ``4``. 
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 0ca91ce0d7899d..44d0dfadccb12b 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -2368,6 +2368,145 @@ class BarrierTests(lock_tests.BarrierTests): barriertype = staticmethod(threading.Barrier) +## Test Synchronization tools for iterators ################ + +class ThreadingIteratorToolsTests(BaseTestCase): + def test_serialize_serializes_concurrent_iteration(self): + limit = 10_000 + workers_count = 10 + result = 0 + result_lock = threading.Lock() + start = threading.Event() + + def producer(limit): + for x in range(limit): + yield x + + def consumer(iterator): + nonlocal result + start.wait() + total = 0 + for x in iterator: + total += x + with result_lock: + result += total + + iterator = threading.serialize(producer(limit)) + workers = [ + threading.Thread(target=consumer, args=(iterator,)) + for _ in range(workers_count) + ] + with threading_helper.wait_threads_exit(): + for worker in workers: + worker.start() + start.set() + for worker in workers: + worker.join() + + self.assertEqual(result, limit * (limit - 1) // 2) + + def test_synchronized_serializes_generator_instances(self): + unique = 10 + repetitions = 5 + limit = 100 + start = threading.Event() + + @threading.synchronized + def atomic_counter(): + # The sleep widens the race window that would exist without + # synchronization between yielding a value and advancing state. 
+ i = 0 + while True: + yield i + time.sleep(0.0005) + i += 1 + + def consumer(counter): + start.wait() + for _ in range(limit): + next(counter) + + unique_counters = [atomic_counter() for _ in range(unique)] + counters = unique_counters * repetitions + workers = [ + threading.Thread(target=consumer, args=(counter,)) + for counter in counters + ] + with threading_helper.wait_threads_exit(): + for worker in workers: + worker.start() + start.set() + for worker in workers: + worker.join() + + self.assertEqual( + {next(counter) for counter in unique_counters}, + {limit * repetitions}, + ) + + def test_synchronized_preserves_wrapped_metadata(self): + def gen(): + yield 1 + + wrapped = threading.synchronized(gen) + + self.assertEqual(wrapped.__name__, gen.__name__) + self.assertIs(wrapped.__wrapped__, gen) + self.assertEqual(list(wrapped()), [1]) + + def test_concurrent_tee_supports_concurrent_consumers(self): + limit = 5_000 + num_threads = 25 + successes = 0 + failures = [] + result_lock = threading.Lock() + start = threading.Event() + expected = list(range(limit)) + + def producer(limit): + for x in range(limit): + yield x + + def consumer(iterator): + nonlocal successes + start.wait() + items = list(iterator) + with result_lock: + if items == expected: + successes += 1 + else: + failures.append(items[:20]) + + tees = threading.concurrent_tee(producer(limit), n=num_threads) + workers = [ + threading.Thread(target=consumer, args=(iterator,)) + for iterator in tees + ] + with threading_helper.wait_threads_exit(): + for worker in workers: + worker.start() + start.set() + for worker in workers: + worker.join() + + self.assertEqual(failures, []) + self.assertEqual(successes, len(tees)) + + # Verify that locks are shared + self.assertEqual(len({id(t_obj.lock) for t_obj in tees}), 1) + + def test_concurrent_tee_zero_iterators(self): + self.assertEqual(threading.concurrent_tee(range(10), n=0), ()) + + def test_concurrent_tee_negative_n(self): + with 
self.assertRaises(ValueError): + threading.concurrent_tee(range(10), n=-1) + + +################# + + + class MiscTestCase(unittest.TestCase): def test__all__(self): restore_default_excepthook(self) diff --git a/Lib/threading.py b/Lib/threading.py index 4ebceae7029870..13e2871314c3f5 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -7,6 +7,7 @@ from time import monotonic as _time from _weakrefset import WeakSet +from functools import wraps from itertools import count as _count try: from _collections import deque as _deque @@ -29,6 +30,7 @@ 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size', 'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile', + 'serialize', 'synchronized', 'concurrent_tee', 'setprofile_all_threads','settrace_all_threads'] # Rename some stuff so "from threading import *" is safe @@ -842,6 +844,111 @@ class BrokenBarrierError(RuntimeError): pass + +## Synchronization tools for iterators ##################### + +class serialize: + """Wrap a non-concurrent iterator with a lock to enforce sequential access. + + Applies a non-reentrant lock around calls to __next__, allowing + iterator and generator instances to be shared by multiple consumer + threads. + """ + + __slots__ = ('iterator', 'lock') + + def __init__(self, iterable): + self.iterator = iter(iterable) + self.lock = Lock() + + def __iter__(self): + return self + + def __next__(self): + with self.lock: + return next(self.iterator) + + +def synchronized(func): + """Wrap an iterator-returning callable to make its iterators thread-safe. + + Existing itertools and more-itertools can be wrapped so that their + iterator instances are serialized. 
+ + For example, itertools.count does not make thread-safe instances, + but that is easily fixed with: + + atomic_counter = synchronized(itertools.count) + + Can also be used as a decorator for generator functions definitions + so that the generator instances are serialized:: + + @synchronized + def enumerate_and_timestamp(iterable): + for count, value in enumerate(iterable): + yield count, time_ns(), value + + """ + + @wraps(func) + def inner(*args, **kwargs): + iterator = func(*args, **kwargs) + return serialize(iterator) + + return inner + + +def concurrent_tee(iterable, n=2): + """Variant of itertools.tee() but with guaranteed threading semantics. + + Takes a non-threadsafe iterator as an input and creates concurrent + tee objects for other threads to have reliable independent copies of + the data stream. + + The new iterators are only thread-safe if consumed within a single thread. + To share just one of the new iterators across multiple threads, wrap it + with threading.serialize(). 
+ """ + + if n < 0: + raise ValueError + if n == 0: + return () + iterator = _concurrent_tee(iterable) + result = [iterator] + for _ in range(n - 1): + result.append(_concurrent_tee(iterator)) + return tuple(result) + + +class _concurrent_tee: + __slots__ = ('iterator', 'link', 'lock') + + def __init__(self, iterable): + if isinstance(iterable, _concurrent_tee): + self.iterator = iterable.iterator + self.link = iterable.link + self.lock = iterable.lock + else: + self.iterator = iter(iterable) + self.link = [None, None] + self.lock = Lock() + + def __iter__(self): + return self + + def __next__(self): + link = self.link + if link[1] is None: + with self.lock: + if link[1] is None: + link[0] = next(self.iterator) + link[1] = [None, None] + value, self.link = link + return value + +############################################################ + + # Helper to generate new thread names _counter = _count(1).__next__ def _newname(name_template): From 4aa242d95ba25b9121bb07437f33b134f7e44aaf Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 22 Apr 2026 20:49:57 -0500 Subject: [PATCH 2/7] Add blurb --- .../next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst diff --git a/Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst b/Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst new file mode 100644 index 00000000000000..8e102b77bd4782 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst @@ -0,0 +1,2 @@ +The threading module added tooling to support concurrent iterator access: +:class:`serialize`, :func:`synchronized`, and :func:`concurrent_tee`. 
From e0c44be81c2ee8a90b635b25f1aced4a8918ac00 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 22 Apr 2026 20:59:50 -0500 Subject: [PATCH 3/7] More wordsmithing --- Doc/library/threading.rst | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index 8c5031924c43a5..25071c4a03e317 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -1441,22 +1441,22 @@ Currently, :class:`Lock`, :class:`RLock`, :class:`Condition`, Iterator synchronization ------------------------ -By default, Python iterators do not support concurrent use. Most iterators make +By default, Python iterators do not support concurrent access. Most iterators make no guarantees when accessed simultaneously from multiple threads. Generator iterators, for example, raise :exc:`ValueError` if one of their iterator methods is called while the generator is already executing. The tools in this section allow reliable concurrency support to be added to ordinary iterators and iterator-producing callables. -Use :class:`serialize` when multiple threads should share a single iterator and +The :class:`serialize` wrapper lets multiple threads share a single iterator and take turns consuming from it. While one thread is running ``__next__()``, the others block until the iterator becomes available. Each value produced by the underlying iterator is delivered to exactly one caller. -Use :func:`concurrent_tee` when multiple threads should each receive the full +The :func:`concurrent_tee` function lets multiple threads each receive the full stream of values from one underlying iterator. It creates independent iterators -that all draw from the same source. Values are buffered until every derived -iterator has received them. +that all draw from the same source. Values are buffered until consumed by all +of the derived iterators. .. 
class:: serialize(iterable) @@ -1466,8 +1466,8 @@ iterator has received them. This makes it possible to share a single iterator, including a generator iterator, between multiple threads. Calls are handled one at a time in arrival order determined by lock acquisition. No values are duplicated or - skipped by the wrapper itself; each item produced by the underlying iterator - is returned to exactly one caller. + skipped by the wrapper itself. Each item produced by the underlying iterator + is given to exactly one caller. This wrapper does not copy or buffer values. Threads that call :func:`next` while another thread is already advancing the iterator will @@ -1501,8 +1501,8 @@ iterator has received them. Wrap an iterator-producing callable so that each iterator it returns is automatically passed through :class:`serialize`. - This is especially useful as a decorator for generator functions that may be - consumed from multiple threads. + This is especially useful as a :term:`decorator` for generator functions, + allowing their generator-iterators to be consumed from multiple threads. Example:: @@ -1541,8 +1541,7 @@ iterator has received them. the same order. Internally, values are buffered until every derived iterator has consumed - them. As a result, if one consumer falls far behind the others, the buffer - may grow without bound. + them. The returned iterators share the same underlying synchronization lock. Each individual derived iterator is intended to be consumed by one thread at a @@ -1570,4 +1569,4 @@ iterator has received them. t1.join() t2.join() - Here, both threads see the full sequence ``0`` through ``4``. + In this example, both consumer threads see the full sequence ``0`` through ``4``. From adcb718b3341e7dbaad9d1b620ea6f85b13d6320 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 22 Apr 2026 21:26:15 -0500 Subject: [PATCH 4/7] Clarify use of the lock. Add message to the ValueError. 
--- Doc/library/threading.rst | 7 +++---- Lib/threading.py | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index 25071c4a03e317..39d33f21c9f10b 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -1464,10 +1464,9 @@ of the derived iterators. :meth:`~iterator.__next__` using a lock. This makes it possible to share a single iterator, including a generator - iterator, between multiple threads. Calls are handled one at a time in - arrival order determined by lock acquisition. No values are duplicated or - skipped by the wrapper itself. Each item produced by the underlying iterator - is given to exactly one caller. + iterator, between multiple threads. A lock assures that calls are handled + one at a time. No values are duplicated or skipped by the wrapper itself. + Each item from the underlying iterator is given to exactly one caller. This wrapper does not copy or buffer values. Threads that call :func:`next` while another thread is already advancing the iterator will diff --git a/Lib/threading.py b/Lib/threading.py index 13e2871314c3f5..388c906861ec77 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -910,7 +910,7 @@ def concurrent_tee(iterable, n=2): """ if n < 0: - raise ValueError + raise ValueError("n must be positive integer") if n == 0: return () iterator = _concurrent_tee(iterable) From 4c2bad02703139ede74767d5c5994f4d2787f6d1 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 22 Apr 2026 21:32:56 -0500 Subject: [PATCH 5/7] Include "threading" in the reference --- .../Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst b/Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst index 8e102b77bd4782..ef917bcb450e19 100644 --- a/Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst 
+++ b/Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst @@ -1,2 +1,3 @@ The threading module added tooling to support concurrent iterator access: -:class:`serialize`, :func:`synchronized`, and :func:`concurrent_tee`. +:class:`threading.serialize`, :func:`threading.synchronized`, +and :func:`threading.concurrent_tee`. From d9dde84c6368aa4635428a88b5a67735a1450669 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 22 Apr 2026 22:53:43 -0500 Subject: [PATCH 6/7] Support send(), throw(), and close() for generators. --- Doc/library/threading.rst | 4 ++- Lib/test/test_threading.py | 71 ++++++++++++++++++++++++++++++++++++++ Lib/threading.py | 28 +++++++++++++-- 3 files changed, 100 insertions(+), 3 deletions(-) diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index 39d33f21c9f10b..77422e789b218e 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -1461,7 +1461,9 @@ of the derived iterators. .. class:: serialize(iterable) Return an iterator wrapper that serializes concurrent calls to - :meth:`~iterator.__next__` using a lock. + :meth:`~iterator.__next__` using a lock. For generators, will also + serialize calls to :meth:`~generator.send`, :meth:`~generator.throw`, + and :meth:`~generator.close`. This makes it possible to share a single iterator, including a generator iterator, between multiple threads. 
A lock assures that calls are handled diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 44d0dfadccb12b..84c9e7bf0625c5 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -2405,6 +2405,77 @@ def consumer(iterator): self.assertEqual(result, limit * (limit - 1) // 2) + def test_serialize_generator_methods(self): + # A generator that yields and receives + def echo(): + try: + while True: + val = yield "ready" + yield f"received {val}" + except ValueError: + yield "caught" + + it = threading.serialize(echo()) + + # Test __next__ + self.assertEqual(next(it), "ready") + + # Test send() + self.assertEqual(it.send("hello"), "received hello") + self.assertEqual(next(it), "ready") + + # Test throw() + self.assertEqual(it.throw(ValueError), "caught") + + # Test close() + it.close() + with self.assertRaises(StopIteration): + next(it) + + def test_serialize_methods_attribute_error(self): + # A standard iterator that does not have send/throw/close + # should raise AttributeError when called. + standard_it = threading.serialize([1, 2, 3]) + + with self.assertRaises(AttributeError): + standard_it.send("foo") + + with self.assertRaises(AttributeError): + standard_it.throw(ValueError) + + with self.assertRaises(AttributeError): + standard_it.close() + + def test_serialize_generator_methods_locking(self): + # Verifies that generator methods also acquire the lock. + # We can test this by checking if the lock is held during the call. 
+ + class LockCheckingGenerator: + def __init__(self, lock): + self.lock = lock + def __iter__(self): + return self + def send(self, value): + if not self.lock.locked(): + raise RuntimeError("Lock not held during send()") + return value + def throw(self, *args): + if not self.lock.locked(): + raise RuntimeError("Lock not held during throw()") + def close(self): + if not self.lock.locked(): + raise RuntimeError("Lock not held during close()") + + # Manually create the serialize object to inspect the lock + it = threading.serialize([]) + mock_gen = LockCheckingGenerator(it.lock) + it.iterator = mock_gen + + # These should not raise RuntimeError + it.send(1) + it.throw(ValueError) + it.close() + def test_synchronized_serializes_generator_instances(self): unique = 10 repetitions = 5 diff --git a/Lib/threading.py b/Lib/threading.py index 388c906861ec77..fc1a59a489798a 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -849,8 +849,8 @@ class BrokenBarrierError(RuntimeError): class serialize: """Wrap a non-concurrent iterator with a lock to enforce sequential access. - Applies a non-reentrant lock around calls to __next__, allowing - iterator and generator instances to be shared by multiple consumer + Applies a non-reentrant lock around calls to __next__, send, throw, and close. + Allows iterator and generator instances to be shared by multiple consumer threads. """ @@ -867,6 +867,30 @@ def __next__(self): with self.lock: return next(self.iterator) + def send(self, value, /): + """Send a value to a generator. + + Raises AttributeError if not a generator. + """ + with self.lock: + return self.iterator.send(value) + + def throw(self, *args): + """Call throw() on a generator. + + Raises AttributeError if not a generator. + """ + with self.lock: + return self.iterator.throw(*args) + + def close(self): + """Call close() on a generator. + + Raises AttributeError if not a generator. 
+ """ + with self.lock: + return self.iterator.close() + def synchronized(func): """Wrap an iterator-returning callable to make its iterators thread-safe. From fcb9ee8d58b87a33e04b8f370538aca4f9f7cc97 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 22 Apr 2026 23:27:27 -0500 Subject: [PATCH 7/7] Tweak wording. Add doctest. --- Doc/library/threading.rst | 22 +++++++++++++++------- Lib/threading.py | 11 ++++++++--- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index 77422e789b218e..c2eb1390f64898 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -1461,9 +1461,11 @@ of the derived iterators. .. class:: serialize(iterable) Return an iterator wrapper that serializes concurrent calls to - :meth:`~iterator.__next__` using a lock. For generators, will also - serialize calls to :meth:`~generator.send`, :meth:`~generator.throw`, - and :meth:`~generator.close`. + :meth:`~iterator.__next__` using a lock. + + If the wrapped iterator also defines :meth:`~generator.send`, + :meth:`~generator.throw`, or :meth:`~generator.close`, those calls + are serialized as well. This makes it possible to share a single iterator, including a generator iterator, between multiple threads. A lock assures that calls are handled @@ -1474,7 +1476,9 @@ of the derived iterators. :func:`next` while another thread is already advancing the iterator will block until the active call completes. - Example:: + Example: + + .. doctest:: import threading @@ -1505,7 +1509,9 @@ of the derived iterators. This is especially useful as a :term:`decorator` for generator functions, allowing their generator-iterators to be consumed from multiple threads. - Example:: + Example: + + .. doctest:: import threading @@ -1519,7 +1525,7 @@ of the derived iterators. 
it = counter() def worker(): - for _ in range(3): + for _ in range(5): print(next(it)) threads = [threading.Thread(target=worker) for _ in range(2)] @@ -1552,7 +1558,9 @@ of the derived iterators. If *n* is ``0``, return an empty tuple. If *n* is negative, raise :exc:`ValueError`. - Example:: + Example: + + .. doctest:: import threading diff --git a/Lib/threading.py b/Lib/threading.py index fc1a59a489798a..c36a5c7befed8f 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -849,7 +849,10 @@ class BrokenBarrierError(RuntimeError): class serialize: """Wrap a non-concurrent iterator with a lock to enforce sequential access. - Applies a non-reentrant lock around calls to __next__, send, throw, and close. + Applies a non-reentrant lock around calls to __next__. If the + wrapped iterator also defines send(), throw(), or close(), those + calls are serialized as well. + Allows iterator and generator instances to be shared by multiple consumer threads. """ @@ -906,10 +909,12 @@ def synchronized(func): Can also be used as a decorator for generator functions definitions so that the generator instances are serialized:: + import time + @synchronized def enumerate_and_timestamp(iterable): for count, value in enumerate(iterable): - yield count, time_ns(), value + yield count, time.time_ns(), value """ @@ -934,7 +939,7 @@ def concurrent_tee(iterable, n=2): """ if n < 0: - raise ValueError("n must be positive integer") + raise ValueError("n must be a non-negative integer") if n == 0: return () iterator = _concurrent_tee(iterable)