
[Converter] Equality Deletes Conversion with Enforce Primary Key Uniqueness support #552


Merged

Conversation

Zyiqin-Miranda
Member

Summary

This PR adds support for converting equality deletes while enforcing primary key uniqueness. Test cases have been added to verify correctness.

How it works
For an Iceberg table, within each partition (a code sketch of steps 4 through 7 follows this list):

  1. Fetch all equality deletes and data files that belong to the partition.
  2. Group equality deletes with the data files they apply to. Per the Iceberg spec, an equality delete applies to all data files that have a strictly smaller sequence number.
  3. Sort the equality delete files and data files by sequence number. When downloaded, the data file table gains additional file_path and ordered record index columns.
  4. After converting, we produce position delete table A, which deletes the records matched by the equality deletes. Data table a contains all remaining records.
  5. Append any data files that were not affected by equality deletes to data table a, producing data table b.
  6. Drop duplicates on data table b, appending a unique record index based on the final table so that which record is kept stays deterministic. This produces position delete table B, which represents the duplicate primary keys to be deleted.
  7. The final output is a set of position delete files containing position deletes A + B.
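The following PyArrow sketch illustrates steps 4 through 7. It is a minimal illustration under assumed names, not the converter's actual API: the function convert_equality_deletes and the columns primary_key, file_path, and record_idx are hypothetical for this example.

import pyarrow as pa
import pyarrow.compute as pc

def convert_equality_deletes(
    data_table: pa.Table,        # concatenated data files, already sorted by
                                 # sequence number, with "file_path" and
                                 # "record_idx" columns added at download time
    equality_deletes: pa.Table,  # table of primary keys to delete
    pk: str = "primary_key",     # assumed primary key column name
) -> tuple[pa.Table, pa.Table]:
    """Return (position_delete_table, surviving_unique_data_table)."""
    # Step 4: rows whose primary key matches an equality delete become
    # position deletes (table A); the remaining rows form data table a/b.
    deleted = pc.is_in(data_table[pk], value_set=equality_deletes[pk])
    pos_delete_a = data_table.filter(deleted).select(["file_path", "record_idx"])
    remaining = data_table.filter(pc.invert(deleted))

    # Step 6: enforce primary key uniqueness. A global row index plus a
    # group-by deterministically keeps one record per key (here, the last
    # in sort order) and position-deletes the rest (table B).
    remaining = remaining.append_column(
        "global_idx", pa.array(range(len(remaining)), type=pa.int64()))
    kept_idx = remaining.group_by(pk).aggregate([("global_idx", "max")])
    kept = pc.is_in(remaining["global_idx"],
                    value_set=kept_idx["global_idx_max"])
    pos_delete_b = remaining.filter(pc.invert(kept)).select(
        ["file_path", "record_idx"])
    unique_data = remaining.filter(kept).drop(["global_idx"])

    # Step 7: the final output combines position deletes A + B.
    return pa.concat_tables([pos_delete_a, pos_delete_b]), unique_data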


Checklist

  • Unit tests covering the changes have been added
    • If this is a bugfix, regression tests have been added
  • E2E testing has been performed


@pdames pdames self-requested a review May 20, 2025 18:16
Member

@pdames pdames left a comment


LGTM! Just a few minor comments to address, then feel free to merge. Cool to see Iceberg equality delete support working together with enforcement of unique primary keys!

@@ -639,3 +645,195 @@ def test_converter_pos_delete_multiple_identifier_fields_success(

# Assert elements are the same, disregarding ordering in the list
assert sorted(pk_combined_res) == sorted(expected_result_tuple_list)


Member

@pdames pdames May 20, 2025


Can we add a test case to ensure that our existing file-sequence-number-based sort is stable when multiple files share the same sequence number?

Member Author


Good catch! I'll add a secondary sort key based on file path. I noticed that the files Spark writes out have a prefix in the file name representing the order of the file within the same snapshot, e.g. s3://xxxx/partitionKey=8/primaryKey_bucket=83/00000-58-3229fae0-4316-4ea4-9e09-367a5d1b96f9-00001.parquet, where the prefix is observed to be the order in which files were added within this transaction.
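For illustration, the tie-breaking sort could look like this (the attribute names sequence_number and file_path are assumptions for this sketch):

# Primary sort on sequence number; break ties on file path so the order
# stays deterministic when multiple files share the same sequence number.
sorted_files = sorted(all_files, key=lambda f: (f.sequence_number, f.file_path))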

Comment on lines 222 to 224
f"Length of data file table remaining plus length of pos delete table should match origin data file table length"
f"But got {len(position_delete_table)} pos delete, {len(remaining_data_table)} equality delete, "
f"doesn't equal to original data table length: {len(data_file_table)}"
Member


Suggested change
f"Length of data file table remaining plus length of pos delete table should match origin data file table length"
f"But got {len(position_delete_table)} pos delete, {len(remaining_data_table)} equality delete, "
f"doesn't equal to original data table length: {len(data_file_table)}"
f"Expected undeleted data file record count plus length of pos deletes to match original data file record count of {len(data_file_table)}, "
f"but found {len(position_delete_table)} pos deletes + {len(remaining_data_table)} equality deletes."

Comment on lines 185 to 186
f"Length of all data files list: {len(set(all_data_files))} should be greater than"
f"Length of corresponding data files list: {len(set(data_files_downloaded))}"
Member


Suggested change
f"Length of all data files list: {len(set(all_data_files))} should be greater than"
f"Length of corresponding data files list: {len(set(data_files_downloaded))}"
f"Length of all data files ({len(set(all_data_files))}) should never be less than "
f"the length of candidate equality delete data files ({len(set(data_files_downloaded))})"

replace_delete_snapshot.append_data_file(data_file)
if to_be_deleted_files:
for delete_file in to_be_deleted_files:
print(f"debug_delete_file_snapshot:{delete_file}")
Member


Should this be removed (or converted to a debug log)?
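For example, the print could become a module-level debug log; a sketch using the standard logging module (deltacat's own logger helper may differ):

import logging

logger = logging.getLogger(__name__)

for delete_file in to_be_deleted_files:
    logger.debug("delete_file_snapshot: %s", delete_file)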

with append_delete_files_override(tx.update_snapshot()) as append_snapshot:
if new_position_delete_files:
for data_file in new_position_delete_files:
print(f"debug_append_snapshot_data_file:{data_file}")
Member


Should this be removed (or converted to a debug log)?

@Zyiqin-Miranda
Member Author

Zyiqin-Miranda commented May 28, 2025

Thanks @pdames, could you review the changes in this commit? It passes the filesystem through for the case where the path doesn't contain enough information to infer which filesystem it is, but the user has explicitly passed one in.
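A sketch of the intent, assuming resolve_path_and_filesystem is built on pyarrow.fs and takes an optional filesystem argument (the actual signature in the PR may differ):

from pyarrow import fs

def resolve_path_and_filesystem(path, filesystem=None):
    # Prefer an explicitly supplied filesystem; only infer one from the
    # path when none is given (inference fails for scheme-less paths).
    if filesystem is not None:
        return path, filesystem
    inferred_fs, relative_path = fs.FileSystem.from_uri(path)
    return relative_path, inferred_fs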

Member

@pdames pdames left a comment


Good catch on the missing user-specified filesystem assignment in pyarrow utils!

@Zyiqin-Miranda Zyiqin-Miranda merged commit 055cb9c into ray-project:2.0 May 30, 2025
3 checks passed
@Zyiqin-Miranda Zyiqin-Miranda deleted the find-deletes-with-duplicates branch May 30, 2025 19:47
Zyiqin-Miranda added a commit to Zyiqin-Miranda/deltacat that referenced this pull request May 30, 2025
…ueness support (ray-project#552)

* Rebase changes

* [Converter] Add assertion for correctness; Add aggregate stats and additional logging; Code clean-up

* [Converter] Equality Convertion with Enforce Primary Key Uniqueness support

* [Converter] Additional code clean-up

* Address comments

* [Bug fix] Pass filesystem to resolve_path_and_filesystem function

* Remove print statement

---------

Co-authored-by: Miranda <[email protected]>