gh-124397: Add free-threading support for iterators. #148894
Changes from all commits: 19ddd2e, 4aa242d, e0c44be, adcb718, 4c2bad0, d9dde84, fcb9ee8, 314ec67
@@ -1436,3 +1436,146 @@ is equivalent to::
Currently, :class:`Lock`, :class:`RLock`, :class:`Condition`,
:class:`Semaphore`, and :class:`BoundedSemaphore` objects may be used as
:keyword:`with` statement context managers.


Iterator synchronization
------------------------

By default, Python iterators do not support concurrent access. Most
iterators make no guarantees when accessed simultaneously from multiple
threads. Generator iterators, for example, raise :exc:`ValueError` if one
of their iterator methods is called while the generator is already
executing. The tools in this section allow reliable concurrency support to
be added to ordinary iterators and iterator-producing callables.
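The generator restriction mentioned above can be demonstrated without any threads at all, by forcing a generator to re-enter itself while it is running (the helper names below are hypothetical, chosen only for this sketch):

```python
# A generator's __next__ may not be re-entered while the generator is
# already executing; CPython raises ValueError in that case.

holder = []

def reenter():
    # Calling next() on ourselves while we are running triggers the error.
    yield next(holder[0])

it = reenter()
holder.append(it)

try:
    next(it)
except ValueError as exc:
    caught = exc

print(type(caught).__name__)  # prints: ValueError
```

With threads, the same re-entry can happen nondeterministically whenever two threads call ``next()`` on one generator at once, which is what the tools below prevent.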

The :class:`serialize` wrapper lets multiple threads share a single
iterator and take turns consuming from it. While one thread is running
``__next__()``, the others block until the iterator becomes available.
Each value produced by the underlying iterator is delivered to exactly one
caller.

The :func:`concurrent_tee` function lets multiple threads each receive the
full stream of values from one underlying iterator. It creates independent
iterators that all draw from the same source. Values are buffered until
consumed by all of the derived iterators.

.. class:: serialize(iterable)

   Return an iterator wrapper that serializes concurrent calls to
   :meth:`~iterator.__next__` using a lock.

   If the wrapped iterator also defines :meth:`~generator.send`,
   :meth:`~generator.throw`, or :meth:`~generator.close`, those calls
   are serialized as well.

   This makes it possible to share a single iterator, including a generator
   iterator, between multiple threads. A lock ensures that calls are
   handled one at a time. The wrapper itself neither duplicates nor skips
   values: each item from the underlying iterator is delivered to exactly
   one caller.

   This wrapper does not copy or buffer values. Threads that call
   :func:`next` while another thread is already advancing the iterator
   block until the active call completes.

   Example::

      import threading

      def count():
          for i in range(5):
              yield i

      it = threading.serialize(count())

      def worker():
          for item in it:
              print(threading.current_thread().name, item)

      threads = [threading.Thread(target=worker) for _ in range(2)]
      for thread in threads:
          thread.start()
      for thread in threads:
          thread.join()

   In this example, each number is printed exactly once, but the work is
   shared between the two threads.

Contributor comment on the example: Needs ``>>>`` for doctest to run. Alternatively, use …
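Since ``serialize`` is only proposed here and not yet importable, the exactly-once delivery it describes can be sketched in pure Python. ``SerializedIterator`` below is a hypothetical stand-in that guards ``__next__`` with a single :class:`~threading.Lock`:

```python
import threading

class SerializedIterator:
    # Hypothetical sketch: one lock guards every __next__ call, so each
    # value from the underlying iterator goes to exactly one caller.

    def __init__(self, iterable):
        self._it = iter(iterable)
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        # Only one thread may advance the underlying iterator at a time.
        with self._lock:
            return next(self._it)

results = []
it = SerializedIterator(range(100))

def worker():
    for item in it:
        results.append(item)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every value is consumed exactly once across all threads.
print(sorted(results) == list(range(100)))  # prints: True
```

Without the lock, two threads could call ``next(self._it)`` on a shared generator simultaneously and hit the ``ValueError`` described earlier; with it, calls simply queue up.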

.. function:: synchronized(func)

   Wrap an iterator-producing callable so that each iterator it returns is
   automatically passed through :class:`serialize`.

   This is especially useful as a :term:`decorator` for generator
   functions, allowing their generator-iterators to be consumed from
   multiple threads.

   Example::

      import threading

      @threading.synchronized
      def counter():
          i = 0
          while True:
              yield i
              i += 1

      it = counter()

      def worker():
          for _ in range(5):
              print(next(it))

      threads = [threading.Thread(target=worker) for _ in range(2)]
      for thread in threads:
          thread.start()
      for thread in threads:
          thread.join()

   The returned wrapper preserves the metadata of *func*, such as its name
   and wrapped function reference.
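The metadata preservation described above is what :func:`functools.wraps` provides: it copies ``__name__`` and ``__doc__`` and sets ``__wrapped__``. The hypothetical ``synchronized_sketch`` below illustrates just that part, omitting the locking a real implementation would add:

```python
import functools

def synchronized_sketch(func):
    # Hypothetical stand-in for the proposed decorator: functools.wraps
    # copies __name__ and __doc__ and sets __wrapped__.  A real version
    # would also pass each returned iterator through a serializing wrapper.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@synchronized_sketch
def counter():
    """Yield increasing integers."""
    i = 0
    while True:
        yield i
        i += 1

print(counter.__name__)             # prints: counter
print(counter.__wrapped__.__doc__)  # prints: Yield increasing integers.
```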

.. function:: concurrent_tee(iterable, n=2)

   Return *n* independent iterators from a single input *iterable*, with
   guaranteed behavior when the derived iterators are consumed
   concurrently.

   This function is similar to :func:`itertools.tee`, but is intended for
   cases where the source iterator may feed consumers running in different
   threads. Each returned iterator yields every value from the underlying
   iterable, in the same order.

   Internally, values are buffered until every derived iterator has
   consumed them.

   The returned iterators share the same underlying synchronization lock.
   Each individual derived iterator is intended to be consumed by one
   thread at a time. If a single derived iterator must itself be shared by
   multiple threads, wrap it with :class:`serialize`.

   If *n* is ``0``, return an empty tuple. If *n* is negative, raise
   :exc:`ValueError`.

   Example::

      import threading

      def produce():
          for i in range(5):
              yield i

      left, right = threading.concurrent_tee(produce())

      def consume(name, iterable):
          for item in iterable:
              print(name, item)

      t1 = threading.Thread(target=consume, args=("left", left))
      t2 = threading.Thread(target=consume, args=("right", right))
      t1.start()
      t2.start()
      t1.join()
      t2.join()

   In this example, both consumer threads see the full sequence ``0``
   through ``4``.

Contributor comment on the example: This doesn't seem like a very compelling example because … Maybe something like a simple generator would be more useful as an example?
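Because ``concurrent_tee`` is only proposed here, the buffering scheme described above can be sketched in pure Python: one shared lock and one :class:`~collections.deque` per derived iterator (``tee_sketch``, ``derived``, and ``produce`` are hypothetical names for this sketch):

```python
import threading
from collections import deque

def tee_sketch(iterable, n=2):
    # Hypothetical sketch of the buffering described above: each value
    # pulled from the source is appended to every buffer, so every
    # derived iterator sees the full stream in order.
    if n < 0:
        raise ValueError("n must be >= 0")
    source = iter(iterable)
    lock = threading.Lock()
    buffers = [deque() for _ in range(n)]

    def derived(my_buffer):
        # Each derived generator is meant to be consumed by one thread.
        while True:
            with lock:
                if not my_buffer:
                    try:
                        value = next(source)
                    except StopIteration:
                        return
                    for buf in buffers:
                        buf.append(value)
                item = my_buffer.popleft()
            yield item

    return tuple(derived(buf) for buf in buffers)

def produce():
    yield from range(5)

left, right = tee_sketch(produce())
seen = {"left": [], "right": []}

def consume(name, iterator):
    for item in iterator:
        seen[name].append(item)

t1 = threading.Thread(target=consume, args=("left", left))
t2 = threading.Thread(target=consume, args=("right", right))
t1.start()
t2.start()
t1.join()
t2.join()

print(seen["left"] == seen["right"] == [0, 1, 2, 3, 4])  # prints: True
```

Note the single shared lock: it protects both the source iterator and the buffers, which matches the documented constraint that the derived iterators share one synchronization lock.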

Member comment: Consider adding a test that an iterator …

Reviewer comment: To bikeshed the name, the topic
https://discuss.python.org/t/name-for-new-itertools-object-to-make-iterators-thread-safe/90394
was created. ``serialize`` is not my favorite, but on the topic there was
no strong support for any of the other names either.