Skip to content

Commit ffaa0ea

Browse files
HunterAP23Akulisrittau
authored
Added initial rework of the concurrent.futures module (#5646)
* Added initial rework of the concurrent.futures module * Minor fixes * Fixed some generics & changed to collections over typing for some types * Switched thread to use queues instead of multiprocessing.queues * More fixes * More fixes following results from running tests locally * Tmp commit of changes * Minor flake8 fix * Fixing some issues * Fixed a weakref.ref issue * Fixed one more weakref issue * Fixed some issues with required version * Fixed more python min version requirements * More min version fixes * Fixed misc error in workflow regarded outdated pip * Replaced any usage of Optional and Union with proper form as described in the contributions guide * Fixed issue with using Callable definition * Fixed last seen issues as per review * Fixed some basic issues & more proper import calls * Update stdlib/concurrent/futures/process.pyi Co-authored-by: Sebastian Rittau <[email protected]> * Update stdlib/concurrent/futures/process.pyi Co-authored-by: Sebastian Rittau <[email protected]> * Minor fixes * More minor fixes * Fixed up some issues & cleaned up imports * Removed usage of Union * Changed wait method to use Set of Future to work with mypy-primer for Optuna repo * Reverted change to wait method and DoneAndNotDoneFutures class * Fixed DoneAndNotDoneFutures again Co-authored-by: Akuli <[email protected]> Co-authored-by: Sebastian Rittau <[email protected]>
1 parent 1af6810 commit ffaa0ea

File tree

4 files changed

+202
-26
lines changed

4 files changed

+202
-26
lines changed

.github/workflows/tests.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ jobs:
121121
uses: actions/setup-python@v2
122122
with:
123123
python-version: ${{ matrix.python-version }}
124+
- name: Update pip
125+
run: python -m pip install -U pip
124126
- name: Install dependencies
125127
run: pip install $(grep mypy== requirements-tests-py3.txt)
126128
- name: Run stubtest

stdlib/concurrent/futures/_base.pyi

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ import sys
22
import threading
33
from _typeshed import Self
44
from abc import abstractmethod
5+
from collections.abc import Container, Iterable, Iterator, Sequence, Set
56
from logging import Logger
6-
from typing import Any, Callable, Container, Generic, Iterable, Iterator, Protocol, Sequence, Set, TypeVar, overload
7+
from typing import Any, Callable, Generic, Protocol, TypeVar, overload
78

89
if sys.version_info >= (3, 9):
910
from types import GenericAlias
@@ -16,6 +17,8 @@ RUNNING: str
1617
CANCELLED: str
1718
CANCELLED_AND_NOTIFIED: str
1819
FINISHED: str
20+
_FUTURE_STATES: list[str]
21+
_STATE_TO_DESCRIPTION_MAP: dict[str, str]
1922
LOGGER: Logger
2023

2124
class Error(Exception): ...

stdlib/concurrent/futures/process.pyi

Lines changed: 148 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,133 @@
11
import sys
2-
from typing import Any, Callable, Tuple
2+
from collections.abc import Generator, Iterable, Mapping, MutableMapping, MutableSequence
3+
from multiprocessing.connection import Connection
4+
from multiprocessing.context import BaseContext, Process
5+
from multiprocessing.queues import Queue, SimpleQueue
6+
from threading import Lock, Semaphore, Thread
7+
from types import TracebackType
8+
from typing import Any, Callable, Generic, Tuple, TypeVar
9+
from weakref import ref
310

4-
from ._base import Executor
11+
from ._base import Executor, Future
512

6-
EXTRA_QUEUED_CALLS: Any
13+
_threads_wakeups: MutableMapping[Any, Any]
14+
_global_shutdown: bool
15+
16+
class _ThreadWakeup:
17+
_closed: bool
18+
_reader: Connection
19+
_writer: Connection
20+
def __init__(self) -> None: ...
21+
def close(self) -> None: ...
22+
def wakeup(self) -> None: ...
23+
def clear(self) -> None: ...
24+
25+
def _python_exit() -> None: ...
26+
27+
EXTRA_QUEUED_CALLS: int
28+
29+
_MAX_WINDOWS_WORKERS: int
30+
31+
class _RemoteTraceback(Exception):
32+
tb: str
33+
def __init__(self, tb: TracebackType) -> None: ...
34+
def __str__(self) -> str: ...
35+
36+
class _ExceptionWithTraceback:
37+
exc: BaseException
38+
tb: TracebackType
39+
def __init__(self, exc: BaseException, tb: TracebackType) -> None: ...
40+
def __reduce__(self) -> str | Tuple[Any, ...]: ...
41+
42+
def _rebuild_exc(exc: Exception, tb: str) -> Exception: ...
43+
44+
_S = TypeVar("_S")
45+
46+
class _WorkItem(Generic[_S]):
47+
future: Future[_S]
48+
fn: Callable[..., _S]
49+
args: Iterable[Any]
50+
kwargs: Mapping[str, Any]
51+
def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...
52+
53+
class _ResultItem:
54+
work_id: int
55+
exception: Exception
56+
result: Any
57+
def __init__(self, work_id: int, exception: Exception | None = ..., result: Any | None = ...) -> None: ...
58+
59+
class _CallItem:
60+
work_id: int
61+
fn: Callable[..., Any]
62+
args: Iterable[Any]
63+
kwargs: Mapping[str, Any]
64+
def __init__(self, work_id: int, fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...
65+
66+
if sys.version_info >= (3, 7):
67+
class _SafeQueue(Queue[Future[Any]]):
68+
pending_work_items: dict[int, _WorkItem[Any]]
69+
shutdown_lock: Lock
70+
thread_wakeup: _ThreadWakeup
71+
if sys.version_info >= (3, 9):
72+
def __init__(
73+
self,
74+
max_size: int | None = ...,
75+
*,
76+
ctx: BaseContext,
77+
pending_work_items: dict[int, _WorkItem[Any]],
78+
shutdown_lock: Lock,
79+
thread_wakeup: _ThreadWakeup,
80+
) -> None: ...
81+
else:
82+
def __init__(
83+
self, max_size: int | None = ..., *, ctx: BaseContext, pending_work_items: dict[int, _WorkItem[Any]]
84+
) -> None: ...
85+
def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ...
86+
87+
def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any, ...], None, None]: ...
88+
def _process_chunk(fn: Callable[..., Any], chunk: Tuple[Any, None, None]) -> Generator[Any, None, None]: ...
89+
def _sendback_result(
90+
result_queue: SimpleQueue[_WorkItem[Any]], work_id: int, result: Any | None = ..., exception: Exception | None = ...
91+
) -> None: ...
92+
93+
if sys.version_info >= (3, 7):
94+
def _process_worker(
95+
call_queue: Queue[_CallItem],
96+
result_queue: SimpleQueue[_ResultItem],
97+
initializer: Callable[..., None] | None,
98+
initargs: Tuple[Any, ...],
99+
) -> None: ...
100+
101+
else:
102+
def _process_worker(call_queue: Queue[_CallItem], result_queue: SimpleQueue[_ResultItem]) -> None: ...
103+
104+
if sys.version_info >= (3, 9):
105+
class _ExecutorManagerThread(Thread):
106+
thread_wakeup: _ThreadWakeup
107+
shutdown_lock: Lock
108+
executor_reference: ref[Any]
109+
processes: MutableMapping[int, Process]
110+
call_queue: Queue[_CallItem]
111+
result_queue: SimpleQueue[_ResultItem]
112+
work_ids_queue: Queue[int]
113+
pending_work_items: dict[int, _WorkItem[Any]]
114+
def __init__(self, executor: ProcessPoolExecutor) -> None: ...
115+
def run(self) -> None: ...
116+
def add_call_item_to_queue(self) -> None: ...
117+
def wait_result_broken_or_wakeup(self) -> tuple[Any, bool, str]: ...
118+
def process_result_item(self, result_item: int | _ResultItem) -> None: ...
119+
def is_shutting_down(self) -> bool: ...
120+
def terminate_broken(self, cause: str) -> None: ...
121+
def flag_executor_shutting_down(self) -> None: ...
122+
def shutdown_workers(self) -> None: ...
123+
def join_executor_internals(self) -> None: ...
124+
def get_n_children_alive(self) -> int: ...
125+
126+
_system_limits_checked: bool
127+
_system_limited: bool | None
128+
129+
def _check_system_limits() -> None: ...
130+
def _chain_from_iterable_of_lists(iterable: Iterable[MutableSequence[Any]]) -> Any: ...
7131

8132
if sys.version_info >= (3, 7):
9133
from ._base import BrokenExecutor
@@ -12,17 +136,32 @@ if sys.version_info >= (3, 7):
12136
else:
13137
class BrokenProcessPool(RuntimeError): ...
14138

15-
if sys.version_info >= (3, 7):
16-
from multiprocessing.context import BaseContext
17-
class ProcessPoolExecutor(Executor):
139+
class ProcessPoolExecutor(Executor):
140+
_mp_context: BaseContext | None = ...
141+
_initializer: Callable[..., None] | None = ...
142+
_initargs: Tuple[Any, ...] = ...
143+
_executor_manager_thread: _ThreadWakeup
144+
_processes: MutableMapping[int, Process]
145+
_shutdown_thread: bool
146+
_shutdown_lock: Lock
147+
_idle_worker_semaphore: Semaphore
148+
_broken: bool
149+
_queue_count: int
150+
_pending_work_items: dict[int, _WorkItem[Any]]
151+
_cancel_pending_futures: bool
152+
_executor_manager_thread_wakeup: _ThreadWakeup
153+
_result_queue: SimpleQueue[Any]
154+
_work_ids: Queue[Any]
155+
if sys.version_info >= (3, 7):
18156
def __init__(
19157
self,
20158
max_workers: int | None = ...,
21159
mp_context: BaseContext | None = ...,
22160
initializer: Callable[..., None] | None = ...,
23161
initargs: Tuple[Any, ...] = ...,
24162
) -> None: ...
25-
26-
else:
27-
class ProcessPoolExecutor(Executor):
163+
else:
28164
def __init__(self, max_workers: int | None = ...) -> None: ...
165+
if sys.version_info >= (3, 9):
166+
def _start_executor_manager_thread(self) -> None: ...
167+
def _adjust_process_count(self) -> None: ...

stdlib/concurrent/futures/thread.pyi

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,62 @@
11
import queue
22
import sys
3-
from typing import Any, Callable, Generic, Iterable, Mapping, Tuple, TypeVar
3+
from collections.abc import Iterable, Mapping, Set
4+
from threading import Lock, Semaphore, Thread
5+
from typing import Any, Callable, Generic, Tuple, TypeVar
6+
from weakref import ref
47

58
from ._base import Executor, Future
69

7-
if sys.version_info >= (3, 7):
8-
from ._base import BrokenExecutor
9-
class BrokenThreadPool(BrokenExecutor): ...
10+
_threads_queues: Mapping[Any, Any]
11+
_shutdown: bool
12+
_global_shutdown_lock: Lock
13+
14+
def _python_exit() -> None: ...
1015

1116
if sys.version_info >= (3, 9):
1217
from types import GenericAlias
1318

1419
_S = TypeVar("_S")
1520

21+
class _WorkItem(Generic[_S]):
22+
future: Future[_S]
23+
fn: Callable[..., _S]
24+
args: Iterable[Any]
25+
kwargs: Mapping[str, Any]
26+
def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...
27+
def run(self) -> None: ...
28+
if sys.version_info >= (3, 9):
29+
def __class_getitem__(cls, item: Any) -> GenericAlias: ...
30+
31+
if sys.version_info >= (3, 7):
32+
def _worker(
33+
executor_reference: ref[Any],
34+
work_queue: queue.SimpleQueue[Any],
35+
initializer: Callable[..., None],
36+
initargs: Tuple[Any, ...],
37+
) -> None: ...
38+
39+
else:
40+
def _worker(executor_reference: ref[Any], work_queue: queue.Queue[Any]) -> None: ...
41+
42+
if sys.version_info >= (3, 7):
43+
from ._base import BrokenExecutor
44+
class BrokenThreadPool(BrokenExecutor): ...
45+
1646
class ThreadPoolExecutor(Executor):
47+
_max_workers: int
48+
_idle_semaphore: Semaphore
49+
_threads: Set[Thread]
50+
_broken: bool
51+
_shutdown: bool
52+
_shutdown_lock: Lock
53+
_thread_name_prefix: str | None = ...
54+
_initializer: Callable[..., None] | None = ...
55+
_initargs: Tuple[Any, ...] = ...
1756
if sys.version_info >= (3, 7):
18-
_work_queue: queue.SimpleQueue[Any]
57+
_work_queue: queue.SimpleQueue[_WorkItem[Any]]
1958
else:
20-
_work_queue: queue.Queue[Any]
59+
_work_queue: queue.Queue[_WorkItem[Any]]
2160
if sys.version_info >= (3, 7):
2261
def __init__(
2362
self,
@@ -28,13 +67,6 @@ class ThreadPoolExecutor(Executor):
2867
) -> None: ...
2968
else:
3069
def __init__(self, max_workers: int | None = ..., thread_name_prefix: str = ...) -> None: ...
31-
32-
class _WorkItem(Generic[_S]):
33-
future: Future[_S]
34-
fn: Callable[..., _S]
35-
args: Iterable[Any]
36-
kwargs: Mapping[str, Any]
37-
def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...
38-
def run(self) -> None: ...
39-
if sys.version_info >= (3, 9):
40-
def __class_getitem__(cls, item: Any) -> GenericAlias: ...
70+
def _adjust_thread_count(self) -> None: ...
71+
if sys.version_info >= (3, 7):
72+
def _initializer_failed(self) -> None: ...

0 commit comments

Comments
 (0)