diff --git a/paddlex/inference/common/reader/det_3d_reader.py b/paddlex/inference/common/reader/det_3d_reader.py
index eff41cae77..b3717ddefb 100644
--- a/paddlex/inference/common/reader/det_3d_reader.py
+++ b/paddlex/inference/common/reader/det_3d_reader.py
@@ -17,6 +17,7 @@
 
 import numpy as np
 
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 
 
@@ -238,4 +239,4 @@ def add_new_fields(self, sample):
         return sample
 
     def __call__(self, batch_data):
-        return [self.prepare_test_data(data_info) for data_info in batch_data]
+        return maybe_parallelize(self.prepare_test_data, batch_data)
diff --git a/paddlex/inference/common/reader/image_reader.py b/paddlex/inference/common/reader/image_reader.py
index 1f402fd67d..e87765624d 100644
--- a/paddlex/inference/common/reader/image_reader.py
+++ b/paddlex/inference/common/reader/image_reader.py
@@ -15,6 +15,7 @@
 import numpy as np
 
 from ....utils.deps import class_requires_deps, is_dep_available
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 from ...utils.io import ImageReader
 
@@ -46,7 +47,7 @@ def __init__(self, format="BGR"):
 
     def __call__(self, imgs):
         """apply"""
-        return [self.read(img) for img in imgs]
+        return maybe_parallelize(self.read, imgs)
 
     def read(self, img):
         if isinstance(img, np.ndarray):
diff --git a/paddlex/inference/common/reader/ts_reader.py b/paddlex/inference/common/reader/ts_reader.py
index 66644ef491..2f4c10b658 100644
--- a/paddlex/inference/common/reader/ts_reader.py
+++ b/paddlex/inference/common/reader/ts_reader.py
@@ -14,6 +14,7 @@
 
 import pandas as pd
 
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 from ...utils.io import CSVReader
 
@@ -27,7 +28,7 @@ def __init__(self):
 
     def __call__(self, ts_list):
         """apply"""
-        return [self.read(ts) for ts in ts_list]
+        return maybe_parallelize(self.read, ts_list)
 
     def read(self, ts):
         if isinstance(ts, pd.DataFrame):
diff --git a/paddlex/inference/common/reader/video_reader.py b/paddlex/inference/common/reader/video_reader.py
index 646cfcf03b..a865768746 100644
--- a/paddlex/inference/common/reader/video_reader.py
+++ b/paddlex/inference/common/reader/video_reader.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 from ...utils.io import VideoReader
 
@@ -30,7 +30,7 @@ def __init__(self, backend="opencv", num_seg=8, seg_len=1, sample_type=None):
 
     def __call__(self, videos):
         """apply"""
-        return [self._read(video) for video in videos]
+        return maybe_parallelize(self._read_video, videos)
 
     def _read(self, file_path):
         return self._read_video(file_path)
diff --git a/paddlex/inference/models/anomaly_detection/processors.py b/paddlex/inference/models/anomaly_detection/processors.py
index 1eb3a1a668..7e2cac2d98 100644
--- a/paddlex/inference/models/anomaly_detection/processors.py
+++ b/paddlex/inference/models/anomaly_detection/processors.py
@@ -15,6 +15,7 @@
 import numpy as np
 
 from ....utils.deps import class_requires_deps, is_dep_available
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 
 if is_dep_available("scikit-image"):
@@ -34,7 +35,7 @@ def __init__(self):
 
     def __call__(self, preds, *args):
         """apply"""
-        return [self.apply(pred) for pred in preds]
+        return maybe_parallelize(self.apply, preds)
 
     def apply(
         self,
diff --git a/paddlex/inference/models/common/ts/processors.py b/paddlex/inference/models/common/ts/processors.py
index 4d5f790174..3f32a47fae 100644
--- a/paddlex/inference/models/common/ts/processors.py
+++ b/paddlex/inference/models/common/ts/processors.py
@@ -18,6 +18,7 @@
 import pandas as pd
 
 from .....utils.deps import class_requires_deps, is_dep_available
+from .....utils.parallel import maybe_parallelize
 from ....utils.benchmark import benchmark
 from .funcs import load_from_dataframe, time_feature
 
@@ -65,7 +66,7 @@ def __call__(self, ts_list: List) -> List:
         Returns:
             List: List of truncated time series data frames.
         """
-        return [self.cutoff(ts) for ts in ts_list]
+        return maybe_parallelize(self.cutoff, ts_list)
 
     def cutoff(self, ts: Any) -> Any:
         """Truncates a single time series data frame to the specified length.
@@ -125,7 +126,7 @@ def __call__(self, ts_list: List[pd.DataFrame]) -> List[pd.DataFrame]:
         Returns:
             List[pd.DataFrame]: List of normalized time series data frames.
         """
-        return [self.tsnorm(ts) for ts in ts_list]
+        return maybe_parallelize(self.tsnorm, ts_list)
 
     def tsnorm(self, ts: pd.DataFrame) -> pd.DataFrame:
         """Normalizes specified columns of a single time series data frame.
@@ -173,7 +174,7 @@ def __call__(self, ts_list: List) -> List:
         Returns:
             List: List of constructed time series datasets.
         """
-        return [self.buildtsdata(ts) for ts in ts_list]
+        return maybe_parallelize(self.buildtsdata, ts_list)
 
     def buildtsdata(self, ts) -> Any:
         """Builds a time series dataset from a single time series data frame.
@@ -216,7 +217,7 @@ def __call__(self, ts_list: List) -> List:
         Returns:
             List: List of time series with extracted time features.
         """
-        return [self.timefeat(ts) for ts in ts_list]
+        return maybe_parallelize(self.timefeat, ts_list)
 
     def timefeat(self, ts: Dict[str, Any]) -> Any:
         """Extracts time features from a single time series data frame.
@@ -275,7 +276,7 @@ def __call__(self, ts_list: List[Dict[str, Any]]) -> List[List[np.ndarray]]:
         Returns:
             List[List[np.ndarray]]: List of lists of arrays for each time series.
         """
-        return [self.tstoarray(ts) for ts in ts_list]
+        return maybe_parallelize(self.tstoarray, ts_list)
 
     def tstoarray(self, ts: Dict[str, Any]) -> List[np.ndarray]:
         """Converts a single time series data frame into arrays.
diff --git a/paddlex/inference/models/common/vision/processors.py b/paddlex/inference/models/common/vision/processors.py
index eb97ed9888..055baf1874 100644
--- a/paddlex/inference/models/common/vision/processors.py
+++ b/paddlex/inference/models/common/vision/processors.py
@@ -18,6 +18,7 @@
 from PIL import Image
 
 from .....utils.deps import class_requires_deps, is_dep_available
+from .....utils.parallel import maybe_parallelize
 from ....utils.benchmark import benchmark
 from . import funcs as F
 
@@ -116,7 +117,7 @@ def __init__(
 
     def __call__(self, imgs):
         """apply"""
-        return [self.resize(img) for img in imgs]
+        return maybe_parallelize(self.resize, imgs)
 
     def resize(self, img):
         target_size = self.target_size
@@ -160,7 +161,7 @@ def __init__(
 
     def __call__(self, imgs):
         """apply"""
-        return [self.resize(img) for img in imgs]
+        return maybe_parallelize(self.resize, imgs)
 
     def resize(self, img):
         h, w = img.shape[:2]
@@ -202,7 +203,7 @@ def __init__(
 
     def __call__(self, imgs):
         """apply"""
-        return [self.resize(img) for img in imgs]
+        return maybe_parallelize(self.resize, imgs)
 
     def resize(self, img):
         h, w = img.shape[:2]
@@ -276,7 +277,7 @@ class ToCHWImage:
 
     def __call__(self, imgs):
         """apply"""
-        return [img.transpose((2, 0, 1)) for img in imgs]
+        return maybe_parallelize(lambda img: img.transpose((2, 0, 1)), imgs)
 
 
 @benchmark.timeit
diff --git a/paddlex/inference/models/formula_recognition/predictor.py b/paddlex/inference/models/formula_recognition/predictor.py
index 9ed29bc44c..a39c84aa97 100644
--- a/paddlex/inference/models/formula_recognition/predictor.py
+++ b/paddlex/inference/models/formula_recognition/predictor.py
@@ -17,6 +17,7 @@
 from ....modules.formula_recognition.model_list import MODELS
 from ....utils import logging
 from ....utils.func_register import FuncRegister
+from ....utils.parallel import maybe_parallelize
 from ...common.batch_sampler import ImageBatchSampler
 from ...common.reader import ReadImage
 from ..base import BasePredictor
@@ -115,13 +116,15 @@ def process(self, batch_data):
                 batch_pred_ = self.infer([batch_img])[0].reshape([-1])
                 max_length = max(max_length, batch_pred_.shape[0])
                 batch_preds.append(batch_pred_)
-            for i in range(len(batch_preds)):
-                batch_preds[i] = np.pad(
-                    batch_preds[i],
-                    (0, max_length - batch_preds[i].shape[0]),
+            batch_preds[:] = maybe_parallelize(
+                lambda pred: np.pad(
+                    pred,
+                    (0, max_length - pred.shape[0]),
                     mode="constant",
                     constant_values=0,
-                )
+                ),
+                batch_preds,
+            )
         else:
             x = self.pre_tfs["ToBatch"](imgs=batch_imgs)
             batch_preds = self.infer(x=x)
diff --git a/paddlex/inference/models/formula_recognition/processors.py b/paddlex/inference/models/formula_recognition/processors.py
index cb48defef7..d0c775ac74 100644
--- a/paddlex/inference/models/formula_recognition/processors.py
+++ b/paddlex/inference/models/formula_recognition/processors.py
@@ -22,6 +22,7 @@
 from PIL import Image, ImageOps
 
 from ....utils.deps import class_requires_deps, is_dep_available
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 
 if is_dep_available("opencv-contrib-python"):
@@ -154,7 +155,7 @@ def __call__(self, imgs: List[np.ndarray]) -> List[np.ndarray]:
         Returns:
             list of np.ndarray: The list of resized images as numpy arrays with three channels.
         """
-        return [self.resize(img) for img in imgs]
+        return maybe_parallelize(self.resize, imgs)
 
 
 @benchmark.timeit
@@ -196,7 +197,7 @@ def __call__(self, imgs: List[np.ndarray]) -> List[np.ndarray]:
         Returns:
             list of np.array: The list of transformed images.
         """
-        return [self.transform(img) for img in imgs]
+        return maybe_parallelize(self.transform, imgs)
 
 
 @benchmark.timeit
@@ -235,7 +236,7 @@ def __call__(self, imgs: List[np.ndarray]) -> List[np.ndarray]:
         Returns:
             list of numpy.ndarray: A list of formatted images as numpy arrays.
         """
-        return [self.format(img) for img in imgs]
+        return maybe_parallelize(self.format, imgs)
 
 
 @benchmark.timeit
@@ -284,7 +285,7 @@ def normalize(self, img: Union[np.ndarray, Image.Image]) -> np.ndarray:
 
     def __call__(self, imgs: List[Union[np.ndarray, Image.Image]]) -> List[np.ndarray]:
         """Apply normalization to a list of images."""
-        return [self.normalize(img) for img in imgs]
+        return maybe_parallelize(self.normalize, imgs)
 
 
 @benchmark.timeit
@@ -556,7 +557,7 @@ def __call__(self, imgs: List[np.ndarray]) -> List[Optional[np.ndarray]]:
 
         Returns:
             list of numpy.ndarray: The list of decoded image arrays."""
-        return [self.img_decode(img) for img in imgs]
+        return maybe_parallelize(self.img_decode, imgs)
 
 
 @benchmark.timeit
@@ -973,7 +974,7 @@ def __call__(self, imgs: List[np.ndarray]) -> List[np.ndarray]:
         Returns:
             list of numpy.ndarray: The list of transformed images.
         """
-        return [self.transform(img) for img in imgs]
+        return maybe_parallelize(self.transform, imgs)
 
 
 @benchmark.timeit
@@ -1012,4 +1013,4 @@ def __call__(self, imgs: List[np.ndarray]) -> List[np.ndarray]:
         Returns:
             list of numpy.ndarray: The list of formatted images.
         """
-        return [self.format(img) for img in imgs]
+        return maybe_parallelize(self.format, imgs)
diff --git a/paddlex/inference/models/image_classification/processors.py b/paddlex/inference/models/image_classification/processors.py
index f8d9b6ab6d..48412fb3d3 100644
--- a/paddlex/inference/models/image_classification/processors.py
+++ b/paddlex/inference/models/image_classification/processors.py
@@ -14,6 +14,7 @@
 
 import numpy as np
 
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 from ..common.vision import F
 
@@ -44,7 +45,7 @@ def __init__(self, crop_size, mode="C"):
 
     def __call__(self, imgs):
         """apply"""
-        return [self.crop(img) for img in imgs]
+        return maybe_parallelize(self.crop, imgs)
 
     def crop(self, img):
         h, w = img.shape[:2]
diff --git a/paddlex/inference/models/image_unwarping/processors.py b/paddlex/inference/models/image_unwarping/processors.py
index dba72c5bea..230d128e67 100644
--- a/paddlex/inference/models/image_unwarping/processors.py
+++ b/paddlex/inference/models/image_unwarping/processors.py
@@ -16,6 +16,7 @@
 
 import numpy as np
 
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 
 
@@ -61,7 +62,7 @@ def __call__(
         Returns:
             List[np.ndarray]: A list of processed images.
         """
-        return [self.doctr(img) for img in imgs]
+        return maybe_parallelize(self.doctr, imgs)
 
     def doctr(self, pred: Union[np.ndarray, Tuple[np.ndarray, ...]]) -> np.ndarray:
         """
diff --git a/paddlex/inference/models/instance_segmentation/processors.py b/paddlex/inference/models/instance_segmentation/processors.py
index 2f127c6564..f202629053 100644
--- a/paddlex/inference/models/instance_segmentation/processors.py
+++ b/paddlex/inference/models/instance_segmentation/processors.py
@@ -14,6 +14,7 @@
 
 from typing import List, Optional
 
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 from ..object_detection.processors import restructured_boxes
 
@@ -91,12 +92,13 @@ def __call__(
         Returns:
             List[Boxes]: The list of post-processed detection boxes.
         """
-        outputs = []
-        for data, output in zip(datas, batch_outputs):
-            boxes_masks = self.apply(
+        outputs = maybe_parallelize(
+            lambda data, output: self.apply(
                 img_size=data["ori_img_size"],
                 **output,
                 threshold=threshold if threshold is not None else self.threshold
-            )
-            outputs.append(boxes_masks)
+            ),
+            datas,
+            batch_outputs,
+        )
         return outputs
diff --git a/paddlex/inference/models/keypoint_detection/processors.py b/paddlex/inference/models/keypoint_detection/processors.py
index 86b062a589..ef8baff600 100644
--- a/paddlex/inference/models/keypoint_detection/processors.py
+++ b/paddlex/inference/models/keypoint_detection/processors.py
@@ -19,6 +19,7 @@
 from numpy import ndarray
 
 from ....utils.deps import class_requires_deps, is_dep_available
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 from ..object_detection.processors import get_affine_transform
 
@@ -143,7 +144,7 @@ def apply(
         return img, center, scale
 
     def __call__(self, datas: List[dict]) -> List[dict]:
-        for data in datas:
+        def _process_data(data):
             ori_img = data["img"]
             if "ori_img" not in data:
                 data["ori_img"] = ori_img
@@ -160,6 +161,10 @@
             img_size = [img.shape[1], img.shape[0]]
             data["img_size"] = img_size  # [size_w, size_h]
 
+            return data
+
+        datas = maybe_parallelize(_process_data, datas)
+
         return datas
 
 
@@ -234,10 +239,13 @@ def __call__(self, batch_outputs: List[dict], datas: List[dict]) -> List[Kpts]:
         Returns:
             List[dict]: The list of post-processed keypoints.
         """
-        return [
-            self.apply(output["heatmap"], data["center"], data["scale"])
-            for data, output in zip(datas, batch_outputs)
-        ]
+        return maybe_parallelize(
+            lambda data, output: self.apply(
+                output["heatmap"], data["center"], data["scale"]
+            ),
+            datas,
+            batch_outputs,
+        )
 
     def get_final_preds(
         self, heatmaps: ndarray, center: ndarray, scale: ndarray, kernelsize: int = 3
diff --git a/paddlex/inference/models/object_detection/processors.py b/paddlex/inference/models/object_detection/processors.py
index 5b473e0b3b..3d109562eb 100644
--- a/paddlex/inference/models/object_detection/processors.py
+++ b/paddlex/inference/models/object_detection/processors.py
@@ -18,6 +18,7 @@
 from numpy import ndarray
 
 from ....utils.deps import class_requires_deps, function_requires_deps, is_dep_available
+from ....utils.parallel import maybe_parallelize
 from ...common.reader import ReadImage as CommonReadImage
 from ...utils.benchmark import benchmark
 from ..common import Normalize as CommonNormalize
@@ -44,8 +45,8 @@ def __call__(self, raw_imgs: List[Union[ndarray, str, dict]]) -> List[dict]:
         Returns:
             List[dict]: A list of dictionaries, each containing image information.
         """
-        out_datas = []
-        for raw_img in raw_imgs:
+
+        def _apply(raw_img):
             data = dict()
             if isinstance(raw_img, str):
                 data["img_path"] = raw_img
@@ -67,7 +68,9 @@
             data["img_size"] = [img.shape[1], img.shape[0]]  # [size_w, size_h]
             data["ori_img_size"] = [img.shape[1], img.shape[0]]  # [size_w, size_h]
 
-            out_datas.append(data)
+            return data
+
+        out_datas = maybe_parallelize(_apply, raw_imgs)
 
         return out_datas
 
@@ -109,7 +112,8 @@ def __call__(self, datas: List[dict]) -> List[dict]:
             List[dict]: A list of dictionaries with updated image data, including resized images,
                 original image sizes, resized image sizes, and scale factors.
         """
-        for data in datas:
+
+        def _apply(data):
             ori_img = data["img"]
             if "ori_img_size" not in data:
                 data["ori_img_size"] = [ori_img.shape[1], ori_img.shape[0]]
@@ -126,6 +130,10 @@
                 img_size[1] / ori_img_size[1],
             ]
 
+            return data
+
+        datas = maybe_parallelize(_apply, datas)
+
         return datas
 
 
@@ -135,8 +143,11 @@ def __call__(self, datas: List[dict]) -> List[dict]:
         """Normalizes images in a list of dictionaries. Iterates over each dictionary,
         applies normalization to the 'img' key, and returns the modified list.
         """
-        for data in datas:
-            data["img"] = self.norm(data["img"])
+
+        imgs = maybe_parallelize(self.norm, [d["img"] for d in datas])
+        for data, img in zip(datas, imgs):
+            data["img"] = img
+
         return datas
diff --git a/paddlex/inference/models/semantic_segmentation/processors.py b/paddlex/inference/models/semantic_segmentation/processors.py
index 33ba67aedb..d67605cd6d 100644
--- a/paddlex/inference/models/semantic_segmentation/processors.py
+++ b/paddlex/inference/models/semantic_segmentation/processors.py
@@ -13,10 +13,12 @@
 # limitations under the License.
 
 import math
+from functools import partial
 
 import numpy as np
 
 from ....utils.deps import class_requires_deps, is_dep_available
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 from ..common.vision import funcs as F
 from ..common.vision.processors import _BaseResize
@@ -59,7 +61,7 @@ def __call__(self, imgs, target_size=None):
         if isinstance(target_size, int):
             target_size = (target_size, target_size)
         F.check_image_size(target_size)
-        return [self.resize(img, target_size) for img in imgs]
+        return maybe_parallelize(partial(self.resize, target_size=target_size), imgs)
 
     def resize(self, img, target_size):
@@ -95,9 +97,7 @@ def __call__(self, imgs, src_images):
 
         assert len(imgs) == len(src_images)
         src_sizes = [src_image.shape[:2][::-1] for src_image in src_images]
-        return [
-            self.reverse_resize(img, src_size) for img, src_size in zip(imgs, src_sizes)
-        ]
+        return maybe_parallelize(self.reverse_resize, imgs, src_sizes)
 
     def reverse_resize(self, img, src_size):
         """Restore the prediction map to source image size using nearest interpolation.
diff --git a/paddlex/inference/models/table_structure_recognition/processors.py b/paddlex/inference/models/table_structure_recognition/processors.py
index 508222befe..61b8289356 100644
--- a/paddlex/inference/models/table_structure_recognition/processors.py
+++ b/paddlex/inference/models/table_structure_recognition/processors.py
@@ -15,6 +15,7 @@
 
 import numpy as np
 
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 from ..common.vision import funcs as F
 
@@ -58,7 +59,7 @@ def apply(self, img):
 
     def __call__(self, imgs):
         """apply"""
-        return [self.apply(img) for img in imgs]
+        return maybe_parallelize(self.apply, imgs)
 
 
 @benchmark.timeit
diff --git a/paddlex/inference/models/text_detection/processors.py b/paddlex/inference/models/text_detection/processors.py
index d26ee4b839..b005c8dd87 100644
--- a/paddlex/inference/models/text_detection/processors.py
+++ b/paddlex/inference/models/text_detection/processors.py
@@ -13,12 +13,14 @@
 # limitations under the License.
 
 import math
+from functools import partial
 from typing import Union
 
 import numpy as np
 
 from ....utils import logging
 from ....utils.deps import class_requires_deps, is_dep_available
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 
 if is_dep_available("opencv-contrib-python"):
@@ -67,10 +69,15 @@
             max_side_limit if max_side_limit is not None else self.max_side_limit
         )
         resize_imgs, img_shapes = [], []
-        for ori_img in imgs:
-            img, shape = self.resize(
-                ori_img, limit_side_len, limit_type, max_side_limit
-            )
+        for img, shape in maybe_parallelize(
+            partial(
+                self.resize,
+                limit_side_len=limit_side_len,
+                limit_type=limit_type,
+                max_side_limit=max_side_limit,
+            ),
+            imgs,
+        ):
             resize_imgs.append(img)
             img_shapes.append(shape)
         return resize_imgs, img_shapes
@@ -268,7 +275,7 @@ def _norm(img):
             res = np.transpose(res, (1, 2, 0))
             return res
 
-        return [_norm(img) for img in imgs]
+        return maybe_parallelize(_norm, imgs)
 
 
 @benchmark.timeit
@@ -494,14 +501,17 @@ def __call__(
     ):
         """apply"""
         boxes, scores = [], []
-        for pred, img_shape in zip(preds[0], img_shapes):
-            box, score = self.process(
+        for box, score in maybe_parallelize(
+            lambda pred, img_shape: self.process(
                 pred,
                 img_shape,
                 thresh or self.thresh,
                 box_thresh or self.box_thresh,
                 unclip_ratio or self.unclip_ratio,
-            )
+            ),
+            preds[0],
+            img_shapes,
+        ):
             boxes.append(box)
             scores.append(score)
         return boxes, scores
diff --git a/paddlex/inference/models/text_recognition/processors.py b/paddlex/inference/models/text_recognition/processors.py
index 3971d85e24..95bbe4bc99 100644
--- a/paddlex/inference/models/text_recognition/processors.py
+++ b/paddlex/inference/models/text_recognition/processors.py
@@ -55,7 +55,8 @@ def resize_norm_img(self, img, max_wh_ratio):
         resized_w = int(math.ceil(imgH * ratio))
         resized_image = cv2.resize(img, (resized_w, imgH))
         resized_image = resized_image.astype("float32")
-        resized_image = resized_image.transpose((2, 0, 1)) / 255
+        resized_image = resized_image.transpose((2, 0, 1))
+        resized_image /= 255
         resized_image -= 0.5
         resized_image /= 0.5
         padding_im = np.zeros((imgC, imgH, imgW), dtype=np.float32)
diff --git a/paddlex/inference/models/ts_anomaly_detection/processors.py b/paddlex/inference/models/ts_anomaly_detection/processors.py
index d5cb475779..d3adb6938a 100644
--- a/paddlex/inference/models/ts_anomaly_detection/processors.py
+++ b/paddlex/inference/models/ts_anomaly_detection/processors.py
@@ -17,6 +17,7 @@
 import numpy as np
 import pandas as pd
 
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 
 
@@ -49,10 +50,7 @@ def __call__(
         Returns:
             List[pd.DataFrame]: A list of DataFrames, each containing anomaly labels for the time series.
         """
-        return [
-            self.getanomaly(ori_ts, pred)
-            for ori_ts, pred in zip(ori_ts_list, pred_list)
-        ]
+        return maybe_parallelize(self.getanomaly, ori_ts_list, pred_list)
 
     def getanomaly(self, ori_ts: Dict[str, Any], pred: np.ndarray) -> pd.DataFrame:
         """
diff --git a/paddlex/inference/models/ts_classification/processors.py b/paddlex/inference/models/ts_classification/processors.py
index 1c10cd1eb4..46e9ddda09 100644
--- a/paddlex/inference/models/ts_classification/processors.py
+++ b/paddlex/inference/models/ts_classification/processors.py
@@ -17,6 +17,7 @@
 import numpy as np
 import pandas as pd
 
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 
 
@@ -38,7 +39,7 @@ def __call__(self, pred_list: List[Any]) -> List[pd.DataFrame]:
         Returns:
             List[pd.DataFrame]: A list of DataFrames, each containing the class ID and score for the corresponding prediction.
         """
-        return [self.getcls(pred) for pred in pred_list]
+        return maybe_parallelize(self.getcls, pred_list)
 
     def getcls(self, pred: Any) -> pd.DataFrame:
         """
@@ -86,7 +87,7 @@ def __call__(self, ts_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
         Returns:
             List[Dict[str, Any]]: A list of dictionaries with updated 'features' and 'pad_mask' keys.
         """
-        return [self.padmask(ts) for ts in ts_list]
+        return maybe_parallelize(self.padmask, ts_list)
 
     def padmask(self, ts: Dict[str, Any]) -> Dict[str, Any]:
         """
diff --git a/paddlex/inference/models/ts_forecasting/processors.py b/paddlex/inference/models/ts_forecasting/processors.py
index b07ef0c330..440ecceb7b 100644
--- a/paddlex/inference/models/ts_forecasting/processors.py
+++ b/paddlex/inference/models/ts_forecasting/processors.py
@@ -18,6 +18,7 @@
 import pandas as pd
 
 from ....utils.deps import class_requires_deps, is_dep_available
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 
 if is_dep_available("joblib"):
@@ -51,7 +52,7 @@ def __call__(self, preds_list: List[pd.DataFrame]) -> List[pd.DataFrame]:
         Returns:
             List[pd.DataFrame]: A list of DataFrames with de-normalized prediction data.
         """
-        return [self.tsdenorm(pred) for pred in preds_list]
+        return maybe_parallelize(self.tsdenorm, preds_list)
 
     def tsdenorm(self, pred: pd.DataFrame) -> pd.DataFrame:
         """
@@ -95,9 +96,7 @@ def __call__(
         Returns:
             List[pd.DataFrame]: A list of DataFrames, each representing the forecasted time series.
         """
-        return [
-            self.arraytots(ori_ts, pred) for ori_ts, pred in zip(ori_ts_list, pred_list)
-        ]
+        return maybe_parallelize(self.arraytots, ori_ts_list, pred_list)
 
     def arraytots(self, ori_ts: Dict[str, Any], pred: np.ndarray) -> pd.DataFrame:
         """
diff --git a/paddlex/inference/models/video_classification/processors.py b/paddlex/inference/models/video_classification/processors.py
index 19e94b7993..f5a322f58c 100644
--- a/paddlex/inference/models/video_classification/processors.py
+++ b/paddlex/inference/models/video_classification/processors.py
@@ -18,6 +18,7 @@
 import numpy as np
 
 from ....utils.deps import class_requires_deps, is_dep_available
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 
 if is_dep_available("opencv-contrib-python"):
@@ -132,7 +133,7 @@ def __call__(self, videos: List[np.ndarray]) -> List[np.ndarray]:
         Returns:
             List[np.ndarray]: A list of videos after scaling, where each video is a list of images.
         """
-        return [self.scale(video) for video in videos]
+        return maybe_parallelize(self.scale, videos)
 
 
 @benchmark.timeit
@@ -186,7 +187,7 @@ def __call__(self, videos: List[np.ndarray]) -> List[np.ndarray]:
         Returns:
             List[np.ndarray]: A list of videos after center cropping.
         """
-        return [self.center_crop(video) for video in videos]
+        return maybe_parallelize(self.center_crop, videos)
 
 
 @benchmark.timeit
@@ -240,7 +241,7 @@ def __call__(self, videos: List[np.ndarray]) -> List[np.ndarray]:
         Returns:
             List[np.ndarray]: A list of numpy arrays, one for each video.
         """
-        return [self.img2array(video) for video in videos]
+        return maybe_parallelize(self.img2array, videos)
 
 
 @benchmark.timeit
@@ -319,7 +320,7 @@ def __call__(self, videos: List[np.ndarray]) -> List[np.ndarray]:
         Returns:
             List[np.ndarray]: A list of normalized videos as numpy arrays.
         """
-        return [self.normalize_video(video) for video in videos]
+        return maybe_parallelize(self.normalize_video, videos)
 
 
 @benchmark.timeit
diff --git a/paddlex/inference/models/video_detection/processors.py b/paddlex/inference/models/video_detection/processors.py
index 46d4dce639..f73255d671 100644
--- a/paddlex/inference/models/video_detection/processors.py
+++ b/paddlex/inference/models/video_detection/processors.py
@@ -18,6 +18,7 @@
 import numpy as np
 
 from ....utils.deps import class_requires_deps
+from ....utils.parallel import maybe_parallelize
 from ...utils.benchmark import benchmark
 
 
@@ -88,7 +89,7 @@ def __call__(self, videos: List) -> List:
         Returns:
             list: A list of videos with each frame resized to the target size.
         """
-        return [self.resize(video) for video in videos]
+        return maybe_parallelize(self.resize, videos)
 
 
 @benchmark.timeit
@@ -144,7 +145,7 @@ def __call__(self, videos: List[List[np.ndarray]]) -> List[np.ndarray]:
         Returns:
             List[np.ndarray]: A list of processed videos with transposed frames.
         """
-        return [self.img2array(video) for video in videos]
+        return maybe_parallelize(self.img2array, videos)
 
 
 @benchmark.timeit
@@ -193,7 +194,7 @@ def __call__(self, videos: List[List[np.ndarray]]) -> List[List[np.ndarray]]:
         Returns:
             List[List[np.ndarray]]: A list of normalized videos, each represented as a list of normalized frames.
         """
-        return [self.normalize_video(video) for video in videos]
+        return maybe_parallelize(self.normalize_video, videos)
 
 
 def convert2cpu(gpu_matrix):
@@ -460,4 +461,6 @@ def postprocess(self, pred: List, nms_thresh: float, score_thresh: float) -> Lis
         return pred_all
 
     def __call__(self, preds: List, nms_thresh, score_thresh) -> List:
-        return [self.postprocess(pred, nms_thresh, score_thresh) for pred in preds]
+        return maybe_parallelize(
+            lambda pred: self.postprocess(pred, nms_thresh, score_thresh), preds
+        )
diff --git a/paddlex/inference/pipelines/components/common/crop_image_regions.py b/paddlex/inference/pipelines/components/common/crop_image_regions.py
index 561494ec79..0f0a4688a2 100644
--- a/paddlex/inference/pipelines/components/common/crop_image_regions.py
+++ b/paddlex/inference/pipelines/components/common/crop_image_regions.py
@@ -12,13 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import copy
 from typing import List, Tuple
 
 import numpy as np
 from numpy.linalg import norm
 
 from .....utils.deps import class_requires_deps, is_dep_available
+from .....utils.parallel import maybe_parallelize
 from .base_operator import BaseOperator
 from .seal_det_warp import AutoRectifier
 
@@ -53,15 +53,16 @@ def __call__(self, img: np.ndarray, boxes: List[dict]) -> List[dict]:
             list[dict]: A list of dictionaries, each containing a cropped image ('img'),
             the original bounding box coordinates ('box'), and the label ('label').
         """
-        output_list = []
-        for bbox_info in boxes:
+
+        def _apply(bbox_info):
             label_id = bbox_info["cls_id"]
             box = bbox_info["coordinate"]
             label = bbox_info.get("label", label_id)
             xmin, ymin, xmax, ymax = [int(i) for i in box]
             img_crop = img[ymin:ymax, xmin:xmax].copy()
-            output_list.append({"img": img_crop, "box": box, "label": label})
-        return output_list
+            return {"img": img_crop, "box": box, "label": label}
+
+        return maybe_parallelize(_apply, boxes)
 
 
 @class_requires_deps("opencv-contrib-python", "shapely")
@@ -96,19 +97,21 @@ def __call__(self, img: np.ndarray, dt_polys: List[list]) -> List[dict]:
         """
 
         if self.det_box_type == "quad":
-            dt_boxes = np.array(dt_polys)
-            output_list = []
-            for bno in range(len(dt_boxes)):
-                tmp_box = copy.deepcopy(dt_boxes[bno])
+
+            def _apply(dt_box):
+                tmp_box = np.array(dt_box)
                 img_crop = self.get_minarea_rect_crop(img, tmp_box)
-                output_list.append(img_crop)
+                return img_crop
+
+            output_list = maybe_parallelize(_apply, dt_polys)
         elif self.det_box_type == "poly":
-            output_list = []
-            dt_boxes = dt_polys
-            for bno in range(len(dt_boxes)):
-                tmp_box = copy.deepcopy(dt_boxes[bno])
+
+            def _apply(dt_box):
+                tmp_box = np.array(dt_box)
                 img_crop = self.get_poly_rect_crop(img.copy(), tmp_box)
-                output_list.append(img_crop)
+                return img_crop
+
+            output_list = maybe_parallelize(_apply, dt_polys)
         else:
             raise NotImplementedError
 
diff --git a/paddlex/utils/flags.py b/paddlex/utils/flags.py
index 2e77d02821..58b34209c9 100644
--- a/paddlex/utils/flags.py
+++ b/paddlex/utils/flags.py
@@ -28,6 +28,7 @@
     "USE_PIR_TRT",
    "DISABLE_DEV_MODEL_WL",
     "DISABLE_CINN_MODEL_WL",
+    "EXP_USE_PARALLEL_COMPUTING",
 ]
 
 
@@ -68,3 +69,7 @@ def get_flag_from_env_var(name, default, format_func=str):
 INFER_BENCHMARK_USE_CACHE_FOR_READ = get_flag_from_env_var(
     "PADDLE_PDX_INFER_BENCHMARK_USE_CACHE_FOR_READ", False
 )
+
+EXP_USE_PARALLEL_COMPUTING = get_flag_from_env_var(
+    "PADDLE_PDX_EXP_USE_PARALLEL_COMPUTING", False
+)
diff --git a/paddlex/utils/parallel.py b/paddlex/utils/parallel.py
new file mode 100644
index 0000000000..392ce5efee
--- /dev/null
+++ b/paddlex/utils/parallel.py
@@ -0,0 +1,55 @@
+# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import joblib
+
+from . import logging
+from .flags import EXP_USE_PARALLEL_COMPUTING
+
+__all__ = [
+    "set_global_parallel_computing_executor",
+    "get_global_parallel_computing_executor",
+    "maybe_parallelize",
+]
+
+_executor = None
+
+
+def _get_default_num_jobs():
+    return min(32, os.cpu_count() + 4)
+
+
+def set_global_parallel_computing_executor(executor):
+    global _executor
+    if _executor is not None:
+        logging.warning("The old executor will be replaced.")
+    old_executor = _executor
+    _executor = executor
+    return old_executor
+
+
+def get_global_parallel_computing_executor():
+    return _executor
+
+
+def maybe_parallelize(func, /, *iterables, executor=None):
+    if not EXP_USE_PARALLEL_COMPUTING:
+        return [func(*x) for x in zip(*iterables)]
+    if executor is None:
+        executor = _executor
+    if executor is None:
+        executor = joblib.Parallel(n_jobs=_get_default_num_jobs(), prefer="threads")
+    return executor(joblib.delayed(func)(*x) for x in zip(*iterables))
diff --git a/setup.py b/setup.py
index 2a8392f5e5..e76c876abe 100644
--- a/setup.py
+++ b/setup.py
@@ -83,6 +83,7 @@
     "colorlog",
     "filelock",
     "GPUtil",
+    "joblib",
     "numpy",
     "packaging",
     # Currently it is not easy to make `pandas` optional
@@ -159,7 +160,6 @@
     ],
     "ts": [
         "chinese-calendar",
-        "joblib",
         "matplotlib",
         "scikit-learn",
     ],
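
Usage sketch (illustrative only, not part of the patch): the new helper falls back to a plain list comprehension unless the experimental flag is enabled, and it accepts one iterable or several iterables that are zipped together. The flag must be exported before paddlex is imported, because paddlex/utils/flags.py reads it once at import time; the n_jobs value below is an arbitrary choice for the example.

    # Hypothetical example exercising the names introduced above.
    import os

    os.environ["PADDLE_PDX_EXP_USE_PARALLEL_COMPUTING"] = "1"  # read once at import time

    import joblib

    from paddlex.utils.parallel import (
        maybe_parallelize,
        set_global_parallel_computing_executor,
    )

    # Single iterable, mirroring readers such as ReadImage.__call__.
    print(maybe_parallelize(lambda x: x * x, [1, 2, 3]))  # [1, 4, 9]

    # Multiple iterables, mirroring post-processors that previously zipped two lists.
    print(maybe_parallelize(lambda a, b: a + b, [1, 2], [10, 20]))  # [11, 22]

    # Optionally install a shared thread-based executor reused by all call sites.
    set_global_parallel_computing_executor(joblib.Parallel(n_jobs=8, prefer="threads"))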