Fallback janitor job two #508
base: 2.0
@@ -0,0 +1,211 @@
import time
import posixpath
import pyarrow.fs
from pyarrow.fs import FileSelector, FileType
from itertools import chain
from deltacat.storage.model.transaction import Transaction
from deltacat.utils.filesystem import resolve_path_and_filesystem
from deltacat.constants import (
    # CURRENTLY_CLEANING,
    # SUCCESSFULLY_CLEANED,
    # TIMEOUT_TXN,
    TXN_DIR_NAME,
    RUNNING_TXN_DIR_NAME,
    FAILED_TXN_DIR_NAME,
    TXN_PART_SEPARATOR,
)
from deltacat.storage.model.types import TransactionState
import logging
from deltacat import logs

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


def brute_force_search_matching_metafiles(
    dirty_files_names, filesystem: pyarrow.fs.FileSystem, catalog_root
):
    txn_dir_name = TXN_DIR_NAME
    # Collect the transaction IDs embedded in the dirty file names.
    transaction_ids = []
    for dirty_file in dirty_files_names:
        parts = dirty_file.split(TXN_PART_SEPARATOR)
        if len(parts) < 2:
            continue
        transaction_ids.append(parts[1])

    def recursive_search(path):
        try:
            selector = FileSelector(path, recursive=False)
            entries = filesystem.get_file_info(selector)
        except Exception as e:
            logger.error(f"Error listing directory '{path}': {e}")
            return

        for entry in entries:
            base_name = posixpath.basename(entry.path)
            if entry.type == FileType.File:
                for transaction_id in transaction_ids:
                    # Look for the transaction ID in the filename.
                    if transaction_id in base_name:
                        try:
                            filesystem.delete_file(entry.path)
                            logger.debug(f"Deleted file: {entry.path}")
                        except Exception as e:
                            logger.error(f"Error deleting file '{entry.path}': {e}")
            elif entry.type == FileType.Directory:
                # Skip directories that match txn_dir_name.
                if posixpath.basename(entry.path) == txn_dir_name:
                    logger.debug(f"Skipping directory: {entry.path}")
                    continue
                recursive_search(entry.path)

    # Start the recursive search from the catalog root.
    recursive_search(catalog_root)

    # Rename each processed log file to mark successful completion.
    for dirty_file in dirty_files_names:
        failed_txn_log_dir = posixpath.join(
            catalog_root, TXN_DIR_NAME, FAILED_TXN_DIR_NAME
        )
        old_log_path = posixpath.join(failed_txn_log_dir, dirty_file)

        # new_filename = dirty_file.replace(TIMEOUT_TXN, SUCCESSFULLY_CLEANED)
        new_log_path = posixpath.join(failed_txn_log_dir, dirty_file)
        try:
            filesystem.move(old_log_path, new_log_path)
            logger.debug(f"Renamed file from {old_log_path} to {new_log_path}")
        except Exception as e:
            logger.error(f"Error renaming file '{old_log_path}': {e}")

def janitor_delete_timed_out_transaction(catalog_root: str) -> None:
    """
    Traverse the running transactions directory and move transactions that have
    been running longer than the threshold into the failed transactions directory.
    """
    catalog_root_normalized, filesystem = resolve_path_and_filesystem(catalog_root)

    txn_log_dir = posixpath.join(catalog_root_normalized, TXN_DIR_NAME)
    running_txn_log_dir = posixpath.join(txn_log_dir, RUNNING_TXN_DIR_NAME)
    failed_txn_log_dir = posixpath.join(txn_log_dir, FAILED_TXN_DIR_NAME)

    dirty_files = []

    running_txn_file_selector = FileSelector(running_txn_log_dir, recursive=False)
    running_txn_info_list = filesystem.get_file_info(running_txn_file_selector)

    for running_txn_info in running_txn_info_list:
        try:
            filename = posixpath.basename(running_txn_info.path)
            parts = filename.split(TXN_PART_SEPARATOR)
            # The last filename part is parsed as the transaction's end time
            # and compared against time.time_ns().
            end_time_str = parts[-1]
            end_time = float(end_time_str)
            current_time = time.time_ns()
            if end_time <= current_time:
                src_path = running_txn_info.path
                # new_filename = f"{filename}{TXN_PART_SEPARATOR}{TIMEOUT_TXN}"
                new_filename = f"{filename}"
                dest_path = posixpath.join(failed_txn_log_dir, new_filename)

                # Move the file using copy and delete.
                with filesystem.open_input_file(src_path) as src_file:
                    contents = src_file.read()
                with filesystem.open_output_stream(dest_path) as dest_file:
                    dest_file.write(contents)
                filesystem.delete_file(src_path)

                dirty_files.append(new_filename)

        except Exception as e:
            logger.error(
                f"Error cleaning failed transaction '{running_txn_info.path}': {e}"
            )

    # Pass catalog_root to the brute force search so it searches from the right place.
    brute_force_search_matching_metafiles(
        dirty_files, filesystem, catalog_root_normalized
    )

def janitor_remove_files_in_failed(
    catalog_root: str, filesystem: pyarrow.fs.FileSystem = None
) -> None:
    """
    Cleans up metafiles and locator files associated with failed transactions.
    """
    if filesystem is None:
        catalog_root_normalized, filesystem = resolve_path_and_filesystem(catalog_root)
    else:
        catalog_root_normalized, filesystem = resolve_path_and_filesystem(
            catalog_root, filesystem
        )

    txn_log_dir = posixpath.join(catalog_root_normalized, TXN_DIR_NAME)
    failed_txn_log_dir = posixpath.join(txn_log_dir, FAILED_TXN_DIR_NAME)
    running_txn_log_dir = posixpath.join(txn_log_dir, RUNNING_TXN_DIR_NAME)
    filesystem.create_dir(failed_txn_log_dir, recursive=True)

    failed_txn_file_selector = FileSelector(failed_txn_log_dir, recursive=False)
    failed_txn_info_list = filesystem.get_file_info(failed_txn_file_selector)

    for failed_txn_info in failed_txn_info_list:
        try:
            txn = Transaction.read(failed_txn_info.path, filesystem)
            failed_txn_basename = posixpath.basename(failed_txn_info.path)
            should_process = True
            try:
                # Skip transactions that have already been purged.
                if txn.state(catalog_root_normalized) == TransactionState.PURGED:
                    should_process = False
            except Exception as e:
                logger.error(
                    f"Could not check transaction state for "
                    f"'{failed_txn_info.path}': {e}"
                )
            if should_process:
                txnid = txn.id

                if txn.state(catalog_root_normalized) == TransactionState.FAILED:
                    operations = txn["operations"]
                    known_write_paths = chain.from_iterable(
                        (op["metafile_write_paths"] + op["locator_write_paths"])
                        for op in operations
                    )

                    for write_path in known_write_paths:
                        try:
                            filesystem.delete_file(write_path)
                        except Exception as e:
                            logger.error(f"Failed to delete file '{write_path}': {e}")

                    new_filename = f"{txnid}"

                    new_failed_txn_log_file_path = posixpath.join(
                        failed_txn_log_dir, new_filename
                    )
                    running_txn_log_path = posixpath.join(
                        running_txn_log_dir, new_filename
                    )

                    # Use the resolved filesystem for the delete and rename so
                    # non-local catalogs are handled too.
                    filesystem.delete_file(running_txn_log_path)
                    filesystem.move(failed_txn_info.path, new_failed_txn_log_file_path)
                    logger.debug(
                        f"Cleaned up failed transaction: {failed_txn_basename}"
                    )

        except Exception as e:
            logger.error(
                f"Could not read transaction '{failed_txn_info.path}', skipping: {e}"
            )

def janitor_job(catalog_root_dir: str) -> None:
    janitor_delete_timed_out_transaction(catalog_root_dir)
    janitor_remove_files_in_failed(catalog_root_dir)
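
For context, a minimal sketch of how this module's pieces could fit together: the running-transaction filename shape the janitor assumes, plus a trivial driver for janitor_job. The separator character, example transaction ID, catalog path, and sweep interval below are hypothetical placeholders, not values taken from this PR.

import time

# Hypothetical three-part filename "<part0>_<txn_id>_<end_time_ns>", assuming
# TXN_PART_SEPARATOR is "_": parts[1] is the transaction ID that
# brute_force_search_matching_metafiles() matches against metafile names, and
# parts[-1] is the end time janitor_delete_timed_out_transaction() compares
# to time.time_ns().
example = f"0001_txn-123_{time.time_ns()}"
parts = example.split("_")
assert parts[1] == "txn-123"

# Hypothetical driver: sweep a local catalog once per minute using the
# janitor_job() entry point defined above.
while True:
    janitor_job("/tmp/deltacat-catalog")  # placeholder catalog root
    time.sleep(60)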
@@ -102,3 +102,19 @@
DEFAULT_STREAM_ID = "stream"
DEFAULT_PARTITION_ID = "partition"
DEFAULT_PARTITION_VALUES = ["default"]

# Transaction status constants
SUCCESSFULLY_CLEANED = "cleaned"
CURRENTLY_CLEANING = "cleaning"
TIMEOUT_TXN = "timedout"

# Operation timeout constants
OPERATION_TIMEOUTS = {
    "create": 5,
    "update": 3,
    "delete": 4,
    "read_siblings": 2,
    "read_children": 2,
    "read_latest": 3,
    "read_exists": 1,
}

Comment on lines +107 to +109 (the transaction status constants):
Reviewer: Can we move these to a new file?
Author: Hi Patrick, I added this to types.py.

Comment on OPERATION_TIMEOUTS:
Reviewer: Could you briefly explain how these timeout values are generated? Are they just placeholder values right now, or does each value represent the amount of time it should take to complete the operation?
Author: These are fixed values that we decided to use. We picked arbitrarily large values per Patrick's advice, but @SashankBalusu is currently working on something to get more accurate values and can give more information on that in a bit. It is difficult to determine a timeout based on the size of the data, however, so these values will be constant across all transaction operations once they are finalized.
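
To illustrate the discussion above, here is a hedged sketch of how these per-operation timeouts might combine into the end-time token that the janitor compares against time.time_ns(). The helper name, the choice of seconds as the unit, the import path (the review thread indicates the constants later moved to types.py), and the summing strategy are all assumptions, not something this PR pins down.

import time

from deltacat.constants import OPERATION_TIMEOUTS  # assumed import path

def estimated_deadline_ns(operation_types):
    # Assumed approach: treat each value as seconds, sum the timeouts for a
    # transaction's operations, and convert to a nanosecond deadline of the
    # kind janitor_delete_timed_out_transaction() parses from filenames.
    total_s = sum(OPERATION_TIMEOUTS[op] for op in operation_types)
    return time.time_ns() + total_s * 1_000_000_000

# Example: one "create" (5) plus one "read_latest" (3) yields a deadline
# 8 seconds from now.
deadline = estimated_deadline_ns(["create", "read_latest"])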