143 changes: 143 additions & 0 deletions Doc/library/threading.rst
@@ -1436,3 +1436,146 @@ is equivalent to::
Currently, :class:`Lock`, :class:`RLock`, :class:`Condition`,
:class:`Semaphore`, and :class:`BoundedSemaphore` objects may be used as
:keyword:`with` statement context managers.


Iterator synchronization
------------------------

By default, Python iterators do not support concurrent access. Most iterators make
no guarantees when accessed simultaneously from multiple threads. Generator
iterators, for example, raise :exc:`ValueError` if one of their iterator methods
is called while the generator is already executing. The tools in this section
allow reliable concurrency support to be added to ordinary iterators and
iterator-producing callables.

The :class:`serialize` wrapper lets multiple threads share a single iterator and
take turns consuming from it. While one thread is running ``__next__()``, the
others block until the iterator becomes available. Each value produced by the
underlying iterator is delivered to exactly one caller.

The :func:`concurrent_tee` function lets multiple threads each receive the full
stream of values from one underlying iterator. It creates independent iterators
that all draw from the same source. Values are buffered until consumed by all
of the derived iterators.

.. class:: serialize(iterable)

Return an iterator wrapper that serializes concurrent calls to
:meth:`~iterator.__next__` using a lock.

If the wrapped iterator also defines :meth:`~generator.send`,
:meth:`~generator.throw`, or :meth:`~generator.close`, those calls
are serialized as well.

This makes it possible to share a single iterator, including a generator
iterator, between multiple threads. A lock ensures that calls are handled
one at a time. The wrapper itself neither duplicates nor skips values:
each item from the underlying iterator is delivered to exactly one caller.

This wrapper does not copy or buffer values. Threads that call
:func:`next` while another thread is already advancing the iterator will
block until the active call completes.

Example::

import threading

def count():
for i in range(5):
yield i

it = threading.serialize(count())

def worker():
for item in it:
print(threading.current_thread().name, item)

threads = [threading.Thread(target=worker) for _ in range(2)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()

In this example, each number is printed exactly once, but the work is shared
between the two threads.
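The semantics described above can be approximated in pure Python on interpreters that do not have this patch. The sketch below is illustrative, not the patch's implementation; the class name ``_SerializedIterator`` is hypothetical. One lock guards every iterator method, so each resumption of the underlying iterator is atomic:

```python
import threading

class _SerializedIterator:
    """Sketch: serialize all iterator-method calls behind one lock."""

    def __init__(self, iterable):
        self.lock = threading.Lock()
        self.iterator = iter(iterable)

    def __iter__(self):
        return self

    def __next__(self):
        # Only one thread at a time may advance the underlying iterator;
        # StopIteration propagates after the lock is released.
        with self.lock:
            return next(self.iterator)

    def send(self, value):
        with self.lock:
            return self.iterator.send(value)

    def throw(self, *args):
        with self.lock:
            return self.iterator.throw(*args)

    def close(self):
        with self.lock:
            return self.iterator.close()

def count(limit):
    for i in range(limit):
        yield i

it = _SerializedIterator(count(100))
results = []
results_lock = threading.Lock()

def worker():
    for item in it:
        with results_lock:
            results.append(item)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Note that ``send``, ``throw``, and ``close`` simply delegate, so a plain iterator without those methods raises :exc:`AttributeError`, matching the behavior the tests below expect.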

.. function:: synchronized(func)

Wrap an iterator-producing callable so that each iterator it returns is
automatically passed through :class:`serialize`.

This is especially useful as a :term:`decorator` for generator functions,
allowing their generator-iterators to be consumed from multiple threads.

Example::

import threading

@threading.synchronized
def counter():
i = 0
while True:
yield i
i += 1

it = counter()

def worker():
for _ in range(5):
print(next(it))

threads = [threading.Thread(target=worker) for _ in range(2)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()

The returned wrapper preserves the metadata of *func*: it keeps the
original ``__name__`` and exposes the wrapped callable as ``__wrapped__``.
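A decorator with this shape can be sketched from standard-library pieces; ``synchronized_sketch`` and ``_LockedIterator`` are hypothetical names standing in for the proposed API. The key point is that each returned iterator gets its own lock, and the generator is only ever resumed while that lock is held:

```python
import functools
import threading

class _LockedIterator:
    """Iterator wrapper whose __next__ is guarded by a per-instance lock."""

    def __init__(self, iterable):
        self._lock = threading.Lock()
        self._it = iter(iterable)

    def __iter__(self):
        return self

    def __next__(self):
        with self._lock:
            return next(self._it)

def synchronized_sketch(func):
    """Sketch: every iterator produced by *func* comes back lock-guarded."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return _LockedIterator(func(*args, **kwargs))
    return wrapper

@synchronized_sketch
def counter():
    i = 0
    while True:
        yield i
        i += 1

it = counter()

def consume():
    for _ in range(100):
        next(it)

threads = [threading.Thread(target=consume) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because each resumption runs under the lock, the ``yield i`` / ``i += 1`` pair is effectively atomic: after 200 calls the counter has advanced exactly 200 times, and ``functools.wraps`` accounts for the preserved ``__name__`` and ``__wrapped__``.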

.. function:: concurrent_tee(iterable, n=2)

Return *n* independent iterators from a single input *iterable*, with
guaranteed behavior when the derived iterators are consumed concurrently.

This function is similar to :func:`itertools.tee`, but is intended for cases
where the source iterator may feed consumers running in different threads.
Each returned iterator yields every value from the underlying iterable, in
the same order.

Internally, values are buffered until every derived iterator has consumed
them.

The returned iterators share the same underlying synchronization lock. Each
individual derived iterator is intended to be consumed by one thread at a
time. If a single derived iterator must itself be shared by multiple
threads, wrap it with :class:`serialize`.

If *n* is ``0``, return an empty tuple. If *n* is negative, raise
:exc:`ValueError`.

Example::

import threading

source = range(5)
left, right = threading.concurrent_tee(source)

def consume(name, iterable):
for item in iterable:
print(name, item)

t1 = threading.Thread(target=consume, args=("left", left))
t2 = threading.Thread(target=consume, args=("right", right))
t1.start()
t2.start()
t1.join()
t2.join()

In this example, both consumer threads see the full sequence ``0`` through ``4``.
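The buffering scheme described above can be sketched with a shared lock and one :class:`collections.deque` per derived iterator; ``concurrent_tee_sketch`` and ``_Tee`` are illustrative names, not the patch's implementation. A value is pulled from the source only when a consumer's own buffer is empty, and is then fanned out to every buffer:

```python
import threading
from collections import deque

def concurrent_tee_sketch(iterable, n=2):
    """Sketch of a lock-based tee with per-consumer buffers."""
    if n < 0:
        raise ValueError("n must be >= 0")
    source = iter(iterable)
    lock = threading.Lock()
    buffers = [deque() for _ in range(n)]

    class _Tee:
        def __init__(self, buffer):
            self.lock = lock          # one lock shared by all derived iterators
            self._buffer = buffer

        def __iter__(self):
            return self

        def __next__(self):
            with self.lock:
                if not self._buffer:
                    # Pull one value from the source and fan it out to every
                    # buffer; StopIteration from an exhausted source propagates.
                    value = next(source)
                    for buf in buffers:
                        buf.append(value)
                return self._buffer.popleft()

    return tuple(_Tee(buf) for buf in buffers)

left, right = concurrent_tee_sketch(range(1000))
seen = {}

def consume(name, it):
    seen[name] = list(it)

t1 = threading.Thread(target=consume, args=("left", left))
t2 = threading.Thread(target=consume, args=("right", right))
t1.start(); t2.start()
t1.join(); t2.join()
```

A consumer that runs ahead fills the other buffers as it goes; a consumer that lags drains its own buffer without touching the source, so both see every value exactly once and in order.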
210 changes: 210 additions & 0 deletions Lib/test/test_threading.py
@@ -2368,6 +2368,216 @@ class BarrierTests(lock_tests.BarrierTests):
barriertype = staticmethod(threading.Barrier)


## Tests for iterator synchronization tools ################

class ThreadingIteratorToolsTests(BaseTestCase):
def test_serialize_serializes_concurrent_iteration(self):
limit = 10_000
workers_count = 10
result = 0
result_lock = threading.Lock()
start = threading.Event()

def producer(limit):
for x in range(limit):
yield x

def consumer(iterator):
nonlocal result
start.wait()
total = 0
for x in iterator:
total += x
with result_lock:
result += total

iterator = threading.serialize(producer(limit))
workers = [
threading.Thread(target=consumer, args=(iterator,))
for _ in range(workers_count)
]
with threading_helper.wait_threads_exit():
for worker in workers:
worker.start()
start.set()
for worker in workers:
worker.join()

self.assertEqual(result, limit * (limit - 1) // 2)

def test_serialize_generator_methods(self):
# A generator that yields and receives
def echo():
try:
while True:
val = yield "ready"
yield f"received {val}"
except ValueError:
yield "caught"

it = threading.serialize(echo())

# Test __next__
self.assertEqual(next(it), "ready")

# Test send()
self.assertEqual(it.send("hello"), "received hello")
self.assertEqual(next(it), "ready")

# Test throw()
self.assertEqual(it.throw(ValueError), "caught")

# Test close()
it.close()
with self.assertRaises(StopIteration):
next(it)

def test_serialize_methods_attribute_error(self):
# A standard iterator that does not have send/throw/close
# should raise AttributeError when called.
standard_it = threading.serialize([1, 2, 3])

with self.assertRaises(AttributeError):
standard_it.send("foo")

with self.assertRaises(AttributeError):
standard_it.throw(ValueError)

with self.assertRaises(AttributeError):
standard_it.close()

def test_serialize_generator_methods_locking(self):
# Verifies that generator methods also acquire the lock.
# We can test this by checking if the lock is held during the call.

class LockCheckingGenerator:
def __init__(self, lock):
self.lock = lock
def __iter__(self):
return self
def send(self, value):
if not self.lock.locked():
raise RuntimeError("Lock not held during send()")
return value
def throw(self, *args):
if not self.lock.locked():
raise RuntimeError("Lock not held during throw()")
def close(self):
if not self.lock.locked():
raise RuntimeError("Lock not held during close()")

# Manually create the serialize object to inspect the lock
it = threading.serialize([])
mock_gen = LockCheckingGenerator(it.lock)
it.iterator = mock_gen

# These should not raise RuntimeError
it.send(1)
it.throw(ValueError)
it.close()

def test_synchronized_serializes_generator_instances(self):
unique = 10
repetitions = 5
limit = 100
start = threading.Event()

@threading.synchronized
def atomic_counter():
# The sleep widens the race window that would exist without
# synchronization between yielding a value and advancing state.
i = 0
while True:
yield i
time.sleep(0.0005)
i += 1

def consumer(counter):
start.wait()
for _ in range(limit):
next(counter)

unique_counters = [atomic_counter() for _ in range(unique)]
counters = unique_counters * repetitions
workers = [
threading.Thread(target=consumer, args=(counter,))
for counter in counters
]
with threading_helper.wait_threads_exit():
for worker in workers:
worker.start()
start.set()
for worker in workers:
worker.join()

self.assertEqual(
{next(counter) for counter in unique_counters},
{limit * repetitions},
)

def test_synchronized_preserves_wrapped_metadata(self):
def gen():
yield 1

wrapped = threading.synchronized(gen)

self.assertEqual(wrapped.__name__, gen.__name__)
self.assertIs(wrapped.__wrapped__, gen)
self.assertEqual(list(wrapped()), [1])

def test_concurrent_tee_supports_concurrent_consumers(self):
limit = 5_000
num_threads = 25
successes = 0
failures = []
result_lock = threading.Lock()
start = threading.Event()
expected = list(range(limit))

def producer(limit):
for x in range(limit):
yield x

def consumer(iterator):
nonlocal successes
start.wait()
items = list(iterator)
with result_lock:
if items == expected:
successes += 1
else:
failures.append(items[:20])

tees = threading.concurrent_tee(producer(limit), n=num_threads)
workers = [
threading.Thread(target=consumer, args=(iterator,))
for iterator in tees
]
with threading_helper.wait_threads_exit():
for worker in workers:
worker.start()
start.set()
for worker in workers:
worker.join()

self.assertEqual(failures, [])
self.assertEqual(successes, len(tees))

# Verify that locks are shared
self.assertEqual(len({id(t_obj.lock) for t_obj in tees}), 1)

def test_concurrent_tee_zero_iterators(self):
self.assertEqual(threading.concurrent_tee(range(10), n=0), ())

def test_concurrent_tee_negative_n(self):
with self.assertRaises(ValueError):
threading.concurrent_tee(range(10), n=-1)


#################



class MiscTestCase(unittest.TestCase):
def test__all__(self):
restore_default_excepthook(self)