Fallback janitor job two #508

Open: wants to merge 30 commits into base: 2.0

Commits (30)
02e9613
move old running transaction to failed
SashankBalusu Feb 27, 2025
862fd4b
Added test for secondary janitor job. Need to figure out how to acces…
andrewtabbert Mar 2, 2025
ad540bb
Merge branch 'fallback-janitor-job-two' of https://github.com/nsaadhv…
andrewtabbert Mar 2, 2025
703c78f
Started writing test_jainotr_job_running_to_failed to ensure that our…
andrewtabbert Mar 4, 2025
cae0c23
test
andrewtabbert Mar 4, 2025
a196242
Added dummy transaction helper functions to more rigorously test jani…
andrewtabbert Mar 4, 2025
2aabf0b
Added dummy transaction function to test_janitor.py to test that jani…
andrewtabbert Mar 4, 2025
f4d92e5
Finished create_failed_dummy_transactions in test_janitor.py
andrewtabbert Mar 4, 2025
580d1a9
Started implementing janitor_remove_files_in_failed in janitor.py. Th…
andrewtabbert Mar 4, 2025
f405ca4
Finished implementation of janitor_remove_files_in_failed, file will ch…
andrewtabbert Mar 5, 2025
07457ba
Update
andrewtabbert Mar 5, 2025
9c555b1
Finished writing janitor_remove_files_in_failed. Scrapped Locator app…
andrewtabbert Mar 6, 2025
a04b5ec
Implemented janitor_delete_timed_out_transaction
andrewtabbert Mar 6, 2025
e236cbb
Update janitor.py
SashankBalusu Mar 6, 2025
6dd0e58
delete metafiles associated with failed directory
SashankBalusu Mar 8, 2025
dd85c4c
Merge branch '2.0' into fallback-janitor-job-two
andrewtabbert Mar 8, 2025
0c7cab5
Connecting catalog root to filesystem with bugs in mocker for file sy…
andrewtabbert Mar 9, 2025
799d8e8
moved conftest, added tests for janitor job and refined janitor job
SashankBalusu Mar 12, 2025
e410cc6
refactor heartbeat to be based on operations
SashankBalusu Mar 13, 2025
323f26b
Implementing attributes to keep track of whether file has been succes…
andrewtabbert Mar 13, 2025
5ecb385
Merge branch 'fallback-janitor-job-two' of https://github.com/nsaadhv…
andrewtabbert Mar 13, 2025
4b4ef06
refined logic for failed transactions
SashankBalusu Mar 13, 2025
9a4a083
deleted outdated janitor tests, renamed new file
SashankBalusu Mar 13, 2025
324e43b
refined janitor job to properly tag in progress transactions
SashankBalusu Mar 17, 2025
f0a620b
moved operation timeout to be in constants
SashankBalusu Mar 17, 2025
5ecea33
Added test_janitor_handles_multiple_timed_out_transactions, test_jani…
andrewtabbert Mar 20, 2025
4d8de70
Addressed PR comments, mainly minor issues and removing code pieces o…
025rhu Apr 3, 2025
d57123e
Refactor transaction handling: Remove state tags from transaction IDs…
andrewtabbert Apr 15, 2025
c7c6717
moved conftest.py back to deltacast/tests/storage/conftest.py from de…
andrewtabbert Apr 16, 2025
5f1452f
Moved conftests.py to be global in tests/
025rhu Apr 22, 2025
211 changes: 211 additions & 0 deletions deltacat/compute/janitor.py
@@ -0,0 +1,211 @@
import time
import posixpath
import pyarrow.fs
from pyarrow.fs import FileSelector, FileType
from itertools import chain
from deltacat.storage.model.transaction import Transaction
from deltacat.utils.filesystem import resolve_path_and_filesystem
from deltacat.constants import (
TXN_DIR_NAME,
RUNNING_TXN_DIR_NAME,
FAILED_TXN_DIR_NAME,
TXN_PART_SEPARATOR,
)
from deltacat.storage.model.types import TransactionState
import logging
from deltacat import logs

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


def brute_force_search_matching_metafiles(
dirty_files_names, filesystem: pyarrow.fs.FileSystem, catalog_root
):
txn_dir_name = TXN_DIR_NAME
# collect transaction ids of the files
transaction_ids = []
for dirty_file in dirty_files_names:
parts = dirty_file.split(TXN_PART_SEPARATOR)
if len(parts) < 2:
continue
transaction_ids.append(parts[1])

def recursive_search(path):
try:
selector = FileSelector(path, recursive=False)
entries = filesystem.get_file_info(selector)
except Exception as e:
logger.error(f"Error listing directory '{path}': {e}")
return

for entry in entries:
base_name = posixpath.basename(entry.path)
if entry.type == FileType.File:
for transaction_id in transaction_ids:
# Look for transaction_id in the filename
if transaction_id in base_name:
try:
filesystem.delete_file(entry.path)
logger.debug(f"Deleted file: {entry.path}")
except Exception as e:
logger.error(f"Error deleting file '{entry.path}': {e}")

elif entry.type == FileType.Directory:
# Skip directories that match txn_dir_name
if posixpath.basename(entry.path) == txn_dir_name:
logger.debug(f"Skipping directory: {entry.path}")
continue
recursive_search(entry.path)

# Start recursive search from the catalog root
recursive_search(catalog_root)

# With state tags removed from transaction IDs, the failed-transaction log
# files keep their original names, so the old rename-to-"cleaned" pass is
# no longer needed (the old and new log paths would be identical).
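As a design note, pyarrow's FileSelector can also list recursively in a single call, which avoids the explicit recursion above (though the txn log directory would still need to be skipped). A hedged alternative sketch; the catalog root path and transaction id below are placeholders, not values from this PR:

# Sketch of a flat alternative to recursive_search using pyarrow's
# recursive FileSelector. Root path and transaction id are assumed.
from pyarrow.fs import FileSelector, FileType, LocalFileSystem

fs = LocalFileSystem()
selector = FileSelector("/tmp/catalog", allow_not_found=True, recursive=True)
for entry in fs.get_file_info(selector):
    if entry.type == FileType.File and "txn-id-123" in entry.base_name:
        fs.delete_file(entry.path)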


def janitor_delete_timed_out_transaction(catalog_root: str) -> None:
"""
Traverse the running transactions directory and move transactions that have been
running longer than the threshold into the failed transactions directory.
"""
catalog_root_normalized, filesystem = resolve_path_and_filesystem(catalog_root)

txn_log_dir = posixpath.join(catalog_root_normalized, TXN_DIR_NAME)
running_txn_log_dir = posixpath.join(txn_log_dir, RUNNING_TXN_DIR_NAME)
failed_txn_log_dir = posixpath.join(txn_log_dir, FAILED_TXN_DIR_NAME)


dirty_files = []

running_txn_file_selector = FileSelector(running_txn_log_dir, recursive=False)
running_txn_info_list = filesystem.get_file_info(running_txn_file_selector)

for running_txn_info in running_txn_info_list:
try:
filename = posixpath.basename(running_txn_info.path)
parts = filename.split(TXN_PART_SEPARATOR)
end_time_str = parts[-1]
end_time = float(end_time_str)
current_time = time.time_ns()
if end_time <= current_time:
src_path = running_txn_info.path
new_filename = filename
dest_path = posixpath.join(failed_txn_log_dir, new_filename)

# Move the file using copy and delete
with filesystem.open_input_file(src_path) as src_file:
contents = src_file.read()

with filesystem.open_output_stream(dest_path) as dest_file:
dest_file.write(contents)
filesystem.delete_file(src_path)

dirty_files.append(new_filename)

except Exception as e:
logger.error(
f"Error moving timed-out transaction '{running_txn_info.path}' to failed: {e}"
)

# Pass catalog_root to the brute force search so it searches from the right place
brute_force_search_matching_metafiles(
dirty_files, filesystem, catalog_root_normalized
)
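To make the timeout check above concrete, here is a small self-contained sketch of the filename convention it relies on: the last TXN_PART_SEPARATOR-delimited part of a running-transaction log name is parsed as an end time in nanoseconds. The separator value and transaction id here are assumptions inferred from the parsing code, not confirmed constants:

# Illustration of the parsing in janitor_delete_timed_out_transaction.
# Separator and transaction id values are assumed for this example.
import time

TXN_PART_SEPARATOR = "_"
filename = f"txn-id-123{TXN_PART_SEPARATOR}{time.time_ns() - 1}"
end_time = float(filename.split(TXN_PART_SEPARATOR)[-1])
print(end_time <= time.time_ns())  # True: this log would be moved to failed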


def janitor_remove_files_in_failed(
catalog_root: str, filesystem: pyarrow.fs.FileSystem = None
) -> None:
"""
Cleans up metafiles and locator files associated with failed transactions.
"""
if filesystem is None:
catalog_root_normalized, filesystem = resolve_path_and_filesystem(catalog_root)
else:
catalog_root_normalized, filesystem = resolve_path_and_filesystem(
catalog_root, filesystem
)

txn_log_dir = posixpath.join(catalog_root_normalized, TXN_DIR_NAME)
failed_txn_log_dir = posixpath.join(txn_log_dir, FAILED_TXN_DIR_NAME)
running_txn_log_dir = posixpath.join(txn_log_dir, RUNNING_TXN_DIR_NAME)
filesystem.create_dir(failed_txn_log_dir, recursive=True)

failed_txn_file_selector = FileSelector(failed_txn_log_dir, recursive=False)
failed_txn_info_list = filesystem.get_file_info(failed_txn_file_selector)

for failed_txn_info in failed_txn_info_list:
try:
txn = Transaction.read(failed_txn_info.path, filesystem)
failed_txn_basename = posixpath.basename(failed_txn_info.path)
# Determine the transaction's state once; only transactions still in the
# FAILED state are cleaned up, so PURGED ones are skipped below.
txn_state = None
try:
txn_state = txn.state(catalog_root_normalized)
except Exception as e:
logger.error(
f"Could not determine state of transaction '{failed_txn_info.path}': {e}"
)
txnid = txn.id

if txn_state == TransactionState.FAILED:

operations = txn["operations"]
known_write_paths = chain.from_iterable(
(op["metafile_write_paths"] + op["locator_write_paths"])
for op in operations
)

for write_path in known_write_paths:
try:
filesystem.delete_file(write_path)
except Exception as e:
logger.error(f"Failed to delete file '{write_path}': {e}")

new_filename = f"{txnid}"

new_failed_txn_log_file_path = posixpath.join(
failed_txn_log_dir, new_filename
)
running_txn_log_path = posixpath.join(
running_txn_log_dir, new_filename
)

# Use the resolved filesystem rather than os (os.delete does not exist,
# and os.rename only works on the local filesystem). The running-directory
# copy may already be gone, so ignore a missing file.
try:
filesystem.delete_file(running_txn_log_path)
except OSError:
pass

if failed_txn_info.path != new_failed_txn_log_file_path:
filesystem.move(failed_txn_info.path, new_failed_txn_log_file_path)
logger.debug(
f"Cleaned up failed transaction: {failed_txn_basename}"
)

except Exception as e:
logger.error(
f"Could not process failed transaction '{failed_txn_info.path}', skipping: {e}"
)
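The known_write_paths expression above flattens each operation's metafile and locator write-path lists into one iterable before deletion. A standalone illustration with dummy data:

# Standalone illustration of chain.from_iterable flattening per-operation
# metafile and locator write paths into a single iterable of paths.
from itertools import chain

operations = [
    {"metafile_write_paths": ["a.mpk"], "locator_write_paths": ["a.loc"]},
    {"metafile_write_paths": ["b.mpk"], "locator_write_paths": []},
]
known_write_paths = chain.from_iterable(
    op["metafile_write_paths"] + op["locator_write_paths"] for op in operations
)
print(list(known_write_paths))  # ['a.mpk', 'a.loc', 'b.mpk']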


def janitor_job(catalog_root_dir: str) -> None:
"""Run both janitor passes: fail timed-out transactions, then purge failed ones."""
janitor_delete_timed_out_transaction(catalog_root_dir)
janitor_remove_files_in_failed(catalog_root_dir)
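A minimal usage sketch of the entry point, assuming a local catalog root; the path is a placeholder, not a value from this PR:

# Hypothetical invocation of the janitor entry point against a local
# catalog root. The path is an assumption for illustration only.
from deltacat.compute.janitor import janitor_job

janitor_job("/tmp/deltacat/catalog")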
16 changes: 16 additions & 0 deletions deltacat/constants.py
@@ -102,3 +102,19 @@
DEFAULT_STREAM_ID = "stream"
DEFAULT_PARTITION_ID = "partition"
DEFAULT_PARTITION_VALUES = ["default"]

# Transaction Status constants
SUCCESSFULLY_CLEANED = "cleaned"
CURRENTLY_CLEANING = "cleaning"
TIMEOUT_TXN = "timedout"
Comment on lines +107 to +109
Member: Can we move these to a new class TransactionState(str, Enum) type in types.py, and change the supported values to simply FAILED (equivalent to CLEANING), PURGED (equivalent to CLEANED), TIMEOUT, RUNNING, and SUCCESS?

Reply: Hi Patrick, I added this to types.py.
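For reference, a minimal sketch of the TransactionState enum the comment above describes; the member names follow the review thread, while the string values are assumptions:

# Hypothetical sketch of the suggested enum in types.py. Member names come
# from the review comment; string values are assumed for illustration.
from enum import Enum


class TransactionState(str, Enum):
    RUNNING = "running"
    SUCCESS = "success"
    TIMEOUT = "timeout"
    FAILED = "failed"  # equivalent to the old CURRENTLY_CLEANING ("cleaning")
    PURGED = "purged"  # equivalent to the old SUCCESSFULLY_CLEANED ("cleaned")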


# Operation timeout constants
OPERATION_TIMEOUTS = {
    "create": 5,
    "update": 3,
    "delete": 4,
    "read_siblings": 2,
    "read_children": 2,
    "read_latest": 3,
    "read_exists": 1,
}

Member: Could you briefly explain how these timeout values are generated? Are they just placeholder values right now, or does each value mean the amount of time it should take to complete the operation?

Reply: These are fixed values that we decided to use. We picked arbitrarily large values per Patrick's advice, but @SashankBalusu is currently working on something to get more accurate values and can give more information on that in a bit. It is difficult to determine a timeout based on the size of the data, however, so these values will be constant across all transaction operations once they are finalized.
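Assuming these values are in seconds, one way the per-operation timeouts could roll up into a transaction deadline like the end time the janitor checks; this aggregation is a sketch, not code from the PR:

# Hedged sketch: summing per-operation timeouts (assumed to be seconds)
# into a nanosecond deadline comparable to the janitor's end-time check.
import time

OPERATION_TIMEOUTS = {"create": 5, "update": 3, "delete": 4}
ops = ["create", "update", "delete"]  # example operation sequence
deadline_ns = time.time_ns() + sum(OPERATION_TIMEOUTS[op] for op in ops) * 10**9
print(deadline_ns > time.time_ns())  # True until the transaction times out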