Skip to content

Commit 247c7bb

Browse files
committed
update impl; add tests
1 parent bfa1df8 commit 247c7bb

File tree

4 files changed

+278
-46
lines changed

4 files changed

+278
-46
lines changed

testplan/common/entity/base.py

Lines changed: 33 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import traceback
1313
from collections import OrderedDict, deque
1414
from contextlib import suppress
15+
from multiprocessing import TimeoutError
1516
from typing import (
1617
Any,
1718
Callable,
@@ -34,7 +35,7 @@
3435
from testplan.common.utils.path import default_runpath, makedirs, makeemptydirs
3536
from testplan.common.utils.strings import slugify, uuid4
3637
from testplan.common.utils.thread import execute_as_thread, interruptible_join
37-
from testplan.common.utils.timing import wait, Timer
38+
from testplan.common.utils.timing import Timer, wait
3839
from testplan.common.utils.validation import is_subclass
3940

4041

@@ -239,32 +240,32 @@ def start(self):
239240
else:
240241
resource.logger.info("%s started", resource)
241242

242-
def start_in_pool(self, pool: mp.ThreadPool):
243+
def start_in_pool(
244+
self, pool: mp.ThreadPool, timeout: Optional[float] = None
245+
):
243246
"""
244247
Start all resources concurrently in thread pool and log exceptions.
245248
246249
:param pool: thread pool
247250
"""
248-
249-
for resource in self._resources.values():
250-
if not resource.async_start:
251-
raise RuntimeError(
252-
f"Cannot start resource {resource} in thread pool,"
253-
" its `async_start` attribute is set to False"
254-
)
255-
256-
def start_in_thread(resource):
257-
resource.start()
258-
resource.wait(resource.STATUS.STARTED)
259-
resource.logger.info("%s started", resource)
251+
# async_start is meaningless here...
252+
for r in self._resources.values():
253+
if r.auto_start:
254+
r.cfg.set_local("async_start", False)
260255

261256
async_r = pool.map_async(
262257
self._apply_resource_exception_logged(
263-
self.start_exceptions, start_in_thread
258+
self.start_exceptions, lambda r: r.start()
264259
),
265260
(r for r in self._resources.values() if r.auto_start),
266261
)
267-
async_r.wait()
262+
try:
263+
async_r.get(timeout)
264+
except TimeoutError:
265+
raise RuntimeError(
266+
f"timeout after {timeout}s when starting resources "
267+
f"{self._resources.values()} in internal threading pool."
268+
)
268269

269270
def sync_stop_resource(self, resource: "Resource"):
270271
"""
@@ -346,42 +347,33 @@ def stop(self, is_reversed: bool = False):
346347
resource.force_stop()
347348
resource.logger.info("%s stopped", resource)
348349

349-
def stop_in_pool(self, pool: mp.ThreadPool):
350+
def stop_in_pool(
351+
self, pool: mp.ThreadPool, timeout: Optional[float] = None
352+
):
350353
"""
351354
Stop all resources concurrently in thread pool and log exceptions.
352355
353356
:param pool: thread pool
354357
"""
355-
for resource in self._resources.values():
356-
if not resource.async_start:
357-
raise RuntimeError(
358-
f"Cannot stop resource {resource} in thread pool,"
359-
" its `async_start` attribute is set to False"
360-
)
361-
362-
def stop_in_thread(resource: "Resource"):
363-
try:
364-
if resource.status not in (
365-
resource.STATUS.STOPPING,
366-
resource.STATUS.STOPPED,
367-
):
368-
resource.stop()
369-
if resource.status == resource.STATUS.STOPPING:
370-
resource.wait(resource.STATUS.STOPPED)
371-
except:
372-
# Resource status should be STOPPED even it failed to stop
373-
resource.force_stop()
374-
raise
375-
finally:
376-
resource.logger.info("%s stopped", resource)
358+
# async_start is meaningless here...
359+
# in practice, stop_in_pool must be called in pair of start_in_pool
360+
for r in self._resources.values():
361+
if r.auto_start:
362+
r.cfg.set_local("async_start", False)
377363

378364
async_r = pool.map_async(
379365
self._apply_resource_exception_logged(
380-
self.stop_exceptions, stop_in_thread
366+
self.stop_exceptions, lambda r: r.stop()
381367
),
382368
self._resources.values(),
383369
)
384-
async_r.wait()
370+
try:
371+
async_r.get(timeout)
372+
except TimeoutError:
373+
raise RuntimeError(
374+
f"timeout after {timeout}s when stopping resources "
375+
f"{self._resources.values()} in internal threading pool."
376+
)
385377

386378
def _apply_resource_exception_logged(
387379
self, exception_record, func: Callable[["Resource"], None]

testplan/common/remote/remote_resource.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class RemoteResource(Entity):
143143
:param paramiko_config: Paramiko SSH client extra configuration.
144144
:param remote_runtime_builder: RuntimeBuilder instance to prepare remote python env.
145145
Default is ``SourceTransferBuilder()``.
146-
:param status_wait_timeout: remote resource start/stop timeout, default is 60.
146+
:param status_wait_timeout: remote resource start/stop timeout, default is 600.
147147
"""
148148

149149
CONFIG = RemoteResourceConfig
@@ -170,7 +170,7 @@ def __init__(
170170
setup_script: List[str] = None,
171171
paramiko_config: Optional[dict] = None,
172172
remote_runtime_builder: Optional[RuntimeBuilder] = None,
173-
status_wait_timeout: int = 60,
173+
status_wait_timeout: int = 600,
174174
**options,
175175
) -> None:
176176
if not worker_is_remote(remote_host):

testplan/runners/pools/remote.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -364,20 +364,26 @@ def _start_workers(self) -> None:
364364
for worker in self._workers:
365365
self._conn.register(worker)
366366
if self.pool:
367-
self._workers.start_in_pool(self.pool)
367+
self._workers.start_in_pool(
368+
self.pool,
369+
self.cfg.status_wait_timeout,
370+
)
368371
else:
369372
self._workers.start()
370373

371374
def _stop_workers(self) -> None:
372375
if self.pool:
373-
self._workers.stop_in_pool(self.pool)
376+
self._workers.stop_in_pool(
377+
self.pool,
378+
self.cfg.status_wait_timeout,
379+
)
374380
else:
375381
self._workers.stop()
376382

377383
def _start_thread_pool(self) -> None:
378384
size = len(self._instances)
379385
try:
380-
if size > 2:
386+
if size >= 2:
381387
self.pool = ThreadPool(min(size, cpu_count()))
382388
except Exception as exc:
383389
if isinstance(exc, AttributeError):

0 commit comments

Comments
 (0)