Skip to content

[Feat][Experimental] Support parallel computing (embarrassingly parallel) #3685

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion paddlex/inference/common/reader/det_3d_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import numpy as np

from ....utils.parallel import maybe_parallelize
from ...utils.benchmark import benchmark


Expand Down Expand Up @@ -238,4 +239,4 @@ def add_new_fields(self, sample):
return sample

def __call__(self, batch_data):
return [self.prepare_test_data(data_info) for data_info in batch_data]
return maybe_parallelize(self.prepare_test_data, batch_data)
3 changes: 2 additions & 1 deletion paddlex/inference/common/reader/image_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import numpy as np

from ....utils.deps import class_requires_deps, is_dep_available
from ....utils.parallel import maybe_parallelize
from ...utils.benchmark import benchmark
from ...utils.io import ImageReader

Expand Down Expand Up @@ -46,7 +47,7 @@ def __init__(self, format="BGR"):

def __call__(self, imgs):
"""apply"""
return [self.read(img) for img in imgs]
return maybe_parallelize(self.read, imgs)

def read(self, img):
if isinstance(img, np.ndarray):
Expand Down
3 changes: 2 additions & 1 deletion paddlex/inference/common/reader/ts_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import pandas as pd

from ....utils.parallel import maybe_parallelize
from ...utils.benchmark import benchmark
from ...utils.io import CSVReader

Expand All @@ -27,7 +28,7 @@ def __init__(self):

def __call__(self, ts_list):
"""apply"""
return [self.read(ts) for ts in ts_list]
return maybe_parallelize(self.read, ts_list)

def read(self, ts):
if isinstance(ts, pd.DataFrame):
Expand Down
4 changes: 2 additions & 2 deletions paddlex/inference/common/reader/video_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from ....utils.parallel import maybe_parallelize
from ...utils.benchmark import benchmark
from ...utils.io import VideoReader

Expand All @@ -30,7 +30,7 @@ def __init__(self, backend="opencv", num_seg=8, seg_len=1, sample_type=None):

def __call__(self, videos):
"""apply"""
return [self._read(video) for video in videos]
return maybe_parallelize(self._read_video, videos)

def _read(self, file_path):
return self._read_video(file_path)
Expand Down
3 changes: 2 additions & 1 deletion paddlex/inference/models/anomaly_detection/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import numpy as np

from ....utils.deps import class_requires_deps, is_dep_available
from ....utils.parallel import maybe_parallelize
from ...utils.benchmark import benchmark

if is_dep_available("scikit-image"):
Expand All @@ -34,7 +35,7 @@ def __init__(self):

def __call__(self, preds, *args):
"""apply"""
return [self.apply(pred) for pred in preds]
return maybe_parallelize(self.apply, preds)

def apply(
self,
Expand Down
11 changes: 6 additions & 5 deletions paddlex/inference/models/common/ts/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pandas as pd

from .....utils.deps import class_requires_deps, is_dep_available
from .....utils.parallel import maybe_parallelize
from ....utils.benchmark import benchmark
from .funcs import load_from_dataframe, time_feature

Expand Down Expand Up @@ -65,7 +66,7 @@ def __call__(self, ts_list: List) -> List:
Returns:
List: List of truncated time series data frames.
"""
return [self.cutoff(ts) for ts in ts_list]
return maybe_parallelize(self.cutoff, ts_list)

def cutoff(self, ts: Any) -> Any:
"""Truncates a single time series data frame to the specified length.
Expand Down Expand Up @@ -125,7 +126,7 @@ def __call__(self, ts_list: List[pd.DataFrame]) -> List[pd.DataFrame]:
Returns:
List[pd.DataFrame]: List of normalized time series data frames.
"""
return [self.tsnorm(ts) for ts in ts_list]
return maybe_parallelize(self.tsnorm, ts_list)

def tsnorm(self, ts: pd.DataFrame) -> pd.DataFrame:
"""Normalizes specified columns of a single time series data frame.
Expand Down Expand Up @@ -173,7 +174,7 @@ def __call__(self, ts_list: List) -> List:
Returns:
List: List of constructed time series datasets.
"""
return [self.buildtsdata(ts) for ts in ts_list]
return maybe_parallelize(self.buildtsdata, ts_list)

def buildtsdata(self, ts) -> Any:
"""Builds a time series dataset from a single time series data frame.
Expand Down Expand Up @@ -216,7 +217,7 @@ def __call__(self, ts_list: List) -> List:
Returns:
List: List of time series with extracted time features.
"""
return [self.timefeat(ts) for ts in ts_list]
return maybe_parallelize(self.timefeat, ts_list)

def timefeat(self, ts: Dict[str, Any]) -> Any:
"""Extracts time features from a single time series data frame.
Expand Down Expand Up @@ -275,7 +276,7 @@ def __call__(self, ts_list: List[Dict[str, Any]]) -> List[List[np.ndarray]]:
Returns:
List[List[np.ndarray]]: List of lists of arrays for each time series.
"""
return [self.tstoarray(ts) for ts in ts_list]
return maybe_parallelize(self.tstoarray, ts_list)

def tstoarray(self, ts: Dict[str, Any]) -> List[np.ndarray]:
"""Converts a single time series data frame into arrays.
Expand Down
9 changes: 5 additions & 4 deletions paddlex/inference/models/common/vision/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from PIL import Image

from .....utils.deps import class_requires_deps, is_dep_available
from .....utils.parallel import maybe_parallelize
from ....utils.benchmark import benchmark
from . import funcs as F

Expand Down Expand Up @@ -116,7 +117,7 @@ def __init__(

def __call__(self, imgs):
"""apply"""
return [self.resize(img) for img in imgs]
return maybe_parallelize(self.resize, imgs)

def resize(self, img):
target_size = self.target_size
Expand Down Expand Up @@ -160,7 +161,7 @@ def __init__(

def __call__(self, imgs):
"""apply"""
return [self.resize(img) for img in imgs]
return maybe_parallelize(self.resize, imgs)

def resize(self, img):
h, w = img.shape[:2]
Expand Down Expand Up @@ -202,7 +203,7 @@ def __init__(

def __call__(self, imgs):
"""apply"""
return [self.resize(img) for img in imgs]
return maybe_parallelize(self.resize, imgs)

def resize(self, img):
h, w = img.shape[:2]
Expand Down Expand Up @@ -276,7 +277,7 @@ class ToCHWImage:

def __call__(self, imgs):
"""apply"""
return [img.transpose((2, 0, 1)) for img in imgs]
return maybe_parallelize(lambda img: img.transpose((2, 0, 1)), imgs)


@benchmark.timeit
Expand Down
13 changes: 8 additions & 5 deletions paddlex/inference/models/formula_recognition/predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ....modules.formula_recognition.model_list import MODELS
from ....utils import logging
from ....utils.func_register import FuncRegister
from ....utils.parallel import maybe_parallelize
from ...common.batch_sampler import ImageBatchSampler
from ...common.reader import ReadImage
from ..base import BasePredictor
Expand Down Expand Up @@ -115,13 +116,15 @@ def process(self, batch_data):
batch_pred_ = self.infer([batch_img])[0].reshape([-1])
max_length = max(max_length, batch_pred_.shape[0])
batch_preds.append(batch_pred_)
for i in range(len(batch_preds)):
batch_preds[i] = np.pad(
batch_preds[i],
(0, max_length - batch_preds[i].shape[0]),
batch_preds[:] = maybe_parallelize(
lambda pred: np.pad(
pred,
(0, max_length - pred.shape[0]),
mode="constant",
constant_values=0,
)
),
batch_preds,
)
else:
x = self.pre_tfs["ToBatch"](imgs=batch_imgs)
batch_preds = self.infer(x=x)
Expand Down
15 changes: 8 additions & 7 deletions paddlex/inference/models/formula_recognition/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from PIL import Image, ImageOps

from ....utils.deps import class_requires_deps, is_dep_available
from ....utils.parallel import maybe_parallelize
from ...utils.benchmark import benchmark

if is_dep_available("opencv-contrib-python"):
Expand Down Expand Up @@ -154,7 +155,7 @@ def __call__(self, imgs: List[np.ndarray]) -> List[np.ndarray]:
Returns:
list of np.ndarray: The list of resized images as numpy arrays with three channels.
"""
return [self.resize(img) for img in imgs]
return maybe_parallelize(self.resize, imgs)


@benchmark.timeit
Expand Down Expand Up @@ -196,7 +197,7 @@ def __call__(self, imgs: List[np.ndarray]) -> List[np.ndarray]:
Returns:
list of np.array: The list of transformed images.
"""
return [self.transform(img) for img in imgs]
return maybe_parallelize(self.transform, imgs)


@benchmark.timeit
Expand Down Expand Up @@ -235,7 +236,7 @@ def __call__(self, imgs: List[np.ndarray]) -> List[np.ndarray]:
Returns:
list of numpy.ndarray: A list of formatted images as numpy arrays.
"""
return [self.format(img) for img in imgs]
return maybe_parallelize(self.format, imgs)


@benchmark.timeit
Expand Down Expand Up @@ -284,7 +285,7 @@ def normalize(self, img: Union[np.ndarray, Image.Image]) -> np.ndarray:

def __call__(self, imgs: List[Union[np.ndarray, Image.Image]]) -> List[np.ndarray]:
"""Apply normalization to a list of images."""
return [self.normalize(img) for img in imgs]
return maybe_parallelize(self.normalize, imgs)


@benchmark.timeit
Expand Down Expand Up @@ -556,7 +557,7 @@ def __call__(self, imgs: List[np.ndarray]) -> List[Optional[np.ndarray]]:

Returns:
list of numpy.ndarray: The list of decoded image arrays."""
return [self.img_decode(img) for img in imgs]
return maybe_parallelize(self.img_decode, imgs)


@benchmark.timeit
Expand Down Expand Up @@ -973,7 +974,7 @@ def __call__(self, imgs: List[np.ndarray]) -> List[np.ndarray]:
Returns:
list of numpy.ndarray: The list of transformed images.
"""
return [self.transform(img) for img in imgs]
return maybe_parallelize(self.transform, imgs)


@benchmark.timeit
Expand Down Expand Up @@ -1012,4 +1013,4 @@ def __call__(self, imgs: List[np.ndarray]) -> List[np.ndarray]:
Returns:
list of numpy.ndarray: The list of formatted images.
"""
return [self.format(img) for img in imgs]
return maybe_parallelize(self.format, imgs)
3 changes: 2 additions & 1 deletion paddlex/inference/models/image_classification/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import numpy as np

from ....utils.parallel import maybe_parallelize
from ...utils.benchmark import benchmark
from ..common.vision import F

Expand Down Expand Up @@ -44,7 +45,7 @@ def __init__(self, crop_size, mode="C"):

def __call__(self, imgs):
"""apply"""
return [self.crop(img) for img in imgs]
return maybe_parallelize(self.crop, imgs)

def crop(self, img):
h, w = img.shape[:2]
Expand Down
3 changes: 2 additions & 1 deletion paddlex/inference/models/image_unwarping/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import numpy as np

from ....utils.parallel import maybe_parallelize
from ...utils.benchmark import benchmark


Expand Down Expand Up @@ -61,7 +62,7 @@ def __call__(
Returns:
List[np.ndarray]: A list of processed images.
"""
return [self.doctr(img) for img in imgs]
return maybe_parallelize(self.doctr, imgs)

def doctr(self, pred: Union[np.ndarray, Tuple[np.ndarray, ...]]) -> np.ndarray:
"""
Expand Down
12 changes: 7 additions & 5 deletions paddlex/inference/models/instance_segmentation/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from typing import List, Optional

from ....utils.parallel import maybe_parallelize
from ...utils.benchmark import benchmark
from ..object_detection.processors import restructured_boxes

Expand Down Expand Up @@ -91,12 +92,13 @@ def __call__(
Returns:
List[Boxes]: The list of post-processed detection boxes.
"""
outputs = []
for data, output in zip(datas, batch_outputs):
boxes_masks = self.apply(
outputs = maybe_parallelize(
lambda data, output: self.apply(
img_size=data["ori_img_size"],
**output,
threshold=threshold if threshold is not None else self.threshold
)
outputs.append(boxes_masks)
),
datas,
batch_outputs,
)
return outputs
18 changes: 13 additions & 5 deletions paddlex/inference/models/keypoint_detection/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from numpy import ndarray

from ....utils.deps import class_requires_deps, is_dep_available
from ....utils.parallel import maybe_parallelize
from ...utils.benchmark import benchmark
from ..object_detection.processors import get_affine_transform

Expand Down Expand Up @@ -143,7 +144,7 @@ def apply(
return img, center, scale

def __call__(self, datas: List[dict]) -> List[dict]:
for data in datas:
def _process_data(data):
ori_img = data["img"]
if "ori_img" not in data:
data["ori_img"] = ori_img
Expand All @@ -160,6 +161,10 @@ def __call__(self, datas: List[dict]) -> List[dict]:
img_size = [img.shape[1], img.shape[0]]
data["img_size"] = img_size # [size_w, size_h]

return data

datas = maybe_parallelize(_process_data, datas)

return datas


Expand Down Expand Up @@ -234,10 +239,13 @@ def __call__(self, batch_outputs: List[dict], datas: List[dict]) -> List[Kpts]:
Returns:
List[dict]: The list of post-processed keypoints.
"""
return [
self.apply(output["heatmap"], data["center"], data["scale"])
for data, output in zip(datas, batch_outputs)
]
return maybe_parallelize(
lambda data, output: self.apply(
output["heatmap"], data["center"], data["scale"]
),
datas,
batch_outputs,
)

def get_final_preds(
self, heatmaps: ndarray, center: ndarray, scale: ndarray, kernelsize: int = 3
Expand Down
Loading