[Converter] Equality Deletes Conversion with Enforce Primary Key Uniqueness support #552
Conversation
LGTM! Just a few minor comments to address, then feel free to merge. Cool to see Iceberg equality delete support working together with enforcement of unique primary keys!
@@ -639,3 +645,195 @@ def test_converter_pos_delete_multiple_identifier_fields_success(

    # Assert elements are same disregard ordering in list
    assert sorted(pk_combined_res) == sorted(expected_result_tuple_list)
Can we add a test case to ensure that our existing file-sequence-number-based sort is stable when multiple files share the same sequence number?
Good catch! I'll add a secondary sort key based on file path. I noticed the files Spark writes out have a prefix in the file name that represents the order of the file within the same snapshot, e.g. s3://xxxx/partitionKey=8/primaryKey_bucket=83/00000-58-3229fae0-4316-4ea4-9e09-367a5d1b96f9-00001.parquet, where the leading 00000 portion is observed to be the order of files added within this transaction.
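A rough sketch of what that secondary sort key could look like. Note this is illustrative only: FileEntry and the field names are stand-ins, not the actual converter or pyiceberg types. Because the Spark-written file-name prefix (00000-, 00001-, ...) sorts lexicographically, falling back to file path makes the order deterministic when sequence numbers tie:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileEntry:
    """Hypothetical stand-in for a data/delete file manifest entry."""
    sequence_number: int
    file_path: str


def sort_files(entries):
    # Primary key: file sequence number (existing behavior).
    # Secondary key: file path, so files sharing a sequence number
    # keep the order implied by Spark's per-snapshot file-name prefix.
    return sorted(entries, key=lambda e: (e.sequence_number, e.file_path))


files = [
    FileEntry(2, "s3://bucket/partitionKey=8/00001-58-aaa-00001.parquet"),
    FileEntry(2, "s3://bucket/partitionKey=8/00000-58-aaa-00001.parquet"),
    FileEntry(1, "s3://bucket/partitionKey=8/00002-57-bbb-00001.parquet"),
]
ordered = sort_files(files)
```

With a tuple sort key like this, the stable-sort test the reviewer asked for only needs two entries with equal sequence numbers and distinct paths.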
f"Length of data file table remaining plus length of pos delete table should match origin data file table length"
f"But got {len(position_delete_table)} pos delete, {len(remaining_data_table)} equality delete, "
f"doesn't equal to original data table length: {len(data_file_table)}"
Suggested change:
- f"Length of data file table remaining plus length of pos delete table should match origin data file table length"
- f"But got {len(position_delete_table)} pos delete, {len(remaining_data_table)} equality delete, "
- f"doesn't equal to original data table length: {len(data_file_table)}"
+ f"Expected undeleted data file record count plus length of pos deletes to match original data file record count of {len(data_file_table)}, "
+ f"but found {len(position_delete_table)} pos deletes + {len(remaining_data_table)} equality deletes."
f"Length of all data files list: {len(set(all_data_files))} should be greater than"
f"Length of corresponding data files list: {len(set(data_files_downloaded))}"
Suggested change:
- f"Length of all data files list: {len(set(all_data_files))} should be greater than"
- f"Length of corresponding data files list: {len(set(data_files_downloaded))}"
+ f"Length of all data files ({len(set(all_data_files))}) should never be less than "
+ f"the length of candidate equality delete data files ({len(set(data_files_downloaded))})"
replace_delete_snapshot.append_data_file(data_file)
if to_be_deleted_files:
    for delete_file in to_be_deleted_files:
        print(f"debug_delete_file_snapshot:{delete_file}")
Should this be removed (or converted to a debug log)?
with append_delete_files_override(tx.update_snapshot()) as append_snapshot:
    if new_position_delete_files:
        for data_file in new_position_delete_files:
            print(f"debug_append_snapshot_data_file:{data_file}")
Should this be removed (or converted to a debug log)?
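If the statement is kept rather than removed, converting it to a debug log could look like this minimal sketch using the standard logging module (the function name and message text are illustrative, not from the PR):

```python
import logging

logger = logging.getLogger(__name__)


def log_appended_file(data_file):
    # Replaces print(f"debug_append_snapshot_data_file:{data_file}").
    # Only emitted when DEBUG logging is enabled for this logger, and the
    # %s placeholder defers string formatting until the record is handled.
    logger.debug("Appending position delete data file: %s", data_file)
```

This keeps the diagnostic available in development without writing to stdout on every production run.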
Good catch on the missing user-specified filesystem assignment in pyarrow utils!
…ueness support (ray-project#552)

* Rebase changes
* [Converter] Add assertion for correctness; Add aggregate stats and additional logging; Code clean-up
* [Converter] Equality Conversion with Enforce Primary Key Uniqueness support
* [Converter] Additional code clean-up
* Address comments
* [Bug fix] Pass filesystem to resolve_path_and_filesystem function
* Remove print statement

Co-authored-by: Miranda <[email protected]>
Summary
This PR adds support for converting equality deletes while enforcing primary key uniqueness. Test cases are added to verify correctness.
How it works
For an Iceberg table, within each partition:
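A heavily simplified sketch of the per-partition conversion this PR describes. This is not the actual converter API: the record layout, function name, and key handling are illustrative assumptions. Rows matched by an equality delete become position deletes, and when a primary key appears more than once, every copy except the newest also becomes a position delete:

```python
def convert_partition(data_records, equality_deleted_keys):
    """Sketch of equality-delete conversion with PK uniqueness enforcement.

    data_records: list of (file_path, row_position, primary_key) tuples,
    ordered from oldest to newest write.
    equality_deleted_keys: set of primary keys matched by equality deletes.
    Returns (position_deletes, survivors).
    """
    position_deletes = []
    latest_by_key = {}  # primary_key -> (file_path, row_position)
    for file_path, pos, pk in data_records:
        if pk in equality_deleted_keys:
            # Row matched by an equality delete: emit a position delete.
            position_deletes.append((file_path, pos))
            continue
        if pk in latest_by_key:
            # Enforce primary-key uniqueness: the older copy becomes a
            # position delete, the newer copy takes its place.
            position_deletes.append(latest_by_key[pk])
        latest_by_key[pk] = (file_path, pos)
    survivors = list(latest_by_key.values())
    # Same invariant as the assertion discussed in this review: deleted
    # rows plus surviving rows account for every input row exactly once.
    assert len(position_deletes) + len(survivors) == len(data_records)
    return position_deletes, survivors
```

The closing assertion mirrors the record-count check the reviewers discuss above: position deletes plus remaining rows must equal the original data file row count.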
Rationale
Explain the reasoning behind the changes and their benefits to the project.
Changes
List the major changes made in this pull request.
Impact
Discuss any potential impacts the changes may have on existing functionalities.
Testing
Describe how the changes have been tested, including both automated and manual testing strategies.
If this is a bugfix, explain how the fix has been tested to ensure the bug is resolved without introducing new issues.
Regression Risk
If this is a bugfix, assess the risk of regression caused by this fix and steps taken to mitigate it.
Checklist
Unit tests covering the changes have been added
E2E testing has been performed
Additional Notes
Any additional information or context relevant to this PR.