Fallback janitor job two #508
base: 2.0
Conversation
…s catalog so that I can run the janitor job in order to execute the test
…i/codebase-deltacat into fallback-janitor-job-two
… current implementation will move all running transactions that have timed out into the failed transactions directory. Unfinished: need to ensure that I am correctly creating transactions that I can test
…tor job removes failed transactions that have not yet been deleted from the failed transaction directory
…is function will iterate through failed transactions directory and ensure that each file within this directory is properly deleted
…eck each transaction that has not been deleted and ensure id is not in failed transactions directory
…roach and instead took path to deserialized transaction and used read class method from transaction.py to access operations from transaction so that they can be deleted
…sfully deleted, has timed out, or has failed and not been deleted yet
…i/codebase-deltacat into fallback-janitor-job-two
Thanks for putting together the implementation for the fallback janitor! Looking forward to seeing how/where this janitor is called and integrated with existing failed transaction cleanup logic inside Transaction.commit() functions.
One high-level comment is on the placement of the janitor: should it be part of deltacat/storage/model/transaction, since it's a cleanup job for transactions specifically rather than a general compute job? Curious to learn your opinion here.
deltacat/storage/README.md
Outdated
@@ -78,7 +78,7 @@ Note that (except **Immutable ID**) this is the same format used by **Metadata R

### Transaction Log Directory
The **Transaction Log Directory** (`${CATALOG_ROOT}/txn`) is a special directory in the **Catalog Root Directory** which holds all successfully committed transactions. It contains one **Transaction Log File** per successful transaction recording transaction details.
```
``
Accidentally delete a backtick here?
Looks like it! Fixed.
TIMEOUT_TXN = "timedout"

# operation timeout constants
OPERATION_TIMEOUTS = {
Could you briefly explain how these timeout values were generated? Are they just placeholder values right now, or does each value actually represent the amount of time it should take to complete the operation?
These are fixed values that we decided to use. We picked arbitrarily large values per Patrick's advice, but @SashankBalusu is currently working on something to get more accurate values and can give more information on that in a bit.
It is difficult to determine a timeout time based on the size of the data, however, so these values will be constant across all transaction operations once they are finalized.
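As a sketch of how these constants are consumed, the per-operation values are simply summed over a transaction's operations (the dict values below are the ones from this PR; `total_timeout_seconds` is a hypothetical helper name for illustration):

```python
# Illustrative copy of this PR's per-operation timeout constants (seconds).
# Per the discussion above, these are intentionally large fixed values,
# not measured operation durations.
OPERATION_TIMEOUTS = {
    "create": 5,
    "update": 3,
    "delete": 4,
    "read_siblings": 2,
    "read_children": 2,
    "read_latest": 3,
    "read_exists": 1,
}

def total_timeout_seconds(operation_types):
    """Sum each operation's fixed timeout; unknown operation types
    contribute 0, mirroring OPERATION_TIMEOUTS.get(operation.type, 0)."""
    return sum(OPERATION_TIMEOUTS.get(op, 0) for op in operation_types)
```

A transaction with one create, one update, and one delete would get a 12-second heartbeat budget under these values.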
total_time_for_transaction += OPERATION_TIMEOUTS.get(operation.type, 0)

start_time = float(self.id.split(TXN_PART_SEPARATOR)[0])
final_time_heartbeat = start_time + (total_time_for_transaction * 1_000_000_000)  # Convert seconds to nanoseconds
Could we replace the number `1_000_000_000` with the imported constant `NANOS_PER_SEC`?
Makes sense, done!
@@ -591,6 +619,7 @@ def _commit_write(
locator_write_paths = []
try:
    for operation in self.operations:
        total_time_for_transaction += OPERATION_TIMEOUTS[operation.type]
Looks like this `total_time_for_transaction` gets aggregated a second time here and is not used anywhere downstream?
Yes, it is not used anywhere downstream. Since we have the for loop above on lines 603 and 604 summing the heartbeat timeout, we will remove this one.
deltacat/constants.py
Outdated
# Transaction Status constants
SUCCESSFULLY_CLEANED = "cleaned"
CURRENTLY_CLEANING = "working"
Minor: `working` seems a bit unclear to me; is it the janitor job that's working on this file, or is the transaction a work in progress? How about `cleaning` or something else that indicates it's a janitor job?
That makes sense, changed to "cleaning."
@@ -410,6 +415,20 @@ def end_time(self) -> Optional[int]:
    Returns the end time of the transaction.
    """
    return self.get("end_time")

def _mark_status(self, status: str) -> None:
Curious how/when this function is being called? In addition, for the status of the transaction, would it be clearer to have another enum like `TransactionStatus.SUCCESS` as an additional class attribute?
This function was used in a previous implementation of the janitor job, but was not removed when we changed it. We have deleted it now as it is no longer in use. All tests still pass!
Did you mean to delete the `_mark_start_time` method or just create a new `_mark_status` method? The `_mark_start_time` method is still used by transaction `commit`, so it seems like the current code will be broken until we add it back.
Oops, I did not mean to delete that; I meant to delete _mark_status instead, since it is not used anywhere else in the code.
I've fixed this change!
deltacat/compute/janitor.py
Outdated
if transaction_id in base_name:
    try:
        filesystem.delete_file(entry.path)
        print(f"Deleted file: {entry.path}")
Could we switch to use DeltaCAT logger instead of print statement?
import logging
from deltacat import logs
logger = logs.configure_deltacat_logger(logging.getLogger(__name__))
Yes, thanks for catching that. We've changed all print statements in this file to loggers, setting levels as we saw fit, but please double check us on that!
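For reference, a minimal self-contained sketch of the pattern being applied here. The entry list and `delete_file` callable are hypothetical stand-ins for the pyarrow filesystem objects used in janitor.py, and the plain stdlib logger stands in for the real `logs.configure_deltacat_logger` helper shown above:

```python
import logging

# Stand-in for DeltaCAT's configured logger; in the real module this is:
#   from deltacat import logs
#   logger = logs.configure_deltacat_logger(logging.getLogger(__name__))
logger = logging.getLogger(__name__)

def delete_matching_files(entries, transaction_id, delete_file):
    """Delete every entry whose base name contains the transaction ID,
    logging outcomes instead of printing them. `entries` is a list of
    (base_name, path) pairs and `delete_file` is any callable that
    removes a path (hypothetical stand-ins for filesystem objects)."""
    deleted = []
    for base_name, path in entries:
        if transaction_id in base_name:
            try:
                delete_file(path)
                logger.debug(f"Deleted file: {path}")
                deleted.append(path)
            except Exception:
                # logger.exception also records the stack trace
                logger.exception(f"Failed to delete file: {path}")
    return deleted
```

Choosing `debug` for routine deletions and `exception` for failures is one reasonable split; the PR authors note they set levels as they saw fit.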
txn_filenames = []
for txn_id in txn_ids:
    txn_filename = f"{start_time}{txn_id}{TXN_PART_SEPARATOR}{TXN_PART_SEPARATOR}{time.time_ns()}"
Should this filename be `f"{start_time}{TXN_PART_SEPARATOR}{txn_id}{TXN_PART_SEPARATOR}{time.time_ns()}"` instead?
Yes! We fixed it, and the tests are still passing.
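The separator placement matters because the code later recovers the start time by splitting the filename on `TXN_PART_SEPARATOR`. A small sketch, assuming the separator is `"_"` (the real value lives in deltacat/constants.py):

```python
import time

# Assumed separator value for illustration only; the real constant is
# TXN_PART_SEPARATOR in deltacat/constants.py.
TXN_PART_SEPARATOR = "_"

def make_txn_filename(start_time_ns: int, txn_id: str) -> str:
    # A separator must sit between every part so that splitting on it
    # recovers the start time as the first field.
    return (
        f"{start_time_ns}{TXN_PART_SEPARATOR}"
        f"{txn_id}{TXN_PART_SEPARATOR}{time.time_ns()}"
    )

def parse_start_time(txn_filename: str) -> float:
    # Mirrors `float(self.id.split(TXN_PART_SEPARATOR)[0])` from the diff.
    return float(txn_filename.split(TXN_PART_SEPARATOR)[0])
```

With the original (buggy) ordering, the start time and transaction ID would be fused into one field and `parse_start_time` would fail.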
I think …
Thanks for the context, putting under …
…tor_handles_empty_directories, and altered implementation of brute force search
Compare 6647d87 to 5ecea33
…f prior implementations.
for operation in self.operations:
    total_time_for_transaction += OPERATION_TIMEOUTS.get(operation.type, 0)

start_time = float(self.id.split(TXN_PART_SEPARATOR)[0])
Should this just be a call to `self.start_time()`?
Yes, just made this change
total_time_for_transaction = 0
for operation in self.operations:
    total_time_for_transaction += OPERATION_TIMEOUTS.get(operation.type, 0)

start_time = float(self.id.split(TXN_PART_SEPARATOR)[0])
final_time_heartbeat = start_time + (
    total_time_for_transaction * NANOS_PER_SEC
)  # Convert seconds to nanoseconds
Please add all logic for converting the heartbeat timeout time to a `timeout_time` method in `TransactionSystemTimeProvider`, since this method of calculation is only relevant for transaction time providers that use system clock nanos.
Just made this edit.
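A rough sketch of what that refactor could look like. The class and method names follow the suggestion above; everything else (signatures, the bare-bones class body) is an assumption for illustration:

```python
import time

NANOS_PER_SEC = 1_000_000_000

class TransactionSystemTimeProvider:
    """Hypothetical sketch of the time provider after the refactor:
    the deadline math lives here because it only makes sense for
    providers that measure time in system clock nanoseconds."""

    def start_time(self) -> int:
        # Current system time in nanoseconds.
        return time.time_ns()

    def timeout_time(self, start_time_ns: int, total_timeout_s: int) -> int:
        """Absolute heartbeat deadline: transaction start time plus the
        summed per-operation timeouts, converted from seconds to nanos."""
        return start_time_ns + total_timeout_s * NANOS_PER_SEC
```

The transaction code would then ask the provider for the deadline instead of hard-coding the nanosecond conversion inline.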
SUCCESSFULLY_CLEANED = "cleaned"
CURRENTLY_CLEANING = "cleaning"
TIMEOUT_TXN = "timedout"
Can we move these to a new `class TransactionState(str, Enum)` type in `types.py`, and change the supported values to simply: `FAILED` (equivalent to `CLEANING`), `PURGED` (equivalent to `CLEANED`), `TIMEOUT`, `RUNNING`, and `SUCCESS`?
Hi Patrick, I added this to types.py
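A minimal sketch of such an enum; the member names come from the suggestion above, while the string values are assumptions:

```python
from enum import Enum

class TransactionState(str, Enum):
    """Sketch of the TransactionState type proposed for types.py.
    Subclassing str keeps members usable anywhere a plain status
    string was previously expected."""
    RUNNING = "running"
    SUCCESS = "success"
    TIMEOUT = "timeout"
    FAILED = "failed"   # replaces CURRENTLY_CLEANING ("cleaning")
    PURGED = "purged"   # replaces SUCCESSFULLY_CLEANED ("cleaned")
```

Because of the `str` mixin, `TransactionState.FAILED == "failed"` holds, so existing string comparisons keep working during the migration.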
    failed_txn_log_dir,
    self.id,
)
path_ending = f"{self.id}{TXN_PART_SEPARATOR}{CURRENTLY_CLEANING}"
Instead of attaching the "CLEANING" and "CLEANED" states to the transaction file directly (which requires subsequent file movements/renames to apply), what do you think about just writing a `transaction.state()` helper method that retrieves the current transaction state?

We can change the `CLEANING` state to `FAILED`, and infer it as the transaction ID being present in both the failed and running directories, then change `CLEANED` to `PURGED` and infer it as the transaction ID ONLY being present in the failed directory. `SUCCESS` would be inferred as the transaction ID being present in the success directory, and `RUNNING` would be inferred as the transaction ID being present in the running directory (unless it is also present in the failed directory; in other words, we should have an if/else check that checks the failed dir first, and otherwise falls back to see if it's running).
Hey Patrick, I’ve gone ahead and implemented it. I created a transaction.state() helper method that determines the current state based on the presence of the transaction ID in the appropriate directories.
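The inference rules above can be sketched as follows, using plain ID sets as hypothetical stand-ins for the directory listings (the real helper would list the failed/running/success txn log directories via the filesystem):

```python
def transaction_state(txn_id, failed_ids, running_ids, success_ids):
    """Infer a transaction's state from which txn log directories
    contain its ID. The failed directory is checked first, so
    failed + running => "failed" (cleanup in progress) shadows
    "running", and failed only => "purged" (cleanup complete)."""
    if txn_id in failed_ids:
        return "failed" if txn_id in running_ids else "purged"
    if txn_id in success_ids:
        return "success"
    if txn_id in running_ids:
        return "running"
    return None  # unknown transaction ID
```

The payoff of this design is that no file renames are needed to change state: deleting the running-directory entry at the end of cleanup is what flips a transaction from "failed" to "purged".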
… and infer state from file directory locations:
…ltacat/tests/conftest.py, this was done by accident
Looks like a good start on the problem! Please remove dead/commented-out code and resolve merge conflicts, then I'll get these changes merged!
# CURRENTLY_CLEANING,
# SUCCESSFULLY_CLEANED,
# TIMEOUT_TXN,
Suggested change: delete these commented-out lines.
@@ -15,17 +15,23 @@
import pyarrow.fs

from deltacat.constants import (
    # CURRENTLY_CLEANING,
Suggested change: delete this commented-out line.
@@ -15,17 +15,23 @@
import pyarrow.fs

from deltacat.constants import (
    # CURRENTLY_CLEANING,
    OPERATION_TIMEOUTS,
    # SUCCESSFULLY_CLEANED,
Suggested change: delete this commented-out line.
@@ -610,10 +672,14 @@ def _commit_write(
)
except Exception:
    # write a failed transaction log file entry
    # path_ending = f"{self.id}{TXN_PART_SEPARATOR}{CURRENTLY_CLEANING}"
Suggested change: delete this commented-out line.
@@ -639,12 +706,19 @@ def _commit_write(
# delete the in-progress transaction log file entry
filesystem.delete_file(running_txn_log_file_path)
# failed transaction cleanup is now complete
old_path = failed_txn_log_file_path
# new_path = posixpath.join(failed_txn_log_dir, f"{self.id}{TXN_PART_SEPARATOR}{SUCCESSFULLY_CLEANED}")
Suggested change: delete this commented-out line.
# Run the janitor job that should:
# 1. Move the running txn file to the failed directory with TIMEOUT_TXN appended.
# 2. Invoke brute force search which deletes the metafile and renames the txn log file to use SUCCESSFULLY_CLEANED.
Suggested change:
# 2. Invoke brute force search to delete the metafiles and clean up txn log files.
# Expected name: original txn_filename with TIMEOUT_TXN replaced by SUCCESSFULLY_CLEANED.
# new_txn_file_name = f"{txn_filename}{TXN_PART_SEPARATOR}{SUCCESSFULLY_CLEANED}"
Suggested change: delete these commented-out lines.
# Run the janitor function to move timed-out transactions to the failed directory
janitor_delete_timed_out_transaction(temp_dir)

# Verify that all transactions were moved to the failed directory with SUCCESSFULLY_CLEANED appended
Suggested change:
# Verify that all transactions were moved to the failed directory
# Verify that all transactions were moved to the failed directory with SUCCESSFULLY_CLEANED appended
for txn_filename, txn_path, test_metafile_path in txn_filenames:
    new_txn_filename = (
        # f"{txn_filename}{TXN_PART_SEPARATOR}{SUCCESSFULLY_CLEANED}"
Suggested change: delete this commented-out line.
current_time = time.time_ns()
if end_time <= current_time:
    src_path = running_txn_info.path
    # new_filename = f"(unknown){TXN_PART_SEPARATOR}{TIMEOUT_TXN}"
Suggested change: delete this commented-out line.
Secondary Janitor Job to Handle Timed-Out and Failed Transactions
Description:
This PR enhances the janitor job by implementing the handling of timed-out and failed transactions. It adds logic to:
Changes Made:
janitor_delete_timed_out_transaction:
janitor_remove_files_in_failed:
brute_force_search_matching_metafiles:
Added Transaction Status Tags:
OPERATION_TIMEOUTS = {
    "create": 5,
    "update": 3,
    "delete": 4,
    "read_siblings": 2,
    "read_children": 2,
    "read_latest": 3,
    "read_exists": 1,
}
This allows us to identify failed transactions without having to read the transaction file.
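As an illustration of that idea, the transaction's start time is encoded in the log filename itself, so the janitor can decide whether a running transaction has timed out without opening the file. A sketch, where the separator value and helper name are assumptions:

```python
import time

NANOS_PER_SEC = 1_000_000_000
TXN_PART_SEPARATOR = "_"  # assumed separator value for illustration

def has_timed_out(running_txn_filename: str, timeout_s: int, now_ns=None) -> bool:
    """A running transaction has timed out once the start time encoded
    in its log filename, plus its total timeout budget, is in the past.
    The filename layout (start time as the first separator-delimited
    field) follows this PR."""
    start_time_ns = int(running_txn_filename.split(TXN_PART_SEPARATOR)[0])
    now_ns = time.time_ns() if now_ns is None else now_ns
    return start_time_ns + timeout_s * NANOS_PER_SEC <= now_ns
```

The janitor would apply this check to each entry in the running-transactions directory and move the expired ones into the failed directory.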