Example to demonstrate inter-parquet-file pipelining using hybrid scan APIs #20722
…123/cudf into fea/hybrid-scan-pipeline
…d wrappers (#20861)

Contributes to #20722 and #20879

This PR replaces the use of `thrust::copy_if` and `thrust::count_if` in Parquet and Hybrid scan readers with custom `CUB` based implementations using pinned memory to copy the result from device.

Note: I will create one last PR after this one replacing `thrust` utils with their (CUB based) cudf counterparts in `cudf/detail/utilities/algorithm.cuh` across libcudf.

Authors:
- Muhammad Haseeb (https://github.com/mhaseeb123)
- https://github.com/apps/pre-commit-ci
- Vukasin Milovanovic (https://github.com/vuule)

Approvers:
- Bradley Dice (https://github.com/bdice)
- Yunsong Wang (https://github.com/PointKernel)
- David Wendt (https://github.com/davidwendt)
- Paul Mattione (https://github.com/pmattione-nvidia)
- MithunR (https://github.com/mythrocks)

URL: #20861
Contributes to #20722

This PR replaces the use of small host vectors with pinned vectors to avoid pageable copies and improve pipeline performance when reading parquet files using multiple threads (each using a separate non-blocking stream).

Authors:
- Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
- Bradley Dice (https://github.com/bdice)
- Nghia Truong (https://github.com/ttnghia)
- Vukasin Milovanovic (https://github.com/vuule)

URL: #20820
cpp/src/io/parquet/reader_impl.cpp
cudf::detail::cuda_memcpy_async(
  cudf::host_span<size_t>(h_initial_str_offsets.data(), initial_str_offsets.size()),
  cudf::device_span<size_t const>(initial_str_offsets.data(), initial_str_offsets.size()),
  cudf::host_span<size_t>{h_initial_str_offsets.data(), initial_str_offsets.size()},
Simply using {} instead of ()
auto host_null_masks = std::vector<bitmask_type*>{};
auto host_begin_bits = std::vector<cudf::size_type>{};
auto host_end_bits = std::vector<cudf::size_type>{};
auto null_masks = std::vector<bitmask_type*>{};
Simply renamed host_xx to xx; the pinned versions below now use a pinned_ prefix instead.
  return bitmask;
} else {
  auto bitmask = cudf::detail::make_host_vector<bitmask_type>(num_bitmasks, stream);
  auto bitmask = cudf::detail::make_pinned_vector_async<bitmask_type>(num_bitmasks, stream);
Added the missed host to pinned conversion.
auto resource = create_memory_resource(is_pool_used);
auto default_stream = cudf::get_default_stream();
auto stream_pool = rmm::cuda_stream_pool(thread_count);
auto stream_pool = rmm::cuda_stream_pool(thread_count, rmm::cuda_stream::flags::non_blocking);
Create non-blocking streams
"input source == thread count\n";
for (size_t idx = 0; thread_count > static_cast<int>(parquet_files.size()); idx++) {
  parquet_files.emplace_back(parquet_files[idx % initial_size]);
if (parquet_files.size() < thread_count) {
Only print that we are appending the sources if we need to
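For readers following this thread, here is a minimal standalone sketch of the padding behavior discussed above: the initial sources are repeated round-robin until there is one per thread, and the message is printed only when something is actually appended. The helper name `pad_sources`, the message text, and the use of `std::size_t` for `thread_count` are assumptions made for illustration; this is not the code in the PR.

```cpp
#include <cassert>
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical helper (not from the PR): repeat the initial sources
// round-robin until there is one source per thread.
// Returns the number of sources that were appended.
std::size_t pad_sources(std::vector<std::string>& parquet_files, std::size_t thread_count)
{
  assert(!parquet_files.empty());  // need at least one source to repeat
  auto const initial_size = parquet_files.size();

  // Already enough sources: append nothing and print nothing.
  if (initial_size >= thread_count) { return 0; }

  // Illustrative message only; the wording is an assumption.
  std::cout << "Appending input sources until input source count == thread count\n";

  // Reserve up front so emplace_back never reallocates while we are
  // reading an element of the same vector.
  parquet_files.reserve(thread_count);
  for (std::size_t idx = 0; parquet_files.size() < thread_count; ++idx) {
    parquet_files.emplace_back(parquet_files[idx % initial_size]);
  }
  return parquet_files.size() - initial_size;
}
```

With two input files and five threads, this appends three sources in the order first, second, first; calling it again with a smaller thread count is a no-op and prints nothing.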
pre-commit.ci autofix
Description
This PR adds a new example to demonstrate pipelining when reading parquet sources with the new hybrid scan reader in a multithreaded environment.
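As a rough illustration of the multithreaded setup described here, the sketch below distributes sources across worker threads in a strided fashion, with each worker passing its own index where a real reader would pick a per-thread stream. It uses only the C++ standard library; `read_all`, `read_fn`, and the `stream_id` parameter are hypothetical names, and the callback stands in for the actual hybrid scan read, which is not shown.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for "read one parquet source on this thread's stream".
// It returns a size_t so the sketch has something to collect per source.
using read_fn = std::function<std::size_t(std::string const& source, std::size_t stream_id)>;

// Launch thread_count workers; worker `tid` handles sources tid, tid + thread_count, ...
std::vector<std::size_t> read_all(std::vector<std::string> const& sources,
                                  std::size_t thread_count,
                                  read_fn const& read_one)
{
  assert(thread_count > 0);
  std::vector<std::size_t> results(sources.size(), 0);
  std::vector<std::thread> workers;
  workers.reserve(thread_count);

  for (std::size_t tid = 0; tid < thread_count; ++tid) {
    workers.emplace_back([&, tid] {
      // Each worker writes to distinct elements of `results`, so no locking is needed.
      for (std::size_t i = tid; i < sources.size(); i += thread_count) {
        results[i] = read_one(sources[i], tid);
      }
    });
  }

  // Wait for every worker before returning the collected results.
  for (auto& worker : workers) { worker.join(); }
  return results;
}
```

In a GPU setting the benefit would come from work on different streams overlapping; this host-only sketch only shows the work distribution, not the overlap itself.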
Checklist
- `count_if` and `copy_if` with CUB + pinned memory based wrappers #20861 before this
- `thrust::reduce_by_key` with CUB + pinned memory based wrapper #20860 before this
- `setup_page_index` in hybrid scan reader #20721 before this