Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 51 additions & 74 deletions testplan/common/entity/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
configuration, start/stop/run/abort, create results and have some state.
"""

import multiprocessing.pool as mp
import os
import signal
import sys
Expand All @@ -11,6 +12,7 @@
import traceback
from collections import OrderedDict, deque
from contextlib import suppress
from multiprocessing import TimeoutError
from typing import (
Any,
Callable,
Expand All @@ -33,7 +35,7 @@
from testplan.common.utils.path import default_runpath, makedirs, makeemptydirs
from testplan.common.utils.strings import slugify, uuid4
from testplan.common.utils.thread import execute_as_thread, interruptible_join
from testplan.common.utils.timing import wait, Timer
from testplan.common.utils.timing import Timer, wait
from testplan.common.utils.validation import is_subclass


Expand Down Expand Up @@ -238,39 +240,32 @@ def start(self):
else:
resource.logger.info("%s started", resource)

def start_in_pool(self, pool):
def start_in_pool(
self, pool: mp.ThreadPool, timeout: Optional[float] = None
):
"""
Start all resources concurrently in thread pool.
Start all resources concurrently in thread pool and log exceptions.

:param pool: thread pool
:type pool: ``ThreadPool``
"""
# async_start is meaningless here...
for r in self._resources.values():
if r.auto_start:
r.cfg.set_local("async_start", False)

for resource in self._resources.values():
if not resource.async_start:
raise RuntimeError(
f"Cannot start resource {resource} in thread pool,"
" its `async_start` attribute is set to False"
)

# Trigger start all resources
resources_to_wait_for = []
for resource in self._resources.values():
if not resource.auto_start:
continue

pool.apply_async(
self._log_exception(
resource, resource.start, self.start_exceptions
)
async_r = pool.map_async(
self._apply_resource_exception_logged(
self.start_exceptions, lambda r: r.start()
),
(r for r in self._resources.values() if r.auto_start),
)
try:
async_r.get(timeout)
except TimeoutError:
raise RuntimeError(
f"timeout after {timeout}s when starting resources "
f"{self._resources.values()} in internal threading pool."
)
resources_to_wait_for.append(resource)

# Wait resources status to be STARTED.
for resource in resources_to_wait_for:
if resource not in self.start_exceptions:
resource.wait(resource.STATUS.STARTED)
resource.logger.info("%s started", resource)

def sync_stop_resource(self, resource: "Resource"):
"""
Expand Down Expand Up @@ -352,67 +347,49 @@ def stop(self, is_reversed: bool = False):
resource.force_stop()
resource.logger.info("%s stopped", resource)

def stop_in_pool(self, pool, is_reversed=False):
def stop_in_pool(
self, pool: mp.ThreadPool, timeout: Optional[float] = None
):
"""
Stop all resources in reverse order and log exceptions.
Stop all resources concurrently in thread pool and log exceptions.

:param pool: thread pool
:type pool: ``ThreadPool``
:param is_reversed: flag whether to stop resources in reverse order
:type is_reversed: ``bool``
"""
resources = list(self._resources.values())
if is_reversed is True:
resources = resources[::-1]
# async_start is meaningless here...
# in practice, stop_in_pool must be called in pair of start_in_pool
for r in self._resources.values():
if r.auto_start:
r.cfg.set_local("async_start", False)

# Stop all resources
resources_to_wait_for = []
for resource in resources:
if resource.status in (
resource.STATUS.STOPPING,
resource.STATUS.STOPPED,
):
continue
pool.apply_async(
self._log_exception(
resource, resource.stop, self.stop_exceptions
)
async_r = pool.map_async(
self._apply_resource_exception_logged(
self.stop_exceptions, lambda r: r.stop()
),
self._resources.values(),
)
try:
async_r.get(timeout)
except TimeoutError:
raise RuntimeError(
f"timeout after {timeout}s when stopping resources "
f"{self._resources.values()} in internal threading pool."
)
resources_to_wait_for.append(resource)

# Wait resources status to be STOPPED.
for resource in resources_to_wait_for:
if resource not in self.stop_exceptions:
if resource.async_start:
resource.wait(resource.STATUS.STOPPED)
else:
# avoid post_stop being called twice
wait(
lambda: resource.status == resource.STATUS.STOPPED,
timeout=resource.cfg.status_wait_timeout,
)
resource.logger.info("%s stopped", resource)
else:
# Resource status should be STOPPED even it failed to stop
resource.force_stop()

def _log_exception(self, resource, func, exception_record):
def _apply_resource_exception_logged(
self, exception_record, func: Callable[["Resource"], None]
) -> Callable[["Resource"], None]:
"""
Decorator for logging an exception at resource and environment level.
Applying ``func`` to resource and log possible exception at environment level.

:param resource: resource to log the exception with
:type resource: :py:class:`~testplan.common.entity.base.Resource`
:param func: function to catch exception for
:type func: ``Callable``
:param exception_record: A dictionary that maps resource name to
exception message during start or stop: `self.start_exception`
for `start()` and `self.stop_exceptions` for `stop()`.
:type exception_record: ``dict``
:param func: function to catch exception for
"""

def wrapper(*args, **kargs):
def wrapper(resource: "Resource"):
try:
func(*args, **kargs)
func(resource)
except Exception:
msg = "While executing {} of resource [{}]\n{}".format(
func.__name__, resource.cfg.name, traceback.format_exc()
Expand Down
4 changes: 2 additions & 2 deletions testplan/common/remote/remote_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class RemoteResource(Entity):
:param paramiko_config: Paramiko SSH client extra configuration.
:param remote_runtime_builder: RuntimeBuilder instance to prepare remote python env.
Default is ``SourceTransferBuilder()``.
:param status_wait_timeout: remote resource start/stop timeout, default is 60.
:param status_wait_timeout: remote resource start/stop timeout, default is 600.
"""

CONFIG = RemoteResourceConfig
Expand All @@ -170,7 +170,7 @@ def __init__(
setup_script: List[str] = None,
paramiko_config: Optional[dict] = None,
remote_runtime_builder: Optional[RuntimeBuilder] = None,
status_wait_timeout: int = 60,
status_wait_timeout: int = 600,
**options,
) -> None:
if not worker_is_remote(remote_host):
Expand Down
12 changes: 9 additions & 3 deletions testplan/runners/pools/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,20 +364,26 @@ def _start_workers(self) -> None:
for worker in self._workers:
self._conn.register(worker)
if self.pool:
self._workers.start_in_pool(self.pool)
self._workers.start_in_pool(
self.pool,
self.cfg.status_wait_timeout,
)
else:
self._workers.start()

def _stop_workers(self) -> None:
if self.pool:
self._workers.stop_in_pool(self.pool)
self._workers.stop_in_pool(
self.pool,
self.cfg.status_wait_timeout,
)
else:
self._workers.stop()

def _start_thread_pool(self) -> None:
size = len(self._instances)
try:
if size > 2:
if size >= 2:
self.pool = ThreadPool(min(size, cpu_count()))
except Exception as exc:
if isinstance(exc, AttributeError):
Expand Down
Loading